From d46cb05b2179d60e4e96486d5da578a602a204f2 Mon Sep 17 00:00:00 2001 From: poshinchen Date: Mon, 16 Mar 2026 13:27:40 -0400 Subject: [PATCH 1/2] feat(mapper): added framework detection for traces from CloudWatch --- pyproject.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index b31a3e7..498a76d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -14,11 +14,11 @@ authors = [ ] dependencies = [ - "pydantic>=2.0.0,<3.0.0", + "pydantic>=2.4.0,<3.0.0", "rich>=14.0.0,<15.0.0", "strands-agents>=1.0.0", "strands-agents-tools>=0.1.0,<1.0.0", - "typing-extensions>=4.0", + "typing-extensions>=4.13.2,<5.0.0", "opentelemetry-api>=1.20.0", "opentelemetry-sdk>=1.20.0", "opentelemetry-instrumentation-threading>=0.51b0,<1.00b0", From aa99927dd6d8b6ff040fd8c865213cb959ef4737 Mon Sep 17 00:00:00 2001 From: poshinchen Date: Mon, 16 Mar 2026 17:19:30 -0400 Subject: [PATCH 2/2] feat(mappers): updated langfuse_provider to support different frameworks --- pyproject.toml | 2 +- .../providers/langfuse_provider.py | 246 +++++++++++---- .../providers/test_langfuse_provider.py | 297 +++++++++++++++++- 3 files changed, 482 insertions(+), 63 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 498a76d..308c834 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -44,7 +44,7 @@ dev = [ "ruff>=0.13.0,<0.15.0", ] -langfuse = ["langfuse>=2.0.0,<4"] otel = ["opentelemetry-exporter-otlp-proto-http>=1.30.0,<2.0.0"] langchain = [ "langchain>=0.3.0", diff --git a/src/strands_evals/providers/langfuse_provider.py b/src/strands_evals/providers/langfuse_provider.py index 73a2c45..db348b3 100644 --- a/src/strands_evals/providers/langfuse_provider.py +++ b/src/strands_evals/providers/langfuse_provider.py @@ -195,12 +195,19 @@ def _convert_observations(self, observations: list[Any], session_id: str) -> lis def _convert_observation(self, obs: Any, session_id: str) -> Any: """Route a single Langfuse 
observation to the appropriate span converter. - Langfuse observation fields used for routing: - obs.type: str — "GENERATION" | "SPAN" | "EVENT" | ... - obs.name: str — e.g. "execute_tool calc", "invoke_agent my_agent", "chat" + Langfuse normalizes traces from ALL frameworks into its own Observation + format. The `obs.type` field is universal across frameworks: + + - GENERATION — LLM call (LangChain, Strands, LlamaIndex, etc.) + - TOOL — Tool invocation (LangChain sends these) + - CHAIN — Orchestration/agent (root chain = agent invocation) + - SPAN — Strands-specific spans (fallback by ``obs.name``) Routing: obs.type == "GENERATION" → InferenceSpan + obs.type == "TOOL" → ToolExecutionSpan + obs.type == "CHAIN" and no parent → AgentInvocationSpan + obs.type == "AGENT" and no parent → AgentInvocationSpan (Langfuse v4+) obs.type == "SPAN", name starts "execute_tool" → ToolExecutionSpan obs.type == "SPAN", name starts "invoke_agent" → AgentInvocationSpan Otherwise → None (skipped) @@ -210,17 +217,21 @@ def _convert_observation(self, obs: Any, session_id: str) -> Any: if obs_type == "GENERATION": return self._convert_generation(obs, session_id) - if obs_type != "SPAN": - logger.debug("Skipping observation with type: %s", obs_type) - return None - - obs_name = obs.name or "" - if obs_name.startswith("execute_tool"): + if obs_type == "TOOL": return self._convert_tool_execution(obs, session_id) - if obs_name.startswith("invoke_agent"): + + if obs_type in ("CHAIN", "AGENT") and obs.parent_observation_id is None: return self._convert_agent_invocation(obs, session_id) - logger.debug("Skipping SPAN with unrecognized name: %s", obs_name) + # Strands-specific fallback for SPAN type + if obs_type == "SPAN": + obs_name = obs.name or "" + if obs_name.startswith("execute_tool"): + return self._convert_tool_execution(obs, session_id) + if obs_name.startswith("invoke_agent"): + return self._convert_agent_invocation(obs, session_id) + + logger.debug("Skipping observation: type=%s, 
name=%s", obs_type, obs.name) return None def _create_span_info(self, obs: Any, session_id: str) -> SpanInfo: @@ -314,6 +325,18 @@ def _convert_message(self, msg: dict) -> UserMessage | AssistantMessage | None: if role == "assistant": assistant_content = self._parse_assistant_content(content_data) + # LangChain format: tool_calls as a separate field + tool_calls = msg.get("tool_calls") + if isinstance(tool_calls, list): + for tc in tool_calls: + if isinstance(tc, dict) and "name" in tc: + assistant_content.append( + ToolCallContent( + name=tc["name"], + arguments=tc.get("args") or tc.get("input") or {}, + tool_call_id=tc.get("id"), + ) + ) return AssistantMessage(content=assistant_content) if assistant_content else None elif role == "user": user_content = self._parse_user_content(content_data) @@ -412,26 +435,44 @@ def _parse_tool_result_content(self, content_data: list) -> list[TextContent | T return result def _convert_tool_execution(self, obs: Any, session_id: str) -> ToolExecutionSpan: - """Convert an execute_tool SPAN observation to a ToolExecutionSpan. - - Langfuse observation (obs.type == "SPAN", obs.name starts with "execute_tool"): - obs.input: dict — tool call details - {"name": "calc", "arguments": {"x": "2+2"}, "toolUseId": "tooluse_abc123"} - obs.output: str | dict — tool execution result - str: "42" - dict: {"result": "4", "status": "success"} - obs.metadata: dict | None + """Convert a tool observation to a ToolExecutionSpan. - Returns: - ToolExecutionSpan with tool_call and tool_result populated from the above. + Handles two formats: + + **Strands** (obs.type == "SPAN", name starts with "execute_tool"): + obs.input: ``{"name": "calc", "arguments": {"x": "2+2"}, "toolUseId": "..."}`` + obs.output: ``"42"`` or ``{"result": "4", "status": "success"}`` + + **LangChain / universal** (obs.type == "TOOL"): + obs.name: tool name (e.g. 
``"add_numbers"``) + obs.input: tool arguments (dict or other) + obs.output: tool result """ span_info = self._create_span_info(obs, session_id) obs_input = obs.input or {} - if isinstance(obs_input, dict): + if isinstance(obs_input, dict) and "name" in obs_input: + # Strands format: input carries name/arguments/toolUseId tool_name = obs_input.get("name", "") tool_arguments = obs_input.get("arguments", {}) tool_call_id = obs_input.get("toolUseId") + elif obs.type == "TOOL": + # LangChain/universal: obs.name is the tool, obs.input is arguments + tool_name = obs.name or "" + if isinstance(obs_input, dict): + tool_arguments = obs_input + elif isinstance(obs_input, str): + # Try parsing as JSON; LangChain may send stringified dicts + try: + parsed = json.loads(obs_input) + tool_arguments = parsed if isinstance(parsed, dict) else {"input": obs_input} + except (json.JSONDecodeError, ValueError): + tool_arguments = {"input": obs_input} + elif obs_input: + tool_arguments = {"input": str(obs_input)} + else: + tool_arguments = {} + tool_call_id = None else: tool_name = "" tool_arguments = {} @@ -449,42 +490,46 @@ def _parse_tool_result(self, obs_output: Any) -> tuple[str, str | None]: """Parse tool execution output into (content, error). 
Input formats: - str: "42" → ("42", None) - dict: {"result": "4", "status": "success"} → ("4", None) - dict: {"result": "...", "status": "error"} → ("...", "error") - dict: {"result": "...", "status": ""} → ("...", None) - None: → ("", None) + str: ``"42"`` → ``("42", None)`` + dict: ``{"result": "4", "status": "success"}`` → ``("4", None)`` + dict: ``{"result": "...", "status": "error"}`` → ``("...", "error")`` + dict: ``{"content": "Weather...", "type": "tool", ...}`` → ``("Weather...", None)`` + (LangChain ToolMessage format via Langfuse) + None: → ``("", None)`` """ if isinstance(obs_output, str): return obs_output, None if isinstance(obs_output, dict): - content = obs_output.get("result", str(obs_output)) - status = obs_output.get("status", "") - error = None if status == "success" else (str(status) if status else None) - return content, error + # Strands format: {"result": "...", "status": "success"|"error"} + if "result" in obs_output: + content = obs_output["result"] + status = obs_output.get("status", "") + error = None if status == "success" else (str(status) if status else None) + return content, error + # LangChain ToolMessage format: {"content": "...", "type": "tool", ...} + if "content" in obs_output: + content = obs_output["content"] + if isinstance(content, str): + return content, None + return str(content), None + return str(obs_output), None content = str(obs_output) if obs_output is not None else "" return content, None def _convert_agent_invocation(self, obs: Any, session_id: str) -> AgentInvocationSpan: - """Convert an invoke_agent SPAN observation to an AgentInvocationSpan. - - Langfuse observation (obs.type == "SPAN", obs.name starts with "invoke_agent"): - obs.input: str | list[dict] | dict — user prompt - str: "Hello" - list[dict]: [{"text": "Hello"}] - dict: {"text": "Hello"} - obs.output: str | dict — agent response - str: "Hi there!" 
- dict: {"message": "Hi there!", "finish_reason": "end_turn"} - dict: {"text": "Hi there!"} - dict: {"content": [{"text": "Hi there!"}]} - obs.metadata: dict | None — may contain "tools" key with available tool names - {"tools": ["shell", "get_pull_request", ...]} + """Convert an agent observation to an AgentInvocationSpan. - Returns: - AgentInvocationSpan with user_prompt, agent_response, and available_tools extracted. + Handles two formats: + + **Strands** (obs.type == "SPAN", name starts with "invoke_agent"): + obs.input: ``"Hello"`` | ``[{"text": "Hello"}]`` | ``{"text": "Hello"}`` + obs.output: ``"Hi!"`` | ``{"message": "Hi!", "finish_reason": "end_turn"}`` + + **LangChain / universal** (obs.type == "CHAIN", root observation): + obs.input: ``{"input": "question"}`` or ``{"messages": [...]}`` + obs.output: ``{"output": "answer"}`` or ``{"content": "answer"}`` """ span_info = self._create_span_info(obs, session_id) obs_input = obs.input @@ -507,43 +552,118 @@ def _convert_agent_invocation(self, obs: Any, session_id: str) -> AgentInvocatio metadata=obs.metadata or {}, ) + def _extract_text_from_content(self, content: Any) -> str: + """Extract plain text from a message content field. 
+ + Handles formats seen in Strands observations via OTEL→Langfuse: + str: ``"Hello"`` → ``"Hello"`` + str (JSON): ``'[{"text": "Hello"}]'`` → ``"Hello"`` + list[dict]: ``[{"text": "Hello"}]`` → ``"Hello"`` + None: → ``""`` + """ + if isinstance(content, str): + try: + parsed = json.loads(content) + except (json.JSONDecodeError, ValueError): + return content + return self._first_text_from_list(parsed) or content + if isinstance(content, list): + return self._first_text_from_list(content) or "" + return str(content) if content else "" + + def _first_text_from_list(self, items: Any) -> str | None: + """Return the first "text" value from a list of dicts, or None.""" + if isinstance(items, list): + for item in items: + if isinstance(item, dict) and "text" in item: + return item["text"] + return None + + def _find_message_content_by_role( + self, messages: list, roles: tuple[str, ...], *, reverse: bool = False + ) -> str | None: + """Find content of the first message matching any of the given roles. + + Searches ``messages`` (a list of dicts with ``type`` and ``content`` keys) + for the first entry whose ``type`` is in *roles*. Returns ``None`` when no + match is found so callers can fall through to other extraction strategies. + """ + if not isinstance(messages, list): + return None + items = reversed(messages) if reverse else iter(messages) + for msg in items: + if isinstance(msg, dict) and msg.get("type") in roles: + return str(msg.get("content", "")) + # Fallback: last message + if messages and isinstance(messages[-1], dict): + return str(messages[-1].get("content", "")) + return None + def _extract_user_prompt(self, obs_input: Any) -> str: """Extract user prompt string from observation input. 
Input formats: - str: "Hello" → "Hello" - list[dict]: [{"text": "Hello"}] → "Hello" - dict: {"text": "Hello"} → "Hello" - None: → "" + str: ``"Hello"`` → ``"Hello"`` + list[dict]: ``[{"text": "Hello"}]`` → ``"Hello"`` + list[dict]: ``[{"role": "user", "content": ...}]`` + → ``"Hello"`` (Strands via OTEL) + dict: ``{"text": "Hello"}`` → ``"Hello"`` + dict: ``{"input": "Hello"}`` → ``"Hello"`` (LangChain CHAIN) + dict: ``{"messages": [{"type": "human", "content": "Hello"}]}`` + → ``"Hello"`` (LangChain messages) + None: → ``""`` """ if isinstance(obs_input, str): return obs_input if isinstance(obs_input, list): for item in obs_input: - if isinstance(item, dict) and "text" in item: + if not isinstance(item, dict): + continue + if "text" in item: return item["text"] - if isinstance(obs_input, dict) and "text" in obs_input: - return obs_input["text"] + # Strands message-list format via OTEL→Langfuse + if item.get("role") in ("user", "human"): + return self._extract_text_from_content(item.get("content")) + if isinstance(obs_input, dict): + if "text" in obs_input: + return obs_input["text"] + if "input" in obs_input: + return str(obs_input["input"]) + if "messages" in obs_input: + result = self._find_message_content_by_role(obs_input["messages"], ("human", "user")) + if result is not None: + return result return str(obs_input) if obs_input else "" def _extract_agent_response(self, obs_output: Any) -> str: """Extract agent response string from observation output. Input formats: - str: "Hi there!" → "Hi there!" - dict: {"text": "Hi there!"} → "Hi there!" - dict: {"message": "Hi there!", "finish_reason": "..."} → "Hi there!" - dict: {"content": [{"text": "Hi there!"}]} → "Hi there!" - dict: {"content": "Hi there!"} → "Hi there!" 
- None: → "" + str: ``"Hi there!"`` → ``"Hi there!"`` + list[dict]: ``[{"role": "assistant", "content": ...}]`` → ``"Hi!"`` (Strands via OTEL) + dict: ``{"text": "Hi there!"}`` → ``"Hi there!"`` + dict: ``{"message": "Hi!", "finish_reason": "..."}`` → ``"Hi!"`` + dict: ``{"output": "Hi!"}`` → ``"Hi!"`` (LangChain CHAIN) + dict: ``{"content": [{"text": "Hi!"}]}`` → ``"Hi!"`` + dict: ``{"content": "Hi!"}`` → ``"Hi!"`` + dict: ``{"messages": [{"type": "ai", "content": "Hi!"}]}`` + → ``"Hi!"`` (LangGraph output) + None: → ``""`` """ if isinstance(obs_output, str): return obs_output + if isinstance(obs_output, list): + # Strands message-list format via OTEL→Langfuse + for item in reversed(obs_output): + if isinstance(item, dict) and item.get("role") in ("assistant", "ai"): + return self._extract_text_from_content(item.get("content")) if isinstance(obs_output, dict): if "text" in obs_output: return obs_output["text"] if "message" in obs_output: return obs_output["message"] + if "output" in obs_output: + return str(obs_output["output"]) if "content" in obs_output: content = obs_output["content"] if isinstance(content, list): @@ -552,6 +672,10 @@ def _extract_agent_response(self, obs_output: Any) -> str: return item["text"] elif isinstance(content, str): return content + if "messages" in obs_output: + result = self._find_message_content_by_role(obs_output["messages"], ("ai", "assistant"), reverse=True) + if result is not None: + return result return str(obs_output) if obs_output else "" def _extract_available_tools(self, metadata: Any) -> list[ToolConfig]: diff --git a/tests/strands_evals/providers/test_langfuse_provider.py b/tests/strands_evals/providers/test_langfuse_provider.py index cdad090..a765e24 100644 --- a/tests/strands_evals/providers/test_langfuse_provider.py +++ b/tests/strands_evals/providers/test_langfuse_provider.py @@ -21,7 +21,7 @@ def _meta(page=1, total_pages=1, total_items=10, limit=100): - m = MagicMock() + m = MagicMock(spec=["page", "limit", 
"total_items", "total_pages"]) m.page, m.limit, m.total_items, m.total_pages = page, limit, total_items, total_pages return m @@ -584,3 +584,298 @@ def test_nonempty_agent_response_used(self, provider, mock_client): ], ) assert result["output"] == "agent says this" + + +# --- LangChain framework support via obs.type routing --- + + +class TestLangChainToolType: + """TOOL-type observations from LangChain via Langfuse.""" + + def _get_spans(self, provider, mock_client, observations): + mock_client.api.trace.list.return_value = _paginated([_trace("t1", "s1")]) + mock_client.api.observations.get_many.return_value = _paginated(observations) + return provider.get_evaluation_data("s1")["trajectory"].traces[0].spans + + def test_tool_type_produces_tool_execution_span(self, provider, mock_client): + """obs.type == 'TOOL' is routed to ToolExecutionSpan.""" + spans = self._get_spans( + provider, + mock_client, + [ + _obs("o-tool", "t1", "TOOL", name="add_numbers", obs_input={"a": 2, "b": 3}, obs_output="5"), + _obs("o-agent", "t1", "SPAN", name="invoke_agent a", obs_input=[{"text": "q"}], obs_output="a"), + ], + ) + tools = [s for s in spans if isinstance(s, ToolExecutionSpan)] + assert len(tools) == 1 + assert tools[0].tool_call.name == "add_numbers" + assert tools[0].tool_call.arguments == {"a": 2, "b": 3} + assert tools[0].tool_result.content == "5" + + def test_tool_type_with_dict_output(self, provider, mock_client): + """TOOL with dict output parses result/status correctly.""" + spans = self._get_spans( + provider, + mock_client, + [ + _obs( + "o-tool", + "t1", + "TOOL", + name="search", + obs_input={"query": "weather"}, + obs_output={"result": "Sunny", "status": "success"}, + ), + _obs("o-agent", "t1", "SPAN", name="invoke_agent a", obs_input=[{"text": "q"}], obs_output="a"), + ], + ) + tools = [s for s in spans if isinstance(s, ToolExecutionSpan)] + assert tools[0].tool_result.content == "Sunny" + assert tools[0].tool_result.error is None + + def 
test_tool_type_with_string_input(self, provider, mock_client): + """TOOL with string input wraps it in a dict.""" + spans = self._get_spans( + provider, + mock_client, + [ + _obs("o-tool", "t1", "TOOL", name="echo", obs_input="hello", obs_output="hello"), + _obs("o-agent", "t1", "SPAN", name="invoke_agent a", obs_input=[{"text": "q"}], obs_output="a"), + ], + ) + tools = [s for s in spans if isinstance(s, ToolExecutionSpan)] + assert tools[0].tool_call.name == "echo" + assert tools[0].tool_call.arguments == {"input": "hello"} + + +class TestLangChainChainType: + """CHAIN-type observations from LangChain via Langfuse.""" + + def _get_spans(self, provider, mock_client, observations): + mock_client.api.trace.list.return_value = _paginated([_trace("t1", "s1")]) + mock_client.api.observations.get_many.return_value = _paginated(observations) + return provider.get_evaluation_data("s1")["trajectory"].traces[0].spans + + def test_root_chain_produces_agent_invocation(self, provider, mock_client): + """Root CHAIN (parent_observation_id=None) → AgentInvocationSpan.""" + spans = self._get_spans( + provider, + mock_client, + [ + _obs( + "o-chain", + "t1", + "CHAIN", + name="AgentExecutor", + obs_input={"input": "What is 2+2?"}, + obs_output={"output": "4"}, + parent_observation_id=None, + ), + ], + ) + agents = [s for s in spans if isinstance(s, AgentInvocationSpan)] + assert len(agents) == 1 + assert agents[0].user_prompt == "What is 2+2?" 
+ assert agents[0].agent_response == "4" + + def test_child_chain_is_skipped(self, provider, mock_client): + """Non-root CHAIN (has parent) is skipped.""" + mock_client.api.trace.list.return_value = _paginated([_trace("t1", "s1")]) + mock_client.api.observations.get_many.return_value = _paginated( + [ + _obs( + "o-child", + "t1", + "CHAIN", + name="SubChain", + obs_input={"input": "sub"}, + obs_output={"output": "sub-out"}, + parent_observation_id="o-parent", + ), + _obs("o-agent", "t1", "SPAN", name="invoke_agent a", obs_input=[{"text": "q"}], obs_output="a"), + ], + ) + spans = provider.get_evaluation_data("s1")["trajectory"].traces[0].spans + assert len(spans) == 1 + assert isinstance(spans[0], AgentInvocationSpan) + + def test_chain_with_messages_input(self, provider, mock_client): + """CHAIN with LangChain messages-style input extracts human message.""" + spans = self._get_spans( + provider, + mock_client, + [ + _obs( + "o-chain", + "t1", + "CHAIN", + name="LangGraph", + obs_input={"messages": [{"type": "human", "content": "Tell me a joke"}]}, + obs_output={"output": "Why did the chicken cross the road?"}, + parent_observation_id=None, + ), + ], + ) + agents = [s for s in spans if isinstance(s, AgentInvocationSpan)] + assert agents[0].user_prompt == "Tell me a joke" + + def test_chain_with_content_output(self, provider, mock_client): + """CHAIN with content-style output.""" + spans = self._get_spans( + provider, + mock_client, + [ + _obs( + "o-chain", + "t1", + "CHAIN", + name="LangGraph", + obs_input={"input": "Hi"}, + obs_output={"content": "Hello!"}, + parent_observation_id=None, + ), + ], + ) + agents = [s for s in spans if isinstance(s, AgentInvocationSpan)] + assert agents[0].agent_response == "Hello!" 
+ + +class TestLangChainEndToEnd: + """Full LangChain agent trace: CHAIN + GENERATION + TOOL.""" + + def test_full_langchain_trace(self, provider, mock_client): + mock_client.api.trace.list.return_value = _paginated([_trace("t1", "s1")]) + mock_client.api.observations.get_many.return_value = _paginated( + [ + _obs( + "o-chain", + "t1", + "CHAIN", + name="AgentExecutor", + obs_input={"input": "What is the weather?"}, + obs_output={"output": "Sunny, 72F."}, + parent_observation_id=None, + ), + _obs( + "o-gen", + "t1", + "GENERATION", + name="ChatOpenAI", + obs_input=[{"role": "user", "content": [{"text": "What is the weather?"}]}], + obs_output={"role": "assistant", "content": [{"text": "Let me check."}]}, + parent_observation_id="o-chain", + ), + _obs( + "o-tool", + "t1", + "TOOL", + name="get_weather", + obs_input={"location": "SF"}, + obs_output="Sunny, 72F", + parent_observation_id="o-chain", + ), + ], + ) + result = provider.get_evaluation_data("s1") + spans = result["trajectory"].traces[0].spans + + agent_spans = [s for s in spans if isinstance(s, AgentInvocationSpan)] + inference_spans = [s for s in spans if isinstance(s, InferenceSpan)] + tool_spans = [s for s in spans if isinstance(s, ToolExecutionSpan)] + + assert len(agent_spans) == 1 + assert len(inference_spans) == 1 + assert len(tool_spans) == 1 + + assert agent_spans[0].user_prompt == "What is the weather?" + assert agent_spans[0].agent_response == "Sunny, 72F." + assert tool_spans[0].tool_call.name == "get_weather" + assert tool_spans[0].tool_result.content == "Sunny, 72F" + assert result["output"] == "Sunny, 72F." 
+ + +class TestStrandsOtelViaLangfuse: + """Strands message-list format arriving via OTEL→Langfuse OTLP endpoint.""" + + def _get_spans(self, provider, mock_client, observations): + mock_client.api.trace.list.return_value = _paginated([_trace("t1", "s1")]) + mock_client.api.observations.get_many.return_value = _paginated(observations) + return provider.get_evaluation_data("s1")["trajectory"].traces[0].spans + + def test_strands_message_list_input(self, provider, mock_client): + """Strands invoke_agent with message-list input extracts user text.""" + spans = self._get_spans( + provider, + mock_client, + [ + _obs( + "o1", + "t1", + "SPAN", + name="invoke_agent my_agent", + obs_input=[{"role": "user", "content": '[{"text": "What\'s the weather?"}]'}], + obs_output="Sunny and warm!", + ), + ], + ) + agents = [s for s in spans if isinstance(s, AgentInvocationSpan)] + assert len(agents) == 1 + assert agents[0].user_prompt == "What's the weather?" + + def test_strands_message_list_input_plain_string_content(self, provider, mock_client): + """Strands message-list where content is a plain string (not JSON).""" + spans = self._get_spans( + provider, + mock_client, + [ + _obs( + "o1", + "t1", + "SPAN", + name="invoke_agent my_agent", + obs_input=[{"role": "user", "content": "Hello there"}], + obs_output="Hi!", + ), + ], + ) + agents = [s for s in spans if isinstance(s, AgentInvocationSpan)] + assert agents[0].user_prompt == "Hello there" + + def test_strands_message_list_output(self, provider, mock_client): + """Strands invoke_agent with message-list output extracts assistant text.""" + spans = self._get_spans( + provider, + mock_client, + [ + _obs( + "o1", + "t1", + "SPAN", + name="invoke_agent my_agent", + obs_input=[{"text": "Hello"}], + obs_output=[{"role": "assistant", "content": '[{"text": "Hi there!"}]'}], + ), + ], + ) + agents = [s for s in spans if isinstance(s, AgentInvocationSpan)] + assert agents[0].agent_response == "Hi there!" 
+ + def test_strands_message_list_content_as_list(self, provider, mock_client): + """Strands message-list where content is already a parsed list.""" + spans = self._get_spans( + provider, + mock_client, + [ + _obs( + "o1", + "t1", + "SPAN", + name="invoke_agent my_agent", + obs_input=[{"role": "user", "content": [{"text": "How are you?"}]}], + obs_output="I'm fine!", + ), + ], + ) + agents = [s for s in spans if isinstance(s, AgentInvocationSpan)] + assert agents[0].user_prompt == "How are you?"