diff --git a/src/strands_evals/mappers/constants.py b/src/strands_evals/mappers/constants.py new file mode 100644 index 0000000..884f80d --- /dev/null +++ b/src/strands_evals/mappers/constants.py @@ -0,0 +1,30 @@ +"""Constants for OTEL trace mappers. + +Scope names identify which instrumentation library produced the spans. +Attribute keys and values are used for span type detection and data extraction. +""" + +# --- Instrumentation scope names --- +SCOPE_LANGCHAIN_OTEL = "opentelemetry.instrumentation.langchain" +SCOPE_OPENINFERENCE = "openinference.instrumentation.langchain" +SCOPE_STRANDS = "strands.telemetry.tracer" + +# --- OTEL semantic convention attribute keys --- +ATTR_LLM_REQUEST_TYPE = "llm.request.type" + +# --- Traceloop/OpenLLMetry attribute keys --- +ATTR_TRACELOOP_SPAN_KIND = "traceloop.span.kind" +ATTR_TRACELOOP_ENTITY_NAME = "traceloop.entity.name" +ATTR_TRACELOOP_ENTITY_INPUT = "traceloop.entity.input" +ATTR_TRACELOOP_ENTITY_OUTPUT = "traceloop.entity.output" + +# --- Traceloop/OpenLLMetry attribute values --- +KIND_WORKFLOW = "workflow" +KIND_TOOL = "tool" +LLM_TYPE_CHAT = "chat" + +# --- ADOT body field values (LangChain serialization) --- +ADOT_ROLE_UNKNOWN = "unknown" +ADOT_LANGGRAPH_NAME = "LangGraph" +ADOT_TOOL_CALL_WITH_CONTEXT = "tool_call_with_context" +ADOT_INPUT_STR_KEY = "input_str" diff --git a/src/strands_evals/mappers/langchain_otel_session_mapper.py b/src/strands_evals/mappers/langchain_otel_session_mapper.py index ae0bcbf..1bb3ece 100644 --- a/src/strands_evals/mappers/langchain_otel_session_mapper.py +++ b/src/strands_evals/mappers/langchain_otel_session_mapper.py @@ -31,32 +31,25 @@ Trace, UserMessage, ) +from .constants import ( + ADOT_INPUT_STR_KEY, + ADOT_LANGGRAPH_NAME, + ADOT_ROLE_UNKNOWN, + ADOT_TOOL_CALL_WITH_CONTEXT, + ATTR_LLM_REQUEST_TYPE, + ATTR_TRACELOOP_ENTITY_INPUT, + ATTR_TRACELOOP_ENTITY_NAME, + ATTR_TRACELOOP_ENTITY_OUTPUT, + ATTR_TRACELOOP_SPAN_KIND, + KIND_TOOL, + KIND_WORKFLOW, + LLM_TYPE_CHAT, + 
SCOPE_LANGCHAIN_OTEL, +) from .session_mapper import SessionMapper logger = logging.getLogger(__name__) -SCOPE_NAME = "opentelemetry.instrumentation.langchain" - -# --- OTEL semantic convention attribute keys --- -_ATTR_LLM_REQUEST_TYPE = "llm.request.type" - -# --- Traceloop/OpenLLMetry attribute keys --- -_ATTR_TRACELOOP_SPAN_KIND = "traceloop.span.kind" -_ATTR_TRACELOOP_ENTITY_NAME = "traceloop.entity.name" -_ATTR_TRACELOOP_ENTITY_INPUT = "traceloop.entity.input" -_ATTR_TRACELOOP_ENTITY_OUTPUT = "traceloop.entity.output" - -# --- Traceloop/OpenLLMetry attribute values --- -_KIND_WORKFLOW = "workflow" -_KIND_TOOL = "tool" -_LLM_TYPE_CHAT = "chat" - -# --- ADOT body field values (LangChain serialization) --- -_ADOT_ROLE_UNKNOWN = "unknown" -_ADOT_LANGGRAPH_NAME = "LangGraph" -_ADOT_TOOL_CALL_WITH_CONTEXT = "tool_call_with_context" -_ADOT_INPUT_STR_KEY = "input_str" - class LangChainOtelSessionMapper(SessionMapper): """Maps Traceloop/OpenLLMetry LangChain traces to Session format. @@ -103,7 +96,7 @@ def map_to_session(self, data: Any, session_id: str) -> Session: spans = self._normalize_to_flat_spans(data) # Filter to only spans from this scope - langchain_spans = [s for s in spans if self._get_scope_name(s) == SCOPE_NAME] + langchain_spans = [s for s in spans if self._get_scope_name(s) == SCOPE_LANGCHAIN_OTEL] # Group spans by trace_id grouped = defaultdict(list) @@ -186,12 +179,12 @@ def _is_inference_span(self, span: dict) -> bool: 2. 
ADOT body: role == "unknown" with raw string content (direct LLM I/O) """ attrs = span.get("attributes", {}) - if attrs.get(_ATTR_LLM_REQUEST_TYPE) == _LLM_TYPE_CHAT: + if attrs.get(ATTR_LLM_REQUEST_TYPE) == LLM_TYPE_CHAT: return True # ADOT fallback: records with role="unknown" and raw string content are LLM calls input_messages, _ = self._get_messages_from_span_events(span) - if input_messages and input_messages[0].get("role") == _ADOT_ROLE_UNKNOWN: + if input_messages and input_messages[0].get("role") == ADOT_ROLE_UNKNOWN: return True return False @@ -204,14 +197,14 @@ def _is_tool_execution_span(self, span: dict) -> bool: 2. ADOT body: input has "input_str" key (direct tool call format) """ attrs = span.get("attributes", {}) - if attrs.get(_ATTR_TRACELOOP_SPAN_KIND) == _KIND_TOOL: + if attrs.get(ATTR_TRACELOOP_SPAN_KIND) == KIND_TOOL: return True # ADOT fallback: "input_str" is the direct tool invocation format. # Note: "tool_call_with_context" records are graph-level wrappers that # duplicate the actual tool call — skip them to avoid duplicate spans. in_parsed = self._parse_adot_body(span) - if isinstance(in_parsed, dict) and _ADOT_INPUT_STR_KEY in in_parsed: + if isinstance(in_parsed, dict) and ADOT_INPUT_STR_KEY in in_parsed: return True return False @@ -228,14 +221,14 @@ def _is_agent_invocation_span(self, span: dict) -> bool: RunnableSequence, tools) are internal graph steps and must be skipped. 
""" attrs = span.get("attributes", {}) - if attrs.get(_ATTR_TRACELOOP_SPAN_KIND) == _KIND_WORKFLOW: + if attrs.get(ATTR_TRACELOOP_SPAN_KIND) == KIND_WORKFLOW: return True # ADOT fallback: only the root LangGraph node is the agent invocation in_parsed = self._parse_adot_body(span) if isinstance(in_parsed, dict): kwargs = in_parsed.get("kwargs") - if isinstance(kwargs, dict) and kwargs.get("name") == _ADOT_LANGGRAPH_NAME: + if isinstance(kwargs, dict) and kwargs.get("name") == ADOT_LANGGRAPH_NAME: return True return False @@ -308,7 +301,7 @@ def _convert_tool_execution_span(self, span: dict, session_id: str) -> ToolExecu span_info = self._create_span_info(span, session_id) attrs = span.get("attributes", {}) - tool_name = attrs.get(_ATTR_TRACELOOP_ENTITY_NAME) + tool_name = attrs.get(ATTR_TRACELOOP_ENTITY_NAME) tool_parameters: dict | None = None tool_output_content: str | None = None tool_call_id: str | None = None @@ -325,7 +318,7 @@ def _convert_tool_execution_span(self, span: dict, session_id: str) -> ToolExecu inputs = in_parsed.get("inputs") if isinstance(inputs, dict): # tool_call_with_context: {"inputs": {"__type": "tool_call_with_context", "tool_call": {...}}} - if inputs.get("__type") == _ADOT_TOOL_CALL_WITH_CONTEXT: + if inputs.get("__type") == ADOT_TOOL_CALL_WITH_CONTEXT: tc = inputs.get("tool_call", {}) tool_parameters = tc.get("args", {}) tool_name = tool_name or tc.get("name") @@ -333,8 +326,8 @@ def _convert_tool_execution_span(self, span: dict, session_id: str) -> ToolExecu else: # Direct inputs dict: {"inputs": {"a": 1, "b": 2}} tool_parameters = inputs - if tool_parameters is None and _ADOT_INPUT_STR_KEY in in_parsed: - params_parsed = self._safe_json_parse(in_parsed.get(_ADOT_INPUT_STR_KEY, "")) + if tool_parameters is None and ADOT_INPUT_STR_KEY in in_parsed: + params_parsed = self._safe_json_parse(in_parsed.get(ADOT_INPUT_STR_KEY, "")) if isinstance(params_parsed, dict): tool_parameters = params_parsed @@ -354,16 +347,16 @@ def 
_convert_tool_execution_span(self, span: dict, session_id: str) -> ToolExecu tool_name = tool_name or top_kwargs.get("name") else: # Live format - extract from traceloop.entity.* attributes - entity_input = attrs.get(_ATTR_TRACELOOP_ENTITY_INPUT, "") - entity_output = attrs.get(_ATTR_TRACELOOP_ENTITY_OUTPUT, "") + entity_input = attrs.get(ATTR_TRACELOOP_ENTITY_INPUT, "") + entity_output = attrs.get(ATTR_TRACELOOP_ENTITY_OUTPUT, "") if entity_input: parsed = self._safe_json_parse(entity_input) if isinstance(parsed, dict): if "inputs" in parsed and isinstance(parsed.get("inputs"), dict): tool_parameters = parsed.get("inputs") - elif _ADOT_INPUT_STR_KEY in parsed: - params_parsed = self._safe_json_parse(parsed.get(_ADOT_INPUT_STR_KEY, "")) + elif ADOT_INPUT_STR_KEY in parsed: + params_parsed = self._safe_json_parse(parsed.get(ADOT_INPUT_STR_KEY, "")) if isinstance(params_parsed, dict): tool_parameters = params_parsed @@ -413,8 +406,8 @@ def _convert_agent_invocation_span( agent_response = self._extract_agent_response_from_output(output_messages) else: # Live format - extract from traceloop.entity.* attributes - entity_input = attrs.get(_ATTR_TRACELOOP_ENTITY_INPUT, "") - entity_output = attrs.get(_ATTR_TRACELOOP_ENTITY_OUTPUT, "") + entity_input = attrs.get(ATTR_TRACELOOP_ENTITY_INPUT, "") + entity_output = attrs.get(ATTR_TRACELOOP_ENTITY_OUTPUT, "") if entity_input: parsed = self._safe_json_parse(entity_input) @@ -557,7 +550,7 @@ def _extract_messages_from_span(self, span: dict) -> tuple[list[dict], list[dict span_events = span.get("span_events", []) for event in span_events: event_name = event.get("event_name", "") - if event_name == SCOPE_NAME: + if event_name == SCOPE_LANGCHAIN_OTEL: body = event.get("body", {}) input_group = body.get("input", {}) output_group = body.get("output", {}) diff --git a/src/strands_evals/mappers/openinference_session_mapper.py b/src/strands_evals/mappers/openinference_session_mapper.py index fe6755e..6ecfee4 100644 --- 
a/src/strands_evals/mappers/openinference_session_mapper.py +++ b/src/strands_evals/mappers/openinference_session_mapper.py @@ -4,6 +4,7 @@ Handles traces with scope: openinference.instrumentation.langchain """ +import ast import json import logging from collections import defaultdict @@ -26,12 +27,11 @@ Trace, UserMessage, ) +from .constants import SCOPE_OPENINFERENCE from .session_mapper import SessionMapper logger = logging.getLogger(__name__) -SCOPE_NAME = "openinference.instrumentation.langchain" - class OpenInferenceSessionMapper(SessionMapper): """Maps OpenInference LangChain traces to Session format. @@ -47,10 +47,13 @@ def __init__(self): super().__init__() # Track tools per trace (tools appear in LLM spans, not agent spans) self._trace_tools_map: dict[str, dict[str, ToolConfig]] = defaultdict(dict) - # Track if agent invocation span already found for trace - self._trace_has_agent_invocation: dict[str, bool] = defaultdict(bool) # Track system prompts per trace self._trace_system_prompt_map: dict[str, str] = defaultdict(str) + # Cache: span_id -> (input_messages, output_messages) from _get_messages_from_span_events + # Avoids re-parsing span_events body during detection and again during conversion. + self._span_messages_cache: dict[str, tuple[list[dict], list[dict]]] = {} + # Cache: span_id -> parsed output body from _parse_adot_output + self._adot_output_cache: dict[str, Any] = {} def map_to_session(self, data: Any, session_id: str) -> Session: """Map OpenInference LangChain spans to Session format. 
@@ -67,14 +70,15 @@ def map_to_session(self, data: Any, session_id: str) -> Session: """ # Reset state for new mapping self._trace_tools_map = defaultdict(dict) - self._trace_has_agent_invocation = defaultdict(bool) self._trace_system_prompt_map = defaultdict(str) + self._span_messages_cache = {} + self._adot_output_cache = {} # Normalize input to flat spans spans = self._normalize_to_flat_spans(data) # Filter to only spans from this scope - openinference_spans = [s for s in spans if self._get_scope_name(s) == SCOPE_NAME] + openinference_spans = [s for s in spans if self._get_scope_name(s) == SCOPE_OPENINFERENCE] # Group spans by trace_id grouped = defaultdict(list) @@ -112,6 +116,25 @@ def _build_trace(self, trace_id: str, spans: list[dict], session_id: str) -> Tra except Exception as e: logger.warning(f"Failed to convert span {span.get('span_id', 'unknown')}: {e}") + # In multi-agent LangGraph systems, each nested sub-graph produces its own + # LangGraph CHAIN span. Keep only the last one (root graph finishes last). 
+ agent_spans = [s for s in converted_spans if isinstance(s, AgentInvocationSpan)] + if len(agent_spans) > 1: + root = agent_spans[-1] + converted_spans = [s for s in converted_spans if not isinstance(s, AgentInvocationSpan) or s is root] + + # If no tools found from attributes (common in ADOT), collect from converted tool spans + trace_tools = self._trace_tools_map.get(trace_id, {}) + if not trace_tools: + trace_tools = self._collect_tools_from_spans(converted_spans) + + # Back-fill available_tools into agent spans that don't have them yet + if trace_tools: + tools_list = sorted(trace_tools.values(), key=lambda t: t.name) + for converted in converted_spans: + if isinstance(converted, AgentInvocationSpan) and not converted.available_tools: + converted.available_tools = tools_list + return Trace(spans=converted_spans, trace_id=trace_id, session_id=session_id) # ========================================================================= @@ -119,62 +142,75 @@ def _build_trace(self, trace_id: str, spans: list[dict], session_id: str) -> Tra # ========================================================================= def _is_inference_span(self, span: dict) -> bool: - """Check if span is an LLM inference span.""" + """Check if span is an LLM inference span. + + Detection: + 1. Live instrumentation: openinference.span.kind == "LLM" + 2. ADOT body: output contains "generations" key (LLMResult format) + """ attrs = span.get("attributes", {}) - return attrs.get("openinference.span.kind") == "LLM" + if attrs.get("openinference.span.kind") == "LLM": + return True + + # ADOT fallback: LLM calls produce LLMResult with "generations" key + out_parsed = self._parse_adot_output(span) + if isinstance(out_parsed, dict) and "generations" in out_parsed: + return True + + return False def _is_tool_execution_span(self, span: dict) -> bool: - """Check if span is a tool execution span.""" + """Check if span is a tool execution span. + + Detection: + 1. 
Live instrumentation: openinference.span.kind == "TOOL" + 2. ADOT body: output has "type": "tool" (tool result message), + excluding tool_call_with_context wrappers + """ attrs = span.get("attributes", {}) - return attrs.get("openinference.span.kind") == "TOOL" + if attrs.get("openinference.span.kind") == "TOOL": + return True + + # ADOT fallback: tool results have "type": "tool" in output + out_parsed = self._parse_adot_output(span) + if isinstance(out_parsed, dict) and out_parsed.get("type") == "tool": + # Skip tool_call_with_context wrappers (graph-level duplicates) + input_messages, _ = self._get_messages_from_span_events(span) + if input_messages: + in_content = input_messages[0].get("content", "") + in_parsed = self._safe_json_parse(in_content) if isinstance(in_content, str) else in_content + if isinstance(in_parsed, dict) and in_parsed.get("__type") == "tool_call_with_context": + return False + return True + + return False def _is_agent_invocation_span(self, span: dict) -> bool: """Check if span is an agent invocation span. - Returns True for: - 1. Default agent: CHAIN + name=LangGraph (takes priority) - 2. Custom-named agent: AGENT + langgraph_step >= 2 - 3. Model chain: CHAIN + name=model + langgraph_step >= 2 - - Enforces: Only ONE agent invocation span per trace. + Detection: + 1. Live instrumentation: CHAIN + name=LangGraph + 2. ADOT body: root LangGraph graph node — input has "messages" without + "remaining_steps" (intermediate nodes always have "remaining_steps"), + and output has "messages". 
""" attrs = span.get("attributes", {}) span_kind = attrs.get("openinference.span.kind", "") span_name = span.get("name", "") - trace_id = span.get("trace_id", "") - - # If we've already found an agent invocation for this trace, skip - if self._trace_has_agent_invocation.get(trace_id, False): - return False - - # Case 1: Default unnamed agent (standard LangGraph pattern) if span_kind == "CHAIN" and span_name == "LangGraph": - self._trace_has_agent_invocation[trace_id] = True return True - # Case 2: Custom-named agent - if span_kind == "AGENT": - metadata_str = attrs.get("metadata", "{}") - try: - metadata = json.loads(metadata_str) if isinstance(metadata_str, str) else metadata_str - step = metadata.get("langgraph_step", 0) - if step >= 2: - self._trace_has_agent_invocation[trace_id] = True + # ADOT fallback: root LangGraph node has messages in/out but no remaining_steps. + # Intermediate agent nodes (from create_react_agent) always include + # "remaining_steps" in input — the root graph invocation does not. 
+ input_messages, _ = self._get_messages_from_span_events(span) + if input_messages: + in_content = input_messages[0].get("content", "") + in_parsed = self._safe_json_parse(in_content) if isinstance(in_content, str) else in_content + if isinstance(in_parsed, dict) and "messages" in in_parsed and "remaining_steps" not in in_parsed: + out_parsed = self._parse_adot_output(span) + if isinstance(out_parsed, dict) and "messages" in out_parsed: return True - except (json.JSONDecodeError, AttributeError): - pass - - # Case 3: Model chain - if span_kind == "CHAIN" and span_name == "model": - metadata_str = attrs.get("metadata", "{}") - try: - metadata = json.loads(metadata_str) if isinstance(metadata_str, str) else metadata_str - step = metadata.get("langgraph_step", 0) - if step >= 2: - self._trace_has_agent_invocation[trace_id] = True - return True - except (json.JSONDecodeError, AttributeError): - pass return False @@ -208,6 +244,11 @@ def _convert_inference_span(self, span: dict, session_id: str) -> InferenceSpan logger.warning(f"No assistant contents for span {span.get('span_id')}") return None + # Skip spans where assistant only has empty text (no tool calls) — + # these are LangGraph internal re-invocations with no useful content. 
+ if all(isinstance(c, TextContent) and not c.text for c in assistant_contents): + return None + assistant_message = AssistantMessage(content=assistant_contents) # Extract tool schemas @@ -228,18 +269,27 @@ def _convert_tool_execution_span(self, span: dict, session_id: str) -> ToolExecu tool_status: str | None = None # Try live format first (attributes) - tool_name = attrs.get("tool.name") or span.get("name") + tool_name = attrs.get("tool.name") + if not tool_name: + span_name = span.get("name", "") + # ADOT synthetic spans use the scope name as span name — skip it + if span_name and span_name != SCOPE_OPENINFERENCE: + tool_name = span_name # Get input from attributes input_value = attrs.get("input.value") if input_value: - try: - if isinstance(input_value, str): - tool_parameters = json.loads(input_value.replace("'", '"')) - elif isinstance(input_value, dict): - tool_parameters = input_value - except json.JSONDecodeError: - tool_parameters = {} + if isinstance(input_value, dict): + tool_parameters = input_value + elif isinstance(input_value, str): + try: + tool_parameters = json.loads(input_value) + except json.JSONDecodeError: + try: + parsed = ast.literal_eval(input_value) + tool_parameters = parsed if isinstance(parsed, dict) else {} + except (ValueError, SyntaxError): + tool_parameters = {} # Get output from attributes output_value = attrs.get("output.value") @@ -266,11 +316,13 @@ def _convert_tool_execution_span(self, span: dict, session_id: str) -> ToolExecu content_str = input_msg.get("content", "") if isinstance(content_str, str): try: - parsed = json.loads(content_str.replace("'", '"')) - if isinstance(parsed, dict): - tool_parameters = parsed + tool_parameters = json.loads(content_str) except json.JSONDecodeError: - pass + try: + parsed = ast.literal_eval(content_str) + tool_parameters = parsed if isinstance(parsed, dict) else None + except (ValueError, SyntaxError): + pass # Extract tool result from output if output_messages and tool_output_content is 
None: @@ -281,10 +333,11 @@ def _convert_tool_execution_span(self, span: dict, session_id: str) -> ToolExecu try: tool_data = json.loads(content_str) tool_output_content = tool_data.get("content") - tool_call_id = tool_data.get("tool_call_id") + tool_call_id = tool_call_id or tool_data.get("tool_call_id") tool_status = tool_data.get("status") - if not tool_name: - tool_name = tool_data.get("name") + # Always prefer tool name from output data (ADOT spans + # lack tool.name attribute) + tool_name = tool_data.get("name") or tool_name except json.JSONDecodeError: pass @@ -337,6 +390,19 @@ def _convert_agent_invocation_span(self, span: dict, session_id: str) -> AgentIn # Helper Methods # ========================================================================= + @staticmethod + def _collect_tools_from_spans( + converted_spans: list[InferenceSpan | ToolExecutionSpan | AgentInvocationSpan], + ) -> dict[str, ToolConfig]: + """Collect tool configs from ToolExecutionSpans by name.""" + tools: dict[str, ToolConfig] = {} + for span in converted_spans: + if isinstance(span, ToolExecutionSpan): + name = span.tool_call.name + if name and name not in tools: + tools[name] = ToolConfig(name=name, description=None, parameters=None) + return tools + def _get_scope_name(self, span: dict) -> str: """Extract scope name from span.""" scope = span.get("scope", {}) @@ -375,6 +441,38 @@ def _parse_timestamp(self, value: Any) -> datetime: return datetime.fromtimestamp(value, tz=timezone.utc) return datetime.now(timezone.utc) + def _safe_json_parse(self, content: Any) -> Any: + """Safely parse JSON content.""" + if isinstance(content, dict): + return content + if isinstance(content, str): + try: + return json.loads(content) + except json.JSONDecodeError: + return content + return content + + def _parse_adot_output(self, span: dict) -> Any: + """Parse the output content from the first ADOT body message. + + Returns the parsed JSON object or raw string, or None if no body found. 
+ Result is cached per span_id to avoid re-parsing across detection methods. + """ + span_id = span.get("span_id", "") + if span_id in self._adot_output_cache: + return self._adot_output_cache[span_id] + + _, output_messages = self._get_messages_from_span_events(span) + if not output_messages: + result = None + else: + out_content = output_messages[0].get("content", "") + result = self._safe_json_parse(out_content) if isinstance(out_content, str) else out_content + + if span_id: + self._adot_output_cache[span_id] = result + return result + def _get_messages_from_span_events(self, span: dict) -> tuple[list[dict], list[dict]]: """Extract input and output messages from span events or attributes. @@ -382,11 +480,22 @@ def _get_messages_from_span_events(self, span: dict) -> tuple[list[dict], list[d 1. CloudWatch/ADOT: span_events[].body.input/output.messages 2. Live instrumentation: attributes.input.value / output.value """ + span_id = span.get("span_id", "") + if span_id in self._span_messages_cache: + return self._span_messages_cache[span_id] + + result = self._extract_messages_from_span(span) + if span_id: + self._span_messages_cache[span_id] = result + return result + + def _extract_messages_from_span(self, span: dict) -> tuple[list[dict], list[dict]]: + """Extract messages without caching — called by _get_messages_from_span_events.""" # Try span_events first (CloudWatch/ADOT format) span_events = span.get("span_events", []) for event in span_events: event_name = event.get("event_name", "") - if event_name == SCOPE_NAME: + if event_name == SCOPE_OPENINFERENCE: body = event.get("body", {}) input_group = body.get("input", {}) output_group = body.get("output", {}) @@ -453,18 +562,54 @@ def _extract_tools_from_attributes(self, attrs: dict) -> list[ToolConfig]: def _extract_user_contents( self, input_messages: list[dict], span: dict ) -> tuple[list[TextContent | ToolResultContent], str | None]: - """Extract user contents from input messages or attributes.""" - results: 
list[TextContent | ToolResultContent] = [] - system_prompt: str | None = None + """Extract user contents from input messages or attributes. + + Tries three strategies in order: + 1. Live attributes (llm.input_messages.*) + 2. ADOT structured messages (body with LangChain serialized messages) + 3. Raw content fallback + """ attrs = span.get("attributes", {}) - # First, try to get from llm.input_messages attributes (live format) - user_content = attrs.get("llm.input_messages.0.message.content") - if user_content: - results.append(TextContent(text=user_content)) + # Strategy 1: live attributes + results, system_prompt = self._extract_user_from_live_attrs(attrs) + if results: return results, system_prompt - # Try to find structured message list from input_messages + # Strategy 2: ADOT structured messages + results, system_prompt = self._extract_user_from_structured_messages(input_messages) + if results: + return results, system_prompt + + # Strategy 3: raw content fallback + results = self._extract_user_from_raw_content(input_messages) + return results, None + + def _extract_user_from_live_attrs(self, attrs: dict) -> tuple[list[TextContent | ToolResultContent], str | None]: + """Extract user content from llm.input_messages.* attributes (live instrumentation).""" + results: list[TextContent | ToolResultContent] = [] + system_prompt: str | None = None + idx = 0 + while True: + role = attrs.get(f"llm.input_messages.{idx}.message.role") + if role is None: + break + content = attrs.get(f"llm.input_messages.{idx}.message.content") + if role == "system" and content: + system_prompt = content + elif role == "user" and content: + results.append(TextContent(text=content)) + return results, system_prompt + idx += 1 + return results, system_prompt + + def _extract_user_from_structured_messages( + self, input_messages: list[dict] + ) -> tuple[list[TextContent | ToolResultContent], str | None]: + """Extract user content from ADOT structured messages (LangChain serialized format).""" 
+ results: list[TextContent | ToolResultContent] = [] + system_prompt: str | None = None + structured_messages = None for msg in input_messages: if msg.get("role") != "user": @@ -477,91 +622,116 @@ def _extract_user_contents( except (json.JSONDecodeError, TypeError): pass - if structured_messages: - # Handle nested list - if ( - isinstance(structured_messages, list) - and structured_messages - and isinstance(structured_messages[0], list) - ): - structured_messages = structured_messages[0] - - for item in structured_messages: - if not isinstance(item, dict): - continue - kwargs = item.get("kwargs", {}) - data_type = kwargs.get("type") + if not structured_messages: + return results, system_prompt - if data_type == "human": - results.append(TextContent(text=kwargs.get("content", ""))) - elif data_type == "tool": - status = kwargs.get("status") - results.append( - ToolResultContent( - content=kwargs.get("content", ""), - tool_call_id=kwargs.get("tool_call_id"), - error=None if status == "success" else status, - ) + # Handle nested list (e.g. 
[[msg1, msg2]]) + if isinstance(structured_messages, list) and structured_messages and isinstance(structured_messages[0], list): + structured_messages = structured_messages[0] + + for item in structured_messages: + if not isinstance(item, dict): + continue + kwargs = item.get("kwargs", {}) + data_type = kwargs.get("type") + + if data_type == "human": + results.append(TextContent(text=kwargs.get("content", ""))) + elif data_type == "tool": + status = kwargs.get("status") + results.append( + ToolResultContent( + content=kwargs.get("content", ""), + tool_call_id=kwargs.get("tool_call_id"), + error=None if status == "success" else status, ) - elif data_type == "system": - system_prompt = kwargs.get("content") - else: - # Fallback: use raw content - for msg in input_messages: - if msg.get("role") == "user": - content = msg.get("content", "") - # Try to parse if it's a JSON string - try: - parsed = json.loads(content) - if isinstance(parsed, dict) and "messages" in parsed: - # Extract first human message - for m in parsed.get("messages", []): - if isinstance(m, dict): - c = m.get("content", "") - if c: - results.append(TextContent(text=c)) - break - else: - results.append(TextContent(text=content)) - except (json.JSONDecodeError, TypeError): - results.append(TextContent(text=content)) - break + ) + elif data_type == "system": + system_prompt = kwargs.get("content") return results, system_prompt + def _extract_user_from_raw_content(self, input_messages: list[dict]) -> list[TextContent | ToolResultContent]: + """Extract user content from raw message content (fallback).""" + results: list[TextContent | ToolResultContent] = [] + for msg in input_messages: + if msg.get("role") != "user": + continue + content = msg.get("content", "") + try: + parsed = json.loads(content) + if isinstance(parsed, dict) and "messages" in parsed: + for m in parsed.get("messages", []): + if isinstance(m, dict): + c = m.get("content", "") + if c: + results.append(TextContent(text=c)) + break + else: 
+ results.append(TextContent(text=content)) + except (json.JSONDecodeError, TypeError): + results.append(TextContent(text=content)) + break + return results + def _extract_assistant_contents( self, output_messages: list[dict], span: dict ) -> list[TextContent | ToolCallContent]: - """Extract assistant contents from output messages or attributes.""" - results: list[TextContent | ToolCallContent] = [] + """Extract assistant contents from output messages or attributes. + + Tries three strategies in order: + 1. Live attributes (llm.output_messages.*) + 2. ADOT generations format (LLMResult with generations key) + 3. Raw content fallback + """ attrs = span.get("attributes", {}) - # First, try to get from llm.output_messages attributes (live format) + # Strategy 1: live attributes + results = self._extract_assistant_from_live_attrs(attrs) + if results: + return results + + # Strategy 2: ADOT generations format + results = self._extract_assistant_from_generations(output_messages) + if results: + return results + + # Strategy 3: raw content fallback + return self._extract_assistant_from_raw_content(output_messages) + + def _extract_assistant_from_live_attrs(self, attrs: dict) -> list[TextContent | ToolCallContent]: + """Extract assistant content from llm.output_messages.* attributes (live instrumentation).""" + results: list[TextContent | ToolCallContent] = [] + assistant_content = attrs.get("llm.output_messages.0.message.content") - if assistant_content: - results.append(TextContent(text=assistant_content)) - - # Check for tool calls in attributes - tool_idx = 0 - while True: - tool_name = attrs.get(f"llm.output_messages.0.message.tool_calls.{tool_idx}.tool_call.function.name") - if not tool_name: - break - tool_args_str = attrs.get( - f"llm.output_messages.0.message.tool_calls.{tool_idx}.tool_call.function.arguments", "{}" - ) - tool_id = attrs.get(f"llm.output_messages.0.message.tool_calls.{tool_idx}.tool_call.id") - try: - tool_args = json.loads(tool_args_str) if 
isinstance(tool_args_str, str) else tool_args_str - except json.JSONDecodeError: - tool_args = {} - results.append(ToolCallContent(name=tool_name, arguments=tool_args, tool_call_id=tool_id)) - tool_idx += 1 + if assistant_content: + results.append(TextContent(text=assistant_content)) + + # Tool calls may exist even when text content is empty + + # Extract tool calls from attributes + tool_idx = 0 + while True: + tool_name = attrs.get(f"llm.output_messages.0.message.tool_calls.{tool_idx}.tool_call.function.name") + if not tool_name: + break + tool_args_str = attrs.get( + f"llm.output_messages.0.message.tool_calls.{tool_idx}.tool_call.function.arguments", "{}" + ) + tool_id = attrs.get(f"llm.output_messages.0.message.tool_calls.{tool_idx}.tool_call.id") + try: + tool_args = json.loads(tool_args_str) if isinstance(tool_args_str, str) else tool_args_str + except json.JSONDecodeError: + tool_args = {} + results.append(ToolCallContent(name=tool_name, arguments=tool_args, tool_call_id=tool_id)) + tool_idx += 1 - if results: - return results + return results - # Try parsing from output_messages + @staticmethod + def _extract_assistant_from_generations(output_messages: list[dict]) -> list[TextContent | ToolCallContent]: + """Extract assistant content from ADOT generations format (LLMResult).""" + results: list[TextContent | ToolCallContent] = [] for msg in output_messages: if msg.get("role") != "assistant": continue @@ -572,31 +742,35 @@ def _extract_assistant_contents( try: gen = json.loads(content_str) - if "generations" in gen: - gen_item = gen["generations"][0][0] - results.append(TextContent(text=gen_item.get("text", ""))) - - # Extract tool calls - kwargs = gen_item.get("message", {}).get("kwargs", {}) - for call in kwargs.get("tool_calls", []): - results.append( - ToolCallContent( - name=call.get("name", ""), - arguments=call.get("args", {}), - tool_call_id=call.get("id"), - ) + if "generations" not in gen: + continue + gen_item = gen["generations"][0][0] + 
results.append(TextContent(text=gen_item.get("text", ""))) + + kwargs = gen_item.get("message", {}).get("kwargs", {}) + for call in kwargs.get("tool_calls", []): + results.append( + ToolCallContent( + name=call.get("name", ""), + arguments=call.get("args", {}), + tool_call_id=call.get("id"), ) - return results + ) + return results except (json.JSONDecodeError, KeyError, IndexError, TypeError): pass - # Fallback: use raw content + return results + + @staticmethod + def _extract_assistant_from_raw_content(output_messages: list[dict]) -> list[TextContent | ToolCallContent]: + """Extract assistant content from raw message content (fallback).""" + results: list[TextContent | ToolCallContent] = [] for msg in output_messages: if msg.get("role") == "assistant": content_str = msg.get("content", "") if isinstance(content_str, str): results.append(TextContent(text=content_str)) - return results def _extract_user_prompt(self, input_messages: list[dict], span: dict) -> str | None: @@ -640,16 +814,24 @@ def _extract_agent_response(self, output_messages: list[dict], span: dict) -> st try: j = json.loads(content_str) + # ADOT: supervisor finish decision has "final_answer" key + if isinstance(j, dict) and "final_answer" in j: + answer = j["final_answer"] + if answer: + return answer + # Check for messages structure if "messages" in j: messages = j["messages"] if isinstance(messages, list): - # Iterate backwards to find last AI message + # Iterate backwards to find last AI message with non-empty content for item in reversed(messages): if isinstance(item, dict): kwargs = item.get("kwargs", {}) if "kwargs" in item else item if item.get("type") == "ai" or kwargs.get("type") == "ai": - return kwargs.get("content", "") + content = kwargs.get("content", "") + if content: + return content # Check for generations structure if "generations" in j: diff --git a/src/strands_evals/mappers/utils.py b/src/strands_evals/mappers/utils.py index 7ac3249..050458b 100644 --- 
a/src/strands_evals/mappers/utils.py +++ b/src/strands_evals/mappers/utils.py @@ -4,6 +4,7 @@ from typing import Any +from .constants import SCOPE_LANGCHAIN_OTEL, SCOPE_OPENINFERENCE, SCOPE_STRANDS from .session_mapper import SessionMapper @@ -48,13 +49,13 @@ for span in spans: scope_name = get_scope_name(span) - if scope_name == "opentelemetry.instrumentation.langchain": + if scope_name == SCOPE_LANGCHAIN_OTEL: return LangChainOtelSessionMapper() - if scope_name == "openinference.instrumentation.langchain": + if scope_name == SCOPE_OPENINFERENCE: return OpenInferenceSessionMapper() - if scope_name == "strands.telemetry.tracer": + if scope_name == SCOPE_STRANDS: # Auto-detect format for Strands if get_body(span) is not None: return CloudWatchSessionMapper() diff --git a/src/strands_evals/providers/cloudwatch_provider.py b/src/strands_evals/providers/cloudwatch_provider.py index 1dbea9d..16e50d3 100644 --- a/src/strands_evals/providers/cloudwatch_provider.py +++ b/src/strands_evals/providers/cloudwatch_provider.py @@ -41,6 +41,7 @@ def __init__( lookback_days: int = 30, query_timeout_seconds: float = 60.0, mapper: SessionMapper | None = None, + end_time: datetime | None = None, ): """Initialize the CloudWatch provider. @@ -67,6 +68,13 @@ mapper=LangChainOtelSessionMapper(), ) + # Narrow the query window with an explicit end time + from datetime import datetime, timezone + provider = CloudWatchProvider( + log_group="/aws/...", + end_time=datetime(2026, 3, 1, 12, 0, 0, tzinfo=timezone.utc), + ) + Args: region: AWS region. Falls back to AWS_REGION / AWS_DEFAULT_REGION env vars. log_group: Full CloudWatch log group path. @@ -79,6 +87,8 @@ If not provided, the mapper is auto-detected from the span data using ``detect_otel_mapper()``, which inspects ``scope.name`` to determine the correct framework mapper. + end_time: Upper bound for the query window. Defaults to now (UTC).
+ Combined with ``lookback_days`` to form the full time range. Raises: ProviderError: If neither `log_group` nor `agent_name` is provided, @@ -101,6 +111,7 @@ def __init__( self._lookback_days = lookback_days self._query_timeout_seconds = query_timeout_seconds self._mapper = mapper + self._end_time = end_time def _discover_log_group(self, agent_name: str) -> str: """Discover the runtime log group for an agent via describe_log_groups.""" @@ -162,12 +173,12 @@ def get_evaluation_data(self, session_id: str) -> TaskOutput: def _fetch_span_hierarchy(self, session_id: str) -> dict[str, str]: """Fetch spanId→parentSpanId map from aws/spans. Returns empty dict on failure.""" query = f"fields spanId, parentSpanId | filter attributes.session.id like '{session_id}' | limit 10000" - now = datetime.now(tz=timezone.utc) + end = self._end_time or datetime.now(tz=timezone.utc) try: response = self._client.start_query( logGroupName=self.SPANS_LOG_GROUP, - startTime=int((now - timedelta(days=self._lookback_days)).timestamp()), - endTime=int(now.timestamp()), + startTime=int((end - timedelta(days=self._lookback_days)).timestamp()), + endTime=int(end.timestamp()), queryString=query, ) raw_results = self._poll_query_results(response["queryId"]) @@ -189,14 +200,14 @@ def _fetch_span_hierarchy(self, session_id: str) -> dict[str, str]: def _run_logs_insights_query(self, query: str) -> list[dict[str, Any]]: """Execute a CW Logs Insights query and return parsed span dicts from @message fields.""" - now = datetime.now(tz=timezone.utc) - start_time = now - timedelta(days=self._lookback_days) + end = self._end_time or datetime.now(tz=timezone.utc) + start_time = end - timedelta(days=self._lookback_days) try: response = self._client.start_query( logGroupName=self._log_group, startTime=int(start_time.timestamp()), - endTime=int(now.timestamp()), + endTime=int(end.timestamp()), queryString=query, ) except Exception as e: diff --git 
a/tests/strands_evals/mappers/fixtures/openinference_adot_spans.json b/tests/strands_evals/mappers/fixtures/openinference_adot_spans.json new file mode 100644 index 0000000..2c58b55 --- /dev/null +++ b/tests/strands_evals/mappers/fixtures/openinference_adot_spans.json @@ -0,0 +1,170 @@ +[ + { + "trace_id": "adot-trace-1", + "span_id": "adot-inf-1", + "parent_span_id": null, + "name": "openinference.instrumentation.langchain", + "start_time": 1700000000000000000, + "end_time": 1700000001000000000, + "attributes": {"event.name": "openinference.instrumentation.langchain"}, + "scope": {"name": "openinference.instrumentation.langchain", "version": ""}, + "status": {"code": "OK"}, + "span_events": [{ + "event_name": "openinference.instrumentation.langchain", + "span_id": "adot-inf-1", + "timestamp": 1700000000000000000, + "attributes": {"event.name": "openinference.instrumentation.langchain"}, + "body": { + "input": {"messages": [{"content": "What is the weather in Alaska?", "role": "user"}]}, + "output": {"messages": [{"content": "{\"generations\": [[{\"text\": \"Let me check the weather for you.\", \"message\": {\"tool_calls\": [{\"name\": \"get_weather\", \"args\": {\"city\": \"Alaska\"}, \"id\": \"tc_01\"}]}}]], \"llm_output\": null}", "role": "assistant"}]} + } + }] + }, + { + "trace_id": "adot-trace-1", + "span_id": "adot-tool-1", + "parent_span_id": null, + "name": "openinference.instrumentation.langchain", + "start_time": 1700000001000000000, + "end_time": 1700000002000000000, + "attributes": {"event.name": "openinference.instrumentation.langchain"}, + "scope": {"name": "openinference.instrumentation.langchain", "version": ""}, + "status": {"code": "OK"}, + "span_events": [{ + "event_name": "openinference.instrumentation.langchain", + "span_id": "adot-tool-1", + "timestamp": 1700000001000000000, + "attributes": {"event.name": "openinference.instrumentation.langchain"}, + "body": { + "input": {"messages": [{"content": "{'city': 'Alaska'}", "role": "user"}]}, + 
"output": {"messages": [{"content": "{\"name\": \"get_weather\", \"type\": \"tool\", \"content\": \"Weather in Alaska: -5C, snowy\", \"tool_call_id\": \"tc_01\", \"status\": \"success\"}", "role": "assistant"}]} + } + }] + }, + { + "trace_id": "adot-trace-1", + "span_id": "adot-inf-2", + "parent_span_id": null, + "name": "openinference.instrumentation.langchain", + "start_time": 1700000002000000000, + "end_time": 1700000003000000000, + "attributes": {"event.name": "openinference.instrumentation.langchain"}, + "scope": {"name": "openinference.instrumentation.langchain", "version": ""}, + "status": {"code": "OK"}, + "span_events": [{ + "event_name": "openinference.instrumentation.langchain", + "span_id": "adot-inf-2", + "timestamp": 1700000002000000000, + "attributes": {"event.name": "openinference.instrumentation.langchain"}, + "body": { + "input": {"messages": [{"content": "What is the weather in Alaska?", "role": "user"}]}, + "output": {"messages": [{"content": "{\"generations\": [[{\"text\": \"The weather in Alaska is -5C and snowy.\"}]], \"llm_output\": null}", "role": "assistant"}]} + } + }] + }, + { + "trace_id": "adot-trace-1", + "span_id": "adot-skip-remaining", + "parent_span_id": null, + "name": "openinference.instrumentation.langchain", + "start_time": 1700000003000000000, + "end_time": 1700000004000000000, + "attributes": {"event.name": "openinference.instrumentation.langchain"}, + "scope": {"name": "openinference.instrumentation.langchain", "version": ""}, + "status": {"code": "OK"}, + "span_events": [{ + "event_name": "openinference.instrumentation.langchain", + "span_id": "adot-skip-remaining", + "timestamp": 1700000003000000000, + "attributes": {"event.name": "openinference.instrumentation.langchain"}, + "body": { + "input": {"messages": [{"content": "{\"messages\": [{\"content\": \"hi\", \"type\": \"human\"}], \"remaining_steps\": 10}", "role": "user"}]}, + "output": {"messages": [{"content": "{\"messages\": [{\"content\": \"ok\", \"type\": 
\"ai\"}]}", "role": "assistant"}]} + } + }] + }, + { + "trace_id": "adot-trace-1", + "span_id": "adot-skip-end", + "parent_span_id": null, + "name": "openinference.instrumentation.langchain", + "start_time": 1700000004000000000, + "end_time": 1700000005000000000, + "attributes": {"event.name": "openinference.instrumentation.langchain"}, + "scope": {"name": "openinference.instrumentation.langchain", "version": ""}, + "status": {"code": "OK"}, + "span_events": [{ + "event_name": "openinference.instrumentation.langchain", + "span_id": "adot-skip-end", + "timestamp": 1700000004000000000, + "attributes": {"event.name": "openinference.instrumentation.langchain"}, + "body": { + "input": {"messages": [{"content": "some input", "role": "user"}]}, + "output": {"messages": [{"content": "__end__", "role": "assistant"}]} + } + }] + }, + { + "trace_id": "adot-trace-1", + "span_id": "adot-agent-1", + "parent_span_id": null, + "name": "openinference.instrumentation.langchain", + "start_time": 1700000000000000000, + "end_time": 1700000005000000000, + "attributes": {"event.name": "openinference.instrumentation.langchain"}, + "scope": {"name": "openinference.instrumentation.langchain", "version": ""}, + "status": {"code": "OK"}, + "span_events": [{ + "event_name": "openinference.instrumentation.langchain", + "span_id": "adot-agent-1", + "timestamp": 1700000000000000000, + "attributes": {"event.name": "openinference.instrumentation.langchain"}, + "body": { + "input": {"messages": [{"content": "{\"messages\": [{\"content\": \"What is the weather in Alaska?\", \"type\": \"human\"}]}", "role": "user"}]}, + "output": {"messages": [{"content": "{\"messages\": [{\"content\": \"The weather in Alaska is -5C and snowy.\", \"type\": \"ai\"}]}", "role": "assistant"}]} + } + }] + }, + { + "trace_id": "adot-trace-2", + "span_id": "adot-t2-inf-1", + "parent_span_id": null, + "name": "openinference.instrumentation.langchain", + "start_time": 1700000010000000000, + "end_time": 1700000011000000000, + 
"attributes": {"event.name": "openinference.instrumentation.langchain"}, + "scope": {"name": "openinference.instrumentation.langchain", "version": ""}, + "status": {"code": "OK"}, + "span_events": [{ + "event_name": "openinference.instrumentation.langchain", + "span_id": "adot-t2-inf-1", + "timestamp": 1700000010000000000, + "attributes": {"event.name": "openinference.instrumentation.langchain"}, + "body": { + "input": {"messages": [{"content": "Tell me a joke", "role": "user"}]}, + "output": {"messages": [{"content": "{\"generations\": [[{\"text\": \"Why did the chicken cross the road? To get to the other side!\"}]], \"llm_output\": null}", "role": "assistant"}]} + } + }] + }, + { + "trace_id": "adot-trace-2", + "span_id": "adot-t2-agent-1", + "parent_span_id": null, + "name": "openinference.instrumentation.langchain", + "start_time": 1700000010000000000, + "end_time": 1700000012000000000, + "attributes": {"event.name": "openinference.instrumentation.langchain"}, + "scope": {"name": "openinference.instrumentation.langchain", "version": ""}, + "status": {"code": "OK"}, + "span_events": [{ + "event_name": "openinference.instrumentation.langchain", + "span_id": "adot-t2-agent-1", + "timestamp": 1700000010000000000, + "attributes": {"event.name": "openinference.instrumentation.langchain"}, + "body": { + "input": {"messages": [{"content": "{\"messages\": [{\"content\": \"Tell me a joke\", \"type\": \"human\"}]}", "role": "user"}]}, + "output": {"messages": [{"content": "{\"messages\": [{\"content\": \"Why did the chicken cross the road? 
To get to the other side!\", \"type\": \"ai\"}]}", "role": "assistant"}]} + } + }] + } +] diff --git a/tests/strands_evals/mappers/fixtures/openinference_live_spans.json b/tests/strands_evals/mappers/fixtures/openinference_live_spans.json new file mode 100644 index 0000000..a46f5ef --- /dev/null +++ b/tests/strands_evals/mappers/fixtures/openinference_live_spans.json @@ -0,0 +1,113 @@ +[ + { + "trace_id": "live-trace-1", + "span_id": "llm-1", + "parent_span_id": "chain-root", + "name": "ChatBedrock", + "start_time": 1700000000000000000, + "end_time": 1700000001000000000, + "attributes": { + "openinference.span.kind": "LLM", + "llm.input_messages.0.message.role": "system", + "llm.input_messages.0.message.content": "You are a helpful weather assistant.", + "llm.input_messages.1.message.role": "user", + "llm.input_messages.1.message.content": "What is the weather difference between Taipei and Seattle?", + "llm.output_messages.0.message.role": "assistant", + "llm.output_messages.0.message.content": "I'll check the weather for both cities.", + "llm.output_messages.0.message.tool_calls.0.tool_call.function.name": "get_weather", + "llm.output_messages.0.message.tool_calls.0.tool_call.function.arguments": "{\"city\": \"Taipei\"}", + "llm.output_messages.0.message.tool_calls.0.tool_call.id": "toolu_001", + "llm.tools.0.tool.json_schema": "{\"name\": \"get_weather\", \"description\": \"Get weather\", \"parameters\": {\"type\": \"object\", \"properties\": {\"city\": {\"type\": \"string\"}}}}" + }, + "scope": {"name": "openinference.instrumentation.langchain", "version": "0.1.0"}, + "status": {"code": "OK"}, + "span_events": [] + }, + { + "trace_id": "live-trace-1", + "span_id": "llm-2", + "parent_span_id": "chain-root", + "name": "ChatBedrock", + "start_time": 1700000002000000000, + "end_time": 1700000003000000000, + "attributes": { + "openinference.span.kind": "LLM", + "llm.input_messages.0.message.role": "user", + "llm.input_messages.0.message.content": "Now compare them.", 
+ "llm.output_messages.0.message.role": "assistant", + "llm.output_messages.0.message.content": "Taipei is warmer than Seattle. Taipei averages 25C while Seattle averages 12C.", + "llm.output_messages.0.message.tool_calls.0.tool_call.function.name": "finish", + "llm.output_messages.0.message.tool_calls.0.tool_call.function.arguments": "{\"answer\": \"Taipei is warmer\"}", + "llm.output_messages.0.message.tool_calls.0.tool_call.id": "toolu_002" + }, + "scope": {"name": "openinference.instrumentation.langchain", "version": "0.1.0"}, + "status": {"code": "OK"}, + "span_events": [] + }, + { + "trace_id": "live-trace-1", + "span_id": "tool-finish", + "parent_span_id": "chain-root", + "name": "finish", + "start_time": 1700000003000000000, + "end_time": 1700000004000000000, + "attributes": { + "openinference.span.kind": "TOOL", + "tool.name": "finish", + "input.value": "{\"answer\": \"Taipei is warmer\"}", + "output.value": "{\"content\": \"Taipei is warmer than Seattle.\", \"tool_call_id\": \"toolu_002\", \"status\": \"success\"}" + }, + "scope": {"name": "openinference.instrumentation.langchain", "version": "0.1.0"}, + "status": {"code": "OK"}, + "span_events": [] + }, + { + "trace_id": "live-trace-1", + "span_id": "agent-mislabeled", + "parent_span_id": "chain-root", + "name": "route_to_agent", + "start_time": 1700000001000000000, + "end_time": 1700000002000000000, + "attributes": { + "openinference.span.kind": "AGENT", + "metadata": "{\"langgraph_step\": 2}", + "input.value": "{'agent_name': 'research_agent'}", + "output.value": "{\"result\": \"done\"}" + }, + "scope": {"name": "openinference.instrumentation.langchain", "version": "0.1.0"}, + "status": {"code": "OK"}, + "span_events": [] + }, + { + "trace_id": "live-trace-1", + "span_id": "chain-sub", + "parent_span_id": "chain-root", + "name": "LangGraph", + "start_time": 1700000000000000000, + "end_time": 1700000003000000000, + "attributes": { + "openinference.span.kind": "CHAIN", + "input.value": "{\"messages\": 
[{\"kwargs\": {\"content\": \"What is the weather difference between Taipei and Seattle?\", \"type\": \"human\"}}]}", + "output.value": "{\"messages\": [{\"kwargs\": {\"content\": \"Taipei is warmer than Seattle.\", \"type\": \"ai\"}}]}" + }, + "scope": {"name": "openinference.instrumentation.langchain", "version": "0.1.0"}, + "status": {"code": "OK"}, + "span_events": [] + }, + { + "trace_id": "live-trace-1", + "span_id": "chain-root", + "parent_span_id": null, + "name": "LangGraph", + "start_time": 1700000000000000000, + "end_time": 1700000005000000000, + "attributes": { + "openinference.span.kind": "CHAIN", + "input.value": "{\"messages\": [{\"kwargs\": {\"content\": \"What is the weather difference between Taipei and Seattle?\", \"type\": \"human\"}}]}", + "output.value": "{\"messages\": [{\"kwargs\": {\"content\": \"Taipei is warmer than Seattle overall.\", \"type\": \"ai\"}}]}" + }, + "scope": {"name": "openinference.instrumentation.langchain", "version": "0.1.0"}, + "status": {"code": "OK"}, + "span_events": [] + } +] diff --git a/tests/strands_evals/mappers/test_openinference_session_mapper.py b/tests/strands_evals/mappers/test_openinference_session_mapper.py index 5b0185f..3683550 100644 --- a/tests/strands_evals/mappers/test_openinference_session_mapper.py +++ b/tests/strands_evals/mappers/test_openinference_session_mapper.py @@ -1,14 +1,23 @@ """Tests for OpenInferenceSessionMapper - OpenInference LangChain trace → Session conversion.""" import json +from pathlib import Path + +import pytest from strands_evals.mappers import OpenInferenceSessionMapper from strands_evals.types.trace import ( AgentInvocationSpan, InferenceSpan, ToolExecutionSpan, + ToolResultContent, ) +# Path to fixture files +_FIXTURES_DIR = Path(__file__).resolve().parent / "fixtures" +_LIVE_SPANS_FILE = _FIXTURES_DIR / "openinference_live_spans.json" +_ADOT_SPANS_FILE = _FIXTURES_DIR / "openinference_adot_spans.json" + SCOPE_NAME = "openinference.instrumentation.langchain" @@ -114,6 
+123,59 @@ def make_chain_span( return make_span(trace_id=trace_id, span_id=span_id, name=name, attributes=attrs) +def make_adot_span( + trace_id="trace-1", + span_id="span-1", + input_messages=None, + output_messages=None, + start_time=1700000000000000000, + end_time=1700000001000000000, +): + """Build an ADOT/CloudWatch span with data in span_events body. + + All ADOT spans have the same generic name (scope name), no openinference.span.kind, + and carry data inside span_events[].body.input/output.messages. + """ + body = {} + if input_messages is not None: + body["input"] = {"messages": input_messages} + if output_messages is not None: + body["output"] = {"messages": output_messages} + + return { + "trace_id": trace_id, + "span_id": span_id, + "parent_span_id": None, + "name": SCOPE_NAME, + "start_time": start_time, + "end_time": end_time, + "attributes": {"event.name": SCOPE_NAME}, + "scope": {"name": SCOPE_NAME, "version": ""}, + "status": {"code": "OK"}, + "span_events": [ + { + "event_name": SCOPE_NAME, + "span_id": span_id, + "timestamp": start_time, + "attributes": {"event.name": SCOPE_NAME}, + "body": body, + } + ], + } + + +def _load_live_spans(): + """Load real live (in-memory) spans from fixture file.""" + with open(_LIVE_SPANS_FILE) as f: + return json.load(f) + + +def _load_adot_spans(): + """Load ADOT/CloudWatch spans from fixture file.""" + with open(_ADOT_SPANS_FILE) as f: + return json.load(f) + + class TestSpanTypeDetection: def setup_method(self): self.mapper = OpenInferenceSessionMapper() @@ -362,3 +424,928 @@ def test_trace_objects_input(self): session = self.mapper.map_to_session(trace_objects, "sess-1") assert len(session.traces) == 1 + + +# ========================================================================= +# Empty Span Filter Tests +# ========================================================================= + + +class TestEmptySpanFilter: + """Tests for the empty-response InferenceSpan filter (line 249 of mapper).""" + + def 
setup_method(self): + self.mapper = OpenInferenceSessionMapper() + + def test_empty_text_only_assistant_filtered(self): + """InferenceSpan with assistant=[TextContent(text="")] returns None.""" + span = make_llm_span( + user_content="Hello", + assistant_content="", + ) + session = self.mapper.map_to_session([span], "sess-1") + # Empty assistant text, no tool calls → filtered out + assert session.traces == [] + + def test_empty_text_with_tool_call_preserved(self): + """InferenceSpan with empty text + ToolCallContent is preserved (ADOT path).""" + # In the ADOT/generations path, the LLM can return empty text with tool calls. + # The live attrs path requires non-empty content to extract tool calls, + # so we test via ADOT generations format. + span = make_adot_span( + span_id="filter-tc", + input_messages=[ + { + "content": json.dumps( + { + "messages": [ + [ + {"lc": 1, "kwargs": {"content": "System.", "type": "system"}}, + {"lc": 1, "kwargs": {"content": "Calculate 2+2", "type": "human"}}, + ] + ] + } + ), + "role": "user", + }, + {"content": "Calculate 2+2", "role": "user"}, + ], + output_messages=[ + { + "content": json.dumps( + { + "generations": [ + [ + { + "text": "", + "type": "ChatGeneration", + "message": { + "lc": 1, + "kwargs": { + "content": "", + "type": "ai", + "tool_calls": [ + { + "name": "calculator", + "args": {"expr": "2+2"}, + "id": "tc-1", + "type": "tool_call", + }, + ], + }, + }, + } + ] + ], + "type": "LLMResult", + } + ), + "role": "assistant", + }, + ], + ) + session = self.mapper.map_to_session([span], "sess-1") + # Has a tool call → NOT filtered by empty span filter + assert len(session.traces) == 1 + assert len(session.traces[0].spans) == 1 + inference = session.traces[0].spans[0] + assert isinstance(inference, InferenceSpan) + # Should have empty text + tool call in assistant content + assert len(inference.messages[1].content) == 2 + + def test_nonempty_text_assistant_preserved(self): + """InferenceSpan with non-empty text is 
preserved.""" + span = make_llm_span( + user_content="Hello", + assistant_content="Hi there!", + ) + session = self.mapper.map_to_session([span], "sess-1") + assert len(session.traces[0].spans) == 1 + assert isinstance(session.traces[0].spans[0], InferenceSpan) + + +# ========================================================================= +# System Prompt Extraction Tests +# ========================================================================= + + +class TestSystemPromptExtraction: + """Tests for system prompt at index 0, user at index 1.""" + + def setup_method(self): + self.mapper = OpenInferenceSessionMapper() + + def test_system_prompt_at_index_0_user_at_index_1(self): + """System prompt at index 0, user message at index 1 — user content extracted correctly.""" + attrs = { + "openinference.span.kind": "LLM", + "llm.input_messages.0.message.role": "system", + "llm.input_messages.0.message.content": "You are a helpful assistant.", + "llm.input_messages.1.message.role": "user", + "llm.input_messages.1.message.content": "What's the weather?", + "llm.output_messages.0.message.role": "assistant", + "llm.output_messages.0.message.content": "It's sunny.", + } + span = make_span(attributes=attrs) + session = self.mapper.map_to_session([span], "sess-1") + + assert len(session.traces[0].spans) == 1 + inference = session.traces[0].spans[0] + assert isinstance(inference, InferenceSpan) + # User content should be the actual user message, not the system prompt + assert inference.messages[0].content[0].text == "What's the weather?" 
+ + def test_system_prompt_only_returns_no_span(self): + """If only system message exists (no user), span is skipped.""" + attrs = { + "openinference.span.kind": "LLM", + "llm.input_messages.0.message.role": "system", + "llm.input_messages.0.message.content": "You are a helpful assistant.", + "llm.output_messages.0.message.role": "assistant", + "llm.output_messages.0.message.content": "Hello!", + } + span = make_span(attributes=attrs) + session = self.mapper.map_to_session([span], "sess-1") + # No user content → no InferenceSpan produced + assert session.traces == [] + + +# ========================================================================= +# Python Repr Parsing Tests +# ========================================================================= + + +class TestPythonReprParsing: + """Tests for ast.literal_eval fallback when input.value is Python repr.""" + + def setup_method(self): + self.mapper = OpenInferenceSessionMapper() + + def test_python_repr_tool_input(self): + """Tool span with Python repr input.value parsed via ast.literal_eval.""" + # Real pattern from OpenInference: input.value = "{'city': 'Alaska'}" + output_value = json.dumps( + { + "content": "Weather in Alaska: Partly cloudy, 65°F", + "tool_call_id": "toolu_bdrk_123", + "status": "success", + } + ) + attrs = { + "openinference.span.kind": "TOOL", + "tool.name": "get_weather", + "input.value": "{'city': 'Alaska'}", # Python repr, not JSON + "output.value": output_value, + } + span = make_span(span_id="repr-tool", name="get_weather", attributes=attrs) + session = self.mapper.map_to_session([span], "sess-1") + + assert len(session.traces[0].spans) == 1 + tool_span = session.traces[0].spans[0] + assert isinstance(tool_span, ToolExecutionSpan) + assert tool_span.tool_call.name == "get_weather" + assert tool_span.tool_call.arguments == {"city": "Alaska"} + assert "Partly cloudy" in tool_span.tool_result.content + + +# ========================================================================= +# Empty 
Trailing AI Message Tests +# ========================================================================= + + +class TestEmptyTrailingAIMessage: + """Tests for skipping empty trailing AI messages in agent response extraction.""" + + def setup_method(self): + self.mapper = OpenInferenceSessionMapper() + + def test_empty_trailing_ai_message_skipped(self): + """LangGraph output with empty trailing AI message → non-empty AI content returned.""" + # Real pattern: output.value has messages list where last AI message is empty + output_value = json.dumps( + { + "messages": [ + {"kwargs": {"content": "The answer is 42.", "type": "ai"}}, + {"kwargs": {"content": "", "type": "ai"}}, # Empty trailing AI message + ] + } + ) + input_value = json.dumps({"messages": [{"kwargs": {"content": "Calculate 6*7", "type": "human"}}]}) + attrs = { + "openinference.span.kind": "CHAIN", + "input.value": input_value, + "output.value": output_value, + } + span = make_span(name="LangGraph", span_id="trailing-ai", attributes=attrs) + session = self.mapper.map_to_session([span], "sess-1") + + assert len(session.traces[0].spans) == 1 + agent = session.traces[0].spans[0] + assert isinstance(agent, AgentInvocationSpan) + assert agent.agent_response == "The answer is 42." 


# =========================================================================
# Multi-Agent Dedup Tests
# =========================================================================


class TestMultiAgentDedup:
    """Tests for multi-agent LangGraph dedup — only last agent span kept."""

    def setup_method(self):
        # Fresh mapper per test; the mapper is assumed stateless across calls.
        self.mapper = OpenInferenceSessionMapper()

    def test_two_langgraph_chains_dedup_to_one(self):
        """2 LangGraph CHAIN spans in same trace → 1 AgentInvocationSpan (last kept)."""
        span1 = make_chain_span(
            trace_id="t1",
            span_id="s1",
            name="LangGraph",
            user_query="Q1",
            agent_response="A1",
        )
        span2 = make_chain_span(
            trace_id="t1",
            span_id="s2",
            name="LangGraph",
            user_query="Q2",
            agent_response="A2",
        )
        session = self.mapper.map_to_session([span1, span2], "sess-1")

        agent_spans = [s for s in session.traces[0].spans if isinstance(s, AgentInvocationSpan)]
        assert len(agent_spans) == 1
        # Last one (s2) should be kept
        assert agent_spans[0].user_prompt == "Q2"
        assert agent_spans[0].agent_response == "A2"

    def test_single_agent_span_not_deduped(self):
        """Single agent span is preserved as-is."""
        span = make_chain_span(
            trace_id="t1",
            span_id="s1",
            name="LangGraph",
            user_query="Hello",
            agent_response="Hi",
        )
        session = self.mapper.map_to_session([span], "sess-1")
        agent_spans = [s for s in session.traces[0].spans if isinstance(s, AgentInvocationSpan)]
        assert len(agent_spans) == 1


# =========================================================================
# AGENT-Kind Span Rejection Tests
# =========================================================================


class TestAgentKindSpanRejection:
    """Tests for AGENT-kind spans NOT matched as agent invocation."""

    def setup_method(self):
        self.mapper = OpenInferenceSessionMapper()

    def test_agent_kind_not_detected_as_agent_invocation(self):
        """Span with kind=AGENT is NOT detected as agent invocation."""
        # OpenInference mislabels route_to_agent as AGENT instead of TOOL
        span = make_span(
            name="route_to_agent",
            attributes={
                "openinference.span.kind": "AGENT",
                "input.value": "{'agent_name': 'research_agent'}",
            },
        )
        assert self.mapper._is_agent_invocation_span(span) is False

    def test_agent_kind_not_detected_as_tool(self):
        """Span with kind=AGENT is NOT detected as tool."""
        span = make_span(
            name="route_to_agent",
            attributes={"openinference.span.kind": "AGENT"},
        )
        assert self.mapper._is_tool_execution_span(span) is False

    def test_agent_kind_not_detected_as_inference(self):
        """Span with kind=AGENT is NOT detected as inference."""
        span = make_span(
            name="route_to_agent",
            attributes={"openinference.span.kind": "AGENT"},
        )
        assert self.mapper._is_inference_span(span) is False


# =========================================================================
# ADOT Path Detection Tests
# =========================================================================


class TestAdotSpanTypeDetection:
    """Tests for ADOT/CloudWatch span type detection via body content.

    ADOT spans carry their LangChain payloads JSON-serialized inside message
    bodies, so type detection must inspect the deserialized body rather than
    span attributes.
    """

    def setup_method(self):
        self.mapper = OpenInferenceSessionMapper()

    def test_adot_inference_detected_by_generations(self):
        """ADOT span with 'generations' in output detected as inference."""
        span = make_adot_span(
            span_id="adot-llm",
            input_messages=[
                {"content": "What's the weather?", "role": "user"},
            ],
            output_messages=[
                {
                    "content": json.dumps(
                        {
                            "generations": [
                                [
                                    {
                                        "text": "It's sunny.",
                                        "type": "ChatGeneration",
                                        "message": {"lc": 1, "kwargs": {"content": "It's sunny.", "type": "ai"}},
                                    }
                                ]
                            ],
                            "type": "LLMResult",
                        }
                    ),
                    "role": "assistant",
                }
            ],
        )
        # Detection must be mutually exclusive: inference only.
        assert self.mapper._is_inference_span(span) is True
        assert self.mapper._is_tool_execution_span(span) is False
        assert self.mapper._is_agent_invocation_span(span) is False

    def test_adot_tool_detected_by_type_tool(self):
        """ADOT span with 'type': 'tool' in output detected as tool execution."""
        span = make_adot_span(
            span_id="adot-tool",
            input_messages=[
                {"content": "{'city': 'Alaska'}", "role": "user"},
            ],
            output_messages=[
                {
                    "content": json.dumps(
                        {
                            "content": "Weather in Alaska: 65°F",
                            "type": "tool",
                            "name": "get_weather",
                            "tool_call_id": "toolu_123",
                            "status": "success",
                        }
                    ),
                    "role": "assistant",
                }
            ],
        )
        assert self.mapper._is_tool_execution_span(span) is True
        assert self.mapper._is_inference_span(span) is False
        assert self.mapper._is_agent_invocation_span(span) is False

    def test_adot_tool_call_with_context_skipped(self):
        """ADOT tool_call_with_context wrapper is NOT detected as tool."""
        # The wrapper duplicates the real tool span's data; mapping it too
        # would double-count the tool execution.
        span = make_adot_span(
            span_id="adot-wrapper",
            input_messages=[
                {
                    "content": json.dumps(
                        {
                            "__type": "tool_call_with_context",
                            "tool_call": {"name": "get_weather", "args": {"city": "Alaska"}},
                            "state": {"messages": [], "remaining_steps": 9999},
                        }
                    ),
                    "role": "user",
                }
            ],
            output_messages=[
                {
                    "content": json.dumps(
                        {
                            "messages": [
                                {
                                    "content": "Weather: 65°F",
                                    "type": "tool",
                                    "name": "get_weather",
                                    "tool_call_id": "toolu_123",
                                    "status": "success",
                                }
                            ]
                        }
                    ),
                    "role": "assistant",
                }
            ],
        )
        assert self.mapper._is_tool_execution_span(span) is False

    def test_adot_agent_detected_by_messages_without_remaining_steps(self):
        """ADOT root graph span detected as agent invocation."""
        # NOTE(review): presence of "messages" WITHOUT "remaining_steps" is
        # treated as the root-graph marker here — confirm against the mapper's
        # detection logic if that heuristic changes.
        span = make_adot_span(
            span_id="adot-agent",
            input_messages=[
                {
                    "content": json.dumps(
                        {
                            "messages": [{"content": "What's the weather?", "type": "human"}],
                        }
                    ),
                    "role": "user",
                }
            ],
            output_messages=[
                {
                    "content": json.dumps(
                        {
                            "messages": [{"content": "It's sunny.", "type": "ai"}],
                        }
                    ),
                    "role": "assistant",
                }
            ],
        )
        assert self.mapper._is_agent_invocation_span(span) is True

    def test_adot_intermediate_node_with_remaining_steps_skipped(self):
        """ADOT intermediate node with remaining_steps is NOT agent invocation."""
        span = make_adot_span(
            span_id="adot-intermediate",
            input_messages=[
                {
                    "content": json.dumps(
                        {
                            "messages": [{"content": "Hello", "type": "human"}],
                            "remaining_steps": 9999,
                        }
                    ),
                    "role": "user",
                }
            ],
            output_messages=[
                {
                    "content": json.dumps(
                        {
                            "messages": [{"content": "Hi", "type": "ai"}],
                        }
                    ),
                    "role": "assistant",
                }
            ],
        )
        assert self.mapper._is_agent_invocation_span(span) is False

    def test_adot_end_node_not_matched(self):
        """ADOT __end__ output node is not matched as any span type."""
        span = make_adot_span(
            span_id="adot-end",
            input_messages=[
                {
                    "content": json.dumps(
                        {
                            "messages": [{"content": "Hello", "type": "human"}],
                            "remaining_steps": 9997,
                        }
                    ),
                    "role": "user",
                }
            ],
            output_messages=[
                {"content": "__end__", "role": "assistant"},
            ],
        )
        assert self.mapper._is_inference_span(span) is False
        assert self.mapper._is_tool_execution_span(span) is False
        assert self.mapper._is_agent_invocation_span(span) is False


# =========================================================================
# ADOT Path Conversion Tests
# =========================================================================


class TestAdotSpanConversion:
    """Tests for ADOT span conversion to Session types."""

    def setup_method(self):
        self.mapper = OpenInferenceSessionMapper()

    def test_adot_inference_span_conversion(self):
        """ADOT inference span with generations → InferenceSpan with correct content."""
        span = make_adot_span(
            span_id="adot-llm-conv",
            input_messages=[
                {
                    "content": json.dumps(
                        {
                            "messages": [
                                [
                                    {"lc": 1, "kwargs": {"content": "You are helpful.", "type": "system"}},
                                    {
                                        "lc": 1,
                                        "kwargs": {"content": "What's the weather?", "type": "human", "id": "msg-1"},
                                    },
                                ]
                            ]
                        }
                    ),
                    "role": "user",
                },
                {"content": "What's the weather?", "role": "user"},
            ],
            output_messages=[
                {
                    "content": json.dumps(
                        {
                            "generations": [
                                [
                                    {
                                        "text": "It's sunny today.",
                                        "type": "ChatGeneration",
                                        "message": {
                                            "lc": 1,
                                            "kwargs": {
                                                "content": "It's sunny today.",
                                                "type": "ai",
                                                "tool_calls": [],
                                            },
                                        },
                                    }
                                ]
                            ],
                            "type": "LLMResult",
                        }
                    ),
                    "role": "assistant",
                },
            ],
        )
        session = self.mapper.map_to_session([span], "sess-1")

        assert len(session.traces[0].spans) == 1
        inference = session.traces[0].spans[0]
        assert isinstance(inference, InferenceSpan)
        # messages[1] is the assistant message; its first content item is the
        # generation text.
        assert inference.messages[1].content[0].text == "It's sunny today."

    def test_adot_tool_span_conversion(self):
        """ADOT tool span → ToolExecutionSpan with correct name, params, result."""
        span = make_adot_span(
            span_id="adot-tool-conv",
            input_messages=[
                # Tool input arrives as a Python repr string, not JSON.
                {"content": "{'city': 'Alaska'}", "role": "user"},
            ],
            output_messages=[
                {
                    "content": json.dumps(
                        {
                            "content": "Weather in Alaska: Partly cloudy, 65°F",
                            "type": "tool",
                            "name": "get_weather",
                            "tool_call_id": "toolu_bdrk_123",
                            "status": "success",
                        }
                    ),
                    "role": "assistant",
                }
            ],
        )
        session = self.mapper.map_to_session([span], "sess-1")

        assert len(session.traces[0].spans) == 1
        tool_span = session.traces[0].spans[0]
        assert isinstance(tool_span, ToolExecutionSpan)
        assert tool_span.tool_call.name == "get_weather"
        assert tool_span.tool_call.arguments == {"city": "Alaska"}
        assert "Partly cloudy" in tool_span.tool_result.content
        assert tool_span.tool_result.tool_call_id == "toolu_bdrk_123"

    def test_adot_agent_span_conversion(self):
        """ADOT root graph span → AgentInvocationSpan with user_prompt and agent_response."""
        span = make_adot_span(
            span_id="adot-agent-conv",
            input_messages=[
                {
                    "content": json.dumps(
                        {
                            "messages": [
                                {"content": "What's the weather?", "type": "human", "id": "msg-1"},
                            ]
                        }
                    ),
                    "role": "user",
                },
                {"content": "What's the weather?", "role": "user"},
            ],
            output_messages=[
                {
                    "content": json.dumps(
                        {
                            "messages": [
                                # Output echoes the human turn; the mapper must
                                # pick the final "ai" message as the response.
                                {"content": "What's the weather?", "type": "human"},
                                {"content": "It's sunny in both cities.", "type": "ai"},
                            ]
                        }
                    ),
                    "role": "assistant",
                }
            ],
        )
        session = self.mapper.map_to_session([span], "sess-1")

        assert len(session.traces[0].spans) == 1
        agent = session.traces[0].spans[0]
        assert isinstance(agent, AgentInvocationSpan)
        assert agent.user_prompt == "What's the weather?"
        assert agent.agent_response == "It's sunny in both cities."

    def test_adot_inference_with_tool_calls(self):
        """ADOT inference span with tool calls in generations → preserved."""
        span = make_adot_span(
            span_id="adot-llm-tc",
            input_messages=[
                {
                    "content": json.dumps(
                        {
                            "messages": [
                                [
                                    {"lc": 1, "kwargs": {"content": "System prompt.", "type": "system"}},
                                    {"lc": 1, "kwargs": {"content": "Check weather in Alaska", "type": "human"}},
                                ]
                            ]
                        }
                    ),
                    "role": "user",
                },
                {"content": "Check weather in Alaska", "role": "user"},
            ],
            output_messages=[
                {
                    "content": json.dumps(
                        {
                            "generations": [
                                [
                                    {
                                        "text": "",
                                        "type": "ChatGeneration",
                                        "message": {
                                            "lc": 1,
                                            "kwargs": {
                                                "content": "",
                                                "type": "ai",
                                                "tool_calls": [
                                                    {
                                                        "name": "get_weather",
                                                        "args": {"city": "Alaska"},
                                                        "id": "toolu_1",
                                                        "type": "tool_call",
                                                    },
                                                ],
                                            },
                                        },
                                    }
                                ]
                            ],
                            "type": "LLMResult",
                        }
                    ),
                    "role": "assistant",
                },
            ],
        )
        session = self.mapper.map_to_session([span], "sess-1")

        assert len(session.traces[0].spans) == 1
        inference = session.traces[0].spans[0]
        assert isinstance(inference, InferenceSpan)
        # Should have empty text + tool call (not filtered because tool call present)
        assert len(inference.messages[1].content) == 2
        assert inference.messages[1].content[1].name == "get_weather"

    def test_adot_structured_user_messages_with_tool_results(self):
        """ADOT structured messages extract human + tool result contents."""
        span = make_adot_span(
            span_id="adot-structured",
            input_messages=[
                {
                    "content": json.dumps(
                        {
                            "messages": [
                                [
                                    {"lc": 1, "kwargs": {"content": "You are helpful.", "type": "system"}},
                                    {"lc": 1, "kwargs": {"content": "What's the weather?", "type": "human"}},
                                    {
                                        "lc": 1,
                                        "kwargs": {
                                            "content": "Weather in Alaska: 65°F",
                                            "type": "tool",
                                            "name": "get_weather",
                                            "tool_call_id": "toolu_1",
                                            "status": "success",
                                        },
                                    },
                                ]
                            ]
                        }
                    ),
                    "role": "user",
                },
            ],
            output_messages=[
                {
                    "content": json.dumps(
                        {
                            "generations": [
                                [
                                    {
                                        "text": "The weather is 65°F.",
                                        "type": "ChatGeneration",
                                        "message": {
                                            "lc": 1,
                                            "kwargs": {"content": "The weather is 65°F.", "type": "ai"},
                                        },
                                    }
                                ]
                            ],
                            "type": "LLMResult",
                        }
                    ),
                    "role": "assistant",
                },
            ],
        )
        session = self.mapper.map_to_session([span], "sess-1")

        assert len(session.traces[0].spans) == 1
        inference = session.traces[0].spans[0]
        assert isinstance(inference, InferenceSpan)
        user_msg = inference.messages[0]
        # Should have human text + tool result
        assert len(user_msg.content) == 2
        assert user_msg.content[0].text == "What's the weather?"
        assert isinstance(user_msg.content[1], ToolResultContent)
        assert "65°F" in user_msg.content[1].content


# =========================================================================
# Integration Tests: Real Fixture Files
# =========================================================================


@pytest.fixture(scope="module")
def live_session():
    """Map real live (in-memory) spans to a Session."""
    # module scope: the fixture data is read-only, so one mapping is shared
    # across all tests in this module.
    spans = _load_live_spans()
    mapper = OpenInferenceSessionMapper()
    return mapper.map_to_session(spans, "live-sess")


@pytest.fixture(scope="module")
def adot_session():
    """Map real ADOT/CloudWatch spans to a Session."""
    spans = _load_adot_spans()
    mapper = OpenInferenceSessionMapper()
    return mapper.map_to_session(spans, "adot-sess")


class TestLiveFixtureIntegration:
    """Integration tests using real live (in-memory) OpenInference trace data."""

    def test_session_has_traces(self, live_session):
        """Live fixture produces at least one trace."""
        assert len(live_session.traces) >= 1

    def test_produces_all_span_types(self, live_session):
        """Live fixture produces InferenceSpan, ToolExecutionSpan, and AgentInvocationSpan."""
        all_spans = [s for t in live_session.traces for s in t.spans]
        span_types = {type(s).__name__ for s in all_spans}
        assert "InferenceSpan" in span_types
        assert "ToolExecutionSpan" in span_types
        assert "AgentInvocationSpan" in span_types

    def test_inference_spans_have_user_and_assistant(self, live_session):
        """Every InferenceSpan has user and assistant messages."""
        all_spans = [s for t in live_session.traces for s in t.spans]
        inference_spans = [s for s in all_spans if isinstance(s, InferenceSpan)]
        assert len(inference_spans) > 0
        for span in inference_spans:
            assert len(span.messages) == 2
            assert span.messages[0].role.value == "user"
            assert span.messages[1].role.value == "assistant"
            assert len(span.messages[0].content) > 0
            assert len(span.messages[1].content) > 0

    def test_tool_spans_have_name_and_result(self, live_session):
        """Every ToolExecutionSpan has a tool name and result."""
        all_spans = [s for t in live_session.traces for s in t.spans]
        tool_spans = [s for s in all_spans if isinstance(s, ToolExecutionSpan)]
        # Only finish tool is correctly labeled as TOOL (route_to_agent is mislabeled as AGENT)
        assert len(tool_spans) >= 1
        for span in tool_spans:
            assert span.tool_call.name
            assert span.tool_result.content is not None

    def test_agent_spans_have_prompt_and_response(self, live_session):
        """Every AgentInvocationSpan has user_prompt and agent_response."""
        all_spans = [s for t in live_session.traces for s in t.spans]
        agent_spans = [s for s in all_spans if isinstance(s, AgentInvocationSpan)]
        assert len(agent_spans) >= 1
        for span in agent_spans:
            assert span.user_prompt
            assert span.agent_response

    def test_agent_kind_spans_not_included(self, live_session):
        """AGENT-kind spans (route_to_agent) are not mapped as AgentInvocationSpan."""
        all_spans = [s for t in live_session.traces for s in t.spans]
        agent_spans = [s for s in all_spans if isinstance(s, AgentInvocationSpan)]
        # Agent spans should only come from CHAIN+LangGraph, not AGENT-kind
        for span in agent_spans:
            assert span.user_prompt != "{'agent_name': 'research_agent'}"

    def test_one_agent_span_per_trace(self, live_session):
        """Each trace has at most 1 AgentInvocationSpan (multi-agent dedup)."""
        for trace in live_session.traces:
            agent_spans = [s for s in trace.spans if isinstance(s, AgentInvocationSpan)]
            assert len(agent_spans) <= 1

    def test_no_empty_response_inference_spans(self, live_session):
        """No InferenceSpan has empty-text-only assistant content."""
        all_spans = [s for t in live_session.traces for s in t.spans]
        for span in all_spans:
            if isinstance(span, InferenceSpan):
                assistant_content = span.messages[1].content
                # If all content is text, at least one must be non-empty
                text_only = all(hasattr(c, "text") for c in assistant_content)
                if text_only:
                    assert any(c.text for c in assistant_content)


class TestAdotFixtureIntegration:
    """Integration tests using real ADOT/CloudWatch OpenInference trace data."""

    def test_session_has_traces(self, adot_session):
        """ADOT fixture produces traces (3 trace_ids in the file)."""
        assert len(adot_session.traces) >= 1

    def test_produces_expected_span_types(self, adot_session):
        """ADOT fixture produces InferenceSpan and ToolExecutionSpan."""
        all_spans = [s for t in adot_session.traces for s in t.spans]
        span_types = {type(s).__name__ for s in all_spans}
        assert "InferenceSpan" in span_types
        # Tool spans should be present (get_weather)
        assert "ToolExecutionSpan" in span_types

    def test_tool_spans_have_correct_names(self, adot_session):
        """ADOT tool spans extract correct tool names from output body."""
        all_spans = [s for t in adot_session.traces for s in t.spans]
        tool_spans = [s for s in all_spans if isinstance(s, ToolExecutionSpan)]
        tool_names = {s.tool_call.name for s in tool_spans}
        assert "get_weather" in tool_names

    def test_tool_spans_have_python_repr_params(self, adot_session):
        """ADOT tool spans correctly parse Python repr input (e.g. {'city': 'Alaska'})."""
        all_spans = [s for t in adot_session.traces for s in t.spans]
        tool_spans = [s for s in all_spans if isinstance(s, ToolExecutionSpan)]
        # Should have parsed the Python repr dict
        for span in tool_spans:
            assert isinstance(span.tool_call.arguments, dict)
            assert len(span.tool_call.arguments) > 0

    def test_inference_spans_have_content(self, adot_session):
        """ADOT inference spans have non-empty user and assistant content."""
        all_spans = [s for t in adot_session.traces for s in t.spans]
        inference_spans = [s for s in all_spans if isinstance(s, InferenceSpan)]
        assert len(inference_spans) > 0
        for span in inference_spans:
            assert len(span.messages[0].content) > 0
            assert len(span.messages[1].content) > 0

    def test_agent_spans_have_content(self, adot_session):
        """ADOT agent spans have user_prompt and agent_response."""
        all_spans = [s for t in adot_session.traces for s in t.spans]
        agent_spans = [s for s in all_spans if isinstance(s, AgentInvocationSpan)]
        assert len(agent_spans) >= 1
        for span in agent_spans:
            assert span.user_prompt
            assert span.agent_response

    def test_one_agent_span_per_trace(self, adot_session):
        """Each trace has at most 1 AgentInvocationSpan."""
        for trace in adot_session.traces:
            agent_spans = [s for s in trace.spans if isinstance(s, AgentInvocationSpan)]
            assert len(agent_spans) <= 1

    def test_intermediate_nodes_filtered(self, adot_session):
        """ADOT intermediate nodes (remaining_steps, __end__) are filtered out."""
        # Total spans should be much less than 41 raw input spans
        # Each trace should have: ~2 inference + 2 tool + 1 agent = ~5 max
        for trace in adot_session.traces:
            assert len(trace.spans) <= 10, f"Trace {trace.trace_id} has too many spans: {len(trace.spans)}"

    def test_no_empty_response_inference_spans(self, adot_session):
        """No InferenceSpan has empty-text-only assistant content (filter applied)."""
        all_spans = [s for t in adot_session.traces for s in t.spans]
        for span in all_spans:
            if isinstance(span, InferenceSpan):
                assistant_content = span.messages[1].content
                text_only = all(hasattr(c, "text") for c in assistant_content)
                if text_only:
                    assert any(c.text for c in assistant_content)