Kafka consumer in Java

Michał Chmielarz
SoftwareMill Tech Blog
11 min read · Dec 16, 2020


Kafka is a well-known event streaming platform. We are using it in plenty of projects. Nothing unusual — the tooling is just great. Various frameworks and libraries offer integration with Kafka. In this post, I’d like to go through some of them for the Java language and see how we can create a consumer instance that reads Kafka messages.

Kafka Client library

The first way to connect to Kafka is by using the KafkaConsumer class from the kafka-clients library. Other libraries or framework integrations commonly use the library. In this section, I will focus on using it directly. While it is pretty straightforward, we would need to put some effort into making it efficient.

First of all, we want our consumer to work continuously. Thus, we will run it in a separate thread that we need to manage on our own. Additionally, we need to put polling into an infinite loop.

The other thing is closing the consumer, which can be tricky. We could simply stop the thread and rely on timeouts to close sockets and network connections. However, with such an approach, we miss two essential points:

  1. Explicit closing of a consumer triggers an immediate rebalance, because the group coordinator is notified that the consumer is leaving instead of having to detect it through missed heartbeats.
  2. Closing also completes any pending offset commits. Thus, after starting the consumer once again, we don’t consume some messages twice.
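
Putting these pieces together, here is a minimal sketch of such a consumer: a polling loop running in its own thread, shut down from the outside with wakeup() and closed explicitly. The topic name and configuration are illustrative.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

import java.time.Duration;
import java.util.List;
import java.util.Properties;

class PollingLoop implements Runnable {

    private final KafkaConsumer<String, String> consumer;

    PollingLoop(Properties config) {
        this.consumer = new KafkaConsumer<>(config);
    }

    @Override
    public void run() {
        try {
            consumer.subscribe(List.of("some-topic"));
            // the "infinite" polling loop
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset=%d, value=%s%n", record.offset(), record.value());
                }
            }
        } catch (WakeupException e) {
            // expected on shutdown, triggered by wakeup() from another thread
        } finally {
            // explicit close: notifies the group coordinator and completes pending commits
            consumer.close();
        }
    }

    void shutdown() {
        // wakeup() is the only KafkaConsumer method that is safe to call from another thread
        consumer.wakeup();
    }
}
```

Such a loop could be started with `new Thread(pollingLoop).start()` and stopped by calling `shutdown()` from the main thread.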

Next, if we’d like to consume messages in parallel, we need to provide a custom solution for running a specific number of consumers within the same consumer group. Remember that every consumer requires two threads: one for polling and the other for heartbeats.

In terms of batch consumption of messages, we get a collection of records (could be empty) after polling a queue. Thus, we don’t have to provide any particular configuration or mechanism.

When we’d like to stream the received data, we can use JDK’s Stream API. But the scenario gets more complicated if we’d like to consume the data in parallel. And more complicated code becomes more error-prone.

By default, a consumer commits offsets automatically. However, we can change this and do the work manually. The API gives us several methods to call the operation synchronously or asynchronously. Additionally, we can commit all offsets for all messages received from the last poll on the queue or provide specific topic-partition values.
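
As an illustration, here is a hedged sketch of both flavors, assuming `enable.auto.commit` is set to false; the topic name and the choice of committing per record asynchronously and per poll synchronously are arbitrary for the example.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Properties;

class ManualCommits {

    static void consume(Properties config) {
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config)) {
            consumer.subscribe(List.of("some-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    // process the record...

                    // commit a specific offset for a specific topic-partition, asynchronously
                    consumer.commitAsync(
                        Map.of(new TopicPartition(record.topic(), record.partition()),
                               new OffsetAndMetadata(record.offset() + 1)),
                        (offsets, exception) -> { /* handle a failed commit */ });
                }
                // or commit everything returned by the last poll, synchronously
                consumer.commitSync();
            }
        }
    }
}
```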

Using the plain Kafka consumer, we work on ConsumerRecord instances, containing a message itself and its metadata. It is not a drawback on its own. However, if we’d like to parse the message into a domain object, we need to provide our own mechanism.

In general, I would be cautious about using this approach and would rather consider the other possibilities available. So, let’s look at how we can use Kafka in some frameworks or toolkits.

Spring Boot

When you’re using Spring Boot in your project, you have the Spring for Kafka integration at your disposal. It provides a convenient listener mechanism implementing the consumption of Kafka messages.

We can consume messages in two ways:

  • using containers of message listeners,
  • or by providing a class with @KafkaListener annotation.

When we’d like to use the message listener approach, we need to provide one of two types of containers that will run our listener:

  • KafkaMessageListenerContainer — handles consumption of messages on a single thread for all topics provided in the container configuration,
  • ConcurrentMessageListenerContainer — enables the consumption of messages in a multi-threaded environment, creating one KafkaMessageListenerContainer per thread.

A container has a rich API, allowing us to set up various configuration parameters (like threading, batching, ack, error handlers, and many others). The important thing to set up is a listener class: a message-driven POJO. It’s an implementation of the MessageListener or BatchMessageListener interface. Both are fundamental ones and allow us to consume typed ConsumerRecord instances. Spring offers other, more sophisticated interfaces as well.
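
A minimal sketch of wiring such a container by hand could look like this; the topic, group id, and bootstrap servers are illustrative.

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;

import java.util.Map;

class ContainerSetup {

    static KafkaMessageListenerContainer<String, String> ordersContainer() {
        var consumerFactory = new DefaultKafkaConsumerFactory<String, String>(Map.<String, Object>of(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
            ConsumerConfig.GROUP_ID_CONFIG, "order-processors",
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class));

        var containerProps = new ContainerProperties("orders");
        // the message-driven POJO: here a simple MessageListener printing every record
        containerProps.setMessageListener(
            (MessageListener<String, String>) record -> System.out.println(record.value()));

        KafkaMessageListenerContainer<String, String> container =
            new KafkaMessageListenerContainer<>(consumerFactory, containerProps);
        container.start();
        return container;
    }
}
```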

However, the most straightforward way to consume Kafka messages in Spring is to implement a bean using the @KafkaListener annotation. Signatures of methods processing received messages may vary. Input arguments you will use depend on your needs, and there are plenty of possibilities (for details, check the annotation javadocs). On startup, Spring looks for usages of the annotation (a class with the annotation has to be a Spring component) and creates Kafka consumers running the logic defined in the listener.
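
For example, a minimal sketch of such a bean could look as follows; the topic and group id are illustrative.

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
class OrderListener {

    @KafkaListener(topics = "orders", groupId = "order-processors")
    void onMessage(String message) {
        // Spring creates the consumer and calls this method for every received record
        System.out.println("Received: " + message);
    }
}
```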

By default, @KafkaListener runs in a single thread — we don’t consume messages from topic partitions in parallel. However, we can change this behavior in two ways.

The first one is defining the concurrency parameter of the annotation, where we can set how many threads the given listener is using.

The second option is providing a value for the containerFactory parameter. It is the name of the container factory bean producing containers used to run the listener’s logic. When the factory is not single-threaded (concurrency set to a value bigger than 1), the framework assigns container threads to partitions.

In both cases, if we have more threads than partitions, some remain idle.
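
A hedged sketch of both options, assuming illustrative bean and topic names and a thread count of 3:

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.stereotype.Component;

@Component
class ConcurrentOrderListener {

    // option 1: the concurrency parameter of the annotation
    @KafkaListener(topics = "orders", concurrency = "3")
    void onMessage(String message) {
        System.out.println(message);
    }
}

@Configuration
class ConcurrentContainerConfig {

    // option 2: a container factory producing multi-threaded containers,
    // referenced via the containerFactory parameter of @KafkaListener
    @Bean
    ConcurrentKafkaListenerContainerFactory<String, String> concurrentFactory(
            ConsumerFactory<String, String> consumerFactory) {
        var factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(3);
        return factory;
    }
}
```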

This is not the end — we can even specify listener methods for specific partitions using the topicPartitions parameter. With such a solution, Spring automatically runs each one in a separate thread.

Spring for Kafka provides a feature of consuming messages in batches as well. And of course, we have more than one option possible.

The first one is the configuration switch for batch processing in the container factory. When enabled, we can provide a listener accepting a list of messages. What’s important — we need to use the batching container factory as the value of the containerFactory parameter in the @KafkaListener annotation.
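
A minimal sketch of this option, with illustrative names:

```java
import java.util.List;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.stereotype.Component;

@Configuration
class BatchContainerConfig {

    @Bean
    ConcurrentKafkaListenerContainerFactory<String, String> batchFactory(
            ConsumerFactory<String, String> consumerFactory) {
        var factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
        factory.setConsumerFactory(consumerFactory);
        // the batch-processing switch
        factory.setBatchListener(true);
        return factory;
    }
}

@Component
class BatchOrderListener {

    @KafkaListener(topics = "orders", containerFactory = "batchFactory")
    void onMessages(List<String> messages) {
        // the whole batch returned by a single poll
        messages.forEach(System.out::println);
    }
}
```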

The other option is to use one of the message listener interfaces with the Batch prefix. They accept a list of consumer records instead of a single one.

When it comes to manual commits of message offsets, we have the same abundance of options. First, we have the original Kafka setting, i.e. `enable.auto.commit`. When it is true, Kafka commits all messages according to its configuration. Otherwise, the entity responsible for making a commit is chosen based on the value of the ack mode set in the configuration. For ack set to MANUAL or MANUAL_IMMEDIATE, it’s up to the developer to commit an offset. For all other values, it’s up to the container to run it. Additionally, we can specify the synchronicity of the commit operation.

When we work with manual commits, we have the Acknowledgment interface at our disposal in some of the framework’s message listeners. It offers methods for committing the offsets of processed messages or discarding the remaining records from the last poll.
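
A hedged sketch: the container factory is switched to the MANUAL ack mode, and the listener commits through the Acknowledgment argument (bean and topic names are illustrative).

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Configuration
class ManualAckConfig {

    @Bean
    ConcurrentKafkaListenerContainerFactory<String, String> manualAckFactory(
            ConsumerFactory<String, String> consumerFactory) {
        var factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        return factory;
    }
}

@Component
class ManualAckListener {

    @KafkaListener(topics = "orders", containerFactory = "manualAckFactory")
    void onMessage(String message, Acknowledgment ack) {
        // process the message...
        // commit the offset only after successful processing
        ack.acknowledge();
    }
}
```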

What strikes me with Spring for Kafka is the number of ways to set up a working Kafka consumer. We can do this in several ways, which is good because of the flexibility of the framework. But it can be harmful as well when we get lost among the various options available.

Micronaut

As in Spring, the Micronaut framework has its dedicated integration with Kafka and works on a message-driven POJO as well. The configuration of a consumer is quite similar, too.

We start with @KafkaListener annotation on a class level. It is the place where we define a group of consumers. Such a group is configured based on the configuration file content that provides defaults or values for a specific group with a given groupId. We can override the values using the annotation parameters.

Every public or package-private method of a listener class, annotated with @Topic (with a mandatory topic name/pattern provided), becomes a Kafka consumer running in the background. We can put the annotation on a class level too, but then all public or package-private methods become Kafka consumers. So we need to be careful with this.

To set up concurrent message processing, we can define the number of threads as the @KafkaListener annotation parameter. If we provide fewer threads than the number of partitions, some consumers will work on messages from two or more partitions. On the other hand, if we set more of them, some remain idle and do nothing. That’s the same behavior as in the Spring integrations.

Similarly, we can enable batch processing using the batch parameter on the annotation. Then we can consume a list of records (or domain classes) in the consumer method.
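
A minimal sketch of a batch listener in Micronaut; the topic name and record type are illustrative.

```java
import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.Topic;

import java.util.List;

@KafkaListener(batch = true)
public class BatchProductListener {

    @Topic("products")
    void receiveAll(List<String> products) {
        // the whole batch of records returned by a single poll
        products.forEach(System.out::println);
    }
}
```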

Micronaut’s management of offset committing provides a few options. We define which one to use with the `OffsetStrategy` enum. You can find an excellent description of it in the framework documentation. Here is a minimal sketch of using manual commits after processing a record (the topic name and record type are illustrative):
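
```java
import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.OffsetReset;
import io.micronaut.configuration.kafka.annotation.OffsetStrategy;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.micronaut.messaging.Acknowledgement;

// automatic commits are switched off with OffsetStrategy.DISABLED
@KafkaListener(offsetReset = OffsetReset.EARLIEST, offsetStrategy = OffsetStrategy.DISABLED)
public class ManualCommitProductListener {

    @Topic("products")
    void receive(String product, Acknowledgement acknowledgement) {
        // process the record...
        // commit the offset only after the record has been processed
        acknowledgement.ack();
    }
}
```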

The configuration of Kafka classes in Micronaut is more concise compared to the one in Spring. Having fewer places to change or update makes life easier in terms of maintaining the code. However, unlike in Spring, we cannot define a consumer programmatically, with no annotations in use.

Akka Streams

The next on our list is the Akka Streams library with the Alpakka connector. This setup provides a reactive way of consuming messages from Kafka. It uses the Akka Actors framework under the hood.

Here, we get Kafka consumers as instances of sources of events. The library offers plenty of them, differing from each other in the data types served, the metadata and partition information provided, and the way offset commits are handled.

Let’s start with the Consumer.plainSource. It emits ConsumerRecord messages for the whole topic in a single thread, preserving the order of consuming messages for a given partition. Depending on the Kafka consumer configuration, the stream can automatically commit processed records.

We can choose to commit messages by hand, as well. If so, we need to use one of the committable sources that provide consumer records together with information about the current offset. After processing a message, we can use this additional data with a Committer sink or flow to make the manual commit.
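
A hedged sketch of this pattern, close to what the Alpakka documentation shows; the bootstrap servers, group id, topic name, and the stand-in processing step are illustrative.

```java
import akka.actor.ActorSystem;
import akka.kafka.CommitterSettings;
import akka.kafka.ConsumerSettings;
import akka.kafka.Subscriptions;
import akka.kafka.javadsl.Committer;
import akka.kafka.javadsl.Consumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.concurrent.CompletableFuture;

public class CommittableConsumerApp {

    public static void main(String[] args) {
        ActorSystem system = ActorSystem.create("kafka-consumer");

        ConsumerSettings<String, String> consumerSettings =
            ConsumerSettings.create(system, new StringDeserializer(), new StringDeserializer())
                .withBootstrapServers("localhost:9092")
                .withGroupId("example-group");

        CommitterSettings committerSettings = CommitterSettings.create(system);

        Consumer.committableSource(consumerSettings, Subscriptions.topics("example-topic"))
            // process the record, then pass its committable offset downstream
            .mapAsync(1, msg ->
                CompletableFuture.supplyAsync(() -> msg.record().value())   // stand-in for real processing
                    .thenApply(value -> msg.committableOffset()))
            // the Committer sink batches and commits the received offsets
            .toMat(Committer.sink(committerSettings), Consumer::createDrainingControl)
            .run(system);
    }
}
```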

Regarding the processing of events in parallel, the library offers excellent tooling too. The simplest solution is to use the plain partitioned source that emits sources of records together with topic-partition information. When we consume messages from sub-sources, the action runs on a separate thread for each partition. However, we can use partition information to distribute thread assignment in a custom fashion (we would need to use flatMapMerge and groupBy operators).

What’s essential when consuming data in parallel is the order of the results. We have two options. The first one is using the mapAsync operator. It uses a pool of threads of the size specified in the parameter. The stage ensures the order of emitted messages downstream but with no guarantee of processing order. On the other hand — suppose the order of emitted messages is not essential to us. In that case, we can use the mapAsyncUnordered operator — it passes messages downstream just after the processing is finished, regardless of the receiving order.

Batch processing is available too. We can achieve it by using a batching operator like grouped or batch. In such a case, we need to use a CommittableOffsetBatch instance and update it with an offset of the last processed message in the batch. Then, we need to call commit in the next step of the flow.

Akka Streams’ support for Kafka is awe-inspiring. It consumes messages as a stream of data, which is the most natural fit for a Kafka consumer. Thanks to the granularity of the message sources, we can easily choose the most suitable one for our case. By utilizing the powerful API of Streams, we get features like batching or backpressure very quickly. The biggest drawback for me when using Akka Streams is the abundance of operators. You would probably need some time to get familiar with them. However, when you look at the connector sources, you can find many examples of how to use Streams with Kafka.

Vert.x

The last way of implementing the consumption of messages from Kafka I’d like to present in this blog uses the Vert.x toolkit. The approach is similar to the one with Akka Streams: it works on verticles, a form of lightweight actors. The verticles pass messages between each other using an event bus. They can run on event loops and worker threads.

The core library provides basic functionality (as in Akka Streams), and we need to use an external component to connect with Kafka, i.e. vertx-kafka-client. While general assumptions are quite similar to those in Akka Streams, working with code looks different.

Vert.x applications use event loops and worker threads. The former deliver events to the destination verticles and should run only fast, non-blocking code. The purpose of the latter is to do heavy work like I/O or expensive computations. Thus, we should consider consuming Kafka messages on worker threads, so that the event loops aren’t blocked and the application runs smoothly. The example code contains Kafka verticles running as workers.

Right, so how can we create a consumer of Kafka messages? Vert.x client provides a class for this — KafkaConsumer. It provides several factory methods for creating instances based on a configuration provided.

Having a consumer in hand, we need to subscribe to some Kafka topic before starting the verticle. We can choose from a few variations of the `subscribe` method. Calling one of them enables the verticle to read data from single or multiple topics. The next step is the registration of a handler function consuming received messages. All of this we do by using the fluent API on the consumer.
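
A minimal sketch of such a verticle, deployed as a worker so the event loop is not blocked; the configuration values and topic name are illustrative.

```java
import io.vertx.core.AbstractVerticle;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Vertx;
import io.vertx.kafka.client.consumer.KafkaConsumer;

import java.util.HashMap;
import java.util.Map;

public class KafkaConsumerVerticle extends AbstractVerticle {

    @Override
    public void start() {
        Map<String, String> config = new HashMap<>();
        config.put("bootstrap.servers", "localhost:9092");
        config.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        config.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        config.put("group.id", "example-group");
        config.put("auto.offset.reset", "earliest");

        KafkaConsumer<String, String> consumer = KafkaConsumer.create(vertx, config);

        // register the handler and subscribe, using the fluent API
        consumer
            .handler(record -> System.out.println("key=" + record.key() + ", value=" + record.value()))
            .subscribe("example-topic");
    }

    public static void main(String[] args) {
        // deploy as a worker verticle so Kafka consumption stays off the event loop
        Vertx.vertx().deployVerticle(new KafkaConsumerVerticle(), new DeploymentOptions().setWorker(true));
    }
}
```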

That’s the way of creating a plain consumer for one or more topics, with no partition split to different threads. As you can see in the example project, I have encapsulated the verticle’s logic and deployed it as a worker verticle. With this solution, all of the messages will be consumed by the same worker thread.

Depending on the Kafka configuration, the consumer may commit offsets automatically. But what about triggering the action manually? The Vert.x consumer provides the commit methods doing the job. We can call them for a single message or a bunch of offsets for specific topics and partitions.
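
Continuing the verticle sketch above, and assuming `enable.auto.commit` is set to false in the consumer configuration, the handler could commit after each processed record:

```java
consumer.handler(record -> {
    // process the record...
    consumer.commit()
        .onSuccess(v -> System.out.println("Offsets committed"))
        .onFailure(err -> System.err.println("Commit failed: " + err.getMessage()));
});
```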

When it comes to creating consumers for all partitions of a given topic, we need to set up the verticles manually. First, we need to know how many partitions of which topics we’d like to handle. We can call KafkaAdminClient to describe the topics of our interest. Next, for every topic-partition pair, we need to create a dedicated verticle containing a Kafka consumer. Inside the verticle, we assign the consumer to the required topic-partition and specify handlers as in the “plain” consumer verticle.
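
A hedged sketch of this setup: KafkaAdminClient discovers the partitions, and one worker verticle per partition assigns its consumer explicitly. All names and configuration values are illustrative.

```java
import io.vertx.core.AbstractVerticle;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Vertx;
import io.vertx.kafka.admin.KafkaAdminClient;
import io.vertx.kafka.client.common.TopicPartition;
import io.vertx.kafka.client.consumer.KafkaConsumer;

import java.util.List;
import java.util.Map;

public class PartitionedDeployment {

    public static void main(String[] args) {
        Vertx vertx = Vertx.vertx();
        Map<String, String> config = Map.of(
            "bootstrap.servers", "localhost:9092",
            "key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer",
            "value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer",
            "group.id", "example-group");

        // ask Kafka how many partitions the topic has, then deploy one verticle per partition
        KafkaAdminClient admin = KafkaAdminClient.create(vertx, config);
        admin.describeTopics(List.of("example-topic")).onSuccess(topics ->
            topics.get("example-topic").getPartitions().forEach(partitionInfo ->
                vertx.deployVerticle(
                    new PartitionConsumerVerticle("example-topic", partitionInfo.getPartition(), config),
                    new DeploymentOptions().setWorker(true))));
    }

    static class PartitionConsumerVerticle extends AbstractVerticle {

        private final String topic;
        private final int partition;
        private final Map<String, String> config;

        PartitionConsumerVerticle(String topic, int partition, Map<String, String> config) {
            this.topic = topic;
            this.partition = partition;
            this.config = config;
        }

        @Override
        public void start() {
            KafkaConsumer<String, String> consumer = KafkaConsumer.create(vertx, config);
            consumer
                .handler(record -> System.out.println("partition=" + partition + ", value=" + record.value()))
                // explicit assignment instead of a subscription
                .assign(new TopicPartition(topic, partition));
        }
    }
}
```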

As I mentioned above, every Kafka consumer uses a dedicated handler to process received messages. Depending on our needs, we can consume records one by one or in batches. In the former scenario, we define a function using the handler method, while for the latter, we use the batchHandler method. The Kafka component provides records in the form of KafkaConsumerRecord or KafkaConsumerRecords, respectively. In both cases, under the hood, we can find a good old ConsumerRecord instance.

Summary

In this blog, I presented various ways of defining Kafka consumers in Java. Spring, Micronaut, Vert.x, and Akka Streams use the kafka-clients library under the hood and provide a complete feature-set to consume Kafka messages.

Which one is superior? I would say none, except for one case. If you have a project where you consider using the kafka-clients library directly, I would advise against the idea. The library is a kind of driver connecting to Kafka. Using it in the project, we need to accept that we will reinvent, to some degree, the wheel that is already implemented in other tools. I would suggest using Akka Streams if a framework in your project does not constrain you. Otherwise, look for already existing integrations. There is no sense in applying Akka Streams when your service is developed within the Spring framework. Or Micronaut within a Vert.x application, right? ;)

Most of the code snippets presented in this article can be found in SoftwareMill’s GitHub.

Have you enjoyed the article? Or maybe you would like to see more similar writings regarding Kafka (producers, customized data types, or more advanced stuff)? Let me know!
