Using Apache Pulsar to publish and consume messages via Spring Boot

Most developers are familiar with Spring Boot, its versatility, and how easy it is to adopt.
Put simply, Apache Pulsar is an open-source alternative to Kafka.
It is not just a distributed pub-sub system but also a stream processing framework. For lightweight transformations, Pulsar Functions can eliminate the need for a separate engine such as Apache Flink; in simple words, they act as a lightweight ETL tool for real-time data.
Kafka still has Kafka Streams and KSQL to achieve this, but Apache Pulsar has the upper hand in long-term storage and provides geo-replication, multi-tenancy, and a cloud-native design out of the box.
One use case where we preferred Pulsar: we wanted to strip credit card information from our logs before pushing them to log storage. We used a Pulsar function written in Java to clean the data and publish it to another topic, all with a single tool.
Another scenario where Pulsar Functions came in handy: we wanted to store payment-related log data in a separate database so that the payments team could run their reports.
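
To make the credit card use case a little more concrete, here is a minimal sketch of the kind of transformation logic such a function might contain. It is not our production code: the class name `CardMaskingFunction`, the `[REDACTED]` placeholder, and the regular expression are all assumptions for illustration. Pulsar Functions can be written against the plain `java.util.function.Function` interface, so the sketch needs no Pulsar imports.

```java
import java.util.function.Function;
import java.util.regex.Pattern;

// Hypothetical example: masks anything that looks like a 13-19 digit card number.
class CardMaskingFunction implements Function<String, String> {
    // Assumed pattern: 13 to 19 digits, optionally separated by single spaces or dashes.
    private static final Pattern CARD = Pattern.compile("\\b\\d(?:[ -]?\\d){12,18}\\b");

    @Override
    public String apply(String logLine) {
        // Replace every match; lines without a match are returned unchanged.
        return CARD.matcher(logLine).replaceAll("[REDACTED]");
    }
}
```

A real deployment would likely need a stricter check (for example a Luhn checksum) to avoid masking other long digit sequences such as order numbers.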

Now let's jump into the simple part: producing and consuming messages with Apache Pulsar via Spring Boot.

I could have built this example with a while loop that polls several consumer objects. Instead, I took the harder approach: I wanted to remove most of the boilerplate and make it as simple as writing a Kafka listener with @KafkaListener.

1) Dependency

I used the standard Spring Boot starter and added the Pulsar Java client and Reflections as dependencies. My entire demo project pom is pasted below for reference.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.6.4</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>
	<groupId>com.ashrithgn</groupId>
	<artifactId>pulsar_demo</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>pulsar_demo</name>
	<description>Demo project for Spring Boot</description>
	<properties>
		<java.version>17</java.version>
	</properties>
	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>

		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<optional>true</optional>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>

		<!-- https://mvnrepository.com/artifact/org.apache.pulsar/pulsar-client -->
		<dependency>
			<groupId>org.apache.pulsar</groupId>
			<artifactId>pulsar-client</artifactId>
			<version>2.9.1</version>
		</dependency>

		<dependency>
			<groupId>org.reflections</groupId>
			<artifactId>reflections</artifactId>
			<version>0.9.11</version>
		</dependency>




	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
				<configuration>
					<excludes>
						<exclude>
							<groupId>org.projectlombok</groupId>
							<artifactId>lombok</artifactId>
						</exclude>
					</excludes>
				</configuration>
			</plugin>
		</plugins>
	</build>

</project>

2) Setting up the Apache Pulsar client so that it can be autowired


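package com.ashrithgn.pulsar_demo;

import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class PulsarConfig {
    // Broker URL; falls back to a local standalone broker when the property is not set.
    @Value("${pulsar.service.url:pulsar://localhost:6650}")
    String pulsarUrl;

    @Bean
    public PulsarClient buildClient() throws PulsarClientException {
        return PulsarClient.builder()
                .serviceUrl(pulsarUrl) // use the injected value instead of a hard-coded URL
                .build();
    }
}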

3) Custom Annotations

Here I have written two custom annotations: `@PulsarListener` marks the classes to scan for Pulsar consumers, and `@PulsarConsumer` provides the topic, subscription name, and subscription type for consumption.

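package com.ashrithgn.pulsar_demo;

import org.apache.pulsar.client.api.SubscriptionType;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface PulsarConsumer {
    String topic() default "";
    String subscriptionName() default "";
    SubscriptionType subscriptionType() default SubscriptionType.Exclusive;
}
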
package com.ashrithgn.pulsar_demo;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface PulsarListener {
}
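
Annotations on their own do nothing; something has to read them at runtime. The following self-contained sketch shows that mechanism with plain JDK reflection. It deliberately uses a simplified, hypothetical stand-in annotation (`Handler`) and a toy `Handlers` class instead of the Pulsar-specific ones, so it runs without any dependencies.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.TreeMap;

class AnnotationScanDemo {
    // Hypothetical stand-in for @PulsarConsumer; RUNTIME retention makes it visible to reflection.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Handler {
        String topic() default "";
    }

    // Toy listener class with one annotated and one plain method.
    public static class Handlers {
        @Handler(topic = "test")
        public String onTest(String payload) { return "handled:" + payload; }

        public String ignored(String payload) { return payload; }
    }

    // Collects topic -> method for every public method that carries @Handler.
    static Map<String, Method> scan(Class<?> type) {
        Map<String, Method> byTopic = new TreeMap<>();
        for (Method m : type.getMethods()) {
            Handler h = m.getAnnotation(Handler.class);
            if (h != null) {
                byTopic.put(h.topic(), m);
            }
        }
        return byTopic;
    }

    // Creates a fresh instance of the class and invokes the handler registered for the topic.
    static Object dispatch(Class<?> type, String topic, String payload) {
        try {
            Method m = scan(type).get(topic);
            return m.invoke(type.getDeclaredConstructor().newInstance(), payload);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not invoke handler for topic " + topic, e);
        }
    }
}
```

Calling `AnnotationScanDemo.dispatch(AnnotationScanDemo.Handlers.class, "test", "hi")` finds `onTest` through its annotation and invokes it; the unannotated method is never considered.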

4) Router Class

This class uses Spring's @Scheduled annotation to avoid a while loop and to start the background process.

The next part uses reflection to find the classes that contain the message consumer logic and invokes their methods to process the messages. The class relies on the annotations above to achieve this.

package com.ashrithgn.pulsar_demo;

import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.*;
import org.reflections.Reflections;
import org.reflections.scanners.SubTypesScanner;
import org.reflections.scanners.TypeAnnotationsScanner;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.*;

@Slf4j
@EnableAsync
@Component
public class PulsarSchedulerRouting {
    // Caches one consumer per subscription/topic pair so each subscription is created only once.
    Map<String, Consumer> consumerMap = new HashMap<>();

    @Autowired
    PulsarClient client;

    @Scheduled(fixedRate = 1000)
    public void runPulsarTask() {
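        // Demo only: publish one test message per tick so the consumers below always have data.
        try (Producer<String> stringProducer = client.newProducer(Schema.STRING)
                .topic("test")
                .create()) {
            // send() blocks until the broker confirms the message, so no explicit flush() is needed.
            stringProducer.send("Hello world");
        } catch (PulsarClientException e) {
            log.error("Failed to publish test message", e);
        }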
        Set<Class<?>> classes = new Reflections("com.ashrithgn", new TypeAnnotationsScanner(), new SubTypesScanner()).getTypesAnnotatedWith(PulsarListener.class);

        classes.forEach(classObj -> {

            Arrays.stream(classObj.getMethods()).forEach(f -> {
                if(f.isAnnotationPresent(PulsarConsumer.class)) {
                    PulsarConsumer details = f.getAnnotation(PulsarConsumer.class);
                    Consumer consumer = null;
                    if (consumerMap.containsKey(details.subscriptionName() + details.topic())) {
                        consumer = consumerMap.get(details.subscriptionName() + details.topic());
                    } else {
                        try {
                            consumer = client.newConsumer()
                                    .topic(details.topic())
                                    .subscriptionName(details.subscriptionName())
                                    .subscriptionType(details.subscriptionType())
                                    .subscribe();
                            consumerMap.put(details.subscriptionName() + details.topic(), consumer);
                        } catch (PulsarClientException e) {
                            e.printStackTrace();
                        }
                    }
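                    if (consumer != null) {
                        Message<String> message = null;
                        try {
                            // Wait briefly instead of blocking forever, so an idle topic
                            // cannot stall the scheduler thread.
                            message = consumer.receive(100, java.util.concurrent.TimeUnit.MILLISECONDS);
                        } catch (PulsarClientException e) {
                            log.error("Failed to receive message", e);
                        }
                        if (message != null) {
                            try {
                                f.invoke(classObj.getDeclaredConstructor().newInstance(), message);
                                // Acknowledge only after the handler has finished successfully.
                                consumer.acknowledge(message);
                            } catch (Throwable e) {
                                log.error("Failed to process message", e);
                                // Ask the broker to redeliver the message later.
                                consumer.negativeAcknowledge(message);
                            }
                        }

                    }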
                }
            });

        });


    }
}
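
One detail in the router worth a closer look is the cache key, which concatenates the subscription name and the topic. Plain concatenation can map two different pairs to the same string. Below is a minimal sketch of the problem and one possible fix. `ConsumerKey` and `ConsumerCache` are hypothetical helpers, not part of the demo project, and the record syntax assumes Java 16 or later (the pom targets Java 17).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

class ConsumerKeys {
    // Hypothetical helper: a record compares both fields, so distinct pairs never collide.
    record ConsumerKey(String subscriptionName, String topic) {}

    // Key style used in the router: plain string concatenation.
    static String concatKey(String subscriptionName, String topic) {
        return subscriptionName + topic;
    }

    // Hypothetical cache that creates a value once per (subscription, topic) pair.
    static class ConsumerCache<C> {
        private final Map<ConsumerKey, C> cache = new HashMap<>();

        C getOrCreate(String subscriptionName, String topic, Supplier<C> factory) {
            // The factory only runs when the pair has not been seen before.
            return cache.computeIfAbsent(new ConsumerKey(subscriptionName, topic), k -> factory.get());
        }

        int size() {
            return cache.size();
        }
    }
}
```

With concatenation, the pairs ("test", "1test") and ("test1", "test") both produce the key "test1test", so the second subscription would silently reuse the first consumer. With the record key they stay separate.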

5) MessageConsumer

This class contains the code that reads each message and applies our business logic to the message data.

In this example, it simply prints the message to the console.

package com.ashrithgn.pulsar_demo;

import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.SubscriptionType;

import java.nio.charset.StandardCharsets;

@Slf4j
@PulsarListener
public class MessageConsumer {
    // Each distinct subscription name receives its own copy of every message on the topic.
    @PulsarConsumer(subscriptionName = "test", topic = "test", subscriptionType = SubscriptionType.Shared)
    public void listen(Message<?> message) {
        System.out.println("------------------------------------------------------------------------------");
        // Decode the raw payload explicitly as UTF-8 rather than the platform default charset.
        log.info(new String(message.getData(), StandardCharsets.UTF_8));
        System.out.println("------------------------------------------------------------------------------");
    }

    @PulsarConsumer(subscriptionName = "test1", topic = "test", subscriptionType = SubscriptionType.Shared)
    public void listen2(Message<?> message) {
        System.out.println("--------------------------------2----------------------------------------------");
        log.info(new String(message.getData(), StandardCharsets.UTF_8));
        System.out.println("--------------------------------2----------------------------------------------");
    }
}

Main Application Class for reference:

package com.ashrithgn.pulsar_demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableScheduling
public class PulsarDemoApplication {

	public static void main(String[] args) {
		SpringApplication.run(PulsarDemoApplication.class, args);
	}

}