Unit Testing Kafka Streams

Blog Summary: (AI Summaries by Summarizes)
  • Unit testing Kafka Streams code is important and can be done using the ProcessorTopologyTestDriver.
  • To get started, you need to include the test libraries for Kafka Streams and Kafka in your Maven pom.xml file.
  • Most configuration properties aren’t actually used in a unit test; topic names are defined as variables so they can be reused.
  • The code to start processing involves creating a StreamsBuilder, defining a source, applying a filter, and sending the filtered output to a new topic.
  • The ProcessorTopologyTestDriver runs the topology and bypasses the need for a Kafka cluster and ZooKeeper.

Unit testing your Kafka code is incredibly important. I’ve already written about integration testing, consumer testing, and producer testing. Now, I’m going to share how to unit test your Kafka Streams code.

To start off, you’ll need to change your Maven pom.xml file to include the test libraries for Kafka Streams and for Kafka itself (the Streams test driver needs one class from the clients test jar). You should add:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>${kafka.version}</version>
    <classifier>test</classifier>
    <scope>test</scope>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>${kafka.version}</version>
    <classifier>test</classifier>
    <scope>test</scope>
</dependency>

Once that’s done, you’ll need to create the unit test itself. Since people were confused by my previous examples that used dependency injection, I’m going to show everything in one place this time.

You start off by making the usual configuration properties. Most of them aren’t actually used in a unit test, since there’s no real cluster to connect to, but the test driver still needs a StreamsConfig to work with.

// Configure KStreams properties
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "filter");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
          Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
          Serdes.String().getClass().getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

StreamsConfig config = new StreamsConfig(props);

Next, we define the topic names that we are going to use as the input and output. We’re putting them in variables because they’re used several times in the test. You’d probably want these to be public static final constants in production code.

// Define the topics as variables to reuse them
String inputTopic = "filtered-input";
String outputTopic = "filtered-output";
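
As a quick sketch, the production-style version of those topic names would look something like this (the constant names are just illustrative):

// Production-style constants for the topic names
public static final String INPUT_TOPIC = "filtered-input";
public static final String OUTPUT_TOPIC = "filtered-output";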

Then we create the actual stream processing code. In this case, it’s a very simple filter that drops every record whose value doesn’t start with the letter j.

// Start creating and configuring the stream processing
StreamsBuilder builder = new StreamsBuilder();

KStream<String, String> source = builder.stream(inputTopic);

KStream<String, String> filtered =
    source.filter((key, value) -> value.startsWith("j"));

filtered.to(outputTopic);

Now we build the topology. Instead of running it with the normal KafkaStreams object, we’re going to use the ProcessorTopologyTestDriver. This bypasses the need for a Kafka cluster and ZooKeeper.

// Create the topology to start testing
Topology topology = builder.build();
ProcessorTopologyTestDriver driver = new ProcessorTopologyTestDriver(
    config, topology);
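
For comparison, outside of a test the same topology would normally be run with the regular KafkaStreams object against a real cluster, roughly like this (not needed for the unit test itself):

// Normal (non-test) execution of the same topology against a real cluster
KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();

// ... and on shutdown
streams.close();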

We define the Serializer and Deserializer for the key and value. These are used repeatedly, so we’ll keep them in variables.

// Define the serializer and deserializer to reuse them
Serializer<String> keySerializer = Serdes.String().serializer();
Serializer<String> valueSerializer = Serdes.String().serializer();

Deserializer<String> keyDeserializer = Serdes.String().deserializer();
Deserializer<String> valueDeserializer =
    Serdes.String().deserializer();

Once all of the setup overhead is done, we can run our unit tests.

The first unit test runs a value of j through the code. We check that the output matches the ProducerRecord we’re expecting with an assertEquals.

// Run a test with something that should pass the filter
driver.process(inputTopic, "mykey", "j", keySerializer,
               valueSerializer);

ProducerRecord<String, String> recordPassesFilter = driver.readOutput(
            outputTopic, keyDeserializer,
            valueDeserializer);

assertEquals("The filtered output didn't match",
             new ProducerRecord<>(outputTopic, null, 0L, "mykey", "j"),
             recordPassesFilter);

The second unit test runs a value of a through the code. Since a does not start with j, the record should be filtered out and the output should be null. We verify this with an assertNull.

// Run a test with something that shouldn't pass the filter
driver.process(inputTopic, "mykey", "a", keySerializer,
               valueSerializer);

ProducerRecord<String, String> recordFiltered = driver.readOutput(
            outputTopic, keyDeserializer,
            valueDeserializer);
assertNull("The input wasn't filtered correctly", recordFiltered);

driver.close();

Here is the full code for the unit test:

// Configure KStreams properties
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "filter");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
          Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
          Serdes.String().getClass().getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

StreamsConfig config = new StreamsConfig(props);

// Define the topics as variables to reuse them
String inputTopic = "filtered-input";
String outputTopic = "filtered-output";

// Start creating and configuring the stream processing
StreamsBuilder builder = new StreamsBuilder();

KStream<String, String> source = builder.stream(inputTopic);

KStream<String, String> filtered =
    source.filter((key, value) -> value.startsWith("j"));

filtered.to(outputTopic);

// Create the topology to start testing
Topology topology = builder.build();
ProcessorTopologyTestDriver driver = new ProcessorTopologyTestDriver(
    config, topology);

// Define the serializer and deserializer to reuse them
Serializer<String> keySerializer = Serdes.String().serializer();
Serializer<String> valueSerializer = Serdes.String().serializer();

Deserializer<String> keyDeserializer = Serdes.String().deserializer();
Deserializer<String> valueDeserializer =
    Serdes.String().deserializer();

// Run a test with something that should pass the filter
driver.process(inputTopic, "mykey", "j", keySerializer,
               valueSerializer);

ProducerRecord<String, String> recordPassesFilter = driver.readOutput(
            outputTopic, keyDeserializer,
            valueDeserializer);

assertEquals("The filtered output didn't match",
             new ProducerRecord<>(outputTopic, null, 0L, "mykey", "j"),
             recordPassesFilter);

// Run a test with something that shouldn't pass the filter
driver.process(inputTopic, "mykey", "a", keySerializer,
               valueSerializer);

ProducerRecord<String, String> recordFiltered = driver.readOutput(
            outputTopic, keyDeserializer,
            valueDeserializer);
assertNull("The input wasn't filtered correctly", recordFiltered);

driver.close();
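
For this to compile, you’ll need roughly the following imports. This assumes JUnit 4 for the assertions and that the listing sits inside a @Test method; ProcessorTopologyTestDriver comes from the kafka-streams test jar (under org.apache.kafka.test in the versions I’ve used):

// Imports assumed by the listing above (JUnit 4 assertions)
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.test.ProcessorTopologyTestDriver;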
