Unit Testing Kafka Streams

Blog Summary:
  • Unit testing Kafka Streams code is important and can be done using the ProcessorTopologyTestDriver.
  • To get started, you need to include the test libraries for Kafka Streams and Kafka in your Maven pom.xml file.
  • Configuration properties are not needed for a unit test, but you can define topic names as variables to reuse them.
  • The code to start processing involves creating a StreamsBuilder, defining a source, applying a filter, and sending the filtered output to a new topic.
  • The ProcessorTopologyTestDriver is used to build the topology and bypass the need for a Kafka cluster and ZooKeeper.

Unit testing your Kafka code is incredibly important. I’ve already written about integration testing, consumer testing, and producer testing. Now, I’m going to share how to unit test your Kafka Streams code.

To start off with, you will need to change your Maven pom.xml file. You’ll need to include the test libraries for Kafka Streams and for Kafka itself (the Kafka test library is only there because the test driver requires one class from it, which is annoying). You should add:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>${kafka.version}</version>
    <classifier>test</classifier>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>${kafka.version}</version>
    <classifier>test</classifier>
</dependency>

Once that’s done, you’ll need to create the unit test itself. Since the dependency injection in my previous posts’ code confused some readers, I’m going to show all of the code in one place this time.

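You start off by creating the usual configuration properties. A unit test never connects to a broker, so the values themselves don’t really matter, but the test driver’s constructor still takes a StreamsConfig.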

// Configure KStreams properties
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "filter");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
          Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
          Serdes.String().getClass().getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

StreamsConfig config = new StreamsConfig(props);

Next, we define the topic names that we are going to use as the input and output. We’re specifying them as variables because they’re used several times in the test. You’d probably want these to be public static final in production code.

// Define the topics as variables to reuse them
String inputTopic = "filtered-input";
String outputTopic = "filtered-output";
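
As a rough illustration of the public static final suggestion, the topic names could live in a small constants class. This is only a sketch: the class name FilterTopics and the constant names are hypothetical and not part of the original test.

```java
// Hypothetical holder class for the topic names; not part of the original post.
public final class FilterTopics {
    // Same topic names the test uses, exposed as shared constants
    public static final String INPUT_TOPIC = "filtered-input";
    public static final String OUTPUT_TOPIC = "filtered-output";

    // Constants only, so instantiation is blocked
    private FilterTopics() {
    }

    public static void main(String[] args) {
        System.out.println(INPUT_TOPIC + " -> " + OUTPUT_TOPIC);
    }
}
```

The production code and the unit test could then both refer to FilterTopics.INPUT_TOPIC and FilterTopics.OUTPUT_TOPIC instead of repeating the string literals.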

Then we start creating the actual code to start processing. In this case, we’re doing a very simple filter to remove everything whose value doesn’t start with the letter j.

// Start creating and configuring the stream processing
StreamsBuilder builder = new StreamsBuilder();

KStream<String, String> source = builder.stream(inputTopic);

KStream<String, String> filtered =
    source.filter((key, value) -> value.startsWith("j"));

filtered.to(outputTopic);
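
One way to look at the filter condition on its own is to pull it into a named method and run it over a plain Java list, with no Kafka classes involved. This is a minimal sketch under assumptions: the class StartsWithJ and its methods are hypothetical, and the null check is my addition (the lambda in the post would throw a NullPointerException on a null value).

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-alone version of the filter condition; no Kafka needed.
public class StartsWithJ {

    // Mirrors the lambda passed to filter(): keep values starting with "j".
    // The null check is an assumption added here, not in the original lambda.
    static boolean keep(String key, String value) {
        return value != null && value.startsWith("j");
    }

    // Applies the condition to a plain list to show which values survive.
    static List<String> keptValues(List<String> values) {
        return values.stream()
                     .filter(value -> keep("mykey", value))
                     .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // startsWith is case-sensitive, so "Jar" is dropped along with "a"
        System.out.println(keptValues(Arrays.asList("j", "java", "a", "Jar")));
        // prints [j, java]
    }
}
```

A method like this could be passed to the stream as source.filter(StartsWithJ::keep), which keeps the condition testable separately from the topology.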

We build the topology now. Instead of using the normal KafkaStreams object, we’re going to use the ProcessorTopologyTestDriver. This bypasses the need for a Kafka cluster and ZooKeeper.

// Create the topology to start testing
Topology topology = builder.build();
ProcessorTopologyTestDriver driver = new ProcessorTopologyTestDriver(
    config, topology);

We define the Serializer and Deserializer for the key and value. These are used repeatedly, so we’ll keep them in variables.

// Define the serializer and deserializer to reuse them
Serializer<String> keySerializer = Serdes.String().serializer();
Serializer<String> valueSerializer = Serdes.String().serializer();

Deserializer<String> keyDeserializer = Serdes.String().deserializer();
Deserializer<String> valueDeserializer =
    Serdes.String().deserializer();
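
To make it clearer what these objects do, here is a rough stand-in for a String serializer and deserializer written in plain Java. It is not Kafka’s implementation: the class StringRoundTrip and its methods are hypothetical, and it simply assumes UTF-8, which is the default encoding of Kafka’s String serde.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a String serializer/deserializer pair.
public class StringRoundTrip {

    // String -> bytes, the direction used when feeding input to the driver
    static byte[] serialize(String data) {
        return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
    }

    // bytes -> String, the direction used when reading output back
    static String deserialize(byte[] data) {
        return data == null ? null : new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] bytes = serialize("mykey");
        System.out.println(bytes.length);        // prints 5
        System.out.println(deserialize(bytes));  // prints mykey
    }
}
```

The test driver needs both directions because it works on bytes internally, just like a real topic would.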

Once all of the setup overhead is done, we can run our unit tests.

The first unit test is to run a value of j through the code. We’ll double check that the output matches the ProducerRecord we’re expecting with an assertEquals.

// Run a test with something that should pass the filter
driver.process(inputTopic, "mykey", "j", keySerializer,
               valueSerializer);

ProducerRecord<String, String> recordPassesFilter = driver.readOutput(
            outputTopic, keyDeserializer,
            valueDeserializer);

assertEquals("The filtered output didn't match",
             new ProducerRecord<>(outputTopic, null, 0L, "mykey", "j"),
             recordPassesFilter);

The second unit test is to run a value of a through the code. We’ll double check that the output is null because a does not start with j. We verify this with an assertNull.

// Run a test with something that shouldn't pass the filter
driver.process(inputTopic, "mykey", "a", keySerializer,
               valueSerializer);

ProducerRecord<String, String> recordFiltered = driver.readOutput(
            outputTopic, keyDeserializer,
            valueDeserializer);
assertNull("The input wasn't filtered correctly", recordFiltered);

driver.close();

Here is the full code for the unit test:

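// Imports needed at the top of the test class
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.test.ProcessorTopologyTestDriver;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;

// The rest goes inside a JUnit test method

// Configure KStreams properties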
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "filter");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
          Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
          Serdes.String().getClass().getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

StreamsConfig config = new StreamsConfig(props);

// Define the topics as variables to reuse them
String inputTopic = "filtered-input";
String outputTopic = "filtered-output";

// Start creating and configuring the stream processing
StreamsBuilder builder = new StreamsBuilder();

KStream<String, String> source = builder.stream(inputTopic);

KStream<String, String> filtered =
    source.filter((key, value) -> value.startsWith("j"));

filtered.to(outputTopic);

// Create the topology to start testing
Topology topology = builder.build();
ProcessorTopologyTestDriver driver = new ProcessorTopologyTestDriver(
    config, topology);

// Define the serializer and deserializer to reuse them
Serializer<String> keySerializer = Serdes.String().serializer();
Serializer<String> valueSerializer = Serdes.String().serializer();

Deserializer<String> keyDeserializer = Serdes.String().deserializer();
Deserializer<String> valueDeserializer =
    Serdes.String().deserializer();

// Run a test with something that should pass the filter
driver.process(inputTopic, "mykey", "j", keySerializer,
               valueSerializer);

ProducerRecord<String, String> recordPassesFilter = driver.readOutput(
            outputTopic, keyDeserializer,
            valueDeserializer);

assertEquals("The filtered output didn't match",
             new ProducerRecord<>(outputTopic, null, 0L, "mykey", "j"),
             recordPassesFilter);

// Run a test with something that shouldn't pass the filter
driver.process(inputTopic, "mykey", "a", keySerializer,
               valueSerializer);

ProducerRecord<String, String> recordFiltered = driver.readOutput(
            outputTopic, keyDeserializer,
            valueDeserializer);
assertNull("The input wasn't filtered correctly", recordFiltered);

driver.close();
