diff --git a/docs/deployment/launcher-orchestrated/slurm.md b/docs/deployment/launcher-orchestrated/slurm.md index 2c9415a3b..305f3cbca 100644 --- a/docs/deployment/launcher-orchestrated/slurm.md +++ b/docs/deployment/launcher-orchestrated/slurm.md @@ -77,7 +77,8 @@ execution: # Resource allocation partition: batch # Slurm partition/queue - num_nodes: 1 # Number of nodes + num_nodes: 1 # Total SLURM nodes + num_instances: 1 # Independent deployment instances (HAProxy auto-enabled when > 1) ntasks_per_node: 1 # Tasks per node gres: gpu:8 # GPU resources walltime: "01:00:00" # Wall time limit (HH:MM:SS) @@ -96,6 +97,18 @@ execution: The `gpus_per_node` parameter can be used as an alternative to `gres` for specifying GPU resources. However, `gres` is the default in the base configuration. ::: +## Multi-Instance with HAProxy + +To run multiple independent deployment instances with HAProxy load-balancing: + +```yaml +execution: + num_nodes: 4 # Total SLURM nodes + num_instances: 2 # 2 instances of 2 nodes each → HAProxy auto-enabled +``` + +When `num_instances > 1`, HAProxy is automatically configured to distribute requests across instance head nodes. See the `examples/` directory for complete configurations. 
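Reviewer note: the topology rule behind the YAML above (nodes split evenly across instances, with each instance's first node serving as a HAProxy backend) can be sketched as a few lines of Python. `instance_topology` is a hypothetical helper for illustration, not part of the launcher API:

```python
# Hypothetical helper (illustration only, not launcher API): derive the
# per-instance layout implied by execution.num_nodes / execution.num_instances.
def instance_topology(num_nodes: int, num_instances: int = 1):
    """Return (nodes_per_instance, head_node_indices)."""
    if num_nodes % num_instances != 0:
        raise ValueError(
            f"num_nodes ({num_nodes}) must be divisible by "
            f"num_instances ({num_instances})"
        )
    nodes_per_instance = num_nodes // num_instances
    # HAProxy load-balances across each instance's first ("head") node
    heads = [i * nodes_per_instance for i in range(num_instances)]
    return nodes_per_instance, heads

# The example above: 4 nodes split into 2 instances of 2 nodes each,
# so the instance heads sit at node indices 0 and 2.
print(instance_topology(4, 2))  # (2, [0, 2])
```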
+ ## Configuration Examples ### Benchmark Suite Evaluation diff --git a/docs/libraries/nemo-evaluator-launcher/configuration/deployment/vllm.md b/docs/libraries/nemo-evaluator-launcher/configuration/deployment/vllm.md index 2b0db69de..b99a2614c 100644 --- a/docs/libraries/nemo-evaluator-launcher/configuration/deployment/vllm.md +++ b/docs/libraries/nemo-evaluator-launcher/configuration/deployment/vllm.md @@ -85,8 +85,9 @@ evaluation: The following example configuration files are available in the `examples/` directory: -- `lepton_vllm_llama_3_1_8b_instruct.yaml` - vLLM deployment on Lepton platform -- `slurm_llama_3_1_8b_instruct.yaml` - vLLM deployment on SLURM cluster -- `slurm_llama_3_1_8b_instruct_hf.yaml` - vLLM deployment using HuggingFace model ID +- `slurm_vllm_basic.yaml` - Basic single-node vLLM deployment +- `slurm_vllm_multinode_ray_tp_pp.yaml` - Multi-node deployment with TP+PP +- `slurm_vllm_multinode_dp.yaml` - Multi-node data parallelism +- `slurm_vllm_multinode_dp_haproxy.yaml` - Multi-node independent instances with HAProxy Use `nemo-evaluator-launcher run --dry-run` to check your configuration before running. diff --git a/docs/libraries/nemo-evaluator-launcher/configuration/executors/slurm.md b/docs/libraries/nemo-evaluator-launcher/configuration/executors/slurm.md index 4ee7f7c92..b41e18390 100644 --- a/docs/libraries/nemo-evaluator-launcher/configuration/executors/slurm.md +++ b/docs/libraries/nemo-evaluator-launcher/configuration/executors/slurm.md @@ -85,6 +85,23 @@ env_vars: **Security:** Secret values are never written into the generated `run.sub` script. They are stored in a separate `.secrets.env` file and sourced at runtime, preventing accidental exposure in logs or artifacts. 
+### Multi-Node and Multi-Instance
+
+Configure multi-node deployments using `num_nodes` and `num_instances`:
+
+```yaml
+execution:
+  num_nodes: 4         # Total SLURM nodes
+  num_instances: 2     # Independent deployment instances (default: 1)
+```
+
+- **`num_nodes`**: Total number of SLURM nodes to allocate
+- **`num_instances`**: Number of independent deployment instances. When `> 1`, HAProxy is automatically configured to load-balance across instances. `num_nodes` must be divisible by `num_instances`.
+
+:::{note}
+The deprecated `deployment.multiple_instances` field is still accepted but now ignored: the launcher strips it and logs a deprecation warning. Use `execution.num_instances` instead.
+:::
+
 ### Mounting and Storage
 
 The Slurm executor provides sophisticated mounting capabilities:
diff --git a/packages/nemo-evaluator-launcher/.claude/skills/nel-assistant/SKILL.md b/packages/nemo-evaluator-launcher/.claude/skills/nel-assistant/SKILL.md
index 6f7eb2850..dbde5b52b 100644
--- a/packages/nemo-evaluator-launcher/.claude/skills/nel-assistant/SKILL.md
+++ b/packages/nemo-evaluator-launcher/.claude/skills/nel-assistant/SKILL.md
@@ -142,27 +142,26 @@ Show tasks in the current config. Loop until the user confirms the task list is
 ```
 For the None (External) deployment the `api_key_name` should already be defined. The `DUMMY_API_KEY` export is handled in Step 8.
 
-**Step 6: Advanced - Multi-node (Data Parallel)**
+**Step 6: Advanced - Multi-node**
 
-Only if model >120B parameters, suggest multi-node. Explain: "This is DP multi-node - the weights are copied (not distributed) across nodes. One deployment instance per node will be run with HAProxy load-balancing requests."
+There are two multi-node patterns. Ask the user which applies:
 
-Ask if user wants multi-node. If yes, ask for node count and configure:
+**Pattern A: Multi-instance (independent instances with HAProxy)**
+
+Only if model >120B parameters or user wants more throughput. Explain: "Each node runs an independent deployment instance. 
HAProxy load-balances requests across all instances." ```yaml execution: - num_nodes: 4 # 4 nodes = 4 independent deployment instances = 4x throughput - deployment: - n_tasks: ${execution.num_nodes} # Must match num_nodes for multi-instance deployment - -deployment: - multiple_instances: true + num_nodes: 4 # Total nodes + num_instances: 4 # 4 independent instances → HAProxy auto-enabled ``` **Common Confusions** -- **This is different from `data_parallel_size`**, which controls DP replicas *within* a single node/deployment instance. -- Global data parallelism is `num_nodes x data_parallel_size` (e.g., 2 nodes x 4 DP each = 8 replicas for max throughput). -- With multi-node, `parallelism` in task config is the total concurrent requests across all instances, not per-instance. +- **`num_instances`** controls independent deployment instances with HAProxy. **`data_parallel_size`** controls DP replicas *within* a single instance. +- Global data parallelism is `num_instances x data_parallel_size` (e.g., 2 instances x 8 DP each = 16 replicas). +- With multi-instance, `parallelism` in task config is the total concurrent requests across all instances, not per-instance. +- `num_nodes` must be divisible by `num_instances`. 
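Reviewer note: the distinction drawn in the list above reduces to one line of arithmetic. The variable names below are illustrative only, not config keys read by any tool:

```python
# Illustration only: the two knobs multiply into global data parallelism.
num_instances = 2        # independent deployments behind HAProxy
data_parallel_size = 8   # DP replicas *within* each instance
global_dp_replicas = num_instances * data_parallel_size
print(global_dp_replicas)  # 16, matching the example in the list above
```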
**Step 7: Advanced - Interceptors** diff --git a/packages/nemo-evaluator-launcher/examples/slurm_vllm_multinode_dp.yaml b/packages/nemo-evaluator-launcher/examples/slurm_vllm_multinode_dp.yaml index ba6668000..cf1ef9982 100644 --- a/packages/nemo-evaluator-launcher/examples/slurm_vllm_multinode_dp.yaml +++ b/packages/nemo-evaluator-launcher/examples/slurm_vllm_multinode_dp.yaml @@ -69,8 +69,6 @@ execution: walltime: 01:00:00 num_nodes: 2 # Number of SLURM nodes for multi-node deployment - deployment: - n_tasks: ${execution.num_nodes} # For multi-node vLLM deployment, must match num_nodes mounts: mount_home: false # Whether to mount home directory (default: true) @@ -97,7 +95,7 @@ evaluation: parallelism: 512 # Number of parallel requests (higher for data parallel deployment) temperature: 0.6 # Sampling temperature top_p: 0.95 # Nucleus sampling parameter - max_tokens: 32768 # Maximum number of tokens to generate (32k) + max_new_tokens: 32768 # Maximum number of tokens to generate (32k) request_timeout: 3600 # Timeout for API requests in seconds target: api_endpoint: diff --git a/packages/nemo-evaluator-launcher/examples/slurm_vllm_multinode_dp_haproxy.yaml b/packages/nemo-evaluator-launcher/examples/slurm_vllm_multinode_dp_haproxy.yaml index 8806b3c43..56cdeac04 100644 --- a/packages/nemo-evaluator-launcher/examples/slurm_vllm_multinode_dp_haproxy.yaml +++ b/packages/nemo-evaluator-launcher/examples/slurm_vllm_multinode_dp_haproxy.yaml @@ -61,15 +61,13 @@ execution: walltime: 01:00:00 num_nodes: 2 # Number of SLURM nodes for multi-node deployment - deployment: - n_tasks: ${execution.num_nodes} # One vLLM instance per node + num_instances: 2 # 2 independent single-node instances → HAProxy auto-enabled mounts: mount_home: false # Whether to mount home directory (default: true) # Override default deployment arguments deployment: - multiple_instances: true # Enable HAProxy load balancing across nodes checkpoint_path: null hf_model_handle: 
nvidia/NVIDIA-Nemotron-Nano-9B-v2 served_model_name: nvidia/NVIDIA-Nemotron-Nano-9B-v2 @@ -86,7 +84,7 @@ evaluation: parallelism: 512 # Number of parallel requests (higher for multi-node deployment) temperature: 0.6 # Sampling temperature top_p: 0.95 # Nucleus sampling parameter - max_tokens: 32768 # Maximum number of tokens to generate (32k) + max_new_tokens: 32768 # Maximum number of tokens to generate (32k) request_timeout: 3600 # Timeout for API requests in seconds target: api_endpoint: diff --git a/packages/nemo-evaluator-launcher/examples/slurm_vllm_multinode_ray_tp_pp.yaml b/packages/nemo-evaluator-launcher/examples/slurm_vllm_multinode_ray_tp_pp.yaml index 942cfcc31..b81025125 100644 --- a/packages/nemo-evaluator-launcher/examples/slurm_vllm_multinode_ray_tp_pp.yaml +++ b/packages/nemo-evaluator-launcher/examples/slurm_vllm_multinode_ray_tp_pp.yaml @@ -42,9 +42,8 @@ # - tensor_parallel_size: 8 (within node parallelism) # - pipeline_parallel_size: 2 (across node parallelism) # -# Multi-Instance Configuration: -# - execution.num_nodes: Number of SLURM nodes to allocate (2 in this example) -# - execution.deployment.n_tasks: Must match num_nodes for multi-instance deployment +# Multi-Node Configuration: +# - execution.num_nodes: 2 (single instance spanning 2 nodes) # - deployment.tensor_parallel_size: GPU parallelism within a single node # - deployment.pipeline_parallel_size: Model parallelism across multiple nodes # @@ -67,8 +66,6 @@ execution: walltime: 02:00:00 num_nodes: 2 # Number of SLURM nodes for multi-node deployment - deployment: - n_tasks: ${execution.num_nodes} # For multi-node ray vLLM deployment, must match num_nodes mounts: mount_home: false # Whether to mount home directory (default: true) @@ -135,7 +132,7 @@ evaluation: request_timeout: 3600 # Timeout for API requests in seconds temperature: 0.6 # Sampling temperature top_p: 0.95 # Nucleus sampling parameter - max_tokens: 32768 # Maximum number of tokens to generate (32k) + max_new_tokens: 
32768 # Maximum number of tokens to generate (32k) target: api_endpoint: diff --git a/packages/nemo-evaluator-launcher/src/nemo_evaluator_launcher/common/helpers.py b/packages/nemo-evaluator-launcher/src/nemo_evaluator_launcher/common/helpers.py index 6643fb26a..83218c6d2 100644 --- a/packages/nemo-evaluator-launcher/src/nemo_evaluator_launcher/common/helpers.py +++ b/packages/nemo-evaluator-launcher/src/nemo_evaluator_launcher/common/helpers.py @@ -339,8 +339,8 @@ def apply_url_override(url: str) -> str: # Local executor - use localhost endpoint_uri = cfg.deployment.endpoints[endpoint_type] - # Use HAProxy port if multiple_instances is enabled - if cfg.deployment.get("multiple_instances", False): + # Use HAProxy port if num_instances > 1 + if OmegaConf.select(cfg, "execution.num_instances", default=1) > 1: proxy_config = cfg.execution.get("proxy", {}).get("config", {}) port = proxy_config.get("haproxy_port", 5009) else: diff --git a/packages/nemo-evaluator-launcher/src/nemo_evaluator_launcher/configs/execution/slurm/default.yaml b/packages/nemo-evaluator-launcher/src/nemo_evaluator_launcher/configs/execution/slurm/default.yaml index 8f0657f99..4174a51a3 100644 --- a/packages/nemo-evaluator-launcher/src/nemo_evaluator_launcher/configs/execution/slurm/default.yaml +++ b/packages/nemo-evaluator-launcher/src/nemo_evaluator_launcher/configs/execution/slurm/default.yaml @@ -20,7 +20,8 @@ username: ${oc.env:USER} # Defaults to $USER env var account: ??? # SLURM account allocation (required) output_dir: ??? 
# Absolute path accessible on compute nodes (required) partition: batch -num_nodes: 1 +num_nodes: 1 # Total SLURM nodes (num_nodes_per_instance = num_nodes / num_instances) +num_instances: 1 # Number of independent deployment instances ntasks_per_node: 1 gres: gpu:8 walltime: 01:00:00 @@ -30,7 +31,7 @@ sbatch_comment: null # Optional comment for SLURM job (translates to #SBATCH --c # Deployment-specific SLURM configuration deployment: - n_tasks: 1 # Number of tasks for deployment srun (default: 1, for multi-instance set to num_nodes) + n_tasks: ${execution.num_nodes} # Number of tasks for deployment srun (default: num_nodes) mounts: deployment: {} diff --git a/packages/nemo-evaluator-launcher/src/nemo_evaluator_launcher/executors/slurm/executor.py b/packages/nemo-evaluator-launcher/src/nemo_evaluator_launcher/executors/slurm/executor.py index 771f8e150..435394659 100644 --- a/packages/nemo-evaluator-launcher/src/nemo_evaluator_launcher/executors/slurm/executor.py +++ b/packages/nemo-evaluator-launcher/src/nemo_evaluator_launcher/executors/slurm/executor.py @@ -29,7 +29,7 @@ import yaml from jinja2 import Environment, FileSystemLoader -from omegaconf import DictConfig, OmegaConf +from omegaconf import DictConfig, OmegaConf, open_dict from nemo_evaluator_launcher.common.env_vars import ( SecretsEnvResult, @@ -145,7 +145,7 @@ def execute_eval(cfg: DictConfig, dry_run: bool = False) -> str: ) # Create proxy config file with placeholder IPs for multi-instance deployments - if cfg.deployment.get("multiple_instances", False): + if cfg.execution.num_instances > 1: proxy_type = cfg.execution.get("proxy", {}).get("type", "haproxy") if proxy_type == "haproxy": proxy_config = _generate_haproxy_config_with_placeholders(cfg) @@ -642,6 +642,22 @@ def _create_slurm_sbatch_script( Returns: str: The contents of the sbatch script. 
""" + # Remove deprecated deployment.multiple_instances if present + if cfg.deployment.get("multiple_instances") is not None: + logger.warning( + "deployment.multiple_instances is deprecated and will be " + "removed from config — use execution.num_instances instead." + ) + with open_dict(cfg): + del cfg.deployment.multiple_instances + + # Validate topology: num_nodes must be divisible by num_instances + if cfg.execution.num_nodes % cfg.execution.num_instances != 0: + raise ValueError( + f"execution.num_nodes ({cfg.execution.num_nodes}) must be divisible by " + f"execution.num_instances ({cfg.execution.num_instances})" + ) + # get task from mapping, overrides, urls tasks_mapping = load_tasks_mapping() task_definition = get_task_definition_for_job( @@ -780,9 +796,9 @@ def _create_slurm_sbatch_script( # wait for the server to initialize health_path = cfg.deployment.endpoints.get("health", "/health") - # For multi-instance check all node IPs, for single instance check localhost - if cfg.deployment.get("multiple_instances", False): - ip_list = '"${NODES_IPS_ARRAY[@]}"' + # HEAD_NODE_IPS is always set: subset of heads when NPI > 1, all nodes otherwise + if cfg.execution.num_instances > 1: + ip_list = '"${HEAD_NODE_IPS[@]}"' else: ip_list = '"127.0.0.1"' s += _get_wait_for_server_handler( @@ -795,7 +811,7 @@ def _create_slurm_sbatch_script( s += "\n\n" # add proxy load balancer for multi-instance deployments - if cfg.deployment.get("multiple_instances", False): + if cfg.execution.num_instances > 1: s += _get_proxy_server_srun_command(cfg, remote_task_subdir) # prepare evaluation mounts @@ -858,7 +874,7 @@ def _create_slurm_sbatch_script( # terminate the server after all evaluation clients finish if cfg.deployment.type != "none": s += "kill $SERVER_PID # terminate the server to finish gracefully\n" - if cfg.deployment.get("multiple_instances", False): + if cfg.execution.num_instances > 1: s += "kill $PROXY_PID # terminate proxy to finish gracefully\n" s += "\n" @@ -1579,11 
+1595,11 @@ def _generate_haproxy_config_with_placeholders(cfg): env = Environment(loader=FileSystemLoader(template_dir)) template = env.get_template("proxy.cfg.template") - # Prepare template data with placeholder IPs - use actual number of nodes - num_nodes = cfg.execution.num_nodes + # Prepare template data with placeholder IPs - one backend per instance head node nodes = [] - for i in range(num_nodes): - nodes.append({"ip": f"{{IP_{i}}}", "port": cfg.deployment.port}) + for i in range(cfg.execution.num_instances): + head_idx = i * cfg.execution.num_nodes // cfg.execution.num_instances + nodes.append({"ip": f"{{IP_{head_idx}}}", "port": cfg.deployment.port}) # Get health check parameters - prefer proxy config, fallback to deployment.endpoints.health proxy_config = cfg.execution.get("proxy", {}).get("config", {}) @@ -1680,6 +1696,12 @@ def _generate_deployment_srun_command( s += "# Export MASTER_IP as the first node IP\n" s += "export MASTER_IP=${NODES_IPS_ARRAY[0]}\n" s += 'echo "MASTER_IP: $MASTER_IP"\n' + s += 'export ALL_NODE_IPS=$(IFS=,; echo "${NODES_IPS_ARRAY[*]}")\n' + s += "HEAD_NODE_IPS=()\n" + s += f"for ((g=0; g<{cfg.execution.num_instances}; g++)); do\n" + s += f' HEAD_NODE_IPS+=("${{NODES_IPS_ARRAY[$((g * {cfg.execution.num_nodes // cfg.execution.num_instances}))]}}")\n' + s += "done\n" + s += 'echo "HEAD_NODE_IPS: ${HEAD_NODE_IPS[@]}"\n' # Add debug comment for deployment pre_cmd before srun command if debug_comment: @@ -1703,9 +1725,26 @@ def _generate_deployment_srun_command( if "MASTER_IP" not in deployment_env_var_names: deployment_env_var_names.append("MASTER_IP") + # Always add ALL_NODE_IPS to the environment variables + if "ALL_NODE_IPS" not in deployment_env_var_names: + deployment_env_var_names.append("ALL_NODE_IPS") + if deployment_env_var_names: s += f"--container-env {','.join(sorted(deployment_env_var_names))} " + # Build the command that runs inside the container: + # 1. Export scheduler-agnostic env vars (PROC_ID, NUM_TASKS) + # 2. 
Optionally write + source deployment_pre_cmd.sh + # 3. Write deployment_cmd.sh and execute it + create_script_cmd = _str_to_echo_command( + cfg.deployment.command, filename="deployment_cmd.sh" + ) + debug_comment += create_script_cmd.debug + "\n\n" + + # Map SLURM task variables to scheduler-agnostic names inside the container + env_setup = "export PROC_ID=${SLURM_PROCID:-0} NUM_TASKS=${SLURM_NTASKS:-1}" + script = f"{env_setup} && {create_script_cmd.cmd} && bash deployment_cmd.sh" + # Wrap deployment command to execute pre_cmd inside container if needed if pre_cmd: # Create a wrapper command that runs inside the container: @@ -1715,16 +1754,13 @@ def _generate_deployment_srun_command( create_pre_script_cmd = _str_to_echo_command( pre_cmd, filename="deployment_pre_cmd.sh" ) - # Escape single quotes in the deployment command for bash -c - escaped_deployment_cmd = cfg.deployment.command.replace("'", "'\"'\"'") - wrapped_command = ( - f"bash -c '{create_pre_script_cmd.cmd} && " + script = ( + f"{env_setup} && " + f"{create_pre_script_cmd.cmd} && " f"source deployment_pre_cmd.sh && " - f"{escaped_deployment_cmd}'" + f"{create_script_cmd.cmd} && bash deployment_cmd.sh" ) - s += "{} &\n\n".format(wrapped_command) - else: - s += "{} &\n\n".format(cfg.deployment.command) # run asynchronously + s += "bash -c '{}' &\n\n".format(script) # run asynchronously s += "SERVER_PID=$! 
# capture the PID of the server background srun process\n\n" diff --git a/packages/nemo-evaluator-launcher/tests/unit_tests/test_helpers.py b/packages/nemo-evaluator-launcher/tests/unit_tests/test_helpers.py index e80d8e781..6769fac07 100644 --- a/packages/nemo-evaluator-launcher/tests/unit_tests/test_helpers.py +++ b/packages/nemo-evaluator-launcher/tests/unit_tests/test_helpers.py @@ -105,6 +105,7 @@ def test_get_endpoint_url_local_builds_localhost(): cfg = _cfg( { "deployment": {"type": "vllm", "port": 8081, "endpoints": {"chat": "/v1"}}, + "execution": {"num_instances": 1}, "evaluation": {}, } ) diff --git a/packages/nemo-evaluator-launcher/tests/unit_tests/test_slurm_executor.py b/packages/nemo-evaluator-launcher/tests/unit_tests/test_slurm_executor.py index 08bfcb8ce..95324381d 100644 --- a/packages/nemo-evaluator-launcher/tests/unit_tests/test_slurm_executor.py +++ b/packages/nemo-evaluator-launcher/tests/unit_tests/test_slurm_executor.py @@ -57,6 +57,7 @@ def base_config(self): "account": "test-account", "partition": "test-partition", "num_nodes": 1, + "num_instances": 1, "ntasks_per_node": 1, "subproject": "test-subproject", }, @@ -525,8 +526,10 @@ def test_deployment_n_tasks_and_proxy_setup( """Test deployment.n_tasks with various configurations and proxy setup.""" base_config["execution"]["deployment"] = {"n_tasks": n_tasks} base_config["execution"]["num_nodes"] = num_nodes - # Set multiple_instances to trigger proxy setup when needed - base_config["deployment"]["multiple_instances"] = should_have_proxy + # Set num_instances > 1 to trigger proxy setup when needed + base_config["execution"]["num_instances"] = ( + num_nodes if should_have_proxy else 1 + ) cfg = OmegaConf.create(base_config) @@ -637,6 +640,7 @@ def base_config(self): "account": "test-account", "partition": "test-partition", "num_nodes": 1, + "num_instances": 1, "ntasks_per_node": 1, "subproject": "test-subproject", }, @@ -879,6 +883,7 @@ def test_generate_deployment_srun_command( }, 
"execution": { "num_nodes": num_nodes, + "num_instances": 1, "deployment": {"n_tasks": n_tasks}, "mounts": {"mount_home": mount_home}, }, @@ -1012,6 +1017,7 @@ def sample_config(self, tmpdir): "account": "test-account", "partition": "gpu", "num_nodes": 1, + "num_instances": 1, "ntasks_per_node": 8, "gpus_per_node": 8, "subproject": "eval", @@ -1674,6 +1680,7 @@ def sample_config(self, tmpdir): "account": "test-account", "partition": "gpu", "num_nodes": 1, + "num_instances": 1, "ntasks_per_node": 8, "gpus_per_node": 8, "subproject": "eval", @@ -2622,3 +2629,708 @@ def test_kill_job_kill_command_failed(sel, mock_execdb, monkeypatch): with pytest.raises(RuntimeError, match="Could not find or kill job"): SlurmExecutor.kill_job("fail123.0") + + +class TestMultiNodeMultiInstance: + """Tests for multi-node / multi-instance refactoring. + + Covers: + - Topology validation (num_nodes divisible by num_instances) + - Deprecated deployment.multiple_instances removal + warning + - ALL_NODE_IPS and HEAD_NODE_IPS generation in srun command + - Health check IP selection (HEAD_NODE_IPS vs localhost) + - HAProxy placeholder backend generation + - Deployment command always wrapped as base64 script file + - Pre-cmd + deployment command combined wrapping + - Proxy setup triggered by num_instances > 1 + - get_endpoint_url uses HAProxy port when num_instances > 1 + """ + + @pytest.fixture + def base_config(self): + """Base configuration for multi-node tests.""" + return { + "deployment": { + "type": "vllm", + "image": "test-image:latest", + "command": "vllm serve /model --port 8000", + "served_model_name": "test-model", + "port": 8000, + "endpoints": { + "health": "/health", + }, + }, + "execution": { + "type": "slurm", + "output_dir": "/test/output", + "walltime": "01:00:00", + "account": "test-account", + "partition": "test-partition", + "num_nodes": 1, + "num_instances": 1, + "ntasks_per_node": 1, + "subproject": "test-subproject", + }, + "evaluation": {"env_vars": {}}, + "target": 
{"api_endpoint": {"url": "http://localhost:8000/v1"}}, + } + + @pytest.fixture + def mock_task(self): + return OmegaConf.create({"name": "test_task"}) + + @pytest.fixture + def mock_dependencies(self): + """Mock external dependencies used by _create_slurm_sbatch_script.""" + with ( + patch( + "nemo_evaluator_launcher.executors.slurm.executor.load_tasks_mapping" + ) as mock_load_tasks, + patch( + "nemo_evaluator_launcher.executors.slurm.executor.get_task_definition_for_job" + ) as mock_get_task_def, + patch( + "nemo_evaluator_launcher.common.helpers.get_eval_factory_command" + ) as mock_get_eval_command, + patch( + "nemo_evaluator_launcher.common.helpers.get_served_model_name" + ) as mock_get_model_name, + ): + mock_load_tasks.return_value = {} + mock_get_task_def.return_value = { + "container": "test-eval-container:latest", + "required_env_vars": [], + "endpoint_type": "openai", + "task": "test_task", + } + from nemo_evaluator_launcher.common.helpers import CmdAndReadableComment + + mock_get_eval_command.return_value = CmdAndReadableComment( + cmd="nemo-evaluator run_eval --test", debug="# Test command" + ) + mock_get_model_name.return_value = "test-model" + + yield { + "load_tasks_mapping": mock_load_tasks, + "get_task_definition_for_job": mock_get_task_def, + "get_eval_factory_command": mock_get_eval_command, + "get_served_model_name": mock_get_model_name, + } + + # ── Topology validation ────────────────────────────────────────────── + + def test_num_nodes_not_divisible_by_num_instances_raises( + self, base_config, mock_task, mock_dependencies + ): + """num_nodes must be evenly divisible by num_instances.""" + base_config["execution"]["num_nodes"] = 5 + base_config["execution"]["num_instances"] = 2 + cfg = OmegaConf.create(base_config) + + with pytest.raises(ValueError, match="must be divisible"): + _create_slurm_sbatch_script( + cfg=cfg, + task=mock_task, + eval_image="test-eval-container:latest", + remote_task_subdir=Path("/test/remote"), + 
invocation_id="test123", + job_id="test123.0", + ) + + @pytest.mark.parametrize( + "num_nodes,num_instances", + [(4, 2), (6, 3), (8, 1), (1, 1), (4, 4)], + ) + def test_valid_topology_accepted( + self, base_config, mock_task, mock_dependencies, num_nodes, num_instances + ): + """Valid num_nodes / num_instances combos should not raise.""" + base_config["execution"]["num_nodes"] = num_nodes + base_config["execution"]["num_instances"] = num_instances + cfg = OmegaConf.create(base_config) + + # Should not raise + result = _create_slurm_sbatch_script( + cfg=cfg, + task=mock_task, + eval_image="test-eval-container:latest", + remote_task_subdir=Path("/test/remote"), + invocation_id="test123", + job_id="test123.0", + ) + assert result.cmd # non-empty script + + # ── Deprecated multiple_instances handling ──────────────────────────── + + def test_deprecated_multiple_instances_removed_with_warning( + self, base_config, mock_task, mock_dependencies + ): + """deployment.multiple_instances should be deleted and a warning logged.""" + base_config["deployment"]["multiple_instances"] = True + base_config["execution"]["num_instances"] = 2 + base_config["execution"]["num_nodes"] = 2 + cfg = OmegaConf.create(base_config) + + with patch( + "nemo_evaluator_launcher.executors.slurm.executor.logger" + ) as mock_logger: + _create_slurm_sbatch_script( + cfg=cfg, + task=mock_task, + eval_image="test-eval-container:latest", + remote_task_subdir=Path("/test/remote"), + invocation_id="test123", + job_id="test123.0", + ) + + # Warning should have been logged + mock_logger.warning.assert_called_once() + assert "multiple_instances" in mock_logger.warning.call_args[0][0] + + # Field should be removed from config + assert cfg.deployment.get("multiple_instances") is None + + def test_multiple_instances_false_still_removed( + self, base_config, mock_task, mock_dependencies + ): + """Even multiple_instances=False should trigger deprecation removal.""" + base_config["deployment"]["multiple_instances"] 
= False + cfg = OmegaConf.create(base_config) + + with patch("nemo_evaluator_launcher.executors.slurm.executor.logger"): + _create_slurm_sbatch_script( + cfg=cfg, + task=mock_task, + eval_image="test-eval-container:latest", + remote_task_subdir=Path("/test/remote"), + invocation_id="test123", + job_id="test123.0", + ) + + assert cfg.deployment.get("multiple_instances") is None + + # ── ALL_NODE_IPS and HEAD_NODE_IPS in srun command ─────────────────── + + def test_all_node_ips_exported(self, base_config, mock_task, mock_dependencies): + """ALL_NODE_IPS should always be exported and passed to the container.""" + cfg = OmegaConf.create(base_config) + script = _create_slurm_sbatch_script( + cfg=cfg, + task=mock_task, + eval_image="test-eval-container:latest", + remote_task_subdir=Path("/test/remote"), + invocation_id="test123", + job_id="test123.0", + ).cmd + + assert 'export ALL_NODE_IPS=$(IFS=,; echo "${NODES_IPS_ARRAY[*]}")' in script + assert "ALL_NODE_IPS" in script + + def test_head_node_ips_single_instance( + self, base_config, mock_task, mock_dependencies + ): + """With 1 instance, HEAD_NODE_IPS loop iterates once (g=0).""" + base_config["execution"]["num_nodes"] = 2 + cfg = OmegaConf.create(base_config) + script = _create_slurm_sbatch_script( + cfg=cfg, + task=mock_task, + eval_image="test-eval-container:latest", + remote_task_subdir=Path("/test/remote"), + invocation_id="test123", + job_id="test123.0", + ).cmd + + assert "HEAD_NODE_IPS=()" in script + assert "for ((g=0; g<1; g++))" in script + # npi = 2 // 1 = 2, head at index g*2 + assert "g * 2" in script + + @pytest.mark.parametrize( + "num_nodes,num_instances,expected_loop_count,expected_stride", + [ + (4, 2, 2, 2), # 2 instances of 2 nodes each + (6, 3, 3, 2), # 3 instances of 2 nodes each + (4, 4, 4, 1), # 4 instances of 1 node each + (8, 2, 2, 4), # 2 instances of 4 nodes each + ], + ) + def test_head_node_ips_multi_instance( + self, + base_config, + mock_task, + mock_dependencies, + num_nodes, + 
+        num_instances,
+        expected_loop_count,
+        expected_stride,
+    ):
+        """HEAD_NODE_IPS loop should iterate num_instances times with correct stride."""
+        base_config["execution"]["num_nodes"] = num_nodes
+        base_config["execution"]["num_instances"] = num_instances
+        cfg = OmegaConf.create(base_config)
+        script = _create_slurm_sbatch_script(
+            cfg=cfg,
+            task=mock_task,
+            eval_image="test-eval-container:latest",
+            remote_task_subdir=Path("/test/remote"),
+            invocation_id="test123",
+            job_id="test123.0",
+        ).cmd
+
+        assert f"for ((g=0; g<{expected_loop_count}; g++))" in script
+        assert f"g * {expected_stride}" in script
+
+    def test_all_node_ips_in_container_env(
+        self, base_config, mock_task, mock_dependencies
+    ):
+        """ALL_NODE_IPS must appear in the --container-env list."""
+        cfg = OmegaConf.create(base_config)
+        script = _create_slurm_sbatch_script(
+            cfg=cfg,
+            task=mock_task,
+            eval_image="test-eval-container:latest",
+            remote_task_subdir=Path("/test/remote"),
+            invocation_id="test123",
+            job_id="test123.0",
+        ).cmd
+
+        # Find the --container-env line and check ALL_NODE_IPS is listed
+        match = re.search(r"--container-env\s+(\S+)", script)
+        assert match, "--container-env not found in script"
+        env_vars = match.group(1)
+        assert "ALL_NODE_IPS" in env_vars
+        assert "MASTER_IP" in env_vars
+
+    # ── Health check IP selection ────────────────────────────────────────
+
+    def test_health_check_uses_localhost_for_single_instance(
+        self, base_config, mock_task, mock_dependencies
+    ):
+        """Single instance should health-check on 127.0.0.1."""
+        cfg = OmegaConf.create(base_config)
+        script = _create_slurm_sbatch_script(
+            cfg=cfg,
+            task=mock_task,
+            eval_image="test-eval-container:latest",
+            remote_task_subdir=Path("/test/remote"),
+            invocation_id="test123",
+            job_id="test123.0",
+        ).cmd
+
+        # The wait-for-server handler should use 127.0.0.1
+        assert '"127.0.0.1"' in script
+
+    def test_health_check_uses_head_node_ips_for_multi_instance(
+        self, base_config, mock_task, mock_dependencies
+    ):
+        """Multi-instance should health-check on HEAD_NODE_IPS."""
+        base_config["execution"]["num_nodes"] = 4
+        base_config["execution"]["num_instances"] = 4
+        cfg = OmegaConf.create(base_config)
+        script = _create_slurm_sbatch_script(
+            cfg=cfg,
+            task=mock_task,
+            eval_image="test-eval-container:latest",
+            remote_task_subdir=Path("/test/remote"),
+            invocation_id="test123",
+            job_id="test123.0",
+        ).cmd
+
+        assert '"${HEAD_NODE_IPS[@]}"' in script
+
+    # ── HAProxy placeholder generation ───────────────────────────────────
+
+    @pytest.mark.parametrize(
+        "num_nodes,num_instances,expected_ips",
+        [
+            (4, 2, ["{IP_0}", "{IP_2}"]),  # heads at node 0 and 2
+            (6, 3, ["{IP_0}", "{IP_2}", "{IP_4}"]),  # heads at 0, 2, 4
+            (4, 4, ["{IP_0}", "{IP_1}", "{IP_2}", "{IP_3}"]),  # 1 node per instance
+            (8, 2, ["{IP_0}", "{IP_4}"]),  # heads at 0, 4
+        ],
+    )
+    def test_haproxy_placeholder_backends(self, num_nodes, num_instances, expected_ips):
+        """HAProxy config should have one backend per instance head node."""
+        from nemo_evaluator_launcher.executors.slurm.executor import (
+            _generate_haproxy_config_with_placeholders,
+        )
+
+        config = {
+            "deployment": {
+                "port": 8000,
+                "endpoints": {"health": "/health"},
+            },
+            "execution": {
+                "num_nodes": num_nodes,
+                "num_instances": num_instances,
+                "proxy": {
+                    "config": {
+                        "haproxy_port": 5009,
+                        "health_check_path": "/health",
+                        "health_check_status": 200,
+                    },
+                },
+            },
+        }
+        cfg = OmegaConf.create(config)
+        haproxy_config = _generate_haproxy_config_with_placeholders(cfg)
+
+        for ip_placeholder in expected_ips:
+            assert ip_placeholder in haproxy_config, (
+                f"{ip_placeholder} not found in HAProxy config"
+            )
+
+        # Verify no extra backends beyond expected
+        import re as _re
+
+        backend_ips = _re.findall(r"\{IP_\d+\}", haproxy_config)
+        assert len(backend_ips) == len(expected_ips)
+
+    # ── Proxy setup triggered by num_instances > 1 ───────────────────────
+
+    def test_proxy_setup_when_multi_instance(
+        self, base_config, mock_task, mock_dependencies
+    ):
+        """num_instances > 1 should trigger proxy srun in the sbatch script."""
+        base_config["execution"]["num_nodes"] = 2
+        base_config["execution"]["num_instances"] = 2
+        cfg = OmegaConf.create(base_config)
+        script = _create_slurm_sbatch_script(
+            cfg=cfg,
+            task=mock_task,
+            eval_image="test-eval-container:latest",
+            remote_task_subdir=Path("/test/remote"),
+            invocation_id="test123",
+            job_id="test123.0",
+        ).cmd
+
+        assert "proxy" in script.lower()
+        assert "PROXY_PID" in script
+
+    def test_no_proxy_when_single_instance(
+        self, base_config, mock_task, mock_dependencies
+    ):
+        """num_instances == 1 should NOT set up proxy."""
+        base_config["execution"]["num_nodes"] = 2
+        base_config["execution"]["num_instances"] = 1
+        cfg = OmegaConf.create(base_config)
+        script = _create_slurm_sbatch_script(
+            cfg=cfg,
+            task=mock_task,
+            eval_image="test-eval-container:latest",
+            remote_task_subdir=Path("/test/remote"),
+            invocation_id="test123",
+            job_id="test123.0",
+        ).cmd
+
+        assert "proxy" not in script.lower()
+        assert "PROXY_PID" not in script
+
+    def test_proxy_pid_killed_on_shutdown(
+        self, base_config, mock_task, mock_dependencies
+    ):
+        """Multi-instance script should kill both SERVER_PID and PROXY_PID."""
+        base_config["execution"]["num_nodes"] = 2
+        base_config["execution"]["num_instances"] = 2
+        cfg = OmegaConf.create(base_config)
+        script = _create_slurm_sbatch_script(
+            cfg=cfg,
+            task=mock_task,
+            eval_image="test-eval-container:latest",
+            remote_task_subdir=Path("/test/remote"),
+            invocation_id="test123",
+            job_id="test123.0",
+        ).cmd
+
+        assert "kill $SERVER_PID" in script
+        assert "kill $PROXY_PID" in script
+
+    # ── Deployment command wrapping (base64 script file) ─────────────────
+
+    def test_deployment_command_written_as_script_file(
+        self, base_config, mock_task, mock_dependencies
+    ):
+        """Deployment command should always be base64-encoded into deployment_cmd.sh."""
+        cfg = OmegaConf.create(base_config)
+        script = _create_slurm_sbatch_script(
+            cfg=cfg,
+            task=mock_task,
+            eval_image="test-eval-container:latest",
+            remote_task_subdir=Path("/test/remote"),
+            invocation_id="test123",
+            job_id="test123.0",
+        ).cmd
+
+        assert "base64 -d > deployment_cmd.sh" in script
+        assert "bash deployment_cmd.sh" in script
+
+    def test_deployment_command_base64_encodes_correctly(self):
+        """The base64 encoding should round-trip to the original command."""
+        import base64
+
+        cmd = "vllm serve /model --port 8000 --tp 8"
+        encoded = base64.b64encode(cmd.encode("utf-8")).decode("utf-8")
+        decoded = base64.b64decode(encoded).decode("utf-8")
+        assert decoded == cmd
+
+    def test_multiline_command_wrapped(self, base_config, mock_task, mock_dependencies):
+        """Multi-line deployment commands should be encoded into script file."""
+        base_config["deployment"]["command"] = (
+            "#!/bin/bash\nset -e\nray start --head\nvllm serve /model"
+        )
+        cfg = OmegaConf.create(base_config)
+        script = _create_slurm_sbatch_script(
+            cfg=cfg,
+            task=mock_task,
+            eval_image="test-eval-container:latest",
+            remote_task_subdir=Path("/test/remote"),
+            invocation_id="test123",
+            job_id="test123.0",
+        ).cmd
+
+        assert "base64 -d > deployment_cmd.sh" in script
+        assert "bash deployment_cmd.sh" in script
+        # Multi-line command should be written to deployment_cmd.sh
+        assert "deployment_cmd.sh" in script
+
+    def test_command_wrapped_in_bash_c(self, base_config, mock_task, mock_dependencies):
+        """Deployment srun should use bash -c wrapper running asynchronously."""
+        cfg = OmegaConf.create(base_config)
+        script = _create_slurm_sbatch_script(
+            cfg=cfg,
+            task=mock_task,
+            eval_image="test-eval-container:latest",
+            remote_task_subdir=Path("/test/remote"),
+            invocation_id="test123",
+            job_id="test123.0",
+        ).cmd
+
+        # Should have bash -c '...' &
+        assert re.search(r"bash -c '.*deployment_cmd\.sh.*' &", script)
+
+    # ── Pre-cmd + deployment command combined ────────────────────────────
+
+    def test_pre_cmd_and_deploy_cmd_both_as_scripts(
+        self, base_config, mock_task, mock_dependencies
+    ):
+        """When pre_cmd is set, both pre_cmd and command should be script files."""
+        base_config["deployment"]["pre_cmd"] = "export MY_VAR=1\necho setup done"
+        cfg = OmegaConf.create(base_config)
+        script = _create_slurm_sbatch_script(
+            cfg=cfg,
+            task=mock_task,
+            eval_image="test-eval-container:latest",
+            remote_task_subdir=Path("/test/remote"),
+            invocation_id="test123",
+            job_id="test123.0",
+        ).cmd
+
+        assert "base64 -d > deployment_pre_cmd.sh" in script
+        assert "source deployment_pre_cmd.sh" in script
+        assert "base64 -d > deployment_cmd.sh" in script
+        assert "bash deployment_cmd.sh" in script
+
+    def test_no_pre_cmd_skips_pre_script(
+        self, base_config, mock_task, mock_dependencies
+    ):
+        """Without pre_cmd, only deployment_cmd.sh should be generated."""
+        cfg = OmegaConf.create(base_config)
+        script = _create_slurm_sbatch_script(
+            cfg=cfg,
+            task=mock_task,
+            eval_image="test-eval-container:latest",
+            remote_task_subdir=Path("/test/remote"),
+            invocation_id="test123",
+            job_id="test123.0",
+        ).cmd
+
+        assert "deployment_pre_cmd.sh" not in script
+        assert "base64 -d > deployment_cmd.sh" in script
+
+    # ── Srun --nodes uses num_nodes ──────────────────────────────────────
+
+    @pytest.mark.parametrize("num_nodes", [1, 2, 4, 8])
+    def test_srun_nodes_equals_num_nodes(
+        self, base_config, mock_task, mock_dependencies, num_nodes
+    ):
+        """srun --nodes should always equal execution.num_nodes."""
+        base_config["execution"]["num_nodes"] = num_nodes
+        cfg = OmegaConf.create(base_config)
+        script = _create_slurm_sbatch_script(
+            cfg=cfg,
+            task=mock_task,
+            eval_image="test-eval-container:latest",
+            remote_task_subdir=Path("/test/remote"),
+            invocation_id="test123",
+            job_id="test123.0",
+        ).cmd
+
+        assert f"--nodes {num_nodes}" in script
+
+    # ── _generate_deployment_srun_command directly ───────────────────────
+
+    def test_srun_command_all_node_ips_and_head_node_ips(self):
+        """Directly test that _generate_deployment_srun_command emits IP arrays."""
+        from nemo_evaluator_launcher.executors.slurm.executor import (
+            _generate_deployment_srun_command,
+        )
+
+        config = {
+            "deployment": {
+                "type": "vllm",
+                "image": "test-image:latest",
+                "command": "vllm serve /model",
+            },
+            "execution": {
+                "num_nodes": 4,
+                "num_instances": 2,
+                "deployment": {"n_tasks": 4},
+                "mounts": {"mount_home": True},
+            },
+        }
+        cfg = OmegaConf.create(config)
+
+        command, _, _ = _generate_deployment_srun_command(
+            cfg=cfg,
+            deployment_mounts_list=[],
+            remote_task_subdir=Path("/test/remote"),
+        )
+
+        # ALL_NODE_IPS exported
+        assert "export ALL_NODE_IPS=" in command
+        # HEAD_NODE_IPS loop: 2 instances, stride=2
+        assert "for ((g=0; g<2; g++))" in command
+        assert "g * 2" in command
+        # Container env includes both
+        assert "ALL_NODE_IPS" in command
+        assert "MASTER_IP" in command
+
+    def test_srun_command_base64_script_wrapping(self):
+        """Directly test that deployment command is base64-encoded."""
+        from nemo_evaluator_launcher.executors.slurm.executor import (
+            _generate_deployment_srun_command,
+        )
+
+        config = {
+            "deployment": {
+                "type": "vllm",
+                "image": "test-image:latest",
+                "command": "#!/bin/bash\nset -e\nvllm serve /model",
+            },
+            "execution": {
+                "num_nodes": 1,
+                "num_instances": 1,
+                "deployment": {"n_tasks": 1},
+                "mounts": {"mount_home": True},
+            },
+        }
+        cfg = OmegaConf.create(config)
+
+        command, _, _ = _generate_deployment_srun_command(
+            cfg=cfg,
+            deployment_mounts_list=[],
+            remote_task_subdir=Path("/test/remote"),
+        )
+
+        assert "base64 -d > deployment_cmd.sh" in command
+        assert "bash deployment_cmd.sh" in command
+        assert "bash -c '" in command
+
+    def test_srun_command_with_pre_cmd(self):
+        """Test that pre_cmd generates deployment_pre_cmd.sh and is sourced."""
+        from nemo_evaluator_launcher.executors.slurm.executor import (
+            _generate_deployment_srun_command,
+        )
+
+        config = {
+            "deployment": {
+                "type": "vllm",
+                "image": "test-image:latest",
+                "command": "vllm serve /model",
+                "pre_cmd": "export MY_VAR=hello\necho pre-setup",
+            },
+            "execution": {
+                "num_nodes": 1,
+                "num_instances": 1,
+                "deployment": {"n_tasks": 1},
+                "mounts": {"mount_home": True},
+            },
+        }
+        cfg = OmegaConf.create(config)
+
+        command, _, _ = _generate_deployment_srun_command(
+            cfg=cfg,
+            deployment_mounts_list=[],
+            remote_task_subdir=Path("/test/remote"),
+        )
+
+        assert "base64 -d > deployment_pre_cmd.sh" in command
+        assert "source deployment_pre_cmd.sh" in command
+        assert "base64 -d > deployment_cmd.sh" in command
+        assert "bash deployment_cmd.sh" in command
+
+    # ── get_endpoint_url with num_instances ──────────────────────────────
+
+    def test_get_endpoint_url_single_instance_uses_deployment_port(self):
+        """Single instance should use deployment port for endpoint URL."""
+        from nemo_evaluator_launcher.common.helpers import get_endpoint_url
+
+        config = {
+            "deployment": {
+                "type": "vllm",
+                "port": 8000,
+                "endpoints": {"openai": "/v1/chat/completions"},
+            },
+            "execution": {
+                "type": "slurm",
+                "num_instances": 1,
+            },
+            "target": {"api_endpoint": {}},
+        }
+        cfg = OmegaConf.create(config)
+        url = get_endpoint_url(cfg, {}, "openai")
+        assert url == "http://127.0.0.1:8000/v1/chat/completions"
+
+    def test_get_endpoint_url_multi_instance_uses_haproxy_port(self):
+        """Multi-instance should use HAProxy port for endpoint URL."""
+        from nemo_evaluator_launcher.common.helpers import get_endpoint_url
+
+        config = {
+            "deployment": {
+                "type": "vllm",
+                "port": 8000,
+                "endpoints": {"openai": "/v1/chat/completions"},
+            },
+            "execution": {
+                "type": "slurm",
+                "num_instances": 2,
+                "proxy": {
+                    "config": {"haproxy_port": 5009},
+                },
+            },
+            "target": {"api_endpoint": {}},
+        }
+        cfg = OmegaConf.create(config)
+        url = get_endpoint_url(cfg, {}, "openai")
+        assert url == "http://127.0.0.1:5009/v1/chat/completions"
+
+    def test_get_endpoint_url_multi_instance_default_haproxy_port(self):
+        """Multi-instance without explicit proxy config should default to port 5009."""
+        from nemo_evaluator_launcher.common.helpers import get_endpoint_url
+
+        config = {
+            "deployment": {
+                "type": "vllm",
+                "port": 8000,
+                "endpoints": {"openai": "/v1/chat/completions"},
+            },
+            "execution": {
+                "type": "slurm",
+                "num_instances": 3,
+            },
+            "target": {"api_endpoint": {}},
+        }
+        cfg = OmegaConf.create(config)
+        url = get_endpoint_url(cfg, {}, "openai")
+        assert url == "http://127.0.0.1:5009/v1/chat/completions"
diff --git a/packages/nemo-evaluator-launcher/tests/unit_tests/test_telemetry.py b/packages/nemo-evaluator-launcher/tests/unit_tests/test_telemetry.py
index f73d8e7e4..09c5f801d 100644
--- a/packages/nemo-evaluator-launcher/tests/unit_tests/test_telemetry.py
+++ b/packages/nemo-evaluator-launcher/tests/unit_tests/test_telemetry.py
@@ -549,6 +549,7 @@ def slurm_base_config(self):
         "account": "test-account",
         "partition": "test-partition",
         "num_nodes": 1,
+        "num_instances": 1,
         "ntasks_per_node": 1,
         "subproject": "test-subproject",
         "env_vars": {},