Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
from .exporters.utils import is_agent365_exporter_enabled
from .trace_processor.span_processor import SpanProcessor

DEFAULT_LOGGER_NAME = __name__


class TelemetryManager:
"""
Expand Down Expand Up @@ -46,7 +48,7 @@ def configure(
self,
service_name: str,
service_namespace: str,
logger_name: str = "agent365",
logger_name: str = DEFAULT_LOGGER_NAME,
token_resolver: Callable[[str, str], str | None] | None = None,
cluster_category: str = "prod",
**kwargs: Any,
Expand Down Expand Up @@ -158,11 +160,11 @@ def get_tracer(self, name: str | None = None, version: str | None = None):
to call `configure(...)` during application startup so the tracer
returned is backed by the configured TracerProvider.

:param name: Optional tracer name. Defaults to 'agent365' when not provided.
:param name: Optional tracer name. Defaults to 'microsoft_agents_a365.observability.core' when not provided.
:param version: Optional tracer version.
:return: An OpenTelemetry Tracer instance.
"""
tracer_name = name or "agent365"
tracer_name = name or DEFAULT_LOGGER_NAME
if self._tracer_provider is None:
# Not configured — return whatever tracer OpenTelemetry provides (no-op)
self._logger.warning(
Expand Down Expand Up @@ -192,7 +194,7 @@ def get_tracer_provider(self):
def configure(
service_name: str,
service_namespace: str,
logger_name: str = "agent365",
logger_name: str = DEFAULT_LOGGER_NAME,
token_resolver: Callable[[str, str], str | None] | None = None,
cluster_category: str = "prod",
**kwargs: Any,
Expand Down Expand Up @@ -228,7 +230,7 @@ def get_tracer(name: str | None = None, version: str | None = None):
"""
Return a tracer tied to the TracerProvider configured by the SDK.

:param name: Optional tracer name. If omitted, defaults to 'agent365'.
:param name: Optional tracer name. If omitted, defaults to 'microsoft_agents_a365.observability.core'.
:param version: Optional tracer version.
:return: An OpenTelemetry Tracer (may be a no-op tracer if SDK isn't configured).
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from __future__ import annotations

import json
import logging
import threading
import time
from collections.abc import Callable, Sequence
Expand All @@ -30,6 +31,9 @@
DEFAULT_HTTP_TIMEOUT_SECONDS = 30.0
DEFAULT_MAX_RETRIES = 3

# Create logger for this module - inherits from 'microsoft_agents_a365.observability.core'
logger = logging.getLogger(__name__)


class Agent365Exporter(SpanExporter):
"""
Expand Down Expand Up @@ -65,8 +69,15 @@ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
groups = partition_by_identity(spans)
if not groups:
# No spans with identity; treat as success
logger.info("No spans with tenant/agent identity found; nothing exported.")
return SpanExportResult.SUCCESS

# Debug: Log number of groups and total span count
total_spans = sum(len(activities) for activities in groups.values())
logger.info(
f"Found {len(groups)} identity groups with {total_spans} total spans to export"
)

any_failure = False
for (tenant_id, agent_id), activities in groups.items():
payload = self._build_export_request(activities)
Expand All @@ -82,13 +93,25 @@ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
)
url = f"https://{endpoint}{endpoint_path}?api-version=1"

# Debug: Log endpoint being used
logger.info(
f"Exporting {len(activities)} spans to endpoint: {url} "
f"(tenant: {tenant_id}, agent: {agent_id})"
)

headers = {"content-type": "application/json"}
try:
token = self._token_resolver(agent_id, tenant_id)
if token:
headers["authorization"] = f"Bearer {token}"
except Exception:
logger.info(f"Token resolved successfully for agent {agent_id}")
else:
logger.info(f"No token returned for agent {agent_id}")
except Exception as e:
# If token resolution fails, treat as failure for this group
logger.error(
f"Token resolution failed for agent {agent_id}, tenant {tenant_id}: {e}"
)
any_failure = True
continue

Expand All @@ -99,8 +122,9 @@ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:

return SpanExportResult.FAILURE if any_failure else SpanExportResult.SUCCESS

except Exception:
except Exception as e:
# Exporters should not raise; signal failure.
logger.error(f"Export failed with exception: {e}")
return SpanExportResult.FAILURE

def shutdown(self) -> None:
Expand All @@ -118,6 +142,13 @@ def force_flush(self, timeout_millis: int = 30000) -> bool:

# ------------- HTTP helper ----------------------

@staticmethod
def _truncate_text(text: str, max_length: int) -> str:
"""Truncate text to a maximum length, adding '...' if truncated."""
if len(text) > max_length:
return text[:max_length] + "..."
return text

def _post_with_retries(self, url: str, body: str, headers: dict[str, str]) -> bool:
for attempt in range(DEFAULT_MAX_RETRIES + 1):
try:
Expand All @@ -127,20 +158,54 @@ def _post_with_retries(self, url: str, body: str, headers: dict[str, str]) -> bo
headers=headers,
timeout=DEFAULT_HTTP_TIMEOUT_SECONDS,
)

# Extract correlation ID from response headers for logging
correlation_id = (
resp.headers.get("x-ms-correlation-id")
or resp.headers.get("request-id")
or "N/A"
)

# 2xx => success
if 200 <= resp.status_code < 300:
logger.info(
f"HTTP {resp.status_code} success on attempt {attempt + 1}. "
f"Correlation ID: {correlation_id}. "
f"Response: {self._truncate_text(resp.text, 200)}"
)
return True

# Log non-success responses
response_text = self._truncate_text(resp.text, 500)

# Retry transient
if resp.status_code in (408, 429) or 500 <= resp.status_code < 600:
if attempt < DEFAULT_MAX_RETRIES:
time.sleep(0.2 * (attempt + 1))
continue
# Final attempt failed
logger.error(
f"HTTP {resp.status_code} final failure after {DEFAULT_MAX_RETRIES + 1} attempts. "
f"Correlation ID: {correlation_id}. "
f"Response: {response_text}"
)
else:
# Non-retryable error
logger.error(
f"HTTP {resp.status_code} non-retryable error. "
f"Correlation ID: {correlation_id}. "
f"Response: {response_text}"
)
return False
except requests.RequestException:

except requests.RequestException as e:
if attempt < DEFAULT_MAX_RETRIES:
time.sleep(0.2 * (attempt + 1))
continue
# Final attempt failed
logger.error(
f"Request failed after {DEFAULT_MAX_RETRIES + 1} attempts with exception: {e}"
)
return False
return False

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

# Base class for OpenTelemetry tracing scopes.

import logging
import os
import time
from threading import Lock
Expand Down Expand Up @@ -35,6 +36,9 @@
from .agent_details import AgentDetails
from .tenant_details import TenantDetails

# Create logger for this module - inherits from 'microsoft_agents_a365.observability.core'
logger = logging.getLogger(__name__)


class OpenTelemetryScope:
"""Base class for OpenTelemetry tracing scopes in the SDK."""
Expand Down Expand Up @@ -104,6 +108,13 @@ def __init__(
activity_name, kind=activity_kind, context=current_context
)

# Log span creation
if self._span:
span_id = f"{self._span.context.span_id:016x}" if self._span.context else "unknown"
logger.info(f"Span started: '{activity_name}' ({span_id})")
else:
logger.error(f"Failed to create span: '{activity_name}' - tracer returned None")

# Set common tags
if self._span:
self._span.set_attribute(GEN_AI_SYSTEM_KEY, GEN_AI_SYSTEM_VALUE)
Expand Down Expand Up @@ -210,6 +221,9 @@ def _end(self) -> None:
"""End the span and record metrics."""
if self._span and self._is_telemetry_enabled() and not self._has_ended:
self._has_ended = True
span_id = f"{self._span.context.span_id:016x}" if self._span.context else "unknown"
logger.info(f"Span ended: '{self._span.name}' ({span_id})")

self._span.end()

def __enter__(self):
Expand Down
85 changes: 85 additions & 0 deletions tests/observability/core/test_agent365_exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,91 @@ def test_default_endpoint_path_when_s2s_disabled(self):
self.assertEqual(headers["authorization"], "Bearer test_token_123")
self.assertEqual(headers["content-type"], "application/json")

@patch("microsoft_agents_a365.observability.core.exporters.agent365_exporter.logger")
@patch(
"microsoft_agents_a365.observability.core.exporters.agent365_exporter.PowerPlatformApiDiscovery"
)
def test_export_logging(self, mock_discovery, mock_logger):
"""Test that the exporter logs appropriate messages during export."""
# Mock the discovery service
mock_discovery_instance = Mock()
mock_discovery_instance.get_tenant_island_cluster_endpoint.return_value = (
"test-endpoint.com"
)
mock_discovery.return_value = mock_discovery_instance

# Mock successful HTTP response
with patch("requests.Session.post") as mock_post:
mock_response = Mock()
mock_response.status_code = 200
mock_response.text = "success"
mock_response.headers = {"x-ms-correlation-id": "test-correlation-123"}
mock_post.return_value = mock_response

# Create test spans
spans = [
self._create_mock_span(
name="test_span_1",
attributes={
TENANT_ID_KEY: "test-tenant-123",
GEN_AI_AGENT_ID_KEY: "test-agent-456",
},
),
self._create_mock_span(
name="test_span_2",
attributes={
TENANT_ID_KEY: "test-tenant-123",
GEN_AI_AGENT_ID_KEY: "test-agent-456",
},
),
]

# Export spans
result = self.exporter.export(spans)

# Verify export succeeded
self.assertEqual(result, SpanExportResult.SUCCESS)

# Verify logging calls
expected_log_calls = [
# Should log groups found
unittest.mock.call.info("Found 1 identity groups with 2 total spans to export"),
# Should log endpoint being used
unittest.mock.call.info(
"Exporting 2 spans to endpoint: https://test-endpoint.com/maven/agent365/agents/test-agent-456/traces?api-version=1 "
"(tenant: test-tenant-123, agent: test-agent-456)"
),
# Should log token resolution success
unittest.mock.call.info("Token resolved successfully for agent test-agent-456"),
# Should log HTTP success
unittest.mock.call.info(
"HTTP 200 success on attempt 1. Correlation ID: test-correlation-123. Response: success"
),
]

# Check that all expected info calls were made
for expected_call in expected_log_calls:
self.assertIn(expected_call, mock_logger.info.call_args_list)

@patch("microsoft_agents_a365.observability.core.exporters.agent365_exporter.logger")
def test_export_error_logging(self, mock_logger):
"""Test that the exporter logs errors appropriately."""
# Create spans without tenant/agent identity - explicitly pass None values
spans = [
self._create_mock_span(name="test_span", attributes={}, tenant_id=None, agent_id=None)
]

# Export spans (should succeed but log no identity)
result = self.exporter.export(spans)

# Verify export succeeded (no identity spans are treated as success)
self.assertEqual(result, SpanExportResult.SUCCESS)

# Verify info log for no identity
mock_logger.info.assert_called_with(
"No spans with tenant/agent identity found; nothing exported."
)


if __name__ == "__main__":
unittest.main()
Loading
Loading