From cb64f90bc0b27dd7ae19c07f5d214cace87a71e6 Mon Sep 17 00:00:00 2001 From: Zoe Wang <33073555+zoewangg@users.noreply.github.com> Date: Wed, 19 Jul 2023 13:54:04 -0700 Subject: [PATCH] Add more tests with ByteArrayAsyncRequestBody --- .../async/SplittingPublisherTest.java | 104 +++++++++++------- .../crt/S3CrossRegionCrtIntegrationTest.java | 2 +- .../S3ClientMultiPartCopyIntegrationTest.java | 14 +-- ...ltipartClientPutObjectIntegrationTest.java | 28 ++++- 4 files changed, 95 insertions(+), 53 deletions(-) diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTest.java index 45938ea684c8..3ce8559eec32 100644 --- a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTest.java +++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTest.java @@ -20,15 +20,21 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; +import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.Charset; +import java.nio.file.Files; import java.util.ArrayList; import java.util.List; import java.util.Optional; +import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.lang3.RandomStringUtils; import org.junit.jupiter.api.AfterAll; @@ -51,11 +57,12 @@ public class SplittingPublisherTest { private static final int NUM_OF_CHUNK = (int) Math.ceil(CONTENT_SIZE / (double) CHUNK_SIZE); - private static RandomTempFile testFile; + private static File testFile; @BeforeAll public static void beforeAll() throws IOException { - testFile = new RandomTempFile("testfile.dat", CONTENT_SIZE); + testFile = File.createTempFile("SplittingPublisherTest", UUID.randomUUID().toString()); + Files.write(testFile.toPath(), CONTENT); } @AfterAll @@ -65,46 +72,19 @@ public static void afterAll() throws Exception { @ParameterizedTest @ValueSource(ints = {CHUNK_SIZE, CHUNK_SIZE * 2 - 1, CHUNK_SIZE * 2}) - void differentChunkSize_shouldSplitAsyncRequestBodyCorrectly(int upstreamByteBufferSize) throws Exception { - CompletableFuture future = new CompletableFuture<>(); - SplittingPublisher splittingPublisher = SplittingPublisher.builder() - .resultFuture(future) - .asyncRequestBody(FileAsyncRequestBody.builder() - .path(testFile.toPath()) - .chunkSizeInBytes(upstreamByteBufferSize) - .build()) - - .resultFuture(future) - .chunkSizeInBytes((long) CHUNK_SIZE) - .maxMemoryUsageInBytes((long) CHUNK_SIZE * 4) - .build(); - - List> futures = new ArrayList<>(); + void differentChunkSize_shouldSplitAsyncRequestBodyCorrectly(int chunkSize) throws Exception { - splittingPublisher.subscribe(requestBody -> { - CompletableFuture baosFuture = new CompletableFuture<>(); - BaosSubscriber subscriber = new BaosSubscriber(baosFuture); - futures.add(baosFuture); - requestBody.subscribe(subscriber); - }).get(5, TimeUnit.SECONDS); - - assertThat(futures.size()).isEqualTo(NUM_OF_CHUNK); + FileAsyncRequestBody fileAsyncRequestBody = FileAsyncRequestBody.builder() + .path(testFile.toPath()) + .chunkSizeInBytes(chunkSize) + .build(); + verifySplitContent(fileAsyncRequestBody, chunkSize); + } - for (int i = 0; i < futures.size(); i++) { - try (FileInputStream fileInputStream = new FileInputStream(testFile)) { - byte[] expected; - if (i == futures.size() - 1) { - expected = new byte[1]; - } else { - expected = new byte[5]; - } - fileInputStream.skip(i * 5); - fileInputStream.read(expected); - byte[] actualBytes = futures.get(i).join(); - assertThat(actualBytes).isEqualTo(expected); - }; - } - assertThat(future).isCompleted(); + @ParameterizedTest + @ValueSource(ints = {CHUNK_SIZE, CHUNK_SIZE * 2 - 1, CHUNK_SIZE * 2}) + void differentChunkSize_byteArrayShouldSplitAsyncRequestBodyCorrectly(int chunkSize) throws Exception { + verifySplitContent(AsyncRequestBody.fromBytes(CONTENT), chunkSize); } @@ -115,7 +95,7 @@ void cancelFuture_shouldCancelUpstream() throws IOException { SplittingPublisher splittingPublisher = SplittingPublisher.builder() .resultFuture(future) .asyncRequestBody(asyncRequestBody) - .chunkSizeInBytes((long) CHUNK_SIZE) + .chunkSizeInBytes(CHUNK_SIZE) .maxMemoryUsageInBytes(10L) .build(); @@ -139,7 +119,7 @@ public Optional contentLength() { SplittingPublisher splittingPublisher = SplittingPublisher.builder() .resultFuture(future) .asyncRequestBody(asyncRequestBody) - .chunkSizeInBytes((long) CHUNK_SIZE) + .chunkSizeInBytes(CHUNK_SIZE) .maxMemoryUsageInBytes(10L) .build(); @@ -177,6 +157,46 @@ public Optional contentLength() { } + + private static void verifySplitContent(AsyncRequestBody asyncRequestBody, int chunkSize) throws Exception { + CompletableFuture future = new CompletableFuture<>(); + SplittingPublisher splittingPublisher = SplittingPublisher.builder() + .resultFuture(future) + .asyncRequestBody(asyncRequestBody) + .resultFuture(future) + .chunkSizeInBytes(chunkSize) + .maxMemoryUsageInBytes((long) chunkSize * 4) + .build(); + + List> futures = new ArrayList<>(); + + splittingPublisher.subscribe(requestBody -> { + CompletableFuture baosFuture = new CompletableFuture<>(); + BaosSubscriber subscriber = new BaosSubscriber(baosFuture); + futures.add(baosFuture); + requestBody.subscribe(subscriber); + }).get(5, TimeUnit.SECONDS); + + assertThat(futures.size()).isEqualTo((int) Math.ceil(CONTENT_SIZE / (double) chunkSize)); + + for (int i = 0; i < futures.size(); i++) { + try (FileInputStream fileInputStream = new FileInputStream(testFile)) { + byte[] expected; + if (i == futures.size() - 1) { + int lastChunk = CONTENT_SIZE % chunkSize == 0 ? chunkSize : (CONTENT_SIZE % chunkSize); + expected = new byte[lastChunk]; + } else { + expected = new byte[chunkSize]; + } + fileInputStream.skip(i * chunkSize); + fileInputStream.read(expected); + byte[] actualBytes = futures.get(i).join(); + assertThat(actualBytes).isEqualTo(expected); + }; + } + assertThat(future).isCompleted(); + } + private static class TestAsyncRequestBody implements AsyncRequestBody { private volatile boolean cancelled; private volatile boolean isDone; diff --git a/services/s3/src/it/java/software/amazon/awssdk/services/s3/crt/S3CrossRegionCrtIntegrationTest.java b/services/s3/src/it/java/software/amazon/awssdk/services/s3/crt/S3CrossRegionCrtIntegrationTest.java index 953c6e4b4f4b..72c6fce095ce 100644 --- a/services/s3/src/it/java/software/amazon/awssdk/services/s3/crt/S3CrossRegionCrtIntegrationTest.java +++ b/services/s3/src/it/java/software/amazon/awssdk/services/s3/crt/S3CrossRegionCrtIntegrationTest.java @@ -17,7 +17,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static software.amazon.awssdk.services.s3.crt.S3CrtClientCopyIntegrationTest.randomBytes; +import static software.amazon.awssdk.services.s3.multipart.S3ClientMultiPartCopyIntegrationTest.randomBytes; import static software.amazon.awssdk.services.s3.utils.ChecksumUtils.computeCheckSum; import static software.amazon.awssdk.testutils.service.S3BucketUtils.temporaryBucketName; diff --git a/services/s3/src/it/java/software/amazon/awssdk/services/s3/multipart/S3ClientMultiPartCopyIntegrationTest.java b/services/s3/src/it/java/software/amazon/awssdk/services/s3/multipart/S3ClientMultiPartCopyIntegrationTest.java index ad52804b963d..6db434526fb9 100644 --- a/services/s3/src/it/java/software/amazon/awssdk/services/s3/multipart/S3ClientMultiPartCopyIntegrationTest.java +++ b/services/s3/src/it/java/software/amazon/awssdk/services/s3/multipart/S3ClientMultiPartCopyIntegrationTest.java @@ -119,14 +119,12 @@ void copy_ssecServerSideEncryption_shouldSucceed(S3AsyncClient s3AsyncClient) { String newB64Key = Base64.getEncoder().encodeToString(newSecretKey); String newB64KeyMd5 = Md5Utils.md5AsBase64(newSecretKey); - // MPU S3 client gets stuck - // TODO: change back to s3AsyncClient once the issue is fixed in MPU S3 client - s3Async.putObject(r -> r.bucket(BUCKET) - .key(ORIGINAL_OBJ) - .sseCustomerKey(b64Key) - .sseCustomerAlgorithm(AES256.name()) - .sseCustomerKeyMD5(b64KeyMd5), - AsyncRequestBody.fromBytes(originalContent)).join(); + s3AsyncClient.putObject(r -> r.bucket(BUCKET) + .key(ORIGINAL_OBJ) + .sseCustomerKey(b64Key) + .sseCustomerAlgorithm(AES256.name()) + .sseCustomerKeyMD5(b64KeyMd5), + AsyncRequestBody.fromBytes(originalContent)).join(); CompletableFuture future = s3AsyncClient.copyObject(c -> c .sourceBucket(BUCKET) diff --git a/services/s3/src/it/java/software/amazon/awssdk/services/s3/multipart/S3MultipartClientPutObjectIntegrationTest.java b/services/s3/src/it/java/software/amazon/awssdk/services/s3/multipart/S3MultipartClientPutObjectIntegrationTest.java index 4174b87883dc..f791b4b3c26a 100644 --- a/services/s3/src/it/java/software/amazon/awssdk/services/s3/multipart/S3MultipartClientPutObjectIntegrationTest.java +++ b/services/s3/src/it/java/software/amazon/awssdk/services/s3/multipart/S3MultipartClientPutObjectIntegrationTest.java @@ -20,7 +20,13 @@ import static org.assertj.core.api.Assertions.assertThat; import static software.amazon.awssdk.testutils.service.S3BucketUtils.temporaryBucketName; +import java.io.ByteArrayInputStream; +import java.io.File; +import java.nio.charset.Charset; import java.nio.file.Files; +import java.util.UUID; +import java.util.concurrent.ThreadLocalRandom; +import org.apache.commons.lang3.RandomStringUtils; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; @@ -42,15 +48,18 @@ public class S3MultipartClientPutObjectIntegrationTest extends S3IntegrationTest private static final String TEST_KEY = "testfile.dat"; private static final int OBJ_SIZE = 19 * 1024 * 1024; - private static RandomTempFile testFile; + private static File testFile; private static S3AsyncClient mpuS3Client; @BeforeAll public static void setup() throws Exception { S3IntegrationTestBase.setUp(); S3IntegrationTestBase.createBucket(TEST_BUCKET); + byte[] CONTENT = + RandomStringUtils.randomAscii(OBJ_SIZE).getBytes(Charset.defaultCharset()); - testFile = new RandomTempFile(TEST_KEY, OBJ_SIZE); + testFile = File.createTempFile("SplittingPublisherTest", UUID.randomUUID().toString()); + Files.write(testFile.toPath(), CONTENT); mpuS3Client = new MultipartS3AsyncClient(s3Async); } @@ -75,4 +84,19 @@ void putObject_fileRequestBody_objectSentCorrectly() throws Exception { assertThat(ChecksumUtils.computeCheckSum(objContent)).isEqualTo(expectedSum); } + @Test + @Timeout(value = 30, unit = SECONDS) + void putObject_byteAsyncRequestBody_objectSentCorrectly() throws Exception { + byte[] bytes = RandomStringUtils.randomAscii(OBJ_SIZE).getBytes(Charset.defaultCharset()); + AsyncRequestBody body = AsyncRequestBody.fromBytes(bytes); + mpuS3Client.putObject(r -> r.bucket(TEST_BUCKET).key(TEST_KEY), body).join(); + + ResponseInputStream objContent = S3IntegrationTestBase.s3.getObject(r -> r.bucket(TEST_BUCKET).key(TEST_KEY), + ResponseTransformer.toInputStream()); + + assertThat(objContent.response().contentLength()).isEqualTo(OBJ_SIZE); + byte[] expectedSum = ChecksumUtils.computeCheckSum(new ByteArrayInputStream(bytes)); + assertThat(ChecksumUtils.computeCheckSum(objContent)).isEqualTo(expectedSum); + } + }