Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 60 additions & 55 deletions llmeter/results.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import os
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from functools import cached_property
from numbers import Number
from typing import Any, Sequence

Expand Down Expand Up @@ -169,8 +168,8 @@ def load_responses(self) -> list[InvocationResponse]:
InvocationResponse(**json.loads(line)) for line in f if line
]
logger.info("Loaded %d responses from %s", len(self.responses), responses_path)
# Invalidate cached stats so they are recomputed with the loaded responses
self.__dict__.pop("_builtin_stats", None)
# Recompute stats from the freshly loaded responses
self._preloaded_stats = self._compute_stats(self)
return self.responses

@classmethod
Expand Down Expand Up @@ -241,9 +240,9 @@ def load(

result = cls(responses=responses, **summary)

# When skipping responses, load pre-computed stats from stats.json if available
# so that result.stats works without needing the responses
# Load or compute stats
if not load_responses:
# Use pre-computed stats from disk when responses aren't loaded
stats_path = result_path / "stats.json"
if stats_path.exists():
with stats_path.open("r") as s:
Expand All @@ -260,78 +259,84 @@ def load(
pass
else:
result._preloaded_stats = None
else:
# Compute stats from the loaded responses
result._preloaded_stats = cls._compute_stats(result)

return result

@cached_property
def _builtin_stats(self) -> dict:
"""
Default run metrics and aggregated statistics provided by LLMeter core
@classmethod
def _compute_stats(cls, result: "Result") -> dict:
"""Compute stats from in-memory responses.

Users should generally refer to the `.stats` property instead, which combines this data
with any additional values contributed by callbacks or other extensions.
This is the fallback used when ``_preloaded_stats`` is not available — for
example when a ``Result`` is constructed manually or after
:meth:`load_responses` reloads data from disk.

This is a read-only and `@cached_property`, which means the result is computed once and
then cached for subsequent accesses - improving performance.
Args:
result: A ``Result`` instance whose ``responses`` list is populated.

Returns:
stats: A dictionary containing all computed statistics. The keys are:
- All key-value pairs from the Result's dictionary representation
- Test-specific statistics
- Aggregated statistics with keys in the format "{stat_name}-{aggregation_type}"
where stat_name is one of the four metrics listed above, and
aggregation_type includes measures like mean, median, etc.
"""
A flat dictionary matching the ``Result.stats`` schema, containing
run-level metrics (``failed_requests``, ``requests_per_minute``, …)
and per-metric aggregations (``time_to_first_token-p50``, …).

Example::

result = Result(responses=my_responses, total_requests=100, ...)
stats = Result._compute_stats(result)
stats["time_to_first_token-p90"] # 0.485
"""
aggregation_metrics = [
"time_to_last_token",
"time_to_first_token",
"num_tokens_output",
"num_tokens_input",
]

results_stats = _get_stats_from_results(
self,
aggregation_metrics,
)
results_stats = _get_stats_from_results(result, aggregation_metrics)
return {
**self.to_dict(),
**_get_run_stats(self),
**result.to_dict(),
**_get_run_stats(result),
**{f"{k}-{j}": v for k, o in results_stats.items() for j, v in o.items()},
}

@property
def stats(self) -> dict:
"""Run metrics and aggregated statistics over the individual requests.

Returns a flat dictionary combining:

* Basic run information (from ``to_dict()``).
* Aggregated statistics (``average``, ``p50``, ``p90``, ``p99``) for
``time_to_last_token``, ``time_to_first_token``, ``num_tokens_output``,
and ``num_tokens_input``. Keys use the format
``"{metric}-{aggregation}"``.
* Run-level throughput metrics (``requests_per_minute``,
``total_input_tokens``, etc.).
* Any additional stats contributed by callbacks via
:meth:`_update_contributed_stats`.

During a live run, stats are computed incrementally by
:class:`~llmeter.utils.RunningStats` and stored in ``_preloaded_stats``.
When loading from disk with ``load_responses=False``, pre-computed stats
from ``stats.json`` are used. As a fallback (e.g. manually constructed
``Result``), stats are computed on the fly from ``self.responses``.

Returns:
A new shallow copy of the stats dictionary on each access.

Example::

result = await runner.run(payload=my_payload, clients=5)
result.stats["time_to_first_token-p50"] # 0.312
result.stats["requests_per_minute"] # 141.2
result.stats["failed_requests"] # 0
"""
Run metrics and aggregated statistics over the individual requests

This combined view includes:
- Basic information about the run (from the Result's dictionary representation)
- Aggregated statistics ('average', 'p50', 'p90', 'p99') for:
- Time to last token
- Time to first token
- Number of tokens output
- Number of tokens input

Aggregated statistics are keyed in the format "{stat_name}-{aggregation_type}"

This property is read-only and returns a new shallow copy of the data on each access.
Default stats provided by LLMeter are calculated on first access and then cached. Callbacks
Callbacks or other mechanisms needing to augment stats should use the
`_update_contributed_stats()` method.

When the Result was loaded with ``load_responses=False``, pre-computed stats from
``stats.json`` are returned if available. Call ``load_responses()`` to load the
individual responses and recompute stats from the raw data.
"""
# Use preloaded stats when responses were not loaded
if not self.responses and self._preloaded_stats is not None:
if self._preloaded_stats is not None:
stats = self._preloaded_stats.copy()
if self._contributed_stats:
stats.update(self._contributed_stats)
return stats

stats = self._builtin_stats.copy()
else:
# Fallback: compute from responses (e.g. Result constructed manually)
stats = self._compute_stats(self)

if self._contributed_stats:
stats.update(self._contributed_stats)
Expand Down
106 changes: 96 additions & 10 deletions llmeter/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from tqdm.auto import tqdm, trange
from upath import UPath as Path

from llmeter.utils import now_utc
from llmeter.utils import RunningStats, now_utc

if TYPE_CHECKING:
# Avoid circular import: We only need typing for Callback
Expand Down Expand Up @@ -61,6 +61,8 @@ class _RunConfig:
run_description: str | None = None
timeout: int | float = 60
callbacks: list[Callback] | None = None
low_memory: bool = False
progress_bar_stats: dict[str, tuple[str, ...] | str] | None = None
disable_per_client_progress_bar: InitVar[bool] = True
disable_clients_progress_bar: InitVar[bool] = True

Expand Down Expand Up @@ -149,19 +151,35 @@ class _Run(_RunConfig):
"""

def __post_init__(self, disable_client_progress_bar, disable_clients_progress_bar):
assert (
self.run_name is not None
), "Test Run must be created with an explicit run_name"
assert self.run_name is not None, (
"Test Run must be created with an explicit run_name"
)

super().__post_init__(disable_client_progress_bar, disable_clients_progress_bar)

assert (
self.endpoint is not None
), "Test Run must be created with an explicit Endpoint"
assert self.endpoint is not None, (
"Test Run must be created with an explicit Endpoint"
)

self._validate_and_prepare_payload()
self._responses = []

if self.low_memory:
assert self.output_path is not None, (
"output_path is required when low_memory=True "
"(responses must be written to disk)"
)

self._running_stats = RunningStats(
metrics=[
"time_to_last_token",
"time_to_first_token",
"time_per_output_token",
"num_tokens_output",
"num_tokens_input",
]
)

def _validate_and_prepare_payload(self):
"""Validate and prepare the payload for the test run and update n_requests

Expand Down Expand Up @@ -251,9 +269,18 @@ async def _process_results_from_q(self, output_path: Path | None = None):
if self.callbacks is not None:
[await cb.after_invoke(response) for cb in self.callbacks]

self._responses.append(response)
if self.low_memory and self._running_stats is not None:
self._running_stats.update(response.to_dict())
else:
self._responses.append(response)
self._running_stats.update(response.to_dict())

if self._progress_bar:
self._progress_bar.update(1)
self._progress_bar.set_postfix(
self._running_stats.snapshot(self.progress_bar_stats),
refresh=False,
)

if output_path:
output_path.parent.mkdir(parents=True, exist_ok=True)
Expand Down Expand Up @@ -403,7 +430,7 @@ async def _invoke_n_c(
end_t = time.perf_counter()
total_test_time = end_t - start_t
logger.info(
f"Generated {clients} connections with {n_requests} invocations each in {total_test_time*1000:.2f} seconds"
f"Generated {clients} connections with {n_requests} invocations each in {total_test_time * 1000:.2f} seconds"
)

# Signal the token counting task to exit
Expand Down Expand Up @@ -474,7 +501,7 @@ async def _run(self):
return result

self._progress_bar.close()
logger.info(f"Test completed in {total_test_time*1000:.2f} seconds.")
logger.info(f"Test completed in {total_test_time * 1000:.2f} seconds.")

result = replace(
result,
Expand All @@ -484,6 +511,22 @@ async def _run(self):
end_time=run_end_time,
)

# Compute stats from the running accumulators
result._preloaded_stats = self._running_stats.to_stats(
total_requests=result.total_requests,
total_test_time=total_test_time,
result_dict=result.to_dict(),
)
result._preloaded_stats["start_time"] = run_start_time
result._preloaded_stats["end_time"] = run_end_time
result._preloaded_stats["total_test_time"] = total_test_time

if self.low_memory:
logger.info(
"Low-memory mode: responses not stored in memory. "
"Use result.load_responses() to load from disk."
)

if self.callbacks is not None:
[await cb.after_run(result) for cb in self.callbacks]

Expand Down Expand Up @@ -554,6 +597,15 @@ class Runner(_RunConfig):
endpoint. Defaults to 60 seconds.
callbacks (list[Callback] | None): Optional callbacks to enable during the test Run. See
`llmeter.callbacks` for more information.
low_memory (bool): When ``True``, responses are written to disk but not kept in memory
during the run. Stats are computed incrementally via
:class:`~llmeter.utils.RunningStats`. Requires ``output_path`` to be set. Use
``result.load_responses()`` to load responses from disk after the run. Defaults to
``False``.
progress_bar_stats (dict | None): Controls which live stats appear on the progress bar.
Maps short display labels to field specs — see
:attr:`RunningStats.DEFAULT_SNAPSHOT_STATS` for the format and defaults. Pass ``{}``
to disable live stats entirely. Defaults to ``None`` (use built-in defaults).
disable_per_client_progress_bar (bool): Set `True` to disable per-client progress bars
from showing during the run. Default `False` (each client's progress will be shown).
disable_clients_progress_bar (bool): Set `True` to disable overall progress bar from
Expand Down Expand Up @@ -600,6 +652,8 @@ async def run(
run_description: str | None = None,
timeout: int | float | None = None,
callbacks: list[Callback] | None = None,
low_memory: bool | None = None,
progress_bar_stats: dict[str, tuple[str, ...] | str] | None = None,
disable_per_client_progress_bar: bool | None = None,
disable_clients_progress_bar: bool | None = None,
) -> Result:
Expand Down Expand Up @@ -635,6 +689,36 @@ async def run(
endpoint.
callbacks (list[Callback] | None): Optional callbacks to enable during the test Run. See
`llmeter.callbacks` for more information.
low_memory (bool): When ``True``, responses are written to disk but not
kept in memory. Stats are computed incrementally via
:class:`~llmeter.utils.RunningStats`. Requires ``output_path``.
Use ``result.load_responses()`` to access responses after the run.

Example::

result = await runner.run(
output_path="/tmp/my_run",
low_memory=True,
)
result.stats # works (computed incrementally)
result.responses # [] (empty)
result.load_responses() # loads from disk

progress_bar_stats (dict): Controls which live stats appear on the
progress bar. Maps short display labels to field specs — see
:attr:`RunningStats.DEFAULT_SNAPSHOT_STATS` for the format and
defaults. Pass ``{}`` to disable live stats entirely.

Example::

# Show only p99 latency and tokens per second:
result = await runner.run(
progress_bar_stats={
"p99_ttlt": ("time_to_last_token", "p99"),
"tps": ("time_per_output_token", "p50", "inv"),
"fail": "failed",
},
)
disable_per_client_progress_bar (bool): Set `True` to disable per-client progress bars
from showing during the run.
disable_clients_progress_bar (bool): Set `True` to disable overall progress bar from
Expand Down Expand Up @@ -667,6 +751,8 @@ async def run(
run_description=run_description,
timeout=timeout,
callbacks=callbacks,
low_memory=low_memory,
progress_bar_stats=progress_bar_stats,
disable_per_client_progress_bar=disable_per_client_progress_bar,
disable_clients_progress_bar=disable_clients_progress_bar,
)
Expand Down
Loading