From dfbce6929373c75e195467722b4fed03e0d3658e Mon Sep 17 00:00:00 2001 From: fcogidi <41602287+fcogidi@users.noreply.github.com> Date: Tue, 3 Feb 2026 14:02:26 -0500 Subject: [PATCH 1/2] Extract langfuse tracing setup for google-adk to more general location --- .../agent_evals/knowledge_agent/__init__.py | 5 - .../agent_evals/knowledge_agent/tracing.py | 138 ------------------ .../aieng/agent_evals/langfuse.py | 113 +++++++++++++- 3 files changed, 112 insertions(+), 144 deletions(-) delete mode 100644 aieng-eval-agents/aieng/agent_evals/knowledge_agent/tracing.py diff --git a/aieng-eval-agents/aieng/agent_evals/knowledge_agent/__init__.py b/aieng-eval-agents/aieng/agent_evals/knowledge_agent/__init__.py index 7ee55718..aeb1ade1 100644 --- a/aieng-eval-agents/aieng/agent_evals/knowledge_agent/__init__.py +++ b/aieng-eval-agents/aieng/agent_evals/knowledge_agent/__init__.py @@ -30,7 +30,6 @@ DSQAExample, EvaluationResult, ) -from .tracing import flush_traces, init_tracing, is_tracing_enabled __all__ = [ @@ -49,8 +48,4 @@ "DeepSearchQAEvaluator", "DSQAExample", "EvaluationResult", - # Tracing - "init_tracing", - "is_tracing_enabled", - "flush_traces", ] diff --git a/aieng-eval-agents/aieng/agent_evals/knowledge_agent/tracing.py b/aieng-eval-agents/aieng/agent_evals/knowledge_agent/tracing.py deleted file mode 100644 index b5dfaef3..00000000 --- a/aieng-eval-agents/aieng/agent_evals/knowledge_agent/tracing.py +++ /dev/null @@ -1,138 +0,0 @@ -"""Langfuse tracing integration for the knowledge-grounded QA agent. - -This module provides automatic tracing of Google ADK agent interactions -using Langfuse via OpenTelemetry and OpenInference instrumentation. - -Example -------- ->>> from aieng.agent_evals.knowledge_agent import init_tracing, KnowledgeGroundedAgent ->>> init_tracing() # Call once at startup ->>> agent = KnowledgeGroundedAgent() ->>> response = agent.answer("What is the population of Tokyo?") -# Traces are automatically sent to Langfuse -""" - -import base64 -import logging -import os - -from aieng.agent_evals.async_client_manager import AsyncClientManager -from openinference.instrumentation.google_adk import GoogleADKInstrumentor -from opentelemetry import trace -from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter -from opentelemetry.sdk.resources import Resource -from opentelemetry.sdk.trace import TracerProvider -from opentelemetry.sdk.trace.export import BatchSpanProcessor - - -logger = logging.getLogger(__name__) - - -def init_tracing() -> bool: - """Initialize Langfuse tracing for Google ADK agents. - - This function sets up OpenTelemetry with OTLP exporter to send traces - to Langfuse, and initializes OpenInference instrumentation for Google ADK - to automatically capture all agent interactions, tool calls, and model responses. - - Returns - ------- - bool - True if tracing was successfully initialized, False otherwise. - - Notes - ----- - This function should be called once at application startup, before - creating any agents. Subsequent calls are no-ops. - - Langfuse credentials are read from environment variables via Configs: - - LANGFUSE_PUBLIC_KEY: Langfuse public key (pk-lf-...) - - LANGFUSE_SECRET_KEY: Langfuse secret key (sk-lf-...) - - LANGFUSE_HOST: Langfuse host URL (default: https://us.cloud.langfuse.com) - - Examples - -------- - >>> from aieng.agent_evals.knowledge_agent import init_tracing - >>> if init_tracing(): - ... print("Tracing enabled!") - """ - manager = AsyncClientManager.get_instance() - - if manager.otel_instrumented: - logger.debug("Tracing already initialized") - return True - - try: - # Verify Langfuse client authentication - langfuse_client = manager.langfuse_client - if not langfuse_client.auth_check(): - logger.warning("Langfuse authentication failed. Check your credentials.") - return False - - # Get credentials from configs - configs = manager.configs - public_key = configs.langfuse_public_key or "" - secret_key = configs.langfuse_secret_key.get_secret_value() if configs.langfuse_secret_key else "" - langfuse_host = configs.langfuse_host - - # Set up OpenTelemetry OTLP exporter to send traces to Langfuse - # Create base64 encoded auth string for OTLP headers - auth_string = base64.b64encode(f"{public_key}:{secret_key}".encode()).decode() - otel_endpoint = f"{langfuse_host.rstrip('/')}/api/public/otel" - - # Configure OpenTelemetry environment variables - os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"] = otel_endpoint - os.environ["OTEL_EXPORTER_OTLP_HEADERS"] = f"Authorization=Basic {auth_string}" - - # Create a resource with service name - resource = Resource.create({"service.name": "knowledge-agent"}) - - # Create TracerProvider - provider = TracerProvider(resource=resource) - - # Create OTLP exporter pointing to Langfuse - exporter = OTLPSpanExporter( - endpoint=f"{otel_endpoint}/v1/traces", - headers={"Authorization": f"Basic {auth_string}"}, - ) - - # Add batch processor for efficient trace export - provider.add_span_processor(BatchSpanProcessor(exporter)) - - # Set as global tracer provider - trace.set_tracer_provider(provider) - - # Initialize OpenInference instrumentation for Google ADK - GoogleADKInstrumentor().instrument(tracer_provider=provider) - - manager.otel_instrumented = True - logger.info(f"Langfuse tracing initialized successfully (endpoint: {otel_endpoint})") - return True - - except ImportError as e: - logger.warning(f"Could not import tracing dependencies: {e}") - return False - except Exception as e: - logger.warning(f"Failed to initialize tracing: {e}") - return False - - -def flush_traces() -> None: - """Flush any pending traces to Langfuse. - - Call this before your application exits to ensure all traces are sent. - """ - manager = AsyncClientManager.get_instance() - if manager._langfuse_client is not None: - manager._langfuse_client.flush() - - -def is_tracing_enabled() -> bool: - """Check if Langfuse tracing is currently enabled. - - Returns - ------- - bool - True if tracing has been initialized, False otherwise. - """ - return AsyncClientManager.get_instance().otel_instrumented diff --git a/aieng-eval-agents/aieng/agent_evals/langfuse.py b/aieng-eval-agents/aieng/agent_evals/langfuse.py index 05bcd774..14dd7d22 100644 --- a/aieng-eval-agents/aieng/agent_evals/langfuse.py +++ b/aieng-eval-agents/aieng/agent_evals/langfuse.py @@ -11,8 +11,9 @@ from aieng.agent_evals.configs import Configs from opentelemetry import trace from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter +from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.trace import TracerProvider -from opentelemetry.sdk.trace.export import SimpleSpanProcessor +from opentelemetry.sdk.trace.export import BatchSpanProcessor, SimpleSpanProcessor logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s") @@ -79,6 +80,116 @@ def setup_langfuse_tracer(service_name: str = "aieng-eval-agents") -> "trace.Tra return trace.get_tracer(__name__) +def init_tracing(service_name: str = "aieng-eval-agents") -> bool: + """Initialize Langfuse tracing for Google ADK agents. + + This function sets up OpenTelemetry with OTLP exporter to send traces + to Langfuse, and initializes OpenInference instrumentation for Google ADK + to automatically capture all agent interactions, tool calls, and model responses. + + Parameters + ---------- + service_name : str, optional, default="aieng-eval-agents" + Service name to attach to emitted traces. + + Returns + ------- + bool + True if tracing was successfully initialized, False otherwise. + + Examples + -------- + Basic usage: + >>> from aieng.agent_evals.langfuse import init_tracing + >>> from aieng.agent_evals.knowledge_agent import KnowledgeGroundedAgent + >>> init_tracing() # Call once at startup + >>> agent = KnowledgeGroundedAgent() + >>> response = agent.answer("What is the population of Tokyo?") + # Traces are automatically sent to Langfuse + """ + manager = AsyncClientManager.get_instance() + + if manager.otel_instrumented: + logger.debug("Tracing already initialized") + return True + + try: + # Verify Langfuse client authentication + langfuse_client = manager.langfuse_client + if not langfuse_client.auth_check(): + logger.warning("Langfuse authentication failed. Check your credentials.") + return False + + # Get credentials from configs + configs = manager.configs + public_key = configs.langfuse_public_key or "" + secret_key = configs.langfuse_secret_key.get_secret_value() if configs.langfuse_secret_key else "" + langfuse_host = configs.langfuse_host + + # Set up OpenTelemetry OTLP exporter to send traces to Langfuse + auth_string = base64.b64encode(f"{public_key}:{secret_key}".encode()).decode() + otel_endpoint = f"{langfuse_host.rstrip('/')}/api/public/otel" + + # Configure OpenTelemetry environment variables + os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"] = otel_endpoint + os.environ["OTEL_EXPORTER_OTLP_HEADERS"] = f"Authorization=Basic {auth_string}" + + # Create a resource with service name + resource = Resource.create({"service.name": service_name}) + + # Create TracerProvider + provider = TracerProvider(resource=resource) + + # Create OTLP exporter pointing to Langfuse + exporter = OTLPSpanExporter( + endpoint=f"{otel_endpoint}/v1/traces", + headers={"Authorization": f"Basic {auth_string}"}, + ) + + # Add batch processor for efficient trace export + provider.add_span_processor(BatchSpanProcessor(exporter)) + + # Set as global tracer provider + trace.set_tracer_provider(provider) + + # Initialize OpenInference instrumentation for Google ADK + from openinference.instrumentation.google_adk import GoogleADKInstrumentor # noqa: PLC0415 + + GoogleADKInstrumentor().instrument(tracer_provider=provider) + + manager.otel_instrumented = True + logger.info("Langfuse tracing initialized successfully (endpoint: %s)", otel_endpoint) + return True + + except ImportError as e: + logger.warning("Could not import tracing dependencies: %s", e) + return False + except Exception as e: + logger.warning("Failed to initialize tracing: %s", e) + return False + + +def flush_traces() -> None: + """Flush any pending traces to Langfuse. + + Call this before your application exits to ensure all traces are sent. + """ + manager = AsyncClientManager.get_instance() + if manager._langfuse_client is not None: + manager._langfuse_client.flush() + + +def is_tracing_enabled() -> bool: + """Check if Langfuse tracing is currently enabled. + + Returns + ------- + bool + True if tracing has been initialized, False otherwise. + """ + return AsyncClientManager.get_instance().otel_instrumented + + async def upload_dataset_to_langfuse(dataset_path: str, dataset_name: str): """Upload a dataset to Langfuse. From 956db613178af8bd234f84161040bfeacd014f2d Mon Sep 17 00:00:00 2001 From: fcogidi <41602287+fcogidi@users.noreply.github.com> Date: Tue, 3 Feb 2026 14:27:12 -0500 Subject: [PATCH 2/2] Update init_tracing docstring for clarity --- aieng-eval-agents/aieng/agent_evals/langfuse.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/aieng-eval-agents/aieng/agent_evals/langfuse.py b/aieng-eval-agents/aieng/agent_evals/langfuse.py index 14dd7d22..2831d5b0 100644 --- a/aieng-eval-agents/aieng/agent_evals/langfuse.py +++ b/aieng-eval-agents/aieng/agent_evals/langfuse.py @@ -99,12 +99,9 @@ def init_tracing(service_name: str = "aieng-eval-agents") -> bool: Examples -------- - Basic usage: >>> from aieng.agent_evals.langfuse import init_tracing - >>> from aieng.agent_evals.knowledge_agent import KnowledgeGroundedAgent >>> init_tracing() # Call once at startup - >>> agent = KnowledgeGroundedAgent() - >>> response = agent.answer("What is the population of Tokyo?") + >>> # Create and use your Google ADK agent as usual # Traces are automatically sent to Langfuse """ manager = AsyncClientManager.get_instance()