diff --git a/.env.example b/.env.example index c76f0e3d..b9423aee 100644 --- a/.env.example +++ b/.env.example @@ -9,7 +9,12 @@ DEFAULT_PLANNER_MODEL="gemini-2.5-pro" DEFAULT_WORKER_MODEL="gemini-2.5-flash" DEFAULT_EVALUATOR_MODEL="gemini-2.5-pro" -# LangFuse (optional, for tracing) +# LangFuse for agent execution tracing and evaluations LANGFUSE_SECRET_KEY="sk-lf-..." LANGFUSE_PUBLIC_KEY="pk-lf-..." LANGFUSE_HOST="https://us.cloud.langfuse.com" + +# Report Generation (all optional, defaults are in implementations/report_generation/env_vars.py) +REPORT_GENERATION_OUTPUT_PATH="..." +REPORT_GENERATION_DB_PATH="..." +REPORT_GENERATION_LANGFUSE_PROJECT_NAME="..." diff --git a/aieng-eval-agents/aieng/agent_evals/langfuse.py b/aieng-eval-agents/aieng/agent_evals/langfuse.py index eb701f37..05bcd774 100644 --- a/aieng-eval-agents/aieng/agent_evals/langfuse.py +++ b/aieng-eval-agents/aieng/agent_evals/langfuse.py @@ -1,11 +1,13 @@ """Functions and objects pertaining to Langfuse.""" import base64 +import json import logging import os import logfire import nest_asyncio +from aieng.agent_evals.async_client_manager import AsyncClientManager from aieng.agent_evals.configs import Configs from opentelemetry import trace from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter @@ -75,3 +77,48 @@ def setup_langfuse_tracer(service_name: str = "aieng-eval-agents") -> "trace.Tra # Set the global default tracer provider trace.set_tracer_provider(trace_provider) return trace.get_tracer(__name__) + + +async def upload_dataset_to_langfuse(dataset_path: str, dataset_name: str): + """Upload a dataset to Langfuse. + + Parameters + ---------- + dataset_path : str + Path to the dataset to upload. The dataset must be a json file + containing a list of dictionaries. Each dictionary must contain a + `input` and `expected_output` keys. Additionally, it can include + an `id` key that will be added to the metadata of the dataset item. + dataset_name : str + Name of the dataset to upload. + """ + # Get the client manager singleton instance and langfuse client + client_manager = AsyncClientManager.get_instance() + langfuse_client = client_manager.langfuse_client + + # Load the ground truth dataset from the file path + logger.info(f"Loading dataset from '{dataset_path}'") + with open(dataset_path, "r") as file: + dataset = json.load(file) + + # Create the dataset in Langfuse + langfuse_client.create_dataset(name=dataset_name) + + # Upload each item to the dataset + for item in dataset: + assert "input" in item, "`input` is required for all items in the dataset" + assert "expected_output" in item, "`expected_output` is required for all items in the dataset" + + langfuse_client.create_dataset_item( + dataset_name=dataset_name, + input=item["input"], + expected_output=item["expected_output"], + metadata={ + "id": item.get("id", None), + }, + ) + + logger.info(f"Uploaded {len(dataset)} items to dataset '{dataset_name}'") + + # Gracefully close the services + await client_manager.close() diff --git a/aieng-eval-agents/aieng/agent_evals/report_generation/agent.py b/aieng-eval-agents/aieng/agent_evals/report_generation/agent.py new file mode 100644 index 00000000..c7fa14c4 --- /dev/null +++ b/aieng-eval-agents/aieng/agent_evals/report_generation/agent.py @@ -0,0 +1,82 @@ +""" +Definitions for the the report generation agent. + +Example +------- +>>> from aieng.agent_evals.report_generation.agent import get_report_generation_agent +>>> from aieng.agent_evals.report_generation.prompts import MAIN_AGENT_INSTRUCTIONS +>>> agent = get_report_generation_agent( +>>> instructions=MAIN_AGENT_INSTRUCTIONS, +>>> sqlite_db_path=Path("data/OnlineRetail.db"), +>>> reports_output_path=Path("reports/"), +>>> langfuse_project_name="Report Generation", +>>> ) +""" + +from pathlib import Path + +import agents +from aieng.agent_evals.async_client_manager import AsyncClientManager +from aieng.agent_evals.langfuse import setup_langfuse_tracer +from aieng.agent_evals.report_generation.file_writer import ReportFileWriter + + +def get_report_generation_agent( + instructions: str, + sqlite_db_path: Path, + reports_output_path: Path, + langfuse_project_name: str | None, +) -> agents.Agent: + """ + Define the report generation agent. + + Parameters + ---------- + instructions : str + The instructions for the agent. + sqlite_db_path : Path + The path to the SQLite database. + reports_output_path : Path + The path to the reports output directory. + langfuse_project_name : str | None + The name of the Langfuse project to use for tracing. + + Returns + ------- + agents.Agent + The report generation agent. + """ + # Setup langfuse tracing if project name is provided + if langfuse_project_name: + setup_langfuse_tracer(langfuse_project_name) + + # Get the client manager singleton instance + client_manager = AsyncClientManager.get_instance() + report_file_writer = ReportFileWriter(reports_output_path) + + # Define an agent using the OpenAI Agent SDK + return agents.Agent( + name="Report Generation Agent", # Agent name for logging and debugging purposes + instructions=instructions, # System instructions for the agent + # Tools available to the agent + # We wrap the `execute_sql_query` and `write_report_to_file` methods + # with `function_tool`, which will construct the tool definition JSON + # schema by extracting the necessary information from the method + # signature and docstring. + tools=[ + agents.function_tool( + client_manager.sqlite_connection(sqlite_db_path).execute, + name_override="execute_sql_query", + description_override="Execute a SQL query against the SQLite database.", + ), + agents.function_tool( + report_file_writer.write, + name_override="write_report_to_file", + description_override="Write the report data to a downloadable XLSX file.", + ), + ], + model=agents.OpenAIChatCompletionsModel( + model=client_manager.configs.default_worker_model, + openai_client=client_manager.openai_client, + ), + ) diff --git a/aieng-eval-agents/aieng/agent_evals/report_generation/evaluation.py b/aieng-eval-agents/aieng/agent_evals/report_generation/evaluation.py new file mode 100644 index 00000000..83c8324b --- /dev/null +++ b/aieng-eval-agents/aieng/agent_evals/report_generation/evaluation.py @@ -0,0 +1,341 @@ +""" +Evaluate the report generation agent against a Langfuse dataset. + +Example +------- +>>> from aieng.agent_evals.report_generation.evaluation import evaluate +>>> evaluate( +>>> dataset_name="OnlineRetailReportEval", +>>> sqlite_db_path=Path("data/OnlineRetail.db"), +>>> reports_output_path=Path("reports/"), +>>> langfuse_project_name="Report Generation", +>>> ) +""" + +import logging +from pathlib import Path +from typing import Any + +import agents +from aieng.agent_evals.async_client_manager import AsyncClientManager +from aieng.agent_evals.report_generation.agent import get_report_generation_agent +from aieng.agent_evals.report_generation.prompts import ( + MAIN_AGENT_INSTRUCTIONS, + RESULT_EVALUATOR_INSTRUCTIONS, + RESULT_EVALUATOR_TEMPLATE, + TRAJECTORY_EVALUATOR_INSTRUCTIONS, + TRAJECTORY_EVALUATOR_TEMPLATE, +) +from langfuse._client.datasets import DatasetItemClient +from langfuse.experiment import Evaluation +from openai.types.responses.response_function_tool_call import ResponseFunctionToolCall +from openai.types.responses.response_output_message import ResponseOutputMessage +from openai.types.responses.response_output_refusal import ResponseOutputRefusal +from openai.types.responses.response_output_text import ResponseOutputText +from pydantic import BaseModel +from tenacity import retry, stop_after_attempt, wait_exponential + + +logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s") +logger = logging.getLogger(__name__) + +# Will have the structure: +# { +# "final_report": str | None, +# "trajectory": { +# "actions": list[str], +# "parameters": list[str], +# }, +# } +EvaluationOutput = dict[str, None | Any] + + +class EvaluatorResponse(BaseModel): + """Typed response from the evaluator.""" + + explanation: str + is_answer_correct: bool + + +async def evaluate( + dataset_name: str, + sqlite_db_path: Path, + reports_output_path: Path, + langfuse_project_name: str, +): + """Evaluate the report generation agent against a Langfuse dataset. + + Parameters + ---------- + dataset_name : str + Name of the Langfuse dataset to evaluate against. + sqlite_db_path : Path + The path to the SQLite database. + reports_output_path : Path + The path to the reports output directory. + langfuse_project_name : str + The name of the Langfuse project to use for tracing. + """ + # Get the client manager singleton instance and langfuse client + client_manager = AsyncClientManager.get_instance() + langfuse_client = client_manager.langfuse_client + + # Find the dataset in Langfuse + dataset = langfuse_client.get_dataset(dataset_name) + + # Initialize the task for the report generation agent evaluation + # We need this task so we can pass parameters to the agent, since + # the agent has to be instantiated inside the task function + report_generation_task = ReportGenerationTask( + sqlite_db_path=sqlite_db_path, + reports_output_path=reports_output_path, + langfuse_project_name=langfuse_project_name, + ) + + # Run the experiment with the agent task and evaluator + # against the dataset items + result = dataset.run_experiment( + name="Evaluate Report Generation Agent", + description="Evaluate the Report Generation Agent with data from Langfuse", + task=report_generation_task.run, + evaluators=[final_result_evaluator, trajectory_evaluator], + max_concurrency=1, + ) + + # Log the evaluation result + logger.info(result.format().replace("\\n", "\n")) + + try: + # Gracefully close the services + await client_manager.close() + except Exception as e: + logger.warning(f"Client manager services not closed successfully: {e}") + + +class ReportGenerationTask: + """Define a task for the the report generation agent.""" + + def __init__( + self, + sqlite_db_path: Path, + reports_output_path: Path, + langfuse_project_name: str, + ): + """Initialize the task for an report generation agent evaluation. + + Parameters + ---------- + sqlite_db_path : Path + The path to the SQLite database. + reports_output_path : Path + The path to the reports output directory. + langfuse_project_name : str + The name of the Langfuse project to use for tracing. + """ + self.sqlite_db_path = sqlite_db_path + self.reports_output_path = reports_output_path + self.langfuse_project_name = langfuse_project_name + + async def run(self, *, item: DatasetItemClient, **kwargs) -> EvaluationOutput: + """Run the report generation agent against an item from a Langfuse dataset. + + Parameters + ---------- + item : DatasetItemClient + The item from the Langfuse dataset to evaluate against. + + Returns + ------- + EvaluationOutput + The output of the report generation agent with the values it should + be evaluated against. + """ + # Run the report generation agent + report_generation_agent = get_report_generation_agent( + instructions=MAIN_AGENT_INSTRUCTIONS, + sqlite_db_path=self.sqlite_db_path, + reports_output_path=self.reports_output_path, + langfuse_project_name=self.langfuse_project_name, + ) + result = await run_agent_with_retry(report_generation_agent, item.input) + + # Extract the report data and trajectory from the agent's response + actions = [] + parameters = [] + final_report = None + for raw_response in result.raw_responses: + for output in raw_response.output: + # The trajectory will be the list of actions and the + # parameters passed to each one of them + if isinstance(output, ResponseFunctionToolCall): + actions.append(output.name) + parameters.append(output.arguments) + + # The final report will be the arguments sent by the + # write_report_to_file function call + # If there is more than one call to the write_report_to_file + # function, the last one will be used because the previous + # calls were likely be failed calls + if isinstance(output, ResponseFunctionToolCall) and "write_report_to_file" in output.name: + final_report = output.arguments + + if isinstance(output, ResponseOutputMessage): + for content in output.content: + actions.append(content.type) + if isinstance(content, ResponseOutputText): + parameters.append(content.text) + elif isinstance(content, ResponseOutputRefusal): + parameters.append(content.refusal) + + if final_report is None: + logger.warning("No call to write_report_to_file function found in the agent's response") + + return { + "final_report": final_report, + "trajectory": { + "actions": actions, + "parameters": parameters, + }, + } + + +async def final_result_evaluator( + *, + input: str, + output: EvaluationOutput, + expected_output: EvaluationOutput, + **kwargs, +) -> Evaluation: + # ruff: noqa: A002 + """Evaluate the proposed final answer against the ground truth. + + Uses LLM-as-a-judge and returns the reasoning behind the answer. + + Parameters + ---------- + input : str + The input to the report generation agent. + output : EvaluationOutput + The output of the report generation agent with the values it should be + evaluated against. + expected_output : EvaluationOutput + The evaluation output the report generation agent should have. + kwargs : dict + Additional keyword arguments. + + Returns + ------- + Evaluation + The evaluation result, including the reasoning behind the answer. + """ + # Define the evaluator agent + client_manager = AsyncClientManager.get_instance() + evaluator_agent = agents.Agent( + name="Final Result Evaluator Agent", + instructions=RESULT_EVALUATOR_INSTRUCTIONS, + output_type=EvaluatorResponse, + model=agents.OpenAIChatCompletionsModel( + model=client_manager.configs.default_planner_model, + openai_client=client_manager.openai_client, + ), + ) + # Format the input for the evaluator agent + evaluator_input = RESULT_EVALUATOR_TEMPLATE.format( + question=input, + ground_truth=expected_output["final_report"], + proposed_response=output["final_report"], + ) + # Run the evaluator agent with retry + result = await run_agent_with_retry(evaluator_agent, evaluator_input) + evaluation_response = result.final_output_as(EvaluatorResponse) + + # Return the evaluation result + return Evaluation( + name="Final Result", + value=evaluation_response.is_answer_correct, + comment=evaluation_response.explanation, + ) + + +async def trajectory_evaluator( + *, + input: str, + output: EvaluationOutput, + expected_output: EvaluationOutput, + **kwargs, +) -> Evaluation: + # ruff: noqa: A002 + """Evaluate the agent's trajectory against the ground truth. + + Uses LLM-as-a-judge and returns the reasoning behind the answer. + + Parameters + ---------- + input : str + The input to the report generation agent. + output : EvaluationOutput + The output of the report generation agent with the values it should be + evaluated against. + expected_output : EvaluationOutput + The evaluation output the report generation agent should have. + kwargs : dict + Additional keyword arguments. + + Returns + ------- + Evaluation + The evaluation result, including the reasoning behind the answer. + """ + # Define the evaluator agent + client_manager = AsyncClientManager.get_instance() + evaluator_agent = agents.Agent( + name="Trajectory Evaluator Agent", + instructions=TRAJECTORY_EVALUATOR_INSTRUCTIONS, + output_type=EvaluatorResponse, + model=agents.OpenAIChatCompletionsModel( + model=client_manager.configs.default_planner_model, + openai_client=client_manager.openai_client, + ), + ) + + assert isinstance(expected_output["trajectory"], dict), "Expected trajectory must be a dictionary" + assert isinstance(output["trajectory"], dict), "Actual trajectory must be a dictionary" + + # Format the input for the evaluator agent + evaluator_input = TRAJECTORY_EVALUATOR_TEMPLATE.format( + question=input, + expected_actions=expected_output["trajectory"]["actions"], + expected_descriptions=expected_output["trajectory"]["description"], + actual_actions=output["trajectory"]["actions"], + actual_parameters=output["trajectory"]["parameters"], + ) + # Run the evaluator agent with retry + result = await run_agent_with_retry(evaluator_agent, evaluator_input) + evaluation_response = result.final_output_as(EvaluatorResponse) + + # Return the evaluation result + return Evaluation( + name="Trajectory", + value=evaluation_response.is_answer_correct, + comment=evaluation_response.explanation, + ) + + +@retry(stop=stop_after_attempt(5), wait=wait_exponential()) +async def run_agent_with_retry(agent: agents.Agent, agent_input: str) -> agents.RunResult: + """Run an agent with Tenacity's retry mechanism. + + Parameters + ---------- + agent : agents.Agent + The agent to run. + agent_input : str + The input to the agent. + + Returns + ------- + agents.RunnerResult + The result of the agent run. + """ + logger.info(f"Running agent {agent.name} with input '{agent_input[:100]}...'") + return await agents.Runner.run(agent, input=agent_input) diff --git a/aieng-eval-agents/aieng/agent_evals/report_generation/file_writer.py b/aieng-eval-agents/aieng/agent_evals/report_generation/file_writer.py new file mode 100644 index 00000000..60dc09e8 --- /dev/null +++ b/aieng-eval-agents/aieng/agent_evals/report_generation/file_writer.py @@ -0,0 +1,72 @@ +""" +Report file writer class. + +Example +------- +>>> from aieng.agent_evals.report_generation.file_writer import ReportFileWriter +>>> report_file_writer = ReportFileWriter(reports_output_path=Path("reports/")) +>>> report_file_writer.write( +... report_data=[["2026-01-01", 100], ["2026-01-02", 200]], +... report_columns=["Date", "Sales"], +... ) +""" + +import urllib.parse +from pathlib import Path +from typing import Any + +import pandas as pd + + +class ReportFileWriter: + """Write reports to an XLSX file.""" + + def __init__(self, reports_output_path: Path): + """Initialize the report writer. + + Parameters + ---------- + reports_output_path : Path + The path to the reports output directory. + """ + self.reports_output_path = reports_output_path + + def write( + self, + report_data: list[Any], + report_columns: list[str], + filename: str = "report.xlsx", + gradio_link: bool = True, + ) -> str: + """Write a report to a XLSX file. + + Parameters + ---------- + report_data : list[Any] + The data of the report. + report_columns : list[str] + The columns of the report. + filename : str, optional + The name of the file to create. Default is "report.xlsx". + gradio_link : bool, optional + Whether to return a file link that works with Gradio UI. + Default is True. + + Returns + ------- + str + The path to the report file. If `gradio_link` is True, will return + a URL link that allows Gradio UI to download the file. + """ + # Create reports directory if it doesn't exist + self.reports_output_path.mkdir(exist_ok=True) + filepath = self.reports_output_path / filename + + report_df = pd.DataFrame(report_data, columns=report_columns) + report_df.to_excel(filepath, index=False) + + file_uri = str(filepath) + if gradio_link: + file_uri = f"gradio_api/file={urllib.parse.quote(str(file_uri), safe='')}" + + return file_uri diff --git a/implementations/report_generation/prompts.py b/aieng-eval-agents/aieng/agent_evals/report_generation/prompts.py similarity index 100% rename from implementations/report_generation/prompts.py rename to aieng-eval-agents/aieng/agent_evals/report_generation/prompts.py diff --git a/implementations/report_generation/README.md b/implementations/report_generation/README.md index 5f30539d..4161bf7a 100644 --- a/implementations/report_generation/README.md +++ b/implementations/report_generation/README.md @@ -1,7 +1,7 @@ # Report Generation Agent This code implements an example of a Report Generation Agent for single-table relational -data source, including a demo agent UI and evaluations with [Langfuse](https://langfuse.com/). +data source, including a demo agent demo UI and evaluations with [Langfuse](https://langfuse.com/). The data source implemented here is [SQLite](https://sqlite.org/) which is supported natively by Python and saves the data in disk. @@ -44,7 +44,7 @@ an environment variable named `REPORT_GENERATION_DB_PATH`. To run the agent, please execute: ```bash -uv run --env-file .env python -m implementations.report_generation.main +uv run --env-file .env python -m implementations.report_generation.demo ``` The agent will be available through a [Gradio](https://www.gradio.app/) web UI under the diff --git a/implementations/report_generation/data/import_online_retail_data.py b/implementations/report_generation/data/import_online_retail_data.py index 4c1a3b61..88cfa47f 100644 --- a/implementations/report_generation/data/import_online_retail_data.py +++ b/implementations/report_generation/data/import_online_retail_data.py @@ -1,4 +1,11 @@ -"""Import the Online Retail dataset to a SQLite database.""" +""" +Import the Online Retail dataset to a SQLite database. + +Example +------- +$ python -m implementations.report_generation.data.import_online_retail_data\ + --dataset-path +""" import logging import sqlite3 @@ -9,7 +16,7 @@ import pandas as pd from dotenv import load_dotenv -from implementations.report_generation.main import get_sqlite_db_path +from implementations.report_generation.demo import get_sqlite_db_path logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s") diff --git a/implementations/report_generation/data/langfuse_upload.py b/implementations/report_generation/data/langfuse_upload.py index 79753b17..37308f13 100644 --- a/implementations/report_generation/data/langfuse_upload.py +++ b/implementations/report_generation/data/langfuse_upload.py @@ -1,11 +1,19 @@ -"""Upload a dataset to Langfuse.""" +""" +Upload a dataset to Langfuse. + +Example +------- +$ python -m implementations.report_generation.data.langfuse_upload +$ python -m implementations.report_generation.data.langfuse_upload \ + --dataset-path \ + --dataset-name +""" import asyncio -import json import logging import click -from aieng.agent_evals.async_client_manager import AsyncClientManager +from aieng.agent_evals.langfuse import upload_dataset_to_langfuse logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s") @@ -16,48 +24,6 @@ DEFAULT_EVALUATION_DATASET_NAME = "OnlineRetailReportEval" -async def upload_dataset_to_langfuse(dataset_path: str, dataset_name: str): - """Upload a dataset to Langfuse. - - Parameters - ---------- - dataset_path : str - Path to the dataset to upload. - dataset_name : str - Name of the dataset to upload. - """ - # Get the client manager singleton instance and langfuse client - client_manager = AsyncClientManager.get_instance() - langfuse_client = client_manager.langfuse_client - - # Load the ground truth dataset from the file path - logger.info(f"Loading dataset from '{dataset_path}'") - with open(dataset_path, "r") as file: - dataset = json.load(file) - - # Create the dataset in Langfuse - langfuse_client.create_dataset(name=dataset_name) - - # Upload each item to the dataset - for item in dataset: - assert "input" in item, "`input` is required for all items in the dataset" - assert "expected_output" in item, "`expected_output` is required for all items in the dataset" - - langfuse_client.create_dataset_item( - dataset_name=dataset_name, - input=item["input"], - expected_output=item["expected_output"], - metadata={ - "id": item.get("id", None), - }, - ) - - logger.info(f"Uploaded {len(dataset)} items to dataset '{dataset_name}'") - - # Gracefully close the services - await client_manager.close() - - @click.command() @click.option( "--dataset-path", diff --git a/implementations/report_generation/main.py b/implementations/report_generation/demo.py similarity index 63% rename from implementations/report_generation/main.py rename to implementations/report_generation/demo.py index 9e0a6553..4cba4944 100644 --- a/implementations/report_generation/main.py +++ b/implementations/report_generation/demo.py @@ -1,23 +1,31 @@ -"""Reason-and-Act Knowledge Retrieval Agent via the OpenAI Agent SDK.""" +""" +Demo UI for the report generation agent. + +Example +------- +$ python -m implementations.report_generation.demo +""" import asyncio import logging -import os from functools import partial -from pathlib import Path from typing import Any, AsyncGenerator import agents import click import gradio as gr from aieng.agent_evals.async_client_manager import AsyncClientManager -from aieng.agent_evals.langfuse import setup_langfuse_tracer +from aieng.agent_evals.report_generation.agent import get_report_generation_agent +from aieng.agent_evals.report_generation.prompts import MAIN_AGENT_INSTRUCTIONS from aieng.agent_evals.utils import get_or_create_session, oai_agent_stream_to_gradio_messages from dotenv import load_dotenv from gradio.components.chatbot import ChatMessage -from implementations.report_generation.file_writer import get_reports_output_path, write_report_to_file -from implementations.report_generation.prompts import MAIN_AGENT_INSTRUCTIONS +from implementations.report_generation.env_vars import ( + get_langfuse_project_name, + get_reports_output_path, + get_sqlite_db_path, +) load_dotenv(verbose=True) @@ -25,72 +33,6 @@ logger = logging.getLogger(__name__) -LANGFUSE_PROJECT_NAME = "Report Generation" - - -def get_sqlite_db_path() -> Path: - """Get the SQLite database path. - - If no path is provided in the REPORT_GENERATION_DB_PATH env var, will use the - default path in `implementations/report_generation/data/OnlineRetail.db`. - - Returns - ------- - Path - The SQLite database path. - """ - default_sqlite_db_path = "implementations/report_generation/data/OnlineRetail.db" - return Path(os.getenv("REPORT_GENERATION_DB_PATH", default_sqlite_db_path)) - - -def get_report_generation_agent(enable_trace: bool = True) -> agents.Agent: - """ - Define the report generation agent. - - Parameters - ---------- - enable_trace : bool, optional - Whether to enable tracing with Langfuse for evaluation purposes. - Default is True. - - Returns - ------- - agents.Agent - The report generation agent. - """ - # Setup langfuse tracing if enabled - if enable_trace: - setup_langfuse_tracer(LANGFUSE_PROJECT_NAME) - - # Get the client manager singleton instance - client_manager = AsyncClientManager.get_instance() - - # Define an agent using the OpenAI Agent SDK - return agents.Agent( - name="Report Generation Agent", # Agent name for logging and debugging purposes - instructions=MAIN_AGENT_INSTRUCTIONS, # System instructions for the agent - # Tools available to the agent - # We wrap the `search_knowledgebase` method with `function_tool`, which - # will construct the tool definition JSON schema by extracting the necessary - # information from the method signature and docstring. - tools=[ - agents.function_tool( - client_manager.sqlite_connection(get_sqlite_db_path()).execute, - name_override="execute_sql_query", - description_override="Execute a SQL query against the SQLite database.", - ), - agents.function_tool( - write_report_to_file, - description_override="Write the report data to a file.", - ), - ], - model=agents.OpenAIChatCompletionsModel( - model=client_manager.configs.default_worker_model, - openai_client=client_manager.openai_client, - ), - ) - - async def agent_session_handler( query: str, history: list[ChatMessage], @@ -125,7 +67,12 @@ async def agent_session_handler( # previous turns in the conversation session = get_or_create_session(history, session_state) - main_agent = get_report_generation_agent(enable_trace=enable_trace) + main_agent = get_report_generation_agent( + instructions=MAIN_AGENT_INSTRUCTIONS, + sqlite_db_path=get_sqlite_db_path(), + reports_output_path=get_reports_output_path(), + langfuse_project_name=get_langfuse_project_name() if enable_trace else None, + ) # Run the agent in streaming mode to get and display intermediate outputs result_stream = agents.Runner.run_streamed(main_agent, input=query, session=session) diff --git a/implementations/report_generation/env_vars.py b/implementations/report_generation/env_vars.py new file mode 100644 index 00000000..b4e3a846 --- /dev/null +++ b/implementations/report_generation/env_vars.py @@ -0,0 +1,51 @@ +"""Environment variables and their defaults for the report generation agent.""" + +import os +from pathlib import Path + + +DEFAULT_SQLITE_DB_PATH = "implementations/report_generation/data/OnlineRetail.db" +DEFAULT_REPORTS_OUTPUT_PATH = "implementations/report_generation/reports/" +DEFAULT_LANGFUSE_PROJECT_NAME = "Report Generation" + + +def get_reports_output_path() -> Path: + """Get the reports output path. + + If no path is provided in the REPORTS_OUTPUT_PATH env var, will use the + default path in DEFAULT_REPORTS_OUTPUT_PATH. + + Returns + ------- + Path + The reports output path. + """ + return Path(os.getenv("REPORT_GENERATION_OUTPUT_PATH", DEFAULT_REPORTS_OUTPUT_PATH)) + + +def get_sqlite_db_path() -> Path: + """Get the SQLite database path for report generation. + + If no path is provided in the REPORT_GENERATION_DB_PATH env var, will use the + default path in DEFAULT_SQLITE_DB_PATH. + + Returns + ------- + Path + The default SQLite database path for report generation. + """ + return Path(os.getenv("REPORT_GENERATION_DB_PATH", DEFAULT_SQLITE_DB_PATH)) + + +def get_langfuse_project_name() -> str: + """Get the Langfuse project name for report generation. + + If no project name is provided in the REPORT_GENERATION_LANGFUSE_PROJECT_NAME + env var, will use the default project name in DEFAULT_LANGFUSE_PROJECT_NAME. + + Returns + ------- + str + The default Langfuse project name for report generation. + """ + return os.getenv("REPORT_GENERATION_LANGFUSE_PROJECT_NAME", DEFAULT_LANGFUSE_PROJECT_NAME) diff --git a/implementations/report_generation/evaluate.py b/implementations/report_generation/evaluate.py index 453c7183..29bf2dcc 100644 --- a/implementations/report_generation/evaluate.py +++ b/implementations/report_generation/evaluate.py @@ -1,288 +1,28 @@ -"""Evaluate the report generation agent against a Langfuse dataset.""" +""" +Evaluate the report generation agent against a Langfuse dataset. + +Example +------- +$ python -m implementations.report_generation.evaluate +$ python -m implementations.report_generation.evaluate \ + --dataset-name +""" import asyncio -import logging -from typing import Any -import agents import click -from aieng.agent_evals.async_client_manager import AsyncClientManager +from aieng.agent_evals.report_generation.evaluation import evaluate from dotenv import load_dotenv -from langfuse._client.datasets import DatasetItemClient -from langfuse.experiment import Evaluation -from openai.types.responses.response_function_tool_call import ResponseFunctionToolCall -from openai.types.responses.response_output_message import ResponseOutputMessage -from openai.types.responses.response_output_refusal import ResponseOutputRefusal -from openai.types.responses.response_output_text import ResponseOutputText -from pydantic import BaseModel -from tenacity import retry, stop_after_attempt, wait_exponential from implementations.report_generation.data.langfuse_upload import DEFAULT_EVALUATION_DATASET_NAME -from implementations.report_generation.main import get_report_generation_agent -from implementations.report_generation.prompts import ( - RESULT_EVALUATOR_INSTRUCTIONS, - RESULT_EVALUATOR_TEMPLATE, - TRAJECTORY_EVALUATOR_INSTRUCTIONS, - TRAJECTORY_EVALUATOR_TEMPLATE, +from implementations.report_generation.demo import ( + get_langfuse_project_name, + get_reports_output_path, + get_sqlite_db_path, ) load_dotenv(verbose=True) -logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s") -logger = logging.getLogger(__name__) - - -# Will have the structure: -# { -# "final_report": str | None, -# "trajectory": { -# "actions": list[str], -# "parameters": list[str], -# }, -# } -EvaluationOutput = dict[str, None | Any] - - -class EvaluatorResponse(BaseModel): - """Typed response from the evaluator.""" - - explanation: str - is_answer_correct: bool - - -async def evaluate(dataset_name: str): - """Evaluate the report generation agent against a Langfuse dataset. - - Parameters - ---------- - dataset_name : str - Name of the Langfuse dataset to evaluate against. - """ - # Get the client manager singleton instance and langfuse client - client_manager = AsyncClientManager.get_instance() - langfuse_client = client_manager.langfuse_client - - # Find the dataset in Langfuse - dataset = langfuse_client.get_dataset(dataset_name) - - # Run the experiment with the agent task and evaluator - # against the dataset items - result = dataset.run_experiment( - name="Evaluate Report Generation Agent", - description="Evaluate the Report Generation Agent with data from Langfuse", - task=agent_task, - evaluators=[final_result_evaluator, trajectory_evaluator], - max_concurrency=1, - ) - - # Log the evaluation result - logger.info(result.format().replace("\\n", "\n")) - - try: - # Gracefully close the services - await client_manager.close() - except Exception as e: - logger.warning(f"Client manager services not closed successfully: {e}") - - -async def agent_task(*, item: DatasetItemClient, **kwargs) -> EvaluationOutput: - """Run the report generation agent against an item from a Langfuse dataset. - - Parameters - ---------- - item : DatasetItemClient - The item from the Langfuse dataset to evaluate against. - - Returns - ------- - EvaluationOutput - The output of the report generation agent with the values it should - be evaluated against. - """ - # Define and run the report generation agent - report_generation_agent = get_report_generation_agent(enable_trace=True) - result = await run_agent_with_retry(report_generation_agent, item.input) - - # Extract the report data and trajectory from the agent's response - actions = [] - parameters = [] - final_report = None - for raw_response in result.raw_responses: - for output in raw_response.output: - # The trajectory will be the list of actions and the - # parameters passed to each one of them - if isinstance(output, ResponseFunctionToolCall): - actions.append(output.name) - parameters.append(output.arguments) - - # The final report will be the arguments sent by the - # write_report_to_file function call - # If there is more than one call to the write_report_to_file function, - # the last one will be used because the previous calls were likely - # failed calls - if isinstance(output, ResponseFunctionToolCall) and "write_report_to_file" in output.name: - final_report = output.arguments - - if isinstance(output, ResponseOutputMessage): - for content in output.content: - actions.append(content.type) - if isinstance(content, ResponseOutputText): - parameters.append(content.text) - elif isinstance(content, ResponseOutputRefusal): - parameters.append(content.refusal) - - if final_report is None: - logger.warning("No call to write_report_to_file function found in the agent's response") - - return { - "final_report": final_report, - "trajectory": { - "actions": actions, - "parameters": parameters, - }, - } - - -async def final_result_evaluator( - *, - input: str, - output: EvaluationOutput, - expected_output: EvaluationOutput, - **kwargs, -) -> Evaluation: - # ruff: noqa: A002 - """Evaluate the proposed final answer against the ground truth. - - Uses LLM-as-a-judge and returns the reasoning behind the answer. - - Parameters - ---------- - input : str - The input to the report generation agent. - output : EvaluationOutput - The output of the report generation agent with the values it should be - evaluated against. - expected_output : EvaluationOutput - The evaluation output the report generation agent should have. - kwargs : dict - Additional keyword arguments. - - Returns - ------- - Evaluation - The evaluation result, including the reasoning behind the answer. - """ - # Define the evaluator agent - client_manager = AsyncClientManager.get_instance() - evaluator_agent = agents.Agent( - name="Final Result Evaluator Agent", - instructions=RESULT_EVALUATOR_INSTRUCTIONS, - output_type=EvaluatorResponse, - model=agents.OpenAIChatCompletionsModel( - model=client_manager.configs.default_planner_model, - openai_client=client_manager.openai_client, - ), - ) - # Format the input for the evaluator agent - evaluator_input = RESULT_EVALUATOR_TEMPLATE.format( - question=input, - ground_truth=expected_output["final_report"], - proposed_response=output["final_report"], - ) - # Run the evaluator agent with retry - result = await run_agent_with_retry(evaluator_agent, evaluator_input) - evaluation_response = result.final_output_as(EvaluatorResponse) - - # Return the evaluation result - return Evaluation( - name="Final Result", - value=evaluation_response.is_answer_correct, - comment=evaluation_response.explanation, - ) - - -async def trajectory_evaluator( - *, - input: str, - output: EvaluationOutput, - expected_output: EvaluationOutput, - **kwargs, -) -> Evaluation: - # ruff: noqa: A002 - """Evaluate the agent's trajectory against the ground truth. - - Uses LLM-as-a-judge and returns the reasoning behind the answer. - - Parameters - ---------- - input : str - The input to the report generation agent. - output : EvaluationOutput - The output of the report generation agent with the values it should be - evaluated against. - expected_output : EvaluationOutput - The evaluation output the report generation agent should have. - kwargs : dict - Additional keyword arguments. - - Returns - ------- - Evaluation - The evaluation result, including the reasoning behind the answer. - """ - # Define the evaluator agent - client_manager = AsyncClientManager.get_instance() - evaluator_agent = agents.Agent( - name="Trajectory Evaluator Agent", - instructions=TRAJECTORY_EVALUATOR_INSTRUCTIONS, - output_type=EvaluatorResponse, - model=agents.OpenAIChatCompletionsModel( - model=client_manager.configs.default_planner_model, - openai_client=client_manager.openai_client, - ), - ) - - assert isinstance(expected_output["trajectory"], dict), "Expected trajectory must be a dictionary" - assert isinstance(output["trajectory"], dict), "Actual trajectory must be a dictionary" - - # Format the input for the evaluator agent - evaluator_input = TRAJECTORY_EVALUATOR_TEMPLATE.format( - question=input, - expected_actions=expected_output["trajectory"]["actions"], - expected_descriptions=expected_output["trajectory"]["description"], - actual_actions=output["trajectory"]["actions"], - actual_parameters=output["trajectory"]["parameters"], - ) - # Run the evaluator agent with retry - result = await run_agent_with_retry(evaluator_agent, evaluator_input) - evaluation_response = result.final_output_as(EvaluatorResponse) - - # Return the evaluation result - return Evaluation( - name="Trajectory", - value=evaluation_response.is_answer_correct, - comment=evaluation_response.explanation, - ) - - -@retry(stop=stop_after_attempt(5), wait=wait_exponential()) -async def run_agent_with_retry(agent: agents.Agent, agent_input: str) -> agents.RunResult: - """Run an agent with Tenacity's retry mechanism. - - Parameters - ---------- - agent : agents.Agent - The agent to run. - agent_input : str - The input to the agent. - - Returns - ------- - agents.RunnerResult - The result of the agent run. - """ - logger.info(f"Running agent {agent.name} with input '{agent_input[:100]}...'") - return await agents.Runner.run(agent, input=agent_input) @click.command() @@ -301,7 +41,14 @@ def cli(dataset_name: str): Name of the Langfuse dataset to evaluate against. Default is DEFAULT_EVALUATION_DATASET_NAME. """ - asyncio.run(evaluate(dataset_name)) + asyncio.run( + evaluate( + dataset_name, + sqlite_db_path=get_sqlite_db_path(), + reports_output_path=get_reports_output_path(), + langfuse_project_name=get_langfuse_project_name(), + ) + ) if __name__ == "__main__": diff --git a/implementations/report_generation/file_writer.py b/implementations/report_generation/file_writer.py deleted file mode 100644 index 7da863ed..00000000 --- a/implementations/report_generation/file_writer.py +++ /dev/null @@ -1,67 +0,0 @@ -"""Report file writer functions.""" - -import os -import urllib.parse -from pathlib import Path -from typing import Any - -import pandas as pd - - -# Will use this as default if no path is provided in the REPORTS_OUTPUT_PATH env var -DEFAULT_REPORTS_OUTPUT_PATH = Path("implementations/report_generation/reports/") - - -def write_report_to_file( - report_data: list[Any], - report_columns: list[str], - filename: str = "report.xlsx", - gradio_link: bool = True, -) -> str: - """Write a report to a XLSX file. - - Parameters - ---------- - report_data : list[Any] - The data of the report. - report_columns : list[str] - The columns of the report. - filename : str, optional - The name of the file to create. Default is "report.xlsx". - gradio_link : bool, optional - Whether to return a file link that works with Gradio UI. - Default is True. - - Returns - ------- - str - The path to the report file. If `gradio_link` is True, will return - a URL link that allows Gradio UI to download the file. - """ - # Create reports directory if it doesn't exist - reports_output_path = get_reports_output_path() - reports_output_path.mkdir(exist_ok=True) - filepath = reports_output_path / filename - - report_df = pd.DataFrame(report_data, columns=report_columns) - report_df.to_excel(filepath, index=False) - - file_uri = str(filepath) - if gradio_link: - file_uri = f"gradio_api/file={urllib.parse.quote(str(file_uri), safe='')}" - - return file_uri - - -def get_reports_output_path() -> Path: - """Get the reports output path. - - If no path is provided in the REPORTS_OUTPUT_PATH env var, will use the - default path in DEFAULT_REPORTS_OUTPUT_PATH. - - Returns - ------- - Path - The reports output path. - """ - return Path(os.getenv("REPORTS_OUTPUT_PATH", DEFAULT_REPORTS_OUTPUT_PATH))