Skip to content

Create a configuration class for SdkPublisher#split #4236

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import software.amazon.awssdk.annotations.SdkPublicApi;
Expand Down Expand Up @@ -405,34 +406,36 @@ static AsyncRequestBody empty() {

/**
* Converts this {@link AsyncRequestBody} to a publisher of {@link AsyncRequestBody}s, each of which publishes a specific
* portion of the original data, based on the configured {code chunkSizeInBytes}.
* portion of the original data, based on the provided {@link AsyncRequestBodySplitConfiguration}. The default chunk size
* is 2MB and the default buffer size is 8MB.
*
* <p>
* If content length of this {@link AsyncRequestBody} is present, each divided {@link AsyncRequestBody} is delivered to the
* subscriber right after it's initialized.
* <p>
* // TODO: API Surface Area review: should we make this behavior configurable?
* If content length is null, it is sent after the entire content for that chunk is buffered.
* In this case, the configured {@code maxMemoryUsageInBytes} must be larger than or equal to {@code chunkSizeInBytes}.
*
* @param chunkSizeInBytes the size for each divided chunk. The last chunk may be smaller than the configured size.
* @param maxMemoryUsageInBytes the max memory the SDK will use to buffer the content
* @return SplitAsyncRequestBodyResult
* @see AsyncRequestBodySplitConfiguration
*/
default SdkPublisher<AsyncRequestBody> split(long chunkSizeInBytes, long maxMemoryUsageInBytes) {
Validate.isPositive(chunkSizeInBytes, "chunkSizeInBytes");
Validate.isPositive(maxMemoryUsageInBytes, "maxMemoryUsageInBytes");

if (!contentLength().isPresent()) {
Validate.isTrue(maxMemoryUsageInBytes >= chunkSizeInBytes,
"maxMemoryUsageInBytes must be larger than or equal to " +
"chunkSizeInBytes if the content length is unknown");
}
default SdkPublisher<AsyncRequestBody> split(AsyncRequestBodySplitConfiguration splitConfiguration) {
Validate.notNull(splitConfiguration, "splitConfiguration");

return SplittingPublisher.builder()
.asyncRequestBody(this)
.chunkSizeInBytes(chunkSizeInBytes)
.maxMemoryUsageInBytes(maxMemoryUsageInBytes)
.chunkSizeInBytes(splitConfiguration.chunkSizeInBytes())
.bufferSizeInBytes(splitConfiguration.bufferSizeInBytes())
.build();
}

/**
* This is a convenience method that passes an instance of the {@link AsyncRequestBodySplitConfiguration} builder,
* avoiding the need to create one manually via {@link AsyncRequestBodySplitConfiguration#builder()}.
*
* @see #split(AsyncRequestBodySplitConfiguration)
*/
default SdkPublisher<AsyncRequestBody> split(Consumer<AsyncRequestBodySplitConfiguration.Builder> splitConfiguration) {
Validate.notNull(splitConfiguration, "splitConfiguration");
return split(AsyncRequestBodySplitConfiguration.builder().applyMutation(splitConfiguration).build());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.core.async;

import java.util.Objects;
import software.amazon.awssdk.annotations.SdkPublicApi;
import software.amazon.awssdk.utils.Validate;
import software.amazon.awssdk.utils.builder.CopyableBuilder;
import software.amazon.awssdk.utils.builder.ToCopyableBuilder;

/**
* Configuration options for {@link AsyncRequestBody#split} to configure how the SDK
* should split an {@link SdkPublisher}.
*/
@SdkPublicApi
public final class AsyncRequestBodySplitConfiguration implements ToCopyableBuilder<AsyncRequestBodySplitConfiguration.Builder,
AsyncRequestBodySplitConfiguration> {
private final Long chunkSizeInBytes;
private final Long bufferSizeInBytes;

private AsyncRequestBodySplitConfiguration(DefaultBuilder builder) {
this.chunkSizeInBytes = Validate.isPositiveOrNull(builder.chunkSizeInBytes, "chunkSizeInBytes");
this.bufferSizeInBytes = Validate.isPositiveOrNull(builder.bufferSizeInBytes, "bufferSizeInBytes");
}

/**
* The configured chunk size for each divided {@link AsyncRequestBody}.
*/
public Long chunkSizeInBytes() {
return chunkSizeInBytes;
}

/**
* The configured maximum buffer size the SDK will use to buffer the content from the source {@link SdkPublisher}.
*/
public Long bufferSizeInBytes() {
return bufferSizeInBytes;
}

/**
* Create a {@link Builder}, used to create a {@link AsyncRequestBodySplitConfiguration}.
*/
public static Builder builder() {
return new DefaultBuilder();
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

AsyncRequestBodySplitConfiguration that = (AsyncRequestBodySplitConfiguration) o;

if (!Objects.equals(chunkSizeInBytes, that.chunkSizeInBytes)) {
return false;
}
return Objects.equals(bufferSizeInBytes, that.bufferSizeInBytes);
}

@Override
public int hashCode() {
int result = chunkSizeInBytes != null ? chunkSizeInBytes.hashCode() : 0;
result = 31 * result + (bufferSizeInBytes != null ? bufferSizeInBytes.hashCode() : 0);
return result;
}

@Override
public AsyncRequestBodySplitConfiguration.Builder toBuilder() {
return new DefaultBuilder(this);
}

public interface Builder extends CopyableBuilder<AsyncRequestBodySplitConfiguration.Builder,
AsyncRequestBodySplitConfiguration> {

/**
* Configures the size for each divided chunk. The last chunk may be smaller than the configured size. The default value
* is 2MB.
*
* @param chunkSizeInBytes the chunk size in bytes
* @return This object for method chaining.
*/
Builder chunkSizeInBytes(Long chunkSizeInBytes);

/**
* The maximum buffer size the SDK will use to buffer the content from the source {@link SdkPublisher}. The default value
* is 8MB.
*
* @param bufferSizeInBytes the buffer size in bytes
* @return This object for method chaining.
*/
Builder bufferSizeInBytes(Long bufferSizeInBytes);
}

private static final class DefaultBuilder implements Builder {
private Long chunkSizeInBytes;
private Long bufferSizeInBytes;

private DefaultBuilder(AsyncRequestBodySplitConfiguration asyncRequestBodySplitConfiguration) {
this.chunkSizeInBytes = asyncRequestBodySplitConfiguration.chunkSizeInBytes;
this.bufferSizeInBytes = asyncRequestBodySplitConfiguration.bufferSizeInBytes;
}

private DefaultBuilder() {

}

@Override
public Builder chunkSizeInBytes(Long chunkSizeInBytes) {
this.chunkSizeInBytes = chunkSizeInBytes;
return this;
}

@Override
public Builder bufferSizeInBytes(Long bufferSizeInBytes) {
this.bufferSizeInBytes = bufferSizeInBytes;
return this;
}

@Override
public AsyncRequestBodySplitConfiguration build() {
return new AsyncRequestBodySplitConfiguration(this);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,25 @@
@SdkInternalApi
public class SplittingPublisher implements SdkPublisher<AsyncRequestBody> {
private static final Logger log = Logger.loggerFor(SplittingPublisher.class);
private static final long DEFAULT_CHUNK_SIZE = 2 * 1024 * 1024L;
private static final long DEFAULT_BUFFER_SIZE = DEFAULT_CHUNK_SIZE * 4;
private final AsyncRequestBody upstreamPublisher;
private final SplittingSubscriber splittingSubscriber;
private final SimplePublisher<AsyncRequestBody> downstreamPublisher = new SimplePublisher<>();
private final long chunkSizeInBytes;
private final long maxMemoryUsageInBytes;
private final long bufferSizeInBytes;

private SplittingPublisher(Builder builder) {
this.upstreamPublisher = Validate.paramNotNull(builder.asyncRequestBody, "asyncRequestBody");
this.chunkSizeInBytes = Validate.isPositive(builder.chunkSizeInBytes, "chunkSizeInBytes");
this.chunkSizeInBytes = builder.chunkSizeInBytes == null ? DEFAULT_CHUNK_SIZE : builder.chunkSizeInBytes;
this.bufferSizeInBytes = builder.bufferSizeInBytes == null ? DEFAULT_BUFFER_SIZE : builder.bufferSizeInBytes;
this.splittingSubscriber = new SplittingSubscriber(upstreamPublisher.contentLength().orElse(null));
this.maxMemoryUsageInBytes = Validate.isPositive(builder.maxMemoryUsageInBytes, "maxMemoryUsageInBytes");

if (!upstreamPublisher.contentLength().isPresent()) {
Validate.isTrue(bufferSizeInBytes >= chunkSizeInBytes,
"bufferSizeInBytes must be larger than or equal to " +
"chunkSizeInBytes if the content length is unknown");
}
}

public static Builder builder() {
Expand Down Expand Up @@ -213,7 +221,7 @@ private void maybeRequestMoreUpstreamData() {
}

private boolean shouldRequestMoreData(long buffered) {
return buffered == 0 || buffered + byteBufferSizeHint <= maxMemoryUsageInBytes;
return buffered == 0 || buffered + byteBufferSizeHint <= bufferSizeInBytes;
}

private Long totalDataRemaining() {
Expand Down Expand Up @@ -289,42 +297,20 @@ private void addDataBuffered(int length) {
public static final class Builder {
private AsyncRequestBody asyncRequestBody;
private Long chunkSizeInBytes;
private Long maxMemoryUsageInBytes;
private Long bufferSizeInBytes;

/**
* Configures the asyncRequestBody to split
*
* @param asyncRequestBody The new asyncRequestBody value.
* @return This object for method chaining.
*/
public Builder asyncRequestBody(AsyncRequestBody asyncRequestBody) {
this.asyncRequestBody = asyncRequestBody;
return this;
}

/**
* Configures the size of the chunk for each {@link AsyncRequestBody} to publish
*
* @param chunkSizeInBytes The new chunkSizeInBytes value.
* @return This object for method chaining.
*/
public Builder chunkSizeInBytes(long chunkSizeInBytes) {
public Builder chunkSizeInBytes(Long chunkSizeInBytes) {
this.chunkSizeInBytes = chunkSizeInBytes;
return this;
}

/**
* Sets the maximum memory usage in bytes.
*
* @param maxMemoryUsageInBytes The new maxMemoryUsageInBytes value.
* @return This object for method chaining.
*/
// TODO: max memory usage might not be the best name, since we may technically go a little above this limit when we add
// on a new byte buffer. But we don't know for sure what the size of a buffer we request will be (we do use the size
// for the last byte buffer as a hint), so I don't think we can have a truly accurate max. Maybe we call it minimum
// buffer size instead?
public Builder maxMemoryUsageInBytes(long maxMemoryUsageInBytes) {
this.maxMemoryUsageInBytes = maxMemoryUsageInBytes;
public Builder bufferSizeInBytes(Long bufferSizeInBytes) {
this.bufferSizeInBytes = bufferSizeInBytes;
return this;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.core.async;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import nl.jqno.equalsverifier.EqualsVerifier;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

public class AsyncRequestBodyConfigurationTest {

@Test
void equalsHashCode() {
EqualsVerifier.forClass(AsyncRequestBodySplitConfiguration.class)
.verify();
}

@ParameterizedTest
@ValueSource(longs = {0, -1})
void nonPositiveValue_shouldThrowException(long size) {
assertThatThrownBy(() ->
AsyncRequestBodySplitConfiguration.builder()
.chunkSizeInBytes(size)
.build())
.hasMessageContaining("must be positive");
assertThatThrownBy(() ->
AsyncRequestBodySplitConfiguration.builder()
.bufferSizeInBytes(size)
.build())
.hasMessageContaining("must be positive");
}

@Test
void toBuilder_shouldCopyAllFields() {
AsyncRequestBodySplitConfiguration config = AsyncRequestBodySplitConfiguration.builder()
.bufferSizeInBytes(1L)
.chunkSizeInBytes(2L)
.build();

assertThat(config.toBuilder().build()).isEqualTo(config);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -356,25 +356,4 @@ void publisherConstructorHasCorrectContentType() {
AsyncRequestBody requestBody = AsyncRequestBody.fromPublisher(bodyPublisher);
assertEquals(Mimetype.MIMETYPE_OCTET_STREAM, requestBody.contentType());
}

@Test
public void split_nonPositiveInput_shouldThrowException() {
AsyncRequestBody body = AsyncRequestBody.fromString("test");
assertThatThrownBy(() -> body.split(0, 4)).hasMessageContaining("must be positive");
assertThatThrownBy(() -> body.split(-1, 4)).hasMessageContaining("must be positive");
assertThatThrownBy(() -> body.split(5, 0)).hasMessageContaining("must be positive");
assertThatThrownBy(() -> body.split(5, -1)).hasMessageContaining("must be positive");
}

@Test
public void split_contentUnknownMaxMemorySmallerThanChunkSize_shouldThrowException() {
AsyncRequestBody body = AsyncRequestBody.fromPublisher(new Publisher<ByteBuffer>() {
@Override
public void subscribe(Subscriber<? super ByteBuffer> s) {

}
});
assertThatThrownBy(() -> body.split(10, 4))
.hasMessageContaining("must be larger than or equal");
}
}
Loading