diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py index a08374bbe..07299d714 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py @@ -265,6 +265,17 @@ def _export_unsampled_span_for_lambda(trace_provider: TracerProvider, resource: ) +def _export_unsampled_span_for_agent_observability(trace_provider: TracerProvider, resource: Resource = None): + if not is_agent_observability_enabled(): + return + + traces_endpoint = os.environ.get(OTEL_EXPORTER_OTLP_TRACES_ENDPOINT) + + span_exporter = OTLPAwsSpanExporter(endpoint=traces_endpoint, logger_provider=get_logger_provider()) + + trace_provider.add_span_processor(BatchUnsampledSpanProcessor(span_exporter=span_exporter)) + + def _is_defer_to_workers_enabled(): return os.environ.get(OTEL_AWS_PYTHON_DEFER_TO_WORKERS_ENABLED_CONFIG, "false").strip().lower() == "true" @@ -408,9 +419,14 @@ def _customize_span_processors(provider: TracerProvider, resource: Resource) -> if _is_lambda_environment(): provider.add_span_processor(AwsLambdaSpanProcessor()) + # We always send 100% spans to Genesis platform for agent observability because + # AI applications typically have low throughput traffic patterns and require + # comprehensive monitoring to catch subtle failure modes like hallucinations + # and quality degradation that sampling could miss. # Add session.id baggage attribute to span attributes to support AI Agent use cases # enabling session ID tracking in spans. if is_agent_observability_enabled(): + _export_unsampled_span_for_agent_observability(provider, resource) def session_id_predicate(baggage_key: str) -> bool: return baggage_key == "session.id" diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_opentelementry_configurator.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_opentelementry_configurator.py index dbaee3c33..4abb99d0c 100644 --- a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_opentelementry_configurator.py +++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_opentelementry_configurator.py @@ -25,6 +25,7 @@ _customize_sampler, _customize_span_exporter, _customize_span_processors, + _export_unsampled_span_for_agent_observability, _export_unsampled_span_for_lambda, _init_logging, _is_application_signals_enabled, @@ -795,6 +796,81 @@ def test_export_unsampled_span_for_lambda(self): os.environ.pop("OTEL_AWS_APPLICATION_SIGNALS_ENABLED", None) os.environ.pop("AWS_LAMBDA_FUNCTION_NAME", None) + # pylint: disable=no-self-use + def test_export_unsampled_span_for_agent_observability(self): + mock_tracer_provider: TracerProvider = MagicMock() + + # Test when agent observability is disabled (default) + _export_unsampled_span_for_agent_observability(mock_tracer_provider, Resource.get_empty()) + self.assertEqual(mock_tracer_provider.add_span_processor.call_count, 0) + + # Test when agent observability is enabled with AWS endpoint (the default case) + os.environ["AGENT_OBSERVABILITY_ENABLED"] = "true" + os.environ["OTEL_EXPORTER_OTLP_TRACES_ENDPOINT"] = "https://xray.us-east-1.amazonaws.com/v1/traces" + _export_unsampled_span_for_agent_observability(mock_tracer_provider, Resource.get_empty()) + self.assertEqual(mock_tracer_provider.add_span_processor.call_count, 1) + processor: SpanProcessor = mock_tracer_provider.add_span_processor.call_args_list[0].args[0] + self.assertIsInstance(processor, BatchUnsampledSpanProcessor) + + # Clean up + os.environ.pop("AGENT_OBSERVABILITY_ENABLED", None) + os.environ.pop("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT", None) + + # pylint: disable=no-self-use + def test_export_unsampled_span_for_agent_observability_uses_aws_exporter(self): + """Test that OTLPAwsSpanExporter is used for AWS endpoints""" + mock_tracer_provider: TracerProvider = MagicMock() + + with patch( + "amazon.opentelemetry.distro.aws_opentelemetry_configurator.OTLPAwsSpanExporter" + ) as mock_aws_exporter: + with patch( + "amazon.opentelemetry.distro.aws_opentelemetry_configurator.BatchUnsampledSpanProcessor" + ) as mock_processor: + with patch( + "amazon.opentelemetry.distro.aws_opentelemetry_configurator.get_logger_provider" + ) as mock_logger_provider: + os.environ["AGENT_OBSERVABILITY_ENABLED"] = "true" + os.environ["OTEL_EXPORTER_OTLP_TRACES_ENDPOINT"] = "https://xray.us-east-1.amazonaws.com/v1/traces" + + _export_unsampled_span_for_agent_observability(mock_tracer_provider, Resource.get_empty()) + + # Verify OTLPAwsSpanExporter is created with correct parameters + mock_aws_exporter.assert_called_once_with( + endpoint="https://xray.us-east-1.amazonaws.com/v1/traces", + logger_provider=mock_logger_provider.return_value, + ) + # Verify BatchUnsampledSpanProcessor wraps the exporter + mock_processor.assert_called_once_with(span_exporter=mock_aws_exporter.return_value) + # Verify processor is added to tracer provider + mock_tracer_provider.add_span_processor.assert_called_once_with(mock_processor.return_value) + + # Clean up + os.environ.pop("AGENT_OBSERVABILITY_ENABLED", None) + os.environ.pop("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT", None) + + # pylint: disable=no-self-use + def test_customize_span_processors_calls_export_unsampled_span(self): + """Test that _customize_span_processors calls _export_unsampled_span_for_agent_observability""" + mock_tracer_provider: TracerProvider = MagicMock() + + with patch( + "amazon.opentelemetry.distro.aws_opentelemetry_configurator._export_unsampled_span_for_agent_observability" + ) as mock_agent_observability: + # Test that agent observability function is NOT called when disabled + os.environ.pop("AGENT_OBSERVABILITY_ENABLED", None) + _customize_span_processors(mock_tracer_provider, Resource.get_empty()) + mock_agent_observability.assert_not_called() + + # Test that agent observability function is called when enabled + mock_agent_observability.reset_mock() + os.environ["AGENT_OBSERVABILITY_ENABLED"] = "true" + _customize_span_processors(mock_tracer_provider, Resource.get_empty()) + mock_agent_observability.assert_called_once_with(mock_tracer_provider, Resource.get_empty()) + + # Clean up + os.environ.pop("AGENT_OBSERVABILITY_ENABLED", None) + def test_customize_metric_exporter(self): metric_readers = [] views = []