Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
public class AgentCardProducer {

@ConfigProperty(name = "quarkus.agentcard.protocol", defaultValue = "JSONRPC")
TransportProtocol protocol;
String protocolStr;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could use a Converter https://github.com/microprofile/microprofile-config/blob/main/spec/src/main/asciidoc/converters.asciidoc but perhaps that is more trouble than it is worth

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Too much trouble imho


@Produces
@PublicAgentCard
Expand Down Expand Up @@ -48,6 +48,7 @@ public AgentCard agentCard() {
}

private AgentInterface getAgentInterface() {
TransportProtocol protocol = TransportProtocol.fromString(protocolStr);
String url = switch (protocol) {
case GRPC -> "localhost:9000";
case JSONRPC, HTTP_JSON -> "http://localhost:9999";
Expand Down
215 changes: 215 additions & 0 deletions extras/opentelemetry/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
# OpenTelemetry Integration for A2A

This module provides OpenTelemetry observability integration for A2A servers, including distributed tracing, metrics, and context propagation across asynchronous boundaries.

## Features

- **Distributed Tracing**: Automatic span creation for all A2A protocol methods
- **Context Propagation**: OpenTelemetry trace context propagation across async operations
- **Request/Response Logging**: Optional extraction of request and response data into spans
- **Error Tracking**: Automatic error status and error type attributes on failures

## Modules

### `opentelemetry-common`
Common utilities and constants shared across OpenTelemetry modules.

### `opentelemetry-client`
OpenTelemetry integration for A2A clients.

### `opentelemetry-client-propagation`
Context propagation support for A2A clients.

### `opentelemetry-server`
OpenTelemetry integration for A2A servers, including the context-aware executor.

### `opentelemetry-integration-tests`
Integration tests for OpenTelemetry functionality.

## Usage

### Basic Setup

Add the OpenTelemetry server module to your dependencies:

```xml
<dependency>
<groupId>io.a2a</groupId>
<artifactId>a2a-extras-opentelemetry-server</artifactId>
<version>${a2a.version}</version>
</dependency>
```

### Context-Aware Async Executor

The `AsyncManagedExecutorProducer` provides a `ManagedExecutor` that automatically propagates OpenTelemetry trace context across asynchronous boundaries. This ensures that spans created in async tasks are properly linked to their parent spans.

#### How It Works

When the OpenTelemetry server module is included, the `AsyncManagedExecutorProducer` automatically replaces the default `AsyncExecutorProducer` using CDI alternatives:

- **Priority 20**: Takes precedence over the default executor producer (priority 10)
- **Automatic Activation**: No configuration needed - just include the module
- **Context Propagation**: Uses MicroProfile Context Propagation to maintain trace context

#### Configuration

The `ManagedExecutor` is container-managed and configured through your runtime environment:

**Quarkus:**
```properties
# Configure the managed executor pool
quarkus.thread-pool.core-threads=10
quarkus.thread-pool.max-threads=50
quarkus.thread-pool.queue-size=100
```

**Other Runtimes:**
Consult your MicroProfile Context Propagation implementation documentation for configuration options.

> **Note**: Unlike the default `AsyncExecutorProducer`, the `AsyncManagedExecutorProducer` does not use the `a2a.executor.*` configuration properties. Pool sizing is controlled by the container's ManagedExecutor configuration.

#### Example

```java
@ApplicationScoped
public class MyAgent implements Agent {

@Inject
@Internal
Executor executor; // Automatically uses ManagedExecutor with context propagation

@Override
public void execute(RequestContext context, AgentEmitter emitter) {
// Current span context is automatically propagated
executor.execute(() -> {
// This code runs in a different thread but maintains the trace context
Span currentSpan = Span.current();
currentSpan.addEvent("Processing in async task");

// Do async work...
});
}
}
```

### Request/Response Extraction

Enable request and response data extraction in spans:

```properties
# Extract request parameters into span attributes
a2a.opentelemetry.extract-request=true

# Extract response data into span attributes
a2a.opentelemetry.extract-response=true
```

> **Warning**: Extracting request/response data may expose sensitive information in traces. Use with caution in production environments.

### Span Attributes

The following attributes are automatically added to spans:

- `genai.request`: Request parameters (if extraction enabled)
- `genai.response`: Response data (if extraction enabled)
- `error.type`: Error message (on failures)

## Architecture

### Request Handler Decoration

The `OpenTelemetryRequestHandlerDecorator` wraps the default request handler and creates spans for each A2A protocol method:

```
Client Request
OpenTelemetryRequestHandlerDecorator
↓ (creates span)
Default RequestHandler
Agent Execution (with context propagation)
Response
```

### Context Propagation Flow

```
HTTP Request (with trace headers)
OpenTelemetry extracts context
Span created for A2A method
ManagedExecutor propagates context
Async agent execution (maintains trace context)
Response (with trace headers)
```

## Testing

The module includes comprehensive unit tests:

- `AsyncManagedExecutorProducerTest`: Tests for the context-aware executor producer
- `OpenTelemetryRequestHandlerDecoratorTest`: Tests for span creation and error handling

Run tests:
```bash
mvn test -pl extras/opentelemetry/server
```

## Troubleshooting

### Context Not Propagating

**Symptom**: Spans in async tasks are not linked to parent spans.

**Solution**: Ensure the OpenTelemetry server module is included and the `ManagedExecutor` is being injected correctly. Check logs for:
```
Initializing OpenTelemetry-aware ManagedExecutor for async operations
```

### ManagedExecutor Not Available

**Symptom**: `IllegalStateException: ManagedExecutor not injected - ensure MicroProfile Context Propagation is available`

**Solution**: Ensure your runtime provides MicroProfile Context Propagation support. For Quarkus, add:
```xml
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-context-propagation</artifactId>
</dependency>
```

### Performance Impact

**Symptom**: Increased latency with OpenTelemetry enabled.

**Solution**:
- Disable request/response extraction in production
- Configure sampling rate to reduce trace volume
- Ensure your OpenTelemetry collector is properly sized

## Best Practices

1. **Sampling**: Configure appropriate sampling rates for production environments
2. **Sensitive Data**: Disable request/response extraction if handling sensitive data
3. **Resource Attributes**: Add service name and version as resource attributes
4. **Collector Configuration**: Use batch processors to reduce network overhead
5. **Monitoring**: Monitor the OpenTelemetry collector's health and performance

## Dependencies

- MicroProfile Telemetry 2.0.1+
- MicroProfile Context Propagation 1.3+
- OpenTelemetry API
- A2A Server Common

## See Also

- [OpenTelemetry Documentation](https://opentelemetry.io/docs/)
- [MicroProfile Telemetry Specification](https://github.com/eclipse/microprofile-telemetry)
- [MicroProfile Context Propagation](https://github.com/eclipse/microprofile-context-propagation)
1 change: 1 addition & 0 deletions extras/opentelemetry/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

<properties>
<version.org.eclipse.microprofile.telemetry>2.0.1</version.org.eclipse.microprofile.telemetry>
<version.org.eclipse.microprofile.context-propagation>1.3</version.org.eclipse.microprofile.context-propagation>
</properties>
<dependencies>
<dependency>
Expand Down
6 changes: 6 additions & 0 deletions extras/opentelemetry/server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@
<type>pom</type>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.eclipse.microprofile.context-propagation</groupId>
<artifactId>microprofile-context-propagation-api</artifactId>
<version>${version.org.eclipse.microprofile.context-propagation}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>jakarta.enterprise</groupId>
<artifactId>jakarta.enterprise.cdi-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package io.a2a.extras.opentelemetry;

import io.a2a.server.util.async.Internal;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Alternative;
import jakarta.enterprise.inject.Produces;
import jakarta.inject.Inject;
import java.util.concurrent.Executor;
import org.eclipse.microprofile.context.ManagedExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Alternative executor producer that provides a ManagedExecutor with OpenTelemetry context propagation.
* <p>
* This producer replaces the default {@code AsyncExecutorProducer} when the OpenTelemetry extras module
* is included in the application. The ManagedExecutor ensures that OpenTelemetry trace context is
* properly propagated across asynchronous boundaries.
* <p>
* Priority 20 ensures this alternative takes precedence over the default producer (priority 10).
*
* <h2>Configuration</h2>
* The ManagedExecutor is container-managed and injected via CDI. Its configuration depends on the
* runtime environment:
* <ul>
* <li><b>Quarkus:</b> Configure via {@code quarkus.thread-pool.*} properties</li>
* <li><b>Other runtimes:</b> Consult your MicroProfile Context Propagation implementation documentation</li>
* </ul>
* <p>
* Unlike the default {@code AsyncExecutorProducer}, this producer does not use the {@code a2a.executor.*}
* configuration properties. The executor pool sizing and behavior are controlled by the container's
* ManagedExecutor configuration.
*
* @see org.eclipse.microprofile.context.ManagedExecutor
*/
@ApplicationScoped
@Alternative
@Priority(20)
public class AsyncManagedExecutorProducer {
private static final Logger LOGGER = LoggerFactory.getLogger(AsyncManagedExecutorProducer.class);

@Inject
ManagedExecutor managedExecutor;

@PostConstruct
public void init() {
LOGGER.info("Initializing OpenTelemetry-aware ManagedExecutor for async operations");
if (managedExecutor == null) {
LOGGER.warn("ManagedExecutor not available - context propagation may not work correctly");
}
}

@Produces
@Internal
public Executor produce() {
LOGGER.debug("Using ManagedExecutor for async operations with OpenTelemetry context propagation");
if (managedExecutor == null) {
throw new IllegalStateException("ManagedExecutor not injected - ensure MicroProfile Context Propagation is available");
}
return managedExecutor;
}

}
Loading