From 2e4414123ca28419b3aaebefce37c795bc99282d Mon Sep 17 00:00:00 2001 From: Philipp Page Date: Wed, 19 Nov 2025 10:37:33 +0100 Subject: [PATCH] improv(batch): Propagate trace entity to worker threads during parallel batch processing. --- docs/utilities/batch.md | 82 +++++++++++++++++- .../handler/DynamoDbBatchMessageHandler.java | 54 ++++++++---- .../KinesisStreamsBatchMessageHandler.java | 60 +++++++++----- .../batch/handler/SqsBatchMessageHandler.java | 70 ++++++++++------ .../internal/XRayTraceEntityPropagator.java | 83 +++++++++++++++++++ 5 files changed, 288 insertions(+), 61 deletions(-) create mode 100644 powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/internal/XRayTraceEntityPropagator.java diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md index 3f9b6e53d..b535a90f6 100644 --- a/docs/utilities/batch.md +++ b/docs/utilities/batch.md @@ -484,7 +484,9 @@ used with SQS FIFO. In that case, an `UnsupportedOperationException` is thrown. in most cases the defaults work well, and changing them is more likely to decrease performance (see [here](https://www.baeldung.com/java-when-to-use-parallel-stream#fork-join-framework) and [here](https://dzone.com/articles/be-aware-of-forkjoinpoolcommonpool)). - In situations where this may be useful - such as performing IO-bound work in parallel - make sure to measure before and after! + In situations where this may be useful, such as performing IO-bound work in parallel, make sure to measure before and after! + +When using parallel processing with X-Ray tracing enabled, the Tracing utility automatically handles trace context propagation to worker threads. This ensures that subsegments created during parallel message processing appear under the correct parent segment in your X-Ray trace, maintaining proper trace hierarchy and visibility into your batch processing performance. === "Example with SQS" @@ -536,6 +538,84 @@ used with SQS FIFO. 
In that case, an `UnsupportedOperationException` is thrown. } ``` +=== "Example with X-Ray Tracing" + + ```java hl_lines="12 17" + public class SqsBatchHandler implements RequestHandler { + + private final BatchMessageHandler handler; + + public SqsBatchHandler() { + handler = new BatchMessageHandlerBuilder() + .withSqsBatchHandler() + .buildWithMessageHandler(this::processMessage, Product.class); + } + + @Override + @Tracing + public SQSBatchResponse handleRequest(SQSEvent sqsEvent, Context context) { + return handler.processBatchInParallel(sqsEvent, context); + } + + @Tracing // This will appear correctly under the handleRequest subsegment + private void processMessage(Product p, Context c) { + // Process the product - subsegments will appear under handleRequest + } + } + ``` + +### Choosing the right concurrency model + +The `processBatchInParallel` method has two overloads with different concurrency characteristics: + +#### Without custom executor (parallelStream) + +When you call `processBatchInParallel(event, context)` without providing an executor, the implementation uses Java's `parallelStream()` which leverages the common `ForkJoinPool`. + +**Best for: CPU-bound workloads** + +- Thread pool size matches available CPU cores +- Optimized for computational tasks (data transformation, calculations, parsing) +- Main thread participates in work-stealing +- Simple to use with no configuration needed + +```java +// Good for CPU-intensive processing +return handler.processBatchInParallel(sqsEvent, context); +``` + +#### With custom executor (CompletableFuture) + +When you call `processBatchInParallel(event, context, executor)` with a custom executor, the implementation uses `CompletableFuture` which gives you full control over the thread pool. 
+ +**Best for: I/O-bound workloads** + +- You control thread pool size and characteristics +- Ideal for I/O operations (HTTP calls, database queries, S3 operations) +- Can use larger thread pools since threads spend time waiting, not computing +- Main thread only waits; worker threads do all processing + +```java +// Good for I/O-intensive processing (API calls, DB queries, etc.) +ExecutorService executor = Executors.newFixedThreadPool(50); +return handler.processBatchInParallel(sqsEvent, context, executor); +``` + +**For Java 21+: Virtual Threads** + +If you're using Java 21 or later, virtual threads are ideal for I/O-bound workloads: + +```java +ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor(); +return handler.processBatchInParallel(sqsEvent, context, executor); +``` + +Virtual threads are lightweight and can handle thousands of concurrent I/O operations efficiently without the overhead of platform threads. + +**Recommendation for typical Lambda SQS processing:** + +Most Lambda functions processing SQS messages perform I/O operations (calling APIs, querying databases, writing to S3). For these workloads, use the custom executor approach with a thread pool sized appropriately for your I/O operations or virtual threads for Java 21+. 
+ ## Handling Messages diff --git a/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/DynamoDbBatchMessageHandler.java b/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/DynamoDbBatchMessageHandler.java index df7179a88..dbfdf63cd 100644 --- a/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/DynamoDbBatchMessageHandler.java +++ b/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/DynamoDbBatchMessageHandler.java @@ -14,21 +14,25 @@ package software.amazon.lambda.powertools.batch.handler; -import com.amazonaws.services.lambda.runtime.Context; -import com.amazonaws.services.lambda.runtime.events.DynamodbEvent; -import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse; - import java.util.ArrayList; import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.stream.Collectors; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import com.amazonaws.services.lambda.runtime.Context; +import com.amazonaws.services.lambda.runtime.events.DynamodbEvent; +import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse; + import software.amazon.lambda.powertools.batch.internal.MultiThreadMDC; +import software.amazon.lambda.powertools.batch.internal.XRayTraceEntityPropagator; /** * A batch message processor for DynamoDB Streams batches. 
@@ -43,8 +47,8 @@ public class DynamoDbBatchMessageHandler implements BatchMessageHandler rawMessageHandler; public DynamoDbBatchMessageHandler(Consumer successHandler, - BiConsumer failureHandler, - BiConsumer rawMessageHandler) { + BiConsumer failureHandler, + BiConsumer rawMessageHandler) { this.successHandler = successHandler; this.failureHandler = failureHandler; this.rawMessageHandler = rawMessageHandler; @@ -65,14 +69,23 @@ public StreamsEventResponse processBatch(DynamodbEvent event, Context context) { @Override public StreamsEventResponse processBatchInParallel(DynamodbEvent event, Context context) { MultiThreadMDC multiThreadMDC = new MultiThreadMDC(); + Object capturedSubsegment = XRayTraceEntityPropagator.captureTraceEntity(); List batchItemFailures = event.getRecords() .parallelStream() // Parallel processing .map(eventRecord -> { - multiThreadMDC.copyMDCToThread(Thread.currentThread().getName()); - Optional failureOpt = processBatchItem(eventRecord, context); - multiThreadMDC.removeThread(Thread.currentThread().getName()); - return failureOpt; + AtomicReference> result = new AtomicReference<>(); + + XRayTraceEntityPropagator.runWithEntity(capturedSubsegment, () -> { + multiThreadMDC.copyMDCToThread(Thread.currentThread().getName()); + try { + result.set(processBatchItem(eventRecord, context)); + } finally { + multiThreadMDC.removeThread(Thread.currentThread().getName()); + } + }); + + return result.get(); }) .filter(Optional::isPresent) .map(Optional::get) @@ -84,21 +97,29 @@ public StreamsEventResponse processBatchInParallel(DynamodbEvent event, Context @Override public StreamsEventResponse processBatchInParallel(DynamodbEvent event, Context context, Executor executor) { MultiThreadMDC multiThreadMDC = new MultiThreadMDC(); + Object capturedSubsegment = XRayTraceEntityPropagator.captureTraceEntity(); List batchItemFailures = new ArrayList<>(); List> futures = event.getRecords().stream() .map(eventRecord -> CompletableFuture.runAsync(() -> { - 
multiThreadMDC.copyMDCToThread(Thread.currentThread().getName()); - Optional failureOpt = processBatchItem(eventRecord, context); - failureOpt.ifPresent(batchItemFailures::add); - multiThreadMDC.removeThread(Thread.currentThread().getName()); + XRayTraceEntityPropagator.runWithEntity(capturedSubsegment, () -> { + multiThreadMDC.copyMDCToThread(Thread.currentThread().getName()); + try { + Optional failureOpt = processBatchItem(eventRecord, + context); + failureOpt.ifPresent(batchItemFailures::add); + } finally { + multiThreadMDC.removeThread(Thread.currentThread().getName()); + } + }); }, executor)) .collect(Collectors.toList()); futures.forEach(CompletableFuture::join); return StreamsEventResponse.builder().withBatchItemFailures(batchItemFailures).build(); } - private Optional processBatchItem(DynamodbEvent.DynamodbStreamRecord streamRecord, Context context) { + private Optional processBatchItem( + DynamodbEvent.DynamodbStreamRecord streamRecord, Context context) { try { LOGGER.debug("Processing item {}", streamRecord.getEventID()); @@ -124,7 +145,8 @@ private Optional processBatchItem(Dynamod LOGGER.warn("failureHandler threw handling failure", e2); } } - return Optional.of(StreamsEventResponse.BatchItemFailure.builder().withItemIdentifier(sequenceNumber).build()); + return Optional + .of(StreamsEventResponse.BatchItemFailure.builder().withItemIdentifier(sequenceNumber).build()); } } } diff --git a/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/KinesisStreamsBatchMessageHandler.java b/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/KinesisStreamsBatchMessageHandler.java index 233830462..f147578d4 100644 --- a/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/KinesisStreamsBatchMessageHandler.java +++ b/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/KinesisStreamsBatchMessageHandler.java @@ -14,22 +14,25 @@ package 
software.amazon.lambda.powertools.batch.handler; - -import com.amazonaws.services.lambda.runtime.Context; -import com.amazonaws.services.lambda.runtime.events.KinesisEvent; -import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse; - import java.util.ArrayList; import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.stream.Collectors; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import com.amazonaws.services.lambda.runtime.Context; +import com.amazonaws.services.lambda.runtime.events.KinesisEvent; +import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse; + import software.amazon.lambda.powertools.batch.internal.MultiThreadMDC; +import software.amazon.lambda.powertools.batch.internal.XRayTraceEntityPropagator; import software.amazon.lambda.powertools.utilities.EventDeserializer; /** @@ -49,10 +52,10 @@ public class KinesisStreamsBatchMessageHandler implements BatchMessageHandler private final BiConsumer failureHandler; public KinesisStreamsBatchMessageHandler(BiConsumer rawMessageHandler, - BiConsumer messageHandler, - Class messageClass, - Consumer successHandler, - BiConsumer failureHandler) { + BiConsumer messageHandler, + Class messageClass, + Consumer successHandler, + BiConsumer failureHandler) { this.rawMessageHandler = rawMessageHandler; this.messageHandler = messageHandler; @@ -76,14 +79,23 @@ public StreamsEventResponse processBatch(KinesisEvent event, Context context) { @Override public StreamsEventResponse processBatchInParallel(KinesisEvent event, Context context) { MultiThreadMDC multiThreadMDC = new MultiThreadMDC(); + Object capturedSubsegment = XRayTraceEntityPropagator.captureTraceEntity(); List batchItemFailures = event.getRecords() .parallelStream() // Parallel processing 
.map(eventRecord -> { - multiThreadMDC.copyMDCToThread(Thread.currentThread().getName()); - Optional failureOpt = processBatchItem(eventRecord, context); - multiThreadMDC.removeThread(Thread.currentThread().getName()); - return failureOpt; + AtomicReference> result = new AtomicReference<>(); + + XRayTraceEntityPropagator.runWithEntity(capturedSubsegment, () -> { + multiThreadMDC.copyMDCToThread(Thread.currentThread().getName()); + try { + result.set(processBatchItem(eventRecord, context)); + } finally { + multiThreadMDC.removeThread(Thread.currentThread().getName()); + } + }); + + return result.get(); }) .filter(Optional::isPresent) .map(Optional::get) @@ -95,21 +107,29 @@ public StreamsEventResponse processBatchInParallel(KinesisEvent event, Context c @Override public StreamsEventResponse processBatchInParallel(KinesisEvent event, Context context, Executor executor) { MultiThreadMDC multiThreadMDC = new MultiThreadMDC(); + Object capturedSubsegment = XRayTraceEntityPropagator.captureTraceEntity(); List batchItemFailures = new ArrayList<>(); List> futures = event.getRecords().stream() .map(eventRecord -> CompletableFuture.runAsync(() -> { - multiThreadMDC.copyMDCToThread(Thread.currentThread().getName()); - Optional failureOpt = processBatchItem(eventRecord, context); - failureOpt.ifPresent(batchItemFailures::add); - multiThreadMDC.removeThread(Thread.currentThread().getName()); + XRayTraceEntityPropagator.runWithEntity(capturedSubsegment, () -> { + multiThreadMDC.copyMDCToThread(Thread.currentThread().getName()); + try { + Optional failureOpt = processBatchItem(eventRecord, + context); + failureOpt.ifPresent(batchItemFailures::add); + } finally { + multiThreadMDC.removeThread(Thread.currentThread().getName()); + } + }); }, executor)) .collect(Collectors.toList()); futures.forEach(CompletableFuture::join); return StreamsEventResponse.builder().withBatchItemFailures(batchItemFailures).build(); } - private Optional processBatchItem(KinesisEvent.KinesisEventRecord 
eventRecord, Context context) { + private Optional processBatchItem( + KinesisEvent.KinesisEventRecord eventRecord, Context context) { try { LOGGER.debug("Processing item {}", eventRecord.getEventID()); @@ -141,8 +161,8 @@ private Optional processBatchItem(Kinesis } } - return Optional.of(StreamsEventResponse.BatchItemFailure.builder().withItemIdentifier(eventRecord.getKinesis().getSequenceNumber()).build()); + return Optional.of(StreamsEventResponse.BatchItemFailure.builder() + .withItemIdentifier(eventRecord.getKinesis().getSequenceNumber()).build()); } } } - diff --git a/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/SqsBatchMessageHandler.java b/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/SqsBatchMessageHandler.java index ccb6a6dd7..737c7cceb 100644 --- a/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/SqsBatchMessageHandler.java +++ b/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/SqsBatchMessageHandler.java @@ -14,24 +14,26 @@ package software.amazon.lambda.powertools.batch.handler; -import com.amazonaws.services.lambda.runtime.Context; -import com.amazonaws.services.lambda.runtime.events.KinesisEvent; -import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse; -import com.amazonaws.services.lambda.runtime.events.SQSEvent; import java.util.ArrayList; import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.stream.Collectors; -import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import com.amazonaws.services.lambda.runtime.Context; +import 
com.amazonaws.services.lambda.runtime.events.SQSBatchResponse; +import com.amazonaws.services.lambda.runtime.events.SQSEvent; + import software.amazon.lambda.powertools.batch.internal.MultiThreadMDC; +import software.amazon.lambda.powertools.batch.internal.XRayTraceEntityPropagator; import software.amazon.lambda.powertools.utilities.EventDeserializer; /** @@ -54,9 +56,9 @@ public class SqsBatchMessageHandler implements BatchMessageHandler failureHandler; public SqsBatchMessageHandler(BiConsumer messageHandler, Class messageClass, - BiConsumer rawMessageHandler, - Consumer successHandler, - BiConsumer failureHandler) { + BiConsumer rawMessageHandler, + Consumer successHandler, + BiConsumer failureHandler) { this.messageHandler = messageHandler; this.messageClass = messageClass; this.rawMessageHandler = rawMessageHandler; @@ -77,16 +79,16 @@ public SQSBatchResponse processBatch(SQSEvent event, Context context) { for (; messageCursor < event.getRecords().size() && !failWholeBatch.get(); messageCursor++) { SQSEvent.SQSMessage message = event.getRecords().get(messageCursor); - String messageGroupId = message.getAttributes() != null ? - message.getAttributes().get(MESSAGE_GROUP_ID_KEY) : null; + String messageGroupId = message.getAttributes() != null ? 
message.getAttributes().get(MESSAGE_GROUP_ID_KEY) + : null; processBatchItem(message, context).ifPresent(batchItemFailure -> { response.getBatchItemFailures().add(batchItemFailure); if (messageGroupId != null) { failWholeBatch.set(true); LOGGER.info( - "A message in a batch with messageGroupId {} and messageId {} failed; failing the rest of the batch too" - , messageGroupId, message.getMessageId()); + "A message in a batch with messageGroupId {} and messageId {} failed; failing the rest of the batch too", + messageGroupId, message.getMessageId()); } }); } @@ -105,18 +107,28 @@ public SQSBatchResponse processBatch(SQSEvent event, Context context) { @Override public SQSBatchResponse processBatchInParallel(SQSEvent event, Context context) { if (isFIFOEnabled(event)) { - throw new UnsupportedOperationException("FIFO queues are not supported in parallel mode, use the processBatch method instead"); + throw new UnsupportedOperationException( + "FIFO queues are not supported in parallel mode, use the processBatch method instead"); } MultiThreadMDC multiThreadMDC = new MultiThreadMDC(); + Object capturedSubsegment = XRayTraceEntityPropagator.captureTraceEntity(); + List batchItemFailures = event.getRecords() .parallelStream() // Parallel processing .map(sqsMessage -> { - - multiThreadMDC.copyMDCToThread(Thread.currentThread().getName()); - Optional failureOpt = processBatchItem(sqsMessage, context); - multiThreadMDC.removeThread(Thread.currentThread().getName()); - return failureOpt; + AtomicReference> result = new AtomicReference<>(); + + XRayTraceEntityPropagator.runWithEntity(capturedSubsegment, () -> { + multiThreadMDC.copyMDCToThread(Thread.currentThread().getName()); + try { + result.set(processBatchItem(sqsMessage, context)); + } finally { + multiThreadMDC.removeThread(Thread.currentThread().getName()); + } + }); + + return result.get(); }) .filter(Optional::isPresent) .map(Optional::get) @@ -128,17 +140,26 @@ public SQSBatchResponse processBatchInParallel(SQSEvent 
event, Context context) @Override public SQSBatchResponse processBatchInParallel(SQSEvent event, Context context, Executor executor) { if (isFIFOEnabled(event)) { - throw new UnsupportedOperationException("FIFO queues are not supported in parallel mode, use the processBatch method instead"); + throw new UnsupportedOperationException( + "FIFO queues are not supported in parallel mode, use the processBatch method instead"); } MultiThreadMDC multiThreadMDC = new MultiThreadMDC(); + Object capturedSubsegment = XRayTraceEntityPropagator.captureTraceEntity(); + List batchItemFailures = new ArrayList<>(); List> futures = event.getRecords().stream() .map(eventRecord -> CompletableFuture.runAsync(() -> { - multiThreadMDC.copyMDCToThread(Thread.currentThread().getName()); - Optional failureOpt = processBatchItem(eventRecord, context); - failureOpt.ifPresent(batchItemFailures::add); - multiThreadMDC.removeThread(Thread.currentThread().getName()); + XRayTraceEntityPropagator.runWithEntity(capturedSubsegment, () -> { + multiThreadMDC.copyMDCToThread(Thread.currentThread().getName()); + try { + Optional failureOpt = processBatchItem(eventRecord, + context); + failureOpt.ifPresent(batchItemFailures::add); + } finally { + multiThreadMDC.removeThread(Thread.currentThread().getName()); + } + }); }, executor)) .collect(Collectors.toList()); futures.forEach(CompletableFuture::join); @@ -182,6 +203,7 @@ private Optional processBatchItem(SQSEvent.SQ } private boolean isFIFOEnabled(SQSEvent sqsEvent) { - return !sqsEvent.getRecords().isEmpty() && sqsEvent.getRecords().get(0).getAttributes().get(MESSAGE_GROUP_ID_KEY) != null; + return !sqsEvent.getRecords().isEmpty() + && sqsEvent.getRecords().get(0).getAttributes().get(MESSAGE_GROUP_ID_KEY) != null; } } diff --git a/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/internal/XRayTraceEntityPropagator.java b/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/internal/XRayTraceEntityPropagator.java 
new file mode 100644 index 000000000..2858f4756 --- /dev/null +++ b/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/internal/XRayTraceEntityPropagator.java @@ -0,0 +1,83 @@ +/* + * Copyright 2023 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package software.amazon.lambda.powertools.batch.internal; + +import java.lang.reflect.Method; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Utility class to propagate X-Ray trace entity context to worker threads using reflection. + * Reflection is used to avoid taking a dependency on the X-Ray SDK. + */ +public final class XRayTraceEntityPropagator { + private static final Logger LOGGER = LoggerFactory.getLogger(XRayTraceEntityPropagator.class); + private static final boolean XRAY_AVAILABLE; + private static final Method GET_TRACE_ENTITY_METHOD; + + // We do the more "expensive" Class.forName in this static block to detect exactly once at import time if X-Ray + // is available or not. Subsequent .invoke() are very fast on modern JDKs. + static { + Method method = null; + boolean available = false; + + try { + Class<?> awsXRayClass = Class.forName("com.amazonaws.xray.AWSXRay"); + method = awsXRayClass.getMethod("getTraceEntity"); + available = true; + LOGGER.debug("X-Ray SDK detected. Trace context will be propagated to worker threads."); + } catch (ClassNotFoundException | NoSuchMethodException e) { + LOGGER.debug("X-Ray SDK not detected. 
Trace context propagation disabled"); + } + + GET_TRACE_ENTITY_METHOD = method; + XRAY_AVAILABLE = available; + } + + private XRayTraceEntityPropagator() { + // Utility class + } + + public static Object captureTraceEntity() { + if (!XRAY_AVAILABLE) { + return null; + } + + try { + return GET_TRACE_ENTITY_METHOD.invoke(null); + } catch (Exception e) { + // We don't want to break batch processing if this fails. + LOGGER.warn("Failed to capture trace entity.", e); + return null; + } + } + + // See https://docs.aws.amazon.com/xray/latest/devguide/scorekeep-workerthreads.html + public static void runWithEntity(Object traceEntity, Runnable runnable) { + if (!XRAY_AVAILABLE || traceEntity == null) { + runnable.run(); + return; + } + + try { + traceEntity.getClass().getMethod("run", Runnable.class).invoke(traceEntity, runnable); + } catch (Exception e) { + // We don't want to break batch processing if this fails. + LOGGER.warn("Failed to run with trace entity, falling back to direct execution.", e); + runnable.run(); + } + } +}