diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingPublisher.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingPublisher.java index 43f2e10ff192..c56d1b6437d9 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingPublisher.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingPublisher.java @@ -25,6 +25,8 @@ import software.amazon.awssdk.annotations.SdkInternalApi; import software.amazon.awssdk.core.async.AsyncRequestBody; import software.amazon.awssdk.core.async.SdkPublisher; +import software.amazon.awssdk.core.exception.NonRetryableException; +import software.amazon.awssdk.core.internal.util.NoopSubscription; import software.amazon.awssdk.utils.Logger; import software.amazon.awssdk.utils.Validate; import software.amazon.awssdk.utils.async.SimplePublisher; @@ -48,8 +50,8 @@ public class SplittingPublisher implements SdkPublisher { private final long bufferSizeInBytes; private SplittingPublisher(Builder builder) { - this.upstreamPublisher = Validate.paramNotNull(builder.asyncRequestBody, "asyncRequestBody"); - this.chunkSizeInBytes = builder.chunkSizeInBytes == null ? DEFAULT_CHUNK_SIZE : builder.chunkSizeInBytes; + this.upstreamPublisher = Validate.paramNotNull(builder.asyncRequestBody, "asyncRequestBody"); + this.chunkSizeInBytes = builder.chunkSizeInBytes == null ? DEFAULT_CHUNK_SIZE : builder.chunkSizeInBytes; this.bufferSizeInBytes = builder.bufferSizeInBytes == null ? DEFAULT_BUFFER_SIZE : builder.bufferSizeInBytes; this.splittingSubscriber = new SplittingSubscriber(upstreamPublisher.contentLength().orElse(null)); @@ -234,13 +236,14 @@ private Long totalDataRemaining() { private final class DownstreamBody implements AsyncRequestBody { /** - * The maximum length of the content this AsyncRequestBody can hold. - * If the upstream content length is known, this is the same as totalLength + * The maximum length of the content this AsyncRequestBody can hold. If the upstream content length is known, this is + * the same as totalLength */ private final long maxLength; private final Long totalLength; private final SimplePublisher delegate = new SimplePublisher<>(); private final int chunkNumber; + private final AtomicBoolean subscribeCalled = new AtomicBoolean(false); private volatile long transferredLength = 0; private DownstreamBody(boolean contentLengthKnown, long maxLength, int chunkNumber) { @@ -282,7 +285,14 @@ public void error(Throwable error) { @Override public void subscribe(Subscriber s) { - delegate.subscribe(s); + if (subscribeCalled.compareAndSet(false, true)) { + delegate.subscribe(s); + } else { + s.onSubscribe(new NoopSubscription(s)); + s.onError(NonRetryableException.create( + "A retry was attempted, but AsyncRequestBody.split does not " + + "support retries.")); + } } private void addDataBuffered(int length) { @@ -293,7 +303,7 @@ private void addDataBuffered(int length) { } } } - + public static final class Builder { private AsyncRequestBody asyncRequestBody; private Long chunkSizeInBytes;