Skip to content

Support streaming with unknown content length #4226

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
Expand Down Expand Up @@ -420,24 +419,20 @@ static AsyncRequestBody empty() {
* @param maxMemoryUsageInBytes the max memory the SDK will use to buffer the content
* @return SplitAsyncRequestBodyResult
*/
default SplitAsyncRequestBodyResponse split(long chunkSizeInBytes, long maxMemoryUsageInBytes) {
default SdkPublisher<AsyncRequestBody> split(long chunkSizeInBytes, long maxMemoryUsageInBytes) {
Validate.isPositive(chunkSizeInBytes, "chunkSizeInBytes");
Validate.isPositive(maxMemoryUsageInBytes, "maxMemoryUsageInBytes");

if (!this.contentLength().isPresent()) {
if (!contentLength().isPresent()) {
Validate.isTrue(maxMemoryUsageInBytes >= chunkSizeInBytes,
"maxMemoryUsageInBytes must be larger than or equal to " +
"chunkSizeInBytes if the content length is unknown");
}

CompletableFuture<Void> future = new CompletableFuture<>();
SplittingPublisher splittingPublisher = SplittingPublisher.builder()
.asyncRequestBody(this)
.chunkSizeInBytes(chunkSizeInBytes)
.maxMemoryUsageInBytes(maxMemoryUsageInBytes)
.resultFuture(future)
.build();

return SplitAsyncRequestBodyResponse.create(splittingPublisher, future);
return SplittingPublisher.builder()
.asyncRequestBody(this)
.chunkSizeInBytes(chunkSizeInBytes)
.maxMemoryUsageInBytes(maxMemoryUsageInBytes)
.build();
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -45,24 +44,12 @@ public class SplittingPublisher implements SdkPublisher<AsyncRequestBody> {
private final SimplePublisher<AsyncRequestBody> downstreamPublisher = new SimplePublisher<>();
private final long chunkSizeInBytes;
private final long maxMemoryUsageInBytes;
private final CompletableFuture<Void> future;

private SplittingPublisher(Builder builder) {
this.upstreamPublisher = Validate.paramNotNull(builder.asyncRequestBody, "asyncRequestBody");
this.chunkSizeInBytes = Validate.isPositive(builder.chunkSizeInBytes, "chunkSizeInBytes");
this.splittingSubscriber = new SplittingSubscriber(upstreamPublisher.contentLength().orElse(null));
this.maxMemoryUsageInBytes = Validate.isPositive(builder.maxMemoryUsageInBytes, "maxMemoryUsageInBytes");
this.future = builder.future;

// We need to cancel upstream subscription if the future gets cancelled.
future.whenComplete((r, t) -> {
if (t != null) {
if (splittingSubscriber.upstreamSubscription != null) {
log.trace(() -> "Cancelling subscription because return future completed exceptionally ", t);
splittingSubscriber.upstreamSubscription.cancel();
}
}
});
}

public static Builder builder() {
Expand Down Expand Up @@ -117,26 +104,35 @@ public void onNext(ByteBuffer byteBuffer) {
byteBufferSizeHint = byteBuffer.remaining();

while (true) {

if (!byteBuffer.hasRemaining()) {
break;
}

int amountRemainingInChunk = amountRemainingInChunk();

// If we have fulfilled this chunk,
// we should create a new DownstreamBody if needed
// complete the current body
if (amountRemainingInChunk == 0) {
completeCurrentBody();
completeCurrentBodyAndCreateNewIfNeeded(byteBuffer);
amountRemainingInChunk = amountRemainingInChunk();
}

if (shouldCreateNewDownstreamRequestBody(byteBuffer)) {
int currentChunk = chunkNumber.incrementAndGet();
long chunkSize = calculateChunkSize(totalDataRemaining());
currentBody = initializeNextDownstreamBody(upstreamSize != null, chunkSize, currentChunk);
}
// If the current ByteBuffer < this chunk, send it as-is
if (amountRemainingInChunk > byteBuffer.remaining()) {
currentBody.send(byteBuffer.duplicate());
break;
}

amountRemainingInChunk = amountRemainingInChunk();
if (amountRemainingInChunk >= byteBuffer.remaining()) {
// If the current ByteBuffer == this chunk, send it as-is and
// complete the current body
if (amountRemainingInChunk == byteBuffer.remaining()) {
currentBody.send(byteBuffer.duplicate());
completeCurrentBodyAndCreateNewIfNeeded(byteBuffer);
break;
}

// If the current ByteBuffer > this chunk, split this ByteBuffer
ByteBuffer firstHalf = byteBuffer.duplicate();
int newLimit = firstHalf.position() + amountRemainingInChunk;
firstHalf.limit(newLimit);
Expand All @@ -147,20 +143,30 @@ public void onNext(ByteBuffer byteBuffer) {
maybeRequestMoreUpstreamData();
}

private void completeCurrentBodyAndCreateNewIfNeeded(ByteBuffer byteBuffer) {
completeCurrentBody();
int currentChunk = chunkNumber.incrementAndGet();
boolean shouldCreateNewDownstreamRequestBody;
Long dataRemaining = totalDataRemaining();

/**
* If content length is known, we should create new DownstreamRequestBody if there's remaining data.
* If content length is unknown, we should create new DownstreamRequestBody if upstream is not completed yet.
*/
private boolean shouldCreateNewDownstreamRequestBody(ByteBuffer byteBuffer) {
return !upstreamComplete || byteBuffer.remaining() > 0;
if (upstreamSize == null) {
shouldCreateNewDownstreamRequestBody = !upstreamComplete || byteBuffer.hasRemaining();
} else {
shouldCreateNewDownstreamRequestBody = dataRemaining != null && dataRemaining > 0;
}

if (shouldCreateNewDownstreamRequestBody) {
long chunkSize = calculateChunkSize(dataRemaining);
currentBody = initializeNextDownstreamBody(upstreamSize != null, chunkSize, currentChunk);
}
}

private int amountRemainingInChunk() {
return Math.toIntExact(currentBody.maxLength - currentBody.transferredLength);
}

private void completeCurrentBody() {
log.debug(() -> "completeCurrentBody for chunk " + chunkNumber.get());
currentBody.complete();
if (upstreamSize == null) {
sendCurrentBody(currentBody);
Expand All @@ -172,12 +178,13 @@ public void onComplete() {
upstreamComplete = true;
log.trace(() -> "Received onComplete()");
completeCurrentBody();
downstreamPublisher.complete().thenRun(() -> future.complete(null));
downstreamPublisher.complete();
}

@Override
public void onError(Throwable t) {
currentBody.error(t);
log.trace(() -> "Received onError()", t);
downstreamPublisher.error(t);
}

private void sendCurrentBody(AsyncRequestBody body) {
Expand Down Expand Up @@ -206,7 +213,7 @@ private void maybeRequestMoreUpstreamData() {
}

private boolean shouldRequestMoreData(long buffered) {
return buffered == 0 || buffered + byteBufferSizeHint < maxMemoryUsageInBytes;
return buffered == 0 || buffered + byteBufferSizeHint <= maxMemoryUsageInBytes;
}

private Long totalDataRemaining() {
Expand Down Expand Up @@ -240,7 +247,7 @@ public Optional<Long> contentLength() {
}

public void send(ByteBuffer data) {
log.trace(() -> "Sending bytebuffer " + data);
log.trace(() -> String.format("Sending bytebuffer %s to chunk %d", data, chunkNumber));
int length = data.remaining();
transferredLength += length;
addDataBuffered(length);
Expand Down Expand Up @@ -283,7 +290,6 @@ public static final class Builder {
private AsyncRequestBody asyncRequestBody;
private Long chunkSizeInBytes;
private Long maxMemoryUsageInBytes;
private CompletableFuture<Void> future;

/**
* Configures the asyncRequestBody to split
Expand Down Expand Up @@ -322,18 +328,6 @@ public Builder maxMemoryUsageInBytes(long maxMemoryUsageInBytes) {
return this;
}

/**
* Sets the result future. The future will be completed when all request bodies
* have been sent.
*
* @param future The new future value.
* @return This object for method chaining.
*/
public Builder resultFuture(CompletableFuture<Void> future) {
this.future = future;
return this;
}

public SplittingPublisher build() {
return new SplittingPublisher(this);
}
Expand Down

This file was deleted.

Loading