Do Not Continue Processing Events in Batch Mode for Kinesis/DDBStreams #1820


Open
belugabehr opened this issue Apr 16, 2025 · 6 comments
Labels
batch, feature-parity (Feature parity with python version), feature-request (New feature or request)

Comments

@belugabehr

belugabehr commented Apr 16, 2025

When processing a DDB Stream in batch mode, I want to stop processing when a failure is reached. Since this is a stream, and the ordering of messages is important to me, processing should stop immediately.

That is to say, if my data is partitioned on Purchase ID, I want to ensure all events related to the same purchase are played in order. If a failure occurs, the processing of the stream should stop and retry later.

Expected Behavior

When an error occurs, the offending event should be checkpointed, and processing should stop.

https://docs.aws.amazon.com/lambda/latest/dg/services-ddb-batchfailurereporting.html
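The linked AWS docs describe returning the failing record's sequence number so that Lambda checkpoints there and retries from that record onwards. As a minimal sketch of the desired stop-on-failure loop (using hypothetical stand-in types rather than the real Lambda/Powertools classes, with records represented by their sequence numbers):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Sketch of the requested behavior with hypothetical stand-in types:
// records are represented by their sequence numbers, and processing
// stops at the first failure so ordering is preserved.
public class StopOnFirstFailure {
    // Returns the batch item failures to report back to Lambda:
    // at most one entry, the sequence number where processing stopped.
    public static List<String> process(List<String> sequenceNumbers,
                                       Consumer<String> handler) {
        List<String> failures = new ArrayList<>();
        for (String seq : sequenceNumbers) {
            try {
                handler.accept(seq);
            } catch (RuntimeException e) {
                // Checkpoint at the failing record and stop; Lambda will
                // retry from this sequence number onwards.
                failures.add(seq);
                break;
            }
        }
        return failures;
    }
}
```

Note that records after the failure are never handled, so later events for the same partition key cannot be played out of order.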

Current Behavior

For DDB Stream Batch processing, the stream will continue to be reprocessed, and the same messages will be repeated again, and again.

    } catch (Throwable t) {
        String sequenceNumber = record.getDynamodb().getSequenceNumber();
        LOGGER.error("Error while processing record with id {}: {}, adding it to batch item failures",
                sequenceNumber, t.getMessage());
        LOGGER.error("Error was", t);
        batchFailures.add(new StreamsEventResponse.BatchItemFailure(sequenceNumber));
        // Report failure if we have a handler
        if (this.failureHandler != null) {
            // A failing failure handler is no reason to fail the batch
            try {
                this.failureHandler.accept(record, t);
            } catch (Throwable t2) {
                LOGGER.warn("failureHandler threw handling failure", t2);
            }
        }
    }

Possible Solution

Return on any error. Take a look at the following example as a reference:

https://docs.aws.amazon.com/lambda/latest/dg/services-ddb-batchfailurereporting.html

            } catch (Exception e) {
                /* Since we are working with streams, we can return the failed item immediately.
                   Lambda will immediately begin to retry processing from this failed item onwards. */
                batchItemFailures.add(new StreamsEventResponse.BatchItemFailure(curRecordSequenceNumber));
                return new StreamsEventResponse(batchItemFailures);
            }

or a little bit nicer:

                return new StreamsEventResponse(Collections.singletonList(
                        new StreamsEventResponse.BatchItemFailure(curRecordSequenceNumber)));
@phipag
Contributor

phipag commented Apr 17, 2025

Hey @belugabehr,

thank you for opening this issue and even taking the time to find the relevant code inside the library.

I would like to understand your use-case better. Can you elaborate on this? I don't fully understand what you mean by

and the same messages will be repeated again, and again.

If we stopped processing the batch on the first failure the same (failing) message would still be re-processed again. Am I missing something?


Regarding the behavior itself: This is the expected implementation. If a batch fails partially we still finish processing the batch and then report the failed events back to the DDB Streams service so that a checkpoint will be left at the index of the failed item in the batch. We have a nice diagram for this in the Powertools for AWS Lambda (Python) documentation: https://docs.powertools.aws.dev/lambda/python/latest/utilities/batch/#kinesis-and-dynamodb-streams.

To confirm my understanding of your request: are you looking for the SqsFifoPartialProcessor that we have in Powertools for AWS Lambda (Python)? Can you check out this diagram and let me know if this is what you are looking for? https://docs.powertools.aws.dev/lambda/python/latest/utilities/batch/#sqs-fifo
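For context, the Python SqsFifoPartialProcessor referenced above short-circuits after the first failure: remaining messages are reported as failed without being processed, so the queue redelivers them in order. A rough Java sketch of that behavior (hypothetical names and stand-in types, not the actual Powertools API):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Rough sketch of the SQS FIFO short-circuit idea with stand-in types:
// once one message fails, every remaining message is marked failed
// without being handled, preserving FIFO ordering on redelivery.
public class FifoShortCircuit {
    public static List<String> process(List<String> messageIds,
                                       Consumer<String> handler) {
        List<String> failures = new ArrayList<>();
        boolean failed = false;
        for (String id : messageIds) {
            if (failed) {
                // Short-circuit: do not process, just report as failed.
                failures.add(id);
                continue;
            }
            try {
                handler.accept(id);
            } catch (RuntimeException e) {
                failed = true;
                failures.add(id);
            }
        }
        return failures;
    }
}
```

The difference from the stream case is that SQS needs every unprocessed message listed as a failure, whereas a stream checkpoint only needs the first failing sequence number.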

@phipag phipag added the batch, feature-request (New feature or request), and feature-parity (Feature parity with python version) labels and removed the triage and bug (Something isn't working) labels on Apr 17, 2025
@belugabehr
Author

Hey,

Thanks for the feedback.

Yes, we have a DDB Stream hooked up to an SNS FIFO queue.

The issue we have is if we have three events: (C)reate, (U)pdate, and (D)elete, then we need to process those in order.

For example, if we receive the C, and then we fail to handle the U, we do not want to continue in the stream and process the D. We just need the checkpoint to move up to the U event and wait until the issue clears.

So maybe we just need a flag on the existing batch processor to exit early instead of continuing to process messages (and reset the checkpoint back to the latest queue offset).
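One way such a flag could look, purely as an illustration (the stopOnFirstFailure name and the surrounding types are hypothetical, not the current Powertools API):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Illustrative sketch only: a stopOnFirstFailure flag switching between
// the current behavior (finish the batch, report all failures) and the
// requested early exit. Names are hypothetical, not the Powertools API.
public class BatchProcessorSketch {
    private final boolean stopOnFirstFailure;

    public BatchProcessorSketch(boolean stopOnFirstFailure) {
        this.stopOnFirstFailure = stopOnFirstFailure;
    }

    // Records are stand-ins represented by sequence number strings.
    public List<String> process(List<String> sequenceNumbers,
                                Consumer<String> handler) {
        List<String> batchItemFailures = new ArrayList<>();
        for (String seq : sequenceNumbers) {
            try {
                handler.accept(seq);
            } catch (RuntimeException e) {
                batchItemFailures.add(seq);
                if (stopOnFirstFailure) {
                    // Checkpoint at the failed item; the remaining records
                    // will be redelivered in order on the next invocation.
                    break;
                }
            }
        }
        return batchItemFailures;
    }
}
```

With the flag off, the sketch matches today's behavior (the whole batch is processed and every failure reported); with it on, processing stops at the first failure.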

@phipag
Contributor

phipag commented Apr 17, 2025

Hey @belugabehr,

thanks for explaining your use-case. I will get back to you with some new information next week and will do some tests in the meantime.

@leandrodamascena

Thanks for opening this issue @belugabehr! This can also happen with a Kinesis stream, and if using the bisect configuration it can have some other side effects.

I have a few additional thoughts here before we make a decision and I hope to share them by Monday.

Thanks

@belugabehr
Author

belugabehr commented Apr 18, 2025 via email

@belugabehr
Author

belugabehr commented Apr 18, 2025 via email

Projects: Status Triage
Development: No branches or pull requests
3 participants