
Do Not Continue Processing Events in Batch Mode for Kinesis/DDB Streams #1820

Open
@belugabehr

Description


When processing a DDB Stream in batch mode, I want processing to stop as soon as a failure occurs. Because this is a stream and message ordering matters to me, processing should stop immediately.

For example, if my data is partitioned on Purchase ID, I want to ensure that all events for the same purchase are processed in order. If a failure occurs, processing of the stream should stop and be retried later.

Expected Behavior

When an error occurs, the offending event should be checkpointed, and processing should stop.

https://docs.aws.amazon.com/lambda/latest/dg/services-ddb-batchfailurereporting.html

Current Behavior

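For DDB Stream batch processing, the handler keeps going after a failure: the failed record is added to the batch item failures and the loop moves on to the next record. Lambda then retries from the earliest failed record, so records after it that already succeeded are processed again, and the same messages are repeated again and again.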

```java
} catch (Throwable t) {
    String sequenceNumber = record.getDynamodb().getSequenceNumber();
    LOGGER.error("Error while processing record with id {}: {}, adding it to batch item failures",
            sequenceNumber, t.getMessage());
    LOGGER.error("Error was", t);
    batchFailures.add(new StreamsEventResponse.BatchItemFailure(sequenceNumber));
    // Report failure if we have a handler
    if (this.failureHandler != null) {
        // A failing failure handler is no reason to fail the batch
        try {
            this.failureHandler.accept(record, t);
        } catch (Throwable t2) {
            LOGGER.warn("failureHandler threw handling failure", t2);
        }
    }
    // No return here: the loop moves on to the next record in the batch
}
```
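To illustrate the consequence, here is a small stand-alone simulation. It does not use the Powertools or AWS event types; `Rec`, `processContinuing`, and `run` are hypothetical stand-ins. It assumes Lambda's documented checkpoint rule for streams: when several batch item failures are reported, the lowest sequence number becomes the checkpoint and every record from that point on is retried.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class ContinueOnErrorDemo {
    // Hypothetical stand-in for a stream record: only the sequence number matters here.
    public record Rec(long seq) {}

    // Mirrors the current behavior: every record is attempted, failures are
    // collected, and the loop never stops early.
    public static List<Long> processContinuing(List<Rec> batch, Predicate<Rec> fails, List<Long> processed) {
        List<Long> failures = new ArrayList<>();
        for (Rec r : batch) {
            if (fails.test(r)) {
                failures.add(r.seq());
            } else {
                processed.add(r.seq());
            }
        }
        return failures;
    }

    // Assumed Lambda behavior: retry from the lowest reported sequence number.
    public static List<Long> run(List<Rec> batch, Predicate<Rec> fails, int maxInvocations) {
        List<Long> processed = new ArrayList<>();
        List<Rec> pending = batch;
        for (int i = 0; i < maxInvocations && !pending.isEmpty(); i++) {
            List<Long> failures = processContinuing(pending, fails, processed);
            if (failures.isEmpty()) {
                break;
            }
            long checkpoint = failures.stream().mapToLong(Long::longValue).min().getAsLong();
            pending = pending.stream().filter(r -> r.seq() >= checkpoint).toList();
        }
        return processed;
    }

    public static void main(String[] args) {
        List<Rec> batch = List.of(new Rec(1), new Rec(2), new Rec(3), new Rec(4), new Rec(5));
        // Record 3 always fails; three invocations are simulated.
        System.out.println(run(batch, r -> r.seq() == 3, 3)); // prints [1, 2, 4, 5, 4, 5, 4, 5]
    }
}
```

Records 4 and 5 are handled before record 3 ever succeeds, and they are handled again on every retry, which breaks per-key ordering such as events for the same Purchase ID.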

Possible Solution

Return on the first error, reporting only the failed record. The following example from the AWS documentation can serve as a reference:

https://docs.aws.amazon.com/lambda/latest/dg/services-ddb-batchfailurereporting.html

```java
} catch (Exception e) {
    /* Since we are working with streams, we can return the failed item immediately.
       Lambda will immediately begin to retry processing from this failed item onwards. */
    batchItemFailures.add(new StreamsEventResponse.BatchItemFailure(curRecordSequenceNumber));
    return new StreamsEventResponse(batchItemFailures);
}
```

Or, more concisely:

```java
return new StreamsEventResponse(Collections.singletonList(
        new StreamsEventResponse.BatchItemFailure(curRecordSequenceNumber)));
```
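For comparison, a minimal sketch of the fail-fast approach under the same assumptions as before. Again, `Rec`, `processFailFast`, and `run` are hypothetical stand-ins rather than Powertools or AWS types; the loop returns the first failed sequence number, which corresponds to the single `BatchItemFailure` in the snippet above. The simulated error is transient: it fails a fixed number of times and then succeeds.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class FailFastDemo {
    // Hypothetical stand-in for a stream record: only the sequence number matters here.
    public record Rec(long seq) {}

    // Stop at the first failure and report only that record, so the checkpoint is
    // exactly the failed record and nothing after it is processed in this invocation.
    // Returns the failed sequence number, or null if the whole batch succeeded.
    public static Long processFailFast(List<Rec> batch, Predicate<Rec> fails, List<Long> processed) {
        for (Rec r : batch) {
            if (fails.test(r)) {
                return r.seq();
            }
            processed.add(r.seq());
        }
        return null;
    }

    // Record 'badSeq' fails on its first 'failuresBeforeSuccess' attempts (a transient error);
    // assumed Lambda behavior: each retry starts at the reported record.
    public static List<Long> run(List<Rec> batch, long badSeq, int failuresBeforeSuccess, int maxInvocations) {
        List<Long> processed = new ArrayList<>();
        int[] attempts = {0};
        Predicate<Rec> fails = r -> r.seq() == badSeq && attempts[0]++ < failuresBeforeSuccess;
        List<Rec> pending = batch;
        for (int i = 0; i < maxInvocations && !pending.isEmpty(); i++) {
            Long failed = processFailFast(pending, fails, processed);
            if (failed == null) {
                break;
            }
            pending = pending.stream().filter(r -> r.seq() >= failed).toList();
        }
        return processed;
    }

    public static void main(String[] args) {
        List<Rec> batch = List.of(new Rec(1), new Rec(2), new Rec(3), new Rec(4), new Rec(5));
        // Record 3 fails twice, then succeeds on the third invocation.
        System.out.println(run(batch, 3, 2, 5)); // prints [1, 2, 3, 4, 5]
    }
}
```

Each record is processed exactly once and in sequence order; records after the failing one wait until it succeeds, which is the ordering guarantee the issue asks for.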

Metadata


Assignees: No one assigned
Type: No type
Projects: Status: On hold
Milestone: No milestone
Relationships: None yet
Development: No branches or pull requests
