Sarvesh Kumar

Real-time streaming with Kafka, Logstash and Spark


In this article we will learn how to use clusters of Kafka, Logstash and Apache Spark to build a real-time processing engine.

For a real-time processing engine we need two things: an “Event Source” and an “Event Processor”.

Event Source: We need an event source to produce the events to be processed; a stream of tweets is one example. In this article we will use Logstash for streaming events. Logstash can stream events from various data sources such as Twitter, log files, and TCP ports.

Event Processor: The event processor consumes events from Kafka topics and does further processing on them. We will use Apache Spark for real-time event processing.

Use Case

We will build a real-time architecture that counts the words in a stream of text over each 10-second time window.

Real-time processing architecture

The architecture consists of three components:

  1. Kafka
  2. Logstash
  3. Apache Spark

Kafka follows a publisher-subscriber architecture: events are published to Kafka topics, and any subscriber to a specific topic receives those events.
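To make the publish/subscribe model concrete, here is a minimal sketch using the kafka-python client (this library is an assumption on my part and is not used elsewhere in this article; the server address and topic name are placeholders matching the setup below):

from kafka import KafkaProducer, KafkaConsumer

# Publisher: send one event to the "kafkatest" topic.
producer = KafkaProducer(bootstrap_servers="server1-ip:9092")
producer.send("kafkatest", b"hello from the producer")
producer.flush()

# Subscriber: any consumer subscribed to "kafkatest" receives the event.
consumer = KafkaConsumer("kafkatest",
                         bootstrap_servers="server1-ip:9092",
                         auto_offset_reset="earliest",  # read from the start of the topic
                         consumer_timeout_ms=5000)      # stop iterating after 5s of silence
for message in consumer:
    print(message.value.decode("utf-8"))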

To simulate real-time events we will use a large text file. Logstash will create a stream from that text file and output it to the Kafka cluster; each line in the file is treated as one event.

[Figure: Real-time processing architecture]

We will run a consumer job in Apache Spark; the consumer will listen on Kafka topics for events and process each one as it arrives.

We will create clusters of Kafka and Apache Spark nodes. The steps for installing and configuring this architecture are as follows.

Kafka Cluster Configuration

Kafka with a minimal configuration can be downloaded from here. Follow these steps to create a Kafka cluster:

  • Download and extract Kafka on each node
  • Edit config/zookeeper.properties on each node and add the ZooKeeper server details:
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
server.1=server1-ip:2888:3888
server.2=server2-ip:2888:3888
initLimit=5
syncLimit=2

If you have more servers, add them all in a similar manner.
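Note: when ZooKeeper runs as a multi-server ensemble, each node also needs a myid file in its data directory containing that node's id (1 for server.1, 2 for server.2, and so on). Assuming the default dataDir of /tmp/zookeeper from the sample configuration, on server 1 that would be:

echo "1" > /tmp/zookeeper/myid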

  • Edit config/server.properties on each node and add/edit the following:
broker.id=1
zookeeper.connect=server1-ip:2181,server2-ip:2181

Note: broker.id must be unique for each node.

  • On the master node, add one more property in config/server.properties:
advertised.host.name=server1-ip
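Note: on newer Kafka versions (0.10 and later), advertised.host.name is deprecated in favor of advertised.listeners, e.g. advertised.listeners=PLAINTEXT://server1-ip:9092.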


Logstash Configuration

You can download and install Logstash from here.

A Logstash configuration needs input and output plugins; for our use case we will create a file input and a Kafka output. The following steps create the Logstash configuration:

  • Create a configuration file, say sample.conf
  • Add the file input plugin:
input {
  file {
    path => "path/to/text/file"    # absolute path of the text file to stream
    start_position => "beginning"  # read the file from its start
    sincedb_path => "/dev/null"    # do not remember the read position between runs
  }
}
  • Add the Kafka output plugin:
output {
  kafka {
    codec => plain {
      format => "%{message}"       # publish only the raw line, not the full JSON event
    }
    topic_id => "kafkatest"        # Kafka topic to publish to
    bootstrap_servers => "server1-ip:9092,server2-ip:9092"
  }
  stdout { }                       # also echo events to the console for debugging
}
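Before wiring everything together, you can check the configuration for syntax errors from the Logstash home directory (the -t flag is available on recent Logstash versions; treat its exact spelling as an assumption for your release):

bin/logstash -f sample.conf -t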


Apache Spark Configuration

Download and extract Apache Spark on each node from here. On the master node, edit conf/slaves and add the slave nodes:

server2-ip

Note: Make sure the slave machines are accessible from the master node through SSH without a password.
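If passwordless SSH is not yet set up, something like the following usually does it (the remote username here is a placeholder):

ssh-keygen -t rsa
ssh-copy-id user@server2-ip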

Now let’s write a consumer job in Spark and name it kafka_wordcount.py

from __future__ import print_function

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: kafka_wordcount.py <zk> <topic>", file=sys.stderr)
        sys.exit(-1)

    sc = SparkContext(appName="PythonStreamingKafkaWordCount")
    # Batch interval of 10 seconds: each batch covers a 10-second window of events
    ssc = StreamingContext(sc, 10)

    zkQuorum, topic = sys.argv[1:]
    # Receiver-based Kafka stream: connect through ZooKeeper with one receiver thread
    kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
    lines = kvs.map(lambda x: x[1])  # messages are (key, value) pairs; keep the value
    counts = lines.flatMap(lambda line: line.split(" ")) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda a, b: a + b)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()

Here, SparkContext represents the connection to the Spark cluster and is used to create RDDs and run other Spark operations. StreamingContext creates DStreams from the input data source, which provides the streaming functionality. For more information on Spark streaming you can refer to the Spark streaming guide.
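The job above gets its 10-second window implicitly from the batch interval. If you want the windowing to be explicit, or want a sliding window, Spark's reduceByKeyAndWindow can replace reduceByKey. The following is a sketch, not part of the original job; the 30/10 window sizes and the "checkpoint" directory are arbitrary choices:

# Count words over a sliding 30-second window, recomputed every 10 seconds.
# The inverse function (subtraction) requires checkpointing to be enabled.
ssc.checkpoint("checkpoint")
windowed_counts = lines.flatMap(lambda line: line.split(" ")) \
    .map(lambda word: (word, 1)) \
    .reduceByKeyAndWindow(lambda a, b: a + b,  # add counts entering the window
                          lambda a, b: a - b,  # subtract counts leaving the window
                          30, 10)              # window length, slide interval (seconds)
windowed_counts.pprint()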

Running Application

Now follow these steps to run the whole setup:

  • Start ZooKeeper and then the Kafka server on each node from the Kafka home directory
nohup bin/zookeeper-server-start.sh config/zookeeper.properties &

nohup bin/kafka-server-start.sh config/server.properties &
  • Create a Kafka topic named kafkatest
bin/kafka-topics.sh --create --zookeeper server1-ip:2181,server2-ip:2181 --replication-factor 3 --partitions 1 --topic kafkatest
  • Now create the stream of text by running Logstash on any node from the Logstash home directory
bin/logstash -f sample.conf

It will start streaming the content of the text file to the Kafka topic.
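To check that the topic exists and that events are actually arriving, Kafka's bundled command-line tools can be used from the Kafka home directory (these are standard tools, though flag names vary slightly across Kafka versions):

bin/kafka-topics.sh --describe --zookeeper server1-ip:2181 --topic kafkatest

bin/kafka-console-consumer.sh --bootstrap-server server1-ip:9092 --topic kafkatest --from-beginning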

At this point our producer is ready. Now let’s start the consumer.

Start the Spark cluster with the following command from the Spark home directory:

sbin/start-all.sh

Now submit the Spark consumer job:

bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0 kafka_wordcount.py server1-ip:2181 kafkatest

This job should print the count of each word in the stream every 10 seconds.
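For each batch, pprint output looks roughly like the following (the timestamp, words, and counts shown here are purely illustrative):

-------------------------------------------
Time: 2017-08-02 10:00:10
-------------------------------------------
('the', 13)
('quick', 2)
...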


  • Drissi Yazami

    Hi Mr Sarvesh Kumar,
    I’m writing you this message to ask if you can help me with your
    guidance. I am starting work on an academic project using the Elastic
    Stack (ELK), and I am wondering if I can use the stack for Business
    Intelligence cases. I am thinking of using Spark streaming in addition
    to the Elastic Stack, but I am stuck: I don’t know how to architect
    this project or where to start. Can you please help? Is there any open
    source code using these technologies together, or documentation that
    can help me out?
    Yours truly,
    El idrissi yazami younes
    y.elidrissiyazami@gmail.com

    • Sarvesh Kumar

      Hi El, it depends on your business use case which architecture you should follow. You should decide the business needs first; then you can choose the components/solutions that are best suited to them. If you want more information you can write to me at sarvesh.du@gmail.com