AWS S3 with Spring WebFlux
--
In this story, we are going to explore how to use AWS S3 with Asynchronous programming using Spring WebFlux Rest API.
· Prerequisites
· Overview
∘ What is Amazon S3?
∘ Setting Up S3 bucket
· Spring Webflux Application
∘ Configuring S3 clients
∘ Upload Object
∘ Download the file from AWS S3
∘ AWSS3Controller
· Test the REST APIs
· Conclusion
· References
The AWS SDK for Java 1.x has asynchronous clients that are wrappers around a thread pool and blocking synchronous clients that don’t provide the full benefit of nonblocking I/O. The AWS SDK for Java 2.x provides features for non-blocking asynchronous clients that implement high concurrency across a few threads.
In this story, we are going to explore how to use AWS S3 with Asynchronous programming using Spring WebFlux as a Backend application.
Prerequisites
This is the list of all the prerequisites for following this story:
- Spring Boot 3
- Maven 3.8.+
- Java 17
- An active AWS account with access to the S3 service.
- Postman / insomnia or any other API testing tool.
- Optionally, LocalStack to run S3 locally
- Optionally, Docker and Docker compose
Overview
What is Amazon S3?
Amazon S3 is a simple storage service that helps developers and IT teams to store, backup, archive, and retrieve data from anywhere on the web. It allows administrators to store data in categories, add tags to objects, configure access controls for multiple clients, perform high-volume data analysis, get insights into the storage usage, and measure trends based on activities.
Setting Up S3 bucket
For this story, we chose to run the LocalStack Docker image locally to support local versions of additional AWS services.
For a configuration with the AWS Management Console, you can see my previous story.
LocalStack is a cloud service emulator that runs in a single container on your laptop or in your CI environment. With LocalStack, you can run your AWS applications or Lambdas entirely on your local machine without connecting to a remote cloud provider! Whether you are testing complex CDK applications or Terraform configurations, or just beginning to learn about AWS services, LocalStack helps speed up and simplify your testing and development workflow. — https://github.com/localstack/localstack
There are several ways to install LocalStack (LocalStack CLI, LocalStack Cockpit, Docker, Docker-Compose, Helm). We’ll use Docker-Compose. We will create a docker-compose YAML file containing all the instructions to start the LocalStack Docker container.
Open your CLI and run the following command:
docker-compose up -d
The local s3 service is available on port 4566 with a bucket name “my-test-bucket”.
Spring Webflux Application
Let’s start by creating a simple Spring Reactive project from start.spring.io, with the following dependencies: Spring Reactive Web and Lombok.
Configuring S3 clients
To implement the integration with AWS, we need to add the AWS SDK for Java V2 dependencies in the pom.xml file.
<!-- AWS SDK Java V2 -->
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
<version>2.18.41</version>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>netty-nio-client</artifactId>
<version>2.18.41</version>
</dependency>
The second important step is AWS S3 AsyncClient Creation.
First, add the AWS S3 credentials in the property file.
# AWS properties
aws:
access-key: test
secret-key: test
region: eu-west-1
s3-bucket-name: my-test-bucket
multipart-min-part-size: 5242880 # 5MB
endpoint: http://localhost:4566/
aws.region
- Aws region.aws.access-key
- AWS access key IDaws.secret-key
- AWS secret access keyaws.endpoint:
- Override the S3 client to use a local instance instead of an AWS service.aws.s3-bucket-name
- Name of the S3 bucketaws.multipart-min-part-size
- AWS S3 requires that file parts must have at least 5MB, except for the last part. — https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
Then, we need to create the service client for accessing Amazon S3 asynchronously with S3AsyncClient.
We created a bean method returning S3AsyncClient with basic configuration. The credentials are loaded from the custom AwsCredentialsProvider bean with our Localstack AWS service.
The methods for an asynchronous client in V2 of the AWS SDK for Java return CompletableFuture objects that allow us to access the response when it’s ready.
Upload Object
We are going to implement the object upload service in S3.
/**
* {@inheritDoc}
*/
@Override
public Mono<FileResponse> uploadObject(FilePart filePart) {
String filename = filePart.filename();
Map<String, String> metadata = Map.of("filename", filename);
// get media type
MediaType mediaType = ObjectUtils.defaultIfNull(filePart.headers().getContentType(), MediaType.APPLICATION_OCTET_STREAM);
CompletableFuture<CreateMultipartUploadResponse> s3AsyncClientMultipartUpload = s3AsyncClient
.createMultipartUpload(CreateMultipartUploadRequest.builder()
.contentType(mediaType.toString())
.key(filename)
.metadata(metadata)
.bucket(s3ConfigProperties.getS3BucketName())
.build());
UploadStatus uploadStatus = new UploadStatus(Objects.requireNonNull(filePart.headers().getContentType()).toString(), filename);
return Mono.fromFuture(s3AsyncClientMultipartUpload)
.flatMapMany(response -> {
FileUtils.checkSdkResponse(response);
uploadStatus.setUploadId(response.uploadId());
LOGGER.info("Upload object with ID={}", response.uploadId());
return filePart.content();
})
.bufferUntil(dataBuffer -> {
// Collect incoming values into multiple List buffers that will be emitted by the resulting Flux each time the given predicate returns true.
uploadStatus.addBuffered(dataBuffer.readableByteCount());
if (uploadStatus.getBuffered() >= s3ConfigProperties.getMultipartMinPartSize()) {
LOGGER.info("BufferUntil - returning true, bufferedBytes={}, partCounter={}, uploadId={}",
uploadStatus.getBuffered(), uploadStatus.getPartCounter(), uploadStatus.getUploadId());
// reset buffer
uploadStatus.setBuffered(0);
return true;
}
return false;
})
.map(FileUtils::dataBufferToByteBuffer)
// upload part
.flatMap(byteBuffer -> uploadPart(uploadStatus, byteBuffer))
.onBackpressureBuffer()
.reduce(uploadStatus, (status, completedPart) -> {
LOGGER.info("Completed: PartNumber={}, etag={}", completedPart.partNumber(), completedPart.eTag());
(status).getCompletedParts().put(completedPart.partNumber(), completedPart);
return status;
})
.flatMap(uploadStatus1 -> completeMultipartUpload(uploadStatus))
.map(response -> {
FileUtils.checkSdkResponse(response);
LOGGER.info("upload result: {}", response.toString());
return new FileResponse(filename, uploadStatus.getUploadId(), response.location(), uploadStatus.getContentType(), response.eTag());
});
}
This method takes as a parameter the part of the request containing the file to save. The Part
presents a part in a multipart/form-data
request, it could be a FilePart
orFormFieldPart.
Then, we prepare the request with createMultipartUpload method. This action initiates a multipart upload and returns an upload ID. This upload ID is used to associate all of the parts in the specific multipart upload using sdk uploadPart method.
/**
* Uploads a part in a multipart upload.
*/
private Mono<CompletedPart> uploadPartObject(UploadStatus uploadStatus, ByteBuffer buffer) {
final int partNumber = uploadStatus.getAddedPartCounter();
LOGGER.info("UploadPart - partNumber={}, contentLength={}", partNumber, buffer.capacity());
CompletableFuture<UploadPartResponse> uploadPartResponseCompletableFuture = s3AsyncClient.uploadPart(UploadPartRequest.builder()
.bucket(s3ConfigProperties.getS3BucketName())
.key(uploadStatus.getFileKey())
.partNumber(partNumber)
.uploadId(uploadStatus.getUploadId())
.contentLength((long) buffer.capacity())
.build(),
AsyncRequestBody.fromPublisher(Mono.just(buffer)));
return Mono
.fromFuture(uploadPartResponseCompletableFuture)
.map(uploadPartResult -> {
FileUtils.checkSdkResponse(uploadPartResult);
LOGGER.info("UploadPart - complete: part={}, etag={}", partNumber, uploadPartResult.eTag());
return CompletedPart.builder()
.eTag(uploadPartResult.eTag())
.partNumber(partNumber)
.build();
});
}
When all ETags are received, we perform a multipart download by assembling the previously downloaded parts with the CompleteMultipartUpload method.
/**
* This method is called when a part finishes uploading. It's primary function is to verify the ETag of the part
* we just uploaded.
*/
private Mono<CompleteMultipartUploadResponse> completeMultipartUpload(UploadStatus uploadStatus) {
LOGGER.info("CompleteUpload - fileKey={}, completedParts.size={}",
uploadStatus.getFileKey(), uploadStatus.getCompletedParts().size());
CompletedMultipartUpload multipartUpload = CompletedMultipartUpload.builder()
.parts(uploadStatus.getCompletedParts().values())
.build();
return Mono.fromFuture(s3AsyncClient.completeMultipartUpload(CompleteMultipartUploadRequest.builder()
.bucket(s3ConfigProperties.getS3BucketName())
.uploadId(uploadStatus.getUploadId())
.multipartUpload(multipartUpload)
.key(uploadStatus.getFileKey())
.build()));
}
Download the file from AWS S3
The AWS SDK provides the getObject() method to retrieve objects from Amazon S3.
@Override
public Mono<byte[]> getByteObject(@NotNull String key) {
LOGGER.debug("Fetching object as byte array from S3 bucket: {}, key: {}", s3ConfigProperties.getS3BucketName(), key);
return Mono.just(GetObjectRequest.builder().bucket(s3ConfigProperties.getS3BucketName()).key(key).build())
.map(it -> s3AsyncClient.getObject(it, AsyncResponseTransformer.toBytes()))
.flatMap(Mono::fromFuture)
.map(BytesWrapper::asByteArray);
}
AWSS3Controller
Let’s see all the methods of our controller class.
@RequiredArgsConstructor
@RestController
@RequestMapping("/object")
@Validated
public class AWSS3Controller {
private final AWSS3FileStorageService fileStorageService;
@PostMapping("/upload")
public Mono<SuccessResponse> upload(@RequestPart("file-data") Mono<FilePart> filePart) {
return filePart
.map(file -> {
FileUtils.filePartValidator(file);
return file;
})
.flatMap(fileStorageService::uploadObject)
.map(fileResponse -> new SuccessResponse(fileResponse, "Upload successfully"));
}
@GetMapping(path="/{fileKey}")
public Mono<SuccessResponse> download(@PathVariable("fileKey") String fileKey) {
return fileStorageService.getByteObject(fileKey)
.map(objectKey -> new SuccessResponse(objectKey, "Object byte response"));
}
@DeleteMapping(path="/{objectKey}")
public Mono<SuccessResponse> deleteFile(@PathVariable("objectKey") String objectKey) {
return fileStorageService.deleteObject(objectKey)
.map(resp -> new SuccessResponse(null, MessageFormat.format("Object with key: {0} deleted successfully", objectKey)));
}
@GetMapping
public Flux<SuccessResponse> getObject() {
return fileStorageService.getObjects()
.map(objectKey -> new SuccessResponse(objectKey, "Result found"));
}
}
Test the REST APIs
- Upload Object
- Retrieve objects by fileKey
- Get some or all (up to 1,000) of the objects in a bucket
Conclusion
In this story, we’ve explored AWS S3 integration using Spring WebFlux as Backend.
The complete source code is available on GitHub.
If you enjoyed this story, please give it a few claps for support.
Happy coding!