1 change: 1 addition & 0 deletions nemo_skills/pipeline/cli.py
@@ -26,6 +26,7 @@
from nemo_skills.pipeline.generate import generate
from nemo_skills.pipeline.megatron_lm.train import train_megatron_lm
from nemo_skills.pipeline.nemo_evaluator import nemo_evaluator
from nemo_skills.pipeline.nemo_gym_rollouts import nemo_gym_rollouts
from nemo_skills.pipeline.nemo_rl.grpo import grpo_nemo_rl
from nemo_skills.pipeline.nemo_rl.sft import sft_nemo_rl
from nemo_skills.pipeline.prepare_data import prepare_data
374 changes: 374 additions & 0 deletions nemo_skills/pipeline/nemo_gym_rollouts.py
@@ -0,0 +1,374 @@
# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
[Collaborator comment]: can we add a simple gpu test?
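A possible shape for such a test, sketched below. It drives the CLI in --dry_run mode so no rollouts are actually collected; the `gpu` pytest marker, the tiny model path, and the exact registered command name are assumptions, not part of this PR.

```python
# Hypothetical smoke test for the reviewer's request. Assumptions: a `gpu`
# pytest marker exists in this repo, and --dry_run validation is enough to
# exercise the command wiring without real rollout collection.
import pytest
from typer.testing import CliRunner

from nemo_skills.pipeline.app import app


@pytest.mark.gpu
def test_nemo_gym_rollouts_dry_run(tmp_path):
    runner = CliRunner()
    result = runner.invoke(
        app,
        [
            # May be "nemo-gym-rollouts" depending on typer's naming settings.
            "nemo_gym_rollouts",
            "--cluster", "local",
            "--config_paths", "ns_tools/configs/ns_tools.yaml",
            "--input_file", "data/example.jsonl",
            "--output_dir", str(tmp_path),
            "--model", "/path/to/tiny-model",
            "--server_type", "vllm",
            "--server_gpus", "1",
            "--dry_run",
            "+agent_name=ns_tools_simple_agent",
        ],
    )
    assert result.exit_code == 0, result.output
```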

#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""NeMo Gym Rollouts Pipeline.

This pipeline command runs rollout collection with NeMo Gym, orchestrating:
- vLLM model server (optional, can use pre-hosted)
- Sandbox for code execution (optional)
- NeMo Gym servers (ng_run)
- Rollout collection client (ng_collect_rollouts)

Example usage:
# Self-hosted vLLM server
ns nemo_gym_rollouts \\
--cluster local \\
--config_paths "ns_tools/configs/ns_tools.yaml,math_with_judge/configs/math_with_judge.yaml" \\
--input_file data/example.jsonl \\
--output_dir /results/rollouts \\
--model /path/to/model \\
--server_type vllm \\
--server_gpus 1 \\
--with_sandbox \\
+agent_name=ns_tools_simple_agent \\
+limit=10 \\
+num_samples_in_parallel=3 \\
+num_repeats=4 # Run each prompt 4 times for mean@4

# Pre-hosted server
ns nemo_gym_rollouts \\
--cluster local \\
--config_paths "ns_tools/configs/ns_tools.yaml" \\
--input_file data/example.jsonl \\
--output_dir /results/rollouts \\
--server_address http://localhost:8000/v1 \\
--policy_model_name nvidia/model-name \\
+agent_name=ns_tools_simple_agent
"""

import logging

import typer

import nemo_skills.pipeline.utils as pipeline_utils
from nemo_skills.pipeline.app import app, typer_unpacker
from nemo_skills.pipeline.utils.cluster import cluster_path_exists, parse_kwargs
from nemo_skills.pipeline.utils.declarative import (
    Command,
    CommandGroup,
    HardwareConfig,
    Pipeline,
)
from nemo_skills.pipeline.utils.mounts import get_unmounted_path
from nemo_skills.pipeline.utils.scripts import (
    NemoGymRolloutsScript,
    SandboxScript,
    ServerScript,
)
from nemo_skills.utils import get_logger_name, setup_logging

LOG = logging.getLogger(get_logger_name(__file__))


@app.command(context_settings={"allow_extra_args": True, "ignore_unknown_options": True})
@typer_unpacker
def nemo_gym_rollouts(
    ctx: typer.Context,
    cluster: str = typer.Option(
        None,
        help="One of the configs inside config_dir or NEMO_SKILLS_CONFIG_DIR or ./cluster_configs. "
        "Can also use NEMO_SKILLS_CONFIG instead of specifying as argument.",
    ),
    config_paths: str = typer.Option(
        ...,
        help="Comma-separated list of NeMo Gym config YAML files for ng_run. "
        "E.g., 'ns_tools/configs/ns_tools.yaml,math_with_judge/configs/math_with_judge.yaml'",
    ),
    input_file: str = typer.Option(..., help="Path to input JSONL file for rollout collection"),
    output_dir: str = typer.Option(..., help="Directory for rollout outputs. Output file will be rollouts.jsonl"),
    expname: str = typer.Option("nemo_gym_rollouts", help="NeMo Run experiment name"),
    model: str = typer.Option(None, help="Path to model for self-hosted vLLM server"),
    server_address: str = typer.Option(
        None,
        help="Address of pre-hosted server (e.g., http://localhost:8000/v1). If provided, skips self-hosted server.",
    ),
    server_type: pipeline_utils.SupportedServers = typer.Option(
        None,
        help="Type of server (vllm, trtllm, sglang, etc.)",
    ),
    server_gpus: int = typer.Option(None, help="Number of GPUs for self-hosted server"),
    server_nodes: int = typer.Option(1, help="Number of nodes for self-hosted server"),
    server_args: str = typer.Option("", help="Additional arguments for the server"),
    with_sandbox: bool = typer.Option(False, help="If True, start a sandbox container for code execution"),
    gym_path: str = typer.Option(
        "/opt/NeMo-RL/3rdparty/Gym-workspace/Gym",
        help="Path to NeMo Gym installation. Defaults to container built-in. Use for mounted/custom Gym.",
    ),
    policy_api_key: str = typer.Option(
        "dummy",
        help="API key for policy server. Use 'dummy' for local vLLM servers.",
    ),
    policy_model_name: str = typer.Option(
        None,
        help="Model name for policy server. Required for pre-hosted servers. "
        "For self-hosted, defaults to the model path if not specified.",
    ),

[Collaborator comment]: can we just reuse the "model" parameter here? That would be consistent with how we do that in the main generate pipeline.
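One shape the suggested reuse could take, as a sketch only (not part of this PR): drop the separate flag and let --model carry the served model name in both modes.

```python
# Hypothetical refactor per the comment above: --model doubles as the policy
# model name, matching the main generate pipeline. Validation then becomes:
if server_address is not None and model is None:
    raise ValueError("--model is required with --server_address to name the served model")
policy_model_name = model
```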
    partition: str = typer.Option(None, help="Cluster partition to use"),
    qos: str = typer.Option(None, help="Specify Slurm QoS"),
    time_min: str = typer.Option(None, help="Slurm time-min parameter"),
    config_dir: str = typer.Option(None, help="Custom directory for cluster configs"),
    log_dir: str = typer.Option(None, help="Custom location for logs"),
    exclusive: bool | None = typer.Option(None, help="Add exclusive flag to slurm job"),
    num_random_seeds: int = typer.Option(
        None,
        help="Number of parallel rollout jobs to run. Each job writes to rollouts-rs{i}.jsonl. "
        "Use this to scale rollout collection across multiple nodes.",
    ),
    random_seeds: str = typer.Option(
        None,
        help="Explicit list of seed indices to run (comma-separated, e.g., '0,2,5,7'). "
        "Overrides num_random_seeds. Can provide a list directly when using through Python.",
    ),
    starting_seed: int = typer.Option(
        0,
        help="Starting seed index when using num_random_seeds. "
        "E.g., starting_seed=10 with num_random_seeds=4 creates seeds 10,11,12,13.",
    ),
    rerun_done: bool = typer.Option(
        False,
        help="If False (default), skip seeds that already have output files. "
        "If True, re-run all seeds even if output files exist.",
    ),
    use_mounted_nemo_skills: bool = typer.Option(
        True,
        help="If True (default), use the nemo-skills code packaged to /nemo_run/code. "
        "If False, avoid /nemo_run/code and use the container's installed nemo-skills instead. "
        "Set to False when you want Gym to use its own bundled nemo-skills version.",
    ),
    dry_run: bool = typer.Option(False, help="Validate without executing"),
    sbatch_kwargs: str = typer.Option("", help="Additional sbatch kwargs as JSON string"),
):
"""Run NeMo Gym rollout collection pipeline.

This command orchestrates running rollout collection with NeMo Gym:
1. Optionally starts a vLLM model server (or uses pre-hosted)
2. Optionally starts a sandbox for code execution
3. Starts NeMo Gym servers via ng_run
4. Runs ng_collect_rollouts to collect rollouts

All Hydra arguments (prefixed with + or ++) are passed through to ng_run
and ng_collect_rollouts. Common arguments include:
- +agent_name=... (required for rollout collection)
- +limit=... (limit number of samples from input)
- +num_samples_in_parallel=... (concurrent requests)
- +num_repeats=N (run each prompt N times for mean@k evaluation)

For large-scale rollout collection, use --num_random_seeds to create multiple
independent jobs. Each job has its own server and sandbox (unique ports to avoid
conflicts if scheduled on same node) and writes to rollouts-rs{i}.jsonl.

Use --starting_seed to offset seed numbering (e.g., to continue from a previous run).
Use --random_seeds to specify explicit seeds (e.g., '0,2,5,7' to re-run specific seeds).
"""
    setup_logging(disable_hydra_logs=False, use_rich=True)
    extra_arguments = " ".join(ctx.args)
    LOG.info("Starting NeMo Gym rollouts pipeline")
    LOG.info(f"Extra arguments: {extra_arguments}")

    # Parse config paths
    config_paths_list = [p.strip() for p in config_paths.split(",") if p.strip()]
    LOG.info(f"Config paths: {config_paths_list}")

    # Validate server configuration
    self_hosted = model is not None and server_gpus is not None
    pre_hosted = server_address is not None

    if not self_hosted and not pre_hosted:
        raise ValueError(
            "Must provide either --model and --server_gpus for self-hosted server, "
            "or --server_address for pre-hosted server"
        )

    if self_hosted and pre_hosted:
        raise ValueError("Cannot specify both self-hosted (--model, --server_gpus) and pre-hosted (--server_address)")

    if self_hosted and server_type is None:
        raise ValueError("--server_type is required when using self-hosted server")

    # Validate and set policy_model_name
    if pre_hosted and policy_model_name is None:
        raise ValueError("--policy_model_name is required when using a pre-hosted server (--server_address)")

    if self_hosted and policy_model_name is None:
        # For self-hosted, default to the model path
        policy_model_name = model
        LOG.info(f"Using model path as policy_model_name: {policy_model_name}")

    # Get cluster config
    cluster_config = pipeline_utils.get_cluster_config(cluster, config_dir)

    if not log_dir:
        log_dir = f"{output_dir}/logs"

    # Parse sbatch kwargs
    sbatch_kwargs_dict = parse_kwargs(sbatch_kwargs, exclusive=exclusive, qos=qos, time_min=time_min)

    # Determine seed indices for parallel jobs
    if random_seeds is not None:
        # Explicit seeds provided
        if isinstance(random_seeds, str):
            seed_indices = [int(s.strip()) for s in random_seeds.split(",")]
[Collaborator comment]: can we reuse str_ids_to_list from the main generate pipeline for consistency? (See the sketch after this if/elif/else block.)

        else:
            seed_indices = list(random_seeds)
        LOG.info(f"Using explicit seeds: {seed_indices}")
    elif num_random_seeds:
        seed_indices = list(range(starting_seed, starting_seed + num_random_seeds))
        LOG.info(
            f"Creating {num_random_seeds} separate jobs (rs{starting_seed}..rs{starting_seed + num_random_seeds - 1})"
        )
    else:
        seed_indices = [None]  # Single job, no seed suffix
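The sketch referenced in the comment above, assuming str_ids_to_list is importable from nemo_skills.pipeline.utils and accepts the same comma-separated string (both assumptions):

```python
# Hypothetical refactor reusing the shared parser from the main generate
# pipeline (import path and signature are assumptions):
from nemo_skills.pipeline.utils import str_ids_to_list

if random_seeds is not None:
    if isinstance(random_seeds, str):
        seed_indices = str_ids_to_list(random_seeds)
    else:
        seed_indices = list(random_seeds)
```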

    # Get server type string once if self-hosted
    server_type_str = None
    server_container = None
    if self_hosted:
        server_type_str = server_type.value if hasattr(server_type, "value") else server_type
        server_container = cluster_config["containers"].get(server_type_str, server_type_str)
[coderabbitai bot comment, Feb 13, 2026]: ⚠️ Potential issue | 🟡 Minor

Consider using direct dictionary access or explicit validation for container resolution.

cluster_config["containers"].get(server_type_str, server_type_str) allows passing either a registered container type or a direct image path, but this creates a UX risk: a typo like "vlm" instead of "vllm" silently becomes a literal image name, failing only at runtime instead of failing fast. The rest of the codebase uses direct dictionary access (e.g., cluster_config["containers"]["nemo-skills"]), which fails immediately on missing keys. Either validate the server type against a set of known options or use direct access if the key is always expected.

🤖 Prompt for AI Agents
In nemo_skills/pipeline/nemo_gym_rollouts.py at line 238: the code uses
cluster_config["containers"].get(server_type_str, server_type_str), which can
silently accept typos as literal image names. Validate server_type_str against
cluster_config["containers"] (or use direct access) so missing keys fail fast:
if server_type_str is a key in cluster_config["containers"], set
server_container = cluster_config["containers"][server_type_str]; otherwise
raise a clear error (or fall back to a validated explicit list) so functions
referencing server_container don't silently accept invalid names.

[Collaborator comment]: @gwarmstrong should we do that? Not sure why we fall back to server_type_str here. We should probably have a separate server_container argument that users can use to override, but if it's not specified we should fail if we can't find the container.

[coderabbitai bot comment]: Seems like the humans are having a chat. I'll hop back into my burrow for now. If you need me again, just tag @coderabbitai in a new comment, and I'll come hopping out!
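A fail-fast variant along the lines of this thread, as a sketch only; the server_container_override argument does not exist in this PR and is shown as a hypothetical.

```python
# Hypothetical resolution logic: an explicit override wins, otherwise the
# server type must be a registered container key, and anything else fails fast.
if server_container_override:  # hypothetical new CLI argument
    server_container = server_container_override
elif server_type_str in cluster_config["containers"]:
    server_container = cluster_config["containers"][server_type_str]
else:
    raise ValueError(
        f"No container registered for server type '{server_type_str}'. "
        f"Known types: {sorted(cluster_config['containers'])}"
    )
```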


    # Filter out seeds with existing output files (unless rerun_done=True)
    if not rerun_done and seed_indices != [None]:
        filtered_seeds = []
        skipped_seeds = []
        for seed_idx in seed_indices:
            output_file = f"{output_dir}/rollouts-rs{seed_idx}.jsonl"
            # Check if file exists on cluster
            try:
                unmounted_path = get_unmounted_path(cluster_config, output_file)
                if cluster_path_exists(cluster_config, unmounted_path):
                    skipped_seeds.append(seed_idx)
                else:
                    filtered_seeds.append(seed_idx)
            except Exception as e:
                LOG.warning(f"Could not check if {output_file} exists: {e}. Including seed {seed_idx}.")
                filtered_seeds.append(seed_idx)

[Collaborator comment on the except clause]: do we expect this to fail? Why blank exception? (A narrower variant is sketched after this block.)

        if skipped_seeds:
            LOG.info(f"Skipping seeds with existing output files: {skipped_seeds}")
        seed_indices = filtered_seeds

    if not seed_indices:
        LOG.info("All seeds already have output files. Nothing to run.")
        return None
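One answer to the question above, as a sketch: catch only the errors a remote existence check can plausibly raise and let programming errors propagate. The exact exception types raised by get_unmounted_path/cluster_path_exists are an assumption here.

```python
# Hypothetical narrowing of the bare `except Exception` (exception types are
# assumptions; check what these helpers actually raise):
try:
    unmounted_path = get_unmounted_path(cluster_config, output_file)
    exists = cluster_path_exists(cluster_config, unmounted_path)
except (OSError, TimeoutError) as e:
    LOG.warning(f"Could not check if {output_file} exists: {e}. Including seed {seed_idx}.")
    exists = False
if exists:
    skipped_seeds.append(seed_idx)
else:
    filtered_seeds.append(seed_idx)
```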

    # Build jobs - one per seed, each with its own server/sandbox (unique ports)
    jobs = []
    for seed_idx in seed_indices:
        components = []

        # Determine naming suffix
        if seed_idx is not None:
            output_file = f"{output_dir}/rollouts-rs{seed_idx}.jsonl"
            job_suffix = f"_rs{seed_idx}"
        else:
            output_file = f"{output_dir}/rollouts.jsonl"
            job_suffix = ""

        # 1. Server (optional, self-hosted) - each job gets its own server with unique port
        server_script = None
        if self_hosted:
            server_script = ServerScript(
                server_type=server_type_str,
                model_path=model,
                cluster_config=cluster_config,
                num_gpus=server_gpus,
                num_nodes=server_nodes,
                server_args=server_args,
                allocate_port=True,  # Each job gets a unique port
            )

            server_cmd = Command(
                script=server_script,
                container=server_container,
                name=f"{expname}_server{job_suffix}",
            )
            components.append(server_cmd)
            LOG.info(f"Job{job_suffix}: server on port {server_script.port}")

        # 2. Sandbox (optional) - each job gets its own sandbox with unique port
        sandbox_script = None
        if with_sandbox:
            sandbox_script = SandboxScript(
                cluster_config=cluster_config,
                allocate_port=True,  # Each job gets a unique port
            )

            sandbox_cmd = Command(
                script=sandbox_script,
                container=cluster_config["containers"]["sandbox"],
                name=f"{expname}_sandbox{job_suffix}",
            )
            components.append(sandbox_cmd)
            LOG.info(f"Job{job_suffix}: sandbox on port {sandbox_script.port}")

        # 3. NeMo Gym rollouts
        nemo_gym_script = NemoGymRolloutsScript(
            config_paths=config_paths_list,
            input_file=input_file,
            output_file=output_file,
            extra_arguments=extra_arguments,
            server=server_script,
            server_address=server_address,
            sandbox=sandbox_script,
            gym_path=gym_path,
            policy_api_key=policy_api_key,
            policy_model_name=policy_model_name,
        )

        nemo_gym_cmd = Command(
            script=nemo_gym_script,
            container=cluster_config["containers"]["nemo-rl"],
            name=f"{expname}_nemo_gym{job_suffix}",
            # If use_mounted_nemo_skills=False, avoid /nemo_run/code so Gym uses its bundled version
            avoid_nemo_run_code=not use_mounted_nemo_skills,
        )
        components.append(nemo_gym_cmd)

        # Create command group for this job
        hardware = HardwareConfig(
            partition=partition,
            num_gpus=server_gpus if self_hosted else 0,
            num_nodes=server_nodes if self_hosted else 1,
            num_tasks=1,
            sbatch_kwargs=sbatch_kwargs_dict,
        )

        group = CommandGroup(
            commands=components,
            hardware=hardware,
            name=f"{expname}{job_suffix}",
            log_dir=log_dir,
        )

        jobs.append({"name": f"{expname}{job_suffix}", "group": group})

    LOG.info(f"Created {len(jobs)} job(s)")

    # Create and run pipeline
    pipeline = Pipeline(
        name=expname,
        cluster_config=cluster_config,
        jobs=jobs,
    )

    sequential = cluster_config["executor"] in ["local", "none"]
    result = pipeline.run(dry_run=dry_run, sequential=sequential)

    LOG.info(f"Pipeline {'validated' if dry_run else 'submitted'} successfully")
    return result


if __name__ == "__main__":
    typer.main.get_command_name = lambda name: name
    app()
9 changes: 9 additions & 0 deletions nemo_skills/pipeline/utils/__init__.py
@@ -71,6 +71,15 @@
    get_registered_external_repo,
    register_external_repo,
)
from nemo_skills.pipeline.utils.scripts import (
    BaseJobScript,
    GenerationClientScript,
    GymClientScript,
    MultiVLLMServerScript,
    NemoGymRolloutsScript,
    SandboxScript,
    ServerScript,
)
from nemo_skills.pipeline.utils.server import (
    SupportedServers,
    SupportedServersSelfHosted,
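With this re-export in place, pipeline code can import the script helpers from the package root rather than reaching into the scripts module, e.g.:

```python
from nemo_skills.pipeline.utils import NemoGymRolloutsScript, SandboxScript, ServerScript
```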