From 071af2414c31212ee50e96a5551417c168662571 Mon Sep 17 00:00:00 2001 From: Marcelo Lotif Date: Tue, 10 Feb 2026 16:59:04 -0500 Subject: [PATCH 01/15] Adding final response evaluation and some minor improvements --- .../{evaluation.py => evaluation/offline.py} | 2 +- .../report_generation/evaluation/online.py | 63 +++++++++++++++++++ implementations/report_generation/demo.py | 9 +++ implementations/report_generation/evaluate.py | 2 +- .../report_generation/gradio_utils.py | 12 +++- 5 files changed, 83 insertions(+), 5 deletions(-) rename aieng-eval-agents/aieng/agent_evals/report_generation/{evaluation.py => evaluation/offline.py} (99%) create mode 100644 aieng-eval-agents/aieng/agent_evals/report_generation/evaluation/online.py diff --git a/aieng-eval-agents/aieng/agent_evals/report_generation/evaluation.py b/aieng-eval-agents/aieng/agent_evals/report_generation/evaluation/offline.py similarity index 99% rename from aieng-eval-agents/aieng/agent_evals/report_generation/evaluation.py rename to aieng-eval-agents/aieng/agent_evals/report_generation/evaluation/offline.py index 9099170a..8e179a09 100644 --- a/aieng-eval-agents/aieng/agent_evals/report_generation/evaluation.py +++ b/aieng-eval-agents/aieng/agent_evals/report_generation/evaluation/offline.py @@ -3,7 +3,7 @@ Example ------- ->>> from aieng.agent_evals.report_generation.evaluation import evaluate +>>> from aieng.agent_evals.report_generation.evaluation.offline import evaluate >>> evaluate( >>> dataset_name="OnlineRetailReportEval", >>> reports_output_path=Path("reports/"), diff --git a/aieng-eval-agents/aieng/agent_evals/report_generation/evaluation/online.py b/aieng-eval-agents/aieng/agent_evals/report_generation/evaluation/online.py new file mode 100644 index 00000000..d487592d --- /dev/null +++ b/aieng-eval-agents/aieng/agent_evals/report_generation/evaluation/online.py @@ -0,0 +1,63 @@ +"""Functions to report online evaluation of the report generation agent to Langfuse.""" + +from aieng.agent_evals.report_generation.agent import EventParser, EventType +from google.adk.events.event import Event +from langfuse import Langfuse + + +def report_if_final_response(event: Event, langfuse_client: Langfuse, string_match: str = "") -> None: + """Report a score to Langfuse if the event is a final response. + + The score will be reported as 1 if the final response is valid + and contains the string match. Otherwise, the score will be reported as 0. + + Parameters + ---------- + event : Event + The event to check. + langfuse_client : Langfuse + The Langfuse client to use. + string_match : str + The string to match in the final response. + Optional, default to empty string. + """ + trace_id = langfuse_client.get_current_trace_id() + + if event.is_final_response(): + parsed_events = EventParser.parse(event) + for parsed_event in parsed_events: + if parsed_event.type == EventType.FINAL_RESPONSE: + if string_match in parsed_event.text: + langfuse_client.create_score( + name="Valid Final Response", + value=1, + trace_id=trace_id, + comment="Final response contains the string match.", + metadata={ + "final_response": parsed_event.text, + "string_match": string_match, + }, + ) + return + + langfuse_client.create_score( + name="Valid Final Response", + value=0, + trace_id=trace_id, + comment="Final response does not contains the string match.", + metadata={ + "final_response": parsed_event.text, + "string_match": string_match, + }, + ) + return + + langfuse_client.create_score( + name="Valid Final Response", + value=0, + trace_id=trace_id, + comment="Final response not found in the event", + metadata={ + "string_match": string_match, + }, + ) diff --git a/implementations/report_generation/demo.py b/implementations/report_generation/demo.py index 7771b9b7..b6978cf2 100644 --- a/implementations/report_generation/demo.py +++ b/implementations/report_generation/demo.py @@ -15,6 +15,7 @@ import gradio as gr from aieng.agent_evals.async_client_manager import AsyncClientManager from aieng.agent_evals.report_generation.agent import get_report_generation_agent +from aieng.agent_evals.report_generation.evaluation.online import report_if_final_response from aieng.agent_evals.report_generation.prompts import MAIN_AGENT_INSTRUCTIONS from dotenv import load_dotenv from google.adk.runners import Runner @@ -65,6 +66,9 @@ async def agent_session_handler( langfuse_project_name=get_langfuse_project_name() if enable_trace else None, ) + # Get the Langfuse client for online reporting + langfuse_client = AsyncClientManager.get_instance().langfuse_client + # Construct an in-memory session for the agent to maintain # conversation history across multiple turns of a chat # This makes it possible to ask follow-up questions that refer to @@ -86,12 +90,17 @@ async def agent_session_handler( session_id=current_session.id, new_message=content, ): + # Report the final response evaluation to Langfuse + report_if_final_response(event, langfuse_client, string_match="](gradio_api/file=") + # Parse the stream events, convert to Gradio chat messages and append to # the chat history turn_messages += agent_event_to_gradio_messages(event) if len(turn_messages) > 0: yield turn_messages + langfuse_client.flush() + @click.command() @click.option("--enable-trace", required=False, default=True, help="Whether to enable tracing with Langfuse.") diff --git a/implementations/report_generation/evaluate.py b/implementations/report_generation/evaluate.py index 4df0879f..d46fda4d 100644 --- a/implementations/report_generation/evaluate.py +++ b/implementations/report_generation/evaluate.py @@ -11,7 +11,7 @@ import asyncio import click -from aieng.agent_evals.report_generation.evaluation import evaluate +from aieng.agent_evals.report_generation.evaluation.offline import evaluate from dotenv import load_dotenv from implementations.report_generation.data.langfuse_upload import DEFAULT_EVALUATION_DATASET_NAME diff --git a/implementations/report_generation/gradio_utils.py b/implementations/report_generation/gradio_utils.py index 76338f8b..9510a3c6 100644 --- a/implementations/report_generation/gradio_utils.py +++ b/implementations/report_generation/gradio_utils.py @@ -1,5 +1,6 @@ """Utility functions for the report generation agent.""" +import json import logging from aieng.agent_evals.report_generation.agent import EventParser, EventType @@ -40,11 +41,15 @@ def agent_event_to_gradio_messages(event: Event) -> list[ChatMessage]: ) ) elif parsed_event.type == EventType.TOOL_CALL: + formatted_arguments = json.dumps(parsed_event.arguments, indent=2).replace("\\n", "\n") output.append( ChatMessage( role="assistant", - content=f"```\n{parsed_event.arguments}\n```", - metadata={"title": f"🛠️ Used tool `{parsed_event.text}`"}, + content=f"```\n{formatted_arguments}\n```", + metadata={ + "title": f"🛠️ Used tool `{parsed_event.text}`", + "status": "done", # This makes it collapsed by default + }, ) ) elif parsed_event.type == EventType.THOUGHT: @@ -56,10 +61,11 @@ def agent_event_to_gradio_messages(event: Event) -> list[ChatMessage]: ) ) elif parsed_event.type == EventType.TOOL_RESPONSE: + formatted_arguments = json.dumps(parsed_event.arguments, indent=2).replace("\\n", "\n") output.append( ChatMessage( role="assistant", - content=f"```\n{parsed_event.arguments}\n```", + content=f"```\n{formatted_arguments}\n```", metadata={ "title": f"📝 Tool call output: `{parsed_event.text}`", "status": "done", # This makes it collapsed by default From cad754af210871628c59b3dbf1e3f5bd036e620d Mon Sep 17 00:00:00 2001 From: Marcelo Lotif Date: Tue, 10 Feb 2026 18:17:28 -0500 Subject: [PATCH 02/15] Finished online scores, need to put it in a thread --- .../report_generation/evaluation/online.py | 207 +++++++++++++++--- implementations/report_generation/demo.py | 12 +- 2 files changed, 180 insertions(+), 39 deletions(-) diff --git a/aieng-eval-agents/aieng/agent_evals/report_generation/evaluation/online.py b/aieng-eval-agents/aieng/agent_evals/report_generation/evaluation/online.py index d487592d..4b0dfe64 100644 --- a/aieng-eval-agents/aieng/agent_evals/report_generation/evaluation/online.py +++ b/aieng-eval-agents/aieng/agent_evals/report_generation/evaluation/online.py @@ -1,63 +1,200 @@ """Functions to report online evaluation of the report generation agent to Langfuse.""" +import logging +from typing import Any + +from aieng.agent_evals.async_client_manager import AsyncClientManager from aieng.agent_evals.report_generation.agent import EventParser, EventType from google.adk.events.event import Event from langfuse import Langfuse +from langfuse.api.resources.commons.types.observations_view import ObservationsView +from langfuse.api.resources.observations.types.observations_views import ObservationsViews +from tenacity import retry, stop_after_attempt, wait_exponential + + +logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s") +logger = logging.getLogger(__name__) -def report_if_final_response(event: Event, langfuse_client: Langfuse, string_match: str = "") -> None: +def report_final_response_score(event: Event, string_match: str = "") -> None: """Report a score to Langfuse if the event is a final response. The score will be reported as 1 if the final response is valid and contains the string match. Otherwise, the score will be reported as 0. + This has to be called within the context of a Langfuse trace. + Parameters ---------- event : Event The event to check. - langfuse_client : Langfuse - The Langfuse client to use. string_match : str The string to match in the final response. Optional, default to empty string. + + Raises + ------ + ValueError + If the event is not a final response. + """ + if not event.is_final_response(): + raise ValueError("Event is not a final response") + + langfuse_client = AsyncClientManager.get_instance().langfuse_client + trace_id = langfuse_client.get_current_trace_id() + + if trace_id is None: + raise ValueError("Langfuse trace ID is None.") + + logger.info("Reporting score for valid final response") + + parsed_events = EventParser.parse(event) + for parsed_event in parsed_events: + if parsed_event.type == EventType.FINAL_RESPONSE: + if string_match in parsed_event.text: + score = 1 + comment = "Final response contains the string match." + else: + score = 0 + comment = "Final response does not contains the string match." + + logger.info("Reporting score for valid final response") + langfuse_client.create_score( + name="Valid Final Response", + value=score, + trace_id=trace_id, + comment=comment, + metadata={ + "final_response": parsed_event.text, + "string_match": string_match, + }, + ) + return + + langfuse_client.create_score( + name="Valid Final Response", + value=0, + trace_id=trace_id, + comment="Final response not found in the event", + metadata={ + "string_match": string_match, + }, + ) + + +def report_usage_scores( + token_threshold: int = 0, + latency_threshold: int = 0, + cost_threshold: float = 0, +) -> None: + """Report usage metrics to Langfuse. + + This function has to be called within the context of a Langfuse trace. + + Parameters + ---------- + token_threshold: int + The token threshold to report the metrics for. + if the token count is greater than the threshold, the score + will be reported as 0. + Optional, default to 0 (no reporting). + latency_threshold: int + The latency threshold in seconds to report the metrics for. + if the latency is greater than the threshold, the score + will be reported as 0. + Optional, default to 0 (no reporting). + cost_threshold: float + The cost threshold to report the metrics for. + if the cost is greater than the threshold, the score + will be reported as 0. + Optional, default to 0 (no reporting). """ + langfuse_client = AsyncClientManager.get_instance().langfuse_client trace_id = langfuse_client.get_current_trace_id() - if event.is_final_response(): - parsed_events = EventParser.parse(event) - for parsed_event in parsed_events: - if parsed_event.type == EventType.FINAL_RESPONSE: - if string_match in parsed_event.text: - langfuse_client.create_score( - name="Valid Final Response", - value=1, - trace_id=trace_id, - comment="Final response contains the string match.", - metadata={ - "final_response": parsed_event.text, - "string_match": string_match, - }, - ) - return - - langfuse_client.create_score( - name="Valid Final Response", - value=0, - trace_id=trace_id, - comment="Final response does not contains the string match.", - metadata={ - "final_response": parsed_event.text, - "string_match": string_match, - }, - ) - return + if trace_id is None: + raise ValueError("Langfuse trace ID is None.") + + observations = _get_observations_with_retry(trace_id, langfuse_client) + if token_threshold > 0: + total_tokens = sum(_obs_attr(observation, "totalTokens") for observation in observations.data) + if total_tokens <= token_threshold: + score = 1 + comment = "Token count is less than or equal to the threshold." + else: + score = 0 + comment = "Token count is greater than the threshold." + + logger.info("Reporting score for token count") + langfuse_client.create_score( + name="Token Count", + value=score, + trace_id=trace_id, + comment=comment, + metadata={ + "total_tokens": total_tokens, + "token_threshold": token_threshold, + }, + ) + + if latency_threshold > 0: + total_latency = sum(_obs_attr(observation, "latency") for observation in observations.data) + if total_latency <= latency_threshold: + score = 1 + comment = "Latency is less than or equal to the threshold." + else: + score = 0 + comment = "Latency is greater than the threshold." + + logger.info("Reporting score for latency") + langfuse_client.create_score( + name="Latency", + value=score, + trace_id=trace_id, + comment=comment, + metadata={ + "total_latency": total_latency, + "latency_threshold": latency_threshold, + }, + ) + + if cost_threshold > 0: + total_cost = sum(_obs_attr(observation, "calculated_total_cost") for observation in observations.data) + if total_cost <= cost_threshold: + score = 1 + comment = "Cost is less than or equal to the threshold." + else: + score = 0 + comment = "Cost is greater than the threshold." + + logger.info("Reporting score for cost") langfuse_client.create_score( - name="Valid Final Response", - value=0, + name="Cost", + value=score, trace_id=trace_id, - comment="Final response not found in the event", + comment=comment, metadata={ - "string_match": string_match, + "total_cost": total_cost, + "cost_threshold": cost_threshold, }, ) + + langfuse_client.flush() + + +def _obs_attr(observation: ObservationsView, attribute: str) -> Any: + attribute_value = getattr(observation, attribute) + if attribute_value == 0: + logger.error(f"Observation attribute value for {attribute} is 0") + return 0 + if attribute_value is None: + logger.error(f"Observation attribute value for {attribute} is None") + return 0 + return attribute_value + + +@retry(stop=stop_after_attempt(10), wait=wait_exponential(multiplier=1, min=5, max=15)) +def _get_observations_with_retry(trace_id: str, langfuse_client: Langfuse) -> ObservationsViews: + logger.info(f"Getting observations for trace {trace_id}...") + return langfuse_client.api.observations.get_many(trace_id=trace_id, type="GENERATION") diff --git a/implementations/report_generation/demo.py b/implementations/report_generation/demo.py index b6978cf2..383b1b92 100644 --- a/implementations/report_generation/demo.py +++ b/implementations/report_generation/demo.py @@ -15,7 +15,7 @@ import gradio as gr from aieng.agent_evals.async_client_manager import AsyncClientManager from aieng.agent_evals.report_generation.agent import get_report_generation_agent -from aieng.agent_evals.report_generation.evaluation.online import report_if_final_response +from aieng.agent_evals.report_generation.evaluation.online import report_final_response_score, report_usage_scores from aieng.agent_evals.report_generation.prompts import MAIN_AGENT_INSTRUCTIONS from dotenv import load_dotenv from google.adk.runners import Runner @@ -90,15 +90,19 @@ async def agent_session_handler( session_id=current_session.id, new_message=content, ): - # Report the final response evaluation to Langfuse - report_if_final_response(event, langfuse_client, string_match="](gradio_api/file=") - # Parse the stream events, convert to Gradio chat messages and append to # the chat history turn_messages += agent_event_to_gradio_messages(event) if len(turn_messages) > 0: yield turn_messages + if event.is_final_response(): + # Report the final response evaluation to Langfuse + report_final_response_score(event, string_match="](gradio_api/file=") + + # TODO: need to put this in a thread + report_usage_scores(token_threshold=10000, latency_threshold=60) + langfuse_client.flush() From 3aedc8eb6ea47e07250a26ea51dad5d2dddb743b Mon Sep 17 00:00:00 2001 From: Marcelo Lotif Date: Wed, 11 Feb 2026 12:06:45 -0500 Subject: [PATCH 03/15] Moving the score reporting to the lasngfuse module for better reusability --- .../aieng/agent_evals/langfuse.py | 123 +++++++++++++++++ .../report_generation/evaluation/online.py | 126 +----------------- implementations/report_generation/demo.py | 17 ++- 3 files changed, 139 insertions(+), 127 deletions(-) diff --git a/aieng-eval-agents/aieng/agent_evals/langfuse.py b/aieng-eval-agents/aieng/agent_evals/langfuse.py index eda20055..e5d5dade 100644 --- a/aieng-eval-agents/aieng/agent_evals/langfuse.py +++ b/aieng-eval-agents/aieng/agent_evals/langfuse.py @@ -11,11 +11,15 @@ from aieng.agent_evals.async_client_manager import AsyncClientManager from aieng.agent_evals.configs import Configs from aieng.agent_evals.progress import track_with_progress +from langfuse import Langfuse +from langfuse.api.resources.commons.types.observations_view import ObservationsView +from langfuse.api.resources.observations.types.observations_views import ObservationsViews from opentelemetry import trace from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor, SimpleSpanProcessor +from tenacity import retry, stop_after_attempt, wait_exponential logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s") @@ -370,3 +374,122 @@ def _normalize_dataset_record(item: Any, record_number: int) -> dict[str, Any]: "expected_output": item["expected_output"], "metadata": metadata, } + + +def report_usage_scores( + trace_id: str, + token_threshold: int = 0, + latency_threshold: int = 0, + cost_threshold: float = 0, +) -> None: + """Report usage scores to Langfuse for a given trace ID. + + WARNING: Due to the nature of the Langfuse API, this function may hang + while trying to fetch the observations. + + Parameters + ---------- + trace_id: str + The ID of the trace to report the usage scores for. + token_threshold: int + The token threshold to report the score for. + if the token count is greater than the threshold, the score + will be reported as 0. + Optional, default to 0 (no reporting). + latency_threshold: int + The latency threshold in seconds to report the score for. + if the latency is greater than the threshold, the score + will be reported as 0. + Optional, default to 0 (no reporting). + cost_threshold: float + The cost threshold to report the score for. + if the cost is greater than the threshold, the score + will be reported as 0. + Optional, default to 0 (no reporting). + """ + langfuse_client = AsyncClientManager.get_instance().langfuse_client + observations = _get_observations_with_retry(trace_id, langfuse_client) + + if token_threshold > 0: + total_tokens = sum(_obs_attr(observation, "totalTokens") for observation in observations.data) + if total_tokens <= token_threshold: + score = 1 + comment = "Token count is less than or equal to the threshold." + else: + score = 0 + comment = "Token count is greater than the threshold." + + logger.info("Reporting score for token count") + langfuse_client.create_score( + name="Token Count", + value=score, + trace_id=trace_id, + comment=comment, + metadata={ + "total_tokens": total_tokens, + "token_threshold": token_threshold, + }, + ) + + if latency_threshold > 0: + total_latency = sum(_obs_attr(observation, "latency") for observation in observations.data) + if total_latency <= latency_threshold: + score = 1 + comment = "Latency is less than or equal to the threshold." + else: + score = 0 + comment = "Latency is greater than the threshold." + + logger.info("Reporting score for latency") + langfuse_client.create_score( + name="Latency", + value=score, + trace_id=trace_id, + comment=comment, + metadata={ + "total_latency": total_latency, + "latency_threshold": latency_threshold, + }, + ) + + if cost_threshold > 0: + total_cost = sum(_obs_attr(observation, "calculated_total_cost") for observation in observations.data) + if total_cost <= cost_threshold: + score = 1 + comment = "Cost is less than or equal to the threshold." + else: + score = 0 + comment = "Cost is greater than the threshold." + + logger.info("Reporting score for cost") + langfuse_client.create_score( + name="Cost", + value=score, + trace_id=trace_id, + comment=comment, + metadata={ + "total_cost": total_cost, + "cost_threshold": cost_threshold, + }, + ) + + langfuse_client.flush() + + +def _obs_attr(observation: ObservationsView, attribute: str) -> Any: + """Get the value of an attribute from an observation.""" + attribute_value = getattr(observation, attribute) + if attribute_value == 0: + logger.error(f"Observation attribute value for {attribute} is 0") + return 0 + if attribute_value is None: + logger.error(f"Observation attribute value for {attribute} is None") + return 0 + return attribute_value + + +@retry(stop=stop_after_attempt(10), wait=wait_exponential(multiplier=1, min=5, max=15)) +def _get_observations_with_retry(trace_id: str, langfuse_client: Langfuse) -> ObservationsViews: + """Get the observations for a given trace ID with retry/backoff.""" + logger.info(f"Getting observations for trace {trace_id}...") + return langfuse_client.api.observations.get_many(trace_id=trace_id, type="GENERATION") diff --git a/aieng-eval-agents/aieng/agent_evals/report_generation/evaluation/online.py b/aieng-eval-agents/aieng/agent_evals/report_generation/evaluation/online.py index 4b0dfe64..fdf26e46 100644 --- a/aieng-eval-agents/aieng/agent_evals/report_generation/evaluation/online.py +++ b/aieng-eval-agents/aieng/agent_evals/report_generation/evaluation/online.py @@ -1,15 +1,10 @@ """Functions to report online evaluation of the report generation agent to Langfuse.""" import logging -from typing import Any from aieng.agent_evals.async_client_manager import AsyncClientManager from aieng.agent_evals.report_generation.agent import EventParser, EventType from google.adk.events.event import Event -from langfuse import Langfuse -from langfuse.api.resources.commons.types.observations_view import ObservationsView -from langfuse.api.resources.observations.types.observations_views import ObservationsViews -from tenacity import retry, stop_after_attempt, wait_exponential logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s") @@ -46,8 +41,6 @@ def report_final_response_score(event: Event, string_match: str = "") -> None: if trace_id is None: raise ValueError("Langfuse trace ID is None.") - logger.info("Reporting score for valid final response") - parsed_events = EventParser.parse(event) for parsed_event in parsed_events: if parsed_event.type == EventType.FINAL_RESPONSE: @@ -69,8 +62,10 @@ def report_final_response_score(event: Event, string_match: str = "") -> None: "string_match": string_match, }, ) + langfuse_client.flush() return + logger.info("Reporting score for invalid final response") langfuse_client.create_score( name="Valid Final Response", value=0, @@ -80,121 +75,4 @@ def report_final_response_score(event: Event, string_match: str = "") -> None: "string_match": string_match, }, ) - - -def report_usage_scores( - token_threshold: int = 0, - latency_threshold: int = 0, - cost_threshold: float = 0, -) -> None: - """Report usage metrics to Langfuse. - - This function has to be called within the context of a Langfuse trace. - - Parameters - ---------- - token_threshold: int - The token threshold to report the metrics for. - if the token count is greater than the threshold, the score - will be reported as 0. - Optional, default to 0 (no reporting). - latency_threshold: int - The latency threshold in seconds to report the metrics for. - if the latency is greater than the threshold, the score - will be reported as 0. - Optional, default to 0 (no reporting). - cost_threshold: float - The cost threshold to report the metrics for. - if the cost is greater than the threshold, the score - will be reported as 0. - Optional, default to 0 (no reporting). - """ - langfuse_client = AsyncClientManager.get_instance().langfuse_client - trace_id = langfuse_client.get_current_trace_id() - - if trace_id is None: - raise ValueError("Langfuse trace ID is None.") - - observations = _get_observations_with_retry(trace_id, langfuse_client) - - if token_threshold > 0: - total_tokens = sum(_obs_attr(observation, "totalTokens") for observation in observations.data) - if total_tokens <= token_threshold: - score = 1 - comment = "Token count is less than or equal to the threshold." - else: - score = 0 - comment = "Token count is greater than the threshold." - - logger.info("Reporting score for token count") - langfuse_client.create_score( - name="Token Count", - value=score, - trace_id=trace_id, - comment=comment, - metadata={ - "total_tokens": total_tokens, - "token_threshold": token_threshold, - }, - ) - - if latency_threshold > 0: - total_latency = sum(_obs_attr(observation, "latency") for observation in observations.data) - if total_latency <= latency_threshold: - score = 1 - comment = "Latency is less than or equal to the threshold." - else: - score = 0 - comment = "Latency is greater than the threshold." - - logger.info("Reporting score for latency") - langfuse_client.create_score( - name="Latency", - value=score, - trace_id=trace_id, - comment=comment, - metadata={ - "total_latency": total_latency, - "latency_threshold": latency_threshold, - }, - ) - - if cost_threshold > 0: - total_cost = sum(_obs_attr(observation, "calculated_total_cost") for observation in observations.data) - if total_cost <= cost_threshold: - score = 1 - comment = "Cost is less than or equal to the threshold." - else: - score = 0 - comment = "Cost is greater than the threshold." - - logger.info("Reporting score for cost") - langfuse_client.create_score( - name="Cost", - value=score, - trace_id=trace_id, - comment=comment, - metadata={ - "total_cost": total_cost, - "cost_threshold": cost_threshold, - }, - ) - langfuse_client.flush() - - -def _obs_attr(observation: ObservationsView, attribute: str) -> Any: - attribute_value = getattr(observation, attribute) - if attribute_value == 0: - logger.error(f"Observation attribute value for {attribute} is 0") - return 0 - if attribute_value is None: - logger.error(f"Observation attribute value for {attribute} is None") - return 0 - return attribute_value - - -@retry(stop=stop_after_attempt(10), wait=wait_exponential(multiplier=1, min=5, max=15)) -def _get_observations_with_retry(trace_id: str, langfuse_client: Langfuse) -> ObservationsViews: - logger.info(f"Getting observations for trace {trace_id}...") - return langfuse_client.api.observations.get_many(trace_id=trace_id, type="GENERATION") diff --git a/implementations/report_generation/demo.py b/implementations/report_generation/demo.py index 383b1b92..e39f05ca 100644 --- a/implementations/report_generation/demo.py +++ b/implementations/report_generation/demo.py @@ -8,14 +8,16 @@ import asyncio import logging +import threading from functools import partial from typing import Any, AsyncGenerator import click import gradio as gr from aieng.agent_evals.async_client_manager import AsyncClientManager +from aieng.agent_evals.langfuse import report_usage_scores from aieng.agent_evals.report_generation.agent import get_report_generation_agent -from aieng.agent_evals.report_generation.evaluation.online import report_final_response_score, report_usage_scores +from aieng.agent_evals.report_generation.evaluation.online import report_final_response_score from aieng.agent_evals.report_generation.prompts import MAIN_AGENT_INSTRUCTIONS from dotenv import load_dotenv from google.adk.runners import Runner @@ -100,8 +102,17 @@ async def agent_session_handler( # Report the final response evaluation to Langfuse report_final_response_score(event, string_match="](gradio_api/file=") - # TODO: need to put this in a thread - report_usage_scores(token_threshold=10000, latency_threshold=60) + # Run usage scoring in a thread so it doesn't block the UI + thread = threading.Thread( + target=report_usage_scores, + kwargs={ + "trace_id": langfuse_client.get_current_trace_id(), + "token_threshold": 10000, + "latency_threshold": 60, + }, + daemon=True, + ) + thread.start() langfuse_client.flush() From aed2cf285803aa37397e41161533b50bc87b302c Mon Sep 17 00:00:00 2001 From: Marcelo Lotif Date: Thu, 12 Feb 2026 11:10:34 -0500 Subject: [PATCH 04/15] Adding missing init files --- aieng-eval-agents/aieng/agent_evals/report_generation/__init__.py | 0 .../aieng/agent_evals/report_generation/evaluation/__init__.py | 0 2 files changed, 0 insertions(+), 0 deletions(-) create mode 100644 aieng-eval-agents/aieng/agent_evals/report_generation/__init__.py create mode 100644 aieng-eval-agents/aieng/agent_evals/report_generation/evaluation/__init__.py diff --git a/aieng-eval-agents/aieng/agent_evals/report_generation/__init__.py b/aieng-eval-agents/aieng/agent_evals/report_generation/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/aieng-eval-agents/aieng/agent_evals/report_generation/evaluation/__init__.py b/aieng-eval-agents/aieng/agent_evals/report_generation/evaluation/__init__.py new file mode 100644 index 00000000..e69de29b From efa320e0164dd5ab09e11db9836bc773787ee561 Mon Sep 17 00:00:00 2001 From: Marcelo Lotif Date: Thu, 12 Feb 2026 12:21:14 -0500 Subject: [PATCH 05/15] Using the trace fetch functions instead of making them myself --- .../aieng/agent_evals/evaluation/trace.py | 15 ++- .../aieng/agent_evals/langfuse.py | 125 ++++++------------ implementations/report_generation/demo.py | 2 +- 3 files changed, 56 insertions(+), 86 deletions(-) diff --git a/aieng-eval-agents/aieng/agent_evals/evaluation/trace.py b/aieng-eval-agents/aieng/agent_evals/evaluation/trace.py index 4fa543f9..5a749f1b 100644 --- a/aieng-eval-agents/aieng/agent_evals/evaluation/trace.py +++ b/aieng-eval-agents/aieng/agent_evals/evaluation/trace.py @@ -20,7 +20,6 @@ TraceObservationPredicate, TraceWaitConfig, ) -from aieng.agent_evals.langfuse import flush_traces from langfuse import Langfuse from langfuse.api import ObservationsView from langfuse.api.core import ApiError @@ -143,6 +142,16 @@ async def _evaluate_item( return result +def flush_traces() -> None: + """Flush any pending traces to Langfuse. + + Call this before your application exits to ensure all traces are sent. + """ + manager = AsyncClientManager.get_instance() + if manager._langfuse_client is not None: + manager._langfuse_client.flush() + + def extract_trace_metrics( trace: TraceWithFullDetails, *, @@ -226,7 +235,7 @@ async def _evaluate_trace( return [], TraceEvalStatus.SKIPPED, "Missing `trace_id` on experiment item result." try: - trace, ready = await _fetch_trace_with_wait(langfuse_client, trace_id, wait) + trace, ready = await fetch_trace_with_wait(langfuse_client, trace_id, wait) except Exception as exc: return [], TraceEvalStatus.FAILED, f"Trace fetch failed: {exc}" @@ -248,7 +257,7 @@ async def _evaluate_trace( return evaluations, TraceEvalStatus.OK, None -async def _fetch_trace_with_wait( +async def fetch_trace_with_wait( langfuse_client: Langfuse, trace_id: str, wait: TraceWaitConfig ) -> tuple[TraceWithFullDetails | None, bool]: """Fetch a trace with retry/backoff until it is ready or timeout expires.""" diff --git a/aieng-eval-agents/aieng/agent_evals/langfuse.py b/aieng-eval-agents/aieng/agent_evals/langfuse.py index e5d5dade..1b087f4e 100644 --- a/aieng-eval-agents/aieng/agent_evals/langfuse.py +++ b/aieng-eval-agents/aieng/agent_evals/langfuse.py @@ -1,5 +1,6 @@ """Functions and objects pertaining to Langfuse.""" +import asyncio import base64 import hashlib import json @@ -10,16 +11,15 @@ from aieng.agent_evals.async_client_manager import AsyncClientManager from aieng.agent_evals.configs import Configs +from aieng.agent_evals.evaluation.trace import extract_trace_metrics, fetch_trace_with_wait +from aieng.agent_evals.evaluation.types import TraceWaitConfig from aieng.agent_evals.progress import track_with_progress from langfuse import Langfuse -from langfuse.api.resources.commons.types.observations_view import ObservationsView -from langfuse.api.resources.observations.types.observations_views import ObservationsViews from opentelemetry import trace from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor, SimpleSpanProcessor -from tenacity import retry, stop_after_attempt, wait_exponential logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s") @@ -164,16 +164,6 @@ def init_tracing(service_name: str = "aieng-eval-agents") -> bool: return False -def flush_traces() -> None: - """Flush any pending traces to Langfuse. - - Call this before your application exits to ensure all traces are sent. - """ - manager = AsyncClientManager.get_instance() - if manager._langfuse_client is not None: - manager._langfuse_client.flush() - - def is_tracing_enabled() -> bool: """Check if Langfuse tracing is currently enabled. @@ -392,7 +382,7 @@ def report_usage_scores( trace_id: str The ID of the trace to report the usage scores for. token_threshold: int - The token threshold to report the score for. + The total token (input + output) threshold to report the score for. if the token count is greater than the threshold, the score will be reported as 0. Optional, default to 0 (no reporting). @@ -408,88 +398,59 @@ def report_usage_scores( Optional, default to 0 (no reporting). """ langfuse_client = AsyncClientManager.get_instance().langfuse_client - observations = _get_observations_with_retry(trace_id, langfuse_client) - if token_threshold > 0: - total_tokens = sum(_obs_attr(observation, "totalTokens") for observation in observations.data) - if total_tokens <= token_threshold: - score = 1 - comment = "Token count is less than or equal to the threshold." - else: - score = 0 - comment = "Token count is greater than the threshold." + logger.info(f"Fetching trace {trace_id}...") + trace, ready = asyncio.run( + fetch_trace_with_wait(langfuse_client, trace_id, TraceWaitConfig()), + ) - logger.info("Reporting score for token count") - langfuse_client.create_score( - name="Token Count", - value=score, - trace_id=trace_id, - comment=comment, - metadata={ - "total_tokens": total_tokens, - "token_threshold": token_threshold, - }, - ) + if trace is None: + logger.error(f"Trace {trace_id} not found. Will not report usage scores.") + return - if latency_threshold > 0: - total_latency = sum(_obs_attr(observation, "latency") for observation in observations.data) - if total_latency <= latency_threshold: - score = 1 - comment = "Latency is less than or equal to the threshold." - else: - score = 0 - comment = "Latency is greater than the threshold." + if not ready: + logger.warning(f"Trace {trace_id} is not ready. Scores will be reported on partial traces.") - logger.info("Reporting score for latency") - langfuse_client.create_score( - name="Latency", - value=score, - trace_id=trace_id, - comment=comment, - metadata={ - "total_latency": total_latency, - "latency_threshold": latency_threshold, - }, - ) + trace_metrics = extract_trace_metrics(trace) + + if token_threshold > 0: + total_tokens = trace_metrics.total_input_tokens + trace_metrics.total_output_tokens + _report_score(langfuse_client, "Token Count", total_tokens, token_threshold, trace_id) + + if latency_threshold > 0: + _report_score(langfuse_client, "Latency", trace_metrics.latency_sec, latency_threshold, trace_id) if cost_threshold > 0: - total_cost = sum(_obs_attr(observation, "calculated_total_cost") for observation in observations.data) - if total_cost <= cost_threshold: + _report_score(langfuse_client, "Cost", trace_metrics.total_cost, cost_threshold, trace_id) + + langfuse_client.flush() + + +def _report_score( + langfuse_client: Langfuse, + name: str, + value: int | float | None, + threshold: int | float, + trace_id: str, +) -> None: + if value is None: + logger.error(f"Trace {trace_id} has no value for {name}. Will not report score for {name}.") + + else: + if value <= threshold: score = 1 - comment = "Cost is less than or equal to the threshold." + comment = f"{value} is less than or equal to the threshold." else: score = 0 - comment = "Cost is greater than the threshold." + comment = f"{value} is greater than the threshold." - logger.info("Reporting score for cost") langfuse_client.create_score( - name="Cost", + name=name, value=score, trace_id=trace_id, comment=comment, metadata={ - "total_cost": total_cost, - "cost_threshold": cost_threshold, + "value": value, + "threshold": threshold, }, ) - - langfuse_client.flush() - - -def _obs_attr(observation: ObservationsView, attribute: str) -> Any: - """Get the value of an attribute from an observation.""" - attribute_value = getattr(observation, attribute) - if attribute_value == 0: - logger.error(f"Observation attribute value for {attribute} is 0") - return 0 - if attribute_value is None: - logger.error(f"Observation attribute value for {attribute} is None") - return 0 - return attribute_value - - -@retry(stop=stop_after_attempt(10), wait=wait_exponential(multiplier=1, min=5, max=15)) -def _get_observations_with_retry(trace_id: str, langfuse_client: Langfuse) -> ObservationsViews: - """Get the observations for a given trace ID with retry/backoff.""" - logger.info(f"Getting observations for trace {trace_id}...") - return langfuse_client.api.observations.get_many(trace_id=trace_id, type="GENERATION") diff --git a/implementations/report_generation/demo.py b/implementations/report_generation/demo.py index e39f05ca..a5eb2672 100644 --- a/implementations/report_generation/demo.py +++ b/implementations/report_generation/demo.py @@ -107,7 +107,7 @@ async def agent_session_handler( target=report_usage_scores, kwargs={ "trace_id": langfuse_client.get_current_trace_id(), - "token_threshold": 10000, + "token_threshold": 20000, "latency_threshold": 60, }, daemon=True, From 1de6a3716dd2662d6839ace9cd2f856adfd9f3af Mon Sep 17 00:00:00 2001 From: Marcelo Lotif Date: Thu, 12 Feb 2026 12:22:50 -0500 Subject: [PATCH 06/15] Adjusting the token threshold to 15k --- implementations/report_generation/demo.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/implementations/report_generation/demo.py b/implementations/report_generation/demo.py index a5eb2672..9a3dbfb7 100644 --- a/implementations/report_generation/demo.py +++ b/implementations/report_generation/demo.py @@ -107,7 +107,7 @@ async def agent_session_handler( target=report_usage_scores, kwargs={ "trace_id": langfuse_client.get_current_trace_id(), - "token_threshold": 20000, + "token_threshold": 15000, "latency_threshold": 60, }, daemon=True, From ac7ba2c662c46c9b09135435926f91ee290f4c83 Mon Sep 17 00:00:00 2001 From: Marcelo Lotif Date: Thu, 12 Feb 2026 12:23:52 -0500 Subject: [PATCH 07/15] Adding log for every metric reported --- aieng-eval-agents/aieng/agent_evals/langfuse.py | 1 + 1 file changed, 1 insertion(+) diff --git a/aieng-eval-agents/aieng/agent_evals/langfuse.py b/aieng-eval-agents/aieng/agent_evals/langfuse.py index 1b087f4e..8f5f59ef 100644 --- a/aieng-eval-agents/aieng/agent_evals/langfuse.py +++ b/aieng-eval-agents/aieng/agent_evals/langfuse.py @@ -444,6 +444,7 @@ def _report_score( score = 0 comment = f"{value} is greater than the threshold." + logger.info(f"Reporting score for {name}") langfuse_client.create_score( name=name, value=score, From 0bf23513977680e2fe4451d25f5d4c22cce29e98 Mon Sep 17 00:00:00 2001 From: Marcelo Lotif Date: Thu, 12 Feb 2026 17:25:25 -0500 Subject: [PATCH 08/15] Adding agent.py to the demo as well --- .../aieng/agent_evals/langfuse.py | 40 ++++++----- .../agent_evals/report_generation/agent.py | 3 + implementations/report_generation/agent.py | 67 +++++++++++++++++++ implementations/report_generation/demo.py | 24 +++++-- 4 files changed, 111 insertions(+), 23 deletions(-) create mode 100644 implementations/report_generation/agent.py diff --git a/aieng-eval-agents/aieng/agent_evals/langfuse.py b/aieng-eval-agents/aieng/agent_evals/langfuse.py index 8f5f59ef..88667da9 100644 --- a/aieng-eval-agents/aieng/agent_evals/langfuse.py +++ b/aieng-eval-agents/aieng/agent_evals/langfuse.py @@ -435,23 +435,27 @@ def _report_score( ) -> None: if value is None: logger.error(f"Trace {trace_id} has no value for {name}. Will not report score for {name}.") + return + + if value == 0: + logger.error(f"Trace {trace_id} has a value of 0 for {name}. Will not report score for {name}.") + return + if value <= threshold: + score = 1 + comment = f"{value} is less than or equal to the threshold." else: - if value <= threshold: - score = 1 - comment = f"{value} is less than or equal to the threshold." - else: - score = 0 - comment = f"{value} is greater than the threshold." - - logger.info(f"Reporting score for {name}") - langfuse_client.create_score( - name=name, - value=score, - trace_id=trace_id, - comment=comment, - metadata={ - "value": value, - "threshold": threshold, - }, - ) + score = 0 + comment = f"{value} is greater than the threshold." + + logger.info(f"Reporting score for {name}") + langfuse_client.create_score( + name=name, + value=score, + trace_id=trace_id, + comment=comment, + metadata={ + "value": value, + "threshold": threshold, + }, + ) diff --git a/aieng-eval-agents/aieng/agent_evals/report_generation/agent.py b/aieng-eval-agents/aieng/agent_evals/report_generation/agent.py index 4daf8a1d..58743154 100644 --- a/aieng-eval-agents/aieng/agent_evals/report_generation/agent.py +++ b/aieng-eval-agents/aieng/agent_evals/report_generation/agent.py @@ -24,6 +24,7 @@ from aieng.agent_evals.langfuse import setup_langfuse_tracer from aieng.agent_evals.report_generation.file_writer import ReportFileWriter from google.adk.agents import Agent +from google.adk.agents.base_agent import AfterAgentCallback from google.adk.events.event import Event from pydantic import BaseModel @@ -36,6 +37,7 @@ def get_report_generation_agent( instructions: str, reports_output_path: Path, langfuse_project_name: str | None, + after_agent_callback: AfterAgentCallback | None = None, ) -> Agent: """ Define the report generation agent. @@ -72,6 +74,7 @@ def get_report_generation_agent( client_manager.report_generation_db().get_schema_info, report_file_writer.write_xlsx, ], + after_agent_callback=after_agent_callback, ) diff --git a/implementations/report_generation/agent.py b/implementations/report_generation/agent.py new file mode 100644 index 00000000..75fe2984 --- /dev/null +++ b/implementations/report_generation/agent.py @@ -0,0 +1,67 @@ +"""Entry point for the Google ADK UI for the report generation agent. + +Example +------- +$ adk web implementations/ +""" + +import logging +import threading + +from aieng.agent_evals.async_client_manager import AsyncClientManager +from aieng.agent_evals.langfuse import report_usage_scores +from aieng.agent_evals.report_generation.agent import get_report_generation_agent +from aieng.agent_evals.report_generation.evaluation.online import report_final_response_score +from aieng.agent_evals.report_generation.prompts import MAIN_AGENT_INSTRUCTIONS +from dotenv import load_dotenv +from google.adk.agents.callback_context import CallbackContext + +from .env_vars import get_langfuse_project_name, get_reports_output_path + + +load_dotenv(verbose=True) +logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s") +logger = logging.getLogger(__name__) + + +def calculate_and_send_scores(callback_context: CallbackContext) -> None: + """Calculate token usage and latency scores and submit them to Langfuse. + + This is a callback function to be called after the agent has run. + + Parameters + ---------- + callback_context : CallbackContext + The callback context at the end of the agent run. + """ + langfuse_client = AsyncClientManager.get_instance().langfuse_client + langfuse_client.flush() + + for event in callback_context.session.events: + if event.is_final_response() and event.content and event.content.role == "model": + # Report the final response evaluation to Langfuse + report_final_response_score(event, string_match="](gradio_api/file=") + + # Run usage scoring in a thread so it doesn't block the UI + thread = threading.Thread( + target=report_usage_scores, + kwargs={ + "trace_id": langfuse_client.get_current_trace_id(), + "token_threshold": 15000, + "latency_threshold": 60, + }, + daemon=True, + ) + thread.start() + + return + + logger.error("No final response found in the callback context. Will not report scores to Langfuse.") + + +root_agent = get_report_generation_agent( + instructions=MAIN_AGENT_INSTRUCTIONS, + reports_output_path=get_reports_output_path(), + langfuse_project_name=get_langfuse_project_name(), + after_agent_callback=calculate_and_send_scores, +) diff --git a/implementations/report_generation/demo.py b/implementations/report_generation/demo.py index 9a3dbfb7..b2ca8b22 100644 --- a/implementations/report_generation/demo.py +++ b/implementations/report_generation/demo.py @@ -20,6 +20,7 @@ from aieng.agent_evals.report_generation.evaluation.online import report_final_response_score from aieng.agent_evals.report_generation.prompts import MAIN_AGENT_INSTRUCTIONS from dotenv import load_dotenv +from google.adk.agents.callback_context import CallbackContext from google.adk.runners import Runner from google.adk.sessions import InMemorySessionService from google.genai.types import Content, Part @@ -66,11 +67,9 @@ async def agent_session_handler( instructions=MAIN_AGENT_INSTRUCTIONS, reports_output_path=get_reports_output_path(), langfuse_project_name=get_langfuse_project_name() if enable_trace else None, + after_agent_callback=calculate_and_send_scores, ) - # Get the Langfuse client for online reporting - langfuse_client = AsyncClientManager.get_instance().langfuse_client - # Construct an in-memory session for the agent to maintain # conversation history across multiple turns of a chat # This makes it possible to ask follow-up questions that refer to @@ -98,11 +97,24 @@ async def agent_session_handler( if len(turn_messages) > 0: yield turn_messages - if event.is_final_response(): + +def calculate_and_send_scores(callback_context: CallbackContext) -> None: + """Calculate token usage and latency scores and submit them to Langfuse. + + This is a callback function to be called after the agent has run. + + Parameters + ---------- + callback_context : CallbackContext + The callback context at the end of the agent run. + """ + for event in callback_context.session.events: + if event.is_final_response() and event.content and event.content.role == "model": # Report the final response evaluation to Langfuse report_final_response_score(event, string_match="](gradio_api/file=") # Run usage scoring in a thread so it doesn't block the UI + langfuse_client = AsyncClientManager.get_instance().langfuse_client thread = threading.Thread( target=report_usage_scores, kwargs={ @@ -114,7 +126,9 @@ async def agent_session_handler( ) thread.start() - langfuse_client.flush() + return + + logger.error("No final response found in the callback context. Will not report scores to Langfuse.") @click.command() From 20ede2d2d565d033a604a0f33f5838b512c9d8b0 Mon Sep 17 00:00:00 2001 From: Marcelo Lotif Date: Thu, 12 Feb 2026 18:52:59 -0500 Subject: [PATCH 09/15] Adding feedback button --- implementations/report_generation/demo.py | 108 +++++++++++++++++----- 1 file changed, 84 insertions(+), 24 deletions(-) diff --git a/implementations/report_generation/demo.py b/implementations/report_generation/demo.py index b2ca8b22..612bbd3a 100644 --- a/implementations/report_generation/demo.py +++ b/implementations/report_generation/demo.py @@ -35,6 +35,9 @@ logger = logging.getLogger(__name__) +GRADIO_STATE = gr.State(value={"trace_id": None}) + + async def agent_session_handler( query: str, history: list[ChatMessage], @@ -60,6 +63,9 @@ async def agent_session_handler( AsyncGenerator[list[ChatMessage], Any] An async chat messages generator. """ + # Reset the trace ID in the state + GRADIO_STATE.value["trace_id"] = None + # Initialize list of chat messages for a single turn turn_messages: list[ChatMessage] = [] @@ -110,15 +116,21 @@ def calculate_and_send_scores(callback_context: CallbackContext) -> None: """ for event in callback_context.session.events: if event.is_final_response() and event.content and event.content.role == "model": + langfuse_client = AsyncClientManager.get_instance().langfuse_client + trace_id = langfuse_client.get_current_trace_id() + + # Storing the trace ID in the state so it can be used + # in the feedback buttons callback + GRADIO_STATE.value["trace_id"] = trace_id + # Report the final response evaluation to Langfuse report_final_response_score(event, string_match="](gradio_api/file=") # Run usage scoring in a thread so it doesn't block the UI - langfuse_client = AsyncClientManager.get_instance().langfuse_client thread = threading.Thread( target=report_usage_scores, kwargs={ - "trace_id": langfuse_client.get_current_trace_id(), + "trace_id": trace_id, "token_threshold": 15000, "latency_threshold": 60, }, @@ -131,6 +143,35 @@ def calculate_and_send_scores(callback_context: CallbackContext) -> None: logger.error("No final response found in the callback context. Will not report scores to Langfuse.") +def on_feedback(liked: bool): + """Handle thumbs up (liked=True) or thumbs down (liked=False).""" + trace_id = GRADIO_STATE.value["trace_id"] + if trace_id is None: + logger.error("No trace ID found in the state. Will not report feedback to Langfuse.") + return None + + score = 1 if liked else 0 + + logger.info(f"Reporting user feedback score for trace {trace_id} with value {score}") + langfuse_client = AsyncClientManager.get_instance().langfuse_client + langfuse_client.create_score( + value=score, + name="User Feedback", + comment=f"The user gave this response a thumbs {'up' if liked else 'down'}.", + trace_id=trace_id, + ) + langfuse_client.flush() + + GRADIO_STATE.value["trace_id"] = None + return gr.update(visible=False), gr.update(visible=True) + + +def toggle_feedback_row(): + """Toggle the feedback row if there is a trace ID in the state.""" + trace_id = GRADIO_STATE.value["trace_id"] + return gr.update(visible=trace_id is not None and trace_id != ""), gr.update(visible=False) + + @click.command() @click.option("--enable-trace", required=False, default=True, help="Whether to enable tracing with Langfuse.") @click.option( @@ -153,33 +194,52 @@ def start_gradio_app(enable_trace: bool = True, enable_public_link: bool = False """ partial_agent_session_handler = partial(agent_session_handler, enable_trace=enable_trace) - demo = gr.ChatInterface( - partial_agent_session_handler, - chatbot=gr.Chatbot(height=600), - textbox=gr.Textbox(lines=1, placeholder="Enter your prompt"), - # Additional input to maintain session state across multiple turns - # NOTE: Examples must be a list of lists when additional inputs are provided - additional_inputs=gr.State(value={}, render=False), - examples=[ - ["Generate a monthly sales performance report."], - ["Generate a report of the top 5 selling products per year and the total sales value for each product."], - ["Generate a report of the average order value per invoice per month."], - [ - "Generate a report with the month-over-month trends in sales. The report should include the monthly sales, the month-over-month change and the percentage change." - ], - ["Generate a report on sales revenue by country per year."], - ["Generate a report on the 5 highest-value customers per year vs. the average customer."], - [ - "Generate a report on the average amount spent by one time buyers for each year vs. the average customer." - ], - ], - title="2.1: ReAct for Retrieval-Augmented Generation with OpenAI Agent SDK", - ) + with gr.Blocks(title="Report Generator Agent") as demo: + with gr.Row(): + gradio_chatbot = gr.Chatbot(height=600) + gr.ChatInterface( + partial_agent_session_handler, + chatbot=gradio_chatbot, + textbox=gr.Textbox(lines=1, placeholder="Enter your prompt"), + # Additional input to maintain session state across multiple turns + # NOTE: Examples must be a list of lists when additional inputs + # are provided + additional_inputs=gr.State(value={}, render=False), + examples=[ + ["Generate a monthly sales performance report."], + [ + "Generate a report of the top 5 selling products per year and the total sales value for each product." + ], + ["Generate a report of the average order value per invoice per month."], + [ + "Generate a report with the month-over-month trends in sales. The report should include the monthly sales, the month-over-month change and the percentage change." + ], + ["Generate a report on sales revenue by country per year."], + ["Generate a report on the 5 highest-value customers per year vs. the average customer."], + [ + "Generate a report on the average amount spent by one time buyers for each year vs. the average customer." + ], + ], + ) + + with gr.Row(elem_id="thank_you_msg", visible=False) as thank_you_row: + gr.Markdown("Thank you for your feedback 🙂") + + # Feedback buttons + with gr.Row(elem_id="feedback_buttons", visible=False) as feedback_row: + gr.Markdown("Provide feedback on the response:") + thumbs_up = gr.Button("👍") + thumbs_up.click(fn=lambda: on_feedback(True), outputs=[feedback_row, thank_you_row]) + thumbs_down = gr.Button("👎") + thumbs_down.click(fn=lambda: on_feedback(False), outputs=[feedback_row, thank_you_row]) + + gradio_chatbot.change(fn=toggle_feedback_row, outputs=[feedback_row, thank_you_row]) try: demo.launch( share=enable_public_link, allowed_paths=[str(get_reports_output_path().absolute())], + css="#feedback_buttons { width: 600px; }", ) finally: asyncio.run(AsyncClientManager.get_instance().close()) From 1f29c7df256cfb5b32ad2dbfbf0812db7c830f71 Mon Sep 17 00:00:00 2001 From: Marcelo Lotif Date: Fri, 13 Feb 2026 09:48:09 -0500 Subject: [PATCH 10/15] Using init_tracing instead --- .../aieng/agent_evals/report_generation/agent.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/aieng-eval-agents/aieng/agent_evals/report_generation/agent.py b/aieng-eval-agents/aieng/agent_evals/report_generation/agent.py index 58743154..9629fc41 100644 --- a/aieng-eval-agents/aieng/agent_evals/report_generation/agent.py +++ b/aieng-eval-agents/aieng/agent_evals/report_generation/agent.py @@ -21,7 +21,7 @@ from typing import Any from aieng.agent_evals.async_client_manager import AsyncClientManager -from aieng.agent_evals.langfuse import setup_langfuse_tracer +from aieng.agent_evals.langfuse import init_tracing from aieng.agent_evals.report_generation.file_writer import ReportFileWriter from google.adk.agents import Agent from google.adk.agents.base_agent import AfterAgentCallback @@ -58,7 +58,7 @@ def get_report_generation_agent( """ # Setup langfuse tracing if project name is provided if langfuse_project_name: - setup_langfuse_tracer(langfuse_project_name) + init_tracing(langfuse_project_name) # Get the client manager singleton instance client_manager = AsyncClientManager.get_instance() From 80717babcb112389714efa9273a9b3bb76771c44 Mon Sep 17 00:00:00 2001 From: Marcelo Lotif Date: Tue, 17 Feb 2026 11:25:11 -0500 Subject: [PATCH 11/15] Adding missing docstring and return type hints --- implementations/report_generation/demo.py | 29 +++++++++++++++++++---- 1 file changed, 25 insertions(+), 4 deletions(-) diff --git a/implementations/report_generation/demo.py b/implementations/report_generation/demo.py index 8081ae32..795ef38a 100644 --- a/implementations/report_generation/demo.py +++ b/implementations/report_generation/demo.py @@ -144,8 +144,23 @@ def calculate_and_send_scores(callback_context: CallbackContext) -> None: logger.error("No final response found in the callback context. Will not report scores to Langfuse.") -def on_feedback(liked: bool): - """Handle thumbs up (liked=True) or thumbs down (liked=False).""" +def on_feedback(liked: bool) -> tuple[dict[str, Any], dict[str, Any]] | None: + """Handle thumbs up (liked=True) or thumbs down (liked=False). + + Send the result of the feedback to Langfuse and returns the updated + states for the feedback row and the thank you message row. + + Parameters + ---------- + liked : bool + Whether the user liked the agent's response. + + Returns + ------- + tuple[dict[str, Any], dict[str, Any]] | None + The updated states for the feedback row and the thank you message row. + If no trace ID is found in the state, returns None. + """ trace_id = GRADIO_STATE.value["trace_id"] if trace_id is None: logger.error("No trace ID found in the state. Will not report feedback to Langfuse.") @@ -167,8 +182,14 @@ def on_feedback(liked: bool): return gr.update(visible=False), gr.update(visible=True) -def toggle_feedback_row(): - """Toggle the feedback row if there is a trace ID in the state.""" +def toggle_feedback_row() -> tuple[dict[str, Any], dict[str, Any]]: + """Toggle the feedback row if there is a trace ID in the state. + + Returns + ------- + tuple[dict[str, Any], dict[str, Any]] + The updated states for the feedback row and the thank you message row. + """ trace_id = GRADIO_STATE.value["trace_id"] return gr.update(visible=trace_id is not None and trace_id != ""), gr.update(visible=False) From fd69cc32e10a522900002cdfae4d39ad38dec396 Mon Sep 17 00:00:00 2001 From: Marcelo Lotif Date: Tue, 17 Feb 2026 12:28:34 -0500 Subject: [PATCH 12/15] Adding max concurrency parameter to evaluation and updating the readme file --- .../report_generation/evaluation/offline.py | 5 +++- implementations/report_generation/README.md | 30 ++++++++++++++++--- 2 files changed, 30 insertions(+), 5 deletions(-) diff --git a/aieng-eval-agents/aieng/agent_evals/report_generation/evaluation/offline.py b/aieng-eval-agents/aieng/agent_evals/report_generation/evaluation/offline.py index ccd82468..0b0a12d1 100644 --- a/aieng-eval-agents/aieng/agent_evals/report_generation/evaluation/offline.py +++ b/aieng-eval-agents/aieng/agent_evals/report_generation/evaluation/offline.py @@ -62,6 +62,7 @@ async def evaluate( dataset_name: str, reports_output_path: Path, langfuse_project_name: str, + max_concurrency: int = 5, ) -> None: """Evaluate the report generation agent against a Langfuse dataset. @@ -76,6 +77,8 @@ async def evaluate( The path to the reports output directory. langfuse_project_name : str The name of the Langfuse project to use for tracing. + max_concurrency : int, optional + The maximum concurrency to use for the evaluation, by default 5. """ # Get the client manager singleton instance and langfuse client client_manager = AsyncClientManager.get_instance() @@ -99,7 +102,7 @@ async def evaluate( description="Evaluate the Report Generation Agent with data from Langfuse", task=report_generation_task.run, evaluators=[final_result_evaluator, trajectory_evaluator], - max_concurrency=1, + max_concurrency=max_concurrency, ) # Log the evaluation result diff --git a/implementations/report_generation/README.md b/implementations/report_generation/README.md index 967de0c4..e3e3020d 100644 --- a/implementations/report_generation/README.md +++ b/implementations/report_generation/README.md @@ -56,6 +56,21 @@ features a text input so you can make your own report requests to it. The agent will automatically upload a trace to Langfuse that can be used to evaluate the run or debug any issues. +### Running the agent using Google ADK + +If you wish to run the agent using the Google Agent Development Kit UI, +please run the command below from the project's root folder: + +```bash +adk web implementations/ +``` + +Once the service is up, it will be available at `http://127.0.0.1:8000`. On the +"Select an Agent" dropdown, please select "report_generation". You can type a request to +the agent on the text box on the right hand side panel. + + + ## Running the Evaluations ### Uploading the Ground Truth Dataset to Langfuse @@ -76,16 +91,16 @@ To upload custom data or use a different dataset name, please run: uv run --env-file .env python -m implementations.report_generation.data.langfuse_upload --dataset-path --dataset-name ``` -### Running the Evaluation Script +### Running the Offline Evaluation Script -Once the dataset has been uploaded to Langfuse, the evaluations can be run with -the command below: +Once the dataset has been uploaded to Langfuse, the offline evaluations +against a pre-determined dataset can be run with the command below: ```bash uv run --env-file .env python -m implementations.report_generation.evaluate ``` -To run the evaluations against a custom dataset, please execute: +To run the offline evaluations against a custom dataset, please execute: ```bash uv run --env-file .env python -m implementations.report_generation.evaluate --dataset-name @@ -98,3 +113,10 @@ agent used against the ground truth and produce True/False scores along with a r At the end of the run, an evaluation report will be displayed along with a link to check details about the evaluation in Langfuse. + +### Online Evaluations + +The agent is also set to collect online evaluation metrics in both the Gradio Demo UI +and the Google ADK UI. The online evaluations will check if the token usage and execution +time are higher than a certain threshold defined in the code, and it will also +check if the final result is present and contains a link to the report. From fc12d4904a0b03dc1f906cbdb9677968eb1e4ba3 Mon Sep 17 00:00:00 2001 From: Marcelo Lotif Date: Tue, 17 Feb 2026 12:33:11 -0500 Subject: [PATCH 13/15] Adding user feedback to the readme file --- implementations/report_generation/README.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/implementations/report_generation/README.md b/implementations/report_generation/README.md index e3e3020d..fb417e14 100644 --- a/implementations/report_generation/README.md +++ b/implementations/report_generation/README.md @@ -120,3 +120,10 @@ The agent is also set to collect online evaluation metrics in both the Gradio De and the Google ADK UI. The online evaluations will check if the token usage and execution time are higher than a certain threshold defined in the code, and it will also check if the final result is present and contains a link to the report. + +### User Feedback + +On the Gradio Demo UI, there are two buttons to record user feedback: a thumbs +up button to record positive user feedback and a thumbs down button to record +negative user feedback. The buttons will appear at the end of the agent's execution +and it will record the user feedback as Langfuse scores. From a850f096f1b253b25f647fe8ebe2a32dca59894d Mon Sep 17 00:00:00 2001 From: Marcelo Lotif Date: Tue, 17 Feb 2026 12:34:24 -0500 Subject: [PATCH 14/15] Adding one more paragraph --- implementations/report_generation/README.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/implementations/report_generation/README.md b/implementations/report_generation/README.md index e3e3020d..641527e3 100644 --- a/implementations/report_generation/README.md +++ b/implementations/report_generation/README.md @@ -120,3 +120,6 @@ The agent is also set to collect online evaluation metrics in both the Gradio De and the Google ADK UI. The online evaluations will check if the token usage and execution time are higher than a certain threshold defined in the code, and it will also check if the final result is present and contains a link to the report. + +Those evaluation results will be sent to Langfuse as scores, where they can be analyzed +both in an aggregate fashion as well as individually. From c30b56fa8f4557a98357944b7996c4618d928ddb Mon Sep 17 00:00:00 2001 From: Marcelo Lotif Date: Tue, 17 Feb 2026 13:52:55 -0500 Subject: [PATCH 15/15] Adding missing docstring --- aieng-eval-agents/aieng/agent_evals/report_generation/agent.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/aieng-eval-agents/aieng/agent_evals/report_generation/agent.py b/aieng-eval-agents/aieng/agent_evals/report_generation/agent.py index cc1ede5d..13744901 100644 --- a/aieng-eval-agents/aieng/agent_evals/report_generation/agent.py +++ b/aieng-eval-agents/aieng/agent_evals/report_generation/agent.py @@ -51,6 +51,9 @@ def get_report_generation_agent( The path to the reports output directory. langfuse_project_name : str | None The name of the Langfuse project to use for tracing. + after_agent_callback : AfterAgentCallback | None + The callback function to be called after the agent has + finished executing. Returns -------