diff --git a/aieng-eval-agents/aieng/agent_evals/__init__.py b/aieng-eval-agents/aieng/agent_evals/__init__.py index 924e7d08..4ac3033b 100644 --- a/aieng-eval-agents/aieng/agent_evals/__init__.py +++ b/aieng-eval-agents/aieng/agent_evals/__init__.py @@ -16,6 +16,7 @@ display_success, display_warning, ) +from .progress import create_progress, track_with_progress __all__ = [ @@ -30,4 +31,7 @@ "display_success", "display_info", "display_warning", + # Progress utilities + "create_progress", + "track_with_progress", ] diff --git a/aieng-eval-agents/aieng/agent_evals/langfuse.py b/aieng-eval-agents/aieng/agent_evals/langfuse.py index 2831d5b0..8102ae3e 100644 --- a/aieng-eval-agents/aieng/agent_evals/langfuse.py +++ b/aieng-eval-agents/aieng/agent_evals/langfuse.py @@ -1,14 +1,18 @@ """Functions and objects pertaining to Langfuse.""" import base64 +import hashlib import json import logging import os +from pathlib import Path +from typing import Any, Literal import logfire import nest_asyncio from aieng.agent_evals.async_client_manager import AsyncClientManager from aieng.agent_evals.configs import Configs +from aieng.agent_evals.progress import track_with_progress from opentelemetry import trace from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter from opentelemetry.sdk.resources import Resource @@ -193,40 +197,186 @@ async def upload_dataset_to_langfuse(dataset_path: str, dataset_name: str): Parameters ---------- dataset_path : str - Path to the dataset to upload. The dataset must be a json file - containing a list of dictionaries. Each dictionary must contain a - `input` and `expected_output` keys. Additionally, it can include - an `id` key that will be added to the metadata of the dataset item. + Path to the dataset to upload. + + Supported formats: + - ``.json``: a JSON array of records. + - ``.jsonl``: one JSON record per non-empty line. + + Each record must contain ``input`` and ``expected_output`` keys. + Records may optionally include: + - ``id``: item identifier stored in metadata. + - ``metadata``: additional dictionary metadata merged into upload metadata. dataset_name : str Name of the dataset to upload. + + Raises + ------ + ValueError + If the dataset format is invalid or a record is malformed. + FileNotFoundError + If ``dataset_path`` does not exist. """ + dataset_file = Path(dataset_path) + # Get the client manager singleton instance and langfuse client client_manager = AsyncClientManager.get_instance() langfuse_client = client_manager.langfuse_client - # Load the ground truth dataset from the file path - logger.info(f"Loading dataset from '{dataset_path}'") - with open(dataset_path, "r") as file: - dataset = json.load(file) - - # Create the dataset in Langfuse - langfuse_client.create_dataset(name=dataset_name) - - # Upload each item to the dataset - for item in dataset: - assert "input" in item, "`input` is required for all items in the dataset" - assert "expected_output" in item, "`expected_output` is required for all items in the dataset" - + # Parse once so the upload loop has deterministic progress totals. + logger.info("Loading dataset from '%s'", dataset_file) + dataset_format = _detect_dataset_format(dataset_file) + records = _load_dataset_records(dataset_file, dataset_format) + + # Create dataset if missing; if it already exists we reuse it. + _ensure_dataset_exists(langfuse_client=langfuse_client, dataset_name=dataset_name) + + # We centralize metadata normalization to keep uploader behavior + # consistent across JSON and JSONL sources. + for record_number, item in track_with_progress( + records, + description=f"Uploading Langfuse dataset '{dataset_name}'", + total=len(records), + transient=True, # Clear progress bar when done + ): + normalized = _normalize_dataset_record(item=item, record_number=record_number) + item_id = _build_dataset_item_id( + dataset_name=dataset_name, + input_payload=normalized["input"], + expected_output_payload=normalized["expected_output"], + ) langfuse_client.create_dataset_item( dataset_name=dataset_name, - input=item["input"], - expected_output=item["expected_output"], - metadata={ - "id": item.get("id", None), - }, + id=item_id, # Globally unique ID of deduplication + input=normalized["input"], + expected_output=normalized["expected_output"], + metadata=normalized["metadata"], ) - logger.info(f"Uploaded {len(dataset)} items to dataset '{dataset_name}'") + logger.info("Uploaded %d items to dataset '%s'", len(records), dataset_name) # Gracefully close the services await client_manager.close() + + +def _detect_dataset_format(dataset_file: Path) -> Literal["json", "jsonl"]: + """Detect dataset format from extension or first non-empty content line.""" + suffix = dataset_file.suffix.lower() + if suffix == ".jsonl": + return "jsonl" + if suffix == ".json": + return "json" + + with dataset_file.open("r", encoding="utf-8") as file: + for line in file: + stripped = line.strip() + if not stripped: + continue + if stripped.startswith("["): + return "json" + return "jsonl" + + raise ValueError(f"Dataset file is empty: {dataset_file}") + + +def _ensure_dataset_exists(*, langfuse_client: Any, dataset_name: str) -> None: + """Ensure the target dataset exists before item uploads.""" + try: + langfuse_client.create_dataset(name=dataset_name) + return + except Exception as exc: + # We only continue if the dataset can be retrieved + try: + langfuse_client.get_dataset(dataset_name) + logger.info("Dataset '%s' already exists; appending/upserting items.", dataset_name) + return + except Exception as retrieval_exc: + raise exc from retrieval_exc + + +def _build_dataset_item_id( + *, + dataset_name: str, + input_payload: Any, + expected_output_payload: Any, +) -> str: + """Build a deterministic, globally-unique dataset item ID.""" + canonical = json.dumps( + { + "input": input_payload, + "expected_output": expected_output_payload, + }, + sort_keys=True, + separators=(",", ":"), + ensure_ascii=True, + ) + digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest() + return f"{dataset_name}:{digest}" + + +def _load_dataset_records( + dataset_file: Path, dataset_format: Literal["json", "jsonl"] +) -> list[tuple[int, dict[str, Any]]]: + """Load dataset records and preserve stable record numbers.""" + if dataset_format == "json": + return _load_json_records(dataset_file) + return _load_jsonl_records(dataset_file) + + +def _load_json_records(dataset_file: Path) -> list[tuple[int, dict[str, Any]]]: + """Load records from a JSON array file.""" + with dataset_file.open("r", encoding="utf-8") as file: + loaded = json.load(file) + + if not isinstance(loaded, list): + raise ValueError(f"JSON dataset must be a list of records: {dataset_file}") + + return [(index, record) for index, record in enumerate(loaded, start=1)] + + +def _load_jsonl_records(dataset_file: Path) -> list[tuple[int, dict[str, Any]]]: + """Load records from a JSONL file with line-number-aware errors.""" + records: list[tuple[int, dict[str, Any]]] = [] + + with dataset_file.open("r", encoding="utf-8") as file: + for line_number, line in enumerate(file, start=1): + stripped = line.strip() + if not stripped: + continue + try: + parsed = json.loads(stripped) + except json.JSONDecodeError as exc: + raise ValueError(f"Invalid JSONL record at line {line_number} in '{dataset_file}': {exc.msg}") from exc + records.append((line_number, parsed)) + + if not records: + raise ValueError(f"JSONL dataset has no records: {dataset_file}") + + return records + + +def _normalize_dataset_record(item: Any, record_number: int) -> dict[str, Any]: + """Validate and normalize one dataset record for upload.""" + if not isinstance(item, dict): + raise ValueError(f"Record {record_number} must be an object.") + + if "input" not in item: + raise ValueError(f"Record {record_number} is missing required key: 'input'") + if "expected_output" not in item: + raise ValueError(f"Record {record_number} is missing required key: 'expected_output'") + + raw_metadata = item.get("metadata", {}) + if raw_metadata is None: + raw_metadata = {} + if not isinstance(raw_metadata, dict): + raise ValueError(f"Record {record_number} has non-object metadata.") + + derived_id = item.get("id", record_number) + metadata = dict(raw_metadata) + metadata["id"] = derived_id + + return { + "input": item["input"], + "expected_output": item["expected_output"], + "metadata": metadata, + } diff --git a/aieng-eval-agents/aieng/agent_evals/progress.py b/aieng-eval-agents/aieng/agent_evals/progress.py new file mode 100644 index 00000000..69f678f4 --- /dev/null +++ b/aieng-eval-agents/aieng/agent_evals/progress.py @@ -0,0 +1,95 @@ +"""Reusable Rich progress utilities. + +This module provides a consistent progress bar style for long-running +workflows across the repository. +""" + +from collections.abc import Iterable, Iterator +from typing import TypeVar + +from rich.progress import BarColumn, Progress, TextColumn, TimeElapsedColumn, TimeRemainingColumn + + +T = TypeVar("T") + + +def create_progress(*, transient: bool = False) -> Progress: + """Create a standardized Rich ``Progress`` instance. + + Parameters + ---------- + transient : bool, optional, default=False + Whether to clear the progress display after completion. + Defaults to ``False`` so users can inspect completion state. + + Returns + ------- + Progress + Configured progress renderer with consistent columns. + + Examples + -------- + >>> from aieng.agent_evals.progress import create_progress + >>> progress = create_progress() + >>> with progress: + ... task_id = progress.add_task("Uploading", total=10) + ... progress.update(task_id, advance=10) + """ + return Progress( + TextColumn("[bold blue]{task.description}"), + BarColumn(), + TextColumn("{task.completed}/{task.total}"), + TimeRemainingColumn(), + TimeElapsedColumn(), + transient=transient, + ) + + +def track_with_progress( + iterable: Iterable[T], + *, + description: str, + total: int | float | None = None, + transient: bool = False, +) -> Iterator[T]: + """Iterate items while displaying a progress bar. + + This is a ``tqdm``-style helper for common loops. Pass an iterable and iterate + as usual. + + Parameters + ---------- + iterable : Iterable[T] + Iterable of items to process. + description : str + Human-readable task description displayed in the progress bar. + total : int or float or None, optional, default=None + Expected number of units of work. If omitted, this function attempts + to infer the total from ``len(iterable)`` when available. + transient : bool, optional, default=False + Whether to clear the progress display after completion. + + Examples + -------- + >>> from aieng.agent_evals.progress import track_with_progress + >>> for item in track_with_progress([1, 2, 3], description="Uploading"): + ... _ = item + """ + resolved_total = total if total is not None else _infer_total(iterable) + + with create_progress(transient=transient) as progress: + task_id = progress.add_task(description, total=resolved_total) + for item in iterable: + yield item + progress.update(task_id, advance=1) + + +def _infer_total(iterable: Iterable[T]) -> int | None: + """Infer iterable size when ``len`` is available.""" + try: + return len(iterable) # type: ignore[arg-type] + except TypeError: + return None + + +__all__ = ["create_progress", "track_with_progress"] diff --git a/aieng-eval-agents/tests/aieng/agent_evals/test_langfuse.py b/aieng-eval-agents/tests/aieng/agent_evals/test_langfuse.py new file mode 100644 index 00000000..b9afa40c --- /dev/null +++ b/aieng-eval-agents/tests/aieng/agent_evals/test_langfuse.py @@ -0,0 +1,183 @@ +"""Tests for Langfuse helper utilities.""" + +import json +from pathlib import Path +from typing import Any, Generator +from unittest.mock import AsyncMock, MagicMock + +import pytest +from aieng.agent_evals import langfuse as langfuse_module +from aieng.agent_evals.progress import create_progress, track_with_progress + + +class _TrackRecorder: + """Track helper stand-in for uploader tests.""" + + def __init__(self) -> None: + self.descriptions: list[str] = [] + self.totals: list[int | float | None] = [] + self.item_count = 0 + + def wrap( + self, iterable, *, description: str, total: int | float | None = None, transient: bool = False + ) -> Generator[Any, Any, None]: + """Yield items while recording invocations.""" + del transient + self.descriptions.append(description) + self.totals.append(total) + for item in iterable: + self.item_count += 1 + yield item + + +@pytest.fixture +def mock_client_manager(monkeypatch) -> MagicMock: + """Patch AsyncClientManager singleton with a local mock.""" + manager = MagicMock() + manager.langfuse_client = MagicMock() + manager.close = AsyncMock() + manager.langfuse_client.create_dataset = MagicMock() + manager.langfuse_client.get_dataset = MagicMock() + manager.langfuse_client.create_dataset_item = MagicMock() + + monkeypatch.setattr(langfuse_module.AsyncClientManager, "get_instance", lambda: manager) + return manager + + +@pytest.fixture +def patch_progress(monkeypatch) -> _TrackRecorder: + """Patch track helper so tests stay quiet and deterministic.""" + recorder = _TrackRecorder() + monkeypatch.setattr(langfuse_module, "track_with_progress", recorder.wrap) + return recorder + + +@pytest.mark.asyncio +async def test_upload_dataset_to_langfuse_json(tmp_path: Path, mock_client_manager, patch_progress) -> None: + """Upload records from JSON and map metadata consistently.""" + dataset_file = tmp_path / "dataset.json" + records = [ + { + "id": "case-1", + "input": {"q": "A"}, + "expected_output": {"a": 1}, + "metadata": {"split": "train"}, + }, + { + "input": {"q": "B"}, + "expected_output": {"a": 2}, + "metadata": {"split": "test"}, + }, + ] + dataset_file.write_text(json.dumps(records), encoding="utf-8") + + await langfuse_module.upload_dataset_to_langfuse(str(dataset_file), "json-dataset") + + mock_client_manager.langfuse_client.create_dataset.assert_called_once_with(name="json-dataset") + assert mock_client_manager.langfuse_client.create_dataset_item.call_count == 2 + assert patch_progress.item_count == 2 + + first_call = mock_client_manager.langfuse_client.create_dataset_item.call_args_list[0].kwargs + second_call = mock_client_manager.langfuse_client.create_dataset_item.call_args_list[1].kwargs + + assert first_call["metadata"] == {"split": "train", "id": "case-1"} + assert second_call["metadata"] == {"split": "test", "id": 2} + assert isinstance(first_call["id"], str) + assert first_call["id"].startswith("json-dataset:") + assert first_call["id"] != second_call["id"] + mock_client_manager.close.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_upload_dataset_to_langfuse_jsonl(tmp_path: Path, mock_client_manager, patch_progress) -> None: + """Upload records from JSONL line-by-line with stable line-based IDs.""" + dataset_file = tmp_path / "dataset.jsonl" + dataset_file.write_text( + "\n".join( + [ + json.dumps({"input": {"q": "A"}, "expected_output": {"a": 1}}), + "", + json.dumps({"input": {"q": "B"}, "expected_output": {"a": 2}, "id": "line-3"}), + ] + ), + encoding="utf-8", + ) + + await langfuse_module.upload_dataset_to_langfuse(str(dataset_file), "jsonl-dataset") + + assert mock_client_manager.langfuse_client.create_dataset_item.call_count == 2 + first_call = mock_client_manager.langfuse_client.create_dataset_item.call_args_list[0].kwargs + second_call = mock_client_manager.langfuse_client.create_dataset_item.call_args_list[1].kwargs + + assert first_call["metadata"]["id"] == 1 + assert second_call["metadata"]["id"] == "line-3" + assert first_call["id"] != second_call["id"] + assert patch_progress.item_count == 2 + + +@pytest.mark.asyncio +async def test_upload_dataset_to_langfuse_reuses_existing_dataset( + tmp_path: Path, mock_client_manager, patch_progress +) -> None: + """Fallback to existing dataset when creation raises and retrieval succeeds.""" + dataset_file = tmp_path / "dataset.jsonl" + dataset_file.write_text( + json.dumps({"input": {"q": "A"}, "expected_output": {"a": 1}}), + encoding="utf-8", + ) + + mock_client_manager.langfuse_client.create_dataset.side_effect = RuntimeError("already exists") + + await langfuse_module.upload_dataset_to_langfuse(str(dataset_file), "existing-dataset") + + mock_client_manager.langfuse_client.get_dataset.assert_called_once_with("existing-dataset") + mock_client_manager.langfuse_client.create_dataset_item.assert_called_once() + + +@pytest.mark.asyncio +async def test_upload_dataset_to_langfuse_invalid_jsonl_reports_line( + tmp_path: Path, mock_client_manager, patch_progress +) -> None: + """Raise a helpful error when JSONL contains a malformed line.""" + dataset_file = tmp_path / "bad.jsonl" + dataset_file.write_text( + "\n".join( + [ + json.dumps({"input": "ok", "expected_output": "ok"}), + '{"input": "broken", "expected_output": ', + ] + ), + encoding="utf-8", + ) + + with pytest.raises(ValueError, match="line 2"): + await langfuse_module.upload_dataset_to_langfuse(str(dataset_file), "bad-jsonl") + + +@pytest.mark.asyncio +async def test_upload_dataset_to_langfuse_missing_required_key( + tmp_path: Path, mock_client_manager, patch_progress +) -> None: + """Raise a clear error when required fields are missing.""" + dataset_file = tmp_path / "missing.json" + dataset_file.write_text( + json.dumps([{"input": "only-input"}]), + encoding="utf-8", + ) + + with pytest.raises(ValueError, match="expected_output"): + await langfuse_module.upload_dataset_to_langfuse(str(dataset_file), "missing-key") + + +def test_progress_helpers_smoke() -> None: + """Create and use shared progress helpers without errors.""" + with create_progress(transient=True) as progress: + task_id = progress.add_task("Smoke", total=2) + progress.update(task_id, advance=1) + progress.update(task_id, advance=1) + + task = progress.tasks[0] + assert task.completed == 2 + + items = list(track_with_progress([1, 2], description="Smoke Track", transient=True)) + assert items == [1, 2]