Unit testing your Kafka code is incredibly important. It’s transporting your most important data. This is especially true for your Consumers. They are the end point for using the data. There are often many different Consumers using the data. You’ll want to unit test all of them.
In a previous post, I showed you how to unit test Producers.
Refactoring Your Consumers
First of all, you’ll need to be able to change your Consumer at runtime. Instead of using the KafkaConsumer object directly, you’ll use the Consumer interface.
public Consumer<String, String> consumer;
You can use whichever dependency injection method you prefer, but I’m making the Consumer public so I can change it from the unit test.
Next, you’ll want to refactor the code for creating your KafkaConsumer. The creation of the KafkaConsumer should be in a separate method that your consumption code never calls, so the unit test can inject its own Consumer instead.
You’ll also need to refactor the code that consumes the data from the Consumer object. This code will need to be callable from the unit test. Also, the Consumer object often consumes in an infinite loop (while (true)). You need to refactor the actual consumption code so it doesn’t get stuck in an infinite loop.
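One way to shape that loop refactoring is sketched below. To keep the sketch runnable without kafka-clients on the classpath, a hypothetical RecordSource interface stands in for Kafka's Consumer and its poll() method. The names LoopingConsumer, consumeOnce(), run(), and stop() are assumptions made for illustration, not part of Kafka or of the code in this post.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for Kafka's Consumer interface (not a Kafka type).
// poll() returns one batch of record values.
interface RecordSource {
    List<String> poll();
}

class LoopingConsumer {
    // Public so a unit test can swap in a mock, like the Consumer field above
    public RecordSource source;
    public int recordsSeen = 0;
    private volatile boolean running = true;

    // One pass of the loop body: poll once and process the batch.
    // A unit test calls this directly, so it never enters the loop.
    public int consumeOnce() {
        List<String> batch = source.poll();
        for (String value : batch) {
            process(value);
        }
        return batch.size();
    }

    // Placeholder for your per-record logic
    void process(String value) {
        recordsSeen++;
    }

    // Production entry point: the loop only delegates to consumeOnce()
    public void run() {
        while (running) {
            consumeOnce();
        }
    }

    public void stop() {
        running = false;
    }
}

class LoopingConsumerDemo {
    public static void main(String[] args) {
        LoopingConsumer consumer = new LoopingConsumer();
        // A lambda plays the role of the mock here
        consumer.source = () -> Arrays.asList("myvalue0", "myvalue1", "myvalue2");
        int returned = consumer.consumeOnce();
        System.out.println(returned + " records in one pass, "
                + consumer.recordsSeen + " processed");
    }
}
```

In your real class, the source field would be the Consumer<String, String> shown earlier and consumeOnce() would iterate over the records returned by its poll() call. The point of the split is that the while loop holds no logic of its own, so testing consumeOnce() covers everything the loop does.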
Unit Testing Your Consumer
Kafka unit tests of the Consumer code use the MockConsumer object. The @Before method will initialize the MockConsumer before each test.
MockConsumer<String, String> consumer;
@Before
public void setUp() {
    consumer = new MockConsumer<String, String>(OffsetResetStrategy.EARLIEST);
}
Once we’ve set the objects up, we can start testing.
@Test
public void testConsumer() throws IOException {
    // This is YOUR consumer object
    MyTestConsumer myTestConsumer = new MyTestConsumer();
    // Inject the MockConsumer into your consumer
    // instead of using a KafkaConsumer
    myTestConsumer.consumer = consumer;

    // Assign the topic partition and tell the mock where it begins
    consumer.assign(Arrays.asList(new TopicPartition("my_topic", 0)));
    HashMap<TopicPartition, Long> beginningOffsets = new HashMap<>();
    beginningOffsets.put(new TopicPartition("my_topic", 0), 0L);
    consumer.updateBeginningOffsets(beginningOffsets);

    // Queue five records at offsets 0 through 4 of partition 0
    consumer.addRecord(new ConsumerRecord<String, String>("my_topic",
            0, 0L, "mykey", "myvalue0"));
    consumer.addRecord(new ConsumerRecord<String, String>("my_topic", 0,
            1L, "mykey", "myvalue1"));
    consumer.addRecord(new ConsumerRecord<String, String>("my_topic", 0,
            2L, "mykey", "myvalue2"));
    consumer.addRecord(new ConsumerRecord<String, String>("my_topic", 0,
            3L, "mykey", "myvalue3"));
    consumer.addRecord(new ConsumerRecord<String, String>("my_topic", 0,
            4L, "mykey", "myvalue4"));

    // This is where you run YOUR consumer's code
    // This code will consume from the Consumer and do your logic on it
    myTestConsumer.consume();

    // As written, this only checks that consume() doesn't throw
    // Add assertions on what consume() did with the records
}
We start off by instantiating the Consumer we want to test. We inject our MockConsumer into the Consumer. Then, the MockConsumer's topic, partitions, and beginning offsets need to be set up. We send some data with the MockConsumer. All of the data added to the MockConsumer will be consumed by the Consumer. We call the addRecord() method for every ConsumerRecord we want the Consumer to see. Finally, we consume the data.
A quick note that this test only validates that the Consumer doesn’t throw an exception while processing this data. To verify the actual processing or output, you may need to mock another object or gather the output in a list and run your assertions.
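As a minimal sketch of the "gather the output in a list" option, suppose your per-record logic writes its results to an injected sink instead of writing directly to a database or another topic. RecordProcessor, OutputSink, and the key=value output format are hypothetical names chosen for illustration. In a real test, the records would reach this logic through the MockConsumer and your consume() method rather than being passed in by hand.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical output abstraction; production code would implement it
// with a database writer, a Producer, and so on.
interface OutputSink {
    void write(String line);
}

class RecordProcessor {
    // Public so the test can replace it, like the Consumer field
    public OutputSink sink;

    // Your per-record logic, called for each consumed record
    public void process(String key, String value) {
        sink.write(key + "=" + value);
    }
}

class RecordProcessorDemo {
    public static void main(String[] args) {
        // The test's sink is just a list that captures every write
        List<String> captured = new ArrayList<>();
        RecordProcessor processor = new RecordProcessor();
        processor.sink = captured::add;

        processor.process("mykey", "myvalue0");
        processor.process("mykey", "myvalue1");

        // Assertions now run against the captured output
        if (captured.size() != 2 || !captured.get(0).equals("mykey=myvalue0")) {
            throw new AssertionError("unexpected output: " + captured);
        }
        System.out.println(captured);
    }
}
```

With JUnit, the final check would be an assertEquals on the captured list after myTestConsumer.consume() returns.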
I get this exception when my code under test tries to consume.
java.lang.IllegalStateException: Subscription to topics, partitions and pattern are mutually exclusive
I faced the same issue. The workaround for this is to assign new TopicPartition(topic, 0) to a variable and use that. The following code works for me:
TopicPartition topicPartition = new TopicPartition(topic, 0);
consumer.assign(Arrays.asList(topicPartition));
HashMap<TopicPartition, Long> beginningOffsets = new HashMap<>();
beginningOffsets.put(topicPartition, 0L);
consumer.updateBeginningOffsets(beginningOffsets);
consumer.addRecord(new ConsumerRecord<>(topic, 0, 0L, "some-ref", "some-order"));
But there is no consume method in the Consumer class. What should we use instead?
I added a comment clarifying that MyTestConsumer is your code. The consume method is where your code starts to consume and process your messages.
Thank you Jesse, your blogs have been extremely helpful in getting me off the ground. The coverage of my tests is quite low due to differing construction requirements for the mock and concrete implementations, but it’s a start.
Regarding the Mock Producer and Consumer, I understand they are now deprecated. I don’t suppose you have any blogs on the new testing framework?
Thanks again
I hadn’t heard about any deprecation and my quick checks didn’t show any deprecations in the JavaDocs. Where are you seeing this?
First of all, thank you for the info.
I’m really interested in Kafka and I’m trying to write consumer and producer unit tests right now, but I have some problems and doubts after reading a lot of information about Kafka:
1) What is the difference between the MockConsumer/MockProducer classes and EmbeddedKafka?
2) I can do unit tests both ways, right?
3) Could I do a unit test without my real consumer? I mean, using only the MockConsumer and MockProducer classes.
Thanks
EmbeddedKafka spins up a Kafka cluster. You could unit test this way, but I’d prefer mocking for unit tests. Yes, you can unit test any real consumer as shown here.
Do you even run the code you write here? I’ve followed a couple of your tutorials and they’ve always been broken…
I just recompiled and reran this exact unit test. Not only did it compile, but the unit test passed. It passed the tests when I first did the post too. I suggest you try again, actually learn Kafka, learn how to read comments in code, or maybe you need to learn how to code. Good luck!
Hi Jesse, great post. I’ve attended your course on Kafka and learned quite a bit.
I have a quick question: I don’t seem to have consumer.updateBeginningOffsets in my version of Kafka, 1.1.0. Do you know in which version of Kafka this was added or removed, and what the workaround could potentially be?
It is in Kafka 1.1. Make sure you’re using the right object and arguments. https://kafka.apache.org/11/javadoc/org/apache/kafka/clients/consumer/MockConsumer.html#updateBeginningOffsets-java.util.Map-
Hi,
I’m having the same issue: my MockConsumer has no updateBeginningOffsets or addRecord(..)!
Do I need to use a specific version of Kafka for unit tests?
Thanks
The methods are still there as of Kafka 2.3: https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/consumer/MockConsumer.html#updateBeginningOffsets-java.util.Map-
Hi thanks for your post.
Are you mocking the data?
Are you connecting to a broker to get the real messages?
How are you verifying whether it is consuming the correct messages or not?
I talked about that in the post on producers. All of this is mocked.