According to its own site, "Kafka Streams is a client library for building applications and microservices, where the input and output data are stored in Kafka clusters". In simpler terms, Kafka is a publish-subscribe system oriented to stream processing. We can think of Kafka as a kind of crossroads where information travels from one application to another, for example, from a large-scale distributed microservice application to a monitoring system.
In this post I assume that you have managed to start Kafka and that it is running on localhost, port 9092, together with Zookeeper, which is available on port 2181. There are sites dedicated to running Kafka, so I will not cover that here.
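For reference, with a stock Kafka 2.0 download the two services are typically started with the scripts shipped in the distribution (exact paths depend on your installation), and, if your broker does not auto-create topics, the two topics used below can be created explicitly:
zookeeper-server-start.sh config/zookeeper.properties
kafka-server-start.sh config/server.properties
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kstreamstopic
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic resultstopic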
I will solve a few exercises:
- Counting the occurrences of each key
- Converting the output from Long to String
- Reduce()
- Materialized views
- Windowed streams
Overview
In the end we want to have the following arrangement: a producer writes content to a topic; a dedicated streams application reads the data from that topic (possibly in parallel with other applications) and outputs results to a second topic; and a dedicated consumer waits on this second topic to get the results computed by the streams application in the middle.
To reach this setting, we start from the right. We can resort to the shell applications provided with Kafka to run a consumer waiting on the resultstopic topic. I did it with the following command:
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic resultstopic
Note that the exact way of running this command depends on how you ran Kafka. It might happen that this fails to work, as I saw on some of my students' computers. In that case you might want to create a subscriber yourself; please resort to other tutorials on Kafka to do that, or adapt the sketch below.
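For completeness, a minimal plain-Java subscriber would look roughly like this (a sketch that mirrors what the console consumer does; the class name and group id are my own choices):
package is.kafkastreamsclass;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SimpleConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        // Any unused group id will do for this experiment
        props.put("group.id", "console-replacement");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("resultstopic"));
            while (true) {
                // Poll the broker and print whatever arrives on the topic
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records)
                    System.out.println(record.key() + " : " + record.value());
            }
        }
    }
}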
Before we reach the streams application, we will discuss the producer, because we need to see what the producer is going to put on the kstreamstopic topic. Let's reuse one that comes from the Kafka tutorials. Just don't run it yet; it will be the last piece of the puzzle:
package is.kafkastreamsclass;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SimpleProducer {
    public static void main(String[] args) throws Exception {
        // Topic name comes from the command line
        String topicName = args[0];
        // Producer configuration
        Properties props = new Properties();
        // Address of the Kafka broker
        props.put("bootstrap.servers", "localhost:9092");
        // Wait for acknowledgements from all in-sync replicas
        props.put("acks", "all");
        // 0 means the producer will not retry failed requests
        props.put("retries", 0);
        // Batch size, in bytes
        props.put("batch.size", 16384);
        // Wait up to 1 ms before sending, to batch more records per request
        props.put("linger.ms", 1);
        // Total memory (bytes) available to the producer for buffering
        props.put("buffer.memory", 33554432);
        // Keys are strings, values are longs
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.LongSerializer");
        Producer<String, Long> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 1000; i++)
            producer.send(new ProducerRecord<String, Long>(topicName, Integer.toString(i), (long) i));
        System.out.println("Messages sent successfully to topic " + topicName);
        producer.close();
    }
}
This producer is somewhat dull, as it just sends 1000 (key, value) pairs to the topic, where the key is a string and the value is a long, but for now it will be enough for our experiments. I represent the output of the producer as follows:
"0" => 0
"1" => 1
"2" => 2
...
"999" => 999
Counting the occurrences of each key
Our focus here is the stream reader. Let us start with a basic one:
package is.kafkastreamsblog;

import java.io.IOException;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

public class SimpleStreamsExercises {
    public static void main(String[] args) throws InterruptedException, IOException {
        String topicName = args[0];
        String outtopicname = "resultstopic";
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "exercises-application");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
        StreamsBuilder builder = new StreamsBuilder();
        // Read the input topic as a stream of (String, Long) records
        KStream<String, Long> lines = builder.stream(topicName);
        // Group the records by key and count the occurrences of each key
        KTable<String, Long> outlines = lines.groupByKey().count();
        // Convert the table back to a stream and write it to the output topic
        outlines.toStream().to(outtopicname);
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        System.out.println("Reading stream from topic " + topicName);
    }
}
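As a side note (my addition, mirroring the idiom used in the Kafka Streams examples), the application above never closes the streams client; a common pattern is to register a shutdown hook right after creating it:
// Close the streams client cleanly when the JVM terminates (e.g., on Ctrl+C)
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));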
Don't forget the pom.xml file:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>is</groupId>
  <artifactId>kafkastreamsblog</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>kafkastreamsclass</name>
  <url>http://maven.apache.org</url>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <java.version>10</java.version>
    <maven.compiler.source>${java.version}</maven.compiler.source>
    <maven.compiler.target>${java.version}</maven.compiler.target>
  </properties>
  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.0.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind -->
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
      <version>2.9.5</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-streams</artifactId>
      <version>2.0.0</version>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.8.0</version>
        <configuration>
          <release>10</release>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
To understand this application, we need to take a look at a diagram available on the Kafka Streams site, which summarizes the relations between the library's main classes.
The groupByKey() operation converts the stream to a KGroupedStream by grouping records under their keys. In our case, since the producer outputs 1000 different keys, each key will have a single record per producer run (value 0 for key "0", value 1 for key "1", value 2 for key "2", and so on). Hence, the following count() will compute 1 for every key and convert the KGroupedStream into a KTable, which is similar to a regular database table, with the key as the primary key. This table will have 1000 rows, each with the value 1. To see the result, we convert the KTable back to a KStream using toStream().
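To make this concrete: after a single run of the producer, the KTable holds count 1 for every key; run the producer a second time and each count becomes 2, since every key then occurs twice in the input topic:
"0" => 1
"1" => 1
...
(and after a second producer run)
"0" => 2
"1" => 2
...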
To run the experiment, we need to pass the topic where the streams application will receive the data as a command-line argument, and the same goes for the producer. Failing to do so will crash the programs. In my case, I used kstreamstopic, but you may use another topic. Just start the applications in this order (example launch commands below):
1 - kafka-console-consumer.sh
2 - then, SimpleStreamsExercises
3 - finally, SimpleProducer.
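For instance, assuming a Maven build and the exec plugin, the two Java applications could be launched along these lines (adjust the main class and package names to your own project):
mvn exec:java -Dexec.mainClass="is.kafkastreamsblog.SimpleStreamsExercises" -Dexec.args="kstreamstopic"
mvn exec:java -Dexec.mainClass="is.kafkastreamsclass.SimpleProducer" -Dexec.args="kstreamstopic"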
Regarding the order in which the applications start, Kafka keeps messages on a topic for a configurable retention time, so we could always get the messages from the topic later, if necessary. You may also repeat the execution of the producer as many times as you want, with only slight changes in the results (the value of the count() will keep increasing).
Converting from Long to String
But if you run this setup you may end up getting nothing on the kafka-console-consumer, except 1000 apparently empty lines. Why? Because we are outputting longs instead of strings, so the console consumer prints the raw bytes of each long, which are non-printable (for the count 1, you would be looking at ASCII character 1). Let us change our code slightly, to ensure that we can properly see the results of our operation:
StreamsBuilder builder = new StreamsBuilder();
KStream<String, Long> lines = builder.stream(topicName);
KTable<String, Long> outlines = lines.groupByKey().count();
// Convert each Long count to a String before writing it out
// (requires: import org.apache.kafka.streams.kstream.Produced;)
outlines.mapValues(v -> "" + v).toStream().to(outtopicname, Produced.with(Serdes.String(), Serdes.String()));
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
What is new here? The mapValues(), which uses a lambda expression to transform the long value v into a String. However, this change alone crashes the program, because we specified the default value serde (DEFAULT_VALUE_SERDE_CLASS_CONFIG) to be Serdes.Long(). Hence, the attempt to write a String to the outtopicname stream would fail. Therefore, we need to explicitly tell the library that we are producing the output in a String format (the Produced.with() at the end). In other words, the final stream has a format that is different from the initial stream and from the KTable: those two had Long values, while the final stream has String values.
Now, you should see this in the kafka-console-consumer shell:
3
3
3
3
3
...
or whatever number of times you ran the producer, instead of 3 (e.g., I am actually seeing a thousand 10s).
This is still not very handy, because we cannot see the keys. To see them, we may change the lambda expression in the mapValues() to become:
mapValues((k, v) -> k + " => " + v)
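In context, the full statement keeps the Produced.with() from before:
outlines.mapValues((k, v) -> k + " => " + v).toStream().to(outtopicname, Produced.with(Serdes.String(), Serdes.String()));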
i.e., the lambda receives the key-value pair and replaces the value (because the function is mapValues()) with a string containing the key, the arrow and the value, which is much nicer (don't worry about the 12; your number will likely differ, perhaps smaller):
...
989 => 12
990 => 12
991 => 12
992 => 12
993 => 12
994 => 12
995 => 12
996 => 12
997 => 12
998 => 12
999 => 12
Reduce()
What about summing all the values of a given key?
KTable<String, Long> outlines = lines.
        groupByKey().
        // Keep adding each new value to the running total for its key
        reduce((oldval, newval) -> oldval + newval);
outlines.mapValues((k, v) -> k + " => " + v).toStream().to(outtopicname, Produced.with(Serdes.String(), Serdes.String()));
Swap the count() for a reduce(). The lambda expression in the reduce() keeps accumulating the new values that show up for each key; the reduce() can store the running result because it is stateful (mind the legend in the figure before, regarding the reduce()). For example, after four runs of the producer, key "997" accumulates 997 × 4 = 3988, and you might see the following output in the kafka-console-consumer shell:
985 => 3940
986 => 3944
987 => 3948
988 => 3952
989 => 3956
990 => 3960
991 => 3964
992 => 3968
993 => 3972
994 => 3976
995 => 3980
996 => 3984
997 => 3988
998 => 3992
999 => 3996
Materialized views
Now, the case for a materialized view. Materialized views allow us to query the table directly, either by fetching the value of a single key or by key ranges, as shown in this case:
package is.kafkastreamsblog;

import java.io.IOException;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class SimpleStreamsExercises {
    private static final String tablename = "exercises";

    public static void main(String[] args) throws InterruptedException, IOException {
        String topicName = args[0];
        String outtopicname = "resultstopic";
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "exercises-application");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, Long> lines = builder.stream(topicName);
        // Materialize the reduce() result as a queryable state store
        KTable<String, Long> countlines = lines.
                groupByKey().
                reduce((oldval, newval) -> oldval + newval, Materialized.as(tablename));
        countlines.mapValues(v -> "" + v).toStream().to(outtopicname, Produced.with(Serdes.String(), Serdes.String()));
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        System.out.println("Press enter when ready...");
        System.in.read();
        while (true) {
            // Fetch a read-only view of the materialized store
            ReadOnlyKeyValueStore<String, Long> keyValueStore =
                    streams.store(tablename, QueryableStoreTypes.keyValueStore());
            // Point lookup for a single key
            System.out.println("count for 355:" + keyValueStore.get("355"));
            System.out.println();
            // Get the values for a range of keys available in this application instance
            KeyValueIterator<String, Long> range = keyValueStore.range("880", "980");
            while (range.hasNext()) {
                KeyValue<String, Long> next = range.next();
                System.out.println("count for " + next.key + ": " + next.value);
            }
            range.close();
            Thread.sleep(30000);
        }
    }
}
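One caveat (my own addition, not part of the original flow): right after streams.start(), the store may not be queryable yet, in which case streams.store() throws an InvalidStateStoreException. A defensive sketch is a hypothetical helper that retries until the store becomes available, called inside the loop instead of streams.store() directly:
// Hypothetical helper: retry until the state store is queryable
private static ReadOnlyKeyValueStore<String, Long> waitForStore(KafkaStreams streams)
        throws InterruptedException {
    while (true) {
        try {
            return streams.store(tablename, QueryableStoreTypes.keyValueStore());
        } catch (org.apache.kafka.streams.errors.InvalidStateStoreException e) {
            Thread.sleep(100); // store not ready yet or rebalancing; retry
        }
    }
}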
After sending a few more messages with the producer and pressing Enter, we get this result on the streams application:
Press enter when ready...
count for 355:355
count for 880: 880
count for 881: 881
count for 882: 882
count for 883: 883
count for 884: 884
count for 885: 885
count for 886: 886
count for 887: 887
count for 888: 888
count for 889: 889
count for 89: 89
count for 890: 890
count for 891: 891
count for 892: 892
count for 893: 893
count for 894: 894
count for 895: 895
count for 896: 896
count for 897: 897
count for 898: 898
count for 899: 899
count for 9: 9
count for 90: 90
count for 900: 900
count for 901: 901
count for 902: 902
count for 903: 903
count for 904: 904
count for 905: 905
count for 906: 906
count for 907: 907
count for 908: 908
count for 909: 909
count for 91: 91
count for 910: 910
count for 911: 911
count for 912: 912
count for 913: 913
count for 914: 914
count for 915: 915
count for 916: 916
count for 917: 917
count for 918: 918
count for 919: 919
count for 92: 92
count for 920: 920
count for 921: 921
count for 922: 922
count for 923: 923
count for 924: 924
count for 925: 925
count for 926: 926
count for 927: 927
count for 928: 928
count for 929: 929
count for 93: 93
count for 930: 930
count for 931: 931
count for 932: 932
count for 933: 933
count for 934: 934
count for 935: 935
count for 936: 936
count for 937: 937
count for 938: 938
count for 939: 939
count for 94: 94
count for 940: 940
count for 941: 941
count for 942: 942
count for 943: 943
count for 944: 944
count for 945: 945
count for 946: 946
count for 947: 947
count for 948: 948
count for 949: 949
count for 95: 95
count for 950: 950
count for 951: 951
count for 952: 952
count for 953: 953
count for 954: 954
count for 955: 955
count for 956: 956
count for 957: 957
count for 958: 958
count for 959: 959
count for 96: 96
count for 960: 960
count for 961: 961
count for 962: 962
count for 963: 963
count for 964: 964
count for 965: 965
count for 966: 966
count for 967: 967
count for 968: 968
count for 969: 969
count for 97: 97
count for 970: 970
count for 971: 971
count for 972: 972
count for 973: 973
count for 974: 974
count for 975: 975
count for 976: 976
count for 977: 977
count for 978: 978
count for 979: 979
count for 98: 98
count for 980: 980
This may seem awkward, because 98 shows up between 979 and 980, but keep in mind that the keys are strings: range() iterates in lexicographic order, and "98" sorts between "979" and "980".
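If a numeric-looking order matters, one workaround (my suggestion, not part of the original exercise) is to zero-pad the keys in the producer, so that lexicographic and numeric order coincide:
// Keys "0000" to "0999" sort lexicographically in numeric order
producer.send(new ProducerRecord<String, Long>(topicName, String.format("%04d", i), (long) i));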
Windowed streams
What if we want to restrict the results to the last x minutes, where x is configurable? In this case we can do as follows:
package is.kafkastreamsblog;

import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

public class SimpleStreamsExercises {
    public static void main(String[] args) throws InterruptedException, IOException {
        String topicName = args[0];
        String outtopicname = "resultstopic";
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "exercises-application");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, Long> lines = builder.stream(topicName);
        // Sum the values per key, but only within 1-minute windows
        KTable<Windowed<String>, Long> addvalues = lines.
                groupByKey().
                windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(1))).
                reduce((aggval, newval) -> aggval + newval, Materialized.as("lixo"));
        // The windowed key wraps the original key; unwrap it with wk.key()
        addvalues.toStream((wk, v) -> wk.key()).
                map((k, v) -> new KeyValue<>(k, k + " => " + v)).
                to(outtopicname, Produced.with(Serdes.String(), Serdes.String()));
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        System.out.println("Reading stream from topic " + topicName);
    }
}
We are basically applying a window of 1 minute to the results, and therefore we may get:
988 => 988
989 => 989
990 => 990
991 => 991
992 => 992
993 => 993
994 => 994
995 => 995
996 => 996
997 => 997
998 => 998
999 => 999
I.e., the sum of the values of the last minute. You may play with this value and change it to 10 minutes, for example. You will notice that the results may differ (even without sending new messages with the producer):
988 => 2964
989 => 2967
990 => 2970
991 => 2973
992 => 2976
993 => 2979
994 => 2982
995 => 2985
996 => 2988
997 => 2991
998 => 2994
999 => 2997
In fact, several other window variants exist (e.g., hopping and session windows), but I will not cover them here.
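Just as a pointer, in the 2.0 API a hopping window, for instance, only requires an advanceBy() on the window definition (here, a 10-minute window that advances every minute):
windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(10)).advanceBy(TimeUnit.MINUTES.toMillis(1)))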