Intro: Getting Started with Kafka Producer & Consumer on Spring Boot

Spring Boot is an excellent framework for writing web applications, schedulers, and command-line runners. Most Java applications are written in Spring Boot these days, both for its simplicity and for the ready-made components we can include directly as dependencies. Applications are also moving to the cloud, and we are in the era of data monetization, where we collect large amounts of data and convert it into insights for planning. Traditionally we used the publisher-subscriber pattern (pub-sub for short) to distribute our processing. Nowadays we have to distribute work at a much larger scale and need more powerful tools, so many parallel processing tools have emerged. One such tool is Kafka.

Brief About Kafka

Kafka is an open-source stream-processing platform built on a distributed architecture. It works much like the traditional pub-sub tools, but it can distribute the data and deliver messages in parallel to different processing units (consumers). In this post we will write a producer and a consumer for Kafka using Spring Boot. Let's dive into the example.
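Before the Spring code, here is a small plain-Java sketch of how that parallel delivery can be pictured. Kafka splits a topic into partitions; messages with the same key go to the same partition, and the partitions are shared out among the consumers of a group. The sketch does not use the Kafka client at all. The names PartitionSketch, partitionFor and assign are made up for illustration, String.hashCode() stands in for the murmur2 hash that Kafka's default partitioner applies to the key bytes, and the round-robin ownership is a simplification of Kafka's real assignment strategies.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of key-based partitioning; it does not use the Kafka client library.
final class PartitionSketch {

    // Maps a message key to one of numPartitions partitions.
    // Assumption: String.hashCode() stands in for Kafka's murmur2 hash of the key bytes.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Shares the partitions out among the consumers of one group, round-robin.
    // Returns, for each consumer index, the list of partitions it owns.
    static List<List<Integer>> assign(int numPartitions, int numConsumers) {
        List<List<Integer>> owned = new ArrayList<>();
        for (int c = 0; c < numConsumers; c++) {
            owned.add(new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            owned.get(p % numConsumers).add(p);
        }
        return owned;
    }

    public static void main(String[] args) {
        int partitions = 6;
        int consumers = 3;
        System.out.println("ownership: " + assign(partitions, consumers));
        // The same key always lands on the same partition, so one consumer sees it in order.
        for (String key : new String[] {"order-1", "order-2", "order-1"}) {
            int p = partitionFor(key, partitions);
            System.out.println(key + " -> partition " + p + " -> consumer " + (p % consumers));
        }
    }
}
```

The point of the sketch is only that adding consumers to a group spreads the partitions, and therefore the work, across more processing units.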

In this step we add the Kafka dependency to the Spring application; it is used throughout the application.

<dependency>
   <groupId>org.springframework.kafka</groupId>
   <artifactId>spring-kafka</artifactId>
   <version>2.1.0.RELEASE</version>
</dependency>

Producing Kafka Messages in Spring Boot

To produce messages to Kafka we work in two parts: one configures the properties, and the other publishes the messages.

Let's begin with the configuration code.

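import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaProducerConfig {
   // Connection and serialization settings for every producer the factory creates
   @Bean
   public ProducerFactory<String, String> producerFactory() {
      Map<String, Object> configProps = new HashMap<>();
      configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
      configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
      configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
      return new DefaultKafkaProducerFactory<>(configProps);
   }

   // Template used by application code to send messages
   @Bean
   public KafkaTemplate<String, String> kafkaTemplate() {
      return new KafkaTemplate<>(producerFactory());
   }
}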

In the above code, the producer factory bean sets the basic properties of the Kafka server, and the template bean gives us access to the methods the library provides. In the next step we write the method that produces messages.

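class ...Controller {
   ...
   @Autowired
   private KafkaTemplate<String, String> kafkaTemplate;

   // topicName is assumed to be a String holding the target topic, declared elsewhere in this class
   public void sendMessage(String msg) {
      kafkaTemplate.send(topicName, msg);
   }
   ...
}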

With the KafkaTemplate that we @Autowired in the example above, we can send a message by calling kafkaTemplate.send(topicName, msg);

Now we shall consume messages from Kafka in our Spring Boot application.

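import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {
   @Bean
   public ConsumerFactory<String, String> consumerFactory() {
      Map<String, Object> props = new HashMap<>();
      // Must point at the Kafka broker (same address as the producer), not ZooKeeper's port 2181
      props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
      props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id");
      props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
      props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
      return new DefaultKafkaConsumerFactory<>(props);
   }

   // Container factory that backs methods annotated with @KafkaListener
   @Bean
   public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
      ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
      factory.setConsumerFactory(consumerFactory());
      return factory;
   }
}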
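The StringSerializer on the producer side and the StringDeserializer in the configuration above have to mirror each other, because Kafka itself only stores bytes. A rough plain-Java picture of what that pair does is sketched below. StringCodecSketch and its two methods are hypothetical names, not part of the Kafka library, and UTF-8 is used because it is the default encoding of Kafka's string serializer.

```java
import java.nio.charset.StandardCharsets;

// Plain-Java illustration of what a string serializer/deserializer pair does.
final class StringCodecSketch {

    // Producer side: turn the message into bytes before it goes on the wire.
    static byte[] serialize(String value) {
        return value == null ? null : value.getBytes(StandardCharsets.UTF_8);
    }

    // Consumer side: turn the received bytes back into a String.
    static String deserialize(byte[] data) {
        return data == null ? null : new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] wire = serialize("hello kafka");
        System.out.println(wire.length + " bytes on the wire");
        System.out.println(deserialize(wire));
    }
}
```

If the producer and consumer were configured with mismatched classes, the consumer would try to interpret the bytes in the wrong format, which is why the two configuration classes use matching String types.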

Then we can write a method in our main class or any component class to receive messages from a topic, as shown in the example below.

// groupId matches the "group-id" value used in KafkaConsumerConfig and in the log message
@KafkaListener(topics = "ashrithtopic", groupId = "group-id")
public void listen(String message) {
   System.out.println("Received Message in group - group-id: " + message);
}
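The groupId on the listener decides who shares the work: listeners in the same group split the messages between them, while listeners in different groups each get every message. The toy in-memory model below illustrates that rule only. GroupSketch, subscribe and publish are invented for this sketch and are not Kafka or Spring APIs, and the turn-taking inside a group is a simplification of Kafka's partition-based assignment.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Toy in-memory topic showing consumer-group semantics; not the Kafka API.
final class GroupSketch {
    // group id -> listeners registered under that group
    private final Map<String, List<Consumer<String>>> groups = new LinkedHashMap<>();
    private int published = 0;

    // Registers a listener under a group id, like a listener method with a groupId.
    void subscribe(String groupId, Consumer<String> listener) {
        groups.computeIfAbsent(groupId, g -> new ArrayList<>()).add(listener);
    }

    // Delivers the message to exactly one listener in every group.
    // Simplification: members take turns; real Kafka splits the work by partition.
    void publish(String message) {
        for (List<Consumer<String>> members : groups.values()) {
            members.get(published % members.size()).accept(message);
        }
        published++;
    }

    public static void main(String[] args) {
        GroupSketch topic = new GroupSketch();
        topic.subscribe("billing", m -> System.out.println("billing-1 got " + m));
        topic.subscribe("billing", m -> System.out.println("billing-2 got " + m));
        topic.subscribe("audit", m -> System.out.println("audit got " + m));
        topic.publish("m1");
        topic.publish("m2");
    }
}
```

In this model the two "billing" listeners each handle one of the two messages, while the single "audit" listener handles both, which is the behaviour to expect when running several instances of the same Spring Boot application with one groupId.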

THE END...!!!!