diff --git a/nemo_run/run/ray/templates/ray.sub.j2 b/nemo_run/run/ray/templates/ray.sub.j2 index bdc917b7..615e7beb 100644 --- a/nemo_run/run/ray/templates/ray.sub.j2 +++ b/nemo_run/run/ray/templates/ray.sub.j2 @@ -28,17 +28,36 @@ DASHBOARD_AGENT_GRPC_PORT=${DASHBOARD_AGENT_GRPC_PORT:-53007} METRICS_EXPORT_PORT=${METRICS_EXPORT_PORT:-53009} # Ports for the head node -PORT=${PORT:-6379} +PORT=${PORT:-54514} RAY_CLIENT_SERVER_PORT=${RAY_CLIENT_SERVER_PORT:-10001} #REDIT_SHARD_PORTS=${REDIT_SHARD_PORTS:-"random"} ?? DASHBOARD_PORT=${DASHBOARD_PORT:-8265} # Also used by debugger DASHBOARD_AGENT_LISTEN_PORT=${DASHBOARD_AGENT_LISTEN_PORT:-52365} +RAY_DEBUGGER_ARGS= +if [ "${RAY_DEBUG:-}" = "legacy" ]; then + RAY_DEBUGGER_ARGS="--ray-debugger-external" +fi + +# After ray>=2.47, this feature is enabled by default which creates uv venvs for any py_executable starting with `uv run`. +# There is severe contention and performance issues with this enabled considering our dependencies are so large and occasionally +# need to be compiled, so NeMo RL has an implementation in nemo_rl/utils/venv.py that does it once per node as opposed to once per task. +export RAY_ENABLE_UV_RUN_RUNTIME_ENV=0 + +# Setting ulimit is recommended by ray best practices page +# @ https://docs.ray.io/en/latest/cluster/vms/user-guides/large-cluster-best-practices.html +# It's session based and won't affect the system outside the script +# Ensure that the soft limit isn't above the hard limit +if [[ $(ulimit -Hn) == "unlimited" ]] || [[ 65535 -lt $(ulimit -Hn) ]]; then + ulimit -Sn 65535 +elif [[ $(ulimit -Hn) != "unlimited" ]] && [[ $(ulimit -Hn) -lt 65535 ]]; then + echo "[WARNING]: Cannot increase ulimit on file descriptors to 65535 according ray recommendation: https://docs.ray.io/en/latest/cluster/vms/user-guides/large-cluster-best-practices.html. Speak to cluster admins to increase, otherwise ray may crash unexpectedly." +fi # On our clusters, the largest port range on an idle worker appeared between 52369-64607 # (not including the other ports set by this script). So this range is chosen to be # somewhere in the middle MIN_WORKER_PORT=${MIN_WORKER_PORT:-54001} -MAX_WORKER_PORT=${MAX_WORKER_PORT:-54257} +MAX_WORKER_PORT=${MAX_WORKER_PORT:-54513} # Ray temp directory (inside container). Used by --temp-dir and log sync sidecar RAY_TEMP_DIR=${RAY_TEMP_DIR:-/ray-cluster} @@ -82,13 +101,66 @@ gpus_per_node=8 num_retries={{ num_retries }} +# Track backgrounded srun client PIDs for head and workers +declare -A SRUN_PIDS + +# Verify all backgrounded srun client processes are still alive; exit fast if any died +check_srun_processes() { + for name in "${!SRUN_PIDS[@]}"; do + pid="${SRUN_PIDS[$name]}" + # Check if the process is still running + if ! kill -0 "$pid" 2>/dev/null; then + echo "[ERROR] Background srun '$name' died (pid=$pid). Could be a failure in startup or an issue with the node preventing the srun to start. Attempting to exit." >&2 + # Signal sidecars inside containers to terminate ASAP + touch "$LOG_DIR/ENDED" + exit 1 + fi + done +} + # Getting the node names and IP addresses in the SLURM allocation nodes=$(scontrol show hostnames "$SLURM_JOB_NODELIST") nodes_array=($nodes) ip_addresses_array=() for node in $nodes; do - ip_address=$(getent hosts "$node" | awk '{print $1}' | head -n1) + # Try multiple methods to get IP address - ENHANCED VERSION v2.0 + echo "[DEBUG] Resolving hostname: $node using enhanced resolution methods" + ip_address="" + + # Method 1: Try host command + echo "[DEBUG] Method 1: host command" + ip_address=$(host $node 2>/dev/null | awk '/has address/ { print $4 }' | head -1 || true) + echo "[DEBUG] host result: '$ip_address'" + + # Method 2: If host fails, try getent + if [[ -z "$ip_address" ]]; then + echo "[DEBUG] Method 2: getent hosts" + ip_address=$(getent hosts $node 2>/dev/null | awk '{ print $1 }' | head -1 || true) + echo "[DEBUG] getent result: '$ip_address'" + fi + + # Method 3: If getent fails, try nslookup + if [[ -z "$ip_address" ]]; then + echo "[DEBUG] Method 3: nslookup" + ip_address=$(nslookup $node 2>/dev/null | awk '/^Address: / { print $2 }' | head -1 || true) + echo "[DEBUG] nslookup result: '$ip_address'" + fi + + # Method 4: If all DNS methods fail, try ping to extract IP + if [[ -z "$ip_address" ]]; then + echo "[DEBUG] Method 4: ping" + ip_address=$(ping -c 1 $node 2>/dev/null | grep "PING" | sed 's/.*(\([^)]*\)).*/\1/' || true) + echo "[DEBUG] ping result: '$ip_address'" + fi + + # If still no IP, use the hostname itself (might work if it's already an IP or resolvable) + if [[ -z "$ip_address" ]]; then + echo "[WARNING] Could not resolve IP for $node, using hostname as fallback" + ip_address=$node + fi + + echo "[INFO] Node: $node -> IP: $ip_address" # Add the IP address to the array ip_addresses_array+=("$ip_address") done @@ -184,12 +256,13 @@ ray start --head \ --ray-client-server-port=${RAY_CLIENT_SERVER_PORT} \ --dashboard-port=${DASHBOARD_PORT} \ \ - --node-manager-port=${NODE_MANAGER_PORT} \ - --object-manager-port=${OBJECT_MANAGER_PORT} \ - --runtime-env-agent-port=${RUNTIME_ENV_AGENT_PORT} \ - --dashboard-agent-grpc-port=${DASHBOARD_AGENT_GRPC_PORT} \ - --dashboard-agent-listen-port=${DASHBOARD_AGENT_LISTEN_PORT} \ - --metrics-export-port=${METRICS_EXPORT_PORT} \ + --node-manager-port=$((${NODE_MANAGER_PORT} + 1)) \ + --object-manager-port=$((${OBJECT_MANAGER_PORT} + 1)) \ + --runtime-env-agent-port=$((${RUNTIME_ENV_AGENT_PORT} + 1)) \ + --dashboard-agent-grpc-port=$((${DASHBOARD_AGENT_GRPC_PORT} + 1)) \ + --dashboard-agent-listen-port=$((${DASHBOARD_AGENT_LISTEN_PORT} + 1)) \ + --metrics-export-port=$((${METRICS_EXPORT_PORT} + 1)) \ + $RAY_DEBUGGER_ARGS \ \ --block EOFINNER @@ -207,6 +280,7 @@ exit 1 EOF ) srun {{ common_srun_args }} --container-name=ray-head --nodes=1 --ntasks=1 -w "$head_node" -o $LOG_DIR/{{ ray_log_prefix }}head.log bash -x -c "$head_cmd" & +SRUN_PIDS["ray-head"]=$! # Wait for the head node container to start and for Ray to be ready elapsed_time=0 @@ -217,6 +291,7 @@ while ! (srun --overlap --nodes=1 --ntasks=1 -w $head_node test -f $LOG_DIR/STAR exit 1 fi echo "[INFO][$(date)] Waiting for Ray head node container to start and be ready... ($elapsed_time/$RAY_HEAD_START_TIMEOUT seconds)" + check_srun_processes sleep 2 elapsed_time=$((elapsed_time + 2)) done @@ -261,7 +336,6 @@ monitor-sidecar & sed -i 's/context\.py_executable = " "\.join(self\.nsight_cmd) + " python"/context.py_executable = " ".join(self.nsight_cmd) + f" {context.py_executable}"/g' /opt/nemo_rl_venv/lib64/python*/site-packages/ray/_private/runtime_env/nsight.py cat <=2.47, this feature is enabled by default which creates uv venvs for any py_executable starting with `uv run`. +# There is severe contention and performance issues with this enabled considering our dependencies are so large and occasionally +# need to be compiled, so NeMo RL has an implementation in nemo_rl/utils/venv.py that does it once per node as opposed to once per task. +export RAY_ENABLE_UV_RUN_RUNTIME_ENV=0 + +# Setting ulimit is recommended by ray best practices page +# @ https://docs.ray.io/en/latest/cluster/vms/user-guides/large-cluster-best-practices.html +# It's session based and won't affect the system outside the script +# Ensure that the soft limit isn't above the hard limit +if [[ $(ulimit -Hn) == "unlimited" ]] || [[ 65535 -lt $(ulimit -Hn) ]]; then + ulimit -Sn 65535 +elif [[ $(ulimit -Hn) != "unlimited" ]] && [[ $(ulimit -Hn) -lt 65535 ]]; then + echo "[WARNING]: Cannot increase ulimit on file descriptors to 65535 according ray recommendation: https://docs.ray.io/en/latest/cluster/vms/user-guides/large-cluster-best-practices.html. Speak to cluster admins to increase, otherwise ray may crash unexpectedly." +fi + +# On our clusters, the largest port range on an idle worker appeared between 52369-64607 +# (not including the other ports set by this script). So this range is chosen to be +# somewhere in the middle +MIN_WORKER_PORT=${MIN_WORKER_PORT:-54001} +MAX_WORKER_PORT=${MAX_WORKER_PORT:-54513} + +# Ray temp directory (inside container). Used by --temp-dir and log sync sidecar +RAY_TEMP_DIR=${RAY_TEMP_DIR:-/ray-cluster} + +# Number seconds to sync logs from /tmp/ray/session_*/logs to $LOG_DIR/ray/ +RAY_LOG_SYNC_FREQUENCY=${RAY_LOG_SYNC_FREQUENCY:-} + +# Timeout in seconds for Ray head node to start (default 10 minutes) +RAY_HEAD_START_TIMEOUT=${RAY_HEAD_START_TIMEOUT:-600} + +# Directory setup +export CLUSTER_DIR={{ cluster_dir }} +mkdir -p $CLUSTER_DIR + +JOB_IDS_FILE="$CLUSTER_DIR/job_ids.json" +if [[ -f "$JOB_IDS_FILE" ]]; then + tmp="$(mktemp)" + jq --arg id "$SLURM_JOB_ID" '. + [$id]' "$JOB_IDS_FILE" > "$tmp" && mv "$tmp" "$JOB_IDS_FILE" +else + touch "$JOB_IDS_FILE" + echo "[\"$SLURM_JOB_ID\"]" > "$JOB_IDS_FILE" +fi + +mkdir -p $CLUSTER_DIR/scripts + +export LOG_DIR={{ log_dir }} +mkdir -p $LOG_DIR + +# Clean up any previous run files +rm -f $LOG_DIR/STARTED_RAY_HEAD +rm -f $LOG_DIR/ENDED + +# Defaults to placing uv cache inside the CLUSTER_DIR +# This directory is mounted into the container at /home/ray/.cache/uv so it is shared between the head and worker nodes +# UV_CACHE_DIR={{ uv_cache_dir }} +# mkdir -p $UV_CACHE_DIR +######################################################## + +# Number of GPUs per node +gpus_per_node=8 + +num_retries={{ num_retries }} + +# Track backgrounded srun client PIDs for head and workers +declare -A SRUN_PIDS + +# Verify all backgrounded srun client processes are still alive; exit fast if any died +check_srun_processes() { + for name in "${!SRUN_PIDS[@]}"; do + pid="${SRUN_PIDS[$name]}" + # Check if the process is still running + if ! kill -0 "$pid" 2>/dev/null; then + echo "[ERROR] Background srun '$name' died (pid=$pid). Could be a failure in startup or an issue with the node preventing the srun to start. Attempting to exit." >&2 + # Signal sidecars inside containers to terminate ASAP + touch "$LOG_DIR/ENDED" + exit 1 + fi + done +} + +# Getting the node names and IP addresses in the SLURM allocation +nodes=$(scontrol show hostnames "$SLURM_JOB_NODELIST") +nodes_array=($nodes) +ip_addresses_array=() + +for node in $nodes; do + # Try multiple methods to get IP address - ENHANCED VERSION v2.0 + echo "[DEBUG] Resolving hostname: $node using enhanced resolution methods" + ip_address="" + + # Method 1: Try host command + echo "[DEBUG] Method 1: host command" + ip_address=$(host $node 2>/dev/null | awk '/has address/ { print $4 }' | head -1 || true) + echo "[DEBUG] host result: '$ip_address'" + + # Method 2: If host fails, try getent + if [[ -z "$ip_address" ]]; then + echo "[DEBUG] Method 2: getent hosts" + ip_address=$(getent hosts $node 2>/dev/null | awk '{ print $1 }' | head -1 || true) + echo "[DEBUG] getent result: '$ip_address'" + fi + + # Method 3: If getent fails, try nslookup + if [[ -z "$ip_address" ]]; then + echo "[DEBUG] Method 3: nslookup" + ip_address=$(nslookup $node 2>/dev/null | awk '/^Address: / { print $2 }' | head -1 || true) + echo "[DEBUG] nslookup result: '$ip_address'" + fi + + # Method 4: If all DNS methods fail, try ping to extract IP + if [[ -z "$ip_address" ]]; then + echo "[DEBUG] Method 4: ping" + ip_address=$(ping -c 1 $node 2>/dev/null | grep "PING" | sed 's/.*(\([^)]*\)).*/\1/' || true) + echo "[DEBUG] ping result: '$ip_address'" + fi + + # If still no IP, use the hostname itself (might work if it's already an IP or resolvable) + if [[ -z "$ip_address" ]]; then + echo "[WARNING] Could not resolve IP for $node, using hostname as fallback" + ip_address=$node + fi + + echo "[INFO] Node: $node -> IP: $ip_address" + # Add the IP address to the array + ip_addresses_array+=("$ip_address") +done + +head_node=${nodes_array[0]} +head_node_ip=${ip_addresses_array[0]} + +ip_head=$head_node_ip:$PORT + +{%- if setup_lines %} +{{setup_lines}} +{%- endif %} + +######################################################## +# Ray cluster setup +######################################################## +# First we start the head of the ray cluster on one of the physical nodes +# Set GPU/CPU resources to 0 to avoid scheduling on the head node + +head_cmd=$(cat < /dev/null 2>&1; then + for session_dir in ${RAY_TEMP_DIR}/session_[0-9]*/; do + if [[ -d "\$session_dir/logs" ]]; then + session_name=\$(basename "\$session_dir") + mkdir -p "$LOG_DIR/ray/\$session_name" + if command -v rsync > /dev/null 2>&1; then + rsync -ahP "\$session_dir/logs/" "$LOG_DIR/ray/\$session_name/logs/" 2>/dev/null || true + else + cp -r "\$session_dir/logs" "$LOG_DIR/ray/\$session_name/" + fi + fi + done + fi + if [[ -f "$LOG_DIR/ENDED" ]]; then + echo "Log sync sidecar terminating..." + break + fi + done +} +log-sync-sidecar & + +# Patch nsight.py before starting Ray head +sed -i 's/context\.py_executable = " "\.join(self\.nsight_cmd) + " python"/context.py_executable = " ".join(self.nsight_cmd) + f" {context.py_executable}"/g' /opt/nemo_rl_venv/lib64/python*/site-packages/ray/_private/runtime_env/nsight.py + +cat <1 && \$2 ~ /^[0-9]+\$/ {print \$2; exit}'" +} + + +# Wait for the head node container to start and for Ray to be ready +elapsed_time=0 +while true; do + if srun --overlap --nodes=1 --ntasks=1 -w $head_node test -f $LOG_DIR/STARTED_RAY_HEAD; then + head_container_pid=$(get_container_pid "$head_node") + if [[ -n "$head_container_pid" ]] && enroot exec "$head_container_pid" ray status --address $ip_head 2>/dev/null; then + break + fi + fi + if [[ $elapsed_time -ge $RAY_HEAD_START_TIMEOUT ]]; then + echo "[ERROR][$(date)] Ray head node failed to start within $RAY_HEAD_START_TIMEOUT seconds. Exiting..." + touch $LOG_DIR/ENDED + exit 1 + fi + echo "[INFO][$(date)] Waiting for Ray head node container to start and be ready... ($elapsed_time/$RAY_HEAD_START_TIMEOUT seconds)" + check_srun_processes + sleep 2 + elapsed_time=$((elapsed_time + 2)) +done + +NUM_ACTORS=$((gpus_per_node * SLURM_JOB_NUM_NODES)) + +# Start Ray worker nodes +# We want 1 Ray worker node per physical node +# Worker nodes are started with ray start but without the --head flag +for ((i = 1; i < SLURM_JOB_NUM_NODES; i++)); do + node_i=${nodes_array[$i]} + + worker_cmd=$(cat <$CLUSTER_DIR/ray_cluster_info.json +{ + "head_ip": "$head_node_ip", + "dashboard_port": "$DASHBOARD_PORT", + "port": "$PORT" +} +EOF +# Set up trap to clean up cluster info on job termination +cleanup_cluster_info() { + echo "[INFO] Cleaning up Ray cluster information" + rm -f $CLUSTER_DIR/ray_cluster_info.json +} + +# Register the cleanup function to run on script exit +trap cleanup_cluster_info EXIT + + +echo "[INFO] Ray cluster information saved to $CLUSTER_DIR/ray_cluster_info.json" + +######################################################## + +{% if srun_commands %} +# Run extra commands +{% for srun_command in srun_commands %} +{%- if loop.index <= group_env_vars|length %} +{%- for env_var in group_env_vars[loop.index - 1] %} +{{env_var}} +{%- endfor %} +{%- endif %} + +{{srun_command}} +{% endfor %} +######################################################## +{% endif -%} + +# We can now launch a job on this cluster +# We do so by launching a driver process on the physical node that the head node is on +# This driver process is responsible for launching a job on the Ray cluster +CONTAINER_CWD=$(scontrol show job $SLURM_JOB_ID --json | jq -r '.jobs[].current_working_directory') +# Define command to be empty by default +COMMAND="${COMMAND:-{{ command | default('', true) }}}" +COMMAND_WORKDIR={{ command_workdir | default('$CONTAINER_CWD') }} + +if [[ -n "$COMMAND" ]]; then + # Get container PID and execute command inside it + head_container_pid=$(get_container_pid "$head_node") + srun --overlap --nodes=1 -w "$head_node" -o $LOG_DIR/{{ ray_log_prefix }}job.log enroot exec "$head_container_pid" bash -c "cd $COMMAND_WORKDIR && $COMMAND" +else + echo "[INFO]: Ray Cluster is idled, run this on the slurm head node to get a shell to the head node:" + cat <$CLUSTER_DIR/scripts/${SLURM_JOB_ID}-attach.sh +# No args launches on the head node (node 0) +# Args 1-N launch on worker nodes (nodes 1 through N-1) +# Optional: set COMMAND='...' to run non-interactively instead of opening an interactive shell + +# Helper to get container PID +get_container_pid() { + local node=\$1 + srun --overlap --nodes=1 -w "\$node" --jobid $SLURM_JOB_ID bash -c "enroot list -f | awk 'NR>1 && \\\$2 ~ /^[0-9]+\\\$/ {print \\\$2; exit}'" +} + +WORKER_NUM=\\\${1:-} +if [[ -z "\\\$WORKER_NUM" ]]; then + # Empty means we are on the head node + HEAD_PID=\\\$(get_container_pid "$head_node") + if [[ -n "\\\${COMMAND:-}" ]]; then + srun -A $SLURM_JOB_ACCOUNT -p $SLURM_JOB_PARTITION --overlap --nodes=1 -w "$head_node" --jobid $SLURM_JOB_ID enroot exec "\\\$HEAD_PID" bash -c "cd $CONTAINER_CWD && \\\$COMMAND" + else + srun -A $SLURM_JOB_ACCOUNT -p $SLURM_JOB_PARTITION --overlap --nodes=1 -w "$head_node" --jobid $SLURM_JOB_ID --pty enroot exec "\\\$HEAD_PID" bash -c "cd $CONTAINER_CWD && exec bash" + fi +else + # Worker numbers 1 through N-1 correspond to ray-worker-1 through ray-worker-(N-1) + # and use nodes_array[1] through nodes_array[N-1] + if [[ \\\$WORKER_NUM -lt 1 || \\\$WORKER_NUM -ge $SLURM_JOB_NUM_NODES ]]; then + echo "Error: WORKER_NUM must be between 1 and $((SLURM_JOB_NUM_NODES-1))" + exit 1 + fi + nodes_array=($nodes) + node="\\\${nodes_array[\\\$WORKER_NUM]}" + WORKER_PID=\\\$(get_container_pid "\\\$node") + if [[ -n "\\\${COMMAND:-}" ]]; then + srun -A $SLURM_JOB_ACCOUNT -p $SLURM_JOB_PARTITION --overlap --nodes=1 -w "\\\$node" --jobid $SLURM_JOB_ID enroot exec "\\\$WORKER_PID" bash -c "cd $CONTAINER_CWD && \\\$COMMAND" + else + srun -A $SLURM_JOB_ACCOUNT -p $SLURM_JOB_PARTITION --overlap --nodes=1 -w "\\\$node" --jobid $SLURM_JOB_ID --pty enroot exec "\\\$WORKER_PID" bash -c "cd $CONTAINER_CWD && exec bash" + fi +fi +EOF + chmod +x $CLUSTER_DIR/scripts/${SLURM_JOB_ID}-attach.sh + echo " COMMAND='echo hello' bash $CLUSTER_DIR/scripts/${SLURM_JOB_ID}-attach.sh # run a non-interactive command on head node" + echo " bash $CLUSTER_DIR/scripts/${SLURM_JOB_ID}-attach.sh # to attach to head node (i.e., 'worker 0')" + echo " bash $CLUSTER_DIR/scripts/${SLURM_JOB_ID}-attach.sh 1 # to attach to worker 1" + echo " bash $CLUSTER_DIR/scripts/${SLURM_JOB_ID}-attach.sh 2 # to attach to worker 2, etc." + sleep infinity +fi diff --git a/nemo_run/run/torchx_backend/schedulers/slurm.py b/nemo_run/run/torchx_backend/schedulers/slurm.py index 48c68084..358419c0 100644 --- a/nemo_run/run/torchx_backend/schedulers/slurm.py +++ b/nemo_run/run/torchx_backend/schedulers/slurm.py @@ -114,12 +114,14 @@ def _submit_dryrun(self, app: AppDef, cfg: Executor) -> AppDryRunInfo[Any]: # t srun_cmds.append([" ".join(srun_cmd)]) command = [app.roles[0].entrypoint] + app.roles[0].args + # Allow selecting Ray template via environment variable + ray_template_name = os.environ.get("NEMO_RUN_SLURM_RAY_TEMPLATE", "ray.sub.j2") req = SlurmRayRequest( name=app.roles[0].name, launch_cmd=["sbatch", "--requeue", "--parsable"], command=" ".join(command), cluster_dir=os.path.join(executor.tunnel.job_dir, Path(job_dir).name, "ray"), - template_name="ray.sub.j2", + template_name=ray_template_name, executor=executor, workdir=f"/{RUNDIR_NAME}/code", nemo_run_dir=os.path.join(executor.tunnel.job_dir, Path(job_dir).name), diff --git a/test/core/execution/artifacts/expected_ray_cluster.sub b/test/core/execution/artifacts/expected_ray_cluster.sub index ef0d1367..0a606cfc 100644 --- a/test/core/execution/artifacts/expected_ray_cluster.sub +++ b/test/core/execution/artifacts/expected_ray_cluster.sub @@ -30,17 +30,36 @@ DASHBOARD_AGENT_GRPC_PORT=${DASHBOARD_AGENT_GRPC_PORT:-53007} METRICS_EXPORT_PORT=${METRICS_EXPORT_PORT:-53009} # Ports for the head node -PORT=${PORT:-6379} +PORT=${PORT:-54514} RAY_CLIENT_SERVER_PORT=${RAY_CLIENT_SERVER_PORT:-10001} #REDIT_SHARD_PORTS=${REDIT_SHARD_PORTS:-"random"} ?? DASHBOARD_PORT=${DASHBOARD_PORT:-8265} # Also used by debugger DASHBOARD_AGENT_LISTEN_PORT=${DASHBOARD_AGENT_LISTEN_PORT:-52365} +RAY_DEBUGGER_ARGS= +if [ "${RAY_DEBUG:-}" = "legacy" ]; then + RAY_DEBUGGER_ARGS="--ray-debugger-external" +fi + +# After ray>=2.47, this feature is enabled by default which creates uv venvs for any py_executable starting with `uv run`. +# There is severe contention and performance issues with this enabled considering our dependencies are so large and occasionally +# need to be compiled, so NeMo RL has an implementation in nemo_rl/utils/venv.py that does it once per node as opposed to once per task. +export RAY_ENABLE_UV_RUN_RUNTIME_ENV=0 + +# Setting ulimit is recommended by ray best practices page +# @ https://docs.ray.io/en/latest/cluster/vms/user-guides/large-cluster-best-practices.html +# It's session based and won't affect the system outside the script +# Ensure that the soft limit isn't above the hard limit +if [[ $(ulimit -Hn) == "unlimited" ]] || [[ 65535 -lt $(ulimit -Hn) ]]; then + ulimit -Sn 65535 +elif [[ $(ulimit -Hn) != "unlimited" ]] && [[ $(ulimit -Hn) -lt 65535 ]]; then + echo "[WARNING]: Cannot increase ulimit on file descriptors to 65535 according ray recommendation: https://docs.ray.io/en/latest/cluster/vms/user-guides/large-cluster-best-practices.html. Speak to cluster admins to increase, otherwise ray may crash unexpectedly." +fi # On our clusters, the largest port range on an idle worker appeared between 52369-64607 # (not including the other ports set by this script). So this range is chosen to be # somewhere in the middle MIN_WORKER_PORT=${MIN_WORKER_PORT:-54001} -MAX_WORKER_PORT=${MAX_WORKER_PORT:-54257} +MAX_WORKER_PORT=${MAX_WORKER_PORT:-54513} # Ray temp directory (inside container). Used by --temp-dir and log sync sidecar RAY_TEMP_DIR=${RAY_TEMP_DIR:-/ray-cluster} @@ -84,13 +103,66 @@ gpus_per_node=8 num_retries=1 +# Track backgrounded srun client PIDs for head and workers +declare -A SRUN_PIDS + +# Verify all backgrounded srun client processes are still alive; exit fast if any died +check_srun_processes() { + for name in "${!SRUN_PIDS[@]}"; do + pid="${SRUN_PIDS[$name]}" + # Check if the process is still running + if ! kill -0 "$pid" 2>/dev/null; then + echo "[ERROR] Background srun '$name' died (pid=$pid). Could be a failure in startup or an issue with the node preventing the srun to start. Attempting to exit." >&2 + # Signal sidecars inside containers to terminate ASAP + touch "$LOG_DIR/ENDED" + exit 1 + fi + done +} + # Getting the node names and IP addresses in the SLURM allocation nodes=$(scontrol show hostnames "$SLURM_JOB_NODELIST") nodes_array=($nodes) ip_addresses_array=() for node in $nodes; do - ip_address=$(getent hosts "$node" | awk '{print $1}' | head -n1) + # Try multiple methods to get IP address - ENHANCED VERSION v2.0 + echo "[DEBUG] Resolving hostname: $node using enhanced resolution methods" + ip_address="" + + # Method 1: Try host command + echo "[DEBUG] Method 1: host command" + ip_address=$(host $node 2>/dev/null | awk '/has address/ { print $4 }' | head -1 || true) + echo "[DEBUG] host result: '$ip_address'" + + # Method 2: If host fails, try getent + if [[ -z "$ip_address" ]]; then + echo "[DEBUG] Method 2: getent hosts" + ip_address=$(getent hosts $node 2>/dev/null | awk '{ print $1 }' | head -1 || true) + echo "[DEBUG] getent result: '$ip_address'" + fi + + # Method 3: If getent fails, try nslookup + if [[ -z "$ip_address" ]]; then + echo "[DEBUG] Method 3: nslookup" + ip_address=$(nslookup $node 2>/dev/null | awk '/^Address: / { print $2 }' | head -1 || true) + echo "[DEBUG] nslookup result: '$ip_address'" + fi + + # Method 4: If all DNS methods fail, try ping to extract IP + if [[ -z "$ip_address" ]]; then + echo "[DEBUG] Method 4: ping" + ip_address=$(ping -c 1 $node 2>/dev/null | grep "PING" | sed 's/.*(\([^)]*\)).*/\1/' || true) + echo "[DEBUG] ping result: '$ip_address'" + fi + + # If still no IP, use the hostname itself (might work if it's already an IP or resolvable) + if [[ -z "$ip_address" ]]; then + echo "[WARNING] Could not resolve IP for $node, using hostname as fallback" + ip_address=$node + fi + + echo "[INFO] Node: $node -> IP: $ip_address" # Add the IP address to the array ip_addresses_array+=("$ip_address") done @@ -178,12 +250,13 @@ ray start --head \ --ray-client-server-port=${RAY_CLIENT_SERVER_PORT} \ --dashboard-port=${DASHBOARD_PORT} \ \ - --node-manager-port=${NODE_MANAGER_PORT} \ - --object-manager-port=${OBJECT_MANAGER_PORT} \ - --runtime-env-agent-port=${RUNTIME_ENV_AGENT_PORT} \ - --dashboard-agent-grpc-port=${DASHBOARD_AGENT_GRPC_PORT} \ - --dashboard-agent-listen-port=${DASHBOARD_AGENT_LISTEN_PORT} \ - --metrics-export-port=${METRICS_EXPORT_PORT} \ + --node-manager-port=$((${NODE_MANAGER_PORT} + 1)) \ + --object-manager-port=$((${OBJECT_MANAGER_PORT} + 1)) \ + --runtime-env-agent-port=$((${RUNTIME_ENV_AGENT_PORT} + 1)) \ + --dashboard-agent-grpc-port=$((${DASHBOARD_AGENT_GRPC_PORT} + 1)) \ + --dashboard-agent-listen-port=$((${DASHBOARD_AGENT_LISTEN_PORT} + 1)) \ + --metrics-export-port=$((${METRICS_EXPORT_PORT} + 1)) \ + $RAY_DEBUGGER_ARGS \ \ --block EOFINNER @@ -201,6 +274,7 @@ exit 1 EOF ) srun --container-image=nvcr.io/nvidia/pytorch:24.01-py3 --no-container-mount-home --mpi=pmix -A=test_account -p=gpu --gres=gpu:8 --container-mounts /tmp/test_jobs/test-ray-cluster:/tmp/test_jobs/test-ray-cluster,/tmp/test_jobs/test-ray-cluster:/tmp/test_jobs/test-ray-cluster,/tmp/test_jobs/test-ray-cluster/logs:/tmp/test_jobs/test-ray-cluster/logs --container-workdir=/workspace --container-name=ray-head --nodes=1 --ntasks=1 -w "$head_node" -o $LOG_DIR/ray-head.log bash -x -c "$head_cmd" & +SRUN_PIDS["ray-head"]=$! # Wait for the head node container to start and for Ray to be ready elapsed_time=0 @@ -211,6 +285,7 @@ while ! (srun --overlap --nodes=1 --ntasks=1 -w $head_node test -f $LOG_DIR/STAR exit 1 fi echo "[INFO][$(date)] Waiting for Ray head node container to start and be ready... ($elapsed_time/$RAY_HEAD_START_TIMEOUT seconds)" + check_srun_processes sleep 2 elapsed_time=$((elapsed_time + 2)) done @@ -251,7 +326,6 @@ monitor-sidecar & sed -i 's/context\.py_executable = " "\.join(self\.nsight_cmd) + " python"/context.py_executable = " ".join(self.nsight_cmd) + f" {context.py_executable}"/g' /opt/nemo_rl_venv/lib64/python*/site-packages/ray/_private/runtime_env/nsight.py cat <=2.47, this feature is enabled by default which creates uv venvs for any py_executable starting with `uv run`. +# There is severe contention and performance issues with this enabled considering our dependencies are so large and occasionally +# need to be compiled, so NeMo RL has an implementation in nemo_rl/utils/venv.py that does it once per node as opposed to once per task. +export RAY_ENABLE_UV_RUN_RUNTIME_ENV=0 + +# Setting ulimit is recommended by ray best practices page +# @ https://docs.ray.io/en/latest/cluster/vms/user-guides/large-cluster-best-practices.html +# It's session based and won't affect the system outside the script +# Ensure that the soft limit isn't above the hard limit +if [[ $(ulimit -Hn) == "unlimited" ]] || [[ 65535 -lt $(ulimit -Hn) ]]; then + ulimit -Sn 65535 +elif [[ $(ulimit -Hn) != "unlimited" ]] && [[ $(ulimit -Hn) -lt 65535 ]]; then + echo "[WARNING]: Cannot increase ulimit on file descriptors to 65535 according ray recommendation: https://docs.ray.io/en/latest/cluster/vms/user-guides/large-cluster-best-practices.html. Speak to cluster admins to increase, otherwise ray may crash unexpectedly." +fi + +# On our clusters, the largest port range on an idle worker appeared between 52369-64607 +# (not including the other ports set by this script). So this range is chosen to be +# somewhere in the middle +MIN_WORKER_PORT=${MIN_WORKER_PORT:-54001} +MAX_WORKER_PORT=${MAX_WORKER_PORT:-54513} + +# Ray temp directory (inside container). Used by --temp-dir and log sync sidecar +RAY_TEMP_DIR=${RAY_TEMP_DIR:-/ray-cluster} + +# Number seconds to sync logs from /tmp/ray/session_*/logs to $LOG_DIR/ray/ +RAY_LOG_SYNC_FREQUENCY=${RAY_LOG_SYNC_FREQUENCY:-} + +# Timeout in seconds for Ray head node to start (default 10 minutes) +RAY_HEAD_START_TIMEOUT=${RAY_HEAD_START_TIMEOUT:-600} + +# Directory setup +export CLUSTER_DIR=/tmp/test_jobs/test-ray-cluster +mkdir -p $CLUSTER_DIR + +JOB_IDS_FILE="$CLUSTER_DIR/job_ids.json" +if [[ -f "$JOB_IDS_FILE" ]]; then + tmp="$(mktemp)" + jq --arg id "$SLURM_JOB_ID" '. + [$id]' "$JOB_IDS_FILE" > "$tmp" && mv "$tmp" "$JOB_IDS_FILE" +else + touch "$JOB_IDS_FILE" + echo "[\"$SLURM_JOB_ID\"]" > "$JOB_IDS_FILE" +fi + +mkdir -p $CLUSTER_DIR/scripts + +export LOG_DIR=/tmp/test_jobs/test-ray-cluster/logs +mkdir -p $LOG_DIR + +# Clean up any previous run files +rm -f $LOG_DIR/STARTED_RAY_HEAD +rm -f $LOG_DIR/ENDED + +# Defaults to placing uv cache inside the CLUSTER_DIR +# This directory is mounted into the container at /home/ray/.cache/uv so it is shared between the head and worker nodes +# UV_CACHE_DIR=/tmp/test_jobs/test-ray-cluster/uv_cache +# mkdir -p $UV_CACHE_DIR +######################################################## + +# Number of GPUs per node +gpus_per_node=8 + +num_retries=1 + +# Track backgrounded srun client PIDs for head and workers +declare -A SRUN_PIDS + +# Verify all backgrounded srun client processes are still alive; exit fast if any died +check_srun_processes() { + for name in "${!SRUN_PIDS[@]}"; do + pid="${SRUN_PIDS[$name]}" + # Check if the process is still running + if ! kill -0 "$pid" 2>/dev/null; then + echo "[ERROR] Background srun '$name' died (pid=$pid). Could be a failure in startup or an issue with the node preventing the srun to start. Attempting to exit." >&2 + # Signal sidecars inside containers to terminate ASAP + touch "$LOG_DIR/ENDED" + exit 1 + fi + done +} + +# Getting the node names and IP addresses in the SLURM allocation +nodes=$(scontrol show hostnames "$SLURM_JOB_NODELIST") +nodes_array=($nodes) +ip_addresses_array=() + +for node in $nodes; do + # Try multiple methods to get IP address - ENHANCED VERSION v2.0 + echo "[DEBUG] Resolving hostname: $node using enhanced resolution methods" + ip_address="" + + # Method 1: Try host command + echo "[DEBUG] Method 1: host command" + ip_address=$(host $node 2>/dev/null | awk '/has address/ { print $4 }' | head -1 || true) + echo "[DEBUG] host result: '$ip_address'" + + # Method 2: If host fails, try getent + if [[ -z "$ip_address" ]]; then + echo "[DEBUG] Method 2: getent hosts" + ip_address=$(getent hosts $node 2>/dev/null | awk '{ print $1 }' | head -1 || true) + echo "[DEBUG] getent result: '$ip_address'" + fi + + # Method 3: If getent fails, try nslookup + if [[ -z "$ip_address" ]]; then + echo "[DEBUG] Method 3: nslookup" + ip_address=$(nslookup $node 2>/dev/null | awk '/^Address: / { print $2 }' | head -1 || true) + echo "[DEBUG] nslookup result: '$ip_address'" + fi + + # Method 4: If all DNS methods fail, try ping to extract IP + if [[ -z "$ip_address" ]]; then + echo "[DEBUG] Method 4: ping" + ip_address=$(ping -c 1 $node 2>/dev/null | grep "PING" | sed 's/.*(\([^)]*\)).*/\1/' || true) + echo "[DEBUG] ping result: '$ip_address'" + fi + + # If still no IP, use the hostname itself (might work if it's already an IP or resolvable) + if [[ -z "$ip_address" ]]; then + echo "[WARNING] Could not resolve IP for $node, using hostname as fallback" + ip_address=$node + fi + + echo "[INFO] Node: $node -> IP: $ip_address" + # Add the IP address to the array + ip_addresses_array+=("$ip_address") +done + +head_node=${nodes_array[0]} +head_node_ip=${ip_addresses_array[0]} + +ip_head=$head_node_ip:$PORT + +######################################################## +# Ray cluster setup +######################################################## +# First we start the head of the ray cluster on one of the physical nodes +# Set GPU/CPU resources to 0 to avoid scheduling on the head node + +head_cmd=$(cat < /dev/null 2>&1; then + for session_dir in ${RAY_TEMP_DIR}/session_[0-9]*/; do + if [[ -d "\$session_dir/logs" ]]; then + session_name=\$(basename "\$session_dir") + mkdir -p "$LOG_DIR/ray/\$session_name" + if command -v rsync > /dev/null 2>&1; then + rsync -ahP "\$session_dir/logs/" "$LOG_DIR/ray/\$session_name/logs/" 2>/dev/null || true + else + cp -r "\$session_dir/logs" "$LOG_DIR/ray/\$session_name/" + fi + fi + done + fi + if [[ -f "$LOG_DIR/ENDED" ]]; then + echo "Log sync sidecar terminating..." + break + fi + done +} +log-sync-sidecar & + +# Patch nsight.py before starting Ray head +sed -i 's/context\.py_executable = " "\.join(self\.nsight_cmd) + " python"/context.py_executable = " ".join(self.nsight_cmd) + f" {context.py_executable}"/g' /opt/nemo_rl_venv/lib64/python*/site-packages/ray/_private/runtime_env/nsight.py + +cat <1 && \$2 ~ /^[0-9]+\$/ {print \$2; exit}'" +} + + +# Wait for the head node container to start and for Ray to be ready +elapsed_time=0 +while true; do + if srun --overlap --nodes=1 --ntasks=1 -w $head_node test -f $LOG_DIR/STARTED_RAY_HEAD; then + head_container_pid=$(get_container_pid "$head_node") + if [[ -n "$head_container_pid" ]] && enroot exec "$head_container_pid" ray status --address $ip_head 2>/dev/null; then + break + fi + fi + if [[ $elapsed_time -ge $RAY_HEAD_START_TIMEOUT ]]; then + echo "[ERROR][$(date)] Ray head node failed to start within $RAY_HEAD_START_TIMEOUT seconds. Exiting..." + touch $LOG_DIR/ENDED + exit 1 + fi + echo "[INFO][$(date)] Waiting for Ray head node container to start and be ready... ($elapsed_time/$RAY_HEAD_START_TIMEOUT seconds)" + check_srun_processes + sleep 2 + elapsed_time=$((elapsed_time + 2)) +done + +NUM_ACTORS=$((gpus_per_node * SLURM_JOB_NUM_NODES)) + +# Start Ray worker nodes +# We want 1 Ray worker node per physical node +# Worker nodes are started with ray start but without the --head flag +for ((i = 1; i < SLURM_JOB_NUM_NODES; i++)); do + node_i=${nodes_array[$i]} + + worker_cmd=$(cat <$CLUSTER_DIR/ray_cluster_info.json +{ + "head_ip": "$head_node_ip", + "dashboard_port": "$DASHBOARD_PORT", + "port": "$PORT" +} +EOF +# Set up trap to clean up cluster info on job termination +cleanup_cluster_info() { + echo "[INFO] Cleaning up Ray cluster information" + rm -f $CLUSTER_DIR/ray_cluster_info.json +} + +# Register the cleanup function to run on script exit +trap cleanup_cluster_info EXIT + + +echo "[INFO] Ray cluster information saved to $CLUSTER_DIR/ray_cluster_info.json" + +######################################################## + +# We can now launch a job on this cluster +# We do so by launching a driver process on the physical node that the head node is on +# This driver process is responsible for launching a job on the Ray cluster +CONTAINER_CWD=$(scontrol show job $SLURM_JOB_ID --json | jq -r '.jobs[].current_working_directory') +# Define command to be empty by default +COMMAND="${COMMAND:-python train.py}" +COMMAND_WORKDIR=/workspace + +if [[ -n "$COMMAND" ]]; then + # Get container PID and execute command inside it + head_container_pid=$(get_container_pid "$head_node") + srun --overlap --nodes=1 -w "$head_node" -o $LOG_DIR/ray-job.log enroot exec "$head_container_pid" bash -c "cd $COMMAND_WORKDIR && $COMMAND" +else + echo "[INFO]: Ray Cluster is idled, run this on the slurm head node to get a shell to the head node:" + cat <$CLUSTER_DIR/scripts/${SLURM_JOB_ID}-attach.sh +# No args launches on the head node (node 0) +# Args 1-N launch on worker nodes (nodes 1 through N-1) +# Optional: set COMMAND='...' to run non-interactively instead of opening an interactive shell + +# Helper to get container PID +get_container_pid() { + local node=\$1 + srun --overlap --nodes=1 -w "\$node" --jobid $SLURM_JOB_ID bash -c "enroot list -f | awk 'NR>1 && \\\$2 ~ /^[0-9]+\\\$/ {print \\\$2; exit}'" +} + +WORKER_NUM=\\\${1:-} +if [[ -z "\\\$WORKER_NUM" ]]; then + # Empty means we are on the head node + HEAD_PID=\\\$(get_container_pid "$head_node") + if [[ -n "\\\${COMMAND:-}" ]]; then + srun -A $SLURM_JOB_ACCOUNT -p $SLURM_JOB_PARTITION --overlap --nodes=1 -w "$head_node" --jobid $SLURM_JOB_ID enroot exec "\\\$HEAD_PID" bash -c "cd $CONTAINER_CWD && \\\$COMMAND" + else + srun -A $SLURM_JOB_ACCOUNT -p $SLURM_JOB_PARTITION --overlap --nodes=1 -w "$head_node" --jobid $SLURM_JOB_ID --pty enroot exec "\\\$HEAD_PID" bash -c "cd $CONTAINER_CWD && exec bash" + fi +else + # Worker numbers 1 through N-1 correspond to ray-worker-1 through ray-worker-(N-1) + # and use nodes_array[1] through nodes_array[N-1] + if [[ \\\$WORKER_NUM -lt 1 || \\\$WORKER_NUM -ge $SLURM_JOB_NUM_NODES ]]; then + echo "Error: WORKER_NUM must be between 1 and $((SLURM_JOB_NUM_NODES-1))" + exit 1 + fi + nodes_array=($nodes) + node="\\\${nodes_array[\\\$WORKER_NUM]}" + WORKER_PID=\\\$(get_container_pid "\\\$node") + if [[ -n "\\\${COMMAND:-}" ]]; then + srun -A $SLURM_JOB_ACCOUNT -p $SLURM_JOB_PARTITION --overlap --nodes=1 -w "\\\$node" --jobid $SLURM_JOB_ID enroot exec "\\\$WORKER_PID" bash -c "cd $CONTAINER_CWD && \\\$COMMAND" + else + srun -A $SLURM_JOB_ACCOUNT -p $SLURM_JOB_PARTITION --overlap --nodes=1 -w "\\\$node" --jobid $SLURM_JOB_ID --pty enroot exec "\\\$WORKER_PID" bash -c "cd $CONTAINER_CWD && exec bash" + fi +fi +EOF + chmod +x $CLUSTER_DIR/scripts/${SLURM_JOB_ID}-attach.sh + echo " COMMAND='echo hello' bash $CLUSTER_DIR/scripts/${SLURM_JOB_ID}-attach.sh # run a non-interactive command on head node" + echo " bash $CLUSTER_DIR/scripts/${SLURM_JOB_ID}-attach.sh # to attach to head node (i.e., 'worker 0')" + echo " bash $CLUSTER_DIR/scripts/${SLURM_JOB_ID}-attach.sh 1 # to attach to worker 1" + echo " bash $CLUSTER_DIR/scripts/${SLURM_JOB_ID}-attach.sh 2 # to attach to worker 2, etc." + sleep infinity +fi diff --git a/test/core/execution/artifacts/expected_ray_cluster_ssh.sub b/test/core/execution/artifacts/expected_ray_cluster_ssh.sub index cb12ffaa..f5d29a42 100644 --- a/test/core/execution/artifacts/expected_ray_cluster_ssh.sub +++ b/test/core/execution/artifacts/expected_ray_cluster_ssh.sub @@ -31,17 +31,36 @@ DASHBOARD_AGENT_GRPC_PORT=${DASHBOARD_AGENT_GRPC_PORT:-53007} METRICS_EXPORT_PORT=${METRICS_EXPORT_PORT:-53009} # Ports for the head node -PORT=${PORT:-6379} +PORT=${PORT:-54514} RAY_CLIENT_SERVER_PORT=${RAY_CLIENT_SERVER_PORT:-10001} #REDIT_SHARD_PORTS=${REDIT_SHARD_PORTS:-"random"} ?? DASHBOARD_PORT=${DASHBOARD_PORT:-8265} # Also used by debugger DASHBOARD_AGENT_LISTEN_PORT=${DASHBOARD_AGENT_LISTEN_PORT:-52365} +RAY_DEBUGGER_ARGS= +if [ "${RAY_DEBUG:-}" = "legacy" ]; then + RAY_DEBUGGER_ARGS="--ray-debugger-external" +fi + +# After ray>=2.47, this feature is enabled by default which creates uv venvs for any py_executable starting with `uv run`. +# There is severe contention and performance issues with this enabled considering our dependencies are so large and occasionally +# need to be compiled, so NeMo RL has an implementation in nemo_rl/utils/venv.py that does it once per node as opposed to once per task. +export RAY_ENABLE_UV_RUN_RUNTIME_ENV=0 + +# Setting ulimit is recommended by ray best practices page +# @ https://docs.ray.io/en/latest/cluster/vms/user-guides/large-cluster-best-practices.html +# It's session based and won't affect the system outside the script +# Ensure that the soft limit isn't above the hard limit +if [[ $(ulimit -Hn) == "unlimited" ]] || [[ 65535 -lt $(ulimit -Hn) ]]; then + ulimit -Sn 65535 +elif [[ $(ulimit -Hn) != "unlimited" ]] && [[ $(ulimit -Hn) -lt 65535 ]]; then + echo "[WARNING]: Cannot increase ulimit on file descriptors to 65535 according ray recommendation: https://docs.ray.io/en/latest/cluster/vms/user-guides/large-cluster-best-practices.html. Speak to cluster admins to increase, otherwise ray may crash unexpectedly." +fi # On our clusters, the largest port range on an idle worker appeared between 52369-64607 # (not including the other ports set by this script). So this range is chosen to be # somewhere in the middle MIN_WORKER_PORT=${MIN_WORKER_PORT:-54001} -MAX_WORKER_PORT=${MAX_WORKER_PORT:-54257} +MAX_WORKER_PORT=${MAX_WORKER_PORT:-54513} # Ray temp directory (inside container). Used by --temp-dir and log sync sidecar RAY_TEMP_DIR=${RAY_TEMP_DIR:-/ray-cluster} @@ -85,13 +104,66 @@ gpus_per_node=8 num_retries=1 +# Track backgrounded srun client PIDs for head and workers +declare -A SRUN_PIDS + +# Verify all backgrounded srun client processes are still alive; exit fast if any died +check_srun_processes() { + for name in "${!SRUN_PIDS[@]}"; do + pid="${SRUN_PIDS[$name]}" + # Check if the process is still running + if ! kill -0 "$pid" 2>/dev/null; then + echo "[ERROR] Background srun '$name' died (pid=$pid). Could be a failure in startup or an issue with the node preventing the srun to start. Attempting to exit." >&2 + # Signal sidecars inside containers to terminate ASAP + touch "$LOG_DIR/ENDED" + exit 1 + fi + done +} + # Getting the node names and IP addresses in the SLURM allocation nodes=$(scontrol show hostnames "$SLURM_JOB_NODELIST") nodes_array=($nodes) ip_addresses_array=() for node in $nodes; do - ip_address=$(getent hosts "$node" | awk '{print $1}' | head -n1) + # Try multiple methods to get IP address - ENHANCED VERSION v2.0 + echo "[DEBUG] Resolving hostname: $node using enhanced resolution methods" + ip_address="" + + # Method 1: Try host command + echo "[DEBUG] Method 1: host command" + ip_address=$(host $node 2>/dev/null | awk '/has address/ { print $4 }' | head -1 || true) + echo "[DEBUG] host result: '$ip_address'" + + # Method 2: If host fails, try getent + if [[ -z "$ip_address" ]]; then + echo "[DEBUG] Method 2: getent hosts" + ip_address=$(getent hosts $node 2>/dev/null | awk '{ print $1 }' | head -1 || true) + echo "[DEBUG] getent result: '$ip_address'" + fi + + # Method 3: If getent fails, try nslookup + if [[ -z "$ip_address" ]]; then + echo "[DEBUG] Method 3: nslookup" + ip_address=$(nslookup $node 2>/dev/null | awk '/^Address: / { print $2 }' | head -1 || true) + echo "[DEBUG] nslookup result: '$ip_address'" + fi + + # Method 4: If all DNS methods fail, try ping to extract IP + if [[ -z "$ip_address" ]]; then + echo "[DEBUG] Method 4: ping" + ip_address=$(ping -c 1 $node 2>/dev/null | grep "PING" | sed 's/.*(\([^)]*\)).*/\1/' || true) + echo "[DEBUG] ping result: '$ip_address'" + fi + + # If still no IP, use the hostname itself (might work if it's already an IP or resolvable) + if [[ -z "$ip_address" ]]; then + echo "[WARNING] Could not resolve IP for $node, using hostname as fallback" + ip_address=$node + fi + + echo "[INFO] Node: $node -> IP: $ip_address" # Add the IP address to the array ip_addresses_array+=("$ip_address") done @@ -183,12 +255,13 @@ ray start --head \ --ray-client-server-port=${RAY_CLIENT_SERVER_PORT} \ --dashboard-port=${DASHBOARD_PORT} \ \ - --node-manager-port=${NODE_MANAGER_PORT} \ - --object-manager-port=${OBJECT_MANAGER_PORT} \ - --runtime-env-agent-port=${RUNTIME_ENV_AGENT_PORT} \ - --dashboard-agent-grpc-port=${DASHBOARD_AGENT_GRPC_PORT} \ - --dashboard-agent-listen-port=${DASHBOARD_AGENT_LISTEN_PORT} \ - --metrics-export-port=${METRICS_EXPORT_PORT} \ + --node-manager-port=$((${NODE_MANAGER_PORT} + 1)) \ + --object-manager-port=$((${OBJECT_MANAGER_PORT} + 1)) \ + --runtime-env-agent-port=$((${RUNTIME_ENV_AGENT_PORT} + 1)) \ + --dashboard-agent-grpc-port=$((${DASHBOARD_AGENT_GRPC_PORT} + 1)) \ + --dashboard-agent-listen-port=$((${DASHBOARD_AGENT_LISTEN_PORT} + 1)) \ + --metrics-export-port=$((${METRICS_EXPORT_PORT} + 1)) \ + $RAY_DEBUGGER_ARGS \ \ --block EOFINNER @@ -206,6 +279,7 @@ exit 1 EOF ) srun --container-image=nvcr.io/nvidia/nemo:24.01 --no-container-mount-home --mpi=pmix -A=research_account -p=gpu_partition --gres=gpu:8 --container-mounts /data:/data,/models:/models,/nemo_run:/nemo_run,/lustre/fsw/projects/research/jobs/multi-node-training:/lustre/fsw/projects/research/jobs/multi-node-training,/lustre/fsw/projects/research/jobs/multi-node-training:/lustre/fsw/projects/research/jobs/multi-node-training,/lustre/fsw/projects/research/jobs/multi-node-training/logs:/lustre/fsw/projects/research/jobs/multi-node-training/logs --container-workdir=/workspace/training --container-name=ray-head --nodes=1 --ntasks=1 -w "$head_node" -o $LOG_DIR/ray-head.log bash -x -c "$head_cmd" & +SRUN_PIDS["ray-head"]=$! # Wait for the head node container to start and for Ray to be ready elapsed_time=0 @@ -216,6 +290,7 @@ while ! (srun --overlap --nodes=1 --ntasks=1 -w $head_node test -f $LOG_DIR/STAR exit 1 fi echo "[INFO][$(date)] Waiting for Ray head node container to start and be ready... ($elapsed_time/$RAY_HEAD_START_TIMEOUT seconds)" + check_srun_processes sleep 2 elapsed_time=$((elapsed_time + 2)) done @@ -258,7 +333,6 @@ monitor-sidecar & sed -i 's/context\.py_executable = " "\.join(self\.nsight_cmd) + " python"/context.py_executable = " ".join(self.nsight_cmd) + f" {context.py_executable}"/g' /opt/nemo_rl_venv/lib64/python*/site-packages/ray/_private/runtime_env/nsight.py cat < tuple[SlurmRayRequest, str]: + """Create a Ray enroot cluster request matching expected_ray_cluster_enroot.sub artifact.""" + executor = SlurmExecutor( + account="test_account", + partition="gpu", + time="01:00:00", + nodes=2, + ntasks_per_node=8, + gpus_per_node=8, + container_image="nvcr.io/nvidia/pytorch:24.01-py3", + container_mounts=["/tmp/test_jobs/test-ray-cluster:/tmp/test_jobs/test-ray-cluster"], + ) + + tunnel_mock = Mock(spec=SSHTunnel) + tunnel_mock.job_dir = "/tmp/test_jobs" + tunnel_mock.key = "test-cluster" + executor.tunnel = tunnel_mock + + request = SlurmRayRequest( + name="test-ray-cluster", + cluster_dir="/tmp/test_jobs/test-ray-cluster", + template_name="ray_enroot.sub.j2", + executor=executor, + command="python train.py", + workdir="/workspace", + launch_cmd=["sbatch", "--requeue", "--parsable", "--dependency=singleton"], + ) + + return request, os.path.join(ARTIFACTS_DIR, "expected_ray_cluster_enroot.sub") + + def test_ray_enroot_template( + self, ray_enroot_request_with_artifact: tuple[SlurmRayRequest, str] + ): + """Test that ray_enroot.sub.j2 template matches expected artifact exactly.""" + ray_request, artifact_path = ray_enroot_request_with_artifact + generated_script = ray_request.materialize() + + # Read expected artifact for reference + with open(artifact_path, "r") as f: + expected_script = f.read() + + assert generated_script.strip() == expected_script.strip() diff --git a/test/run/torchx_backend/schedulers/test_slurm.py b/test/run/torchx_backend/schedulers/test_slurm.py index 61b76d44..96b0e239 100644 --- a/test/run/torchx_backend/schedulers/test_slurm.py +++ b/test/run/torchx_backend/schedulers/test_slurm.py @@ -366,3 +366,40 @@ def test_schedule_with_dependencies(slurm_scheduler, slurm_executor): assert result == "12345" # Verify the run was called with the expected arguments mock_tunnel.run.assert_called_once() + + +def test_ray_template_env_var(slurm_scheduler, slurm_executor): + """Test that NEMO_RUN_SLURM_RAY_TEMPLATE environment variable selects the correct template.""" + from nemo_run.config import USE_WITH_RAY_CLUSTER_KEY + from nemo_run.run.ray.slurm import SlurmRayRequest + + # Create a Ray-enabled app + app_def = AppDef( + name="test_ray_app", + roles=[Role(name="test_role", image="", entrypoint="python", args=["script.py"])], + metadata={USE_WITH_RAY_CLUSTER_KEY: True}, + ) + + with ( + mock.patch.object(SlurmTunnelScheduler, "_initialize_tunnel"), + mock.patch.object(SlurmExecutor, "package"), + mock.patch("builtins.open", mock.mock_open()), + ): + slurm_scheduler.tunnel = mock.MagicMock() + + # Test default template name + with mock.patch("nemo_run.core.execution.utils.fill_template") as mock_fill: + mock_fill.return_value = "#!/bin/bash\n# Mock script" + dryrun_info = slurm_scheduler._submit_dryrun(app_def, slurm_executor) + assert isinstance(dryrun_info.request, SlurmRayRequest) + assert dryrun_info.request.template_name == "ray.sub.j2" + + # Test custom template name via environment variable + with ( + mock.patch.dict(os.environ, {"NEMO_RUN_SLURM_RAY_TEMPLATE": "ray_enroot.sub.j2"}), + mock.patch("nemo_run.core.execution.utils.fill_template") as mock_fill, + ): + mock_fill.return_value = "#!/bin/bash\n# Mock script" + dryrun_info = slurm_scheduler._submit_dryrun(app_def, slurm_executor) + assert isinstance(dryrun_info.request, SlurmRayRequest) + assert dryrun_info.request.template_name == "ray_enroot.sub.j2"