Sarvesh Kumar

Lambda Architecture using Apache Spark – with Java Code Examples

Lambda Architecture

Lambda architecture, devised by Nathan Marz, is a layered architecture that solves the problem of computing arbitrary functions on arbitrary data in real time. In a real-time system, the requirement looks something like this:

result = function (all data)

As the volume of data grows, running this query over all of the data takes a significant amount of time, no matter what resources we use.

Lambda Architecture solves this problem with three layers and the concept of pre-computed views. The three layers are:

  1. Batch Layer
  2. Speed Layer
  3. Serving Layer

(Figure: The three layers of the Lambda Architecture)

Batch Layer

The batch layer stores the immutable master dataset, computes arbitrary functions over all of the data, and produces batch views. Its function can be summarized as:

batch view = function (all data)

The batch layer runs this job continuously and updates the batch views each time a recomputation finishes.

(Figure: Batch Layer)
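To make "batch view = function (all data)" concrete outside Spark, here is a minimal plain-Java sketch. The hypothetical `BatchLayerSketch` class keeps an append-only master dataset and rebuilds a view from scratch by applying a function to every record; the class and method names are illustrative assumptions, not part of the system described in this post.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Hypothetical illustration of the batch layer, not part of the original system.
public class BatchLayerSketch {

    // Immutable (append-only) master dataset: records are added, never updated or deleted.
    private final List<String> masterDataset = new ArrayList<>();

    public void append(String record) {
        masterDataset.add(record);
    }

    // batch view = function(all data): every run starts from scratch over a
    // read-only snapshot of all records instead of patching the previous view.
    public <V> V recompute(Function<List<String>, V> function) {
        List<String> snapshot = Collections.unmodifiableList(new ArrayList<>(masterDataset));
        return function.apply(snapshot);
    }

    public static void main(String[] args) {
        BatchLayerSketch batch = new BatchLayerSketch();
        for (String tweet : Arrays.asList("learning #spark", "#spark and #cassandra", "no tags here")) {
            batch.append(tweet);
        }
        // Example function: how many tweets mention #spark
        Long sparkTweets = batch.recompute(all ->
                all.stream().filter(t -> t.contains("#spark")).count());
        System.out.println(sparkTweets); // 2
    }
}
```

Recomputing from the full dataset is slower than updating in place, but it means a bug in one run is fixed simply by correcting the function and running it again.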

Serving Layer

The purpose of the serving layer is to store the batch views produced by the batch layer and provide random access to them. When the batch layer computes new views, it updates them in the serving layer. The serving layer can be implemented with any database that supports random reads.
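The hypothetical `ServingLayerSketch` below is an in-memory stand-in for that random-access database: it holds the latest batch view, answers point lookups by key, and is updated by the batch layer swapping in a whole new view at once. Using an `AtomicReference` for the swap is an assumption made for this sketch, so that readers never observe a half-written view; the example later in this post uses Cassandra instead.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical in-memory stand-in for the serving layer's random-access store.
public class ServingLayerSketch {

    // Latest published batch view; empty until the first batch run completes.
    private final AtomicReference<Map<String, Long>> current =
            new AtomicReference<>(Collections.<String, Long>emptyMap());

    // Called by the batch layer when a recomputation finishes: the whole view is
    // replaced in one step, so readers see either the old view or the new one.
    public void publish(Map<String, Long> newBatchView) {
        current.set(Collections.unmodifiableMap(new HashMap<>(newBatchView)));
    }

    // Random access: look up a single key in the current view.
    public long countFor(String tag) {
        return current.get().getOrDefault(tag, 0L);
    }

    public static void main(String[] args) {
        ServingLayerSketch serving = new ServingLayerSketch();
        System.out.println(serving.countFor("#spark")); // 0
        Map<String, Long> view = new HashMap<>();
        view.put("#spark", 42L);
        serving.publish(view);
        System.out.println(serving.countFor("#spark")); // 42
    }
}
```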

Speed Layer

A batch view does not include data that arrives while the batch layer is recomputing it. The purpose of the speed layer is to compute incremental views over this recent data that the batch views do not yet cover. These views are called real-time views.

The speed layer can be summarized as:

real time view = function (real time view, new data)
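In contrast to the batch layer, the speed layer only folds new data into its previous view. A minimal plain-Java sketch of "real time view = function (real time view, new data)" for hashtag counts follows; `SpeedLayerSketch` and `update` are hypothetical names, and splitting tweets on whitespace is an assumption.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of the speed layer's incremental update.
public class SpeedLayerSketch {

    // real time view = function(real time view, new data):
    // fold only the newly arrived tweets into a copy of the previous view.
    public static Map<String, Long> update(Map<String, Long> realtimeView, List<String> newTweets) {
        Map<String, Long> next = new HashMap<>(realtimeView);
        for (String tweet : newTweets) {
            for (String word : tweet.split("\\s+")) {
                if (word.startsWith("#")) {
                    next.merge(word, 1L, Long::sum);
                }
            }
        }
        return next;
    }

    public static void main(String[] args) {
        Map<String, Long> view = new HashMap<>();
        view = update(view, Arrays.asList("#spark streaming"));
        view = update(view, Arrays.asList("more #spark", "#kafka"));
        System.out.println(view.get("#spark")); // 2
    }
}
```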

So the final query is answered by combining results from the serving layer (batch views) and the speed layer (real-time views):

batch view = function (all data)

real time view = function (real time view, new data)

result = merge (query (batch view), query (real time view))

(Figure: Speed Layer)
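For hashtag counts, the merge step "result = merge (query (batch view), query (real time view))" can be as simple as adding the two counts. The sketch below assumes the real-time view covers only data the batch view has not absorbed yet, so summing does not double count; `MergeSketch`, `countFor` and `topTags` are hypothetical names.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical illustration of result = merge(query(batch view), query(real time view)).
public class MergeSketch {

    // query + merge for one tag. Assumes the real-time view only covers data the
    // batch view has not absorbed yet, so summing does not double count.
    public static long countFor(String tag, Map<String, Long> batchView, Map<String, Long> realtimeView) {
        return batchView.getOrDefault(tag, 0L) + realtimeView.getOrDefault(tag, 0L);
    }

    // Top-n tags over both views by merged count (ties broken alphabetically).
    public static List<String> topTags(Map<String, Long> batchView, Map<String, Long> realtimeView, int n) {
        Set<String> allTags = new HashSet<>(batchView.keySet());
        allTags.addAll(realtimeView.keySet());
        Comparator<String> byMergedCountDesc =
                Comparator.comparingLong((String tag) -> countFor(tag, batchView, realtimeView)).reversed();
        return allTags.stream()
                .sorted(byMergedCountDesc.thenComparing(Comparator.naturalOrder()))
                .limit(n)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, Long> batch = new HashMap<>();
        batch.put("#spark", 10L);
        batch.put("#java", 7L);
        Map<String, Long> realtime = new HashMap<>();
        realtime.put("#java", 5L);
        realtime.put("#kafka", 1L);
        System.out.println(countFor("#java", batch, realtime)); // 12
        System.out.println(topTags(batch, realtime, 2));        // [#java, #spark]
    }
}
```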
 

An Example using Apache Spark

Suppose we want to build a system that finds popular hashtags in a Twitter stream. We can implement it with the Lambda Architecture using Apache Spark.

Batch Layer Implementation

The batch layer reads a file of tweets, calculates a hashtag frequency map, and saves it to a Cassandra table.

Batch.java

SparkConf sparkConf = new SparkConf().setAppName("TwitterPopularTagsBatch");
JavaSparkContext sc = new JavaSparkContext(sparkConf);

// Load the tweets file; each RDD element is one line of the file
JavaRDD<String> tweets = sc.textFile(args[0]);

// Find (tag, #times) pairs; Business.countTags is a helper not shown in this article
JavaPairRDD<String, Integer> tagCounts = Business.countTags(tweets);

// Save this batch view to Cassandra (keyspace "populartweets", table "batch_tag_count")
RDDJavaFunctions<Tuple2<String, Integer>> rddJavaFunctions =
	CassandraJavaUtil.javaFunctions(tagCounts);

rddJavaFunctions.writerBuilder("populartweets", "batch_tag_count",
	CassandraJavaUtil.mapTupleToRow(String.class, Integer.class))
	.withColumnSelector(CassandraJavaUtil.someColumns("tag", "count"))
	.saveToCassandra();
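`Business.countTags` is not shown in the article. Assuming it keeps the words that start with "#" and counts each one, a plain-Java equivalent of the "(tag, #times)" computation looks like the sketch below; `CountTagsSketch` is a hypothetical name. In Spark the same steps would correspond to `flatMap`, `filter`, `mapToPair` and `reduceByKey`. Because a line containing just a hashtag splits into that hashtag, the same logic also works for the already-extracted tags that Speed.java passes in.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical stand-in for Business.countTags, written with plain Java streams.
public class CountTagsSketch {

    // Split lines into words, keep hashtags, and count occurrences of each tag.
    public static Map<String, Long> countTags(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.split("\\s+")))
                .filter(word -> word.startsWith("#"))
                .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
    }

    public static void main(String[] args) {
        Map<String, Long> counts = countTags(Arrays.asList("#spark and #cassandra", "#spark"));
        System.out.println(counts.get("#spark")); // 2
    }
}
```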

 

Speed Layer Implementation

The speed layer can also be written in Apache Spark, using Spark Streaming. We get a stream of recent tweets, calculate a real-time view from it, and, for simplicity, save this real-time view to Cassandra as well.

Speed.java

// sparkConf was not defined in Speed.java; this app name is illustrative
SparkConf sparkConf = new SparkConf().setAppName("TwitterPopularTagsSpeed");
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf,
	Durations.minutes(60));
JavaReceiverInputDStream<Status> stream = TwitterUtils.createStream(ssc);

// Create stream of hashtags: keep the words of each tweet that start with "#"
JavaDStream<String> hashTagStream = stream.flatMap((status) -> {
	String[] textArr = status.getText().split(" ");
	List<String> matching = new ArrayList<String>();
	for (String str : textArr) {
		if (str.startsWith("#")) {
			matching.add(str);
		}
	}
	// Spark 1.x FlatMapFunction returns an Iterable;
	// on Spark 2.x return matching.iterator() instead
	return matching;
});

// Create (tag, frequency) stream
JavaPairDStream<String, Integer> tagCountStream = hashTagStream
	.transformToPair((rdd) -> Business.countTags(rdd));

// Create real time view of (count, tag) for each 60 minute window, sorted by count descending
JavaDStream<Tuple2<Integer, String>> countTagSortedStream = tagCountStream
	.reduceByKeyAndWindow((v1, v2) -> v1 + v2, Durations.minutes(60))
	.map(v1 -> new Tuple2<Integer, String>(v1._2, v1._1))
	.transform((v1) -> v1.sortBy(v2 -> v2._1, false, 1));

// Save this real time view in Cassandra. Cassandra does not preserve the RDD's
// sort order on write; a query returns rows in the order set by the table's primary key.
DStreamJavaFunctions<Tuple2<Integer, String>> javaF =
	CassandraStreamingJavaUtil.javaFunctions(countTagSortedStream);

javaF.writerBuilder("populartweets", "stream_tag_count",
	CassandraJavaUtil.mapTupleToRow(Integer.class, String.class))
	.withColumnSelector(CassandraJavaUtil.someColumns("count", "tag"))
	.saveToCassandra();

// Start the streaming computation and wait until it is stopped
ssc.start();
ssc.awaitTermination();
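In Speed.java the window length passed to `reduceByKeyAndWindow` equals the 60-minute batch interval, so each window covers exactly one batch and consecutive windows do not overlap (tumbling windows). The plain-Java sketch below shows the same per-window counting outside Spark. It is an illustration under assumptions: `TumblingWindowSketch` and the `TagEvent` type with an epoch-millisecond timestamp are hypothetical, and whereas Spark assigns records to batches by arrival time, this sketch uses a timestamp field.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of 60-minute tumbling-window tag counts.
public class TumblingWindowSketch {

    public static final long WINDOW_MILLIS = TimeUnit.MINUTES.toMillis(60);

    // Hypothetical event type: a hashtag observed at an epoch-millisecond timestamp.
    public static final class TagEvent {
        final String tag;
        final long timestampMillis;

        public TagEvent(String tag, long timestampMillis) {
            this.tag = tag;
            this.timestampMillis = timestampMillis;
        }
    }

    // Assign each event to the window starting at the nearest lower multiple of
    // WINDOW_MILLIS, then count tags per window; windows are ordered by start time.
    public static Map<Long, Map<String, Long>> countPerWindow(List<TagEvent> events) {
        Map<Long, Map<String, Long>> windows = new TreeMap<>();
        for (TagEvent e : events) {
            long windowStart = e.timestampMillis - Math.floorMod(e.timestampMillis, WINDOW_MILLIS);
            windows.computeIfAbsent(windowStart, k -> new HashMap<>())
                   .merge(e.tag, 1L, Long::sum);
        }
        return windows;
    }

    public static void main(String[] args) {
        long minute = TimeUnit.MINUTES.toMillis(1);
        List<TagEvent> events = Arrays.asList(
                new TagEvent("#spark", 0),
                new TagEvent("#spark", 10 * minute),
                new TagEvent("#kafka", 61 * minute));
        System.out.println(countPerWindow(events)); // {0={#spark=2}, 3600000={#kafka=1}}
    }
}
```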

 

Serving Layer implementation – 

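The serving layer can be implemented as a RESTful web service that queries the Cassandra tables to get the final result in real time. The JAX-RS endpoints below each return up to ten tags, one from the real-time view table and one from the batch view table.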

@Path("/getStreamTags")
@GET
@Produces(MediaType.APPLICATION_JSON)
public List<String> getStreamTags() {
	String serverIP = "127.0.0.1";
	String keyspace = "populartweets";
	List<String> list = new ArrayList<String>();

	// try-with-resources closes the session and cluster after each request.
	// Building a Cluster is expensive; a real service would create it once and reuse it.
	try (Cluster cluster = Cluster.builder().addContactPoints(serverIP).build();
	     Session session = cluster.connect(keyspace)) {
		session.execute("select tag, count from stream_tag_count limit 10")
		       .forEach((row) -> list.add(row.get("tag", String.class)));
	}
	return list;
}

@Path("/getBatchTags")
@GET
@Produces(MediaType.APPLICATION_JSON)
public List<String> getBatchTags() {
	String serverIP = "127.0.0.1";
	String keyspace = "populartweets";
	List<String> list = new ArrayList<String>();

	// Same pattern as getStreamTags, reading the batch view table instead
	try (Cluster cluster = Cluster.builder().addContactPoints(serverIP).build();
	     Session session = cluster.connect(keyspace)) {
		session.execute("select tag, count from batch_tag_count limit 10")
		       .forEach((row) -> list.add(row.get("tag", String.class)));
	}
	return list;
}

The serving layer can then merge the results of the batch views and the real-time views to get the final result.
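The two endpoints return tag names only (the `count` column is read but dropped), so a client can only merge their outputs heuristically. One option, assumed here and not taken from the article, is an order-preserving union that lists the real-time tags first, appends batch tags not already present, and truncates the result; `MergeTagListsSketch` is a hypothetical name. Returning the counts as well and summing them per tag would give a true combined ranking.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical client-side merge of the /getStreamTags and /getBatchTags results.
public class MergeTagListsSketch {

    // Order-preserving union: real-time tags first, then batch tags not already
    // present, truncated to at most n entries.
    public static List<String> mergeTagLists(List<String> streamTags, List<String> batchTags, int n) {
        Set<String> merged = new LinkedHashSet<>(streamTags);
        merged.addAll(batchTags);
        List<String> result = new ArrayList<>(merged);
        return new ArrayList<>(result.subList(0, Math.min(n, result.size())));
    }

    public static void main(String[] args) {
        List<String> stream = Arrays.asList("#kafka", "#spark");
        List<String> batch = Arrays.asList("#spark", "#java", "#scala");
        System.out.println(mergeTagLists(stream, batch, 3)); // [#kafka, #spark, #java]
    }
}
```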

References and image credits

  1. http://www.databasetube.com/database/big-data-lambda-architecture/
  2. Nathan Marz and James Warren, Big Data: Principles and best practices of scalable realtime data systems
 

Who is responsible for page redirects in ASPNET MVP – The View or the Presenter None of the above, it is you :) On a serious note, it is the View as one shouldn’t pollute the Presenter with the page navigation... Read more