Spring Boot HandBook

    Configuring Kafka with Spring Boot

    In our previous blogs, we explored Kafka's architecture, installed Kafka, and gained a basic understanding of how it operates. Now, let's take the next step by setting up Kafka in a Spring Boot application. In this blog, we'll learn how to configure Kafka and use it effectively within a Spring Boot project.

    Creating dummy microservices#

    Create two dummy microservices:

    1. User microservice
    2. Notification microservice

    While creating these microservices from Spring Initializr, you have to add the Kafka dependency in order to integrate Kafka into the application.

    Dependency name: Spring for Apache Kafka

    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka-test</artifactId>
        <scope>test</scope>
    </dependency>

    Using Kafka in User and Notification Microservices#

    We will create a simple endpoint in the user-service to send messages. When this endpoint is hit with a message in the path, Kafka will publish the message to the topic user-random-topic. Here, the user-service will act as a producer, generating events for the topic and sending them to the Kafka broker. Each message is assigned to one of the topic's partitions: by hashing the message key when one is supplied, or in a round-robin fashion for keyless messages.

    The notification-service will act as a consumer, listening to events from the topic user-random-topic and consuming the messages. For simplicity, we will log the consumed messages to the console.

    Adding Kafka Bootstrap Server Configuration#

    Both user-service and notification-service need to know the location of the Kafka server to communicate with it. For this, we will add the necessary configuration in the application.yml file of both services. Ensure that the Kafka server is up and running on port 9092. We will also configure the topic user-random-topic as shown below:

    kafka:
      topic:
        user-random-topic: user-random-topic

    spring:
      kafka:
        bootstrap-servers: localhost:9092

    This configuration enables both services to connect to the Kafka server and interact with the specified topic.

    Configuring User Service as a Kafka Producer#

    To enable the user-service to send messages to Kafka, add the following endpoint to the user controller class:

    @RestController
    @Slf4j
    @RequestMapping("/users")
    @RequiredArgsConstructor
    public class UserController {

        @Value("${kafka.topic.user-random-topic}")
        private String KAFKA_RANDOM_USER_TOPIC;

        private final KafkaTemplate<String, String> kafkaTemplate;

        @PostMapping("/{message}")
        public ResponseEntity<String> sendMessage(@PathVariable String message) {
            // Send 10 messages to the specified Kafka topic, cycling the key through "0", "1", "2"
            for (int i = 0; i < 10; i++) {
                kafkaTemplate.send(KAFKA_RANDOM_USER_TOPIC, "" + i % 3, message + i);
            }
            return ResponseEntity.ok("Message queued");
        }
    }

    The above code defines a POST endpoint at /users/{message}, which allows the user-service to send messages to a Kafka topic. The topic name is dynamically fetched using the @Value annotation from the application properties. Inside the sendMessage method, a loop iterates 10 times to send messages to the specified Kafka topic. Each message carries a key (i % 3 as a string); messages with the same key always land in the same partition, so cycling through three keys spreads the batch across the topic's partitions. The message content is made unique by appending the loop index (message + i). The KafkaTemplate is used to interact with Kafka and queue the messages. Once all messages are queued, the endpoint returns a success response, confirming that the operation was completed. This configuration enables the user-service to act as a Kafka producer, sending multiple messages to the specified topic.
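    To see which messages end up sharing a key (and therefore a partition), the key cycle from the loop above can be sketched in plain Java, with no broker required. KeyCycleDemo and groupByKey are illustrative names for this sketch, not part of the service:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class KeyCycleDemo {

    // Reproduce the producer loop's key assignment: key = i % 3
    public static Map<String, List<String>> groupByKey(String message, int count) {
        Map<String, List<String>> byKey = new TreeMap<>();
        for (int i = 0; i < count; i++) {
            String key = String.valueOf(i % 3); // same key -> same partition
            byKey.computeIfAbsent(key, k -> new ArrayList<>()).add(message + i);
        }
        return byKey;
    }

    public static void main(String[] args) {
        // Keys cycle 0, 1, 2, so the 10 messages split into three groups
        System.out.println(groupByKey("Hie", 10));
        // -> {0=[Hie0, Hie3, Hie6, Hie9], 1=[Hie1, Hie4, Hie7], 2=[Hie2, Hie5, Hie8]}
    }
}
```

    Messages Hie0, Hie3, Hie6, and Hie9 all carry the key "0", so they are guaranteed to land in the same partition; the other two keys group the remaining messages the same way.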

    Configuring the Kafka Topic and Partitions#

    To configure the Kafka topic and its partitions, add the following configuration to the application startup. This setup ensures that a topic named user-random-topic with 3 partitions is created if it does not already exist. If the topic already exists, it is not created again; the step is simply skipped.

    @Configuration
    public class KafkaTopicConfig {

        @Value("${kafka.topic.user-random-topic}")
        private String KAFKA_RANDOM_USER_TOPIC;

        @Bean
        public NewTopic userRandomTopic() {
            // Create the topic with 3 partitions and a replication factor of 1
            return new NewTopic(KAFKA_RANDOM_USER_TOPIC, 3, (short) 1);
        }
    }
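    Spring Kafka also offers a fluent TopicBuilder that some find more readable than the NewTopic constructor. The sketch below is a functionally equivalent alternative to the bean above, assuming the same property name:

```
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class KafkaTopicConfig {

    @Value("${kafka.topic.user-random-topic}")
    private String kafkaRandomUserTopic;

    @Bean
    public NewTopic userRandomTopic() {
        // Equivalent to: new NewTopic(kafkaRandomUserTopic, 3, (short) 1)
        return TopicBuilder.name(kafkaRandomUserTopic)
                .partitions(3)
                .replicas(1)
                .build();
    }
}
```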

    Run User Service#

    Once you have completed all the previous steps, run your user-service. It will connect to the Kafka server, and you should see logs similar to the following:

    2025-01-08T19:36:59.195+05:30  INFO 30884 --- [user-service] [main] o.a.kafka.common.utils.AppInfoParser : Kafka version: 3.7.1
    2025-01-08T19:36:59.195+05:30  INFO 30884 --- [user-service] [main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: e2494e6ffb89f828
    2025-01-08T19:36:59.196+05:30  INFO 30884 --- [user-service] [main] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1736345219192
    2025-01-08T19:36:59.979+05:30  INFO 30884 --- [user-service] [service-admin-0] o.a.kafka.common.utils.AppInfoParser : App info kafka.admin.client for user-service-admin-0 unregistered
    2025-01-08T19:36:59.989+05:30  INFO 30884 --- [user-service] [service-admin-0] o.apache.kafka.common.metrics.Metrics : Metrics scheduler closed
    2025-01-08T19:36:59.989+05:30  INFO 30884 --- [user-service] [service-admin-0] o.apache.kafka.common.metrics.Metrics : Closing reporter org.apache.kafka.common.metrics.JmxReporter
    2025-01-08T19:36:59.990+05:30  INFO 30884 --- [user-service] [service-admin-0] o.apache.kafka.common.metrics.Metrics : Metrics reporters closed
    2025-01-08T19:37:00.034+05:30  INFO 30884 --- [user-service] [main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port 9050 (http) with context path '/'
    2025-01-08T19:37:00.061+05:30  INFO 30884 --- [user-service] [main] c.c.l.u.UserServiceApplication : Started UserServiceApplication in 8.923 seconds (process running for 9.753)

    This log confirms that the user-service has successfully connected to the Kafka server and the application has started without issues. You should now be able to send messages to Kafka using the endpoint we configured earlier.

    In Kafbat UI you will see that the topic “user-random-topic” has been created with 3 partitions and 1 replica.

    Kafka Topic in Kafbat UI

    Configuring Notification Service as a Kafka Consumer#

    Now, let’s configure the notification-service to listen for all messages from the user-random-topic. First, add the following configuration in the application.yml file to set up the Kafka consumer and specify the consumer group ID:

    spring:
      kafka:
        bootstrap-servers: localhost:9092
        consumer:
          group-id: ${spring.application.name}

    This configuration connects the notification-service to the Kafka server running on localhost:9092 and sets the consumer's group ID to the service's application name. The consumer group ID ensures that messages from the topic are consumed by the appropriate service instances.
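    In practice you will often also pin down the deserializers and the offset-reset behaviour explicitly. A fuller (but still minimal) consumer configuration might look like the sketch below; the auto-offset-reset value of earliest is one possible choice, not a requirement:

```
spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: ${spring.application.name}
      auto-offset-reset: earliest   # start from the beginning when no committed offset exists
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
```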

    Creating the Kafka Consumer Service#

    Next, create a Kafka consumer service that will log all the messages received from the user-random-topic. This service will listen for messages from the topic and log them for further processing or debugging.

    @Service
    @Slf4j
    public class UserKafkaConsumer {

        @KafkaListener(topics = "user-random-topic")
        public void handleUserRandomTopic1(String message) {
            log.info("handleUserRandomTopic1: {}", message);
        }

        @KafkaListener(topics = "user-random-topic")
        public void handleUserRandomTopic2(String message) {
            log.info("handleUserRandomTopic2: {}", message);
        }

        @KafkaListener(topics = "user-random-topic")
        public void handleUserRandomTopic3(String message) {
            log.info("handleUserRandomTopic3: {}", message);
        }
    }

    In this service, three methods are annotated with @KafkaListener, each listening to the same topic, user-random-topic. Whenever a message is sent to the topic, these methods will receive and log the message. Each method logs the message with a distinct tag (e.g., handleUserRandomTopic1, handleUserRandomTopic2, and handleUserRandomTopic3), which helps to identify the source of the message in the logs. Because all three listeners share the same consumer group (the group-id configured above), Kafka divides the topic's three partitions among them, so each method ends up consuming from a single partition.

    Note: In practice, you may want to combine these listeners or handle the messages in a more centralized way. This example illustrates different listeners for demonstration purposes.
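    One common way to centralize this is a single listener method scaled out with the concurrency attribute of @KafkaListener. The sketch below is an alternative to the three-method version above, not an addition to it:

```
@Service
@Slf4j
public class UserKafkaConsumer {

    // One listener method; Spring Kafka starts 3 consumer threads,
    // each assigned one of the topic's 3 partitions.
    @KafkaListener(topics = "user-random-topic", concurrency = "3")
    public void handleUserRandomTopic(String message) {
        log.info("handleUserRandomTopic: {}", message);
    }
}
```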

    Run Notification Service#

    Once you have completed all the above steps, run your notification-service. Upon successful startup, you should see logs similar to the following at the end of the output:

    2025-01-08T20:43:33.481+05:30  INFO 3052 --- [notification-service] [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : notification-service: partitions assigned: [user-random-topic-1]
    2025-01-08T20:43:33.481+05:30  INFO 3052 --- [notification-service] [ntainer#1-0-C-1] o.s.k.l.KafkaMessageListenerContainer : notification-service: partitions assigned: [user-random-topic-2]
    2025-01-08T20:43:33.481+05:30  INFO 3052 --- [notification-service] [ntainer#2-0-C-1] o.s.k.l.KafkaMessageListenerContainer : notification-service: partitions assigned: [user-random-topic-0]

    These logs confirm that the notification-service has successfully connected to Kafka and is now consuming messages from the assigned partitions of the user-random-topic. Each log entry shows which partition the service is currently listening to, ensuring that the consumer is ready to process incoming messages.

    In Kafbat UI you will see that a consumer group with the group-id “notification-service” has been created, with 3 members, one per partition.

    Consumer in Kafbat UI

    Demonstration#

    To view the logs from the notification-service, hit the send-message endpoint (POST /users/{message}) on localhost with the message "Hie".

    Postman

    You will receive a 200 OK response with the body "Message queued". At this point, the user-service loops 10 times and publishes 10 messages to the user-random-topic.

    Topic messages in Kafbat UI

    The notification-service will then consume these messages from the topic and log them to the console.

    Consumed messages by consumer

    Conclusion#

    In this blog, we configured Kafka with Spring Boot to establish a simple producer-consumer model between the user-service and notification-service. We learned how to set up Kafka, create topics, and enable message communication between microservices. This integration demonstrates the power of Kafka in building scalable, decoupled systems for efficient message processing.

    Last updated on Jan 14, 2025