diff --git a/nemo_run/core/execution/dgxcloud.py b/nemo_run/core/execution/dgxcloud.py index 97dc709e..9dafdb20 100644 --- a/nemo_run/core/execution/dgxcloud.py +++ b/nemo_run/core/execution/dgxcloud.py @@ -1,3 +1,4 @@ +import base64 import json import logging import os @@ -5,6 +6,8 @@ from dataclasses import dataclass, field from enum import Enum from pathlib import Path +import time +import tempfile from typing import Any, Optional, Type import requests @@ -13,6 +16,7 @@ from nemo_run.core.execution.base import Executor, ExecutorMacros from nemo_run.core.packaging.base import Packager from nemo_run.core.packaging.git import GitArchivePackager +from nemo_run.config import get_nemorun_home logger = logging.getLogger(__name__) @@ -50,9 +54,12 @@ class DGXCloudExecutor(Executor): app_secret: str project_name: str container_image: str + pvc_nemo_run_dir: str + launched_from_cluster: bool = False nodes: int = 1 gpus_per_node: int = 0 nprocs_per_node: int = 1 + pvc_job_dir: str = field(init=False, default="") pvcs: list[dict[str, Any]] = field(default_factory=list) distributed_framework: str = "PyTorch" custom_spec: dict[str, Any] = field(default_factory=dict) @@ -90,21 +97,113 @@ def get_project_and_cluster_id(self, token: str) -> tuple[Optional[str], Optiona break return project_id, cluster_id - def create_distributed_job( - self, token: str, project_id: str, cluster_id: str, name: str, cmd: list[str] - ): + def copy_directory_data_command(self, local_dir_path: str, dest_path: str) -> str: + with tempfile.TemporaryDirectory() as temp_dir: + tarball_path = os.path.join(temp_dir, "archive.tar.gz") + subprocess.run(f"tar -czf {tarball_path} -C {local_dir_path} .", shell=True, check=True) + with open(tarball_path, "rb") as file: + file_data = file.read() + encoded_data = base64.b64encode(file_data).decode("utf-8") + + # Delete and recreate directory if it already exists, command to decode base64 data, save to a file, and extract inside the pod + cmd = f"rm -rf {dest_path} && mkdir -p {dest_path} && echo {encoded_data} | base64 -d > {dest_path}/archive.tar.gz && tar -xzf {dest_path}/archive.tar.gz -C {dest_path} && rm {dest_path}/archive.tar.gz" + return cmd + + def create_data_mover_workload(self, token: str, project_id: str, cluster_id: str): + """ + Creates a CPU only workload to move job directory into PVC using the provided project/cluster IDs. + """ + + cmd = self.copy_directory_data_command(self.job_dir, self.pvc_job_dir) + + url = f"{self.base_url}/workloads/workspaces" + headers = self._default_headers(token=token) + + payload = { + "name": "data-mover", + "useGivenNameAsPrefix": True, + "projectId": project_id, + "clusterId": cluster_id, + "spec": { + "command": "sh -c", + "args": f"'{cmd}'", + "image": "busybox:1.37.0", + "storage": {"pvc": self.pvcs}, + }, + } + + response = requests.post(url, json=payload, headers=headers) + + logger.debug( + "Created workload; response code=%s, content=%s", + response.status_code, + response.text.strip(), + ) + + return response + + def delete_workload(self, token: str, workload_id: str): + url = f"{self.base_url}/workloads/workspaces/{workload_id}" + headers = self._default_headers(token=token) + + response = requests.delete(url, headers=headers) + + logger.debug( + "Delete interactive workspace; response code=%s, content=%s", + response.status_code, + response.text.strip(), + ) + return response + + def move_data(self, token: str, project_id: str, cluster_id: str, sleep: float = 10) -> None: + """ + Moves job directory into PVC and deletes the workload after completion + """ + + resp = self.create_data_mover_workload(token, project_id, cluster_id) + if resp.status_code not in [200, 202]: + raise RuntimeError( + f"Failed to create data mover workload, status_code={resp.status_code}" + ) + + resp_json = resp.json() + workload_id = resp_json["workloadId"] + status = DGXCloudState(resp_json["actualPhase"]) + + while status in [ + DGXCloudState.PENDING, + DGXCloudState.CREATING, + DGXCloudState.INITIALIZING, + DGXCloudState.RUNNING, + ]: + time.sleep(sleep) + status = self.status(workload_id) + + if status is not DGXCloudState.COMPLETED: + raise RuntimeError("Failed to move data to PVC") + + resp = self.delete_workload(token, workload_id) + if resp.status_code >= 200 and resp.status_code < 300: + logger.info( + "Successfully deleted data movement workload %s on DGXCloud with response code %d", + workload_id, + resp.status_code, + ) + else: + logger.error( + "Failed to delete data movement workload %s, response code=%d, reason=%s", + workload_id, + resp.status_code, + resp.text, + ) + + def create_distributed_job(self, token: str, project_id: str, cluster_id: str, name: str): """ Creates a distributed PyTorch job using the provided project/cluster IDs. """ + url = f"{self.base_url}/workloads/distributed" headers = self._default_headers(token=token) - launch_script = f""" -ln -s {self.job_dir} /nemo_run -cd /nemo_run/code -{" ".join(cmd)} -""" - with open(os.path.join(self.job_dir, "launch_script.sh"), "w+") as f: - f.write(launch_script) payload = { "name": name, @@ -112,7 +211,7 @@ def create_distributed_job( "projectId": project_id, "clusterId": cluster_id, "spec": { - "command": f"/bin/bash {self.job_dir}/launch_script.sh", + "command": f"/bin/bash {self.pvc_job_dir}/launch_script.sh", "image": self.container_image, "distributedFramework": self.distributed_framework, "minReplicas": self.nodes, @@ -145,7 +244,21 @@ def launch(self, name: str, cmd: list[str]) -> tuple[str, str]: if not project_id or not cluster_id: raise RuntimeError("Unable to determine project/cluster IDs for job submission") - resp = self.create_distributed_job(token, project_id, cluster_id, name, cmd) + # prepare launch script and move data to PVC + launch_script = f""" +ln -s {self.pvc_job_dir}/ /nemo_run +cd /nemo_run/code +{" ".join(cmd)} +""" + with open(os.path.join(self.job_dir, "launch_script.sh"), "w+") as f: + f.write(launch_script) + + if not self.launched_from_cluster: + logger.info("Creating data movement workload") + self.move_data(token, project_id, cluster_id) + + logger.info("Creating distributed workload") + resp = self.create_distributed_job(token, project_id, cluster_id, name) if resp.status_code not in [200, 202]: raise RuntimeError(f"Failed to create job, status_code={resp.status_code}") @@ -167,10 +280,10 @@ def nproc_per_node(self) -> int: return 1 def status(self, job_id: str) -> Optional[DGXCloudState]: - url = f"{self.base_url}/workloads/distributed/{job_id}" + url = f"{self.base_url}/workloads/{job_id}" token = self.get_auth_token() if not token: - logger.error("Failed to retrieve auth token for cancellation request.") + logger.error("Failed to retrieve auth token for status request.") return None headers = self._default_headers(token=token) @@ -179,7 +292,7 @@ def status(self, job_id: str) -> Optional[DGXCloudState]: return DGXCloudState("Unknown") r_json = response.json() - return DGXCloudState(r_json["actualPhase"]) + return DGXCloudState(r_json["phase"]) def cancel(self, job_id: str): # Retrieve the authentication token for the REST calls @@ -225,18 +338,49 @@ def assign( self.job_name = task_id self.experiment_dir = exp_dir self.job_dir = os.path.join(exp_dir, task_dir) - self.experiment_id = exp_id assert any( map( lambda x: os.path.commonpath( - [os.path.abspath(x["path"]), os.path.abspath(self.job_dir)] + [os.path.abspath(x["path"]), os.path.abspath(self.pvc_nemo_run_dir)] ) == os.path.abspath(x["path"]), self.pvcs, ) ), ( - f"Need to specify atleast one PVC containing {self.job_dir}.\nTo update job dir to a PVC path, you can use set_nemorun_home() or the NEMORUN_HOME env var." + f"Need to specify at least one PVC containing {self.pvc_nemo_run_dir}. Update your PVC path or pvc_nemo_run_dir." + ) + + # setting linked PVC job directory + nemo_run_home = get_nemorun_home() + job_subdir = self.job_dir[len(nemo_run_home) + 1 :] # +1 to remove the initial backslash + self.pvc_job_dir = os.path.join(self.pvc_nemo_run_dir, job_subdir) + + logger.info( + "PVC job directory set as: %s", + self.pvc_job_dir, ) + self.experiment_id = exp_id + + def get_launcher_prefix(self) -> Optional[list[str]]: + launcher = self.get_launcher() + if launcher.nsys_profile: + return launcher.get_nsys_prefix(profile_dir="/nemo_run") + + def package_configs(self, *cfgs: tuple[str, str]) -> list[str]: + filenames = [] + basepath = os.path.join(self.job_dir, "configs") + for name, cfg in cfgs: + filename = os.path.join(basepath, name) + os.makedirs(os.path.dirname(filename), exist_ok=True) + with open(filename, "w") as f: + f.write(cfg) + filenames.append( + os.path.join( + "/nemo_run/configs", + name, + ) + ) + return filenames def package(self, packager: Packager, job_name: str): assert self.experiment_id, "Executor not assigned to an experiment." diff --git a/nemo_run/run/torchx_backend/schedulers/dgxcloud.py b/nemo_run/run/torchx_backend/schedulers/dgxcloud.py index fbf1253e..27e0282a 100644 --- a/nemo_run/run/torchx_backend/schedulers/dgxcloud.py +++ b/nemo_run/run/torchx_backend/schedulers/dgxcloud.py @@ -87,7 +87,7 @@ def _submit_dryrun( # type: ignore cfg: Executor, ) -> AppDryRunInfo[DGXRequest]: assert isinstance(cfg, DGXCloudExecutor), ( - f"{cfg.__class__} not supported for skypilot scheduler." + f"{cfg.__class__} not supported for DGXCloud scheduler." ) executor = cfg diff --git a/test/core/execution/test_dgxcloud.py b/test/core/execution/test_dgxcloud.py index b0181091..f54f9636 100644 --- a/test/core/execution/test_dgxcloud.py +++ b/test/core/execution/test_dgxcloud.py @@ -16,12 +16,13 @@ import os import subprocess import tempfile -from unittest.mock import MagicMock, patch +from unittest.mock import MagicMock, mock_open, patch import pytest from nemo_run.core.execution.dgxcloud import DGXCloudExecutor, DGXCloudState from nemo_run.core.packaging.git import GitArchivePackager +from nemo_run.config import set_nemorun_home class TestDGXCloudExecutor: @@ -34,6 +35,7 @@ def test_init(self): container_image="nvcr.io/nvidia/test:latest", nodes=2, gpus_per_node=8, + pvc_nemo_run_dir="/workspace/nemo_run", pvcs=[{"path": "/workspace", "claimName": "test-claim"}], ) @@ -46,6 +48,7 @@ def test_init(self): assert executor.gpus_per_node == 8 assert executor.pvcs == [{"path": "/workspace", "claimName": "test-claim"}] assert executor.distributed_framework == "PyTorch" + assert executor.pvc_nemo_run_dir == "/workspace/nemo_run" @patch("requests.post") def test_get_auth_token_success(self, mock_post): @@ -59,6 +62,7 @@ def test_get_auth_token_success(self, mock_post): app_secret="test_app_secret", project_name="test_project", container_image="nvcr.io/nvidia/test:latest", + pvc_nemo_run_dir="/workspace/nemo_run", ) token = executor.get_auth_token() @@ -86,6 +90,7 @@ def test_get_auth_token_failure(self, mock_post): app_secret="test_app_secret", project_name="test_project", container_image="nvcr.io/nvidia/test:latest", + pvc_nemo_run_dir="/workspace/nemo_run", ) token = executor.get_auth_token() @@ -104,6 +109,7 @@ def test_get_project_and_cluster_id_success(self, mock_get): app_secret="test_app_secret", project_name="test_project", container_image="nvcr.io/nvidia/test:latest", + pvc_nemo_run_dir="/workspace/nemo_run", ) project_id, cluster_id = executor.get_project_and_cluster_id("test_token") @@ -129,6 +135,7 @@ def test_get_project_and_cluster_id_not_found(self, mock_get): app_secret="test_app_secret", project_name="test_project", container_image="nvcr.io/nvidia/test:latest", + pvc_nemo_run_dir="/workspace/nemo_run", ) project_id, cluster_id = executor.get_project_and_cluster_id("test_token") @@ -136,64 +143,170 @@ def test_get_project_and_cluster_id_not_found(self, mock_get): assert project_id is None assert cluster_id is None + @patch("subprocess.run") + @patch("builtins.open", new_callable=mock_open, read_data=b"mock tarball") + def test_copy_directory_data_command_success(self, mock_file, mock_subprocess): + local_dir_path = "/mock/local/dir" + dest_path = "/mock/destination/path" + + executor = DGXCloudExecutor( + base_url="https://dgxapi.example.com", + app_id="test_app_id", + app_secret="test_app_secret", + project_name="test_project", + container_image="nvcr.io/nvidia/test:latest", + pvc_nemo_run_dir="/workspace/nemo_run", + pvcs=[{"path": "workspace", "claimName": "test-claim"}], + ) + response = executor.copy_directory_data_command(local_dir_path, dest_path) + + mock_subprocess.assert_called_once() + mock_file.call_count == 1 + + assert ( + "rm -rf /mock/destination/path && mkdir -p /mock/destination/path && echo" in response + ) + assert ( + "base64 -d > /mock/destination/path/archive.tar.gz && tar -xzf /mock/destination/path/archive.tar.gz -C /mock/destination/path && rm /mock/destination/path/archive.tar.gz" + in response + ) + + @patch("tempfile.TemporaryDirectory") + def test_copy_directory_data_command_fails(self, mock_tempdir): + local_dir_path = "/mock/local/dir" + dest_path = "/mock/destination/path" + + mock_tempdir.side_effect = OSError("Temporary directory creation failed") + + executor = DGXCloudExecutor( + base_url="https://dgxapi.example.com", + app_id="test_app_id", + app_secret="test_app_secret", + project_name="test_project", + container_image="nvcr.io/nvidia/test:latest", + pvc_nemo_run_dir="/workspace/nemo_run", + pvcs=[{"path": "workspace", "claimName": "test-claim"}], + ) + with pytest.raises(OSError, match="Temporary directory creation failed"): + executor.copy_directory_data_command(local_dir_path, dest_path) + @patch("requests.post") - def test_create_distributed_job(self, mock_post): + @patch.object(DGXCloudExecutor, "copy_directory_data_command") + def test_create_data_mover_workload_success(self, mock_command, mock_post): + mock_command.return_value = "sleep infinity" + mock_response = MagicMock() mock_response.status_code = 200 mock_response.text = '{"status": "submitted"}' mock_post.return_value = mock_response - with tempfile.TemporaryDirectory() as tmp_dir: - executor = DGXCloudExecutor( - base_url="https://dgxapi.example.com", - app_id="test_app_id", - app_secret="test_app_secret", - project_name="test_project", - container_image="nvcr.io/nvidia/test:latest", - nodes=2, - gpus_per_node=8, - pvcs=[{"path": tmp_dir, "claimName": "test-claim"}], - ) - executor.job_dir = tmp_dir - executor.env_vars = {"TEST_VAR": "test_value"} - - response = executor.create_distributed_job( - token="test_token", - project_id="proj_id", - cluster_id="cluster_id", - name="test_job", - cmd=["python", "train.py"], - ) + executor = DGXCloudExecutor( + base_url="https://dgxapi.example.com", + app_id="test_app_id", + app_secret="test_app_secret", + project_name="test_project", + container_image="nvcr.io/nvidia/test:latest", + pvc_nemo_run_dir="/workspace/nemo_run", + pvcs=[{"path": "workspace", "claimName": "test-claim"}], + ) - assert response == mock_response - assert os.path.exists(os.path.join(tmp_dir, "launch_script.sh")) + response = executor.create_data_mover_workload( + token="test_token", + project_id="proj_id", + cluster_id="cluster_id", + ) - # Check if the API call is made correctly - mock_post.assert_called_once() - # The URL is the first argument to post - args, kwargs = mock_post.call_args - assert kwargs["json"]["name"] == "test_job" - assert kwargs["json"]["projectId"] == "proj_id" - assert kwargs["json"]["clusterId"] == "cluster_id" - assert kwargs["json"]["spec"]["image"] == "nvcr.io/nvidia/test:latest" - assert kwargs["json"]["spec"]["numWorkers"] == 2 - assert kwargs["json"]["spec"]["compute"]["gpuDevicesRequest"] == 8 - assert kwargs["json"]["spec"]["environmentVariables"] == [ - {"name": "TEST_VAR", "value": "test_value"} - ] - assert kwargs["headers"] == executor._default_headers(token="test_token") + assert response == mock_response - @patch.object(DGXCloudExecutor, "get_auth_token") - @patch.object(DGXCloudExecutor, "get_project_and_cluster_id") - @patch.object(DGXCloudExecutor, "create_distributed_job") - def test_launch_success(self, mock_create_job, mock_get_ids, mock_get_token): - mock_get_token.return_value = "test_token" - mock_get_ids.return_value = ("proj_id", "cluster_id") + # Check if the API call is made correctly + mock_post.assert_called_once() + # The URL is the first argument to post + args, kwargs = mock_post.call_args + assert kwargs["json"]["projectId"] == "proj_id" + assert kwargs["json"]["clusterId"] == "cluster_id" + assert kwargs["json"]["spec"]["command"] == "sh -c" + assert kwargs["json"]["spec"]["args"] == "'sleep infinity'" + assert kwargs["headers"] == executor._default_headers(token="test_token") + + @patch("requests.delete") + def test_delete_workload(self, mock_delete): + mock_response = MagicMock() + mock_response.status_code = 200 + mock_delete.return_value = mock_response + + executor = DGXCloudExecutor( + base_url="https://dgxapi.example.com", + app_id="test_app_id", + app_secret="test_app_secret", + project_name="test_project", + container_image="nvcr.io/nvidia/test:latest", + pvc_nemo_run_dir="/workspace/nemo_run", + pvcs=[{"path": "workspace", "claimName": "test-claim"}], + ) + + response = executor.delete_workload(token="test_token", workload_id="job123") + + assert response == mock_response + mock_delete.assert_called_once_with( + "https://dgxapi.example.com/workloads/workspaces/job123", + headers=executor._default_headers(token="test_token"), + ) + @patch.object(DGXCloudExecutor, "create_data_mover_workload") + @patch.object(DGXCloudExecutor, "status") + @patch.object(DGXCloudExecutor, "delete_workload") + def test_move_data_success(self, mock_delete, mock_status, mock_create): mock_response = MagicMock() mock_response.status_code = 200 mock_response.json.return_value = {"workloadId": "job123", "actualPhase": "Pending"} - mock_create_job.return_value = mock_response + mock_create.return_value = mock_response + mock_delete.return_value = mock_response + + mock_status.return_value = DGXCloudState.COMPLETED + + executor = DGXCloudExecutor( + base_url="https://dgxapi.example.com", + app_id="test_app_id", + app_secret="test_app_secret", + project_name="test_project", + container_image="nvcr.io/nvidia/test:latest", + pvc_nemo_run_dir="/workspace/nemo_run", + pvcs=[{"path": "workspace", "claimName": "test-claim"}], + ) + + executor.move_data(token="test_token", project_id="proj_id", cluster_id="cluster_id") + + mock_create.assert_called_once_with("test_token", "proj_id", "cluster_id") + mock_status.assert_called() + mock_delete.assert_called_once_with("test_token", "job123") + + @patch.object(DGXCloudExecutor, "create_data_mover_workload") + def test_move_data_data_mover_fail(self, mock_create): + mock_response = MagicMock() + mock_response.status_code = 400 + + executor = DGXCloudExecutor( + base_url="https://dgxapi.example.com", + app_id="test_app_id", + app_secret="test_app_secret", + project_name="test_project", + container_image="nvcr.io/nvidia/test:latest", + pvc_nemo_run_dir="/workspace/nemo_run", + pvcs=[{"path": "workspace", "claimName": "test-claim"}], + ) + + with pytest.raises(RuntimeError, match="Failed to create data mover workload"): + executor.move_data(token="test_token", project_id="proj_id", cluster_id="cluster_id") + + @patch.object(DGXCloudExecutor, "create_data_mover_workload") + @patch.object(DGXCloudExecutor, "status") + def test_move_data_failed(self, mock_status, mock_create): + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = {"workloadId": "job123", "actualPhase": "Pending"} + mock_create.return_value = mock_response + + mock_status.return_value = DGXCloudState.FAILED executor = DGXCloudExecutor( base_url="https://dgxapi.example.com", @@ -201,17 +314,100 @@ def test_launch_success(self, mock_create_job, mock_get_ids, mock_get_token): app_secret="test_app_secret", project_name="test_project", container_image="nvcr.io/nvidia/test:latest", + pvc_nemo_run_dir="/workspace/nemo_run", + pvcs=[{"path": "workspace", "claimName": "test-claim"}], ) - job_id, status = executor.launch("test_job", ["python", "train.py"]) + with pytest.raises(RuntimeError, match="Failed to move data to PVC"): + executor.move_data(token="test_token", project_id="proj_id", cluster_id="cluster_id") + + mock_create.assert_called_once_with("test_token", "proj_id", "cluster_id") + mock_status.assert_called() + + @patch("requests.post") + def test_create_distributed_job(self, mock_post): + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.text = '{"status": "submitted"}' + mock_post.return_value = mock_response + + executor = DGXCloudExecutor( + base_url="https://dgxapi.example.com", + app_id="test_app_id", + app_secret="test_app_secret", + project_name="test_project", + container_image="nvcr.io/nvidia/test:latest", + nodes=2, + gpus_per_node=8, + pvc_nemo_run_dir="/workspace/nemo_run", + pvcs=[{"path": "workspace", "claimName": "test-claim"}], + ) + executor.pvc_job_dir = "/workspace/nemo_run/job_dir" + executor.env_vars = {"TEST_VAR": "test_value"} + + response = executor.create_distributed_job( + token="test_token", + project_id="proj_id", + cluster_id="cluster_id", + name="test_job", + ) - assert job_id == "job123" - assert status == "Pending" - mock_get_token.assert_called_once() - mock_get_ids.assert_called_once_with("test_token") - mock_create_job.assert_called_once_with( - "test_token", "proj_id", "cluster_id", "test-job", ["python", "train.py"] + assert response == mock_response + + # Check if the API call is made correctly + mock_post.assert_called_once() + # The URL is the first argument to post + args, kwargs = mock_post.call_args + assert kwargs["json"]["name"] == "test_job" + assert kwargs["json"]["projectId"] == "proj_id" + assert kwargs["json"]["clusterId"] == "cluster_id" + assert kwargs["json"]["spec"]["image"] == "nvcr.io/nvidia/test:latest" + assert ( + kwargs["json"]["spec"]["command"] + == "/bin/bash /workspace/nemo_run/job_dir/launch_script.sh" ) + assert kwargs["json"]["spec"]["numWorkers"] == 2 + assert kwargs["json"]["spec"]["compute"]["gpuDevicesRequest"] == 8 + assert kwargs["json"]["spec"]["environmentVariables"] == [ + {"name": "TEST_VAR", "value": "test_value"} + ] + assert kwargs["headers"] == executor._default_headers(token="test_token") + + @patch.object(DGXCloudExecutor, "get_auth_token") + @patch.object(DGXCloudExecutor, "get_project_and_cluster_id") + @patch.object(DGXCloudExecutor, "move_data") + @patch.object(DGXCloudExecutor, "create_distributed_job") + def test_launch_success(self, mock_create_job, mock_move_data, mock_get_ids, mock_get_token): + mock_get_token.return_value = "test_token" + mock_get_ids.return_value = ("proj_id", "cluster_id") + + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = {"workloadId": "job123", "actualPhase": "Pending"} + mock_create_job.return_value = mock_response + + with tempfile.TemporaryDirectory() as tmp_dir: + executor = DGXCloudExecutor( + base_url="https://dgxapi.example.com", + app_id="test_app_id", + app_secret="test_app_secret", + project_name="test_project", + container_image="nvcr.io/nvidia/test:latest", + pvc_nemo_run_dir="/workspace/nemo_run", + ) + executor.job_dir = tmp_dir + + job_id, status = executor.launch("test_job", ["python", "train.py"]) + + assert job_id == "job123" + assert status == "Pending" + assert os.path.exists(os.path.join(tmp_dir, "launch_script.sh")) + mock_get_token.assert_called_once() + mock_get_ids.assert_called_once_with("test_token") + mock_move_data.assert_called_once_with("test_token", "proj_id", "cluster_id") + mock_create_job.assert_called_once_with( + "test_token", "proj_id", "cluster_id", "test-job" + ) @patch.object(DGXCloudExecutor, "get_auth_token") def test_launch_no_token(self, mock_get_token): @@ -223,6 +419,7 @@ def test_launch_no_token(self, mock_get_token): app_secret="test_app_secret", project_name="test_project", container_image="nvcr.io/nvidia/test:latest", + pvc_nemo_run_dir="/workspace/nemo_run", ) with pytest.raises(RuntimeError, match="Failed to get auth token"): @@ -240,6 +437,7 @@ def test_launch_no_project_id(self, mock_get_ids, mock_get_token): app_secret="test_app_secret", project_name="test_project", container_image="nvcr.io/nvidia/test:latest", + pvc_nemo_run_dir="/workspace/nemo_run", ) with pytest.raises(RuntimeError, match="Unable to determine project/cluster IDs"): @@ -247,8 +445,11 @@ def test_launch_no_project_id(self, mock_get_ids, mock_get_token): @patch.object(DGXCloudExecutor, "get_auth_token") @patch.object(DGXCloudExecutor, "get_project_and_cluster_id") + @patch.object(DGXCloudExecutor, "move_data") @patch.object(DGXCloudExecutor, "create_distributed_job") - def test_launch_job_creation_failed(self, mock_create_job, mock_get_ids, mock_get_token): + def test_launch_job_creation_failed( + self, mock_create_job, mock_move_data, mock_get_ids, mock_get_token + ): mock_get_token.return_value = "test_token" mock_get_ids.return_value = ("proj_id", "cluster_id") @@ -256,16 +457,19 @@ def test_launch_job_creation_failed(self, mock_create_job, mock_get_ids, mock_ge mock_response.status_code = 400 mock_create_job.return_value = mock_response - executor = DGXCloudExecutor( - base_url="https://dgxapi.example.com", - app_id="test_app_id", - app_secret="test_app_secret", - project_name="test_project", - container_image="nvcr.io/nvidia/test:latest", - ) + with tempfile.TemporaryDirectory() as tmp_dir: + executor = DGXCloudExecutor( + base_url="https://dgxapi.example.com", + app_id="test_app_id", + app_secret="test_app_secret", + project_name="test_project", + container_image="nvcr.io/nvidia/test:latest", + pvc_nemo_run_dir="/workspace/nemo_run", + ) + executor.job_dir = tmp_dir - with pytest.raises(RuntimeError, match="Failed to create job"): - executor.launch("test_job", ["python", "train.py"]) + with pytest.raises(RuntimeError, match="Failed to create job"): + executor.launch("test_job", ["python", "train.py"]) def test_nnodes(self): executor = DGXCloudExecutor( @@ -275,6 +479,7 @@ def test_nnodes(self): project_name="test_project", container_image="nvcr.io/nvidia/test:latest", nodes=3, + pvc_nemo_run_dir="/workspace/nemo_run", ) assert executor.nnodes() == 3 @@ -287,6 +492,7 @@ def test_nproc_per_node_with_gpus(self): project_name="test_project", container_image="nvcr.io/nvidia/test:latest", gpus_per_node=4, + pvc_nemo_run_dir="/workspace/nemo_run", ) assert executor.nproc_per_node() == 4 @@ -300,6 +506,7 @@ def test_nproc_per_node_with_nprocs(self): container_image="nvcr.io/nvidia/test:latest", gpus_per_node=0, nprocs_per_node=3, + pvc_nemo_run_dir="/workspace/nemo_run", ) assert executor.nproc_per_node() == 3 @@ -313,6 +520,7 @@ def test_nproc_per_node_default(self): container_image="nvcr.io/nvidia/test:latest", gpus_per_node=0, nprocs_per_node=0, + pvc_nemo_run_dir="/workspace/nemo_run", ) assert executor.nproc_per_node() == 1 @@ -321,7 +529,7 @@ def test_nproc_per_node_default(self): def test_status(self, mock_get): mock_response = MagicMock() mock_response.status_code = 200 - mock_response.json.return_value = {"actualPhase": "Running"} + mock_response.json.return_value = {"phase": "Running"} mock_get.return_value = mock_response with patch.object(DGXCloudExecutor, "get_auth_token", return_value="test_token"): @@ -331,13 +539,14 @@ def test_status(self, mock_get): app_secret="test_app_secret", project_name="test_project", container_image="nvcr.io/nvidia/test:latest", + pvc_nemo_run_dir="/workspace/nemo_run", ) status = executor.status("job123") assert status == DGXCloudState.RUNNING mock_get.assert_called_once_with( - "https://dgxapi.example.com/workloads/distributed/job123", + "https://dgxapi.example.com/workloads/job123", headers=executor._default_headers(token="test_token"), ) @@ -350,6 +559,7 @@ def test_status_no_token(self, mock_get): app_secret="test_app_secret", project_name="test_project", container_image="nvcr.io/nvidia/test:latest", + pvc_nemo_run_dir="/workspace/nemo_run", ) status = executor.status("job123") @@ -370,6 +580,7 @@ def test_status_error_response(self, mock_get): app_secret="test_app_secret", project_name="test_project", container_image="nvcr.io/nvidia/test:latest", + pvc_nemo_run_dir="/workspace/nemo_run", ) status = executor.status("job123") @@ -389,6 +600,7 @@ def test_cancel(self, mock_get): app_secret="test_app_secret", project_name="test_project", container_image="nvcr.io/nvidia/test:latest", + pvc_nemo_run_dir="/workspace/nemo_run", ) executor.cancel("job123") @@ -407,6 +619,7 @@ def test_cancel_no_token(self, mock_get): app_secret="test_app_secret", project_name="test_project", container_image="nvcr.io/nvidia/test:latest", + pvc_nemo_run_dir="/workspace/nemo_run", ) executor.cancel("job123") @@ -420,28 +633,34 @@ def test_logs(self): assert "Logs not available" in mock_warning.call_args[0][0] def test_assign(self): - with tempfile.TemporaryDirectory() as tmp_dir: - executor = DGXCloudExecutor( - base_url="https://dgxapi.example.com", - app_id="test_app_id", - app_secret="test_app_secret", - project_name="test_project", - container_image="nvcr.io/nvidia/test:latest", - pvcs=[{"path": tmp_dir, "claimName": "test-claim"}], - ) + set_nemorun_home("/nemo_home") - task_dir = "test_task" - executor.assign( - exp_id="test_exp", - exp_dir=tmp_dir, - task_id="test_task", - task_dir=task_dir, - ) + executor = DGXCloudExecutor( + base_url="https://dgxapi.example.com", + app_id="test_app_id", + app_secret="test_app_secret", + project_name="test_project", + container_image="nvcr.io/nvidia/test:latest", + pvc_nemo_run_dir="/workspace/nemo_run", + pvcs=[{"path": "/workspace", "claimName": "test-claim"}], + ) - assert executor.job_name == "test_task" - assert executor.experiment_dir == tmp_dir - assert executor.job_dir == os.path.join(tmp_dir, task_dir) - assert executor.experiment_id == "test_exp" + task_dir = "test_task" + exp_dir = "/nemo_home/experiments/experiment" + executor.assign( + exp_id="test_exp", + exp_dir=exp_dir, + task_id="test_task", + task_dir=task_dir, + ) + + assert executor.job_name == "test_task" + assert executor.experiment_dir == exp_dir + assert executor.job_dir == os.path.join(exp_dir, task_dir) + assert executor.pvc_job_dir == os.path.join( + "/workspace/nemo_run/experiments/experiment", task_dir + ) + assert executor.experiment_id == "test_exp" def test_assign_no_pvc(self): with tempfile.TemporaryDirectory() as tmp_dir: @@ -451,10 +670,11 @@ def test_assign_no_pvc(self): app_secret="test_app_secret", project_name="test_project", container_image="nvcr.io/nvidia/test:latest", + pvc_nemo_run_dir="/workspace/nemo_run", pvcs=[{"path": "/other/path", "claimName": "test-claim"}], ) - with pytest.raises(AssertionError, match="Need to specify atleast one PVC"): + with pytest.raises(AssertionError, match="Need to specify at least one PVC"): executor.assign( exp_id="test_exp", exp_dir=tmp_dir, @@ -462,6 +682,29 @@ def test_assign_no_pvc(self): task_dir="test_task", ) + @patch("os.makedirs") + @patch("builtins.open", new_callable=mock_open) + def test_package_configs(self, mock_file, mock_makedirs): + executor = DGXCloudExecutor( + base_url="https://dgxapi.example.com", + app_id="test_app_id", + app_secret="test_app_secret", + project_name="test_project", + container_image="nvcr.io/nvidia/test:latest", + pvc_nemo_run_dir="/workspace/nemo_run", + pvcs=[{"path": "/other/path", "claimName": "test-claim"}], + ) + + configs = [("config1.yaml", "key: value"), ("subdir/config2.yaml", "another: config")] + + filenames = executor.package_configs(*configs) + + assert len(filenames) == 2 + assert filenames[0] == "/nemo_run/configs/config1.yaml" + assert filenames[1] == "/nemo_run/configs/subdir/config2.yaml" + mock_makedirs.assert_called() + assert mock_file.call_count == 2 + @patch("invoke.context.Context.run") @patch("subprocess.run") def test_package_git_packager(self, mock_subprocess_run, mock_context_run): @@ -480,6 +723,7 @@ def test_package_git_packager(self, mock_subprocess_run, mock_context_run): app_secret="test_app_secret", project_name="test_project", container_image="nvcr.io/nvidia/test:latest", + pvc_nemo_run_dir="/workspace/nemo_run", pvcs=[{"path": tmp_dir, "claimName": "test-claim"}], ) executor.experiment_id = "test_exp" @@ -505,6 +749,7 @@ def test_macro_values(self): app_secret="test_app_secret", project_name="test_project", container_image="nvcr.io/nvidia/test:latest", + pvc_nemo_run_dir="/workspace/nemo_run", ) result = executor.macro_values() @@ -518,6 +763,7 @@ def test_default_headers_without_token(self): app_secret="test_app_secret", project_name="test_project", container_image="nvcr.io/nvidia/test:latest", + pvc_nemo_run_dir="/workspace/nemo_run", ) headers = executor._default_headers() @@ -533,6 +779,7 @@ def test_default_headers_with_token(self): app_secret="test_app_secret", project_name="test_project", container_image="nvcr.io/nvidia/test:latest", + pvc_nemo_run_dir="/workspace/nemo_run", ) headers = executor._default_headers(token="test_token") diff --git a/test/run/torchx_backend/schedulers/test_dgxcloud.py b/test/run/torchx_backend/schedulers/test_dgxcloud.py index 41b30013..5a3f9ae9 100644 --- a/test/run/torchx_backend/schedulers/test_dgxcloud.py +++ b/test/run/torchx_backend/schedulers/test_dgxcloud.py @@ -42,6 +42,7 @@ def dgx_cloud_executor(): app_secret="test_secret", project_name="test_project", container_image="nvcr.io/nvidia/test:latest", + pvc_nemo_run_dir="/workspace/nemo_run", job_dir=tempfile.mkdtemp(), ) @@ -151,6 +152,7 @@ def test_save_and_get_job_dirs(): project_name="test_project", container_image="test:latest", job_dir=temp_dir, + pvc_nemo_run_dir="/workspace/nemo_run", ) _save_job_dir("test_app_id", "RUNNING", executor)