diff --git a/benchmarking/nightly-benchmark.yaml b/benchmarking/nightly-benchmark.yaml
index daa48496dc..4aac5f9732 100644
--- a/benchmarking/nightly-benchmark.yaml
+++ b/benchmarking/nightly-benchmark.yaml
@@ -57,6 +57,11 @@ sinks:
 # Whether to delete scratch dirs after each run
 delete_scratch: true
 
+# The size of the object store used by Ray: either a value in bytes (int), a fraction of
+# total system memory (float), or the string "default", which lets "ray start" determine
+# the object store size.
+object_store_size: 536870912000 # 500 GiB
+
 entries:
   - name: domain_classification_raydata
     enabled: true
@@ -91,7 +96,6 @@ entries:
       - metric: domain_label_news_count
         exact_value: 2817
 
-    object_store_size_bytes: 214748364800
   - name: domain_classification_xenna
     enabled: true
     script: domain_classification_benchmark.py
diff --git a/benchmarking/run.py b/benchmarking/run.py
index 0be7e8fc96..d3a44f2e4a 100755
--- a/benchmarking/run.py
+++ b/benchmarking/run.py
@@ -45,11 +45,12 @@ from runner.path_resolver import PathResolver
 from runner.process import run_command_with_timeout
 from runner.ray_cluster import (
+    get_ray_cluster_data,
     setup_ray_cluster_and_env,
     teardown_ray_cluster_and_env,
 )
 from runner.session import Session
-from runner.utils import find_result, get_obj_for_json, resolve_env_vars
+from runner.utils import find_result, get_obj_for_json, remove_disabled_blocks, resolve_env_vars
 
 
 def ensure_dir(dir_path: Path) -> None:
@@ -172,7 +173,7 @@ def run_entry(
         num_gpus=ray_num_gpus,
         enable_object_spilling=ray_enable_object_spilling,
         ray_log_path=logs_path / "ray.log",
-        object_store_size_bytes=entry.object_store_size_bytes,
+        object_store_size=None if entry.object_store_size == "default" else entry.object_store_size,
     )
 
     # Prepopulate /params.json with entry params.
@@ -180,7 +181,7 @@ def run_entry(
     (session_entry_path / "params.json").write_text(
         json.dumps(
             {
-                "object_store_size_bytes": entry.object_store_size_bytes,
+                "object_store_size_bytes": entry.object_store_size,
                 "ray_num_cpus": ray_num_cpus,
                 "ray_num_gpus": ray_num_gpus,
                 "ray_enable_object_spilling": ray_enable_object_spilling,
@@ -215,7 +216,7 @@ def run_entry(
             "logs_dir": logs_path,
         }
     )
-    ray_data = {}
+    ray_cluster_data = get_ray_cluster_data()
     # script_persisted_data is a dictionary with keys "params" and "metrics"
     # "params" will contain everything the script wrote to its params.json file
     # "metrics" will contain everything the script wrote to its metrics.json file plus metrics
@@ -223,7 +224,7 @@ def run_entry(
     script_persisted_data = get_entry_script_persisted_data(session_entry_path)
     result_data.update(
         {
-            "ray_data": ray_data,
+            "ray_cluster_data": ray_cluster_data,
             "metrics": script_persisted_data["metrics"],
             "params": script_persisted_data["params"],
         }
@@ -298,12 +299,13 @@ def main() -> int:  # noqa: C901
     # Preprocess the config dict prior to creating objects from it
     try:
         Session.assert_valid_config_dict(config_dict)
+        config_dict = remove_disabled_blocks(config_dict)
         config_dict = resolve_env_vars(config_dict)
     except ValueError as e:
         logger.error(f"Invalid configuration: {e}")
         return 1
 
-    session = Session.create_from_dict(config_dict, args.entries)
+    session = Session.from_dict(config_dict, args.entries)
 
     if args.list:
         for entry in session.entries:
@@ -324,7 +326,6 @@ def main() -> int:  # noqa: C901
 
     # Print a summary of the entries that will be run in the for loop below
     # Disabled entries will not be printed
-    # TODO: should entries be created unconditionally and have an "enabled" field instead?
     logger.info("Benchmark entries to be run in this session:")
     for idx, entry in enumerate(session.entries, start=1):
         logger.info(f"\t{idx}. {entry.name}")
diff --git a/benchmarking/runner/entry.py b/benchmarking/runner/entry.py
index 168f283a05..ebc843cd18 100644
--- a/benchmarking/runner/entry.py
+++ b/benchmarking/runner/entry.py
@@ -17,10 +17,12 @@ from __future__ import annotations
 
 import re
-from dataclasses import dataclass, field
+from dataclasses import dataclass, field, fields
 from pathlib import Path
 from typing import TYPE_CHECKING, Any
 
+from runner.utils import get_total_memory_bytes
+
 if TYPE_CHECKING:
     from runner.datasets import DatasetResolver
     from runner.path_resolver import PathResolver
@@ -39,14 +41,19 @@ class Entry:
     sink_data: list[dict[str, Any]] | dict[str, Any] = field(default_factory=dict)
     requirements: list[dict[str, Any]] | dict[str, Any] = field(default_factory=dict)
    ray: dict[str, Any] = field(default_factory=dict)  # supports only single node: num_cpus,num_gpus,object_store_gb
-    # If set, overrides the session-level default_object_store_size setting for this entry
-    object_store_size_bytes: int | None = None
+    # If set, overrides the session-level object_store_size setting for this entry.
+    # The value is either a number of bytes (int), a fraction of total system memory (float), or None or the
+    # string "default", both of which select the default object store size used by "ray start".
+    object_store_size: int | float | str | None = None
     # If set, overrides the session-level delete_scratch setting for this entry
     delete_scratch: bool | None = None
-    enabled: bool = True
 
     def __post_init__(self) -> None:  # noqa: C901, PLR0912
         """Post-initialization checks and updates for dataclass."""
+        # Process object_store_size by converting values representing fractions of system memory to bytes.
+        if isinstance(self.object_store_size, float):
+            self.object_store_size = int(get_total_memory_bytes() * self.object_store_size)
+
         # Convert the sink_data list of dicts to a dict of dicts for easier lookup with key from "name".
         # sink_data typically starts as a list of dicts from reading YAML, like this:
         # sink_data:
@@ -109,6 +116,21 @@ def __post_init__(self) -> None:  # noqa: C901, PLR0912
                 raise ValueError(msg)
         self.requirements = requirements
 
+    @classmethod
+    def from_dict(cls, data: dict[str, Any]) -> Entry:
+        """Create Entry from dict, ignoring extra keys.
+
+        Args:
+            data: Dictionary containing entry configuration data.
+
+        Returns:
+            Entry instance with only valid fields populated.
+        """
+        # Get only the fields that are defined in the dataclass
+        valid_fields = {f.name for f in fields(cls)}
+        filtered_data = {k: v for k, v in data.items() if k in valid_fields}
+        return cls(**filtered_data)
+
     def get_command_to_run(
         self,
         session_entry_path: Path,
diff --git a/benchmarking/runner/env_capture.py b/benchmarking/runner/env_capture.py
index 5f43257450..7e02424384 100644
--- a/benchmarking/runner/env_capture.py
+++ b/benchmarking/runner/env_capture.py
@@ -30,7 +30,7 @@ def dump_env(session_obj: Session, output_path: Path) -> dict[str, Any]:
 
     env_data = get_env()
-    env_data["default_object_store_size_bytes"] = session_obj.default_object_store_size_bytes
+    env_data["object_store_size"] = session_obj.object_store_size
 
     # Try package managers in order of preference for capturing the environment
     # package_managers = [("uv", "pip freeze"), ("pip", "freeze"), ("micromamba", "list --explicit"), ("conda", "list --explicit")]  # noqa: ERA001
diff --git a/benchmarking/runner/ray_cluster.py b/benchmarking/runner/ray_cluster.py
index 07a5a662e2..4adccc6a95 100644
--- a/benchmarking/runner/ray_cluster.py
+++ b/benchmarking/runner/ray_cluster.py
@@ -20,7 +20,9 @@
 import time
 import uuid
 from pathlib import Path
+from typing import Any
 
+import ray
 from loguru import logger
 
 from runner.utils import run_shm_size_check
@@ -35,7 +37,7 @@ def setup_ray_cluster_and_env(  # noqa: PLR0913
     num_gpus: int,
     enable_object_spilling: bool,
     ray_log_path: Path,
-    object_store_size_bytes: int | None = None,
+    object_store_size: int | None = None,
     include_dashboard: bool = True,
 ) -> tuple[RayClient, Path]:
     """Setup a Ray cluster and set the RAY_ADDRESS environment variable and return the Ray client and temp dir."""
@@ -70,7 +72,7 @@ def setup_ray_cluster_and_env(  # noqa: PLR0913
         enable_object_spilling=enable_object_spilling,
         ray_dashboard_host="0.0.0.0",  # noqa: S104
         ray_stdouterr_capture_file=ray_stdouterr_capture_file,
-        object_store_memory=object_store_size_bytes,
+        object_store_memory=object_store_size,
     )
 
     client.start()
@@ -111,17 +113,6 @@ def teardown_ray_cluster_and_env(
         logger.exception("Failed to copy/remove Ray temp dir")
 
 
-def _ensure_ray_client_process_started(client: RayClient, timeout_s: int, poll_interval_s: float) -> None:
-    """Ensure the Ray client process has been started, no longer than timeout."""
-    elapsed_s = 0
-    while client.ray_process is None and elapsed_s < timeout_s:
-        time.sleep(poll_interval_s)
-        elapsed_s += poll_interval_s
-    if client.ray_process is None:
-        msg = f"Ray client process failed to start in {timeout_s} seconds"
-        raise RuntimeError(msg)
-
-
 def check_ray_responsive(timeout_s: int = 20) -> bool:
     # Assume the env var RAY_ADDRESS is set to the correct value by code starting the Ray cluster
     logger.debug(f"Verifying Ray cluster is responsive, using RAY_ADDRESS={os.environ.get('RAY_ADDRESS')}")
@@ -141,16 +132,16 @@ def check_ray_responsive(timeout_s: int = 20) -> bool:
             timeout=timeout_s,
         )
         if "No cluster status" in result.stdout or "Error" in result.stdout:
-            logger.debug(f"Ray cluster is not responsive: {result.stdout}")
+            logger.debug("Ray cluster is not responsive ('No cluster status' returned or Error in output)")
         else:
             logger.debug("Ray cluster IS responsive")
             responsive = True
 
-    except subprocess.CalledProcessError as e:
-        logger.debug(f"Ray cluster is not responsive ('ray status' command failed): {e.stdout}")
+    except subprocess.CalledProcessError:
+        logger.debug("Ray cluster is not responsive ('ray status' command failed)")
 
-    except subprocess.TimeoutExpired as e:
-        logger.debug(f"Ray cluster is not responsive ('ray status' command timed out): {e.stdout}")
+    except subprocess.TimeoutExpired:
+        logger.debug("Ray cluster is not responsive ('ray status' command timed out)")
 
     finally:
         # Also show the output of `df -h /dev/shm`, since this is often a symptom of problems
@@ -165,6 +156,26 @@ def check_ray_responsive(timeout_s: int = 20) -> bool:
     return responsive
 
 
+def get_ray_cluster_data() -> dict[str, Any]:
+    """Get resource data from the Ray cluster."""
+    ray.init(ignore_reinit_error=True)
+    time.sleep(0.2)  # cluster resource data may lag briefly right after ray.init()
+    ray_data = ray.cluster_resources()
+    ray.shutdown()
+    return ray_data
+
+
+def _ensure_ray_client_process_started(client: RayClient, timeout_s: int, poll_interval_s: float) -> None:
+    """Ensure the Ray client process has been started, no longer than timeout."""
+    elapsed_s = 0
+    while client.ray_process is None and elapsed_s < timeout_s:
+        time.sleep(poll_interval_s)
+        elapsed_s += poll_interval_s
+    if client.ray_process is None:
+        msg = f"Ray client process failed to start in {timeout_s} seconds"
+        raise RuntimeError(msg)
+
+
 def _copy_item_safely(src_path: Path, dst_path: Path) -> None:
     """Copy a single file or directory, logging warnings on failure."""
     try:
diff --git a/benchmarking/runner/session.py b/benchmarking/runner/session.py
index bd07923a3c..9a2fd3b8a1 100644
--- a/benchmarking/runner/session.py
+++ b/benchmarking/runner/session.py
@@ -36,14 +36,15 @@ from runner.utils import get_total_memory_bytes
 
 
-@dataclass(frozen=True, kw_only=True)
+@dataclass(kw_only=True)
 class Session:
     results_path: Path
     entries: list[Entry] = field(default_factory=list)
     sinks: list[Sink] = field(default_factory=list)
     default_timeout_s: int = 7200
-    # Set object store memory to 50% of total system memory by default
-    default_object_store_size_bytes: int = int(get_total_memory_bytes() * 0.5)
+    # The object store size is either a value in bytes (int), a fraction of total system memory (float), or None or
+    # the string "default", both of which select the default object store size used by "ray start".
+    object_store_size: int | float | str | None = 0.5
     # Whether to delete the entry's scratch directory after completion by default
     delete_scratch: bool = True
     path_resolver: PathResolver = None
@@ -57,6 +58,10 @@ def __post_init__(self) -> None:
             msg = f"Duplicate entry name(s) found: {', '.join(duplicates)}"
             raise ValueError(msg)
 
+        # Process object_store_size by converting values representing fractions of system memory to bytes.
+        if isinstance(self.object_store_size, float):
+            self.object_store_size = int(get_total_memory_bytes() * self.object_store_size)
+
         # Update delete_scratch for each entry that has not been set to the session-level delete_scratch setting
         for entry in self.entries:
             if entry.delete_scratch is None:
@@ -67,10 +72,10 @@ def __post_init__(self) -> None:
             if entry.timeout_s is None:
                 entry.timeout_s = self.default_timeout_s
 
-        # Update object store size for each entry that has not been set to the session-level default_object_store_size setting
+        # Update object store size for each entry that has not been set.
         for entry in self.entries:
-            if entry.object_store_size_bytes is None:
-                entry.object_store_size_bytes = self.default_object_store_size_bytes
+            if entry.object_store_size is None:
+                entry.object_store_size = self.object_store_size
 
@@ -82,7 +87,7 @@ def assert_valid_config_dict(cls, data: dict) -> None:
             raise ValueError(msg)
 
     @classmethod
-    def create_from_dict(cls, data: dict, entry_filter_expr: str | None = None) -> Session:
+    def from_dict(cls, data: dict, entry_filter_expr: str | None = None) -> Session:
         """
         Factory method to create a Session from a dictionary.
 
@@ -99,9 +104,7 @@ def create_from_dict(cls, data: dict, entry_filter_expr: str | None = None) -> S
         sess_data = {k: v for k, v in data.items() if k in sess_field_names}
 
         sinks = cls.create_sinks_from_dict(sess_data.get("sinks", []))
-        # Load entries only if enabled (enabled by default)
-        # TODO: should entries be created unconditionally and use their "enabled" field instead?
-        entries = [Entry(**e) for e in sess_data["entries"] if e.get("enabled", True)]
+        entries = [Entry.from_dict(e) for e in sess_data["entries"]]
 
         # Filter entries based on the expression, if provided.
         # Example: expr "foo and not foobar" will include all entries
@@ -132,10 +135,6 @@ def create_sinks_from_dict(cls, sink_configs: list[dict]) -> list[Sink]:
         sinks = []
         for sink_config in sink_configs:
             sink_name = sink_config["name"]
-            sink_enabled = sink_config.get("enabled", True)
-            if not sink_enabled:
-                logger.warning(f"Sink {sink_name} is not enabled, skipping")
-                continue
 
             if sink_name == "mlflow":
                 from runner.sinks.mlflow_sink import MlflowSink
diff --git a/benchmarking/runner/sinks/gdrive_sink.py b/benchmarking/runner/sinks/gdrive_sink.py
index 9c2e0b1d43..7d09b6c9de 100644
--- a/benchmarking/runner/sinks/gdrive_sink.py
+++ b/benchmarking/runner/sinks/gdrive_sink.py
@@ -32,7 +32,6 @@ class GdriveSink(Sink):
     def __init__(self, sink_config: dict[str, Any]):
         super().__init__(sink_config)
         self.sink_config = sink_config
-        self.enabled = self.sink_config.get("enabled", True)
         self.results: list[dict[str, Any]] = []
         self.session_name: str = None
         self.matrix_config: Session = None
@@ -57,17 +56,14 @@ def process_result(self, result_dict: dict[str, Any], matrix_entry: Entry) -> No
         pass
 
     def finalize(self) -> None:
-        if self.enabled:
-            try:
-                tar_path = self._tar_results_and_artifacts()
-                self._upload_to_gdrive(tar_path)
-            except Exception as e:  # noqa: BLE001
-                tb = traceback.format_exc()
-                logger.error(f"GdriveSink: Error uploading to Google Drive: {e}\n{tb}")
-            finally:
-                self._delete_tar_file(tar_path)
-        else:
-            logger.warning("GdriveSink: Not enabled, skipping post.")
+        try:
+            tar_path = self._tar_results_and_artifacts()
+            self._upload_to_gdrive(tar_path)
+        except Exception as e:  # noqa: BLE001
+            tb = traceback.format_exc()
+            logger.error(f"GdriveSink: Error uploading to Google Drive: {e}\n{tb}")
+        finally:
+            self._delete_tar_file(tar_path)
 
     def _tar_results_and_artifacts(self) -> Path:
         results_path = Path(self.matrix_config.results_path)
diff --git a/benchmarking/runner/sinks/mlflow_sink.py b/benchmarking/runner/sinks/mlflow_sink.py
index 05b5ddb3c7..bd944f3a57 100644
--- a/benchmarking/runner/sinks/mlflow_sink.py
+++ b/benchmarking/runner/sinks/mlflow_sink.py
@@ -33,7 +33,6 @@ def __init__(self, sink_config: dict[str, Any]):
         if not self.experiment:
             msg = "MlflowSink: No experiment configured"
             raise ValueError(msg)
-        self.enabled = self.sink_config.get("enabled", True)
         self.results: list[dict[str, Any]] = []
         self.session_name: str = None
         self.matrix_config: Session = None
@@ -54,14 +53,11 @@ def process_result(self, result_dict: dict[str, Any], matrix_entry: Entry) -> No
         self.results.append((additional_metrics, result_dict))
 
     def finalize(self) -> None:
-        if self.enabled:
-            try:
-                self._push(self.results)
-            except Exception as e:  # noqa: BLE001
-                tb = traceback.format_exc()
-                logger.error(f"MlflowSink: Error posting to Mlflow: {e}\n{tb}")
-        else:
-            logger.warning("MlflowSink: Not enabled, skipping post.")
+        try:
+            self._push(self.results)
+        except Exception as e:  # noqa: BLE001
+            tb = traceback.format_exc()
+            logger.error(f"MlflowSink: Error posting to Mlflow: {e}\n{tb}")
 
     def _push(self, results: list[dict[str, Any]]) -> None:
         pass
diff --git a/benchmarking/runner/sinks/slack_sink.py b/benchmarking/runner/sinks/slack_sink.py
index 3405307aea..8891314f4a 100644
--- a/benchmarking/runner/sinks/slack_sink.py
+++ b/benchmarking/runner/sinks/slack_sink.py
@@ -75,7 +75,6 @@ class SlackSink(Sink):
     def __init__(self, sink_config: dict[str, Any]):
         super().__init__(sink_config)
         self.sink_config = sink_config
-        self.enabled = self.sink_config.get("enabled", True)
         self.session_name: str | None = None
         self.matrix_config: Session = None
         self.env_dict: dict[str, Any] = None
@@ -108,15 +107,12 @@ def process_result(self, result_dict: dict[str, Any], matrix_entry: Entry) -> No
 
     def finalize(self) -> None:
         # Posts the queued results to slack as a final report.
-        if self.enabled:
-            try:
-                self._post()
-            except Exception as e:  # noqa: BLE001
-                # Optionally, log or handle posting errors
-                tb = traceback.format_exc()
-                logger.error(f"SlackSink: Error posting to Slack: {e}\n{tb}")
-        else:
-            logger.warning("SlackSink: Not enabled, skipping post.")
+        try:
+            self._post()
+        except Exception as e:  # noqa: BLE001
+            # Optionally, log or handle posting errors
+            tb = traceback.format_exc()
+            logger.error(f"SlackSink: Error posting to Slack: {e}\n{tb}")
 
     def _post(self) -> None:  # noqa: C901
         message_text_values = {
diff --git a/benchmarking/runner/utils.py b/benchmarking/runner/utils.py
index 6545400016..4e659c4ebc 100644
--- a/benchmarking/runner/utils.py
+++ b/benchmarking/runner/utils.py
@@ -60,6 +60,34 @@ def _replace_env_var(match: re.Match[str]) -> str:
         raise ValueError(msg)
 
 
+def remove_disabled_blocks(obj: object) -> object:
+    """
+    Recursively remove dictionary blocks that contain "enabled": False.
+    Processes dicts and lists; other types are returned unchanged.
+    """
+    if isinstance(obj, dict):
+        # If this block explicitly disables itself, remove it
+        if obj.get("enabled", True) is False:
+            return None
+        # Else process all values
+        result = {}
+        for k, v in obj.items():
+            filtered = remove_disabled_blocks(v)
+            if filtered is not None:
+                result[k] = filtered
+        return result
+    elif isinstance(obj, list):
+        # Process each item; skip any that are removed
+        result = []
+        for item in obj:
+            filtered = remove_disabled_blocks(item)
+            if filtered is not None:
+                result.append(filtered)
+        return result
+    else:
+        return obj
+
+
 def resolve_env_vars(data: dict | list | str | object) -> dict | list | str | object:
     """Recursively resolve environment variables in strings in/from various objects.