
Spark Structured Streaming: Ingesting tweets to show trends using Grafana in real time

If you’re a software engineer, there’s a good chance you’ll relate to this – aren’t you always wondering how things work? Aren’t you always curious about how one thing fits into another? It could be something as simple as how traffic lights are synchronized, or something as complex as how Indian Railways manages and runs its services on so many routes.

These inquisitive thoughts can occur anytime, and one such thought struck me while watching the news. I was switching channels on my TV when I stopped on one particular news channel. The reporter was shouting his guts out about a story that had garnered a lot of public hatred. At the bottom of the screen, a series of tweets showed people’s reactions to it. I started wondering about the relationship between a breaking story, people’s reactions to it, and how those reactions amplify the magnitude of the news. For example, how would people react when something as big as demonetization is announced?

Once this thought took hold, my engineering brain kicked in. I began to formulate a problem statement: “If I wanted to build something like this, how would it happen? What would I need to start? How would I want it to work?”

After defining the problem statement, I started looking for tools that would help me achieve the goals I had set for myself, and for how such problems are solved in the industry.

I took a step forward and shared this idea with my colleagues at Quovantis. I told them I was interested in learning how real-time data works, and quite unsurprisingly, I found a bunch of like-minded people (trust me, we’ve got loads here). Motivated to chase this goal, we immediately started working on it. We settled on a problem statement and formed teams to tackle it in different ways.

The problem was simple – 

“Analyzing real time streaming Twitter data and displaying it in a meaningful way”

In simple words, we aimed to design a sample streaming data solution that analyzes tweet frequency and displays the data visually. A single Twitter producer would fetch streaming data from Twitter and send it out to a Kafka topic. Each team would connect to Kafka to get the stream of data.

I know this might be a bit much, so before we dive into the details, let’s cover some basics first.

What is streaming data? 

Streaming data is data that is generated continuously by many data sources operating at the same time. Such data may include log files, server metrics, market trades, etc.

Streaming using Apache Spark

The simplest example of streaming data is tweets. Tweets are a never-ending flow of data that can be used to perform interesting analysis. We will use them to analyze tweet frequency per unit time.

What are the tools available to process streaming data? 

The challenge with processing streaming data is that there is no end to the incoming data, so you’re never done processing. Another challenge is aggregation: since the data arrives as a continuous stream, we need to be careful about how much of it we aggregate at a time.

There are different tools for processing streaming data; one of them is Spark Structured Streaming.

Spark Structured Streaming is a fault-tolerant stream processing engine built on the Spark SQL Engine (ref). 

Twitter Producer

We will use tweepy to consume the Twitter stream. The tweepy package makes it easier to handle uninteresting but important stuff like authentication, session management, reading incoming messages, etc.

Before starting with the code, we need to create a Twitter application and generate the keys from it. Copy the following:

  • API key
  • API key secret
  • Access token
  • Access token secret

Here’s the Link for the app creation steps.

Once we have the above, we can start with the code. Please note: we will handle the threading manually here because there is an issue with the streaming library in tweepy. It raises an IncompleteRead exception, which stops the stream. (There are a lot of SO threads around it: Link | Link | Link)

First, let’s create a StreamListener subclass which will handle the incoming streaming data. We will also create a class named Twitter which will handle exceptions and restart the stream.

twitter_client.py:

import _thread
import json
import logging
import time

from tweepy import OAuthHandler
from tweepy import Stream
from tweepy.streaming import StreamListener

from kafka_producer import KafkaSender


class Listener(StreamListener):
    def __init__(self, kafka_producer, topic):
        super().__init__()
        self.logger = logging.getLogger(__name__)
        self.kafka_producer = kafka_producer
        self.topic = topic
        self.count = 0

    def on_connect(self):
        self.logger.info("Twitter stream connected")

    def on_status(self, status):
        self.count += 1
        status_str = json.dumps(status._json)
        self.kafka_producer.send_message(self.topic, status_str)
        if self.count % 1000 == 0:
            self.logger.info(
                "{count} tweets received for topic: {topic}".format(count=str(self.count), topic=self.topic))

    def on_error(self, status):
        self.logger.error("Error received for topic: {topic}".format(topic=self.topic))
        self.logger.error(status)
        if status == 420:
            self.logger.error(
                "rate limit received for topic: {topic}. Sleeping for 60 seconds".format(topic=self.topic))
            time.sleep(60)
            self.logger.info("resetting counter for topic: {topic}".format(topic=self.topic))


class Twitter:
    def __init__(self, consumer_key, consumer_secret, access_token, access_token_secret, bootstrap_servers):
        self.logger = logging.getLogger(__name__)
        self.auth = OAuthHandler(consumer_key, consumer_secret)
        self.auth.set_access_token(access_token, access_token_secret)
        self.kafka_producer = KafkaSender(bootstrap_servers=bootstrap_servers)

    def create_stream(self, keywords, topic):
        while True:
            try:
                self.logger.info(f"{topic} Thread starting...")

                twitter_stream = Stream(self.auth, Listener(kafka_producer=self.kafka_producer, topic=topic),
                                        retry_count=100)
                twitter_stream.filter(track=keywords, is_async=False, stall_warnings=True)
            except Exception as e:
                self.logger.error(f"Exception received for topic: {topic} - " + str(e))
                self.logger.error(f"{topic} - restarting stream...")

    def run_streaming_thread(self, keywords, topic):
        self.logger.info(
            "creating twitter stream. Keywords: {keywords} | topic: {topic}".format(keywords=",".join(keywords),
                                                                                    topic=topic))
        _thread.start_new_thread(self.create_stream, (keywords, topic))

We have a Kafka producer instance in the Listener class. This is used to send the data out to Kafka.

The on_status method of the Listener receives the tweets and pushes them to Kafka.

kafka_producer.py:

import logging

from kafka import KafkaProducer


class KafkaSender:
    def __init__(self, bootstrap_servers):
        self.logger = logging.getLogger(__name__)
        self.producer = KafkaProducer(bootstrap_servers=[bootstrap_servers])

    def send_message(self, topic, message):
        self.producer.send(topic=topic, value=message.encode('utf-8'))
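
If you want to sanity-check the producer on its own, a quick hypothetical usage could look like this (it assumes a broker running at localhost:9092 and that the topic either already exists or auto-creation is enabled):

# Smoke test for KafkaSender (broker address and topic name are placeholders)
sender = KafkaSender(bootstrap_servers="localhost:9092")
sender.send_message("test", '{"text": "hello kafka"}')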

The driver module reads the configs and runs the Twitter stream.

driver.py:

import logging
import time
import yaml

from twitter_client import Twitter

logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO)
logger = logging.getLogger(__name__)


def read_yaml(path):
    obj = {}
    try:
        obj = yaml.safe_load(open(path, "rb"))
    except Exception as e:
        logger.error("Unable to read yaml. Path: " + path)
        logger.error(e)
    return obj


def main():
    cfg_file = "config.yaml"
    cfg = read_yaml(cfg_file)
    logger.info("config file loading complete")

    for team in cfg["teams"]:
        logger.info("=================Starting Streaming for {team}===================".format(team=team))
        team_cfg = cfg["teams"][team]

        twitter = Twitter(consumer_key=team_cfg["twitter"]["consumer_key"],
                          consumer_secret=team_cfg["twitter"]["consumer_secret"],
                          access_token=team_cfg["twitter"]["access_token"],
                          access_token_secret=team_cfg["twitter"]["access_token_secret"],
                          bootstrap_servers=",".join(cfg["kafka"]["bootstrap_servers"]))

        twitter.run_streaming_thread(keywords=team_cfg["topics"], topic=team)

    while True:
        # keep the main thread alive so the streaming threads keep running
        time.sleep(1)


if __name__ == '__main__':
    main()
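
For reference, the driver expects config.yaml to parse into roughly the following structure (the team name, keywords, credentials, and broker address below are all illustrative placeholders):

# What read_yaml("config.yaml") should return (illustrative values only)
cfg = {
    "kafka": {"bootstrap_servers": ["localhost:9092"]},
    "teams": {
        "team_a": {
            "topics": ["demonetization", "modi"],
            "twitter": {
                "consumer_key": "<API key>",
                "consumer_secret": "<API key secret>",
                "access_token": "<access token>",
                "access_token_secret": "<access token secret>",
            },
        },
    },
}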

This takes care of our source of streaming data. You can use tools like supervisor to easily start and stop the producer.

Now, let’s move to the solution part.

Solution

In this approach, we will use Spark Structured Streaming to analyze the tweet frequency and display it using Grafana.

We’ve kept the Spark streaming consumer separate from the database writer for the following reasons:

  • Spark Structured Streaming doesn’t have a native stream sink for InfluxDB. We would need to implement our own ForeachWriter if we wanted the Spark application to write to InfluxDB directly (a rough sketch of what that would involve follows this list).
  • The Spark consumer can be scaled separately from the database writer.
  • Keeping the two separate gives us fault tolerance against database failures. If the database goes down, our Spark consumer keeps working and keeps pushing records to Kafka.
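
For completeness, here is a minimal sketch of what such a custom writer could look like. This is not what we built; it assumes PySpark 2.4+ (where writeStream.foreach accepts a plain Python object) and the influxdb client library, and the host, database, and measurement names are placeholders:

from influxdb import InfluxDBClient

class InfluxForeachWriter:
    # Hypothetical custom sink: one InfluxDB client per partition
    def open(self, partition_id, epoch_id):
        self.client = InfluxDBClient(host="localhost", port=8086, database="tweets")
        return True

    def process(self, row):
        # row is expected to carry the window and count produced by the aggregation
        self.client.write_points([{
            "measurement": "Twitter",
            "time": row["window"]["start"],
            "fields": {"count": row["count"]},
        }])

    def close(self, error):
        self.client.close()

# windowed_counts.writeStream.foreach(InfluxForeachWriter()).start()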

Database

With streaming data, the choice of database is a little different from our traditional databases because the nature of the data being stored is very different. Most of the data coming from streaming sources is time series data.

Time series data refers to data that is stored in time order. It could include server metrics, performance monitoring, network data, sensor data, events, clicks, etc.

Since our data grows over time and there aren’t any relationships in it, we don’t need an RDBMS. Time series databases (TSDBs) are very efficient at performing large aggregation queries over timestamps.

Out of the available options, we decided to move forward with InfluxDB, which is built from the ground up for this type of data. More info can be found here.

Visualisation of the data

Storing and retrieving the data is only half the story. Another important part of real-time streaming solutions is visualization. Visualizations make it easier to understand what’s going on; making sense of huge amounts of data by looking at plain numbers is difficult. This is where visualization tools come into play.

There are a lot of such dashboards available, and we narrowed down to Grafana for this case. Our primary requirement is to display large amounts of time series data stored in InfluxDB, and Grafana connects to InfluxDB easily. It is one of the best visualization tools out there. More info can be found here.

With our architecture in place, let’s get our hands a little dirty with code.

Spark Consumer

A Spark Structured Streaming application consists of three parts:

  • Source – the Kafka topic from which we read the raw tweet data.
  • Operations – the aggregation operations we perform on the data. In our case, a simple groupBy that counts the number of records in a given window.
  • Sink – the Kafka topic to which we send the aggregated results.

To start any Spark application, we need to create a Spark session. Let’s create one with the app name TwitterCount.

    spark = SparkSession \
        .builder \
        .appName("TwitterCount") \
        .getOrCreate()

For reading data from a Kafka topic, we need to create a stream and specify settings such as the bootstrap servers and the topic to subscribe to. The Kafka source returns each record with columns such as key, value, topic, and partition; the actual tweet JSON sits in the value column. So we need to extract the fields from that single value column into a dataframe.
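
A minimal read-stream definition could look like the following (the broker address and topic name are placeholders):

# Sketch: subscribe to the raw tweets topic (broker and topic names are placeholders)
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "raw_tweets") \
    .load()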

To extract these fields, we need to specify the schema of the data. A schema is a StructType, which is a list of StructFields.

from pyspark.sql.types import StringType, StructField, StructType


def get_schema():
    schema = StructType([
        StructField("created_at", StringType()),
        StructField("id", StringType()),
        StructField("text", StringType()),
        StructField("source", StringType()),
        StructField("truncated", StringType()),
        StructField("in_reply_to_status_id", StringType()),
        StructField("in_reply_to_user_id", StringType()),
        StructField("in_reply_to_screen_name", StringType()),
        StructField("user", StringType()),
        StructField("coordinates", StringType()),
        StructField("place", StringType()),
        StructField("quoted_status_id", StringType()),
        StructField("is_quote_status", StringType()),
        StructField("quoted_status", StringType()),
        StructField("retweeted_status", StringType()),
        StructField("quote_count", StringType()),
        StructField("reply_count", StringType()),
        StructField("retweet_count", StringType()),
        StructField("favorite_count", StringType()),
        StructField("entities", StringType()),
        StructField("extended_entities", StringType()),
        StructField("favorited", StringType()),
        StructField("retweeted", StringType()),
        StructField("possibly_sensitive", StringType()),
        StructField("filter_level", StringType()),
        StructField("lang", StringType()),
        StructField("matching_rules", StringType()),
        StructField("name", StringType()),
        StructField("timestamp_ms", StringType())
    ])
    return schema

Once we have the schema in place, we can load the JSON from the value column as follows:

df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

df = df.withColumn("data", from_json(df.value, get_schema())).select("data.*")

One thing to note here is that with Spark 2.4+, we don’t necessarily need to hand-write this schema. This example was written with Spark 2.3.
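
If you are on Spark 2.4 or later, one way to avoid the hand-written schema is schema_of_json, which infers a schema from a sample record. Here is a rough sketch; the sample tweet below is made up and only needs to contain the fields you care about:

from pyspark.sql.functions import from_json, lit, schema_of_json

# Sketch (Spark 2.4+): infer the schema from one representative tweet JSON string
sample = '{"id": "1", "text": "hello", "lang": "en", "timestamp_ms": "1555555555000"}'
df = df.withColumn("data", from_json(df.value, schema_of_json(lit(sample)))).select("data.*")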

With our source ready, we can move to the operations part. We want to calculate the number of tweets per minute. In Spark Structured Streaming, there are the concepts of windows and watermarks.

A window is an event-time grouping. It denotes a logical start and end for a sliding slice of the dataframe.

Since streaming data can arrive out of order, we use a watermark to decide how late we will accept data. For example, a watermark of 10 seconds means that we do not accept data with an event time earlier than the current maximum event time minus 10 seconds.

To create a window, we need a timestamp-type column. Since our original data has the timestamp in Unix epoch milliseconds (timestamp_ms), we need to convert it to a timestamp in order to create the window and define the watermark.

As per our need, we do a groupBy on the window to count the data. The window length is 1 minute and the sliding interval is 30 seconds, which means that every 30 seconds we calculate the number of tweets received in the last minute.

df = df \
    .withColumn("ts", to_timestamp(from_unixtime(expr("timestamp_ms / 1000")))) \
    .withWatermark("ts", "1 seconds")

windowed_counts = df \
    .groupBy(window(df['ts'], "1 minute", "30 seconds")) \
    .count()

The watermark is the threshold for how late data is expected to arrive, in terms of event time.

In our case, the watermark of 1 second means that any event whose event time is more than 1 second earlier than the maximum event time seen so far is ignored.

To write data to Kafka, we need to do the reverse of what we did while reading: transform the windowed counts back into a JSON string in a value column. The following code transforms the data and starts the stream.

# Serialize the windowed counts as a JSON string in the value column before writing to Kafka
windowed_counts = windowed_counts.withColumn("value", to_json(struct(windowed_counts.columns)))

query = windowed_counts.writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", ",".join(config["kafka_sink"]["bootstrap_servers"])) \
    .option("checkpointLocation", config["spark_checkpoint"] + "/kafka/") \
    .option("topic", config["kafka_sink"]["topic"]) \
    .start()

So far, we have created a stream, performed operations on it, specified the sink, and started the query. If we exit the program here, nothing will happen: the streams run on Spark while the primary thread is free. We need to make sure our application doesn’t exit until the streams (known as queries) have terminated. To achieve that, we can use the following:

spark.streams.awaitAnyTermination()

So now the aggregated data is arriving on Kafka, and we need to store it in InfluxDB.

Writing to InfluxDB is quite simple. We first create an InfluxDb class with two methods – switch_database and write_rows.

from influxdb import InfluxDBClient


class InfluxDb:
    def __init__(self, host, database, port=8086):
        self.client = InfluxDBClient(host=host, port=port)
        self.database = database
        self.switch_database()

    def switch_database(self):
        database_list = self.client.get_list_database()
        if not list(filter(lambda x: x["name"] == self.database, database_list)):
            print("database not found. creating database: " + self.database)
            self.client.create_database(self.database)
        self.client.switch_database(self.database)

    def write_rows(self, record):
        item = [{
            "measurement": "Twitter",
            "tags": {
                "topic": "modi"
            },
            "time": record["window"]["start"],
            "fields": {
                "count": record["count"]
            }
        }]
        self.client.write_points(item)

We also create a simple Kafka consumer to fetch the data from Kafka. We use value_deserializer to parse the incoming records as JSON so they are easier to work with downstream.

import json
import logging

from kafka import KafkaConsumer


def main():
    # read_yaml and path refer to the same config helper and config file used by the producer
    config = read_yaml(path)
    logging.info("Starting writing to InfluxDb")

    consumer = KafkaConsumer(
        config["kafka_sink"]["topic"],
        bootstrap_servers=config["kafka_sink"]["bootstrap_servers"],
        enable_auto_commit=True,
        group_id='my-group',
        value_deserializer=lambda x: json.loads(x.decode('utf-8')))

    db = InfluxDb(config["database"]["host"], config["database"]["database"], config["database"]["port"])

    for message in consumer:
        message = message.value
        db.write_rows(message)


if __name__ == '__main__':
    main()

By now we have the data in the database; the only thing left is to show it on the UI. For that, we need to add a data source for InfluxDB in Grafana. Follow the steps here.

  1. From the top menu, click on Dashboards and then New dashboard.
  2. You would see a small green icon on the page. Click on it and choose “add panel” -> “graph”.
  3. In the FROM clause, choose the measurement, and in the WHERE clause, filter on the topic tag. These are the same values we use in the database writer.
  4. In the SELECT clause, choose field(count).
  5. Now you should see something like the following:

Click on Save dashboard in the top bar and the dashboard will be saved.

We can play around more with the various options here.

Hope you find the steps easy and have as much fun as we had while developing this.

We can use Spark Structured Streaming in a variety of places, like analyzing real-time stock prices and comparing them with their historical values.

Or we can analyze two different sources of data to find correlations between them.

The possibilities are endless. We’re only limited by our thinking 🙂
