diff --git a/nemo_skills/pipeline/cli.py b/nemo_skills/pipeline/cli.py index 76ea60a73d..2df7eb772c 100644 --- a/nemo_skills/pipeline/cli.py +++ b/nemo_skills/pipeline/cli.py @@ -26,6 +26,7 @@ from nemo_skills.pipeline.generate import generate from nemo_skills.pipeline.megatron_lm.train import train_megatron_lm from nemo_skills.pipeline.nemo_evaluator import nemo_evaluator +from nemo_skills.pipeline.nemo_gym_rollouts import nemo_gym_rollouts from nemo_skills.pipeline.nemo_rl.grpo import grpo_nemo_rl from nemo_skills.pipeline.nemo_rl.sft import sft_nemo_rl from nemo_skills.pipeline.prepare_data import prepare_data diff --git a/nemo_skills/pipeline/nemo_gym_rollouts.py b/nemo_skills/pipeline/nemo_gym_rollouts.py new file mode 100644 index 0000000000..80fcb4b070 --- /dev/null +++ b/nemo_skills/pipeline/nemo_gym_rollouts.py @@ -0,0 +1,374 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""NeMo Gym Rollouts Pipeline. 
+ +This pipeline command runs rollout collection with NeMo Gym, orchestrating: +- vLLM model server (optional, can use pre-hosted) +- Sandbox for code execution (optional) +- NeMo Gym servers (ng_run) +- Rollout collection client (ng_collect_rollouts) + +Example usage: + # Self-hosted vLLM server + ns nemo_gym_rollouts \\ + --cluster local \\ + --config_paths "ns_tools/configs/ns_tools.yaml,math_with_judge/configs/math_with_judge.yaml" \\ + --input_file data/example.jsonl \\ + --output_dir /results/rollouts \\ + --model /path/to/model \\ + --server_type vllm \\ + --server_gpus 1 \\ + --with_sandbox \\ + +agent_name=ns_tools_simple_agent \\ + +limit=10 \\ + +num_samples_in_parallel=3 \\ + +num_repeats=4 # Run each prompt 4 times for mean@4 + + # Pre-hosted server + ns nemo_gym_rollouts \\ + --cluster local \\ + --config_paths "ns_tools/configs/ns_tools.yaml" \\ + --input_file data/example.jsonl \\ + --output_dir /results/rollouts \\ + --server_address http://localhost:8000/v1 \\ + --policy_model_name nvidia/model-name \\ + +agent_name=ns_tools_simple_agent +""" + +import logging + +import typer + +import nemo_skills.pipeline.utils as pipeline_utils +from nemo_skills.pipeline.app import app, typer_unpacker +from nemo_skills.pipeline.utils.cluster import cluster_path_exists, parse_kwargs +from nemo_skills.pipeline.utils.declarative import ( + Command, + CommandGroup, + HardwareConfig, + Pipeline, +) +from nemo_skills.pipeline.utils.mounts import get_unmounted_path +from nemo_skills.pipeline.utils.scripts import ( + NemoGymRolloutsScript, + SandboxScript, + ServerScript, +) +from nemo_skills.utils import get_logger_name, setup_logging + +LOG = logging.getLogger(get_logger_name(__file__)) + + +@app.command(context_settings={"allow_extra_args": True, "ignore_unknown_options": True}) +@typer_unpacker +def nemo_gym_rollouts( + ctx: typer.Context, + cluster: str = typer.Option( + None, + help="One of the configs inside config_dir or NEMO_SKILLS_CONFIG_DIR or 
./cluster_configs. " + "Can also use NEMO_SKILLS_CONFIG instead of specifying as argument.", + ), + config_paths: str = typer.Option( + ..., + help="Comma-separated list of NeMo Gym config YAML files for ng_run. " + "E.g., 'ns_tools/configs/ns_tools.yaml,math_with_judge/configs/math_with_judge.yaml'", + ), + input_file: str = typer.Option(..., help="Path to input JSONL file for rollout collection"), + output_dir: str = typer.Option(..., help="Directory for rollout outputs. Output file will be rollouts.jsonl"), + expname: str = typer.Option("nemo_gym_rollouts", help="NeMo Run experiment name"), + model: str = typer.Option(None, help="Path to model for self-hosted vLLM server"), + server_address: str = typer.Option( + None, + help="Address of pre-hosted server (e.g., http://localhost:8000/v1). If provided, skips self-hosted server.", + ), + server_type: pipeline_utils.SupportedServers = typer.Option( + None, + help="Type of server (vllm, trtllm, sglang, etc.)", + ), + server_gpus: int = typer.Option(None, help="Number of GPUs for self-hosted server"), + server_nodes: int = typer.Option(1, help="Number of nodes for self-hosted server"), + server_args: str = typer.Option("", help="Additional arguments for the server"), + with_sandbox: bool = typer.Option(False, help="If True, start a sandbox container for code execution"), + gym_path: str = typer.Option( + "/opt/NeMo-RL/3rdparty/Gym-workspace/Gym", + help="Path to NeMo Gym installation. Defaults to container built-in. Use for mounted/custom Gym.", + ), + policy_api_key: str = typer.Option( + "dummy", + help="API key for policy server. Use 'dummy' for local vLLM servers.", + ), + policy_model_name: str = typer.Option( + None, + help="Model name for policy server. Required for pre-hosted servers. 
" + "For self-hosted, defaults to the model path if not specified.", + ), + partition: str = typer.Option(None, help="Cluster partition to use"), + qos: str = typer.Option(None, help="Specify Slurm QoS"), + time_min: str = typer.Option(None, help="Slurm time-min parameter"), + config_dir: str = typer.Option(None, help="Custom directory for cluster configs"), + log_dir: str = typer.Option(None, help="Custom location for logs"), + exclusive: bool | None = typer.Option(None, help="Add exclusive flag to slurm job"), + num_random_seeds: int = typer.Option( + None, + help="Number of parallel rollout jobs to run. Each job writes to rollouts-rs{i}.jsonl. " + "Use this to scale rollout collection across multiple nodes.", + ), + random_seeds: str = typer.Option( + None, + help="Explicit list of seed indices to run (comma-separated, e.g., '0,2,5,7'). " + "Overrides num_random_seeds. Can provide a list directly when using through Python.", + ), + starting_seed: int = typer.Option( + 0, + help="Starting seed index when using num_random_seeds. " + "E.g., starting_seed=10 with num_random_seeds=4 creates seeds 10,11,12,13.", + ), + rerun_done: bool = typer.Option( + False, + help="If False (default), skip seeds that already have output files. " + "If True, re-run all seeds even if output files exist.", + ), + use_mounted_nemo_skills: bool = typer.Option( + True, + help="If True (default), use the nemo-skills code packaged to /nemo_run/code. " + "If False, avoid /nemo_run/code and use the container's installed nemo-skills instead. " + "Set to False when you want Gym to use its own bundled nemo-skills version.", + ), + dry_run: bool = typer.Option(False, help="Validate without executing"), + sbatch_kwargs: str = typer.Option("", help="Additional sbatch kwargs as JSON string"), +): + """Run NeMo Gym rollout collection pipeline. + + This command orchestrates running rollout collection with NeMo Gym: + 1. Optionally starts a vLLM model server (or uses pre-hosted) + 2. 
Optionally starts a sandbox for code execution + 3. Starts NeMo Gym servers via ng_run + 4. Runs ng_collect_rollouts to collect rollouts + + All Hydra arguments (prefixed with + or ++) are passed through to ng_run + and ng_collect_rollouts. Common arguments include: + - +agent_name=... (required for rollout collection) + - +limit=... (limit number of samples from input) + - +num_samples_in_parallel=... (concurrent requests) + - +num_repeats=N (run each prompt N times for mean@k evaluation) + + For large-scale rollout collection, use --num_random_seeds to create multiple + independent jobs. Each job has its own server and sandbox (unique ports to avoid + conflicts if scheduled on same node) and writes to rollouts-rs{i}.jsonl. + + Use --starting_seed to offset seed numbering (e.g., to continue from a previous run). + Use --random_seeds to specify explicit seeds (e.g., '0,2,5,7' to re-run specific seeds). + """ + setup_logging(disable_hydra_logs=False, use_rich=True) + extra_arguments = " ".join(ctx.args) + LOG.info("Starting NeMo Gym rollouts pipeline") + LOG.info(f"Extra arguments: {extra_arguments}") + + # Parse config paths + config_paths_list = [p.strip() for p in config_paths.split(",") if p.strip()] + LOG.info(f"Config paths: {config_paths_list}") + + # Validate server configuration + self_hosted = model is not None and server_gpus is not None + pre_hosted = server_address is not None + + if not self_hosted and not pre_hosted: + raise ValueError( + "Must provide either --model and --server_gpus for self-hosted server, " + "or --server_address for pre-hosted server" + ) + + if self_hosted and pre_hosted: + raise ValueError("Cannot specify both self-hosted (--model, --server_gpus) and pre-hosted (--server_address)") + + if self_hosted and server_type is None: + raise ValueError("--server_type is required when using self-hosted server") + + # Validate and set policy_model_name + if pre_hosted and policy_model_name is None: + raise ValueError("--policy_model_name 
is required when using a pre-hosted server (--server_address)") + + if self_hosted and policy_model_name is None: + # For self-hosted, default to the model path + policy_model_name = model + LOG.info(f"Using model path as policy_model_name: {policy_model_name}") + + # Get cluster config + cluster_config = pipeline_utils.get_cluster_config(cluster, config_dir) + + if not log_dir: + log_dir = f"{output_dir}/logs" + + # Parse sbatch kwargs + sbatch_kwargs_dict = parse_kwargs(sbatch_kwargs, exclusive=exclusive, qos=qos, time_min=time_min) + + # Determine seed indices for parallel jobs + if random_seeds is not None: + # Explicit seeds provided + if isinstance(random_seeds, str): + seed_indices = [int(s.strip()) for s in random_seeds.split(",")] + else: + seed_indices = list(random_seeds) + LOG.info(f"Using explicit seeds: {seed_indices}") + elif num_random_seeds: + seed_indices = list(range(starting_seed, starting_seed + num_random_seeds)) + LOG.info( + f"Creating {num_random_seeds} separate jobs (rs{starting_seed}..rs{starting_seed + num_random_seeds - 1})" + ) + else: + seed_indices = [None] # Single job, no seed suffix + + # Get server type string once if self-hosted + server_type_str = None + server_container = None + if self_hosted: + server_type_str = server_type.value if hasattr(server_type, "value") else server_type + server_container = cluster_config["containers"].get(server_type_str, server_type_str) + + # Filter out seeds with existing output files (unless rerun_done=True) + if not rerun_done and seed_indices != [None]: + filtered_seeds = [] + skipped_seeds = [] + for seed_idx in seed_indices: + output_file = f"{output_dir}/rollouts-rs{seed_idx}.jsonl" + # Check if file exists on cluster + try: + unmounted_path = get_unmounted_path(cluster_config, output_file) + if cluster_path_exists(cluster_config, unmounted_path): + skipped_seeds.append(seed_idx) + else: + filtered_seeds.append(seed_idx) + except Exception as e: + LOG.warning(f"Could not check if 
{output_file} exists: {e}. Including seed {seed_idx}.") + filtered_seeds.append(seed_idx) + + if skipped_seeds: + LOG.info(f"Skipping seeds with existing output files: {skipped_seeds}") + seed_indices = filtered_seeds + + if not seed_indices: + LOG.info("All seeds already have output files. Nothing to run.") + return None + + # Build jobs - one per seed, each with its own server/sandbox (unique ports) + jobs = [] + for seed_idx in seed_indices: + components = [] + + # Determine naming suffix + if seed_idx is not None: + output_file = f"{output_dir}/rollouts-rs{seed_idx}.jsonl" + job_suffix = f"_rs{seed_idx}" + else: + output_file = f"{output_dir}/rollouts.jsonl" + job_suffix = "" + + # 1. Server (optional, self-hosted) - each job gets its own server with unique port + server_script = None + if self_hosted: + server_script = ServerScript( + server_type=server_type_str, + model_path=model, + cluster_config=cluster_config, + num_gpus=server_gpus, + num_nodes=server_nodes, + server_args=server_args, + allocate_port=True, # Each job gets unique port + ) + + server_cmd = Command( + script=server_script, + container=server_container, + name=f"{expname}_server{job_suffix}", + ) + components.append(server_cmd) + LOG.info(f"Job{job_suffix}: server on port {server_script.port}") + + # 2. Sandbox (optional) - each job gets its own sandbox with unique port + sandbox_script = None + if with_sandbox: + sandbox_script = SandboxScript( + cluster_config=cluster_config, + allocate_port=True, # Each job gets unique port + ) + + sandbox_cmd = Command( + script=sandbox_script, + container=cluster_config["containers"]["sandbox"], + name=f"{expname}_sandbox{job_suffix}", + ) + components.append(sandbox_cmd) + LOG.info(f"Job{job_suffix}: sandbox on port {sandbox_script.port}") + + # 3. 
NeMo Gym rollouts + nemo_gym_script = NemoGymRolloutsScript( + config_paths=config_paths_list, + input_file=input_file, + output_file=output_file, + extra_arguments=extra_arguments, + server=server_script, + server_address=server_address, + sandbox=sandbox_script, + gym_path=gym_path, + policy_api_key=policy_api_key, + policy_model_name=policy_model_name, + ) + + nemo_gym_cmd = Command( + script=nemo_gym_script, + container=cluster_config["containers"]["nemo-rl"], + name=f"{expname}_nemo_gym{job_suffix}", + # If use_mounted_nemo_skills=False, avoid /nemo_run/code so Gym uses its bundled version + avoid_nemo_run_code=not use_mounted_nemo_skills, + ) + components.append(nemo_gym_cmd) + + # Create command group for this job + hardware = HardwareConfig( + partition=partition, + num_gpus=server_gpus if self_hosted else 0, + num_nodes=server_nodes if self_hosted else 1, + num_tasks=1, + sbatch_kwargs=sbatch_kwargs_dict, + ) + + group = CommandGroup( + commands=components, + hardware=hardware, + name=f"{expname}{job_suffix}", + log_dir=log_dir, + ) + + jobs.append({"name": f"{expname}{job_suffix}", "group": group}) + + LOG.info(f"Created {len(jobs)} job(s)") + + # Create and run pipeline + pipeline = Pipeline( + name=expname, + cluster_config=cluster_config, + jobs=jobs, + ) + + sequential = cluster_config["executor"] in ["local", "none"] + result = pipeline.run(dry_run=dry_run, sequential=sequential) + + LOG.info(f"Pipeline {'validated' if dry_run else 'submitted'} successfully") + return result + + +if __name__ == "__main__": + typer.main.get_command_name = lambda name: name + app() diff --git a/nemo_skills/pipeline/utils/__init__.py b/nemo_skills/pipeline/utils/__init__.py index 3e738a530f..37d16b734d 100644 --- a/nemo_skills/pipeline/utils/__init__.py +++ b/nemo_skills/pipeline/utils/__init__.py @@ -71,6 +71,15 @@ get_registered_external_repo, register_external_repo, ) +from nemo_skills.pipeline.utils.scripts import ( + BaseJobScript, + GenerationClientScript, + 
GymClientScript, + MultiVLLMServerScript, + NemoGymRolloutsScript, + SandboxScript, + ServerScript, +) from nemo_skills.pipeline.utils.server import ( SupportedServers, SupportedServersSelfHosted, diff --git a/nemo_skills/pipeline/utils/declarative.py b/nemo_skills/pipeline/utils/declarative.py index 7029dcc638..5497311195 100644 --- a/nemo_skills/pipeline/utils/declarative.py +++ b/nemo_skills/pipeline/utils/declarative.py @@ -36,7 +36,7 @@ get_packaging_job_key, tunnel_hash, ) -from nemo_skills.pipeline.utils.mounts import is_mounted_filepath +from nemo_skills.pipeline.utils.mounts import get_mounts_from_config, is_mounted_filepath from nemo_skills.pipeline.utils.server import wrap_python_path from nemo_skills.utils import get_logger_name @@ -219,6 +219,22 @@ class Command: script: run.Script container: str = "nemo-skills" name: str = "command" + # Optional extra mounts for this Command (e.g., "/dev/shm:/dev/shm"). + # These are merged with mounts from the cluster config when creating the executor. + mounts: Optional[List[str]] = None + # Optional per-command env var overrides (merged with Script-provided runtime env). + environment: Optional[Dict[str, str]] = None + # Runtime working directory to `cd` into before running the script body. + # This is useful because pyxis sets container-workdir=/nemo_run/code by default, + # which can cause imports from /nemo_run/code to shadow site-packages. + workdir: Optional[str] = None + # Control whether /nemo_run/code is used for Python imports for this command. + # - If avoid_nemo_run_code=True, we `cd` away from /nemo_run/code (default "/") and + # remove /nemo_run/code from PYTHONPATH if present. + # - If force_nemo_run_code=True, we prepend /nemo_run/code to PYTHONPATH even if the + # script later cd's elsewhere. + avoid_nemo_run_code: bool = False + force_nemo_run_code: bool = False def prepare_for_execution(self, cluster_config: Dict) -> Tuple[run.Script, Dict]: """Prepare script for execution. 
@@ -244,11 +260,46 @@ def prepare_for_execution(self, cluster_config: Dict) -> Tuple[run.Script, Dict] # Update script.inline with evaluated command self.script.set_inline(evaluated_command) + # Optionally wrap the command to control cwd/PYTHONPATH behavior (see fields above). + # This is done at the very end so it applies to both eager and lazy inline builders. + prelude_lines: List[str] = [] + + # If requested, force /nemo_run/code on PYTHONPATH (so mounted code is importable even after cd). + if self.force_nemo_run_code and self.avoid_nemo_run_code: + raise ValueError("Command cannot set both avoid_nemo_run_code=True and force_nemo_run_code=True") + if self.force_nemo_run_code: + prelude_lines.append('export PYTHONPATH="/nemo_run/code${PYTHONPATH:+:$PYTHONPATH}"') + + # If requested, avoid /nemo_run/code import shadowing (cd away + remove PYTHONPATH entry). + effective_workdir = self.workdir + if self.avoid_nemo_run_code and effective_workdir is None: + effective_workdir = "/" + if self.avoid_nemo_run_code: + prelude_lines.append('if [ -n "${PYTHONPATH:-}" ]; then') + prelude_lines.append( + " export PYTHONPATH=\"$(echo \"$PYTHONPATH\" | tr ':' '\\n' | grep -v '^/nemo_run/code' | paste -sd: -)\"" + ) + prelude_lines.append("fi") + + if effective_workdir: + prelude_lines.append(f'cd "{effective_workdir}"') + + if prelude_lines: + prelude = "\n".join(prelude_lines) + "\n" + inline_cmd = self.script.inline + if isinstance(inline_cmd, str): + self.script.set_inline(prelude + inline_cmd) + # If inline_cmd is still callable here, we intentionally do not wrap it; it should + # have been evaluated above. This keeps behavior deterministic. 
+ # Build execution config from Script fields + merged_env = dict(runtime_metadata.get("environment", {})) + if self.environment: + merged_env.update(self.environment) execution_config = { "log_prefix": getattr(self.script, "log_prefix", "main"), - "environment": runtime_metadata.get("environment", {}), - "mounts": None, # Mounts not currently exposed by Scripts + "environment": merged_env, + "mounts": self.mounts, "container": self.container, } @@ -574,12 +625,30 @@ def _create_executor( if span_group_nodes and hardware and hardware.num_nodes is not None: num_nodes = hardware.num_nodes + # Check if the script has a per-script num_tasks override. + # This allows different scripts in the same CommandGroup to have different + # task configurations (e.g., vLLM servers with 2 tasks per node, Gym with 1). + script_num_tasks = getattr(command.script, "num_tasks_override", None) + tasks_per_node = ( + script_num_tasks + if script_num_tasks is not None + else (hardware.num_tasks if hardware and hardware.num_tasks is not None else 1) + ) + + # Allow per-command extra mounts without requiring editing the cluster YAML. + # We treat exec_config["mounts"] as additive and merge it with mounts from cluster_config. 
+ mounts = None + extra_mounts = exec_config.get("mounts") or None + if extra_mounts: + base_mounts = get_mounts_from_config(cluster_config) + mounts = base_mounts + [m for m in extra_mounts if m not in base_mounts] + with env_context: return get_executor( cluster_config=cluster_config, container=container_image, num_nodes=num_nodes, - tasks_per_node=hardware.num_tasks if hardware and hardware.num_tasks is not None else 1, + tasks_per_node=tasks_per_node, gpus_per_node=hardware.num_gpus if hardware and hardware.num_gpus is not None else 0, job_name=job_name_override if job_name_override else command.name, log_dir=log_dir, @@ -589,7 +658,7 @@ def _create_executor( het_group=het_group, total_het_groups=total_het_groups, overlap=overlap, - mounts=exec_config.get("mounts"), + mounts=mounts, with_ray=self.with_ray, sbatch_kwargs=hardware.sbatch_kwargs, dependencies=dependencies, @@ -664,7 +733,30 @@ def _plan_and_add_job( if heterogeneous: shared_env_vars.update(exec_config.get("environment", {})) - # Share packager across executors for efficiency (single-group only) + # IMPORTANT: For single-group jobs with multiple components (overlap), + # nemo-run effectively uses the FIRST executor to determine the SLURM allocation + # (sbatch nodes/gpus/ntasks-per-node). Components that only need to run on the + # master node (e.g., Gym client, sandbox) set span_group_nodes=False which would + # request 1 node if they appear first. That leads to allocating only 1 node even + # when a later component (e.g., multi-node vLLM servers) needs >1 nodes. + # + # To avoid this footgun, ensure that components which span the group's nodes are + # scheduled first so the allocation matches the maximal requirements. + if not heterogeneous: + + def _allocation_sort_key(entry: Dict) -> Tuple[int, int]: + group_hw = entry["group"].hardware + span = getattr(entry["command"].script, "span_group_nodes", False) + # Prefer spanning components first; then prefer larger node counts. 
+ nodes = (group_hw.num_nodes or 1) if span else 1 + return (0 if span else 1, -nodes) + + prepared_commands.sort(key=_allocation_sort_key) + + # Share packager across executors for efficiency (single-group only). + # NOTE: We must NOT key this off of comp_idx/het_idx because we may reorder + # prepared_commands (e.g., to ensure spanning components drive the allocation). + # Otherwise we can end up assigning executor.packager=None for early entries. shared_packager = None # Build commands and executors using prepared data @@ -709,9 +801,9 @@ def _plan_and_add_job( job_name_override=job_name_for_slurm, ) - # Share packager across executors for single-group jobs + # Share packager across executors for single-group jobs (robust to reordering) if not heterogeneous: - if comp_idx == 0 and het_idx == 0: + if shared_packager is None: shared_packager = executor.packager else: executor.packager = shared_packager diff --git a/nemo_skills/pipeline/utils/exp.py b/nemo_skills/pipeline/utils/exp.py index 0f9ec7f123..98c153a1ec 100644 --- a/nemo_skills/pipeline/utils/exp.py +++ b/nemo_skills/pipeline/utils/exp.py @@ -315,6 +315,7 @@ def get_executor( "--no-container-mount-home", "--mpi=pmix", "--wait=10", + "--kill-on-bad-exit=1", # Fail entire job if any task exits with non-zero (e.g., vLLM crash) # we need to be explicit about this in srun as commands might need to run in parallel f"--ntasks-per-node={tasks_per_node}", f"--nodes={num_nodes}", diff --git a/nemo_skills/pipeline/utils/packager.py b/nemo_skills/pipeline/utils/packager.py index 4cb8c6552e..fa2d890b39 100644 --- a/nemo_skills/pipeline/utils/packager.py +++ b/nemo_skills/pipeline/utils/packager.py @@ -111,6 +111,16 @@ def get_git_repo_path(path: str | Path = None): def get_packager(extra_package_dirs: tuple[str] | None = None): """Will check if we are running from a git repo and use git packager or default packager otherwise.""" nemo_skills_dir = get_registered_external_repo("nemo_skills").path + # Controls for 
deterministic packaging behavior across different launch directories. + # + # By default, if we are running inside a git repo, we use GitArchivePackager (committed files only). + # If the current repo does not contain `nemo_skills/`, we additionally include the *installed* + # nemo_skills package directory to ensure remote tasks can import it. + # + # This can be surprising when users rely on a venv editable install / uncommitted changes. + # These flags let you force using the installed package tree (PatternPackager) regardless of CWD. + force_installed_nemo_skills = bool(int(os.getenv("NEMO_SKILLS_FORCE_INSTALLED_PACKAGE", "0"))) + force_pattern_packager = bool(int(os.getenv("NEMO_SKILLS_FORCE_PATTERN_PACKAGER", "0"))) if extra_package_dirs: include_patterns = [str(Path(d) / "*") for d in extra_package_dirs] @@ -121,15 +131,18 @@ def get_packager(extra_package_dirs: tuple[str] | None = None): check_uncommited_changes = not bool(int(os.getenv("NEMO_SKILLS_DISABLE_UNCOMMITTED_CHANGES_CHECK", 0))) - # are we in a git repo? If yes, we are uploading the current code - repo_path = get_git_repo_path(path=None) # check if we are in a git repo in pwd + # Are we in a git repo? If yes, we *normally* upload committed code from the current repo. + # force_pattern_packager overrides this and forces packaging from the installed package tree. + repo_path = None if force_pattern_packager else get_git_repo_path(path=None) if repo_path: - # Do we have nemo_skills package in this repo? If no, we need to pick it up from installed location - if not (Path(repo_path) / "nemo_skills").is_dir(): + # Do we have nemo_skills package in this repo? If no, we need to pick it up from installed location. + # If force_installed_nemo_skills is set, we always pick up the installed package. + if force_installed_nemo_skills or not (Path(repo_path) / "nemo_skills").is_dir(): LOG.info( - "Not running from Nemo-Skills repo, trying to upload installed package. 
" + "Using installed Nemo-Skills package for packaging (force_installed=%s). " "Make sure there are no extra files in %s", + force_installed_nemo_skills, str(nemo_skills_dir / "*"), ) include_patterns.append(str(nemo_skills_dir / "*")) @@ -150,7 +163,9 @@ def get_packager(extra_package_dirs: tuple[str] | None = None): ) else: LOG.info( - "Not running from a git repo, trying to upload installed package. Make sure there are no extra files in %s", + "Using PatternPackager for installed Nemo-Skills package (force_pattern=%s). " + "Make sure there are no extra files in %s", + force_pattern_packager, str(nemo_skills_dir / "*"), ) include_patterns.append(str(nemo_skills_dir / "*")) diff --git a/nemo_skills/pipeline/utils/scripts.py b/nemo_skills/pipeline/utils/scripts.py index 3e74e438ef..61603d7f64 100644 --- a/nemo_skills/pipeline/utils/scripts.py +++ b/nemo_skills/pipeline/utils/scripts.py @@ -75,10 +75,16 @@ class BaseJobScript(run.Script): When True, the script spans all nodes specified in the group's num_nodes. This is important for multi-node setups with --overlap where the server needs multiple nodes but client/sandbox should run on the master node only. + num_tasks_override: Override the group's num_tasks for this specific script. + When set, this script's srun will use this value for --ntasks-per-node + instead of the group's HardwareConfig.num_tasks. Useful when multiple + scripts in a CommandGroup need different task configurations (e.g., + vLLM servers needing 2 tasks per node while Gym client needs 1). 
""" het_group_index: Optional[int] = field(default=None, init=False, repr=False) span_group_nodes: bool = False # Default: run on 1 node + num_tasks_override: Optional[int] = None # Per-script task count override installation_command: Optional[str] = None entrypoint: str = field(default="bash", init=False) @@ -432,3 +438,564 @@ def build_cmd() -> Tuple[str, Dict]: # Always use lazy command building self.set_inline(build_cmd) super().__post_init__() + + +@dataclass(kw_only=True) +class NemoGymRolloutsScript(BaseJobScript): + """Script for running NeMo Gym rollout collection. + + This script orchestrates the full rollout collection workflow: + 1. Starts ng_run in background to spin up NeMo Gym servers + 2. Polls ng_status until all servers are healthy + 3. Runs ng_collect_rollouts to collect rollouts + 4. Keeps ng_run running (cleanup handled externally) + + Attributes: + config_paths: List of YAML config file paths for ng_run + input_file: Input JSONL file path for rollout collection + output_file: Output JSONL file path for rollouts + extra_arguments: Additional Hydra overrides passed to both ng_run and ng_collect_rollouts + server: Optional ServerScript reference for policy model server + server_address: Optional pre-hosted server address + sandbox: Optional SandboxScript reference for sandbox port + log_prefix: Prefix for log files (default: "nemo_gym") + + Example: + script = NemoGymRolloutsScript( + config_paths=[ + "resources_servers/ns_tools/configs/ns_tools.yaml", + "resources_servers/math_with_judge/configs/math_with_judge.yaml", + ], + input_file="/data/input.jsonl", + output_file="/data/rollouts.jsonl", + extra_arguments="+agent_name=ns_tools_simple_agent +limit=10", + server=server_script, + sandbox=sandbox_script, + ) + """ + + config_paths: List[str] + input_file: str + output_file: str + extra_arguments: str = "" + server: Optional["ServerScript"] = None + server_address: Optional[str] = None + sandbox: Optional["SandboxScript"] = None + gym_path: str 
= "/opt/NeMo-RL/3rdparty/Gym-workspace/Gym"
+    policy_api_key: str = "dummy"  # API key for policy server (can be dummy for local)
+    policy_model_name: Optional[str] = None  # Model name override for policy server
+
+    # Prefix used for this script's log file names.
+    log_prefix: str = field(default="nemo_gym", init=False)
+
+    def __post_init__(self):
+        """Initialize the combined ng_run + ng_collect_rollouts script."""
+
+        # build_cmd is registered via set_inline rather than evaluated here —
+        # presumably so that server/sandbox hostname references resolve at
+        # render/scheduling time rather than at construction; confirm against
+        # the set_inline contract in the declarative utils.
+        def build_cmd() -> Tuple[str, Dict]:
+            """Build the full rollout collection command.
+
+            Returns:
+                Tuple of (bash command string, metadata dict). The metadata dict
+                carries an "environment" mapping with sandbox host/port env vars
+                when a sandbox is attached.
+            """
+            # Build config_paths argument
+            config_paths_str = ",".join(self.config_paths)
+
+            # Build ng_run command parts
+            ng_run_parts = [
+                "ng_run",
+                f'"+config_paths=[{config_paths_str}]"',
+            ]
+
+            # Add policy server URL if we have a server reference or address.
+            # A self-hosted server reference takes precedence over a pre-hosted
+            # server_address.
+            if self.server is not None:
+                server_addr = f"http://{self.server.hostname_ref()}:{self.server.port}/v1"
+                ng_run_parts.append(f'+policy_base_url="{server_addr}"')
+            elif self.server_address is not None:
+                ng_run_parts.append(f'+policy_base_url="{self.server_address}"')
+
+            # Add policy API key (required by some configs)
+            ng_run_parts.append(f'+policy_api_key="{self.policy_api_key}"')
+
+            # Add policy model name (required by configs)
+            if self.policy_model_name:
+                ng_run_parts.append(f'+policy_model_name="{self.policy_model_name}"')
+
+            # Add extra arguments to ng_run
+            if self.extra_arguments:
+                ng_run_parts.append(self.extra_arguments)
+
+            ng_run_cmd = " ".join(ng_run_parts)
+
+            # Build ng_collect_rollouts command
+            ng_collect_parts = [
+                "ng_collect_rollouts",
+                f'+input_jsonl_fpath="{self.input_file}"',
+                f'+output_jsonl_fpath="{self.output_file}"',
+            ]
+
+            # Add extra arguments to ng_collect_rollouts.
+            # NOTE(review): extra_arguments is appended to BOTH ng_run and
+            # ng_collect_rollouts — confirm every supported +key is accepted by
+            # both CLIs, otherwise one of them may reject unknown overrides.
+            if self.extra_arguments:
+                ng_collect_parts.append(self.extra_arguments)
+
+            ng_collect_cmd = " ".join(ng_collect_parts)
+
+            # Compute the vLLM server URL for the wait check; empty string
+            # disables the wait loop below.
+            if self.server is not None:
+                vllm_server_url = f"http://{self.server.hostname_ref()}:{self.server.port}/v1"
+            elif self.server_address is not None:
+                vllm_server_url = self.server_address
+            else:
+                vllm_server_url = ""
+
+            # Build the full bash script that:
+            # 1. Installs NeMo Gym from 3rdparty/Gym-workspace/Gym
+            # 2. Waits for vLLM server to be ready
+            # 3. Starts ng_run in background
+            # 4. Polls ng_status until healthy (with early failure detection)
+            # 5. Runs ng_collect_rollouts
+            #
+            # NOTE(review): in the wait loop below, if curl fails outright the
+            # $(...) substitution expands to an empty string and
+            # `[ "" -ne 200 ]` is a test error; its non-zero status terminates
+            # the while loop, so the script can proceed before the server is
+            # actually up. Consider `until curl -sf ...` — confirm and fix in a
+            # follow-up.
+            cmd = f"""set -e
+set -o pipefail
+
+echo "=== Installing NeMo Gym ==="
+cd {self.gym_path} || {{ echo "ERROR: Failed to cd to Gym directory"; exit 1; }}
+uv venv --python 3.12 --allow-existing .venv || {{ echo "ERROR: Failed to create venv"; exit 1; }}
+source .venv/bin/activate || {{ echo "ERROR: Failed to activate venv"; exit 1; }}
+uv sync --active --extra dev || {{ echo "ERROR: Failed to sync dependencies"; exit 1; }}
+echo "NeMo Gym installed successfully"
+
+# Disable pipefail for the polling loop (grep may return non-zero)
+set +o pipefail
+
+# Wait for vLLM server to be ready before starting ng_run
+# Note: --kill-on-bad-exit in srun ensures job fails if vLLM crashes
+VLLM_SERVER_URL="{vllm_server_url}"
+if [ -n "$VLLM_SERVER_URL" ]; then
+    echo "=== Waiting for vLLM server at $VLLM_SERVER_URL ==="
+    while [ $(curl -s -o /dev/null -w "%{{http_code}}" "$VLLM_SERVER_URL/models" 2>/dev/null) -ne 200 ]; do
+        sleep 10
+    done
+    echo "vLLM server is ready!"
+fi
+
+echo "=== Starting NeMo Gym servers ==="
+{ng_run_cmd} &
+NG_RUN_PID=$!
+echo "ng_run PID: $NG_RUN_PID"
+
+echo "Waiting for NeMo Gym servers..."
+LAST_STATUS=""
+while true; do
+    # Check if ng_run process died - let the failure cascade naturally
+    if ! kill -0 $NG_RUN_PID 2>/dev/null; then
+        echo "ERROR: ng_run process exited unexpectedly"
+        wait $NG_RUN_PID 2>/dev/null  # Get exit code
+        exit 1
+    fi
+
+    STATUS_OUTPUT=$(ng_status 2>&1)
+
+    if echo "$STATUS_OUTPUT" | grep -q "healthy, 0 unhealthy"; then
+        echo "All servers ready!"
+        break
+    fi
+
+    # Only print status when it changes (reduce verbosity)
+    CURRENT_STATUS=$(echo "$STATUS_OUTPUT" | grep -oE '[0-9]+ healthy' | head -1 || echo "starting")
+    if [ "$CURRENT_STATUS" != "$LAST_STATUS" ]; then
+        echo "Server status: $CURRENT_STATUS"
+        LAST_STATUS="$CURRENT_STATUS"
+    fi
+
+    sleep 10
+done
+
+# Re-enable pipefail for the actual rollout collection
+set -o pipefail
+
+echo "=== Running rollout collection ==="
+echo "Input file: {self.input_file}"
+echo "Output file: {self.output_file}"
+mkdir -p "$(dirname "{self.output_file}")"
+echo "Output directory created: $(dirname "{self.output_file}")"
+echo "Running: {ng_collect_cmd}"
+{ng_collect_cmd} || {{ echo "ERROR: ng_collect_rollouts failed"; kill $NG_RUN_PID 2>/dev/null || true; exit 1; }}
+
+echo "=== Rollout collection complete ==="
+echo "Output: {self.output_file}"
+
+echo "=== Cleaning up ==="
+kill $NG_RUN_PID 2>/dev/null || true
+echo "Servers terminated."
+"""
+            # Build environment variables for sandbox connection
+            # YAML configs use ${oc.env:NEMO_SKILLS_SANDBOX_HOST,127.0.0.1} and
+            # ${oc.env:NEMO_SKILLS_SANDBOX_PORT,6000} to resolve these
+            env_vars = {}
+            if self.sandbox is not None:
+                env_vars["NEMO_SKILLS_SANDBOX_HOST"] = self.sandbox.hostname_ref()
+                env_vars["NEMO_SKILLS_SANDBOX_PORT"] = str(self.sandbox.port)
+
+            return cmd.strip(), {"environment": env_vars}
+
+        self.set_inline(build_cmd)
+        super().__post_init__()
+
+
+@dataclass(kw_only=True)
+class MultiVLLMServerScript(BaseJobScript):
+    """Script for deploying multiple independent vLLM servers for Gym routing.
+
+    This script enables data-parallel vLLM deployments where Gym routes requests
+    across multiple independent vLLM server replicas. Each replica runs on its own
+    set of GPUs and handles requests independently.
+
+    Supports configurations like:
+    - 1 server per node with TP=8 (full node per server)
+    - 2 servers per node with TP=4 each (2 servers sharing a node)
+    - 4 servers per node with TP=2 each
+    - 8 servers per node with TP=1 each
+
+    The script uses SLURM environment variables to determine which server instance
+    to start on each task:
+    - SLURM_PROCID: Global task ID (0 to num_nodes * servers_per_node - 1)
+    - SLURM_LOCALID: Task ID within node (0 to servers_per_node - 1)
+    - SLURM_NODEID: Node ID (0 to num_nodes - 1)
+
+    Attributes:
+        model_path: Path to model weights or HuggingFace model name
+        cluster_config: Cluster configuration dictionary
+        num_nodes: Number of nodes to use
+        servers_per_node: Number of vLLM servers per node
+        gpus_per_server: GPUs per server (tensor_parallel_size)
+        server_args: Additional vLLM server arguments
+        base_port: Starting port (servers use base_port + global_rank)
+
+    Example:
+        # 4 nodes × 2 servers per node = 8 total replicas, each with TP=4
+        servers = MultiVLLMServerScript(
+            model_path="/models/llama-70b",
+            cluster_config=cluster_config,
+            num_nodes=4,
+            servers_per_node=2,
+            gpus_per_server=4,
+            server_args="--dtype auto --max-model-len 8192",
+        )
+
+        # Use in CommandGroup
+        group = CommandGroup(
+            commands=[
+                Command(script=servers, container="vllm", name="vllm"),
+                Command(script=gym_client, container="nemo-skills", name="gym"),
+            ],
+            hardware=HardwareConfig(
+                num_nodes=4,
+                num_gpus=8,
+                num_tasks=2,  # servers_per_node
+            ),
+        )
+    """
+
+    model_path: str
+    cluster_config: Dict
+    num_nodes: int = 1
+    servers_per_node: int = 1
+    gpus_per_server: int = 8
+    server_args: str = ""
+    base_port: Optional[int] = None
+    # Mitigations for vLLM v1 shared-memory communicator instability on some clusters.
+    # When enabled, we export env vars that (if supported by the vLLM version) disable
+    # SHM-based broadcast paths, which are sensitive to /dev/shm sizing and IPC quirks.
+    disable_shm_broadcast: bool = False
+    # Print /dev/shm diagnostics at startup to help debug node-to-node differences.
+    print_shm_diagnostics: bool = False
+
+    # Spans all nodes - each SLURM task runs one server
+    span_group_nodes: bool = True
+
+    # Internal tracking
+    _total_replicas: int = field(init=False, repr=False, default=0)
+    _ports: List[int] = field(init=False, repr=False, default_factory=list)
+    log_prefix: str = field(default="vllm_servers", init=False)
+
+    def __post_init__(self):
+        """Build command for distributed vLLM servers.
+
+        Computes the total replica count, allocates one port per replica, and
+        renders the bash launcher that each SLURM task runs to start its own
+        vLLM server instance.
+        """
+        self._total_replicas = self.num_nodes * self.servers_per_node
+
+        # Set num_tasks_override to ensure this script gets the right task count
+        # (one SLURM task per server on each node).
+        self.num_tasks_override = self.servers_per_node
+
+        # Allocate ports.
+        # NOTE(review): only base_port is checked for availability by
+        # get_free_port; base_port+1 .. base_port+N-1 are assumed free — and
+        # availability is probed on the submitting host, not on the compute
+        # nodes. Confirm this is acceptable for the target clusters.
+        if self.base_port is None:
+            self.base_port = get_free_port(strategy="random")
+        self._ports = [self.base_port + i for i in range(self._total_replicas)]
+
+        ports_str = " ".join(str(p) for p in self._ports)
+
+        # Build command that runs one server per SLURM task.
+        # Booleans are lowered to bash-friendly "true"/"false" strings.
+        disable_shm_broadcast = "true" if self.disable_shm_broadcast else "false"
+        print_shm_diagnostics = "true" if self.print_shm_diagnostics else "false"
+        cmd = f'''
+#!/bin/bash
+set -e
+
+# Configuration
+PORTS=({ports_str})
+GPUS_PER_SERVER={self.gpus_per_server}
+SERVERS_PER_NODE={self.servers_per_node}
+DISABLE_SHM_BROADCAST={disable_shm_broadcast}
+PRINT_SHM_DIAGNOSTICS={print_shm_diagnostics}
+
+# SLURM environment
+# SLURM_PROCID: Global task ID (0 to num_nodes * servers_per_node - 1)
+# SLURM_LOCALID: Task ID within this node (0 to servers_per_node - 1)
+# SLURM_NODEID: Node ID (0 to num_nodes - 1)
+GLOBAL_RANK=${{SLURM_PROCID:-0}}
+LOCAL_RANK=${{SLURM_LOCALID:-0}}
+NODE_ID=${{SLURM_NODEID:-0}}
+
+# Calculate which GPUs this server should use
+# E.g., with 2 servers per node and 4 GPUs each:
+#   Local rank 0 → GPUs 0,1,2,3
+#   Local rank 1 → GPUs 4,5,6,7
+GPU_START=$((LOCAL_RANK * GPUS_PER_SERVER))
+GPU_END=$((GPU_START + GPUS_PER_SERVER - 1))
+GPU_LIST=$(seq -s, $GPU_START $GPU_END)
+
+# Get port for this server
+MY_PORT=${{PORTS[$GLOBAL_RANK]}}
+
+echo "=== vLLM Server Configuration ==="
+echo "Global Rank: $GLOBAL_RANK"
+echo "Node ID: $NODE_ID"
+echo "Local Rank: $LOCAL_RANK"
+echo "Port: $MY_PORT"
+echo "GPUs: $GPU_LIST (CUDA_VISIBLE_DEVICES)"
+echo "Tensor Parallel Size: {self.gpus_per_server}"
+echo "Model: {self.model_path}"
+echo "================================="
+
+# Debug /dev/shm and IPC-related info (helps with node-to-node variance)
+if [ "$PRINT_SHM_DIAGNOSTICS" = "true" ]; then
+    echo ""
+    echo "=== /dev/shm diagnostics (before vLLM start) ==="
+    df -h /dev/shm || true
+    ls -ld /dev/shm || true
+    mount | grep -E " /dev/shm " || true
+    echo "ulimit -n: $(ulimit -n || true)"
+    echo "==============================================="
+    echo ""
+fi
+
+# Set GPU visibility
+export CUDA_VISIBLE_DEVICES=$GPU_LIST
+
+# Mitigation: attempt to disable vLLM SHM broadcast transport (best-effort, version-dependent).
+# These env vars are intentionally safe: if vLLM doesn't recognize them, they are ignored.
+if [ "$DISABLE_SHM_BROADCAST" = "true" ]; then
+    export VLLM_DISABLE_SHM_BROADCAST=1
+    export VLLM_USE_SHM_BROADCAST=0
+fi
+
+# Start vLLM server
+python3 -m vllm.entrypoints.openai.api_server \\
+    --model "{self.model_path}" \\
+    --host "0.0.0.0" \\
+    --port "$MY_PORT" \\
+    --tensor-parallel-size {self.gpus_per_server} \\
+    --trust-remote-code \\
+    {self.server_args}
+'''
+
+        self.set_inline(cmd)
+        super().__post_init__()
+
+    @property
+    def total_replicas(self) -> int:
+        """Total number of vLLM server replicas."""
+        return self._total_replicas
+
+    @property
+    def ports(self) -> List[int]:
+        """List of ports for all server replicas."""
+        return self._ports
+
+
+@dataclass(kw_only=True)
+class GymClientScript(BaseJobScript):
+    """Script that starts Gym with routing to multiple vLLM servers.
+
+    This script runs on a single node (the master node) and:
+    1. Waits for all vLLM servers to be healthy
+    2. Builds the list of server URLs
+    3. Exports GYM_VLLM_BASE_URLS environment variable
+    4. Starts Gym with the configured command
+
+    Attributes:
+        servers: Reference to MultiVLLMServerScript for server URLs
+        gym_command: Command to run Gym (e.g., "python -m nemo_gym.server ...")
+        health_check_interval: Seconds between health check attempts (default: 10)
+
+    Example:
+        gym_client = GymClientScript(
+            servers=vllm_servers,
+            gym_command="python -m nemo_gym.server --config /path/to/config.yaml",
+        )
+
+        # In same CommandGroup as vllm_servers
+        group = CommandGroup(
+            commands=[
+                Command(script=vllm_servers, container="vllm", name="vllm"),
+                Command(script=gym_client, container="nemo-skills", name="gym"),
+            ],
+            ...
+        )
+    """
+
+    servers: "MultiVLLMServerScript"
+    gym_command: str
+    health_check_interval: int = 10  # Seconds between health check attempts
+
+    # Runs only on master node, single task
+    span_group_nodes: bool = False
+    num_tasks_override: int = 1
+
+    log_prefix: str = field(default="gym", init=False)
+
+    def __post_init__(self):
+        """Build command that waits for servers then starts Gym."""
+
+        # Deferred builder: the servers' port list is read at render time so it
+        # reflects any port allocation done in MultiVLLMServerScript.__post_init__.
+        def build_cmd() -> Tuple[str, Dict]:
+            """Render the bash script that waits for all replicas, then execs Gym.
+
+            Returns:
+                Tuple of (bash command string, metadata dict with an empty
+                "environment" mapping).
+            """
+            ports_str = " ".join(str(p) for p in self.servers.ports)
+            num_servers = self.servers.total_replicas
+            num_nodes = self.servers.num_nodes
+            servers_per_node = self.servers.servers_per_node
+
+            # NOTE(review): the readiness probe below uses `curl -s` without
+            # `-f`, so it succeeds on ANY HTTP response (including 404) — it
+            # effectively only checks that the port is accepting connections.
+            # Also, it hits "$URL/health" where URL ends in /v1; vLLM's health
+            # endpoint is commonly served at the root path /health — confirm
+            # the intended endpoint.
+            # NOTE(review): `60 / HEALTH_CHECK_INTERVAL` is bash integer
+            # division; if health_check_interval > 60 it yields 0 and the
+            # modulo expression becomes a division-by-zero error — confirm the
+            # interval is constrained to <= 60.
+            cmd = f"""
+#!/bin/bash
+set -e
+
+echo "=== Gym Client Starting ==="
+echo "Waiting for {num_servers} vLLM servers across {num_nodes} nodes..."
+echo "No timeout - will wait until servers are ready or job is cancelled."
+echo "If vLLM servers crash, the job will fail."
+
+# Configuration
+PORTS=({ports_str})
+NUM_NODES={num_nodes}
+SERVERS_PER_NODE={servers_per_node}
+HEALTH_CHECK_INTERVAL={self.health_check_interval}
+
+# Expand SLURM_JOB_NODELIST without scontrol (not available in containers)
+# Uses Python to parse the compressed node list format (e.g., "node[001-004]")
+expand_nodelist() {{
+    python3 -c "
+import re
+import sys
+
+nodelist = sys.argv[1]
+nodes = []
+
+# Handle comma-separated parts
+for part in re.split(r',(?![^\\[]*\\])', nodelist):
+    # Check if it has a range like prefix[001-003,005]
+    match = re.match(r'(.*)\\[([^\\]]+)\\](.*)', part)
+    if match:
+        prefix, ranges, suffix = match.groups()
+        for r in ranges.split(','):
+            if '-' in r:
+                start, end = r.split('-')
+                width = len(start)
+                for i in range(int(start), int(end) + 1):
+                    nodes.append(f'{{prefix}}{{i:0{{width}}d}}{{suffix}}')
+            else:
+                nodes.append(f'{{prefix}}{{r}}{{suffix}}')
+    else:
+        nodes.append(part)
+
+print(' '.join(nodes))
+" "$1"
+}}
+
+# Get all node hostnames.
+# IMPORTANT: depending on how the step is launched, some clusters populate only one of these.
+# We'll pick the candidate that expands to the most nodes, and verify it matches NUM_NODES.
+BEST_NODES_STR=""
+BEST_COUNT=0
+for VAR_NAME in SLURM_JOB_NODELIST SLURM_NODELIST SLURM_STEP_NODELIST; do
+    NL="${{!VAR_NAME:-}}"
+    if [ -z "$NL" ]; then
+        continue
+    fi
+    EXPANDED="$(expand_nodelist "$NL" || true)"
+    COUNT="$(echo "$EXPANDED" | wc -w | tr -d ' ')"
+    if [ "$COUNT" -gt "$BEST_COUNT" ]; then
+        BEST_COUNT="$COUNT"
+        BEST_NODES_STR="$EXPANDED"
+        BEST_VAR_NAME="$VAR_NAME"
+    fi
+done
+
+if [ "$BEST_COUNT" -lt "$NUM_NODES" ]; then
+    echo "ERROR: Could not discover enough node hostnames for NUM_NODES=$NUM_NODES"
+    echo "       Best candidate: ${{BEST_VAR_NAME:-}} -> $BEST_COUNT nodes: '${{BEST_NODES_STR:-}}'"
+    echo "       SLURM_JOB_NODELIST='${{SLURM_JOB_NODELIST:-}}'"
+    echo "       SLURM_NODELIST='${{SLURM_NODELIST:-}}'"
+    echo "       SLURM_STEP_NODELIST='${{SLURM_STEP_NODELIST:-}}'"
+    exit 1
+fi
+
+read -ra NODES <<< "$BEST_NODES_STR"
+echo "Nodes (from $BEST_VAR_NAME): ${{NODES[*]}}"
+
+# Build URL list and wait for each server (no timeout - wait indefinitely)
+URLS=""
+GLOBAL_IDX=0
+START_TIME=$(date +%s)
+
+for NODE_IDX in $(seq 0 $((NUM_NODES - 1))); do
+    NODE=${{NODES[$NODE_IDX]}}
+    if [ -z "$NODE" ]; then
+        echo "ERROR: Empty NODE at index $NODE_IDX. Nodes: ${{NODES[*]}}"
+        exit 1
+    fi
+
+    for LOCAL_IDX in $(seq 0 $((SERVERS_PER_NODE - 1))); do
+        PORT=${{PORTS[$GLOBAL_IDX]}}
+        URL="http://${{NODE}}:${{PORT}}/v1"
+
+        echo "Waiting for server $GLOBAL_IDX at $URL..."
+        ATTEMPT=0
+        while true; do
+            ATTEMPT=$((ATTEMPT + 1))
+            if curl -s "${{URL}}/health" > /dev/null 2>&1; then
+                ELAPSED=$(($(date +%s) - START_TIME))
+                echo "  Server $GLOBAL_IDX is ready (after ${{ELAPSED}}s total)"
+                break
+            fi
+            # Log progress every 60 seconds
+            if [ $((ATTEMPT % (60 / HEALTH_CHECK_INTERVAL))) -eq 0 ]; then
+                ELAPSED=$(($(date +%s) - START_TIME))
+                echo "  ... still waiting for server $GLOBAL_IDX (${{ELAPSED}}s elapsed)"
+            fi
+            sleep $HEALTH_CHECK_INTERVAL
+        done
+
+        if [ -n "$URLS" ]; then URLS="$URLS,"; fi
+        URLS="${{URLS}}$URL"
+        GLOBAL_IDX=$((GLOBAL_IDX + 1))
+    done
+done
+
+TOTAL_TIME=$(($(date +%s) - START_TIME))
+echo ""
+echo "=== All {num_servers} servers ready! (took ${{TOTAL_TIME}}s) ==="
+echo "Server URLs: $URLS"
+echo ""
+
+# Export for Gym to use
+export GYM_VLLM_BASE_URLS="$URLS"
+
+# Run Gym
+echo "Starting Gym..."
+{self.gym_command}
+"""
+
+            return cmd.strip(), {"environment": {}}
+
+        self.set_inline(build_cmd)
+        super().__post_init__()