Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions aieng-eval-agents/aieng/agent_evals/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
display_success,
display_warning,
)
from .progress import create_progress, track_with_progress


__all__ = [
Expand All @@ -30,4 +31,7 @@
"display_success",
"display_info",
"display_warning",
# Progress utilities
"create_progress",
"track_with_progress",
]
196 changes: 173 additions & 23 deletions aieng-eval-agents/aieng/agent_evals/langfuse.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
"""Functions and objects pertaining to Langfuse."""

import base64
import hashlib
import json
import logging
import os
from pathlib import Path
from typing import Any, Literal

import logfire
import nest_asyncio
from aieng.agent_evals.async_client_manager import AsyncClientManager
from aieng.agent_evals.configs import Configs
from aieng.agent_evals.progress import track_with_progress
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
Expand Down Expand Up @@ -193,40 +197,186 @@ async def upload_dataset_to_langfuse(dataset_path: str, dataset_name: str):
Parameters
----------
dataset_path : str
Path to the dataset to upload. The dataset must be a json file
containing a list of dictionaries. Each dictionary must contain a
`input` and `expected_output` keys. Additionally, it can include
an `id` key that will be added to the metadata of the dataset item.
Path to the dataset to upload.

Supported formats:
- ``.json``: a JSON array of records.
- ``.jsonl``: one JSON record per non-empty line.

Each record must contain ``input`` and ``expected_output`` keys.
Records may optionally include:
- ``id``: item identifier stored in metadata.
- ``metadata``: additional dictionary metadata merged into upload metadata.
dataset_name : str
Name of the dataset to upload.

Raises
------
ValueError
If the dataset format is invalid or a record is malformed.
FileNotFoundError
If ``dataset_path`` does not exist.
"""
dataset_file = Path(dataset_path)

# Get the client manager singleton instance and langfuse client
client_manager = AsyncClientManager.get_instance()
langfuse_client = client_manager.langfuse_client

# Load the ground truth dataset from the file path
logger.info(f"Loading dataset from '{dataset_path}'")
with open(dataset_path, "r") as file:
dataset = json.load(file)

# Create the dataset in Langfuse
langfuse_client.create_dataset(name=dataset_name)

# Upload each item to the dataset
for item in dataset:
assert "input" in item, "`input` is required for all items in the dataset"
assert "expected_output" in item, "`expected_output` is required for all items in the dataset"

# Parse once so the upload loop has deterministic progress totals.
logger.info("Loading dataset from '%s'", dataset_file)
dataset_format = _detect_dataset_format(dataset_file)
records = _load_dataset_records(dataset_file, dataset_format)

# Create dataset if missing; if it already exists we reuse it.
_ensure_dataset_exists(langfuse_client=langfuse_client, dataset_name=dataset_name)

# We centralize metadata normalization to keep uploader behavior
# consistent across JSON and JSONL sources.
for record_number, item in track_with_progress(
records,
description=f"Uploading Langfuse dataset '{dataset_name}'",
total=len(records),
transient=True, # Clear progress bar when done
):
normalized = _normalize_dataset_record(item=item, record_number=record_number)
item_id = _build_dataset_item_id(
dataset_name=dataset_name,
input_payload=normalized["input"],
expected_output_payload=normalized["expected_output"],
)
langfuse_client.create_dataset_item(
dataset_name=dataset_name,
input=item["input"],
expected_output=item["expected_output"],
metadata={
"id": item.get("id", None),
},
id=item_id, # Globally unique ID of deduplication
input=normalized["input"],
expected_output=normalized["expected_output"],
metadata=normalized["metadata"],
)

logger.info(f"Uploaded {len(dataset)} items to dataset '{dataset_name}'")
logger.info("Uploaded %d items to dataset '%s'", len(records), dataset_name)

# Gracefully close the services
await client_manager.close()


def _detect_dataset_format(dataset_file: Path) -> Literal["json", "jsonl"]:
"""Detect dataset format from extension or first non-empty content line."""
suffix = dataset_file.suffix.lower()
if suffix == ".jsonl":
return "jsonl"
if suffix == ".json":
return "json"

with dataset_file.open("r", encoding="utf-8") as file:
for line in file:
stripped = line.strip()
if not stripped:
continue
if stripped.startswith("["):
return "json"
return "jsonl"

raise ValueError(f"Dataset file is empty: {dataset_file}")


def _ensure_dataset_exists(*, langfuse_client: Any, dataset_name: str) -> None:
"""Ensure the target dataset exists before item uploads."""
try:
langfuse_client.create_dataset(name=dataset_name)
return
except Exception as exc:
# We only continue if the dataset can be retrieved
try:
langfuse_client.get_dataset(dataset_name)
logger.info("Dataset '%s' already exists; appending/upserting items.", dataset_name)
return
except Exception as retrieval_exc:
raise exc from retrieval_exc


def _build_dataset_item_id(
*,
dataset_name: str,
input_payload: Any,
expected_output_payload: Any,
) -> str:
"""Build a deterministic, globally-unique dataset item ID."""
canonical = json.dumps(
{
"input": input_payload,
"expected_output": expected_output_payload,
},
sort_keys=True,
separators=(",", ":"),
ensure_ascii=True,
)
digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()
return f"{dataset_name}:{digest}"


def _load_dataset_records(
dataset_file: Path, dataset_format: Literal["json", "jsonl"]
) -> list[tuple[int, dict[str, Any]]]:
"""Load dataset records and preserve stable record numbers."""
if dataset_format == "json":
return _load_json_records(dataset_file)
return _load_jsonl_records(dataset_file)


def _load_json_records(dataset_file: Path) -> list[tuple[int, dict[str, Any]]]:
"""Load records from a JSON array file."""
with dataset_file.open("r", encoding="utf-8") as file:
loaded = json.load(file)

if not isinstance(loaded, list):
raise ValueError(f"JSON dataset must be a list of records: {dataset_file}")

return [(index, record) for index, record in enumerate(loaded, start=1)]


def _load_jsonl_records(dataset_file: Path) -> list[tuple[int, dict[str, Any]]]:
"""Load records from a JSONL file with line-number-aware errors."""
records: list[tuple[int, dict[str, Any]]] = []

with dataset_file.open("r", encoding="utf-8") as file:
for line_number, line in enumerate(file, start=1):
stripped = line.strip()
if not stripped:
continue
try:
parsed = json.loads(stripped)
except json.JSONDecodeError as exc:
raise ValueError(f"Invalid JSONL record at line {line_number} in '{dataset_file}': {exc.msg}") from exc
records.append((line_number, parsed))

if not records:
raise ValueError(f"JSONL dataset has no records: {dataset_file}")

return records


def _normalize_dataset_record(item: Any, record_number: int) -> dict[str, Any]:
"""Validate and normalize one dataset record for upload."""
if not isinstance(item, dict):
raise ValueError(f"Record {record_number} must be an object.")

if "input" not in item:
raise ValueError(f"Record {record_number} is missing required key: 'input'")
if "expected_output" not in item:
raise ValueError(f"Record {record_number} is missing required key: 'expected_output'")

raw_metadata = item.get("metadata", {})
if raw_metadata is None:
raw_metadata = {}
if not isinstance(raw_metadata, dict):
raise ValueError(f"Record {record_number} has non-object metadata.")

derived_id = item.get("id", record_number)
metadata = dict(raw_metadata)
metadata["id"] = derived_id

return {
"input": item["input"],
"expected_output": item["expected_output"],
"metadata": metadata,
}
95 changes: 95 additions & 0 deletions aieng-eval-agents/aieng/agent_evals/progress.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
"""Reusable Rich progress utilities.

This module provides a consistent progress bar style for long-running
workflows across the repository.
"""

from collections.abc import Iterable, Iterator
from typing import TypeVar

from rich.progress import BarColumn, Progress, TextColumn, TimeElapsedColumn, TimeRemainingColumn


T = TypeVar("T")


def create_progress(*, transient: bool = False) -> Progress:
"""Create a standardized Rich ``Progress`` instance.

Parameters
----------
transient : bool, optional, default=False
Whether to clear the progress display after completion.
Defaults to ``False`` so users can inspect completion state.

Returns
-------
Progress
Configured progress renderer with consistent columns.

Examples
--------
>>> from aieng.agent_evals.progress import create_progress
>>> progress = create_progress()
>>> with progress:
... task_id = progress.add_task("Uploading", total=10)
... progress.update(task_id, advance=10)
"""
return Progress(
TextColumn("[bold blue]{task.description}"),
BarColumn(),
TextColumn("{task.completed}/{task.total}"),
TimeRemainingColumn(),
TimeElapsedColumn(),
transient=transient,
)


def track_with_progress(
iterable: Iterable[T],
*,
description: str,
total: int | float | None = None,
transient: bool = False,
) -> Iterator[T]:
"""Iterate items while displaying a progress bar.

This is a ``tqdm``-style helper for common loops. Pass an iterable and iterate
as usual.

Parameters
----------
iterable : Iterable[T]
Iterable of items to process.
description : str
Human-readable task description displayed in the progress bar.
total : int or float or None, optional, default=None
Expected number of units of work. If omitted, this function attempts
to infer the total from ``len(iterable)`` when available.
transient : bool, optional, default=False
Whether to clear the progress display after completion.

Examples
--------
>>> from aieng.agent_evals.progress import track_with_progress
>>> for item in track_with_progress([1, 2, 3], description="Uploading"):
... _ = item
"""
resolved_total = total if total is not None else _infer_total(iterable)

with create_progress(transient=transient) as progress:
task_id = progress.add_task(description, total=resolved_total)
for item in iterable:
yield item
progress.update(task_id, advance=1)


def _infer_total(iterable: Iterable[T]) -> int | None:
"""Infer iterable size when ``len`` is available."""
try:
return len(iterable) # type: ignore[arg-type]
except TypeError:
return None


__all__ = ["create_progress", "track_with_progress"]
Loading