From 7dc7c342d942766d26a485902cb125d0e1f0eda0 Mon Sep 17 00:00:00 2001 From: Jerome Van Der Linden Date: Fri, 5 Apr 2024 14:53:33 +0200 Subject: [PATCH 01/10] implement parallel processing --- .../batch/handler/BatchMessageHandler.java | 17 ++- .../handler/DynamoDbBatchMessageHandler.java | 78 ++++++++----- .../KinesisStreamsBatchMessageHandler.java | 91 +++++++++------ .../batch/handler/SqsBatchMessageHandler.java | 105 ++++++++++++------ .../batch/internal/MultiThreadMDC.java | 47 ++++++++ .../batch/SQSBatchProcessorTest.java | 89 +++++++++++++-- 6 files changed, 323 insertions(+), 104 deletions(-) create mode 100644 powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/internal/MultiThreadMDC.java diff --git a/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/BatchMessageHandler.java b/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/BatchMessageHandler.java index 730211feb..18d74bb25 100644 --- a/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/BatchMessageHandler.java +++ b/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/BatchMessageHandler.java @@ -33,6 +33,21 @@ public interface BatchMessageHandler { * @param context The lambda context * @return A partial batch response */ - public abstract R processBatch(E event, Context context); + R processBatch(E event, Context context); + /** + * Processes the given batch in parallel returning a partial batch + * response indicating the success and failure of individual + * messages within the batch.
+ * Note that parallel processing is not always better than sequential processing, + * and you should benchmark your code to determine the best approach for your use case.
+ Also note that the number of threads available depends on the number of vCPUs, + which grows with the amount of memory allocated to your Lambda function.<br/>
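+ A minimal usage sketch (illustrative only, shown for an SQS batch; {@code processMessage} stands for your own
+ per-message handler, and other event sources follow the same pattern through their respective builders):
+ <pre>{@code
+     BatchMessageHandler<SQSEvent, SQSBatchResponse> handler = new BatchMessageHandlerBuilder()
+             .withSqsBatchHandler()
+             .buildWithRawMessageHandler(this::processMessage);
+
+     SQSBatchResponse response = handler.processBatchInParallel(sqsEvent, context);
+ }</pre>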
+ + * + * @param event The Lambda event containing the batch to process + * @param context The lambda context + * @return A partial batch response + */ + R processBatchInParallel(E event, Context context); } diff --git a/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/DynamoDbBatchMessageHandler.java b/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/DynamoDbBatchMessageHandler.java index 83a8bf7dd..81161eb85 100644 --- a/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/DynamoDbBatchMessageHandler.java +++ b/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/DynamoDbBatchMessageHandler.java @@ -19,10 +19,13 @@ import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse; import java.util.ArrayList; import java.util.List; +import java.util.Optional; import java.util.function.BiConsumer; import java.util.function.Consumer; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import software.amazon.lambda.powertools.batch.internal.MultiThreadMDC; /** * A batch message processor for DynamoDB Streams batches. @@ -46,35 +49,60 @@ public DynamoDbBatchMessageHandler(Consumer @Override public StreamsEventResponse processBatch(DynamodbEvent event, Context context) { - List batchFailures = new ArrayList<>(); + StreamsEventResponse response = StreamsEventResponse.builder().withBatchItemFailures(new ArrayList<>()).build(); - for (DynamodbEvent.DynamodbStreamRecord record : event.getRecords()) { - try { + for (DynamodbEvent.DynamodbStreamRecord streamRecord : event.getRecords()) { + processBatchItem(streamRecord, context).ifPresent(batchItemFailure -> response.getBatchItemFailures().add(batchItemFailure)); + } - rawMessageHandler.accept(record, context); - // Report success if we have a handler - if (this.successHandler != null) { - this.successHandler.accept(record); - } - } catch (Throwable t) { - String sequenceNumber = record.getDynamodb().getSequenceNumber(); - LOGGER.error("Error while processing record with id {}: {}, adding it to batch item failures", - sequenceNumber, t.getMessage()); - LOGGER.error("Error was", t); - batchFailures.add(new StreamsEventResponse.BatchItemFailure(sequenceNumber)); - - // Report failure if we have a handler - if (this.failureHandler != null) { - // A failing failure handler is no reason to fail the batch - try { - this.failureHandler.accept(record, t); - } catch (Throwable t2) { - LOGGER.warn("failureHandler threw handling failure", t2); - } + return response; + } + + + @Override + public StreamsEventResponse processBatchInParallel(DynamodbEvent event, Context context) { + MultiThreadMDC multiThreadMDC = new MultiThreadMDC(); + + List batchItemFailures = event.getRecords() + .parallelStream() // Parallel processing + .map(eventRecord -> { + multiThreadMDC.copyMDCToThread(Thread.currentThread().getName()); + return processBatchItem(eventRecord, context); + }) + .filter(Optional::isPresent) + .map(Optional::get) + .collect(Collectors.toList()); + + return StreamsEventResponse.builder().withBatchItemFailures(batchItemFailures).build(); + } + + private Optional processBatchItem(DynamodbEvent.DynamodbStreamRecord streamRecord, Context context) { + try { + LOGGER.debug("Processing item {}", streamRecord.getEventID()); + + rawMessageHandler.accept(streamRecord, context); + + // Report success if we have a handler + if (this.successHandler != null) { + this.successHandler.accept(streamRecord); + } + 
return Optional.empty(); + } catch (Throwable t) { + String sequenceNumber = streamRecord.getDynamodb().getSequenceNumber(); + LOGGER.error("Error while processing record with id {}: {}, adding it to batch item failures", + sequenceNumber, t.getMessage()); + LOGGER.error("Error was", t); + + // Report failure if we have a handler + if (this.failureHandler != null) { + // A failing failure handler is no reason to fail the batch + try { + this.failureHandler.accept(streamRecord, t); + } catch (Throwable t2) { + LOGGER.warn("failureHandler threw handling failure", t2); } } + return Optional.of(StreamsEventResponse.BatchItemFailure.builder().withItemIdentifier(sequenceNumber).build()); } - - return new StreamsEventResponse(batchFailures); } } diff --git a/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/KinesisStreamsBatchMessageHandler.java b/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/KinesisStreamsBatchMessageHandler.java index ad1dd302d..e6216794f 100644 --- a/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/KinesisStreamsBatchMessageHandler.java +++ b/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/KinesisStreamsBatchMessageHandler.java @@ -20,10 +20,13 @@ import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse; import java.util.ArrayList; import java.util.List; +import java.util.Optional; import java.util.function.BiConsumer; import java.util.function.Consumer; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import software.amazon.lambda.powertools.batch.internal.MultiThreadMDC; import software.amazon.lambda.powertools.utilities.EventDeserializer; /** @@ -57,42 +60,66 @@ public KinesisStreamsBatchMessageHandler(BiConsumer batchFailures = new ArrayList<>(); - - for (KinesisEvent.KinesisEventRecord record : event.getRecords()) { - try { - if (this.rawMessageHandler != null) { - rawMessageHandler.accept(record, context); - } else { - M messageDeserialized = EventDeserializer.extractDataFrom(record).as(messageClass); - messageHandler.accept(messageDeserialized, context); - } + StreamsEventResponse response = StreamsEventResponse.builder().withBatchItemFailures(new ArrayList<>()).build(); - // Report success if we have a handler - if (this.successHandler != null) { - this.successHandler.accept(record); - } - } catch (Throwable t) { - String sequenceNumber = record.getEventID(); - LOGGER.error("Error while processing record with eventID {}: {}, adding it to batch item failures", - sequenceNumber, t.getMessage()); - LOGGER.error("Error was", t); - - batchFailures.add(new StreamsEventResponse.BatchItemFailure(record.getKinesis().getSequenceNumber())); - - // Report failure if we have a handler - if (this.failureHandler != null) { - // A failing failure handler is no reason to fail the batch - try { - this.failureHandler.accept(record, t); - } catch (Throwable t2) { - LOGGER.warn("failureHandler threw handling failure", t2); - } + for (KinesisEvent.KinesisEventRecord eventRecord : event.getRecords()) { + processBatchItem(eventRecord, context).ifPresent(batchItemFailure -> response.getBatchItemFailures().add(batchItemFailure)); + } + + return response; + } + + @Override + public StreamsEventResponse processBatchInParallel(KinesisEvent event, Context context) { + MultiThreadMDC multiThreadMDC = new MultiThreadMDC(); + + List batchItemFailures = event.getRecords() + .parallelStream() // Parallel processing + 
.map(eventRecord -> { + multiThreadMDC.copyMDCToThread(Thread.currentThread().getName()); + return processBatchItem(eventRecord, context); + }) + .filter(Optional::isPresent) + .map(Optional::get) + .collect(Collectors.toList()); + + return StreamsEventResponse.builder().withBatchItemFailures(batchItemFailures).build(); + } + + private Optional processBatchItem(KinesisEvent.KinesisEventRecord eventRecord, Context context) { + try { + LOGGER.debug("Processing item {}", eventRecord.getEventID()); + + if (this.rawMessageHandler != null) { + rawMessageHandler.accept(eventRecord, context); + } else { + M messageDeserialized = EventDeserializer.extractDataFrom(eventRecord).as(messageClass); + messageHandler.accept(messageDeserialized, context); + } + + // Report success if we have a handler + if (this.successHandler != null) { + this.successHandler.accept(eventRecord); + } + return Optional.empty(); + } catch (Throwable t) { + String sequenceNumber = eventRecord.getEventID(); + LOGGER.error("Error while processing record with eventID {}: {}, adding it to batch item failures", + sequenceNumber, t.getMessage()); + LOGGER.error("Error was", t); + + // Report failure if we have a handler + if (this.failureHandler != null) { + // A failing failure handler is no reason to fail the batch + try { + this.failureHandler.accept(eventRecord, t); + } catch (Throwable t2) { + LOGGER.warn("failureHandler threw handling failure", t2); } } - } - return new StreamsEventResponse(batchFailures); + return Optional.of(StreamsEventResponse.BatchItemFailure.builder().withItemIdentifier(eventRecord.getKinesis().getSequenceNumber()).build()); + } } } diff --git a/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/SqsBatchMessageHandler.java b/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/SqsBatchMessageHandler.java index b634f9b62..dd6874ff9 100644 --- a/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/SqsBatchMessageHandler.java +++ b/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/SqsBatchMessageHandler.java @@ -18,10 +18,15 @@ import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse; import com.amazonaws.services.lambda.runtime.events.SQSEvent; import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; import java.util.function.Consumer; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import software.amazon.lambda.powertools.batch.internal.MultiThreadMDC; import software.amazon.lambda.powertools.utilities.EventDeserializer; /** @@ -61,57 +66,27 @@ public SQSBatchResponse processBatch(SQSEvent event, Context context) { // If we are working on a FIFO queue, when any message fails we should stop processing and return the // rest of the batch as failed too. We use this variable to track when that has happened. // https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting - boolean failWholeBatch = false; + AtomicBoolean failWholeBatch = new AtomicBoolean(false); int messageCursor = 0; - for (; messageCursor < event.getRecords().size() && !failWholeBatch; messageCursor++) { + for (; messageCursor < event.getRecords().size() && !failWholeBatch.get(); messageCursor++) { SQSEvent.SQSMessage message = event.getRecords().get(messageCursor); String messageGroupId = message.getAttributes() != null ? 
message.getAttributes().get(MESSAGE_GROUP_ID_KEY) : null; - try { - if (this.rawMessageHandler != null) { - rawMessageHandler.accept(message, context); - } else { - M messageDeserialized = EventDeserializer.extractDataFrom(message).as(messageClass); - messageHandler.accept(messageDeserialized, context); - } - - // Report success if we have a handler - if (this.successHandler != null) { - this.successHandler.accept(message); - } - - } catch (Throwable t) { - LOGGER.error("Error while processing message with messageId {}: {}, adding it to batch item failures", - message.getMessageId(), t.getMessage()); - LOGGER.error("Error was", t); - - response.getBatchItemFailures() - .add(SQSBatchResponse.BatchItemFailure.builder().withItemIdentifier(message.getMessageId()) - .build()); + processBatchItem(message, context).ifPresent(batchItemFailure -> { + response.getBatchItemFailures().add(batchItemFailure); if (messageGroupId != null) { - failWholeBatch = true; + failWholeBatch.set(true); LOGGER.info( "A message in a batch with messageGroupId {} and messageId {} failed; failing the rest of the batch too" , messageGroupId, message.getMessageId()); } - - // Report failure if we have a handler - if (this.failureHandler != null) { - // A failing failure handler is no reason to fail the batch - try { - this.failureHandler.accept(message, t); - } catch (Throwable t2) { - LOGGER.warn("failureHandler threw handling failure", t2); - } - } - - } + }); } - if (failWholeBatch) { + if (failWholeBatch.get()) { // Add the remaining messages to the batch item failures event.getRecords() .subList(messageCursor, event.getRecords().size()) @@ -121,4 +96,60 @@ public SQSBatchResponse processBatch(SQSEvent event, Context context) { } return response; } + + @Override + public SQSBatchResponse processBatchInParallel(SQSEvent event, Context context) { + if (!event.getRecords().isEmpty() && event.getRecords().get(0).getAttributes().get(MESSAGE_GROUP_ID_KEY) != null) { + LOGGER.warn("FIFO queues are not supported in parallel mode, proceeding in sequence"); + return processBatch(event, context); + } + + MultiThreadMDC multiThreadMDC = new MultiThreadMDC(); + List batchItemFailures = event.getRecords() + .parallelStream() // Parallel processing + .map(sqsMessage -> { + multiThreadMDC.copyMDCToThread(Thread.currentThread().getName()); + return processBatchItem(sqsMessage, context); + }) + .filter(Optional::isPresent) + .map(Optional::get) + .collect(Collectors.toList()); + + return SQSBatchResponse.builder().withBatchItemFailures(batchItemFailures).build(); + } + + private Optional processBatchItem(SQSEvent.SQSMessage message, Context context) { + try { + LOGGER.debug("Processing message {}", message.getMessageId()); + + if (this.rawMessageHandler != null) { + rawMessageHandler.accept(message, context); + } else { + M messageDeserialized = EventDeserializer.extractDataFrom(message).as(messageClass); + messageHandler.accept(messageDeserialized, context); + } + + // Report success if we have a handler + if (this.successHandler != null) { + this.successHandler.accept(message); + } + return Optional.empty(); + } catch (Throwable t) { + LOGGER.error("Error while processing message with messageId {}: {}, adding it to batch item failures", + message.getMessageId(), t.getMessage()); + LOGGER.error("Error was", t); + + // Report failure if we have a handler + if (this.failureHandler != null) { + // A failing failure handler is no reason to fail the batch + try { + this.failureHandler.accept(message, t); + } catch (Throwable t2) { + 
LOGGER.warn("failureHandler threw handling failure", t2); + } + } + return Optional.of(SQSBatchResponse.BatchItemFailure.builder().withItemIdentifier(message.getMessageId()) + .build()); + } + } } diff --git a/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/internal/MultiThreadMDC.java b/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/internal/MultiThreadMDC.java new file mode 100644 index 000000000..df1c2e7a0 --- /dev/null +++ b/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/internal/MultiThreadMDC.java @@ -0,0 +1,47 @@ +/* + * Copyright 2023 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package software.amazon.lambda.powertools.batch.internal; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.MDC; + +/** + * MDC (SLF4J) is not passed to other threads (ThreadLocal). + * This class permits to manually copy the MDC to a given thread. + */ +public class MultiThreadMDC { + + private static final Logger LOGGER = LoggerFactory.getLogger(MultiThreadMDC.class); + + private final List mdcAwareThreads = new ArrayList<>(); + private final Map contextMap; + + public MultiThreadMDC() { + mdcAwareThreads.add("main"); + contextMap = MDC.getCopyOfContextMap(); + } + + public void copyMDCToThread(String thread) { + if (!mdcAwareThreads.contains(thread)) { + LOGGER.debug("Copy MDC to thread {}", thread); + MDC.setContextMap(contextMap); + mdcAwareThreads.add(thread); + } + } +} diff --git a/powertools-batch/src/test/java/software/amazon/lambda/powertools/batch/SQSBatchProcessorTest.java b/powertools-batch/src/test/java/software/amazon/lambda/powertools/batch/SQSBatchProcessorTest.java index 2f9429fa3..7dd51374e 100644 --- a/powertools-batch/src/test/java/software/amazon/lambda/powertools/batch/SQSBatchProcessorTest.java +++ b/powertools-batch/src/test/java/software/amazon/lambda/powertools/batch/SQSBatchProcessorTest.java @@ -20,20 +20,43 @@ import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse; import com.amazonaws.services.lambda.runtime.events.SQSEvent; import com.amazonaws.services.lambda.runtime.tests.annotations.Event; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.params.ParameterizedTest; import org.mockito.Mock; import software.amazon.lambda.powertools.batch.handler.BatchMessageHandler; import software.amazon.lambda.powertools.batch.model.Product; -public class SQSBatchProcessorTest { +class SQSBatchProcessorTest { @Mock private Context context; + private final List threadList = Collections.synchronizedList(new ArrayList<>()); + + @AfterEach + public void clear() { + threadList.clear(); + } + // A handler that works private void processMessageSucceeds(SQSEvent.SQSMessage sqsMessage) { } + private void 
processMessageInParallelSucceeds(SQSEvent.SQSMessage sqsMessage) { + String thread = Thread.currentThread().getName(); + if (!threadList.contains(thread)) { + threadList.add(thread); + } + try { + Thread.sleep(500); // simulate some processing + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + // A handler that throws an exception for _one_ of the sample messages private void processMessageFailsForFixedMessage(SQSEvent.SQSMessage message, Context context) { if (message.getMessageId().equals("e9144555-9a4f-4ec3-99a0-34ce359b4b54")) { @@ -41,8 +64,23 @@ private void processMessageFailsForFixedMessage(SQSEvent.SQSMessage message, Con } } + private void processMessageInParallelFailsForFixedMessage(SQSEvent.SQSMessage message, Context context) { + String thread = Thread.currentThread().getName(); + if (!threadList.contains(thread)) { + threadList.add(thread); + } + try { + Thread.sleep(500); // simulate some processing + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + if (message.getMessageId().equals("e9144555-9a4f-4ec3-99a0-34ce359b4b54")) { + throw new RuntimeException("fake exception"); + } + } + // A handler that throws an exception for _one_ of the deserialized products in the same messages - public void processMessageFailsForFixedProduct(Product product, Context context) { + private void processMessageFailsForFixedProduct(Product product, Context context) { if (product.getId() == 12345) { throw new RuntimeException("fake exception"); } @@ -50,7 +88,7 @@ public void processMessageFailsForFixedProduct(Product product, Context context) @ParameterizedTest @Event(value = "sqs_event.json", type = SQSEvent.class) - public void batchProcessingSucceedsAndReturns(SQSEvent event) { + void batchProcessingSucceedsAndReturns(SQSEvent event) { // Arrange BatchMessageHandler handler = new BatchMessageHandlerBuilder() .withSqsBatchHandler() @@ -60,13 +98,28 @@ public void batchProcessingSucceedsAndReturns(SQSEvent event) { SQSBatchResponse sqsBatchResponse = handler.processBatch(event, context); // Assert - assertThat(sqsBatchResponse.getBatchItemFailures()).hasSize(0); + assertThat(sqsBatchResponse.getBatchItemFailures()).isEmpty(); } + @ParameterizedTest + @Event(value = "sqs_event_big.json", type = SQSEvent.class) + void parallelBatchProcessingSucceedsAndReturns(SQSEvent event) { + // Arrange + BatchMessageHandler handler = new BatchMessageHandlerBuilder() + .withSqsBatchHandler() + .buildWithRawMessageHandler(this::processMessageInParallelSucceeds); + + // Act + SQSBatchResponse sqsBatchResponse = handler.processBatchInParallel(event, context); + + // Assert + assertThat(sqsBatchResponse.getBatchItemFailures()).isEmpty(); + assertThat(threadList).hasSizeGreaterThan(1); + } @ParameterizedTest @Event(value = "sqs_event.json", type = SQSEvent.class) - public void shouldAddMessageToBatchFailure_whenException_withMessage(SQSEvent event) { + void shouldAddMessageToBatchFailure_whenException_withMessage(SQSEvent event) { // Arrange BatchMessageHandler handler = new BatchMessageHandlerBuilder() .withSqsBatchHandler() @@ -81,9 +134,27 @@ public void shouldAddMessageToBatchFailure_whenException_withMessage(SQSEvent ev assertThat(batchItemFailure.getItemIdentifier()).isEqualTo("e9144555-9a4f-4ec3-99a0-34ce359b4b54"); } + @ParameterizedTest + @Event(value = "sqs_event_big.json", type = SQSEvent.class) + void parallelBatchProcessing_shouldAddMessageToBatchFailure_whenException_withMessage(SQSEvent event) { + // Arrange + BatchMessageHandler handler = new 
BatchMessageHandlerBuilder() + .withSqsBatchHandler() + .buildWithRawMessageHandler(this::processMessageInParallelFailsForFixedMessage); + + // Act + SQSBatchResponse sqsBatchResponse = handler.processBatchInParallel(event, context); + + // Assert + assertThat(sqsBatchResponse.getBatchItemFailures()).hasSize(1); + SQSBatchResponse.BatchItemFailure batchItemFailure = sqsBatchResponse.getBatchItemFailures().get(0); + assertThat(batchItemFailure.getItemIdentifier()).isEqualTo("e9144555-9a4f-4ec3-99a0-34ce359b4b54"); + assertThat(threadList).hasSizeGreaterThan(1); + } + @ParameterizedTest @Event(value = "sqs_fifo_event.json", type = SQSEvent.class) - public void shouldAddMessageToBatchFailure_whenException_withSQSFIFO(SQSEvent event) { + void shouldAddMessageToBatchFailure_whenException_withSQSFIFO(SQSEvent event) { // Arrange BatchMessageHandler handler = new BatchMessageHandlerBuilder() .withSqsBatchHandler() @@ -103,7 +174,7 @@ public void shouldAddMessageToBatchFailure_whenException_withSQSFIFO(SQSEvent ev @ParameterizedTest @Event(value = "sqs_event.json", type = SQSEvent.class) - public void shouldAddMessageToBatchFailure_whenException_withProduct(SQSEvent event) { + void shouldAddMessageToBatchFailure_whenException_withProduct(SQSEvent event) { // Arrange BatchMessageHandler handler = new BatchMessageHandlerBuilder() @@ -121,7 +192,7 @@ public void shouldAddMessageToBatchFailure_whenException_withProduct(SQSEvent ev @ParameterizedTest @Event(value = "sqs_event.json", type = SQSEvent.class) - public void failingFailureHandlerShouldntFailBatch(SQSEvent event) { + void failingFailureHandlerShouldntFailBatch(SQSEvent event) { // Arrange AtomicBoolean wasCalled = new AtomicBoolean(false); BatchMessageHandler handler = new BatchMessageHandlerBuilder() @@ -144,7 +215,7 @@ public void failingFailureHandlerShouldntFailBatch(SQSEvent event) { @ParameterizedTest @Event(value = "sqs_event.json", type = SQSEvent.class) - public void failingSuccessHandlerShouldntFailBatchButShouldFailMessage(SQSEvent event) { + void failingSuccessHandlerShouldntFailBatchButShouldFailMessage(SQSEvent event) { // Arrange AtomicBoolean wasCalledAndFailed = new AtomicBoolean(false); BatchMessageHandler handler = new BatchMessageHandlerBuilder() From d9e809c2ae2b73b71cb89b676202ce0ae113be79 Mon Sep 17 00:00:00 2001 From: Jerome Van Der Linden Date: Fri, 5 Apr 2024 14:55:24 +0200 Subject: [PATCH 02/10] test parallel processing --- powertools-batch/pom.xml | 16 + .../batch/DdbBatchProcessorTest.java | 88 +++- .../batch/KinesisBatchProcessorTest.java | 92 +++- .../src/test/resources/dynamo_event_big.json | 376 +++++++++++++++ .../src/test/resources/kinesis_event_big.json | 224 +++++++++ .../src/test/resources/sqs_event_big.json | 429 ++++++++++++++++++ 6 files changed, 1208 insertions(+), 17 deletions(-) create mode 100644 powertools-batch/src/test/resources/dynamo_event_big.json create mode 100644 powertools-batch/src/test/resources/kinesis_event_big.json create mode 100644 powertools-batch/src/test/resources/sqs_event_big.json diff --git a/powertools-batch/pom.xml b/powertools-batch/pom.xml index 1886f56e6..66a5e3087 100644 --- a/powertools-batch/pom.xml +++ b/powertools-batch/pom.xml @@ -21,6 +21,16 @@ true + + org.apache.maven.plugins + maven-surefire-plugin + + + + -Djava.util.concurrent.ForkJoinPool.common.parallelism=4 + + + @@ -47,6 +57,12 @@ junit-jupiter-api test + + org.slf4j + slf4j-simple + 2.0.7 + test + org.assertj assertj-core diff --git 
a/powertools-batch/src/test/java/software/amazon/lambda/powertools/batch/DdbBatchProcessorTest.java b/powertools-batch/src/test/java/software/amazon/lambda/powertools/batch/DdbBatchProcessorTest.java index 9e2c211e2..6bb247323 100644 --- a/powertools-batch/src/test/java/software/amazon/lambda/powertools/batch/DdbBatchProcessorTest.java +++ b/powertools-batch/src/test/java/software/amazon/lambda/powertools/batch/DdbBatchProcessorTest.java @@ -20,16 +20,27 @@ import com.amazonaws.services.lambda.runtime.events.DynamodbEvent; import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse; import com.amazonaws.services.lambda.runtime.tests.annotations.Event; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.params.ParameterizedTest; import org.mockito.Mock; import software.amazon.lambda.powertools.batch.handler.BatchMessageHandler; -public class DdbBatchProcessorTest { +class DdbBatchProcessorTest { @Mock private Context context; + private final List threadList = Collections.synchronizedList(new ArrayList<>()); + + @AfterEach + public void clear() { + threadList.clear(); + } + private void processMessageSucceeds(DynamodbEvent.DynamodbStreamRecord record, Context context) { // Great success } @@ -40,9 +51,36 @@ private void processMessageFailsForFixedMessage(DynamodbEvent.DynamodbStreamReco } } + private void processMessageInParallelSucceeds(DynamodbEvent.DynamodbStreamRecord record, Context context) { + String thread = Thread.currentThread().getName(); + if (!threadList.contains(thread)) { + threadList.add(thread); + } + try { + Thread.sleep(500); // simulate some processing + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + private void processMessageInParallelFailsForFixedMessage(DynamodbEvent.DynamodbStreamRecord record, Context context) { + String thread = Thread.currentThread().getName(); + if (!threadList.contains(thread)) { + threadList.add(thread); + } + try { + Thread.sleep(500); // simulate some processing + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + if (record.getDynamodb().getSequenceNumber().equals("4421584500000000017450439091")) { + throw new RuntimeException("fake exception"); + } + } + @ParameterizedTest @Event(value = "dynamo_event.json", type = DynamodbEvent.class) - public void batchProcessingSucceedsAndReturns(DynamodbEvent event) { + void batchProcessingSucceedsAndReturns(DynamodbEvent event) { // Arrange BatchMessageHandler handler = new BatchMessageHandlerBuilder() .withDynamoDbBatchHandler() @@ -52,12 +90,28 @@ public void batchProcessingSucceedsAndReturns(DynamodbEvent event) { StreamsEventResponse dynamodbBatchResponse = handler.processBatch(event, context); // Assert - assertThat(dynamodbBatchResponse.getBatchItemFailures()).hasSize(0); + assertThat(dynamodbBatchResponse.getBatchItemFailures()).isEmpty(); + } + + @ParameterizedTest + @Event(value = "dynamo_event_big.json", type = DynamodbEvent.class) + void parallelBatchProcessingSucceedsAndReturns(DynamodbEvent event) { + // Arrange + BatchMessageHandler handler = new BatchMessageHandlerBuilder() + .withDynamoDbBatchHandler() + .buildWithRawMessageHandler(this::processMessageInParallelSucceeds); + + // Act + StreamsEventResponse dynamodbBatchResponse = handler.processBatchInParallel(event, context); + + // Assert + assertThat(dynamodbBatchResponse.getBatchItemFailures()).isEmpty(); + 
assertThat(threadList).hasSizeGreaterThan(1); } @ParameterizedTest @Event(value = "dynamo_event.json", type = DynamodbEvent.class) - public void shouldAddMessageToBatchFailure_whenException_withMessage(DynamodbEvent event) { + void shouldAddMessageToBatchFailure_whenException_withMessage(DynamodbEvent event) { // Arrange BatchMessageHandler handler = new BatchMessageHandlerBuilder() .withDynamoDbBatchHandler() @@ -72,9 +126,27 @@ public void shouldAddMessageToBatchFailure_whenException_withMessage(DynamodbEve assertThat(batchItemFailure.getItemIdentifier()).isEqualTo("4421584500000000017450439091"); } + @ParameterizedTest + @Event(value = "dynamo_event_big.json", type = DynamodbEvent.class) + void parallelBatchProcessing_shouldAddMessageToBatchFailure_whenException_withMessage(DynamodbEvent event) { + // Arrange + BatchMessageHandler handler = new BatchMessageHandlerBuilder() + .withDynamoDbBatchHandler() + .buildWithRawMessageHandler(this::processMessageInParallelFailsForFixedMessage); + + // Act + StreamsEventResponse dynamodbBatchResponse = handler.processBatchInParallel(event, context); + + // Assert + assertThat(dynamodbBatchResponse.getBatchItemFailures()).hasSize(1); + StreamsEventResponse.BatchItemFailure batchItemFailure = dynamodbBatchResponse.getBatchItemFailures().get(0); + assertThat(batchItemFailure.getItemIdentifier()).isEqualTo("4421584500000000017450439091"); + assertThat(threadList).hasSizeGreaterThan(1); + } + @ParameterizedTest @Event(value = "dynamo_event.json", type = DynamodbEvent.class) - public void failingFailureHandlerShouldntFailBatch(DynamodbEvent event) { + void failingFailureHandlerShouldntFailBatch(DynamodbEvent event) { // Arrange AtomicBoolean wasCalledAndFailed = new AtomicBoolean(false); BatchMessageHandler handler = new BatchMessageHandlerBuilder() @@ -92,7 +164,7 @@ public void failingFailureHandlerShouldntFailBatch(DynamodbEvent event) { // Assert assertThat(dynamodbBatchResponse).isNotNull(); - assertThat(dynamodbBatchResponse.getBatchItemFailures().size()).isEqualTo(1); + assertThat(dynamodbBatchResponse.getBatchItemFailures()).hasSize(1); assertThat(wasCalledAndFailed.get()).isTrue(); StreamsEventResponse.BatchItemFailure batchItemFailure = dynamodbBatchResponse.getBatchItemFailures().get(0); assertThat(batchItemFailure.getItemIdentifier()).isEqualTo("4421584500000000017450439091"); @@ -100,7 +172,7 @@ public void failingFailureHandlerShouldntFailBatch(DynamodbEvent event) { @ParameterizedTest @Event(value = "dynamo_event.json", type = DynamodbEvent.class) - public void failingSuccessHandlerShouldntFailBatchButShouldFailMessage(DynamodbEvent event) { + void failingSuccessHandlerShouldntFailBatchButShouldFailMessage(DynamodbEvent event) { // Arrange AtomicBoolean wasCalledAndFailed = new AtomicBoolean(false); BatchMessageHandler handler = new BatchMessageHandlerBuilder() @@ -118,7 +190,7 @@ public void failingSuccessHandlerShouldntFailBatchButShouldFailMessage(DynamodbE // Assert assertThat(dynamodbBatchResponse).isNotNull(); - assertThat(dynamodbBatchResponse.getBatchItemFailures().size()).isEqualTo(1); + assertThat(dynamodbBatchResponse.getBatchItemFailures()).hasSize(1); assertThat(wasCalledAndFailed.get()).isTrue(); StreamsEventResponse.BatchItemFailure batchItemFailure = dynamodbBatchResponse.getBatchItemFailures().get(0); assertThat(batchItemFailure.getItemIdentifier()).isEqualTo("4421584500000000017450439091"); diff --git a/powertools-batch/src/test/java/software/amazon/lambda/powertools/batch/KinesisBatchProcessorTest.java 
b/powertools-batch/src/test/java/software/amazon/lambda/powertools/batch/KinesisBatchProcessorTest.java index d78638e1d..059a4d2d0 100644 --- a/powertools-batch/src/test/java/software/amazon/lambda/powertools/batch/KinesisBatchProcessorTest.java +++ b/powertools-batch/src/test/java/software/amazon/lambda/powertools/batch/KinesisBatchProcessorTest.java @@ -20,17 +20,28 @@ import com.amazonaws.services.lambda.runtime.events.KinesisEvent; import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse; import com.amazonaws.services.lambda.runtime.tests.annotations.Event; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.params.ParameterizedTest; import org.mockito.Mock; import software.amazon.lambda.powertools.batch.handler.BatchMessageHandler; import software.amazon.lambda.powertools.batch.model.Product; -public class KinesisBatchProcessorTest { +class KinesisBatchProcessorTest { @Mock private Context context; + private final List threadList = Collections.synchronizedList(new ArrayList<>()); + + @AfterEach + public void clear() { + threadList.clear(); + } + private void processMessageSucceeds(KinesisEvent.KinesisEventRecord record, Context context) { // Great success } @@ -42,6 +53,34 @@ private void processMessageFailsForFixedMessage(KinesisEvent.KinesisEventRecord } } + private void processMessageInParallelSucceeds(KinesisEvent.KinesisEventRecord record, Context context) { + String thread = Thread.currentThread().getName(); + if (!threadList.contains(thread)) { + threadList.add(thread); + } + try { + Thread.sleep(500); // simulate some processing + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + private void processMessageInParallelFailsForFixedMessage(KinesisEvent.KinesisEventRecord record, Context context) { + String thread = Thread.currentThread().getName(); + if (!threadList.contains(thread)) { + threadList.add(thread); + } + try { + Thread.sleep(500); // simulate some processing + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + if (record.getKinesis().getSequenceNumber() + .equals("49545115243490985018280067714973144582180062593244200961")) { + throw new RuntimeException("fake exception"); + } + } + // A handler that throws an exception for _one_ of the deserialized products in the same messages public void processMessageFailsForFixedProduct(Product product, Context context) { if (product.getId() == 1234) { @@ -51,7 +90,7 @@ public void processMessageFailsForFixedProduct(Product product, Context context) @ParameterizedTest @Event(value = "kinesis_event.json", type = KinesisEvent.class) - public void batchProcessingSucceedsAndReturns(KinesisEvent event) { + void batchProcessingSucceedsAndReturns(KinesisEvent event) { // Arrange BatchMessageHandler handler = new BatchMessageHandlerBuilder() .withKinesisBatchHandler() @@ -61,12 +100,28 @@ public void batchProcessingSucceedsAndReturns(KinesisEvent event) { StreamsEventResponse kinesisBatchResponse = handler.processBatch(event, context); // Assert - assertThat(kinesisBatchResponse.getBatchItemFailures()).hasSize(0); + assertThat(kinesisBatchResponse.getBatchItemFailures()).isEmpty(); + } + + @ParameterizedTest + @Event(value = "kinesis_event_big.json", type = KinesisEvent.class) + void batchProcessingInParallelSucceedsAndReturns(KinesisEvent event) { + // Arrange + BatchMessageHandler handler = new BatchMessageHandlerBuilder() 
+ .withKinesisBatchHandler() + .buildWithRawMessageHandler(this::processMessageInParallelSucceeds); + + // Act + StreamsEventResponse kinesisBatchResponse = handler.processBatchInParallel(event, context); + + // Assert + assertThat(kinesisBatchResponse.getBatchItemFailures()).isEmpty(); + assertThat(threadList).hasSizeGreaterThan(1); } @ParameterizedTest @Event(value = "kinesis_event.json", type = KinesisEvent.class) - public void shouldAddMessageToBatchFailure_whenException_withMessage(KinesisEvent event) { + void shouldAddMessageToBatchFailure_whenException_withMessage(KinesisEvent event) { // Arrange BatchMessageHandler handler = new BatchMessageHandlerBuilder() .withKinesisBatchHandler() @@ -82,9 +137,28 @@ public void shouldAddMessageToBatchFailure_whenException_withMessage(KinesisEven "49545115243490985018280067714973144582180062593244200961"); } + @ParameterizedTest + @Event(value = "kinesis_event_big.json", type = KinesisEvent.class) + void batchProcessingInParallel_shouldAddMessageToBatchFailure_whenException_withMessage(KinesisEvent event) { + // Arrange + BatchMessageHandler handler = new BatchMessageHandlerBuilder() + .withKinesisBatchHandler() + .buildWithRawMessageHandler(this::processMessageInParallelFailsForFixedMessage); + + // Act + StreamsEventResponse kinesisBatchResponse = handler.processBatchInParallel(event, context); + + // Assert + assertThat(kinesisBatchResponse.getBatchItemFailures()).hasSize(1); + StreamsEventResponse.BatchItemFailure batchItemFailure = kinesisBatchResponse.getBatchItemFailures().get(0); + assertThat(batchItemFailure.getItemIdentifier()).isEqualTo( + "49545115243490985018280067714973144582180062593244200961"); + assertThat(threadList).hasSizeGreaterThan(1); + } + @ParameterizedTest @Event(value = "kinesis_event.json", type = KinesisEvent.class) - public void shouldAddMessageToBatchFailure_whenException_withProduct(KinesisEvent event) { + void shouldAddMessageToBatchFailure_whenException_withProduct(KinesisEvent event) { // Arrange BatchMessageHandler handler = new BatchMessageHandlerBuilder() .withKinesisBatchHandler() @@ -102,7 +176,7 @@ public void shouldAddMessageToBatchFailure_whenException_withProduct(KinesisEven @ParameterizedTest @Event(value = "kinesis_event.json", type = KinesisEvent.class) - public void failingFailureHandlerShouldntFailBatch(KinesisEvent event) { + void failingFailureHandlerShouldntFailBatch(KinesisEvent event) { // Arrange AtomicBoolean wasCalled = new AtomicBoolean(false); BatchMessageHandler handler = new BatchMessageHandlerBuilder() @@ -118,7 +192,7 @@ public void failingFailureHandlerShouldntFailBatch(KinesisEvent event) { // Assert assertThat(kinesisBatchResponse).isNotNull(); - assertThat(kinesisBatchResponse.getBatchItemFailures().size()).isEqualTo(1); + assertThat(kinesisBatchResponse.getBatchItemFailures()).hasSize(1); assertThat(wasCalled.get()).isTrue(); StreamsEventResponse.BatchItemFailure batchItemFailure = kinesisBatchResponse.getBatchItemFailures().get(0); assertThat(batchItemFailure.getItemIdentifier()).isEqualTo( @@ -127,7 +201,7 @@ public void failingFailureHandlerShouldntFailBatch(KinesisEvent event) { @ParameterizedTest @Event(value = "kinesis_event.json", type = KinesisEvent.class) - public void failingSuccessHandlerShouldntFailBatchButShouldFailMessage(KinesisEvent event) { + void failingSuccessHandlerShouldntFailBatchButShouldFailMessage(KinesisEvent event) { // Arrange AtomicBoolean wasCalledAndFailed = new AtomicBoolean(false); BatchMessageHandler handler = new BatchMessageHandlerBuilder() @@ 
-146,7 +220,7 @@ public void failingSuccessHandlerShouldntFailBatchButShouldFailMessage(KinesisEv // Assert assertThat(kinesisBatchResponse).isNotNull(); - assertThat(kinesisBatchResponse.getBatchItemFailures().size()).isEqualTo(1); + assertThat(kinesisBatchResponse.getBatchItemFailures()).hasSize(1); assertThat(wasCalledAndFailed.get()).isTrue(); StreamsEventResponse.BatchItemFailure batchItemFailure = kinesisBatchResponse.getBatchItemFailures().get(0); assertThat(batchItemFailure.getItemIdentifier()).isEqualTo( diff --git a/powertools-batch/src/test/resources/dynamo_event_big.json b/powertools-batch/src/test/resources/dynamo_event_big.json new file mode 100644 index 000000000..fa0a75c24 --- /dev/null +++ b/powertools-batch/src/test/resources/dynamo_event_big.json @@ -0,0 +1,376 @@ +{ + "Records": [ + { + "eventID": "c4ca4238a0b923820dcc509a6f75849b", + "eventName": "INSERT", + "eventVersion": "1.1", + "eventSource": "aws:dynamodb", + "awsRegion": "eu-central-1", + "dynamodb": { + "Keys": { + "Id": { + "N": "101" + } + }, + "NewImage": { + "Message": { + "S": "New item!" + }, + "Id": { + "N": "101" + } + }, + "ApproximateCreationDateTime": 1428537600, + "SequenceNumber": "4421584500000000017450439001", + "SizeBytes": 26, + "StreamViewType": "NEW_AND_OLD_IMAGES" + }, + "eventSourceARN": "arn:aws:dynamodb:eu-central-1:123456789012:table/ExampleTableWithStream/stream/2015-06-27T00:48:05.899", + "userIdentity": { + "principalId": "dynamodb.amazonaws.com", + "type": "Service" + } + }, + { + "eventID": "c81e728d9d4c2f636f067f89cc14862c", + "eventName": "MODIFY", + "eventVersion": "1.1", + "eventSource": "aws:dynamodb", + "awsRegion": "eu-central-1", + "dynamodb": { + "Keys": { + "Id": { + "N": "101" + } + }, + "NewImage": { + "Message": { + "S": "This item has changed" + }, + "Id": { + "N": "101" + } + }, + "OldImage": { + "Message": { + "S": "New item!" + }, + "Id": { + "N": "101" + } + }, + "ApproximateCreationDateTime": 1428537600, + "SequenceNumber": "4421584500000000017450439092", + "SizeBytes": 59, + "StreamViewType": "NEW_AND_OLD_IMAGES" + }, + "eventSourceARN": "arn:aws:dynamodb:eu-central-1:123456789012:table/ExampleTableWithStream/stream/2015-06-27T00:48:05.899" + }, + { + "eventID": "eccbc87e4b5ce2fe28308fd9f2a7baf3", + "eventName": "REMOVE", + "eventVersion": "1.1", + "eventSource": "aws:dynamodb", + "awsRegion": "eu-central-1", + "dynamodb": { + "Keys": { + "Id": { + "N": "101" + } + }, + "OldImage": { + "Message": { + "S": "This item has changed" + }, + "Id": { + "N": "101" + } + }, + "ApproximateCreationDateTime": 1428537600, + "SequenceNumber": "4421584500000000017450439093", + "SizeBytes": 38, + "StreamViewType": "NEW_AND_OLD_IMAGES" + }, + "eventSourceARN": "arn:aws:dynamodb:eu-central-1:123456789012:table/ExampleTableWithStream/stream/2015-06-27T00:48:05.899" + }, + { + "eventID": "c4ca4238a0b923820dcc509a6f75849b", + "eventName": "INSERT", + "eventVersion": "1.1", + "eventSource": "aws:dynamodb", + "awsRegion": "eu-central-1", + "dynamodb": { + "Keys": { + "Id": { + "N": "101" + } + }, + "NewImage": { + "Message": { + "S": "New item!" 
+ }, + "Id": { + "N": "101" + } + }, + "ApproximateCreationDateTime": 1428537600, + "SequenceNumber": "4421584500000000017450439031", + "SizeBytes": 26, + "StreamViewType": "NEW_AND_OLD_IMAGES" + }, + "eventSourceARN": "arn:aws:dynamodb:eu-central-1:123456789012:table/ExampleTableWithStream/stream/2015-06-27T00:48:05.899", + "userIdentity": { + "principalId": "dynamodb.amazonaws.com", + "type": "Service" + } + }, + { + "eventID": "c81e728d9d4c2f636f067f89cc14862c", + "eventName": "MODIFY", + "eventVersion": "1.1", + "eventSource": "aws:dynamodb", + "awsRegion": "eu-central-1", + "dynamodb": { + "Keys": { + "Id": { + "N": "101" + } + }, + "NewImage": { + "Message": { + "S": "This item has changed" + }, + "Id": { + "N": "101" + } + }, + "OldImage": { + "Message": { + "S": "New item!" + }, + "Id": { + "N": "101" + } + }, + "ApproximateCreationDateTime": 1428537600, + "SequenceNumber": "4421584500000000017450439092", + "SizeBytes": 59, + "StreamViewType": "NEW_AND_OLD_IMAGES" + }, + "eventSourceARN": "arn:aws:dynamodb:eu-central-1:123456789012:table/ExampleTableWithStream/stream/2015-06-27T00:48:05.899" + }, + { + "eventID": "eccbc87e4b5ce2fe28308fd9f2a7baf3", + "eventName": "REMOVE", + "eventVersion": "1.1", + "eventSource": "aws:dynamodb", + "awsRegion": "eu-central-1", + "dynamodb": { + "Keys": { + "Id": { + "N": "101" + } + }, + "OldImage": { + "Message": { + "S": "This item has changed" + }, + "Id": { + "N": "101" + } + }, + "ApproximateCreationDateTime": 1428537600, + "SequenceNumber": "4421584500000000017450439093", + "SizeBytes": 38, + "StreamViewType": "NEW_AND_OLD_IMAGES" + }, + "eventSourceARN": "arn:aws:dynamodb:eu-central-1:123456789012:table/ExampleTableWithStream/stream/2015-06-27T00:48:05.899" + }, + { + "eventID": "c4ca4238a0b923820dcc509a6f75849b", + "eventName": "INSERT", + "eventVersion": "1.1", + "eventSource": "aws:dynamodb", + "awsRegion": "eu-central-1", + "dynamodb": { + "Keys": { + "Id": { + "N": "101" + } + }, + "NewImage": { + "Message": { + "S": "New item!" + }, + "Id": { + "N": "101" + } + }, + "ApproximateCreationDateTime": 1428537600, + "SequenceNumber": "4421584500000000017450439001", + "SizeBytes": 26, + "StreamViewType": "NEW_AND_OLD_IMAGES" + }, + "eventSourceARN": "arn:aws:dynamodb:eu-central-1:123456789012:table/ExampleTableWithStream/stream/2015-06-27T00:48:05.899", + "userIdentity": { + "principalId": "dynamodb.amazonaws.com", + "type": "Service" + } + }, + { + "eventID": "c81e728d9d4c2f636f067f89cc14862c", + "eventName": "MODIFY", + "eventVersion": "1.1", + "eventSource": "aws:dynamodb", + "awsRegion": "eu-central-1", + "dynamodb": { + "Keys": { + "Id": { + "N": "101" + } + }, + "NewImage": { + "Message": { + "S": "This item has changed" + }, + "Id": { + "N": "101" + } + }, + "OldImage": { + "Message": { + "S": "New item!" 
+ }, + "Id": { + "N": "101" + } + }, + "ApproximateCreationDateTime": 1428537600, + "SequenceNumber": "4421584500000000017450439092", + "SizeBytes": 59, + "StreamViewType": "NEW_AND_OLD_IMAGES" + }, + "eventSourceARN": "arn:aws:dynamodb:eu-central-1:123456789012:table/ExampleTableWithStream/stream/2015-06-27T00:48:05.899" + }, + { + "eventID": "eccbc87e4b5ce2fe28308fd9f2a7baf3", + "eventName": "REMOVE", + "eventVersion": "1.1", + "eventSource": "aws:dynamodb", + "awsRegion": "eu-central-1", + "dynamodb": { + "Keys": { + "Id": { + "N": "101" + } + }, + "OldImage": { + "Message": { + "S": "This item has changed" + }, + "Id": { + "N": "101" + } + }, + "ApproximateCreationDateTime": 1428537600, + "SequenceNumber": "4421584500000000017450439093", + "SizeBytes": 38, + "StreamViewType": "NEW_AND_OLD_IMAGES" + }, + "eventSourceARN": "arn:aws:dynamodb:eu-central-1:123456789012:table/ExampleTableWithStream/stream/2015-06-27T00:48:05.899" + }, + { + "eventID": "c4ca4238a0b923820dcc509a6f75849b", + "eventName": "INSERT", + "eventVersion": "1.1", + "eventSource": "aws:dynamodb", + "awsRegion": "eu-central-1", + "dynamodb": { + "Keys": { + "Id": { + "N": "101" + } + }, + "NewImage": { + "Message": { + "S": "New item!" + }, + "Id": { + "N": "101" + } + }, + "ApproximateCreationDateTime": 1428537600, + "SequenceNumber": "4421584500000000017450439031", + "SizeBytes": 26, + "StreamViewType": "NEW_AND_OLD_IMAGES" + }, + "eventSourceARN": "arn:aws:dynamodb:eu-central-1:123456789012:table/ExampleTableWithStream/stream/2015-06-27T00:48:05.899", + "userIdentity": { + "principalId": "dynamodb.amazonaws.com", + "type": "Service" + } + }, + { + "eventID": "c81e728d9d4c2f636f067f89cc14862c", + "eventName": "MODIFY", + "eventVersion": "1.1", + "eventSource": "aws:dynamodb", + "awsRegion": "eu-central-1", + "dynamodb": { + "Keys": { + "Id": { + "N": "101" + } + }, + "NewImage": { + "Message": { + "S": "This item has changed" + }, + "Id": { + "N": "101" + } + }, + "OldImage": { + "Message": { + "S": "New item!" 
+ }, + "Id": { + "N": "101" + } + }, + "ApproximateCreationDateTime": 1428537600, + "SequenceNumber": "4421584500000000017450439091", + "SizeBytes": 59, + "StreamViewType": "NEW_AND_OLD_IMAGES" + }, + "eventSourceARN": "arn:aws:dynamodb:eu-central-1:123456789012:table/ExampleTableWithStream/stream/2015-06-27T00:48:05.899" + }, + { + "eventID": "eccbc87e4b5ce2fe28308fd9f2a7baf3", + "eventName": "REMOVE", + "eventVersion": "1.1", + "eventSource": "aws:dynamodb", + "awsRegion": "eu-central-1", + "dynamodb": { + "Keys": { + "Id": { + "N": "101" + } + }, + "OldImage": { + "Message": { + "S": "This item has changed" + }, + "Id": { + "N": "101" + } + }, + "ApproximateCreationDateTime": 1428537600, + "SequenceNumber": "4421584500000000017450439093", + "SizeBytes": 38, + "StreamViewType": "NEW_AND_OLD_IMAGES" + }, + "eventSourceARN": "arn:aws:dynamodb:eu-central-1:123456789012:table/ExampleTableWithStream/stream/2015-06-27T00:48:05.899" + } + ] +} \ No newline at end of file diff --git a/powertools-batch/src/test/resources/kinesis_event_big.json b/powertools-batch/src/test/resources/kinesis_event_big.json new file mode 100644 index 000000000..57f702d27 --- /dev/null +++ b/powertools-batch/src/test/resources/kinesis_event_big.json @@ -0,0 +1,224 @@ +{ + "Records": [ + { + "kinesis": { + "partitionKey": "partitionKey-03", + "kinesisSchemaVersion": "1.0", + "data": "eyJpZCI6MTIzNCwgIm5hbWUiOiJwcm9kdWN0IiwgInByaWNlIjo0Mn0=", + "sequenceNumber": "49545115243490985018280067714973144582180062593244200962", + "approximateArrivalTimestamp": 1428537600, + "encryptionType": "NONE" + }, + "eventSource": "aws:kinesis", + "eventID": "shardId-000000000000:49545115243490985018280067714973144582180062593244200962", + "invokeIdentityArn": "arn:aws:iam::EXAMPLE", + "eventVersion": "1.0", + "eventName": "aws:kinesis:record", + "eventSourceARN": "arn:aws:kinesis:EXAMPLE", + "awsRegion": "eu-central-1" + }, + { + "kinesis": { + "partitionKey": "partitionKey-03", + "kinesisSchemaVersion": "1.0", + "data": "eyJpZCI6MTIzNDUsICJuYW1lIjoicHJvZHVjdDUiLCAicHJpY2UiOjQ1fQ==", + "sequenceNumber": "49545115243490985018280067714973144582180062593244200963", + "approximateArrivalTimestamp": 1428537600, + "encryptionType": "NONE" + }, + "eventSource": "aws:kinesis", + "eventID": "shardId-000000000000:49545115243490985018280067714973144582180062593244200963", + "invokeIdentityArn": "arn:aws:iam::EXAMPLE", + "eventVersion": "1.0", + "eventName": "aws:kinesis:record", + "eventSourceARN": "arn:aws:kinesis:EXAMPLE", + "awsRegion": "eu-central-1" + }, + { + "kinesis": { + "partitionKey": "partitionKey-03", + "kinesisSchemaVersion": "1.0", + "data": "eyJpZCI6MTIzNCwgIm5hbWUiOiJwcm9kdWN0IiwgInByaWNlIjo0Mn0=", + "sequenceNumber": "49545115243490985018280067714973144582180062593244200964", + "approximateArrivalTimestamp": 1428537600, + "encryptionType": "NONE" + }, + "eventSource": "aws:kinesis", + "eventID": "shardId-000000000000:49545115243490985018280067714973144582180062593244200964", + "invokeIdentityArn": "arn:aws:iam::EXAMPLE", + "eventVersion": "1.0", + "eventName": "aws:kinesis:record", + "eventSourceARN": "arn:aws:kinesis:EXAMPLE", + "awsRegion": "eu-central-1" + }, + { + "kinesis": { + "partitionKey": "partitionKey-03", + "kinesisSchemaVersion": "1.0", + "data": "eyJpZCI6MTIzNDUsICJuYW1lIjoicHJvZHVjdDUiLCAicHJpY2UiOjQ1fQ==", + "sequenceNumber": "49545115243490985018280067714973144582180062593244200965", + "approximateArrivalTimestamp": 1428537600, + "encryptionType": "NONE" + }, + "eventSource": "aws:kinesis", + "eventID": 
"shardId-000000000000:49545115243490985018280067714973144582180062593244200965", + "invokeIdentityArn": "arn:aws:iam::EXAMPLE", + "eventVersion": "1.0", + "eventName": "aws:kinesis:record", + "eventSourceARN": "arn:aws:kinesis:EXAMPLE", + "awsRegion": "eu-central-1" + },{ + "kinesis": { + "partitionKey": "partitionKey-03", + "kinesisSchemaVersion": "1.0", + "data": "eyJpZCI6MTIzNCwgIm5hbWUiOiJwcm9kdWN0IiwgInByaWNlIjo0Mn0=", + "sequenceNumber": "49545115243490985018280067714973144582180062593244200966", + "approximateArrivalTimestamp": 1428537600, + "encryptionType": "NONE" + }, + "eventSource": "aws:kinesis", + "eventID": "shardId-000000000000:49545115243490985018280067714973144582180062593244200966", + "invokeIdentityArn": "arn:aws:iam::EXAMPLE", + "eventVersion": "1.0", + "eventName": "aws:kinesis:record", + "eventSourceARN": "arn:aws:kinesis:EXAMPLE", + "awsRegion": "eu-central-1" + }, + { + "kinesis": { + "partitionKey": "partitionKey-03", + "kinesisSchemaVersion": "1.0", + "data": "eyJpZCI6MTIzNDUsICJuYW1lIjoicHJvZHVjdDUiLCAicHJpY2UiOjQ1fQ==", + "sequenceNumber": "49545115243490985018280067714973144582180062593244200961", + "approximateArrivalTimestamp": 1428537600, + "encryptionType": "NONE" + }, + "eventSource": "aws:kinesis", + "eventID": "shardId-000000000000:49545115243490985018280067714973144582180062593244200961", + "invokeIdentityArn": "arn:aws:iam::EXAMPLE", + "eventVersion": "1.0", + "eventName": "aws:kinesis:record", + "eventSourceARN": "arn:aws:kinesis:EXAMPLE", + "awsRegion": "eu-central-1" + }, + { + "kinesis": { + "partitionKey": "partitionKey-03", + "kinesisSchemaVersion": "1.0", + "data": "eyJpZCI6MTIzNCwgIm5hbWUiOiJwcm9kdWN0IiwgInByaWNlIjo0Mn0=", + "sequenceNumber": "49545115243490985018280067714973144582180062593244200967", + "approximateArrivalTimestamp": 1428537600, + "encryptionType": "NONE" + }, + "eventSource": "aws:kinesis", + "eventID": "shardId-000000000000:49545115243490985018280067714973144582180062593244200967", + "invokeIdentityArn": "arn:aws:iam::EXAMPLE", + "eventVersion": "1.0", + "eventName": "aws:kinesis:record", + "eventSourceARN": "arn:aws:kinesis:EXAMPLE", + "awsRegion": "eu-central-1" + }, + { + "kinesis": { + "partitionKey": "partitionKey-03", + "kinesisSchemaVersion": "1.0", + "data": "eyJpZCI6MTIzNCwgIm5hbWUiOiJwcm9kdWN0IiwgInByaWNlIjo0Mn0=", + "sequenceNumber": "49545115243490985018280067714973144582180062593244200968", + "approximateArrivalTimestamp": 1428537600, + "encryptionType": "NONE" + }, + "eventSource": "aws:kinesis", + "eventID": "shardId-000000000000:49545115243490985018280067714973144582180062593244200968", + "invokeIdentityArn": "arn:aws:iam::EXAMPLE", + "eventVersion": "1.0", + "eventName": "aws:kinesis:record", + "eventSourceARN": "arn:aws:kinesis:EXAMPLE", + "awsRegion": "eu-central-1" + }, + { + "kinesis": { + "partitionKey": "partitionKey-03", + "kinesisSchemaVersion": "1.0", + "data": "eyJpZCI6MTIzNDUsICJuYW1lIjoicHJvZHVjdDUiLCAicHJpY2UiOjQ1fQ==", + "sequenceNumber": "49545115243490985018280067714973144582180062593244200969", + "approximateArrivalTimestamp": 1428537600, + "encryptionType": "NONE" + }, + "eventSource": "aws:kinesis", + "eventID": "shardId-000000000000:49545115243490985018280067714973144582180062593244200969", + "invokeIdentityArn": "arn:aws:iam::EXAMPLE", + "eventVersion": "1.0", + "eventName": "aws:kinesis:record", + "eventSourceARN": "arn:aws:kinesis:EXAMPLE", + "awsRegion": "eu-central-1" + }, + { + "kinesis": { + "partitionKey": "partitionKey-03", + "kinesisSchemaVersion": "1.0", + "data": 
"eyJpZCI6MTIzNCwgIm5hbWUiOiJwcm9kdWN0IiwgInByaWNlIjo0Mn0=", + "sequenceNumber": "49545115243490985018280067714973144582180062593244200971", + "approximateArrivalTimestamp": 1428537600, + "encryptionType": "NONE" + }, + "eventSource": "aws:kinesis", + "eventID": "shardId-000000000000:49545115243490985018280067714973144582180062593244200971", + "invokeIdentityArn": "arn:aws:iam::EXAMPLE", + "eventVersion": "1.0", + "eventName": "aws:kinesis:record", + "eventSourceARN": "arn:aws:kinesis:EXAMPLE", + "awsRegion": "eu-central-1" + }, + { + "kinesis": { + "partitionKey": "partitionKey-03", + "kinesisSchemaVersion": "1.0", + "data": "eyJpZCI6MTIzNCwgIm5hbWUiOiJwcm9kdWN0IiwgInByaWNlIjo0Mn0=", + "sequenceNumber": "49545115243490985018280067714973144582180062593244200981", + "approximateArrivalTimestamp": 1428537600, + "encryptionType": "NONE" + }, + "eventSource": "aws:kinesis", + "eventID": "shardId-000000000000:49545115243490985018280067714973144582180062593244200981", + "invokeIdentityArn": "arn:aws:iam::EXAMPLE", + "eventVersion": "1.0", + "eventName": "aws:kinesis:record", + "eventSourceARN": "arn:aws:kinesis:EXAMPLE", + "awsRegion": "eu-central-1" + }, + { + "kinesis": { + "partitionKey": "partitionKey-03", + "kinesisSchemaVersion": "1.0", + "data": "eyJpZCI6MTIzNDUsICJuYW1lIjoicHJvZHVjdDUiLCAicHJpY2UiOjQ1fQ==", + "sequenceNumber": "49545115243490985018280067714973144582180062593244200992", + "approximateArrivalTimestamp": 1428537600, + "encryptionType": "NONE" + }, + "eventSource": "aws:kinesis", + "eventID": "shardId-000000000000:49545115243490985018280067714973144582180062593244200992", + "invokeIdentityArn": "arn:aws:iam::EXAMPLE", + "eventVersion": "1.0", + "eventName": "aws:kinesis:record", + "eventSourceARN": "arn:aws:kinesis:EXAMPLE", + "awsRegion": "eu-central-1" + }, + { + "kinesis": { + "partitionKey": "partitionKey-03", + "kinesisSchemaVersion": "1.0", + "data": "eyJpZCI6MTIzNCwgIm5hbWUiOiJwcm9kdWN0IiwgInByaWNlIjo0Mn0=", + "sequenceNumber": "49545115243490985018280067714973144582180062593244210961", + "approximateArrivalTimestamp": 1428537600, + "encryptionType": "NONE" + }, + "eventSource": "aws:kinesis", + "eventID": "shardId-000000000000:49545115243490985018280067714973144582180062593244210961", + "invokeIdentityArn": "arn:aws:iam::EXAMPLE", + "eventVersion": "1.0", + "eventName": "aws:kinesis:record", + "eventSourceARN": "arn:aws:kinesis:EXAMPLE", + "awsRegion": "eu-central-1" + } + ] +} \ No newline at end of file diff --git a/powertools-batch/src/test/resources/sqs_event_big.json b/powertools-batch/src/test/resources/sqs_event_big.json new file mode 100644 index 000000000..f5c83f442 --- /dev/null +++ b/powertools-batch/src/test/resources/sqs_event_big.json @@ -0,0 +1,429 @@ +{ + "Records": [ + { + "messageId": "d9144555-9a4f-4ec3-99a0-34ce359b4b54", + "receiptHandle": "13e7f7851d2eaa5c01f208ebadbf1e72==", + "body": "{\n \"id\": 1234,\n \"name\": \"product\",\n \"price\": 42\n}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1601975706495", + "SenderId": "AROAIFU437PVZ5L2J53F5", + "ApproximateFirstReceiveTimestamp": "1601975706499" + }, + "messageAttributes": { + }, + "md5OfBody": "13e7f7851d2eaa5c01f208ebadbf1e72", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda", + "awsRegion": "eu-central-1" + }, + { + "messageId": "f9144555-9a4f-4ec3-99a0-34ce359b4b54", + "receiptHandle": "14e7f7851d2eaa5c01f208ebadbf1e72==", + "body": "{\n \"id\": 1235,\n \"name\": \"product\",\n \"price\": 43\n}", + "attributes": 
{ + "ApproximateReceiveCount": "1", + "SentTimestamp": "1601975706495", + "SenderId": "AROAIFU437PVZ5L2J53F5", + "ApproximateFirstReceiveTimestamp": "1601975706499" + }, + "messageAttributes": { + }, + "md5OfBody": "14e7f7851d2eaa5c01f208ebadbf1e72", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda", + "awsRegion": "eu-central-1" + }, + { + "messageId": "g9144555-9a4f-4ec3-99a0-34ce359b4b54", + "receiptHandle": "15e7f7851d2eaa5c01f208ebadbf1e72==", + "body": "{\n \"id\": 1236,\n \"name\": \"product\",\n \"price\": 44\n}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1601975706495", + "SenderId": "AROAIFU437PVZ5L2J53F5", + "ApproximateFirstReceiveTimestamp": "1601975706499" + }, + "messageAttributes": { + }, + "md5OfBody": "15e7f7851d2eaa5c01f208ebadbf1e72", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda", + "awsRegion": "eu-central-1" + }, + { + "messageId": "c9144555-9a4f-4ec3-99a0-34ce359b4b54", + "receiptHandle": "16e7f7851d2eaa5c01f208ebadbf1e72==", + "body": "{\n \"id\": 1237,\n \"name\": \"product\",\n \"price\": 45\n}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1601975706495", + "SenderId": "AROAIFU437PVZ5L2J53F5", + "ApproximateFirstReceiveTimestamp": "1601975706499" + }, + "messageAttributes": { + }, + "md5OfBody": "16e7f7851d2eaa5c01f208ebadbf1e72", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda", + "awsRegion": "eu-central-1" + }, + { + "messageId": "b4144555-9a4f-4ec3-99a0-34ce359b4b54", + "receiptHandle": "13e7f7851d2eaa5c01f208ebadbf1e72==", + "body": "{\n \"id\": 1238,\n \"name\": \"product\",\n \"price\": 486\n}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1601975706495", + "SenderId": "AROAIFU437PVZ5L2J53F5", + "ApproximateFirstReceiveTimestamp": "1601975706499" + }, + "messageAttributes": { + }, + "md5OfBody": "13e7f7851d2eaa5c01f208ebadbf1e72", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda", + "awsRegion": "eu-central-1" + }, + { + "messageId": "a2144552-9a4f-4ec3-99a0-34ce359b4b54", + "receiptHandle": "14e7f7851d2eaa5c01f208ebadbf1e72==", + "body": "{\n \"id\": 1239,\n \"name\": \"product\",\n \"price\": 430\n}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1601975706495", + "SenderId": "AROAIFU437PVZ5L2J53F5", + "ApproximateFirstReceiveTimestamp": "1601975706499" + }, + "messageAttributes": { + }, + "md5OfBody": "14e7f7851d2eaa5c01f208ebadbf1e72", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda", + "awsRegion": "eu-central-1" + }, + { + "messageId": "32144555-9a4f-4ec3-99a0-34ce359b4b54", + "receiptHandle": "15e7f7851d2eaa5c01f208ebadbf1e72==", + "body": "{\n \"id\": 1240,\n \"name\": \"product\",\n \"price\": 445\n}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1601975706495", + "SenderId": "AROAIFU437PVZ5L2J53F5", + "ApproximateFirstReceiveTimestamp": "1601975706499" + }, + "messageAttributes": { + }, + "md5OfBody": "15e7f7851d2eaa5c01f208ebadbf1e72", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda", + "awsRegion": "eu-central-1" + }, + { + "messageId": "a9144555-9a4f-4ec3-99a0-34ce359b4b54", + "receiptHandle": "16e7f7851d2eaa5c01f208ebadbf1e72==", + "body": "{\n \"id\": 1241,\n \"name\": \"product\",\n \"price\": 45\n}", + 
"attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1601975706495", + "SenderId": "AROAIFU437PVZ5L2J53F5", + "ApproximateFirstReceiveTimestamp": "1601975706499" + }, + "messageAttributes": { + }, + "md5OfBody": "16e7f7851d2eaa5c01f208ebadbf1e72", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda", + "awsRegion": "eu-central-1" + }, + { + "messageId": "d9144555-9a4f-4ec3-99a0-34ce359b354", + "receiptHandle": "13e7f7851d2eaa5c01f208ebadbf1e72==", + "body": "{\n \"id\": 1242,\n \"name\": \"product\",\n \"price\": 42\n}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1601975706495", + "SenderId": "AROAIFU437PVZ5L2J53F5", + "ApproximateFirstReceiveTimestamp": "1601975706499" + }, + "messageAttributes": { + }, + "md5OfBody": "13e7f7851d2eaa5c01f208ebadbf1e72", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda", + "awsRegion": "eu-central-1" + }, + { + "messageId": "e9144555-9a4f-4ec3-99a0-34ce359b4b54", + "receiptHandle": "14e7f7851d2eaa5c01f208ebadbf1e72==", + "body": "{\n \"id\": 1243,\n \"name\": \"product\",\n \"price\": 43\n}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1601975706495", + "SenderId": "AROAIFU437PVZ5L2J53F5", + "ApproximateFirstReceiveTimestamp": "1601975706499" + }, + "messageAttributes": { + }, + "md5OfBody": "14e7f7851d2eaa5c01f208ebadbf1e72", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda", + "awsRegion": "eu-central-1" + }, + { + "messageId": "19144555-9a4f-4ec3-99a0-34ce359b4b54", + "receiptHandle": "15e7f7851d2eaa5c01f208ebadbf1e72==", + "body": "{\n \"id\": 1244,\n \"name\": \"product\",\n \"price\": 44\n}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1601975706495", + "SenderId": "AROAIFU437PVZ5L2J53F5", + "ApproximateFirstReceiveTimestamp": "1601975706499" + }, + "messageAttributes": { + }, + "md5OfBody": "15e7f7851d2eaa5c01f208ebadbf1e72", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda", + "awsRegion": "eu-central-1" + }, + { + "messageId": "c9144555-9a4f-4ec3-99a0-34ce3a9b4b54", + "receiptHandle": "16e7f7851d2eaa5c01f208ebadbf1e72==", + "body": "{\n \"id\": 1245,\n \"name\": \"product\",\n \"price\": 45\n}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1601975706495", + "SenderId": "AROAIFU437PVZ5L2J53F5", + "ApproximateFirstReceiveTimestamp": "1601975706499" + }, + "messageAttributes": { + }, + "md5OfBody": "16e7f7851d2eaa5c01f208ebadbf1e72", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda", + "awsRegion": "eu-central-1" + }, + { + "messageId": "b4144555-9a4f-4ec3-99a5-34ce359b4b54", + "receiptHandle": "13e7f7851d2eaa5c01f208ebadbf1e72==", + "body": "{\n \"id\": 1247,\n \"name\": \"product\",\n \"price\": 486\n}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1601975706495", + "SenderId": "AROAIFU437PVZ5L2J53F5", + "ApproximateFirstReceiveTimestamp": "1601975706499" + }, + "messageAttributes": { + }, + "md5OfBody": "13e7f7851d2eaa5c01f208ebadbf1e72", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda", + "awsRegion": "eu-central-1" + }, + { + "messageId": "a2144555-9a4f-4ec3-97a0-34ce359b4b54", + "receiptHandle": "14e7f7851d2eaa5c01f208ebadbf1e72==", + "body": "{\n \"id\": 1248,\n \"name\": \"product\",\n \"price\": 
430\n}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1601975706495", + "SenderId": "AROAIFU437PVZ5L2J53F5", + "ApproximateFirstReceiveTimestamp": "1601975706499" + }, + "messageAttributes": { + }, + "md5OfBody": "14e7f7851d2eaa5c01f208ebadbf1e72", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda", + "awsRegion": "eu-central-1" + }, + { + "messageId": "3k144555-9a4f-4ec2-99a0-34ce359b4b54", + "receiptHandle": "15e7f7851d2eaa5c01f208ebadbf1e72==", + "body": "{\n \"id\": 1249,\n \"name\": \"product\",\n \"price\": 445\n}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1601975706495", + "SenderId": "AROAIFU437PVZ5L2J53F5", + "ApproximateFirstReceiveTimestamp": "1601975706499" + }, + "messageAttributes": { + }, + "md5OfBody": "15e7f7851d2eaa5c01f208ebadbf1e72", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda", + "awsRegion": "eu-central-1" + }, + { + "messageId": "h9144555-9aaf-4ec3-99a0-34ce359b4b54", + "receiptHandle": "16e7f7851d2eaa5c01f208ebadbf1e72==", + "body": "{\n \"id\": 1250,\n \"name\": \"product\",\n \"price\": 45\n}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1601975706495", + "SenderId": "AROAIFU437PVZ5L2J53F5", + "ApproximateFirstReceiveTimestamp": "1601975706499" + }, + "messageAttributes": { + }, + "md5OfBody": "16e7f7851d2eaa5c01f208ebadbf1e72", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda", + "awsRegion": "eu-central-1" + }, + { + "messageId": "d9144555-9a4f-4ec3-99a0-34ce359b4b54", + "receiptHandle": "13e7f7851d2eaa5c01f208ebadbf1e72==", + "body": "{\n \"id\": 1234,\n \"name\": \"product\",\n \"price\": 42\n}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1601975706495", + "SenderId": "AROAIFU437PVZ5L2J53F5", + "ApproximateFirstReceiveTimestamp": "1601975706499" + }, + "messageAttributes": { + }, + "md5OfBody": "13e7f7851d2eaa5c01f208ebadbf1e72", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda", + "awsRegion": "eu-central-1" + }, + { + "messageId": "f9144555-9a4f-4ec3-99a0-34ce359b4b54", + "receiptHandle": "14e7f7851d2eaa5c01f208ebadbf1e72==", + "body": "{\n \"id\": 1235,\n \"name\": \"product\",\n \"price\": 43\n}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1601975706495", + "SenderId": "AROAIFU437PVZ5L2J53F5", + "ApproximateFirstReceiveTimestamp": "1601975706499" + }, + "messageAttributes": { + }, + "md5OfBody": "14e7f7851d2eaa5c01f208ebadbf1e72", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda", + "awsRegion": "eu-central-1" + }, + { + "messageId": "g9144555-9a4f-4ec3-99a0-34ce359b4b54", + "receiptHandle": "15e7f7851d2eaa5c01f208ebadbf1e72==", + "body": "{\n \"id\": 1236,\n \"name\": \"product\",\n \"price\": 44\n}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1601975706495", + "SenderId": "AROAIFU437PVZ5L2J53F5", + "ApproximateFirstReceiveTimestamp": "1601975706499" + }, + "messageAttributes": { + }, + "md5OfBody": "15e7f7851d2eaa5c01f208ebadbf1e72", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda", + "awsRegion": "eu-central-1" + }, + { + "messageId": "c9144555-9a4f-4ec3-99a0-34ce359b4b54", + "receiptHandle": "16e7f7851d2eaa5c01f208ebadbf1e72==", + "body": "{\n \"id\": 1237,\n \"name\": \"product\",\n 
\"price\": 45\n}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1601975706495", + "SenderId": "AROAIFU437PVZ5L2J53F5", + "ApproximateFirstReceiveTimestamp": "1601975706499" + }, + "messageAttributes": { + }, + "md5OfBody": "16e7f7851d2eaa5c01f208ebadbf1e72", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda", + "awsRegion": "eu-central-1" + }, + { + "messageId": "b4144555-9a4f-4ec3-99a0-34ce359b4b54", + "receiptHandle": "13e7f7851d2eaa5c01f208ebadbf1e72==", + "body": "{\n \"id\": 1238,\n \"name\": \"product\",\n \"price\": 486\n}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1601975706495", + "SenderId": "AROAIFU437PVZ5L2J53F5", + "ApproximateFirstReceiveTimestamp": "1601975706499" + }, + "messageAttributes": { + }, + "md5OfBody": "13e7f7851d2eaa5c01f208ebadbf1e72", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda", + "awsRegion": "eu-central-1" + }, + { + "messageId": "a2144552-9a4f-4ec3-99a0-34ce359b4b54", + "receiptHandle": "14e7f7851d2eaa5c01f208ebadbf1e72==", + "body": "{\n \"id\": 1239,\n \"name\": \"product\",\n \"price\": 430\n}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1601975706495", + "SenderId": "AROAIFU437PVZ5L2J53F5", + "ApproximateFirstReceiveTimestamp": "1601975706499" + }, + "messageAttributes": { + }, + "md5OfBody": "14e7f7851d2eaa5c01f208ebadbf1e72", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda", + "awsRegion": "eu-central-1" + }, + { + "messageId": "32144555-9a4f-4ec3-99a0-34ce359b4b54", + "receiptHandle": "15e7f7851d2eaa5c01f208ebadbf1e72==", + "body": "{\n \"id\": 1240,\n \"name\": \"product\",\n \"price\": 445\n}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1601975706495", + "SenderId": "AROAIFU437PVZ5L2J53F5", + "ApproximateFirstReceiveTimestamp": "1601975706499" + }, + "messageAttributes": { + }, + "md5OfBody": "15e7f7851d2eaa5c01f208ebadbf1e72", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda", + "awsRegion": "eu-central-1" + }, + { + "messageId": "a9144555-9a4f-4ec3-99a0-34ce359b4b54", + "receiptHandle": "16e7f7851d2eaa5c01f208ebadbf1e72==", + "body": "{\n \"id\": 1241,\n \"name\": \"product\",\n \"price\": 45\n}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1601975706495", + "SenderId": "AROAIFU437PVZ5L2J53F5", + "ApproximateFirstReceiveTimestamp": "1601975706499" + }, + "messageAttributes": { + }, + "md5OfBody": "16e7f7851d2eaa5c01f208ebadbf1e72", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda", + "awsRegion": "eu-central-1" + }, + { + "messageId": "d9144555-9a4f-4ec3-99a0-34ce359b354", + "receiptHandle": "13e7f7851d2eaa5c01f208ebadbf1e72==", + "body": "{\n \"id\": 1242,\n \"name\": \"product\",\n \"price\": 42\n}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1601975706495", + "SenderId": "AROAIFU437PVZ5L2J53F5", + "ApproximateFirstReceiveTimestamp": "1601975706499" + }, + "messageAttributes": { + }, + "md5OfBody": "13e7f7851d2eaa5c01f208ebadbf1e72", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda", + "awsRegion": "eu-central-1" + } + ] +} \ No newline at end of file From 789b91bb8586616f19174e241b8a8cd607bc6d86 Mon Sep 17 00:00:00 2001 From: Jerome Van Der Linden Date: Fri, 5 Apr 2024 14:55:45 
+0200 Subject: [PATCH 03/10] document parallel processing --- docs/utilities/batch.md | 89 +++++++++++++++++++++++++++-------------- 1 file changed, 60 insertions(+), 29 deletions(-) diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md index 7b571bc6e..d6e5e3654 100644 --- a/docs/utilities/batch.md +++ b/docs/utilities/batch.md @@ -30,6 +30,7 @@ stateDiagram-v2 * Reports batch item failures to reduce number of retries for a record upon errors * Simple interface to process each batch record +* Parallel processing of batches * Integrates with Java Events library and the deserialization module * Build your own batch processor by extending primitives @@ -110,16 +111,9 @@ You can use your preferred deployment framework to set the correct configuration while the `powertools-batch` module handles generating the response, which simply needs to be returned as the result of your Lambda handler. -A complete [Serverless Application Model](https://aws.amazon.com/serverless/sam/) example can be found -[here](https://github.com/aws-powertools/powertools-lambda-java/tree/main/examples/powertools-examples-batch) covering -all of the batch sources. - -For more information on configuring `ReportBatchItemFailures`, -see the details for [SQS](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting), -[Kinesis](https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html#services-kinesis-batchfailurereporting),and -[DynamoDB Streams](https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html#services-ddb-batchfailurereporting). - +A complete [Serverless Application Model](https://aws.amazon.com/serverless/sam/) example can be found [here](https://github.com/aws-powertools/powertools-lambda-java/tree/main/examples/powertools-examples-batch) covering all the batch sources. +For more information on configuring `ReportBatchItemFailures`, see the details for [SQS](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting), [Kinesis](https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html#services-kinesis-batchfailurereporting), and [DynamoDB Streams](https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html#services-ddb-batchfailurereporting). !!! note "You do not need any additional IAM permissions to use this utility, except for what each event source requires." @@ -150,12 +144,10 @@ see the details for [SQS](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs. public SQSBatchResponse handleRequest(SQSEvent sqsEvent, Context context) { return handler.processBatch(sqsEvent, context); } - - + private void processMessage(Product p, Context c) { // Process the product } - } ``` @@ -276,7 +268,6 @@ see the details for [SQS](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs. private void processMessage(Product p, Context c) { // process the product } - } ``` @@ -475,6 +466,46 @@ see the details for [SQS](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs. } ``` +## Parallel processing +You can choose to process batch items in parallel using the `BatchMessageHandler#processBatchInParallel()` +instead of `BatchMessageHandler#processBatch()`. Partial batch failure works the same way but items are processed +in parallel rather than sequentially. + +This feature is available for SQS, Kinesis and DynamoDB Streams but cannot be +used with SQS FIFO. In that case, items will be processed sequentially, even with the `processBatchInParallel` method. + +!!! 
warning + Note that parallel processing is not always better than sequential processing, + and you should benchmark your code to determine the best approach for your use case. + +!!! info + To get more threads available (more vCPUs), you need to increase the amount of memory allocated to your Lambda function. + While the exact vCPU allocation isn't published, from observing common patterns customers see an allocation of one vCPU per 1024 MB of memory. + +=== "Example with SQS" + + ```java hl_lines="13" + public class SqsBatchHandler implements RequestHandler { + + private final BatchMessageHandler handler; + + public SqsBatchHandler() { + handler = new BatchMessageHandlerBuilder() + .withSqsBatchHandler() + .buildWithMessageHandler(this::processMessage, Product.class); + } + + @Override + public SQSBatchResponse handleRequest(SQSEvent sqsEvent, Context context) { + return handler.processBatchInParallel(sqsEvent, context); + } + + private void processMessage(Product p, Context c) { + // Process the product + } + } + ``` + ## Handling Messages @@ -490,7 +521,7 @@ In general, the deserialized message handler should be used unless you need acce === "Raw Message Handler" - ```java + ```java hl_lines="4 7" public void setup() { BatchMessageHandler handler = new BatchMessageHandlerBuilder() .withSqsBatchHandler() @@ -505,7 +536,7 @@ In general, the deserialized message handler should be used unless you need acce === "Deserialized Message Handler" - ```java + ```java hl_lines="4 7" public void setup() { BatchMessageHandler handler = new BatchMessageHandlerBuilder() .withSqsBatchHandler() @@ -529,20 +560,20 @@ provide a custom failure handler. Handlers can be provided when building the batch processor and are available for all event sources. For instance for DynamoDB: -```java - BatchMessageHandler handler = new BatchMessageHandlerBuilder() - .withDynamoDbBatchHandler() - .withSuccessHandler((m) -> { - // Success handler receives the raw message - LOGGER.info("Message with sequenceNumber {} was successfully processed", - m.getDynamodb().getSequenceNumber()); - }) - .withFailureHandler((m, e) -> { - // Failure handler receives the raw message and the exception thrown. - LOGGER.info("Message with sequenceNumber {} failed to be processed: {}" - , e.getDynamodb().getSequenceNumber(), e); - }) - .buildWithMessageHander(this::processMessage); +```java hl_lines="3 8" +BatchMessageHandler handler = new BatchMessageHandlerBuilder() + .withDynamoDbBatchHandler() + .withSuccessHandler((m) -> { + // Success handler receives the raw message + LOGGER.info("Message with sequenceNumber {} was successfully processed", + m.getDynamodb().getSequenceNumber()); + }) + .withFailureHandler((m, e) -> { + // Failure handler receives the raw message and the exception thrown. + LOGGER.info("Message with sequenceNumber {} failed to be processed: {}" + , e.getDynamodb().getSequenceNumber(), e); + }) + .buildWithMessageHander(this::processMessage); ``` !!! 
info From 1bbad5b0cc373a86dead1a58895f7d6e13aea133 Mon Sep 17 00:00:00 2001 From: Jerome Van Der Linden Date: Tue, 9 Apr 2024 13:58:31 +0200 Subject: [PATCH 04/10] code review --- .../handler/DynamoDbBatchMessageHandler.java | 15 +++++++-------- .../KinesisStreamsBatchMessageHandler.java | 14 +++++++------- .../batch/handler/SqsBatchMessageHandler.java | 12 +++++------- 3 files changed, 19 insertions(+), 22 deletions(-) diff --git a/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/DynamoDbBatchMessageHandler.java b/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/DynamoDbBatchMessageHandler.java index 81161eb85..4b03d0947 100644 --- a/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/DynamoDbBatchMessageHandler.java +++ b/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/DynamoDbBatchMessageHandler.java @@ -17,7 +17,6 @@ import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.events.DynamodbEvent; import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse; -import java.util.ArrayList; import java.util.List; import java.util.Optional; import java.util.function.BiConsumer; @@ -49,16 +48,16 @@ public DynamoDbBatchMessageHandler(Consumer @Override public StreamsEventResponse processBatch(DynamodbEvent event, Context context) { - StreamsEventResponse response = StreamsEventResponse.builder().withBatchItemFailures(new ArrayList<>()).build(); - - for (DynamodbEvent.DynamodbStreamRecord streamRecord : event.getRecords()) { - processBatchItem(streamRecord, context).ifPresent(batchItemFailure -> response.getBatchItemFailures().add(batchItemFailure)); - } + List batchItemFailures = event.getRecords() + .stream() + .map(eventRecord -> processBatchItem(eventRecord, context)) + .filter(Optional::isPresent) + .map(Optional::get) + .collect(Collectors.toList()); - return response; + return StreamsEventResponse.builder().withBatchItemFailures(batchItemFailures).build(); } - @Override public StreamsEventResponse processBatchInParallel(DynamodbEvent event, Context context) { MultiThreadMDC multiThreadMDC = new MultiThreadMDC(); diff --git a/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/KinesisStreamsBatchMessageHandler.java b/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/KinesisStreamsBatchMessageHandler.java index e6216794f..7b4179de7 100644 --- a/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/KinesisStreamsBatchMessageHandler.java +++ b/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/KinesisStreamsBatchMessageHandler.java @@ -18,7 +18,6 @@ import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.events.KinesisEvent; import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse; -import java.util.ArrayList; import java.util.List; import java.util.Optional; import java.util.function.BiConsumer; @@ -60,13 +59,14 @@ public KinesisStreamsBatchMessageHandler(BiConsumer()).build(); - - for (KinesisEvent.KinesisEventRecord eventRecord : event.getRecords()) { - processBatchItem(eventRecord, context).ifPresent(batchItemFailure -> response.getBatchItemFailures().add(batchItemFailure)); - } + List batchItemFailures = event.getRecords() + .stream() + .map(eventRecord -> processBatchItem(eventRecord, context)) + .filter(Optional::isPresent) + .map(Optional::get) + 
.collect(Collectors.toList()); - return response; + return StreamsEventResponse.builder().withBatchItemFailures(batchItemFailures).build(); } @Override diff --git a/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/SqsBatchMessageHandler.java b/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/SqsBatchMessageHandler.java index dd6874ff9..4060fe97e 100644 --- a/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/SqsBatchMessageHandler.java +++ b/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/SqsBatchMessageHandler.java @@ -20,7 +20,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Optional; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -66,10 +65,10 @@ public SQSBatchResponse processBatch(SQSEvent event, Context context) { // If we are working on a FIFO queue, when any message fails we should stop processing and return the // rest of the batch as failed too. We use this variable to track when that has happened. // https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting - AtomicBoolean failWholeBatch = new AtomicBoolean(false); + final Boolean[] failWholeBatch = {false}; int messageCursor = 0; - for (; messageCursor < event.getRecords().size() && !failWholeBatch.get(); messageCursor++) { + for (; messageCursor < event.getRecords().size() && !failWholeBatch[0]; messageCursor++) { SQSEvent.SQSMessage message = event.getRecords().get(messageCursor); String messageGroupId = message.getAttributes() != null ? @@ -78,7 +77,7 @@ public SQSBatchResponse processBatch(SQSEvent event, Context context) { processBatchItem(message, context).ifPresent(batchItemFailure -> { response.getBatchItemFailures().add(batchItemFailure); if (messageGroupId != null) { - failWholeBatch.set(true); + failWholeBatch[0] = true; LOGGER.info( "A message in a batch with messageGroupId {} and messageId {} failed; failing the rest of the batch too" , messageGroupId, message.getMessageId()); @@ -86,7 +85,7 @@ public SQSBatchResponse processBatch(SQSEvent event, Context context) { }); } - if (failWholeBatch.get()) { + if (failWholeBatch[0]) { // Add the remaining messages to the batch item failures event.getRecords() .subList(messageCursor, event.getRecords().size()) @@ -100,8 +99,7 @@ public SQSBatchResponse processBatch(SQSEvent event, Context context) { @Override public SQSBatchResponse processBatchInParallel(SQSEvent event, Context context) { if (!event.getRecords().isEmpty() && event.getRecords().get(0).getAttributes().get(MESSAGE_GROUP_ID_KEY) != null) { - LOGGER.warn("FIFO queues are not supported in parallel mode, proceeding in sequence"); - return processBatch(event, context); + throw new UnsupportedOperationException("FIFO queues are not supported in parallel mode, use the processBatch method instead"); } MultiThreadMDC multiThreadMDC = new MultiThreadMDC(); From c5be5682f0c94ed5e5051ee1e7ee629c90404963 Mon Sep 17 00:00:00 2001 From: Jerome Van Der Linden Date: Fri, 12 Apr 2024 16:55:40 +0200 Subject: [PATCH 05/10] sqs // batch processing example --- .../deploy/sqs/template.yml | 63 ++++++++++++++-- examples/powertools-examples-batch/pom.xml | 13 +++- .../batch/sqs/AbstractSqsBatchHandler.java | 62 +++++++++++++++ .../org/demo/batch/sqs/SqsBatchHandler.java | 15 ++-- .../org/demo/batch/sqs/SqsBatchSender.java | 25 +++---- 
.../batch/sqs/SqsParallelBatchHandler.java | 46 ++++++++++++ .../src/main/resources/LogLayout.json | 75 +++++++++++++++++++ .../src/main/resources/log4j2.xml | 3 +- 8 files changed, 272 insertions(+), 30 deletions(-) create mode 100644 examples/powertools-examples-batch/src/main/java/org/demo/batch/sqs/AbstractSqsBatchHandler.java create mode 100644 examples/powertools-examples-batch/src/main/java/org/demo/batch/sqs/SqsParallelBatchHandler.java create mode 100644 examples/powertools-examples-batch/src/main/resources/LogLayout.json diff --git a/examples/powertools-examples-batch/deploy/sqs/template.yml b/examples/powertools-examples-batch/deploy/sqs/template.yml index 764ba4863..2f1d6c363 100644 --- a/examples/powertools-examples-batch/deploy/sqs/template.yml +++ b/examples/powertools-examples-batch/deploy/sqs/template.yml @@ -7,12 +7,10 @@ Globals: Function: Timeout: 20 Runtime: java11 - MemorySize: 512 - Tracing: Active + MemorySize: 5400 Environment: Variables: POWERTOOLS_LOG_LEVEL: INFO - POWERTOOLS_LOGGER_SAMPLE_RATE: 1.0 POWERTOOLS_LOGGER_LOG_EVENT: true Resources: @@ -45,6 +43,9 @@ Resources: AliasName: alias/powertools-batch-sqs-demo TargetKeyId: !Ref CustomerKey + Bucket: + Type: AWS::S3::Bucket + DemoDlqSqsQueue: Type: AWS::SQS::Queue Properties: @@ -96,11 +97,57 @@ Resources: DemoSQSConsumerFunction: Type: AWS::Serverless::Function Properties: + Tracing: Active CodeUri: ../.. Handler: org.demo.batch.sqs.SqsBatchHandler::handleRequest Environment: Variables: POWERTOOLS_SERVICE_NAME: sqs-demo + BUCKET: !Ref Bucket + Policies: + - Statement: + - Sid: SQSDeleteGetAttribute + Effect: Allow + Action: + - sqs:DeleteMessageBatch + - sqs:GetQueueAttributes + Resource: !GetAtt DemoSqsQueue.Arn + - Sid: SQSSendMessageBatch + Effect: Allow + Action: + - sqs:SendMessageBatch + - sqs:SendMessage + Resource: !GetAtt DemoDlqSqsQueue.Arn + - Sid: SQSKMSKey + Effect: Allow + Action: + - kms:GenerateDataKey + - kms:Decrypt + Resource: !GetAtt CustomerKey.Arn + - Sid: WriteToS3 + Effect: Allow + Action: + - s3:PutObject + Resource: !Sub ${Bucket.Arn}/* + +# Events: +# MySQSEvent: +# Type: SQS +# Properties: +# Queue: !GetAtt DemoSqsQueue.Arn +# BatchSize: 100 +# MaximumBatchingWindowInSeconds: 60 + + DemoSQSParallelConsumerFunction: + Type: AWS::Serverless::Function + Properties: + Tracing: Active + CodeUri: ../.. 
+ Handler: org.demo.batch.sqs.SqsParallelBatchHandler::handleRequest + Environment: + Variables: + POWERTOOLS_SERVICE_NAME: sqs-demo + BUCKET: !Ref Bucket Policies: - Statement: - Sid: SQSDeleteGetAttribute @@ -121,13 +168,19 @@ Resources: - kms:GenerateDataKey - kms:Decrypt Resource: !GetAtt CustomerKey.Arn + - Sid: WriteToS3 + Effect: Allow + Action: + - s3:PutObject + Resource: !Sub ${Bucket.Arn}/* + Events: MySQSEvent: Type: SQS Properties: Queue: !GetAtt DemoSqsQueue.Arn - BatchSize: 2 - MaximumBatchingWindowInSeconds: 300 + BatchSize: 100 + MaximumBatchingWindowInSeconds: 60 Outputs: DemoSqsQueue: diff --git a/examples/powertools-examples-batch/pom.xml b/examples/powertools-examples-batch/pom.xml index 70a4a6e3f..2b6966dc7 100644 --- a/examples/powertools-examples-batch/pom.xml +++ b/examples/powertools-examples-batch/pom.xml @@ -14,7 +14,7 @@ 11 11 1.9.20.1 - 2.24.10 + 2.25.21 @@ -42,6 +42,17 @@ software.amazon.awssdk sdk-core ${sdk.version} + + + org.slf4j + slf4j-api + + + + + software.amazon.awssdk + s3 + ${sdk.version} software.amazon.awssdk diff --git a/examples/powertools-examples-batch/src/main/java/org/demo/batch/sqs/AbstractSqsBatchHandler.java b/examples/powertools-examples-batch/src/main/java/org/demo/batch/sqs/AbstractSqsBatchHandler.java new file mode 100644 index 000000000..008074198 --- /dev/null +++ b/examples/powertools-examples-batch/src/main/java/org/demo/batch/sqs/AbstractSqsBatchHandler.java @@ -0,0 +1,62 @@ +/* + * Copyright 2024 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.demo.batch.sqs; + +import com.amazonaws.services.lambda.runtime.Context; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.Random; +import org.demo.batch.model.Product; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.core.sync.RequestBody; +import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; +import software.amazon.lambda.powertools.tracing.Tracing; +import software.amazon.lambda.powertools.tracing.TracingUtils; + +public class AbstractSqsBatchHandler { + private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSqsBatchHandler.class); + private final ObjectMapper mapper = new ObjectMapper(); + private final String bucket = System.getenv("BUCKET"); + private final S3Client s3 = S3Client.builder().httpClient(UrlConnectionHttpClient.create()).build(); + private final Random r = new Random(); + + /** + * Simulate some processing (I/O + S3 put request) + * @param p deserialized product + * @param context Lambda context + */ + @Tracing + protected void processMessage(Product p, Context context) { + TracingUtils.putAnnotation("productId", p.getId()); + LOGGER.info("Processing product {}", p); + char c = (char)(r.nextInt(26) + 'a'); + char[] chars = new char[1024 * 1000]; + Arrays.fill(chars, c); + p.setName(new String(chars)); + try { + File file = new File("/tmp/"+p.getId()+".json"); + mapper.writeValue(file, p); + s3.putObject( + PutObjectRequest.builder().bucket(bucket).key(p.getId()+".json").build(), RequestBody.fromFile(file)); + } catch (IOException e) { + throw new RuntimeException(e); + } + } +} diff --git a/examples/powertools-examples-batch/src/main/java/org/demo/batch/sqs/SqsBatchHandler.java b/examples/powertools-examples-batch/src/main/java/org/demo/batch/sqs/SqsBatchHandler.java index 27689485c..bc0f57cb8 100644 --- a/examples/powertools-examples-batch/src/main/java/org/demo/batch/sqs/SqsBatchHandler.java +++ b/examples/powertools-examples-batch/src/main/java/org/demo/batch/sqs/SqsBatchHandler.java @@ -4,13 +4,15 @@ import com.amazonaws.services.lambda.runtime.RequestHandler; import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse; import com.amazonaws.services.lambda.runtime.events.SQSEvent; +import org.demo.batch.model.Product; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.demo.batch.model.Product; import software.amazon.lambda.powertools.batch.BatchMessageHandlerBuilder; import software.amazon.lambda.powertools.batch.handler.BatchMessageHandler; +import software.amazon.lambda.powertools.logging.Logging; +import software.amazon.lambda.powertools.tracing.Tracing; -public class SqsBatchHandler implements RequestHandler { +public class SqsBatchHandler extends AbstractSqsBatchHandler implements RequestHandler { private static final Logger LOGGER = LoggerFactory.getLogger(SqsBatchHandler.class); private final BatchMessageHandler handler; @@ -20,14 +22,11 @@ public SqsBatchHandler() { .buildWithMessageHandler(this::processMessage, Product.class); } + @Logging + @Tracing @Override public SQSBatchResponse handleRequest(SQSEvent sqsEvent, Context context) { + LOGGER.info("Processing batch of {} messages", sqsEvent.getRecords().size()); return handler.processBatch(sqsEvent, context); } - - - private void processMessage(Product p, Context c) { 
- LOGGER.info("Processing product " + p); - } - } diff --git a/examples/powertools-examples-batch/src/main/java/org/demo/batch/sqs/SqsBatchSender.java b/examples/powertools-examples-batch/src/main/java/org/demo/batch/sqs/SqsBatchSender.java index 4050ab98b..58b24d735 100644 --- a/examples/powertools-examples-batch/src/main/java/org/demo/batch/sqs/SqsBatchSender.java +++ b/examples/powertools-examples-batch/src/main/java/org/demo/batch/sqs/SqsBatchSender.java @@ -10,14 +10,13 @@ import java.security.SecureRandom; import java.util.List; import java.util.stream.IntStream; +import org.demo.batch.model.Product; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.demo.batch.model.Product; import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient; import software.amazon.awssdk.services.sqs.SqsClient; import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest; import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry; -import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse; /** @@ -45,16 +44,12 @@ public SqsBatchSender() { public String handleRequest(ScheduledEvent scheduledEvent, Context context) { String queueUrl = System.getenv("QUEUE_URL"); - LOGGER.info("handleRequest"); - - // Push 5 messages on each invoke. - List batchRequestEntries = IntStream.range(0, 5) + List batchRequestEntries = IntStream.range(0, 50) .mapToObj(value -> { - long id = random.nextLong(); - float price = random.nextFloat(); + long id = Math.abs(random.nextLong()); + float price = Math.abs(random.nextFloat() * 3465); Product product = new Product(id, "product-" + id, price); try { - return SendMessageBatchRequestEntry.builder() .id(scheduledEvent.getId() + value) .messageBody(objectMapper.writeValueAsString(product)) @@ -65,12 +60,12 @@ public String handleRequest(ScheduledEvent scheduledEvent, Context context) { } }).collect(toList()); - SendMessageBatchResponse sendMessageBatchResponse = sqsClient.sendMessageBatch(SendMessageBatchRequest.builder() - .queueUrl(queueUrl) - .entries(batchRequestEntries) - .build()); - - LOGGER.info("Sent Message {}", sendMessageBatchResponse); + for (int i = 0; i < 50; i += 10) { + sqsClient.sendMessageBatch(SendMessageBatchRequest.builder() + .queueUrl(queueUrl) + .entries(batchRequestEntries.subList(i, i + 10)) + .build()); + } return "Success"; } diff --git a/examples/powertools-examples-batch/src/main/java/org/demo/batch/sqs/SqsParallelBatchHandler.java b/examples/powertools-examples-batch/src/main/java/org/demo/batch/sqs/SqsParallelBatchHandler.java new file mode 100644 index 000000000..643900518 --- /dev/null +++ b/examples/powertools-examples-batch/src/main/java/org/demo/batch/sqs/SqsParallelBatchHandler.java @@ -0,0 +1,46 @@ +/* + * Copyright 2024 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.demo.batch.sqs; + +import com.amazonaws.services.lambda.runtime.Context; +import com.amazonaws.services.lambda.runtime.RequestHandler; +import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse; +import com.amazonaws.services.lambda.runtime.events.SQSEvent; +import org.demo.batch.model.Product; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.lambda.powertools.batch.BatchMessageHandlerBuilder; +import software.amazon.lambda.powertools.batch.handler.BatchMessageHandler; +import software.amazon.lambda.powertools.logging.Logging; +import software.amazon.lambda.powertools.tracing.Tracing; + +public class SqsParallelBatchHandler extends AbstractSqsBatchHandler implements RequestHandler { + private static final Logger LOGGER = LoggerFactory.getLogger(SqsParallelBatchHandler.class); + private final BatchMessageHandler handler; + + public SqsParallelBatchHandler() { + handler = new BatchMessageHandlerBuilder() + .withSqsBatchHandler() + .buildWithMessageHandler(this::processMessage, Product.class); + } + + @Logging + @Tracing + @Override + public SQSBatchResponse handleRequest(SQSEvent sqsEvent, Context context) { + LOGGER.info("Processing batch of {} messages", sqsEvent.getRecords().size()); + return handler.processBatchInParallel(sqsEvent, context); + } +} diff --git a/examples/powertools-examples-batch/src/main/resources/LogLayout.json b/examples/powertools-examples-batch/src/main/resources/LogLayout.json new file mode 100644 index 000000000..60f102e09 --- /dev/null +++ b/examples/powertools-examples-batch/src/main/resources/LogLayout.json @@ -0,0 +1,75 @@ +{ + "level": { + "$resolver": "level", + "field": "name" + }, + "message": { + "$resolver": "message" + }, + "error": { + "message": { + "$resolver": "exception", + "field": "message" + }, + "name": { + "$resolver": "exception", + "field": "className" + }, + "stack": { + "$resolver": "exception", + "field": "stackTrace", + "stackTrace": { + "stringified": true + } + } + }, + "cold_start": { + "$resolver": "powertools", + "field": "cold_start" + }, + "thread": { + "$resolver": "thread", + "field": "name" + }, + "function_arn": { + "$resolver": "powertools", + "field": "function_arn" + }, + "function_memory_size": { + "$resolver": "powertools", + "field": "function_memory_size" + }, + "function_name": { + "$resolver": "powertools", + "field": "function_name" + }, + "function_request_id": { + "$resolver": "powertools", + "field": "function_request_id" + }, + "function_version": { + "$resolver": "powertools", + "field": "function_version" + }, + "sampling_rate": { + "$resolver": "powertools", + "field": "sampling_rate" + }, + "service": { + "$resolver": "powertools", + "field": "service" + }, + "timestamp": { + "$resolver": "timestamp", + "pattern": { + "format": "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'" + } + }, + "xray_trace_id": { + "$resolver": "powertools", + "field": "xray_trace_id" + }, + "": { + "$resolver": "powertools" + } +} \ No newline at end of file diff --git a/examples/powertools-examples-batch/src/main/resources/log4j2.xml b/examples/powertools-examples-batch/src/main/resources/log4j2.xml index ea3ecf474..c48ca3ef4 100644 --- a/examples/powertools-examples-batch/src/main/resources/log4j2.xml +++ b/examples/powertools-examples-batch/src/main/resources/log4j2.xml @@ -2,7 +2,8 @@ - + + From 860d464a5b1e1bbd9d9a74038346a960ff3179b9 Mon Sep 17 00:00:00 2001 From: Jerome Van Der Linden Date: Fri, 12 Apr 2024 16:56:30 +0200 Subject: [PATCH 06/10] rollback to AtomicBoolean --- 
.../batch/handler/SqsBatchMessageHandler.java | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/SqsBatchMessageHandler.java b/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/SqsBatchMessageHandler.java index 4060fe97e..2dfb0a28e 100644 --- a/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/SqsBatchMessageHandler.java +++ b/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/SqsBatchMessageHandler.java @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -65,10 +66,10 @@ public SQSBatchResponse processBatch(SQSEvent event, Context context) { // If we are working on a FIFO queue, when any message fails we should stop processing and return the // rest of the batch as failed too. We use this variable to track when that has happened. // https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting - final Boolean[] failWholeBatch = {false}; + final AtomicBoolean failWholeBatch = new AtomicBoolean(false); int messageCursor = 0; - for (; messageCursor < event.getRecords().size() && !failWholeBatch[0]; messageCursor++) { + for (; messageCursor < event.getRecords().size() && !failWholeBatch.get(); messageCursor++) { SQSEvent.SQSMessage message = event.getRecords().get(messageCursor); String messageGroupId = message.getAttributes() != null ? @@ -77,7 +78,7 @@ public SQSBatchResponse processBatch(SQSEvent event, Context context) { processBatchItem(message, context).ifPresent(batchItemFailure -> { response.getBatchItemFailures().add(batchItemFailure); if (messageGroupId != null) { - failWholeBatch[0] = true; + failWholeBatch.set(true); LOGGER.info( "A message in a batch with messageGroupId {} and messageId {} failed; failing the rest of the batch too" , messageGroupId, message.getMessageId()); @@ -85,7 +86,7 @@ public SQSBatchResponse processBatch(SQSEvent event, Context context) { }); } - if (failWholeBatch[0]) { + if (failWholeBatch.get()) { // Add the remaining messages to the batch item failures event.getRecords() .subList(messageCursor, event.getRecords().size()) @@ -106,6 +107,7 @@ public SQSBatchResponse processBatchInParallel(SQSEvent event, Context context) List batchItemFailures = event.getRecords() .parallelStream() // Parallel processing .map(sqsMessage -> { + multiThreadMDC.copyMDCToThread(Thread.currentThread().getName()); return processBatchItem(sqsMessage, context); }) From 73f100d890c2a3415139a7669bac94a6d84e764a Mon Sep 17 00:00:00 2001 From: Jerome Van Der Linden Date: Tue, 25 Jun 2024 10:55:57 +0200 Subject: [PATCH 07/10] complete sqs batch example with logs and traces annotations --- .../java/org/demo/batch/sqs/AbstractSqsBatchHandler.java | 8 ++++++++ .../java/org/demo/batch/sqs/SqsParallelBatchHandler.java | 2 ++ 2 files changed, 10 insertions(+) diff --git a/examples/powertools-examples-batch/src/main/java/org/demo/batch/sqs/AbstractSqsBatchHandler.java b/examples/powertools-examples-batch/src/main/java/org/demo/batch/sqs/AbstractSqsBatchHandler.java index 008074198..25dba47bb 100644 --- a/examples/powertools-examples-batch/src/main/java/org/demo/batch/sqs/AbstractSqsBatchHandler.java +++ 
b/examples/powertools-examples-batch/src/main/java/org/demo/batch/sqs/AbstractSqsBatchHandler.java @@ -23,10 +23,12 @@ import org.demo.batch.model.Product; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.slf4j.MDC; import software.amazon.awssdk.core.sync.RequestBody; import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.PutObjectRequest; +import software.amazon.lambda.powertools.logging.Logging; import software.amazon.lambda.powertools.tracing.Tracing; import software.amazon.lambda.powertools.tracing.TracingUtils; @@ -42,10 +44,14 @@ public class AbstractSqsBatchHandler { * @param p deserialized product * @param context Lambda context */ + @Logging @Tracing protected void processMessage(Product p, Context context) { TracingUtils.putAnnotation("productId", p.getId()); + TracingUtils.putAnnotation("Thread", Thread.currentThread().getName()); + MDC.put("product", String.valueOf(p.getId())); LOGGER.info("Processing product {}", p); + char c = (char)(r.nextInt(26) + 'a'); char[] chars = new char[1024 * 1000]; Arrays.fill(chars, c); @@ -57,6 +63,8 @@ protected void processMessage(Product p, Context context) { PutObjectRequest.builder().bucket(bucket).key(p.getId()+".json").build(), RequestBody.fromFile(file)); } catch (IOException e) { throw new RuntimeException(e); + } finally { + MDC.remove("product"); } } } diff --git a/examples/powertools-examples-batch/src/main/java/org/demo/batch/sqs/SqsParallelBatchHandler.java b/examples/powertools-examples-batch/src/main/java/org/demo/batch/sqs/SqsParallelBatchHandler.java index 643900518..0151c0a32 100644 --- a/examples/powertools-examples-batch/src/main/java/org/demo/batch/sqs/SqsParallelBatchHandler.java +++ b/examples/powertools-examples-batch/src/main/java/org/demo/batch/sqs/SqsParallelBatchHandler.java @@ -21,6 +21,7 @@ import org.demo.batch.model.Product; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.slf4j.MDC; import software.amazon.lambda.powertools.batch.BatchMessageHandlerBuilder; import software.amazon.lambda.powertools.batch.handler.BatchMessageHandler; import software.amazon.lambda.powertools.logging.Logging; @@ -41,6 +42,7 @@ public SqsParallelBatchHandler() { @Override public SQSBatchResponse handleRequest(SQSEvent sqsEvent, Context context) { LOGGER.info("Processing batch of {} messages", sqsEvent.getRecords().size()); + MDC.put("requestId", context.getAwsRequestId()); // should be propagated to other threads return handler.processBatchInParallel(sqsEvent, context); } } From 1e55a9b3d9b7b21f39dd7da89753d46413dbf558 Mon Sep 17 00:00:00 2001 From: Jerome Van Der Linden Date: Wed, 26 Jun 2024 09:39:37 +0200 Subject: [PATCH 08/10] update doc --- docs/utilities/batch.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md index d6e5e3654..f7b2dc0cf 100644 --- a/docs/utilities/batch.md +++ b/docs/utilities/batch.md @@ -472,7 +472,7 @@ instead of `BatchMessageHandler#processBatch()`. Partial batch failure works the in parallel rather than sequentially. This feature is available for SQS, Kinesis and DynamoDB Streams but cannot be -used with SQS FIFO. In that case, items will be processed sequentially, even with the `processBatchInParallel` method. +used with SQS FIFO. In that case, an `UnsupportedOperationException` is thrown. !!! 
warning Note that parallel processing is not always better than sequential processing, @@ -480,7 +480,7 @@ used with SQS FIFO. In that case, items will be processed sequentially, even wit !!! info To get more threads available (more vCPUs), you need to increase the amount of memory allocated to your Lambda function. - While the exact vCPU allocation isn't published, from observing common patterns customers see an allocation of one vCPU per 1024 MB of memory. + === "Example with SQS" From c35a87ac3e3e0d9ad09b9c95e56a317633fff16c Mon Sep 17 00:00:00 2001 From: Jerome Van Der Linden Date: Fri, 28 Jun 2024 14:07:54 +0200 Subject: [PATCH 09/10] update doc --- docs/utilities/batch.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md index f7b2dc0cf..778be28ef 100644 --- a/docs/utilities/batch.md +++ b/docs/utilities/batch.md @@ -480,6 +480,9 @@ used with SQS FIFO. In that case, an `UnsupportedOperationException` is thrown. !!! info To get more threads available (more vCPUs), you need to increase the amount of memory allocated to your Lambda function. + While it is technically possible to increase the number of threads using Java options or custom thread pools, it can + decrease performance when miss-used (see [here](https://www.baeldung.com/java-when-to-use-parallel-stream#fork-join-framework) + and [here](https://dzone.com/articles/be-aware-of-forkjoinpoolcommonpool) for more information) and we don't recommend it. === "Example with SQS" From 395f37a997c250ec4f4d4474a9b249191bba54a5 Mon Sep 17 00:00:00 2001 From: Jerome Van Der Linden Date: Tue, 2 Jul 2024 09:40:52 +0200 Subject: [PATCH 10/10] update doc --- docs/utilities/batch.md | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md index 778be28ef..7693ac98f 100644 --- a/docs/utilities/batch.md +++ b/docs/utilities/batch.md @@ -480,9 +480,11 @@ used with SQS FIFO. In that case, an `UnsupportedOperationException` is thrown. !!! info To get more threads available (more vCPUs), you need to increase the amount of memory allocated to your Lambda function. - While it is technically possible to increase the number of threads using Java options or custom thread pools, it can - decrease performance when miss-used (see [here](https://www.baeldung.com/java-when-to-use-parallel-stream#fork-join-framework) - and [here](https://dzone.com/articles/be-aware-of-forkjoinpoolcommonpool) for more information) and we don't recommend it. + While it is possible to increase the number of threads using Java options or custom thread pools, + in most cases the defaults work well, and changing them is more likely to decrease performance + (see [here](https://www.baeldung.com/java-when-to-use-parallel-stream#fork-join-framework) + and [here](https://dzone.com/articles/be-aware-of-forkjoinpoolcommonpool)). + In situations where this may be useful - such as performing IO-bound work in parallel - make sure to measure before and after! === "Example with SQS"
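
As a companion to the final doc change above, here is a rough sketch of the "custom thread pool" approach it mentions for IO-bound work. This sketch is not part of the patches and is not an API documented by Powertools: the handler class name, the pool size of 8, and the use of a dedicated `ForkJoinPool` are illustrative assumptions, and it relies on the implementation-dependent JDK behaviour (discussed in the linked Baeldung article) that a parallel stream started from inside a `ForkJoinPool` task runs on that pool. As the documentation advises, measure before and after.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse;
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
import org.demo.batch.model.Product;
import software.amazon.lambda.powertools.batch.BatchMessageHandlerBuilder;
import software.amazon.lambda.powertools.batch.handler.BatchMessageHandler;

public class SqsCustomPoolBatchHandler implements RequestHandler<SQSEvent, SQSBatchResponse> {

    // Hypothetical pool size for illustration only; size it from your own benchmarks.
    private static final ForkJoinPool CUSTOM_POOL = new ForkJoinPool(8);

    private final BatchMessageHandler<SQSEvent, SQSBatchResponse> handler;

    public SqsCustomPoolBatchHandler() {
        handler = new BatchMessageHandlerBuilder()
                .withSqsBatchHandler()
                .buildWithMessageHandler(this::processMessage, Product.class);
    }

    @Override
    public SQSBatchResponse handleRequest(SQSEvent sqsEvent, Context context) {
        // Submitting the call as a task makes the parallel stream inside
        // processBatchInParallel() run on CUSTOM_POOL rather than the common pool
        // (implementation-dependent JDK behaviour); join() waits for the partial batch response.
        ForkJoinTask<SQSBatchResponse> task =
                CUSTOM_POOL.submit(() -> handler.processBatchInParallel(sqsEvent, context));
        return task.join();
    }

    private void processMessage(Product p, Context c) {
        // IO-bound work for a single message goes here (e.g. an HTTP call or an S3 put)
    }
}
```

Keeping the pool in a static field lets warm Lambda invocations reuse its threads instead of recreating them on every request.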