diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBody.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBody.java index 3c6adb8fdbac..4c7d70ab7553 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBody.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBody.java @@ -25,6 +25,7 @@ import java.util.Arrays; import java.util.Optional; import java.util.concurrent.ExecutorService; +import java.util.function.Consumer; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import software.amazon.awssdk.annotations.SdkPublicApi; @@ -405,34 +406,36 @@ static AsyncRequestBody empty() { /** * Converts this {@link AsyncRequestBody} to a publisher of {@link AsyncRequestBody}s, each of which publishes a specific - * portion of the original data, based on the configured {code chunkSizeInBytes}. + * portion of the original data, based on the provided {@link AsyncRequestBodySplitConfiguration}. The default chunk size + * is 2MB and the default buffer size is 8MB. * *

* If content length of this {@link AsyncRequestBody} is present, each divided {@link AsyncRequestBody} is delivered to the * subscriber right after it's initialized. *

- * // TODO: API Surface Area review: should we make this behavior configurable? * If content length is null, it is sent after the entire content for that chunk is buffered. - * In this case, the configured {@code maxMemoryUsageInBytes} must be larger than or equal to {@code chunkSizeInBytes}. + * In this case, the configured {@code bufferSizeInBytes} must be larger than or equal to {@code chunkSizeInBytes}. * - * @param chunkSizeInBytes the size for each divided chunk. The last chunk may be smaller than the configured size. - * @param maxMemoryUsageInBytes the max memory the SDK will use to buffer the content - * @return SplitAsyncRequestBodyResult + * @see AsyncRequestBodySplitConfiguration */ - default SdkPublisher split(long chunkSizeInBytes, long maxMemoryUsageInBytes) { - Validate.isPositive(chunkSizeInBytes, "chunkSizeInBytes"); - Validate.isPositive(maxMemoryUsageInBytes, "maxMemoryUsageInBytes"); - - if (!contentLength().isPresent()) { - Validate.isTrue(maxMemoryUsageInBytes >= chunkSizeInBytes, - "maxMemoryUsageInBytes must be larger than or equal to " + - "chunkSizeInBytes if the content length is unknown"); - } + default SdkPublisher split(AsyncRequestBodySplitConfiguration splitConfiguration) { + Validate.notNull(splitConfiguration, "splitConfiguration"); return SplittingPublisher.builder() .asyncRequestBody(this) - .chunkSizeInBytes(chunkSizeInBytes) - .maxMemoryUsageInBytes(maxMemoryUsageInBytes) + .chunkSizeInBytes(splitConfiguration.chunkSizeInBytes()) + .bufferSizeInBytes(splitConfiguration.bufferSizeInBytes()) .build(); } + + /** + * This is a convenience method that passes an instance of the {@link AsyncRequestBodySplitConfiguration} builder, + * avoiding the need to create one manually via {@link AsyncRequestBodySplitConfiguration#builder()}. 
+ * + * @see #split(AsyncRequestBodySplitConfiguration) + */ + default SdkPublisher split(Consumer splitConfiguration) { + Validate.notNull(splitConfiguration, "splitConfiguration"); + return split(AsyncRequestBodySplitConfiguration.builder().applyMutation(splitConfiguration).build()); + } } diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBodySplitConfiguration.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBodySplitConfiguration.java new file mode 100644 index 000000000000..fe51f33b4ff3 --- /dev/null +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBodySplitConfiguration.java @@ -0,0 +1,141 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.core.async; + +import java.util.Objects; +import software.amazon.awssdk.annotations.SdkPublicApi; +import software.amazon.awssdk.utils.Validate; +import software.amazon.awssdk.utils.builder.CopyableBuilder; +import software.amazon.awssdk.utils.builder.ToCopyableBuilder; + +/** + * Configuration options for {@link AsyncRequestBody#split} to configure how the SDK + * should split an {@link SdkPublisher}. 
+ */ +@SdkPublicApi +public final class AsyncRequestBodySplitConfiguration implements ToCopyableBuilder { + private final Long chunkSizeInBytes; + private final Long bufferSizeInBytes; + + private AsyncRequestBodySplitConfiguration(DefaultBuilder builder) { + this.chunkSizeInBytes = Validate.isPositiveOrNull(builder.chunkSizeInBytes, "chunkSizeInBytes"); + this.bufferSizeInBytes = Validate.isPositiveOrNull(builder.bufferSizeInBytes, "bufferSizeInBytes"); + } + + /** + * The configured chunk size for each divided {@link AsyncRequestBody}. + */ + public Long chunkSizeInBytes() { + return chunkSizeInBytes; + } + + /** + * The configured maximum buffer size the SDK will use to buffer the content from the source {@link SdkPublisher}. + */ + public Long bufferSizeInBytes() { + return bufferSizeInBytes; + } + + /** + * Create a {@link Builder}, used to create a {@link AsyncRequestBodySplitConfiguration}. + */ + public static Builder builder() { + return new DefaultBuilder(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + AsyncRequestBodySplitConfiguration that = (AsyncRequestBodySplitConfiguration) o; + + if (!Objects.equals(chunkSizeInBytes, that.chunkSizeInBytes)) { + return false; + } + return Objects.equals(bufferSizeInBytes, that.bufferSizeInBytes); + } + + @Override + public int hashCode() { + int result = chunkSizeInBytes != null ? chunkSizeInBytes.hashCode() : 0; + result = 31 * result + (bufferSizeInBytes != null ? bufferSizeInBytes.hashCode() : 0); + return result; + } + + @Override + public AsyncRequestBodySplitConfiguration.Builder toBuilder() { + return new DefaultBuilder(this); + } + + public interface Builder extends CopyableBuilder { + + /** + * Configures the size for each divided chunk. The last chunk may be smaller than the configured size. The default value + * is 2MB. 
+ * + * @param chunkSizeInBytes the chunk size in bytes + * @return This object for method chaining. + */ + Builder chunkSizeInBytes(Long chunkSizeInBytes); + + /** + * The maximum buffer size the SDK will use to buffer the content from the source {@link SdkPublisher}. The default value + * is 8MB. + * + * @param bufferSizeInBytes the buffer size in bytes + * @return This object for method chaining. + */ + Builder bufferSizeInBytes(Long bufferSizeInBytes); + } + + private static final class DefaultBuilder implements Builder { + private Long chunkSizeInBytes; + private Long bufferSizeInBytes; + + private DefaultBuilder(AsyncRequestBodySplitConfiguration asyncRequestBodySplitConfiguration) { + this.chunkSizeInBytes = asyncRequestBodySplitConfiguration.chunkSizeInBytes; + this.bufferSizeInBytes = asyncRequestBodySplitConfiguration.bufferSizeInBytes; + } + + private DefaultBuilder() { + + } + + @Override + public Builder chunkSizeInBytes(Long chunkSizeInBytes) { + this.chunkSizeInBytes = chunkSizeInBytes; + return this; + } + + @Override + public Builder bufferSizeInBytes(Long bufferSizeInBytes) { + this.bufferSizeInBytes = bufferSizeInBytes; + return this; + } + + @Override + public AsyncRequestBodySplitConfiguration build() { + return new AsyncRequestBodySplitConfiguration(this); + } + } +} diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingPublisher.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingPublisher.java index e18f9944a09e..43f2e10ff192 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingPublisher.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingPublisher.java @@ -39,17 +39,25 @@ @SdkInternalApi public class SplittingPublisher implements SdkPublisher { private static final Logger log = Logger.loggerFor(SplittingPublisher.class); + private static final long DEFAULT_CHUNK_SIZE = 2 * 1024 * 1024L; + 
private static final long DEFAULT_BUFFER_SIZE = DEFAULT_CHUNK_SIZE * 4; private final AsyncRequestBody upstreamPublisher; private final SplittingSubscriber splittingSubscriber; private final SimplePublisher downstreamPublisher = new SimplePublisher<>(); private final long chunkSizeInBytes; - private final long maxMemoryUsageInBytes; + private final long bufferSizeInBytes; private SplittingPublisher(Builder builder) { this.upstreamPublisher = Validate.paramNotNull(builder.asyncRequestBody, "asyncRequestBody"); - this.chunkSizeInBytes = Validate.isPositive(builder.chunkSizeInBytes, "chunkSizeInBytes"); + this.chunkSizeInBytes = builder.chunkSizeInBytes == null ? DEFAULT_CHUNK_SIZE : builder.chunkSizeInBytes; + this.bufferSizeInBytes = builder.bufferSizeInBytes == null ? DEFAULT_BUFFER_SIZE : builder.bufferSizeInBytes; this.splittingSubscriber = new SplittingSubscriber(upstreamPublisher.contentLength().orElse(null)); - this.maxMemoryUsageInBytes = Validate.isPositive(builder.maxMemoryUsageInBytes, "maxMemoryUsageInBytes"); + + if (!upstreamPublisher.contentLength().isPresent()) { + Validate.isTrue(bufferSizeInBytes >= chunkSizeInBytes, + "bufferSizeInBytes must be larger than or equal to " + + "chunkSizeInBytes if the content length is unknown"); + } } public static Builder builder() { @@ -213,7 +221,7 @@ private void maybeRequestMoreUpstreamData() { } private boolean shouldRequestMoreData(long buffered) { - return buffered == 0 || buffered + byteBufferSizeHint <= maxMemoryUsageInBytes; + return buffered == 0 || buffered + byteBufferSizeHint <= bufferSizeInBytes; } private Long totalDataRemaining() { @@ -289,42 +297,20 @@ private void addDataBuffered(int length) { public static final class Builder { private AsyncRequestBody asyncRequestBody; private Long chunkSizeInBytes; - private Long maxMemoryUsageInBytes; + private Long bufferSizeInBytes; - /** - * Configures the asyncRequestBody to split - * - * @param asyncRequestBody The new asyncRequestBody value. 
- * @return This object for method chaining. - */ public Builder asyncRequestBody(AsyncRequestBody asyncRequestBody) { this.asyncRequestBody = asyncRequestBody; return this; } - /** - * Configures the size of the chunk for each {@link AsyncRequestBody} to publish - * - * @param chunkSizeInBytes The new chunkSizeInBytes value. - * @return This object for method chaining. - */ - public Builder chunkSizeInBytes(long chunkSizeInBytes) { + public Builder chunkSizeInBytes(Long chunkSizeInBytes) { this.chunkSizeInBytes = chunkSizeInBytes; return this; } - /** - * Sets the maximum memory usage in bytes. - * - * @param maxMemoryUsageInBytes The new maxMemoryUsageInBytes value. - * @return This object for method chaining. - */ - // TODO: max memory usage might not be the best name, since we may technically go a little above this limit when we add - // on a new byte buffer. But we don't know for sure what the size of a buffer we request will be (we do use the size - // for the last byte buffer as a hint), so I don't think we can have a truly accurate max. Maybe we call it minimum - // buffer size instead? - public Builder maxMemoryUsageInBytes(long maxMemoryUsageInBytes) { - this.maxMemoryUsageInBytes = maxMemoryUsageInBytes; + public Builder bufferSizeInBytes(Long bufferSizeInBytes) { + this.bufferSizeInBytes = bufferSizeInBytes; return this; } diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/async/AsyncRequestBodyConfigurationTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/async/AsyncRequestBodyConfigurationTest.java new file mode 100644 index 000000000000..8b8f78f2b5e9 --- /dev/null +++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/async/AsyncRequestBodyConfigurationTest.java @@ -0,0 +1,58 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. 
+ * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.core.async; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import nl.jqno.equalsverifier.EqualsVerifier; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +public class AsyncRequestBodyConfigurationTest { + + @Test + void equalsHashCode() { + EqualsVerifier.forClass(AsyncRequestBodySplitConfiguration.class) + .verify(); + } + + @ParameterizedTest + @ValueSource(longs = {0, -1}) + void nonPositiveValue_shouldThrowException(long size) { + assertThatThrownBy(() -> + AsyncRequestBodySplitConfiguration.builder() + .chunkSizeInBytes(size) + .build()) + .hasMessageContaining("must be positive"); + assertThatThrownBy(() -> + AsyncRequestBodySplitConfiguration.builder() + .bufferSizeInBytes(size) + .build()) + .hasMessageContaining("must be positive"); + } + + @Test + void toBuilder_shouldCopyAllFields() { + AsyncRequestBodySplitConfiguration config = AsyncRequestBodySplitConfiguration.builder() + .bufferSizeInBytes(1L) + .chunkSizeInBytes(2L) + .build(); + + assertThat(config.toBuilder().build()).isEqualTo(config); + } +} diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/async/AsyncRequestBodyTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/async/AsyncRequestBodyTest.java index 4d4bb42e06e0..cdd87822e3d4 100644 --- a/core/sdk-core/src/test/java/software/amazon/awssdk/core/async/AsyncRequestBodyTest.java +++ 
b/core/sdk-core/src/test/java/software/amazon/awssdk/core/async/AsyncRequestBodyTest.java @@ -356,25 +356,4 @@ void publisherConstructorHasCorrectContentType() { AsyncRequestBody requestBody = AsyncRequestBody.fromPublisher(bodyPublisher); assertEquals(Mimetype.MIMETYPE_OCTET_STREAM, requestBody.contentType()); } - - @Test - public void split_nonPositiveInput_shouldThrowException() { - AsyncRequestBody body = AsyncRequestBody.fromString("test"); - assertThatThrownBy(() -> body.split(0, 4)).hasMessageContaining("must be positive"); - assertThatThrownBy(() -> body.split(-1, 4)).hasMessageContaining("must be positive"); - assertThatThrownBy(() -> body.split(5, 0)).hasMessageContaining("must be positive"); - assertThatThrownBy(() -> body.split(5, -1)).hasMessageContaining("must be positive"); - } - - @Test - public void split_contentUnknownMaxMemorySmallerThanChunkSize_shouldThrowException() { - AsyncRequestBody body = AsyncRequestBody.fromPublisher(new Publisher() { - @Override - public void subscribe(Subscriber s) { - - } - }); - assertThatThrownBy(() -> body.split(10, 4)) - .hasMessageContaining("must be larger than or equal"); - } } diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTest.java index 368c403dbf88..0966ea6eb76f 100644 --- a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTest.java +++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTest.java @@ -15,6 +15,7 @@ package software.amazon.awssdk.core.internal.async; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.AssertionsForClassTypes.assertThat; import static software.amazon.awssdk.utils.FunctionalUtils.invokeSafely; @@ -39,6 +40,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; 
import org.junit.jupiter.params.provider.ValueSource; +import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import software.amazon.awssdk.core.async.AsyncRequestBody; @@ -66,6 +68,18 @@ public static void afterAll() throws Exception { testFile.delete(); } + @Test + public void split_contentUnknownMaxMemorySmallerThanChunkSize_shouldThrowException() { + AsyncRequestBody body = AsyncRequestBody.fromPublisher(s -> { + }); + assertThatThrownBy(() -> SplittingPublisher.builder() + .asyncRequestBody(body) + .chunkSizeInBytes(10L) + .bufferSizeInBytes(5L) + .build()) + .hasMessageContaining("must be larger than or equal"); + } + @ParameterizedTest @ValueSource(ints = {CHUNK_SIZE, CHUNK_SIZE * 2 - 1, CHUNK_SIZE * 2}) void differentChunkSize_shouldSplitAsyncRequestBodyCorrectly(int chunkSize) throws Exception { @@ -94,8 +108,8 @@ public Optional contentLength() { }; SplittingPublisher splittingPublisher = SplittingPublisher.builder() .asyncRequestBody(asyncRequestBody) - .chunkSizeInBytes(CHUNK_SIZE) - .maxMemoryUsageInBytes(10L) + .chunkSizeInBytes((long) CHUNK_SIZE) + .bufferSizeInBytes(10L) .build(); @@ -136,8 +150,8 @@ public Optional contentLength() { private static void verifySplitContent(AsyncRequestBody asyncRequestBody, int chunkSize) throws Exception { SplittingPublisher splittingPublisher = SplittingPublisher.builder() .asyncRequestBody(asyncRequestBody) - .chunkSizeInBytes(chunkSize) - .maxMemoryUsageInBytes((long) chunkSize * 4) + .chunkSizeInBytes((long) chunkSize) + .bufferSizeInBytes((long) chunkSize * 4) .build(); List> futures = new ArrayList<>(); diff --git a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/UploadWithKnownContentLengthHelper.java b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/UploadWithKnownContentLengthHelper.java index e8bef01ab81b..5e1a41da4d86 100644 --- 
a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/UploadWithKnownContentLengthHelper.java +++ b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/UploadWithKnownContentLengthHelper.java @@ -119,7 +119,8 @@ private void doUploadInParts(Pair request, MpuRequestContext mpuRequestContext = new MpuRequestContext(request, contentLength, optimalPartSize, uploadId); request.right() - .split(mpuRequestContext.partSize, maxMemoryUsageInBytes) + .split(b -> b.chunkSizeInBytes(mpuRequestContext.partSize) + .bufferSizeInBytes(maxMemoryUsageInBytes)) .subscribe(new KnownContentLengthAsyncRequestBodySubscriber(mpuRequestContext, returnFuture)); } diff --git a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/UploadWithUnknownContentLengthHelper.java b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/UploadWithUnknownContentLengthHelper.java index d2034b4b4e94..fa8be1e0c6f3 100644 --- a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/UploadWithUnknownContentLengthHelper.java +++ b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/UploadWithUnknownContentLengthHelper.java @@ -74,8 +74,8 @@ public CompletableFuture uploadObject(PutObjectRequest putObj CompletableFuture returnFuture = new CompletableFuture<>(); SdkPublisher splitAsyncRequestBodyResponse = - asyncRequestBody.split(partSizeInBytes, - maxMemoryUsageInBytes); + asyncRequestBody.split(b -> b.chunkSizeInBytes(partSizeInBytes) + .bufferSizeInBytes(maxMemoryUsageInBytes)); splitAsyncRequestBodyResponse.subscribe(new UnknownContentLengthAsyncRequestBodySubscriber(partSizeInBytes, putObjectRequest,