Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ authors = [
]

dependencies = [
"pydantic>=2.0.0,<3.0.0",
"pydantic>=2.4.0,<3.0.0",
"rich>=14.0.0,<15.0.0",
"strands-agents>=1.0.0",
"strands-agents-tools>=0.1.0,<1.0.0",
"typing-extensions>=4.0",
"typing-extensions>=4.13.2,<5.0.0",
"opentelemetry-api>=1.20.0",
"opentelemetry-sdk>=1.20.0",
"opentelemetry-instrumentation-threading>=0.51b0,<1.00b0",
Expand All @@ -44,7 +44,7 @@ dev = [
"ruff>=0.13.0,<0.15.0",
]

langfuse = ["langfuse>=2.0.0,<3"]
langfuse = ["langfuse>=2.0.0,<4"]
otel = ["opentelemetry-exporter-otlp-proto-http>=1.30.0,<2.0.0"]
langchain = [
"langchain>=0.3.0",
Expand Down
246 changes: 185 additions & 61 deletions src/strands_evals/providers/langfuse_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,12 +195,19 @@ def _convert_observations(self, observations: list[Any], session_id: str) -> lis
def _convert_observation(self, obs: Any, session_id: str) -> Any:
"""Route a single Langfuse observation to the appropriate span converter.

Langfuse observation fields used for routing:
obs.type: str — "GENERATION" | "SPAN" | "EVENT" | ...
obs.name: str — e.g. "execute_tool calc", "invoke_agent my_agent", "chat"
Langfuse normalizes traces from ALL frameworks into its own Observation
format. The `obs.type` field is universal across frameworks:

- GENERATION — LLM call (LangChain, Strands, LlamaIndex, etc.)
- TOOL — Tool invocation (LangChain sends these)
- CHAIN — Orchestration/agent (root chain = agent invocation)
- SPAN — Strands-specific spans (fallback by ``obs.name``)

Routing:
obs.type == "GENERATION" → InferenceSpan
obs.type == "TOOL" → ToolExecutionSpan
obs.type == "CHAIN" and no parent → AgentInvocationSpan
obs.type == "AGENT" and no parent → AgentInvocationSpan (Langfuse v4+)
obs.type == "SPAN", name starts "execute_tool" → ToolExecutionSpan
obs.type == "SPAN", name starts "invoke_agent" → AgentInvocationSpan
Otherwise → None (skipped)
Expand All @@ -210,17 +217,21 @@ def _convert_observation(self, obs: Any, session_id: str) -> Any:
if obs_type == "GENERATION":
return self._convert_generation(obs, session_id)

if obs_type != "SPAN":
logger.debug("Skipping observation with type: %s", obs_type)
return None

obs_name = obs.name or ""
if obs_name.startswith("execute_tool"):
if obs_type == "TOOL":
return self._convert_tool_execution(obs, session_id)
if obs_name.startswith("invoke_agent"):

if obs_type in ("CHAIN", "AGENT") and obs.parent_observation_id is None:
return self._convert_agent_invocation(obs, session_id)

logger.debug("Skipping SPAN with unrecognized name: %s", obs_name)
# Strands-specific fallback for SPAN type
if obs_type == "SPAN":
obs_name = obs.name or ""
if obs_name.startswith("execute_tool"):
return self._convert_tool_execution(obs, session_id)
if obs_name.startswith("invoke_agent"):
return self._convert_agent_invocation(obs, session_id)

logger.debug("Skipping observation: type=%s, name=%s", obs_type, obs.name)
return None

def _create_span_info(self, obs: Any, session_id: str) -> SpanInfo:
Expand Down Expand Up @@ -314,6 +325,18 @@ def _convert_message(self, msg: dict) -> UserMessage | AssistantMessage | None:

if role == "assistant":
assistant_content = self._parse_assistant_content(content_data)
# LangChain format: tool_calls as a separate field
tool_calls = msg.get("tool_calls")
if isinstance(tool_calls, list):
for tc in tool_calls:
if isinstance(tc, dict) and "name" in tc:
assistant_content.append(
ToolCallContent(
name=tc["name"],
arguments=tc.get("args") or tc.get("input") or {},
tool_call_id=tc.get("id"),
)
)
return AssistantMessage(content=assistant_content) if assistant_content else None
elif role == "user":
user_content = self._parse_user_content(content_data)
Expand Down Expand Up @@ -412,26 +435,44 @@ def _parse_tool_result_content(self, content_data: list) -> list[TextContent | T
return result

def _convert_tool_execution(self, obs: Any, session_id: str) -> ToolExecutionSpan:
"""Convert an execute_tool SPAN observation to a ToolExecutionSpan.

Langfuse observation (obs.type == "SPAN", obs.name starts with "execute_tool"):
obs.input: dict — tool call details
{"name": "calc", "arguments": {"x": "2+2"}, "toolUseId": "tooluse_abc123"}
obs.output: str | dict — tool execution result
str: "42"
dict: {"result": "4", "status": "success"}
obs.metadata: dict | None
"""Convert a tool observation to a ToolExecutionSpan.

Returns:
ToolExecutionSpan with tool_call and tool_result populated from the above.
Handles two formats:

**Strands** (obs.type == "SPAN", name starts with "execute_tool"):
obs.input: ``{"name": "calc", "arguments": {"x": "2+2"}, "toolUseId": "..."}``
obs.output: ``"42"`` or ``{"result": "4", "status": "success"}``

**LangChain / universal** (obs.type == "TOOL"):
obs.name: tool name (e.g. ``"add_numbers"``)
obs.input: tool arguments (dict or other)
obs.output: tool result
"""
span_info = self._create_span_info(obs, session_id)
obs_input = obs.input or {}

if isinstance(obs_input, dict):
if isinstance(obs_input, dict) and "name" in obs_input:
# Strands format: input carries name/arguments/toolUseId
tool_name = obs_input.get("name", "")
tool_arguments = obs_input.get("arguments", {})
tool_call_id = obs_input.get("toolUseId")
elif obs.type == "TOOL":
# LangChain/universal: obs.name is the tool, obs.input is arguments
tool_name = obs.name or ""
if isinstance(obs_input, dict):
tool_arguments = obs_input
elif isinstance(obs_input, str):
# Try parsing as JSON; LangChain may send stringified dicts
try:
parsed = json.loads(obs_input)
tool_arguments = parsed if isinstance(parsed, dict) else {"input": obs_input}
except (json.JSONDecodeError, ValueError):
tool_arguments = {"input": obs_input}
elif obs_input:
tool_arguments = {"input": str(obs_input)}
else:
tool_arguments = {}
tool_call_id = None
else:
tool_name = ""
tool_arguments = {}
Expand All @@ -449,42 +490,46 @@ def _parse_tool_result(self, obs_output: Any) -> tuple[str, str | None]:
"""Parse tool execution output into (content, error).

Input formats:
str: "42" → ("42", None)
dict: {"result": "4", "status": "success"} → ("4", None)
dict: {"result": "...", "status": "error"} → ("...", "error")
dict: {"result": "...", "status": ""} → ("...", None)
None: → ("", None)
str: ``"42"`` → ``("42", None)``
dict: ``{"result": "4", "status": "success"}`` → ``("4", None)``
dict: ``{"result": "...", "status": "error"}`` → ``("...", "error")``
dict: ``{"content": "Weather...", "type": "tool", ...}`` → ``("Weather...", None)``
(LangChain ToolMessage format via Langfuse)
None: → ``("", None)``
"""
if isinstance(obs_output, str):
return obs_output, None

if isinstance(obs_output, dict):
content = obs_output.get("result", str(obs_output))
status = obs_output.get("status", "")
error = None if status == "success" else (str(status) if status else None)
return content, error
# Strands format: {"result": "...", "status": "success"|"error"}
if "result" in obs_output:
content = obs_output["result"]
status = obs_output.get("status", "")
error = None if status == "success" else (str(status) if status else None)
return content, error
# LangChain ToolMessage format: {"content": "...", "type": "tool", ...}
if "content" in obs_output:
content = obs_output["content"]
if isinstance(content, str):
return content, None
return str(content), None
return str(obs_output), None

content = str(obs_output) if obs_output is not None else ""
return content, None

def _convert_agent_invocation(self, obs: Any, session_id: str) -> AgentInvocationSpan:
"""Convert an invoke_agent SPAN observation to an AgentInvocationSpan.

Langfuse observation (obs.type == "SPAN", obs.name starts with "invoke_agent"):
obs.input: str | list[dict] | dict — user prompt
str: "Hello"
list[dict]: [{"text": "Hello"}]
dict: {"text": "Hello"}
obs.output: str | dict — agent response
str: "Hi there!"
dict: {"message": "Hi there!", "finish_reason": "end_turn"}
dict: {"text": "Hi there!"}
dict: {"content": [{"text": "Hi there!"}]}
obs.metadata: dict | None — may contain "tools" key with available tool names
{"tools": ["shell", "get_pull_request", ...]}
"""Convert an agent observation to an AgentInvocationSpan.

Returns:
AgentInvocationSpan with user_prompt, agent_response, and available_tools extracted.
Handles two formats:

**Strands** (obs.type == "SPAN", name starts with "invoke_agent"):
obs.input: ``"Hello"`` | ``[{"text": "Hello"}]`` | ``{"text": "Hello"}``
obs.output: ``"Hi!"`` | ``{"message": "Hi!", "finish_reason": "end_turn"}``

**LangChain / universal** (obs.type == "CHAIN", root observation):
obs.input: ``{"input": "question"}`` or ``{"messages": [...]}``
obs.output: ``{"output": "answer"}`` or ``{"content": "answer"}``
"""
span_info = self._create_span_info(obs, session_id)
obs_input = obs.input
Expand All @@ -507,43 +552,118 @@ def _convert_agent_invocation(self, obs: Any, session_id: str) -> AgentInvocatio
metadata=obs.metadata or {},
)

def _extract_text_from_content(self, content: Any) -> str:
    """Extract plain text from a message ``content`` field.

    Accepts the shapes Strands emits via OTEL→Langfuse:
      * plain string → returned as-is
      * JSON string encoding a list of ``{"text": ...}`` dicts → first text
        (falls back to the raw string when no text entry is found)
      * list of dicts → first ``"text"`` value, or ``""`` when absent
      * falsy values → ``""``; anything else → ``str(content)``
    """
    if isinstance(content, str):
        try:
            decoded = json.loads(content)
        except ValueError:
            # Not JSON — the string itself is the text.
            return content
        return self._first_text_from_list(decoded) or content
    if isinstance(content, list):
        return self._first_text_from_list(content) or ""
    return str(content) if content else ""

def _first_text_from_list(self, items: Any) -> str | None:
"""Return the first "text" value from a list of dicts, or None."""
if isinstance(items, list):
for item in items:
if isinstance(item, dict) and "text" in item:
return item["text"]
return None

def _find_message_content_by_role(
self, messages: list, roles: tuple[str, ...], *, reverse: bool = False
) -> str | None:
"""Find content of the first message matching any of the given roles.

Searches ``messages`` (a list of dicts with ``type`` and ``content`` keys)
for the first entry whose ``type`` is in *roles*. Returns ``None`` when no
match is found so callers can fall through to other extraction strategies.
"""
if not isinstance(messages, list):
return None
items = reversed(messages) if reverse else iter(messages)
for msg in items:
if isinstance(msg, dict) and msg.get("type") in roles:
return str(msg.get("content", ""))
# Fallback: last message
if messages and isinstance(messages[-1], dict):
return str(messages[-1].get("content", ""))
return None

def _extract_user_prompt(self, obs_input: Any) -> str:
"""Extract user prompt string from observation input.

Input formats:
str: "Hello" → "Hello"
list[dict]: [{"text": "Hello"}] → "Hello"
dict: {"text": "Hello"} → "Hello"
None: → ""
str: ``"Hello"`` → ``"Hello"``
list[dict]: ``[{"text": "Hello"}]`` → ``"Hello"``
list[dict]: ``[{"role": "user", "content": ...}]``
→ ``"Hello"`` (Strands via OTEL)
dict: ``{"text": "Hello"}`` → ``"Hello"``
dict: ``{"input": "Hello"}`` → ``"Hello"`` (LangChain CHAIN)
dict: ``{"messages": [{"type": "human", "content": "Hello"}]}``
→ ``"Hello"`` (LangChain messages)
None: → ``""``
"""
if isinstance(obs_input, str):
return obs_input
if isinstance(obs_input, list):
for item in obs_input:
if isinstance(item, dict) and "text" in item:
if not isinstance(item, dict):
continue
if "text" in item:
return item["text"]
if isinstance(obs_input, dict) and "text" in obs_input:
return obs_input["text"]
# Strands message-list format via OTEL→Langfuse
if item.get("role") in ("user", "human"):
return self._extract_text_from_content(item.get("content"))
if isinstance(obs_input, dict):
if "text" in obs_input:
return obs_input["text"]
if "input" in obs_input:
return str(obs_input["input"])
if "messages" in obs_input:
result = self._find_message_content_by_role(obs_input["messages"], ("human", "user"))
if result is not None:
return result
return str(obs_input) if obs_input else ""

def _extract_agent_response(self, obs_output: Any) -> str:
"""Extract agent response string from observation output.

Input formats:
str: "Hi there!" → "Hi there!"
dict: {"text": "Hi there!"} → "Hi there!"
dict: {"message": "Hi there!", "finish_reason": "..."} → "Hi there!"
dict: {"content": [{"text": "Hi there!"}]} → "Hi there!"
dict: {"content": "Hi there!"} → "Hi there!"
None: → ""
str: ``"Hi there!"`` → ``"Hi there!"``
list[dict]: ``[{"role": "assistant", "content": ...}]`` → ``"Hi!"`` (Strands via OTEL)
dict: ``{"text": "Hi there!"}`` → ``"Hi there!"``
dict: ``{"message": "Hi!", "finish_reason": "..."}`` → ``"Hi!"``
dict: ``{"output": "Hi!"}`` → ``"Hi!"`` (LangChain CHAIN)
dict: ``{"content": [{"text": "Hi!"}]}`` → ``"Hi!"``
dict: ``{"content": "Hi!"}`` → ``"Hi!"``
dict: ``{"messages": [{"type": "ai", "content": "Hi!"}]}``
→ ``"Hi!"`` (LangGraph output)
None: → ``""``
"""
if isinstance(obs_output, str):
return obs_output
if isinstance(obs_output, list):
# Strands message-list format via OTEL→Langfuse
for item in reversed(obs_output):
if isinstance(item, dict) and item.get("role") in ("assistant", "ai"):
return self._extract_text_from_content(item.get("content"))
if isinstance(obs_output, dict):
if "text" in obs_output:
return obs_output["text"]
if "message" in obs_output:
return obs_output["message"]
if "output" in obs_output:
return str(obs_output["output"])
if "content" in obs_output:
content = obs_output["content"]
if isinstance(content, list):
Expand All @@ -552,6 +672,10 @@ def _extract_agent_response(self, obs_output: Any) -> str:
return item["text"]
elif isinstance(content, str):
return content
if "messages" in obs_output:
result = self._find_message_content_by_role(obs_output["messages"], ("ai", "assistant"), reverse=True)
if result is not None:
return result
return str(obs_output) if obs_output else ""

def _extract_available_tools(self, metadata: Any) -> list[ToolConfig]:
Expand Down
Loading