For SqsAsyncBatchManager, add the support to override maxBatchBytesSize #5888

Open
2 tasks
msaad10 opened this issue Feb 18, 2025 · 8 comments
Labels
feature-request A feature should be added or improved.

Comments

@msaad10

msaad10 commented Feb 18, 2025

Describe the feature

While working with SqsAsyncBatchManager, I got the following error:

software.amazon.awssdk.services.sqs.model.BatchRequestTooLongException:Batch requests cannot be longer than 262144 bytes. You have sent XXXXXX bytes. (Service: Sqs, Status Code: 400, Request ID: *)

Upon further analysis, it seems there is no way to retrieve the messages in the batch that caused the exception.

Also, the SDK v2 override configuration does not provide an option to override maxBatchSizeBytes, though it was present in v1.

The request is to provide the option to override maxBatchSizeBytes in v2 as well.

I did find the code that defaults maxBatchSizeBytes to 256KB, but if that is the case, why is SqsAsyncBatchManager sending a batch whose total size is greater than 256KB?

Image

Use Case

We need this feature so we don't get BatchRequestTooLongException when working with SqsAsyncBatchManager.

Proposed Solution

No response

Other Information

No response

Acknowledgements

  • I may be able to implement this feature request
  • This feature might incur a breaking change

AWS Java SDK version used

V2

JDK version used

21

Operating System and version

Ubuntu 2022

@msaad10 msaad10 added feature-request A feature should be added or improved. needs-triage This issue or PR still needs to be triaged. labels Feb 18, 2025
@joviegas
Contributor

Thanks @msaad10 for the issue and the details.

We did consider this case and already have the following test for it:

void sendMessagesWhichCanExceed256KiBCollectively(String queueUrl) {
    String largeMessageBody = createLargeString('a', 256_000);
    List<CompletableFuture<SendMessageResponse>> futures = new ArrayList<>();
    // Send the large message 10 times and collect the futures
    for (int i = 0; i < 10; i++) {
        CompletableFuture<SendMessageResponse> future =
            batchManager.sendMessage(r -> r.queueUrl(queueUrl).messageBody(largeMessageBody));
        futures.add(future);
    }
    // Wait for all sendMessage futures to complete
    CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
    allFutures.join();
    // Validate that all responses have a non-null messageId
    futures.forEach(future -> {
        SendMessageResponse response = future.join();
        assertThat(response.messageId()).isNotNull();
        assertThat(response.md5OfMessageBody()).isNotNull();
    });
}

The V1 setting you mentioned just stops batching once the total message size reaches 256KB; it does not internally split a single message.

I am curious whether a single message you sent is greater than 256KB. Can you please confirm?

@msaad10
Author

msaad10 commented Feb 19, 2025

@joviegas No single message is larger than 256KB in my case, since we already have a check that prevents sending messages over this limit. That means the batch that SqsAsyncBatchManager creates internally is exceeding 256KB, which is why I am getting BatchRequestTooLongException.

Image

I have checked the per-message size, and it doesn't exceed 256KB.
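For reference, a per-message check like the one described above could be sketched as follows. This is a minimal illustration, not code from this thread: `MessageSizeCheck`, `utf8Size`, and `fitsSingleMessageLimit` are hypothetical names, and it assumes the size that matters is the UTF-8 byte length of the body (attributes are ignored here), which can differ from `String.length()` for non-ASCII text.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the AWS SDK.
final class MessageSizeCheck {
    // 262144 bytes, the limit quoted in the BatchRequestTooLongException message.
    static final int MAX_BYTES = 262_144;

    // Size of the body in UTF-8 bytes (assumption: this is the size that counts).
    static int utf8Size(String body) {
        return body.getBytes(StandardCharsets.UTF_8).length;
    }

    // True if the body alone is within the limit; attributes are not counted.
    static boolean fitsSingleMessageLimit(String body) {
        return utf8Size(body) <= MAX_BYTES;
    }

    public static void main(String[] args) {
        String ascii = "a".repeat(1000);
        String accented = "\u00e9".repeat(1000); // 'e' with acute accent, 2 bytes in UTF-8
        System.out.println(ascii.length() + " chars, " + utf8Size(ascii) + " bytes");
        System.out.println(accented.length() + " chars, " + utf8Size(accented) + " bytes");
    }
}
```

If the existing check compares `String.length()` against the limit, a body with multi-byte characters could pass the check while still being larger in bytes.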

@joviegas
Contributor

joviegas commented Feb 20, 2025

Thanks @msaad10 for the response.
Could you please let me know the size of the other fields in SendMessageRequest, such as messageAttributes, messageSystemAttributes, etc., as listed in https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/sqs/model/SendMessageRequest.html

It would be really helpful if you could give me a code snippet that reproduces the issue. Instead of your message, you can use String largeMessageBody = createLargeString('a', SIZE_IN_YOUR_CASE); from

void sendMessagesWhichCanExceed256KiBCollectively(String queueUrl) {
    String largeMessageBody = createLargeString('a', 256_000);
    List<CompletableFuture<SendMessageResponse>> futures = new ArrayList<>();
    // Send the large message 10 times and collect the futures
    for (int i = 0; i < 10; i++) {
        CompletableFuture<SendMessageResponse> future =
            batchManager.sendMessage(r -> r.queueUrl(queueUrl).messageBody(largeMessageBody));
        futures.add(future);
    }
    // Wait for all sendMessage futures to complete
    CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
    allFutures.join();
    // Validate that all responses have a non-null messageId
    futures.forEach(future -> {
        SendMessageResponse response = future.join();
        assertThat(response.messageId()).isNotNull();
        assertThat(response.md5OfMessageBody()).isNotNull();
    });
}

and also the sizes of messageAttributes and messageSystemAttributes.

@msaad10
Author

msaad10 commented Feb 21, 2025

Thanks @joviegas. Let me share it with you. Meanwhile, can you share the exact version in which this was fixed? We are using 2.28.29, thanks!

@msaad10
Author

msaad10 commented Feb 24, 2025

@joviegas

We are using the following to create SendMessageRequest:

    val sendMessageRequest = SendMessageRequest.builder()
        .queueUrl(queueUrl)
        .messageBody(messageBody)
        .delaySeconds(delay)
        .build()

We are not overriding any message attributes.

@debora-ito debora-ito removed the needs-triage This issue or PR still needs to be triaged. label Feb 24, 2025
@msaad10
Author

msaad10 commented Feb 25, 2025

@joviegas

From what we have figured out, the issue may be the 16384 constant that is added to the message body size when the message size is calculated.

Image

We were not taking this constant into account when sending our messages to SQS. So from now on, the max message size we send to SQS will be 262144 - 16384 = 245760 bytes.

One thing to note: the docs should mention that the client should use a max message size of 262144 - 16384, not exactly 262144.
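The arithmetic above can be written out as a small sketch. The class and method names (`SizeBudget`, `maxBodyBytes`, `withinBudget`) are hypothetical, and treating 16384 bytes as headroom to subtract from the limit is this comment's working assumption rather than documented SQS or SDK behavior.

```java
// Hypothetical helper that spells out the 262144 - 16384 budget.
final class SizeBudget {
    static final int BATCH_LIMIT_BYTES = 262_144; // limit quoted in the error message
    static final int RESERVED_BYTES = 16_384;     // constant discussed in this comment

    // Largest body size that still leaves the reserved headroom.
    static int maxBodyBytes() {
        return BATCH_LIMIT_BYTES - RESERVED_BYTES;
    }

    static boolean withinBudget(int bodyBytes) {
        return bodyBytes <= maxBodyBytes();
    }

    public static void main(String[] args) {
        System.out.println(maxBodyBytes());        // 245760
        System.out.println(withinBudget(220_000)); // true
        System.out.println(withinBudget(256_000)); // false
    }
}
```

The two sample sizes are the body sizes used in the test snippets elsewhere in this thread.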

@joviegas
Contributor

joviegas commented Feb 27, 2025

Thanks @msaad10, we need to root-cause this issue before going on to fix it.

The code you are pointing to ensures we stop batching once the batch crosses the 256KB limit.

We have added an offset of 16KB (16384 bytes) to make sure the message attributes are included in the size calculation. This internal message size check only decides whether to append a new message to the current batch or to send that message as a single item.
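To illustrate the kind of decision described above, here is a rough sketch of size-aware batching. It is not the SDK's implementation: `GreedyBatcher`, `estimatedSize`, and `partition` are hypothetical names, and adding the 16384-byte offset once per message is an assumption made only for this example.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Illustrative only; the real batch manager may apply the offset differently.
final class GreedyBatcher {
    static final int MAX_BATCH_BYTES = 262_144;
    static final int PER_MESSAGE_OFFSET = 16_384; // assumed per-message allowance for attributes

    // Body size in UTF-8 bytes plus the assumed attribute allowance.
    static int estimatedSize(String body) {
        return body.getBytes(StandardCharsets.UTF_8).length + PER_MESSAGE_OFFSET;
    }

    // Appends each body to the current batch unless that would cross the byte
    // limit or the entry limit; in that case the current batch is closed first.
    // A body that is too large on its own ends up alone in a batch; it is never split.
    static List<List<String>> partition(List<String> bodies, int maxEntries) {
        List<List<String>> batches = new ArrayList<>();
        List<String> current = new ArrayList<>();
        int currentBytes = 0;
        for (String body : bodies) {
            int size = estimatedSize(body);
            boolean overflow = currentBytes + size > MAX_BATCH_BYTES
                               || current.size() == maxEntries;
            if (!current.isEmpty() && overflow) {
                batches.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(body);
            currentBytes += size;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }
}
```

Under these assumptions, five 100,000-byte bodies would be grouped as 2, 2, and 1, because a third body would push the estimated batch size past 262144 bytes.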

  • Can you use the code below to reproduce the issue and check the batch size and message body size? The output should look like:
Batch contains N messages
Message body size: ABCD bytes

  • Also, if possible, raise the log level to DEBUG and check the wire logs to see the messages batched into each SendMessageBatchRequest.

  • Is this an intermittent issue, or is it always seen?

  • Also, could you please give us a standalone code snippet, like the one below, that reproduces the issue?

  • Also, please try a direct send with SqsAsyncClient and check whether you get the same error.

  • Below is the code I used to try to reproduce the issue, but it works fine.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.core.interceptor.Context;
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
import software.amazon.awssdk.core.interceptor.ExecutionInterceptor;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.batchmanager.SqsAsyncBatchManager;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
import software.amazon.awssdk.services.sqs.model.SendMessageResponse;

/**
 * Tests SQS message sending with size monitoring.
 */
public class SqsSendMessage {

    private static final String QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/123456789/myTestQueue";
    private static final int MESSAGE_SIZE = 220_000; // Once a single message crosses 256KB, the error is expected
    private static final int MESSAGE_COUNT = 10;
    private static final int DELAY_MS = 30;

    @Test
    void testBatchSize() throws Exception {
        ExecutionInterceptor captureMessageSizeInterceptor = new CaptureMessageSizeInterceptor();

        SqsAsyncClient sqsAsyncClient = SqsAsyncClient.builder()
                                                      .overrideConfiguration(o -> o.addExecutionInterceptor(captureMessageSizeInterceptor))
                                                      .build();

        String messageBody = createLargeString('a', MESSAGE_SIZE);
        SqsAsyncBatchManager sqsAsyncBatchManager = sqsAsyncClient.batchManager();

        SendMessageRequest sendMessageRequest = SendMessageRequest.builder()
                                                                  .queueUrl(QUEUE_URL)
                                                                  .messageBody(messageBody)
                                                                  .delaySeconds(2)
                                                                  .build();

        List<CompletableFuture<SendMessageResponse>> futures = sendMessages(
            sqsAsyncBatchManager, sendMessageRequest, MESSAGE_COUNT);

        CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).join();

        System.out.println("All messages sent successfully");
    }

    /**
     * Sends multiple messages with a delay between each.
     *
     * @param batchManager The batch manager to use
     * @param messageRequest The message request template
     * @param count Number of messages to send
     * @return List of futures for the send operations
     * @throws InterruptedException If thread is interrupted during sleep
     */
    private List<CompletableFuture<SendMessageResponse>> sendMessages(
        SqsAsyncBatchManager batchManager,
        SendMessageRequest messageRequest,
        int count) throws InterruptedException {

        List<CompletableFuture<SendMessageResponse>> futures = new ArrayList<>();

        for (int i = 0; i < count; i++) {
            CompletableFuture<SendMessageResponse> future = batchManager.sendMessage(messageRequest)
                                                                        .whenComplete((response, error) -> {
                                                                            if (error != null) {
                                                                                error.printStackTrace();
                                                                            } else {
                                                                                System.out.println("Message sent with ID: " + response.messageId());
                                                                            }
                                                                        });

            futures.add(future);

            if (i < count - 1) {
                Thread.sleep(DELAY_MS);
            }
        }

        return futures;
    }

    /**
     * Creates a string of specified length filled with the given character.
     *
     * @param ch Character to fill the string with
     * @param length Length of the string to create
     * @return The generated string
     */
    private String createLargeString(char ch, int length) {
        StringBuilder sb = new StringBuilder(length);
        for (int i = 0; i < length; i++) {
            sb.append(ch);
        }
        return sb.toString();
    }

    /**
     * Interceptor that captures and logs message sizes in batch requests.
     */
    static class CaptureMessageSizeInterceptor implements ExecutionInterceptor {
        @Override
        public void beforeTransmission(Context.BeforeTransmission context, ExecutionAttributes executionAttributes) {
            if (context.request() instanceof SendMessageBatchRequest) {
                SendMessageBatchRequest batchRequest = (SendMessageBatchRequest) context.request();

                System.out.println("Batch contains " + batchRequest.entries().size() + " messages");

                int totalMessageBodySize = 0;
                for (SendMessageBatchRequestEntry entry : batchRequest.entries()) {
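                    // Count UTF-8 bytes rather than chars so the "bytes" label below is accurate
                    int messageSize = entry.messageBody().getBytes(java.nio.charset.StandardCharsets.UTF_8).length;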
                    totalMessageBodySize += messageSize;
                    System.out.println("Message body size: " + messageSize + " bytes");
                }

                System.out.println("Total message bodies size: " + totalMessageBodySize + " bytes");
            }
        }
    }
}

@msaad10
Author

msaad10 commented Mar 3, 2025

Thanks @joviegas for sharing this. Let me try at my end and get back to you!
