diff --git a/nemo_skills/pipeline/convert.py b/nemo_skills/pipeline/convert.py
index 97c7e52597..99ca445552 100644
--- a/nemo_skills/pipeline/convert.py
+++ b/nemo_skills/pipeline/convert.py
@@ -180,6 +180,8 @@ def convert(
     partition: str = typer.Option(
         None, help="Can specify if need interactive jobs or a specific non-default partition"
     ),
+    account: str = typer.Option(None, help="Can specify a non-default Slurm account"),
+    container: str = typer.Option(None, help="Override container image for the conversion job"),
     qos: str = typer.Option(None, help="Specify Slurm QoS, e.g. to request interactive nodes"),
     time_min: str = typer.Option(None, help="If specified, will use as a time-min slurm parameter"),
     mount_paths: str = typer.Option(None, help="Comma separated list of paths to mount on the remote machine"),
@@ -320,12 +322,13 @@ def convert(
             cmd=conversion_cmd,
             task_name=expname,
             log_dir=log_dir,
-            container=container_map[(convert_from, convert_to)],
+            container=container or container_map[(convert_from, convert_to)],
             num_gpus=num_gpus,
             num_nodes=1,  # always running on a single node, might need to change that in the future
             num_tasks=1,
             cluster_config=cluster_config,
             partition=partition,
+            account=account,
             run_after=run_after,
             reuse_code=reuse_code,
             reuse_code_exp=reuse_code_exp,
diff --git a/nemo_skills/pipeline/eval.py b/nemo_skills/pipeline/eval.py
index fac2f884bd..1907da1311 100644
--- a/nemo_skills/pipeline/eval.py
+++ b/nemo_skills/pipeline/eval.py
@@ -54,6 +54,8 @@ def _create_comet_judge_tasks(
     judge_server_gpus,
     judge_server_nodes,
     partition,
+    account,
+    judge_container,
     run_after,
     reuse_code_exp,
     reuse_code,
@@ -108,11 +110,12 @@ def _create_comet_judge_tasks(
         cmd=run_cmd,
         task_name=f"{expname}-{benchmark}-comet-judge",
         log_dir=log_dir + "/judge",
-        container=cluster_config["containers"]["vllm"],
+        container=judge_container or cluster_config["containers"]["vllm"],
         cluster_config=cluster_config,
         num_gpus=judge_server_gpus or 1,
         num_nodes=judge_server_nodes or 1,
         partition=partition,
+        account=account,
         run_after=run_after,
         reuse_code_exp=reuse_code_exp,
         reuse_code=reuse_code,
@@ -138,6 +141,8 @@ def _create_nvembed_judge_tasks(
     judge_server_gpus,
     judge_server_nodes,
     partition,
+    account,
+    judge_container,
     run_after,
     reuse_code_exp,
     reuse_code,
@@ -195,11 +200,12 @@ def _create_nvembed_judge_tasks(
         cmd=run_cmd,
         task_name=f"{expname}-{benchmark}-nvembed-judge",
         log_dir=log_dir + "/judge",
-        container=cluster_config["containers"]["vllm"],
+        container=judge_container or cluster_config["containers"]["vllm"],
         cluster_config=cluster_config,
         num_gpus=judge_server_gpus or 1,
         num_nodes=judge_server_nodes or 1,
         partition=partition,
+        account=account,
         run_after=run_after,
         reuse_code_exp=reuse_code_exp,
         reuse_code=reuse_code,
@@ -227,6 +233,9 @@ def _create_llm_judge_tasks(
     cluster,
     config_dir,
     partition,
+    account,
+    main_container,
+    sandbox_container,
     with_sandbox,
     keep_mounts_for_sandbox,
     run_after,
@@ -267,6 +276,9 @@ def _create_llm_judge_tasks(
         cluster=cluster,
         config_dir=config_dir,
         partition=partition,
+        account=account,
+        main_container=main_container,
+        sandbox_container=sandbox_container,
         with_sandbox=with_sandbox,
         keep_mounts_for_sandbox=keep_mounts_for_sandbox,
         run_after=run_after,
@@ -352,6 +364,12 @@ def eval(
     server_container: str = typer.Option(
         None, help="Override container image for the hosted server (if server_gpus is set)"
     ),
+    main_container: str = typer.Option(None, help="Override container image for the main evaluation client"),
+    sandbox_container: str = typer.Option(None, help="Override container image for the sandbox"),
+    judge_container: str = typer.Option(None, help="Override container image for GPU-based judges (comet, nvembed)"),
+    judge_server_container: str = typer.Option(
+        None, help="Override container image for the hosted judge server (if judge_server_gpus is set)"
+    ),
     extra_judge_args: str = typer.Option(
         "", help="Additional arguments for judge (passed to generate script, so should start with ++)"
     ),
@@ -378,6 +396,7 @@ def eval(
         "Can provide a list directly when using through Python",
     ),
     partition: str = typer.Option(None, help="Cluster partition to use"),
+    account: str = typer.Option(None, help="Can specify a non-default Slurm account"),
     qos: str = typer.Option(None, help="Specify Slurm QoS, e.g. to request interactive nodes"),
     time_min: str = typer.Option(None, help="If specified, will use as a time-min slurm parameter"),
     mount_paths: str = typer.Option(None, help="Comma separated list of paths to mount on the remote machine"),
@@ -522,6 +541,7 @@ def eval(
         "server_nodes": judge_server_nodes,
         "server_args": judge_server_args,
         "server_entrypoint": judge_server_entrypoint,
+        "server_container": judge_server_container,
         "generation_type": judge_generation_type,
         "generation_module": judge_generation_module,
     }
@@ -611,14 +631,16 @@ def eval(
             cmd=pipeline_utils.wrap_python_path(cmd=combine_cmds(cmds, single_node_mode)),
             task_name=f"{expname}-{'-'.join(job_benchmarks)}",
             log_dir=log_dir,
-            container=cluster_config["containers"]["nemo-skills"],
+            container=main_container or cluster_config["containers"]["nemo-skills"],
             cluster_config=cluster_config,
             partition=partition,
+            account=account,
             server_config=job_server_config,
             with_sandbox=job_needs_sandbox or with_sandbox,
             keep_mounts_for_sandbox=job_needs_sandbox_to_keep_mounts or keep_mounts_for_sandbox,
             sandbox_port=None if get_random_port else 6000,
             sandbox_env_overrides=job_sandbox_env_overrides,
+            sandbox_container=sandbox_container,
             run_after=run_after,
             reuse_code_exp=reuse_code_exp,
             reuse_code=reuse_code,
@@ -675,6 +697,8 @@ def eval(
             judge_server_gpus=judge_server_gpus,
             judge_server_nodes=judge_server_nodes,
             partition=partition,
+            account=account,
+            judge_container=judge_container,
             run_after=run_after,
             reuse_code_exp=reuse_code_exp,
             reuse_code=reuse_code,
@@ -699,6 +723,8 @@ def eval(
             judge_server_gpus=judge_server_gpus,
             judge_server_nodes=judge_server_nodes,
             partition=partition,
+            account=account,
+            judge_container=judge_container,
             run_after=run_after,
             reuse_code_exp=reuse_code_exp,
             reuse_code=reuse_code,
@@ -726,6 +752,9 @@ def eval(
             cluster=cluster,
             config_dir=config_dir,
             partition=partition,
+            account=account,
+            main_container=main_container,
+            sandbox_container=sandbox_container,
             with_sandbox=with_sandbox,
             keep_mounts_for_sandbox=keep_mounts_for_sandbox,
             run_after=run_after,
diff --git a/nemo_skills/pipeline/generate.py b/nemo_skills/pipeline/generate.py
index a1b96f556a..187a563cc7 100644
--- a/nemo_skills/pipeline/generate.py
+++ b/nemo_skills/pipeline/generate.py
@@ -55,11 +55,14 @@ def _create_job_unified(
     installation_command: Optional[str],
     with_sandbox: bool,
     partition: Optional[str],
+    account: Optional[str],
     keep_mounts_for_sandbox: bool,
     task_name: str,
     log_dir: str,
     sbatch_kwargs: Optional[Dict] = None,
     sandbox_env_overrides: Optional[List[str]] = None,
+    main_container: Optional[str] = None,
+    sandbox_container: Optional[str] = None,
 ) -> List[CommandGroup]:
     """
     Create CommandGroups for n models (unified for n=1 and n>1).
@@ -147,7 +150,7 @@ def _create_job_unified(
 
         sandbox_cmd = Command(
             script=sandbox_script,
-            container=cluster_config["containers"]["sandbox"],
+            container=sandbox_container or cluster_config["containers"]["sandbox"],
             name=f"{task_name}_sandbox",
         )
         components.append(sandbox_cmd)
@@ -178,7 +181,7 @@ def _create_job_unified(
 
         client_cmd = Command(
             script=client_script,
-            container=cluster_config["containers"]["nemo-skills"],
+            container=main_container or cluster_config["containers"]["nemo-skills"],
             name=f"{task_name}",
         )
         components.append(client_cmd)
@@ -191,6 +194,7 @@ def _create_job_unified(
             commands=components,
             hardware=HardwareConfig(
                 partition=partition,
+                account=account,
                 num_gpus=group_gpus,
                 num_nodes=group_nodes,
                 num_tasks=group_tasks,
@@ -272,6 +276,8 @@ def generate(
         help="Container image(s). CLI: space-separated. Python API: string or list. "
         "Single value broadcasts to all models.",
     ),
+    main_container: str = typer.Option(None, help="Override container image for the main generation client"),
+    sandbox_container: str = typer.Option(None, help="Override container image for the sandbox"),
     dependent_jobs: int = typer.Option(0, help="Specify this to launch that number of dependent jobs"),
     mount_paths: str = typer.Option(None, help="Comma separated list of paths to mount on the remote machine"),
     num_random_seeds: int = typer.Option(
@@ -297,6 +303,7 @@ def generate(
     partition: str = typer.Option(
         None, help="Can specify if need interactive jobs or a specific non-default partition"
     ),
+    account: str = typer.Option(None, help="Can specify a non-default Slurm account"),
     qos: str = typer.Option(None, help="Specify Slurm QoS, e.g. to request interactive nodes"),
     time_min: str = typer.Option(None, help="If specified, will use as a time-min slurm parameter"),
     run_after: List[str] = typer.Option(
@@ -589,11 +596,14 @@ def convert_server_type_to_string(server_type):
             installation_command=installation_command,
             with_sandbox=with_sandbox,
             partition=partition,
+            account=account,
             keep_mounts_for_sandbox=keep_mounts_for_sandbox,
             task_name=task_name,
             log_dir=log_dir,
             sbatch_kwargs=sbatch_kwargs,
             sandbox_env_overrides=sandbox_env_overrides,
+            main_container=main_container,
+            sandbox_container=sandbox_container,
         )
 
         # Use unique internal job name for dependency tracking, but same task_name
diff --git a/nemo_skills/pipeline/nemo_evaluator.py b/nemo_skills/pipeline/nemo_evaluator.py
index 39838737ed..f3e9057c0e 100644
--- a/nemo_skills/pipeline/nemo_evaluator.py
+++ b/nemo_skills/pipeline/nemo_evaluator.py
@@ -125,6 +125,7 @@ def nemo_evaluator(
     job_gpus: int = typer.Option(0, help="GPUs to allocate for the evaluator client when no servers are hosted"),
     job_nodes: int = typer.Option(1, help="Nodes to allocate for the evaluator job"),
     partition: str = typer.Option(None, help="Cluster partition to use"),
+    account: str = typer.Option(None, help="Can specify a non-default Slurm account"),
     qos: str = typer.Option(None, help="Slurm QoS"),
     mount_paths: str = typer.Option(None, help="Comma separated list of paths to mount on the remote machine"),
     log_dir: str = typer.Option(None, help="Custom location for logs"),
@@ -325,6 +326,7 @@ def nemo_evaluator(
         job_nodes=job_nodes,
         cluster_config=cluster_config,
         partition=partition,
+        account=account,
         qos=qos,
         exclusive=exclusive,
     )
@@ -346,6 +348,7 @@ def nemo_evaluator(
                 commands=[main_server_cmd, client_cmd],
                 hardware=_hardware_for_group(
                     task_ctx.partition,
+                    task_ctx.account,
                     task_ctx.server_gpus or None,
                     task_ctx.server_nodes or 1,
                     task_ctx.qos,
@@ -358,6 +361,7 @@ def nemo_evaluator(
                 commands=[judge_server_cmd],
                 hardware=_hardware_for_group(
                     task_ctx.partition,
+                    task_ctx.account,
                     task_ctx.judge_server_gpus or None,
                     task_ctx.judge_server_nodes or 1,
                     task_ctx.qos,
@@ -393,7 +397,12 @@ def nemo_evaluator(
             CommandGroup(
                 commands=sg_cmds,
                 hardware=_hardware_for_group(
-                    task_ctx.partition, group_num_gpus, group_num_nodes, task_ctx.qos, task_ctx.exclusive
+                    task_ctx.partition,
+                    task_ctx.account,
+                    group_num_gpus,
+                    group_num_nodes,
+                    task_ctx.qos,
+                    task_ctx.exclusive,
                 ),
                 name=f"{task_ctx.expname}-{task_ctx.idx}",
                 log_dir=log_dir,
@@ -543,17 +552,24 @@ class _TaskCreationContext:
     job_nodes: int
     cluster_config: Dict
     partition: Optional[str]
+    account: Optional[str]
     qos: Optional[str]
     exclusive: bool
 
 
 def _hardware_for_group(
-    partition: Optional[str], num_gpus: Optional[int], num_nodes: int, qos: Optional[str], exclusive: bool
+    partition: Optional[str],
+    account: Optional[str],
+    num_gpus: Optional[int],
+    num_nodes: int,
+    qos: Optional[str],
+    exclusive: bool,
 ) -> HardwareConfig:
     """Create HardwareConfig for a CommandGroup.
 
     Args:
         partition: SLURM partition name
+        account: SLURM account name
         num_gpus: Number of GPUs (None means no GPU allocation)
         num_nodes: Number of nodes
         qos: SLURM QoS setting
@@ -564,6 +580,7 @@ def _hardware_for_group(
     """
     return HardwareConfig(
         partition=partition,
+        account=account,
         num_gpus=num_gpus,
         num_nodes=num_nodes,
         sbatch_kwargs={
diff --git a/nemo_skills/pipeline/run_cmd.py b/nemo_skills/pipeline/run_cmd.py
index 5706d516e6..2628302234 100644
--- a/nemo_skills/pipeline/run_cmd.py
+++ b/nemo_skills/pipeline/run_cmd.py
@@ -58,6 +58,7 @@ def run_cmd(
     partition: str = typer.Option(
         None, help="Can specify if need interactive jobs or a specific non-default partition"
     ),
+    account: str = typer.Option(None, help="Can specify a non-default Slurm account"),
     qos: str = typer.Option(None, help="Specify Slurm QoS, e.g. to request interactive nodes"),
     time_min: str = typer.Option(None, help="If specified, will use as a time-min slurm parameter"),
     num_gpus: int | None = typer.Option(None, help="Number of GPUs per node to use"),
@@ -77,6 +78,7 @@ def run_cmd(
     server_container: str = typer.Option(
         None, help="Override container image for the hosted server (if server_gpus is set)"
     ),
+    sandbox_container: str = typer.Option(None, help="Override container image for the sandbox"),
     dependent_jobs: int = typer.Option(0, help="Specify this to launch that number of dependent jobs"),
     mount_paths: str = typer.Option(None, help="Comma separated list of paths to mount on the remote machine"),
     run_after: List[str] = typer.Option(
@@ -197,10 +199,12 @@ def run_cmd(
             container=containers,
             cluster_config=cluster_config,
             partition=partition,
+            account=account,
             server_config=server_config,
             with_sandbox=with_sandbox,
             keep_mounts_for_sandbox=keep_mounts_for_sandbox,
             sandbox_port=None if get_random_port else 6000,
+            sandbox_container=sandbox_container,
             run_after=run_after,
             reuse_code=reuse_code,
             reuse_code_exp=reuse_code_exp,
diff --git a/nemo_skills/pipeline/start_server.py b/nemo_skills/pipeline/start_server.py
index 71eae6bcb0..cd557659e3 100644
--- a/nemo_skills/pipeline/start_server.py
+++ b/nemo_skills/pipeline/start_server.py
@@ -127,7 +127,10 @@ def start_server(
         "If not specified, will use the default entrypoint for the server type.",
     ),
     server_container: str = typer.Option(None, help="Override container image for the hosted server"),
+    main_container: str = typer.Option(None, help="Override container image for the main task (e.g., chat interface)"),
+    sandbox_container: str = typer.Option(None, help="Override container image for the sandbox"),
     partition: str = typer.Option(None, help="Cluster partition to use"),
+    account: str = typer.Option(None, help="Can specify a non-default Slurm account"),
     qos: str = typer.Option(None, help="Specify Slurm QoS, e.g. to request interactive nodes"),
     time_min: str = typer.Option(None, help="If specified, will use as a time-min slurm parameter"),
     mount_paths: str = typer.Option(None, help="Comma separated list of paths to mount on the remote machine"),
@@ -200,13 +203,15 @@ def start_server(
         cmd=cmd,
         task_name="server",
         log_dir=log_dir,
-        container=cluster_config["containers"]["nemo-skills"],
+        container=main_container or cluster_config["containers"]["nemo-skills"],
         cluster_config=cluster_config,
         partition=partition,
+        account=account,
         server_config=server_config,
         with_sandbox=with_sandbox,
         keep_mounts_for_sandbox=keep_mounts_for_sandbox,
         sandbox_port=sandbox_port,
+        sandbox_container=sandbox_container,
         sbatch_kwargs=parse_kwargs(sbatch_kwargs, exclusive=exclusive, qos=qos, time_min=time_min),
     )
diff --git a/nemo_skills/pipeline/utils/declarative.py b/nemo_skills/pipeline/utils/declarative.py
index 7029dcc638..f47067ee99 100644
--- a/nemo_skills/pipeline/utils/declarative.py
+++ b/nemo_skills/pipeline/utils/declarative.py
@@ -264,6 +264,7 @@ class HardwareConfig:
     """Hardware configuration for a group of tasks."""
 
     partition: Optional[str] = None
+    account: Optional[str] = None
     num_gpus: Optional[int] = None
     num_nodes: Optional[int] = None
     num_tasks: Optional[int] = 1
@@ -585,6 +586,7 @@ def _create_executor(
         log_dir=log_dir,
         log_prefix=exec_config["log_prefix"],
         partition=hardware.partition if hardware else None,
+        account=hardware.account if hardware else None,
         heterogeneous=heterogeneous,
         het_group=het_group,
         total_het_groups=total_het_groups,
diff --git a/nemo_skills/pipeline/utils/exp.py b/nemo_skills/pipeline/utils/exp.py
index 3bce2eb864..949bef372f 100644
--- a/nemo_skills/pipeline/utils/exp.py
+++ b/nemo_skills/pipeline/utils/exp.py
@@ -168,6 +168,7 @@ def get_executor(
     log_prefix: str = "main",
     mounts=None,
     partition=None,
+    account=None,
    dependencies=None,
     extra_package_dirs: tuple[str] | None = None,
     heterogeneous=False,
@@ -210,6 +211,7 @@ def get_executor(
             taken from `cluster_config`.
         partition: SLURM partition override. If omitted, inferred from `gpus_per_node`
             and `cluster_config`.
+        account: SLURM account override. If omitted, uses `cluster_config["account"]`.
         dependencies: SLURM job handles to depend on. The dependency type is taken
             from `cluster_config['dependency_type']` (default: "afterany").
         extra_package_dirs: Additional directories to package with the code for remote
@@ -328,9 +330,12 @@ def get_executor(
     dependency_type = cluster_config.get("dependency_type", "afterany")
     job_details_class = CustomJobDetailsRay if with_ray else CustomJobDetails
 
+    # Resolve account with fallback to cluster_config
+    account = account or cluster_config.get("account")
+
     # Build executor parameters as a dictionary to avoid duplicate parameters
     executor_params = {
-        "account": cluster_config["account"],
+        "account": account,
         "partition": partition,
         "nodes": num_nodes,
         "ntasks_per_node": tasks_per_node,
@@ -429,7 +434,9 @@ def add_task(
     num_nodes=1,
     log_dir=None,
     partition=None,
+    account=None,
     with_sandbox=False,
+    sandbox_container=None,
     keep_mounts_for_sandbox=False,
     sandbox_port: int | None = None,
     server_config=None,
@@ -538,6 +545,7 @@ def add_task(
             tasks_per_node=num_server_tasks,
             gpus_per_node=server_config["num_gpus"],
             partition=partition,
+            account=account,
             dependencies=dependencies,
             job_name=task_name,
             log_dir=log_dir,
@@ -582,6 +590,7 @@ def add_task(
             tasks_per_node=cur_tasks,
             gpus_per_node=num_gpus if server_config is None else 0,
             partition=partition,
+            account=account,
             dependencies=dependencies,
             job_name=task_name,
             log_dir=log_dir,
@@ -621,11 +630,12 @@ def add_task(
         commands.append(get_sandbox_command(cluster_config))
         sandbox_executor = get_executor(
            cluster_config=cluster_config,
-            container=cluster_config["containers"]["sandbox"],
+            container=sandbox_container or cluster_config["containers"]["sandbox"],
             num_nodes=executors[0].nodes if cluster_config["executor"] == "slurm" else 1,
             tasks_per_node=1,
             gpus_per_node=0,
             partition=partition,
+            account=account,
             mounts=None if keep_mounts_for_sandbox else [],
             dependencies=dependencies,
             job_name=task_name,
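
All of the `account` and `*_container` plumbing above reduces to the same fallback idiom: an explicitly passed override wins, and `None` falls back to the `cluster_config` default via Python's `or`. Below is a minimal, self-contained sketch of that idiom; the config values are placeholders, not the real cluster config or container images.

```python
# Sketch of the fallback pattern this patch applies throughout
# (placeholder values; not the actual cluster config).
cluster_config = {
    "account": "team_account",
    "containers": {"nemo-skills": "nemo-skills-image", "sandbox": "sandbox-image"},
}


def resolve(override, default):
    # `override or default` returns `default` whenever `override` is None,
    # mirroring e.g. `container=sandbox_container or cluster_config["containers"]["sandbox"]`
    # and `account = account or cluster_config.get("account")` in the diff.
    return override or default


# No --account flag passed -> cluster config default is used.
assert resolve(None, cluster_config["account"]) == "team_account"
# Explicit override takes precedence over the config default.
assert resolve("other_account", cluster_config["account"]) == "other_account"
```

One property of this idiom worth noting: because `or` tests truthiness rather than `is None`, an explicitly passed empty string also falls back to the default, which is usually acceptable for optional CLI flags but worth keeping in mind.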