skip to Main Content

Unit testing Kafka producer using MockProducer

Sometimes your Kafka producer code is doing things that need to be properly validated and of course, we developers resort to writing a test. If the functionality we want to test is nicely encapsulated we can do that using a unit test. Kafka helps us with that by providing a mock implementation of Producer<> interface called, you guessed it, MockProducer.

Preparation for test

TransactionProcessor class below is our class under test. It has a process(...) method that receives a Transaction object which in our example only contains userId and amount properties. Depending on the amount the processor will decide to which topic to write the object. If the amount is above 100.000 it will use the transactions_high_prio topic. Otherwise, it will write a transaction to the transactions_regular_prio topic. 

public class TransactionProcessor {
    public static final double HIGH_PRIORITY_THRESHOLD = 100.000;
    private final Producer<String, String> kafkaProducer;
    private final String highPrioTopic;
    private final String regularPrioTopic;

    private final Gson gson = new Gson();

    public TransactionProcessor(Producer<String, String> kafkaProducer, String highPrioTopic, String regularPrioTopic) {
        this.kafkaProducer = kafkaProducer;
        this.highPrioTopic = highPrioTopic;
        this.regularPrioTopic = regularPrioTopic;
    }

    public void process(Transaction transaction){
        String selectedTopic = regularPrioTopic;
        if (transaction.getAmount() >= HIGH_PRIORITY_THRESHOLD) {
            selectedTopic = highPrioTopic;
        }
        String transactionJson = gson.toJson(transaction);
        ProducerRecord<String, String> record = 
            new ProducerRecord<>(selectedTopic, transaction.getUserId(), transactionJson);
        kafkaProducer.send(record);
    }
}

And Transaction class looks like this:

public class Transaction {
    private String userId;
    private double amount;
    //removed for brevity
}

An important thing to notice here is that TransactionProcessor uses the Producer<> interface, not the implementation (which is the KafkaProducer class). This will make it possible to unit test our adapter using the MockProducer.

MockProducer in action

Ok, now onto the test class. TransactionProcessorTest creates an instance of the MockProducer that we will provide to the TransactionProcessor under test.

class TransactionProcessorTest {

    private static final String HIGH_PRIO_TOPIC = "transactions_high_prio";
    private static final String REGULAR_PRIO_TOPIC = "transactions_regular_prio";
    MockProducer<String, String> mockProducer = 
        new MockProducer<>(true, new StringSerializer(), new StringSerializer());

MockProducer constructor takes a couple of parameters, namely key and value serializers, in our case StringSerializer(s). The first parameter, autocomplete, is a boolean that tells MockProducer to automatically complete all requests immediately. In regular testing, you want to set this parameter to true so that messages are immediately ‘sent’. If you set it to false you will need to explicitly call completeNext() and errorNext(RuntimeException) methods after calling the send() method. You would want to do this to e.g. test the error handling in your producer (by providing the exception you want to handle as the parameter to the errorNext method).

After we’ve created the MockProducer, we create the instance of the class we wish to test.

TransactionProcessor processor = 
    new TransactionProcessor(mockProducer, HIGH_PRIO_TOPIC, REGULAR_PRIO_TOPIC);

Now it’s time to test whether the selection of topics based on amount is correct. We will create two Transaction objects, the first one with a low amount and the second one with an amount higher than our threshold (which is 100.000).

    @Test
    public void testPrioritySelection(){
        Double lowAmount = 50.2d;
        Double highAmount = 250000d;
        Transaction regularPrioTransaction = new Transaction("user1", lowAmount);
        processor.process(regularPrioTransaction);
        Transaction highPrioTransaction = new Transaction("user2", highAmount);
        processor.process(highPrioTransaction);

        assertThat(mockProducer.history()).hasSize(2);

        ProducerRecord<String, String> regTransactionRecord = mockProducer.history().get(0);
        assertThat(regTransactionRecord.value()).contains(lowAmount.toString());
        assertThat(regTransactionRecord.topic()).isEqualTo(REGULAR_PRIO_TOPIC);

        ProducerRecord<String, String> highTransactionRecord = mockProducer.history().get(1);
        assertThat(highTransactionRecord.value()).contains(highAmount.toString());
        assertThat(highTransactionRecord.topic()).isEqualTo(HIGH_PRIO_TOPIC);
    }

After calling processor.process(…) method twice we want to validate that there are two records sent to Kafka. For that, we use MockProducer#history() method which returns the list of records that the producer received to send to Kafka. We fetch each record from the history to ensure it is ‘sent’ to the proper topic.

Code on Github

All code examples from this blog post are available on Coding Harbour’s GitHub.

Would you like to learn more about Kafka?

I have created a Kafka mini-course that you can get absolutely free. Sign up below and I will send you lessons directly to your inbox.

Photo credit: @paulschnuerle

Back To Top