diff --git a/docs/getting-started/benchmark.md b/docs/getting-started/benchmark.md index 6173907f6..ce1a1623b 100644 --- a/docs/getting-started/benchmark.md +++ b/docs/getting-started/benchmark.md @@ -65,6 +65,7 @@ GuideLLM offers a wide range of configuration options to customize your benchmar | `--random-seed` | Random seed for reproducibility | `--random-seed 42` | | `--max-seconds` | Duration for each benchmark in seconds | `--max-seconds 30` | | `--max-requests` | Maximum number of requests for each benchmark | `--max-requests 1000` | +| `--data-samples` | Maximum number of dataset rows to load | `--data-samples 1000` | | `--output-dir` | Directory path to save output files | `--output-dir results/` | | `--outputs` | Output formats to generate | `--outputs json csv html` | @@ -187,6 +188,34 @@ guidellm benchmark \ You can customize synthetic data generation with additional parameters such as standard deviation, minimum, and maximum values. See the [Datasets Synthetic data documentation](../guides/datasets.md#synthetic-data) for more details. +### Trace Replay Benchmarking (beta) + +For realistic load testing, replay trace events using each row's timestamp and token lengths. Trace files must be JSONL and are loaded with the `trace_synthetic` data type. By default, each row uses `timestamp`, `input_length`, and `output_length` fields. Timestamps may be absolute or monotonic values; GuideLLM sorts them and converts them to offsets from the first event before scheduling: + +```json +{"timestamp": 1234500.0, "input_length": 256, "output_length": 128} +{"timestamp": 1234500.5, "input_length": 512, "output_length": 64} +``` + +In this example, the second request is scheduled 0.5 seconds after the first request. + +Run with the `replay` profile: + +```bash +guidellm benchmark \ + --target "http://localhost:8000" \ + --data path/to/trace.jsonl \ + --data-args type_=trace_synthetic \ + --profile replay \ + --rate 1.0 +``` + +The `--rate` parameter acts as a time scale for the intervals between trace events, not requests per second: `1.0` preserves the original timing, `2.0` doubles the intervals and runs twice as long, and `0.5` halves the intervals and runs twice as fast. + +GuideLLM orders trace rows by timestamp before scheduling and payload generation, so each scheduled event uses the token lengths from the same sorted row. Use `--data-samples` to limit how many trace rows are loaded and replayed. `--max-requests` remains a runtime completion constraint; it does not truncate the trace dataset. + +If your trace uses different column names, map them with `timestamp_column`, `prompt_tokens_column`, and `output_tokens_column` in `--data-args`. + ### Working with Real Data While synthetic data is convenient for quick tests, you can benchmark with real-world data: diff --git a/docs/guides/datasets.md b/docs/guides/datasets.md index e5104d1ec..2b4ce51e1 100644 --- a/docs/guides/datasets.md +++ b/docs/guides/datasets.md @@ -13,6 +13,10 @@ The following arguments can be used to configure datasets and their processing: - `prompt_column`: Specifies the column name for the prompt. By default, GuideLLM will try the most common column names (e.g., `prompt`, `text`, `input`). - `prompt_tokens_count_column`: Specifies the column name for the prompt token count. These are used to set the request prompt token count for counting metrics. By default, GuideLLM assumes no token count is provided. - `output_tokens_count_column`: Specifies the column name for the output token count. These are used to set the request output token count for the request and counting metrics. By default, GuideLLM assumes no token count is provided. + - `type_`: Selects a specialized dataset deserializer, such as `trace_synthetic` for trace replay files. + - `timestamp_column`: Specifies the timestamp column for `trace_synthetic` data. The default is `timestamp`. + - `prompt_tokens_column`: Specifies the prompt token length column for `trace_synthetic` data. The default is `input_length`. + - `output_tokens_column`: Specifies the output token length column for `trace_synthetic` data. The default is `output_length`. - `split`: Specifies the dataset split to use (e.g., `train`, `val`, `test`). By default, GuideLLM will try the most common split names (e.g., `train`, `validation`, `test`) if the dataset has splits, otherwise it will use the entire dataset. - Any remaining arguments are passed directly into the dataset constructor as kwargs. - `--data-sampler`: Specifies the sampling strategy for datasets. By default, no sampling is applied. When set to `random`, it enables random shuffling of the dataset, which can be useful for creating diverse batches during benchmarking. @@ -116,22 +120,62 @@ GuideLLM supports various file formats for datasets, including text, CSV, JSON, #### Supported Formats with Examples - **Text files (`.txt`, `.text`)**: Where each line is a separate prompt to use. + ``` Hello, how are you? What is your name? ``` + - **CSV files (`.csv`)**: Where each row is a separate dataset entry and the first row contains the column names. The columns should include `prompt` or other common names for the prompt which will be used as the prompt column. Additional columns can be included based on the previously mentioned aliases for the `--data-column-mapper` argument. + ```csv prompt,output_tokens_count,additional_column,additional_column2 Hello, how are you?,5,foo,bar What is your name?,3,baz,qux ``` + - **JSON Lines files (`.jsonl`)**: Where each line is a separate JSON object. The objects should include `prompt` or other common names for the prompt which will be used as the prompt column. Additional fields can be included based on the previously mentioned aliases for the `--data-args` argument. + ```json {"prompt": "Hello, how are you?", "output_tokens_count": 5, "additional_column": "foo", "additional_column2": "bar"} {"prompt": "What is your name?", "output_tokens_count": 3, "additional_column": "baz", "additional_column2": "qux"} ``` + +- **Trace files (`.jsonl` with `trace_synthetic` type)**: Specialized JSONL files for replay benchmarking with `timestamp`, `input_length`, and `output_length` fields. Used with `--profile replay` to replay trace events using each row's timestamp and token lengths. Timestamps must be numbers expressed in seconds on a shared timeline with any consistent zero point; GuideLLM sorts them and converts them to offsets from the first event before scheduling. Date strings are not parsed yet, so provide timestamps as numbers. See [Trace Replay Benchmarking](../getting-started/benchmark.md#trace-replay-benchmarking). + + ```json + {"timestamp": 1234500.0, "input_length": 256, "output_length": 128} + {"timestamp": 1234500.5, "input_length": 512, "output_length": 64} + ``` + + In this example, the second request is scheduled 0.5 seconds after the first request. Trace rows are ordered by timestamp before GuideLLM schedules requests and generates synthetic payloads. This keeps each scheduled event aligned with the prompt and output token lengths from the same row. + + Use `--data-args type_=trace_synthetic` to enable trace loading: + + ```bash + guidellm benchmark \ + --target http://localhost:8000 \ + --profile replay \ + --rate 1.0 \ + --data path/to/trace.jsonl \ + --data-args type_=trace_synthetic + ``` + + If your trace uses different column names, configure them with `timestamp_column`, `prompt_tokens_column`, and `output_tokens_column`: + + ```bash + guidellm benchmark \ + --target http://localhost:8000 \ + --profile replay \ + --rate 1.0 \ + --data replay.jsonl \ + --data-args type_=trace_synthetic,timestamp_column=timestamp,prompt_tokens_column=input_length,output_tokens_column=output_length + ``` + + For replay, `--rate` is a time scale for the intervals between trace events rather than requests per second. Use `--data-samples` to limit how many trace rows are loaded and replayed. Use `--max-requests` only as a runtime completion constraint; it does not limit the trace rows loaded from the file. + - **JSON files (`.json`)**: Where the entire dataset is represented as a JSON array of objects nested under a specific key. To surface the correct key to use, a `--data-column-mapper` argument must be passed in of `"field": "NAME"` for where the array exists. The objects should include `prompt` or other common names for the prompt which will be used as the prompt column. Additional fields can be included based on the previously mentioned aliases for the `--data-column-mapper` argument. + ```json { "version": "1.0", @@ -141,8 +185,11 @@ GuideLLM supports various file formats for datasets, including text, CSV, JSON, ] } ``` + - **Parquet files (`.parquet`)** Example: A binary columnar storage format for efficient data processing. For more information on the supported formats, see the Hugging Face dataset documentation linked in the [Notes](#notes) section. + - **Arrow files (`.arrow`)** Example: A cross-language development platform for in-memory data. For more information on the supported formats, see the Hugging Face dataset documentation linked in the [Notes](#notes) section. + - **HDF5 files (`.hdf5`)** Example: A hierarchical data format for storing large amounts of data. For more information on the supported formats, see the Hugging Face dataset documentation linked in the [Notes](#notes) section. #### Example Commands diff --git a/src/guidellm/benchmark/entrypoints.py b/src/guidellm/benchmark/entrypoints.py index 1f0ed3043..5ce7c9ed7 100644 --- a/src/guidellm/benchmark/entrypoints.py +++ b/src/guidellm/benchmark/entrypoints.py @@ -355,6 +355,8 @@ async def resolve_profile( max_global_error_rate: float | None, over_saturation: dict[str, Any] | None = None, console: Console | None = None, + data: list[Any] | None = None, + **profile_kwargs: Any, ) -> Profile: """ Resolve and configure a benchmark profile with rate and constraint settings. @@ -376,6 +378,8 @@ async def resolve_profile( :param max_global_error_rate: Maximum global error rate threshold before stopping :param over_saturation: Over-saturation detection configuration (dict) :param console: Console instance for progress reporting, or None + :param data: Optional list of data sources. + :param profile_kwargs: Additional profile-specific arguments. :return: Configured Profile instance ready for benchmarking :raises ValueError: If constraints are provided with a pre-configured Profile """ @@ -403,6 +407,8 @@ async def resolve_profile( random_seed=random_seed, rampup_duration=rampup, constraints={**constraints}, + data=data, + **profile_kwargs, ) elif constraints: raise ValueError( @@ -536,6 +542,9 @@ async def benchmark_generative_text( max_global_error_rate=args.max_global_error_rate, over_saturation=args.over_saturation, console=console, + data=args.data, + data_args=args.data_args, + data_samples=request_loader.info.get("data_samples", -1), ) output_formats = await resolve_output_formats( outputs=args.outputs, output_dir=args.output_dir, console=console diff --git a/src/guidellm/benchmark/profiles.py b/src/guidellm/benchmark/profiles.py index 054356c10..704c24e0e 100644 --- a/src/guidellm/benchmark/profiles.py +++ b/src/guidellm/benchmark/profiles.py @@ -13,6 +13,7 @@ from abc import ABC, abstractmethod from collections.abc import Generator +from pathlib import Path from typing import TYPE_CHECKING, Annotated, Any, ClassVar, Literal import numpy as np @@ -37,8 +38,10 @@ SchedulingStrategy, SynchronousStrategy, ThroughputStrategy, + TraceReplayStrategy, ) from guidellm.schemas import PydanticClassRegistryMixin +from guidellm.utils.trace_io import load_relative_timestamps if TYPE_CHECKING: from guidellm.benchmark.schemas import Benchmark @@ -48,13 +51,14 @@ "ConcurrentProfile", "Profile", "ProfileType", + "ReplayProfile", "SweepProfile", "SynchronousProfile", "ThroughputProfile", ] ProfileType = Annotated[ - Literal["synchronous", "concurrent", "throughput", "async", "sweep"], + Literal["synchronous", "concurrent", "throughput", "async", "sweep", "replay"], "Profile type identifiers for polymorphic deserialization", ] @@ -328,6 +332,110 @@ def next_strategy( return SynchronousStrategy() +@Profile.register("replay") +class ReplayProfile(Profile): + """ + Replay a trace file: + schedule each request at start_time + time_scale * relative_timestamp[i]. + + For this profile, the ``rate`` argument is interpreted as time_scale (scale factor + applied to relative timestamps), not as requests per second. + + When ``data_samples`` is set, the replayed timestamps are truncated to match + the sampled dataset size. + """ + + type_: Literal["replay"] = "replay" # type: ignore[assignment] + relative_timestamps: list[float] = Field( + description="Request start times relative to first event (first = 0)", + ) + time_scale: float = Field( + default=1.0, + gt=0, + description="Scale factor applied to relative timestamps", + ) + + @classmethod + def resolve_args( + cls, + rate_type: str, + rate: list[float] | None, + random_seed: int, + **kwargs: Any, + ) -> dict[str, Any]: + _ = (rate_type, random_seed) # unused + data = kwargs.get("data") + if not data: + raise ValueError("Replay profile requires data (path to trace file)") + if len(data) != 1: + raise ValueError( + f"ReplayProfile requires exactly one data source, received {len(data)}" + ) + if not data[0]: + raise ValueError("Replay profile requires data (path to trace file)") + path = Path(data[0]) if isinstance(data[0], str) else data[0] + if not path.exists(): + raise ValueError(f"Replay trace file not found: {path}") + + # For replay profile, rate is interpreted as time_scale (not requests per + # second) + time_scale = rate[0] if rate and len(rate) > 0 else 1.0 + + # Honor a custom timestamp column when configured via --data-args so the + # replay profile and trace_synthetic deserializer use the same field. + data_args = kwargs.get("data_args") or [] + first_args = data_args[0] if data_args else {} + timestamp_column = "timestamp" + if isinstance(first_args, dict): + raw_timestamp_column = first_args.get("timestamp_column") + if isinstance(raw_timestamp_column, str) and raw_timestamp_column.strip(): + timestamp_column = raw_timestamp_column + + relative_timestamps = load_relative_timestamps( + path, timestamp_column=timestamp_column + ) + data_samples = kwargs.get("data_samples", -1) + if isinstance(data_samples, int) and data_samples > 0: + relative_timestamps = relative_timestamps[:data_samples] + + if not relative_timestamps: + raise ValueError( + "No timestamps remain after applying data_samples. " + "The trace is empty or all events were filtered out." + ) + + constraints = dict(kwargs.get("constraints") or {}) + if not any( + key in constraints + for key in ("max_number", "max_num", "max_requests", "max_req") + ): + constraints["max_requests"] = len(relative_timestamps) + + return { + "relative_timestamps": relative_timestamps, + "time_scale": time_scale, + "constraints": constraints, + } + + @property + def strategy_types(self) -> list[str]: + return ["trace"] + + def next_strategy( + self, + prev_strategy: SchedulingStrategy | None, + prev_benchmark: Benchmark | None, + ) -> TraceReplayStrategy | None: + _ = prev_benchmark + # Replay has a single strategy; return it once, then None + if prev_strategy is not None: + return None + return TraceReplayStrategy( + relative_timestamps=self.relative_timestamps, + time_scale=self.time_scale, + ) + + @Profile.register("concurrent") class ConcurrentProfile(Profile): """ diff --git a/src/guidellm/data/deserializers/__init__.py b/src/guidellm/data/deserializers/__init__.py index fb22fd2a7..4fdfbceae 100644 --- a/src/guidellm/data/deserializers/__init__.py +++ b/src/guidellm/data/deserializers/__init__.py @@ -25,6 +25,7 @@ SyntheticTextDataset, SyntheticTextDatasetDeserializer, ) +from .trace_synthetic import TraceSyntheticDatasetDeserializer __all__ = [ "ArrowFileDatasetDeserializer", @@ -46,4 +47,5 @@ "SyntheticTextDatasetDeserializer", "TarFileDatasetDeserializer", "TextFileDatasetDeserializer", + "TraceSyntheticDatasetDeserializer", ] diff --git a/src/guidellm/data/deserializers/trace_synthetic.py b/src/guidellm/data/deserializers/trace_synthetic.py new file mode 100644 index 000000000..f4cff0bf5 --- /dev/null +++ b/src/guidellm/data/deserializers/trace_synthetic.py @@ -0,0 +1,207 @@ +""" +Trace file deserializer that generates synthetic prompts per row. + +Reads a trace file (timestamp, input_length, output_length) and yields one row per +line with a synthetic prompt matching the requested input_length for replay benchmarks. +""" + +from __future__ import annotations + +from collections.abc import Callable +from pathlib import Path +from typing import Any + +from datasets import Dataset +from datasets.exceptions import DatasetGenerationError +from faker import Faker +from transformers import PreTrainedTokenizerBase + +from guidellm.data.deserializers.deserializer import ( + DataNotSupportedError, + DatasetDeserializer, + DatasetDeserializerFactory, +) +from guidellm.utils.trace_io import load_trace_rows + +__all__ = ["TraceSyntheticDatasetDeserializer"] + + +def _encode_prompt( + processor: PreTrainedTokenizerBase, + text: str, +) -> list[int]: + """Encode text with the configured tokenizer defaults.""" + return processor.encode(text) + + +def _decode_prompt( + processor: PreTrainedTokenizerBase, + token_ids: list[int], +) -> str: + """Decode token ids into a prompt string.""" + decoded = processor.decode(token_ids, skip_special_tokens=True) + if isinstance(decoded, list): + return decoded[0] if decoded else "" + return decoded + + +def _create_base_prompt_token_ids( + processor: PreTrainedTokenizerBase, + faker: Faker, + token_count: int, +) -> list[int]: + """Generate reusable synthetic token ids for trace prompt construction.""" + if token_count <= 0: + return [] + + token_text = (faker.word() or "x")[0] + text = token_text + token_ids = _encode_prompt(processor, text) + max_attempts = 8 + attempts = 0 + + while len(token_ids) < token_count and attempts < max_attempts: + attempts += 1 + missing_tokens = token_count - len(token_ids) + text = f"{text} {' '.join([token_text] * missing_tokens)}" + token_ids = _encode_prompt(processor, text) + + if len(token_ids) < token_count: + raise DataNotSupportedError( + "Could not generate enough synthetic prompt tokens for " + f"{token_count} tokens after {max_attempts} attempts" + ) + + return token_ids + + +def _create_prompt( + processor: PreTrainedTokenizerBase, + prompt_tokens_count: int, + base_prompt_token_ids: list[int], + request_index: int, +) -> str: + """Build a prompt from unique prefix tokens and reusable base prompt tokens.""" + if prompt_tokens_count <= 0: + return "" + + unique_prefix = f"guidellm-trace-request-{request_index}: " + prefix_token_ids = _encode_prompt(processor, unique_prefix) + prompt_token_ids = (prefix_token_ids + base_prompt_token_ids)[:prompt_tokens_count] + if len(prompt_token_ids) < prompt_tokens_count: + raise DataNotSupportedError( + "Could not build a synthetic prompt with " + f"{prompt_tokens_count} tokens from generated base tokens" + ) + + return _decode_prompt(processor, prompt_token_ids) + + +def _load_trace_rows( + path: Path, + timestamp_column: str, + prompt_tokens_column: str, + output_tokens_column: str, +) -> list[dict[str, Any]]: + """Load trace file into list of dicts with timestamp, prompt_tokens, + output_tokens.""" + try: + raw = load_trace_rows( + path, + required_columns=[ + prompt_tokens_column, + output_tokens_column, + ], + timestamp_column=timestamp_column, + ) + except (DatasetGenerationError, KeyError, ValueError) as e: + raise DataNotSupportedError(str(e)) from e + try: + return [ + { + "timestamp": float(row[timestamp_column]), + "prompt_tokens": int(row[prompt_tokens_column]), + "output_tokens": int(row[output_tokens_column]), + } + for row in raw + ] + except (TypeError, ValueError) as e: + raise DataNotSupportedError(str(e)) from e + + +@DatasetDeserializerFactory.register("trace_synthetic") +class TraceSyntheticDatasetDeserializer(DatasetDeserializer): + """ + Load a trace file and generate a synthetic prompt per row. + + Trace file must have timestamp, and columns for prompt and output token counts + (default: input_length, output_length). Each row becomes one request with + a synthetic prompt of the requested input length. + """ + + def __call__( + self, + data: Any, + processor_factory: Callable[[], PreTrainedTokenizerBase], + random_seed: int, + **data_kwargs: dict[str, Any], + ) -> Dataset: + if ( + not isinstance(data, str | Path) + or not (path := Path(data)).exists() + or not path.is_file() + ): + raise DataNotSupportedError( + "TraceSyntheticDatasetDeserializer expects a path to a trace file, " + f"got {data}" + ) + timestamp_column = str(data_kwargs.pop("timestamp_column", "timestamp")) + prompt_tokens_column = str( + data_kwargs.pop("prompt_tokens_column", "input_length") + ) + output_tokens_column = str( + data_kwargs.pop("output_tokens_column", "output_length") + ) + rows = _load_trace_rows( + path, timestamp_column, prompt_tokens_column, output_tokens_column + ) + if not rows: + raise DataNotSupportedError("Trace file is empty") + + processor = processor_factory() + faker = Faker() + faker.seed_instance(random_seed) + max_prompt_tokens = max(row["prompt_tokens"] for row in rows) + base_prompt_token_ids = _create_base_prompt_token_ids( + processor, faker, max_prompt_tokens + ) + + prompts: list[str] = [] + prompt_tokens_counts: list[int] = [] + output_tokens_counts: list[int] = [] + for i, row in enumerate(rows): + n_in = row["prompt_tokens"] + n_out = row["output_tokens"] + if n_in < 0 or n_out < 0: + raise DataNotSupportedError( + "Trace token counts must be non-negative, got " + f"input_length={n_in}, output_length={n_out}" + ) + prompt = _create_prompt( + processor, n_in, base_prompt_token_ids, request_index=i + ) + prompts.append(prompt) + prompt_tokens_counts.append(n_in) + output_tokens_counts.append(n_out) + + # Avoid passing deserializer-only keys to Dataset.from_dict + data_kwargs.pop("type_", None) + + return Dataset.from_dict( + { + "prompt": prompts, + "prompt_tokens_count": prompt_tokens_counts, + "output_tokens_count": output_tokens_counts, + }, + **data_kwargs, + ) diff --git a/src/guidellm/scheduler/__init__.py b/src/guidellm/scheduler/__init__.py index 3aa6b5a70..1aafd994b 100644 --- a/src/guidellm/scheduler/__init__.py +++ b/src/guidellm/scheduler/__init__.py @@ -50,6 +50,7 @@ StrategyType, SynchronousStrategy, ThroughputStrategy, + TraceReplayStrategy, ) from .worker import WorkerProcess from .worker_group import WorkerProcessGroup @@ -90,6 +91,7 @@ "StrategyType", "SynchronousStrategy", "ThroughputStrategy", + "TraceReplayStrategy", "UnserializableConstraintInitializer", "WorkerProcess", "WorkerProcessGroup", diff --git a/src/guidellm/scheduler/strategies.py b/src/guidellm/scheduler/strategies.py index ff8e76a4c..e47daf1ea 100644 --- a/src/guidellm/scheduler/strategies.py +++ b/src/guidellm/scheduler/strategies.py @@ -38,11 +38,12 @@ "StrategyType", "SynchronousStrategy", "ThroughputStrategy", + "TraceReplayStrategy", ] StrategyType = Annotated[ - Literal["synchronous", "concurrent", "throughput", "constant", "poisson"], + Literal["synchronous", "concurrent", "throughput", "constant", "poisson", "trace"], "Valid strategy type identifiers for scheduling request patterns", ] @@ -671,3 +672,55 @@ def request_completed(self, request_info: RequestInfo): :param request_info: Completed request metadata (unused) """ _ = request_info # request_info unused for async poisson strategy + + +@SchedulingStrategy.register("trace") +class TraceReplayStrategy(SchedulingStrategy): + """ + Replay scheduling from a trace of timestamps. + + Schedules each request at start_time + time_scale * relative_timestamp[i], + so the trace's inter-arrival pattern is reproduced with an optional time scale. + """ + + type_: Literal["trace"] = "trace" # type: ignore[assignment] + relative_timestamps: list[float] = Field( + description="Request start times relative to first event (first = 0)", + ) + time_scale: float = Field( + default=1.0, + gt=0, + description="Scale factor applied to relative timestamps", + ) + + def __str__(self) -> str: + return f"trace@{self.time_scale:.2f}" + + @property + def processes_limit(self) -> PositiveInt | None: + # Trace replay is currently constrained to one process until each + # scheduled timestamp is bound to its request before workers compete + # for queue items. + return 1 + + @property + def requests_limit(self) -> PositiveInt | None: + return None + + async def next_request_time(self, worker_index: NonNegativeInt) -> float: + _ = worker_index + start_time = await self.get_processes_start_time() + if not self.relative_timestamps: + return start_time + + idx = self.next_request_index() + if idx > len(self.relative_timestamps): + # Trace exhausted: park this worker slot until the scheduler cancels + # the processing loop via constraint_reached_event. CancelledError + # propagates up cleanly, matching the exit path of all other strategies. + await asyncio.Event().wait() + + return start_time + self.time_scale * self.relative_timestamps[idx - 1] + + def request_completed(self, request_info: RequestInfo): + _ = request_info diff --git a/src/guidellm/utils/trace_io.py b/src/guidellm/utils/trace_io.py new file mode 100644 index 000000000..a3f1962a9 --- /dev/null +++ b/src/guidellm/utils/trace_io.py @@ -0,0 +1,87 @@ +""" +Shared trace file I/O for replay benchmarks. + +Reads trace files (.jsonl only for now) and exposes rows or relative timestamps. +Used by replay profiles and the trace_synthetic deserializer. +""" + +from __future__ import annotations + +from pathlib import Path +from typing import Any + +from datasets import Dataset, load_dataset + +__all__ = ["load_relative_timestamps", "load_trace_rows"] + + +def load_trace_rows( + path: Path | str, + required_columns: list[str] | None = None, + timestamp_column: str | None = None, + **data_kwargs: Any, +) -> Dataset: + """ + Load trace file rows as a HuggingFace Dataset. + + Supports .jsonl only (one JSON object per line). + If required_columns is set, every column must exist in the dataset; + otherwise KeyError is raised with a descriptive message. + If timestamp_column is set, rows are sorted by that column. + + :param path: Path to the trace file. + :param required_columns: Optional list of column/field names that each row + must have. + :param timestamp_column: Optional timestamp column used to sort trace rows. + :param data_kwargs: Additional keyword arguments forwarded to load_dataset. + :return: HuggingFace Dataset (iterable as dicts, column-accessible). + :raises KeyError: If a required column is missing in the dataset. + :raises ValueError: If the file format is not .jsonl. + """ + path = Path(path) + suffix = path.suffix.lower() + if suffix != ".jsonl": + raise ValueError(f"Unsupported trace file format: {suffix}") + if path.stat().st_size == 0: + raise ValueError(f"Trace file is empty or has no valid rows: {path}") + + trace_dataset = load_dataset( + "json", data_files=str(path), split="train", **data_kwargs + ) + + required_columns = required_columns or [] + if timestamp_column and timestamp_column not in required_columns: + required_columns = [*required_columns, timestamp_column] + + if required_columns: + missing = [c for c in required_columns if c not in trace_dataset.column_names] + if missing: + raise KeyError(f"Trace row missing required columns: {missing}") + + if timestamp_column: + trace_dataset = trace_dataset.sort(timestamp_column) + + return trace_dataset + + +def load_relative_timestamps( + path: Path | str, + timestamp_column: str = "timestamp", +) -> list[float]: + """ + Load timestamps from a trace file and return times relative to the first event. + + Trace file must be JSONL (one JSON object per line). The first timestamp + becomes 0.0, and all others are relative to it (always >= 0). + + :param path: Path to the trace file. + :param timestamp_column: Name of the column/field containing the timestamp. + :return: List of relative timestamps in seconds (first is 0.0). + :raises ValueError: If the trace file is empty or has no valid rows. + """ + trace_dataset = load_trace_rows(path, timestamp_column=timestamp_column) + if len(trace_dataset) == 0: + raise ValueError(f"Trace file is empty or has no valid rows: {path}") + timestamps = [float(t) for t in trace_dataset[timestamp_column]] + t0 = timestamps[0] + return [t - t0 for t in timestamps] diff --git a/tests/unit/benchmark/test_replay_profile.py b/tests/unit/benchmark/test_replay_profile.py new file mode 100644 index 000000000..6b50dbe85 --- /dev/null +++ b/tests/unit/benchmark/test_replay_profile.py @@ -0,0 +1,339 @@ +from __future__ import annotations + +import asyncio +from pathlib import Path + +import pytest +from pydantic import ValidationError + +from guidellm.benchmark.entrypoints import resolve_profile +from guidellm.benchmark.profiles import Profile, ReplayProfile +from guidellm.scheduler import TraceReplayStrategy + + +def _trace_path(tmp_path: Path, lines: list[str] | None = None) -> Path: + path = tmp_path / "trace.jsonl" + path.write_text("\n".join(lines or [])) + return path + + +class TestReplayProfile: + @pytest.mark.smoke + def test_resolve_args_requires_data(self): + with pytest.raises(ValueError, match="Replay profile requires data"): + ReplayProfile.resolve_args( + rate_type="replay", + rate=[1.0], + random_seed=42, + ) + + @pytest.mark.smoke + def test_resolve_args_rejects_multiple_data_sources(self): + with pytest.raises(ValueError, match="exactly one data source"): + ReplayProfile.resolve_args( + rate_type="replay", + rate=[1.0], + random_seed=42, + data=["trace-a.jsonl", "trace-b.jsonl"], + ) + + @pytest.mark.smoke + def test_resolve_args_rejects_missing_or_empty_trace(self, tmp_path: Path): + missing = tmp_path / "missing.jsonl" + with pytest.raises(ValueError, match="not found"): + ReplayProfile.resolve_args( + rate_type="replay", + rate=[1.0], + random_seed=42, + data=[str(missing)], + ) + + empty = _trace_path(tmp_path) + with pytest.raises(ValueError, match="empty|No timestamps"): + ReplayProfile.resolve_args( + rate_type="replay", + rate=[1.0], + random_seed=42, + data=[str(empty)], + ) + + @pytest.mark.smoke + @pytest.mark.parametrize( + ("rate", "expected_scale"), + [ + (None, 1.0), + ([2.0], 2.0), + ], + ) + def test_profile_create_resolves_timestamps_and_time_scale( + self, tmp_path: Path, rate, expected_scale + ): + trace = _trace_path( + tmp_path, + [ + '{"timestamp": 5.0, "input_length": 1, "output_length": 1}', + '{"timestamp": 2.0, "input_length": 2, "output_length": 2}', + '{"timestamp": 8.0, "input_length": 3, "output_length": 3}', + ], + ) + + profile = Profile.create( + rate_type="replay", + rate=rate, + random_seed=42, + data=[str(trace)], + ) + + assert isinstance(profile, ReplayProfile) + assert profile.relative_timestamps == pytest.approx([0.0, 3.0, 6.0], abs=1e-9) + assert profile.time_scale == expected_scale + assert profile.constraints["max_requests"] == 3 + + @pytest.mark.sanity + def test_non_positive_time_scale_is_rejected(self, tmp_path: Path): + trace = _trace_path( + tmp_path, + ['{"timestamp": 0, "input_length": 1, "output_length": 1}'], + ) + + with pytest.raises(ValidationError): + Profile.create( + rate_type="replay", + rate=[0.0], + random_seed=42, + data=[str(trace)], + ) + + @pytest.mark.smoke + def test_custom_timestamp_column_via_data_args(self, tmp_path: Path): + trace = _trace_path( + tmp_path, + [ + '{"ts": 5.0, "input_length": 100, "output_length": 10}', + '{"ts": 2.0, "input_length": 200, "output_length": 20}', + '{"ts": 8.0, "input_length": 300, "output_length": 30}', + ], + ) + + kwargs = ReplayProfile.resolve_args( + rate_type="replay", + rate=[1.0], + random_seed=42, + data=[str(trace)], + data_args=[{"timestamp_column": "ts"}], + ) + + assert kwargs["relative_timestamps"] == pytest.approx([0.0, 3.0, 6.0], abs=1e-9) + assert kwargs["constraints"]["max_requests"] == 3 + + @pytest.mark.smoke + def test_large_bursty_trace_sets_default_request_constraint(self, tmp_path: Path): + prompt_lengths = [ + 6755, + 7319, + 7234, + 2287, + 9013, + 6506, + 4824, + 3119, + 23090, + 3135, + 26874, + 10487, + 17448, + 6253, + 6725, + 13538, + 87162, + 6166, + 6320, + 2007, + 3174, + 3131, + 3159, + 6820, + 3154, + 9416, + 7460, + ] + timestamps = [ + 0.0, + 0.0, + 0.0, + 0.0, + 0.0, + 0.0, + 0.0, + 0.0, + 0.0, + 0.0, + 0.0, + 0.5, + 0.5, + 0.5, + 0.5, + 0.5, + 0.5, + 0.5, + 1.0, + 1.0, + 1.0, + 1.0, + 1.0, + 2.0, + 2.0, + 2.0, + 2.0, + ] + trace = _trace_path( + tmp_path, + [ + ( + f'{{"timestamp": {timestamp}, ' + f'"input_length": {prompt_length}, "output_length": 1}}' + ) + for timestamp, prompt_length in zip( + timestamps, prompt_lengths, strict=True + ) + ], + ) + + kwargs = ReplayProfile.resolve_args( + rate_type="replay", + rate=[1.0], + random_seed=42, + data=[str(trace)], + ) + + assert kwargs["relative_timestamps"] == pytest.approx(timestamps, abs=1e-9) + assert kwargs["constraints"]["max_requests"] == 27 + + @pytest.mark.smoke + @pytest.mark.parametrize("invalid_value", [None, "", " ", 123, False, []]) + def test_invalid_timestamp_column_config_falls_back_to_default( + self, tmp_path: Path, invalid_value + ): + trace = _trace_path( + tmp_path, + [ + '{"timestamp": 10.0, "input_length": 1, "output_length": 1}', + '{"timestamp": 12.0, "input_length": 2, "output_length": 2}', + ], + ) + + kwargs = ReplayProfile.resolve_args( + rate_type="replay", + rate=[1.0], + random_seed=42, + data=[str(trace)], + data_args=[{"timestamp_column": invalid_value}], + ) + + assert kwargs["relative_timestamps"] == pytest.approx([0.0, 2.0], abs=1e-9) + assert kwargs["constraints"]["max_requests"] == 2 + + @pytest.mark.smoke + def test_data_samples_truncates_after_sorting_and_preserves_constraints( + self, tmp_path: Path + ): + trace = _trace_path( + tmp_path, + [ + '{"timestamp": 5.0, "input_length": 1, "output_length": 1}', + '{"timestamp": 2.0, "input_length": 2, "output_length": 2}', + '{"timestamp": 8.0, "input_length": 3, "output_length": 3}', + '{"timestamp": 1.0, "input_length": 4, "output_length": 4}', + ], + ) + + kwargs = ReplayProfile.resolve_args( + rate_type="replay", + rate=[1.0], + random_seed=42, + data=[str(trace)], + data_samples=3, + constraints={"max_requests": 10, "max_seconds": 0.25}, + ) + + assert kwargs["relative_timestamps"] == pytest.approx([0.0, 1.0, 4.0], abs=1e-9) + assert kwargs["constraints"] == {"max_requests": 10, "max_seconds": 0.25} + + @pytest.mark.smoke + @pytest.mark.parametrize("data_samples", [0, -1]) + def test_non_positive_data_samples_do_not_truncate( + self, tmp_path: Path, data_samples: int + ): + trace = _trace_path( + tmp_path, + [ + '{"timestamp": 0, "input_length": 1, "output_length": 1}', + '{"timestamp": 1.0, "input_length": 2, "output_length": 2}', + ], + ) + + kwargs = ReplayProfile.resolve_args( + rate_type="replay", + rate=[1.0], + random_seed=42, + data=[str(trace)], + data_samples=data_samples, + ) + + assert kwargs["relative_timestamps"] == pytest.approx([0.0, 1.0], abs=1e-9) + assert kwargs["constraints"]["max_requests"] == 2 + + @pytest.mark.smoke + def test_resolve_profile_passes_replay_specific_kwargs(self, tmp_path: Path): + trace = _trace_path( + tmp_path, + [ + '{"ts": 5.0, "input_length": 1, "output_length": 1}', + '{"ts": 2.0, "input_length": 2, "output_length": 2}', + '{"ts": 8.0, "input_length": 3, "output_length": 3}', + ], + ) + + profile = asyncio.run( + resolve_profile( + profile="replay", + rate=[2.0], + random_seed=42, + rampup=0.0, + constraints={}, + max_seconds=None, + max_requests=2, + max_errors=None, + max_error_rate=None, + max_global_error_rate=None, + data=[str(trace)], + data_args=[{"timestamp_column": "ts"}], + data_samples=2, + ) + ) + + assert isinstance(profile, ReplayProfile) + assert profile.relative_timestamps == pytest.approx([0.0, 3.0], abs=1e-9) + assert profile.time_scale == 2.0 + assert profile.constraints["max_requests"] == 2 + + @pytest.mark.smoke + def test_next_strategy_returns_trace_then_none(self, tmp_path: Path): + trace = _trace_path( + tmp_path, + ['{"timestamp": 0, "input_length": 1, "output_length": 1}'], + ) + kwargs = ReplayProfile.resolve_args( + rate_type="replay", + rate=[2.0], + random_seed=42, + data=[str(trace)], + ) + profile = ReplayProfile(**kwargs) + + strategy = profile.next_strategy(None, None) + assert profile.strategy_types == ["trace"] + assert isinstance(strategy, TraceReplayStrategy) + assert strategy.relative_timestamps == [0.0] + assert strategy.time_scale == 2.0 + assert profile.next_strategy(strategy, None) is None diff --git a/tests/unit/data/deserializers/test_trace_synthetic.py b/tests/unit/data/deserializers/test_trace_synthetic.py new file mode 100644 index 000000000..9fbc6eefb --- /dev/null +++ b/tests/unit/data/deserializers/test_trace_synthetic.py @@ -0,0 +1,274 @@ +from __future__ import annotations + +from pathlib import Path +from unittest.mock import Mock + +import pytest +from datasets import Dataset + +from guidellm.data.deserializers.trace_synthetic import ( + TraceSyntheticDatasetDeserializer, +) +from guidellm.data.schemas import DataNotSupportedError + + +def _mock_processor() -> Mock: + """Tokenizer where each whitespace-delimited word is one token.""" + proc = Mock() + proc.encode.side_effect = lambda text: list(range(len(text.split()))) + proc.decode.side_effect = lambda tokens, skip_special_tokens=False: " ".join( + f"tok{i}" for i, _ in enumerate(tokens) + ) + return proc + + +def _write_trace(tmp_path: Path, content: str, suffix: str = ".jsonl") -> Path: + path = tmp_path / f"trace{suffix}" + path.write_text(content) + return path + + +class TestTraceSyntheticDatasetDeserializer: + @pytest.fixture + def deserializer(self) -> TraceSyntheticDatasetDeserializer: + return TraceSyntheticDatasetDeserializer() + + def _deserialize(self, deserializer, data, **kwargs): + return deserializer( + data=data, + processor_factory=_mock_processor, + random_seed=42, + **kwargs, + ) + + @pytest.mark.smoke + def test_loads_sorted_rows_and_keeps_token_columns_aligned( + self, tmp_path: Path, deserializer + ): + trace = _write_trace( + tmp_path, + '{"timestamp": 5.0, "input_length": 3, "output_length": 30}\n' + '{"timestamp": 2.0, "input_length": 1, "output_length": 10}\n' + '{"timestamp": 2.0, "input_length": 2, "output_length": 20}\n' + '{"timestamp": 8.0, "input_length": 0, "output_length": 40}\n', + ) + + ds = self._deserialize(deserializer, trace, type_="trace_synthetic") + + assert isinstance(ds, Dataset) + assert ds["prompt_tokens_count"] == [1, 2, 3, 0] + assert ds["output_tokens_count"] == [10, 20, 30, 40] + for prompt, token_count in zip( + ds["prompt"], ds["prompt_tokens_count"], strict=True + ): + assert len(_mock_processor().encode(prompt)) == token_count + + @pytest.mark.smoke + def test_honors_custom_column_names(self, tmp_path: Path, deserializer): + trace = _write_trace( + tmp_path, + '{"ts": 3.0, "input_tokens": 4, "generated_tokens": 40}\n' + '{"ts": 1.0, "input_tokens": 2, "generated_tokens": 20}\n', + ) + + ds = self._deserialize( + deserializer, + trace, + type_="trace_synthetic", + timestamp_column="ts", + prompt_tokens_column="input_tokens", + output_tokens_column="generated_tokens", + ) + + assert ds["prompt_tokens_count"] == [2, 4] + assert ds["output_tokens_count"] == [20, 40] + + @pytest.mark.smoke + def test_generates_large_trace_prompts_from_reusable_base( + self, tmp_path: Path, deserializer + ): + prompt_lengths = [ + 6755, + 7319, + 7234, + 2287, + 9013, + 6506, + 4824, + 3119, + 23090, + 3135, + 26874, + 10487, + 17448, + 6253, + 6725, + 13538, + 87162, + 6166, + 6320, + 2007, + 3174, + 3131, + 3159, + 6820, + 3154, + 9416, + 7460, + ] + output_lengths = [ + 500, + 490, + 794, + 316, + 3, + 3, + 173, + 20, + 453, + 19, + 458, + 402, + 610, + 3, + 32, + 71, + 402, + 24, + 548, + 354, + 19, + 23, + 20, + 26, + 21, + 145, + 3, + ] + timestamps = [ + 0.0, + 0.0, + 0.0, + 0.0, + 0.0, + 0.0, + 0.0, + 0.0, + 0.0, + 0.0, + 0.0, + 0.5, + 0.5, + 0.5, + 0.5, + 0.5, + 0.5, + 0.5, + 1.0, + 1.0, + 1.0, + 1.0, + 1.0, + 2.0, + 2.0, + 2.0, + 2.0, + ] + trace = _write_trace( + tmp_path, + "\n".join( + ( + f'{{"timestamp": {timestamp}, ' + f'"input_length": {prompt_length}, ' + f'"output_length": {output_length}}}' + ) + for timestamp, prompt_length, output_length in zip( + timestamps, prompt_lengths, output_lengths, strict=True + ) + ), + ) + processor = _mock_processor() + + ds = deserializer( + data=trace, + processor_factory=lambda: processor, + random_seed=42, + type_="trace_synthetic", + ) + + assert ds["prompt_tokens_count"] == prompt_lengths + assert ds["output_tokens_count"] == output_lengths + assert processor.encode.call_count <= len(prompt_lengths) + 4 + for prompt, token_count in zip( + ds["prompt"], ds["prompt_tokens_count"], strict=True + ): + assert len(_mock_processor().encode(prompt)) == token_count + + @pytest.mark.smoke + def test_rejects_invalid_data(self, deserializer): + with pytest.raises(DataNotSupportedError, match="path to a trace file"): + self._deserialize(deserializer, 123) + + @pytest.mark.sanity + @pytest.mark.parametrize( + ("content", "kwargs", "match"), + [ + ("", {}, "empty"), + ( + '{"ts": 0, "input_length": 10, "output_length": 5}\n', + {}, + "timestamp", + ), + ( + '{"timestamp": 0, "input_length": 10}\n', + {}, + "output_length", + ), + ( + '{"timestamp": 0, "prompt_tokens": 10, "output_length": 5}\n', + { + "prompt_tokens_column": "prompt_tokens", + "output_tokens_column": "out", + }, + "out", + ), + ( + '{"timestamp": "bad", "input_length": 10, "output_length": 5}\n', + {}, + "could not convert", + ), + ( + '{"timestamp": 0, "input_length": "bad", "output_length": 5}\n', + {}, + "invalid literal", + ), + ( + '{"timestamp": 0, "input_length": 10, "output_length": null}\n', + {}, + "NoneType", + ), + ( + '{"timestamp": 0, "input_length": 10, "output_length": 5}\nnot-json\n', + {}, + "generating the dataset", + ), + ], + ) + def test_trace_validation_raises( + self, tmp_path: Path, deserializer, content, kwargs, match + ): + trace = _write_trace(tmp_path, content) + + with pytest.raises(DataNotSupportedError, match=match): + self._deserialize(deserializer, trace, **kwargs) + + @pytest.mark.sanity + def test_unsupported_file_suffix_raises(self, tmp_path: Path, deserializer): + trace = _write_trace( + tmp_path, + '{"timestamp": 0, "input_length": 10, "output_length": 5}\n', + suffix=".json", + ) + + with pytest.raises(DataNotSupportedError, match=r"Unsupported.*\.json"): + self._deserialize(deserializer, trace) diff --git a/tests/unit/scheduler/test_trace_replay.py b/tests/unit/scheduler/test_trace_replay.py new file mode 100644 index 000000000..8b6f6f7b0 --- /dev/null +++ b/tests/unit/scheduler/test_trace_replay.py @@ -0,0 +1,183 @@ +from __future__ import annotations + +import asyncio +from multiprocessing import get_context +from pathlib import Path + +import pytest +from datasets.exceptions import DatasetGenerationError + +from guidellm.scheduler import SchedulingStrategy, TraceReplayStrategy +from guidellm.schemas import RequestInfo +from guidellm.utils.trace_io import load_relative_timestamps + + +def _write_trace(tmp_path: Path, content: str, suffix: str = ".jsonl") -> Path: + path = tmp_path / f"trace{suffix}" + path.write_text(content) + return path + + +class TestLoadRelativeTimestamps: + @pytest.mark.smoke + def test_loads_sorted_relative_timestamps_with_duplicates(self, tmp_path: Path): + trace = _write_trace( + tmp_path, + '{"timestamp": 5.0, "input_length": 10, "output_length": 10}\n' + '{"timestamp": 2.0, "input_length": 20, "output_length": 20}\n' + '{"timestamp": 2.0, "input_length": 30, "output_length": 30}\n' + '{"timestamp": 8.0, "input_length": 40, "output_length": 40}\n', + ) + + assert load_relative_timestamps(trace) == pytest.approx( + [0.0, 0.0, 3.0, 6.0], abs=1e-9 + ) + + @pytest.mark.smoke + def test_loads_custom_timestamp_column(self, tmp_path: Path): + trace = _write_trace( + tmp_path, + '{"ts": 10.0, "input_length": 10, "output_length": 10}\n' + '{"ts": 10.25, "input_length": 20, "output_length": 20}\n', + ) + + assert load_relative_timestamps(trace, timestamp_column="ts") == pytest.approx( + [0.0, 0.25], abs=1e-9 + ) + + @pytest.mark.smoke + @pytest.mark.parametrize( + ("suffix", "content", "error_type", "match"), + [ + (".jsonl", "", ValueError, "no valid rows"), + ( + ".json", + '[{"timestamp": 0, "input_length": 10, "output_length": 100}]', + ValueError, + r"Unsupported.*\.json", + ), + ( + ".csv", + "timestamp,input_length,output_length\n0,10,100\n", + ValueError, + r"Unsupported.*\.csv", + ), + ( + ".jsonl", + '{"ts": 0, "input_length": 10, "output_length": 100}\n', + KeyError, + "timestamp", + ), + ( + ".jsonl", + '{"timestamp": "bad", "input_length": 10, "output_length": 100}\n', + ValueError, + "could not convert", + ), + ( + ".jsonl", + '{"timestamp": 0, "input_length": 10, "output_length": 100}\n' + "not-json\n", + DatasetGenerationError, + "generating the dataset", + ), + ], + ) + def test_invalid_trace_inputs_raise( + self, tmp_path: Path, suffix, content, error_type, match + ): + trace = _write_trace(tmp_path, content, suffix=suffix) + + with pytest.raises(error_type, match=match): + load_relative_timestamps(trace) + + +class TestTraceReplayStrategy: + @pytest.mark.smoke + def test_initialization_and_serialization(self): + strategy = TraceReplayStrategy( + relative_timestamps=[0.0, 0.5, 1.0], + time_scale=2.0, + ) + + assert strategy.type_ == "trace" + assert str(strategy) == "trace@2.00" + assert strategy.processes_limit == 1 + assert strategy.requests_limit is None + restored = SchedulingStrategy.model_validate(strategy.model_dump()) + assert isinstance(restored, TraceReplayStrategy) + assert restored.relative_timestamps == [0.0, 0.5, 1.0] + assert restored.time_scale == 2.0 + + @pytest.mark.smoke + def test_next_request_time_scales_timestamps(self): + strategy = TraceReplayStrategy( + relative_timestamps=[0.0, 0.5, 1.0], + time_scale=2.0, + ) + strategy.init_processes_timings( + worker_count=1, + max_concurrency=10, + mp_context=get_context(), + ) + strategy.init_processes_start(1000.0) + + async def run(): + return [await strategy.next_request_time(0) for _ in range(3)] + + assert asyncio.run(run()) == pytest.approx([1000.0, 1001.0, 1002.0], abs=1e-6) + + @pytest.mark.smoke + def test_next_request_time_parks_when_trace_exhausted(self): + strategy = TraceReplayStrategy( + relative_timestamps=[0.0, 0.5], + time_scale=1.0, + ) + strategy.init_processes_timings( + worker_count=1, + max_concurrency=10, + mp_context=get_context(), + ) + strategy.init_processes_start(1000.0) + + async def run(): + # Consume the 2 valid slots + await strategy.next_request_time(0) + await strategy.next_request_time(0) + # The 3rd call parks; should raise CancelledError when cancelled + task = asyncio.create_task(strategy.next_request_time(0)) + await asyncio.sleep(0.05) + assert not task.done(), "expected to be parked, not resolved" + task.cancel() + with pytest.raises(asyncio.CancelledError): + await task + + asyncio.run(run()) + + @pytest.mark.smoke + def test_empty_trace_has_no_request_limit_and_uses_start_time(self): + strategy = TraceReplayStrategy(relative_timestamps=[], time_scale=1.0) + strategy.init_processes_timings( + worker_count=1, + max_concurrency=10, + mp_context=get_context(), + ) + strategy.init_processes_start(123.0) + + assert strategy.requests_limit is None + + async def run(): + return await strategy.next_request_time(0) + + assert asyncio.run(run()) == pytest.approx(123.0) + + @pytest.mark.smoke + def test_request_completed_no_op(self): + strategy = TraceReplayStrategy(relative_timestamps=[0.0], time_scale=1.0) + info = RequestInfo( + request_id="x", + status="completed", + scheduler_process_id=0, + scheduler_start_time=0, + ) + strategy.request_completed(info)