How to create a Kafka consumer in Java
In this post, I’ll show you how to consume Kafka records in Java. We’ll read data from a topic called java_topic. To test how our consumer is working, we’ll produce data using the Kafka CLI tool. And all this in under 5 minutes, so let’s jump right in.
Running a Kafka cluster locally
We’ll use Docker Compose to run our local Kafka cluster. It will consist of one Zookeeper instance and one Kafka broker.
Don’t have docker-compose? Check: how to install docker-compose
I’ve prepared a docker-compose file which you can grab from Coding Harbour’s GitHub:
git clone https://github.com/codingharbour/kafka-docker-compose.git
Once you have the project, navigate to a folder called single-node-kafka and start the Kafka cluster:
docker-compose up -d
The output should look something like this:
Creating network "single-node-kafka_default" with the default driver
Creating sn-zookeeper ... done
Creating sn-kafka ... done
Your local Kafka cluster is now ready to be used. By running docker-compose ps, we can see that the Kafka broker is available on port 9092. Make a note of that, because we’ll need it soon.
$ docker-compose ps
Name Command State Ports
-------------------------------------------------------------------------------
sn-kafka /etc/confluent/docker/run Up 0.0.0.0:9092->9092/tcp
sn-zookeeper /etc/confluent/docker/run Up 2181/tcp, 2888/tcp, 3888/tcp
Dependencies
To be able to consumer records from Kafka we need the Kafka client library. Add this dependency to your pom.xml:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.6.0</version>
</dependency>
Check the latest version of kafka-clients library at maven repository. At the time of writing it was 3.6.0.
Create a kafka topic
To create a topic we’ll use a Kafka CLI tool called kafka-topics, that comes bundled with Kafka binaries. In our case, it means the tool is available in the docker container named sn-kafka.
First, open your favourite terminal and connect to the running Kafka container:
docker exec -it sn-kafka /bin/bash
Now that we’re inside the container where we have access to Kafka CLI tools, let’s create our topic, called java_topic:
kafka-topics --bootstrap-server localhost:9092 \
--create --topic java_topic \
--partitions 1 \
--replication-factor 1
Creating a Kafka consumer
There are a couple of properties we need to set up for Kafka consumer to work properly:
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "my-first-consumer-group");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
Consumer<String, String> consumer = new KafkaConsumer<>(properties);
First, we specify the URL where our Kafka broker is running, by configuring BOOTSTRAP_SERVERS_CONFIG property. In cases when you have more than one broker (which is more usual scenario), you would specify them in a comma-separated string, e.g:
"serverA:9092,serverB:9092,serverC:9092"
Second, the consumer needs to know how to deserialize the records from a byte array. Kafka brokers are agnostic of the data types and are treating records as byte arrays. This means that producers need to know how to serialize data into byte arrays and consumers need to know how to deserialize it back.
Each Kafka record consists of a key and a value. These can potentially be of different types, so the consumer needs to know which deserializer to use for key and which one to use for value. That’s where KEY_DESERIALIZER_CLASS_CONFIG and VALUE_DESERIALIZER_CLASS_CONFIG come into play. In our case, we’re using String for both key and value.
The next property is a consumer group id. It’s a unique identifier of a group of consumers and is used, among other things, to track which records have been consumed by the consumers in this group. If you want to learn more about consumer groups, check this post.
The property AUTO_OFFSET_RESET_CONFIG, tells the consumer where it should start consuming messages the first time it starts. By default, a consumer will only consume messages that arrive to the topic after the consumer is started for the first time. By setting the value to “earliest” we tell the consumer to read all the records that already exist in the topic.
By default, Kafka consumer commits the offset periodically. Last property, ENABLE_AUTO_COMMIT_CONFIG, tells the consumer that we’ll handle committing the offset in the code. I’ll show you how to do it soon.
Consuming the records
Now that we have our consumer configured and created, it’s time to consume some data. In Kafka producers push the data to topics and consumers are frequently polling the topic(s) to check for new records. So the usual way is to poll for new records in an endless while loop and once there are new records, to process them. But before we can poll topic for records, we need to subscribe our consumer to one or more topics:
consumer.subscribe(Collections.singleton("java_topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.println("Message received: " + record.value());
}
consumer.commitAsync();
}
consumer.poll(Duration) will return immediately if there are available records. Otherwise, it will block until either a record is available or timeout has expired. If timeout expired, the poll method will return an empty record set.
The record processing would happen in the for loop. We are simply printing the value of the record to system out.
Remember how we told the consumer library that we’ll be committing the offset manually? That’s what the line consumer.commitAsync() is doing. You want to commit the offset after you’ve finished processing the message, not before.
Now that we have our consumer in place, let’s produce some messages to test our consumer.
Producing messages for our consumer
To produce the messages to this topic, we’ll use a CLI tool that comes bundled with Kafka binaries called kafka-console-producer. This is a CLI tool that is part of Apache Kafka binaries and you can download it from the official website. Yet, since we’re using Kafka’s docker image, the CLI tools are already available in the Kafka broker’s container. To be able to use the tool we first need to connect to the container called sn-kafka:
docker exec -it sn-kafka /bin/bash
Now we can produce some messages. If you check the code in the previous section, you will see that our consumer is waiting for the messages to arrive on a topic called java_topic, so let’s produce to that topic:
kafka-console-producer --broker-list localhost:9092 --topic java_topic
>
Producer tool will display the prompt, showing us that it is waiting for the message to send. Type the message and send it by pressing Enter.
Run the consumer
Now that the record is ready in Kafka topic, we can start our consumer and check the output. The message we sent should quickly appear. You can go back to the terminal where kafka-console-producer is running and send a few more messages, then check the consumer to see the messages consumed.
Source code
Source code for this example is available on Coding Harbour’s GitHub.
Would you like to learn more about Kafka?
I have created a Kafka mini-course that you can get absolutely free. Sign up below and I will send you lessons directly to your inbox.
Photo credit: Kazuky Akayashi