#TECH

Serialization Evolution (Apache Avro)

Apache Avro (Schema Evolution)

While working with data, we either store it in a file or send it over a network. Serialization, the mechanism behind both, has evolved through several stages.

Evolution stages:

    1. For serialization, each programming language offers its own options: Java serialization, Python's pickle, Ruby's Marshal, or sometimes a home-grown format.
    2. To break out of this tight coupling to one programming language, widely supported formats such as JSON and XML were adopted.
    3. Since textual JSON is slow to parse and does not differentiate between related types, e.g. (int, float) or (string, unicode), binary JSON formats evolved next.
    4. Now that data grows day by day (Big Data) and people shuffle fields in their objects with inconsistent types, schema-based storage has come into the picture.

A real-life problem in data processing:

Data is always in flux: anyone can add or remove fields at any time. Say we have a producer and a consumer system, where the producer adds a new column to the data while the consumer still works with the old format. With a standard serialization-deserialization mechanism, processing starts breaking every time such a change is made, because we must make matching changes on the consumer side. That seems an easy task in a small application, but for a big enterprise solution dealing with Big Data it becomes real trouble. This is where schema evolution plays a big role: it allows you to update system components independently, and the consumer can keep processing the data without changes to the existing system, since the data carries its schema attached to it.

Now let me add a programming problem statement:

Suppose we initially designed a schema for an Employee POJO:

   {
     "type": "record",
     "name": "Employee",
     "fields": [
       {"name": "emp_name", "type": "string"},
       {"name": "dob", "type": "string"},
       {"name": "age", "type": "int"}
     ]
   }

Later, we realize that age is redundant (it can be derived from dob) and remove it from the schema:

   {
     "type": "record",
     "name": "Employee",
     "fields": [
       {"name": "emp_name", "type": "string"},
       {"name": "dob", "type": "string"}
     ]
   }

Now the obvious questions that strike my mind are:
1. What about the records that were serialized and stored before this schema change?
2. How will we read those records back?

That's why the Avro reader/deserializer asks for both the reader schema and the writer schema. Internally it performs schema resolution, i.e. it tries to adapt data written with the old schema to the new schema, as the sketch below shows.
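To make this concrete, here is a minimal sketch (my own illustrative class, using the Avro 1.7.x generic API with in-memory binary encoding; the schema strings mirror the example above): a record is serialized with the old writer schema, then read back with the new reader schema, and Avro's schema resolution silently drops the removed age field.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class SchemaResolutionDemo {

    static final String WRITER_SCHEMA = "{\"type\":\"record\",\"name\":\"Employee\",\"fields\":["
            + "{\"name\":\"emp_name\",\"type\":\"string\"},"
            + "{\"name\":\"dob\",\"type\":\"string\"},"
            + "{\"name\":\"age\",\"type\":\"int\"}]}";

    static final String READER_SCHEMA = "{\"type\":\"record\",\"name\":\"Employee\",\"fields\":["
            + "{\"name\":\"emp_name\",\"type\":\"string\"},"
            + "{\"name\":\"dob\",\"type\":\"string\"}]}";

    public static void main(String[] args) throws Exception {
        Schema writerSchema = new Schema.Parser().parse(WRITER_SCHEMA);
        Schema readerSchema = new Schema.Parser().parse(READER_SCHEMA);

        // serialize a record with the OLD schema (age still present)
        GenericRecord oldRecord = new GenericData.Record(writerSchema);
        oldRecord.put("emp_name", "Harsh");
        oldRecord.put("dob", "1985-01-01");
        oldRecord.put("age", 30);

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(writerSchema).write(oldRecord, encoder);
        encoder.flush();

        // deserialize with the NEW schema; Avro resolves the difference
        BinaryDecoder decoder = DecoderFactory.get()
                .binaryDecoder(new ByteArrayInputStream(out.toByteArray()), null);
        GenericRecord newRecord = new GenericDatumReader<GenericRecord>(writerSchema, readerSchema)
                .read(null, decoder);
        System.out.println(newRecord); // {"emp_name": "Harsh", "dob": "1985-01-01"}
    }
}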

What are the properties of Avro?

It is a data serialization system.
Schemas are defined in JSON.
Data carries its schema attached during transfer (see the sketch below).
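As a quick illustration of that last point, a minimal sketch (the file name employees.avro is hypothetical): an Avro data file stores the writer's schema in its header, so it can be read back without supplying any schema up front.

import java.io.File;

import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

public class ReadWithEmbeddedSchema {
    public static void main(String[] args) throws Exception {
        // no schema is passed to the reader; it comes from the file header
        DataFileReader<GenericRecord> reader = new DataFileReader<GenericRecord>(
                new File("employees.avro"), new GenericDatumReader<GenericRecord>());
        System.out.println("Embedded schema: " + reader.getSchema().toString(true));
        for (GenericRecord rec : reader) {
            System.out.println(rec);
        }
        reader.close();
    }
}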

What does Avro provide?

Rich data structures.
A fast, compact, binary data format.
Optional code generation for reading and writing data (the two walkthroughs below cover both styles).
Avro APIs for most popular languages.

What is schema resolution?

Avro uses a schema while processing data, and attaches that schema to the data so that any program reading it can understand the format. Schema resolution matches the schema attached to the data (the writer schema) against the schema the reading application expects (the reader schema).

Which types does Avro support?

Avro supports most common data types:
1. Primitive types: null, boolean, int, long, float, double, bytes and string.
2. Complex types: records, enums, arrays, maps, unions and fixed. For details, see the Avro documentation at http://avro.apache.org/; a small sketch follows below.
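As a small illustration of complex types, here is a sketch of my own (it assumes Avro 1.7.5+, which ships org.apache.avro.SchemaBuilder) that builds a record containing a union (a nullable int) and an array of strings, then prints the resulting JSON schema:

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

public class ComplexTypesDemo {
    public static void main(String[] args) {
        Schema schema = SchemaBuilder.record("Employee")
                .fields()
                .requiredString("emp_name")
                // union of null and int, defaulting to null
                .name("age").type().unionOf().nullType().and().intType().endUnion().nullDefault()
                // array of strings
                .name("skills").type().array().items().stringType().noDefault()
                .endRecord();
        System.out.println(schema.toString(true)); // pretty-printed JSON schema
    }
}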

Step-by-step running project with Maven and Apache Avro.

Step 1: Create a Maven Java project and update pom.xml with the given content.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.quovantis.avro</groupId>
    <artifactId>quoavro</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>QuoApacheAvroSampleProject</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>


    <!-- snipped -->

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>1.7.5</version>
                <executions>
                    <execution>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>schema</goal>
                        </goals>
                        <configuration>
                            <sourceDirectory>
                                ${project.basedir}/src/main/resources
                            </sourceDirectory>
                            <outputDirectory>
                                ${project.basedir}/src/main/java
                            </outputDirectory>
                            <fieldVisibility>PRIVATE</fieldVisibility>
                            <includes>
                                <include>**/*.avsc</include>
                            </includes>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                </configuration>
            </plugin>

        </plugins>
    </build>


    <dependencies>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.7.5</version>
        </dependency>

        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro-tools</artifactId>
            <version>1.7.5</version>
        </dependency>

        <dependency>
            <groupId>com.googlecode.json-simple</groupId>
            <artifactId>json-simple</artifactId>
            <version>1.1</version>
        </dependency>
    </dependencies>

</project>

Step 2: Create the data file EmployeeActivityDATA.json in the resources folder. I used the data set below.
{"id":"A123445","name":"Harsh", "employee_id":9001,"department_id":101,"technical_experties":{"language":["JAVA", "Python", "SCALA", "Hadoop"], "experience":10}}
{"id":"B123445","name":"Sanjeev", "employee_id":9002,"department_id":102,"technical_experties":{"language":["JAVA", "Python"], "experience":8}}
{"id":"C123445","name":"Shanky", "employee_id":9003,"department_id":103,"technical_experties":{"language":["Python", "Ruby"], "experience":4}}


Serialize and deserialize WITHOUT Java code (POJO) generation:

a) Create the package com.avro.withoutschema.
b) Create the class WithoutSchemaHandler and paste the given content.

package com.avro.withoutschema;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
import java.util.LinkedHashMap;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.map.ObjectMapper;
import org.json.simple.JSONObject;

public class WithoutSchemaHandler {

    static void serialize() throws IOException {

        InputStream in = new FileInputStream("src/main/resources/EmployeeActivityDATA.json");

        // parse the schema
        Schema schema = new Schema.Parser().parse(new File("src/main/resources/employee_details.avsc"));
        // a generic record to hold each JSON record
        GenericRecord avroData = new GenericData.Record(schema);
        // a nested record to hold technical_experties
        GenericRecord expertiesRec = new GenericData.Record(schema.getField("technical_experties").schema());
        // this file will hold the Avro output data
        File avroFile = new File("src/main/resources/EmplyeeDetails_without_schema.avro");
        // create a writer to serialize the records
        DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(schema);
        DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(datumWriter);

        dataFileWriter.create(schema, avroFile);

        // iterate over the JSON records in the input file and append them to the Avro output file
        for (Iterator<JSONObject> it = new ObjectMapper().readValues(
                new JsonFactory().createJsonParser(in), JSONObject.class); it.hasNext();) {

            JSONObject jsonRec = it.next();
            avroData.put("id", jsonRec.get("id"));
            avroData.put("employee_id", jsonRec.get("employee_id"));
            avroData.put("name", jsonRec.get("name"));
            avroData.put("department_id", jsonRec.get("department_id"));

            LinkedHashMap technicalExperties = (LinkedHashMap) jsonRec.get("technical_experties");
            expertiesRec.put("experience", technicalExperties.get("experience"));
            expertiesRec.put("language", technicalExperties.get("language"));

            avroData.put("technical_experties", expertiesRec);

            dataFileWriter.append(avroData);
        } // end of for loop

        in.close();
        dataFileWriter.close();
    } // end of serialize method

    public static void deserialize() throws IOException {
        // parse the schema
        Schema schema = new Schema.Parser().parse(new File("src/main/resources/employee_details.avsc"));
        File avroFile = new File("src/main/resources/EmplyeeDetails_without_schema.avro");
        DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(schema);
        DataFileReader<GenericRecord> dataFileReader = new DataFileReader<GenericRecord>(avroFile, datumReader);
        GenericRecord avroRec = null;
        System.out.println("Deserialized data is:");
        while (dataFileReader.hasNext()) {
            avroRec = dataFileReader.next(avroRec);
            System.out.println(avroRec);
        }
        dataFileReader.close();
    }

    public static void main(String[] args) throws Exception {
        serialize();
        deserialize();
    }
}
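Running main serializes the three JSON records into EmplyeeDetails_without_schema.avro and prints them back. The output for the first record should look something like {"id": "A123445", "name": "Harsh", "employee_id": 9001, "department_id": 101, "technical_experties": {"language": ["JAVA", "Python", "SCALA", "Hadoop"], "experience": 10}}.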


Serialize and deserialize WITH Java code (POJO) generation:

A) Create the schema file employee_details.avsc and copy the content below.
{
    "namespace": "com.avro.withschema",
    "type": "record",
    "name": "EmployeeDetails",
    "fields": [
        {
            "name": "id",
            "type": "string"
        },
        {
            "name": "name",
            "type": "string"
        },
        {
            "name": "employee_id",
            "type": "int"
        },
        {
            "name": "department_id",
            "type": "int"
        },
        {
            "name": "technical_experties",
            "type": {
                "name": "TechnicalExperties",
                "type": "record",
                "fields": [
                    {
                        "name": "language",
                        "type":{
                            "name":"LanguageList",
                            "type":"array",
                            "items":"string"
                        }
                    },
                    {
                        "name": "experience",
                        "type": "int"
                    }
                ]
            }
        }
    ]
}


B) Generate the Java POJO files.

This can be done with a standalone command, or by adding the Maven plugin to pom.xml to generate the classes:
1. Example standalone command: java -jar avro-tools-1.7.5.jar compile schema employee_details.avsc .
2. I used the Maven plugin configured in the pom.xml above.
3. Run mvn compile to generate the classes.
4. Run mvn clean install. A short usage sketch of the generated classes follows.
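Once generation succeeds, the classes land in com.avro.withschema (per the namespace in the schema). As a minimal sketch of my own (assuming generation succeeded), Avro-generated records also come with a Builder:

import java.util.Arrays;

import com.avro.withschema.EmployeeDetails;
import com.avro.withschema.TechnicalExperties;

public class BuilderDemo {
    public static void main(String[] args) {
        // build the nested record first
        TechnicalExperties te = TechnicalExperties.newBuilder()
                .setLanguage(Arrays.<CharSequence>asList("JAVA", "Python"))
                .setExperience(8)
                .build();

        // then the top-level record
        EmployeeDetails ed = EmployeeDetails.newBuilder()
                .setId("B123445")
                .setName("Sanjeev")
                .setEmployeeId(9002)
                .setDepartmentId(102)
                .setTechnicalExperties(te)
                .build();

        System.out.println(ed); // prints the record as JSON
    }
}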

C) Create the handler Java class.

i) Create the Java package com.avro.withschema.
ii) Create the class WithSchemaHandler.java and paste the given content.

package com.avro.withschema;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;

import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.map.ObjectMapper;
import org.json.simple.JSONObject;

public class WithSchemaHandler {

    @SuppressWarnings("unchecked")
    public static void serialize() throws IOException {

        InputStream in = new FileInputStream("src/main/resources/EmployeeActivityDATA.json");

        // generated POJOs to hold each JSON record and its nested technical_experties
        EmployeeDetails ed = new EmployeeDetails();
        TechnicalExperties te = new TechnicalExperties();

        // this file will hold the Avro output data
        File avroFile = new File("src/main/resources/EmplyeeDetails.avro");
        // use the SPECIFIC writer, since we are serializing generated POJOs
        DatumWriter<EmployeeDetails> datumWriter = new SpecificDatumWriter<EmployeeDetails>(EmployeeDetails.class);
        DataFileWriter<EmployeeDetails> dataFileWriter = new DataFileWriter<EmployeeDetails>(datumWriter);

        // the schema is embedded in the generated class; no need to re-parse the .avsc
        dataFileWriter.create(EmployeeDetails.getClassSchema(), avroFile);

        // iterate over the JSON records in the input file and append them to the Avro output file
        for (Iterator<JSONObject> it = new ObjectMapper().readValues(
                new JsonFactory().createJsonParser(in), JSONObject.class); it.hasNext();) {

            JSONObject jsonRec = it.next();
            ed.setId((CharSequence) jsonRec.get("id"));
            ed.setName((CharSequence) jsonRec.get("name"));
            ed.setEmployeeId((Integer) jsonRec.get("employee_id"));
            ed.setDepartmentId((Integer) jsonRec.get("department_id"));

            LinkedHashMap technicalExperties = (LinkedHashMap) jsonRec.get("technical_experties");
            te.setLanguage((List<CharSequence>) technicalExperties.get("language"));
            te.setExperience((Integer) technicalExperties.get("experience"));

            ed.setTechnicalExperties(te);

            dataFileWriter.append(ed);
        } // end of for loop

        in.close();
        dataFileWriter.close();
    } // end of serialize method

    public static void deserialize() throws IOException {
        // the SPECIFIC reader deserializes straight back into the generated POJO
        File avroFile = new File("src/main/resources/EmplyeeDetails.avro");
        DatumReader<EmployeeDetails> datumReader = new SpecificDatumReader<EmployeeDetails>(EmployeeDetails.class);
        DataFileReader<EmployeeDetails> dataFileReader = new DataFileReader<EmployeeDetails>(avroFile, datumReader);
        System.out.println("Deserialized data is:");
        while (dataFileReader.hasNext()) {
            EmployeeDetails avroRec = dataFileReader.next();
            System.out.println(avroRec);
        }
        dataFileReader.close();
    }

    public static void main(String[] args) throws Exception {
        serialize();
        deserialize();
    }
}

What next?

Apache Avro is becoming more popular by the day in Big Data processing; many Hadoop-ecosystem tools such as Hive and Pig can store and process data in the Avro format. I will share my experience with Apache Avro on Hadoop in my next post.

References:

http://avro.apache.org/
http://avro.apache.org/docs/1.7.7/gettingstartedjava.html