From c37041970a7dd300012f65364b205f455dfacb7d Mon Sep 17 00:00:00 2001 From: Anna-Karin Salander Date: Thu, 10 Aug 2023 16:42:33 -0600 Subject: [PATCH 1/2] Ensure onNext will be called even if publishing empty content and onComplete is called directly --- .../ChecksumCalculatingAsyncRequestBody.java | 50 ++++- .../pipeline/stages/HttpChecksumStage.java | 6 +- ...AwsUnsignedChunkedEncodingInputStream.java | 43 ----- .../core/internal/util/ChunkContentUtils.java | 61 ++++-- .../AwsChunkedEncodingInputStreamTest.java | 9 +- ...ecksumCalculatingAsyncRequestBodyTest.java | 175 ++++++++++-------- 6 files changed, 194 insertions(+), 150 deletions(-) diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ChecksumCalculatingAsyncRequestBody.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ChecksumCalculatingAsyncRequestBody.java index 146007927c63..649ce6f52849 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ChecksumCalculatingAsyncRequestBody.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ChecksumCalculatingAsyncRequestBody.java @@ -16,7 +16,8 @@ package software.amazon.awssdk.core.internal.async; import static software.amazon.awssdk.core.HttpChecksumConstant.DEFAULT_ASYNC_CHUNK_SIZE; -import static software.amazon.awssdk.core.internal.util.ChunkContentUtils.calculateChecksumContentLength; +import static software.amazon.awssdk.core.internal.util.ChunkContentUtils.LAST_CHUNK_LEN; +import static software.amazon.awssdk.core.internal.util.ChunkContentUtils.calculateChecksumTrailerLength; import static software.amazon.awssdk.core.internal.util.ChunkContentUtils.calculateChunkLength; import static software.amazon.awssdk.core.internal.util.ChunkContentUtils.createChecksumTrailer; import static software.amazon.awssdk.core.internal.util.ChunkContentUtils.createChunk; @@ -28,11 +29,13 @@ import org.reactivestreams.Subscription; import software.amazon.awssdk.annotations.SdkInternalApi; import software.amazon.awssdk.core.async.AsyncRequestBody; +import software.amazon.awssdk.core.async.SdkPublisher; import software.amazon.awssdk.core.checksums.Algorithm; import software.amazon.awssdk.core.checksums.SdkChecksum; import software.amazon.awssdk.core.exception.SdkException; import software.amazon.awssdk.utils.BinaryUtils; import software.amazon.awssdk.utils.Validate; +import software.amazon.awssdk.utils.async.DelegatingSubscriber; import software.amazon.awssdk.utils.builder.SdkBuilder; /** @@ -129,13 +132,12 @@ public ChecksumCalculatingAsyncRequestBody.Builder trailerHeader(String trailerH @Override public Optional contentLength() { - if (wrapped.contentLength().isPresent() && algorithm != null) { return Optional.of(calculateChunkLength(wrapped.contentLength().get()) - + calculateChecksumContentLength(algorithm, trailerHeader)); - } else { - return wrapped.contentLength(); + + LAST_CHUNK_LEN + + calculateChecksumTrailerLength(algorithm, trailerHeader)); } + return wrapped.contentLength(); } @Override @@ -149,12 +151,15 @@ public void subscribe(Subscriber s) { if (sdkChecksum != null) { sdkChecksum.reset(); } - SynchronousChunkBuffer synchronousChunkBuffer = new SynchronousChunkBuffer(totalBytes); - wrapped.flatMapIterable(synchronousChunkBuffer::buffer) + alwaysInvokeOnNext(wrapped).flatMapIterable(synchronousChunkBuffer::buffer) .subscribe(new ChecksumCalculatingSubscriber(s, sdkChecksum, trailerHeader, totalBytes)); } + private SdkPublisher alwaysInvokeOnNext(SdkPublisher source) { + return subscriber -> source.subscribe(new OnNextGuaranteedSubscriber(subscriber)); + } + private static final class ChecksumCalculatingSubscriber implements Subscriber { private final Subscriber wrapped; @@ -243,4 +248,35 @@ private Iterable buffer(ByteBuffer bytes) { } } + public static class OnNextGuaranteedSubscriber extends DelegatingSubscriber { + + private volatile boolean onNextInvoked; + + public OnNextGuaranteedSubscriber(Subscriber subscriber) { + super(subscriber); + } + + @Override //needed? + public void onSubscribe(Subscription subscription) { + super.onSubscribe(subscription); + } + + @Override + public void onNext(ByteBuffer t) { + if (!onNextInvoked) { + onNextInvoked = true; + } + + subscriber.onNext(t); + } + + @Override + public void onComplete() { + if (!onNextInvoked) { + subscriber.onNext(ByteBuffer.wrap(new byte[0])); + } + super.onComplete(); + } + } + } \ No newline at end of file diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/HttpChecksumStage.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/HttpChecksumStage.java index 84dcd981b22f..aaf1c27428d9 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/HttpChecksumStage.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/HttpChecksumStage.java @@ -20,8 +20,8 @@ import static software.amazon.awssdk.core.HttpChecksumConstant.DEFAULT_ASYNC_CHUNK_SIZE; import static software.amazon.awssdk.core.HttpChecksumConstant.SIGNING_METHOD; import static software.amazon.awssdk.core.internal.io.AwsChunkedEncodingInputStream.DEFAULT_CHUNK_SIZE; -import static software.amazon.awssdk.core.internal.io.AwsUnsignedChunkedEncodingInputStream.calculateStreamContentLength; -import static software.amazon.awssdk.core.internal.util.ChunkContentUtils.calculateChecksumContentLength; +import static software.amazon.awssdk.core.internal.util.ChunkContentUtils.calculateChecksumTrailerLength; +import static software.amazon.awssdk.core.internal.util.ChunkContentUtils.calculateStreamContentLength; import static software.amazon.awssdk.core.internal.util.HttpChecksumResolver.getResolvedChecksumSpecs; import static software.amazon.awssdk.http.Header.CONTENT_LENGTH; @@ -179,7 +179,7 @@ private void addFlexibleChecksumInTrailer(SdkHttpFullRequest.Builder request, Re } } - long checksumContentLength = calculateChecksumContentLength(checksumSpecs.algorithm(), checksumSpecs.headerName()); + long checksumContentLength = calculateChecksumTrailerLength(checksumSpecs.algorithm(), checksumSpecs.headerName()); long contentLen = checksumContentLength + calculateStreamContentLength(originalContentLength, chunkSize); request.putHeader(HttpChecksumConstant.HEADER_FOR_TRAILER_REFERENCE, checksumSpecs.headerName()) diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/AwsUnsignedChunkedEncodingInputStream.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/AwsUnsignedChunkedEncodingInputStream.java index 186ca5d7d0d8..4c7f46a248cf 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/AwsUnsignedChunkedEncodingInputStream.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/AwsUnsignedChunkedEncodingInputStream.java @@ -18,7 +18,6 @@ import java.io.InputStream; import java.nio.charset.StandardCharsets; import software.amazon.awssdk.annotations.SdkInternalApi; -import software.amazon.awssdk.core.checksums.Algorithm; import software.amazon.awssdk.core.checksums.SdkChecksum; import software.amazon.awssdk.core.exception.SdkClientException; import software.amazon.awssdk.core.internal.chunked.AwsChunkedEncodingConfig; @@ -40,48 +39,6 @@ public static Builder builder() { return new Builder(); } - /** - * Calculates the content length for a given Algorithm and header name. - * - * @param algorithm Algorithm used. - * @param headerName Header name. - * @return Content length of the trailer that will be appended at the end. - */ - public static long calculateChecksumContentLength(Algorithm algorithm, String headerName) { - return headerName.length() - + HEADER_COLON_SEPARATOR.length() - + algorithm.base64EncodedLength().longValue() - + CRLF.length() + CRLF.length(); - } - - /** - * - * @param originalContentLength Original Content length. - * @return Calculatec Chunk Length with the chunk encoding format. - */ - private static long calculateChunkLength(long originalContentLength) { - return Long.toHexString(originalContentLength).length() - + CRLF.length() - + originalContentLength - + CRLF.length(); - } - - public static long calculateStreamContentLength(long originalLength, long defaultChunkSize) { - if (originalLength < 0 || defaultChunkSize == 0) { - throw new IllegalArgumentException(originalLength + ", " + defaultChunkSize + "Args <= 0 not expected"); - } - - long maxSizeChunks = originalLength / defaultChunkSize; - long remainingBytes = originalLength % defaultChunkSize; - - long allChunks = maxSizeChunks * calculateChunkLength(defaultChunkSize); - long remainingInChunk = remainingBytes > 0 ? calculateChunkLength(remainingBytes) : 0; - // last byte is composed of a "0" and "\r\n" - long lastByteSize = 1 + (long) CRLF.length(); - - return allChunks + remainingInChunk + lastByteSize; - } - @Override protected byte[] createFinalChunk(byte[] finalChunk) { StringBuilder chunkHeader = new StringBuilder(); diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/util/ChunkContentUtils.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/util/ChunkContentUtils.java index 54ad56781599..91d47c314494 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/util/ChunkContentUtils.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/util/ChunkContentUtils.java @@ -28,35 +28,64 @@ public final class ChunkContentUtils { public static final String ZERO_BYTE = "0"; public static final String CRLF = "\r\n"; + public static final String LAST_CHUNK = ZERO_BYTE + CRLF; + public static final long LAST_CHUNK_LEN = LAST_CHUNK.length(); + private ChunkContentUtils() { } /** + * The chunk format is: chunk-size CRLF chunk-data CRLF. + * * @param originalContentLength Original Content length. - * @return Calculates Chunk Length. + * @return the length of this chunk */ public static long calculateChunkLength(long originalContentLength) { + if (originalContentLength == 0) { + return 0; + } return Long.toHexString(originalContentLength).length() - + CRLF.length() - + originalContentLength - + CRLF.length() - + ZERO_BYTE.length() + CRLF.length(); + + CRLF.length() + + originalContentLength + + CRLF.length(); + } + + /** + * Calculates the content length for data that is divided into chunks. + * + * @param originalLength original content length. + * @param chunkSize chunk size + * @return Content length of the trailer that will be appended at the end. + */ + public static long calculateStreamContentLength(long originalLength, long chunkSize) { + if (originalLength < 0 || chunkSize == 0) { + throw new IllegalArgumentException(originalLength + ", " + chunkSize + "Args <= 0 not expected"); + } + + long maxSizeChunks = originalLength / chunkSize; + long remainingBytes = originalLength % chunkSize; + + long allChunks = maxSizeChunks * calculateChunkLength(chunkSize); + long remainingInChunk = remainingBytes > 0 ? calculateChunkLength(remainingBytes) : 0; + // last byte is composed of a "0" and "\r\n" + long lastByteSize = 1 + (long) CRLF.length(); + + return allChunks + remainingInChunk + lastByteSize; } /** - * Calculates the content length for a given Algorithm and header name. + * Calculates the content length for a given algorithm and header name. * * @param algorithm Algorithm used. * @param headerName Header name. * @return Content length of the trailer that will be appended at the end. */ - public static long calculateChecksumContentLength(Algorithm algorithm, String headerName) { - int checksumLength = algorithm.base64EncodedLength(); - - return (headerName.length() - + HEADER_COLON_SEPARATOR.length() - + checksumLength - + CRLF.length() + CRLF.length()); + public static long calculateChecksumTrailerLength(Algorithm algorithm, String headerName) { + return headerName.length() + + HEADER_COLON_SEPARATOR.length() + + algorithm.base64EncodedLength().longValue() + + CRLF.length() + + CRLF.length(); } /** @@ -86,17 +115,13 @@ public static ByteBuffer createChunk(ByteBuffer chunkData, boolean isLastByte) { chunkHeader.append(CRLF); try { byte[] header = chunkHeader.toString().getBytes(StandardCharsets.UTF_8); - // Last byte does not need additional \r\n trailer byte[] trailer = !isLastByte ? CRLF.getBytes(StandardCharsets.UTF_8) : "".getBytes(StandardCharsets.UTF_8); ByteBuffer chunkFormattedBuffer = ByteBuffer.allocate(header.length + chunkLength + trailer.length); - chunkFormattedBuffer.put(header) - .put(chunkData) - .put(trailer); + chunkFormattedBuffer.put(header).put(chunkData).put(trailer); chunkFormattedBuffer.flip(); return chunkFormattedBuffer; } catch (Exception e) { - // This is to warp BufferOverflowException,ReadOnlyBufferException to SdkClientException. throw SdkClientException.builder() .message("Unable to create chunked data. " + e.getMessage()) .cause(e) diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/checksum/AwsChunkedEncodingInputStreamTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/checksum/AwsChunkedEncodingInputStreamTest.java index 0fa862dd2acb..44ac097d16cf 100644 --- a/core/sdk-core/src/test/java/software/amazon/awssdk/core/checksum/AwsChunkedEncodingInputStreamTest.java +++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/checksum/AwsChunkedEncodingInputStreamTest.java @@ -16,6 +16,7 @@ package software.amazon.awssdk.core.checksum; import static org.assertj.core.api.Assertions.assertThat; +import static software.amazon.awssdk.core.internal.util.ChunkContentUtils.calculateChecksumTrailerLength; import java.io.ByteArrayInputStream; import java.io.IOException; @@ -25,6 +26,7 @@ import software.amazon.awssdk.core.checksums.SdkChecksum; import software.amazon.awssdk.core.internal.io.AwsChunkedEncodingInputStream; import software.amazon.awssdk.core.internal.io.AwsUnsignedChunkedEncodingInputStream; +import software.amazon.awssdk.core.internal.util.ChunkContentUtils; public class AwsChunkedEncodingInputStreamTest { @@ -55,10 +57,9 @@ public void readAwsUnsignedChunkedEncodingInputStream() throws IOException { public void lengthsOfCalculateByChecksumCalculatingInputStream(){ String initialString = "Hello world"; - long calculateChunkLength = AwsUnsignedChunkedEncodingInputStream.calculateStreamContentLength(initialString.length(), - AwsChunkedEncodingInputStream.DEFAULT_CHUNK_SIZE); - long checksumContentLength = AwsUnsignedChunkedEncodingInputStream.calculateChecksumContentLength( - SHA256_ALGORITHM, SHA256_HEADER_NAME); + long calculateChunkLength = ChunkContentUtils.calculateStreamContentLength(initialString.length(), + AwsChunkedEncodingInputStream.DEFAULT_CHUNK_SIZE); + long checksumContentLength = calculateChecksumTrailerLength(SHA256_ALGORITHM, SHA256_HEADER_NAME); assertThat(calculateChunkLength).isEqualTo(19); assertThat(checksumContentLength).isEqualTo(71); } diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/ChecksumCalculatingAsyncRequestBodyTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/ChecksumCalculatingAsyncRequestBodyTest.java index 39abaffd8f71..4aaeaa3c0710 100644 --- a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/ChecksumCalculatingAsyncRequestBodyTest.java +++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/ChecksumCalculatingAsyncRequestBodyTest.java @@ -28,11 +28,13 @@ import com.google.common.jimfs.Configuration; import com.google.common.jimfs.Jimfs; import io.reactivex.Flowable; +import java.util.stream.Stream; import org.apache.commons.lang3.RandomStringUtils; import org.assertj.core.util.Lists; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import software.amazon.awssdk.core.async.AsyncRequestBody; @@ -44,78 +46,91 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; -@RunWith(Parameterized.class) public class ChecksumCalculatingAsyncRequestBodyTest { - private final static String testString = "Hello world"; - private final static String expectedTestString = "b\r\n" + + private static final String testString = "Hello world"; + private static final String expectedTestString = "b\r\n" + testString + "\r\n" + "0\r\n" + "x-amz-checksum-crc32:i9aeUg==\r\n\r\n"; - private final static Path path; - - private final static ByteBuffer positionNonZeroBytebuffer; - - private final static ByteBuffer positionZeroBytebuffer; + private static final String emptyString = ""; + private static final String expectedEmptyString = "0\r\n" + + "x-amz-checksum-crc32:AAAAAA==\r\n\r\n"; + private static final Path path; + private static final Path pathToEmpty; static { - byte[] content = testString.getBytes(); - byte[] randomContent = RandomStringUtils.randomAscii(1024).getBytes(StandardCharsets.UTF_8); - positionNonZeroBytebuffer = ByteBuffer.allocate(content.length + randomContent.length); - positionNonZeroBytebuffer.put(randomContent) - .put(content); - positionNonZeroBytebuffer.position(randomContent.length); - - positionZeroBytebuffer = ByteBuffer.allocate(content.length); - positionZeroBytebuffer.put(content); - positionZeroBytebuffer.flip(); - FileSystem fs = Jimfs.newFileSystem(Configuration.unix()); path = fs.getPath("./test"); + pathToEmpty = fs.getPath("./testEmpty"); try { - Files.write(path, content); + Files.write(path, testString.getBytes()); + Files.write(pathToEmpty, emptyString.getBytes()); + } catch (IOException e) { e.printStackTrace(); } } - private final AsyncRequestBody provider; - - public ChecksumCalculatingAsyncRequestBodyTest(AsyncRequestBody provider) { - this.provider = provider; + private static Stream publishers() { + return Stream.of( + Arguments.of("RequestBody from string, test string", + checksumPublisher(AsyncRequestBody.fromString(testString)), + expectedTestString), + Arguments.of("RequestBody from file, test string", + checksumPublisher(AsyncRequestBody.fromFile(path)), + expectedTestString), + Arguments.of("RequestBody from buffer, 0 pos, test string", + checksumPublisher(AsyncRequestBody.fromRemainingByteBuffer(posZeroByteBuffer(testString))), + expectedTestString), + Arguments.of("RequestBody from buffer, random pos, test string", + checksumPublisher(AsyncRequestBody.fromRemainingByteBufferUnsafe(nonPosZeroByteBuffer(testString))), + expectedTestString), + Arguments.of("RequestBody from string, empty string", + checksumPublisher(AsyncRequestBody.fromString(emptyString)), + expectedEmptyString), + //Note: FileAsyncRequestBody with empty file does not call onNext, only onComplete() + Arguments.of("RequestBody from file, empty string", + checksumPublisher(AsyncRequestBody.fromFile(pathToEmpty)), + expectedEmptyString), + Arguments.of("RequestBody from buffer, 0 pos, empty string", + checksumPublisher(AsyncRequestBody.fromRemainingByteBuffer(posZeroByteBuffer(emptyString))), + expectedEmptyString), + Arguments.of("RequestBody from string, random pos, empty string", + checksumPublisher(AsyncRequestBody.fromRemainingByteBufferUnsafe(nonPosZeroByteBuffer(emptyString))), + expectedEmptyString)); } - @Parameterized.Parameters - public static AsyncRequestBody[] data() { - AsyncRequestBody[] asyncRequestBodies = { - ChecksumCalculatingAsyncRequestBody.builder() - .asyncRequestBody(AsyncRequestBody.fromString(testString)) - .algorithm(Algorithm.CRC32) - .trailerHeader("x-amz-checksum-crc32").build(), - - ChecksumCalculatingAsyncRequestBody.builder() - .asyncRequestBody(AsyncRequestBody.fromFile(path)) - .algorithm(Algorithm.CRC32) - .trailerHeader("x-amz-checksum-crc32").build(), + private static ChecksumCalculatingAsyncRequestBody checksumPublisher(AsyncRequestBody sourcePublisher) { + return ChecksumCalculatingAsyncRequestBody.builder() + .asyncRequestBody(sourcePublisher) + .algorithm(Algorithm.CRC32) + .trailerHeader("x-amz-checksum-crc32").build(); + } - ChecksumCalculatingAsyncRequestBody.builder() - .asyncRequestBody(AsyncRequestBody.fromRemainingByteBuffer(positionZeroBytebuffer)) - .algorithm(Algorithm.CRC32) - .trailerHeader("x-amz-checksum-crc32").build(), - ChecksumCalculatingAsyncRequestBody.builder() - .asyncRequestBody(AsyncRequestBody.fromRemainingByteBuffersUnsafe(positionNonZeroBytebuffer)) - .algorithm(Algorithm.CRC32) - .trailerHeader("x-amz-checksum-crc32").build(), - }; - return asyncRequestBodies; + private static ByteBuffer posZeroByteBuffer(String content) { + byte[] contentBytes = content.getBytes(StandardCharsets.UTF_8); + ByteBuffer bytes = ByteBuffer.allocate(contentBytes.length); + bytes.put(contentBytes); + bytes.flip(); + return bytes; } - @Test - public void hasCorrectLength() { - assertThat(provider.contentLength()).hasValue((long) expectedTestString.length()); + private static ByteBuffer nonPosZeroByteBuffer(String content) { + byte[] randomContent = RandomStringUtils.randomAscii(1024).getBytes(StandardCharsets.UTF_8); + byte[] contentBytes = content.getBytes(StandardCharsets.UTF_8); + + ByteBuffer bytes = ByteBuffer.allocate(contentBytes.length + randomContent.length); + bytes.put(randomContent) + .put(contentBytes); + bytes.position(randomContent.length); + return bytes; } - @Test - public void hasCorrectContent() throws InterruptedException { + @ParameterizedTest(name = "{index} {0}") + @MethodSource("publishers") + public void publish_differentAsyncRequestBodiesAndSources_produceCorrectData(String description, + AsyncRequestBody provider, + String expectedContent) throws InterruptedException { StringBuilder sb = new StringBuilder(); CountDownLatch done = new CountDownLatch(1); @@ -136,14 +151,15 @@ public void onComplete() { done.countDown(); } }; - provider.subscribe(subscriber); done.await(10, TimeUnit.SECONDS); - assertThat(sb).hasToString(expectedTestString); + + assertThat(provider.contentLength()).hasValue((long) expectedContent.length()); + assertThat(sb).hasToString(expectedContent); } @Test - public void stringConstructorHasCorrectContentType() { + public void constructor_asyncRequestBodyFromString_hasCorrectContentType() { AsyncRequestBody requestBody = ChecksumCalculatingAsyncRequestBody.builder() .asyncRequestBody(AsyncRequestBody.fromString("Hello world")) .algorithm(Algorithm.CRC32) @@ -153,7 +169,7 @@ public void stringConstructorHasCorrectContentType() { } @Test - public void fileConstructorHasCorrectContentType() { + public void constructor_asyncRequestBodyFromFile_hasCorrectContentType() { AsyncRequestBody requestBody = ChecksumCalculatingAsyncRequestBody.builder() .asyncRequestBody(AsyncRequestBody.fromFile(path)) .algorithm(Algorithm.CRC32) @@ -163,7 +179,7 @@ public void fileConstructorHasCorrectContentType() { } @Test - public void bytesArrayConstructorHasCorrectContentType() { + public void constructor_asyncRequestBodyFromBytes_hasCorrectContentType() { AsyncRequestBody requestBody = ChecksumCalculatingAsyncRequestBody.builder() .asyncRequestBody(AsyncRequestBody.fromBytes("hello world".getBytes())) .algorithm(Algorithm.CRC32) @@ -173,7 +189,7 @@ public void bytesArrayConstructorHasCorrectContentType() { } @Test - public void bytesBufferConstructorHasCorrectContentType() { + public void constructor_asyncRequestBodyFromByteBuffer_hasCorrectContentType() { ByteBuffer byteBuffer = ByteBuffer.wrap("hello world".getBytes()); AsyncRequestBody requestBody = ChecksumCalculatingAsyncRequestBody.builder() .asyncRequestBody(AsyncRequestBody.fromByteBuffer(byteBuffer)) @@ -184,7 +200,7 @@ public void bytesBufferConstructorHasCorrectContentType() { } @Test - public void emptyBytesConstructorHasCorrectContentType() { + public void constructor_asyncRequestBodyFromEmpty_hasCorrectContentType() { AsyncRequestBody requestBody = ChecksumCalculatingAsyncRequestBody.builder() .asyncRequestBody(AsyncRequestBody.empty()) .algorithm(Algorithm.CRC32) @@ -194,7 +210,7 @@ public void emptyBytesConstructorHasCorrectContentType() { } @Test - public void publisherConstructorThrowsExceptionIfNoContentLength() { + public void constructor_asyncRequestBodyFromPublisher_NoContentLength_throwsException() { List requestBodyStrings = Lists.newArrayList("A", "B", "C"); List bodyBytes = requestBodyStrings.stream() .map(s -> ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8))) @@ -210,22 +226,31 @@ public void publisherConstructorThrowsExceptionIfNoContentLength() { } @Test - public void fromBytes_NullChecks() { - - ChecksumCalculatingAsyncRequestBody.Builder noAlgorithmBuilder = ChecksumCalculatingAsyncRequestBody - .builder() - .asyncRequestBody( - AsyncRequestBody.fromString("Hello world")); + public void constructor_checksumIsNull_throwsException() { + assertThatExceptionOfType(NullPointerException.class).isThrownBy( + () -> ChecksumCalculatingAsyncRequestBody.builder() + .asyncRequestBody(AsyncRequestBody.fromString("Hello world")) + .trailerHeader("x-amzn-checksum-crc32") + .build()).withMessage("algorithm cannot be null"); + } - assertThatExceptionOfType(NullPointerException.class).isThrownBy(() -> noAlgorithmBuilder.build()); + @Test + public void constructor_asyncRequestBodyIsNull_throwsException() { + assertThatExceptionOfType(NullPointerException.class).isThrownBy( + () -> ChecksumCalculatingAsyncRequestBody.builder() + .algorithm(Algorithm.CRC32) + .trailerHeader("x-amzn-checksum-crc32") + .build()).withMessage("wrapped AsyncRequestBody cannot be null"); + } - ChecksumCalculatingAsyncRequestBody.Builder noAsyncReqBodyBuilder = ChecksumCalculatingAsyncRequestBody - .builder().algorithm(Algorithm.CRC32).trailerHeader("x-amzn-checksum-crc32"); - assertThatExceptionOfType(NullPointerException.class).isThrownBy(() -> noAsyncReqBodyBuilder.build()); + @Test + public void constructor_trailerHeaderIsNull_throwsException() { + assertThatExceptionOfType(NullPointerException.class).isThrownBy( + () -> ChecksumCalculatingAsyncRequestBody.builder() + .algorithm(Algorithm.CRC32) + .asyncRequestBody(AsyncRequestBody.fromString("Hello world")) + .build()).withMessage("trailerHeader cannot be null"); - ChecksumCalculatingAsyncRequestBody.Builder noTrailerHeaderBuilder = ChecksumCalculatingAsyncRequestBody - .builder().asyncRequestBody(AsyncRequestBody.fromString("Hello world")).algorithm(Algorithm.CRC32); - assertThatExceptionOfType(NullPointerException.class).isThrownBy(() -> noTrailerHeaderBuilder.build()); } @Test From 00bfad2202d1b8783aaa868062a43e9d33b18936 Mon Sep 17 00:00:00 2001 From: Anna-Karin Salander Date: Fri, 11 Aug 2023 09:57:08 -0600 Subject: [PATCH 2/2] Adding changelog and removing unnecessary override --- .changes/next-release/bugfix-AWSSDKforJavav2-e70484b.json | 6 ++++++ .../internal/async/ChecksumCalculatingAsyncRequestBody.java | 5 ----- 2 files changed, 6 insertions(+), 5 deletions(-) create mode 100644 .changes/next-release/bugfix-AWSSDKforJavav2-e70484b.json diff --git a/.changes/next-release/bugfix-AWSSDKforJavav2-e70484b.json b/.changes/next-release/bugfix-AWSSDKforJavav2-e70484b.json new file mode 100644 index 000000000000..0f18a5376ba9 --- /dev/null +++ b/.changes/next-release/bugfix-AWSSDKforJavav2-e70484b.json @@ -0,0 +1,6 @@ +{ + "type": "bugfix", + "category": "AWS SDK for Java v2", + "contributor": "", + "description": "Sends final checksum chunk and trailer when only onComplete() is called by upstream (empty content)" +} diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ChecksumCalculatingAsyncRequestBody.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ChecksumCalculatingAsyncRequestBody.java index 649ce6f52849..3f7dc927a95b 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ChecksumCalculatingAsyncRequestBody.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ChecksumCalculatingAsyncRequestBody.java @@ -256,11 +256,6 @@ public OnNextGuaranteedSubscriber(Subscriber subscriber) { super(subscriber); } - @Override //needed? - public void onSubscribe(Subscription subscription) { - super.onSubscribe(subscription); - } - @Override public void onNext(ByteBuffer t) { if (!onNextInvoked) {