Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
DSQAExample,
EvaluationResult,
)
from .tracing import flush_traces, init_tracing, is_tracing_enabled


__all__ = [
Expand All @@ -49,8 +48,4 @@
"DeepSearchQAEvaluator",
"DSQAExample",
"EvaluationResult",
# Tracing
"init_tracing",
"is_tracing_enabled",
"flush_traces",
]
138 changes: 0 additions & 138 deletions aieng-eval-agents/aieng/agent_evals/knowledge_agent/tracing.py

This file was deleted.

110 changes: 109 additions & 1 deletion aieng-eval-agents/aieng/agent_evals/langfuse.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@
from aieng.agent_evals.configs import Configs
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.sdk.trace.export import BatchSpanProcessor, SimpleSpanProcessor


logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s")
Expand Down Expand Up @@ -79,6 +80,113 @@ def setup_langfuse_tracer(service_name: str = "aieng-eval-agents") -> "trace.Tra
return trace.get_tracer(__name__)


def init_tracing(service_name: str = "aieng-eval-agents") -> bool:
"""Initialize Langfuse tracing for Google ADK agents.

This function sets up OpenTelemetry with OTLP exporter to send traces
to Langfuse, and initializes OpenInference instrumentation for Google ADK
to automatically capture all agent interactions, tool calls, and model responses.

Parameters
----------
service_name : str, optional, default="aieng-eval-agents"
Service name to attach to emitted traces.

Returns
-------
bool
True if tracing was successfully initialized, False otherwise.

Examples
--------
>>> from aieng.agent_evals.langfuse import init_tracing
>>> init_tracing() # Call once at startup
>>> # Create and use your Google ADK agent as usual
# Traces are automatically sent to Langfuse
"""
manager = AsyncClientManager.get_instance()

if manager.otel_instrumented:
logger.debug("Tracing already initialized")
return True

try:
# Verify Langfuse client authentication
langfuse_client = manager.langfuse_client
if not langfuse_client.auth_check():
logger.warning("Langfuse authentication failed. Check your credentials.")
return False

# Get credentials from configs
configs = manager.configs
public_key = configs.langfuse_public_key or ""
secret_key = configs.langfuse_secret_key.get_secret_value() if configs.langfuse_secret_key else ""
langfuse_host = configs.langfuse_host

# Set up OpenTelemetry OTLP exporter to send traces to Langfuse
auth_string = base64.b64encode(f"{public_key}:{secret_key}".encode()).decode()
otel_endpoint = f"{langfuse_host.rstrip('/')}/api/public/otel"

# Configure OpenTelemetry environment variables
os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"] = otel_endpoint
os.environ["OTEL_EXPORTER_OTLP_HEADERS"] = f"Authorization=Basic {auth_string}"

# Create a resource with service name
resource = Resource.create({"service.name": service_name})

# Create TracerProvider
provider = TracerProvider(resource=resource)

# Create OTLP exporter pointing to Langfuse
exporter = OTLPSpanExporter(
endpoint=f"{otel_endpoint}/v1/traces",
headers={"Authorization": f"Basic {auth_string}"},
)

# Add batch processor for efficient trace export
provider.add_span_processor(BatchSpanProcessor(exporter))

# Set as global tracer provider
trace.set_tracer_provider(provider)

# Initialize OpenInference instrumentation for Google ADK
from openinference.instrumentation.google_adk import GoogleADKInstrumentor # noqa: PLC0415

GoogleADKInstrumentor().instrument(tracer_provider=provider)

manager.otel_instrumented = True
logger.info("Langfuse tracing initialized successfully (endpoint: %s)", otel_endpoint)
return True

except ImportError as e:
logger.warning("Could not import tracing dependencies: %s", e)
return False
except Exception as e:
logger.warning("Failed to initialize tracing: %s", e)
return False


def flush_traces() -> None:
"""Flush any pending traces to Langfuse.

Call this before your application exits to ensure all traces are sent.
"""
manager = AsyncClientManager.get_instance()
if manager._langfuse_client is not None:
manager._langfuse_client.flush()


def is_tracing_enabled() -> bool:
"""Check if Langfuse tracing is currently enabled.

Returns
-------
bool
True if tracing has been initialized, False otherwise.
"""
return AsyncClientManager.get_instance().otel_instrumented


async def upload_dataset_to_langfuse(dataset_path: str, dataset_name: str):
"""Upload a dataset to Langfuse.

Expand Down