Uploading and Downloading Files from a MinIO Object Store Using Spring WebFlux


If you have landed on this page, you are probably working with Spring Boot WebFlux and trying to upload a file to a privately hosted MinIO object-store server. Spring Boot has gained popularity for its simple approach to bootstrapping an application in very little time, and MinIO is gaining a lot of popularity these days as people want to host their data in a private cloud. If you're wondering what MinIO is, it's another object store, one that is owned and managed by your own system administrators.

By the way, if you need a getting-started guide for MinIO, please follow the guide below.

MINIO: Getting Started With Ubuntu 18.04
MinIO is another object store, just like AWS S3. It is self-hosted, so it can easily be used in a private cloud, and it is open source, so it is free to use and modify. It is one of the fastest-growing self-managed object stores out there. So when it comes to…

So what is Spring WebFlux?

It is Spring's reactive web framework, which lives side by side with Spring MVC, but WebFlux works in a non-blocking fashion and supports Reactive Streams through Project Reactor.

The coding style is almost the same as with Web MVC. By default, WebFlux runs on an embedded Netty server instead of Tomcat.

Now let's see how to upload and download a file from the MinIO server using Spring Boot.


1) Dependency

Instead of the traditional Spring Boot web starter, I have used `spring-boot-starter-webflux`, along with the MinIO Java SDK, Lombok, and a few others. I have pasted my entire pom.xml for reference.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.6.3</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>
	<groupId>com.ashrithgn</groupId>
	<artifactId>minioDemo</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>minioDemo</name>
	<description>Demo project for Spring Boot</description>
	<properties>
		<java.version>17</java.version>
		<testcontainers.version>1.16.2</testcontainers.version>
	</properties>
	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-webflux</artifactId>
		</dependency>

		<dependency>
			<groupId>io.minio</groupId>
			<artifactId>minio</artifactId>
			<version>8.3.6</version>
		</dependency>

		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<optional>true</optional>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>io.projectreactor</groupId>
			<artifactId>reactor-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.testcontainers</groupId>
			<artifactId>junit-jupiter</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>
	<dependencyManagement>
		<dependencies>
			<dependency>
				<groupId>org.testcontainers</groupId>
				<artifactId>testcontainers-bom</artifactId>
				<version>${testcontainers.version}</version>
				<type>pom</type>
				<scope>import</scope>
			</dependency>
		</dependencies>
	</dependencyManagement>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
				<configuration>
					<excludes>
						<exclude>
							<groupId>org.projectlombok</groupId>
							<artifactId>lombok</artifactId>
						</exclude>
					</excludes>
				</configuration>
			</plugin>
		</plugins>
	</build>

</project>

2) Fill in the application properties file with values for the variables marked with `@Value` annotations.
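
The post does not list the file itself, so here is a minimal sketch of what `src/main/resources/application.properties` could look like. The keys are copied from the `@Value` placeholders in the classes that follow (including the `minio.buckek.name` spelling, which has to match the annotation exactly); every value is a placeholder assumption that you should replace with your own.

```properties
# Credentials of the MinIO user (placeholder values, not real keys)
minio.access.name=changeme-access-key
minio.access.secret=changeme-secret-key
# Base URL of the MinIO server; 9000 is MinIO's default API port
minio.url=http://localhost:9000
# Bucket used for uploads and downloads; the key is spelled exactly as in the @Value placeholder
minio.buckek.name=demo-bucket
# Base folder; injected into the adapter but not used by the code in this post
minio.default.folder=/
```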

3) Creating a configuration class that returns the MinIO client.

As a Spring Boot developer, I have the stupid habit of auto-wiring everything, because Spring creates singleton beans rather than a new object every time. The code below reads the configuration from the properties file and returns an instance of the MinIO client, which we then use to upload and download files.

package com.ashrithgn.minioDemo.config;

import io.minio.MinioClient;
import okhttp3.OkHttpClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.TimeUnit;

@Configuration
public class MinioConfig {
    @Value("${minio.access.name}")
    String accessKey;
    @Value("${minio.access.secret}")
    String accessSecret;
    @Value("${minio.url}")
    String minioUrl;

    @Bean
    public MinioClient generateMinioClient() {
        try {

            OkHttpClient httpClient = new OkHttpClient.Builder()
                    .connectTimeout(10, TimeUnit.MINUTES)
                    .writeTimeout(10, TimeUnit.MINUTES)
                    .readTimeout(30, TimeUnit.MINUTES)
                    .build();

            MinioClient client = MinioClient.builder()
                    .endpoint(minioUrl)
                    .httpClient(httpClient)
                    .credentials(accessKey, accessSecret)
                    .build();
            return client;
        } catch (Exception e) {
            // Keep the original exception as the cause so the stack trace is not lost
            throw new RuntimeException("Failed to create the MinIO client", e);
        }

    }

}

4) The adapter class implements the file upload and download logic.

I tend to name a class Adapter when it works as a wrapper that simply converts the input and output params for the underlying SDK or library, just to keep the service layer or the controller layer clean.

package com.ashrithgn.minioDemo.component;

import io.minio.*;
import io.minio.messages.Bucket;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.io.InputStreamResource;
import org.springframework.http.codec.multipart.FilePart;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.io.File;
import java.io.InputStream;

@Component
@Slf4j
public class MinioAdapter {
    @Autowired
    MinioClient minioClient;

    @Value("${minio.buckek.name}")
    String defaultBucketName;

    @Value("${minio.default.folder}")
    String defaultBaseFolder;


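    public Flux<Bucket> getAllBuckets() {
        // Defer the blocking listBuckets() call until subscription, so that it
        // really runs on the boundedElastic scheduler and not on the caller's thread
        return Mono.fromCallable(() -> minioClient.listBuckets())
                .flatMapMany(Flux::fromIterable)
                .subscribeOn(Schedulers.boundedElastic());
    }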


    public Mono<UploadResponse> uploadFile(Mono<FilePart> file) {
        return file.flatMap(multipartFile -> {
            long startMillis = System.currentTimeMillis();
            // Spool the part to a local file named after the upload (relative to the working directory)
            File temp = new File(multipartFile.filename());
            log.debug("Spooling upload to {}", temp.getAbsolutePath());
            // transferTo is non-blocking, so chain on its completion instead of calling block()
            return multipartFile.transferTo(temp)
                    .then(Mono.fromCallable(() -> {
                        try {
                            UploadObjectArgs uploadObjectArgs = UploadObjectArgs.builder()
                                    .bucket(defaultBucketName)
                                    .object(multipartFile.filename())
                                    .filename(temp.getAbsolutePath())
                                    .build();

                            ObjectWriteResponse response = minioClient.uploadObject(uploadObjectArgs);
                            log.info("upload file execution time {} ms", System.currentTimeMillis() - startMillis);
                            return UploadResponse.builder().bucket(response.bucket()).objectName(response.object()).build();
                        } finally {
                            // Remove the temporary file whether or not the upload succeeded
                            temp.delete();
                        }
                    }).subscribeOn(Schedulers.boundedElastic())); // the MinIO call blocks, so run it on a worker thread
        }).log();
    }

    public Mono<InputStreamResource> download(String name) {
        return Mono.fromCallable(() -> {
            InputStream response = minioClient.getObject(GetObjectArgs.builder().bucket(defaultBucketName).object(name).build());
            return new InputStreamResource(response);
        }).subscribeOn(Schedulers.boundedElastic());
    }

    public Mono<UploadResponse> putObject(FilePart file) {
        return file.content()
                // Fold every chunk of the part into a single in-memory buffer
                .reduce(new InputStreamCollector(),
                        (collector, dataBuffer) -> collector.collectInputStream(dataBuffer.asInputStream()))
                // putObject is a blocking call, so switch to a worker thread before making it
                .publishOn(Schedulers.boundedElastic())
                .map(inputStreamCollector -> {
                    long startMillis = System.currentTimeMillis();
                    try {
                        // The part may arrive without a Content-Type header
                        var contentType = file.headers().getContentType();
                        InputStream stream = inputStreamCollector.getStream();
                        PutObjectArgs args = PutObjectArgs.builder().object(file.filename())
                                .contentType(contentType != null ? contentType.toString() : "application/octet-stream")
                                .bucket(defaultBucketName)
                                .stream(stream, stream.available(), -1)
                                .build();
                        ObjectWriteResponse response = minioClient.putObject(args);
                        log.info("upload file execution time {} ms", System.currentTimeMillis() - startMillis);
                        return UploadResponse.builder().bucket(response.bucket()).objectName(response.object()).build();
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }).log();
    }

}

We also need the `UploadResponse` class, which is just a POJO used as a transfer object.

package com.ashrithgn.minioDemo.component;

import lombok.Builder;
import lombok.Data;


@Data
@Builder
public class UploadResponse {
    String id;
    String objectName;

    String bucket;
}

Let's write the `InputStreamCollector` class, which is used as a reducer: it reads the bytes from each chunk of the uploaded part and combines them into one input stream, which is then used inside the `putObject` method of the MinIO adapter class.

package com.ashrithgn.minioDemo.component;

import lombok.SneakyThrows;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;

public class InputStreamCollector {
    ByteArrayOutputStream targetStream = new ByteArrayOutputStream();


    @SneakyThrows
    public InputStreamCollector collectInputStream(InputStream input) {
        // InputStream.transferTo (Java 9+) appends the chunk without needing commons-compress
        input.transferTo(targetStream);
        return this;
    }

    public InputStream getStream() {
        return new ByteArrayInputStream(targetStream.toByteArray());
    }

}
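
To make the reducer idea concrete, here is a small standalone sketch that uses only the JDK. `ChunkCollector` and `joinChunks` are hypothetical names invented for this illustration; the loop stands in for what `Flux.reduce` does with each `DataBuffer` in the adapter, under the assumption that chunks arrive in order.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.List;

// Hypothetical stand-in for InputStreamCollector, written without Lombok
class ChunkCollector {
    private final ByteArrayOutputStream target = new ByteArrayOutputStream();

    // Appends one chunk to the buffer and returns the collector, like a reduce step
    ChunkCollector collect(InputStream chunk) {
        try {
            chunk.transferTo(target);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return this;
    }

    // Exposes everything collected so far as a single stream
    InputStream getStream() {
        return new ByteArrayInputStream(target.toByteArray());
    }

    // Folds a list of text chunks into one stream and reads it back as a string
    static String joinChunks(List<String> chunks) {
        ChunkCollector collector = new ChunkCollector();
        for (String chunk : chunks) {
            collector.collect(new ByteArrayInputStream(chunk.getBytes(StandardCharsets.UTF_8)));
        }
        try {
            return new String(collector.getStream().readAllBytes(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Note that this approach keeps the whole file in memory before anything is sent to MinIO, so it is best suited to small uploads.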

5) Let's code the WebFlux REST controller.

As this is an example, I have skipped writing the service class. Please do implement a service class in an actual project, where you can extend the features, for example by storing the response in a DB.

package com.ashrithgn.minioDemo;

import com.ashrithgn.minioDemo.component.MinioAdapter;
import com.ashrithgn.minioDemo.component.UploadResponse;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.io.InputStreamResource;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.http.codec.multipart.FilePart;
import org.springframework.util.MimeTypeUtils;
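import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RequestPart;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;

// Without @RestController, Spring never registers the handler methods below
@RestController
public class FileController {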
    @Autowired
    MinioAdapter adapter;

    @RequestMapping(path = "/", method = RequestMethod.POST, produces = MimeTypeUtils.APPLICATION_JSON_VALUE, consumes = MediaType.MULTIPART_FORM_DATA_VALUE)
    public Mono<UploadResponse> upload(
            @RequestPart(value = "files", required = true) Mono<FilePart> files) {
        return adapter.uploadFile(files);

    }

    @RequestMapping(path = "/stream", method = RequestMethod.POST, produces = MimeTypeUtils.APPLICATION_JSON_VALUE, consumes = MediaType.MULTIPART_FORM_DATA_VALUE)
    public Mono<UploadResponse> uploadStream(
            @RequestPart(value = "files", required = true) FilePart files, @RequestParam(value = "ttl", required = false) Integer ttl) {
        return adapter.putObject(files);

    }

    @RequestMapping(path = "/", method = RequestMethod.GET)
    public ResponseEntity<Mono<InputStreamResource>> download(
            @RequestParam(value = "filename") String fileName) {
        return ResponseEntity.ok()
                // Quote the filename so that names containing spaces survive in the header
                .header(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=\"" + fileName + "\"")
                .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_OCTET_STREAM_VALUE).body(adapter.download(fileName));

    }

}
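
To try the upload endpoint without a browser, a client along these lines could work. This is only a sketch using the JDK's `java.net.http` client: `UploadClientSketch` and its methods are hypothetical names, the base URL `http://localhost:8080` assumes the application runs locally on Spring Boot's default port, and the boundary token is arbitrary. The part name `files` matches the `@RequestPart` name in the controller.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

class UploadClientSketch {
    // Builds a multipart/form-data body with a single text part named "files"
    static String multipartBody(String boundary, String filename, String content) {
        return "--" + boundary + "\r\n"
                + "Content-Disposition: form-data; name=\"files\"; filename=\"" + filename + "\"\r\n"
                + "Content-Type: text/plain\r\n\r\n"
                + content + "\r\n"
                + "--" + boundary + "--\r\n";
    }

    // Builds the POST request for the "/" upload endpoint
    static HttpRequest uploadRequest(String baseUrl, String filename, String content) {
        String boundary = "demo-boundary-1234"; // any token that does not occur in the content
        return HttpRequest.newBuilder(URI.create(baseUrl + "/"))
                .header("Content-Type", "multipart/form-data; boundary=" + boundary)
                .POST(HttpRequest.BodyPublishers.ofString(
                        multipartBody(boundary, filename, content), StandardCharsets.UTF_8))
                .build();
    }

    // Sends the request and returns the status code followed by the response body
    static String send(HttpRequest request) throws Exception {
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        return response.statusCode() + " " + response.body();
    }
}
```

Calling `UploadClientSketch.send(UploadClientSketch.uploadRequest("http://localhost:8080", "hello.txt", "hello minio"))` against a running instance should return the JSON form of `UploadResponse`, provided the bucket already exists.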



Having gone through the above code, you can see that everything is very similar to Spring Boot Web MVC. The differences are that input params and return values are wrapped inside Mono<> or Flux<>, most of the code is written inside lambda functions, and the Adapter class uses `Schedulers` methods to choose where the work runs.

A tiny bit of info about reactive programming

Basically, non-blocking processing is achieved using an event loop and the publisher-subscriber pattern. Mono and Flux are the publisher implementations provided by Project Reactor, based on the Reactive Streams specification.

A Mono is a publisher of at most one object, and a Flux is a publisher of a sequence of zero or more objects.
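
The publisher-subscriber contract can be sketched with the JDK alone, because `java.util.concurrent.Flow` mirrors the Reactive Streams interfaces that Mono and Flux implement. `PubSubSketch`, `fromList`, and `collectEvents` are hypothetical names, and the publisher here is a deliberately simplified, synchronous one (it does not handle re-entrant `request` calls), not a replacement for Reactor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class PubSubSketch {
    // A publisher that emits the list items, but only as many as the subscriber requests
    static <T> Flow.Publisher<T> fromList(List<T> items) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int next = 0;
            private boolean done = false;

            @Override
            public void request(long n) {
                while (n-- > 0 && next < items.size() && !done) {
                    subscriber.onNext(items.get(next++));
                }
                if (next == items.size() && !done) {
                    done = true;
                    subscriber.onComplete(); // signal the end of the sequence exactly once
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    // Subscribes with unbounded demand and records every signal it receives
    static <T> List<String> collectEvents(Flow.Publisher<T> publisher) {
        List<String> events = new ArrayList<>();
        publisher.subscribe(new Flow.Subscriber<T>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(T item) { events.add("onNext(" + item + ")"); }
            @Override public void onError(Throwable t) { events.add("onError"); }
            @Override public void onComplete() { events.add("onComplete"); }
        });
        return events;
    }
}
```

A one-element list behaves like a Mono (one `onNext`, then `onComplete`), while a longer list behaves like a Flux.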

In Reactor, a Scheduler is an abstraction that gives the user control over threading. A Scheduler can spawn a Worker.
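
As a rough analogy only, the idea of handing blocking work to a worker thread can be shown with a plain JDK executor. This is not Reactor's API: `OffloadSketch`, `runOnWorker`, and the thread name `blocking-worker` are made up for this sketch, which just demonstrates that the task runs on a different thread from the caller, much as the adapter's blocking MinIO calls are moved off the event loop.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class OffloadSketch {
    // Runs a task on the given pool and returns the name of the thread that executed it
    static String runOnWorker(ExecutorService workers) {
        return CompletableFuture
                .supplyAsync(() -> Thread.currentThread().getName(), workers)
                .join();
    }

    // Creates a small pool of named daemon threads, runs one task on it, then shuts it down
    static String demo() {
        ExecutorService workers = Executors.newFixedThreadPool(2, runnable -> {
            Thread thread = new Thread(runnable, "blocking-worker");
            thread.setDaemon(true);
            return thread;
        });
        try {
            return runOnWorker(workers);
        } finally {
            workers.shutdown();
        }
    }
}
```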