How to get started with Kafka Streams

You can also check out my Youtube Channel to get access to various tutorials created by me.

In this article we will see how to use Kafka Streams to process, in real time, the data present in a Kafka topic.

What is Kafka Streams

Kafka Streams is a Java client library which helps build real-time applications on top of data which is present in Kafka.

Kafka Streams reads a continuous stream of records from Kafka, does some processing on this stream and writes the result back to Kafka.

Pre-requisites

Please ensure that you have Maven and Java (version 8 or later) installed on your system.

What Kafka Streams application are we planning to build?

We will build a system which works as follows:

  • There will be an input Kafka topic which receives video titles, each with a number of views
  • There will be a Kafka Streams application which reads the video view data and adds up the views each video is getting.
  • Kafka Streams will write the total view count per video into an output topic
  • At any point, the output topic will hold the total number of views each of our videos has received.

For example, if the input is as follows (video title and view count are separated by a :)

Video1:1
Video2:20
Video1:23
Video3:8
Video2:5

Then the output will be as follows:

Video1:24
Video2:25
Video3:8
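The aggregation itself is just a running sum per video title. As a rough illustration, here is a plain Java sketch (no Kafka involved) which computes the same totals from the sample lines above. The class name ViewTotalsSketch and the method totalViews are made up for this illustration and are not part of the application we build later.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ViewTotalsSketch {

    // Sums the view counts per video title for lines of the form "title:count".
    public static Map<String, Long> totalViews(List<String> lines) {
        Map<String, Long> totals = new LinkedHashMap<>();
        for (String line : lines) {
            String[] parts = line.split(":", 2);
            // merge() stores the count for a new title, or adds it to the existing total
            totals.merge(parts[0], Long.parseLong(parts[1]), Long::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("Video1:1", "Video2:20", "Video1:23", "Video3:8", "Video2:5");
        // Prints Video1:24, Video2:25 and Video3:8, one per line
        totalViews(input).forEach((title, total) -> System.out.println(title + ":" + total));
    }
}
```

The Kafka Streams application does the same thing, except that the input never ends and the totals keep getting updated as new records arrive.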

Setup

Download Kafka

Download Kafka 2.8.0 from https://kafka.apache.org/downloads

wget https://apachemirror.wuchna.com/kafka/2.8.0/kafka_2.12-2.8.0.tgz

Extract Kafka

tar xzf kafka_2.12-2.8.0.tgz

Open the Kafka folder. All the Kafka commands in this article should be run from this folder.

cd kafka_2.12-2.8.0

Set up ZooKeeper and Kafka

Start ZooKeeper using the following command:

bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

ZooKeeper starts on port 2181.

Start Kafka using the following command:

bin/kafka-server-start.sh -daemon config/server.properties

Kafka runs on port 9092. For this example we will use only one Kafka broker.

Create the input and output topic for the application

Let us create an input topic from which the Kafka Streams application will read data.

This is done using the following command:

bin/kafka-topics.sh --create --topic input-topic --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092

The topic name is input-topic and it has 1 partition and a replication factor of 1.

Let us also create an output topic, into which the Kafka Streams application will write its result.

This is done using the following command:

bin/kafka-topics.sh --create --topic output-topic --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092

The topic name is output-topic and it also has 1 partition and a replication factor of 1.

We can list all the topics using the following command:

bin/kafka-topics.sh --bootstrap-server localhost:9092 --list

The above command will print the following:

input-topic
output-topic

Writing the Kafka Streams application

Create a new Java Maven project in your IDE. If you want to know more about Maven you can read this beginner's guide to Maven

pom.xml

In the pom.xml file, add the following dependencies: the Kafka Streams library and slf4j-log4j12, the SLF4J binding for Log4j, which also pulls in Log4j itself.

<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>2.8.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.30</version>
    </dependency>
</dependencies>

Below is the complete pom.xml file.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>kafka-streams-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>2.8.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.30</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                        <configuration>
                            <archive>
                                <manifest>
                                    <mainClass>KafkaStreamsDemo</mainClass>
                                </manifest>
                            </archive>
                            <descriptorRefs>
                                <descriptorRef>jar-with-dependencies</descriptorRef>
                            </descriptorRefs>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

In the above file we have added the Maven compiler plugin so that the code is compiled for Java 1.8.

We have also added the Maven assembly plugin so that we can build a standalone executable jar for the Kafka Streams application.

Kafka Streams Java Code

Here is the complete Java code for the Kafka Streams application

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;
import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class KafkaStreamsDemo {
    public static void main(String[] args) {
        BasicConfigurator.configure();
        Logger.getRootLogger().setLevel(Level.INFO);

        final Serde<String> stringSerde = Serdes.String();
        final Serde<Long> longSerde = Serdes.Long();

        final StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> views = builder.stream(
                "input-topic",
                Consumed.with(stringSerde, stringSerde)
        );

        KTable<String, Long> totalViews = views
                .mapValues(v -> Long.parseLong(v))
                .groupByKey(Grouped.with(stringSerde, longSerde))
                .reduce(Long::sum);

        totalViews.toStream().to("output-topic", Produced.with(stringSerde, longSerde));

        final Properties props = new Properties();
        props.putIfAbsent(StreamsConfig.APPLICATION_ID_CONFIG, "streams-totalviews");
        props.putIfAbsent(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        final KafkaStreams streams = new KafkaStreams(builder.build(), props);

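        final CountDownLatch latch = new CountDownLatch(1);

        // Register the shutdown hook before blocking on the latch,
        // otherwise it would never be registered
        Runtime.getRuntime().addShutdownHook(new Thread("streams-totalviews") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            // Keep the main thread alive until the shutdown hook releases the latch
            latch.await();
        } catch (final Throwable e) {
            System.exit(1);
        }

        System.exit(0);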
    }
}

Let me go over the code step by step.

First we initialise a default logging mechanism using the following lines of code:

BasicConfigurator.configure();
Logger.getRootLogger().setLevel(Level.INFO);

Next we set up String and Long Serdes to help with serialising and deserialising the Kafka data

final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();
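To get a feel for what a serde does: Kafka stores keys and values as raw bytes, and the Long serde writes each number as 8 bytes in big-endian order. The sketch below mimics that byte layout with ByteBuffer from the JDK. It is only an illustration and does not use the Kafka classes; the class and method names are made up.

```java
import java.nio.ByteBuffer;

public class LongBytesSketch {

    // Encodes a long as 8 big-endian bytes (ByteBuffer is big-endian by default).
    public static byte[] serialize(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    // Decodes 8 big-endian bytes back into a long.
    public static long deserialize(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong();
    }

    public static void main(String[] args) {
        byte[] encoded = serialize(24L);
        System.out.println(encoded.length);        // prints 8
        System.out.println(deserialize(encoded));  // prints 24
    }
}
```

This is also why a consumer of the totals needs a Long deserializer for the values: read as text, those 8 bytes would show up as unreadable characters.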

Next we build the Kafka Streams topology. This is where we specify the processing logic:

final StreamsBuilder builder = new StreamsBuilder();

KStream<String, String> views = builder.stream(
        "input-topic",
        Consumed.with(stringSerde, stringSerde)
);

KTable<String, Long> totalViews = views
        .mapValues(v -> Long.parseLong(v))
        .groupByKey(Grouped.with(stringSerde, longSerde))
        .reduce(Long::sum);

totalViews.toStream().to("output-topic", Produced.with(stringSerde, longSerde));

First we create a StreamsBuilder object.

Then we create a KStream object called views to read data from the Kafka topic input-topic. In this topic both the key and the value are strings, hence we use the String serde for both.

Next we create a KTable object with a String key and a Long value. This is the object which holds the result of our computation.

The chained calls, written with a Java lambda expression and a method reference, do the following:

  • Convert the value to a Long, since the value here is the video view count
  • Group the records by key, so that all records for the same video title end up together.
  • For each video title, calculate the running sum of the view counts.

Finally the total view count per video is written to a Kafka topic called output-topic. In this topic, the key is a String and the value is a Long.
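A KTable can be thought of as a table which is updated record by record, and toStream() turns those updates into records for the output topic. The plain Java sketch below imitates that behaviour for reduce(Long::sum): the first value seen for a key is stored as is, and each later value is combined with the stored total. The names are hypothetical, and a real Kafka Streams application may emit fewer updates than this, because its record cache can merge consecutive updates for the same key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

public class ReduceUpdatesSketch {

    // Applies the reducer per key and returns every table update as "key=total", in order.
    public static List<String> updates(List<String[]> records, BinaryOperator<Long> reducer) {
        Map<String, Long> table = new HashMap<>();
        List<String> emitted = new ArrayList<>();
        for (String[] record : records) {
            String key = record[0];
            long value = Long.parseLong(record[1]);
            Long current = table.get(key);
            // First value for a key is stored unchanged; later values go through the reducer
            long updated = (current == null) ? value : reducer.apply(current, value);
            table.put(key, updated);
            emitted.add(key + "=" + updated);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String[]> records = new ArrayList<>();
        records.add(new String[] {"Video1", "1"});
        records.add(new String[] {"Video2", "20"});
        records.add(new String[] {"Video1", "23"});
        System.out.println(updates(records, Long::sum));
        // prints [Video1=1, Video2=20, Video1=24]
    }
}
```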

Next we create a KafkaStreams object using the following code:

final Properties props = new Properties();
props.putIfAbsent(StreamsConfig.APPLICATION_ID_CONFIG, "streams-totalviews");
props.putIfAbsent(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

final KafkaStreams streams = new KafkaStreams(builder.build(), props);

In the above code we give the application an id of streams-totalviews. This identifies the application; Kafka Streams also uses it as the consumer group id and as the prefix for its internal topics.

We also give the Kafka bootstrap server as localhost:9092, since this is where our Kafka broker is running.

Then we create a KafkaStreams object called streams from the topology and the properties.
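A small side note on putIfAbsent: it only writes a setting when no value is present yet, which makes it convenient for defaults. The sketch below shows this with plain java.util.Properties. The keys are the string values behind StreamsConfig.APPLICATION_ID_CONFIG and StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, while the helper name and the override address are made up for illustration.

```java
import java.util.Properties;

public class StreamsPropsSketch {

    // Fills in defaults without overwriting anything the caller has already set.
    public static Properties withDefaults(Properties props) {
        props.putIfAbsent("application.id", "streams-totalviews");
        props.putIfAbsent("bootstrap.servers", "localhost:9092");
        return props;
    }

    public static void main(String[] args) {
        Properties overrides = new Properties();
        overrides.put("bootstrap.servers", "broker.example:9092"); // hypothetical address
        Properties props = withDefaults(overrides);
        System.out.println(props.getProperty("application.id"));    // prints streams-totalviews
        System.out.println(props.getProperty("bootstrap.servers")); // prints broker.example:9092
    }
}
```

In our application the Properties object starts out empty, so putIfAbsent behaves exactly like put there.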

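Until now we have only defined the Kafka Streams flow; we have not started it yet.

The next few lines of code start the Kafka Streams application:

final CountDownLatch latch = new CountDownLatch(1);

// Register the shutdown hook before blocking on the latch,
// otherwise it would never be registered
Runtime.getRuntime().addShutdownHook(new Thread("streams-totalviews") {
    @Override
    public void run() {
        streams.close();
        latch.countDown();
    }
});

try {
    streams.start();
    // Keep the main thread alive until the shutdown hook releases the latch
    latch.await();
} catch (final Throwable e) {
    System.exit(1);
}

System.exit(0);

We start the flow using streams.start(). The call to latch.await() then blocks the main thread so that the application keeps running.

We also register a shutdown hook, and we do so before calling latch.await(), since nothing after that call runs until the latch is released. When the JVM is asked to stop (for example with Ctrl+C), the hook closes the Kafka Streams application properly using the streams.close() function and then releases the latch.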
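A shutdown hook can only help if it is registered before latch.await(), because await() blocks the main thread until the latch is released. The sketch below shows the close-then-release order without needing a Kafka cluster. FakeStreams is a hypothetical stand-in for KafkaStreams, and the hook thread is started by hand to simulate Ctrl+C; in the real application the JVM runs the hook after it has been registered with Runtime.getRuntime().addShutdownHook(...).

```java
import java.util.concurrent.CountDownLatch;

public class ShutdownOrderSketch {

    // Hypothetical stand-in for KafkaStreams: it only tracks whether it is running.
    public static class FakeStreams {
        public volatile boolean running = false;
        public void start() { running = true; }
        public void close() { running = false; }
    }

    // Builds the hook thread: close the streams first, then release the latch.
    public static Thread buildHook(FakeStreams streams, CountDownLatch latch) {
        return new Thread(() -> {
            streams.close();
            latch.countDown();
        }, "streams-shutdown-hook");
    }

    public static void main(String[] args) throws InterruptedException {
        FakeStreams streams = new FakeStreams();
        CountDownLatch latch = new CountDownLatch(1);

        // The hook exists before we block; the real application registers it with the Runtime here
        Thread hook = buildHook(streams, latch);

        streams.start();
        hook.start();   // simulates the JVM running the hook on Ctrl+C
        latch.await();  // returns once the hook has counted down

        System.out.println("running after shutdown: " + streams.running);
        // prints "running after shutdown: false"
    }
}
```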

Running the Kafka Streams application

Build the Kafka Streams application using the following command:

mvn clean package

This will create a file called kafka-streams-demo-1.0-SNAPSHOT-jar-with-dependencies.jar in the target folder.

Run the application in a terminal using the following command:

java -jar target/kafka-streams-demo-1.0-SNAPSHOT-jar-with-dependencies.jar

If you run the command from outside the project folder, give the full path to your jar file.

In a second terminal, start a Kafka console producer using the following command:

bin/kafka-console-producer.sh --bootstrap-server localhost:9092 \
    --topic input-topic \
    --property key.separator=":" \
    --property parse.key=true 

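We will be producing into the Kafka topic input-topic. With parse.key=true and key.separator=":", the text before the : becomes the record key (the video title) and the text after it becomes the record value (the view count).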
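As a rough picture of what happens to each line typed into the producer, the sketch below splits a line into key and value at the first occurrence of the separator. It is an approximation written for this article, not the console producer's actual code, and the class and method names are made up.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

public class KeySeparatorSketch {

    // Splits a console line into key and value at the first occurrence of the separator.
    public static Map.Entry<String, String> parse(String line, String separator) {
        int index = line.indexOf(separator);
        if (index < 0) {
            throw new IllegalArgumentException("No key separator found in line: " + line);
        }
        String key = line.substring(0, index);
        String value = line.substring(index + separator.length());
        return new SimpleEntry<>(key, value);
    }

    public static void main(String[] args) {
        Map.Entry<String, String> record = parse("Video1:5", ":");
        System.out.println(record.getKey() + " -> " + record.getValue()); // prints Video1 -> 5
    }
}
```

One consequence of splitting at the first separator is that a video title which itself contains a : would be cut short, so in that case a different separator would be a safer choice.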

In a third terminal, start a Kafka console consumer using the following command:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic output-topic \
    --property print.key=true \
    --property key.separator="-" \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

We will be consuming from the topic output-topic. In the output, the video title is separated from the total view count by a -. The deserializer properties tell the consumer that the key is a String and the value is a Long.

In the Producer terminal send the following

Video1:5
Video2:10
Video1:4
Video1:0
Video2:7
Video3:12
Video1:7

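In the consumer terminal you will get the following output. It may take a little while to appear, because Kafka Streams buffers updates and by default flushes them about every 30 seconds; the order of the lines can vary.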

Video2-17
Video3-12
Video1-16

If we further produce the following in the producer:

Video1:51
Video2:12
Video1:41
Video3:15

We will get the following in the consumer terminal

Video2-29
Video1-108
Video3-27

We can see that the Kafka Streams application is adding up the view counts per video title.

Code

The code for the kafka streams application created in this blog can be found in this Github Repo

Congrats 😊

You have now learnt how to use Kafka Streams

If you are interested in real-time analytics using Kafka and Apache Pinot, feel free to check out my Udemy course, "Apache Pinot a hands on course".

Feel free to connect with me on LinkedIn or follow me on Twitter
