diff --git a/src/strands_evals/experiment.py b/src/strands_evals/experiment.py index 6ef4740..beafe61 100644 --- a/src/strands_evals/experiment.py +++ b/src/strands_evals/experiment.py @@ -1,4 +1,5 @@ import asyncio +import inspect import json import logging import os @@ -45,7 +46,6 @@ def _get_label_from_score(evaluator: Evaluator, score: float) -> str: Args: evaluator: The evaluator instance score: The numeric score - default_label: Default label to return if provided and no mapping found Returns: The label corresponding to the score @@ -179,6 +179,146 @@ def _record_evaluator_result( evaluator_data[eval_name]["reasons"].append(reason) evaluator_data[eval_name]["detailed_results"].append(detailed_results) + def _build_reports(self, evaluator_data: dict[str, dict[str, list]]) -> list[EvaluationReport]: + """Build EvaluationReport objects from collected evaluator data. + + Args: + evaluator_data: Dictionary keyed by evaluator name containing scores, test_passes, + cases, reasons, and detailed_results lists. + + Returns: + A list of EvaluationReport objects, one per evaluator. + """ + reports = [] + for evaluator in self._evaluators: + eval_name = evaluator.get_type_name() + data = evaluator_data[eval_name] + scores = data["scores"] + report = EvaluationReport( + evaluator_name=eval_name, + overall_score=sum(scores) / len(scores) if scores else 0, + scores=scores, + test_passes=data["test_passes"], + cases=data["cases"], + reasons=data["reasons"], + detailed_results=data["detailed_results"], + ) + reports.append(report) + return reports + + def _init_evaluator_data(self) -> dict[str, dict[str, list]]: + """Initialize the evaluator data dictionary for collecting results.""" + return { + evaluator.get_type_name(): { + "scores": [], + "test_passes": [], + "cases": [], + "reasons": [], + "detailed_results": [], + } + for evaluator in self._evaluators + } + + def _log_evaluation_to_cloudwatch( + self, + evaluator: Evaluator, + eval_span: Any, + evaluation_context: EvaluationData, + score: float, + reason: str, + ): + """Log evaluation result to CloudWatch if observability is enabled. 
+ + Args: + evaluator: The evaluator that produced the result + eval_span: The OpenTelemetry span for this evaluation + evaluation_context: The evaluation data being evaluated + score: The aggregate evaluation score + reason: The aggregate evaluation reason + """ + try: + label = _get_label_from_score(evaluator, score) + except Exception: + label = "UNKNOWN" + + eval_span.set_attributes({"gen_ai.evaluation.score.label": label}) + + try: + trace_id = format_trace_id(eval_span.get_span_context().trace_id) + session_id = (evaluation_context.metadata or {}).get("session_id", "") + evaluator_full_name = f"Custom.{evaluator.get_type_name()}" + region = os.environ.get("AWS_REGION", "us-east-1") + _config_arn = f"arn:aws:strands:{region}::strands-evaluation-empty-config/{self._config_id}" + _evaluator_arn = f"arn:aws:strands-evals:::evaluator/{evaluator_full_name}" + + log_data = { + "gen_ai.evaluation.name": evaluator_full_name, + "gen_ai.evaluation.score.value": str(score), + "gen_ai.evaluation.explanation": reason or "", + "gen_ai.evaluation.score.label": label, + "gen_ai.response.id": trace_id, + "aws.bedrock_agentcore.evaluator.rating_scale": "Numerical", + "aws.bedrock_agentcore.evaluation_level": evaluator.evaluation_level or "Trace", + "event.name": "gen_ai.evaluation.result", + "aws.bedrock_agentcore.online_evaluation_config.arn": _config_arn, + "aws.bedrock_agentcore.online_evaluation_config.name": "strands-local-evaluation", + "aws.bedrock_agentcore.evaluator.arn": _evaluator_arn, + "session.id": session_id, + } + + agent_observability_enabled = os.environ.get("AGENT_OBSERVABILITY_ENABLED", "") + if agent_observability_enabled: + _send_to_cloudwatch( + message="gen_ai.evaluation.result", + log_data=log_data, + trace_id=trace_id, + evaluator_name=evaluator_full_name, + score=cast(float, score), + config_id=self._config_id, + label=label, + ) + except Exception as e: + logger.debug("error=<%s> | skipping cloudwatch logging", e) + + def _set_task_span_attributes(self, task_span: Any, evaluation_context: EvaluationData) -> None: + """Set standard attributes on a task execution span.""" + task_span.set_attributes( + { + "gen_ai.evaluation.data.input": serialize(evaluation_context.input), + "gen_ai.evaluation.data.expected_output": serialize(evaluation_context.expected_output), + "gen_ai.evaluation.data.actual_output": serialize(evaluation_context.actual_output), + "gen_ai.evaluation.data.has_trajectory": evaluation_context.actual_trajectory is not None, + "gen_ai.evaluation.data.has_interactions": evaluation_context.actual_interactions is not None, + } + ) + + def _extract_task_error(self, e: Exception, case_name: str) -> str: + """Extract error message from a task execution exception and log it.""" + if isinstance(e, RetryError): + original_exception = e.last_attempt.exception() + if original_exception is None: + original_exception = Exception(f"Task execution failed after {_MAX_RETRY_ATTEMPTS} retries") + error_msg = str(original_exception) + else: + error_msg = str(e) + logger.error("case=<%s>, error=<%s> | task execution failed", case_name, error_msg) + return error_msg + + def _build_failed_evaluation_data( + self, case: Case[InputT, OutputT], error_msg: str + ) -> EvaluationData[InputT, OutputT]: + """Build an EvaluationData record for a failed task execution.""" + return EvaluationData( + name=case.name, + input=case.input, + actual_output=None, + expected_output=case.expected_output, + expected_trajectory=case.expected_trajectory, + expected_interactions=case.expected_interactions, + 
expected_environment_state=case.expected_environment_state, + metadata={**(case.metadata or {}), "task_error": error_msg}, + ) + def _run_task( self, task: Callable[[Case[InputT, OutputT]], OutputT | dict[str, Any]], case: Case[InputT, OutputT] ) -> EvaluationData[InputT, OutputT]: @@ -193,9 +333,10 @@ def _run_task( Return: An EvaluationData record containing the input and actual output, name, expected output, and metadata. """ - if asyncio.iscoroutinefunction(task): + if inspect.iscoroutinefunction(task): raise ValueError("Async task is not supported. Please use run_evaluations_async instead.") + metadata = {**(case.metadata or {}), "session_id": case.session_id} evaluation_context = EvaluationData( name=case.name, input=case.input, @@ -203,7 +344,7 @@ def _run_task( expected_trajectory=case.expected_trajectory, expected_interactions=case.expected_interactions, expected_environment_state=case.expected_environment_state, - metadata=case.metadata, + metadata=metadata, ) task_output = task(case) if isinstance(task_output, dict): # could be evaluating the trajectory as well @@ -234,6 +375,7 @@ async def _run_task_async( An EvaluationData record containing the input and actual output, name, expected output, and metadata. """ # Create evaluation context + metadata = {**(case.metadata or {}), "session_id": case.session_id} evaluation_context = EvaluationData( name=case.name, input=case.input, @@ -241,11 +383,11 @@ async def _run_task_async( expected_trajectory=case.expected_trajectory, expected_interactions=case.expected_interactions, expected_environment_state=case.expected_environment_state, - metadata=case.metadata, + metadata=metadata, ) # Handle both async and sync tasks - if asyncio.iscoroutinefunction(task): + if inspect.iscoroutinefunction(task): task_output = await task(case) else: # Run sync function in separate thread to avoid blocking @@ -265,25 +407,38 @@ async def _run_task_async( return evaluation_context - async def _worker(self, queue: asyncio.Queue, task: Callable, results: list): - """ - Worker that processes cases from the queue. Run evaluation on the task. + def run_tasks( + self, task: Callable[[Case[InputT, OutputT]], OutputT | dict[str, Any]] + ) -> list[EvaluationData[InputT, OutputT]]: + """Run the task against all cases and return the evaluation data. + + Executes the task function against each case with retry logic for throttling errors, + without running any evaluators. The returned EvaluationData can be passed to + run_evaluators() later, enabling reuse of task results across multiple evaluation runs. Args: - queue: Queue containing cases to process - task: Task function to run on each case - results: List to store results + task: The task to run on each case. Takes a Case and returns either + OutputT or {"output": OutputT, "trajectory": ..., "interactions": ..., "environment_state": ...}. + + Returns: + A list of EvaluationData objects, one per case. On task failure, the corresponding + EvaluationData will have actual_output=None and the error in metadata["task_error"]. """ - while True: - try: - case = queue.get_nowait() - except asyncio.QueueEmpty: - break + if inspect.iscoroutinefunction(task): + raise ValueError("Async task is not supported. 
Please use run_tasks_async instead.") + results: list[EvaluationData[InputT, OutputT]] = [] + + for case in self._cases: case_name = case.name or f"case_{len(results)}" - trace_id = None - try: + with self._tracer.start_as_current_span( + f"run_task {case_name}", + attributes={ + "gen_ai.evaluation.case.name": case_name, + "gen_ai.evaluation.case.input": serialize(case.input), + }, + ) as run_task_span: @retry( retry=retry_if_exception(is_throttling_error), @@ -291,42 +446,56 @@ async def _worker(self, queue: asyncio.Queue, task: Callable, results: list): wait=wait_exponential(multiplier=_INITIAL_RETRY_DELAY, max=_MAX_RETRY_DELAY), reraise=True, ) - async def _run_task_with_retry(task=task, case=case): - return await self._run_task_async(task, case) + def _run_task_with_retry(task=task, case=case): + return self._run_task(task, case) try: with self._tracer.start_as_current_span( - f"execute_case {case_name}", - ) as case_span: - evaluation_context = await _run_task_with_retry() - case_span.set_attributes( - { - "gen_ai.evaluation.data.input": serialize(evaluation_context.input), - "gen_ai.evaluation.data.expected_output": serialize(evaluation_context.expected_output), - "gen_ai.evaluation.data.actual_output": serialize(evaluation_context.actual_output), - "gen_ai.evaluation.data.has_trajectory": ( - evaluation_context.actual_trajectory is not None - ), - "gen_ai.evaluation.data.has_interactions": ( - evaluation_context.actual_interactions is not None - ), - } - ) - trace_id = format_trace_id(case_span.get_span_context().trace_id) - except RetryError as e: - # Max retries exceeded - original_exception = e.last_attempt.exception() - if original_exception is None: - original_exception = Exception(f"Task execution failed after {_MAX_RETRY_ATTEMPTS} retries") - logger.error( - f"Max retry attempts ({_MAX_RETRY_ATTEMPTS}) exceeded for task execution " - f"on case {case_name}. Last error: {str(original_exception)}" - ) - raise original_exception from e + "task_execution", + attributes={ + "gen_ai.evaluation.task.type": "agent_task", + "gen_ai.evaluation.case.name": case_name, + }, + ) as task_span: + evaluation_context = _run_task_with_retry() + self._set_task_span_attributes(task_span, evaluation_context) + results.append(evaluation_context) + except Exception as e: + error_msg = self._extract_task_error(e, case_name) + run_task_span.record_exception(e) + results.append(self._build_failed_evaluation_data(case, error_msg)) - # Evaluate with each evaluator - evaluator_results = [] - for evaluator in self._evaluators: + return results + + async def _task_worker( + self, + queue: asyncio.Queue, + task: Callable, + results: list[EvaluationData[InputT, OutputT] | None], + ): + """Worker that processes cases from the queue for task execution only. 
+ + Args: + queue: Queue containing (index, case) tuples to process + task: Task function to run on each case + results: Pre-allocated list to store EvaluationData results by index + """ + while True: + try: + index, case = queue.get_nowait() + except asyncio.QueueEmpty: + break + + case_name = case.name or f"case_{index}" + + try: + with self._tracer.start_as_current_span( + f"run_task {case_name}", + attributes={ + "gen_ai.evaluation.case.name": case_name, + "gen_ai.evaluation.case.input": serialize(case.input), + }, + ) as run_task_span: @retry( retry=retry_if_exception(is_throttling_error), @@ -334,255 +503,103 @@ async def _run_task_with_retry(task=task, case=case): wait=wait_exponential(multiplier=_INITIAL_RETRY_DELAY, max=_MAX_RETRY_DELAY), reraise=True, ) - async def _evaluate_with_retry(evaluator=evaluator, evaluation_context=evaluation_context): - outputs = await evaluator.evaluate_async(evaluation_context) - (score, passed, reason) = evaluator.aggregator(outputs) - return outputs, float(score), passed, reason + async def _run_task_with_retry(task=task, case=case): + return await self._run_task_async(task, case) try: with self._tracer.start_as_current_span( - f"evaluator {evaluator.get_type_name()}", - ) as eval_span: - ( - evaluation_outputs, - aggregate_score, - aggregate_pass, - aggregate_reason, - ) = await _evaluate_with_retry() - - try: - label = _get_label_from_score(evaluator, aggregate_score) - except Exception: - label = "UNKNOWN" + "task_execution", + attributes={ + "gen_ai.evaluation.task.type": "agent_task", + "gen_ai.evaluation.case.name": case_name, + }, + ) as task_span: + evaluation_context = await _run_task_with_retry() + self._set_task_span_attributes(task_span, evaluation_context) + results[index] = evaluation_context + except Exception as e: + error_msg = self._extract_task_error(e, case_name) + run_task_span.record_exception(e) + results[index] = self._build_failed_evaluation_data(case, error_msg) + finally: + queue.task_done() - eval_span.set_attributes( - { - "gen_ai.evaluation.score.label": label, - "gen_ai.evaluation.score.value": str(aggregate_score), - "gen_ai.evaluation.test_pass": aggregate_pass, - "gen_ai.evaluation.explanation": aggregate_reason or "", - } - ) + async def run_tasks_async( + self, + task: Callable[[Case[InputT, OutputT]], OutputT | dict[str, Any]], + max_workers: int = 10, + ) -> list[EvaluationData[InputT, OutputT]]: + """Run the task against all cases asynchronously and return the evaluation data. - evaluator_results.append( - { - "evaluator_name": evaluator.get_type_name(), - "test_pass": aggregate_pass, - "score": aggregate_score, - "reason": aggregate_reason or "", - "detailed_results": evaluation_outputs, - } - ) + Args: + task: The task to run on each case. Can be sync or async. + max_workers: Maximum number of parallel workers (default: 10) - # CloudWatch logging for this evaluator - try: - evaluator_full_name = f"Custom.{evaluator.get_type_name()}" - region = os.environ.get("AWS_REGION", "us-east-1") - _config_arn = ( - f"arn:aws:strands:{region}::strands-evaluation-empty-config/{self._config_id}" - ) - _evaluator_arn = f"arn:aws:strands-evals:::evaluator/{evaluator_full_name}" + Returns: + A list of EvaluationData objects, one per case. On task failure, the corresponding + EvaluationData will have actual_output=None and the error in metadata["task_error"]. 
+ """ + queue: asyncio.Queue[tuple[int, Case[InputT, OutputT]]] = asyncio.Queue() + results: list[EvaluationData[InputT, OutputT] | None] = [None] * len(self._cases) - log_data = { - "gen_ai.evaluation.name": evaluator_full_name, - "gen_ai.evaluation.score.value": str(aggregate_score), - "gen_ai.evaluation.explanation": aggregate_reason or "", - "gen_ai.evaluation.score.label": label, - "gen_ai.response.id": trace_id, - "aws.bedrock_agentcore.evaluator.rating_scale": "Numerical", - "aws.bedrock_agentcore.evaluation_level": evaluator.evaluation_level or "Trace", - "event.name": "gen_ai.evaluation.result", - "aws.bedrock_agentcore.online_evaluation_config.arn": _config_arn, - "aws.bedrock_agentcore.online_evaluation_config.name": "strands-local-evaluation", - "aws.bedrock_agentcore.evaluator.arn": _evaluator_arn, - "session.id": case.session_id, - } + for index, case in enumerate(self._cases): + queue.put_nowait((index, case)) - agent_observability_enabled = os.environ.get("AGENT_OBSERVABILITY_ENABLED", "") - if agent_observability_enabled: - _send_to_cloudwatch( - message="gen_ai.evaluation.result", - log_data=log_data, - trace_id=trace_id, - evaluator_name=evaluator_full_name, - score=cast(float, aggregate_score), - config_id=self._config_id, - label=label, - ) - except Exception as e: - logger.debug(f"Skipping CloudWatch logging: {str(e)}") + num_workers = min(max_workers, len(self._cases)) - except RetryError as e: - # Max retries exceeded - original_exception = e.last_attempt.exception() - if original_exception is None: - original_exception = Exception( - f"Evaluator {evaluator.get_type_name()} failed after {_MAX_RETRY_ATTEMPTS} retries" - ) - logger.error( - f"Max retry attempts ({_MAX_RETRY_ATTEMPTS}) exceeded for evaluator " - f"{evaluator.get_type_name()} on case {case_name}. Last error: {str(original_exception)}" - ) - evaluator_results.append( - { - "evaluator_name": evaluator.get_type_name(), - "test_pass": False, - "score": 0, - "reason": f"Evaluator error: {str(original_exception)}", - "detailed_results": [], - } - ) - except Exception as e: - # Catch non-throttling errors and record as failure (error isolation) - evaluator_results.append( - { - "evaluator_name": evaluator.get_type_name(), - "test_pass": False, - "score": 0, - "reason": f"Evaluator error: {str(e)}", - "detailed_results": [], - } - ) + workers = [asyncio.create_task(self._task_worker(queue, task, results)) for _ in range(num_workers)] - # Store results - results.append( - { - "case": evaluation_context.model_dump(), - "evaluator_results": evaluator_results, - } - ) + await queue.join() + for worker in workers: + worker.cancel() + await asyncio.gather(*workers, return_exceptions=True) - except Exception as e: - # Handle task execution errors - evaluator_results = [] - for evaluator in self._evaluators: - evaluator_results.append( - { - "evaluator_name": evaluator.get_type_name(), - "test_pass": False, - "score": 0, - "reason": f"An error occurred: {str(e)}", - "detailed_results": [], - } - ) - results.append( - { - "case": case.model_dump(), - "evaluator_results": evaluator_results, - } - ) - finally: - queue.task_done() + return cast(list[EvaluationData[InputT, OutputT]], results) - def run_evaluations( - self, task: Callable[[Case[InputT, OutputT]], OutputT | dict[str, Any]] - ) -> list[EvaluationReport]: - """ - Run the evaluations for all of the test cases with all evaluators. 
+ def run_evaluators(self, evaluation_data: list[EvaluationData[InputT, OutputT]]) -> list[EvaluationReport]: + """Run evaluators on pre-computed evaluation data. + + Evaluates each EvaluationData item with all configured evaluators, without executing + any task functions. Use with run_tasks() to separate task execution from evaluation. Args: - task: The task to run the test case on. This function should take in InputT and returns either - OutputT or {"output": OutputT, "trajectory": ...}. + evaluation_data: Pre-computed evaluation data, e.g. from run_tasks() or deserialized + from a previous run. - Return: - A list of EvaluationReport objects, one for each evaluator, containing the overall score, - individual case results, and basic feedback for each test case. + Returns: + A list of EvaluationReport objects, one per evaluator. """ - evaluator_data: dict[str, dict[str, list]] = { - evaluator.get_type_name(): { - "scores": [], - "test_passes": [], - "cases": [], - "reasons": [], - "detailed_results": [], - } - for evaluator in self._evaluators - } + evaluator_data = self._init_evaluator_data() - for case in self._cases: - case_name = case.name or f"case_{len(evaluator_data[self._evaluators[0].get_type_name()]['cases'])}" + for evaluation_context in evaluation_data: + first_eval_name = self._evaluators[0].get_type_name() + case_name = evaluation_context.name or f"case_{len(evaluator_data[first_eval_name]['cases'])}" + + # If task execution failed, record failure for all evaluators without running them + task_error = (evaluation_context.metadata or {}).get("task_error") + if task_error: + for evaluator in self._evaluators: + self._record_evaluator_result( + evaluator_data=evaluator_data, + eval_name=evaluator.get_type_name(), + case_data=evaluation_context.model_dump(), + test_pass=False, + score=0, + reason=f"Task execution error: {task_error}", + detailed_results=[], + ) + continue with self._tracer.start_as_current_span( f"eval_case {case_name}", attributes={ "gen_ai.evaluation.case.name": case_name, - "gen_ai.evaluation.case.input": serialize(case.input), + "gen_ai.evaluation.case.input": serialize(evaluation_context.input), }, - ) as case_span: - # Task execution with retry logic - @retry( - retry=retry_if_exception(is_throttling_error), - stop=stop_after_attempt(_MAX_RETRY_ATTEMPTS), - wait=wait_exponential(multiplier=_INITIAL_RETRY_DELAY, max=_MAX_RETRY_DELAY), - reraise=True, - ) - def _run_task_with_retry(task=task, case=case): - return self._run_task(task, case) - - try: - with self._tracer.start_as_current_span( - "task_execution", - attributes={ - "gen_ai.evaluation.task.type": "agent_task", - "gen_ai.evaluation.case.name": case_name, - }, - ) as task_span: - evaluation_context = _run_task_with_retry() - task_span.set_attributes( - { - "gen_ai.evaluation.data.input": serialize(evaluation_context.input), - "gen_ai.evaluation.data.expected_output": serialize(evaluation_context.expected_output), - "gen_ai.evaluation.data.actual_output": serialize(evaluation_context.actual_output), - "gen_ai.evaluation.data.has_trajectory": ( - evaluation_context.actual_trajectory is not None - ), - "gen_ai.evaluation.data.has_interactions": ( - evaluation_context.actual_interactions is not None - ), - } - ) - except RetryError as e: - # Max retries exceeded - original_exception = e.last_attempt.exception() - if original_exception is None: - original_exception = Exception(f"Task execution failed after {_MAX_RETRY_ATTEMPTS} retries") - logger.error( - f"Max retry attempts ({_MAX_RETRY_ATTEMPTS}) 
exceeded for task execution " - f"on case {case_name}. Last error: {str(original_exception)}" - ) - case_span.record_exception(original_exception) - for evaluator in self._evaluators: - eval_name = evaluator.get_type_name() - self._record_evaluator_result( - evaluator_data=evaluator_data, - eval_name=eval_name, - case_data=case.model_dump(), - test_pass=False, - score=0, - reason=f"Task execution error: {str(original_exception)}", - detailed_results=[], - ) - continue - except Exception as e: - case_span.record_exception(e) - for evaluator in self._evaluators: - eval_name = evaluator.get_type_name() - self._record_evaluator_result( - evaluator_data=evaluator_data, - eval_name=eval_name, - case_data=case.model_dump(), - test_pass=False, - score=0, - reason=f"Task execution error: {str(e)}", - detailed_results=[], - ) - continue - - # Evaluate with each evaluator using the same task output + ): for evaluator in self._evaluators: eval_name = evaluator.get_type_name() - # Evaluator execution with retry logic @retry( retry=retry_if_exception(is_throttling_error), stop=stop_after_attempt(_MAX_RETRY_ATTEMPTS), @@ -607,7 +624,7 @@ def _evaluate_with_retry(evaluator=evaluator, evaluation_context=evaluation_cont ) eval_span.set_attributes( { - "gen_ai.evaluation.score.value": aggregate_score, + "gen_ai.evaluation.score.value": str(aggregate_score), "gen_ai.evaluation.test_pass": aggregate_pass, "gen_ai.evaluation.explanation": aggregate_reason or "", } @@ -622,16 +639,26 @@ def _evaluate_with_retry(evaluator=evaluator, evaluation_context=evaluation_cont reason=aggregate_reason or "", detailed_results=evaluation_outputs, ) + + self._log_evaluation_to_cloudwatch( + evaluator=evaluator, + eval_span=eval_span, + evaluation_context=evaluation_context, + score=aggregate_score, + reason=aggregate_reason or "", + ) except RetryError as e: - # Max retries exceeded original_exception = e.last_attempt.exception() if original_exception is None: original_exception = Exception( f"Evaluator {evaluator.get_type_name()} failed after {_MAX_RETRY_ATTEMPTS} retries" ) logger.error( - f"Max retry attempts ({_MAX_RETRY_ATTEMPTS}) exceeded for evaluator " - f"{evaluator.get_type_name()} on case {case_name}. Last error: {str(original_exception)}" + "evaluator=<%s>, case=<%s>, retries=<%s>, error=<%s> | max retry attempts exceeded", + evaluator.get_type_name(), + case_name, + _MAX_RETRY_ATTEMPTS, + original_exception, ) self._record_evaluator_result( evaluator_data=evaluator_data, @@ -653,45 +680,181 @@ def _evaluate_with_retry(evaluator=evaluator, evaluation_context=evaluation_cont detailed_results=[], ) - reports = [] - for evaluator in self._evaluators: - eval_name = evaluator.get_type_name() - data = evaluator_data[eval_name] - report = EvaluationReport( - evaluator_name=eval_name, - overall_score=sum(data["scores"]) / len(data["scores"]) if len(data["scores"]) else 0, - scores=data["scores"], - test_passes=data["test_passes"], - cases=data["cases"], - reasons=data["reasons"], - detailed_results=data["detailed_results"], - ) - reports.append(report) + return self._build_reports(evaluator_data) - return reports + async def _evaluator_worker( + self, + queue: asyncio.Queue, + results: list[dict | None], + ): + """Worker that processes EvaluationData from the queue for evaluation only. 
- async def run_evaluations_async(self, task: Callable, max_workers: int = 10) -> list[EvaluationReport]: + Args: + queue: Queue containing (index, EvaluationData) tuples to evaluate + results: Pre-allocated list to store per-case evaluator results by index """ - Run evaluations asynchronously using a queue for parallel processing. + while True: + try: + index, evaluation_context = queue.get_nowait() + except asyncio.QueueEmpty: + break + + case_name = evaluation_context.name or f"case_{index}" + + # If task execution failed, record failure for all evaluators + task_error = (evaluation_context.metadata or {}).get("task_error") + if task_error: + evaluator_results = [] + for evaluator in self._evaluators: + evaluator_results.append( + { + "evaluator_name": evaluator.get_type_name(), + "test_pass": False, + "score": 0, + "reason": f"Task execution error: {task_error}", + "detailed_results": [], + } + ) + results[index] = { + "case": evaluation_context.model_dump(), + "evaluator_results": evaluator_results, + } + queue.task_done() + continue + + try: + evaluator_results = [] + with self._tracer.start_as_current_span( + f"eval_case {case_name}", + attributes={ + "gen_ai.evaluation.case.name": case_name, + "gen_ai.evaluation.case.input": serialize(evaluation_context.input), + }, + ): + for evaluator in self._evaluators: + + @retry( + retry=retry_if_exception(is_throttling_error), + stop=stop_after_attempt(_MAX_RETRY_ATTEMPTS), + wait=wait_exponential(multiplier=_INITIAL_RETRY_DELAY, max=_MAX_RETRY_DELAY), + reraise=True, + ) + async def _evaluate_with_retry(evaluator=evaluator, evaluation_context=evaluation_context): + outputs = await evaluator.evaluate_async(evaluation_context) + (score, passed, reason) = evaluator.aggregator(outputs) + return outputs, float(score), passed, reason + + try: + with self._tracer.start_as_current_span( + f"evaluator {evaluator.get_type_name()}", + attributes={ + "gen_ai.evaluation.name": evaluator.get_type_name(), + "gen_ai.evaluation.case.name": case_name, + }, + ) as eval_span: + ( + evaluation_outputs, + aggregate_score, + aggregate_pass, + aggregate_reason, + ) = await _evaluate_with_retry() + + try: + label = _get_label_from_score(evaluator, aggregate_score) + except Exception: + label = "UNKNOWN" + + eval_span.set_attributes( + { + "gen_ai.evaluation.score.label": label, + "gen_ai.evaluation.score.value": str(aggregate_score), + "gen_ai.evaluation.test_pass": aggregate_pass, + "gen_ai.evaluation.explanation": aggregate_reason or "", + } + ) + + evaluator_results.append( + { + "evaluator_name": evaluator.get_type_name(), + "test_pass": aggregate_pass, + "score": aggregate_score, + "reason": aggregate_reason or "", + "detailed_results": evaluation_outputs, + } + ) + + self._log_evaluation_to_cloudwatch( + evaluator=evaluator, + eval_span=eval_span, + evaluation_context=evaluation_context, + score=aggregate_score, + reason=aggregate_reason or "", + ) + + except RetryError as e: + original_exception = e.last_attempt.exception() + if original_exception is None: + original_exception = Exception( + f"Evaluator {evaluator.get_type_name()} failed after {_MAX_RETRY_ATTEMPTS} retries" + ) + logger.error( + "evaluator=<%s>, case=<%s>, retries=<%s>, error=<%s> | max retry attempts exceeded", + evaluator.get_type_name(), + case_name, + _MAX_RETRY_ATTEMPTS, + original_exception, + ) + evaluator_results.append( + { + "evaluator_name": evaluator.get_type_name(), + "test_pass": False, + "score": 0, + "reason": f"Evaluator error: {str(original_exception)}", + 
"detailed_results": [], + } + ) + except Exception as e: + evaluator_results.append( + { + "evaluator_name": evaluator.get_type_name(), + "test_pass": False, + "score": 0, + "reason": f"Evaluator error: {str(e)}", + "detailed_results": [], + } + ) + + results[index] = { + "case": evaluation_context.model_dump(), + "evaluator_results": evaluator_results, + } + finally: + queue.task_done() + + async def run_evaluators_async( + self, + evaluation_data: list[EvaluationData[InputT, OutputT]], + max_workers: int = 10, + ) -> list[EvaluationReport]: + """Run evaluators on pre-computed evaluation data asynchronously. Args: - task: The task function to run on each case. This function should take in InputT and returns - either OutputT or {"output": OutputT, "trajectory": ...}. The task can either run - synchronously or asynchronously. + evaluation_data: Pre-computed evaluation data, e.g. from run_tasks_async() or deserialized + from a previous run. max_workers: Maximum number of parallel workers (default: 10) Returns: - List of EvaluationReport objects, one for each evaluator, containing evaluation results + A list of EvaluationReport objects, one per evaluator. """ - queue: asyncio.Queue[Case[InputT, OutputT]] = asyncio.Queue() - results: list[Any] = [] + queue: asyncio.Queue[tuple[int, EvaluationData[InputT, OutputT]]] = asyncio.Queue() + results: list[dict | None] = [None] * len(evaluation_data) - for case in self._cases: - queue.put_nowait(case) + for index, data in enumerate(evaluation_data): + queue.put_nowait((index, data)) - num_workers = min(max_workers, len(self._cases)) + num_workers = min(max_workers, len(evaluation_data)) if evaluation_data else 0 - workers = [asyncio.create_task(self._worker(queue, task, results)) for _ in range(num_workers)] + workers = [asyncio.create_task(self._evaluator_worker(queue, results)) for _ in range(num_workers)] await queue.join() for worker in workers: @@ -699,18 +862,11 @@ async def run_evaluations_async(self, task: Callable, max_workers: int = 10) -> await asyncio.gather(*workers, return_exceptions=True) # Organize results by evaluator - evaluator_data: dict[str, dict[str, list]] = { - evaluator.get_type_name(): { - "scores": [], - "test_passes": [], - "cases": [], - "reasons": [], - "detailed_results": [], - } - for evaluator in self._evaluators - } + evaluator_data = self._init_evaluator_data() for result in results: + if result is None: + continue case_data = result["case"] for eval_result in result["evaluator_results"]: eval_name = eval_result["evaluator_name"] @@ -720,23 +876,38 @@ async def run_evaluations_async(self, task: Callable, max_workers: int = 10) -> evaluator_data[eval_name]["reasons"].append(eval_result["reason"]) evaluator_data[eval_name]["detailed_results"].append(eval_result["detailed_results"]) - reports = [] - for evaluator in self._evaluators: - eval_name = evaluator.get_type_name() - data = evaluator_data[eval_name] - scores = data["scores"] - report = EvaluationReport( - evaluator_name=eval_name, - overall_score=sum(scores) / len(scores) if scores else 0, - scores=scores, - test_passes=data["test_passes"], - cases=data["cases"], - reasons=data["reasons"], - detailed_results=data["detailed_results"], - ) - reports.append(report) + return self._build_reports(evaluator_data) - return reports + def run_evaluations( + self, task: Callable[[Case[InputT, OutputT]], OutputT | dict[str, Any]] + ) -> list[EvaluationReport]: + """Run tasks and evaluators in sequence. + + Convenience method equivalent to run_evaluators(run_tasks(task)). 
+ + Args: + task: The task to run on each case. Takes a Case and returns either + OutputT or {"output": OutputT, "trajectory": ...}. + + Returns: + A list of EvaluationReport objects, one per evaluator. + """ + return self.run_evaluators(self.run_tasks(task)) + + async def run_evaluations_async(self, task: Callable, max_workers: int = 10) -> list[EvaluationReport]: + """Run tasks and evaluators asynchronously in sequence. + + Convenience method equivalent to run_evaluators_async(run_tasks_async(task)). + + Args: + task: The task function to run on each case. Can be sync or async. + max_workers: Maximum number of parallel workers (default: 10) + + Returns: + A list of EvaluationReport objects, one per evaluator. + """ + evaluation_data = await self.run_tasks_async(task, max_workers) + return await self.run_evaluators_async(evaluation_data, max_workers) def to_dict(self) -> dict: """ diff --git a/tests/strands_evals/test_experiment.py b/tests/strands_evals/test_experiment.py index f1e52fc..03baf03 100644 --- a/tests/strands_evals/test_experiment.py +++ b/tests/strands_evals/test_experiment.py @@ -183,7 +183,7 @@ def simple_task(c): assert result.name == "test" assert result.expected_trajectory is None assert result.actual_trajectory is None - assert result.metadata is None + assert "session_id" in result.metadata assert result.actual_interactions is None assert result.expected_interactions is None @@ -1003,20 +1003,18 @@ async def test_experiment_run_evaluations_async_creates_spans(mock_span): experiment = Experiment(cases=[case], evaluators=[MockEvaluator()]) with patch.object(experiment._tracer, "start_as_current_span", return_value=mock_span) as mock_start_span: - with patch("strands_evals.experiment.format_trace_id", return_value="mock_trace_id"): - async def async_task(c): - return c.input + async def async_task(c): + return c.input - await experiment.run_evaluations_async(async_task) + await experiment.run_evaluations_async(async_task) - # Verify both execute_case and evaluator spans were created - calls = mock_start_span.call_args_list - assert len(calls) == 2 - execute_case_span_call = calls[0] - evaluator_span_call = calls[1] - assert execute_case_span_call[0][0] == "execute_case async_test" - assert evaluator_span_call[0][0] == "evaluator MockEvaluator" + # Verify spans were created for task and evaluation phases + calls = mock_start_span.call_args_list + span_names = [call[0][0] for call in calls] + assert any("run_task" in name for name in span_names) + assert any("eval_case" in name for name in span_names) + assert any("evaluator MockEvaluator" in name for name in span_names) @pytest.mark.asyncio @@ -1048,25 +1046,24 @@ async def test_experiment_run_evaluations_async_with_dict_output(mock_span): experiment = Experiment(cases=[case], evaluators=[MockEvaluator()]) with patch.object(experiment._tracer, "start_as_current_span", return_value=mock_span): - with patch("strands_evals.experiment.format_trace_id", return_value="mock_trace_id"): - async def async_task_with_dict(c): - return {"output": c.input, "trajectory": ["step1"], "interactions": interactions} + async def async_task_with_dict(c): + return {"output": c.input, "trajectory": ["step1"], "interactions": interactions} - await experiment.run_evaluations_async(async_task_with_dict) + await experiment.run_evaluations_async(async_task_with_dict) - # Check that set_attributes was called (trajectory/interactions are set via set_attributes) - mock_span.set_attributes.assert_called() - # Verify has_trajectory and has_interactions 
flags are set - set_attrs_calls = mock_span.set_attributes.call_args_list - has_trajectory_set = any( - "gen_ai.evaluation.data.has_trajectory" in call[0][0] for call in set_attrs_calls if call[0] - ) - has_interactions_set = any( - "gen_ai.evaluation.data.has_interactions" in call[0][0] for call in set_attrs_calls if call[0] - ) - assert has_trajectory_set - assert has_interactions_set + # Check that set_attributes was called (trajectory/interactions are set via set_attributes) + mock_span.set_attributes.assert_called() + # Verify has_trajectory and has_interactions flags are set + set_attrs_calls = mock_span.set_attributes.call_args_list + has_trajectory_set = any( + "gen_ai.evaluation.data.has_trajectory" in call[0][0] for call in set_attrs_calls if call[0] + ) + has_interactions_set = any( + "gen_ai.evaluation.data.has_interactions" in call[0][0] for call in set_attrs_calls if call[0] + ) + assert has_trajectory_set + assert has_interactions_set def test_experiment_run_evaluations_multiple_cases(mock_span, simple_task): @@ -1351,7 +1348,7 @@ async def always_throttling_task(c): assert len(reports) == 1 assert reports[0].scores[0] == 0 assert reports[0].test_passes[0] is False - assert "An error occurred" in reports[0].reasons[0] + assert "Task execution error" in reports[0].reasons[0] @pytest.mark.asyncio @@ -1682,3 +1679,272 @@ def test_deterministic_evaluator_error_isolation(): # Equals still ran successfully despite the ThrowingEvaluator failure assert reports[1].scores == [1.0] assert reports[1].test_passes == [True] + + +# ============================================================ +# run_tasks tests +# ============================================================ + + +def test_run_tasks_returns_evaluation_data(mock_evaluator): + """Test run_tasks returns list of EvaluationData with correct fields populated""" + cases = [ + Case(name="case1", input="hello", expected_output="world"), + Case(name="case2", input="foo", expected_output="bar"), + ] + experiment = Experiment(cases=cases, evaluators=[mock_evaluator]) + + def echo_task(c): + return c.input + + results = experiment.run_tasks(echo_task) + + assert len(results) == 2 + assert results[0].input == "hello" + assert results[0].actual_output == "hello" + assert results[0].expected_output == "world" + assert results[0].name == "case1" + assert results[1].input == "foo" + assert results[1].actual_output == "foo" + assert results[1].expected_output == "bar" + assert results[1].name == "case2" + + +def test_run_tasks_with_dict_output(mock_evaluator): + """Test run_tasks handles task returning dict with output/trajectory/interactions""" + interactions = [{"node_name": "agent1", "dependencies": [], "messages": ["hello"]}] + cases = [Case(name="test", input="hello", expected_output="world", expected_interactions=interactions)] + experiment = Experiment(cases=cases, evaluators=[mock_evaluator]) + + def dict_task(c): + return { + "output": f"response to {c.input}", + "trajectory": ["step1", "step2"], + "interactions": interactions, + } + + results = experiment.run_tasks(dict_task) + + assert len(results) == 1 + assert results[0].actual_output == "response to hello" + assert results[0].actual_trajectory == ["step1", "step2"] + assert results[0].actual_interactions == interactions + assert results[0].expected_output == "world" + assert results[0].expected_interactions == interactions + + +def test_run_tasks_handles_task_failure(mock_evaluator): + """Test run_tasks returns EvaluationData with actual_output=None and error in metadata on 
failure""" + cases = [ + Case(name="will_fail", input="hello", expected_output="world"), + ] + experiment = Experiment(cases=cases, evaluators=[mock_evaluator]) + + def failing_task(c): + raise RuntimeError("Task exploded") + + results = experiment.run_tasks(failing_task) + + assert len(results) == 1 + assert results[0].actual_output is None + assert results[0].input == "hello" + assert results[0].expected_output == "world" + assert results[0].name == "will_fail" + assert "task_error" in results[0].metadata + assert "Task exploded" in results[0].metadata["task_error"] + + +# ============================================================ +# run_evaluators tests +# ============================================================ + + +def test_run_evaluators_with_precomputed_data(mock_evaluator): + """Test run_evaluators evaluates pre-computed EvaluationData without running tasks""" + evaluation_data = [ + EvaluationData(input="hello", actual_output="hello", expected_output="hello", name="match"), + EvaluationData(input="foo", actual_output="foo", expected_output="bar", name="no_match"), + ] + experiment = Experiment(evaluators=[mock_evaluator]) + + reports = experiment.run_evaluators(evaluation_data) + + assert len(reports) == 1 + assert reports[0].evaluator_name == "MockEvaluator" + assert reports[0].scores == [1.0, 0.0] + assert reports[0].test_passes == [True, False] + assert reports[0].overall_score == 0.5 + + +def test_run_evaluators_handles_evaluator_failure(): + """Test run_evaluators isolates evaluator errors without crashing""" + evaluation_data = [ + EvaluationData(input="hello", actual_output="hello", expected_output="hello", name="test"), + ] + experiment = Experiment(evaluators=[ThrowingEvaluator(), MockEvaluator()]) + + reports = experiment.run_evaluators(evaluation_data) + + assert len(reports) == 2 + # ThrowingEvaluator should fail gracefully + assert reports[0].scores == [0] + assert reports[0].test_passes == [False] + assert "Evaluator exploded" in reports[0].reasons[0] + # MockEvaluator should still succeed + assert reports[1].scores == [1.0] + assert reports[1].test_passes == [True] + + +def test_run_evaluators_with_multiple_evaluators(): + """Test run_evaluators with multiple evaluators on precomputed data""" + evaluation_data = [ + EvaluationData(input="hello", actual_output="hello", expected_output="hello", name="test"), + ] + experiment = Experiment(evaluators=[MockEvaluator(), MockEvaluator2()]) + + reports = experiment.run_evaluators(evaluation_data) + + assert len(reports) == 2 + assert reports[0].evaluator_name == "MockEvaluator" + assert reports[0].scores == [1.0] + assert reports[1].evaluator_name == "MockEvaluator2" + assert reports[1].scores == [0.5] + + +# ============================================================ +# Roundtrip and serialization tests +# ============================================================ + + +def test_roundtrip_run_tasks_then_run_evaluators(mock_evaluator): + """Test that run_tasks output feeds correctly into run_evaluators""" + cases = [ + Case(name="match", input="hello", expected_output="hello"), + Case(name="no_match", input="foo", expected_output="bar"), + ] + experiment = Experiment(cases=cases, evaluators=[mock_evaluator]) + + def echo_task(c): + return c.input + + # Two-step flow + evaluation_data = experiment.run_tasks(echo_task) + reports = experiment.run_evaluators(evaluation_data) + + assert len(reports) == 1 + assert reports[0].scores == [1.0, 0.0] + assert reports[0].test_passes == [True, False] + assert 
reports[0].overall_score == 0.5 + + +def test_run_evaluators_with_serialized_data(mock_evaluator): + """Test that EvaluationData survives serialization roundtrip and can still be evaluated""" + cases = [ + Case(name="match", input="hello", expected_output="hello"), + ] + experiment = Experiment(cases=cases, evaluators=[mock_evaluator]) + + def echo_task(c): + return c.input + + # Run tasks, serialize, deserialize, then evaluate + evaluation_data = experiment.run_tasks(echo_task) + serialized = [d.model_dump() for d in evaluation_data] + deserialized = [EvaluationData.model_validate(d) for d in serialized] + + reports = experiment.run_evaluators(deserialized) + + assert len(reports) == 1 + assert reports[0].scores == [1.0] + assert reports[0].test_passes == [True] + + +# ============================================================ +# Async run_tasks and run_evaluators tests +# ============================================================ + + +@pytest.mark.asyncio +async def test_run_tasks_async_returns_evaluation_data(): + """Test run_tasks_async returns list of EvaluationData with correct fields""" + cases = [ + Case(name="case1", input="hello", expected_output="world"), + Case(name="case2", input="foo", expected_output="bar"), + ] + experiment = Experiment(cases=cases, evaluators=[MockEvaluator()]) + + def echo_task(c): + return c.input + + results = await experiment.run_tasks_async(echo_task) + + assert len(results) == 2 + assert results[0].actual_output == "hello" + assert results[0].expected_output == "world" + assert results[1].actual_output == "foo" + assert results[1].expected_output == "bar" + + +@pytest.mark.asyncio +async def test_run_tasks_async_with_async_task(): + """Test run_tasks_async works with async task functions""" + cases = [Case(name="test", input="hello", expected_output="world")] + experiment = Experiment(cases=cases, evaluators=[MockEvaluator()]) + + async def async_task(c): + return c.input + + results = await experiment.run_tasks_async(async_task) + + assert len(results) == 1 + assert results[0].actual_output == "hello" + + +@pytest.mark.asyncio +async def test_run_tasks_async_handles_task_failure(): + """Test run_tasks_async handles task failure with error in metadata""" + cases = [Case(name="will_fail", input="hello", expected_output="world")] + experiment = Experiment(cases=cases, evaluators=[MockEvaluator()]) + + def failing_task(c): + raise RuntimeError("Async task exploded") + + results = await experiment.run_tasks_async(failing_task) + + assert len(results) == 1 + assert results[0].actual_output is None + assert "task_error" in results[0].metadata + assert "Async task exploded" in results[0].metadata["task_error"] + + +@pytest.mark.asyncio +async def test_run_evaluators_async_with_precomputed_data(): + """Test run_evaluators_async evaluates pre-computed EvaluationData""" + evaluation_data = [ + EvaluationData(input="hello", actual_output="hello", expected_output="hello", name="match"), + EvaluationData(input="foo", actual_output="foo", expected_output="bar", name="no_match"), + ] + experiment = Experiment(evaluators=[MockEvaluator()]) + + reports = await experiment.run_evaluators_async(evaluation_data) + + assert len(reports) == 1 + assert reports[0].scores == [1.0, 0.0] + assert reports[0].test_passes == [True, False] + assert reports[0].overall_score == 0.5 + + +@pytest.mark.asyncio +async def test_run_evaluators_async_handles_evaluator_failure(): + """Test run_evaluators_async isolates evaluator errors""" + evaluation_data = [ + 
EvaluationData(input="hello", actual_output="hello", expected_output="hello", name="test"), + ] + experiment = Experiment(evaluators=[ThrowingEvaluator(), MockEvaluator()]) + + reports = await experiment.run_evaluators_async(evaluation_data) + + assert len(reports) == 2 + assert reports[0].scores == [0] + assert "Async evaluator exploded" in reports[0].reasons[0] + assert reports[1].scores == [1.0]