For SqsAsyncBatchManager, add the support to override maxBatchBytesSize #5888
Comments
Thanks @msaad10 for the issue and details. We did consider this and already have Lines 177 to 200 in 9f735a1
The V1 setting you mentioned just stops batching once the total message size reaches 256 KB; it does not internally split a single message. I am curious whether the single message you are sending is greater than 256 KB. Can you please confirm? |
@joviegas The single message size is not greater than 256 KB in my case, since we already have a check that prevents sending any message exceeding this limit. That means the batch SqsAsyncBatchManager creates internally is exceeding 256 KB, which is why I am getting BatchRequestTooLongException. I have checked the per-message size, and it doesn't exceed 256 KB. |
Thanks @msaad10 for the response. It would be really helpful if you could give me a code snippet with which the issue can be recreated; instead of message you can use Lines 177 to 200 in 9f735a1
and also |
Thanks @joviegas. Let me share it with you. Meanwhile, can you share the exact version in which this was fixed? We are using |
We are using the following to create SendMessageRequest:
We are not overriding any message attributes. |
From what we have figured out, the issue may be this We were not taking this constant into account when sending our message to SQS. So now, the max message size we'll be sending to SQS will be One thing to note: the docs should mention that the client should pass the max message size as |
Thanks @msaad10, we need to root-cause this issue before going on to fix it. The code you are pointing to ensures we stop batching once the batch crosses the 256 KB limit. We have given an offset of 16 KB (16384) to make sure the message attributes get included in the size calculation. This internal message size check just determines whether to append a new message to a batch or send that message as a single item.
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.core.interceptor.Context;
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
import software.amazon.awssdk.core.interceptor.ExecutionInterceptor;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.batchmanager.SqsAsyncBatchManager;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
import software.amazon.awssdk.services.sqs.model.SendMessageResponse;

/**
 * Tests SQS message sending with size monitoring.
 */
public class SqsSendMessage {

    private static final String QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/123456789/myTestQueue";
    // An error is expected only when a single message crosses 256 KB.
    private static final int MESSAGE_SIZE = 220_000;
    private static final int MESSAGE_COUNT = 10;
    private static final int DELAY_MS = 30;

    @Test
    void testBatchSize() throws Exception {
        ExecutionInterceptor captureMessageSizeInterceptor = new CaptureMessageSizeInterceptor();
        SqsAsyncClient sqsAsyncClient = SqsAsyncClient.builder()
            .overrideConfiguration(o -> o.addExecutionInterceptor(captureMessageSizeInterceptor))
            .build();

        String messageBody = createLargeString('a', MESSAGE_SIZE);
        SqsAsyncBatchManager sqsAsyncBatchManager = sqsAsyncClient.batchManager();
        SendMessageRequest sendMessageRequest = SendMessageRequest.builder()
            .queueUrl(QUEUE_URL)
            .messageBody(messageBody)
            .delaySeconds(2)
            .build();

        List<CompletableFuture<SendMessageResponse>> futures = sendMessages(
            sqsAsyncBatchManager, sendMessageRequest, MESSAGE_COUNT);
        // Wait for every send to finish; join() throws if any of them failed.
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).join();
        System.out.println("All messages sent successfully");
    }

    /**
     * Sends multiple messages with a delay between each.
     *
     * @param batchManager The batch manager to use
     * @param messageRequest The message request template
     * @param count Number of messages to send
     * @return List of futures for the send operations
     * @throws InterruptedException If thread is interrupted during sleep
     */
    private List<CompletableFuture<SendMessageResponse>> sendMessages(
            SqsAsyncBatchManager batchManager,
            SendMessageRequest messageRequest,
            int count) throws InterruptedException {
        List<CompletableFuture<SendMessageResponse>> futures = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            CompletableFuture<SendMessageResponse> future = batchManager.sendMessage(messageRequest)
                .whenComplete((response, error) -> {
                    if (error != null) {
                        error.printStackTrace();
                    } else {
                        System.out.println("Message sent with ID: " + response.messageId());
                    }
                });
            futures.add(future);
            if (i < count - 1) {
                Thread.sleep(DELAY_MS);
            }
        }
        return futures;
    }

    /**
     * Creates a string of specified length filled with the given character.
     *
     * @param ch Character to fill the string with
     * @param length Length of the string to create
     * @return The generated string
     */
    private String createLargeString(char ch, int length) {
        StringBuilder sb = new StringBuilder(length);
        for (int i = 0; i < length; i++) {
            sb.append(ch);
        }
        return sb.toString();
    }

    /**
     * Interceptor that captures and logs message sizes in batch requests.
     */
    static class CaptureMessageSizeInterceptor implements ExecutionInterceptor {
        @Override
        public void beforeTransmission(Context.BeforeTransmission context, ExecutionAttributes executionAttributes) {
            if (context.request() instanceof SendMessageBatchRequest) {
                SendMessageBatchRequest batchRequest = (SendMessageBatchRequest) context.request();
                System.out.println("Batch contains " + batchRequest.entries().size() + " messages");
                int totalMessageBodySize = 0;
                for (SendMessageBatchRequestEntry entry : batchRequest.entries()) {
                    // Count UTF-8 bytes rather than chars so the "bytes" label is accurate.
                    int messageSize = entry.messageBody().getBytes(StandardCharsets.UTF_8).length;
                    totalMessageBodySize += messageSize;
                    System.out.println("Message body size: " + messageSize + " bytes");
                }
                System.out.println("Total message bodies size: " + totalMessageBodySize + " bytes");
            }
        }
    }
} |
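For readers following the thread, the append-or-send-alone decision described above can be sketched in plain Java. This is only an illustration under stated assumptions, not the SDK's actual code: the class `BatchSizeCheckSketch` and the method `fitsInBatch` are hypothetical names, and treating the 16 KB offset as headroom subtracted from the 262144-byte limit is an assumption about how the offset is applied. Only the two constants come from the thread and the exception message.

```java
import java.nio.charset.StandardCharsets;

public class BatchSizeCheckSketch {
    // Batch limit quoted in the BatchRequestTooLongException message.
    static final int MAX_BATCH_BYTES = 262_144;
    // 16 KB (16384) offset mentioned in the thread for message attributes.
    static final int ATTRIBUTE_OFFSET_BYTES = 16_384;

    // Size of a message body in UTF-8 bytes (not chars).
    static int utf8Size(String body) {
        return body.getBytes(StandardCharsets.UTF_8).length;
    }

    // Hypothetical check (assumption): a message may join the current batch
    // only if the running total stays within the limit minus the offset.
    static boolean fitsInBatch(int currentBatchBytes, int incomingBytes) {
        return currentBatchBytes + incomingBytes <= MAX_BATCH_BYTES - ATTRIBUTE_OFFSET_BYTES;
    }

    public static void main(String[] args) {
        int budget = MAX_BATCH_BYTES - ATTRIBUTE_OFFSET_BYTES;
        System.out.println("Effective budget: " + budget + " bytes");

        int messageBytes = 220_000; // MESSAGE_SIZE from the repro test
        System.out.println("First message fits in empty batch: " + fitsInBatch(0, messageBytes));
        System.out.println("Second message fits alongside it: " + fitsInBatch(messageBytes, messageBytes));
    }
}
```

Under these assumptions, two 220,000-byte bodies never share a batch, so the repro test would be expected to send each message as a single-item batch.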
Thanks @joviegas for sharing this. Let me try at my end and get back to you! |
Describe the feature
While working with SqsAsyncBatchManager, I got the following error:
software.amazon.awssdk.services.sqs.model.BatchRequestTooLongException:Batch requests cannot be longer than 262144 bytes. You have sent XXXXXX bytes. (Service: Sqs, Status Code: 400, Request ID: *)
Upon further analysis, there seems to be no way to retrieve the messages in the batch that caused the exception.
Also, the SDK v2 override configuration does not provide an option to override maxBatchSizeBytes, though it was present in v1. The request is to provide the option to override maxBatchSizeBytes in v2 as well.
I did find the code that defaults maxBatchSizeBytes to 256 KB. If that's the case, then why is SqsAsyncBatchManager sending a batch that sums to more than 256 KB?
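Until such an option exists, one possible workaround is to group message bodies on the client side under a caller-chosen byte budget and send each group with a plain batch request. The sketch below shows only the grouping step in plain Java. `ManualBatchPartitioner`, `partition`, and the `maxBatchBytes` parameter are hypothetical names, not SDK API, and the sketch counts only body bytes, so any message attributes would need extra headroom in the budget.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class ManualBatchPartitioner {
    // SQS accepts at most 10 entries per SendMessageBatch request.
    static final int MAX_ENTRIES_PER_BATCH = 10;

    // Greedily groups bodies so that each group stays within maxBatchBytes
    // (UTF-8 body bytes only) and holds at most 10 entries.
    static List<List<String>> partition(List<String> bodies, int maxBatchBytes) {
        List<List<String>> batches = new ArrayList<>();
        List<String> current = new ArrayList<>();
        int currentBytes = 0;
        for (String body : bodies) {
            int size = body.getBytes(StandardCharsets.UTF_8).length;
            if (size > maxBatchBytes) {
                throw new IllegalArgumentException("Single message exceeds budget: " + size + " bytes");
            }
            boolean overBudget = currentBytes + size > maxBatchBytes;
            boolean full = current.size() == MAX_ENTRIES_PER_BATCH;
            if (!current.isEmpty() && (overBudget || full)) {
                // Close the current group and start a new one.
                batches.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(body);
            currentBytes += size;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> bodies = new ArrayList<>();
        bodies.add("aaaa");
        bodies.add("bbbb");
        bodies.add("cc");
        // With an 8-byte budget the first two bodies share a group and "cc" starts a new one.
        System.out.println(partition(bodies, 8));
    }
}
```

Each resulting group could then be mapped to batch entries and sent directly through the client, bypassing the batch manager's internal size handling.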
Use Case
We need this feature so that we don't get BatchRequestTooLongException when working with SqsAsyncBatchManager.
Proposed Solution
No response
Other Information
No response
Acknowledgements
AWS Java SDK version used
V2
JDK version used
21
Operating System and version
Ubuntu 2022