Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion nemo_skills/pipeline/convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,8 @@ def convert(
partition: str = typer.Option(
None, help="Can specify if need interactive jobs or a specific non-default partition"
),
account: str = typer.Option(None, help="Can specify a non-default Slurm account"),
container: str = typer.Option(None, help="Override container image for the conversion job"),
qos: str = typer.Option(None, help="Specify Slurm QoS, e.g. to request interactive nodes"),
time_min: str = typer.Option(None, help="If specified, will use as a time-min slurm parameter"),
mount_paths: str = typer.Option(None, help="Comma separated list of paths to mount on the remote machine"),
Expand Down Expand Up @@ -320,12 +322,13 @@ def convert(
cmd=conversion_cmd,
task_name=expname,
log_dir=log_dir,
container=container_map[(convert_from, convert_to)],
container=container or container_map[(convert_from, convert_to)],
num_gpus=num_gpus,
num_nodes=1, # always running on a single node, might need to change that in the future
num_tasks=1,
cluster_config=cluster_config,
partition=partition,
account=account,
run_after=run_after,
reuse_code=reuse_code,
reuse_code_exp=reuse_code_exp,
Expand Down
35 changes: 32 additions & 3 deletions nemo_skills/pipeline/eval.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ def _create_comet_judge_tasks(
judge_server_gpus,
judge_server_nodes,
partition,
account,
judge_container,
run_after,
reuse_code_exp,
reuse_code,
Expand Down Expand Up @@ -108,11 +110,12 @@ def _create_comet_judge_tasks(
cmd=run_cmd,
task_name=f"{expname}-{benchmark}-comet-judge",
log_dir=log_dir + "/judge",
container=cluster_config["containers"]["vllm"],
container=judge_container or cluster_config["containers"]["vllm"],
cluster_config=cluster_config,
num_gpus=judge_server_gpus or 1,
num_nodes=judge_server_nodes or 1,
partition=partition,
account=account,
run_after=run_after,
reuse_code_exp=reuse_code_exp,
reuse_code=reuse_code,
Expand All @@ -138,6 +141,8 @@ def _create_nvembed_judge_tasks(
judge_server_gpus,
judge_server_nodes,
partition,
account,
judge_container,
run_after,
reuse_code_exp,
reuse_code,
Expand Down Expand Up @@ -195,11 +200,12 @@ def _create_nvembed_judge_tasks(
cmd=run_cmd,
task_name=f"{expname}-{benchmark}-nvembed-judge",
log_dir=log_dir + "/judge",
container=cluster_config["containers"]["vllm"],
container=judge_container or cluster_config["containers"]["vllm"],
cluster_config=cluster_config,
num_gpus=judge_server_gpus or 1,
num_nodes=judge_server_nodes or 1,
partition=partition,
account=account,
run_after=run_after,
reuse_code_exp=reuse_code_exp,
reuse_code=reuse_code,
Expand Down Expand Up @@ -227,6 +233,9 @@ def _create_llm_judge_tasks(
cluster,
config_dir,
partition,
account,
main_container,
sandbox_container,
with_sandbox,
keep_mounts_for_sandbox,
run_after,
Expand Down Expand Up @@ -267,6 +276,9 @@ def _create_llm_judge_tasks(
cluster=cluster,
config_dir=config_dir,
partition=partition,
account=account,
main_container=main_container,
sandbox_container=sandbox_container,
with_sandbox=with_sandbox,
keep_mounts_for_sandbox=keep_mounts_for_sandbox,
run_after=run_after,
Expand Down Expand Up @@ -352,6 +364,12 @@ def eval(
server_container: str = typer.Option(
None, help="Override container image for the hosted server (if server_gpus is set)"
),
main_container: str = typer.Option(None, help="Override container image for the main evaluation client"),
sandbox_container: str = typer.Option(None, help="Override container image for the sandbox"),
judge_container: str = typer.Option(None, help="Override container image for GPU-based judges (comet, nvembed)"),
Comment on lines +367 to +369
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's a little bulky to have separate override arguments for each container everywhere. Not sure that there is a better solution though. If we wanted to have overrides like we do for tools, e.g.:

++container_overrides.sandbox = "..."
++container_overrides.judge = "..."
```
But then the choice of key is unclear — since our "job components", e.g., judge, main, sandbox, ... don't map cleanly to a container name (e.g., "judge" -> containers[judge_server_type], main -> containers["nemo-skills"], sandbox -> containers["sandbox"]).

So I think with the current structure, what you've done the best choice, but maybe we can eventually work toward something a little more general here.

judge_server_container: str = typer.Option(
None, help="Override container image for the hosted judge server (if judge_server_gpus is set)"
),
extra_judge_args: str = typer.Option(
"", help="Additional arguments for judge (passed to generate script, so should start with ++)"
),
Expand All @@ -378,6 +396,7 @@ def eval(
"Can provide a list directly when using through Python",
),
partition: str = typer.Option(None, help="Cluster partition to use"),
account: str = typer.Option(None, help="Can specify a non-default Slurm account"),
qos: str = typer.Option(None, help="Specify Slurm QoS, e.g. to request interactive nodes"),
time_min: str = typer.Option(None, help="If specified, will use as a time-min slurm parameter"),
mount_paths: str = typer.Option(None, help="Comma separated list of paths to mount on the remote machine"),
Expand Down Expand Up @@ -522,6 +541,7 @@ def eval(
"server_nodes": judge_server_nodes,
"server_args": judge_server_args,
"server_entrypoint": judge_server_entrypoint,
"server_container": judge_server_container,
"generation_type": judge_generation_type,
"generation_module": judge_generation_module,
}
Expand Down Expand Up @@ -611,14 +631,16 @@ def eval(
cmd=pipeline_utils.wrap_python_path(cmd=combine_cmds(cmds, single_node_mode)),
task_name=f"{expname}-{'-'.join(job_benchmarks)}",
log_dir=log_dir,
container=cluster_config["containers"]["nemo-skills"],
container=main_container or cluster_config["containers"]["nemo-skills"],
cluster_config=cluster_config,
partition=partition,
account=account,
server_config=job_server_config,
with_sandbox=job_needs_sandbox or with_sandbox,
keep_mounts_for_sandbox=job_needs_sandbox_to_keep_mounts or keep_mounts_for_sandbox,
sandbox_port=None if get_random_port else 6000,
sandbox_env_overrides=job_sandbox_env_overrides,
sandbox_container=sandbox_container,
run_after=run_after,
reuse_code_exp=reuse_code_exp,
reuse_code=reuse_code,
Expand Down Expand Up @@ -675,6 +697,8 @@ def eval(
judge_server_gpus=judge_server_gpus,
judge_server_nodes=judge_server_nodes,
partition=partition,
account=account,
judge_container=judge_container,
run_after=run_after,
reuse_code_exp=reuse_code_exp,
reuse_code=reuse_code,
Expand All @@ -699,6 +723,8 @@ def eval(
judge_server_gpus=judge_server_gpus,
judge_server_nodes=judge_server_nodes,
partition=partition,
account=account,
judge_container=judge_container,
run_after=run_after,
reuse_code_exp=reuse_code_exp,
reuse_code=reuse_code,
Expand Down Expand Up @@ -726,6 +752,9 @@ def eval(
cluster=cluster,
config_dir=config_dir,
partition=partition,
account=account,
main_container=main_container,
sandbox_container=sandbox_container,
with_sandbox=with_sandbox,
keep_mounts_for_sandbox=keep_mounts_for_sandbox,
run_after=run_after,
Expand Down
14 changes: 12 additions & 2 deletions nemo_skills/pipeline/generate.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,14 @@ def _create_job_unified(
installation_command: Optional[str],
with_sandbox: bool,
partition: Optional[str],
account: Optional[str],
keep_mounts_for_sandbox: bool,
task_name: str,
log_dir: str,
sbatch_kwargs: Optional[Dict] = None,
sandbox_env_overrides: Optional[List[str]] = None,
main_container: Optional[str] = None,
sandbox_container: Optional[str] = None,
) -> List[CommandGroup]:
"""
Create CommandGroups for n models (unified for n=1 and n>1).
Expand Down Expand Up @@ -147,7 +150,7 @@ def _create_job_unified(

sandbox_cmd = Command(
script=sandbox_script,
container=cluster_config["containers"]["sandbox"],
container=sandbox_container or cluster_config["containers"]["sandbox"],
name=f"{task_name}_sandbox",
)
components.append(sandbox_cmd)
Expand Down Expand Up @@ -178,7 +181,7 @@ def _create_job_unified(

client_cmd = Command(
script=client_script,
container=cluster_config["containers"]["nemo-skills"],
container=main_container or cluster_config["containers"]["nemo-skills"],
name=f"{task_name}",
)
components.append(client_cmd)
Expand All @@ -191,6 +194,7 @@ def _create_job_unified(
commands=components,
hardware=HardwareConfig(
partition=partition,
account=account,
num_gpus=group_gpus,
num_nodes=group_nodes,
num_tasks=group_tasks,
Expand Down Expand Up @@ -272,6 +276,8 @@ def generate(
help="Container image(s). CLI: space-separated. Python API: string or list. "
"Single value broadcasts to all models.",
),
main_container: str = typer.Option(None, help="Override container image for the main generation client"),
sandbox_container: str = typer.Option(None, help="Override container image for the sandbox"),
dependent_jobs: int = typer.Option(0, help="Specify this to launch that number of dependent jobs"),
mount_paths: str = typer.Option(None, help="Comma separated list of paths to mount on the remote machine"),
num_random_seeds: int = typer.Option(
Expand All @@ -297,6 +303,7 @@ def generate(
partition: str = typer.Option(
None, help="Can specify if need interactive jobs or a specific non-default partition"
),
account: str = typer.Option(None, help="Can specify a non-default Slurm account"),
qos: str = typer.Option(None, help="Specify Slurm QoS, e.g. to request interactive nodes"),
time_min: str = typer.Option(None, help="If specified, will use as a time-min slurm parameter"),
run_after: List[str] = typer.Option(
Expand Down Expand Up @@ -589,11 +596,14 @@ def convert_server_type_to_string(server_type):
installation_command=installation_command,
with_sandbox=with_sandbox,
partition=partition,
account=account,
keep_mounts_for_sandbox=keep_mounts_for_sandbox,
task_name=task_name,
log_dir=log_dir,
sbatch_kwargs=sbatch_kwargs,
sandbox_env_overrides=sandbox_env_overrides,
main_container=main_container,
sandbox_container=sandbox_container,
)

# Use unique internal job name for dependency tracking, but same task_name
Expand Down
21 changes: 19 additions & 2 deletions nemo_skills/pipeline/nemo_evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ def nemo_evaluator(
job_gpus: int = typer.Option(0, help="GPUs to allocate for the evaluator client when no servers are hosted"),
job_nodes: int = typer.Option(1, help="Nodes to allocate for the evaluator job"),
partition: str = typer.Option(None, help="Cluster partition to use"),
account: str = typer.Option(None, help="Can specify a non-default Slurm account"),
qos: str = typer.Option(None, help="Slurm QoS"),
mount_paths: str = typer.Option(None, help="Comma separated list of paths to mount on the remote machine"),
log_dir: str = typer.Option(None, help="Custom location for logs"),
Expand Down Expand Up @@ -325,6 +326,7 @@ def nemo_evaluator(
job_nodes=job_nodes,
cluster_config=cluster_config,
partition=partition,
account=account,
qos=qos,
exclusive=exclusive,
)
Expand All @@ -346,6 +348,7 @@ def nemo_evaluator(
commands=[main_server_cmd, client_cmd],
hardware=_hardware_for_group(
task_ctx.partition,
task_ctx.account,
task_ctx.server_gpus or None,
task_ctx.server_nodes or 1,
task_ctx.qos,
Expand All @@ -358,6 +361,7 @@ def nemo_evaluator(
commands=[judge_server_cmd],
hardware=_hardware_for_group(
task_ctx.partition,
task_ctx.account,
task_ctx.judge_server_gpus or None,
task_ctx.judge_server_nodes or 1,
task_ctx.qos,
Expand Down Expand Up @@ -393,7 +397,12 @@ def nemo_evaluator(
CommandGroup(
commands=sg_cmds,
hardware=_hardware_for_group(
task_ctx.partition, group_num_gpus, group_num_nodes, task_ctx.qos, task_ctx.exclusive
task_ctx.partition,
task_ctx.account,
group_num_gpus,
group_num_nodes,
task_ctx.qos,
task_ctx.exclusive,
),
name=f"{task_ctx.expname}-{task_ctx.idx}",
log_dir=log_dir,
Expand Down Expand Up @@ -543,17 +552,24 @@ class _TaskCreationContext:
job_nodes: int
cluster_config: Dict
partition: Optional[str]
account: Optional[str]
qos: Optional[str]
exclusive: bool


def _hardware_for_group(
partition: Optional[str], num_gpus: Optional[int], num_nodes: int, qos: Optional[str], exclusive: bool
partition: Optional[str],
account: Optional[str],
num_gpus: Optional[int],
num_nodes: int,
qos: Optional[str],
exclusive: bool,
) -> HardwareConfig:
"""Create HardwareConfig for a CommandGroup.

Args:
partition: SLURM partition name
account: SLURM account name
num_gpus: Number of GPUs (None means no GPU allocation)
num_nodes: Number of nodes
qos: SLURM QoS setting
Expand All @@ -564,6 +580,7 @@ def _hardware_for_group(
"""
return HardwareConfig(
partition=partition,
account=account,
num_gpus=num_gpus,
num_nodes=num_nodes,
sbatch_kwargs={
Expand Down
4 changes: 4 additions & 0 deletions nemo_skills/pipeline/run_cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ def run_cmd(
partition: str = typer.Option(
None, help="Can specify if need interactive jobs or a specific non-default partition"
),
account: str = typer.Option(None, help="Can specify a non-default Slurm account"),
qos: str = typer.Option(None, help="Specify Slurm QoS, e.g. to request interactive nodes"),
time_min: str = typer.Option(None, help="If specified, will use as a time-min slurm parameter"),
num_gpus: int | None = typer.Option(None, help="Number of GPUs per node to use"),
Expand All @@ -77,6 +78,7 @@ def run_cmd(
server_container: str = typer.Option(
None, help="Override container image for the hosted server (if server_gpus is set)"
),
sandbox_container: str = typer.Option(None, help="Override container image for the sandbox"),
dependent_jobs: int = typer.Option(0, help="Specify this to launch that number of dependent jobs"),
mount_paths: str = typer.Option(None, help="Comma separated list of paths to mount on the remote machine"),
run_after: List[str] = typer.Option(
Expand Down Expand Up @@ -197,10 +199,12 @@ def run_cmd(
container=containers,
cluster_config=cluster_config,
partition=partition,
account=account,
server_config=server_config,
with_sandbox=with_sandbox,
keep_mounts_for_sandbox=keep_mounts_for_sandbox,
sandbox_port=None if get_random_port else 6000,
sandbox_container=sandbox_container,
run_after=run_after,
reuse_code=reuse_code,
reuse_code_exp=reuse_code_exp,
Expand Down
7 changes: 6 additions & 1 deletion nemo_skills/pipeline/start_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,10 @@ def start_server(
"If not specified, will use the default entrypoint for the server type.",
),
server_container: str = typer.Option(None, help="Override container image for the hosted server"),
main_container: str = typer.Option(None, help="Override container image for the main task (e.g., chat interface)"),
sandbox_container: str = typer.Option(None, help="Override container image for the sandbox"),
partition: str = typer.Option(None, help="Cluster partition to use"),
account: str = typer.Option(None, help="Can specify a non-default Slurm account"),
qos: str = typer.Option(None, help="Specify Slurm QoS, e.g. to request interactive nodes"),
time_min: str = typer.Option(None, help="If specified, will use as a time-min slurm parameter"),
mount_paths: str = typer.Option(None, help="Comma separated list of paths to mount on the remote machine"),
Expand Down Expand Up @@ -200,13 +203,15 @@ def start_server(
cmd=cmd,
task_name="server",
log_dir=log_dir,
container=cluster_config["containers"]["nemo-skills"],
container=main_container or cluster_config["containers"]["nemo-skills"],
cluster_config=cluster_config,
partition=partition,
account=account,
server_config=server_config,
with_sandbox=with_sandbox,
keep_mounts_for_sandbox=keep_mounts_for_sandbox,
sandbox_port=sandbox_port,
sandbox_container=sandbox_container,
sbatch_kwargs=parse_kwargs(sbatch_kwargs, exclusive=exclusive, qos=qos, time_min=time_min),
)

Expand Down
2 changes: 2 additions & 0 deletions nemo_skills/pipeline/utils/declarative.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ class HardwareConfig:
"""Hardware configuration for a group of tasks."""

partition: Optional[str] = None
account: Optional[str] = None
num_gpus: Optional[int] = None
num_nodes: Optional[int] = None
num_tasks: Optional[int] = 1
Expand Down Expand Up @@ -585,6 +586,7 @@ def _create_executor(
log_dir=log_dir,
log_prefix=exec_config["log_prefix"],
partition=hardware.partition if hardware else None,
account=hardware.account if hardware else None,
heterogeneous=heterogeneous,
het_group=het_group,
total_het_groups=total_het_groups,
Expand Down
Loading
Loading