6 changes: 5 additions & 1 deletion benchmarking/nightly-benchmark.yaml
@@ -57,6 +57,11 @@ sinks:
# Whether to delete scratch dirs after each run
delete_scratch: true

# The size of the object store used by Ray. The value can be a number of bytes (int), a fraction of
# total system memory (float), or the string "default", which lets "ray start" determine the object
# store size.
object_store_size: 536870912000 # 500GB

entries:
- name: domain_classification_raydata
enabled: true
@@ -91,7 +96,6 @@ entries:
- metric: domain_label_news_count
exact_value: 2817

object_store_size_bytes: 214748364800
- name: domain_classification_xenna
enabled: true
script: domain_classification_benchmark.py
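The new object_store_size setting accepts three forms. Below is a minimal sketch of how those forms could resolve to the value handed to "ray start"; psutil is an assumption here, since the runner itself uses its own get_total_memory_bytes helper:

import psutil

def resolve_object_store_size(value):
    """Resolve an object_store_size config value to bytes, or None for Ray's default (sketch only)."""
    if value is None or value == "default":
        return None  # let "ray start" choose the object store size
    if isinstance(value, float):
        # Fraction of total system memory, e.g. 0.5 means half of RAM
        return int(psutil.virtual_memory().total * value)
    return int(value)  # already an explicit byte count

resolve_object_store_size(0.5)           # fraction of system memory
resolve_object_store_size("default")     # None, Ray decides
resolve_object_store_size(536870912000)  # explicit bytes, as in the config above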
15 changes: 8 additions & 7 deletions benchmarking/run.py
@@ -45,11 +45,12 @@
from runner.path_resolver import PathResolver
from runner.process import run_command_with_timeout
from runner.ray_cluster import (
get_ray_cluster_data,
setup_ray_cluster_and_env,
teardown_ray_cluster_and_env,
)
from runner.session import Session
from runner.utils import find_result, get_obj_for_json, resolve_env_vars
from runner.utils import find_result, get_obj_for_json, remove_disabled_blocks, resolve_env_vars


def ensure_dir(dir_path: Path) -> None:
@@ -172,15 +173,15 @@ def run_entry(
num_gpus=ray_num_gpus,
enable_object_spilling=ray_enable_object_spilling,
ray_log_path=logs_path / "ray.log",
object_store_size_bytes=entry.object_store_size_bytes,
object_store_size=None if entry.object_store_size == "default" else entry.object_store_size,
)

# Prepopulate <session_entry_path>/params.json with entry params.
# These will be appended with the benchmark params by the benchmark script.
(session_entry_path / "params.json").write_text(
json.dumps(
{
Contributor commented:

WDYT of logging stuff from runtime as @ayushdg recommended

ray.cluster_resources() exposes a nice dictionary we can log/use

{'accelerator_type:A100': 1.0,
 'node:__internal_head__': 1.0,
 'node:10.184.206.10': 1.0,
 'CPU': 128.0,
 'object_store_memory': 644245094400.0,
 'memory': 1225750720512.0,
 'GPU': 4.0}

Maybe new set of keys?

@rlratzel (Contributor Author) replied, Jan 23, 2026:

There was a placeholder for ray data in results.json which I used for this. Would we rather this data go in params.json? Since it's actual data returned from the cluster, it seems less like params to the benchmark and more like results, but I don't feel strongly.
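A minimal sketch of the reviewer's suggestion, assuming hypothetical key names for the flattened values:

import ray

ray.init(ignore_reinit_error=True)
resources = ray.cluster_resources()  # cluster-wide totals, like the dictionary shown above
ray.shutdown()

# Flatten the most useful totals; these key names are made up for illustration.
cluster_metrics = {
    "cluster_cpu_total": resources.get("CPU"),
    "cluster_gpu_total": resources.get("GPU"),
    "cluster_memory_bytes": resources.get("memory"),
    "cluster_object_store_bytes": resources.get("object_store_memory"),
}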

"object_store_size_bytes": entry.object_store_size_bytes,
"object_store_size_bytes": entry.object_store_size,
"ray_num_cpus": ray_num_cpus,
"ray_num_gpus": ray_num_gpus,
"ray_enable_object_spilling": ray_enable_object_spilling,
@@ -215,15 +216,15 @@ def run_entry(
"logs_dir": logs_path,
}
)
ray_data = {}
ray_cluster_data = get_ray_cluster_data()
# script_persisted_data is a dictionary with keys "params" and "metrics"
# "params" will contain everything the script wrote to its params.json file
# "metrics" will contain everything the script wrote to its metrics.json file plus metrics
# from the Task objects restored from the tasks.pkl file.
script_persisted_data = get_entry_script_persisted_data(session_entry_path)
result_data.update(
{
"ray_data": ray_data,
"ray_cluster_data": ray_cluster_data,
"metrics": script_persisted_data["metrics"],
"params": script_persisted_data["params"],
}
@@ -298,12 +299,13 @@ def main() -> int: # noqa: C901
# Preprocess the config dict prior to creating objects from it
try:
Session.assert_valid_config_dict(config_dict)
config_dict = remove_disabled_blocks(config_dict)
config_dict = resolve_env_vars(config_dict)
except ValueError as e:
logger.error(f"Invalid configuration: {e}")
return 1

session = Session.create_from_dict(config_dict, args.entries)
session = Session.from_dict(config_dict, args.entries)

if args.list:
for entry in session.entries:
@@ -324,7 +326,6 @@

# Print a summary of the entries that will be run in the for loop below
# Disabled entries will not be printed
# TODO: should entries be created unconditionally and have an "enabled" field instead?
logger.info("Benchmark entries to be run in this session:")
for idx, entry in enumerate(session.entries, start=1):
logger.info(f"\t{idx}. {entry.name}")
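remove_disabled_blocks is imported and applied above, but its implementation is not part of this diff. A minimal sketch of the idea, assuming it only drops list items whose "enabled" flag is false:

from typing import Any

def remove_disabled_blocks(config: dict[str, Any]) -> dict[str, Any]:
    """Drop entries and sinks whose "enabled" flag is explicitly false (sketch only)."""
    cleaned = dict(config)
    for key in ("entries", "sinks"):
        blocks = cleaned.get(key)
        if isinstance(blocks, list):
            cleaned[key] = [block for block in blocks if block.get("enabled", True)]
    return cleaned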
30 changes: 26 additions & 4 deletions benchmarking/runner/entry.py
@@ -17,10 +17,12 @@
from __future__ import annotations

import re
from dataclasses import dataclass, field
from dataclasses import dataclass, field, fields
from pathlib import Path
from typing import TYPE_CHECKING, Any

from runner.utils import get_total_memory_bytes

if TYPE_CHECKING:
from runner.datasets import DatasetResolver
from runner.path_resolver import PathResolver
@@ -39,14 +41,19 @@ class Entry:
sink_data: list[dict[str, Any]] | dict[str, Any] = field(default_factory=dict)
requirements: list[dict[str, Any]] | dict[str, Any] = field(default_factory=dict)
ray: dict[str, Any] = field(default_factory=dict) # supports only single node: num_cpus,num_gpus,object_store_gb
# If set, overrides the session-level default_object_store_size setting for this entry
object_store_size_bytes: int | None = None
# If set, overrides the session-level object_store_size setting for this entry.
# The value is either a number of bytes (int), a fraction of total system memory (float), or None or the
# string "default", both of which mean the default object store size as determined by "ray start".
object_store_size: int | float | str | None = None
# If set, overrides the session-level delete_scratch setting for this entry
delete_scratch: bool | None = None
enabled: bool = True

def __post_init__(self) -> None: # noqa: C901, PLR0912
"""Post-initialization checks and updates for dataclass."""
# Process object_store_size by converting values representing fractions of system memory to bytes.
if isinstance(self.object_store_size, float):
self.object_store_size = int(get_total_memory_bytes() * self.object_store_size)

# Convert the sink_data list of dicts to a dict of dicts for easier lookup with key from "name".
# sink_data typically starts as a list of dicts from reading YAML, like this:
# sink_data:
@@ -109,6 +116,21 @@ def __post_init__(self) -> None: # noqa: C901, PLR0912
raise ValueError(msg)
self.requirements = requirements

@classmethod
def from_dict(cls, data: dict[str, Any]) -> Entry:
"""Create Entry from dict, ignoring extra keys.

Args:
data: Dictionary containing entry configuration data.

Returns:
Entry instance with only valid fields populated.
"""
# Get only the fields that are defined in the dataclass
valid_fields = {f.name for f in fields(cls)}
filtered_data = {k: v for k, v in data.items() if k in valid_fields}
return cls(**filtered_data)

def get_command_to_run(
self,
session_entry_path: Path,
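A usage sketch for the new Entry.from_dict; only the keys shown in this diff are confirmed fields, the rest are assumptions:

cfg = {
    "name": "domain_classification_raydata",
    "script": "domain_classification_benchmark.py",
    "object_store_size": 0.25,   # fraction of system memory, converted to bytes in __post_init__
    "unknown_future_key": True,  # not an Entry field, silently dropped by from_dict
}
entry = Entry.from_dict(cfg)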
2 changes: 1 addition & 1 deletion benchmarking/runner/env_capture.py
@@ -30,7 +30,7 @@

def dump_env(session_obj: Session, output_path: Path) -> dict[str, Any]:
env_data = get_env()
env_data["default_object_store_size_bytes"] = session_obj.default_object_store_size_bytes
env_data["object_store_size"] = session_obj.object_store_size

# Try package managers in order of preference for capturing the environment
# package_managers = [("uv", "pip freeze"), ("pip", "freeze"), ("micromamba", "list --explicit"), ("conda", "list --explicit")] # noqa: ERA001
47 changes: 29 additions & 18 deletions benchmarking/runner/ray_cluster.py
@@ -20,7 +20,9 @@
import time
import uuid
from pathlib import Path
from typing import Any

import ray
from loguru import logger
from runner.utils import run_shm_size_check

@@ -35,7 +37,7 @@ def setup_ray_cluster_and_env( # noqa: PLR0913
num_gpus: int,
enable_object_spilling: bool,
ray_log_path: Path,
object_store_size_bytes: int | None = None,
object_store_size: int | None = None,
include_dashboard: bool = True,
) -> tuple[RayClient, Path]:
"""Setup a Ray cluster and set the RAY_ADDRESS environment variable and return the Ray client and temp dir."""
@@ -70,7 +72,7 @@
enable_object_spilling=enable_object_spilling,
ray_dashboard_host="0.0.0.0", # noqa: S104
ray_stdouterr_capture_file=ray_stdouterr_capture_file,
object_store_memory=object_store_size_bytes,
object_store_memory=object_store_size,
)
client.start()

@@ -111,17 +113,6 @@ def teardown_ray_cluster_and_env(
logger.exception("Failed to copy/remove Ray temp dir")


def _ensure_ray_client_process_started(client: RayClient, timeout_s: int, poll_interval_s: float) -> None:
"""Ensure the Ray client process has been started, no longer than timeout."""
elapsed_s = 0
while client.ray_process is None and elapsed_s < timeout_s:
time.sleep(poll_interval_s)
elapsed_s += poll_interval_s
if client.ray_process is None:
msg = f"Ray client process failed to start in {timeout_s} seconds"
raise RuntimeError(msg)


def check_ray_responsive(timeout_s: int = 20) -> bool:
# Assume the env var RAY_ADDRESS is set to the correct value by code starting the Ray cluster
logger.debug(f"Verifying Ray cluster is responsive, using RAY_ADDRESS={os.environ.get('RAY_ADDRESS')}")
@@ -141,16 +132,16 @@ def check_ray_responsive(timeout_s: int = 20) -> bool:
timeout=timeout_s,
)
if "No cluster status" in result.stdout or "Error" in result.stdout:
logger.debug(f"Ray cluster is not responsive: {result.stdout}")
logger.debug("Ray cluster is not responsive ('No cluster status' returned or Error in output)")
else:
logger.debug("Ray cluster IS responsive")
responsive = True

except subprocess.CalledProcessError as e:
logger.debug(f"Ray cluster is not responsive ('ray status' command failed): {e.stdout}")
except subprocess.CalledProcessError:
logger.debug("Ray cluster is not responsive ('ray status' command failed)")

except subprocess.TimeoutExpired as e:
logger.debug(f"Ray cluster is not responsive ('ray status' command timed out): {e.stdout}")
except subprocess.TimeoutExpired:
logger.debug("Ray cluster is not responsive ('ray status' command timed out)")

finally:
# Also show the output of `df -h /dev/shm`, since this is often a symptom of problems
@@ -165,6 +156,26 @@
return responsive


def get_ray_cluster_data() -> dict[str, Any]:
"""Get resource data from the Ray cluster."""
ray.init(ignore_reinit_error=True)
time.sleep(0.2) # resource data reported by Ray might lag briefly after init
ray_data = ray.cluster_resources()
ray.shutdown()
return ray_data


def _ensure_ray_client_process_started(client: RayClient, timeout_s: int, poll_interval_s: float) -> None:
"""Ensure the Ray client process has been started, no longer than timeout."""
elapsed_s = 0
while client.ray_process is None and elapsed_s < timeout_s:
time.sleep(poll_interval_s)
elapsed_s += poll_interval_s
if client.ray_process is None:
msg = f"Ray client process failed to start in {timeout_s} seconds"
raise RuntimeError(msg)


def _copy_item_safely(src_path: Path, dst_path: Path) -> None:
"""Copy a single file or directory, logging warnings on failure."""
try:
27 changes: 13 additions & 14 deletions benchmarking/runner/session.py
@@ -36,14 +36,15 @@
from runner.utils import get_total_memory_bytes


@dataclass(frozen=True, kw_only=True)
@dataclass(kw_only=True)
class Session:
results_path: Path
entries: list[Entry] = field(default_factory=list)
sinks: list[Sink] = field(default_factory=list)
default_timeout_s: int = 7200
# Set object store memory to 50% of total system memory by default
default_object_store_size_bytes: int = int(get_total_memory_bytes() * 0.5)
# The object store size is either a value in bytes (int), a fraction of total system memory (float), or None
# or the string "default", both of which mean the default object store size as determined by "ray start".
object_store_size: int | float | str | None = 0.5
# Whether to delete the entry's scratch directory after completion by default
delete_scratch: bool = True
path_resolver: PathResolver = None
@@ -57,6 +58,10 @@ def __post_init__(self) -> None:
msg = f"Duplicate entry name(s) found: {', '.join(duplicates)}"
raise ValueError(msg)

# Process object_store_size by converting values representing fractions of system memory to bytes.
if isinstance(self.object_store_size, float):
self.object_store_size = int(get_total_memory_bytes() * self.object_store_size)

# Update delete_scratch for each entry that has not been set to the session-level delete_scratch setting
for entry in self.entries:
if entry.delete_scratch is None:
@@ -67,10 +72,10 @@ def __post_init__(self) -> None:
if entry.timeout_s is None:
entry.timeout_s = self.default_timeout_s

# Update object store size for each entry that has not been set to the session-level default_object_store_size setting
# Update object store size for each entry that has not been set.
for entry in self.entries:
if entry.object_store_size_bytes is None:
entry.object_store_size_bytes = self.default_object_store_size_bytes
if entry.object_store_size is None:
entry.object_store_size = self.object_store_size

@classmethod
def assert_valid_config_dict(cls, data: dict) -> None:
@@ -82,7 +87,7 @@ def assert_valid_config_dict(cls, data: dict) -> None:
raise ValueError(msg)

@classmethod
def create_from_dict(cls, data: dict, entry_filter_expr: str | None = None) -> Session:
def from_dict(cls, data: dict, entry_filter_expr: str | None = None) -> Session:
"""
Factory method to create a Session from a dictionary.

@@ -99,9 +104,7 @@ def create_from_dict(cls, data: dict, entry_filter_expr: str | None = None) -> S
sess_data = {k: v for k, v in data.items() if k in sess_field_names}
sinks = cls.create_sinks_from_dict(sess_data.get("sinks", []))

# Load entries only if enabled (enabled by default)
# TODO: should entries be created unconditionally and use their "enabled" field instead?
entries = [Entry(**e) for e in sess_data["entries"] if e.get("enabled", True)]
entries = [Entry.from_dict(e) for e in sess_data["entries"]]

# Filter entries based on the expression, if provided.
# Example: expr "foo and not foobar" will include all entries
@@ -132,10 +135,6 @@ def create_sinks_from_dict(cls, sink_configs: list[dict]) -> list[Sink]:
sinks = []
for sink_config in sink_configs:
sink_name = sink_config["name"]
sink_enabled = sink_config.get("enabled", True)
if not sink_enabled:
logger.warning(f"Sink {sink_name} is not enabled, skipping")
continue
if sink_name == "mlflow":
from runner.sinks.mlflow_sink import MlflowSink

20 changes: 8 additions & 12 deletions benchmarking/runner/sinks/gdrive_sink.py
@@ -32,7 +32,6 @@ class GdriveSink(Sink):
def __init__(self, sink_config: dict[str, Any]):
super().__init__(sink_config)
self.sink_config = sink_config
self.enabled = self.sink_config.get("enabled", True)
self.results: list[dict[str, Any]] = []
self.session_name: str = None
self.matrix_config: Session = None
@@ -57,17 +56,14 @@ def process_result(self, result_dict: dict[str, Any], matrix_entry: Entry) -> No
pass

def finalize(self) -> None:
if self.enabled:
try:
tar_path = self._tar_results_and_artifacts()
self._upload_to_gdrive(tar_path)
except Exception as e: # noqa: BLE001
tb = traceback.format_exc()
logger.error(f"GdriveSink: Error uploading to Google Drive: {e}\n{tb}")
finally:
self._delete_tar_file(tar_path)
else:
logger.warning("GdriveSink: Not enabled, skipping post.")
try:
tar_path = self._tar_results_and_artifacts()
self._upload_to_gdrive(tar_path)
except Exception as e: # noqa: BLE001
tb = traceback.format_exc()
logger.error(f"GdriveSink: Error uploading to Google Drive: {e}\n{tb}")
finally:
self._delete_tar_file(tar_path)

def _tar_results_and_artifacts(self) -> Path:
results_path = Path(self.matrix_config.results_path)
14 changes: 5 additions & 9 deletions benchmarking/runner/sinks/mlflow_sink.py
@@ -33,7 +33,6 @@ def __init__(self, sink_config: dict[str, Any]):
if not self.experiment:
msg = "MlflowSink: No experiment configured"
raise ValueError(msg)
self.enabled = self.sink_config.get("enabled", True)
self.results: list[dict[str, Any]] = []
self.session_name: str = None
self.matrix_config: Session = None
@@ -54,14 +53,11 @@ def process_result(self, result_dict: dict[str, Any], matrix_entry: Entry) -> No
self.results.append((additional_metrics, result_dict))

def finalize(self) -> None:
if self.enabled:
try:
self._push(self.results)
except Exception as e: # noqa: BLE001
tb = traceback.format_exc()
logger.error(f"MlflowSink: Error posting to Mlflow: {e}\n{tb}")
else:
logger.warning("MlflowSink: Not enabled, skipping post.")
try:
self._push(self.results)
except Exception as e: # noqa: BLE001
tb = traceback.format_exc()
logger.error(f"MlflowSink: Error posting to Mlflow: {e}\n{tb}")

def _push(self, results: list[dict[str, Any]]) -> None:
pass