Skip to content

Implement multipart upload in Java-based S3 async client #4052

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jun 16, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.services.s3.multipart;

import static java.util.concurrent.TimeUnit.MINUTES;
import static software.amazon.awssdk.testutils.service.S3BucketUtils.temporaryBucketName;

import java.io.InputStream;
import java.nio.file.Files;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.sync.ResponseTransformer;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3IntegrationTestBase;
import software.amazon.awssdk.services.s3.internal.multipart.MultipartS3AsyncClient;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.services.s3.utils.ChecksumUtils;
import software.amazon.awssdk.testutils.RandomTempFile;

/**
 * Integration test that uploads a randomly generated file through {@link MultipartS3AsyncClient}
 * and verifies that the object read back from S3 has the same checksum as the local file.
 */
public class S3MultipartClientPutObjectIntegrationTest extends S3IntegrationTestBase {

    private static final String TEST_BUCKET = temporaryBucketName(S3MultipartClientPutObjectIntegrationTest.class);
    // Key (and temp-file name) reflects the real payload size defined by OBJ_SIZE below.
    private static final String TEST_KEY = "19mib_file.dat";
    private static final int OBJ_SIZE = 19 * 1024 * 1024;

    private static RandomTempFile testFile;
    private static S3AsyncClient mpuS3Client;

    /**
     * Creates the shared clients (via the base class), the temporary bucket, the random test file
     * and the multipart client under test.
     */
    @BeforeAll
    public static void setup() throws Exception {
        S3IntegrationTestBase.setUp();
        S3IntegrationTestBase.createBucket(TEST_BUCKET);

        testFile = new RandomTempFile(TEST_KEY, OBJ_SIZE);
        mpuS3Client = new MultipartS3AsyncClient(s3Async);
    }

    /**
     * Releases everything created in {@link #setup()}. Each step runs even if an earlier one throws,
     * and fields that were never assigned (because setup failed part-way) are skipped so the original
     * setup failure is not masked by a NullPointerException here.
     */
    @AfterAll
    public static void teardown() throws Exception {
        try {
            if (mpuS3Client != null) {
                mpuS3Client.close();
            }
        } finally {
            try {
                if (testFile != null) {
                    testFile.delete();
                }
            } finally {
                deleteBucketAndAllContents(TEST_BUCKET);
            }
        }
    }

    /**
     * Uploads the test file with a file-backed {@link AsyncRequestBody} and asserts that the checksum
     * of the downloaded object equals the checksum of the local file.
     */
    @Test
    @Timeout(value = 1, unit = MINUTES)
    void putObject_fileRequestBody_objectSentCorrectly() throws Exception {
        AsyncRequestBody body = AsyncRequestBody.fromFile(testFile.toPath());
        mpuS3Client.putObject(r -> r.bucket(TEST_BUCKET).key(TEST_KEY), body).get(2, MINUTES);

        // Close the local file stream explicitly rather than relying on the checksum helper to do it.
        byte[] expectedSum;
        try (InputStream fileContent = Files.newInputStream(testFile.toPath())) {
            expectedSum = ChecksumUtils.computeCheckSum(fileContent);
        }

        // The response stream is closed even when the assertion fails, so the test does not leave
        // an open stream behind.
        try (ResponseInputStream<GetObjectResponse> objContent =
                 S3IntegrationTestBase.s3.getObject(r -> r.bucket(TEST_BUCKET).key(TEST_KEY),
                                                    ResponseTransformer.toInputStream())) {
            Assertions.assertThat(ChecksumUtils.computeCheckSum(objContent)).isEqualTo(expectedSum);
        }
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,11 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import java.util.stream.IntStream;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
import software.amazon.awssdk.services.s3.internal.multipart.GenericMultipartHelper;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse;
import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
Expand All @@ -50,17 +46,16 @@
public final class CopyObjectHelper {
private static final Logger log = Logger.loggerFor(S3AsyncClient.class);

/**
* The max number of parts on S3 side is 10,000
*/
private static final long MAX_UPLOAD_PARTS = 10_000;

private final S3AsyncClient s3AsyncClient;
private final long partSizeInBytes;
private final GenericMultipartHelper<CopyObjectRequest, CopyObjectResponse> genericMultipartHelper;

public CopyObjectHelper(S3AsyncClient s3AsyncClient, long partSizeInBytes) {
this.s3AsyncClient = s3AsyncClient;
this.partSizeInBytes = partSizeInBytes;
this.genericMultipartHelper = new GenericMultipartHelper<>(s3AsyncClient,
RequestConversionUtils::toAbortMultipartUploadRequest,
RequestConversionUtils::toCopyObjectResponse);
}

public CompletableFuture<CopyObjectResponse> copyObject(CopyObjectRequest copyObjectRequest) {
Expand All @@ -69,14 +64,15 @@ public CompletableFuture<CopyObjectResponse> copyObject(CopyObjectRequest copyOb

try {
CompletableFuture<HeadObjectResponse> headFuture =
s3AsyncClient.headObject(CopyRequestConversionUtils.toHeadObjectRequest(copyObjectRequest));
s3AsyncClient.headObject(RequestConversionUtils.toHeadObjectRequest(copyObjectRequest));

// Ensure cancellations are forwarded to the head future
CompletableFutureUtils.forwardExceptionTo(returnFuture, headFuture);

headFuture.whenComplete((headObjectResponse, throwable) -> {
if (throwable != null) {
handleException(returnFuture, () -> "Failed to retrieve metadata from the source object", throwable);
genericMultipartHelper.handleException(returnFuture, () -> "Failed to retrieve metadata from the source "
+ "object", throwable);
} else {
doCopyObject(copyObjectRequest, returnFuture, headObjectResponse);
}
Expand Down Expand Up @@ -105,7 +101,7 @@ private void copyInParts(CopyObjectRequest copyObjectRequest,
Long contentLength,
CompletableFuture<CopyObjectResponse> returnFuture) {

CreateMultipartUploadRequest request = CopyRequestConversionUtils.toCreateMultipartUploadRequest(copyObjectRequest);
CreateMultipartUploadRequest request = RequestConversionUtils.toCreateMultipartUploadRequest(copyObjectRequest);
CompletableFuture<CreateMultipartUploadResponse> createMultipartUploadFuture =
s3AsyncClient.createMultipartUpload(request);

Expand All @@ -114,25 +110,22 @@ private void copyInParts(CopyObjectRequest copyObjectRequest,

createMultipartUploadFuture.whenComplete((createMultipartUploadResponse, throwable) -> {
if (throwable != null) {
handleException(returnFuture, () -> "Failed to initiate multipart upload", throwable);
genericMultipartHelper.handleException(returnFuture, () -> "Failed to initiate multipart upload", throwable);
} else {
log.debug(() -> "Initiated new multipart upload, uploadId: " + createMultipartUploadResponse.uploadId());
doCopyInParts(copyObjectRequest, contentLength, returnFuture, createMultipartUploadResponse.uploadId());
}
});
}

/**
 * Returns the number of parts needed to cover {@code contentLength} bytes when every part holds
 * at most {@code partSize} bytes; a trailing partial part counts as one part.
 */
private int determinePartCount(long contentLength, long partSize) {
    double exactPartCount = (double) contentLength / partSize;
    return (int) Math.ceil(exactPartCount);
}

private void doCopyInParts(CopyObjectRequest copyObjectRequest,
Long contentLength,
CompletableFuture<CopyObjectResponse> returnFuture,
String uploadId) {
long optimalPartSize = calculateOptimalPartSizeForCopy(contentLength);

int partCount = determinePartCount(contentLength, optimalPartSize);
long optimalPartSize = genericMultipartHelper.calculateOptimalPartSizeFor(contentLength, partSizeInBytes);

int partCount = genericMultipartHelper.determinePartCount(contentLength, optimalPartSize);

log.debug(() -> String.format("Starting multipart copy with partCount: %s, optimalPartSize: %s",
partCount, optimalPartSize));
Expand All @@ -147,32 +140,15 @@ private void doCopyInParts(CopyObjectRequest copyObjectRequest,
optimalPartSize);
CompletableFutureUtils.allOfExceptionForwarded(futures.toArray(new CompletableFuture[0]))
.thenCompose(ignore -> completeMultipartUpload(copyObjectRequest, uploadId, completedParts))
.handle(handleExceptionOrResponse(copyObjectRequest, returnFuture, uploadId))
.handle(genericMultipartHelper.handleExceptionOrResponse(copyObjectRequest, returnFuture,
uploadId))
.exceptionally(throwable -> {
handleException(returnFuture, () -> "Unexpected exception occurred", throwable);
genericMultipartHelper.handleException(returnFuture, () -> "Unexpected exception occurred",
throwable);
return null;
});
}

/**
 * Builds the terminal handler for the complete-multipart-upload stage. On success the response is
 * converted and used to complete {@code returnFuture}; on failure the upload identified by
 * {@code uploadId} is cleaned up and {@code returnFuture} is failed.
 */
private BiFunction<CompleteMultipartUploadResponse, Throwable, Void> handleExceptionOrResponse(
    CopyObjectRequest copyObjectRequest,
    CompletableFuture<CopyObjectResponse> returnFuture,
    String uploadId) {

    return (response, failure) -> {
        if (failure == null) {
            returnFuture.complete(CopyRequestConversionUtils.toCopyObjectResponse(response));
            return null;
        }

        cleanUpParts(copyObjectRequest, uploadId);
        handleException(returnFuture, () -> "Failed to send multipart copy requests.", failure);
        return null;
    };
}

private CompletableFuture<CompleteMultipartUploadResponse> completeMultipartUpload(
CopyObjectRequest copyObjectRequest, String uploadId, AtomicReferenceArray<CompletedPart> completedParts) {
log.debug(() -> String.format("Sending completeMultipartUploadRequest, uploadId: %s",
Expand All @@ -194,35 +170,6 @@ private CompletableFuture<CompleteMultipartUploadResponse> completeMultipartUplo
return s3AsyncClient.completeMultipartUpload(completeMultipartUploadRequest);
}

/**
 * Sends an abort request for the multipart upload identified by {@code uploadId}. A failure of the
 * abort call is only logged as a warning; it is not propagated to the caller.
 */
private void cleanUpParts(CopyObjectRequest copyObjectRequest, String uploadId) {
    AbortMultipartUploadRequest abortRequest =
        CopyRequestConversionUtils.toAbortMultipartUploadRequest(copyObjectRequest, uploadId);

    s3AsyncClient.abortMultipartUpload(abortRequest).exceptionally(abortFailure -> {
        log.warn(() -> String.format("Failed to abort previous multipart upload (id: %s). You may need to call "
                                     + "S3AsyncClient#abortMultiPartUpload to free all storage consumed by all parts. ",
                                     uploadId),
                 abortFailure);
        return null;
    });
}

/**
 * Fails {@code returnFuture} with the given problem. A {@link CompletionException} is unwrapped to
 * its cause first. An {@link Error} is passed through unchanged; anything else is wrapped in an
 * {@link SdkClientException} carrying the supplied message.
 */
private static void handleException(CompletableFuture<CopyObjectResponse> returnFuture,
                                    Supplier<String> message,
                                    Throwable throwable) {
    Throwable rootCause = throwable;
    if (throwable instanceof CompletionException) {
        rootCause = throwable.getCause();
    }

    if (rootCause instanceof Error) {
        returnFuture.completeExceptionally(rootCause);
        return;
    }

    returnFuture.completeExceptionally(SdkClientException.create(message.get(), rootCause));
}

private List<CompletableFuture<CompletedPart>> sendUploadPartCopyRequests(CopyObjectRequest copyObjectRequest,
long contentLength,
String uploadId,
Expand Down Expand Up @@ -265,23 +212,13 @@ private static CompletedPart convertUploadPartCopyResponse(AtomicReferenceArray<
UploadPartCopyResponse uploadPartCopyResponse) {
CopyPartResult copyPartResult = uploadPartCopyResponse.copyPartResult();
CompletedPart completedPart =
CopyRequestConversionUtils.toCompletedPart(copyPartResult,
partNumber);
RequestConversionUtils.toCompletedPart(copyPartResult,
partNumber);

completedParts.set(partNumber - 1, completedPart);
return completedPart;
}

/**
 * Determines the part size to use for a multipart copy: the smallest size that keeps the number of
 * parts within {@code MAX_UPLOAD_PARTS}, but never less than the configured {@code partSizeInBytes}.
 */
private long calculateOptimalPartSizeForCopy(long contentLengthOfSource) {
    double sizeNeededToFitPartLimit = Math.ceil(contentLengthOfSource / (double) MAX_UPLOAD_PARTS);
    return (long) Math.max(sizeNeededToFitPartLimit, partSizeInBytes);
}

private void copyInOneChunk(CopyObjectRequest copyObjectRequest,
CompletableFuture<CopyObjectResponse> returnFuture) {
CompletableFuture<CopyObjectResponse> copyObjectFuture =
Expand Down
Loading