#TECH

Learn How to Start Working with KAFKA

Kafka is a distributed publish-subscribe messaging system that is designed to be fast, scalable and durable. It was developed by LinkedIn and opensourced in the year 2011. It makes an extremely desirable option for data integration with the increasing complexity in real time data processing challenges. It is great solution for applications that require large scale message processing.

Components of Kafka are :

  1. Zookeeper
  2. Kafka Cluster – which contains one or more servers called as brokers
  3. Producer – which publish messages to kafka
  4. Consumer – which consumes messages from kafka.

Components of Kafka are

Components of Kafka are :

Kafka saves messages in a disk and allows subscribers to read from it. Communication between producers, kafka cluster and consumers takes place with TCP protocol. All the published messages will be retained for a configurable period of time. Each kakfa broker may contain multiple topics into which producers publish messages. Each topic is broken into one or more ordered partitions. Partitions are replicated across multiple servers for fault tolerance. Each partition has one Leader server and zero or more follower servers depending upon the replication factor of partition.

When a publisher publishes to a Kafka cluster, it queries which partitions exist for that topic and which brokers are responsible for each partition. Publishers sends messages to the broker responsible for that partition (using some hashing algorithm).

Consumers keep track on what they consume (partition id) and store in Zookeeper. In case of consumer failure, a new process can start from the last saved point. Each consumer in the group get assigned a set of partitions to consume from.

Producers can attach key with messages, in which all messages with same key goes to same partition. When consuming from a topic, it is possible to configure a consumer group with multiple consumers. Each consumer in a consumer group will read messages from a unique subset of partitions in each topic they subscribe to, so each message is delivered to one consumer in the group, and all messages with the same key arrive at the same consumer.

Role of Zookeeper in Kafka –

It provides access to clients in a tree like structure. Kafka uses ZooKeeper for storing configurations and use them across the cluster in a distributed fashion. It maintains information like topics under a broker, offset of consumers.

Role of Zookeeper in Kafka

Steps to get started with (For UNIX):

  1. Download Kafka – wget http://mirror.sdunix.com/apache/kafka/0.8.2.0/kafka_2.10-0.8.2.0.tgz
  2. tar -xzf kafka_2.10-0.8.2.0.tgz
  3. cd kafka_2.10-0.8.2.0/
  4. nside Config folder you will see server, zookeeper config files
  5. Inside bin folder you will see bash files for starting zookeeper, server, producer, consumer
  6. Start zookeeper – bin/zookeeper-server-start.sh config/zookeeper.properties
  7. Start server – bin/kafka-server-start.sh config/server.properties
  8. creating topic –bin/kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor <your_replication_factor> –partitions <no._of_partitions> –topic <your_topic_name>
    This will create a topic with specified name and will be replicated in to brokers based on replication factor and topic will be partitioned based on partition number. Replication factor should not be greater than no. of brokers available.
  9. view topic – bin/kafka-topics.sh –list –zookeeper localhost:2181
  10. delete a topic – add this line to server.properties file delete.topic.enable=true
    then fire this command after starting zookeeper
    bin/kafka-topics.sh –zookeeper localhost:2181 –delete –topic <topic_name>
  11. alter a topic – bin/kafka-topics.sh –zookeeper localhost:2181 –alter –topic <topic_name>
  12. Start producer – bin/kafka-console-producer.sh –broker-list localhost:9092 –topic <your_topic_name> and send some messages
  13. Start consumer – bin/kafka-console-consumer.sh –zookeeper localhost:2181 –from-beginning –topic <your_topic_name> and view messages

If you want to have more than one server, say for ex : 4 (it comes with single server), the steps are:

  1. create server config file for each of the servers :
    cp config/server.properties config/server-1.proeprties
    cp config/server.properties config/server-2.properties
    cp config/server.properties config/server-3.properties
  2. Repeat these steps for all property files you have created with different brokerId, port.
    vi server-1.properties and set following properties
    broker.id = 1
     port = 9093
    log.dir = /tmp/kafka-logs-1
  3. Start Servers :
    bin/kafka-server-start.sh config/server-1.properties &
    bin/kafka-server-start.sh config/server-2.properties &
    bin/kafka-server-start.sh config/server-3.properties &

Now we have four servers running (server, server-1,server-2,server-3)

PROGRAMMING

KAFKA program in java includes producer class and consumer class.

Producer Class :

Producer class is used to create message and specify the topic name with optional partition.

The maven dependency jar to be included is

<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka_2.9.2</artifactId>
	<version>0.8.1.1</version>
 </dependency>

 
We need to define properties for producer to find brokers, serialize messages and sends it to the partitions it wants.

Properties properties = new Properties();

properties.put(metadata.broker.list, "localhost:9092"); //defines the broker location

properties.put(serializer.class, "kafka.serializer.StringEncoder");

properties.put(partitioner.class, "your partition class "); // partition calss where you have written logic for selecting partitions in the Topic. If not mentioned, Kafka uses default partition

properties.put(request.required.acks, "1"); // tells whether to send ack or not

ProducerConfig config = new ProducerConfig(properties);

Producer<String, String> producer = new Producer<String, String>(config); // Producer Object

String message = Welcome to Kafka, this is Producer;

KeyedMessage<String, String> data = new KeyedMessage<String, String>("test", message); // publishing messages by creating object of kafka.producer.KeyedMessage, pass topic name and message as argument

producer.send(data);

 
Once producer sends data, if we pass an extra item (say id) via data :
ex : 

KeyedMessage<String, String> data = new KeyedMessage<String, String>("test", 1, message);

before publishing data to brokers, it goes to partition class which is mentioned in the properties and select the partition to which data has to be published.

In our above example, id is not passed, so partition selection would be done randomly.
 

Consumer Class :

Properties properties = new Properties();

properties.put("zookeeper.connect", "localhost:2181"); //zookeeper location

properties.put("group.id", "1"); //consumer group

properties.put("zk.sessiontimeout.ms", "400"); // ms kafka waits for zookeeper to respond to a request

properties.put("autocommit.interval.ms", "1000"); //how often updates are written to ZooKeeper.

ConsumerConfig config = new ConsumerConfig(properties);

Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =

consumer.createMessageStreams(topicCountMap); //consumer pass information like topic name and no. of consumer threads to kafka using this call (topicCountMap is the map which contains that information)

List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); //Iterate over streams and prints the message

 

Topic Creation :

int sessionTimeout = 60000; // in ms

int connectionTimeout = 60000;

ZkClient zkClient = new ZkClient("localhost:2181", sessionTimeout, connectionTimeout,

ZKStringSerializer$.MODULE$);

String topicName = "<your_topic_name>";

int numPartitions = 3;

int replicationFactor = 1;

AdminUtils.createTopic(zkClient, topicName, numPartitions, replicationFactor, new Properties()); //this will create a topic with 3 partitions and replication factor 1

 
Kafka is a distributed commit log service that functions much like a publish/subscribe messaging system, but with better throughput, built-in partitioning, replication, and fault tolerance.

Applications of Kafka:

  1. Stream processing
  2. Messaging
  3. Multiplayer online game
  4. Log aggregation

You might also like