From 89fdb892e811fc1c8bc7a86a1ceef6d0227cb8e0 Mon Sep 17 00:00:00 2001 From: Zoe Wang <33073555+zoewangg@users.noreply.github.com> Date: Thu, 17 Aug 2023 16:00:31 -0700 Subject: [PATCH 1/5] Optimize file upload by reading from file in different offsets in parallel --- .../core/FileRequestBodyConfiguration.java | 209 ++++++++++++++++++ .../awssdk/core/async/AsyncRequestBody.java | 35 ++- .../internal/async/FileAsyncRequestBody.java | 58 ++++- .../FileRequestBodyConfigurationTest.java | 73 ++++++ .../async/FileAsyncRequestBodyTest.java | 85 +++++++ services/s3/pom.xml | 5 + ...ltipartClientPutObjectIntegrationTest.java | 13 ++ .../multipart/MultipartS3AsyncClient.java | 10 +- .../multipart/UploadObjectHelper.java | 21 +- .../UploadWithKnownContentLengthHelper.java | 83 +++++-- .../multipart/UploadObjectHelperTest.java | 132 +++++++---- .../amazon/awssdk/utils/Validate.java | 13 ++ .../amazon/awssdk/utils/ValidateTest.java | 14 ++ 13 files changed, 678 insertions(+), 73 deletions(-) create mode 100644 core/sdk-core/src/main/java/software/amazon/awssdk/core/FileRequestBodyConfiguration.java create mode 100644 core/sdk-core/src/test/java/software/amazon/awssdk/core/FileRequestBodyConfigurationTest.java diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/FileRequestBodyConfiguration.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/FileRequestBodyConfiguration.java new file mode 100644 index 000000000000..07e7a98e424e --- /dev/null +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/FileRequestBodyConfiguration.java @@ -0,0 +1,209 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. 
This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.core; + +import java.nio.file.Path; +import java.util.Objects; +import software.amazon.awssdk.annotations.SdkPublicApi; +import software.amazon.awssdk.core.async.AsyncRequestBody; +import software.amazon.awssdk.utils.Validate; +import software.amazon.awssdk.utils.builder.CopyableBuilder; +import software.amazon.awssdk.utils.builder.ToCopyableBuilder; + +/** + * Configuration options for {@link AsyncRequestBody#fromFile(FileRequestBodyConfiguration)} to configure how the SDK + * should read the file. + * + * @see #builder() + */ +@SdkPublicApi +public final class FileRequestBodyConfiguration implements ToCopyableBuilder { + private final Integer chunkSizeInBytes; + private final Long position; + private final Long numBytesToRead; + private final Path path; + + private FileRequestBodyConfiguration(DefaultBuilder builder) { + this.path = Validate.notNull(builder.path, "path"); + this.chunkSizeInBytes = Validate.isPositiveOrNull(builder.chunkSizeInBytes, "chunkSizeInBytes"); + this.position = Validate.isNotNegativeOrNull(builder.position, "position"); + this.numBytesToRead = Validate.isNotNegativeOrNull(builder.numBytesToRead, "numBytesToRead"); + } + + /** + * Create a {@link Builder}, used to create a {@link FileRequestBodyConfiguration}. + */ + public static Builder builder() { + return new DefaultBuilder(); + } + + /** + * @return the size of each chunk to read from the file + */ + public Integer chunkSizeInBytes() { + return chunkSizeInBytes; + } + + /** + * @return the file position at which the request body begins. + */ + public Long position() { + return position; + } + + /** + * @return the number of bytes to read from this file. 
+ */ + public Long numBytesToRead() { + return numBytesToRead; + } + + /** + * @return the file path + */ + public Path path() { + return path; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + FileRequestBodyConfiguration that = (FileRequestBodyConfiguration) o; + + if (!Objects.equals(chunkSizeInBytes, that.chunkSizeInBytes)) { + return false; + } + if (!Objects.equals(position, that.position)) { + return false; + } + if (!Objects.equals(numBytesToRead, that.numBytesToRead)) { + return false; + } + return Objects.equals(path, that.path); + } + + @Override + public int hashCode() { + int result = chunkSizeInBytes != null ? chunkSizeInBytes.hashCode() : 0; + result = 31 * result + (position != null ? position.hashCode() : 0); + result = 31 * result + (numBytesToRead != null ? numBytesToRead.hashCode() : 0); + result = 31 * result + (path != null ? path.hashCode() : 0); + return result; + } + + @Override + public Builder toBuilder() { + return new DefaultBuilder(this); + } + + public interface Builder extends CopyableBuilder { + + /** + * Sets the {@link Path} to the file containing data to send to the service + * + * @param path Path to file to read. + * @return This builder for method chaining. + */ + Builder path(Path path); + + /** + * Sets the size of chunks read from the file. Increasing this will cause more data to be buffered into memory but + * may yield better latencies. Decreasing this will reduce memory usage but may cause higher latency. Setting this value + * is very dependent on upload speed and requires some performance testing to tune. + * + *

The default chunk size is 16 KiB

+ * + * @param chunkSize New chunk size in bytes. + * @return This builder for method chaining. + */ + Builder chunkSizeInBytes(Integer chunkSize); + + /** + * Sets the file position at which the request body begins. + * + *

By default, it's 0, i.e., reading from the beginning of the file. + * + * @param position the byte offset in the file at which to start reading + * @return This builder for method chaining. + */ + Builder position(Long position); + + /** + * Sets the number of bytes to read from this file. + * + *

By default, it's the number of bytes from the configured position to the end of the file. + * + * @param numBytesToRead number of bytes to read + * @return The builder for method chaining. + */ + Builder numBytesToRead(Long numBytesToRead); + } + + private static final class DefaultBuilder implements Builder { + private Long position; + private Path path; + private Integer chunkSizeInBytes; + private Long numBytesToRead; + + private DefaultBuilder(FileRequestBodyConfiguration configuration) { + this.position = configuration.position; + this.path = configuration.path; + this.chunkSizeInBytes = configuration.chunkSizeInBytes; + this.numBytesToRead = configuration.numBytesToRead; + } + + private DefaultBuilder() { + + } + + @Override + public Builder path(Path path) { + this.path = path; + return this; + } + + @Override + public Builder chunkSizeInBytes(Integer chunkSizeInBytes) { + this.chunkSizeInBytes = chunkSizeInBytes; + return this; + } + + @Override + public Builder position(Long position) { + this.position = position; + return this; + } + + @Override + public Builder numBytesToRead(Long numBytesToRead) { + this.numBytesToRead = numBytesToRead; + return this; + } + + @Override + public FileRequestBodyConfiguration build() { + return new FileRequestBodyConfiguration(this); + } + } + +} \ No newline at end of file diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBody.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBody.java index 4c7d70ab7553..21300782a50e 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBody.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBody.java @@ -29,6 +29,7 @@ import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import software.amazon.awssdk.annotations.SdkPublicApi; +import software.amazon.awssdk.core.FileRequestBodyConfiguration; import software.amazon.awssdk.core.internal.async.ByteBuffersAsyncRequestBody; import 
software.amazon.awssdk.core.internal.async.FileAsyncRequestBody; import software.amazon.awssdk.core.internal.async.InputStreamWithExecutorAsyncRequestBody; @@ -112,16 +113,46 @@ static AsyncRequestBody fromFile(Path path) { /** * Creates an {@link AsyncRequestBody} that produces data from the contents of a file. See - * {@link FileAsyncRequestBody#builder} to create a customized body implementation. + * {@link #fromFile(FileRequestBodyConfiguration)} to create a customized body implementation. * * @param file The file to read from. * @return Implementation of {@link AsyncRequestBody} that reads data from the specified file. - * @see FileAsyncRequestBody */ static AsyncRequestBody fromFile(File file) { return FileAsyncRequestBody.builder().path(file.toPath()).build(); } + /** + * Creates an {@link AsyncRequestBody} that produces data from the contents of a file. + * + * @param configuration configuration for how the SDK should read the file + * @return Implementation of {@link AsyncRequestBody} that reads data from the specified file. + */ + static AsyncRequestBody fromFile(FileRequestBodyConfiguration configuration) { + Validate.notNull(configuration, "configuration"); + return FileAsyncRequestBody.builder() + .path(configuration.path()) + .position(configuration.position()) + .chunkSizeInBytes(configuration.chunkSizeInBytes()) + .numBytesToRead(configuration.numBytesToRead()) + .build(); + } + + /** + * Creates an {@link AsyncRequestBody} that produces data from the contents of a file. + * + *

+ * This is a convenience method that creates an instance of the {@link FileRequestBodyConfiguration} builder, + * avoiding the need to create one manually via {@link FileRequestBodyConfiguration#builder()}. + * + * @param configuration configuration for how the SDK should read the file + * @return Implementation of {@link AsyncRequestBody} that reads data from the specified file. + */ + static AsyncRequestBody fromFile(Consumer configuration) { + Validate.notNull(configuration, "configuration"); + return fromFile(FileRequestBodyConfiguration.builder().applyMutation(configuration).build()); + } + /** * Creates an {@link AsyncRequestBody} that uses a single string as data. * diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBody.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBody.java index 8f7b2a483607..70d51ffcf690 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBody.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBody.java @@ -36,6 +36,7 @@ import software.amazon.awssdk.core.internal.util.Mimetype; import software.amazon.awssdk.core.internal.util.NoopSubscription; import software.amazon.awssdk.utils.Logger; +import software.amazon.awssdk.utils.NumericUtils; import software.amazon.awssdk.utils.Validate; import software.amazon.awssdk.utils.builder.SdkBuilder; @@ -65,16 +66,21 @@ public final class FileAsyncRequestBody implements AsyncRequestBody { * Size (in bytes) of ByteBuffer chunks read from the file and delivered to the subscriber. */ private final int chunkSizeInBytes; + private final long position; + private final long numBytesToRead; private FileAsyncRequestBody(DefaultBuilder builder) { this.path = builder.path; this.chunkSizeInBytes = builder.chunkSizeInBytes == null ? 
DEFAULT_CHUNK_SIZE : builder.chunkSizeInBytes; this.fileLength = invokeSafely(() -> Files.size(path)); + this.position = builder.position == null ? 0 : Validate.isNotNegative(builder.position, "position"); + this.numBytesToRead = builder.numBytesToRead == null ? fileLength - this.position : + Validate.isNotNegative(builder.numBytesToRead, "numBytesToRead"); } @Override public Optional contentLength() { - return Optional.of(fileLength); + return Optional.of(numBytesToRead); } @Override @@ -91,7 +97,7 @@ public void subscribe(Subscriber s) { // We need to synchronize here because the subscriber could call // request() from within onSubscribe which would potentially // trigger onNext before onSubscribe is finished. - Subscription subscription = new FileSubscription(path, channel, s, chunkSizeInBytes); + Subscription subscription = new FileSubscription(path, channel, s, chunkSizeInBytes, position, numBytesToRead); synchronized (subscription) { s.onSubscribe(subscription); @@ -128,7 +134,7 @@ public interface Builder extends SdkBuilder { Builder path(Path path); /** - * Sets the size of chunks read from the file. Increasing this will cause more data to be buffered into memory but + * Sets the size of chunks to read from the file. Increasing this will cause more data to be buffered into memory but * may yield better latencies. Decreasing this will reduce memory usage but may cause reduced latency. Setting this value * is very dependent on upload speed and requires some performance testing to tune. * @@ -139,12 +145,33 @@ public interface Builder extends SdkBuilder { */ Builder chunkSizeInBytes(Integer chunkSize); + /** + * Sets the file position at which the request body begins. + * + *

By default, it's 0, i.e., reading from the beginning of the file. + * + * @param position the byte offset in the file at which to start reading + * @return This builder for method chaining. + */ + Builder position(Long position); + + /** + * Sets the number of bytes to read from this file. + * + *

By default, it's the number of bytes from the configured position to the end of the file. + * + * @param numBytesToRead number of bytes to read + * @return The builder for method chaining. + */ + Builder numBytesToRead(Long numBytesToRead); } private static final class DefaultBuilder implements Builder { + private Long position; private Path path; private Integer chunkSizeInBytes; + private Long numBytesToRead; @Override public Builder path(Path path) { @@ -162,6 +189,18 @@ public Builder chunkSizeInBytes(Integer chunkSizeInBytes) { return this; } + @Override + public Builder position(Long position) { + this.position = position; + return this; + } + + @Override + public Builder numBytesToRead(Long numBytesToRead) { + this.numBytesToRead = numBytesToRead; + return this; + } + public void setChunkSizeInBytes(Integer chunkSizeInBytes) { chunkSizeInBytes(chunkSizeInBytes); } @@ -181,8 +220,8 @@ private static final class FileSubscription implements Subscription { private final Subscriber subscriber; private final int chunkSize; - private final AtomicLong position = new AtomicLong(0); - private final AtomicLong remainingBytes = new AtomicLong(0); + private final AtomicLong position; + private final AtomicLong remainingBytes; private final long sizeAtStart; private final FileTime modifiedTimeAtStart; private long outstandingDemand = 0; @@ -193,14 +232,17 @@ private static final class FileSubscription implements Subscription { private FileSubscription(Path path, AsynchronousFileChannel inputChannel, Subscriber subscriber, - int chunkSize) throws IOException { + int chunkSize, + long position, + long numBytesToRead) throws IOException { this.path = path; this.inputChannel = inputChannel; this.subscriber = subscriber; this.chunkSize = chunkSize; this.sizeAtStart = inputChannel.size(); this.modifiedTimeAtStart = Files.getLastModifiedTime(path); - this.remainingBytes.set(Validate.isNotNegative(sizeAtStart, "size")); + this.remainingBytes = new AtomicLong(numBytesToRead); + this.position = new AtomicLong(position); } 
@Override @@ -255,7 +297,7 @@ private void readData() { return; } - ByteBuffer buffer = ByteBuffer.allocate(chunkSize); + ByteBuffer buffer = ByteBuffer.allocate(Math.min(chunkSize, NumericUtils.saturatedCast(remainingBytes.get()))); inputChannel.read(buffer, position.get(), buffer, new CompletionHandler() { @Override public void completed(Integer result, ByteBuffer attachment) { diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/FileRequestBodyConfigurationTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/FileRequestBodyConfigurationTest.java new file mode 100644 index 000000000000..535a7176856c --- /dev/null +++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/FileRequestBodyConfigurationTest.java @@ -0,0 +1,73 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ + +package software.amazon.awssdk.core; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.nio.file.Paths; +import nl.jqno.equalsverifier.EqualsVerifier; +import org.junit.jupiter.api.Test; + +public class FileRequestBodyConfigurationTest { + + @Test + void equalsHashCode() { + EqualsVerifier.forClass(FileRequestBodyConfiguration.class) + .verify(); + } + + @Test + void invalidRequest_shouldThrowException() { + assertThatThrownBy(() -> FileRequestBodyConfiguration.builder() + .path(Paths.get(".")) + .position(-1L) + .build()) + .hasMessage("position must not be negative"); + + assertThatThrownBy(() -> FileRequestBodyConfiguration.builder() + .path(Paths.get(".")) + .numBytesToRead(-1L) + .build()) + .hasMessage("numBytesToRead must not be negative"); + + assertThatThrownBy(() -> FileRequestBodyConfiguration.builder() + .path(Paths.get(".")) + .chunkSizeInBytes(0) + .build()) + .hasMessage("chunkSizeInBytes must be positive"); + assertThatThrownBy(() -> FileRequestBodyConfiguration.builder() + .path(Paths.get(".")) + .chunkSizeInBytes(-5) + .build()) + .hasMessage("chunkSizeInBytes must be positive"); + assertThatThrownBy(() -> FileRequestBodyConfiguration.builder() + .build()) + .hasMessage("path"); + } + + @Test + void toBuilder_shouldCopyAllProperties() { + FileRequestBodyConfiguration config = FileRequestBodyConfiguration.builder() + .path(Paths.get(".")).numBytesToRead(100L) + .position(1L) + .chunkSizeInBytes(1024) + .build(); + + assertThat(config.toBuilder().build()).isEqualTo(config); + } + +} diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodyTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodyTest.java index da9daf557e22..6335c756c268 100644 --- a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodyTest.java +++ 
b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodyTest.java @@ -15,13 +15,17 @@ package software.amazon.awssdk.core.internal.async; +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.jupiter.api.Assertions.assertTrue; import static software.amazon.awssdk.utils.FunctionalUtils.invokeSafely; +import io.reactivex.Flowable; import java.io.ByteArrayOutputStream; +import java.io.FileInputStream; import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.channels.AsynchronousFileChannel; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.NoSuchFileException; @@ -38,6 +42,7 @@ import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import software.amazon.awssdk.core.async.AsyncRequestBody; +import software.amazon.awssdk.http.async.SimpleSubscriber; import software.amazon.awssdk.testutils.RandomTempFile; import software.amazon.awssdk.utils.BinaryUtils; @@ -45,10 +50,12 @@ public class FileAsyncRequestBodyTest { private static final long MiB = 1024 * 1024; private static final long TEST_FILE_SIZE = 10 * MiB; private static Path testFile; + private static Path smallFile; @BeforeEach public void setup() throws IOException { testFile = new RandomTempFile(TEST_FILE_SIZE).toPath(); + smallFile = new RandomTempFile(100).toPath(); } @AfterEach @@ -226,6 +233,84 @@ public void changingFile_fileGetsDeleted_failsBecauseDeleted() throws Exception .hasCauseInstanceOf(IOException.class); } + @Test + public void positionNotZero_shouldReadFromPosition() throws Exception { + CompletableFuture future = new CompletableFuture<>(); + long position = 20L; + AsyncRequestBody asyncRequestBody = FileAsyncRequestBody.builder() + .path(smallFile) + .position(position) + .chunkSizeInBytes(10) + .build(); + + ByteArrayAsyncResponseTransformer.BaosSubscriber baosSubscriber = + new 
ByteArrayAsyncResponseTransformer.BaosSubscriber(future); + asyncRequestBody.subscribe(baosSubscriber); + assertThat(asyncRequestBody.contentLength()).contains(80L); + + byte[] bytes = future.get(1, TimeUnit.SECONDS); + + byte[] expected = new byte[80]; + try(FileInputStream inputStream = new FileInputStream(smallFile.toFile())) { + inputStream.skip(position); + inputStream.read(expected, 0, 80); + } + + assertThat(bytes).isEqualTo(expected); + } + + @Test + public void bothPositionAndNumBytesToReadConfigured_shouldHonor() throws Exception { + CompletableFuture future = new CompletableFuture<>(); + long position = 20L; + long numBytesToRead = 5L; + AsyncRequestBody asyncRequestBody = FileAsyncRequestBody.builder() + .path(smallFile) + .position(position) + .numBytesToRead(numBytesToRead) + .chunkSizeInBytes(10) + .build(); + + ByteArrayAsyncResponseTransformer.BaosSubscriber baosSubscriber = + new ByteArrayAsyncResponseTransformer.BaosSubscriber(future); + asyncRequestBody.subscribe(baosSubscriber); + assertThat(asyncRequestBody.contentLength()).contains(numBytesToRead); + + byte[] bytes = future.get(1, TimeUnit.SECONDS); + + byte[] expected = new byte[5]; + try (FileInputStream inputStream = new FileInputStream(smallFile.toFile())) { + inputStream.skip(position); + inputStream.read(expected, 0, 5); + } + + assertThat(bytes).isEqualTo(expected); + } + + @Test + public void numBytesToReadConfigured_shouldHonor() throws Exception { + CompletableFuture future = new CompletableFuture<>(); + AsyncRequestBody asyncRequestBody = FileAsyncRequestBody.builder() + .path(smallFile) + .numBytesToRead(5L) + .chunkSizeInBytes(10) + .build(); + + ByteArrayAsyncResponseTransformer.BaosSubscriber baosSubscriber = + new ByteArrayAsyncResponseTransformer.BaosSubscriber(future); + asyncRequestBody.subscribe(baosSubscriber); + assertThat(asyncRequestBody.contentLength()).contains(5L); + + byte[] bytes = future.get(1, TimeUnit.SECONDS); + + byte[] expected = new byte[5]; + try 
(FileInputStream inputStream = new FileInputStream(smallFile.toFile())) { + inputStream.read(expected, 0, 5); + } + + assertThat(bytes).isEqualTo(expected); + } + private static class ControllableSubscriber implements Subscriber { private final ByteArrayOutputStream output = new ByteArrayOutputStream(); private final CompletableFuture completed = new CompletableFuture<>(); diff --git a/services/s3/pom.xml b/services/s3/pom.xml index a8d84213fa7e..3b3bbec57206 100644 --- a/services/s3/pom.xml +++ b/services/s3/pom.xml @@ -153,6 +153,11 @@ equalsverifier test + + com.google.jimfs + jimfs + test + diff --git a/services/s3/src/it/java/software/amazon/awssdk/services/s3/multipart/S3MultipartClientPutObjectIntegrationTest.java b/services/s3/src/it/java/software/amazon/awssdk/services/s3/multipart/S3MultipartClientPutObjectIntegrationTest.java index fa31b5453e5e..800cd19a9a24 100644 --- a/services/s3/src/it/java/software/amazon/awssdk/services/s3/multipart/S3MultipartClientPutObjectIntegrationTest.java +++ b/services/s3/src/it/java/software/amazon/awssdk/services/s3/multipart/S3MultipartClientPutObjectIntegrationTest.java @@ -92,6 +92,19 @@ void putObject_fileRequestBody_objectSentCorrectly() throws Exception { assertThat(ChecksumUtils.computeCheckSum(objContent)).isEqualTo(expectedSum); } + @Test + void putObject_withPath_objectSentCorrectly() throws Exception { + mpuS3Client.putObject(r -> r.bucket(TEST_BUCKET).key(TEST_KEY), testFile.toPath()).get(10, SECONDS); + + ResponseInputStream objContent = + S3IntegrationTestBase.s3.getObject(r -> r.bucket(TEST_BUCKET).key(TEST_KEY), + ResponseTransformer.toInputStream()); + + assertThat(objContent.response().contentLength()).isEqualTo(testFile.length()); + byte[] expectedSum = ChecksumUtils.computeCheckSum(Files.newInputStream(testFile.toPath())); + assertThat(ChecksumUtils.computeCheckSum(objContent)).isEqualTo(expectedSum); + } + @Test void putObject_byteAsyncRequestBody_objectSentCorrectly() throws Exception { byte[] bytes 
= RandomStringUtils.randomAscii(OBJ_SIZE).getBytes(Charset.defaultCharset()); diff --git a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartS3AsyncClient.java b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartS3AsyncClient.java index 8b53099b8683..7cad60336250 100644 --- a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartS3AsyncClient.java +++ b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartS3AsyncClient.java @@ -16,6 +16,7 @@ package software.amazon.awssdk.services.s3.internal.multipart; +import java.nio.file.Path; import java.util.concurrent.CompletableFuture; import java.util.function.Function; import software.amazon.awssdk.annotations.SdkInternalApi; @@ -62,7 +63,14 @@ private MultipartS3AsyncClient(S3AsyncClient delegate, MultipartConfiguration mu @Override public CompletableFuture putObject(PutObjectRequest putObjectRequest, AsyncRequestBody requestBody) { - return mpuHelper.uploadObject(putObjectRequest, requestBody); + return mpuHelper.uploadWithAsyncRequestBody(putObjectRequest, requestBody); + } + + // TODO: TM currently doesn't call this method because it relies on wrapping of the async request body to report progress + // and we should fix it + @Override + public CompletableFuture putObject(PutObjectRequest putObjectRequest, Path sourcePath) { + return mpuHelper.uploadWithFile(putObjectRequest, sourcePath); } @Override diff --git a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/UploadObjectHelper.java b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/UploadObjectHelper.java index 1ca499b57aa8..2eb402472c8e 100644 --- a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/UploadObjectHelper.java +++ b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/UploadObjectHelper.java 
@@ -15,12 +15,17 @@ package software.amazon.awssdk.services.s3.internal.multipart; +import static software.amazon.awssdk.utils.FunctionalUtils.invokeSafely; + +import java.nio.file.Files; +import java.nio.file.Path; import java.util.concurrent.CompletableFuture; import software.amazon.awssdk.annotations.SdkInternalApi; import software.amazon.awssdk.core.async.AsyncRequestBody; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.model.PutObjectRequest; import software.amazon.awssdk.services.s3.model.PutObjectResponse; +import software.amazon.awssdk.utils.Either; import software.amazon.awssdk.utils.Logger; /** @@ -58,14 +63,24 @@ public UploadObjectHelper(S3AsyncClient s3AsyncClient, apiCallBufferSize); } - public CompletableFuture uploadObject(PutObjectRequest putObjectRequest, - AsyncRequestBody asyncRequestBody) { + public CompletableFuture uploadWithAsyncRequestBody(PutObjectRequest putObjectRequest, + AsyncRequestBody asyncRequestBody) { Long contentLength = asyncRequestBody.contentLength().orElseGet(putObjectRequest::contentLength); if (contentLength == null) { return uploadWithUnknownContentLength.uploadObject(putObjectRequest, asyncRequestBody); } else { - return uploadWithKnownContentLength.uploadObject(putObjectRequest, asyncRequestBody, contentLength.longValue()); + return uploadWithKnownContentLength.uploadObject(putObjectRequest, + Either.left(asyncRequestBody), + contentLength.longValue()); } } + + public CompletableFuture uploadWithFile(PutObjectRequest putObjectRequest, + Path path) { + long contentLength = invokeSafely(() -> Files.size(path)); + return uploadWithKnownContentLength.uploadObject(putObjectRequest, + Either.right(path), + contentLength); + } } diff --git a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/UploadWithKnownContentLengthHelper.java 
b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/UploadWithKnownContentLengthHelper.java index f7d199ac3aa6..7f9c1267d5b6 100644 --- a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/UploadWithKnownContentLengthHelper.java +++ b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/UploadWithKnownContentLengthHelper.java @@ -16,6 +16,7 @@ package software.amazon.awssdk.services.s3.internal.multipart; +import java.nio.file.Path; import java.util.Collection; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedQueue; @@ -34,8 +35,10 @@ import software.amazon.awssdk.services.s3.model.PutObjectRequest; import software.amazon.awssdk.services.s3.model.PutObjectResponse; import software.amazon.awssdk.services.s3.model.UploadPartRequest; +import software.amazon.awssdk.utils.Either; import software.amazon.awssdk.utils.Logger; import software.amazon.awssdk.utils.Pair; +import software.amazon.awssdk.utils.async.SimplePublisher; /** * An internal helper class that automatically uses multipart upload based on the size of the object. 
@@ -68,17 +71,20 @@ public UploadWithKnownContentLengthHelper(S3AsyncClient s3AsyncClient, } public CompletableFuture uploadObject(PutObjectRequest putObjectRequest, - AsyncRequestBody asyncRequestBody, + Either requestBodyOrPath, long contentLength) { CompletableFuture returnFuture = new CompletableFuture<>(); try { if (contentLength > multipartUploadThresholdInBytes && contentLength > partSizeInBytes) { log.debug(() -> "Starting the upload as multipart upload request"); - uploadInParts(putObjectRequest, contentLength, asyncRequestBody, returnFuture); + uploadInParts(putObjectRequest, contentLength, requestBodyOrPath, returnFuture); } else { log.debug(() -> "Starting the upload as a single upload part request"); - multipartUploadHelper.uploadInOneChunk(putObjectRequest, asyncRequestBody, returnFuture); + requestBodyOrPath.apply( + requestBody -> multipartUploadHelper.uploadInOneChunk(putObjectRequest, requestBody, returnFuture), + path -> multipartUploadHelper.uploadInOneChunk(putObjectRequest, AsyncRequestBody.fromFile(path), + returnFuture)); } } catch (Throwable throwable) { @@ -88,7 +94,9 @@ public CompletableFuture uploadObject(PutObjectRequest putObj return returnFuture; } - private void uploadInParts(PutObjectRequest putObjectRequest, long contentLength, AsyncRequestBody asyncRequestBody, + private void uploadInParts(PutObjectRequest putObjectRequest, + long contentLength, + Either requestBodyOrPath, CompletableFuture returnFuture) { CompletableFuture createMultipartUploadFuture = @@ -99,13 +107,13 @@ private void uploadInParts(PutObjectRequest putObjectRequest, long contentLength genericMultipartHelper.handleException(returnFuture, () -> "Failed to initiate multipart upload", throwable); } else { log.debug(() -> "Initiated a new multipart upload, uploadId: " + createMultipartUploadResponse.uploadId()); - doUploadInParts(Pair.of(putObjectRequest, asyncRequestBody), contentLength, returnFuture, + doUploadInParts(Pair.of(putObjectRequest, requestBodyOrPath), 
contentLength, returnFuture, createMultipartUploadResponse.uploadId()); } }); } - private void doUploadInParts(Pair request, + private void doUploadInParts(Pair> request, long contentLength, CompletableFuture returnFuture, String uploadId) { @@ -120,23 +128,68 @@ private void doUploadInParts(Pair request, log.debug(() -> String.format("Starting multipart upload with partCount: %d, optimalPartSize: %d", partCount, optimalPartSize)); - MpuRequestContext mpuRequestContext = new MpuRequestContext(request, contentLength, optimalPartSize, uploadId); + MpuRequestContext mpuRequestContext = new MpuRequestContext(request.left(), contentLength, optimalPartSize, uploadId); - request.right() - .split(b -> b.chunkSizeInBytes(mpuRequestContext.partSize) - .bufferSizeInBytes(maxMemoryUsageInBytes)) - .subscribe(new KnownContentLengthAsyncRequestBodySubscriber(mpuRequestContext, - returnFuture)); + Either requestBodyOrPath = request.right(); + + Consumer splitAsyncRequestBodyAndSendRequests = + splitAsyncRequestBodyAndSendRequests(returnFuture, mpuRequestContext); + Consumer splitFileAndSendRequests = + splitFileAndSendRequests(returnFuture, mpuRequestContext, partCount, optimalPartSize); + + requestBodyOrPath.apply(splitAsyncRequestBodyAndSendRequests, splitFileAndSendRequests); + } + + private Consumer splitFileAndSendRequests(CompletableFuture returnFuture, + MpuRequestContext mpuRequestContext, + int partCount, + long optimalPartSize) { + return path -> { + SimplePublisher simplePublisher = new SimplePublisher<>(); + simplePublisher.subscribe(new KnownContentLengthAsyncRequestBodySubscriber(mpuRequestContext, + returnFuture)); + + try { + for (int i = 0; i < partCount; i++) { + long numBytesToRead; + long position = optimalPartSize * i; + if (i == partCount - 1) { + numBytesToRead = mpuRequestContext.contentLength - position; + } else { + numBytesToRead = optimalPartSize; + } + simplePublisher.send(AsyncRequestBody.fromFile( + config -> config.path(path) + 
.chunkSizeInBytes(1024 * 1024) // TODO: perf test this + .numBytesToRead(numBytesToRead) + .position(position))); + } + + } catch (Throwable throwable) { + simplePublisher.error(throwable); + } + simplePublisher.complete(); + }; + } + + private Consumer splitAsyncRequestBodyAndSendRequests(CompletableFuture returnFuture, + MpuRequestContext mpuRequestContext) { + return requestBody -> { + requestBody.split(b -> b.chunkSizeInBytes(mpuRequestContext.partSize) + .bufferSizeInBytes(maxMemoryUsageInBytes)) + .subscribe(new KnownContentLengthAsyncRequestBodySubscriber(mpuRequestContext, + returnFuture)); + }; } private static final class MpuRequestContext { - private final Pair request; + private final PutObjectRequest request; private final long contentLength; private final long partSize; private final String uploadId; - private MpuRequestContext(Pair request, + private MpuRequestContext(PutObjectRequest request, long contentLength, long partSize, String uploadId) { @@ -178,7 +231,7 @@ private class KnownContentLengthAsyncRequestBodySubscriber implements Subscriber long optimalPartSize = genericMultipartHelper.calculateOptimalPartSizeFor(mpuRequestContext.contentLength, partSizeInBytes); int partCount = genericMultipartHelper.determinePartCount(mpuRequestContext.contentLength, optimalPartSize); - this.putObjectRequest = mpuRequestContext.request.left(); + this.putObjectRequest = mpuRequestContext.request; this.returnFuture = returnFuture; this.completedParts = new AtomicReferenceArray<>(partCount); this.uploadId = mpuRequestContext.uploadId; diff --git a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/UploadObjectHelperTest.java b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/UploadObjectHelperTest.java index b287270515b0..600b5912782d 100644 --- a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/UploadObjectHelperTest.java +++ 
b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/UploadObjectHelperTest.java @@ -25,9 +25,13 @@ import static org.mockito.Mockito.when; import static software.amazon.awssdk.services.s3.internal.multipart.MpuTestUtils.stubSuccessfulCompleteMultipartCall; +import com.google.common.jimfs.Jimfs; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.nio.file.FileSystem; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -50,6 +54,7 @@ import org.reactivestreams.Subscription; import software.amazon.awssdk.core.async.AsyncRequestBody; import software.amazon.awssdk.core.exception.SdkClientException; +import software.amazon.awssdk.core.internal.async.FileAsyncRequestBody; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest; import software.amazon.awssdk.services.s3.model.AbortMultipartUploadResponse; @@ -62,37 +67,44 @@ import software.amazon.awssdk.services.s3.model.UploadPartRequest; import software.amazon.awssdk.services.s3.model.UploadPartResponse; import software.amazon.awssdk.services.s3.multipart.MultipartConfiguration; -import software.amazon.awssdk.testutils.RandomTempFile; import software.amazon.awssdk.utils.CompletableFutureUtils; +import software.amazon.awssdk.utils.Either; public class UploadObjectHelperTest { + private static FileSystem jimfs; private static final String BUCKET = "bucket"; private static final String KEY = "key"; - private static final long PART_SIZE = 8 * 1024; + private static final int PART_SIZE = 8 * 1024; // Should contain four parts: [8KB, 8KB, 8KB, 1KB] - private static final long MPU_CONTENT_SIZE = 25 * 1024; - private static final long THRESHOLD = 10 * 1024; + private static final int MPU_CONTENT_SIZE = 25 * 1024; + private static final int THRESHOLD = 10 * 1024; 
private static final String UPLOAD_ID = "1234"; - private static RandomTempFile testFile; + private static Path testFile; + private static Path testDirectory; private UploadObjectHelper uploadHelper; private S3AsyncClient s3AsyncClient; @BeforeAll public static void beforeAll() throws IOException { - testFile = new RandomTempFile("testfile.dat", MPU_CONTENT_SIZE); + jimfs = Jimfs.newFileSystem(); + testDirectory = jimfs.getPath("test"); + Files.createDirectory(testDirectory); + testFile = Files.write(jimfs.getPath("test", "test.txt"), + RandomStringUtils.randomAscii(MPU_CONTENT_SIZE).getBytes(StandardCharsets.UTF_8)); } @AfterAll public static void afterAll() throws Exception { - testFile.delete(); + jimfs.close(); } - public static Stream asyncRequestBody() { - return Stream.of(new UnknownContentLengthAsyncRequestBody(AsyncRequestBody.fromFile(testFile)), - AsyncRequestBody.fromFile(testFile)); + public static Stream> asyncRequestBodyOrPath() { + return Stream.of(Either.left(new UnknownContentLengthAsyncRequestBody(AsyncRequestBody.fromFile(testFile))), + Either.left(AsyncRequestBody.fromFile(testFile)), + Either.right(testFile)); } @BeforeEach @@ -100,28 +112,50 @@ public void beforeEach() { s3AsyncClient = Mockito.mock(S3AsyncClient.class); uploadHelper = new UploadObjectHelper(s3AsyncClient, new MultipartConfigurationResolver(MultipartConfiguration.builder() - .minimumPartSizeInBytes(PART_SIZE) - .thresholdInBytes(THRESHOLD) - .thresholdInBytes(PART_SIZE * 2) + .minimumPartSizeInBytes((long) PART_SIZE) + .thresholdInBytes((long) THRESHOLD) + .thresholdInBytes((long) (PART_SIZE * 2)) .build())); } @ParameterizedTest - @ValueSource(longs = {THRESHOLD, PART_SIZE, THRESHOLD - 1, PART_SIZE - 1}) - void uploadObject_contentLengthDoesNotExceedThresholdAndPartSize_shouldUploadInOneChunk(long contentLength) { + @ValueSource(ints = {THRESHOLD, PART_SIZE, THRESHOLD - 1, PART_SIZE - 1}) + void 
uploadWithAsyncRequestBody_contentLengthDoesNotExceedThresholdAndPartSize_shouldUploadInOneChunk(int contentLength) { PutObjectRequest putObjectRequest = putObjectRequest(contentLength); AsyncRequestBody asyncRequestBody = Mockito.mock(AsyncRequestBody.class); CompletableFuture completedFuture = CompletableFuture.completedFuture(PutObjectResponse.builder().build()); when(s3AsyncClient.putObject(putObjectRequest, asyncRequestBody)).thenReturn(completedFuture); - uploadHelper.uploadObject(putObjectRequest, asyncRequestBody).join(); - Mockito.verify(s3AsyncClient).putObject(putObjectRequest, asyncRequestBody); + uploadHelper.uploadWithAsyncRequestBody(putObjectRequest, asyncRequestBody).join(); + verify(s3AsyncClient).putObject(putObjectRequest, asyncRequestBody); + } + + @ParameterizedTest + @ValueSource(ints = {THRESHOLD, PART_SIZE, THRESHOLD - 1, PART_SIZE - 1}) + void uploadWithPath_contentLengthDoesNotExceedThresholdAndPartSize_shouldUploadInOneChunk(int contentLength) throws IOException { + PutObjectRequest putObjectRequest = putObjectRequest(contentLength); + + CompletableFuture completedFuture = + CompletableFuture.completedFuture(PutObjectResponse.builder().build()); + + Path file = Files.write(jimfs.getPath("test", "bar.txt"), + RandomStringUtils.randomAscii(contentLength).getBytes(StandardCharsets.UTF_8)); + + ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(AsyncRequestBody.class); + + when(s3AsyncClient.putObject(any(PutObjectRequest.class), any(FileAsyncRequestBody.class))).thenReturn(completedFuture); + + uploadHelper.uploadWithFile(putObjectRequest, file).join(); + + verify(s3AsyncClient).putObject(any(PutObjectRequest.class), requestCaptor.capture()); + AsyncRequestBody actual = requestCaptor.getValue(); + assertThat(actual.contentLength()).hasValue(Long.valueOf(contentLength)); } @ParameterizedTest - @ValueSource(longs = {PART_SIZE, PART_SIZE - 1}) - void uploadObject_unKnownContentLengthDoesNotExceedPartSize_shouldUploadInOneChunk(long 
contentLength) { + @ValueSource(ints = {PART_SIZE, PART_SIZE - 1}) + void uploadWithAsyncRequestBody_unKnownContentLengthDoesNotExceedPartSize_shouldUploadInOneChunk(int contentLength) { PutObjectRequest putObjectRequest = putObjectRequest(contentLength); AsyncRequestBody asyncRequestBody = new UnknownContentLengthAsyncRequestBody(AsyncRequestBody.fromBytes(RandomStringUtils.randomAscii(Math.toIntExact(contentLength)) @@ -130,20 +164,24 @@ void uploadObject_unKnownContentLengthDoesNotExceedPartSize_shouldUploadInOneChu CompletableFuture completedFuture = CompletableFuture.completedFuture(PutObjectResponse.builder().build()); when(s3AsyncClient.putObject(putObjectRequest, asyncRequestBody)).thenReturn(completedFuture); - uploadHelper.uploadObject(putObjectRequest, asyncRequestBody).join(); + uploadHelper.uploadWithAsyncRequestBody(putObjectRequest, asyncRequestBody).join(); Mockito.verify(s3AsyncClient).putObject(putObjectRequest, asyncRequestBody); } @ParameterizedTest - @MethodSource("asyncRequestBody") - void uploadObject_contentLengthExceedThresholdAndPartSize_shouldUseMPU(AsyncRequestBody asyncRequestBody) { + @MethodSource("asyncRequestBodyOrPath") + void upload_contentLengthExceedThresholdAndPartSize_shouldUseMPU(Either asyncRequestBodyOrPath) { PutObjectRequest putObjectRequest = putObjectRequest(null); MpuTestUtils.stubSuccessfulCreateMultipartCall(UPLOAD_ID, s3AsyncClient); stubSuccessfulUploadPartCalls(); stubSuccessfulCompleteMultipartCall(BUCKET, KEY, s3AsyncClient); - uploadHelper.uploadObject(putObjectRequest, asyncRequestBody).join(); + asyncRequestBodyOrPath.apply( + asyncRequestBody -> uploadHelper.uploadWithAsyncRequestBody(putObjectRequest, asyncRequestBody).join(), + path -> uploadHelper.uploadWithFile(putObjectRequest, path).join() + ); + ArgumentCaptor requestArgumentCaptor = ArgumentCaptor.forClass(UploadPartRequest.class); ArgumentCaptor requestBodyArgumentCaptor = ArgumentCaptor.forClass(AsyncRequestBody.class); verify(s3AsyncClient, 
times(4)).uploadPart(requestArgumentCaptor.capture(), @@ -164,7 +202,7 @@ void uploadObject_contentLengthExceedThresholdAndPartSize_shouldUseMPU(AsyncRequ if (i == actualRequests.size() - 1) { assertThat(requestBody.contentLength()).hasValue(1024L); } else{ - assertThat(requestBody.contentLength()).hasValue(PART_SIZE); + assertThat(requestBody.contentLength()).hasValue(Long.valueOf(PART_SIZE)); } } } @@ -173,8 +211,8 @@ void uploadObject_contentLengthExceedThresholdAndPartSize_shouldUseMPU(AsyncRequ * The second part failed, it should cancel ongoing part(first part). */ @ParameterizedTest - @MethodSource("asyncRequestBody") - void mpu_onePartFailed_shouldFailOtherPartsAndAbort(AsyncRequestBody asyncRequestBody) { + @MethodSource("asyncRequestBodyOrPath") + void mpu_onePartFailed_shouldFailOtherPartsAndAbort(Either asyncRequestBodyOrPath) { PutObjectRequest putObjectRequest = putObjectRequest(MPU_CONTENT_SIZE); MpuTestUtils.stubSuccessfulCreateMultipartCall(UPLOAD_ID, s3AsyncClient); @@ -190,8 +228,10 @@ void mpu_onePartFailed_shouldFailOtherPartsAndAbort(AsyncRequestBody asyncReques when(s3AsyncClient.abortMultipartUpload(any(AbortMultipartUploadRequest.class))) .thenReturn(CompletableFuture.completedFuture(AbortMultipartUploadResponse.builder().build())); - CompletableFuture future = uploadHelper.uploadObject(putObjectRequest, - asyncRequestBody); + CompletableFuture future = asyncRequestBodyOrPath.map( + asyncRequestBody -> uploadHelper.uploadWithAsyncRequestBody(putObjectRequest, asyncRequestBody), + path -> uploadHelper.uploadWithFile(putObjectRequest, path) + ); assertThatThrownBy(future::join).hasMessageContaining("Failed to send multipart upload requests").hasRootCause(exception); @@ -224,7 +264,7 @@ void upload_knownContentLengthCancelResponseFuture_shouldCancelCreateMultipart() .thenReturn(createMultipartFuture); CompletableFuture future = - uploadHelper.uploadObject(putObjectRequest, AsyncRequestBody.fromFile(testFile)); + 
uploadHelper.uploadWithAsyncRequestBody(putObjectRequest, AsyncRequestBody.fromFile(testFile)); future.cancel(true); @@ -245,7 +285,7 @@ void upload_knownContentLengthCancelResponseFuture_shouldCancelUploadPart() { any(AsyncRequestBody.class))).thenReturn(ongoingRequest); CompletableFuture future = - uploadHelper.uploadObject(putObjectRequest, AsyncRequestBody.fromFile(testFile)); + uploadHelper.uploadWithAsyncRequestBody(putObjectRequest, AsyncRequestBody.fromFile(testFile)); future.cancel(true); @@ -253,8 +293,8 @@ void upload_knownContentLengthCancelResponseFuture_shouldCancelUploadPart() { } @ParameterizedTest - @MethodSource("asyncRequestBody") - void uploadObject_createMultipartUploadFailed_shouldFail(AsyncRequestBody asyncRequestBody) { + @MethodSource("asyncRequestBodyOrPath") + void uploadWithAsyncRequestBody_createMultipartUploadFailed_shouldFail(Either asyncRequestBodyOrPath) { PutObjectRequest putObjectRequest = putObjectRequest(null); SdkClientException exception = SdkClientException.create("CompleteMultipartUpload failed"); @@ -265,15 +305,17 @@ void uploadObject_createMultipartUploadFailed_shouldFail(AsyncRequestBody asyncR when(s3AsyncClient.createMultipartUpload(any(CreateMultipartUploadRequest.class))) .thenReturn(createMultipartUploadFuture); - CompletableFuture future = uploadHelper.uploadObject(putObjectRequest, - asyncRequestBody); + CompletableFuture future = asyncRequestBodyOrPath.map( + asyncRequestBody -> uploadHelper.uploadWithAsyncRequestBody(putObjectRequest, asyncRequestBody), + path -> uploadHelper.uploadWithFile(putObjectRequest, path) + ); assertThatThrownBy(future::join).hasMessageContaining("Failed to initiate multipart upload") .hasRootCause(exception); } @ParameterizedTest - @MethodSource("asyncRequestBody") - void uploadObject_completeMultipartFailed_shouldFailAndAbort(AsyncRequestBody asyncRequestBody) { + @MethodSource("asyncRequestBodyOrPath") + void 
uploadWithAsyncRequestBody_completeMultipartFailed_shouldFailAndAbort(Either asyncRequestBodyOrPath) { PutObjectRequest putObjectRequest = putObjectRequest(null); MpuTestUtils.stubSuccessfulCreateMultipartCall(UPLOAD_ID, s3AsyncClient); @@ -290,19 +332,21 @@ void uploadObject_completeMultipartFailed_shouldFailAndAbort(AsyncRequestBody as when(s3AsyncClient.abortMultipartUpload(any(AbortMultipartUploadRequest.class))) .thenReturn(CompletableFuture.completedFuture(AbortMultipartUploadResponse.builder().build())); - CompletableFuture future = uploadHelper.uploadObject(putObjectRequest, - asyncRequestBody); + CompletableFuture future = asyncRequestBodyOrPath.map( + asyncRequestBody -> uploadHelper.uploadWithAsyncRequestBody(putObjectRequest, asyncRequestBody), + path -> uploadHelper.uploadWithFile(putObjectRequest, path) + ); assertThatThrownBy(future::join).hasMessageContaining("Failed to send multipart requests") .hasRootCause(exception); } @ParameterizedTest() @ValueSource(booleans = {false, true}) - void uploadObject_requestBodyOnError_shouldFailAndAbort(boolean contentLengthKnown) { + void uploadWithAsyncRequestBody_requestBodyOnError_shouldFailAndAbort(boolean contentLengthKnown) { PutObjectRequest putObjectRequest = putObjectRequest(null); Exception exception = new RuntimeException("error"); - Long contentLength = contentLengthKnown ? MPU_CONTENT_SIZE : null; + Long contentLength = contentLengthKnown ? 
Long.valueOf(MPU_CONTENT_SIZE) : null; ErroneousAsyncRequestBody erroneousAsyncRequestBody = new ErroneousAsyncRequestBody(contentLength, exception); MpuTestUtils.stubSuccessfulCreateMultipartCall(UPLOAD_ID, s3AsyncClient); @@ -311,17 +355,17 @@ void uploadObject_requestBodyOnError_shouldFailAndAbort(boolean contentLengthKno when(s3AsyncClient.abortMultipartUpload(any(AbortMultipartUploadRequest.class))) .thenReturn(CompletableFuture.completedFuture(AbortMultipartUploadResponse.builder().build())); - CompletableFuture future = uploadHelper.uploadObject(putObjectRequest, - erroneousAsyncRequestBody); + CompletableFuture future = uploadHelper.uploadWithAsyncRequestBody(putObjectRequest, + erroneousAsyncRequestBody); assertThatThrownBy(future::join).hasMessageContaining("Failed to send multipart upload requests") .hasRootCause(exception); } - private static PutObjectRequest putObjectRequest(Long contentLength) { + private static PutObjectRequest putObjectRequest(Integer contentLength) { return PutObjectRequest.builder() .bucket(BUCKET) .key(KEY) - .contentLength(contentLength) + .contentLength(contentLength == null ? 
null : Long.valueOf(contentLength)) .build(); } diff --git a/utils/src/main/java/software/amazon/awssdk/utils/Validate.java b/utils/src/main/java/software/amazon/awssdk/utils/Validate.java index 7890c3ee14cf..6941ad9a2527 100644 --- a/utils/src/main/java/software/amazon/awssdk/utils/Validate.java +++ b/utils/src/main/java/software/amazon/awssdk/utils/Validate.java @@ -656,6 +656,19 @@ public static int isNotNegative(int num, String fieldName) { return num; } + public static Long isNotNegativeOrNull(Long num, String fieldName) { + + if (num == null) { + return null; + } + + if (num < 0) { + throw new IllegalArgumentException(String.format("%s must not be negative", fieldName)); + } + + return num; + } + public static long isNotNegative(long num, String fieldName) { if (num < 0) { diff --git a/utils/src/test/java/software/amazon/awssdk/utils/ValidateTest.java b/utils/src/test/java/software/amazon/awssdk/utils/ValidateTest.java index 2983398f83d9..29bc80edbe83 100644 --- a/utils/src/test/java/software/amazon/awssdk/utils/ValidateTest.java +++ b/utils/src/test/java/software/amazon/awssdk/utils/ValidateTest.java @@ -15,6 +15,7 @@ package software.amazon.awssdk.utils; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; @@ -610,6 +611,19 @@ public void isNull_notNull_shouldThrow() { Validate.isNull("string", "not null"); } + @Test + public void isNotNegativeOrNull_negative_throws() { + expected.expect(IllegalArgumentException.class); + expected.expectMessage("foo"); + Validate.isNotNegativeOrNull(-1L, "foo"); + } + + @Test + public void isNotNegativeOrNull_notNegative_notThrow() { + assertThat(Validate.isNotNegativeOrNull(5L, "foo")).isEqualTo(5L); + assertThat(Validate.isNotNegativeOrNull(0L, "foo")).isEqualTo(0L); + } + @Test public void isNull_null_shouldPass() { Validate.isNull(null, "not null"); From 
59288f97a0b0851e47df9b136283df564638178a Mon Sep 17 00:00:00 2001 From: Zoe Wang <33073555+zoewangg@users.noreply.github.com> Date: Tue, 22 Aug 2023 13:04:56 -0700 Subject: [PATCH 2/5] Override split in FileAsyncRequestBody --- .../awssdk/core/async/AsyncRequestBody.java | 16 +- .../AsyncRequestBodySplitConfiguration.java | 10 ++ .../internal/async/FileAsyncRequestBody.java | 54 ++++-- .../FileAsyncRequestBodySplitHelper.java | 162 ++++++++++++++++++ .../internal/async/SplittingPublisher.java | 48 ++---- .../FileAsyncRequestBodySplitHelperTest.java | 96 +++++++++++ .../async/FileAsyncRequestBodyTest.java | 7 +- .../async/SplittingPublisherTest.java | 53 ++---- .../async/SplittingPublisherTestUtils.java | 70 ++++++++ ...ltipartClientPutObjectIntegrationTest.java | 13 -- .../multipart/MultipartS3AsyncClient.java | 10 +- .../multipart/UploadObjectHelper.java | 21 +-- .../UploadWithKnownContentLengthHelper.java | 87 ++-------- .../multipart/UploadObjectHelperTest.java | 150 +++++++--------- 14 files changed, 492 insertions(+), 305 deletions(-) create mode 100644 core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodySplitHelper.java create mode 100644 core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodySplitHelperTest.java create mode 100644 core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTestUtils.java diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBody.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBody.java index 21300782a50e..8fd0fb6d6659 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBody.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBody.java @@ -441,22 +441,18 @@ static AsyncRequestBody empty() { * is 2MB and the default buffer size is 8MB. * *

- * If content length of this {@link AsyncRequestBody} is present, each divided {@link AsyncRequestBody} is delivered to the - * subscriber right after it's initialized. - *

- * If content length is null, it is sent after the entire content for that chunk is buffered. - * In this case, the configured {@code maxMemoryUsageInBytes} must be larger than or equal to {@code chunkSizeInBytes}. + * By default, if content length of this {@link AsyncRequestBody} is present, each divided {@link AsyncRequestBody} is + * delivered to the subscriber right after it's initialized. On the other hand, if content length is null, it is sent after + * the entire content for that chunk is buffered. In this case, the configured {@code maxMemoryUsageInBytes} must be larger + * than or equal to {@code chunkSizeInBytes}. Note that this behavior may be different if a specific implementation of this + * interface overrides this method. * * @see AsyncRequestBodySplitConfiguration */ default SdkPublisher split(AsyncRequestBodySplitConfiguration splitConfiguration) { Validate.notNull(splitConfiguration, "splitConfiguration"); - return SplittingPublisher.builder() - .asyncRequestBody(this) - .chunkSizeInBytes(splitConfiguration.chunkSizeInBytes()) - .bufferSizeInBytes(splitConfiguration.bufferSizeInBytes()) - .build(); + return new SplittingPublisher(this, splitConfiguration); } /** diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBodySplitConfiguration.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBodySplitConfiguration.java index fe51f33b4ff3..45596ab03eaa 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBodySplitConfiguration.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBodySplitConfiguration.java @@ -28,6 +28,12 @@ @SdkPublicApi public final class AsyncRequestBodySplitConfiguration implements ToCopyableBuilder { + private static final long DEFAULT_CHUNK_SIZE = 2 * 1024 * 1024L; + private static final long DEFAULT_BUFFER_SIZE = DEFAULT_CHUNK_SIZE * 4; + private static final AsyncRequestBodySplitConfiguration 
DEFAULT_CONFIG = builder() + .bufferSizeInBytes(DEFAULT_BUFFER_SIZE) + .chunkSizeInBytes(DEFAULT_CHUNK_SIZE) + .build(); private final Long chunkSizeInBytes; private final Long bufferSizeInBytes; @@ -36,6 +42,10 @@ private AsyncRequestBodySplitConfiguration(DefaultBuilder builder) { this.bufferSizeInBytes = Validate.isPositiveOrNull(builder.bufferSizeInBytes, "bufferSizeInBytes"); } + public static AsyncRequestBodySplitConfiguration defaultConfiguration() { + return DEFAULT_CONFIG; + } + /** * The configured chunk size for each divided {@link AsyncRequestBody}. */ diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBody.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBody.java index 70d51ffcf690..f8bbdd552088 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBody.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBody.java @@ -33,6 +33,8 @@ import org.reactivestreams.Subscription; import software.amazon.awssdk.annotations.SdkInternalApi; import software.amazon.awssdk.core.async.AsyncRequestBody; +import software.amazon.awssdk.core.async.AsyncRequestBodySplitConfiguration; +import software.amazon.awssdk.core.async.SdkPublisher; import software.amazon.awssdk.core.internal.util.Mimetype; import software.amazon.awssdk.core.internal.util.NoopSubscription; import software.amazon.awssdk.utils.Logger; @@ -78,6 +80,32 @@ private FileAsyncRequestBody(DefaultBuilder builder) { Validate.isNotNegative(builder.numBytesToRead, "numBytesToRead"); } + @Override + public SdkPublisher split(AsyncRequestBodySplitConfiguration splitConfiguration) { + Validate.notNull(splitConfiguration, "splitConfiguration"); + return new FileAsyncRequestBodySplitHelper(this, splitConfiguration).split(); + } + + public Path path() { + return path; + } + + public long fileLength() { + return fileLength; + } + + 
public int chunkSizeInBytes() { + return chunkSizeInBytes; + } + + public long position() { + return position; + } + + public long numBytesToRead() { + return numBytesToRead; + } + @Override public Optional contentLength() { return Optional.of(numBytesToRead); @@ -97,7 +125,7 @@ public void subscribe(Subscriber s) { // We need to synchronize here because the subscriber could call // request() from within onSubscribe which would potentially // trigger onNext before onSubscribe is finished. - Subscription subscription = new FileSubscription(path, channel, s, chunkSizeInBytes, position, numBytesToRead); + Subscription subscription = new FileSubscription(channel, s); synchronized (subscription) { s.onSubscribe(subscription); @@ -214,13 +242,11 @@ public FileAsyncRequestBody build() { /** * Reads the file for one subscriber. */ - private static final class FileSubscription implements Subscription { - private final Path path; + private final class FileSubscription implements Subscription { private final AsynchronousFileChannel inputChannel; private final Subscriber subscriber; - private final int chunkSize; - private final AtomicLong position; + private final AtomicLong currentPosition; private final AtomicLong remainingBytes; private final long sizeAtStart; private final FileTime modifiedTimeAtStart; @@ -229,20 +255,14 @@ private static final class FileSubscription implements Subscription { private volatile boolean done = false; private final Object lock = new Object(); - private FileSubscription(Path path, - AsynchronousFileChannel inputChannel, - Subscriber subscriber, - int chunkSize, - long position, - long numBytesToRead) throws IOException { - this.path = path; + private FileSubscription(AsynchronousFileChannel inputChannel, + Subscriber subscriber) throws IOException { this.inputChannel = inputChannel; this.subscriber = subscriber; - this.chunkSize = chunkSize; this.sizeAtStart = inputChannel.size(); this.modifiedTimeAtStart = Files.getLastModifiedTime(path); 
this.remainingBytes = new AtomicLong(numBytesToRead); - this.position = new AtomicLong(position); + this.currentPosition = new AtomicLong(position); } @Override @@ -297,8 +317,8 @@ private void readData() { return; } - ByteBuffer buffer = ByteBuffer.allocate(Math.min(chunkSize, NumericUtils.saturatedCast(remainingBytes.get()))); - inputChannel.read(buffer, position.get(), buffer, new CompletionHandler() { + ByteBuffer buffer = ByteBuffer.allocate(Math.min(chunkSizeInBytes, NumericUtils.saturatedCast(remainingBytes.get()))); + inputChannel.read(buffer, currentPosition.get(), buffer, new CompletionHandler() { @Override public void completed(Integer result, ByteBuffer attachment) { try { @@ -306,7 +326,7 @@ public void completed(Integer result, ByteBuffer attachment) { attachment.flip(); int readBytes = attachment.remaining(); - position.addAndGet(readBytes); + currentPosition.addAndGet(readBytes); remainingBytes.addAndGet(-readBytes); signalOnNext(attachment); diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodySplitHelper.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodySplitHelper.java new file mode 100644 index 000000000000..8ec98007fd40 --- /dev/null +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodySplitHelper.java @@ -0,0 +1,162 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ + +package software.amazon.awssdk.core.internal.async; + +import java.nio.ByteBuffer; +import java.nio.file.Path; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import org.reactivestreams.Subscriber; +import software.amazon.awssdk.annotations.SdkInternalApi; +import software.amazon.awssdk.annotations.SdkTestInternalApi; +import software.amazon.awssdk.core.async.AsyncRequestBody; +import software.amazon.awssdk.core.async.AsyncRequestBodySplitConfiguration; +import software.amazon.awssdk.core.async.SdkPublisher; +import software.amazon.awssdk.utils.Logger; +import software.amazon.awssdk.utils.Validate; +import software.amazon.awssdk.utils.async.SimplePublisher; + +/** + * A helper class to split a {@link FileAsyncRequestBody} to multiple smaller async request bodies. It ensures the buffer used to + * be under the configured size via {@link AsyncRequestBodySplitConfiguration#bufferSizeInBytes()} by tracking the number of + * concurrent ongoing {@link AsyncRequestBody}s. 
+ */ +@SdkInternalApi +public final class FileAsyncRequestBodySplitHelper { + private static final Logger log = Logger.loggerFor(FileAsyncRequestBodySplitHelper.class); + + private final AtomicBoolean isSendingRequestBody = new AtomicBoolean(false); + private final AtomicLong remainingBytes; + + private final long totalContentLength; + private final Path path; + private final int bufferPerAsyncRequestBody; + private final long totalBufferSize; + private final long chunkSize; + + private volatile boolean isDone = false; + + private AtomicInteger numAsyncRequestBodiesInFlight = new AtomicInteger(0); + private AtomicInteger chunkIndex = new AtomicInteger(0); + + public FileAsyncRequestBodySplitHelper(FileAsyncRequestBody asyncRequestBody, + AsyncRequestBodySplitConfiguration splitConfiguration) { + Validate.notNull(asyncRequestBody, "asyncRequestBody"); + Validate.notNull(splitConfiguration, "splitConfiguration"); + Validate.isTrue(asyncRequestBody.contentLength().isPresent(), "Content length must be present", asyncRequestBody); + this.totalContentLength = asyncRequestBody.contentLength().get(); + this.remainingBytes = new AtomicLong(totalContentLength); + this.path = asyncRequestBody.path(); + this.chunkSize = splitConfiguration.chunkSizeInBytes() == null ? + AsyncRequestBodySplitConfiguration.defaultConfiguration().chunkSizeInBytes() : + splitConfiguration.chunkSizeInBytes(); + this.totalBufferSize = splitConfiguration.bufferSizeInBytes() == null ? 
+ AsyncRequestBodySplitConfiguration.defaultConfiguration().bufferSizeInBytes() : + splitConfiguration.bufferSizeInBytes(); + this.bufferPerAsyncRequestBody = asyncRequestBody.chunkSizeInBytes(); + } + + public SdkPublisher split() { + + SimplePublisher simplePublisher = new SimplePublisher<>(); + + try { + sendAsyncRequestBody(simplePublisher); + } catch (Throwable throwable) { + simplePublisher.error(throwable); + } + + return SdkPublisher.adapt(simplePublisher); + } + + private void sendAsyncRequestBody(SimplePublisher simplePublisher) { + if (!isSendingRequestBody.compareAndSet(false, true)) { + return; + } + + try { + doSendAsyncRequestBody(simplePublisher); + } finally { + isSendingRequestBody.set(false); + } + } + + private void doSendAsyncRequestBody(SimplePublisher simplePublisher) { + while (true) { + if (!shouldSendMore()) { + break; + } + + AsyncRequestBody currentAsyncRequestBody = newFileAsyncRequestBody(simplePublisher); + simplePublisher.send(currentAsyncRequestBody); + numAsyncRequestBodiesInFlight.incrementAndGet(); + checkCompletion(simplePublisher, currentAsyncRequestBody); + } + } + + private void checkCompletion(SimplePublisher simplePublisher, AsyncRequestBody currentAsyncRequestBody) { + long remaining = remainingBytes.addAndGet(-currentAsyncRequestBody.contentLength().get()); + + if (remaining == 0) { + isDone = true; + simplePublisher.complete(); + } + } + + private AsyncRequestBody newFileAsyncRequestBody(SimplePublisher simplePublisher) { + long position = chunkSize * chunkIndex.getAndIncrement(); + long numBytesToReadForThisChunk = Math.min(totalContentLength - position, chunkSize); + FileAsyncRequestBody fileAsyncRequestBody = FileAsyncRequestBody.builder() + .path(path) + .position(position) + .numBytesToRead(numBytesToReadForThisChunk) + .build(); + return new AsyncRequestBody() { + + @Override + public void subscribe(Subscriber s) { + fileAsyncRequestBody.doAfterOnComplete(() -> { + numAsyncRequestBodiesInFlight.decrementAndGet(); + 
sendAsyncRequestBody(simplePublisher); + }).subscribe(s); + } + + @Override + public Optional contentLength() { + return fileAsyncRequestBody.contentLength(); + } + }; + } + + /** + * Should not send more if it's done OR sending next request body would exceed the total buffer size + */ + private boolean shouldSendMore() { + if (isDone) { + return false; + } + + long currentUsedBuffer = numAsyncRequestBodiesInFlight.get() * bufferPerAsyncRequestBody; + return currentUsedBuffer + bufferPerAsyncRequestBody <= totalBufferSize; + } + + @SdkTestInternalApi + AtomicInteger numAsyncRequestBodiesInFlight() { + return numAsyncRequestBodiesInFlight; + } +} diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingPublisher.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingPublisher.java index c56d1b6437d9..6d8d18a14754 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingPublisher.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingPublisher.java @@ -24,6 +24,7 @@ import org.reactivestreams.Subscription; import software.amazon.awssdk.annotations.SdkInternalApi; import software.amazon.awssdk.core.async.AsyncRequestBody; +import software.amazon.awssdk.core.async.AsyncRequestBodySplitConfiguration; import software.amazon.awssdk.core.async.SdkPublisher; import software.amazon.awssdk.core.exception.NonRetryableException; import software.amazon.awssdk.core.internal.util.NoopSubscription; @@ -41,18 +42,24 @@ @SdkInternalApi public class SplittingPublisher implements SdkPublisher { private static final Logger log = Logger.loggerFor(SplittingPublisher.class); - private static final long DEFAULT_CHUNK_SIZE = 2 * 1024 * 1024L; - private static final long DEFAULT_BUFFER_SIZE = DEFAULT_CHUNK_SIZE * 4; private final AsyncRequestBody upstreamPublisher; private final SplittingSubscriber splittingSubscriber; private final SimplePublisher 
downstreamPublisher = new SimplePublisher<>(); private final long chunkSizeInBytes; private final long bufferSizeInBytes; - private SplittingPublisher(Builder builder) { - this.upstreamPublisher = Validate.paramNotNull(builder.asyncRequestBody, "asyncRequestBody"); - this.chunkSizeInBytes = builder.chunkSizeInBytes == null ? DEFAULT_CHUNK_SIZE : builder.chunkSizeInBytes; - this.bufferSizeInBytes = builder.bufferSizeInBytes == null ? DEFAULT_BUFFER_SIZE : builder.bufferSizeInBytes; + public SplittingPublisher(AsyncRequestBody asyncRequestBody, + AsyncRequestBodySplitConfiguration splitConfiguration) { + this.upstreamPublisher = Validate.paramNotNull(asyncRequestBody, "asyncRequestBody"); + Validate.notNull(splitConfiguration, "splitConfiguration"); + this.chunkSizeInBytes = splitConfiguration.chunkSizeInBytes() == null ? + AsyncRequestBodySplitConfiguration.defaultConfiguration().chunkSizeInBytes() : + splitConfiguration.chunkSizeInBytes(); + + this.bufferSizeInBytes = splitConfiguration.bufferSizeInBytes() == null ? 
+ AsyncRequestBodySplitConfiguration.defaultConfiguration().bufferSizeInBytes() : + splitConfiguration.bufferSizeInBytes(); + this.splittingSubscriber = new SplittingSubscriber(upstreamPublisher.contentLength().orElse(null)); if (!upstreamPublisher.contentLength().isPresent()) { @@ -62,10 +69,6 @@ private SplittingPublisher(Builder builder) { } } - public static Builder builder() { - return new Builder(); - } - @Override public void subscribe(Subscriber downstreamSubscriber) { downstreamPublisher.subscribe(downstreamSubscriber); @@ -303,29 +306,4 @@ private void addDataBuffered(int length) { } } } - - public static final class Builder { - private AsyncRequestBody asyncRequestBody; - private Long chunkSizeInBytes; - private Long bufferSizeInBytes; - - public Builder asyncRequestBody(AsyncRequestBody asyncRequestBody) { - this.asyncRequestBody = asyncRequestBody; - return this; - } - - public Builder chunkSizeInBytes(Long chunkSizeInBytes) { - this.chunkSizeInBytes = chunkSizeInBytes; - return this; - } - - public Builder bufferSizeInBytes(Long bufferSizeInBytes) { - this.bufferSizeInBytes = bufferSizeInBytes; - return this; - } - - public SplittingPublisher build() { - return new SplittingPublisher(this); - } - } } diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodySplitHelperTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodySplitHelperTest.java new file mode 100644 index 000000000000..4c5d0748d16d --- /dev/null +++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodySplitHelperTest.java @@ -0,0 +1,96 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. 
+ * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.core.internal.async; + +import static org.assertj.core.api.Assertions.assertThat; +import static software.amazon.awssdk.core.internal.async.SplittingPublisherTestUtils.verifyIndividualAsyncRequestBody; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.nio.file.Path; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import software.amazon.awssdk.core.async.AsyncRequestBodySplitConfiguration; +import software.amazon.awssdk.testutils.RandomTempFile; + +public class FileAsyncRequestBodySplitHelperTest { + + private static final int CHUNK_SIZE = 5; + private static Path testFile; + private static ScheduledExecutorService executor; + + + @BeforeAll + public static void setup() throws IOException { + testFile = new RandomTempFile(2000).toPath(); + executor = Executors.newScheduledThreadPool(1); + } + + @AfterAll + public static void teardown() throws IOException { + try { + Files.delete(testFile); + } catch (NoSuchFileException e) { + // ignore + } + executor.shutdown(); + } + + @ParameterizedTest + @ValueSource(ints = {CHUNK_SIZE, CHUNK_SIZE * 2 - 1, CHUNK_SIZE * 2}) + public void split_differentChunkSize_shouldSplitCorrectly(int chunkSize) 
throws Exception { + long bufferSize = 55l; + int chunkSizeInBytes = 10; + FileAsyncRequestBody fileAsyncRequestBody = FileAsyncRequestBody.builder() + .path(testFile) + .chunkSizeInBytes(10) + .build(); + AsyncRequestBodySplitConfiguration config = + AsyncRequestBodySplitConfiguration.builder() + .chunkSizeInBytes((long) chunkSize) + .bufferSizeInBytes(55L) + .build(); + FileAsyncRequestBodySplitHelper helper = new FileAsyncRequestBodySplitHelper(fileAsyncRequestBody, config); + + AtomicInteger maxConcurrency = new AtomicInteger(0); + ScheduledFuture scheduledFuture = executor.scheduleWithFixedDelay(verifyConcurrentRequests(helper, maxConcurrency), + 1, 50, TimeUnit.MICROSECONDS); + + verifyIndividualAsyncRequestBody(helper.split(), testFile, chunkSize); + scheduledFuture.cancel(true); + int expectedMaxConcurrency = (int) (bufferSize / chunkSizeInBytes); + assertThat(maxConcurrency.get()).isLessThanOrEqualTo(expectedMaxConcurrency); + } + + private static Runnable verifyConcurrentRequests(FileAsyncRequestBodySplitHelper helper, AtomicInteger maxConcurrency) { + return () -> { + int concurrency = helper.numAsyncRequestBodiesInFlight().get(); + + if (concurrency > maxConcurrency.get()) { + maxConcurrency.set(concurrency); + } + assertThat(helper.numAsyncRequestBodiesInFlight()).hasValueLessThan(10); + }; + } +} diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodyTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodyTest.java index 6335c756c268..5d12035c1879 100644 --- a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodyTest.java +++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodyTest.java @@ -18,14 +18,13 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static 
org.junit.jupiter.api.Assertions.assertTrue; +import static software.amazon.awssdk.core.internal.async.SplittingPublisherTestUtils.verifyIndividualAsyncRequestBody; import static software.amazon.awssdk.utils.FunctionalUtils.invokeSafely; -import io.reactivex.Flowable; import java.io.ByteArrayOutputStream; import java.io.FileInputStream; import java.io.IOException; import java.nio.ByteBuffer; -import java.nio.channels.AsynchronousFileChannel; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.NoSuchFileException; @@ -39,10 +38,12 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import software.amazon.awssdk.core.async.AsyncRequestBody; -import software.amazon.awssdk.http.async.SimpleSubscriber; +import software.amazon.awssdk.core.async.SdkPublisher; import software.amazon.awssdk.testutils.RandomTempFile; import software.amazon.awssdk.utils.BinaryUtils; diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTest.java index 0966ea6eb76f..d2e06f28492a 100644 --- a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTest.java +++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTest.java @@ -17,6 +17,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.AssertionsForClassTypes.assertThat; +import static software.amazon.awssdk.core.internal.async.SplittingPublisherTestUtils.verifyIndividualAsyncRequestBody; import static software.amazon.awssdk.utils.FunctionalUtils.invokeSafely; import 
java.io.ByteArrayInputStream; @@ -44,6 +45,7 @@ import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import software.amazon.awssdk.core.async.AsyncRequestBody; +import software.amazon.awssdk.core.async.AsyncRequestBodySplitConfiguration; import software.amazon.awssdk.utils.BinaryUtils; public class SplittingPublisherTest { @@ -72,11 +74,10 @@ public static void afterAll() throws Exception { public void split_contentUnknownMaxMemorySmallerThanChunkSize_shouldThrowException() { AsyncRequestBody body = AsyncRequestBody.fromPublisher(s -> { }); - assertThatThrownBy(() -> SplittingPublisher.builder() - .asyncRequestBody(body) - .chunkSizeInBytes(10L) - .bufferSizeInBytes(5L) - .build()) + assertThatThrownBy(() -> new SplittingPublisher(body, AsyncRequestBodySplitConfiguration.builder() + .chunkSizeInBytes(10L) + .bufferSizeInBytes(5L) + .build())) .hasMessageContaining("must be larger than or equal"); } @@ -106,11 +107,10 @@ public Optional contentLength() { return Optional.empty(); } }; - SplittingPublisher splittingPublisher = SplittingPublisher.builder() - .asyncRequestBody(asyncRequestBody) + SplittingPublisher splittingPublisher = new SplittingPublisher(asyncRequestBody, AsyncRequestBodySplitConfiguration.builder() .chunkSizeInBytes((long) CHUNK_SIZE) .bufferSizeInBytes(10L) - .build(); + .build()); List> futures = new ArrayList<>(); @@ -148,38 +148,13 @@ public Optional contentLength() { private static void verifySplitContent(AsyncRequestBody asyncRequestBody, int chunkSize) throws Exception { - SplittingPublisher splittingPublisher = SplittingPublisher.builder() - .asyncRequestBody(asyncRequestBody) - .chunkSizeInBytes((long) chunkSize) - .bufferSizeInBytes((long) chunkSize * 4) - .build(); + SplittingPublisher splittingPublisher = new SplittingPublisher(asyncRequestBody, + AsyncRequestBodySplitConfiguration.builder() + .chunkSizeInBytes((long) chunkSize) + .bufferSizeInBytes((long) chunkSize * 4) + .build()); - List> futures = new 
ArrayList<>(); - - splittingPublisher.subscribe(requestBody -> { - CompletableFuture baosFuture = new CompletableFuture<>(); - BaosSubscriber subscriber = new BaosSubscriber(baosFuture); - futures.add(baosFuture); - requestBody.subscribe(subscriber); - }).get(5, TimeUnit.SECONDS); - - assertThat(futures.size()).isEqualTo((int) Math.ceil(CONTENT_SIZE / (double) chunkSize)); - - for (int i = 0; i < futures.size(); i++) { - try (FileInputStream fileInputStream = new FileInputStream(testFile)) { - byte[] expected; - if (i == futures.size() - 1) { - int lastChunk = CONTENT_SIZE % chunkSize == 0 ? chunkSize : (CONTENT_SIZE % chunkSize); - expected = new byte[lastChunk]; - } else { - expected = new byte[chunkSize]; - } - fileInputStream.skip(i * chunkSize); - fileInputStream.read(expected); - byte[] actualBytes = futures.get(i).join(); - assertThat(actualBytes).isEqualTo(expected); - }; - } + verifyIndividualAsyncRequestBody(splittingPublisher, testFile.toPath(), chunkSize); } private static class TestAsyncRequestBody implements AsyncRequestBody { diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTestUtils.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTestUtils.java new file mode 100644 index 000000000000..04da97adbf42 --- /dev/null +++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTestUtils.java @@ -0,0 +1,70 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. 
See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.core.internal.async; + +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; + +import java.io.File; +import java.io.FileInputStream; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.assertj.core.api.Assertions; +import org.reactivestreams.Publisher; +import software.amazon.awssdk.core.async.AsyncRequestBody; +import software.amazon.awssdk.core.async.SdkPublisher; +import software.amazon.awssdk.core.internal.async.ByteArrayAsyncResponseTransformer; +import software.amazon.awssdk.core.internal.async.SplittingPublisherTest; + +public final class SplittingPublisherTestUtils { + + public static void verifyIndividualAsyncRequestBody(SdkPublisher publisher, + Path file, + int chunkSize) throws Exception { + + List> futures = new ArrayList<>(); + publisher.subscribe(requestBody -> { + CompletableFuture baosFuture = new CompletableFuture<>(); + ByteArrayAsyncResponseTransformer.BaosSubscriber subscriber = + new ByteArrayAsyncResponseTransformer.BaosSubscriber(baosFuture); + requestBody.subscribe(subscriber); + futures.add(baosFuture); + }).get(5, TimeUnit.SECONDS); + + long contentLength = file.toFile().length(); + Assertions.assertThat(futures.size()).isEqualTo((int) Math.ceil(contentLength / (double) chunkSize)); + + for (int i = 0; i < futures.size(); i++) { + try (FileInputStream fileInputStream = new FileInputStream(file.toFile())) { + byte[] expected; + if (i == futures.size() - 1) { + int lastChunk = contentLength % chunkSize == 0 ? 
chunkSize : (int) (contentLength % chunkSize); + expected = new byte[lastChunk]; + } else { + expected = new byte[chunkSize]; + } + fileInputStream.skip(i * chunkSize); + fileInputStream.read(expected); + byte[] actualBytes = futures.get(i).join(); + Assertions.assertThat(actualBytes).isEqualTo(expected); + } + } + } +} diff --git a/services/s3/src/it/java/software/amazon/awssdk/services/s3/multipart/S3MultipartClientPutObjectIntegrationTest.java b/services/s3/src/it/java/software/amazon/awssdk/services/s3/multipart/S3MultipartClientPutObjectIntegrationTest.java index 800cd19a9a24..fa31b5453e5e 100644 --- a/services/s3/src/it/java/software/amazon/awssdk/services/s3/multipart/S3MultipartClientPutObjectIntegrationTest.java +++ b/services/s3/src/it/java/software/amazon/awssdk/services/s3/multipart/S3MultipartClientPutObjectIntegrationTest.java @@ -92,19 +92,6 @@ void putObject_fileRequestBody_objectSentCorrectly() throws Exception { assertThat(ChecksumUtils.computeCheckSum(objContent)).isEqualTo(expectedSum); } - @Test - void putObject_withPath_objectSentCorrectly() throws Exception { - mpuS3Client.putObject(r -> r.bucket(TEST_BUCKET).key(TEST_KEY), testFile.toPath()).get(10, SECONDS); - - ResponseInputStream objContent = - S3IntegrationTestBase.s3.getObject(r -> r.bucket(TEST_BUCKET).key(TEST_KEY), - ResponseTransformer.toInputStream()); - - assertThat(objContent.response().contentLength()).isEqualTo(testFile.length()); - byte[] expectedSum = ChecksumUtils.computeCheckSum(Files.newInputStream(testFile.toPath())); - assertThat(ChecksumUtils.computeCheckSum(objContent)).isEqualTo(expectedSum); - } - @Test void putObject_byteAsyncRequestBody_objectSentCorrectly() throws Exception { byte[] bytes = RandomStringUtils.randomAscii(OBJ_SIZE).getBytes(Charset.defaultCharset()); diff --git a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartS3AsyncClient.java 
b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartS3AsyncClient.java index 7cad60336250..8b53099b8683 100644 --- a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartS3AsyncClient.java +++ b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartS3AsyncClient.java @@ -16,7 +16,6 @@ package software.amazon.awssdk.services.s3.internal.multipart; -import java.nio.file.Path; import java.util.concurrent.CompletableFuture; import java.util.function.Function; import software.amazon.awssdk.annotations.SdkInternalApi; @@ -63,14 +62,7 @@ private MultipartS3AsyncClient(S3AsyncClient delegate, MultipartConfiguration mu @Override public CompletableFuture putObject(PutObjectRequest putObjectRequest, AsyncRequestBody requestBody) { - return mpuHelper.uploadWithAsyncRequestBody(putObjectRequest, requestBody); - } - - // TODO: TM currently doesn't call this method because it relies on wrapping of the async request body to report progress - // and we should fix it - @Override - public CompletableFuture putObject(PutObjectRequest putObjectRequest, Path sourcePath) { - return mpuHelper.uploadWithFile(putObjectRequest, sourcePath); + return mpuHelper.uploadObject(putObjectRequest, requestBody); } @Override diff --git a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/UploadObjectHelper.java b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/UploadObjectHelper.java index 2eb402472c8e..1ca499b57aa8 100644 --- a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/UploadObjectHelper.java +++ b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/UploadObjectHelper.java @@ -15,17 +15,12 @@ package software.amazon.awssdk.services.s3.internal.multipart; -import static software.amazon.awssdk.utils.FunctionalUtils.invokeSafely; - -import java.nio.file.Files; -import 
java.nio.file.Path; import java.util.concurrent.CompletableFuture; import software.amazon.awssdk.annotations.SdkInternalApi; import software.amazon.awssdk.core.async.AsyncRequestBody; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.model.PutObjectRequest; import software.amazon.awssdk.services.s3.model.PutObjectResponse; -import software.amazon.awssdk.utils.Either; import software.amazon.awssdk.utils.Logger; /** @@ -63,24 +58,14 @@ public UploadObjectHelper(S3AsyncClient s3AsyncClient, apiCallBufferSize); } - public CompletableFuture uploadWithAsyncRequestBody(PutObjectRequest putObjectRequest, - AsyncRequestBody asyncRequestBody) { + public CompletableFuture uploadObject(PutObjectRequest putObjectRequest, + AsyncRequestBody asyncRequestBody) { Long contentLength = asyncRequestBody.contentLength().orElseGet(putObjectRequest::contentLength); if (contentLength == null) { return uploadWithUnknownContentLength.uploadObject(putObjectRequest, asyncRequestBody); } else { - return uploadWithKnownContentLength.uploadObject(putObjectRequest, - Either.left(asyncRequestBody), - contentLength.longValue()); + return uploadWithKnownContentLength.uploadObject(putObjectRequest, asyncRequestBody, contentLength.longValue()); } } - - public CompletableFuture uploadWithFile(PutObjectRequest putObjectRequest, - Path path) { - long contentLength = invokeSafely(() -> Files.size(path)); - return uploadWithKnownContentLength.uploadObject(putObjectRequest, - Either.right(path), - contentLength); - } } diff --git a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/UploadWithKnownContentLengthHelper.java b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/UploadWithKnownContentLengthHelper.java index 7f9c1267d5b6..46caefca8d61 100644 --- a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/UploadWithKnownContentLengthHelper.java +++ 
b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/UploadWithKnownContentLengthHelper.java @@ -16,7 +16,6 @@ package software.amazon.awssdk.services.s3.internal.multipart; -import java.nio.file.Path; import java.util.Collection; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedQueue; @@ -35,10 +34,8 @@ import software.amazon.awssdk.services.s3.model.PutObjectRequest; import software.amazon.awssdk.services.s3.model.PutObjectResponse; import software.amazon.awssdk.services.s3.model.UploadPartRequest; -import software.amazon.awssdk.utils.Either; import software.amazon.awssdk.utils.Logger; import software.amazon.awssdk.utils.Pair; -import software.amazon.awssdk.utils.async.SimplePublisher; /** * An internal helper class that automatically uses multipart upload based on the size of the object. @@ -71,20 +68,17 @@ public UploadWithKnownContentLengthHelper(S3AsyncClient s3AsyncClient, } public CompletableFuture uploadObject(PutObjectRequest putObjectRequest, - Either requestBodyOrPath, + AsyncRequestBody asyncRequestBody, long contentLength) { CompletableFuture returnFuture = new CompletableFuture<>(); try { if (contentLength > multipartUploadThresholdInBytes && contentLength > partSizeInBytes) { log.debug(() -> "Starting the upload as multipart upload request"); - uploadInParts(putObjectRequest, contentLength, requestBodyOrPath, returnFuture); + uploadInParts(putObjectRequest, contentLength, asyncRequestBody, returnFuture); } else { log.debug(() -> "Starting the upload as a single upload part request"); - requestBodyOrPath.apply( - requestBody -> multipartUploadHelper.uploadInOneChunk(putObjectRequest, requestBody, returnFuture), - path -> multipartUploadHelper.uploadInOneChunk(putObjectRequest, AsyncRequestBody.fromFile(path), - returnFuture)); + multipartUploadHelper.uploadInOneChunk(putObjectRequest, asyncRequestBody, returnFuture); } } catch (Throwable throwable) { @@ -94,9 +88,7 @@ public 
CompletableFuture uploadObject(PutObjectRequest putObj return returnFuture; } - private void uploadInParts(PutObjectRequest putObjectRequest, - long contentLength, - Either requestBodyOrPath, + private void uploadInParts(PutObjectRequest putObjectRequest, long contentLength, AsyncRequestBody asyncRequestBody, CompletableFuture returnFuture) { CompletableFuture createMultipartUploadFuture = @@ -107,13 +99,13 @@ private void uploadInParts(PutObjectRequest putObjectRequest, genericMultipartHelper.handleException(returnFuture, () -> "Failed to initiate multipart upload", throwable); } else { log.debug(() -> "Initiated a new multipart upload, uploadId: " + createMultipartUploadResponse.uploadId()); - doUploadInParts(Pair.of(putObjectRequest, requestBodyOrPath), contentLength, returnFuture, + doUploadInParts(Pair.of(putObjectRequest, asyncRequestBody), contentLength, returnFuture, createMultipartUploadResponse.uploadId()); } }); } - private void doUploadInParts(Pair> request, + private void doUploadInParts(Pair request, long contentLength, CompletableFuture returnFuture, String uploadId) { @@ -128,68 +120,23 @@ private void doUploadInParts(Pair String.format("Starting multipart upload with partCount: %d, optimalPartSize: %d", partCount, optimalPartSize)); - MpuRequestContext mpuRequestContext = new MpuRequestContext(request.left(), contentLength, optimalPartSize, uploadId); + MpuRequestContext mpuRequestContext = new MpuRequestContext(request, contentLength, optimalPartSize, uploadId); - Either requestBodyOrPath = request.right(); - - Consumer splitAsyncRequestBodyAndSendRequests = - splitAsyncRequestBodyAndSendRequests(returnFuture, mpuRequestContext); - Consumer splitFileAndSendRequests = - splitFileAndSendRequests(returnFuture, mpuRequestContext, partCount, optimalPartSize); - - requestBodyOrPath.apply(splitAsyncRequestBodyAndSendRequests, splitFileAndSendRequests); - } - - private Consumer splitFileAndSendRequests(CompletableFuture returnFuture, - MpuRequestContext 
mpuRequestContext, - int partCount, - long optimalPartSize) { - return path -> { - SimplePublisher simplePublisher = new SimplePublisher<>(); - simplePublisher.subscribe(new KnownContentLengthAsyncRequestBodySubscriber(mpuRequestContext, - returnFuture)); - - try { - for (int i = 0; i < partCount; i++) { - long numBytesToRead; - long position = optimalPartSize * i; - if (i == partCount - 1) { - numBytesToRead = mpuRequestContext.contentLength - position; - } else { - numBytesToRead = optimalPartSize; - } - simplePublisher.send(AsyncRequestBody.fromFile( - config -> config.path(path) - .chunkSizeInBytes(1024 * 1024) // TODO: perf test this - .numBytesToRead(numBytesToRead) - .position(position))); - } - - } catch (Throwable throwable) { - simplePublisher.error(throwable); - } - simplePublisher.complete(); - }; - } - - private Consumer splitAsyncRequestBodyAndSendRequests(CompletableFuture returnFuture, - MpuRequestContext mpuRequestContext) { - return requestBody -> { - requestBody.split(b -> b.chunkSizeInBytes(mpuRequestContext.partSize) - .bufferSizeInBytes(maxMemoryUsageInBytes)) - .subscribe(new KnownContentLengthAsyncRequestBodySubscriber(mpuRequestContext, - returnFuture)); - }; + request.right() + .split(b -> b.chunkSizeInBytes(mpuRequestContext.partSize) + .bufferSizeInBytes(maxMemoryUsageInBytes)) + .subscribe(new KnownContentLengthAsyncRequestBodySubscriber(mpuRequestContext, + returnFuture)); } private static final class MpuRequestContext { - private final PutObjectRequest request; + private final Pair request; private final long contentLength; private final long partSize; private final String uploadId; - private MpuRequestContext(PutObjectRequest request, + private MpuRequestContext(Pair request, long contentLength, long partSize, String uploadId) { @@ -231,7 +178,7 @@ private class KnownContentLengthAsyncRequestBodySubscriber implements Subscriber long optimalPartSize = genericMultipartHelper.calculateOptimalPartSizeFor(mpuRequestContext.contentLength, 
partSizeInBytes); int partCount = genericMultipartHelper.determinePartCount(mpuRequestContext.contentLength, optimalPartSize); - this.putObjectRequest = mpuRequestContext.request; + this.putObjectRequest = mpuRequestContext.request.left(); this.returnFuture = returnFuture; this.completedParts = new AtomicReferenceArray<>(partCount); this.uploadId = mpuRequestContext.uploadId; @@ -249,7 +196,9 @@ public void onSubscribe(Subscription s) { returnFuture.whenComplete((r, t) -> { if (t != null) { s.cancel(); - multipartUploadHelper.cancelingOtherOngoingRequests(futures, t); + if (failureActionInitiated.compareAndSet(false, true)) { + multipartUploadHelper.failRequestsElegantly(futures, t, uploadId, returnFuture, putObjectRequest); + } } }); } diff --git a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/UploadObjectHelperTest.java b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/UploadObjectHelperTest.java index 600b5912782d..9758b77a9d84 100644 --- a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/UploadObjectHelperTest.java +++ b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/UploadObjectHelperTest.java @@ -25,13 +25,9 @@ import static org.mockito.Mockito.when; import static software.amazon.awssdk.services.s3.internal.multipart.MpuTestUtils.stubSuccessfulCompleteMultipartCall; -import com.google.common.jimfs.Jimfs; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; -import java.nio.file.FileSystem; -import java.nio.file.Files; -import java.nio.file.Path; import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -42,6 +38,7 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; import org.junit.jupiter.params.ParameterizedTest; import 
org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; @@ -54,7 +51,6 @@ import org.reactivestreams.Subscription; import software.amazon.awssdk.core.async.AsyncRequestBody; import software.amazon.awssdk.core.exception.SdkClientException; -import software.amazon.awssdk.core.internal.async.FileAsyncRequestBody; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest; import software.amazon.awssdk.services.s3.model.AbortMultipartUploadResponse; @@ -67,44 +63,37 @@ import software.amazon.awssdk.services.s3.model.UploadPartRequest; import software.amazon.awssdk.services.s3.model.UploadPartResponse; import software.amazon.awssdk.services.s3.multipart.MultipartConfiguration; +import software.amazon.awssdk.testutils.RandomTempFile; import software.amazon.awssdk.utils.CompletableFutureUtils; -import software.amazon.awssdk.utils.Either; public class UploadObjectHelperTest { - private static FileSystem jimfs; private static final String BUCKET = "bucket"; private static final String KEY = "key"; - private static final int PART_SIZE = 8 * 1024; + private static final long PART_SIZE = 8 * 1024; // Should contain four parts: [8KB, 8KB, 8KB, 1KB] - private static final int MPU_CONTENT_SIZE = 25 * 1024; - private static final int THRESHOLD = 10 * 1024; + private static final long MPU_CONTENT_SIZE = 25 * 1024; + private static final long THRESHOLD = 10 * 1024; private static final String UPLOAD_ID = "1234"; - private static Path testFile; - private static Path testDirectory; + private static RandomTempFile testFile; private UploadObjectHelper uploadHelper; private S3AsyncClient s3AsyncClient; @BeforeAll public static void beforeAll() throws IOException { - jimfs = Jimfs.newFileSystem(); - testDirectory = jimfs.getPath("test"); - Files.createDirectory(testDirectory); - testFile = Files.write(jimfs.getPath("test", "test.txt"), - 
RandomStringUtils.randomAscii(MPU_CONTENT_SIZE).getBytes(StandardCharsets.UTF_8)); + testFile = new RandomTempFile("testfile.dat", MPU_CONTENT_SIZE); } @AfterAll public static void afterAll() throws Exception { - jimfs.close(); + testFile.delete(); } - public static Stream> asyncRequestBodyOrPath() { - return Stream.of(Either.left(new UnknownContentLengthAsyncRequestBody(AsyncRequestBody.fromFile(testFile))), - Either.left(AsyncRequestBody.fromFile(testFile)), - Either.right(testFile)); + public static Stream asyncRequestBody() { + return Stream.of(new UnknownContentLengthAsyncRequestBody(AsyncRequestBody.fromFile(testFile)), + AsyncRequestBody.fromFile(testFile)); } @BeforeEach @@ -112,50 +101,28 @@ public void beforeEach() { s3AsyncClient = Mockito.mock(S3AsyncClient.class); uploadHelper = new UploadObjectHelper(s3AsyncClient, new MultipartConfigurationResolver(MultipartConfiguration.builder() - .minimumPartSizeInBytes((long) PART_SIZE) - .thresholdInBytes((long) THRESHOLD) - .thresholdInBytes((long) (PART_SIZE * 2)) + .minimumPartSizeInBytes(PART_SIZE) + .thresholdInBytes(THRESHOLD) + .thresholdInBytes(PART_SIZE * 2) .build())); } @ParameterizedTest - @ValueSource(ints = {THRESHOLD, PART_SIZE, THRESHOLD - 1, PART_SIZE - 1}) - void uploadWithAsyncRequestBody_contentLengthDoesNotExceedThresholdAndPartSize_shouldUploadInOneChunk(int contentLength) { + @ValueSource(longs = {THRESHOLD, PART_SIZE, THRESHOLD - 1, PART_SIZE - 1}) + void uploadObject_contentLengthDoesNotExceedThresholdAndPartSize_shouldUploadInOneChunk(long contentLength) { PutObjectRequest putObjectRequest = putObjectRequest(contentLength); AsyncRequestBody asyncRequestBody = Mockito.mock(AsyncRequestBody.class); CompletableFuture completedFuture = CompletableFuture.completedFuture(PutObjectResponse.builder().build()); when(s3AsyncClient.putObject(putObjectRequest, asyncRequestBody)).thenReturn(completedFuture); - uploadHelper.uploadWithAsyncRequestBody(putObjectRequest, asyncRequestBody).join(); - 
verify(s3AsyncClient).putObject(putObjectRequest, asyncRequestBody); - } - - @ParameterizedTest - @ValueSource(ints = {THRESHOLD, PART_SIZE, THRESHOLD - 1, PART_SIZE - 1}) - void uploadWithPath_contentLengthDoesNotExceedThresholdAndPartSize_shouldUploadInOneChunk(int contentLength) throws IOException { - PutObjectRequest putObjectRequest = putObjectRequest(contentLength); - - CompletableFuture completedFuture = - CompletableFuture.completedFuture(PutObjectResponse.builder().build()); - - Path file = Files.write(jimfs.getPath("test", "bar.txt"), - RandomStringUtils.randomAscii(contentLength).getBytes(StandardCharsets.UTF_8)); - - ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(AsyncRequestBody.class); - - when(s3AsyncClient.putObject(any(PutObjectRequest.class), any(FileAsyncRequestBody.class))).thenReturn(completedFuture); - - uploadHelper.uploadWithFile(putObjectRequest, file).join(); - - verify(s3AsyncClient).putObject(any(PutObjectRequest.class), requestCaptor.capture()); - AsyncRequestBody actual = requestCaptor.getValue(); - assertThat(actual.contentLength()).hasValue(Long.valueOf(contentLength)); + uploadHelper.uploadObject(putObjectRequest, asyncRequestBody).join(); + Mockito.verify(s3AsyncClient).putObject(putObjectRequest, asyncRequestBody); } @ParameterizedTest - @ValueSource(ints = {PART_SIZE, PART_SIZE - 1}) - void uploadWithAsyncRequestBody_unKnownContentLengthDoesNotExceedPartSize_shouldUploadInOneChunk(int contentLength) { + @ValueSource(longs = {PART_SIZE, PART_SIZE - 1}) + void uploadObject_unKnownContentLengthDoesNotExceedPartSize_shouldUploadInOneChunk(long contentLength) { PutObjectRequest putObjectRequest = putObjectRequest(contentLength); AsyncRequestBody asyncRequestBody = new UnknownContentLengthAsyncRequestBody(AsyncRequestBody.fromBytes(RandomStringUtils.randomAscii(Math.toIntExact(contentLength)) @@ -164,24 +131,20 @@ void uploadWithAsyncRequestBody_unKnownContentLengthDoesNotExceedPartSize_should CompletableFuture completedFuture 
= CompletableFuture.completedFuture(PutObjectResponse.builder().build()); when(s3AsyncClient.putObject(putObjectRequest, asyncRequestBody)).thenReturn(completedFuture); - uploadHelper.uploadWithAsyncRequestBody(putObjectRequest, asyncRequestBody).join(); + uploadHelper.uploadObject(putObjectRequest, asyncRequestBody).join(); Mockito.verify(s3AsyncClient).putObject(putObjectRequest, asyncRequestBody); } @ParameterizedTest - @MethodSource("asyncRequestBodyOrPath") - void upload_contentLengthExceedThresholdAndPartSize_shouldUseMPU(Either asyncRequestBodyOrPath) { + @MethodSource("asyncRequestBody") + void uploadObject_contentLengthExceedThresholdAndPartSize_shouldUseMPU(AsyncRequestBody asyncRequestBody) { PutObjectRequest putObjectRequest = putObjectRequest(null); MpuTestUtils.stubSuccessfulCreateMultipartCall(UPLOAD_ID, s3AsyncClient); stubSuccessfulUploadPartCalls(); stubSuccessfulCompleteMultipartCall(BUCKET, KEY, s3AsyncClient); - asyncRequestBodyOrPath.apply( - asyncRequestBody -> uploadHelper.uploadWithAsyncRequestBody(putObjectRequest, asyncRequestBody).join(), - path -> uploadHelper.uploadWithFile(putObjectRequest, path).join() - ); - + uploadHelper.uploadObject(putObjectRequest, asyncRequestBody).join(); ArgumentCaptor requestArgumentCaptor = ArgumentCaptor.forClass(UploadPartRequest.class); ArgumentCaptor requestBodyArgumentCaptor = ArgumentCaptor.forClass(AsyncRequestBody.class); verify(s3AsyncClient, times(4)).uploadPart(requestArgumentCaptor.capture(), @@ -202,7 +165,7 @@ void upload_contentLengthExceedThresholdAndPartSize_shouldUseMPU(Either asyncRequestBodyOrPath) { + @MethodSource("asyncRequestBody") + void mpu_onePartFailed_shouldFailOtherPartsAndAbort(AsyncRequestBody asyncRequestBody) { PutObjectRequest putObjectRequest = putObjectRequest(MPU_CONTENT_SIZE); MpuTestUtils.stubSuccessfulCreateMultipartCall(UPLOAD_ID, s3AsyncClient); @@ -228,12 +191,11 @@ void mpu_onePartFailed_shouldFailOtherPartsAndAbort(Either future = asyncRequestBodyOrPath.map( - 
asyncRequestBody -> uploadHelper.uploadWithAsyncRequestBody(putObjectRequest, asyncRequestBody), - path -> uploadHelper.uploadWithFile(putObjectRequest, path) - ); + CompletableFuture future = uploadHelper.uploadObject(putObjectRequest, + asyncRequestBody); - assertThatThrownBy(future::join).hasMessageContaining("Failed to send multipart upload requests").hasRootCause(exception); + assertThatThrownBy(() -> future.get(100, TimeUnit.MILLISECONDS)) + .hasMessageContaining("Failed to send multipart upload requests").hasRootCause(exception); verify(s3AsyncClient, never()).completeMultipartUpload(any(CompleteMultipartUploadRequest.class)); @@ -243,10 +205,10 @@ void mpu_onePartFailed_shouldFailOtherPartsAndAbort(Either future = - uploadHelper.uploadWithAsyncRequestBody(putObjectRequest, AsyncRequestBody.fromFile(testFile)); + uploadHelper.uploadObject(putObjectRequest, AsyncRequestBody.fromFile(testFile)); future.cancel(true); @@ -285,16 +247,24 @@ void upload_knownContentLengthCancelResponseFuture_shouldCancelUploadPart() { any(AsyncRequestBody.class))).thenReturn(ongoingRequest); CompletableFuture future = - uploadHelper.uploadWithAsyncRequestBody(putObjectRequest, AsyncRequestBody.fromFile(testFile)); + uploadHelper.uploadObject(putObjectRequest, AsyncRequestBody.fromFile(testFile)); + + when(s3AsyncClient.abortMultipartUpload(any(AbortMultipartUploadRequest.class))) + .thenReturn(CompletableFuture.completedFuture(AbortMultipartUploadResponse.builder().build())); future.cancel(true); - assertThat(ongoingRequest).isCancelled(); + try { + ongoingRequest.join(); + fail("no exception"); + } catch (Exception exception) { + assertThat(ongoingRequest).isCancelled(); + } } @ParameterizedTest - @MethodSource("asyncRequestBodyOrPath") - void uploadWithAsyncRequestBody_createMultipartUploadFailed_shouldFail(Either asyncRequestBodyOrPath) { + @MethodSource("asyncRequestBody") + void uploadObject_createMultipartUploadFailed_shouldFail(AsyncRequestBody asyncRequestBody) { 
PutObjectRequest putObjectRequest = putObjectRequest(null); SdkClientException exception = SdkClientException.create("CompleteMultipartUpload failed"); @@ -305,17 +275,15 @@ void uploadWithAsyncRequestBody_createMultipartUploadFailed_shouldFail(Either future = asyncRequestBodyOrPath.map( - asyncRequestBody -> uploadHelper.uploadWithAsyncRequestBody(putObjectRequest, asyncRequestBody), - path -> uploadHelper.uploadWithFile(putObjectRequest, path) - ); + CompletableFuture future = uploadHelper.uploadObject(putObjectRequest, + asyncRequestBody); assertThatThrownBy(future::join).hasMessageContaining("Failed to initiate multipart upload") .hasRootCause(exception); } @ParameterizedTest - @MethodSource("asyncRequestBodyOrPath") - void uploadWithAsyncRequestBody_completeMultipartFailed_shouldFailAndAbort(Either asyncRequestBodyOrPath) { + @MethodSource("asyncRequestBody") + void uploadObject_completeMultipartFailed_shouldFailAndAbort(AsyncRequestBody asyncRequestBody) { PutObjectRequest putObjectRequest = putObjectRequest(null); MpuTestUtils.stubSuccessfulCreateMultipartCall(UPLOAD_ID, s3AsyncClient); @@ -332,21 +300,19 @@ void uploadWithAsyncRequestBody_completeMultipartFailed_shouldFailAndAbort(Eithe when(s3AsyncClient.abortMultipartUpload(any(AbortMultipartUploadRequest.class))) .thenReturn(CompletableFuture.completedFuture(AbortMultipartUploadResponse.builder().build())); - CompletableFuture future = asyncRequestBodyOrPath.map( - asyncRequestBody -> uploadHelper.uploadWithAsyncRequestBody(putObjectRequest, asyncRequestBody), - path -> uploadHelper.uploadWithFile(putObjectRequest, path) - ); + CompletableFuture future = uploadHelper.uploadObject(putObjectRequest, + asyncRequestBody); assertThatThrownBy(future::join).hasMessageContaining("Failed to send multipart requests") .hasRootCause(exception); } @ParameterizedTest() @ValueSource(booleans = {false, true}) - void uploadWithAsyncRequestBody_requestBodyOnError_shouldFailAndAbort(boolean contentLengthKnown) { + void 
uploadObject_requestBodyOnError_shouldFailAndAbort(boolean contentLengthKnown) { PutObjectRequest putObjectRequest = putObjectRequest(null); Exception exception = new RuntimeException("error"); - Long contentLength = contentLengthKnown ? Long.valueOf(MPU_CONTENT_SIZE) : null; + Long contentLength = contentLengthKnown ? MPU_CONTENT_SIZE : null; ErroneousAsyncRequestBody erroneousAsyncRequestBody = new ErroneousAsyncRequestBody(contentLength, exception); MpuTestUtils.stubSuccessfulCreateMultipartCall(UPLOAD_ID, s3AsyncClient); @@ -355,17 +321,17 @@ void uploadWithAsyncRequestBody_requestBodyOnError_shouldFailAndAbort(boolean co when(s3AsyncClient.abortMultipartUpload(any(AbortMultipartUploadRequest.class))) .thenReturn(CompletableFuture.completedFuture(AbortMultipartUploadResponse.builder().build())); - CompletableFuture future = uploadHelper.uploadWithAsyncRequestBody(putObjectRequest, - erroneousAsyncRequestBody); + CompletableFuture future = uploadHelper.uploadObject(putObjectRequest, + erroneousAsyncRequestBody); assertThatThrownBy(future::join).hasMessageContaining("Failed to send multipart upload requests") .hasRootCause(exception); } - private static PutObjectRequest putObjectRequest(Integer contentLength) { + private static PutObjectRequest putObjectRequest(Long contentLength) { return PutObjectRequest.builder() .bucket(BUCKET) .key(KEY) - .contentLength(contentLength == null ? 
null : Long.valueOf(contentLength)) + .contentLength(contentLength) .build(); } From 50b663431c7054406c1063d14c9b279913a35f74 Mon Sep 17 00:00:00 2001 From: Zoe Wang <33073555+zoewangg@users.noreply.github.com> Date: Fri, 25 Aug 2023 09:43:34 -0700 Subject: [PATCH 3/5] Address feedback and fix the issue where doAfterOnComplete is invoked --- .../FileAsyncRequestBodySplitHelper.java | 61 +++++++++++++------ .../src/main/resources/log4j2.properties | 3 + 2 files changed, 44 insertions(+), 20 deletions(-) diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodySplitHelper.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodySplitHelper.java index 8ec98007fd40..fdd91087f990 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodySplitHelper.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodySplitHelper.java @@ -27,6 +27,7 @@ import software.amazon.awssdk.core.async.AsyncRequestBody; import software.amazon.awssdk.core.async.AsyncRequestBodySplitConfiguration; import software.amazon.awssdk.core.async.SdkPublisher; +import software.amazon.awssdk.core.exception.SdkClientException; import software.amazon.awssdk.utils.Logger; import software.amazon.awssdk.utils.Validate; import software.amazon.awssdk.utils.async.SimplePublisher; @@ -97,11 +98,7 @@ private void sendAsyncRequestBody(SimplePublisher simplePublis } private void doSendAsyncRequestBody(SimplePublisher simplePublisher) { - while (true) { - if (!shouldSendMore()) { - break; - } - + while (shouldSendMore()) { AsyncRequestBody currentAsyncRequestBody = newFileAsyncRequestBody(simplePublisher); simplePublisher.send(currentAsyncRequestBody); numAsyncRequestBodiesInFlight.incrementAndGet(); @@ -115,9 +112,18 @@ private void checkCompletion(SimplePublisher simplePublisher, if (remaining == 0) { isDone = true; 
simplePublisher.complete(); + } else if (remaining < 0) { + isDone = true; + simplePublisher.error(SdkClientException.create( + "Unexpected error occurred. Remaining data is negative: " + remaining)); } } + private void startNextRequestBody(SimplePublisher simplePublisher) { + numAsyncRequestBodiesInFlight.decrementAndGet(); + sendAsyncRequestBody(simplePublisher); + } + private AsyncRequestBody newFileAsyncRequestBody(SimplePublisher simplePublisher) { long position = chunkSize * chunkIndex.getAndIncrement(); long numBytesToReadForThisChunk = Math.min(totalContentLength - position, chunkSize); @@ -126,21 +132,7 @@ private AsyncRequestBody newFileAsyncRequestBody(SimplePublisher s) { - fileAsyncRequestBody.doAfterOnComplete(() -> { - numAsyncRequestBodiesInFlight.decrementAndGet(); - sendAsyncRequestBody(simplePublisher); - }).subscribe(s); - } - - @Override - public Optional contentLength() { - return fileAsyncRequestBody.contentLength(); - } - }; + return new FileAsyncRequestBodyWrapper(fileAsyncRequestBody, simplePublisher); } /** @@ -159,4 +151,33 @@ private boolean shouldSendMore() { AtomicInteger numAsyncRequestBodiesInFlight() { return numAsyncRequestBodiesInFlight; } + + private final class FileAsyncRequestBodyWrapper implements AsyncRequestBody { + + private final FileAsyncRequestBody fileAsyncRequestBody; + private final SimplePublisher simplePublisher; + + FileAsyncRequestBodyWrapper(FileAsyncRequestBody fileAsyncRequestBody, + SimplePublisher simplePublisher) { + this.fileAsyncRequestBody = fileAsyncRequestBody; + this.simplePublisher = simplePublisher; + } + + @Override + public void subscribe(Subscriber s) { + fileAsyncRequestBody.doAfterOnComplete(() -> startNextRequestBody(simplePublisher)) + // The reason we still need to call startNextRequestBody when the subscription is + // cancelled is that upstream could cancel the subscription even though the stream has + // finished successfully before onComplete. 
If this happens, doAfterOnComplete callback + // will never be invoked, and if the current buffer is full, the publisher will stop + // sending new FileAsyncRequestBody, leading to uncompleted future. + .doAfterOnCancel(() -> startNextRequestBody(simplePublisher)) + .subscribe(s); + } + + @Override + public Optional contentLength() { + return fileAsyncRequestBody.contentLength(); + } + } } diff --git a/test/s3-benchmarks/src/main/resources/log4j2.properties b/test/s3-benchmarks/src/main/resources/log4j2.properties index 58a399c44f10..e4d18ecc6eac 100644 --- a/test/s3-benchmarks/src/main/resources/log4j2.properties +++ b/test/s3-benchmarks/src/main/resources/log4j2.properties @@ -43,3 +43,6 @@ rootLogger.appenderRef.file.ref = FileAppender # #logger.netty.name = io.netty.handler.logging #logger.netty.level = debug + +#logger.s3mpu.name = software.amazon.awssdk.services.s3.internal.multipart +#logger.s3mpu.level = debug \ No newline at end of file From 657a76959af54f45a42d31e1156a3faac6155c92 Mon Sep 17 00:00:00 2001 From: Zoe Wang <33073555+zoewangg@users.noreply.github.com> Date: Fri, 25 Aug 2023 15:32:41 -0700 Subject: [PATCH 4/5] address feedback --- .../FileAsyncRequestBodySplitHelper.java | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodySplitHelper.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodySplitHelper.java index fdd91087f990..2cbf933f9d76 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodySplitHelper.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodySplitHelper.java @@ -86,15 +86,17 @@ public SdkPublisher split() { } private void sendAsyncRequestBody(SimplePublisher simplePublisher) { - if (!isSendingRequestBody.compareAndSet(false, true)) { - return; - } - - try { - 
doSendAsyncRequestBody(simplePublisher); - } finally { - isSendingRequestBody.set(false); - } + do { + if (!isSendingRequestBody.compareAndSet(false, true)) { + return; + } + + try { + doSendAsyncRequestBody(simplePublisher); + } finally { + isSendingRequestBody.set(false); + } + } while (shouldSendMore()); } private void doSendAsyncRequestBody(SimplePublisher simplePublisher) { From 03bd5ab1a60236b4458a03f7261044a1270f9599 Mon Sep 17 00:00:00 2001 From: Zoe Wang <33073555+zoewangg@users.noreply.github.com> Date: Mon, 28 Aug 2023 11:52:33 -0700 Subject: [PATCH 5/5] Fix issue reported by sonarcloud --- .../core/internal/async/FileAsyncRequestBodySplitHelper.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodySplitHelper.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodySplitHelper.java index 2cbf933f9d76..4b0acfbd81f2 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodySplitHelper.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodySplitHelper.java @@ -145,7 +145,7 @@ private boolean shouldSendMore() { return false; } - long currentUsedBuffer = numAsyncRequestBodiesInFlight.get() * bufferPerAsyncRequestBody; + long currentUsedBuffer = (long) numAsyncRequestBodiesInFlight.get() * bufferPerAsyncRequestBody; return currentUsedBuffer + bufferPerAsyncRequestBody <= totalBufferSize; }