Change Data Capture for MongoDB using Change Streams in Spring Boot.

Change Data Capture for MongoDB using Change Streams in Spring Boot.

What is Change Data Capture?

Change data capture, known as CDC for short, is the process of tracking changes to the data in a particular database and logging those changes.

Why do we need CDC?

We need CDC for auditing. An audit trail helps us understand who changed a specific record, when it was changed, and what was changed. We can use this log to recover data or to triage the sequence of edits made to it.

There are several ways to achieve CDC: for example, in the application layer that writes the records, or by capturing the changes directly at the database layer.

We take the approach of tracking changes made to the documents directly on the database, with the help of the change streams provided by MongoDB.

What are MongoDB Change Streams?

A change stream is a real-time stream of database changes that your application can listen to. You can learn more in "How Do Change Streams Work In MongoDB?" on the MongoDB website.

With the help of a small POC, I will demonstrate how to capture audit logs for an application that writes and updates MongoDB documents.

Assume we have a Spring Boot application that performs CRUD operations, and that you have implemented field-level auditing by using @EnableMongoAuditing (or @EnableReactiveMongoAuditing for a reactive application).

Alternatively, in an existing Spring Boot application that writes data, you can capture Mongo events before saving. In the example snippet below, I set the trace ID on the entity so that horizontal and vertical changes can be tracked.

/**
 * Mongo mapping-event listener that stamps every {@code BaseEntity} with an
 * audit transaction id before the entity is converted for persistence.
 *
 * <p>The id is taken from the {@code traceId} entry of the logging MDC; when
 * no such entry exists a random UUID string is used instead.
 */
@Configuration
public class MongoAuditListener extends AbstractMongoEventListener<BaseEntity> {
	@Autowired
	ReactiveMongoTemplate mongoTemplate;

	/**
	 * Assigns the audit transaction id to the entity carried by the event.
	 *
	 * @param event the before-convert event whose source entity is updated
	 */
	@Override
	public void onBeforeConvert(BeforeConvertEvent<BaseEntity> event) {
		super.onBeforeConvert(event);

		// Prefer the trace id of the current request; otherwise mint a new id.
		String transactionId = MDC.get("traceId");
		if (transactionId == null) {
			transactionId = UUID.randomUUID().toString();
		}

		BaseEntity entity = event.getSource();
		entity.setAuditTransactionId(transactionId);
	}
}

Reference of my Base Entity

/**
 * Common base type for persisted entities that carry audit metadata.
 *
 * <p>Every member is annotated with {@code @JsonIgnore}; presumably this keeps
 * the audit metadata out of the JSON representation of the entity (confirm
 * against the serializer configuration in use).
 */
public abstract class BaseEntity {
    // Identifier tying this write to a trace/transaction; the snippet above
    // assigns it from MongoAuditListener before conversion.
    @JsonIgnore
    private String auditTransactionId;

    /** Returns the identifier of the concrete entity. */
    @JsonIgnore
    public abstract String getId();

    /** Returns the class of the concrete entity. */
    @JsonIgnore
    public abstract Class<?> getClassType();
    // Initialised when the object is constructed.
    @JsonIgnore
    protected Date createdAt = new Date();
    // Not initialised here; presumably set by the auditing infrastructure (TODO confirm).
    @JsonIgnore
    protected Date updatedAt;
    // Presumably the principal that created the entity (TODO confirm).
    @JsonIgnore
    protected String createdBy;
    // NOTE(review): named "updateBy" while the sibling fields use "updatedAt"/"createdBy";
    // renaming would change the stored field name, so it is left as is.
    @JsonIgnore
    protected String updateBy;
}

Then we can write a Spring Boot app that listens to the MongoDB change streams and writes the audit logs to a different MongoDB database.

  1. Create a class that provides the database instance on which we need to capture the change streams. The bean it exposes can also be @Autowired.
/**
 * Exposes the MongoDB database whose change streams are captured (the CDC
 * source) as the Spring bean {@code cdcSource}.
 *
 * <p>The connection string and database name are read from the
 * {@code mongo.source.uri} and {@code mongo.source.db.name} properties.
 */
@Configuration
public class MongoClientFactory {

	private final String cdcDbURI;

	private final String cdcDbName;

	/**
	 * @param cdcDbURI  connection string of the source deployment
	 * @param cdcDbName name of the source database to watch
	 */
	public MongoClientFactory(	@Value("${mongo.source.uri}") String cdcDbURI,
								@Value("${mongo.source.db.name}") String cdcDbName) {
		this.cdcDbURI = cdcDbURI;
		this.cdcDbName = cdcDbName;
	}

	/**
	 * Creates a client for the source deployment and returns the configured
	 * database handle.
	 *
	 * @return the source database
	 * @throws IllegalStateException if the client or database handle cannot be
	 *                               created; the original failure is kept as the cause
	 */
	@Bean(name = "cdcSource")
	public MongoDatabase getCDCMongoDatabase() {
		MongoClient client = null;
		try {
			client = MongoClients.create(cdcDbURI);
			return client.getDatabase(cdcDbName);
		} catch (RuntimeException e) {
			// The client is still null when MongoClients.create itself failed,
			// so only close it when it was actually opened.
			if (client != null) {
				client.close();
			}
			// Fail fast instead of retrying recursively: the same inputs would fail
			// again and the recursion would end in a StackOverflowError. The URI is
			// deliberately left out of the message because it may embed credentials.
			throw new IllegalStateException("Unable to open CDC source database '" + cdcDbName + "'", e);
		}

	}

}

And required Dependency is

<dependency>
    <groupId>org.mongodb</groupId>
     <artifactId>mongodb-driver-sync</artifactId>
     <version>4.9.1</version>
 </dependency>

2. On Spring Boot application startup, let's implement the code that reads the change streams and writes the events to a different MongoDB database of your choice.

import java.util.List;
import java.util.Objects;

import org.bson.BsonDocument;
import org.bson.Document;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.data.mongodb.core.ReactiveMongoTemplate;

import com.google.gson.Gson;
import com.lowes.cdc.mongochangestreams.MongoUtils;
import com.lowes.cdc.mongochangestreams.model.entity.AuditEntity;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.FullDocument;
import com.mongodb.client.model.changestream.TruncatedArray;

import lombok.extern.slf4j.Slf4j;
import reactor.core.scheduler.Schedulers;

/**
 * Tails the change stream of the source database and copies every relevant
 * event into the audit database as an {@link AuditEntity}.
 *
 * <p>NOTE(review): the stream is consumed on the thread that publishes
 * {@link ContextRefreshedEvent}, so {@link #onApplicationEvent} does not return
 * while the stream is open. Confirm this is intended; otherwise move the loop
 * to a dedicated thread.
 */
@Slf4j
@Configuration
public class ApplicationStartUpTasks implements ApplicationListener<ContextRefreshedEvent> {

	/** Collection in the audit database that receives the audit entries. */
	private static final String AUDIT_COLLECTION = "audit_logs";

	/** Placeholder stored when an id cannot be derived from the event. */
	private static final String NOT_AVAILABLE = "not-available";

	/** Pause between two attempts to (re)open the change stream. */
	private static final long RETRY_DELAY_MILLIS = 1_000L;

	/** Shared serializer used for diagnostic logging only. */
	private static final Gson GSON = new Gson();

	private final MongoDatabase cdcDatabase;

	private final ReactiveMongoTemplate auditDbTemplate;

	/**
	 * @param source   database whose change stream is watched
	 * @param template template bound to the database that stores the audit log
	 */
	@Autowired
	public ApplicationStartUpTasks(MongoDatabase source, ReactiveMongoTemplate template) {
		this.cdcDatabase = source;
		this.auditDbTemplate = template;
	}

	/**
	 * Opens the change stream and processes events until the stream ends or the
	 * thread is interrupted. When processing fails the stream is reopened after a
	 * short pause.
	 *
	 * @param event spring context event
	 */
	@Override
	public void onApplicationEvent(ContextRefreshedEvent event) {
		// A loop (not recursion) restarts the cursor, so repeated failures cannot
		// exhaust the stack.
		while (!Thread.currentThread().isInterrupted()) {
			try {
				log.info("Registering CDC on Startup");
				// Snapshot of the collections to audit; collections created later are
				// not picked up until the stream is reopened.
				List<String> collectionNames = MongoUtils.getCollections(cdcDatabase)
						.stream()
						.filter(name -> !name.startsWith("audit"))
						.toList();

				// change streams events
				var changeStreams = cdcDatabase.watch()
						.fullDocument(FullDocument.UPDATE_LOOKUP);

				// try-with-resources releases the cursor on both normal and failed exits
				try (MongoCursor<ChangeStreamDocument<Document>> cursor = changeStreams.iterator()) {
					process(cursor, collectionNames);
				}
				return;
			} catch (Exception e) {
				log.error("Change stream processing failed; restarting the cursor", e);
				if (!pauseBeforeRetry()) {
					return;
				}
			}
		}
	}

	/**
	 * Sleeps for {@link #RETRY_DELAY_MILLIS}.
	 *
	 * @return {@code false} when the thread was interrupted while waiting
	 */
	private boolean pauseBeforeRetry() {
		try {
			Thread.sleep(RETRY_DELAY_MILLIS);
			return true;
		} catch (InterruptedException interrupted) {
			Thread.currentThread().interrupt();
			return false;
		}
	}

	/**
	 * ETL logs to audit database
	 * 
	 * @param cursor          event cursor
	 * @param collectionNames contains DB collection names
	 */
	private void process(MongoCursor<ChangeStreamDocument<Document>> cursor, List<String> collectionNames) {
		while (cursor.hasNext()) {

			var changeEvent = cursor.next();

			// Events without a collection namespace cannot be attributed to an
			// audited collection, so they are skipped instead of failing the loop.
			var namespace = changeEvent.getNamespace();
			if (namespace == null) {
				log.info("{} event received without a collection namespace; skipping",
						changeEvent.getOperationTypeString());
				continue;
			}

			var collectionName = namespace.getCollectionName();
			log.info("{} event received on {}", changeEvent.getOperationTypeString(), collectionName);

			if (!collectionNames.contains(collectionName)) {
				continue;
			}

			var auditEntry = generateAuditEntity(changeEvent, collectionName);

			if (isUpdate(changeEvent)) {
				var updateDescription = changeEvent.getUpdateDescription();

				if (updateDescription != null) {
					syncUpdateFields(updateDescription.getUpdatedFields(), auditEntry);
					removeFields(auditEntry, updateDescription.getRemovedFields());
					truncatedArray(updateDescription.getTruncatedArrays(), auditEntry);
				}
			}

			// Fire-and-forget save; failures are reported but do not stop the stream.
			auditDbTemplate.save(auditEntry, AUDIT_COLLECTION)
					.publishOn(Schedulers.parallel())
					.doOnError(err -> log.error("Failed to save audit entry for collection {}", collectionName, err))
					.subscribe();
		}
	}

	/** Tells whether the event's operation type starts with "update", ignoring case. */
	private static boolean isUpdate(ChangeStreamDocument<Document> changeEvent) {
		var operation = changeEvent.getOperationTypeString();
		return operation != null && operation.regionMatches(true, 0, "update", 0, "update".length());
	}

	/**
	 * Builds the audit entry for one change event.
	 *
	 * <p>Collection, database, operation type and document id are always filled
	 * in. The snapshot, the timestamps and the transaction id are only available
	 * when the event carries a full document.
	 *
	 * @param changeEvent    the change stream event
	 * @param collectionName collection the event belongs to
	 * @return the populated audit entry, never {@code null}
	 */
	private AuditEntity generateAuditEntity(ChangeStreamDocument<Document> changeEvent, String collectionName) {
		var document = changeEvent.getFullDocument();
		AuditEntity auditEntry = new AuditEntity();

		auditEntry.setCollectionName(collectionName);
		auditEntry.setDbName(changeEvent.getDatabaseName());
		auditEntry.setOperationType(changeEvent.getOperationTypeString());
		auditEntry.setDocumentId(resolveDocumentId(changeEvent, document));
		auditEntry.setTransactionId(NOT_AVAILABLE);

		if (document != null) {
			if (document.containsKey("auditTransactionId")) {
				auditEntry.setTransactionId(document.getString("auditTransactionId"));
			}
			auditEntry.setUpdatedAt(document.containsKey("updatedAt") ? document.getDate("updatedAt") : null);
			auditEntry.setCreateAt(document.containsKey("createdAt") ? document.getDate("createdAt") : null);
			auditEntry.setSnapshot(document);
		}
		return auditEntry;
	}

	/**
	 * Derives the document id from the full document when present, otherwise from
	 * the event's document key (the only source when no full document is carried).
	 */
	private static String resolveDocumentId(ChangeStreamDocument<Document> changeEvent, Document document) {
		if (document != null && document.get("_id") != null) {
			return document.get("_id")
					.toString();
		}

		var documentKey = changeEvent.getDocumentKey();
		if (documentKey == null || !documentKey.containsKey("_id")) {
			return NOT_AVAILABLE;
		}

		var id = documentKey.get("_id");
		if (id.isObjectId()) {
			// same hex form that ObjectId.toString() yields for a full document
			return id.asObjectId()
					.getValue()
					.toHexString();
		}
		if (id.isString()) {
			return id.asString()
					.getValue();
		}
		return id.toString();
	}

	/** Drops the removed fields from the snapshot, when there is a snapshot. */
	private void removeFields(AuditEntity entity, List<String> removeField) {
		var snapshot = entity.getSnapshot();
		if (removeField == null || snapshot == null) {
			return;
		}
		removeField.forEach(snapshot::remove);
	}

	/**
	 * Records the updated fields as the entry's update log and, when a snapshot
	 * exists, copies them into it.
	 */
	private void syncUpdateFields(BsonDocument updateFields, AuditEntity auditEntry) {
		if (updateFields == null) {
			return;
		}
		auditEntry.setUpdateLog(updateFields);

		var snapshot = auditEntry.getSnapshot();
		if (snapshot != null) {
			// NOTE(review): keys are copied verbatim; confirm how nested (dotted)
			// paths should be represented in the snapshot.
			updateFields.forEach(snapshot::put);
		}
	}

	/** Logs the entry and its truncated arrays for diagnostics; nothing is modified. */
	private void truncatedArray(List<TruncatedArray> truncatedArrays, AuditEntity auditEntry) {
		// Serialising the whole entry is costly, so only do it when it will be printed.
		if (log.isDebugEnabled()) {
			log.debug("Audit entry: {}", GSON.toJson(auditEntry));
			log.debug("Truncated arrays: {}", GSON.toJson(truncatedArrays));
		}
	}

}

Audit Entity Class



import java.util.Date;

import org.bson.Document;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.index.Indexed;

/**
 * One entry of the audit log, stored in the {@code audit_logs} collection.
 *
 * <p>An entry describes a single change event: where it happened (database,
 * collection, document), what kind of operation it was, the document snapshot
 * delivered with the event and, for updates, the raw update description.
 */
@org.springframework.data.mongodb.core.mapping.Document(collection = "audit_logs")
public class AuditEntity {
	/** Identifier of the audit entry itself. */
	@Id
	private String id;
	/** Audit transaction (trace) id copied from the changed document. */
	@Indexed
	private String transactionId;
	/** Creation timestamp copied from the changed document. */
	private Date createAt;
	/** Last-update timestamp copied from the changed document. */
	private Date updatedAt;
	/** Full document that accompanied the change event. */
	private Document snapshot;
	/** Collection in which the change happened. */
	@Indexed
	private String collectionName;
	/** Database in which the change happened. */
	@Indexed
	private String dbName;
	/** Identifier of the changed document. */
	@Indexed
	private String documentId;
	/** Operation type reported by the change stream. */
	@Indexed
	private String operationType;

	/** Updated fields reported for an update event, if any. */
	private Object updateLog;

	// Accessors, listed in the same order as the fields above.

	public String getId() {
		return this.id;
	}

	public void setId(String id) {
		this.id = id;
	}

	public String getTransactionId() {
		return this.transactionId;
	}

	public void setTransactionId(String transactionId) {
		this.transactionId = transactionId;
	}

	public Date getCreateAt() {
		return this.createAt;
	}

	public void setCreateAt(Date createAt) {
		this.createAt = createAt;
	}

	public Date getUpdatedAt() {
		return this.updatedAt;
	}

	public void setUpdatedAt(Date updatedAt) {
		this.updatedAt = updatedAt;
	}

	public Document getSnapshot() {
		return this.snapshot;
	}

	public void setSnapshot(Document snapshot) {
		this.snapshot = snapshot;
	}

	public String getCollectionName() {
		return this.collectionName;
	}

	public void setCollectionName(String collectionName) {
		this.collectionName = collectionName;
	}

	public String getDbName() {
		return this.dbName;
	}

	public void setDbName(String dbName) {
		this.dbName = dbName;
	}

	public String getDocumentId() {
		return this.documentId;
	}

	public void setDocumentId(String documentId) {
		this.documentId = documentId;
	}

	public String getOperationType() {
		return this.operationType;
	}

	public void setOperationType(String operationType) {
		this.operationType = operationType;
	}

	public Object getUpdateLog() {
		return this.updateLog;
	}

	public void setUpdateLog(Object updateLog) {
		this.updateLog = updateLog;
	}
}

For reference, here is the pom.xml of my Spring Boot app:

<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the change-stream audit application (Spring Boot 3.1.0, Java 17). -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.1.0</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.ashrith.cdc</groupId>
    <artifactId>mongo-change-streams</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>mongo-change-streams</name>
    <description>mongo-change-streams</description>
    <properties>
        <java.version>17</java.version>
    </properties>
    <dependencies>
        <!-- Reactive MongoDB access; provides the ReactiveMongoTemplate used to write and read the audit log. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-webflux -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>


        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- NOTE(review): no Spring Batch dependency is declared in this POM; confirm this test artifact is needed. -->
        <dependency>
            <groupId>org.springframework.batch</groupId>
            <artifactId>spring-batch-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.mongodb/mongodb-driver-sync -->
        <!-- Synchronous driver used to open the change stream cursor. -->
        <!-- NOTE(review): the explicit version overrides whatever the Spring Boot parent manages; confirm it stays aligned with the reactive driver. -->
        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>mongodb-driver-sync</artifactId>
            <version>4.9.1</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/com.google.code.gson/gson -->
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.10.1</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <!-- Object diffing; used by the audit service to compare document snapshots. -->
        <dependency>
            <groupId>org.javers</groupId>
            <artifactId>javers-core</artifactId>
            <version>7.0.0</version>
        </dependency>

        <!-- NOTE(review): Springfox 3.0.0 predates Spring Boot 3; confirm it works on this stack or replace it. -->
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-boot-starter</artifactId>
            <version>3.0.0</version>
        </dependency>

        <!-- NOTE(review): presumably already pulled in transitively by the other starters; confirm before removing. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-logging</artifactId>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-text</artifactId>
            <version>1.10.0</version>
        </dependency>


    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

I also exposed a couple of REST APIs to view the audit log by trace ID and by document ID.

Controller

/**
 * HTTP entry points for reading the audit log. Both endpoints delegate to
 * {@code AuditService}.
 */
@RestController
public class AuditController {

	private final AuditService auditService;

	/**
	 * @param auditService service that queries the audit log
	 */
	@Autowired
	public AuditController(AuditService auditService) {
		this.auditService = auditService;
	}

	/**
	 * Lists the collection/document pairs touched under one trace id.
	 *
	 * @param traceId trace (audit transaction) id to look up
	 * @return the matching change log entries
	 */
	@GetMapping("/collections/{traceId}")
	public Flux<AuditCollectionChangesLogs> getCollectionChangesByTraceId(@PathVariable String traceId) {
		Flux<AuditCollectionChangesLogs> changes = this.auditService.getCollectionChangesByTransactionId(traceId);
		return changes;
	}

	/**
	 * Returns the change history of a single document.
	 *
	 * @param documentId id of the audited document
	 * @param limit      maximum number of audit entries to consider (default 20)
	 * @param skip       number of audit entries to skip (default 0)
	 * @return the change history produced by the service
	 */
	@GetMapping("/document/{documentId}")
	public Mono<String> getChangeLogByTraceId(	@PathVariable String documentId,
												@RequestParam(defaultValue = "20") int limit,
												@RequestParam(defaultValue = "0") int skip) {
		Mono<String> history = this.auditService.getChangesOnDocument(documentId, limit, skip);
		return history;
	}
}

Transfer Objects, i.e. DTO/TO

@Builder
@Data
@AllArgsConstructor
@NoArgsConstructor
public class AuditChangeLog {
	private String changeType;
	private Document currentObject;
	private Document previousObject;
	private String updateLog;
	private List<String> diff = new ArrayList<>();
}
/**
 * Transfer object naming one document that was changed under a given audit
 * transaction: the transaction id, the collection and the document id.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class AuditCollectionChangesLogs {
	/** Audit transaction (trace) id. */
	private String transactionId;
	/** Collection that holds the changed document. */
	private String collectionName;
	/** Identifier of the changed document. */
	private String documentId;
}

Service Class



import java.util.ArrayList;

import org.javers.core.JaversBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Sort;
import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.web.bind.annotation.RestController;

import com.google.gson.Gson;
import com.lowes.cdc.mongochangestreams.model.entity.AuditEntity;
import com.lowes.cdc.mongochangestreams.model.transferobject.AuditChangeLog;
import com.lowes.cdc.mongochangestreams.model.transferobject.AuditCollectionChangesLogs;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@RestController
public class AuditService {
	@Autowired
	ReactiveMongoTemplate template;

	public Flux<AuditCollectionChangesLogs> getCollectionChangesByTransactionId(String transactionId) {
		return template.find(Query.query(Criteria.where("transactionId")
				.is(transactionId)), AuditEntity.class, "audit_logs")
				.map(auditEntity -> AuditCollectionChangesLogs.builder()
						.collectionName(auditEntity.getCollectionName())
						.documentId(auditEntity.getDocumentId())
						.transactionId(auditEntity.getTransactionId())
						.build());
	}

	public Mono<String> getChangesOnDocument(String documentId, int limit, int skip) {
		var query = Query.query(Criteria.where("documentId")
				.is(documentId))
				.with(Sort.by(Sort.Direction.DESC, "snapshot.updatedAt"));
		if (limit > 0) {
			query.limit(limit);
		}
		if (skip > 0) {
			query.skip(skip);
		}

		return template.find(query, AuditEntity.class, "audit_logs")
				.map(auditEntity -> AuditChangeLog.builder()
						.diff(new ArrayList<>())
						.currentObject(auditEntity.getSnapshot())
						.changeType(auditEntity.getOperationType())
						.updateLog(auditEntity.getUpdateLog() != null ? new Gson().toJson(auditEntity.getUpdateLog())
								: "")
						.build())
				.reduce((auditChangeLog, auditChangeLog2) -> {
					auditChangeLog2.setPreviousObject(auditChangeLog.getCurrentObject());
					auditChangeLog2.getDiff()
							.addAll(auditChangeLog.getDiff());
					auditChangeLog2.getDiff()
							.add(JaversBuilder.javers()
									.build()
									.compare(auditChangeLog2.getCurrentObject(), auditChangeLog2.getPreviousObject())
									.prettyPrint());
					auditChangeLog2
							.setUpdateLog(auditChangeLog2.getUpdateLog() + " \n " + auditChangeLog.getUpdateLog());
					return auditChangeLog2;
				})
				.map(auditChangeLog -> toHTML(String.join("\n", auditChangeLog.getDiff())));

	}

	private String toHTML(String str) {
		return str.replaceAll("\\n", "<br/>");
	}
}

Utility

import java.util.List;
import java.util.stream.StreamSupport;

import com.mongodb.client.MongoDatabase;

/**
 * Static helpers for working with a {@link MongoDatabase}.
 */
public class MongoUtils {

	/** Not instantiable: the class only offers static helpers. */
	private MongoUtils() {
	}

	/**
	 * Collects the names of all collections of the given database.
	 *
	 * @param db database to inspect
	 * @return the collection names as an unmodifiable list
	 */
	public static List<String> getCollections(MongoDatabase db) {
		var names = db.listCollectionNames();
		// sequential stream over the driver's iterable
		var nameStream = StreamSupport.stream(names.spliterator(), false);
		return nameStream.toList();
	}
}