Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions src/strands_evals/mappers/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
"""Constants for OTEL trace mappers.

Scope names identify which instrumentation library produced the spans.
Attribute keys and values are used for span type detection and data extraction.
"""

# --- Instrumentation scope names ---
SCOPE_LANGCHAIN_OTEL = "opentelemetry.instrumentation.langchain"
SCOPE_OPENINFERENCE = "openinference.instrumentation.langchain"
SCOPE_STRANDS = "strands.telemetry.tracer"

# --- OTEL semantic convention attribute keys ---
ATTR_LLM_REQUEST_TYPE = "llm.request.type"

# --- Traceloop/OpenLLMetry attribute keys ---
ATTR_TRACELOOP_SPAN_KIND = "traceloop.span.kind"
ATTR_TRACELOOP_ENTITY_NAME = "traceloop.entity.name"
ATTR_TRACELOOP_ENTITY_INPUT = "traceloop.entity.input"
ATTR_TRACELOOP_ENTITY_OUTPUT = "traceloop.entity.output"

# --- Traceloop/OpenLLMetry attribute values ---
KIND_WORKFLOW = "workflow"
KIND_TOOL = "tool"
LLM_TYPE_CHAT = "chat"

# --- ADOT body field values (LangChain serialization) ---
ADOT_ROLE_UNKNOWN = "unknown"
ADOT_LANGGRAPH_NAME = "LangGraph"
ADOT_TOOL_CALL_WITH_CONTEXT = "tool_call_with_context"
ADOT_INPUT_STR_KEY = "input_str"
73 changes: 33 additions & 40 deletions src/strands_evals/mappers/langchain_otel_session_mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,32 +31,25 @@
Trace,
UserMessage,
)
from .constants import (
ADOT_INPUT_STR_KEY,
ADOT_LANGGRAPH_NAME,
ADOT_ROLE_UNKNOWN,
ADOT_TOOL_CALL_WITH_CONTEXT,
ATTR_LLM_REQUEST_TYPE,
ATTR_TRACELOOP_ENTITY_INPUT,
ATTR_TRACELOOP_ENTITY_NAME,
ATTR_TRACELOOP_ENTITY_OUTPUT,
ATTR_TRACELOOP_SPAN_KIND,
KIND_TOOL,
KIND_WORKFLOW,
LLM_TYPE_CHAT,
SCOPE_LANGCHAIN_OTEL,
)
from .session_mapper import SessionMapper

logger = logging.getLogger(__name__)

SCOPE_NAME = "opentelemetry.instrumentation.langchain"

# --- OTEL semantic convention attribute keys ---
_ATTR_LLM_REQUEST_TYPE = "llm.request.type"

# --- Traceloop/OpenLLMetry attribute keys ---
_ATTR_TRACELOOP_SPAN_KIND = "traceloop.span.kind"
_ATTR_TRACELOOP_ENTITY_NAME = "traceloop.entity.name"
_ATTR_TRACELOOP_ENTITY_INPUT = "traceloop.entity.input"
_ATTR_TRACELOOP_ENTITY_OUTPUT = "traceloop.entity.output"

# --- Traceloop/OpenLLMetry attribute values ---
_KIND_WORKFLOW = "workflow"
_KIND_TOOL = "tool"
_LLM_TYPE_CHAT = "chat"

# --- ADOT body field values (LangChain serialization) ---
_ADOT_ROLE_UNKNOWN = "unknown"
_ADOT_LANGGRAPH_NAME = "LangGraph"
_ADOT_TOOL_CALL_WITH_CONTEXT = "tool_call_with_context"
_ADOT_INPUT_STR_KEY = "input_str"


class LangChainOtelSessionMapper(SessionMapper):
"""Maps Traceloop/OpenLLMetry LangChain traces to Session format.
Expand Down Expand Up @@ -103,7 +96,7 @@ def map_to_session(self, data: Any, session_id: str) -> Session:
spans = self._normalize_to_flat_spans(data)

# Filter to only spans from this scope
langchain_spans = [s for s in spans if self._get_scope_name(s) == SCOPE_NAME]
langchain_spans = [s for s in spans if self._get_scope_name(s) == SCOPE_LANGCHAIN_OTEL]

# Group spans by trace_id
grouped = defaultdict(list)
Expand Down Expand Up @@ -186,12 +179,12 @@ def _is_inference_span(self, span: dict) -> bool:
2. ADOT body: role == "unknown" with raw string content (direct LLM I/O)
"""
attrs = span.get("attributes", {})
if attrs.get(_ATTR_LLM_REQUEST_TYPE) == _LLM_TYPE_CHAT:
if attrs.get(ATTR_LLM_REQUEST_TYPE) == LLM_TYPE_CHAT:
return True

# ADOT fallback: records with role="unknown" and raw string content are LLM calls
input_messages, _ = self._get_messages_from_span_events(span)
if input_messages and input_messages[0].get("role") == _ADOT_ROLE_UNKNOWN:
if input_messages and input_messages[0].get("role") == ADOT_ROLE_UNKNOWN:
return True

return False
Expand All @@ -204,14 +197,14 @@ def _is_tool_execution_span(self, span: dict) -> bool:
2. ADOT body: input has "input_str" key (direct tool call format)
"""
attrs = span.get("attributes", {})
if attrs.get(_ATTR_TRACELOOP_SPAN_KIND) == _KIND_TOOL:
if attrs.get(ATTR_TRACELOOP_SPAN_KIND) == KIND_TOOL:
return True

# ADOT fallback: "input_str" is the direct tool invocation format.
# Note: "tool_call_with_context" records are graph-level wrappers that
# duplicate the actual tool call — skip them to avoid duplicate spans.
in_parsed = self._parse_adot_body(span)
if isinstance(in_parsed, dict) and _ADOT_INPUT_STR_KEY in in_parsed:
if isinstance(in_parsed, dict) and ADOT_INPUT_STR_KEY in in_parsed:
return True

return False
Expand All @@ -228,14 +221,14 @@ def _is_agent_invocation_span(self, span: dict) -> bool:
RunnableSequence, tools) are internal graph steps and must be skipped.
"""
attrs = span.get("attributes", {})
if attrs.get(_ATTR_TRACELOOP_SPAN_KIND) == _KIND_WORKFLOW:
if attrs.get(ATTR_TRACELOOP_SPAN_KIND) == KIND_WORKFLOW:
return True

# ADOT fallback: only the root LangGraph node is the agent invocation
in_parsed = self._parse_adot_body(span)
if isinstance(in_parsed, dict):
kwargs = in_parsed.get("kwargs")
if isinstance(kwargs, dict) and kwargs.get("name") == _ADOT_LANGGRAPH_NAME:
if isinstance(kwargs, dict) and kwargs.get("name") == ADOT_LANGGRAPH_NAME:
return True

return False
Expand Down Expand Up @@ -308,7 +301,7 @@ def _convert_tool_execution_span(self, span: dict, session_id: str) -> ToolExecu
span_info = self._create_span_info(span, session_id)
attrs = span.get("attributes", {})

tool_name = attrs.get(_ATTR_TRACELOOP_ENTITY_NAME)
tool_name = attrs.get(ATTR_TRACELOOP_ENTITY_NAME)
tool_parameters: dict | None = None
tool_output_content: str | None = None
tool_call_id: str | None = None
Expand All @@ -325,16 +318,16 @@ def _convert_tool_execution_span(self, span: dict, session_id: str) -> ToolExecu
inputs = in_parsed.get("inputs")
if isinstance(inputs, dict):
# tool_call_with_context: {"inputs": {"__type": "tool_call_with_context", "tool_call": {...}}}
if inputs.get("__type") == _ADOT_TOOL_CALL_WITH_CONTEXT:
if inputs.get("__type") == ADOT_TOOL_CALL_WITH_CONTEXT:
tc = inputs.get("tool_call", {})
tool_parameters = tc.get("args", {})
tool_name = tool_name or tc.get("name")
tool_call_id = tool_call_id or tc.get("id")
else:
# Direct inputs dict: {"inputs": {"a": 1, "b": 2}}
tool_parameters = inputs
if tool_parameters is None and _ADOT_INPUT_STR_KEY in in_parsed:
params_parsed = self._safe_json_parse(in_parsed.get(_ADOT_INPUT_STR_KEY, ""))
if tool_parameters is None and ADOT_INPUT_STR_KEY in in_parsed:
params_parsed = self._safe_json_parse(in_parsed.get(ADOT_INPUT_STR_KEY, ""))
if isinstance(params_parsed, dict):
tool_parameters = params_parsed

Expand All @@ -354,16 +347,16 @@ def _convert_tool_execution_span(self, span: dict, session_id: str) -> ToolExecu
tool_name = tool_name or top_kwargs.get("name")
else:
# Live format - extract from traceloop.entity.* attributes
entity_input = attrs.get(_ATTR_TRACELOOP_ENTITY_INPUT, "")
entity_output = attrs.get(_ATTR_TRACELOOP_ENTITY_OUTPUT, "")
entity_input = attrs.get(ATTR_TRACELOOP_ENTITY_INPUT, "")
entity_output = attrs.get(ATTR_TRACELOOP_ENTITY_OUTPUT, "")

if entity_input:
parsed = self._safe_json_parse(entity_input)
if isinstance(parsed, dict):
if "inputs" in parsed and isinstance(parsed.get("inputs"), dict):
tool_parameters = parsed.get("inputs")
elif _ADOT_INPUT_STR_KEY in parsed:
params_parsed = self._safe_json_parse(parsed.get(_ADOT_INPUT_STR_KEY, ""))
elif ADOT_INPUT_STR_KEY in parsed:
params_parsed = self._safe_json_parse(parsed.get(ADOT_INPUT_STR_KEY, ""))
if isinstance(params_parsed, dict):
tool_parameters = params_parsed

Expand Down Expand Up @@ -413,8 +406,8 @@ def _convert_agent_invocation_span(
agent_response = self._extract_agent_response_from_output(output_messages)
else:
# Live format - extract from traceloop.entity.* attributes
entity_input = attrs.get(_ATTR_TRACELOOP_ENTITY_INPUT, "")
entity_output = attrs.get(_ATTR_TRACELOOP_ENTITY_OUTPUT, "")
entity_input = attrs.get(ATTR_TRACELOOP_ENTITY_INPUT, "")
entity_output = attrs.get(ATTR_TRACELOOP_ENTITY_OUTPUT, "")

if entity_input:
parsed = self._safe_json_parse(entity_input)
Expand Down Expand Up @@ -557,7 +550,7 @@ def _extract_messages_from_span(self, span: dict) -> tuple[list[dict], list[dict
span_events = span.get("span_events", [])
for event in span_events:
event_name = event.get("event_name", "")
if event_name == SCOPE_NAME:
if event_name == SCOPE_LANGCHAIN_OTEL:
body = event.get("body", {})
input_group = body.get("input", {})
output_group = body.get("output", {})
Expand Down
Loading