Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter

from .exporters.agent365_exporter import Agent365Exporter
from .exporters.agent365_exporter_options import Agent365ExporterOptions
from .exporters.utils import is_agent365_exporter_enabled
from .trace_processor.span_processor import SpanProcessor

Expand Down Expand Up @@ -51,6 +52,7 @@ def configure(
logger_name: str = DEFAULT_LOGGER_NAME,
token_resolver: Callable[[str, str], str | None] | None = None,
cluster_category: str = "prod",
exporter_options: Optional[Agent365ExporterOptions] = None,
**kwargs: Any,
) -> bool:
"""
Expand All @@ -59,8 +61,12 @@ def configure(
:param service_name: The name of the service.
:param service_namespace: The namespace of the service.
:param logger_name: The name of the logger to collect telemetry from.
:param token_resolver: Callable that returns an auth token for a given agent + tenant.
:param cluster_category: Environment / cluster category (e.g., "preprod", "prod").
:param token_resolver: (Deprecated) Callable that returns an auth token for a given agent + tenant.
Use exporter_options instead.
:param cluster_category: (Deprecated) Environment / cluster category (e.g. "prod").
Use exporter_options instead.
:param exporter_options: Agent365ExporterOptions instance for configuring the exporter.
If provided, exporter_options takes precedence. If exporter_options is None, the token_resolver and cluster_category parameters are used as fallback/legacy support to construct a default Agent365ExporterOptions instance.
:return: True if configuration succeeded, False otherwise.
"""
try:
Expand All @@ -71,6 +77,7 @@ def configure(
logger_name,
token_resolver,
cluster_category,
exporter_options,
**kwargs,
)
except Exception as e:
Expand All @@ -84,6 +91,7 @@ def _configure_internal(
logger_name: str,
token_resolver: Callable[[str, str], str | None] | None = None,
cluster_category: str = "prod",
exporter_options: Optional[Agent365ExporterOptions] = None,
**kwargs: Any,
) -> bool:
"""Internal configuration method - not thread-safe, must be called with lock."""
Expand Down Expand Up @@ -115,11 +123,26 @@ def _configure_internal(
trace.set_tracer_provider(tracer_provider)
self._tracer_provider = tracer_provider

if is_agent365_exporter_enabled() and token_resolver is not None:
exporter = Agent365Exporter(
token_resolver=token_resolver,
# Use exporter_options if provided, otherwise create default options with legacy parameters
if exporter_options is None:
exporter_options = Agent365ExporterOptions(
cluster_category=cluster_category,
**kwargs,
token_resolver=token_resolver,
)

# Extract configuration for BatchSpanProcessor
batch_processor_kwargs = {
"max_queue_size": exporter_options.max_queue_size,
"schedule_delay_millis": exporter_options.scheduled_delay_ms,
"export_timeout_millis": exporter_options.exporter_timeout_ms,
"max_export_batch_size": exporter_options.max_export_batch_size,
}

if is_agent365_exporter_enabled() and exporter_options.token_resolver is not None:
exporter = Agent365Exporter(
token_resolver=exporter_options.token_resolver,
cluster_category=exporter_options.cluster_category,
use_s2s_endpoint=exporter_options.use_s2s_endpoint,
)
else:
exporter = ConsoleSpanExporter()
Expand All @@ -130,7 +153,7 @@ def _configure_internal(
# Add span processors

# Create BatchSpanProcessor with optimized settings
batch_processor = BatchSpanProcessor(exporter)
batch_processor = BatchSpanProcessor(exporter, **batch_processor_kwargs)
agent_processor = SpanProcessor()

tracer_provider.add_span_processor(batch_processor)
Expand Down Expand Up @@ -197,6 +220,7 @@ def configure(
logger_name: str = DEFAULT_LOGGER_NAME,
token_resolver: Callable[[str, str], str | None] | None = None,
cluster_category: str = "prod",
exporter_options: Optional[Agent365ExporterOptions] = None,
**kwargs: Any,
) -> bool:
"""
Expand All @@ -205,6 +229,12 @@ def configure(
:param service_name: The name of the service.
:param service_namespace: The namespace of the service.
:param logger_name: The name of the logger to collect telemetry from.
:param token_resolver: (Deprecated) Callable that returns an auth token for a given agent + tenant.
Use exporter_options instead.
:param cluster_category: (Deprecated) Environment / cluster category (e.g. "prod").
Use exporter_options instead.
:param exporter_options: Agent365ExporterOptions instance for configuring the exporter.
If provided, exporter_options takes precedence. If exporter_options is None, the token_resolver and cluster_category parameters are used as fallback/legacy support to construct a default Agent365ExporterOptions instance.
:return: True if configuration succeeded, False otherwise.
"""
return _telemetry_manager.configure(
Expand All @@ -213,6 +243,7 @@ def configure(
logger_name,
token_resolver,
cluster_category,
exporter_options,
**kwargs,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,13 @@
GEN_AI_AGENT_BLUEPRINT_ID_KEY = "gen_ai.agent.applicationid"
CORRELATION_ID_KEY = "correlation.id"
HIRING_MANAGER_ID_KEY = "hiring.manager.id"
SESSION_DESCRIPTION_KEY = "session.description"

# Execution context dimensions
GEN_AI_EXECUTION_TYPE_KEY = "gen_ai.execution.type"
GEN_AI_EXECUTION_PAYLOAD_KEY = "gen_ai.execution.payload"

# Source metadata dimensions
GEN_AI_EXECUTION_SOURCE_ID_KEY = "gen_ai.execution.sourceMetadata.id"
GEN_AI_EXECUTION_SOURCE_NAME_KEY = "gen_ai.channel.name"
GEN_AI_EXECUTION_SOURCE_DESCRIPTION_KEY = "gen_ai.channel.link"

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

from typing import Awaitable, Callable, Optional


class Agent365ExporterOptions:
"""
Configuration for Agent365Exporter.
Only cluster_category and token_resolver are required for core operation.
"""

def __init__(
self,
cluster_category: str = "prod",
token_resolver: Optional[Callable[[str, str], Awaitable[Optional[str]]]] = None,
use_s2s_endpoint: bool = False,
max_queue_size: int = 2048,
scheduled_delay_ms: int = 5000,
exporter_timeout_ms: int = 30000,
max_export_batch_size: int = 512,
):
"""
Args:
cluster_category: Cluster region argument. Defaults to 'prod'.
token_resolver: Async callable that resolves the auth token (REQUIRED).
use_s2s_endpoint: Use the S2S endpoint instead of standard endpoint.
max_queue_size: Maximum queue size for the batch processor. Default is 2048.
scheduled_delay_ms: Delay between export batches (ms). Default is 5000.
exporter_timeout_ms: Timeout for the export operation (ms). Default is 30000.
max_export_batch_size: Maximum batch size for export operations. Default is 512.
"""
self.cluster_category = cluster_category
self.token_resolver = token_resolver
self.use_s2s_endpoint = use_s2s_endpoint
self.max_queue_size = max_queue_size
self.scheduled_delay_ms = scheduled_delay_ms
self.exporter_timeout_ms = exporter_timeout_ms
self.max_export_batch_size = max_export_batch_size
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
GEN_AI_CALLER_UPN_KEY,
GEN_AI_CALLER_USER_ID_KEY,
GEN_AI_EXECUTION_SOURCE_DESCRIPTION_KEY,
GEN_AI_EXECUTION_SOURCE_ID_KEY,
GEN_AI_EXECUTION_SOURCE_NAME_KEY,
GEN_AI_EXECUTION_TYPE_KEY,
GEN_AI_INPUT_MESSAGES_KEY,
Expand Down Expand Up @@ -111,7 +110,6 @@ def __init__(
# Set request metadata if provided
if request:
if request.source_metadata:
self.set_tag_maybe(GEN_AI_EXECUTION_SOURCE_ID_KEY, request.source_metadata.id)
self.set_tag_maybe(GEN_AI_EXECUTION_SOURCE_NAME_KEY, request.source_metadata.name)
self.set_tag_maybe(
GEN_AI_EXECUTION_SOURCE_DESCRIPTION_KEY, request.source_metadata.description
Expand All @@ -121,6 +119,7 @@ def __init__(
GEN_AI_EXECUTION_TYPE_KEY,
request.execution_type.value if request.execution_type else None,
)
self.set_tag_maybe(GEN_AI_INPUT_MESSAGES_KEY, safe_json_dumps([request.content]))

# Set caller details tags
if caller_details:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@
GEN_AI_CONVERSATION_ID_KEY,
GEN_AI_CONVERSATION_ITEM_LINK_KEY,
GEN_AI_EXECUTION_SOURCE_DESCRIPTION_KEY,
GEN_AI_EXECUTION_SOURCE_ID_KEY,
GEN_AI_EXECUTION_SOURCE_NAME_KEY,
HIRING_MANAGER_ID_KEY,
OPERATION_SOURCE_KEY,
SESSION_DESCRIPTION_KEY,
SESSION_ID_KEY,
TENANT_ID_KEY,
)
from ..models.operation_source import OperationSource
from ..utils import deprecated
from .turn_context_baggage import from_turn_context


Expand All @@ -50,16 +53,18 @@ def __init__(self):
"""Initialize the baggage builder."""
self._pairs: dict[str, str] = {}

def operation_source(self, value: str | None) -> "BaggageBuilder":
def operation_source(self, value: OperationSource | None) -> "BaggageBuilder":
"""Set the operation source baggage value.

Args:
value: The operation source value
value: The operation source enum value

Returns:
Self for method chaining
"""
self._set(OPERATION_SOURCE_KEY, value)
# Convert enum to string value for baggage storage
str_value = value.value if value is not None else None
self._set(OPERATION_SOURCE_KEY, str_value)
return self

def tenant_id(self, value: str | None) -> "BaggageBuilder":
Expand Down Expand Up @@ -188,18 +193,38 @@ def conversation_item_link(self, value: str | None) -> "BaggageBuilder":
self._set(GEN_AI_CONVERSATION_ITEM_LINK_KEY, value)
return self

@deprecated("This is a no-op. Use channel_name() or channel_links() instead.")
def source_metadata_id(self, value: str | None) -> "BaggageBuilder":
"""Set the execution source metadata ID (e.g., channel ID)."""
self._set(GEN_AI_EXECUTION_SOURCE_ID_KEY, value)
return self

@deprecated("Use channel_name() instead")
def source_metadata_name(self, value: str | None) -> "BaggageBuilder":
"""Set the execution source metadata name (e.g., channel name)."""
self._set(GEN_AI_EXECUTION_SOURCE_NAME_KEY, value)
return self
return self.channel_name(value)

@deprecated("Use channel_links() instead")
def source_metadata_description(self, value: str | None) -> "BaggageBuilder":
"""Set the execution source metadata description (e.g., channel description)."""
return self.channel_links(value)

def session_id(self, value: str | None) -> "BaggageBuilder":
"""Set the session ID baggage value."""
self._set(SESSION_ID_KEY, value)
return self

def session_description(self, value: str | None) -> "BaggageBuilder":
"""Set the session description baggage value."""
self._set(SESSION_DESCRIPTION_KEY, value)
return self

def channel_name(self, value: str | None) -> "BaggageBuilder":
"""Sets the channel name baggage value (e.g., 'Teams', 'msteams')."""
self._set(GEN_AI_EXECUTION_SOURCE_NAME_KEY, value)
return self

def channel_links(self, value: str | None) -> "BaggageBuilder":
"""Sets the channel link baggage value. (e.g., channel links or description)."""
self._set(GEN_AI_EXECUTION_SOURCE_DESCRIPTION_KEY, value)
return self

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import annotations

import json
from typing import Any, Iterable, Iterator, Mapping
from typing import Any, Iterator, Mapping

from ..constants import (
GEN_AI_AGENT_AUID_KEY,
Expand All @@ -17,7 +17,6 @@
GEN_AI_CONVERSATION_ID_KEY,
GEN_AI_CONVERSATION_ITEM_LINK_KEY,
GEN_AI_EXECUTION_SOURCE_DESCRIPTION_KEY,
GEN_AI_EXECUTION_SOURCE_ID_KEY,
GEN_AI_EXECUTION_SOURCE_NAME_KEY,
GEN_AI_EXECUTION_TYPE_KEY,
TENANT_ID_KEY,
Expand All @@ -26,9 +25,6 @@

AGENT_ROLE = "agenticUser"
CHANNEL_ID_AGENTS = "agents"
ENTITY_TYPE_WPX_COMMENT = "wpxcomment"
ENTITY_TYPE_EMAIL_NOTIFICATION = "emailNotification"
WPX_CONVERSATION_ID_FORMAT = "{document_id}_{parent_comment_id}"


def _safe_get(obj: Any, *names: str) -> Any:
Expand Down Expand Up @@ -135,37 +131,40 @@ def _iter_tenant_id_pair(activity: Any) -> Iterator[tuple[str, Any]]:


def _iter_source_metadata_pairs(activity: Any) -> Iterator[tuple[str, Any]]:
"""
Generate source metadata pairs from activity, handling both string and ChannelId object cases.

:param activity: The activity object (Activity instance or dict)
:return: Iterator of (key, value) tuples for source metadata
"""
# Handle channel_id (can be string or ChannelId object)
channel_id = _safe_get(activity, "channel_id")
yield GEN_AI_EXECUTION_SOURCE_ID_KEY, channel_id
yield GEN_AI_EXECUTION_SOURCE_NAME_KEY, channel_id
yield GEN_AI_EXECUTION_SOURCE_DESCRIPTION_KEY, _safe_get(activity, "type", "Type")

# Extract channel name from either string or ChannelId object
channel_name = None
sub_channel = None

if channel_id is not None:
if isinstance(channel_id, str):
# Direct string value
channel_name = channel_id
elif hasattr(channel_id, "channel"):
# ChannelId object
channel_name = channel_id.channel
sub_channel = getattr(channel_id, "sub_channel", None)
elif isinstance(channel_id, dict):
# Serialized ChannelId as dict
channel_name = channel_id.get("channel")
sub_channel = channel_id.get("sub_channel")

# Yield channel name as source name
yield GEN_AI_EXECUTION_SOURCE_NAME_KEY, channel_name
yield GEN_AI_EXECUTION_SOURCE_DESCRIPTION_KEY, sub_channel


def _iter_conversation_pairs(activity: Any) -> Iterator[tuple[str, Any]]:
channel_id = _safe_get(activity, "channel_id")
entities = _safe_get(activity, "entities") or []
conversation_id = None

if channel_id == CHANNEL_ID_AGENTS and isinstance(entities, Iterable):
# search entities for wpxcomment or emailNotification
for e in entities:
etype = _safe_get(e, "type", "Type")
if etype == ENTITY_TYPE_WPX_COMMENT:
document_id = _safe_get(e, "documentId", "document_id")
parent_comment_id = _safe_get(e, "parentCommentId", "parent_comment_id")
if document_id and parent_comment_id:
conversation_id = WPX_CONVERSATION_ID_FORMAT.format(
document_id=document_id,
parent_comment_id=parent_comment_id,
)
break
elif etype == ENTITY_TYPE_EMAIL_NOTIFICATION:
conversation_id = _safe_get(e, "conversationId", "conversation_id")
if conversation_id:
break
if not conversation_id:
conv = _safe_get(activity, "conversation")
conversation_id = _safe_get(conv, "id", "Id")
conv = _safe_get(activity, "conversation")
conversation_id = _safe_get(conv, "id")

item_link = _safe_get(activity, "service_url")

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

"""Operation source enumeration for Agent365 SDK."""

from enum import Enum


class OperationSource(Enum):
"""
Enumeration representing the source of an operation.
"""

SDK = "SDK"
"""Operation executed by SDK."""

GATEWAY = "Gateway"
"""Operation executed by Gateway."""

MCP_SERVER = "MCPServer"
"""Operation executed by MCP Server."""
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,3 @@ class Request:
execution_type: ExecutionType
session_id: str | None = None
source_metadata: SourceMetadata | None = None
payload: str | None = None
Loading
Loading