Real-time Stream Processing
In this article we will learn how to use clusters of Kafka, Logstash and Apache Spark to build a real-time processing engine.
A real-time processing engine needs two things: an event source and an event processor.
Event Source – We need a source of events to process; a stream of tweets is one example. In this article we will use Logstash to stream events. Logstash can stream events from various data sources such as Twitter, log files, TCP ports, etc.
Event Processor – The event processor consumes events from Kafka topics and does further processing on them. We will use Apache Spark for real-time event processing.
Use Case
We will build a real-time architecture that counts the words in a stream of text over each 10-second time window.
Real-time processing architecture
The architecture consists of three components:
- Kafka
- Logstash
- Apache Spark
Kafka follows a publish–subscribe model: events are published to Kafka topics, and every subscriber of a given topic receives the events published to it.
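If you want to see this publish–subscribe flow in isolation, the kafka-python client (which is not used anywhere else in this article and would need a separate pip install kafka-python) can publish a test event from a few lines of Python. This is only a minimal sketch, assuming a broker listening on server1-ip:9092 and the kafkatest topic that we create later:

from kafka import KafkaProducer

# Connect to one of the Kafka brokers (replace with your broker address)
producer = KafkaProducer(bootstrap_servers="server1-ip:9092")

# Publish a single test event to the kafkatest topic; any subscriber of
# that topic will receive it
producer.send("kafkatest", b"a test event")
producer.flush()  # block until the message has actually been sent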
To simulate real-time events we will use a large text file: Logstash will stream the file and publish it to the Kafka cluster, with each line of the file treated as one event.
We will run a consumer job in Apache Spark that listens on the Kafka topic and processes each event as needed.
We will create clusters of Kafka and Apache Spark nodes. The installation and configuration steps for this architecture are as follows.
Kafka Cluster Configuration
Kafka with a minimal configuration can be downloaded from here. Follow these steps to create a Kafka cluster:
- Download and extract Kafka on each node of the cluster
- On the master node, edit config/zookeeper.properties and add the ZooKeeper server details
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
server.1=server1-ip:2888:3888
server.2=server2-ip:2888:3888
initLimit=5
syncLimit=2
If you have more servers, add them all in a similar manner (server.3, server.4, and so on). Each node running ZooKeeper also needs a myid file in its data directory (dataDir in zookeeper.properties, /tmp/zookeeper by default) containing the number from its server.N entry.
- Edit config/server.properties on each node and add/edit the following:
broker.id=1
zookeeper.connect=server1-ip:2181,server2-ip:2181
Note: broker.id must be unique for each node (for example, use broker.id=2 on the second node).
- On the master node, add one more property to config/server.properties:
advertised.host.name=server1-ip
Logstash Configuration
You can download and install Logstash from here.
A Logstash configuration needs input and output plugins; for our use case we will use the file input plugin and the Kafka output plugin. Follow these steps to create the Logstash configuration:
- Create a configuration file, say sample.conf
- Add the file input plugin:
input {
  file {
    path => "path/to/text/file"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}
- Add the Kafka output plugin:
output {
  kafka {
    codec => plain {
      format => "%{message}"
    }
    topic_id => "kafkatest"
    bootstrap_servers => "server1-ip:9092,server2-ip:9092"
  }
  stdout { }
}
Apache Spark Configuration
Download and extract Apache Spark on each node from here. On the master node, edit conf/slaves and add the slave nodes:
server2-ip
Note: Make sure the slave machines are reachable from the master node over SSH without a password.
Now let's write a consumer job in Spark and name it kafka_wordcount.py:
from __future__ import print_function

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: kafka_wordcount.py <zk> <topic>", file=sys.stderr)
        exit(-1)

    sc = SparkContext(appName="PythonStreamingKafkaWordCount")
    # Batch interval of 10 seconds: one DStream batch is processed every 10 seconds
    ssc = StreamingContext(sc, 10)

    zkQuorum, topic = sys.argv[1:]
    kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
    # Each Kafka record arrives as a (key, value) pair; the value is the event text
    lines = kvs.map(lambda x: x[1])
    # Classic word count: split lines into words, emit (word, 1), sum per word
    counts = lines.flatMap(lambda line: line.split(" ")) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda a, b: a + b)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()
Here, SparkContext represents the connection to the Spark cluster and is used to create RDDs and run other Spark operations. StreamingContext creates DStreams from the input data source, splitting the stream into 10-second batches in our case. For more information you can refer to the Spark Streaming guide.
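To see what this chain of transformations computes for a single batch, here is a plain-Python sketch of the same word count with no Spark involved; the sample lines are made up purely for illustration:

from collections import Counter

# A hypothetical batch of events (lines) received within one 10-second window
batch = [
    "the quick brown fox",
    "the lazy dog",
]

# flatMap: split every line into words, producing one flat list of words
words = [word for line in batch for word in line.split(" ")]

# map + reduceByKey: pair each word with 1 and sum the counts per word
counts = Counter(words)

print(counts)  # Counter({'the': 2, 'quick': 1, 'brown': 1, 'fox': 1, 'lazy': 1, 'dog': 1})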
Running the Application
Now follow these steps to run the whole setup:
- Start ZooKeeper and the Kafka server on each node from the Kafka home directory:
nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
nohup bin/kafka-server-start.sh config/server.properties &
- Create a Kafka topic named kafkatest:
bin/kafka-topics.sh --create --zookeeper server1-ip:2181,server2-ip:2181 --replication-factor 3 --partitions 1 --topic kafkatest
- Now create the stream of text by running Logstash on any one of the nodes, from the Logstash home directory:
bin/logstash -f sample.conf
It will start streaming the content of the text file to the Kafka topic. If you want to confirm that events are arriving, the optional sketch below reads the topic directly.
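This sanity check is not part of the original setup; it is a minimal sketch using the kafka-python client (pip install kafka-python) and assumes the brokers and topic configured above:

from kafka import KafkaConsumer

# Subscribe to the topic Logstash writes to and read it from the beginning
consumer = KafkaConsumer(
    "kafkatest",
    bootstrap_servers="server1-ip:9092,server2-ip:9092",
    auto_offset_reset="earliest",
    consumer_timeout_ms=10000,  # stop iterating after 10 seconds without new events
)

for message in consumer:
    print(message.value)  # each value should be one line of the text file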
Our producer side is now ready, so let's start the consumer.
Start the Spark cluster with the following command from the Spark home directory:
sbin/start-all.sh
Now submit the Spark consumer job:
bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0 kafka_wordcount.py server1-ip:2181 kafkatest
This job should print the word counts for the stream every 10 seconds.
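The per-batch output printed by pprint should look roughly like the following; the timestamp, words and counts here are purely illustrative and depend on your text file:

-------------------------------------------
Time: 2017-01-01 12:00:10
-------------------------------------------
(u'the', 12)
(u'quick', 3)
(u'fox', 3)
...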