diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBody.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBody.java
index 3c6adb8fdbac..4c7d70ab7553 100644
--- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBody.java
+++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBody.java
@@ -25,6 +25,7 @@
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
+import java.util.function.Consumer;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import software.amazon.awssdk.annotations.SdkPublicApi;
@@ -405,34 +406,36 @@ static AsyncRequestBody empty() {
/**
* Converts this {@link AsyncRequestBody} to a publisher of {@link AsyncRequestBody}s, each of which publishes a specific
- * portion of the original data, based on the configured {code chunkSizeInBytes}.
+ * portion of the original data, based on the provided {@link AsyncRequestBodySplitConfiguration}. The default chunk size
+ * is 2MB and the default buffer size is 8MB.
*
* <p>
* If content length of this {@link AsyncRequestBody} is present, each divided {@link AsyncRequestBody} is delivered to the
* subscriber right after it's initialized.
* <p>
- * // TODO: API Surface Area review: should we make this behavior configurable?
* If content length is null, it is sent after the entire content for that chunk is buffered.
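- * In this case, the configured {@code maxMemoryUsageInBytes} must be larger than or equal to {@code chunkSizeInBytes}.
+ * In this case, the configured {@code bufferSizeInBytes} must be larger than or equal to {@code chunkSizeInBytes}.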
*
- * @param chunkSizeInBytes the size for each divided chunk. The last chunk may be smaller than the configured size.
- * @param maxMemoryUsageInBytes the max memory the SDK will use to buffer the content
- * @return SplitAsyncRequestBodyResult
+ * @see AsyncRequestBodySplitConfiguration
*/
- default SdkPublisher<AsyncRequestBody> split(long chunkSizeInBytes, long maxMemoryUsageInBytes) {
- Validate.isPositive(chunkSizeInBytes, "chunkSizeInBytes");
- Validate.isPositive(maxMemoryUsageInBytes, "maxMemoryUsageInBytes");
-
- if (!contentLength().isPresent()) {
- Validate.isTrue(maxMemoryUsageInBytes >= chunkSizeInBytes,
- "maxMemoryUsageInBytes must be larger than or equal to " +
- "chunkSizeInBytes if the content length is unknown");
- }
+ default SdkPublisher<AsyncRequestBody> split(AsyncRequestBodySplitConfiguration splitConfiguration) {
+ Validate.notNull(splitConfiguration, "splitConfiguration");
return SplittingPublisher.builder()
.asyncRequestBody(this)
- .chunkSizeInBytes(chunkSizeInBytes)
- .maxMemoryUsageInBytes(maxMemoryUsageInBytes)
+ .chunkSizeInBytes(splitConfiguration.chunkSizeInBytes())
+ .bufferSizeInBytes(splitConfiguration.bufferSizeInBytes())
.build();
}
+
+ /**
+ * This is a convenience method that passes an instance of the {@link AsyncRequestBodySplitConfiguration} builder,
+ * avoiding the need to create one manually via {@link AsyncRequestBodySplitConfiguration#builder()}.
+ *
+ * @see #split(AsyncRequestBodySplitConfiguration)
+ */
+ default SdkPublisher<AsyncRequestBody> split(Consumer<AsyncRequestBodySplitConfiguration.Builder> splitConfiguration) {
+ Validate.notNull(splitConfiguration, "splitConfiguration");
+ return split(AsyncRequestBodySplitConfiguration.builder().applyMutation(splitConfiguration).build());
+ }
}
diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBodySplitConfiguration.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBodySplitConfiguration.java
new file mode 100644
index 000000000000..fe51f33b4ff3
--- /dev/null
+++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBodySplitConfiguration.java
@@ -0,0 +1,141 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://aws.amazon.com/apache2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package software.amazon.awssdk.core.async;
+
+import java.util.Objects;
+import software.amazon.awssdk.annotations.SdkPublicApi;
+import software.amazon.awssdk.utils.Validate;
+import software.amazon.awssdk.utils.builder.CopyableBuilder;
+import software.amazon.awssdk.utils.builder.ToCopyableBuilder;
+
+/**
+ * Configuration options for {@link AsyncRequestBody#split} to configure how the SDK
+ * should split an {@link SdkPublisher}.
+ */
+@SdkPublicApi
+public final class AsyncRequestBodySplitConfiguration implements ToCopyableBuilder<AsyncRequestBodySplitConfiguration.Builder,
+ AsyncRequestBodySplitConfiguration> {
+ private final Long chunkSizeInBytes;
+ private final Long bufferSizeInBytes;
+
+ private AsyncRequestBodySplitConfiguration(DefaultBuilder builder) {
+ this.chunkSizeInBytes = Validate.isPositiveOrNull(builder.chunkSizeInBytes, "chunkSizeInBytes");
+ this.bufferSizeInBytes = Validate.isPositiveOrNull(builder.bufferSizeInBytes, "bufferSizeInBytes");
+ }
+
+ /**
+ * The configured chunk size for each divided {@link AsyncRequestBody}.
+ */
+ public Long chunkSizeInBytes() {
+ return chunkSizeInBytes;
+ }
+
+ /**
+ * The configured maximum buffer size the SDK will use to buffer the content from the source {@link SdkPublisher}.
+ */
+ public Long bufferSizeInBytes() {
+ return bufferSizeInBytes;
+ }
+
+ /**
+ * Create a {@link Builder}, used to create a {@link AsyncRequestBodySplitConfiguration}.
+ */
+ public static Builder builder() {
+ return new DefaultBuilder();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ AsyncRequestBodySplitConfiguration that = (AsyncRequestBodySplitConfiguration) o;
+
+ if (!Objects.equals(chunkSizeInBytes, that.chunkSizeInBytes)) {
+ return false;
+ }
+ return Objects.equals(bufferSizeInBytes, that.bufferSizeInBytes);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = chunkSizeInBytes != null ? chunkSizeInBytes.hashCode() : 0;
+ result = 31 * result + (bufferSizeInBytes != null ? bufferSizeInBytes.hashCode() : 0);
+ return result;
+ }
+
+ @Override
+ public AsyncRequestBodySplitConfiguration.Builder toBuilder() {
+ return new DefaultBuilder(this);
+ }
+
+ public interface Builder extends CopyableBuilder<AsyncRequestBodySplitConfiguration.Builder,
+ AsyncRequestBodySplitConfiguration> {
+
+ /**
+ * Configures the size for each divided chunk. The last chunk may be smaller than the configured size. The default value
+ * is 2MB.
+ *
+ * @param chunkSizeInBytes the chunk size in bytes
+ * @return This object for method chaining.
+ */
+ Builder chunkSizeInBytes(Long chunkSizeInBytes);
+
+ /**
+ * The maximum buffer size the SDK will use to buffer the content from the source {@link SdkPublisher}. The default value
+ * is 8MB.
+ *
+ * @param bufferSizeInBytes the buffer size in bytes
+ * @return This object for method chaining.
+ */
+ Builder bufferSizeInBytes(Long bufferSizeInBytes);
+ }
+
+ private static final class DefaultBuilder implements Builder {
+ private Long chunkSizeInBytes;
+ private Long bufferSizeInBytes;
+
+ private DefaultBuilder(AsyncRequestBodySplitConfiguration asyncRequestBodySplitConfiguration) {
+ this.chunkSizeInBytes = asyncRequestBodySplitConfiguration.chunkSizeInBytes;
+ this.bufferSizeInBytes = asyncRequestBodySplitConfiguration.bufferSizeInBytes;
+ }
+
+ private DefaultBuilder() {
+
+ }
+
+ @Override
+ public Builder chunkSizeInBytes(Long chunkSizeInBytes) {
+ this.chunkSizeInBytes = chunkSizeInBytes;
+ return this;
+ }
+
+ @Override
+ public Builder bufferSizeInBytes(Long bufferSizeInBytes) {
+ this.bufferSizeInBytes = bufferSizeInBytes;
+ return this;
+ }
+
+ @Override
+ public AsyncRequestBodySplitConfiguration build() {
+ return new AsyncRequestBodySplitConfiguration(this);
+ }
+ }
+}
diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingPublisher.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingPublisher.java
index e18f9944a09e..43f2e10ff192 100644
--- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingPublisher.java
+++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingPublisher.java
@@ -39,17 +39,25 @@
@SdkInternalApi
public class SplittingPublisher implements SdkPublisher<AsyncRequestBody> {
private static final Logger log = Logger.loggerFor(SplittingPublisher.class);
+ private static final long DEFAULT_CHUNK_SIZE = 2 * 1024 * 1024L;
+ private static final long DEFAULT_BUFFER_SIZE = DEFAULT_CHUNK_SIZE * 4;
private final AsyncRequestBody upstreamPublisher;
private final SplittingSubscriber splittingSubscriber;
private final SimplePublisher<AsyncRequestBody> downstreamPublisher = new SimplePublisher<>();
private final long chunkSizeInBytes;
- private final long maxMemoryUsageInBytes;
+ private final long bufferSizeInBytes;
private SplittingPublisher(Builder builder) {
this.upstreamPublisher = Validate.paramNotNull(builder.asyncRequestBody, "asyncRequestBody");
- this.chunkSizeInBytes = Validate.isPositive(builder.chunkSizeInBytes, "chunkSizeInBytes");
+ this.chunkSizeInBytes = builder.chunkSizeInBytes == null ? DEFAULT_CHUNK_SIZE : builder.chunkSizeInBytes;
+ this.bufferSizeInBytes = builder.bufferSizeInBytes == null ? DEFAULT_BUFFER_SIZE : builder.bufferSizeInBytes;
this.splittingSubscriber = new SplittingSubscriber(upstreamPublisher.contentLength().orElse(null));
- this.maxMemoryUsageInBytes = Validate.isPositive(builder.maxMemoryUsageInBytes, "maxMemoryUsageInBytes");
+
+ if (!upstreamPublisher.contentLength().isPresent()) {
+ Validate.isTrue(bufferSizeInBytes >= chunkSizeInBytes,
+ "bufferSizeInBytes must be larger than or equal to " +
+ "chunkSizeInBytes if the content length is unknown");
+ }
}
public static Builder builder() {
@@ -213,7 +221,7 @@ private void maybeRequestMoreUpstreamData() {
}
private boolean shouldRequestMoreData(long buffered) {
- return buffered == 0 || buffered + byteBufferSizeHint <= maxMemoryUsageInBytes;
+ return buffered == 0 || buffered + byteBufferSizeHint <= bufferSizeInBytes;
}
private Long totalDataRemaining() {
@@ -289,42 +297,20 @@ private void addDataBuffered(int length) {
public static final class Builder {
private AsyncRequestBody asyncRequestBody;
private Long chunkSizeInBytes;
- private Long maxMemoryUsageInBytes;
+ private Long bufferSizeInBytes;
- /**
- * Configures the asyncRequestBody to split
- *
- * @param asyncRequestBody The new asyncRequestBody value.
- * @return This object for method chaining.
- */
public Builder asyncRequestBody(AsyncRequestBody asyncRequestBody) {
this.asyncRequestBody = asyncRequestBody;
return this;
}
- /**
- * Configures the size of the chunk for each {@link AsyncRequestBody} to publish
- *
- * @param chunkSizeInBytes The new chunkSizeInBytes value.
- * @return This object for method chaining.
- */
- public Builder chunkSizeInBytes(long chunkSizeInBytes) {
+ public Builder chunkSizeInBytes(Long chunkSizeInBytes) {
this.chunkSizeInBytes = chunkSizeInBytes;
return this;
}
- /**
- * Sets the maximum memory usage in bytes.
- *
- * @param maxMemoryUsageInBytes The new maxMemoryUsageInBytes value.
- * @return This object for method chaining.
- */
- // TODO: max memory usage might not be the best name, since we may technically go a little above this limit when we add
- // on a new byte buffer. But we don't know for sure what the size of a buffer we request will be (we do use the size
- // for the last byte buffer as a hint), so I don't think we can have a truly accurate max. Maybe we call it minimum
- // buffer size instead?
- public Builder maxMemoryUsageInBytes(long maxMemoryUsageInBytes) {
- this.maxMemoryUsageInBytes = maxMemoryUsageInBytes;
+ public Builder bufferSizeInBytes(Long bufferSizeInBytes) {
+ this.bufferSizeInBytes = bufferSizeInBytes;
return this;
}
diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/async/AsyncRequestBodyConfigurationTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/async/AsyncRequestBodyConfigurationTest.java
new file mode 100644
index 000000000000..8b8f78f2b5e9
--- /dev/null
+++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/async/AsyncRequestBodyConfigurationTest.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://aws.amazon.com/apache2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package software.amazon.awssdk.core.async;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+public class AsyncRequestBodyConfigurationTest {
+
+ @Test
+ void equalsHashCode() {
+ EqualsVerifier.forClass(AsyncRequestBodySplitConfiguration.class)
+ .verify();
+ }
+
+ @ParameterizedTest
+ @ValueSource(longs = {0, -1})
+ void nonPositiveValue_shouldThrowException(long size) {
+ assertThatThrownBy(() ->
+ AsyncRequestBodySplitConfiguration.builder()
+ .chunkSizeInBytes(size)
+ .build())
+ .hasMessageContaining("must be positive");
+ assertThatThrownBy(() ->
+ AsyncRequestBodySplitConfiguration.builder()
+ .bufferSizeInBytes(size)
+ .build())
+ .hasMessageContaining("must be positive");
+ }
+
+ @Test
+ void toBuilder_shouldCopyAllFields() {
+ AsyncRequestBodySplitConfiguration config = AsyncRequestBodySplitConfiguration.builder()
+ .bufferSizeInBytes(1L)
+ .chunkSizeInBytes(2L)
+ .build();
+
+ assertThat(config.toBuilder().build()).isEqualTo(config);
+ }
+}
diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/async/AsyncRequestBodyTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/async/AsyncRequestBodyTest.java
index 4d4bb42e06e0..cdd87822e3d4 100644
--- a/core/sdk-core/src/test/java/software/amazon/awssdk/core/async/AsyncRequestBodyTest.java
+++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/async/AsyncRequestBodyTest.java
@@ -356,25 +356,4 @@ void publisherConstructorHasCorrectContentType() {
AsyncRequestBody requestBody = AsyncRequestBody.fromPublisher(bodyPublisher);
assertEquals(Mimetype.MIMETYPE_OCTET_STREAM, requestBody.contentType());
}
-
- @Test
- public void split_nonPositiveInput_shouldThrowException() {
- AsyncRequestBody body = AsyncRequestBody.fromString("test");
- assertThatThrownBy(() -> body.split(0, 4)).hasMessageContaining("must be positive");
- assertThatThrownBy(() -> body.split(-1, 4)).hasMessageContaining("must be positive");
- assertThatThrownBy(() -> body.split(5, 0)).hasMessageContaining("must be positive");
- assertThatThrownBy(() -> body.split(5, -1)).hasMessageContaining("must be positive");
- }
-
- @Test
- public void split_contentUnknownMaxMemorySmallerThanChunkSize_shouldThrowException() {
- AsyncRequestBody body = AsyncRequestBody.fromPublisher(new Publisher<ByteBuffer>() {
- @Override
- public void subscribe(Subscriber<? super ByteBuffer> s) {
-
- }
- });
- assertThatThrownBy(() -> body.split(10, 4))
- .hasMessageContaining("must be larger than or equal");
- }
}
diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTest.java
index 368c403dbf88..0966ea6eb76f 100644
--- a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTest.java
+++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTest.java
@@ -15,6 +15,7 @@
package software.amazon.awssdk.core.internal.async;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static software.amazon.awssdk.utils.FunctionalUtils.invokeSafely;
@@ -39,6 +40,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
+import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.core.async.AsyncRequestBody;
@@ -66,6 +68,18 @@ public static void afterAll() throws Exception {
testFile.delete();
}
+ @Test
+ public void split_contentUnknownMaxMemorySmallerThanChunkSize_shouldThrowException() {
+ AsyncRequestBody body = AsyncRequestBody.fromPublisher(s -> {
+ });
+ assertThatThrownBy(() -> SplittingPublisher.builder()
+ .asyncRequestBody(body)
+ .chunkSizeInBytes(10L)
+ .bufferSizeInBytes(5L)
+ .build())
+ .hasMessageContaining("must be larger than or equal");
+ }
+
@ParameterizedTest
@ValueSource(ints = {CHUNK_SIZE, CHUNK_SIZE * 2 - 1, CHUNK_SIZE * 2})
void differentChunkSize_shouldSplitAsyncRequestBodyCorrectly(int chunkSize) throws Exception {
@@ -94,8 +108,8 @@ public Optional contentLength() {
};
SplittingPublisher splittingPublisher = SplittingPublisher.builder()
.asyncRequestBody(asyncRequestBody)
- .chunkSizeInBytes(CHUNK_SIZE)
- .maxMemoryUsageInBytes(10L)
+ .chunkSizeInBytes((long) CHUNK_SIZE)
+ .bufferSizeInBytes(10L)
.build();
@@ -136,8 +150,8 @@ public Optional contentLength() {
private static void verifySplitContent(AsyncRequestBody asyncRequestBody, int chunkSize) throws Exception {
SplittingPublisher splittingPublisher = SplittingPublisher.builder()
.asyncRequestBody(asyncRequestBody)
- .chunkSizeInBytes(chunkSize)
- .maxMemoryUsageInBytes((long) chunkSize * 4)
+ .chunkSizeInBytes((long) chunkSize)
+ .bufferSizeInBytes((long) chunkSize * 4)
.build();
List> futures = new ArrayList<>();
diff --git a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/UploadWithKnownContentLengthHelper.java b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/UploadWithKnownContentLengthHelper.java
index e8bef01ab81b..5e1a41da4d86 100644
--- a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/UploadWithKnownContentLengthHelper.java
+++ b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/UploadWithKnownContentLengthHelper.java
@@ -119,7 +119,8 @@ private void doUploadInParts(Pair request,
MpuRequestContext mpuRequestContext = new MpuRequestContext(request, contentLength, optimalPartSize, uploadId);
request.right()
- .split(mpuRequestContext.partSize, maxMemoryUsageInBytes)
+ .split(b -> b.chunkSizeInBytes(mpuRequestContext.partSize)
+ .bufferSizeInBytes(maxMemoryUsageInBytes))
.subscribe(new KnownContentLengthAsyncRequestBodySubscriber(mpuRequestContext,
returnFuture));
}
diff --git a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/UploadWithUnknownContentLengthHelper.java b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/UploadWithUnknownContentLengthHelper.java
index d2034b4b4e94..fa8be1e0c6f3 100644
--- a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/UploadWithUnknownContentLengthHelper.java
+++ b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/UploadWithUnknownContentLengthHelper.java
@@ -74,8 +74,8 @@ public CompletableFuture uploadObject(PutObjectRequest putObj
CompletableFuture returnFuture = new CompletableFuture<>();
SdkPublisher<AsyncRequestBody> splitAsyncRequestBodyResponse =
- asyncRequestBody.split(partSizeInBytes,
- maxMemoryUsageInBytes);
+ asyncRequestBody.split(b -> b.chunkSizeInBytes(partSizeInBytes)
+ .bufferSizeInBytes(maxMemoryUsageInBytes));
splitAsyncRequestBodyResponse.subscribe(new UnknownContentLengthAsyncRequestBodySubscriber(partSizeInBytes,
putObjectRequest,