Unit testing your Kafka code is incredibly important. It’s transporting your most important data. As of 0.9.0 there’s a new way to unit test with mock objects.
Refactoring Your Producer
First of all, you’ll need to be able to change your Producer
at runtime. Instead of using the KafkaProducer
object directly, you’ll use the Producer
interface.
public Producer<String, String> producer;
You can use whichever method for dependency injection, but I’m making the Producer
public so I can change it from the unit test.
Next, you’ll want to refactor the code for creating your KafkaProducer
. The creation of the KafkaProducer
should be in separate method that won’t get called by your production Producer
code.
You’ll also need to refactor the code that sends to the data to the Producer
object. This code will need to be callable from the unit test.
Unit Testing Your Producer
Kafka unit tests of the Producer
code use MockProducer
object. The @Before
will initialize the MockProducer
before each test.
MockProducer<String, String> producer;
@Before
public void setUp() {
producer = new MockProducer<String, String>(
true, new StringSerializer(), new StringSerializer());
}
Have you been searching for the best data engineering training? You’ve found it. Sign up for my list so you can get my Professional Data Engineering course.
Once we’ve set the objects up, we can start testing.
@Test
public void testProducer() throws IOException {
MyTestKafkaProducer myTestKafkaProducer = new MyTestKafkaProducer();
myTestKafkaProducer.producer = producer;
myTestKafkaProducer.send();
List<ProducerRecord<String, String>> history = producer.history();
List<ProducerRecord<String, String>> expected = Arrays.asList(
new ProducerRecord<String, String>("my_topic", "mykey", "myvalue0"),
new ProducerRecord<String, String>("my_topic", "mykey", "myvalue1"),
new ProducerRecord<String, String>("my_topic", "mykey", "myvalue2"),
new ProducerRecord<String, String>("my_topic", "mykey", "myvalue3"),
new ProducerRecord<String, String>("my_topic", "mykey", "myvalue4"));
Assert.assertEquals("Sent didn't match expected", expected, history);
}
We start off by instantiating the Producer
we’re wanting to test. We inject our MockProducer
into the Producer
. We send some data with the Producer
. All of the data sent by the Producer
can be accessed with the history()
method. We create a list of the ProducerRecord
s we expected. Finally, we can assert that the two lists match each other.
In the next post, I show you how to unit test a KafkaConsumer
.
Full producer test code:
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import kafka.MyTestKafkaProducer;
public class ProducerTest {
MockProducer<String, String> producer;
@Before
public void setUp() {
producer = new MockProducer<String, String>(
true, new StringSerializer(), new StringSerializer());
}
@Test
public void testProducer() throws IOException {
MyTestKafkaProducer myTestKafkaProducer = new MyTestKafkaProducer();
myTestKafkaProducer.producer = producer;
myTestKafkaProducer.send();
List<ProducerRecord<String, String>> history = producer.history();
List<ProducerRecord<String, String>> expected = Arrays.asList(
new ProducerRecord<String, String>("my_topic", "mykey", "myvalue0"),
new ProducerRecord<String, String>("my_topic", "mykey", "myvalue1"),
new ProducerRecord<String, String>("my_topic", "mykey", "myvalue2"),
new ProducerRecord<String, String>("my_topic", "mykey", "myvalue3"),
new ProducerRecord<String, String>("my_topic", "mykey", "myvalue4"));
Assert.assertEquals("Sent didn't match expected", expected, history);
}
}
Dear Jesse, where is your definition of MyTestKafkaProducer?
The producer is just a regular producer.
This code is userless
Which code is userless?
Hi! Would you mind sharing the whole code? Thank you!
The full code for the unit tests is already there.
Hi! Would you mind sharing the whole code? Thank you!
I’ve added the full code for the producer unit test.
Hi,
I must agree with previous comments. history() isn’t a method of the Producer interface! Therefore it’s both incomplete and useless.
I never said
.history()
is part of theProducer
interface and the code doesn’t say that either. It is part of theMockProducer
class. The entire code is there, you just need to be able to read it.This post is useful.The MockProducer is inside kafka-client test part, to use it in your JUnit tests, you should first import it into your project:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
<classifier>test</classifier>
</dependency>
The code shown is interesting. However it is a little bit pointless since testing the output without any given input would just make not much sense in any realistic application.
A purpose I could imagine would be the testing of a certain business logic that would trigger a kafka producer eventually.
By placing the mock one can verify
a) the logic runs through
b) kafka message was published and data mapping worked as expected
The example requires a bit more imagination 🙂
Producers are too varied to give a one size fits all example. I chose to give a basic example that you’d tailor to your needs.
Hello, while testing kafka in production, is there a way we can destroy the topic after testing to ensure production environment does not have anything irrelevant.
Don’t test your code in production. You should be doing any QA against a QA or development cluster. Yes, you can delete the topic.