Kafka 2.0 added a new poll() method that takes a Duration as an argument. The previous poll() took a long as an argument. The differences between the two polls don’t stop there. You should know about the differences before porting your poll from a long to a Duration.

In general, an overloaded method should behave the same as its siblings. For example, System.out.println(char[]) and System.out.println(String) aren’t dramatically different. The char[] version is simply a convenience so you don’t have to convert a char[] to a String before calling println(). The overloads don’t add or remove functionality either: the char[] version doesn’t, say, add exclamation points at the beginning and end of the characters. Both methods produce exactly the same output.
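
To make the point concrete, here is a minimal sketch that captures what each println overload writes so the two outputs can be compared. The class name PrintlnOverloads and its helper methods are made up for this illustration; only PrintStream.println(char[]) and PrintStream.println(String) come from the JDK.

```java
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;

// Hypothetical helper class, for illustration only.
final class PrintlnOverloads {

    // Captures the text written by the println(char[]) overload.
    static String viaCharArray(char[] chars) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        PrintStream out = new PrintStream(buffer, true);
        out.println(chars);
        return buffer.toString();
    }

    // Captures the text written by the println(String) overload.
    static String viaString(String text) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        PrintStream out = new PrintStream(buffer, true);
        out.println(text);
        return buffer.toString();
    }
}
```

Calling viaCharArray("hello".toCharArray()) and viaString("hello") should give equal strings, which is the kind of consistency you expect between overloads.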

This is where Kafka’s new poll() is different. There is different functionality based on the argument’s type. The work on it was tracked with this KIP.

With poll(long), the general behavior was to block until the consumer got an assignment. With poll(Duration), the behavior is to block only as long as the Duration specifies, whether or not there is a consumer assignment. The KIP points out this issue too: “Some care must be taken in the case of the poll() since many applications depend on the current behavior of blocking until an assignment is found.”

Given that this change goes against best practices for method overloading, people need to know that the new poll works differently. To signal that this isn’t just a change in the argument’s type, the poll(long) method is deprecated as of Kafka 2.0. The plan is to completely remove poll(long) in a future major release.

I created a JIRA issue to point out that there isn’t a clear, non-deprecated way to poll and make sure the consumer has its assignments. I proposed a blocking call to get the assignments.

Gwen Shapira and I had a conversation about the possible workarounds. One workaround is to poll for a short amount of time in a while loop and check the size of assignment(). The other workaround is to register for the ConsumerRebalanceListener’s onPartitionsAssigned event and have a way to communicate between the two threads that the assignment was received. Neither of these methods is particularly great, but I’d avoid adding interthread communication whenever possible. The code for the while loop would look something like this:

// You have to keep polling until you get an assignment.
// Otherwise, there won't be any assignments in the list.
while (consumer.assignment().isEmpty()) {
  consumer.poll(Duration.ofMillis(100));
}

This code doesn’t limit the number of passes through the while loop. You’d want to add some logic for that, such as a maximum number of attempts or an overall timeout. If a consumer exceeds that limit, you should log the error and then exit.
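
One way to bound the loop above is to give it an overall deadline. The sketch below is only an illustration under a few assumptions: the class AssignmentWait and the method awaitAssignment are hypothetical names, and the Kafka calls are passed in as lambdas so the helper itself depends only on the JDK. Any records returned by the polls are ignored here, so a real application would need to decide what to do with them.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

// Hypothetical helper, not part of the Kafka client API.
final class AssignmentWait {

    /**
     * Runs pollOnce repeatedly until isAssigned reports true or the timeout
     * elapses. Returns true if the assignment arrived in time, false otherwise.
     */
    static boolean awaitAssignment(BooleanSupplier isAssigned,
                                   Runnable pollOnce,
                                   Duration timeout) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (!isAssigned.getAsBoolean()) {
            // Compare by subtraction so the check stays correct if nanoTime wraps.
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            pollOnce.run();
        }
        return true;
    }
}

// Possible usage with a KafkaConsumer named consumer (the 30 second limit is an assumption):
//
//   boolean assigned = AssignmentWait.awaitAssignment(
//       () -> !consumer.assignment().isEmpty(),
//       () -> consumer.poll(Duration.ofMillis(100)),
//       Duration.ofSeconds(30));
//   if (!assigned) {
//     // Log the error, close the consumer, and exit.
//   }
```

Because each poll can block for up to its own Duration, the loop may overshoot the overall timeout by roughly one poll interval.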
