Ayadi Tahar | Realtime Analysis using Spark Streaming and Kafka

Realtime Analysis using Spark Streaming and Kafka

Publish Date: 2022-10-09


Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams. Data can be ingested from many sources like Kafka, Kinesis, or TCP sockets, and can be processed using complex algorithms expressed with high-level functions.

Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.

In this article we will demonstrate how to read data from Kafka, process it with Spark Streaming, and then write the results back to Kafka.

1. Kafka integration

In our demo, I am using Kafka version 3.0.0 and Spark version 3.3.0, but you are free to use the versions already available to you.

The Spark Streaming integration for Kafka 0.10 provides simple parallelism, 1:1 correspondence between Kafka partitions and Spark partitions, and access to offsets and metadata.

Before moving further, these are the steps that define our demo:

  • a. launch ZooKeeper and a Kafka broker
  • b. create a Kafka topic named topic1 with its corresponding producer
  • c. create a topic named topic2 with a consumer that will receive data from Spark
  • d. let Spark Streaming read data from topic1, do some simple processing, and write the result to topic2

a. Launch ZooKeeper and the Kafka broker

ZooKeeper is a centralized service for distributed coordination, and here we are using the ZooKeeper bundled with the Kafka installation. To launch ZooKeeper, just open a terminal and issue this command:


cd $KAFKA_HOME
zookeeper-server-start.sh config/zookeeper.properties

The Kafka server, or Kafka broker, is the main component of Kafka. Open another terminal window and execute the next command to launch a Kafka broker:


cd $KAFKA_HOME
kafka-server-start.sh config/server.properties

b. Create a Kafka topic named topic1 with its corresponding producer

We need to create a topic named topic1, which will be the source of data in our application:


kafka-topics.sh --create --partitions 1 --replication-factor 1 --topic topic1 --bootstrap-server localhost:9092
    Created topic topic1.

If we want to get some info about the topic we just created:


kafka-topics.sh --describe --topic topic1 --bootstrap-server localhost:9092
Topic: topic1   TopicId: iwfDfv5zS9mdxre8Cy9XTA PartitionCount: 1       ReplicationFactor: 1    Configs:
    Topic: topic1   Partition: 0    Leader: 0       Replicas: 0     Isr: 0

Now, we have to create a producer for topic1, which will receive data from the console and write it to topic1. Issue this command in a separate terminal window to do that:


kafka-console-producer.sh --topic topic1 --bootstrap-server localhost:9092
>
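
If you prefer to feed topic1 from a script instead of typing into the console producer, a minimal sketch using the kafka-python package could look like the following. Note that kafka-python is an extra dependency that is not used anywhere else in this demo:


# pip install kafka-python   (assumed extra dependency, not part of the demo setup)
from kafka import KafkaProducer

# connect to the local broker started above
producer = KafkaProducer(bootstrap_servers="localhost:9092")

# send a few test lines to topic1; values must be bytes
for line in ["carrot tomatoes", "potatoes carrot"]:
    producer.send("topic1", value=line.encode("utf-8"))

producer.flush()   # make sure the messages are actually delivered
producer.close()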

c. Create a topic named topic2 with its consumer

The same way we created topic1, we have to create another topic named topic2, which will receive data from Spark Streaming:


kafka-topics.sh --create --partitions 1 --replication-factor 1 --topic topic2 --bootstrap-server localhost:9092
Created topic topic2.

We can get more info about topic2:


kafka-topics.sh --describe --topic topic2 --bootstrap-server localhost:9092
Topic: topic2   TopicId: fEaPywBGR8O3r8qsAlgkCg PartitionCount: 1       ReplicationFactor: 1    Configs:
    Topic: topic2   Partition: 0    Leader: 0       Replicas: 0     Isr: 0

Our consumer for topic2 will display data in real time as soon as it is received from Spark. To create that consumer, run the next command in a separate terminal:


kafka-console-consumer.sh --topic topic2 --from-beginning --bootstrap-server localhost:9092

Now that we are done with the setup of the Kafka topics, we can move to Spark and launch our logic with the appropriate dependency.

d. Launch spark-shell and pyspark with Kafka integration

To launch spark-shell with Kafka integration, you first need to identify your Spark installation version and the Scala version it was built with. In my case I am using Spark 3.3 with Scala 2.12.

To launch spark-shell, make sure you have internet access so the required dependencies can be downloaded, then issue the next command:


spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0

The same holds true for pyspark:


pyspark --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0
...
    org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-1f1f6500-cf94-41a3-a8c6-92e3579c3e54;1.0
    confs: [default]
    found org.apache.spark#spark-sql-kafka-0-10_2.12;3.3.0 in central
    ...
    org.xerial.snappy#snappy-java;1.1.8.4 from central in [default]
    ---------------------------------------------------------------------
    |                  |            modules            ||   artifacts   |
    |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
    ---------------------------------------------------------------------
    |      default     |   12  |   4   |   4   |   0   ||   12  |   0   |
    ---------------------------------------------------------------------
    ... [omitted output]
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.3.0
      /_/

Using Python version 3.9.12 (main, Apr  5 2022 06:56:58)
Spark context Web UI available at http://192.168.50.16:4040
Spark context available as 'sc' (master = local[*], app id = local-1665218751731).
SparkSession available as 'spark'.
>>>

Throughout the rest of our demo, we will use pyspark with Python 3.

2. Spark Logic

After setting up the Kafka integration, we are ready to code and test our streaming pipeline. The first thing we do, after importing the essential packages, is subscribe to topic1 in order to receive data:


from pyspark.sql.functions import explode, split

# reduce log noise in the console
sc.setLogLevel("ERROR")

We read streaming data from the Kafka broker (127.0.0.1) by subscribing to topic1 and saving it into a DataFrame (df):


# Subscribe to 1 topic
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "127.0.0.1:9092") \
    .option("subscribe", "topic1") \
    .option("failOnDataLoss", "false") \
    .option("startingOffsets", "earliest") \
    .load()
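
As a quick sanity check, we can confirm that df was created as a streaming DataFrame by looking at its isStreaming flag (this step is optional and not part of the pipeline):


# True for DataFrames created with spark.readStream
df.isStreaming
True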

By default, Kafka produces events in binary format, so we need to deserialize them by casting the key and value to a readable format; in our case we convert them to strings:


df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
DataFrame[key: string, value: string]

We can verify the type of the returned df and confirm that it is indeed a DataFrame:


type(df)
<class 'pyspark.sql.dataframe.DataFrame'>

Besides the key and value, each Kafka record carries other meaningful fields as well:


df.printSchema()
root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)
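
These metadata columns are not needed for our word count, but if you ever want to keep track of where each record came from, they can be selected alongside the payload. A minimal sketch (the df_with_meta name is only for illustration and is not used in the rest of the demo):


# keep the Kafka metadata next to the deserialized payload (illustration only)
df_with_meta = df.selectExpr(
    "topic",
    "partition",
    "offset",
    "timestamp",
    "CAST(value AS STRING) AS value"
)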

Because our data is composed of a simple key and value, we can do some simple processing that consists of a word count. The next code splits each received line into words based on whitespace:


# Split the lines into words
lines = df.select(
    explode(
        split(df.value, " ")
    ).alias("value")
)

To see the schema of the returned lines:


lines.printSchema()
root
 |-- value: string (nullable = false)

 

Now we can use these values to generate a running word count:


wordCounts = lines.groupBy("value").count()
wordCounts.printSchema()
root
 |-- value: string (nullable = false)
 |-- count: long (nullable = false)
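
Before pushing anything back to Kafka, it can be handy to sanity-check the aggregation with Spark's built-in console sink. This is only a debugging sketch (the preview name is illustrative) and is not part of the final pipeline:


# preview the running counts in the terminal (debugging only)
preview = wordCounts.writeStream \
    .outputMode("complete") \
    .format("console") \
    .option("truncate", "false") \
    .start()

# stop the preview before starting the Kafka sink below
# preview.stop()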

After our word counts are ready, we have to write them to Kafka. In order to write Spark Streaming data to Kafka, a value column is required and all other fields are optional.

As we discussed earlier, the key and value columns are binary in Kafka; hence we have to serialize them by casting them to an appropriate format (string in our case) before writing them to Kafka. If a key column is not specified, then a null-valued key column will be added automatically.

Let's write the word count data to the Kafka topic topic2. Since we have key-value pairs consisting of word-count pairs, let's convert the data to JSON using the to_json() function and store it in the value column:


# push results to topic2
wordCounts.selectExpr("CAST(count AS STRING) AS key", "to_json(struct(*)) AS value") \
    .writeStream \
    .outputMode("update") \
    .format("kafka") \
    .option("checkpointLocation", "data/checkpoint9") \
    .option("kafka.bootstrap.servers", "127.0.0.1:9092") \
    .option("topic", "topic2") \
    .option("failOnDataLoss", "false") \
    .start() \
    .awaitTermination()

NB: make sure to change the checkpoint location to a local path that exists on your machine. If you don't, HDFS must be running, because Spark resolves the checkpoint path against the default filesystem, which is HDFS in a typical Hadoop setup.
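
For example, to force the checkpoint onto the local filesystem even when a Hadoop default filesystem is configured, you can pass an explicit file:// URI. This is the same sink as above with only the checkpoint option changed; the path is just an assumed example, so adjust it to your machine:


# same sink as above, but with an explicit local checkpoint directory (example path)
wordCounts.selectExpr("CAST(count AS STRING) AS key", "to_json(struct(*)) AS value") \
    .writeStream \
    .outputMode("update") \
    .format("kafka") \
    .option("checkpointLocation", "file:///tmp/spark-kafka-wordcount-checkpoint") \
    .option("kafka.bootstrap.servers", "127.0.0.1:9092") \
    .option("topic", "topic2") \
    .option("failOnDataLoss", "false") \
    .start() \
    .awaitTermination()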

Now, if you look at the topic2 consumer, you will see the words produced to topic1 show up there, along with an updated count result for each word.

For example, if we type these words in the terminal window of the topic1 producer:


carrot
tomatoes
potatoes
carrot
carrot

Here is the result shown in the terminal window of the topic2 consumer:

carrot
{"value":"carrot","count":1}
carrot
tomatoes
{"value":"tomatoes","count":1}
{"value":"potatoes","count":1}
tomatoes
potatoes
carrot
{"value":"carrot","count":3}

Conclusion

As we come to the end of our demo, we have seen how to read data from Kafka, process it with Spark Streaming, and write the results back to Kafka using a simple word count example.

In our next article (God willing), we will show a more real-world example of Spark Structured Streaming using CSV files.