
Spark Streaming with Kafka


In this post, you will learn how Spark Streaming can be integrated with Kafka. Apache Spark is one of the best technologies out there for processing big data. Spark ships with several built-in APIs: structured APIs such as DataFrames and Datasets, and the lower-level unstructured API, Resilient Distributed Datasets (RDDs), which is mostly used for low-level programming.

Another amazing API is Spark Streaming, which is designed to process data in near real time as micro batches. Before Spark 2.3, micro-batch latencies bottomed out at around 100 ms, but Spark 2.3 introduced an experimental continuous processing mode (still experimental in Spark 2.4). In continuous mode, latencies can be as low as 1 ms; in other words, you can handle the data in essentially real time.
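For illustration, here is roughly what opting into continuous mode looks like. This is a hedged sketch assuming Spark 2.3+ and a streaming DataFrame like the streamingInputDF built later in this post; the writeStream and start() calls are covered later as well. Continuous mode currently supports only map-style operations such as select and filter, not aggregations.

import org.apache.spark.sql.streaming.Trigger

// Hypothetical sketch: request continuous processing with a 1-second
// checkpoint interval instead of the default micro-batch trigger
val continuousQuery = streamingInputDF
  .selectExpr("CAST(value AS STRING)")
  .writeStream
  .format("console")
  .trigger(Trigger.Continuous("1 second"))  // checkpoint interval, not a batch interval
  .start()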

Apache Kafka is another streaming platform that overlaps quite a bit with Spark Streaming. The difference is that Spark has its own compute engine, with streaming offered as a feature on top of it, whereas streaming is the core feature Kafka is known for, and the Kafka team has been adding compute capabilities of its own (Kafka Streams) to compete with Spark. Both products are stepping into each other's core functionality, and the race is on.

Kafka is already widely adopted across the industry, and Spark has earned a reputation as one of the fastest computation engines ever built.

There are many real-world use cases where, as a big data engineer, you will have to integrate Kafka with Spark, and that is the topic of this post. So let's dive in.


We will start by writing a Spark application that accepts a stream of data from a Kafka topic and then does some computation on it.

Spark Application

App Name: SparkStreamingWithKafka

package main.scala.datashark.academy.streaming
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.ProcessingTime
object SparkStreamingWithKafka {
  def main(args: Array[String]){
    val spark = SparkSession
                    .builder()
                    .appName("Spark Streaming with Kafka")
                    .getOrCreate()

    //create a structured stream to read data from a kafka topic called 'mytopic'
    val streamingInputDF = spark.readStream
                            .format("kafka")
                            .option("kafka.bootstrap.servers", "localhost:9092")
                            .option("subscribe","mytopic")
                            .load()

Explanation:

In the above code, we created a package to hold all the code files for our Scala program. Next we imported two libraries: org.apache.spark.sql.SparkSession, which gives our program the ability to define a SparkSession object, and org.apache.spark.sql.streaming.ProcessingTime, which we will use later to control how often our query is triggered.


Next we defined a SparkSession object and set the name of our app to "Spark Streaming with Kafka". This is how you will identify the application in the Spark UI at http://localhost:4040 when running in local mode. If you are running it on a production cluster, replace localhost with your server address.

The getOrCreate() method essentially enforces singleton behaviour in our program: if a SparkSession object already exists in memory, that object is reused; otherwise a new one is created.
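A quick illustration of that behaviour (a hypothetical snippet, not part of the application we are building):

// Calling the builder again returns the already-active session rather than building a new one
val sameSession = SparkSession.builder().getOrCreate()
assert(sameSession eq spark)  // both references point to the same object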

After this, we created a new structured stream using the readStream method, with Kafka, running at localhost:9092, as the source.

There can be multiple topics in Kafka at any time; we are interested in reading from only one topic, called 'mytopic'. If you want to read from multiple topics, provide the topic names separated by commas, in this format:

.option("subscribe", "mytopic1,mytopic2,mytopic3")
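If your topic names follow a pattern, the source also accepts a regular expression through the subscribePattern option instead of an explicit list. For example, this hypothetical variant subscribes to every topic whose name starts with "mytopic":

.option("subscribePattern", "mytopic.*")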

That's all you need to read data from a Kafka stream into Spark.
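Note that besides value, the Kafka source attaches a few metadata columns to every record. If you are curious, printing the schema right after load() is a quick way to see them; a small sketch:

// Each record carries the payload plus Kafka metadata:
// key, value (both binary), topic, partition, offset, timestamp, timestampType
streamingInputDF.printSchema()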


Word Count Example using Spark Streaming

Next, let's do some computation on this stream of data using Spark's capabilities before printing the results to the screen.

    import spark.implicits._

    // Kafka delivers 'value' as binary, so cast it to a string before splitting each line into words
    val words = streamingInputDF.selectExpr("CAST(value AS STRING)").as[String].flatMap(_.split(" "))
    val wordCount = words.groupBy("value").count()

Explanation:

We need to import spark.implicits._ because we are going to convert our DataFrame into a Dataset of strings. We need this to use the flatMap transformation, which here operates on string inputs rather than Row objects.

On the next line, we take the 'value' column (the column in which the Kafka source delivers each record's payload), cast it to a string, and then split each line into individual words.

The words are then grouped using the groupBy transformation, and count() defines the aggregation that computes the occurrences of each word, giving us the wordCount DataFrame. Note that nothing actually runs yet; the computation only starts once we launch the streaming query below.
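As an aside, the same idea extends naturally to time-based aggregations. The sketch below is a hypothetical variation, not part of the application we are building, that counts words per 1-minute window using the timestamp column the Kafka source provides; it relies on the spark.implicits._ import above:

import org.apache.spark.sql.functions.{explode, split, window}

// Hypothetical variation: count words per 1-minute window of Kafka record time
val windowedCounts = streamingInputDF
  .selectExpr("CAST(value AS STRING) AS line", "timestamp")
  .select(explode(split($"line", " ")).as("word"), $"timestamp")
  .groupBy(window($"timestamp", "1 minute"), $"word")
  .count()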

If you run the program now it will work, but you will not see any results. This is because we haven't told Spark where to write them. Let's do that next.

    val query = wordCount.writeStream
                  .format("console")
                  .outputMode("update")
                  .trigger(ProcessingTime("25 seconds"))
                  .start()

    query.awaitTermination()
  }
}

Explanation:

Next we created a query object, which is the handle used to start our Spark streaming application. Through this handle we tell Spark to write each batch of computed results to the screen, format("console"), every 25 seconds.

There are three output modes available: complete, update and append. Complete mode writes the entire result table on every trigger, append writes only rows that are newly added to the result (a kind of delta), and update writes only the rows that have changed since the last batch.
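For example, switching the query above to complete mode would reprint the full word-count table on every trigger; a hypothetical tweak:

// Hypothetical alternative: write the entire result table every 25 seconds.
// Note: 'append' would be rejected for this query, because it contains an
// aggregation without a watermark.
val fullQuery = wordCount.writeStream
  .format("console")
  .outputMode("complete")
  .trigger(ProcessingTime("25 seconds"))
  .start()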


Finally, we call start() on the query so the stream can begin. Since we don't want the application to exit after processing the first batch, we also call query.awaitTermination(), which keeps it running until we stop it manually.
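If you ever want the application to stop on its own, awaitTermination() also accepts a timeout in milliseconds and returns whether the query ended; a hypothetical variation:

// Hypothetical variation: run for roughly 10 minutes, then stop the query cleanly
val finished = query.awaitTermination(10 * 60 * 1000)
if (!finished) query.stop()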

Spark Application Compilation using SBT

Next we need to create a build.sbt file for our application with the content below. Place it in the root directory of the project.

name := "Spark Streaming with Kafka"
version := "1.0"
scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.1.1" % "provided" withJavadoc(),
  "org.apache.spark" %% "spark-sql" % "2.1.1" % "provided" withJavadoc(),
  "org.apache.spark" %% "spark-hive" % "2.1.1" % "provided" withJavadoc(),
  "org.apache.spark" %% "spark-mllib" % "2.1.1" % "provided" withJavadoc()
)
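As an alternative to passing the Kafka connector to spark-submit with --packages (covered below), you could also declare it in build.sbt; a hedged sketch, assuming the Spark 2.1.x connector artifact:

// Optional: declare the Kafka connector as a build dependency. Unless you
// bundle it into a fat JAR (e.g. with sbt-assembly), you will still need
// --packages or --jars at submit time to put it on the cluster's classpath.
libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.1.1"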


Compile Spark Application

Open a terminal and type the command below to compile the Spark application:

sbt compile package

You can also use Maven instead of sbt if you are more comfortable with it; it works just the same.

It may take a while to compile, depending on your internet connection. After compilation completes successfully, you should see a JAR file created at:

<your project path>/target/scala-2.11/spark-streaming-with-kafka_2.11-1.0.jar

Awesome, you have created a Spark Streaming application that will work with Apache Kafka.

Apache Kafka

Before we can run this application, we need to set up a Kafka topic, so let's move on to that next.

Apache Kafka needs Zookeeper, so first we will start the Zookeeper server as follows:

Start Zookeeper

Open a terminal, go to the location where Kafka is installed, and run this command:

./bin/zookeeper-server-start.sh config/zookeeper.properties

If everything is good, you should have a Zookeeper instance running. You can check with:

jps

Result
66647 Jps
66363 QuorumPeerMain
59646 RunJar

Here, QuorumPeerMain is the Zookeeper instance. Great, we have it running.

Start Apache Kafka Server

Next we need to start the Apache Kafka server. We will follow the same steps as for Zookeeper: go to the path where Kafka is installed and run the following command:

./bin/kafka-server-start.sh config/server.properties

If everything is good, you should see something like this:

...
INFO [KafkaServer id=0] started (kafka.server.KafkaServer)


If you have got this message, it means the Kafka service is running on your local machine. Congratulations.


Create Kafka Topic

You can imagine a Kafka topic as a queue: packets of data enter at one end and are emitted from the other.

Apache Kafka Topic - DataShark.Academy

source: kafka.apache.org

Now we are ready to create a Kafka topic. In your terminal, type the following command to create one:

./bin/kafka-topics.sh --create --zookeeper localhost:2181 \
                      --replication-factor 1 \
                      --partitions 1 \
                      --topic mytopic


Result

Created topic "mytopic"

If you see the above message confirming that the topic was created, we are good so far.

Create Kafka Producer

Next we need to create a Kafka producer, which will produce data streams for our newly created topic. It's very simple to create one: open a new terminal and run the command below:

./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytopic

Awesome, now we have a producer that will publish a stream of data to our topic called ‘mytopic’.


What about a Kafka Consumer?

We don't need a Kafka consumer in this case, because our Spark application will consume the data stream directly from the Kafka topic.
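That said, if you ever want to peek at what is sitting in the topic without wiring up a console consumer, Spark (2.2 and later) can also read the same topic as a one-off batch job; a small hedged sketch:

// Hypothetical check: read everything currently in 'mytopic' as a static DataFrame
val snapshot = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "mytopic")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
snapshot.selectExpr("CAST(value AS STRING)").show(false)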

Now we have everything set up to test our Spark streaming application with Kafka.

Running Spark Streaming Application

Let's start our Spark streaming application now; afterwards we will send some data packets from Kafka and see whether the application picks them up.

Start Spark Streaming Application

Open a terminal and run the following command to start our Spark application:

spark-submit \
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0 \
--class main.scala.datashark.academy.streaming.SparkStreamingWithKafka \
/SparkStreamingWithKafka/target/scala-2.11/spark-streaming-with-kafka_2.11-1.0.jar

Spark-submit, used in the command above, is the utility that comes pre-packaged with the Spark distribution; you will need it to run any Spark application JAR.

The next line, --packages <Kafka package>, ensures that the executor nodes have the necessary Kafka connector libraries downloaded and available, since we are using Kafka as our data source.

P.S.: We have noticed that with Spark 2.x and Scala 2.11 you must use this package, otherwise the Kafka integration won't work. If you have other tricks, feel free to let us know in the comments below, but this one worked perfectly well for us.

Next is the --class option, which holds the fully qualified name of the main class in our Spark streaming application. Finally, you need to provide the JAR for the Spark streaming application; this is the same JAR that we compiled a few minutes ago.

After you have typed this, press enter and you should see the following screen at the prompt:

Apache Spark Streaming With Kafka Tutorial 0 - DataSharkAcademy

This means that our Spark streaming application is waiting for data from Kafka. So let's switch to the terminal where we started the Kafka producer and enter some random strings.

Apache Kafka Producer example - DataSharkAcademy

As soon as we type something in the Kafka producer terminal, the Spark console fills with log messages. Eventually you will see output like the following, showing the word count for each word in the sentences we sent from the Kafka producer.

Apache Spark Streaming With Kafka Tutorial 1 - DataSharkAcademy

Add some recurring words in the next sentence and you will notice that the counts for those words increase.

Apache Spark Streaming With Kafka Tutorial 3 - DataSharkAcademy

If you remember, while creating the Spark streaming application we set outputMode("update"), which means you will only get the records that were updated in the latest result. To demonstrate that, type the following in the Kafka producer terminal:

> spark spark awesome awesome

Here’s the output from Spark terminal:

Apache Spark Streaming With Kafka Tutorial 4 - DataSharkAcademy

Great. In this post you have learned how to integrate Spark Streaming with Kafka. Although both projects are racing to become the best streaming engine out there, it is great to see how nicely they work together. This is another example of excellent software engineering; kudos to both teams.

