How to build a custom Kafka Consumer application?

If you have followed my recent post on how to build a custom Kafka Producer application, I’m sure you would also like to learn how to build a custom Kafka Consumer application. I’ve recently published a few other how-to posts on Apache Kafka as well; make sure to read those to learn the basic concepts.

A Kafka Consumer application is simply an application that reads messages from a topic in a Kafka cluster (brokers). A message can represent anything, for example a business event such as an end-user ordering a product in an e-commerce application.

The source code is available on GitHub. Feel free to clone or contribute to the repository.

Development Environment

To build a Kafka producer or consumer application, you first need to set up a development environment.

The application needs a Kafka client dependency, which provides the APIs used to interact with the Kafka cluster and its broker(s). There are many Kafka client packages available, and which one to choose depends entirely on the project requirements, the team’s preferred language/framework, and so on.

I’m using the Java-based Kafka client for this post. Please feel free to try any other client; the APIs are similar anyway.

Prerequisites:

  • Integrated Development Environment – IntelliJ IDEA (Community or Ultimate edition) or Eclipse. I’m using IntelliJ’s Community edition for this example.
  • Java 8 SDK
  • Maven Dependency Manager installed and plugged into the IDE
  • A running Kafka cluster with at least one Kafka broker and two topics – “kafka-training” and “kafka-training-2”. Please refer to my post if you would like to learn how to set up Apache Kafka locally.

Project setup in IDE

Open the IntelliJ IDE, click on “Create New Project”, choose Maven with the correct Java SDK version (1.8 on my machine) and provide the required information such as group-id, artifact-id, project name and location. The idea is to create a simple console-based consumer application.

Add Kafka client to the project

Open the app’s pom.xml and add the Kafka client as a dependency so that Maven can import it into the project. Save the file and look at External Libraries in the Project Explorer; a new library will have been added – kafka-clients:1.1.1.jar in my case.
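The dependency block in pom.xml looks like the following (version 1.1.1 as used in this post; adjust it to your needs):

```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.1.1</version>
</dependency>
```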

Explore the folder named org.apache.kafka that contains the client API classes that will be required to create producer or consumer applications.

Kafka Consumer

If you have worked with a Kafka producer application, a consumer application is quite similar when it comes to setting it up.

With the help of the Kafka Consumer API, it is really easy to create an instance of a consumer in the application. There are many configuration properties available for customisation based on your requirements. However, the following three properties (key-value pairs) are mandatory to create an instance of the KafkaConsumer class:

  • bootstrap.servers – A collection of Kafka broker addresses, used by the consumer to connect to the first available broker. On a successful connection, the consumer learns about the cluster’s partitions and their leaders so that it can fetch messages. The best practice is to provide the addresses of more than one broker.
  • key.deserializer and value.deserializer – Since a consumer’s job is to read messages, it needs to deserialise the messages sent by a producer. Many deserialisers are available out of the box, such as string, JSON, etc. The deserialisers must match the serialisers used by the producer in order for the consumer to read messages properly.
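Put together, the mandatory properties can be sketched as plain key-value pairs. The broker addresses and the StringDeserializer class used below are assumptions for a typical local setup:

```java
import java.util.Properties;

public class ConsumerConfigSketch {

    // Builds the three mandatory consumer properties.
    // Broker addresses and deserialiser classes are placeholders
    // for a typical local setup.
    static Properties baseProperties() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092,localhost:9093");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        baseProperties().forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

These same properties are then passed to the KafkaConsumer constructor.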

Reading messages from Topic(s)

There are two ways to read messages – using subscribe and assign methods.

The subscribe method allows a consumer to read messages from one or more topics, each of which may have one or more partitions. If a consumer subscribes to many topics, it will read messages from all of their partitions.

The assign method allows a consumer to read messages from particular partition(s) of a topic. This can be useful in advanced cases.

However, this is self-managed, i.e. the assign method will not react when a partition is added to or removed from the topic. The consumer only cares about the partition(s) referenced at the time the method was called. With the subscribe method, this partition management is done automatically.

Subscribe/Unsubscribe a Topic

Use Kafka’s console-based producer or a custom producer application to send messages to the broker so that this consumer has messages to read.

Let’s look at the consumer code below that uses subscribe method to read messages:
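Here is a minimal sketch of such a consumer, assuming a local broker, the two training topics and a placeholder group.id (a group id is required when using subscribe):

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;

public class SimpleConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("group.id", "kafka-training-group"); // placeholder group id

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        try {
            // Maintain the full list of topics and subscribe once:
            // a later call to subscribe() would replace this subscription.
            List<String> topics = Arrays.asList("kafka-training", "kafka-training-2");
            consumer.subscribe(topics);

            while (true) {
                // Poll the brokers for new records, waiting up to 100 ms.
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("topic=%s partition=%d offset=%d value=%s%n",
                            record.topic(), record.partition(),
                            record.offset(), record.value());
                }
            }
        } catch (Exception e) {
            e.printStackTrace(); // log the exception
        } finally {
            consumer.close(); // always release the connection
        }
    }
}
```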

The KafkaConsumer constructor takes the mandatory properties, which it uses to create an instance of ConsumerConfig internally.

The important part is the call to the subscribe method, which subscribes the consumer to a collection of topics; the collection can contain more than one topic name. All the topics that the consumer wants to subscribe to must be given in a single subscribe call, because any subsequent call to subscribe overrides the previous one. A good practice, therefore, is to maintain a list of topics and pass the whole list to subscribe.

Next, a while loop keeps the process running, polling the brokers for data at a regular interval. Each poll returns a ConsumerRecords collection containing the messages, and a for loop prints each record’s details to the console.

The poll() process is a single-threaded operation: a KafkaConsumer instance is not safe for multi-threaded access. It is important to keep this in mind while building a consumer application, and it is good practice to have only one poll loop in the consumer application.

Note that the main code is surrounded by try-catch-finally block as the instantiation of a Kafka consumer or reading messages may result in a run-time exception. In this case, it is important to log the exception and to close the consumer connection using the finally block.

It is also possible to subscribe to topic(s) using a regular expression, for example consumer.subscribe(Pattern.compile("topic-.*")). Note that this overload takes a java.util.regex.Pattern, not a plain string.

It is very easy to unsubscribe using consumer.unsubscribe(). Note that this method takes no argument, which means the consumer will unsubscribe from all the topics when it is called. The other way to unsubscribe is to pass an empty list to the subscribe method.
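Both calls can be sketched as follows, where consumer is an existing KafkaConsumer instance and the topic pattern is just an example:

```java
import java.util.regex.Pattern;

// subscribe() accepts a java.util.regex.Pattern, not a glob string:
// the glob "topic-*" corresponds to the regex "topic-.*".
consumer.subscribe(Pattern.compile("kafka-training.*"));

// Later: drop every subscription at once (no arguments),
// or equivalently call subscribe() with an empty list.
consumer.unsubscribe();
```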

Assign to Topic’s Partition

Let’s look at the code below:
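A minimal sketch, again assuming a local broker and the training topics; note that no group.id is needed, because assign() bypasses consumer group management:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.util.Arrays;
import java.util.Properties;

public class AssignConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // No group.id needed: assign() does not use group management.

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        try {
            // Read only from partition 0 of each topic; partitions added
            // to these topics later will NOT be picked up automatically.
            TopicPartition p0 = new TopicPartition("kafka-training", 0);
            TopicPartition p1 = new TopicPartition("kafka-training-2", 0);
            consumer.assign(Arrays.asList(p0, p1));

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
            }
        } finally {
            consumer.close();
        }
    }
}
```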

The code above is almost identical to the code used to demonstrate the subscribe method. To read messages from particular partition(s), an instance of the TopicPartition class is required for each one, constructed from a topic name and a partition id. The collection of these topic partitions is then given to the consumer using the assign() method.

Kafka Producer Perf Test Tool

There is a Kafka producer perf test tool available in the bin folder that can be used to send mock messages to the Kafka server. Either use this tool or the custom Kafka producer application if you have followed my previous post. The producer code lives in the same repository, so the producer and consumer can be run at the same time.
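For example, the following run would send 1,000 mock messages of 100 bytes each to the “kafka-training” topic (the broker address is an assumption for a local setup):

```shell
bin/kafka-producer-perf-test.sh \
  --topic kafka-training \
  --num-records 1000 \
  --record-size 100 \
  --throughput 100 \
  --producer-props bootstrap.servers=localhost:9092
```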

I hope this post explains how to build a custom Kafka Consumer application.

If you would like to learn more about Apache Kafka, please subscribe to my blog as I’ll be writing more how-to articles very soon.

 

 

Siddharth Pandey

Siddharth Pandey is a Software Engineer with thorough hands-on commercial experience building enterprise applications using Agile methodologies. Siddharth specialises in building and managing on-premise and cloud-based real-time standard and single-page web applications (SPAs). He has successfully delivered applications in the health-care, finance, insurance and e-commerce sectors for major brands in the UK. Besides programming, he also has experience of managing teams and training, and actively contributes to the IT community by sharing his knowledge on Stack Overflow, his personal website and video tutorials.
