diff --git a/docs/deployment/launcher-orchestrated/slurm.md b/docs/deployment/launcher-orchestrated/slurm.md index 2c9415a3b..b5a9cbdea 100644 --- a/docs/deployment/launcher-orchestrated/slurm.md +++ b/docs/deployment/launcher-orchestrated/slurm.md @@ -77,11 +77,12 @@ execution: # Resource allocation partition: batch # Slurm partition/queue - num_nodes: 1 # Number of nodes + num_nodes: 1 # Total SLURM nodes + num_instances: 1 # Independent deployment instances (HAProxy auto-enabled when > 1) ntasks_per_node: 1 # Tasks per node gres: gpu:8 # GPU resources walltime: "01:00:00" # Wall time limit (HH:MM:SS) - + # Environment variables and mounts env_vars: deployment: {} # Environment variables for deployment container @@ -96,6 +97,64 @@ execution: The `gpus_per_node` parameter can be used as an alternative to `gres` for specifying GPU resources. However, `gres` is the default in the base configuration. ::: +## Multi-Node Deployment + +Multi-node deployment can be achieved with or without Ray. + +### Without Ray (Custom Command) + +For multi-node setups using vLLM's native data parallelism or other custom coordination, override `deployment.command` with your own multi-node logic. The launcher exports `MASTER_IP` and `SLURM_PROCID` to help coordinate nodes: + +```yaml +defaults: + - execution: slurm/default + - deployment: vllm + - _self_ + +execution: + num_nodes: 2 + +deployment: + command: >- + bash -c 'if [ "$SLURM_PROCID" -eq 0 ]; then + vllm serve ${deployment.hf_model_handle} --data-parallel-size 16 --data-parallel-address $MASTER_IP ...; + else + vllm serve ${deployment.hf_model_handle} --headless --data-parallel-address $MASTER_IP ...; + fi' +``` + +See `examples/slurm_vllm_multinode_dp.yaml` for a complete native data parallelism example. 
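The rank-0/worker branch above runs inside the launcher's containers, but the pattern itself is plain bash. The sketch below shows only that branch pattern in isolation; `MASTER_IP` and `SLURM_PROCID` are set inline here purely for illustration, whereas on a real cluster the launcher and SLURM export them, and no `vllm` process is actually started:

```shell
# Sketch of the head/worker branch the exported variables enable.
# MASTER_IP and SLURM_PROCID are hardcoded here for illustration only;
# in a real job the launcher/SLURM provide them.
MASTER_IP=10.0.0.1
SLURM_PROCID=1

if [ "$SLURM_PROCID" -eq 0 ]; then
  role="head"    # rank 0 would run the API-serving vLLM process
else
  role="worker"  # other ranks would run headless engines pointed at the head
fi
echo "rank=$SLURM_PROCID role=$role master=$MASTER_IP"
```

Running this with `SLURM_PROCID=1` prints `rank=1 role=worker master=10.0.0.1`, mirroring how only rank 0 serves the OpenAI-compatible endpoint while other ranks join it.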
+ +### With Ray (vllm_ray) + +For models that require tensor/pipeline parallelism across nodes, use the `vllm_ray` deployment config which includes a built-in Ray cluster setup script: + +```yaml +defaults: + - execution: slurm/default + - deployment: vllm_ray # Ray-managed multi-node vLLM deployment + - _self_ + +execution: + num_nodes: 2 # Single instance spanning 2 nodes + +deployment: + tensor_parallel_size: 8 + pipeline_parallel_size: 2 +``` + +### Multi-Instance with HAProxy + +To run multiple independent deployment instances with HAProxy load-balancing: + +```yaml +execution: + num_nodes: 4 # Total SLURM nodes + num_instances: 2 # 2 instances of 2 nodes each → HAProxy auto-enabled +``` + +When `num_instances > 1`, HAProxy is automatically configured to distribute requests across instance head nodes. See the `examples/` directory for complete configurations. + ## Configuration Examples ### Benchmark Suite Evaluation diff --git a/docs/libraries/nemo-evaluator-launcher/configuration/deployment/vllm.md b/docs/libraries/nemo-evaluator-launcher/configuration/deployment/vllm.md index 2b0db69de..dc5778f7c 100644 --- a/docs/libraries/nemo-evaluator-launcher/configuration/deployment/vllm.md +++ b/docs/libraries/nemo-evaluator-launcher/configuration/deployment/vllm.md @@ -81,12 +81,39 @@ evaluation: HF_TOKEN: $host:HF_TOKEN_FOR_GPQA_DIAMOND # Click request access for GPQA-Diamond: https://huggingface.co/datasets/Idavidrein/gpqa ``` +## Multi-Node Deployment with Ray (`vllm_ray`) + +For models requiring multiple nodes (e.g., pipeline parallelism across nodes), use the `vllm_ray` deployment config: + +```yaml +defaults: + - execution: slurm/default + - deployment: vllm_ray + - _self_ + +execution: + num_nodes: 2 # Single instance spanning 2 nodes + +deployment: + tensor_parallel_size: 8 + pipeline_parallel_size: 2 +``` + +The `vllm_ray` config inherits all fields from `vllm` and adds: + +- **`distributed_executor_backend`**: Ray backend type (default: `ray`) +- 
**`ray_compiled_dag_channel_type`**: Ray channel type — `auto`, `shm`, or `nccl` (default: `shm`) +- **`command`**: Built-in Ray cluster setup script that starts a Ray head on rank 0, waits for workers, then launches vLLM with `--distributed-executor-backend` + +The `base_command` field in the base `vllm` config contains the `vllm serve ...` invocation. The `vllm_ray` config references it via `${deployment.base_command}` to append Ray-specific flags. + ## Reference The following example configuration files are available in the `examples/` directory: -- `lepton_vllm_llama_3_1_8b_instruct.yaml` - vLLM deployment on Lepton platform -- `slurm_llama_3_1_8b_instruct.yaml` - vLLM deployment on SLURM cluster -- `slurm_llama_3_1_8b_instruct_hf.yaml` - vLLM deployment using HuggingFace model ID +- `slurm_vllm_basic.yaml` - Basic single-node vLLM deployment +- `slurm_vllm_multinode_ray_tp_pp.yaml` - Multi-node Ray deployment with TP+PP +- `slurm_vllm_multinode_multiinstance_ray_tp_pp.yaml` - Multi-node multi-instance Ray with HAProxy +- `slurm_vllm_multinode_dp_haproxy.yaml` - Multi-node independent instances with HAProxy Use `nemo-evaluator-launcher run --dry-run` to check your configuration before running. diff --git a/docs/libraries/nemo-evaluator-launcher/configuration/executors/slurm.md b/docs/libraries/nemo-evaluator-launcher/configuration/executors/slurm.md index 4ee7f7c92..f58cee58c 100644 --- a/docs/libraries/nemo-evaluator-launcher/configuration/executors/slurm.md +++ b/docs/libraries/nemo-evaluator-launcher/configuration/executors/slurm.md @@ -85,6 +85,30 @@ env_vars: **Security:** Secret values are never written into the generated `run.sub` script. They are stored in a separate `.secrets.env` file and sourced at runtime, preventing accidental exposure in logs or artifacts. 
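The source-at-runtime mechanism described above can be sketched in a few lines of bash. This is illustrative only: the file name `.secrets.env` matches the docs, but the exact layout the launcher generates is an assumption of this sketch, and the echoed length stands in for code that must never print the secret itself:

```shell
# Illustrative sketch: source secrets at runtime so values never appear
# in the generated script or its logs. File layout is an assumption here.
cat > .secrets.env <<'EOF'
export API_KEY=dummy-value
EOF

set -a            # auto-export everything the sourced file defines
. ./.secrets.env
set +a

# Log only the length, never the value, to avoid leaking it in artifacts.
echo "API_KEY is ${#API_KEY} chars"
```

Because the file is sourced rather than inlined, rotating a secret means editing `.secrets.env` alone; the generated `run.sub` never has to change.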
+### Multi-Node and Multi-Instance + +Configure multi-node deployments using `num_nodes` and `num_instances`: + +```yaml +execution: + num_nodes: 4 # Total SLURM nodes + num_instances: 2 # Independent deployment instances (default: 1) +``` + +- **`num_nodes`**: Total number of SLURM nodes to allocate +- **`num_instances`**: Number of independent deployment instances. When `> 1`, HAProxy is automatically configured to load-balance across instances. `num_nodes` must be divisible by `num_instances`. + +For multi-node deployments requiring Ray (e.g., pipeline parallelism across nodes), use the `vllm_ray` deployment config instead of `vllm`: + +```yaml +defaults: + - deployment: vllm_ray # Built-in Ray cluster setup +``` + +:::{note} +The deprecated `deployment.multiple_instances` field is still accepted but will be removed in a future release. Use `execution.num_instances` instead. +::: + ### Mounting and Storage The Slurm executor provides sophisticated mounting capabilities: diff --git a/packages/nemo-evaluator-launcher/.claude/skills/nel-assistant/SKILL.md b/packages/nemo-evaluator-launcher/.claude/skills/nel-assistant/SKILL.md index 6f7eb2850..e8b1dad89 100644 --- a/packages/nemo-evaluator-launcher/.claude/skills/nel-assistant/SKILL.md +++ b/packages/nemo-evaluator-launcher/.claude/skills/nel-assistant/SKILL.md @@ -142,27 +142,59 @@ Show tasks in the current config. Loop until the user confirms the task list is ``` For the None (External) deployment the `api_key_name` should be already defined. The `DUMMY_API_KEY` export is handled in Step 8. -**Step 6: Advanced - Multi-node (Data Parallel)** +**Step 6: Advanced - Multi-node** -Only if model >120B parameters, suggest multi-node. Explain: "This is DP multi-node - the weights are copied (not distributed) across nodes. One deployment instance per node will be run with HAProxy load-balancing requests." +There are two multi-node patterns. Ask the user which applies: -Ask if user wants multi-node. 
If yes, ask for node count and configure: +**Pattern A: Multi-instance (independent instances with HAProxy)** + +Only if model >120B parameters or user wants more throughput. Explain: "Each node runs an independent deployment instance. HAProxy load-balances requests across all instances." ```yaml execution: - num_nodes: 4 # 4 nodes = 4 independent deployment instances = 4x throughput - deployment: - n_tasks: ${execution.num_nodes} # Must match num_nodes for multi-instance deployment + num_nodes: 4 # Total nodes + num_instances: 4 # 4 independent instances → HAProxy auto-enabled +``` + +**Pattern B: Multi-node single instance (Ray TP/PP across nodes)** + +When a single model is too large for one node and needs pipeline parallelism across nodes. Use `vllm_ray` deployment config: + +```yaml +defaults: + - deployment: vllm_ray # Built-in Ray cluster setup (replaces manual pre_cmd) + +execution: + num_nodes: 2 # Single instance spanning 2 nodes + +deployment: + tensor_parallel_size: 8 + pipeline_parallel_size: 2 +``` + +**Pattern A+B combined: Multi-instance with multi-node instances** + +For very large models needing both cross-node parallelism AND multiple instances: + +```yaml +defaults: + - deployment: vllm_ray + +execution: + num_nodes: 4 # Total nodes + num_instances: 2 # 2 instances of 2 nodes each → HAProxy auto-enabled deployment: - multiple_instances: true + tensor_parallel_size: 8 + pipeline_parallel_size: 2 ``` **Common Confusions** -- **This is different from `data_parallel_size`**, which controls DP replicas *within* a single node/deployment instance. -- Global data parallelism is `num_nodes x data_parallel_size` (e.g., 2 nodes x 4 DP each = 8 replicas for max throughput). -- With multi-node, `parallelism` in task config is the total concurrent requests across all instances, not per-instance. +- **`num_instances`** controls independent deployment instances with HAProxy. **`data_parallel_size`** controls DP replicas *within* a single instance. 
+- Global data parallelism is `num_instances x data_parallel_size` (e.g., 2 instances x 8 DP each = 16 replicas). +- With multi-instance, `parallelism` in task config is the total concurrent requests across all instances, not per-instance. +- `num_nodes` must be divisible by `num_instances`. **Step 7: Advanced - Interceptors** diff --git a/packages/nemo-evaluator-launcher/examples/slurm_vllm_multinode_dp.yaml b/packages/nemo-evaluator-launcher/examples/slurm_vllm_multinode_dp.yaml index ba6668000..cf1ef9982 100644 --- a/packages/nemo-evaluator-launcher/examples/slurm_vllm_multinode_dp.yaml +++ b/packages/nemo-evaluator-launcher/examples/slurm_vllm_multinode_dp.yaml @@ -69,8 +69,6 @@ execution: walltime: 01:00:00 num_nodes: 2 # Number of SLURM nodes for multi-node deployment - deployment: - n_tasks: ${execution.num_nodes} # For multi-node vLLM deployment, must match num_nodes mounts: mount_home: false # Whether to mount home directory (default: true) @@ -97,7 +95,7 @@ evaluation: parallelism: 512 # Number of parallel requests (higher for data parallel deployment) temperature: 0.6 # Sampling temperature top_p: 0.95 # Nucleus sampling parameter - max_tokens: 32768 # Maximum number of tokens to generate (32k) + max_new_tokens: 32768 # Maximum number of tokens to generate (32k) request_timeout: 3600 # Timeout for API requests in seconds target: api_endpoint: diff --git a/packages/nemo-evaluator-launcher/examples/slurm_vllm_multinode_dp_haproxy.yaml b/packages/nemo-evaluator-launcher/examples/slurm_vllm_multinode_dp_haproxy.yaml index 8806b3c43..56cdeac04 100644 --- a/packages/nemo-evaluator-launcher/examples/slurm_vllm_multinode_dp_haproxy.yaml +++ b/packages/nemo-evaluator-launcher/examples/slurm_vllm_multinode_dp_haproxy.yaml @@ -61,15 +61,13 @@ execution: walltime: 01:00:00 num_nodes: 2 # Number of SLURM nodes for multi-node deployment - deployment: - n_tasks: ${execution.num_nodes} # One vLLM instance per node + num_instances: 2 # 2 independent single-node instances 
→ HAProxy auto-enabled mounts: mount_home: false # Whether to mount home directory (default: true) # Override default deployment arguments deployment: - multiple_instances: true # Enable HAProxy load balancing across nodes checkpoint_path: null hf_model_handle: nvidia/NVIDIA-Nemotron-Nano-9B-v2 served_model_name: nvidia/NVIDIA-Nemotron-Nano-9B-v2 @@ -86,7 +84,7 @@ evaluation: parallelism: 512 # Number of parallel requests (higher for multi-node deployment) temperature: 0.6 # Sampling temperature top_p: 0.95 # Nucleus sampling parameter - max_tokens: 32768 # Maximum number of tokens to generate (32k) + max_new_tokens: 32768 # Maximum number of tokens to generate (32k) request_timeout: 3600 # Timeout for API requests in seconds target: api_endpoint: diff --git a/packages/nemo-evaluator-launcher/examples/slurm_vllm_multinode_multiinstance_ray_tp_pp.yaml b/packages/nemo-evaluator-launcher/examples/slurm_vllm_multinode_multiinstance_ray_tp_pp.yaml new file mode 100644 index 000000000..77121211b --- /dev/null +++ b/packages/nemo-evaluator-launcher/examples/slurm_vllm_multinode_multiinstance_ray_tp_pp.yaml @@ -0,0 +1,117 @@ +# +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +# ============================================================================== +# Multi-Node Multi-Instance SLURM Deployment: DeepSeek-R1 with HAProxy +# ============================================================================== +# This configuration demonstrates how to run evaluations with DeepSeek-R1 +# deployed as multiple instances across SLURM nodes, each instance spanning +# multiple nodes using Ray tensor and pipeline parallelism, with HAProxy +# load-balancing across instances. +# +# Architecture: +# 4 nodes total, 2 instances of 2 nodes each: +# Instance 0 (nodes 0,1): Ray head + worker, vLLM on :8000 +# Instance 1 (nodes 2,3): Ray head + worker, vLLM on :8000 +# HAProxy: distributes requests across Instance 0 and Instance 1 +# +# How to use: +# +# 1. copy this file locally or clone the repository +# 2. (optional) set the required values in the config file. Alternatively, you can pass them later with -o cli arguments, e.g. +# -o execution.hostname=my-cluster.com -o execution.output_dir=/absolute/path/on/cluster -o execution.account=my-account etc. +# 3. (optional) run with 10 samples for quick testing - add the following flag to the command below +# -o ++evaluation.nemo_evaluator_config.config.params.limit_samples=10 +# 4. run full evaluation: +# nemo-evaluator-launcher run --config path/to/slurm_vllm_multinode_multiinstance_ray_tp_pp.yaml +# +# ⚠️ WARNING: +# Always run full evaluations (without limit_samples) for actual benchmark results. +# Using a subset of samples is solely for testing configuration and setup. +# Results from such test runs should NEVER be used to compare models or +# report benchmark performance. 
+ +# Model Details: +# - Model: deepseek-ai/DeepSeek-R1 +# - Hardware: 4 nodes with 8xH100 GPUs each (32 H100 GPUs total) +# - 2 instances, each spanning 2 nodes (16 GPUs per instance) +# - tensor_parallel_size: 8 (within node parallelism) +# - pipeline_parallel_size: 2 (across node parallelism within each instance) +# +# Multi-Node Multi-Instance Configuration: +# - execution.num_nodes: 4 (total SLURM nodes) +# - execution.num_instances: 2 (2 instances → HAProxy auto-enabled) +# - num_nodes_per_instance = num_nodes / num_instances = 2 +# - deployment.tensor_parallel_size: GPU parallelism within a single node +# - deployment.pipeline_parallel_size: Model parallelism across nodes within an instance +# +# The vllm_ray deployment config contains a built-in Ray setup script. +# The script expects scheduler-agnostic variables (PROC_ID, NUM_TASKS, +# MASTER_IP, ALL_NODE_IPS) exported by the executor, and computes +# per-instance variables (INSTANCE_ID, INSTANCE_RANK, etc.) internally. +# ============================================================================== + +defaults: + - execution: slurm/default + - deployment: vllm_ray + - _self_ + +execution: + hostname: ??? # SLURM headnode (login) hostname (required) + username: ${oc.env:USER} + account: ??? # SLURM account allocation (required) + output_dir: ??? 
# ABSOLUTE path accessible to SLURM compute nodes (required) + num_nodes: 4 # 4 total SLURM nodes (2 per instance × 2 instances) + num_instances: 2 # 2 instances → HAProxy auto-enabled + mounts: + deployment: + /path/to/hf_home: /root/.cache/huggingface + mount_home: false + env_vars: + deployment: + HF_TOKEN: ${oc.env:HF_TOKEN} + +# Ray cluster setup is handled by the vllm_ray deployment config (no pre_cmd needed) +deployment: + image: vllm/vllm-openai:v0.15.1 + checkpoint_path: null + hf_model_handle: deepseek-ai/DeepSeek-R1 + served_model_name: deepseek-ai/DeepSeek-R1 + tensor_parallel_size: 8 + pipeline_parallel_size: 2 + data_parallel_size: 1 + gpu_memory_utilization: 0.90 + port: 8000 + extra_args: "--disable-custom-all-reduce --enforce-eager" + +evaluation: + nemo_evaluator_config: + config: + params: + parallelism: 128 + request_timeout: 3600 + temperature: 0.6 + top_p: 0.95 + max_new_tokens: 32768 + target: + api_endpoint: + adapter_config: + process_reasoning_traces: true # Strip ... 
tokens from DeepSeek-R1 responses + use_response_logging: true + max_logged_responses: 10 + use_request_logging: true + max_logged_requests: 10 + tasks: + - name: gsm8k_cot_instruct \ No newline at end of file diff --git a/packages/nemo-evaluator-launcher/examples/slurm_vllm_multinode_ray_tp_pp.yaml b/packages/nemo-evaluator-launcher/examples/slurm_vllm_multinode_ray_tp_pp.yaml index 942cfcc31..c480f1141 100644 --- a/packages/nemo-evaluator-launcher/examples/slurm_vllm_multinode_ray_tp_pp.yaml +++ b/packages/nemo-evaluator-launcher/examples/slurm_vllm_multinode_ray_tp_pp.yaml @@ -42,18 +42,17 @@ # - tensor_parallel_size: 8 (within node parallelism) # - pipeline_parallel_size: 2 (across node parallelism) # -# Multi-Instance Configuration: -# - execution.num_nodes: Number of SLURM nodes to allocate (2 in this example) -# - execution.deployment.n_tasks: Must match num_nodes for multi-instance deployment +# Multi-Node Configuration: +# - execution.num_nodes: 2 (single instance spanning 2 nodes) # - deployment.tensor_parallel_size: GPU parallelism within a single node # - deployment.pipeline_parallel_size: Model parallelism across multiple nodes # -# Custom deployment.pre_cmd handles Ray cluster initialization automatically +# The vllm_ray deployment config handles Ray cluster setup automatically # ============================================================================== defaults: - execution: slurm/default - - deployment: vllm + - deployment: vllm_ray - _self_ # Override default execution arguments @@ -67,14 +66,12 @@ execution: walltime: 02:00:00 num_nodes: 2 # Number of SLURM nodes for multi-node deployment - deployment: - n_tasks: ${execution.num_nodes} # For multi-node ray vLLM deployment, must match num_nodes mounts: mount_home: false # Whether to mount home directory (default: true) # Override default deployment arguments -# Note: This uses pre_cmd for multi-node Ray cluster setup before the default vllm serve command +# Ray cluster setup is handled by the 
vllm_ray deployment config (no pre_cmd needed) deployment: image: vllm/vllm-openai:v0.15.1 checkpoint_path: null @@ -84,47 +81,7 @@ deployment: pipeline_parallel_size: 2 data_parallel_size: 1 port: 8000 - extra_args: "--disable-custom-all-reduce --distributed-executor-backend ray --enforce-eager" - env_vars: - VLLM_USE_RAY_COMPILED_DAG_CHANNEL_TYPE: lit:shm - pre_cmd: | - RAY_PORT=6379 - NODE_PORT=8266 - OBJ_PORT=8267 - RAY_FIXED_PORTS="--node-manager-port=$NODE_PORT --object-manager-port=$OBJ_PORT --metrics-export-port=8269 --dashboard-agent-grpc-port=8270 --dashboard-agent-listen-port=8271 --runtime-env-agent-port=8272" - - export VLLM_HOST_IP=$MASTER_IP - - if [ "$SLURM_PROCID" -eq 0 ]; then - echo "VLLM_HOST_IP set to: $VLLM_HOST_IP (HEAD node)" - echo "Starting Ray HEAD on $(hostname) with IP $VLLM_HOST_IP..." - ray start --head --port=$RAY_PORT $RAY_FIXED_PORTS - - export RAY_ADDRESS=$MASTER_IP:$RAY_PORT - echo "RAY_ADDRESS set to: $RAY_ADDRESS" - - echo "Waiting 90 seconds for worker nodes to join Ray cluster..." - sleep 90 - - echo "=== Ray cluster status ===" - ray status || true - - echo "=== Starting vLLM Deployment ===" - vllm serve ${deployment.hf_model_handle} \ - --tensor-parallel-size=${deployment.tensor_parallel_size} \ - --pipeline-parallel-size=${deployment.pipeline_parallel_size} \ - --port ${deployment.port} \ - --served-model-name ${deployment.served_model_name} \ - ${deployment.extra_args} - else - export VLLM_HOST_IP=$(python3 -c "import socket; s=socket.socket(socket.AF_INET, socket.SOCK_DGRAM); s.connect(('8.8.8.8', 80)); print(s.getsockname()[0])") - echo "VLLM_HOST_IP set to: $VLLM_HOST_IP (WORKER node)" - echo "Worker node waiting 30 seconds for head node to start Ray..." - sleep 30 - echo "Starting Ray WORKER on $(hostname) with IP $VLLM_HOST_IP..." - echo "Connecting to $MASTER_IP:$RAY_PORT..." 
- ray start --address=$MASTER_IP:$RAY_PORT $RAY_FIXED_PORTS --block - fi + extra_args: "--disable-custom-all-reduce --enforce-eager" # Specify the benchmarks to evaluate evaluation: @@ -135,7 +92,7 @@ evaluation: request_timeout: 3600 # Timeout for API requests in seconds temperature: 0.6 # Sampling temperature top_p: 0.95 # Nucleus sampling parameter - max_tokens: 32768 # Maximum number of tokens to generate (32k) + max_new_tokens: 32768 # Maximum number of tokens to generate (32k) target: api_endpoint: diff --git a/packages/nemo-evaluator-launcher/src/nemo_evaluator_launcher/common/helpers.py b/packages/nemo-evaluator-launcher/src/nemo_evaluator_launcher/common/helpers.py index 6643fb26a..6a8e95389 100644 --- a/packages/nemo-evaluator-launcher/src/nemo_evaluator_launcher/common/helpers.py +++ b/packages/nemo-evaluator-launcher/src/nemo_evaluator_launcher/common/helpers.py @@ -339,8 +339,8 @@ def apply_url_override(url: str) -> str: # Local executor - use localhost endpoint_uri = cfg.deployment.endpoints[endpoint_type] - # Use HAProxy port if multiple_instances is enabled - if cfg.deployment.get("multiple_instances", False): + # Use HAProxy port if num_instances > 1 + if cfg.get("execution", {}).get("num_instances", 1) > 1: proxy_config = cfg.execution.get("proxy", {}).get("config", {}) port = proxy_config.get("haproxy_port", 5009) else: diff --git a/packages/nemo-evaluator-launcher/src/nemo_evaluator_launcher/configs/deployment/vllm.yaml b/packages/nemo-evaluator-launcher/src/nemo_evaluator_launcher/configs/deployment/vllm.yaml index fd7b972c2..1a74dfad6 100644 --- a/packages/nemo-evaluator-launcher/src/nemo_evaluator_launcher/configs/deployment/vllm.yaml +++ b/packages/nemo-evaluator-launcher/src/nemo_evaluator_launcher/configs/deployment/vllm.yaml @@ -30,7 +30,8 @@ endpoints: completions: /v1/completions health: /health -command: vllm serve ${oc.select:deployment.hf_model_handle,/checkpoint} +base_command: >- + vllm serve 
${oc.select:deployment.hf_model_handle,/checkpoint} --tensor-parallel-size=${deployment.tensor_parallel_size} --pipeline-parallel-size=${deployment.pipeline_parallel_size} --data-parallel-size=${deployment.data_parallel_size} @@ -39,3 +40,5 @@ command: vllm serve ${oc.select:deployment.hf_model_handle,/checkpoint} --served-model-name ${deployment.served_model_name} --gpu-memory-utilization ${deployment.gpu_memory_utilization} ${deployment.extra_args} + +command: ${deployment.base_command} diff --git a/packages/nemo-evaluator-launcher/src/nemo_evaluator_launcher/configs/deployment/vllm_ray.yaml b/packages/nemo-evaluator-launcher/src/nemo_evaluator_launcher/configs/deployment/vllm_ray.yaml new file mode 100644 index 000000000..ca886f6c7 --- /dev/null +++ b/packages/nemo-evaluator-launcher/src/nemo_evaluator_launcher/configs/deployment/vllm_ray.yaml @@ -0,0 +1,88 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Multi-node vLLM deployment with Ray. +# +# Use this config when num_nodes / num_instances > 1. It inherits every field +# from vllm.yaml and overrides `command` with a built-in Ray cluster setup +# script that: +# - Starts a Ray head on PROC_ID 0, polls until all NODES_PER_INSTANCE workers +# have joined, then launches vLLM with --distributed-executor-backend (default: ray). 
+# - Starts a Ray worker on every other PROC_ID (blocks until Ray head exits). +# +# Environment variables the executor must inject: +# PROC_ID — rank of this process within the instance (0 = head node) +# MASTER_IP — IP of the head node for this instance +# NODES_PER_INSTANCE — number of nodes per instance (used to poll until all workers join) +# +# Optional env_vars set by this config (override in your yaml if needed): +# VLLM_ENGINE_READY_TIMEOUT_S — startup timeout (default: 3600s) +# VLLM_USE_RAY_COMPILED_DAG_CHANNEL_TYPE — channel type for Ray DAG (default: shm) +# +# OmegaConf escaping note: +# $VAR → passed through as literal (safe for bash) +# ${key} → resolved as Hydra interpolation +# `cmd` → use backticks for bash command substitution (OmegaConf rejects $()) +defaults: + - vllm + - _self_ + +distributed_executor_backend: ray + +env_vars: + VLLM_ENGINE_READY_TIMEOUT_S: 'lit:3600' # allow enough time for large models to load across nodes + VLLM_USE_RAY_COMPILED_DAG_CHANNEL_TYPE: 'lit:shm' # "auto", "shm", or "nccl" (default in vLLM: auto) + +command: | + #!/bin/bash + set -euo pipefail + + RAY_PORT=6379 + NODE_PORT=8266 + OBJ_PORT=8267 + RAY_FIXED_PORTS="--node-manager-port=$NODE_PORT --object-manager-port=$OBJ_PORT --metrics-export-port=8269 --dashboard-agent-grpc-port=8270 --dashboard-agent-listen-port=8271 --runtime-env-agent-port=8272" + + echo "=== Ray Setup: PROC_ID=$PROC_ID, MASTER_IP=$MASTER_IP ===" + echo "Hostname: `hostname`" + + if [ "$PROC_ID" -eq 0 ]; then + # HEAD node + export VLLM_HOST_IP=$MASTER_IP + echo "Starting Ray HEAD on `hostname` with VLLM_HOST_IP=$VLLM_HOST_IP" + ray start --head --port=$RAY_PORT $RAY_FIXED_PORTS + + export RAY_ADDRESS=$MASTER_IP:$RAY_PORT + echo "Waiting for $NODES_PER_INSTANCE node(s) to join Ray cluster..." 
+ until [ "`ray status 2>/dev/null | grep -c 'node_'`" -ge "$NODES_PER_INSTANCE" ]; do + sleep 10 + done + + echo "=== Ray cluster ready ===" + ray status + + # Run the deployment command + ${deployment.base_command} --distributed-executor-backend ${deployment.distributed_executor_backend} + else + # WORKER node + export VLLM_HOST_IP=`python3 -c "import socket; s=socket.socket(socket.AF_INET, socket.SOCK_DGRAM); s.connect(('8.8.8.8', 80)); print(s.getsockname()[0])"` + echo "Starting Ray WORKER on `hostname` with VLLM_HOST_IP=$VLLM_HOST_IP" + echo "Connecting to Ray head at $MASTER_IP:$RAY_PORT..." + + # Retry loop in case head is still starting + until ray start --address=$MASTER_IP:$RAY_PORT $RAY_FIXED_PORTS --block 2>/dev/null; do + echo "Retrying connection to Ray head at $MASTER_IP:$RAY_PORT..." + sleep 5 + done + fi diff --git a/packages/nemo-evaluator-launcher/src/nemo_evaluator_launcher/configs/execution/slurm/default.yaml b/packages/nemo-evaluator-launcher/src/nemo_evaluator_launcher/configs/execution/slurm/default.yaml index 8f0657f99..8a55da64c 100644 --- a/packages/nemo-evaluator-launcher/src/nemo_evaluator_launcher/configs/execution/slurm/default.yaml +++ b/packages/nemo-evaluator-launcher/src/nemo_evaluator_launcher/configs/execution/slurm/default.yaml @@ -20,7 +20,8 @@ username: ${oc.env:USER} # Defaults to $USER env var account: ??? # SLURM account allocation (required) output_dir: ??? 
# Absolute path accessible on compute nodes (required) partition: batch -num_nodes: 1 +num_nodes: 1 # Total SLURM nodes (num_nodes_per_instance = num_nodes / num_instances) +num_instances: 1 # Number of independent deployment instances ntasks_per_node: 1 gres: gpu:8 walltime: 01:00:00 @@ -30,7 +31,7 @@ sbatch_comment: null # Optional comment for SLURM job (translates to #SBATCH --c # Deployment-specific SLURM configuration deployment: - n_tasks: 1 # Number of tasks for deployment srun (default: 1, for multi-instance set to num_nodes) + n_tasks: ${execution.num_nodes} # total deployment srun tasks across all instances; executor divides by num_instances per srun. Override if your deployment requires more tasks per node (e.g. ntasks_per_node > 1). mounts: deployment: {} diff --git a/packages/nemo-evaluator-launcher/src/nemo_evaluator_launcher/executors/slurm/executor.py b/packages/nemo-evaluator-launcher/src/nemo_evaluator_launcher/executors/slurm/executor.py index ccdcfac71..81876f860 100644 --- a/packages/nemo-evaluator-launcher/src/nemo_evaluator_launcher/executors/slurm/executor.py +++ b/packages/nemo-evaluator-launcher/src/nemo_evaluator_launcher/executors/slurm/executor.py @@ -29,7 +29,7 @@ import yaml from jinja2 import Environment, FileSystemLoader -from omegaconf import DictConfig, OmegaConf +from omegaconf import DictConfig, OmegaConf, open_dict from nemo_evaluator_launcher.common.env_vars import ( build_reexport_commands, @@ -143,7 +143,7 @@ def execute_eval(cfg: DictConfig, dry_run: bool = False) -> str: ) # Create proxy config file with placeholder IPs for multi-instance deployments - if cfg.deployment.get("multiple_instances", False): + if cfg.execution.num_instances > 1: proxy_type = cfg.execution.get("proxy", {}).get("type", "haproxy") if proxy_type == "haproxy": proxy_config = _generate_haproxy_config_with_placeholders(cfg) @@ -640,6 +640,22 @@ def _create_slurm_sbatch_script( Returns: str: The contents of the sbatch script. 
""" + # Remove deprecated deployment.multiple_instances if present + if cfg.deployment.get("multiple_instances") is not None: + logger.warning( + "deployment.multiple_instances is deprecated and will be " + "removed from config — use execution.num_instances instead." + ) + with open_dict(cfg): + del cfg.deployment.multiple_instances + + # Validate topology: num_nodes must be divisible by num_instances + if cfg.execution.num_nodes % cfg.execution.num_instances != 0: + raise ValueError( + f"execution.num_nodes ({cfg.execution.num_nodes}) must be divisible by " + f"execution.num_instances ({cfg.execution.num_instances})" + ) + # get task from mapping, overrides, urls tasks_mapping = load_tasks_mapping() task_definition = get_task_definition_for_job( @@ -773,9 +789,9 @@ def _create_slurm_sbatch_script( # wait for the server to initialize health_path = cfg.deployment.endpoints.get("health", "/health") - # For multi-instance check all node IPs, for single instance check localhost - if cfg.deployment.get("multiple_instances", False): - ip_list = '"${NODES_IPS_ARRAY[@]}"' + # HEAD_NODE_IPS is always set: subset of heads when NPI > 1, all nodes otherwise + if cfg.execution.num_instances > 1: + ip_list = '"${HEAD_NODE_IPS[@]}"' else: ip_list = '"127.0.0.1"' s += _get_wait_for_server_handler( @@ -788,7 +804,7 @@ def _create_slurm_sbatch_script( s += "\n\n" # add proxy load balancer for multi-instance deployments - if cfg.deployment.get("multiple_instances", False): + if cfg.execution.num_instances > 1: s += _get_proxy_server_srun_command(cfg, remote_task_subdir) # prepare evaluation mounts @@ -850,8 +866,8 @@ def _create_slurm_sbatch_script( # terminate the server after all evaluation clients finish if cfg.deployment.type != "none": - s += "kill $SERVER_PID # terminate the server to finish gracefully\n" - if cfg.deployment.get("multiple_instances", False): + s += 'for _pid in "${SERVER_PIDS[@]}"; do kill "$_pid" 2>/dev/null || true; done # terminate servers\n' + if 
cfg.execution.num_instances > 1: s += "kill $PROXY_PID # terminate proxy to finish gracefully\n" s += "\n" @@ -1596,11 +1612,11 @@ def _generate_haproxy_config_with_placeholders(cfg): env = Environment(loader=FileSystemLoader(template_dir)) template = env.get_template("proxy.cfg.template") - # Prepare template data with placeholder IPs - use actual number of nodes - num_nodes = cfg.execution.num_nodes + # Prepare template data with placeholder IPs - one backend per instance head node nodes = [] - for i in range(num_nodes): - nodes.append({"ip": f"{{IP_{i}}}", "port": cfg.deployment.port}) + for i in range(cfg.execution.num_instances): + head_idx = i * cfg.execution.num_nodes // cfg.execution.num_instances + nodes.append({"ip": f"{{IP_{head_idx}}}", "port": cfg.deployment.port}) # Get health check parameters - prefer proxy config, fallback to deployment.endpoints.health proxy_config = cfg.execution.get("proxy", {}).get("config", {}) @@ -1661,9 +1677,13 @@ def _generate_deployment_srun_command( deployment_mounts_list, remote_task_subdir, deployment_env_var_names: list[str] | None = None, - instance_id: int = 0, ): - """Generate the deployment srun command with proper node/ntask configuration. + """Generate per-instance deployment srun commands. + + Loops over num_instances and launches a dedicated srun for each instance on + its own node subset. Multi-instance partitioning lives here; the deployment + command receives PROC_ID (rank within the instance) and MASTER_IP (head of + the instance) and only needs to handle single-instance setup. 
Returns: tuple: (script_string, is_potentially_unsafe, debug_comment) @@ -1694,56 +1714,88 @@ def _generate_deployment_srun_command( s += 'if [[ ${#nodes_array[@]} -eq 0 ]]; then nodes_array=( "$(hostname)" ); fi\n' s += 'export NODES_IPS_ARRAY=($(for node in "${nodes_array[@]}"; do srun --nodelist="$node" --ntasks=1 --nodes=1 hostname --ip-address; done))\n' s += 'echo "Node IPs: ${NODES_IPS_ARRAY[@]}"\n' - s += "# Export MASTER_IP as the first node IP\n" - s += "export MASTER_IP=${NODES_IPS_ARRAY[0]}\n" - s += 'echo "MASTER_IP: $MASTER_IP"\n' + s += 'export ALL_NODE_IPS=$(IFS=,; echo "${NODES_IPS_ARRAY[*]}")\n' + + num_instances = cfg.execution.num_instances + nodes_per_instance = cfg.execution.num_nodes // num_instances + # n_tasks is total tasks across all instances (= num_nodes by default via slurm/default.yaml). + # Executor divides by num_instances to get per-instance ntasks for each srun. + # Falls back to num_nodes in case the YAML default isn't loaded (e.g. tests). + total_ntasks = ( + cfg.execution.get("deployment", {}).get("n_tasks") or cfg.execution.num_nodes + ) + per_instance_ntasks = total_ntasks // num_instances + + s += "HEAD_NODE_IPS=()\n" + s += "SERVER_PIDS=()\n" - # Add debug comment for deployment pre_cmd before srun command + # Add debug comment for deployment pre_cmd before the loop if debug_comment: s += "# Debug contents of deployment pre_cmd\n" s += debug_comment s += "\n" - s += "srun --mpi pmix --overlap " - s += f"--nodes {cfg.execution.num_nodes} --ntasks {cfg.execution.get('deployment', {}).get('n_tasks', 1)} " - s += "--container-image {} ".format(cfg.deployment.image) - if deployment_mounts_list: - s += "--container-mounts {} ".format(",".join(deployment_mounts_list)) - if not cfg.execution.get("mounts", {}).get("mount_home", True): - s += "--no-container-mount-home " - s += "--output {} ".format(remote_task_subdir / "logs" / "server-%A-%t.log") - if deployment_env_var_names is None: deployment_env_var_names = [] - # Always add 
MASTER_IP to the environment variables + # Always pass MASTER_IP and ALL_NODE_IPS into each instance container if "MASTER_IP" not in deployment_env_var_names: deployment_env_var_names.append("MASTER_IP") + if "ALL_NODE_IPS" not in deployment_env_var_names: + deployment_env_var_names.append("ALL_NODE_IPS") + + # Build the command that runs inside each instance container: + # 1. Export scheduler-agnostic env vars (PROC_ID, NODES_PER_INSTANCE) + # 2. Optionally write + source deployment_pre_cmd.sh + # 3. Write deployment_cmd.sh and execute it + create_script_cmd = _str_to_echo_command( + cfg.deployment.command, filename="deployment_cmd.sh" + ) + debug_comment += create_script_cmd.debug + "\n\n" - if deployment_env_var_names: - s += f"--container-env {','.join(sorted(deployment_env_var_names))} " + env_setup = ( + f"export PROC_ID=${{SLURM_PROCID:-0}} NODES_PER_INSTANCE={nodes_per_instance}" + ) + script = f"{env_setup} && {create_script_cmd.cmd} && bash deployment_cmd.sh" - # Wrap deployment command to execute pre_cmd inside container if needed if pre_cmd: - # Create a wrapper command that runs inside the container: - # 1. Create deployment_pre_cmd.sh file - # 2. Source it - # 3. Execute the original deployment command create_pre_script_cmd = _str_to_echo_command( pre_cmd, filename="deployment_pre_cmd.sh" ) - # Escape single quotes in the deployment command for bash -c - escaped_deployment_cmd = cfg.deployment.command.replace("'", "'\"'\"'") - wrapped_command = ( - f"bash -c '{create_pre_script_cmd.cmd} && " + script = ( + f"{env_setup} && " + f"{create_pre_script_cmd.cmd} && " f"source deployment_pre_cmd.sh && " - f"{escaped_deployment_cmd}'" + f"{create_script_cmd.cmd} && bash deployment_cmd.sh" ) - s += "{} &\n\n".format(wrapped_command) - else: - s += "{} &\n\n".format(cfg.deployment.command) # run asynchronously - s += "SERVER_PID=$! 
# capture the PID of the server background srun process\n\n" + # Per-instance loop: launch one srun per instance on its dedicated nodes. + # MASTER_IP is exported before each srun so the container inherits the + # correct per-instance head IP via --container-env. + s += f"for ((g=0; g<{num_instances}; g++)); do\n" + s += f" START_IDX=$((g * {nodes_per_instance}))\n" + s += f' INSTANCE_NODES_ARR=("${{nodes_array[@]:$START_IDX:{nodes_per_instance}}}")\n' + s += ' INSTANCE_NODELIST=$(IFS=,; echo "${INSTANCE_NODES_ARR[*]}")\n' + s += ' MASTER_IP="${NODES_IPS_ARRAY[$START_IDX]}"\n' + s += ' HEAD_NODE_IPS+=("$MASTER_IP")\n' + s += " export MASTER_IP\n" + s += ' echo "Instance $g: MASTER_IP=$MASTER_IP, nodes: ${INSTANCE_NODES_ARR[*]}"\n' + s += " srun --mpi pmix --overlap " + s += f'--nodelist "$INSTANCE_NODELIST" --nodes {nodes_per_instance} --ntasks {per_instance_ntasks} ' + s += "--container-image {} ".format(cfg.deployment.image) + if deployment_mounts_list: + s += "--container-mounts {} ".format(",".join(deployment_mounts_list)) + if not cfg.execution.get("mounts", {}).get("mount_home", True): + s += "--no-container-mount-home " + s += "--output {} ".format(remote_task_subdir / "logs" / "server-${g}-%A-%t.log") + if deployment_env_var_names: + s += f"--container-env {','.join(sorted(deployment_env_var_names))} " + s += "bash -c '{}' &\n".format(script) + s += " SERVER_PIDS+=($!)\n" + s += "done\n\n" + + s += 'echo "HEAD_NODE_IPS: ${HEAD_NODE_IPS[@]}"\n' + s += "SERVER_PID=${SERVER_PIDS[0]} # reference to first instance PID for health check\n\n" return s, is_potentially_unsafe, debug_comment @@ -1758,7 +1810,7 @@ def _get_wait_for_server_handler( """Generate wait for server handler that takes a list of IPs.""" pid_check = "" if check_pid: - pid_check = 'kill -0 "$SERVER_PID" 2>/dev/null || { echo "Server process $SERVER_PID died"; exit 1; }' + pid_check = 'for _check_pid in "${SERVER_PIDS[@]}"; do kill -0 "$_check_pid" 2>/dev/null || { echo "Server process $_check_pid 
died"; exit 1; }; done' handler = f"""date # wait for the {service_name} to initialize diff --git a/packages/nemo-evaluator-launcher/tests/unit_tests/test_slurm_executor.py b/packages/nemo-evaluator-launcher/tests/unit_tests/test_slurm_executor.py index 08bfcb8ce..d83445cf0 100644 --- a/packages/nemo-evaluator-launcher/tests/unit_tests/test_slurm_executor.py +++ b/packages/nemo-evaluator-launcher/tests/unit_tests/test_slurm_executor.py @@ -57,6 +57,7 @@ def base_config(self): "account": "test-account", "partition": "test-partition", "num_nodes": 1, + "num_instances": 1, "ntasks_per_node": 1, "subproject": "test-subproject", }, @@ -504,12 +505,19 @@ def test_complex_configuration_integration( assert "--no-container-mount-home" in script @pytest.mark.parametrize( - "num_nodes,n_tasks,expected_ntasks,should_have_proxy", + "num_nodes,n_tasks_per_instance,num_instances,expected_nodes_per_instance,expected_ntasks_per_instance,should_have_proxy", [ - (1, 1, 1, False), # Single instance, no proxy - (4, 4, 4, True), # Multi-instance with matching n_tasks, needs proxy - (2, 1, 1, False), # Multiple nodes but single task, no proxy - (3, 3, 3, True), # Multi-instance with 3 nodes, needs proxy + (1, 1, 1, 1, 1, False), # Single node, single instance + ( + 4, + 1, + 4, + 1, + 1, + True, + ), # 4 instances of 1 node each, 1 task each, needs proxy + (2, 1, 1, 2, 1, False), # 2 nodes, 1 task, single instance + (3, 1, 3, 1, 1, True), # 3 instances of 1 node each, needs proxy ], ) def test_deployment_n_tasks_and_proxy_setup( @@ -518,15 +526,21 @@ def test_deployment_n_tasks_and_proxy_setup( mock_task, mock_dependencies, num_nodes, - n_tasks, - expected_ntasks, + n_tasks_per_instance, + num_instances, + expected_nodes_per_instance, + expected_ntasks_per_instance, should_have_proxy, ): - """Test deployment.n_tasks with various configurations and proxy setup.""" - base_config["execution"]["deployment"] = {"n_tasks": n_tasks} + """Test deployment.n_tasks (per instance) with various 
configurations and proxy setup. + + The executor launches one srun per instance on its dedicated node subset. + --nodes and --ntasks in each srun are per-instance values. + n_tasks is tasks per instance (default 1). + """ + base_config["execution"]["deployment"] = {"n_tasks": n_tasks_per_instance} base_config["execution"]["num_nodes"] = num_nodes - # Set multiple_instances to trigger proxy setup when needed - base_config["deployment"]["multiple_instances"] = should_have_proxy + base_config["execution"]["num_instances"] = num_instances cfg = OmegaConf.create(base_config) @@ -539,8 +553,11 @@ def test_deployment_n_tasks_and_proxy_setup( job_id="test123.0", ).cmd - # Check that deployment srun uses correct --ntasks value - assert f"--nodes {num_nodes} --ntasks {expected_ntasks}" in script + # Each per-instance srun uses per-instance --nodes/--ntasks + assert ( + f"--nodes {expected_nodes_per_instance} --ntasks {expected_ntasks_per_instance}" + in script + ) # Check proxy setup based on multi-instance or not if should_have_proxy: @@ -551,8 +568,8 @@ def test_deployment_n_tasks_and_proxy_setup( def test_deployment_n_tasks_default_value( self, base_config, mock_task, mock_dependencies ): - """Test deployment.n_tasks defaults to 1 when not specified.""" - # Don't set deployment.n_tasks - should default to 1 + """Test deployment.n_tasks defaults to nodes_per_instance when not specified.""" + # Don't set deployment.n_tasks — code falls back to nodes_per_instance base_config["execution"]["num_nodes"] = 2 cfg = OmegaConf.create(base_config) @@ -566,10 +583,10 @@ def test_deployment_n_tasks_default_value( job_id="test123.0", ).cmd - # Check that deployment srun defaults to --ntasks 1 - assert "--nodes 2 --ntasks 1" in script + # num_nodes=2, num_instances=1 → nodes_per_instance=2 → --nodes 2 --ntasks 2 + assert "--nodes 2 --ntasks 2" in script - # Check that no proxy is set up (since n_tasks=1, even though num_nodes=2) + # Single instance means no proxy assert "proxy" not in 
script.lower() @pytest.mark.parametrize( @@ -637,6 +654,7 @@ def base_config(self): "account": "test-account", "partition": "test-partition", "num_nodes": 1, + "num_instances": 1, "ntasks_per_node": 1, "subproject": "test-subproject", }, @@ -879,6 +897,7 @@ def test_generate_deployment_srun_command( }, "execution": { "num_nodes": num_nodes, + "num_instances": 1, "deployment": {"n_tasks": n_tasks}, "mounts": {"mount_home": mount_home}, }, @@ -1012,6 +1031,7 @@ def sample_config(self, tmpdir): "account": "test-account", "partition": "gpu", "num_nodes": 1, + "num_instances": 1, "ntasks_per_node": 8, "gpus_per_node": 8, "subproject": "eval", @@ -1674,6 +1694,7 @@ def sample_config(self, tmpdir): "account": "test-account", "partition": "gpu", "num_nodes": 1, + "num_instances": 1, "ntasks_per_node": 8, "gpus_per_node": 8, "subproject": "eval", @@ -2622,3 +2643,708 @@ def test_kill_job_kill_command_failed(sel, mock_execdb, monkeypatch): with pytest.raises(RuntimeError, match="Could not find or kill job"): SlurmExecutor.kill_job("fail123.0") + + +class TestMultiNodeMultiInstance: + """Tests for multi-node / multi-instance refactoring. 
+ + Covers: + - Topology validation (num_nodes divisible by num_instances) + - Deprecated deployment.multiple_instances removal + warning + - ALL_NODE_IPS and HEAD_NODE_IPS generation in srun command + - Health check IP selection (HEAD_NODE_IPS vs localhost) + - HAProxy placeholder backend generation + - Deployment command always wrapped as base64 script file + - Pre-cmd + deployment command combined wrapping + - Proxy setup triggered by num_instances > 1 + - get_endpoint_url uses HAProxy port when num_instances > 1 + """ + + @pytest.fixture + def base_config(self): + """Base configuration for multi-node tests.""" + return { + "deployment": { + "type": "vllm", + "image": "test-image:latest", + "command": "vllm serve /model --port 8000", + "served_model_name": "test-model", + "port": 8000, + "endpoints": { + "health": "/health", + }, + }, + "execution": { + "type": "slurm", + "output_dir": "/test/output", + "walltime": "01:00:00", + "account": "test-account", + "partition": "test-partition", + "num_nodes": 1, + "num_instances": 1, + "ntasks_per_node": 1, + "subproject": "test-subproject", + }, + "evaluation": {"env_vars": {}}, + "target": {"api_endpoint": {"url": "http://localhost:8000/v1"}}, + } + + @pytest.fixture + def mock_task(self): + return OmegaConf.create({"name": "test_task"}) + + @pytest.fixture + def mock_dependencies(self): + """Mock external dependencies used by _create_slurm_sbatch_script.""" + with ( + patch( + "nemo_evaluator_launcher.executors.slurm.executor.load_tasks_mapping" + ) as mock_load_tasks, + patch( + "nemo_evaluator_launcher.executors.slurm.executor.get_task_definition_for_job" + ) as mock_get_task_def, + patch( + "nemo_evaluator_launcher.common.helpers.get_eval_factory_command" + ) as mock_get_eval_command, + patch( + "nemo_evaluator_launcher.common.helpers.get_served_model_name" + ) as mock_get_model_name, + ): + mock_load_tasks.return_value = {} + mock_get_task_def.return_value = { + "container": "test-eval-container:latest", + 
"required_env_vars": [], + "endpoint_type": "openai", + "task": "test_task", + } + from nemo_evaluator_launcher.common.helpers import CmdAndReadableComment + + mock_get_eval_command.return_value = CmdAndReadableComment( + cmd="nemo-evaluator run_eval --test", debug="# Test command" + ) + mock_get_model_name.return_value = "test-model" + + yield { + "load_tasks_mapping": mock_load_tasks, + "get_task_definition_for_job": mock_get_task_def, + "get_eval_factory_command": mock_get_eval_command, + "get_served_model_name": mock_get_model_name, + } + + # ── Topology validation ────────────────────────────────────────────── + + def test_num_nodes_not_divisible_by_num_instances_raises( + self, base_config, mock_task, mock_dependencies + ): + """num_nodes must be evenly divisible by num_instances.""" + base_config["execution"]["num_nodes"] = 5 + base_config["execution"]["num_instances"] = 2 + cfg = OmegaConf.create(base_config) + + with pytest.raises(ValueError, match="must be divisible"): + _create_slurm_sbatch_script( + cfg=cfg, + task=mock_task, + eval_image="test-eval-container:latest", + remote_task_subdir=Path("/test/remote"), + invocation_id="test123", + job_id="test123.0", + ) + + @pytest.mark.parametrize( + "num_nodes,num_instances", + [(4, 2), (6, 3), (8, 1), (1, 1), (4, 4)], + ) + def test_valid_topology_accepted( + self, base_config, mock_task, mock_dependencies, num_nodes, num_instances + ): + """Valid num_nodes / num_instances combos should not raise.""" + base_config["execution"]["num_nodes"] = num_nodes + base_config["execution"]["num_instances"] = num_instances + cfg = OmegaConf.create(base_config) + + # Should not raise + result = _create_slurm_sbatch_script( + cfg=cfg, + task=mock_task, + eval_image="test-eval-container:latest", + remote_task_subdir=Path("/test/remote"), + invocation_id="test123", + job_id="test123.0", + ) + assert result.cmd # non-empty script + + # ── Deprecated multiple_instances handling ──────────────────────────── + + def 
test_deprecated_multiple_instances_removed_with_warning( + self, base_config, mock_task, mock_dependencies + ): + """deployment.multiple_instances should be deleted and a warning logged.""" + base_config["deployment"]["multiple_instances"] = True + base_config["execution"]["num_instances"] = 2 + base_config["execution"]["num_nodes"] = 2 + cfg = OmegaConf.create(base_config) + + with patch( + "nemo_evaluator_launcher.executors.slurm.executor.logger" + ) as mock_logger: + _create_slurm_sbatch_script( + cfg=cfg, + task=mock_task, + eval_image="test-eval-container:latest", + remote_task_subdir=Path("/test/remote"), + invocation_id="test123", + job_id="test123.0", + ) + + # Warning should have been logged + mock_logger.warning.assert_called_once() + assert "multiple_instances" in mock_logger.warning.call_args[0][0] + + # Field should be removed from config + assert cfg.deployment.get("multiple_instances") is None + + def test_multiple_instances_false_still_removed( + self, base_config, mock_task, mock_dependencies + ): + """Even multiple_instances=False should trigger deprecation removal.""" + base_config["deployment"]["multiple_instances"] = False + cfg = OmegaConf.create(base_config) + + with patch("nemo_evaluator_launcher.executors.slurm.executor.logger"): + _create_slurm_sbatch_script( + cfg=cfg, + task=mock_task, + eval_image="test-eval-container:latest", + remote_task_subdir=Path("/test/remote"), + invocation_id="test123", + job_id="test123.0", + ) + + assert cfg.deployment.get("multiple_instances") is None + + # ── ALL_NODE_IPS and HEAD_NODE_IPS in srun command ─────────────────── + + def test_all_node_ips_exported(self, base_config, mock_task, mock_dependencies): + """ALL_NODE_IPS should always be exported and passed to the container.""" + cfg = OmegaConf.create(base_config) + script = _create_slurm_sbatch_script( + cfg=cfg, + task=mock_task, + eval_image="test-eval-container:latest", + remote_task_subdir=Path("/test/remote"), + invocation_id="test123", + 
job_id="test123.0", + ).cmd + + assert 'export ALL_NODE_IPS=$(IFS=,; echo "${NODES_IPS_ARRAY[*]}")' in script + assert "ALL_NODE_IPS" in script + + def test_head_node_ips_single_instance( + self, base_config, mock_task, mock_dependencies + ): + """With 1 instance, HEAD_NODE_IPS loop iterates once (g=0).""" + base_config["execution"]["num_nodes"] = 2 + cfg = OmegaConf.create(base_config) + script = _create_slurm_sbatch_script( + cfg=cfg, + task=mock_task, + eval_image="test-eval-container:latest", + remote_task_subdir=Path("/test/remote"), + invocation_id="test123", + job_id="test123.0", + ).cmd + + assert "HEAD_NODE_IPS=()" in script + assert "for ((g=0; g<1; g++))" in script + # npi = 2 // 1 = 2, head at index g*2 + assert "g * 2" in script + + @pytest.mark.parametrize( + "num_nodes,num_instances,expected_loop_count,expected_stride", + [ + (4, 2, 2, 2), # 2 instances of 2 nodes each + (6, 3, 3, 2), # 3 instances of 2 nodes each + (4, 4, 4, 1), # 4 instances of 1 node each + (8, 2, 2, 4), # 2 instances of 4 nodes each + ], + ) + def test_head_node_ips_multi_instance( + self, + base_config, + mock_task, + mock_dependencies, + num_nodes, + num_instances, + expected_loop_count, + expected_stride, + ): + """HEAD_NODE_IPS loop should iterate num_instances times with correct stride.""" + base_config["execution"]["num_nodes"] = num_nodes + base_config["execution"]["num_instances"] = num_instances + cfg = OmegaConf.create(base_config) + script = _create_slurm_sbatch_script( + cfg=cfg, + task=mock_task, + eval_image="test-eval-container:latest", + remote_task_subdir=Path("/test/remote"), + invocation_id="test123", + job_id="test123.0", + ).cmd + + assert f"for ((g=0; g<{expected_loop_count}; g++))" in script + assert f"g * {expected_stride}" in script + + def test_all_node_ips_in_container_env( + self, base_config, mock_task, mock_dependencies + ): + """ALL_NODE_IPS must appear in the --container-env list.""" + cfg = OmegaConf.create(base_config) + script = 
_create_slurm_sbatch_script( + cfg=cfg, + task=mock_task, + eval_image="test-eval-container:latest", + remote_task_subdir=Path("/test/remote"), + invocation_id="test123", + job_id="test123.0", + ).cmd + + # Find the --container-env line and check ALL_NODE_IPS is listed + match = re.search(r"--container-env\s+(\S+)", script) + assert match, "--container-env not found in script" + env_vars = match.group(1) + assert "ALL_NODE_IPS" in env_vars + assert "MASTER_IP" in env_vars + + # ── Health check IP selection ──────────────────────────────────────── + + def test_health_check_uses_localhost_for_single_instance( + self, base_config, mock_task, mock_dependencies + ): + """Single instance should health-check on 127.0.0.1.""" + cfg = OmegaConf.create(base_config) + script = _create_slurm_sbatch_script( + cfg=cfg, + task=mock_task, + eval_image="test-eval-container:latest", + remote_task_subdir=Path("/test/remote"), + invocation_id="test123", + job_id="test123.0", + ).cmd + + # The wait-for-server handler should use 127.0.0.1 + assert '"127.0.0.1"' in script + + def test_health_check_uses_head_node_ips_for_multi_instance( + self, base_config, mock_task, mock_dependencies + ): + """Multi-instance should health-check on HEAD_NODE_IPS.""" + base_config["execution"]["num_nodes"] = 4 + base_config["execution"]["num_instances"] = 4 + cfg = OmegaConf.create(base_config) + script = _create_slurm_sbatch_script( + cfg=cfg, + task=mock_task, + eval_image="test-eval-container:latest", + remote_task_subdir=Path("/test/remote"), + invocation_id="test123", + job_id="test123.0", + ).cmd + + assert '"${HEAD_NODE_IPS[@]}"' in script + + # ── HAProxy placeholder generation ─────────────────────────────────── + + @pytest.mark.parametrize( + "num_nodes,num_instances,expected_ips", + [ + (4, 2, ["{IP_0}", "{IP_2}"]), # heads at node 0 and 2 + (6, 3, ["{IP_0}", "{IP_2}", "{IP_4}"]), # heads at 0, 2, 4 + (4, 4, ["{IP_0}", "{IP_1}", "{IP_2}", "{IP_3}"]), # 1 node per instance + (8, 2, ["{IP_0}", 
"{IP_4}"]), # heads at 0, 4 + ], + ) + def test_haproxy_placeholder_backends(self, num_nodes, num_instances, expected_ips): + """HAProxy config should have one backend per instance head node.""" + from nemo_evaluator_launcher.executors.slurm.executor import ( + _generate_haproxy_config_with_placeholders, + ) + + config = { + "deployment": { + "port": 8000, + "endpoints": {"health": "/health"}, + }, + "execution": { + "num_nodes": num_nodes, + "num_instances": num_instances, + "proxy": { + "config": { + "haproxy_port": 5009, + "health_check_path": "/health", + "health_check_status": 200, + }, + }, + }, + } + cfg = OmegaConf.create(config) + haproxy_config = _generate_haproxy_config_with_placeholders(cfg) + + for ip_placeholder in expected_ips: + assert ip_placeholder in haproxy_config, ( + f"{ip_placeholder} not found in HAProxy config" + ) + + # Verify no extra backends beyond expected + import re as _re + + backend_ips = _re.findall(r"\{IP_\d+\}", haproxy_config) + assert len(backend_ips) == len(expected_ips) + + # ── Proxy setup triggered by num_instances > 1 ─────────────────────── + + def test_proxy_setup_when_multi_instance( + self, base_config, mock_task, mock_dependencies + ): + """num_instances > 1 should trigger proxy srun in the sbatch script.""" + base_config["execution"]["num_nodes"] = 2 + base_config["execution"]["num_instances"] = 2 + cfg = OmegaConf.create(base_config) + script = _create_slurm_sbatch_script( + cfg=cfg, + task=mock_task, + eval_image="test-eval-container:latest", + remote_task_subdir=Path("/test/remote"), + invocation_id="test123", + job_id="test123.0", + ).cmd + + assert "proxy" in script.lower() + assert "PROXY_PID" in script + + def test_no_proxy_when_single_instance( + self, base_config, mock_task, mock_dependencies + ): + """num_instances == 1 should NOT set up proxy.""" + base_config["execution"]["num_nodes"] = 2 + base_config["execution"]["num_instances"] = 1 + cfg = OmegaConf.create(base_config) + script = 
_create_slurm_sbatch_script( + cfg=cfg, + task=mock_task, + eval_image="test-eval-container:latest", + remote_task_subdir=Path("/test/remote"), + invocation_id="test123", + job_id="test123.0", + ).cmd + + assert "proxy" not in script.lower() + assert "PROXY_PID" not in script + + def test_proxy_pid_killed_on_shutdown( + self, base_config, mock_task, mock_dependencies + ): + """Multi-instance script should kill all SERVER_PIDS and PROXY_PID.""" + base_config["execution"]["num_nodes"] = 2 + base_config["execution"]["num_instances"] = 2 + cfg = OmegaConf.create(base_config) + script = _create_slurm_sbatch_script( + cfg=cfg, + task=mock_task, + eval_image="test-eval-container:latest", + remote_task_subdir=Path("/test/remote"), + invocation_id="test123", + job_id="test123.0", + ).cmd + + assert 'for _pid in "${SERVER_PIDS[@]}"' in script + assert "kill $PROXY_PID" in script + + # ── Deployment command wrapping (base64 script file) ───────────────── + + def test_deployment_command_written_as_script_file( + self, base_config, mock_task, mock_dependencies + ): + """Deployment command should always be base64-encoded into deployment_cmd.sh.""" + cfg = OmegaConf.create(base_config) + script = _create_slurm_sbatch_script( + cfg=cfg, + task=mock_task, + eval_image="test-eval-container:latest", + remote_task_subdir=Path("/test/remote"), + invocation_id="test123", + job_id="test123.0", + ).cmd + + assert "base64 -d > deployment_cmd.sh" in script + assert "bash deployment_cmd.sh" in script + + def test_deployment_command_base64_encodes_correctly(self): + """The base64 encoding should round-trip to the original command.""" + import base64 + + cmd = "vllm serve /model --port 8000 --tp 8" + encoded = base64.b64encode(cmd.encode("utf-8")).decode("utf-8") + decoded = base64.b64decode(encoded).decode("utf-8") + assert decoded == cmd + + def test_multiline_command_wrapped(self, base_config, mock_task, mock_dependencies): + """Multi-line deployment commands should be encoded into script 
file.""" + base_config["deployment"]["command"] = ( + "#!/bin/bash\nset -e\nray start --head\nvllm serve /model" + ) + cfg = OmegaConf.create(base_config) + script = _create_slurm_sbatch_script( + cfg=cfg, + task=mock_task, + eval_image="test-eval-container:latest", + remote_task_subdir=Path("/test/remote"), + invocation_id="test123", + job_id="test123.0", + ).cmd + + assert "base64 -d > deployment_cmd.sh" in script + assert "bash deployment_cmd.sh" in script + # Multi-line command should appear in debug comment + assert "# ray start --head" in script or "deployment_cmd.sh" in script + + def test_command_wrapped_in_bash_c(self, base_config, mock_task, mock_dependencies): + """Deployment srun should use bash -c wrapper running asynchronously.""" + cfg = OmegaConf.create(base_config) + script = _create_slurm_sbatch_script( + cfg=cfg, + task=mock_task, + eval_image="test-eval-container:latest", + remote_task_subdir=Path("/test/remote"), + invocation_id="test123", + job_id="test123.0", + ).cmd + + # Should have bash -c '...' 
& + assert re.search(r"bash -c '.*deployment_cmd\.sh.*' &", script) + + # ── Pre-cmd + deployment command combined ──────────────────────────── + + def test_pre_cmd_and_deploy_cmd_both_as_scripts( + self, base_config, mock_task, mock_dependencies + ): + """When pre_cmd is set, both pre_cmd and command should be script files.""" + base_config["deployment"]["pre_cmd"] = "export MY_VAR=1\necho setup done" + cfg = OmegaConf.create(base_config) + script = _create_slurm_sbatch_script( + cfg=cfg, + task=mock_task, + eval_image="test-eval-container:latest", + remote_task_subdir=Path("/test/remote"), + invocation_id="test123", + job_id="test123.0", + ).cmd + + assert "base64 -d > deployment_pre_cmd.sh" in script + assert "source deployment_pre_cmd.sh" in script + assert "base64 -d > deployment_cmd.sh" in script + assert "bash deployment_cmd.sh" in script + + def test_no_pre_cmd_skips_pre_script( + self, base_config, mock_task, mock_dependencies + ): + """Without pre_cmd, only deployment_cmd.sh should be generated.""" + cfg = OmegaConf.create(base_config) + script = _create_slurm_sbatch_script( + cfg=cfg, + task=mock_task, + eval_image="test-eval-container:latest", + remote_task_subdir=Path("/test/remote"), + invocation_id="test123", + job_id="test123.0", + ).cmd + + assert "deployment_pre_cmd.sh" not in script + assert "base64 -d > deployment_cmd.sh" in script + + # ── Srun --nodes uses num_nodes ────────────────────────────────────── + + @pytest.mark.parametrize("num_nodes", [1, 2, 4, 8]) + def test_srun_nodes_equals_num_nodes( + self, base_config, mock_task, mock_dependencies, num_nodes + ): + """srun --nodes should always equal execution.num_nodes.""" + base_config["execution"]["num_nodes"] = num_nodes + cfg = OmegaConf.create(base_config) + script = _create_slurm_sbatch_script( + cfg=cfg, + task=mock_task, + eval_image="test-eval-container:latest", + remote_task_subdir=Path("/test/remote"), + invocation_id="test123", + job_id="test123.0", + ).cmd + + assert f"--nodes 
{num_nodes}" in script + + # ── _generate_deployment_srun_command directly ─────────────────────── + + def test_srun_command_all_node_ips_and_head_node_ips(self): + """Directly test that _generate_deployment_srun_command emits IP arrays.""" + from nemo_evaluator_launcher.executors.slurm.executor import ( + _generate_deployment_srun_command, + ) + + config = { + "deployment": { + "type": "vllm", + "image": "test-image:latest", + "command": "vllm serve /model", + }, + "execution": { + "num_nodes": 4, + "num_instances": 2, + "deployment": {"n_tasks": 4}, + "mounts": {"mount_home": True}, + }, + } + cfg = OmegaConf.create(config) + + command, _, _ = _generate_deployment_srun_command( + cfg=cfg, + deployment_mounts_list=[], + remote_task_subdir=Path("/test/remote"), + ) + + # ALL_NODE_IPS exported + assert "export ALL_NODE_IPS=" in command + # HEAD_NODE_IPS loop: 2 instances, stride=2 + assert "for ((g=0; g<2; g++))" in command + assert "g * 2" in command + # Container env includes both + assert "ALL_NODE_IPS" in command + assert "MASTER_IP" in command + + def test_srun_command_base64_script_wrapping(self): + """Directly test that deployment command is base64-encoded.""" + from nemo_evaluator_launcher.executors.slurm.executor import ( + _generate_deployment_srun_command, + ) + + config = { + "deployment": { + "type": "vllm", + "image": "test-image:latest", + "command": "#!/bin/bash\nset -e\nvllm serve /model", + }, + "execution": { + "num_nodes": 1, + "num_instances": 1, + "deployment": {"n_tasks": 1}, + "mounts": {"mount_home": True}, + }, + } + cfg = OmegaConf.create(config) + + command, _, _ = _generate_deployment_srun_command( + cfg=cfg, + deployment_mounts_list=[], + remote_task_subdir=Path("/test/remote"), + ) + + assert "base64 -d > deployment_cmd.sh" in command + assert "bash deployment_cmd.sh" in command + assert "bash -c '" in command + + def test_srun_command_with_pre_cmd(self): + """Test that pre_cmd generates deployment_pre_cmd.sh and is sourced.""" + from 
nemo_evaluator_launcher.executors.slurm.executor import ( + _generate_deployment_srun_command, + ) + + config = { + "deployment": { + "type": "vllm", + "image": "test-image:latest", + "command": "vllm serve /model", + "pre_cmd": "export MY_VAR=hello\necho pre-setup", + }, + "execution": { + "num_nodes": 1, + "num_instances": 1, + "deployment": {"n_tasks": 1}, + "mounts": {"mount_home": True}, + }, + } + cfg = OmegaConf.create(config) + + command, _, _ = _generate_deployment_srun_command( + cfg=cfg, + deployment_mounts_list=[], + remote_task_subdir=Path("/test/remote"), + ) + + assert "base64 -d > deployment_pre_cmd.sh" in command + assert "source deployment_pre_cmd.sh" in command + assert "base64 -d > deployment_cmd.sh" in command + assert "bash deployment_cmd.sh" in command + + # ── get_endpoint_url with num_instances ────────────────────────────── + + def test_get_endpoint_url_single_instance_uses_deployment_port(self): + """Single instance should use deployment port for endpoint URL.""" + from nemo_evaluator_launcher.common.helpers import get_endpoint_url + + config = { + "deployment": { + "type": "vllm", + "port": 8000, + "endpoints": {"openai": "/v1/chat/completions"}, + }, + "execution": { + "type": "slurm", + "num_instances": 1, + }, + "target": {"api_endpoint": {}}, + } + cfg = OmegaConf.create(config) + url = get_endpoint_url(cfg, {}, "openai") + assert url == "http://127.0.0.1:8000/v1/chat/completions" + + def test_get_endpoint_url_multi_instance_uses_haproxy_port(self): + """Multi-instance should use HAProxy port for endpoint URL.""" + from nemo_evaluator_launcher.common.helpers import get_endpoint_url + + config = { + "deployment": { + "type": "vllm", + "port": 8000, + "endpoints": {"openai": "/v1/chat/completions"}, + }, + "execution": { + "type": "slurm", + "num_instances": 2, + "proxy": { + "config": {"haproxy_port": 5009}, + }, + }, + "target": {"api_endpoint": {}}, + } + cfg = OmegaConf.create(config) + url = get_endpoint_url(cfg, {}, "openai") + 
assert url == "http://127.0.0.1:5009/v1/chat/completions" + + def test_get_endpoint_url_multi_instance_default_haproxy_port(self): + """Multi-instance without explicit proxy config should default to port 5009.""" + from nemo_evaluator_launcher.common.helpers import get_endpoint_url + + config = { + "deployment": { + "type": "vllm", + "port": 8000, + "endpoints": {"openai": "/v1/chat/completions"}, + }, + "execution": { + "type": "slurm", + "num_instances": 3, + }, + "target": {"api_endpoint": {}}, + } + cfg = OmegaConf.create(config) + url = get_endpoint_url(cfg, {}, "openai") + assert url == "http://127.0.0.1:5009/v1/chat/completions" diff --git a/packages/nemo-evaluator-launcher/tests/unit_tests/test_telemetry.py b/packages/nemo-evaluator-launcher/tests/unit_tests/test_telemetry.py index f73d8e7e4..09c5f801d 100644 --- a/packages/nemo-evaluator-launcher/tests/unit_tests/test_telemetry.py +++ b/packages/nemo-evaluator-launcher/tests/unit_tests/test_telemetry.py @@ -549,6 +549,7 @@ def slurm_base_config(self): "account": "test-account", "partition": "test-partition", "num_nodes": 1, + "num_instances": 1, "ntasks_per_node": 1, "subproject": "test-subproject", "env_vars": {},
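
Reviewer note: the executor's srun loop, the HAProxy placeholder generator, and the tests above all have to agree on the same topology arithmetic (nodes per instance, per-instance task count, head-node indices). A minimal standalone sketch of that arithmetic, with a hypothetical `partition` helper that is not part of the launcher:

```python
def partition(num_nodes: int, num_instances: int, n_tasks_total: int):
    """Derive per-instance topology from total node/task counts.

    Mirrors the diff's invariants: num_nodes must divide evenly into
    num_instances contiguous blocks; each instance's head node is the
    first node of its block (head_idx = i * num_nodes // num_instances).
    """
    if num_nodes % num_instances != 0:
        raise ValueError(
            f"num_nodes ({num_nodes}) must be divisible by "
            f"num_instances ({num_instances})"
        )
    nodes_per_instance = num_nodes // num_instances
    per_instance_ntasks = n_tasks_total // num_instances
    head_indices = [i * nodes_per_instance for i in range(num_instances)]
    return nodes_per_instance, per_instance_ntasks, head_indices


# 2 instances of 4 nodes each, default n_tasks_total == num_nodes:
print(partition(8, 2, 8))  # → (4, 4, [0, 4]), heads match {IP_0}/{IP_4} backends
```

This is the same stride the HAProxy test cases encode, e.g. `(8, 2)` expecting backends `{IP_0}` and `{IP_4}`.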