How to create a Kafka producer in Java
In this post, I will show you how to produce messages to Kafka from Java using the kafka-clients library. It takes less than 5 minutes and around 10 lines of code. Don’t believe me? Keep reading.
Running a Kafka cluster locally
To be able to produce messages (known in the Kafka world as records) to Kafka, we need a Kafka cluster. At a minimum, a Kafka cluster consists of one Kafka server (called a broker), and it needs at least one ZooKeeper node.
To simplify things, we will run both servers as Docker containers, using docker-compose.
Don’t have docker-compose? Check: how to install docker-compose
I’ve prepared a docker-compose file which you can grab from Coding Harbour’s GitHub:
git clone https://github.com/codingharbour/kafka-docker-compose.git
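You don't need to read the compose file to follow along, but for orientation, a single-node setup like this one looks roughly like the following sketch. This is an assumption, not the exact contents of the repository's file; the Confluent images are inferred from the container output shown below, and the versions and settings are mine.
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest   # assumed image; the repo may pin a version
    container_name: sn-zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka:latest       # assumed image; the repo may pin a version
    container_name: sn-kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"                           # the broker port we'll use from Java
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1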
Once you have the project, navigate to a folder called single-node-kafka and start the Kafka cluster:
docker-compose up -d
The output should look something like this:
Creating network "single-node-kafka_default" with the default driver
Creating sn-zookeeper ... done
Creating sn-kafka ... done
Your local Kafka cluster is now ready to be used. By running docker-compose ps, we can see that the Kafka broker is available on port 9092. Make a note of that, because we’ll need it soon.
$ docker-compose ps
    Name                  Command            State              Ports
------------------------------------------------------------------------------------
sn-kafka       /etc/confluent/docker/run    Up      0.0.0.0:9092->9092/tcp
sn-zookeeper   /etc/confluent/docker/run    Up      2181/tcp, 2888/tcp, 3888/tcp
Dependencies
To be able to write records to Kafka, we need the Kafka client library. Add the dependency to your pom.xml:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.6.0</version>
</dependency>
Check for the latest version of the kafka-clients library in the Maven repository. At the time of writing, it was 3.6.0.
Creating the Kafka producer
To create a Kafka producer, we need to provide it with a few mandatory properties. I’ll explain them below:
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
Producer<String, String> producer = new KafkaProducer<>(properties);
First, the producer needs to know how to reach the Kafka broker. We specify the broker’s address using the ProducerConfig.BOOTSTRAP_SERVERS_CONFIG property. When you have more than one broker (which is always the case in production), you would specify them as a comma-separated string, e.g.:
"serverA:9092,serverB:9092,serverC:9092"
Second, the producer needs to know how to serialize records to byte arrays. Kafka brokers are agnostic of the data types we’re sending and treat every record as an array of bytes. This means that producers need to know how to serialize data into byte arrays, and consumers need to know how to deserialize them back.
Each Kafka record consists of a key and a value. These can be of different types, so the producer needs to know which serializer to use for the key and which one to use for the value. That’s where KEY_SERIALIZER_CLASS_CONFIG and VALUE_SERIALIZER_CLASS_CONFIG come into play. In our example, we’ll be sending strings, so we configure a StringSerializer for both.
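To make the serializer contract concrete, here is a minimal sketch of a string serializer, similar in spirit to the built-in StringSerializer (which additionally supports configurable encodings). The class name is hypothetical:
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.serialization.Serializer;

// A minimal string serializer: converts each value to its UTF-8 bytes.
public class SimpleStringSerializer implements Serializer<String> {

    @Override
    public byte[] serialize(String topic, String data) {
        return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
    }
}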
Producing the record
Ok, it’s time to produce some data. When we run it, our producer will write a single record whose value is a string telling us the time at which the record was created.
String recordValue = "Current time is " + Instant.now().toString();
ProducerRecord<String, String> record = new ProducerRecord<>("javatopic", null, recordValue);
producer.send(record);
producer.flush();
The first parameter we set when creating the ProducerRecord is the topic to which we’re writing the record. Then we set the key and the value of the record. The key is optional, and in this case we won’t set it, so we pass null. The value is, as mentioned, the string with the timestamp at which we created the record. Once everything is set, we call the producer’s send method.
The last line forces the producer to write the record to the topic right away. You might expect a call to producer.send(record) to write data to Kafka immediately, but under the hood the records are queued and sent in batches. This is how the Kafka producer optimizes for throughput and latency, since network operations are expensive. But since we’re producing only a single message before shutting down our application, we want to tell the producer to send it right away.
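As a side note, send is asynchronous: it returns a Future<RecordMetadata>, and it also accepts an optional callback that is invoked once the broker acknowledges the record or the send fails. A minimal sketch of how you could confirm the write:
// send() is asynchronous; the callback runs once the broker has responded.
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        exception.printStackTrace(); // the send failed
    } else {
        System.out.printf("Record written to %s-%d at offset %d%n",
                metadata.topic(), metadata.partition(), metadata.offset());
    }
});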
And that’s it. If you now run the application, you will produce the message to Kafka.
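For reference, here is everything from above assembled into a single runnable class. The class name SimpleKafkaProducer is mine; the code on GitHub may be organized differently:
import java.time.Instant;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class SimpleKafkaProducer {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // try-with-resources closes the producer, which also flushes pending records
        try (Producer<String, String> producer = new KafkaProducer<>(properties)) {
            String recordValue = "Current time is " + Instant.now().toString();
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("javatopic", null, recordValue);
            producer.send(record);
            producer.flush();
        }
    }
}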
Have you done so? Ok, then let’s read the message to ensure it’s really there 🙂
Check the record is in the Kafka topic
We’ll use the kafka-console-consumer utility to validate that our message was written to the topic.
kafka-console-consumer is a CLI tool that ships with the Apache Kafka binaries, which you can download from the official website. Since we’re using Kafka’s Docker image, however, the CLI tools are already available inside the Kafka broker’s container. To use the tool, we first need to connect to the container called sn-kafka:
docker exec -it sn-kafka /bin/bash
Now, run kafka-console-consumer using the following command:
kafka-console-consumer --bootstrap-server localhost:9092 --topic javatopic --from-beginning
After a few moments, you should see the message.
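The consumer prints each record’s value on its own line, so the output should be a single line along these lines (your timestamp will differ):
Current time is 2024-01-15T10:26:41.123456Z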
Congratulations, you have produced a message to Kafka from Java, and it only took a few lines of code 🙂
Source code
The entire example is available on Coding Harbour’s GitHub.
Would you like to learn more about Kafka?
I have created a Kafka mini-course that you can get absolutely free. Sign up below and I will send you lessons directly to your inbox.
Photo credit: Dawid Zawiła