Closed
Description
Is your feature request related to a problem? Please describe.
At the moment, the batch module processes messages in sequence (code), which could be improved with parallel processing for better performance.
Describe the solution you'd like
- The `BatchMessageHandler` could provide a `processBatchInParallel` method with the same signature as `processBatch` but with a different behaviour (parallel processing instead of serial)
- Instead of iterating through the list of messages, we could use a `CompletableFuture`. It would be something like this (probably not that easy but that's a start):
```java
// Fan each SQS message out to a fixed thread pool
Executor executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);

List<CompletableFuture<Optional<SQSBatchResponse.BatchItemFailure>>> collect = event.getRecords().stream()
        .map(sqsMessage -> CompletableFuture.supplyAsync(
                () -> processTheMessageAndReturnOptionalOfBatchItemFailure(sqsMessage, context), executor))
        .collect(Collectors.toList());

// Wait for all futures to complete, then gather their results
CompletableFuture<List<Optional<SQSBatchResponse.BatchItemFailure>>> listCompletableFuture = CompletableFuture
        .allOf(collect.toArray(new CompletableFuture[0]))
        .thenApply(unused -> collect.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList()));

// join() avoids the checked exceptions that get() would force us to handle
List<SQSBatchResponse.BatchItemFailure> batchItemFailures = listCompletableFuture.join().stream()
        .filter(Optional::isPresent)
        .map(Optional::get)
        .collect(Collectors.toList());
```
Describe alternatives you've considered
Streams provide a `parallel()` method, which is based on the number of vCPUs (`Runtime.getRuntime().availableProcessors()`). Using `CompletableFuture`, we can define the number of threads ourselves, potentially more than the number of vCPUs. We should probably perform some load tests on Lambda to check whether that's actually better, because `parallel()` is probably much easier to implement.
Additional context
Activity
scottgerring commented on Dec 21, 2023
Initial thoughts -
It's a cool idea though! Especially where we are fanning out to do downstream IO, it should make a substantial difference to runtime and be a nice cost optimization for users.
jeromevdl commented on Dec 21, 2023
I don't understand... If we use `CompletableFuture`, we need the executor coming from the thread pool. It's not visible to the user, it will be internal code in PT. Not logging, but the serialization module with `JsonConfig` and the object mapper, but you need to review that one 😉. We probably need to check idempotency too, as we can integrate both batch and idempotency; not sure how thread-safe idempotency is...

itsmichaelwang commented on Dec 22, 2023
I just wanted to chime in and add my support for this. We use Kotlin, and having some kind of async interface (which I believe exists in the Python library?) would be really cool to have, performance-wise. Especially if it's a case where a Lambda pulls messages from a queue just to make a DDB call, or something like that.
jeromevdl commented on Dec 22, 2023
Hi @itsmichaelwang, thanks for your comment. I'm not super familiar with Kotlin, but if you could share the kind of signature you expect, it would help. We probably won't make public an async interface (with java Future), and will handle it internally, but happy to discuss this...
scottgerring commentedon Dec 22, 2023
You need a ThreadPool, but I don't think you shouldn't have to create a new one but rather use thread pools the runtime provides and manages. It looks like we should use
ForkJoinPool.commonPool()
; it'll be there already, we don't need to create more threads, we don't need to handle lifecycle, and it is explicitly for this sort of processing. I think we can even use something like theRecursiveTask
and skip thinking about the futurey nature of it at all.MDC uses thread local - isn't this a problem?
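The `commonPool()` plus `RecursiveTask` idea could look roughly like this. A hedged sketch, not a proposed implementation: the message type, `process` helper, and split threshold are all hypothetical, and the real code would use the SQS record and `BatchItemFailure` types:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class CommonPoolSketch {
    // Hypothetical per-message work: returns a failure marker, or null on success
    static String process(String message) {
        return message.isEmpty() ? "failed" : null;
    }

    // A RecursiveTask splits the batch in half until it is small enough,
    // then processes the remainder serially; the pool handles scheduling.
    static class BatchTask extends RecursiveTask<List<String>> {
        private final List<String> messages;
        BatchTask(List<String> messages) { this.messages = messages; }

        @Override
        protected List<String> compute() {
            if (messages.size() <= 2) { // arbitrary threshold for the sketch
                List<String> failures = new ArrayList<>();
                for (String m : messages) {
                    String f = process(m);
                    if (f != null) failures.add(f);
                }
                return failures;
            }
            int mid = messages.size() / 2;
            BatchTask left = new BatchTask(messages.subList(0, mid));
            BatchTask right = new BatchTask(messages.subList(mid, messages.size()));
            left.fork();                                             // left half, async
            List<String> result = new ArrayList<>(right.compute());  // right half, this thread
            result.addAll(left.join());
            return result;
        }
    }

    public static void main(String[] args) {
        // commonPool() is managed by the runtime: no creation, no lifecycle handling
        List<String> failures = ForkJoinPool.commonPool()
                .invoke(new BatchTask(List.of("a", "", "b", "", "c")));
        System.out.println(failures.size()); // 2
    }
}
```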
jeromevdl commentedon Dec 22, 2023
CompletableFuture.supplyAsync
needs anexecutor
. I'm not an expert with multithreading apis and happy ifForkJoinPool.commonPool()
works, but if it gives us the same amount of thread as theparallel()
method (in Streams), then let's just use the parallel method... Some reading: see this, this, this, this ==> We certainly need to test the different approaches and measure to see what provides the best value.From the last article:
We don't really know what users will do (CPU-intensive or not). Maybe
parallel
is already a good improvement or should we provide the option to users (a boolean set to true if CPU intensive 😛)Yes, you're right, MDC uses ThreadLocal. The way we handle powertools fields today is based on MDC so it could be yes... We probably would need to pass the MDC context map to the threads in order to fill their own version of it...
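Passing the MDC context map to worker threads follows a standard pattern: snapshot the map on the caller's thread, re-install it on each worker, and clear it afterwards so pooled threads don't leak context. With SLF4J the real calls are `MDC.getCopyOfContextMap()` and `MDC.setContextMap(...)`; to keep this sketch self-contained (stdlib only), a `ThreadLocal` map stands in for MDC here:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class MdcPropagationSketch {
    // Stand-in for SLF4J's MDC: a thread-local map of logging fields
    static final ThreadLocal<Map<String, String>> CONTEXT = ThreadLocal.withInitial(HashMap::new);

    static Map<String, String> getCopyOfContextMap() { return new HashMap<>(CONTEXT.get()); }
    static void setContextMap(Map<String, String> map) { CONTEXT.set(new HashMap<>(map)); }

    public static List<String> processAll(List<String> messages) {
        // Snapshot the caller's context before fanning out
        Map<String, String> snapshot = getCopyOfContextMap();
        List<CompletableFuture<String>> futures = messages.stream()
                .map(msg -> CompletableFuture.supplyAsync(() -> {
                    // Re-install the snapshot on the worker thread
                    setContextMap(snapshot);
                    try {
                        return msg + ":" + CONTEXT.get().get("requestId");
                    } finally {
                        CONTEXT.remove(); // avoid leaking context into pooled threads
                    }
                }))
                .collect(Collectors.toList());
        return futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        CONTEXT.get().put("requestId", "abc-123"); // set on the handler thread
        processAll(List.of("m1", "m2")).forEach(System.out::println);
        // m1:abc-123
        // m2:abc-123
    }
}
```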
We should first try to find the best way to implement parallelism (`Stream.parallel()`, `CompletableFuture`, ...), and then see the impacts on other modules, but we can already list them for sure:

jeromevdl commented on Dec 22, 2023
It's actually much simpler with parallel:
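The snippet that originally followed this comment wasn't preserved in this extract. A plausible minimal sketch of the parallel-stream variant, not the author's actual code; `processMessage`, the record layout, and the failure type are hypothetical:

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

public class ParallelBatchSketch {
    record BatchItemFailure(String itemIdentifier) {}

    // Hypothetical per-message processor: empty bodies fail
    static Optional<BatchItemFailure> processMessage(String id, String body) {
        return body.isEmpty() ? Optional.of(new BatchItemFailure(id)) : Optional.empty();
    }

    public static void main(String[] args) {
        List<String[]> records = List.of(
                new String[]{"1", "hello"},
                new String[]{"2", ""},
                new String[]{"3", "world"});

        // The whole CompletableFuture machinery collapses to one parallelStream() call
        List<BatchItemFailure> failures = records.parallelStream()
                .map(r -> processMessage(r[0], r[1]))
                .flatMap(Optional::stream)
                .collect(Collectors.toList());

        System.out.println(failures.size()); // 1
    }
}
```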
Note that in both cases I don't handle the FIFO `failWholeBatch`... I guess we should not process messages from a FIFO queue in parallel ;)