diff --git a/examples/helloworld/server/src/main/java/io/a2a/examples/helloworld/AgentCardProducer.java b/examples/helloworld/server/src/main/java/io/a2a/examples/helloworld/AgentCardProducer.java index 7ffb4dd09..fb66cc517 100644 --- a/examples/helloworld/server/src/main/java/io/a2a/examples/helloworld/AgentCardProducer.java +++ b/examples/helloworld/server/src/main/java/io/a2a/examples/helloworld/AgentCardProducer.java @@ -17,7 +17,7 @@ public class AgentCardProducer { @ConfigProperty(name = "quarkus.agentcard.protocol", defaultValue = "JSONRPC") - TransportProtocol protocol; + String protocolStr; @Produces @PublicAgentCard @@ -48,6 +48,7 @@ public AgentCard agentCard() { } private AgentInterface getAgentInterface() { + TransportProtocol protocol = TransportProtocol.fromString(protocolStr); String url = switch (protocol) { case GRPC -> "localhost:9000"; case JSONRPC, HTTP_JSON -> "http://localhost:9999"; diff --git a/extras/opentelemetry/README.md b/extras/opentelemetry/README.md new file mode 100644 index 000000000..7ea5ed427 --- /dev/null +++ b/extras/opentelemetry/README.md @@ -0,0 +1,215 @@ +# OpenTelemetry Integration for A2A + +This module provides OpenTelemetry observability integration for A2A servers, including distributed tracing, metrics, and context propagation across asynchronous boundaries. + +## Features + +- **Distributed Tracing**: Automatic span creation for all A2A protocol methods +- **Context Propagation**: OpenTelemetry trace context propagation across async operations +- **Request/Response Logging**: Optional extraction of request and response data into spans +- **Error Tracking**: Automatic error status and error type attributes on failures + +## Modules + +### `opentelemetry-common` +Common utilities and constants shared across OpenTelemetry modules. + +### `opentelemetry-client` +OpenTelemetry integration for A2A clients. + +### `opentelemetry-client-propagation` +Context propagation support for A2A clients. + +### `opentelemetry-server` +OpenTelemetry integration for A2A servers, including the context-aware executor. + +### `opentelemetry-integration-tests` +Integration tests for OpenTelemetry functionality. + +## Usage + +### Basic Setup + +Add the OpenTelemetry server module to your dependencies: + +```xml + + io.a2a + a2a-extras-opentelemetry-server + ${a2a.version} + +``` + +### Context-Aware Async Executor + +The `AsyncManagedExecutorProducer` provides a `ManagedExecutor` that automatically propagates OpenTelemetry trace context across asynchronous boundaries. This ensures that spans created in async tasks are properly linked to their parent spans. + +#### How It Works + +When the OpenTelemetry server module is included, the `AsyncManagedExecutorProducer` automatically replaces the default `AsyncExecutorProducer` using CDI alternatives: + +- **Priority 20**: Takes precedence over the default executor producer (priority 10) +- **Automatic Activation**: No configuration needed - just include the module +- **Context Propagation**: Uses MicroProfile Context Propagation to maintain trace context + +#### Configuration + +The `ManagedExecutor` is container-managed and configured through your runtime environment: + +**Quarkus:** +```properties +# Configure the managed executor pool +quarkus.thread-pool.core-threads=10 +quarkus.thread-pool.max-threads=50 +quarkus.thread-pool.queue-size=100 +``` + +**Other Runtimes:** +Consult your MicroProfile Context Propagation implementation documentation for configuration options. + +> **Note**: Unlike the default `AsyncExecutorProducer`, the `AsyncManagedExecutorProducer` does not use the `a2a.executor.*` configuration properties. Pool sizing is controlled by the container's ManagedExecutor configuration. + +#### Example + +```java +@ApplicationScoped +public class MyAgent implements Agent { + + @Inject + @Internal + Executor executor; // Automatically uses ManagedExecutor with context propagation + + @Override + public void execute(RequestContext context, AgentEmitter emitter) { + // Current span context is automatically propagated + executor.execute(() -> { + // This code runs in a different thread but maintains the trace context + Span currentSpan = Span.current(); + currentSpan.addEvent("Processing in async task"); + + // Do async work... + }); + } +} +``` + +### Request/Response Extraction + +Enable request and response data extraction in spans: + +```properties +# Extract request parameters into span attributes +a2a.opentelemetry.extract-request=true + +# Extract response data into span attributes +a2a.opentelemetry.extract-response=true +``` + +> **Warning**: Extracting request/response data may expose sensitive information in traces. Use with caution in production environments. + +### Span Attributes + +The following attributes are automatically added to spans: + +- `genai.request`: Request parameters (if extraction enabled) +- `genai.response`: Response data (if extraction enabled) +- `error.type`: Error message (on failures) + +## Architecture + +### Request Handler Decoration + +The `OpenTelemetryRequestHandlerDecorator` wraps the default request handler and creates spans for each A2A protocol method: + +``` +Client Request + ↓ +OpenTelemetryRequestHandlerDecorator + ↓ (creates span) +Default RequestHandler + ↓ +Agent Execution (with context propagation) + ↓ +Response +``` + +### Context Propagation Flow + +``` +HTTP Request (with trace headers) + ↓ +OpenTelemetry extracts context + ↓ +Span created for A2A method + ↓ +ManagedExecutor propagates context + ↓ +Async agent execution (maintains trace context) + ↓ +Response (with trace headers) +``` + +## Testing + +The module includes comprehensive unit tests: + +- `AsyncManagedExecutorProducerTest`: Tests for the context-aware executor producer +- `OpenTelemetryRequestHandlerDecoratorTest`: Tests for span creation and error handling + +Run tests: +```bash +mvn test -pl extras/opentelemetry/server +``` + +## Troubleshooting + +### Context Not Propagating + +**Symptom**: Spans in async tasks are not linked to parent spans. + +**Solution**: Ensure the OpenTelemetry server module is included and the `ManagedExecutor` is being injected correctly. Check logs for: +``` +Initializing OpenTelemetry-aware ManagedExecutor for async operations +``` + +### ManagedExecutor Not Available + +**Symptom**: `IllegalStateException: ManagedExecutor not injected - ensure MicroProfile Context Propagation is available` + +**Solution**: Ensure your runtime provides MicroProfile Context Propagation support. For Quarkus, add: +```xml + + io.quarkus + quarkus-smallrye-context-propagation + +``` + +### Performance Impact + +**Symptom**: Increased latency with OpenTelemetry enabled. + +**Solution**: +- Disable request/response extraction in production +- Configure sampling rate to reduce trace volume +- Ensure your OpenTelemetry collector is properly sized + +## Best Practices + +1. **Sampling**: Configure appropriate sampling rates for production environments +2. **Sensitive Data**: Disable request/response extraction if handling sensitive data +3. **Resource Attributes**: Add service name and version as resource attributes +4. **Collector Configuration**: Use batch processors to reduce network overhead +5. **Monitoring**: Monitor the OpenTelemetry collector's health and performance + +## Dependencies + +- MicroProfile Telemetry 2.0.1+ +- MicroProfile Context Propagation 1.3+ +- OpenTelemetry API +- A2A Server Common + +## See Also + +- [OpenTelemetry Documentation](https://opentelemetry.io/docs/) +- [MicroProfile Telemetry Specification](https://github.com/eclipse/microprofile-telemetry) +- [MicroProfile Context Propagation](https://github.com/eclipse/microprofile-context-propagation) diff --git a/extras/opentelemetry/pom.xml b/extras/opentelemetry/pom.xml index 0741ff55a..8d8ccd8da 100644 --- a/extras/opentelemetry/pom.xml +++ b/extras/opentelemetry/pom.xml @@ -19,6 +19,7 @@ 2.0.1 + 1.3 diff --git a/extras/opentelemetry/server/pom.xml b/extras/opentelemetry/server/pom.xml index afc62d631..3a7cb44a9 100644 --- a/extras/opentelemetry/server/pom.xml +++ b/extras/opentelemetry/server/pom.xml @@ -39,6 +39,12 @@ pom provided + + org.eclipse.microprofile.context-propagation + microprofile-context-propagation-api + ${version.org.eclipse.microprofile.context-propagation} + provided + jakarta.enterprise jakarta.enterprise.cdi-api diff --git a/extras/opentelemetry/server/src/main/java/io/a2a/extras/opentelemetry/AsyncManagedExecutorProducer.java b/extras/opentelemetry/server/src/main/java/io/a2a/extras/opentelemetry/AsyncManagedExecutorProducer.java new file mode 100644 index 000000000..cd30196e1 --- /dev/null +++ b/extras/opentelemetry/server/src/main/java/io/a2a/extras/opentelemetry/AsyncManagedExecutorProducer.java @@ -0,0 +1,65 @@ +package io.a2a.extras.opentelemetry; + +import io.a2a.server.util.async.Internal; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.Priority; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.inject.Alternative; +import jakarta.enterprise.inject.Produces; +import jakarta.inject.Inject; +import java.util.concurrent.Executor; +import org.eclipse.microprofile.context.ManagedExecutor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Alternative executor producer that provides a ManagedExecutor with OpenTelemetry context propagation. + *

+ * This producer replaces the default {@code AsyncExecutorProducer} when the OpenTelemetry extras module + * is included in the application. The ManagedExecutor ensures that OpenTelemetry trace context is + * properly propagated across asynchronous boundaries. + *

+ * Priority 20 ensures this alternative takes precedence over the default producer (priority 10). + * + *

Configuration

+ * The ManagedExecutor is container-managed and injected via CDI. Its configuration depends on the + * runtime environment: + * + *

+ * Unlike the default {@code AsyncExecutorProducer}, this producer does not use the {@code a2a.executor.*} + * configuration properties. The executor pool sizing and behavior are controlled by the container's + * ManagedExecutor configuration. + * + * @see org.eclipse.microprofile.context.ManagedExecutor + */ +@ApplicationScoped +@Alternative +@Priority(20) +public class AsyncManagedExecutorProducer { + private static final Logger LOGGER = LoggerFactory.getLogger(AsyncManagedExecutorProducer.class); + + @Inject + ManagedExecutor managedExecutor; + + @PostConstruct + public void init() { + LOGGER.info("Initializing OpenTelemetry-aware ManagedExecutor for async operations"); + if (managedExecutor == null) { + LOGGER.warn("ManagedExecutor not available - context propagation may not work correctly"); + } + } + + @Produces + @Internal + public Executor produce() { + LOGGER.debug("Using ManagedExecutor for async operations with OpenTelemetry context propagation"); + if (managedExecutor == null) { + throw new IllegalStateException("ManagedExecutor not injected - ensure MicroProfile Context Propagation is available"); + } + return managedExecutor; + } + +} diff --git a/extras/opentelemetry/server/src/test/java/io/a2a/extras/opentelemetry/AsyncManagedExecutorProducerTest.java b/extras/opentelemetry/server/src/test/java/io/a2a/extras/opentelemetry/AsyncManagedExecutorProducerTest.java new file mode 100644 index 000000000..ff4c86a16 --- /dev/null +++ b/extras/opentelemetry/server/src/test/java/io/a2a/extras/opentelemetry/AsyncManagedExecutorProducerTest.java @@ -0,0 +1,181 @@ +package io.a2a.extras.opentelemetry; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.*; + +import java.util.concurrent.Executor; +import org.eclipse.microprofile.context.ManagedExecutor; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class AsyncManagedExecutorProducerTest { + + @Mock + private ManagedExecutor managedExecutor; + + private AsyncManagedExecutorProducer producer; + + @BeforeEach + void setUp() { + producer = new AsyncManagedExecutorProducer(); + } + + @Nested + class InitializationTests { + @Test + void init_withValidManagedExecutor_logsSuccessfully() { + producer.managedExecutor = managedExecutor; + + assertDoesNotThrow(() -> producer.init()); + assertNotNull(producer.managedExecutor); + } + + @Test + void init_withNullManagedExecutor_logsWarning() { + producer.managedExecutor = null; + + // Should not throw, just log warning + assertDoesNotThrow(() -> producer.init()); + assertNull(producer.managedExecutor); + } + } + + @Nested + class ProduceTests { + @Test + void produce_withValidManagedExecutor_returnsExecutor() { + producer.managedExecutor = managedExecutor; + + Executor result = producer.produce(); + + assertNotNull(result); + assertSame(managedExecutor, result); + } + + @Test + void produce_withNullManagedExecutor_throwsIllegalStateException() { + producer.managedExecutor = null; + + IllegalStateException exception = assertThrows( + IllegalStateException.class, + () -> producer.produce() + ); + + assertEquals( + "ManagedExecutor not injected - ensure MicroProfile Context Propagation is available", + exception.getMessage() + ); + } + + @Test + void produce_returnsSameInstanceOnMultipleCalls() { + producer.managedExecutor = managedExecutor; + + Executor result1 = producer.produce(); + Executor result2 = producer.produce(); + + assertSame(result1, result2); + assertSame(managedExecutor, result1); + } + } + + @Nested + class CDIIntegrationTests { + @Test + void producer_hasCorrectAnnotations() { + // Verify class has @ApplicationScoped + assertTrue( + AsyncManagedExecutorProducer.class.isAnnotationPresent( + jakarta.enterprise.context.ApplicationScoped.class + ) + ); + + // Verify class has @Alternative + assertTrue( + AsyncManagedExecutorProducer.class.isAnnotationPresent( + jakarta.enterprise.inject.Alternative.class + ) + ); + + // Verify class has @Priority with value 20 + assertTrue( + AsyncManagedExecutorProducer.class.isAnnotationPresent( + jakarta.annotation.Priority.class + ) + ); + assertEquals( + 20, + AsyncManagedExecutorProducer.class.getAnnotation( + jakarta.annotation.Priority.class + ).value() + ); + } + + @Test + void produceMethod_hasCorrectAnnotations() throws NoSuchMethodException { + var method = AsyncManagedExecutorProducer.class.getMethod("produce"); + + // Verify method has @Produces + assertTrue( + method.isAnnotationPresent(jakarta.enterprise.inject.Produces.class) + ); + + // Verify method has @Internal + assertTrue( + method.isAnnotationPresent(io.a2a.server.util.async.Internal.class) + ); + } + + @Test + void initMethod_hasPostConstructAnnotation() throws NoSuchMethodException { + var method = AsyncManagedExecutorProducer.class.getMethod("init"); + + assertTrue( + method.isAnnotationPresent(jakarta.annotation.PostConstruct.class) + ); + } + + @Test + void managedExecutorField_hasInjectAnnotation() throws NoSuchFieldException { + var field = AsyncManagedExecutorProducer.class.getDeclaredField("managedExecutor"); + + assertTrue( + field.isAnnotationPresent(jakarta.inject.Inject.class) + ); + } + } + + @Nested + class ExecutorBehaviorTests { + @Test + void producedExecutor_canExecuteRunnables() { + producer.managedExecutor = managedExecutor; + Runnable task = mock(Runnable.class); + + Executor executor = producer.produce(); + executor.execute(task); + + verify(managedExecutor).execute(task); + } + + @Test + void producedExecutor_delegatesToManagedExecutor() { + producer.managedExecutor = managedExecutor; + Runnable task1 = mock(Runnable.class); + Runnable task2 = mock(Runnable.class); + + Executor executor = producer.produce(); + executor.execute(task1); + executor.execute(task2); + + verify(managedExecutor).execute(task1); + verify(managedExecutor).execute(task2); + verifyNoMoreInteractions(managedExecutor); + } + } +}