Update: Confluent has renamed KSQL to ksqlDB
It’s that Kafka Summit time of year again. There are lots of announcements. Some are good and some you have to sift through in order to figure out what’s the best for you and your organization.
I’ll briefly state my opinions and then go through them and the technical reasons in more depth.
I recommend my clients not use Kafka Streams because it lacks checkpointing. Kafka Streams also lacks a true shuffle sort and only approximates one. KSQL sits on top of Kafka Streams, so it inherits all of these problems and then adds some more.
Kafka isn’t a database. It is a great messaging system, but saying it is a database is a gross overstatement. Saying Kafka is a database comes with so many caveats I don’t have time to address all of them in this post. Unless you’ve really studied and understand Kafka, you won’t be able to understand these differences.
I find talks and rebuttals like this don’t really separate out opinions from facts. I’m going to try to separate out my opinion from the facts. I feel like Confluent’s slide content should have *, †, ‡, §, ‖, ¶ after every statement so that you can look up all of the caveats they’re glossing over.
Checkpointing
It’s a fact that Kafka Streams – and by inheritance KSQL – lacks checkpointing.
That sounds esoteric, like I’m just being pedantic, right?
Checkpointing is fundamental to operating distributed systems. Let me explain what checkpointing is.
Kafka is a distributed log. You can replay any message that was sent by Kafka. For message processing, it can be stateless or stateful. For stateless processing, you just receive a message and then process it. As soon as you get stateful, everything changes. Now, you have to deal with storing the state and storing state means having to recover from errors while maintaining state.
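To make the stateful case concrete, here is a minimal sketch of a stateful Kafka Streams job (a word count) with placeholder topic and broker names. The count() step keeps its results in a local state store, and Kafka Streams backs that store with a changelog topic on the brokers.

```java
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Produced;

public class WordCountExample {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "word-count-example");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    StreamsBuilder builder = new StreamsBuilder();
    builder.<String, String>stream("sentences")
        // Stateless step: each message is processed on its own.
        .flatMapValues(line -> Arrays.asList(line.toLowerCase().split("\\W+")))
        // Stateful steps: the running counts live in a local state store whose
        // changelog topic is what has to be replayed after a failure.
        .groupBy((key, word) -> word)
        .count()
        .toStream()
        .to("word-counts", Produced.with(Serdes.String(), Serdes.Long()));

    new KafkaStreams(builder.build(), props).start();
  }
}
```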
For Kafka Streams, they say no problem, we have all of the messages to reconstruct the state. They even say that you can use a compacted topic to keep the messages stored in Kafka limited to roughly one per key.
Solved right? No, because roughly one message per key can still be a massive amount of state. If you have 100 billion keys, you will still have 100 billion+ messages in the state topic because all state changes are put into that topic. If you have a growing number of keys, the size of your state will gradually increase too.
The operational manifestation of this is that if a node dies, all of those messages have to be replayed from the topic and inserted into the local state store. It’s only once all of these mutations are done that processing can start again.
In a disaster scenario – or human error scenario – where all machines running the Kafka Streams job die or are killed, all nodes will have to replay all state mutation messages before a single message can be processed.
This replaying of state mutation messages could translate into hours of downtime. Some potential users of Kafka Streams have told me they calculated this scenario out to be 4+ hours of downtime. Now you’re 4+ hours behind and still have to process all of the messages that accrued over that time just to get back to the current time.
For this reason, databases and processing frameworks implement checkpointing (in Flink this is called snapshots). This is where the entire state at that point in time is written out to durable storage (S3/HDFS). When there is a massive error, the program will start up, read the previous checkpoint, replay any messages after the checkpoint (usually in the 1000s), and start processing again. Overall, this process will take seconds to minutes.
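Here is a minimal sketch of what enabling this looks like in Flink, with a placeholder S3 path; the checkpoint interval and state backend shown here are assumptions, not recommendations.

```java
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointedJob {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Snapshot all operator state every 60 seconds with exactly-once semantics.
    env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);

    // Write the snapshots to durable storage (S3 here; HDFS works the same way).
    env.setStateBackend(new FsStateBackend("s3://my-bucket/flink-checkpoints"));

    // A trivial pipeline so the sketch runs on its own; a real job would read
    // from Kafka and keep large keyed state that the checkpoints protect.
    env.fromElements(1, 2, 3).print();

    env.execute("checkpointed-job");
  }
}
```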
Downtime for systems with checkpointing should be measured in seconds to minutes, instead of the hours you can see with Kafka Streams. Overall, downtime for real-time systems should be as short as possible. Great effort goes into distributed systems to recover from failure as fast as possible.
Update: I forgot to talk about one of Kafka Streams’ workarounds for its lack of checkpointing. They recommend using a standby replica. I consider this more of a hack than a solution to the problem. It still doesn’t handle the worst-case scenario of losing all Kafka Streams processes – including the standby replica.
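For reference, the standby replica is a single configuration value. A sketch with placeholder application and broker names:

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class StandbyReplicaConfig {
  static Properties streamsProperties() {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    // Keep one warm copy of each local state store on another instance. This
    // shortens single-instance recovery but does not help if every instance,
    // including the standby, is lost.
    props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
    return props;
  }
}
```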
Shuffle Sort
Shuffle sort is an important part of distributed processing. It’s the method of bringing data together with the same key. If you’re doing analytics, chances are that you will need shuffle sorts. If you don’t know what a shuffle sort is, I suggest you watch this video.
It’s a fact that Kafka Streams’ shuffle sort is different from Flink’s or Spark Streaming’s. The way it works is buried in the JavaDoc (bolding mine):
If a key changing operator was used before this operation (e.g., selectKey(KeyValueMapper), map(KeyValueMapper), flatMap(KeyValueMapper), or transform(TransformerSupplier, String…)), and no data redistribution happened afterwards (e.g., via through(String)) an internal repartitioning topic will be created in Kafka. This topic will be named “${applicationId}-XXX-repartition”, where “applicationId” is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, “XXX” is an internally generated name, and “-repartition” is a fixed suffix. You can retrieve all generated internal topic names via KafkaStreams.toString().
This means that anytime you change a key – very often done for analytics – a new topic is created to approximate a shuffle sort. This method of doing shuffle sorts assumes several things that I talked about in this thread.
I’ve talked to some (former) users of Kafka Streams who didn’t know about this new workaround topic. They blew up their cluster by doing real-time analytics and creating too much load and data on their brokers. They simply thought they were doing some processing.
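Here is a sketch of the kind of innocuous-looking pipeline that does this, with placeholder topic names. Because selectKey() changes the key, the grouping that follows writes all of the re-keyed data into an internal repartition topic that the brokers store and replicate:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class RepartitionExample {
  public static Topology build() {
    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, String> purchases = builder.stream("purchases");

    purchases
        // Re-key from (say) customer id to product id for the analytic below.
        .selectKey((customerId, purchase) -> extractProductId(purchase))
        // The key changed, so this grouping goes through a
        // "${applicationId}-...-repartition" topic on the brokers before it
        // can count anything.
        .groupByKey()
        .count()
        .toStream()
        .to("purchases-per-product", Produced.with(Serdes.String(), Serdes.Long()));

    return builder.build();
  }

  private static String extractProductId(String purchase) {
    // Placeholder parsing for the sketch.
    return purchase.split(",")[0];
  }
}
```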
If performance isn’t a key metric in your system, maybe this is a way to go. However, there are other and much better processing frameworks that have a built-in shuffle sort and work with Kafka like Apache Flink. These systems give you the best of both worlds.
Update: there have been a few questions on shuffle sorts. The broker will save and replicate all data in the internal repartitioning topic. That is a pretty heavyweight operation for something that is considered intermediate and of short-term usage in other systems like Flink. Normally, intermediate data for a shuffle sort is kept for a short period of time.
Kafka Streams and KSQL
Lacking these two crucial features makes Kafka Streams unusable from an operational perspective. You can’t have hours of downtime on a production real-time system. You can’t blow up your cluster with shuffle sorts. Unless you run an explain plan before every KSQL query, you won’t know which shuffle sorts (topic creations) will happen.
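On the Kafka Streams side, the closest thing to an explain plan is printing the topology description before you deploy. A sketch reusing the pipeline from the earlier example; the printed description lists the internal repartition and changelog topics the job will create:

```java
import org.apache.kafka.streams.Topology;

public class TopologyInspection {
  public static void main(String[] args) {
    Topology topology = RepartitionExample.build();
    // Prints sub-topologies, processors, and the internal topics, including
    // any "-repartition" topics the shuffle will create on the brokers.
    System.out.println(topology.describe());
  }
}
```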
It’s for these main reasons that my clients don’t use Kafka Streams or KSQL in their critical paths or in production.
Confluent’s Kafka Summit 2019 Announcements
Confluent made several announcements in their various keynotes. These covered their plans for Kafka and KSQL.
Some of these keynotes set up straw man arguments on architectures that aren’t really used. For example, they talked about databases being the place where processing is done. There are some small data architectures and more data warehouse technologies that use the database for processing. However, I haven’t seen a big data architecture repeat these problems. We know they don’t scale. We’re using actual processing engines that can scale to overcome all of these data processing issues. In big data, we’ve been solving these issues for years and without the need for database processing.
They positioned KSQL as being able to take up some workloads being done now by big data ecosystem projects. They pointed to the sheer number of ecosystem projects as an issue. There are so many technologies in the big data ecosystem because each one solves or addresses a use case. An organization could eliminate various parts, but that would either drastically slow down or eliminate its ability to handle a use case. KSQL – even after these new features – will still be of limited utility to organizations.
That leads you to wonder why Confluent is pushing into new uses. Confluent’s pricing model is already really unpopular with organizations. A pure Kafka company will have difficulty expanding its footprint unless it can do more. They’re having to branch out to increase market share and revenue. This means they have to attempt a land grab and say that Kafka is a database.
This messaging includes – in my opinion – incorrect applications of Kafka. Confluent is pushing you to store your data forever in Kafka. Kafka is a really poor place to store your data forever. That long-term storage should be S3 or HDFS. There is a significant performance difference between a filesystem and Kafka, and a big price difference too.
I encourage architects to look at this difference. There are only two ways to access previous data in Kafka: by timestamp or by offset. Most use cases need random access with a where clause, and we’re seeing Confluent try to handle this with KSQL. Database optimization for random-access reads is a non-trivial problem, and very large companies with large engineering teams are built around this problem.
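To make that concrete, here is a sketch using the plain Java consumer with placeholder topic, offset, and timestamp values. Seeking by offset and looking up an offset by timestamp are the only two entry points; everything past that is a sequential scan:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ReplayExample {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "replay-example");
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      TopicPartition partition = new TopicPartition("events", 0);
      consumer.assign(Collections.singletonList(partition));

      // Access by timestamp: find the earliest offset at or after a time...
      Map<TopicPartition, OffsetAndTimestamp> byTime =
          consumer.offsetsForTimes(Collections.singletonMap(partition, 1_561_939_200_000L));
      OffsetAndTimestamp found = byTime.get(partition);

      // ...then access by offset: seek to it and scan forward. There is no
      // where-clause style random access beyond these two entry points.
      consumer.seek(partition, found != null ? found.offset() : 0L);

      consumer.poll(Duration.ofSeconds(1)).forEach(record ->
          System.out.printf("offset=%d key=%s%n", record.offset(), record.key()));
    }
  }
}
```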
The reality is that this database functionality should live either in the broker process or at the application level with a solid, durable storage layer. In my opinion, an architecture that uses KSQL to get current status with a where clause doesn’t really make sense. There are other proven architectures for getting the current status of data, like a database or a processor with checkpointing.
At the end of the keynote, they talked about not wanting to replace all databases. I expect this message to change. We’ll start to see more and more database use cases where Confluent pushes KSQL as the replacement for the database. You should know that creating a database is a non-trivial thing and creating a distributed database is extremely difficult.
What Should You Do?
The point of this post is not to discourage use of Kafka. Kafka is a great publish/subscribe system – when you know and understand its uses and limitations. Rather, the point is to use Kafka correctly and not based on a company’s marketing. Remember that vendors don’t always have their customers’ best interests in mind. Their focus is to increase revenues and product usage. These new ways of using a product may or may not be in your organization’s best interests.
As you decide to start doing real-time, make sure that you have a clear and specific business case for going from batch to real-time. This business case could be current or in the future. If you don’t establish this business value upfront, you could be creating more problems for yourselves than you ever had.
Finally, you should know the architectural limitations of Kafka. Some of these are buried or you need a deep understanding of distributed systems to understand them. Make sure the person creating your architecture really understands these implications on your organization and use case. Otherwise, you’ll be implementing someone else’s vision and painting yourself into an operational corner.
Hi Jesse,
How do you see Apache Pulsar overcoming or falling short on these challenges? Is there any stream processing framework that covers these issues?
While I really like Pulsar, Pulsar is orthogonal to the issues I point out. Kafka Streams and KSQL don’t use Pulsar. Pulsar’s SQL layer uses Presto, and I haven’t dug into it much.
Interesting article. But to my knowledge Kafka doesn’t have node(s). Did you mean Kafka cluster or broker?
In distributed systems, you’ll often see the computers running the processes called nodes. In this case, I mean the computer running the Kafka Broker. So, yes a Kafka cluster is made up of nodes running the broker process.
I’m confused how you see shuffling in Kafka Streams being significantly different from Spark’s or Flink’s shuffling unless your compute happens on a single machine. Spark as well as Flink needs to transfer any message to the relevant target processor instance, which is likely over the wire to another node in the processing cluster. Using a re-keyed topic in Kafka for that purpose seems like a pretty clean solution. It also nicely utilises all the built-in Kafka consumer coordination for the target processors consuming off the shuffled/re-keyed topic, so it does away with any additional layer of coordination. Deploying our processors as standard Java apps really helped our team stay clear of the intricacies of having to deploy on the shared Flink platform operated/looked after by a central team.
Agree with your observations around cost for long term storage though. Have you looked at Pravega?
It’s the number of times data is moved during a re-key. In Kafka Streams it is:
KS->Broker->KS
For Flink/Spark it is:
TaskManager->TaskManager
If you think you’re keeping yourselves from the issues of distributed systems by using Kafka Streams, you’re not. You now have a state problem that your team will have to support instead of having a central team support state management.
Yes, I’ve recently looked at Pravega and have been blogging about Pulsar. I wrote a post specifically around long-term storage with Pulsar.
Hi Jesse, thanks for your article.
I want to know your opinion about a use case.
I have a stream of data coming through Kafka, and I want to join it with changing data from a database. I used Kafka Connect and a KTable from Kafka Streams and joined it with a KStream. Is there an alternative using Flink?
Yes, you can do real-time joins with Flink. You should make sure there is a good business or technical reason for doing a real-time join.
Great article as always. Is it accurate to say that the state you referred to is a function of the window size? Meaning, larger windows result in potentially more messages to catch up on, while smaller windows could result in fewer messages to catch up on?
This state isn’t limited to window size. I haven’t seen any documentation on whether they optimize for windows to reduce the amount of replay.
I guess you are assuming that your stateful Kafka Streams application also loses the local state store (for example RocksDB) persisted in disk?
A distributed system needs to be designed expecting failure. Losing the local state store is a failure that should be taken into account. Losing a local state store and taking hours to recover isn’t something I want my clients to deal with.
Hi Jesse,
In the cloud world this might not be a problem. While I agree recovering state is slow, on, say, AWS I would rely on EBS volumes for the state stores, remount them to a new node in the event of node loss, and also configure a standby replica.
The short answer is no.
Reading your post carefully, you seem to be saying that performance of Kafka and KSQL becomes an issue when states get large. That sounds valid. But does your advice, “Don’t use KSQL and Kafka Streams”, also hold for small states?
How would you make sure that your state is small? If your state is that small, maybe it’s better stored/transmitted/used in a different way.
Hi Jesse. Thanks for the nice article.
I would like to share my experience with Kafka and the Kafka Streams DSL.
Indeed, we also faced a lot of different issues using Kafka Streams.
Based on Confluent articles, and acting according to their recommendations, we decided to implement all business logic using Kafka Streams, of course without any additional databases.
At that point, I didn’t understand yet that they (Confluent) are trying to hook (“put on the needle”) as many clients as possible… 😉 My patience snapped when I once again added an internal topic only with the aim of having a cache (it has to be materialized later to be able to use it as a KTable/GlobalKTable for interactive queries, etc.).
Second thing, as you mention, there is no error handling support. Any thrown exception inside a KStream operation (map(), transform(), etc.) caused the shutdown of the stream; even if you restart the app, it will still read the same event and fail with the same error. We were forced to write some extension methods for the KStreams library to be able to send the problematic events to a DLQ.
And finally:
– we found that our architecture is not layered (not hexagonal), as all domain logic is implemented de facto in the application layer.
– we are NOT technology agnostic (we are heavily tied to Kafka and its DSL API). It means there is no chance of replacing Kafka with any other broker.
Later, of course, we rewrote these services, adding storage and getting rid of joins, leaving only map() and transform() operations and delegating business logic to the domain services.
It’s very important to remember that Kafka is only an implementation detail (the same as a database). It is just a buffer for exchanging data between services.
BTW: I think a good analogy from a DB perspective is that Kafka Streams is SQL and KSQL is stored procedures. Is it really welcome these days to write native SQL or stored procedures in typical applications? 😉
Thanks for sharing. A big invitation to others to share their stories.
Great article. There is one thing I couldn’t fully grasp. When a Kafka Streams node dies, a new node has to read the state from Kafka, and this is considered slow.
But when a Flink node dies, a new node has to read the state from the latest checkpoint in HDFS/S3, and this is considered a fast operation.
Obviously I’m missing something. Why reading the state in Kafka case is slow while reading it in Flink case is considered much faster?
Because Flink state is written out as a checkpoint to S3. All of the checkpoint is written out and nothing needs to be recreated.
With Kafka Streams, the topic’s data has to be reread and all state has to be recreated.
This read and recreation are why there is a major speed difference.
Is there a way to validate that the checkpoint is valid (isn’t corrupted)? I imagine a scenario where the node restarts, needs to read the latest state from a checkpoint, and the checkpoint is corrupt.
AFAIK, Kafka Streams lacks this.
Hey, I’m fairly new to all of this and would love some clarity. Could you commit offsets while processing the stream so that you could have some semblance of a snapshot? If not, what is it that inhibits that from working? Thanks for the write-up.
The committing of offsets has nothing to do with this and wouldn’t help. I don’t have time to explain why not, as you need a much deeper understanding of Kafka to see why.
Hi Jesse,
Thanks for your article.
I have a question regarding the point of lacking checkpoint in Kafka Streams.
In our Kafka Streams applications, we normally configure state stores with logging enabled, meaning they are also backed up into a compacted changelog topic inside Kafka.
Would it achieve the same benefits as checkpoints, since I assume the cost of rebuilding state from the changelog topic would not be much higher than rebuilding state from an S3/HDFS backup?
Thanks.
A compacted changelog won’t be 100% compacted. A read from a broker won’t be as performant as a read from S3/HDFS. Also, reads from the broker have to be re-inserted into the local RocksDB, whereas a checkpoint file would already have everything stored in the binary format.
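For anyone following along, a logging-enabled store looks roughly like this (names are placeholders). The compacted changelog it creates is exactly the data that has to be reread and re-inserted into RocksDB after a failure:

```java
import java.util.Collections;
import java.util.Map;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

public class LoggedStoreExample {
  public static Materialized<String, Long, KeyValueStore<Bytes, byte[]>> loggedStore() {
    // Extra settings for the changelog topic; compaction is the default for
    // key-value store changelogs, shown here only to make the setup explicit.
    Map<String, String> changelogConfig = Collections.singletonMap("cleanup.policy", "compact");
    return Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("totals-store")
        .withKeySerde(Serdes.String())
        .withValueSerde(Serdes.Long())
        .withLoggingEnabled(changelogConfig);
  }
}
```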
Hi Jesse,
Suppose that the topic data were streamed to a ksqlDB table and a criterion built from a set of attributes were used to track the last successfully consumed message. The criterion could be built using ROWTIME, ROWKEY, and some app-specific attributes. This criterion data could itself be stored in another topic, analogous to the offsets topic that Kafka internally maintains.
Do you think on a large scale, this sort of a design has a better chance at succeeding?
Thanks,
The short answer is no because you’re still layering on top of ksqlDB.
Hi Jesse!
As per your expertise, does “Automatic Observer Promotion” with the latest Kafka/Confluent releases mitigate or even solve the reliability downsides of event streaming with Kafka Streams?
Thanks!
No, it doesn’t.