Skip to content

Implement multipart copy in Java-based S3 async client #4189

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* permissions and limitations under the License.
*/

package software.amazon.awssdk.services.s3.crt;
package software.amazon.awssdk.services.s3.multipart;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Fail.fail;
Expand All @@ -24,33 +24,40 @@
import java.nio.ByteBuffer;
import java.security.SecureRandom;
import java.util.Base64;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import javax.crypto.KeyGenerator;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import software.amazon.awssdk.core.ResponseBytes;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.sync.ResponseTransformer;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3IntegrationTestBase;
import software.amazon.awssdk.services.s3.internal.crt.S3CrtAsyncClient;
import software.amazon.awssdk.services.s3.internal.multipart.MultipartS3AsyncClient;
import software.amazon.awssdk.services.s3.model.CopyObjectResponse;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.services.s3.model.MetadataDirective;
import software.amazon.awssdk.utils.Md5Utils;

public class S3CrtClientCopyIntegrationTest extends S3IntegrationTestBase {
private static final String BUCKET = temporaryBucketName(S3CrtClientCopyIntegrationTest.class);
@Timeout(value = 3, unit = TimeUnit.MINUTES)
public class S3ClientMultiPartCopyIntegrationTest extends S3IntegrationTestBase {
private static final String BUCKET = temporaryBucketName(S3ClientMultiPartCopyIntegrationTest.class);
private static final String ORIGINAL_OBJ = "test_file.dat";
private static final String COPIED_OBJ = "test_file_copy.dat";
private static final String ORIGINAL_OBJ_SPECIAL_CHARACTER = "original-special-chars-@$%";
private static final String COPIED_OBJ_SPECIAL_CHARACTER = "special-special-chars-@$%";
private static final long OBJ_SIZE = ThreadLocalRandom.current().nextLong(8 * 1024 * 1024, 16 * 1024 * 1024 + 1);
private static final long SMALL_OBJ_SIZE = 1024 * 1024;
private static S3AsyncClient s3CrtAsyncClient;
private static S3AsyncClient s3MpuClient;
@BeforeAll
public static void setUp() throws Exception {
S3IntegrationTestBase.setUp();
Expand All @@ -59,40 +66,50 @@ public static void setUp() throws Exception {
.credentialsProvider(CREDENTIALS_PROVIDER_CHAIN)
.region(DEFAULT_REGION)
.build();
s3MpuClient = new MultipartS3AsyncClient(s3Async);
}

/**
 * Releases the resources created for this test class: closes both async clients and deletes the temporary
 * bucket together with its contents.
 * <p>
 * The bucket deletion runs in a {@code finally} block so that a failure while closing a client (including a
 * {@code NullPointerException} when a client was never created) does not leave the temporary bucket behind.
 * Any exception raised while closing still propagates after the bucket has been cleaned up.
 *
 * @throws Exception if closing a client or deleting the bucket fails
 */
@AfterAll
public static void teardown() throws Exception {
    try {
        s3CrtAsyncClient.close();
        s3MpuClient.close();
    } finally {
        // Bucket cleanup must not depend on the clients having closed successfully.
        deleteBucketAndAllContents(BUCKET);
    }
}

@Test
void copy_singlePart_hasSameContent() {
/**
 * Argument source referenced by {@code @MethodSource("s3AsyncClient")} on the parameterized tests in this class.
 *
 * @return a stream holding the two static clients under test, {@code s3MpuClient} first and
 *         {@code s3CrtAsyncClient} second
 */
public static Stream<S3AsyncClient> s3AsyncClient() {
return Stream.of(s3MpuClient, s3CrtAsyncClient);
}

/**
 * Uploads {@code SMALL_OBJ_SIZE} random bytes under {@code ORIGINAL_OBJ}, copies them to {@code COPIED_OBJ} with
 * the supplied client, and validates the copy against the uploaded bytes.
 * <p>
 * {@code autoCloseArguments = false} stops JUnit from closing the client after each invocation; the clients are
 * static, shared by every test in the class, and closed once in {@code teardown()}.
 *
 * @param s3AsyncClient the client under test, supplied by {@code s3AsyncClient()}
 */
@ParameterizedTest(autoCloseArguments = false)
@MethodSource("s3AsyncClient")
void copy_singlePart_hasSameContent(S3AsyncClient s3AsyncClient) {
    byte[] originalContent = randomBytes(SMALL_OBJ_SIZE);
    createOriginalObject(originalContent, ORIGINAL_OBJ);
    copyObject(ORIGINAL_OBJ, COPIED_OBJ, s3AsyncClient);
    // Validate the destination key: checking ORIGINAL_OBJ would only re-read the source and pass even if the
    // copy were missing or corrupt.
    // NOTE(review): assumes validateCopiedObject reads back the key it is given, as its use with the destination
    // key in copy_specialCharacters_hasSameContent suggests - confirm.
    validateCopiedObject(originalContent, COPIED_OBJ);
}

@Test
void copy_copiedObject_hasSameContent() {
/**
 * Uploads {@code OBJ_SIZE} random bytes under {@code ORIGINAL_OBJ}, copies them to {@code COPIED_OBJ} with the
 * supplied client, and validates the copy against the uploaded bytes. {@code OBJ_SIZE} is a random size between
 * 8 MiB and 16 MiB (inclusive), chosen once per test run.
 * <p>
 * {@code autoCloseArguments = false} stops JUnit from closing the client after each invocation; the clients are
 * static, shared by every test in the class, and closed once in {@code teardown()}.
 *
 * @param s3AsyncClient the client under test, supplied by {@code s3AsyncClient()}
 */
@ParameterizedTest(autoCloseArguments = false)
@MethodSource("s3AsyncClient")
void copy_copiedObject_hasSameContent(S3AsyncClient s3AsyncClient) {
    byte[] originalContent = randomBytes(OBJ_SIZE);
    createOriginalObject(originalContent, ORIGINAL_OBJ);
    copyObject(ORIGINAL_OBJ, COPIED_OBJ, s3AsyncClient);
    // Validate the destination key: checking ORIGINAL_OBJ would only re-read the source and pass even if the
    // copy were missing or corrupt.
    // NOTE(review): assumes validateCopiedObject reads back the key it is given, as its use with the destination
    // key in copy_specialCharacters_hasSameContent suggests - confirm.
    validateCopiedObject(originalContent, COPIED_OBJ);
}

@Test
void copy_specialCharacters_hasSameContent() {
@ParameterizedTest(autoCloseArguments = false)
@MethodSource("s3AsyncClient")
void copy_specialCharacters_hasSameContent(S3AsyncClient s3AsyncClient) {
byte[] originalContent = randomBytes(OBJ_SIZE);
createOriginalObject(originalContent, ORIGINAL_OBJ_SPECIAL_CHARACTER);
copyObject(ORIGINAL_OBJ_SPECIAL_CHARACTER, COPIED_OBJ_SPECIAL_CHARACTER);
copyObject(ORIGINAL_OBJ_SPECIAL_CHARACTER, COPIED_OBJ_SPECIAL_CHARACTER, s3AsyncClient);
validateCopiedObject(originalContent, COPIED_OBJ_SPECIAL_CHARACTER);
}

@Test
void copy_ssecServerSideEncryption_shouldSucceed() {
@ParameterizedTest(autoCloseArguments = false)
@MethodSource("s3AsyncClient")
void copy_ssecServerSideEncryption_shouldSucceed(S3AsyncClient s3AsyncClient) {
byte[] originalContent = randomBytes(OBJ_SIZE);
byte[] secretKey = generateSecretKey();
String b64Key = Base64.getEncoder().encodeToString(secretKey);
Expand All @@ -102,16 +119,16 @@ void copy_ssecServerSideEncryption_shouldSucceed() {
String newB64Key = Base64.getEncoder().encodeToString(newSecretKey);
String newB64KeyMd5 = Md5Utils.md5AsBase64(newSecretKey);

// Java S3 client is used because CRT S3 client putObject fails with SSE-C
// TODO: change back to S3CrtClient once the issue is fixed in CRT
// MPU S3 client gets stuck
// TODO: change back to s3AsyncClient once the issue is fixed in MPU S3 client
s3Async.putObject(r -> r.bucket(BUCKET)
.key(ORIGINAL_OBJ)
.sseCustomerKey(b64Key)
.sseCustomerAlgorithm(AES256.name())
.sseCustomerKeyMD5(b64KeyMd5),
AsyncRequestBody.fromBytes(originalContent)).join();

CompletableFuture<CopyObjectResponse> future = s3CrtAsyncClient.copyObject(c -> c
CompletableFuture<CopyObjectResponse> future = s3AsyncClient.copyObject(c -> c
.sourceBucket(BUCKET)
.sourceKey(ORIGINAL_OBJ)
.metadataDirective(MetadataDirective.REPLACE)
Expand Down Expand Up @@ -147,8 +164,8 @@ private void createOriginalObject(byte[] originalContent, String originalKey) {
AsyncRequestBody.fromBytes(originalContent)).join();
}

private void copyObject(String original, String destination) {
CompletableFuture<CopyObjectResponse> future = s3CrtAsyncClient.copyObject(c -> c
private void copyObject(String original, String destination, S3AsyncClient s3AsyncClient) {
CompletableFuture<CopyObjectResponse> future = s3AsyncClient.copyObject(c -> c
.sourceBucket(BUCKET)
.sourceKey(original)
.destinationBucket(BUCKET)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import software.amazon.awssdk.services.s3.S3CrtAsyncClientBuilder;
import software.amazon.awssdk.services.s3.crt.S3CrtHttpConfiguration;
import software.amazon.awssdk.services.s3.crt.S3CrtRetryConfiguration;
import software.amazon.awssdk.services.s3.internal.multipart.CopyObjectHelper;
import software.amazon.awssdk.services.s3.model.CopyObjectRequest;
import software.amazon.awssdk.services.s3.model.CopyObjectResponse;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
Expand All @@ -67,7 +68,9 @@ private DefaultS3CrtAsyncClient(DefaultS3CrtClientBuilder builder) {
super(initializeS3AsyncClient(builder));
long partSizeInBytes = builder.minimalPartSizeInBytes == null ? DEFAULT_PART_SIZE_IN_BYTES :
builder.minimalPartSizeInBytes;
this.copyObjectHelper = new CopyObjectHelper((S3AsyncClient) delegate(), partSizeInBytes);
this.copyObjectHelper = new CopyObjectHelper((S3AsyncClient) delegate(),
partSizeInBytes,
partSizeInBytes);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* permissions and limitations under the License.
*/

package software.amazon.awssdk.services.s3.internal.crt;
package software.amazon.awssdk.services.s3.internal.multipart;


import java.util.ArrayList;
Expand All @@ -23,6 +23,7 @@
import java.util.stream.IntStream;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.internal.crt.UploadPartCopyRequestIterable;
import software.amazon.awssdk.services.s3.internal.multipart.GenericMultipartHelper;
import software.amazon.awssdk.services.s3.internal.multipart.SdkPojoConversionUtils;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
Expand Down Expand Up @@ -50,13 +51,15 @@ public final class CopyObjectHelper {
private final S3AsyncClient s3AsyncClient;
private final long partSizeInBytes;
private final GenericMultipartHelper<CopyObjectRequest, CopyObjectResponse> genericMultipartHelper;
private final long uploadThreshold;

public CopyObjectHelper(S3AsyncClient s3AsyncClient, long partSizeInBytes) {
public CopyObjectHelper(S3AsyncClient s3AsyncClient, long partSizeInBytes, long uploadThreshold) {
this.s3AsyncClient = s3AsyncClient;
this.partSizeInBytes = partSizeInBytes;
this.genericMultipartHelper = new GenericMultipartHelper<>(s3AsyncClient,
SdkPojoConversionUtils::toAbortMultipartUploadRequest,
SdkPojoConversionUtils::toCopyObjectResponse);
this.uploadThreshold = uploadThreshold;
}

public CompletableFuture<CopyObjectResponse> copyObject(CopyObjectRequest copyObjectRequest) {
Expand Down Expand Up @@ -89,7 +92,7 @@ private void doCopyObject(CopyObjectRequest copyObjectRequest, CompletableFuture
HeadObjectResponse headObjectResponse) {
Long contentLength = headObjectResponse.contentLength();

if (contentLength <= partSizeInBytes) {
if (contentLength <= partSizeInBytes || contentLength <= uploadThreshold) {
log.debug(() -> "Starting the copy as a single copy part request");
copyInOneChunk(copyObjectRequest, returnFuture);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.services.s3.DelegatingS3AsyncClient;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.CopyObjectRequest;
import software.amazon.awssdk.services.s3.model.CopyObjectResponse;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;

Expand All @@ -33,15 +35,27 @@ public class MultipartS3AsyncClient extends DelegatingS3AsyncClient {

private static final long DEFAULT_MAX_MEMORY = DEFAULT_PART_SIZE_IN_BYTES * 2;
private final MultipartUploadHelper mpuHelper;
private final CopyObjectHelper copyObjectHelper;

/**
 * Creates a client that wraps {@code delegate} and builds the two helpers used by the overridden operations.
 * <p>
 * The upload helper receives the default part size, the default threshold and {@code DEFAULT_MAX_MEMORY}
 * (twice the default part size). The copy helper receives the default part size and the default threshold,
 * the latter as its upload threshold.
 *
 * @param delegate the client handed to the superclass and to both helpers
 */
public MultipartS3AsyncClient(S3AsyncClient delegate) {
super(delegate);
// TODO: pass a config object to the upload helper instead
mpuHelper = new MultipartUploadHelper(delegate, DEFAULT_PART_SIZE_IN_BYTES, DEFAULT_THRESHOLD, DEFAULT_MAX_MEMORY);
copyObjectHelper = new CopyObjectHelper(delegate, DEFAULT_PART_SIZE_IN_BYTES, DEFAULT_THRESHOLD);
}

/**
 * Hands the upload to the {@code MultipartUploadHelper} created in the constructor rather than calling the
 * delegate's {@code putObject} directly.
 *
 * @param putObjectRequest the request describing the object to upload
 * @param requestBody the content to upload
 * @return the future returned by the helper's {@code uploadObject}
 */
@Override
public CompletableFuture<PutObjectResponse> putObject(PutObjectRequest putObjectRequest, AsyncRequestBody requestBody) {
return mpuHelper.uploadObject(putObjectRequest, requestBody);
}

/**
 * Hands the copy to the {@code CopyObjectHelper} created in the constructor rather than calling the delegate's
 * {@code copyObject} directly.
 *
 * @param copyObjectRequest the request describing the source and destination of the copy
 * @return the future returned by the helper's {@code copyObject}
 */
@Override
public CompletableFuture<CopyObjectResponse> copyObject(CopyObjectRequest copyObjectRequest) {
return copyObjectHelper.copyObject(copyObjectRequest);
}

/**
 * Closes the wrapped delegate client.
 * <p>
 * NOTE(review): the delegate is supplied by the caller through the constructor, so closing this wrapper also
 * closes a client the caller may still hold a reference to - confirm that this ownership is intended.
 */
@Override
public void close() {
delegate().close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,14 @@
import software.amazon.awssdk.services.s3.model.UploadPartCopyRequest;
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
import software.amazon.awssdk.services.s3.model.UploadPartResponse;
import software.amazon.awssdk.utils.Logger;

/**
* Request conversion utility method for POJO classes associated with multipart feature.
*/
@SdkInternalApi
public final class SdkPojoConversionUtils {
private static final Logger log = Logger.loggerFor(SdkPojoConversionUtils.class);

private static final HashSet<String> PUT_OBJECT_REQUEST_TO_UPLOAD_PART_FIELDS_TO_IGNORE =
new HashSet<>(Arrays.asList("ChecksumSHA1", "ChecksumSHA256", "ContentMD5", "ChecksumCRC32C", "ChecksumCRC32"));
Expand All @@ -68,9 +70,22 @@ public static CreateMultipartUploadRequest toCreateMultipartUploadRequest(PutObj
}

public static HeadObjectRequest toHeadObjectRequest(CopyObjectRequest copyObjectRequest) {
HeadObjectRequest.Builder builder = HeadObjectRequest.builder();
setSdkFields(builder, copyObjectRequest);
return builder.build();

// We can't set SdkFields directly because the fields in CopyObjectRequest do not match 100% with the ones in
// HeadObjectRequest
return HeadObjectRequest.builder()
.bucket(copyObjectRequest.sourceBucket())
.key(copyObjectRequest.sourceKey())
.versionId(copyObjectRequest.sourceVersionId())
.ifMatch(copyObjectRequest.copySourceIfMatch())
.ifModifiedSince(copyObjectRequest.copySourceIfModifiedSince())
.ifNoneMatch(copyObjectRequest.copySourceIfNoneMatch())
.ifUnmodifiedSince(copyObjectRequest.copySourceIfUnmodifiedSince())
.expectedBucketOwner(copyObjectRequest.expectedSourceBucketOwner())
.sseCustomerAlgorithm(copyObjectRequest.copySourceSSECustomerAlgorithm())
.sseCustomerKey(copyObjectRequest.copySourceSSECustomerKey())
.sseCustomerKeyMD5(copyObjectRequest.copySourceSSECustomerKeyMD5())
.build();
}

public static CompletedPart toCompletedPart(CopyPartResult copyPartResult, int partNumber) {
Expand Down Expand Up @@ -106,6 +121,8 @@ public static CreateMultipartUploadRequest toCreateMultipartUploadRequest(CopyOb
CreateMultipartUploadRequest.Builder builder = CreateMultipartUploadRequest.builder();

setSdkFields(builder, copyObjectRequest);
builder.bucket(copyObjectRequest.destinationBucket());
builder.key(copyObjectRequest.destinationKey());
return builder.build();
}

Expand Down Expand Up @@ -136,6 +153,8 @@ private static CopyObjectResult toCopyObjectResult(CompleteMultipartUploadRespon
/**
 * Prepares an {@code AbortMultipartUploadRequest} builder from a {@code CopyObjectRequest}.
 * <p>
 * Fields are first transferred with {@code setSdkFields}; the bucket and key are then set explicitly from the
 * copy request's destination bucket and destination key. No upload id is set here, and the builder is returned
 * unbuilt.
 *
 * @param copyObjectRequest the copy request whose destination identifies the upload
 * @return the populated, not yet built, builder
 */
public static AbortMultipartUploadRequest.Builder toAbortMultipartUploadRequest(CopyObjectRequest copyObjectRequest) {
    AbortMultipartUploadRequest.Builder abortBuilder = AbortMultipartUploadRequest.builder();
    setSdkFields(abortBuilder, copyObjectRequest);
    // Applied after setSdkFields so the destination bucket/key are the values the builder ends up with.
    return abortBuilder.bucket(copyObjectRequest.destinationBucket())
                       .key(copyObjectRequest.destinationKey());
}

Expand All @@ -154,6 +173,8 @@ public static UploadPartCopyRequest toUploadPartCopyRequest(CopyObjectRequest co
return builder.copySourceRange(range)
.partNumber(partNumber)
.uploadId(uploadId)
.bucket(copyObjectRequest.destinationBucket())
.key(copyObjectRequest.destinationKey())
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.mockito.stubbing.Answer;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.internal.multipart.CopyObjectHelper;
import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.AbortMultipartUploadResponse;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
Expand All @@ -59,10 +60,13 @@ class CopyObjectHelperTest {
private S3AsyncClient s3AsyncClient;
private CopyObjectHelper copyHelper;

private static final long PART_SIZE = 1024L;
private static final long UPLOAD_THRESHOLD = 2048L;

@BeforeEach
public void setUp() {
s3AsyncClient = Mockito.mock(S3AsyncClient.class);
copyHelper = new CopyObjectHelper(s3AsyncClient, 1024L);
copyHelper = new CopyObjectHelper(s3AsyncClient, PART_SIZE, UPLOAD_THRESHOLD);
}

@Test
Expand Down Expand Up @@ -114,6 +118,25 @@ void singlePartCopy_happyCase_shouldSucceed() {
assertThat(future.join()).isEqualTo(expectedResponse);
}

/**
 * A source object whose size is reported as 2000 bytes - larger than {@code PART_SIZE} (1024) but not larger than
 * {@code UPLOAD_THRESHOLD} (2048) - must complete with the response of the stubbed single {@code copyObject} call.
 */
@Test
void copy_doesNotExceedThreshold_shouldUseSingleObjectCopy() {
    CopyObjectRequest request = copyObjectRequest();
    CopyObjectResponse singleCopyResponse = CopyObjectResponse.builder().build();

    // Report a content length between the part size and the upload threshold.
    stubSuccessfulHeadObjectCall(2000L);
    when(s3AsyncClient.copyObject(request))
        .thenReturn(CompletableFuture.completedFuture(singleCopyResponse));

    CopyObjectResponse actualResponse = copyHelper.copyObject(request).join();

    assertThat(actualResponse).isEqualTo(singleCopyResponse);
}

@Test
void multiPartCopy_fourPartsHappyCase_shouldSucceed() {
CopyObjectRequest copyObjectRequest = copyObjectRequest();
Expand Down