From 0404b3bc937b0225f9602b86dfb09f023644d9fc Mon Sep 17 00:00:00 2001 From: Zoey Zhang Date: Thu, 3 Apr 2025 14:08:19 -0700 Subject: [PATCH 01/15] adding APIs for data movement Signed-off-by: Zoey Zhang --- nemo_run/core/execution/dgxcloud.py | 105 +++++++++++++++++++++++++++- 1 file changed, 104 insertions(+), 1 deletion(-) diff --git a/nemo_run/core/execution/dgxcloud.py b/nemo_run/core/execution/dgxcloud.py index 97dc709e..1cba6521 100644 --- a/nemo_run/core/execution/dgxcloud.py +++ b/nemo_run/core/execution/dgxcloud.py @@ -1,3 +1,4 @@ +import base64 import json import logging import os @@ -5,6 +6,8 @@ from dataclasses import dataclass, field from enum import Enum from pathlib import Path +import time +import tempfile from typing import Any, Optional, Type import requests @@ -90,6 +93,106 @@ def get_project_and_cluster_id(self, token: str) -> tuple[Optional[str], Optiona break return project_id, cluster_id + def copy_directory_command(self): + # Local directory path to copy + local_dir_path = "/home/roclark/.nemo_run/experiments/nemo.collections.llm.api.pretrain/nemo.collections.llm.api.pretrain_1743621953" + + # Destination path inside the pod + dest_path = self.job_dir + + # Create a temporary directory to hold the tarball + with tempfile.TemporaryDirectory() as temp_dir: + + tarball_path = os.path.join(temp_dir, "archive.tar.gz") + + # Create a tarball of the directory + subprocess.run(f"tar -czf {tarball_path} -C {local_dir_path} .", shell=True, check=True) + + # Read the tarball + with open(tarball_path, "rb") as file: + file_data = file.read() + + # Encode the tarball data to base64 + encoded_data = base64.b64encode(file_data).decode("utf-8") + + # Command to decode base64 data, save to a file, and extract inside the pod + cmd = f"mkdir -p {dest_path} && echo {encoded_data} | base64 -d > {dest_path}/archive.tar.gz && tar -xzf {dest_path}/archive.tar.gz -C {dest_path} && rm {dest_path}/archive.tar.gz" + return cmd + + + def create_data_mover_workload( + self, token: str, project_id: str, cluster_id: str, name: str, cmd: list[str] + ): + """ + Creates an cpu only workload to move data into PVC using the provided project/cluster IDs. + """ + headers = self._default_headers(token=token) + url = f"{self.base_url}/workloads/workspaces" + headers = self._default_headers(token=token) + + payload = { + "name": name, + "useGivenNameAsPrefix": True, + "projectId": project_id, + "clusterId": cluster_id, + "spec": { + "command": "sh -c", + "args": f"'{cmd}'", + "image": "busybox:1.37.0", + "storage": {"pvc": self.pvcs}, + } + } + + response = requests.post(url, json=payload, headers=headers) + + logger.debug( + "Created workload; response code=%s, content=%s", + response.status_code, + response.text.strip(), + ) + + return response + + def workload_completed( + self, token: str, workload_id: str + ): + """ + Checks the status of the interactive workload + """ + + url = f"{self.base_url}/workloads/{workload_id}" + headers = self._default_headers(token=token) + + response = requests.get(url, headers=headers) + + if response.status_code not in [200, 202]: + raise RuntimeError(f"Failed to get workload status, status_code={response.status_code}") + + phase = response.json()["phase"] + + if phase == "Completed": + return True + else: + return False + + def delete_workload( + self, token: str, workload_id: str + ): + while not self.workload_completed(token, workload_id): + time.sleep(10) + + url = f"{self.base_url}/workloads/{workload_id}" + headers = self._default_headers(token=token) + + response = requests.delete(url, headers=headers) + + logger.debug( + "Delete interactive workspace; response code=%s, content=%s", + response.status_code, + response.text.strip(), + ) + return response + def create_distributed_job( self, token: str, project_id: str, cluster_id: str, name: str, cmd: list[str] ): @@ -276,4 +379,4 @@ def _default_headers(self, token: Optional[str] = None) -> dict: } if token: headers["Authorization"] = f"Bearer {token}" - return headers + return headers \ No newline at end of file From 7dc9717e9abcabeffc65d2803897340b0635384e Mon Sep 17 00:00:00 2001 From: Zoey Zhang Date: Thu, 3 Apr 2025 14:15:34 -0700 Subject: [PATCH 02/15] adding todos and questions Signed-off-by: Zoey Zhang --- nemo_run/core/execution/dgxcloud.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/nemo_run/core/execution/dgxcloud.py b/nemo_run/core/execution/dgxcloud.py index 1cba6521..1f9d9b2d 100644 --- a/nemo_run/core/execution/dgxcloud.py +++ b/nemo_run/core/execution/dgxcloud.py @@ -201,6 +201,7 @@ def create_distributed_job( """ url = f"{self.base_url}/workloads/distributed" headers = self._default_headers(token=token) + ##do we need to include this as a script? this could be in the command instead launch_script = f""" ln -s {self.job_dir} /nemo_run cd /nemo_run/code @@ -329,6 +330,9 @@ def assign( self.experiment_dir = exp_dir self.job_dir = os.path.join(exp_dir, task_dir) self.experiment_id = exp_id + + ## TODO: need to figure out how to track location of data in PVC and in the local environment + assert any( map( lambda x: os.path.commonpath( From a1fc91dded097e317535a1508116c86bfd320830 Mon Sep 17 00:00:00 2001 From: Zoey Zhang Date: Tue, 8 Apr 2025 19:49:51 -0700 Subject: [PATCH 03/15] support data movement into PVC before launching job Signed-off-by: Zoey Zhang --- nemo_run/core/execution/dgxcloud.py | 111 ++++++++++++------ .../run/torchx_backend/schedulers/dgxcloud.py | 2 +- 2 files changed, 74 insertions(+), 39 deletions(-) diff --git a/nemo_run/core/execution/dgxcloud.py b/nemo_run/core/execution/dgxcloud.py index 1f9d9b2d..72933ef6 100644 --- a/nemo_run/core/execution/dgxcloud.py +++ b/nemo_run/core/execution/dgxcloud.py @@ -16,6 +16,7 @@ from nemo_run.core.execution.base import Executor, ExecutorMacros from nemo_run.core.packaging.base import Packager from nemo_run.core.packaging.git import GitArchivePackager +from nemo_run.config import get_nemorun_home logger = logging.getLogger(__name__) @@ -56,6 +57,8 @@ class DGXCloudExecutor(Executor): nodes: int = 1 gpus_per_node: int = 0 nprocs_per_node: int = 1 + pvc_nemo_run_dir: str + pvc_job_dir: str = field(init=False, default="") pvcs: list[dict[str, Any]] = field(default_factory=list) distributed_framework: str = "PyTorch" custom_spec: dict[str, Any] = field(default_factory=dict) @@ -93,14 +96,10 @@ def get_project_and_cluster_id(self, token: str) -> tuple[Optional[str], Optiona break return project_id, cluster_id - def copy_directory_command(self): - # Local directory path to copy - local_dir_path = "/home/roclark/.nemo_run/experiments/nemo.collections.llm.api.pretrain/nemo.collections.llm.api.pretrain_1743621953" - - # Destination path inside the pod - dest_path = self.job_dir - + def copy_directory_data_command(self, local_dir_path, dest_path): # Create a temporary directory to hold the tarball + directory_name=os.path.basename(local_dir_path) + with tempfile.TemporaryDirectory() as temp_dir: tarball_path = os.path.join(temp_dir, "archive.tar.gz") @@ -115,18 +114,19 @@ def copy_directory_command(self): # Encode the tarball data to base64 encoded_data = base64.b64encode(file_data).decode("utf-8") - # Command to decode base64 data, save to a file, and extract inside the pod - cmd = f"mkdir -p {dest_path} && echo {encoded_data} | base64 -d > {dest_path}/archive.tar.gz && tar -xzf {dest_path}/archive.tar.gz -C {dest_path} && rm {dest_path}/archive.tar.gz" + # Delete and recreate directory if it exists already, command to decode base64 data, save to a file, and extract inside the pod + cmd = f"rm -rf {dest_path} && mkdir -p {dest_path} && echo {encoded_data} | base64 -d > {dest_path}/archive.tar.gz && tar -xzf {dest_path}/archive.tar.gz -C {dest_path} && rm {dest_path}/archive.tar.gz" return cmd - def create_data_mover_workload( - self, token: str, project_id: str, cluster_id: str, name: str, cmd: list[str] + self, token: str, project_id: str, cluster_id: str, name: str ): """ - Creates an cpu only workload to move data into PVC using the provided project/cluster IDs. + Creates an cpu only workload to move project directory into PVC using the provided project/cluster IDs. """ - headers = self._default_headers(token=token) + + cmd=self.copy_directory_data_command(self.job_dir, self.pvc_job_dir) + url = f"{self.base_url}/workloads/workspaces" headers = self._default_headers(token=token) @@ -176,12 +176,12 @@ def workload_completed( return False def delete_workload( - self, token: str, workload_id: str + self, token: str, workload_id: str, sleep: int = 10 ): while not self.workload_completed(token, workload_id): - time.sleep(10) + time.sleep(sleep) - url = f"{self.base_url}/workloads/{workload_id}" + url = f"{self.base_url}/workloads/workspaces/{workload_id}" headers = self._default_headers(token=token) response = requests.delete(url, headers=headers) @@ -194,21 +194,13 @@ def delete_workload( return response def create_distributed_job( - self, token: str, project_id: str, cluster_id: str, name: str, cmd: list[str] + self, token: str, project_id: str, cluster_id: str, name: str ): """ Creates a distributed PyTorch job using the provided project/cluster IDs. """ url = f"{self.base_url}/workloads/distributed" headers = self._default_headers(token=token) - ##do we need to include this as a script? this could be in the command instead - launch_script = f""" -ln -s {self.job_dir} /nemo_run -cd /nemo_run/code -{" ".join(cmd)} -""" - with open(os.path.join(self.job_dir, "launch_script.sh"), "w+") as f: - f.write(launch_script) payload = { "name": name, @@ -216,7 +208,7 @@ def create_distributed_job( "projectId": project_id, "clusterId": cluster_id, "spec": { - "command": f"/bin/bash {self.job_dir}/launch_script.sh", + "command": f"/bin/bash {self.pvc_job_dir}/launch_script.sh", "image": self.container_image, "distributedFramework": self.distributed_framework, "minReplicas": self.nodes, @@ -248,8 +240,39 @@ def launch(self, name: str, cmd: list[str]) -> tuple[str, str]: project_id, cluster_id = self.get_project_and_cluster_id(token) if not project_id or not cluster_id: raise RuntimeError("Unable to determine project/cluster IDs for job submission") + + #prepare launch script and move data to PVC + launch_script = f""" +ln -s {self.pvc_job_dir}/ /nemo_run +cd /nemo_run/code +{" ".join(cmd)} +""" + with open(os.path.join(self.job_dir, "launch_script.sh"), "w+") as f: + f.write(launch_script) + + logger.info("Creating data movement workload") + resp = self.create_data_mover_workload(token, project_id, cluster_id, "data-mover") + if resp.status_code not in [200, 202]: + raise RuntimeError(f"Failed to copy data to PVC, status_code={resp.status_code}") + + workload_id= resp.json()["workloadId"] + resp = self.delete_workload(token, workload_id) + if resp.status_code >= 200 and resp.status_code < 300: + logger.info( + "Successfully delete data movement workload %s on DGX with response code %d", + workload_id, + resp.status_code, + ) + else: + logger.error( + "Failed to delete data movement workload %s, response code=%d, reason=%s", + workload_id, + resp.status_code, + resp.text, + ) - resp = self.create_distributed_job(token, project_id, cluster_id, name, cmd) + logger.info("Creating distributed workload") + resp = self.create_distributed_job(token, project_id, cluster_id, name) if resp.status_code not in [200, 202]: raise RuntimeError(f"Failed to create job, status_code={resp.status_code}") @@ -329,21 +352,33 @@ def assign( self.job_name = task_id self.experiment_dir = exp_dir self.job_dir = os.path.join(exp_dir, task_dir) + + ## setting linked PVC experiment and job directories + job_subdir = self.job_dir[len(get_nemorun_home())+1:] #+1 to remove the initial backslash + self.pvc_job_dir = os.path.join(self.pvc_nemo_run_dir, job_subdir) + + logger.info( + "PVC job directory set as: %s", + self.pvc_job_dir, + ) self.experiment_id = exp_id - ## TODO: need to figure out how to track location of data in PVC and in the local environment - - assert any( - map( - lambda x: os.path.commonpath( - [os.path.abspath(x["path"]), os.path.abspath(self.job_dir)] + def package_configs(self, *cfgs: tuple[str, str]) -> list[str]: + filenames = [] + basepath = os.path.join(self.job_dir, "configs") + for name, cfg in cfgs: + filename = os.path.join(basepath, name) + os.makedirs(os.path.dirname(filename), exist_ok=True) + with open(filename, "w") as f: + f.write(cfg) + filenames.append( + os.path.join( + "/nemo_run/configs", + name, ) - == os.path.abspath(x["path"]), - self.pvcs, ) - ), ( - f"Need to specify atleast one PVC containing {self.job_dir}.\nTo update job dir to a PVC path, you can use set_nemorun_home() or the NEMORUN_HOME env var." - ) + + return filenames def package(self, packager: Packager, job_name: str): assert self.experiment_id, "Executor not assigned to an experiment." diff --git a/nemo_run/run/torchx_backend/schedulers/dgxcloud.py b/nemo_run/run/torchx_backend/schedulers/dgxcloud.py index fbf1253e..27e0282a 100644 --- a/nemo_run/run/torchx_backend/schedulers/dgxcloud.py +++ b/nemo_run/run/torchx_backend/schedulers/dgxcloud.py @@ -87,7 +87,7 @@ def _submit_dryrun( # type: ignore cfg: Executor, ) -> AppDryRunInfo[DGXRequest]: assert isinstance(cfg, DGXCloudExecutor), ( - f"{cfg.__class__} not supported for skypilot scheduler." + f"{cfg.__class__} not supported for DGXCloud scheduler." ) executor = cfg From fa903b6788cc9219c16912b66c640725d4d9951c Mon Sep 17 00:00:00 2001 From: Zoey Zhang Date: Wed, 9 Apr 2025 11:01:34 -0700 Subject: [PATCH 04/15] updating comments Signed-off-by: Zoey Zhang --- nemo_run/core/execution/dgxcloud.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/nemo_run/core/execution/dgxcloud.py b/nemo_run/core/execution/dgxcloud.py index 72933ef6..c9e0c3ef 100644 --- a/nemo_run/core/execution/dgxcloud.py +++ b/nemo_run/core/execution/dgxcloud.py @@ -114,7 +114,7 @@ def copy_directory_data_command(self, local_dir_path, dest_path): # Encode the tarball data to base64 encoded_data = base64.b64encode(file_data).decode("utf-8") - # Delete and recreate directory if it exists already, command to decode base64 data, save to a file, and extract inside the pod + # Delete and recreate directory if it already exists, command to decode base64 data, save to a file, and extract inside the pod cmd = f"rm -rf {dest_path} && mkdir -p {dest_path} && echo {encoded_data} | base64 -d > {dest_path}/archive.tar.gz && tar -xzf {dest_path}/archive.tar.gz -C {dest_path} && rm {dest_path}/archive.tar.gz" return cmd @@ -122,7 +122,7 @@ def create_data_mover_workload( self, token: str, project_id: str, cluster_id: str, name: str ): """ - Creates an cpu only workload to move project directory into PVC using the provided project/cluster IDs. + Creates an cpu only workload to move job directory into PVC using the provided project/cluster IDs. """ cmd=self.copy_directory_data_command(self.job_dir, self.pvc_job_dir) @@ -353,8 +353,8 @@ def assign( self.experiment_dir = exp_dir self.job_dir = os.path.join(exp_dir, task_dir) - ## setting linked PVC experiment and job directories - job_subdir = self.job_dir[len(get_nemorun_home())+1:] #+1 to remove the initial backslash + # setting linked PVC job directory + job_subdir = self.job_dir[len(get_nemorun_home())+1:] # +1 to remove the initial backslash self.pvc_job_dir = os.path.join(self.pvc_nemo_run_dir, job_subdir) logger.info( From 3818420974959a714de4e71f2c0f0845efab1a22 Mon Sep 17 00:00:00 2001 From: Zoey Zhang Date: Wed, 9 Apr 2025 14:31:12 -0700 Subject: [PATCH 05/15] refactored data movement to custom move_data function and modified code to all use status to get workload status Signed-off-by: Zoey Zhang --- nemo_run/core/execution/dgxcloud.py | 122 +++++++++++++--------------- 1 file changed, 55 insertions(+), 67 deletions(-) diff --git a/nemo_run/core/execution/dgxcloud.py b/nemo_run/core/execution/dgxcloud.py index c9e0c3ef..e3d3b4b1 100644 --- a/nemo_run/core/execution/dgxcloud.py +++ b/nemo_run/core/execution/dgxcloud.py @@ -96,30 +96,23 @@ def get_project_and_cluster_id(self, token: str) -> tuple[Optional[str], Optiona break return project_id, cluster_id - def copy_directory_data_command(self, local_dir_path, dest_path): - # Create a temporary directory to hold the tarball - directory_name=os.path.basename(local_dir_path) - + def copy_directory_data_command(self, local_dir_path, dest_path) -> str: with tempfile.TemporaryDirectory() as temp_dir: tarball_path = os.path.join(temp_dir, "archive.tar.gz") - - # Create a tarball of the directory subprocess.run(f"tar -czf {tarball_path} -C {local_dir_path} .", shell=True, check=True) - - # Read the tarball with open(tarball_path, "rb") as file: file_data = file.read() - - # Encode the tarball data to base64 encoded_data = base64.b64encode(file_data).decode("utf-8") # Delete and recreate directory if it already exists, command to decode base64 data, save to a file, and extract inside the pod cmd = f"rm -rf {dest_path} && mkdir -p {dest_path} && echo {encoded_data} | base64 -d > {dest_path}/archive.tar.gz && tar -xzf {dest_path}/archive.tar.gz -C {dest_path} && rm {dest_path}/archive.tar.gz" return cmd + raise RuntimeError(f"Failed to generate data movement command") + def create_data_mover_workload( - self, token: str, project_id: str, cluster_id: str, name: str + self, token: str, project_id: str, cluster_id: str ): """ Creates an cpu only workload to move job directory into PVC using the provided project/cluster IDs. @@ -131,7 +124,7 @@ def create_data_mover_workload( headers = self._default_headers(token=token) payload = { - "name": name, + "name": "data-mover", "useGivenNameAsPrefix": True, "projectId": project_id, "clusterId": cluster_id, @@ -153,34 +146,9 @@ def create_data_mover_workload( return response - def workload_completed( - self, token: str, workload_id: str - ): - """ - Checks the status of the interactive workload - """ - - url = f"{self.base_url}/workloads/{workload_id}" - headers = self._default_headers(token=token) - - response = requests.get(url, headers=headers) - - if response.status_code not in [200, 202]: - raise RuntimeError(f"Failed to get workload status, status_code={response.status_code}") - - phase = response.json()["phase"] - - if phase == "Completed": - return True - else: - return False - def delete_workload( self, token: str, workload_id: str, sleep: int = 10 ): - while not self.workload_completed(token, workload_id): - time.sleep(sleep) - url = f"{self.base_url}/workloads/workspaces/{workload_id}" headers = self._default_headers(token=token) @@ -192,6 +160,43 @@ def delete_workload( response.text.strip(), ) return response + + def move_data( + self, token: str, project_id: str, cluster_id: str, sleep: float = 10 + ) -> None: + """ + Moves job directory into PVC and deletes the workload after completion + """ + + resp = self.create_data_mover_workload(token, project_id, cluster_id) + if resp.status_code not in [200, 202]: + raise RuntimeError(f"Failed to create data mover workload, status_code={resp.status_code}") + + resp_json = resp.json() + workload_id = resp_json["workloadId"] + status = DGXCloudState(resp_json["actualPhase"]) + + while status is DGXCloudState.PENDING or status is DGXCloudState.CREATING or status is DGXCloudState.INITIALIZING or status is DGXCloudState.RUNNING: + time.sleep(sleep) + status=self.status(workload_id) + + if status is not DGXCloudState.COMPLETED: + raise RuntimeError(f"Failed to move data to PVC") + + resp = self.delete_workload(token, workload_id) + if resp.status_code >= 200 and resp.status_code < 300: + logger.info( + "Successfully deleted data movement workload %s on DGXCloud with response code %d", + workload_id, + resp.status_code, + ) + else: + logger.error( + "Failed to delete data movement workload %s, response code=%d, reason=%s", + workload_id, + resp.status_code, + resp.text, + ) def create_distributed_job( self, token: str, project_id: str, cluster_id: str, name: str @@ -199,6 +204,7 @@ def create_distributed_job( """ Creates a distributed PyTorch job using the provided project/cluster IDs. """ + url = f"{self.base_url}/workloads/distributed" headers = self._default_headers(token=token) @@ -251,34 +257,17 @@ def launch(self, name: str, cmd: list[str]) -> tuple[str, str]: f.write(launch_script) logger.info("Creating data movement workload") - resp = self.create_data_mover_workload(token, project_id, cluster_id, "data-mover") - if resp.status_code not in [200, 202]: - raise RuntimeError(f"Failed to copy data to PVC, status_code={resp.status_code}") + self.move_data(token, project_id, cluster_id) + return "", "" - workload_id= resp.json()["workloadId"] - resp = self.delete_workload(token, workload_id) - if resp.status_code >= 200 and resp.status_code < 300: - logger.info( - "Successfully delete data movement workload %s on DGX with response code %d", - workload_id, - resp.status_code, - ) - else: - logger.error( - "Failed to delete data movement workload %s, response code=%d, reason=%s", - workload_id, - resp.status_code, - resp.text, - ) + # logger.info("Creating distributed workload") + # resp = self.create_distributed_job(token, project_id, cluster_id, name) + # if resp.status_code not in [200, 202]: + # raise RuntimeError(f"Failed to create job, status_code={resp.status_code}") - logger.info("Creating distributed workload") - resp = self.create_distributed_job(token, project_id, cluster_id, name) - if resp.status_code not in [200, 202]: - raise RuntimeError(f"Failed to create job, status_code={resp.status_code}") - - r_json = resp.json() - job_id = r_json["workloadId"] - status = r_json["actualPhase"] + # r_json = resp.json() + # job_id = r_json["workloadId"] + # status = r_json["actualPhase"] return job_id, status def nnodes(self) -> int: @@ -294,10 +283,10 @@ def nproc_per_node(self) -> int: return 1 def status(self, job_id: str) -> Optional[DGXCloudState]: - url = f"{self.base_url}/workloads/distributed/{job_id}" + url = f"{self.base_url}/workloads/{job_id}" token = self.get_auth_token() if not token: - logger.error("Failed to retrieve auth token for cancellation request.") + logger.error("Failed to retrieve auth token for status request.") return None headers = self._default_headers(token=token) @@ -306,7 +295,7 @@ def status(self, job_id: str) -> Optional[DGXCloudState]: return DGXCloudState("Unknown") r_json = response.json() - return DGXCloudState(r_json["actualPhase"]) + return DGXCloudState(r_json["phase"]) def cancel(self, job_id: str): # Retrieve the authentication token for the REST calls @@ -377,7 +366,6 @@ def package_configs(self, *cfgs: tuple[str, str]) -> list[str]: name, ) ) - return filenames def package(self, packager: Packager, job_name: str): From 3067bfde5686d1fc34e11818e654297e00b4f4d1 Mon Sep 17 00:00:00 2001 From: Zoey Zhang Date: Wed, 9 Apr 2025 14:32:07 -0700 Subject: [PATCH 06/15] comment back commented out code Signed-off-by: Zoey Zhang --- nemo_run/core/execution/dgxcloud.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/nemo_run/core/execution/dgxcloud.py b/nemo_run/core/execution/dgxcloud.py index e3d3b4b1..2b961709 100644 --- a/nemo_run/core/execution/dgxcloud.py +++ b/nemo_run/core/execution/dgxcloud.py @@ -258,16 +258,15 @@ def launch(self, name: str, cmd: list[str]) -> tuple[str, str]: logger.info("Creating data movement workload") self.move_data(token, project_id, cluster_id) - return "", "" - # logger.info("Creating distributed workload") - # resp = self.create_distributed_job(token, project_id, cluster_id, name) - # if resp.status_code not in [200, 202]: - # raise RuntimeError(f"Failed to create job, status_code={resp.status_code}") + logger.info("Creating distributed workload") + resp = self.create_distributed_job(token, project_id, cluster_id, name) + if resp.status_code not in [200, 202]: + raise RuntimeError(f"Failed to create job, status_code={resp.status_code}") - # r_json = resp.json() - # job_id = r_json["workloadId"] - # status = r_json["actualPhase"] + r_json = resp.json() + job_id = r_json["workloadId"] + status = r_json["actualPhase"] return job_id, status def nnodes(self) -> int: From a68bfb7f50e6361bd6b89f6591db4a18caaf01fc Mon Sep 17 00:00:00 2001 From: Zoey Zhang Date: Wed, 9 Apr 2025 15:35:06 -0700 Subject: [PATCH 07/15] formatting and adding back PVC checker Signed-off-by: Zoey Zhang --- nemo_run/core/execution/dgxcloud.py | 71 +++++++++++++++++------------ 1 file changed, 41 insertions(+), 30 deletions(-) diff --git a/nemo_run/core/execution/dgxcloud.py b/nemo_run/core/execution/dgxcloud.py index 2b961709..e4db4bdf 100644 --- a/nemo_run/core/execution/dgxcloud.py +++ b/nemo_run/core/execution/dgxcloud.py @@ -98,7 +98,6 @@ def get_project_and_cluster_id(self, token: str) -> tuple[Optional[str], Optiona def copy_directory_data_command(self, local_dir_path, dest_path) -> str: with tempfile.TemporaryDirectory() as temp_dir: - tarball_path = os.path.join(temp_dir, "archive.tar.gz") subprocess.run(f"tar -czf {tarball_path} -C {local_dir_path} .", shell=True, check=True) with open(tarball_path, "rb") as file: @@ -109,16 +108,14 @@ def copy_directory_data_command(self, local_dir_path, dest_path) -> str: cmd = f"rm -rf {dest_path} && mkdir -p {dest_path} && echo {encoded_data} | base64 -d > {dest_path}/archive.tar.gz && tar -xzf {dest_path}/archive.tar.gz -C {dest_path} && rm {dest_path}/archive.tar.gz" return cmd - raise RuntimeError(f"Failed to generate data movement command") + raise RuntimeError("Failed to generate data movement command") - def create_data_mover_workload( - self, token: str, project_id: str, cluster_id: str - ): + def create_data_mover_workload(self, token: str, project_id: str, cluster_id: str): """ Creates an cpu only workload to move job directory into PVC using the provided project/cluster IDs. """ - cmd=self.copy_directory_data_command(self.job_dir, self.pvc_job_dir) + cmd = self.copy_directory_data_command(self.job_dir, self.pvc_job_dir) url = f"{self.base_url}/workloads/workspaces" headers = self._default_headers(token=token) @@ -133,7 +130,7 @@ def create_data_mover_workload( "args": f"'{cmd}'", "image": "busybox:1.37.0", "storage": {"pvc": self.pvcs}, - } + }, } response = requests.post(url, json=payload, headers=headers) @@ -146,9 +143,7 @@ def create_data_mover_workload( return response - def delete_workload( - self, token: str, workload_id: str, sleep: int = 10 - ): + def delete_workload(self, token: str, workload_id: str, sleep: int = 10): url = f"{self.base_url}/workloads/workspaces/{workload_id}" headers = self._default_headers(token=token) @@ -160,28 +155,33 @@ def delete_workload( response.text.strip(), ) return response - - def move_data( - self, token: str, project_id: str, cluster_id: str, sleep: float = 10 - ) -> None: + + def move_data(self, token: str, project_id: str, cluster_id: str, sleep: float = 10) -> None: """ - Moves job directory into PVC and deletes the workload after completion + Moves job directory into PVC and deletes the workload after completion """ resp = self.create_data_mover_workload(token, project_id, cluster_id) if resp.status_code not in [200, 202]: - raise RuntimeError(f"Failed to create data mover workload, status_code={resp.status_code}") + raise RuntimeError( + f"Failed to create data mover workload, status_code={resp.status_code}" + ) resp_json = resp.json() workload_id = resp_json["workloadId"] status = DGXCloudState(resp_json["actualPhase"]) - while status is DGXCloudState.PENDING or status is DGXCloudState.CREATING or status is DGXCloudState.INITIALIZING or status is DGXCloudState.RUNNING: + while ( + status is DGXCloudState.PENDING + or status is DGXCloudState.CREATING + or status is DGXCloudState.INITIALIZING + or status is DGXCloudState.RUNNING + ): time.sleep(sleep) - status=self.status(workload_id) + status = self.status(workload_id) if status is not DGXCloudState.COMPLETED: - raise RuntimeError(f"Failed to move data to PVC") + raise RuntimeError("Failed to move data to PVC") resp = self.delete_workload(token, workload_id) if resp.status_code >= 200 and resp.status_code < 300: @@ -198,9 +198,7 @@ def move_data( resp.text, ) - def create_distributed_job( - self, token: str, project_id: str, cluster_id: str, name: str - ): + def create_distributed_job(self, token: str, project_id: str, cluster_id: str, name: str): """ Creates a distributed PyTorch job using the provided project/cluster IDs. """ @@ -246,8 +244,8 @@ def launch(self, name: str, cmd: list[str]) -> tuple[str, str]: project_id, cluster_id = self.get_project_and_cluster_id(token) if not project_id or not cluster_id: raise RuntimeError("Unable to determine project/cluster IDs for job submission") - - #prepare launch script and move data to PVC + + # prepare launch script and move data to PVC launch_script = f""" ln -s {self.pvc_job_dir}/ /nemo_run cd /nemo_run/code @@ -340,15 +338,28 @@ def assign( self.job_name = task_id self.experiment_dir = exp_dir self.job_dir = os.path.join(exp_dir, task_dir) - + assert any( + map( + lambda x: os.path.commonpath( + [os.path.abspath(x["path"]), os.path.abspath(self.pvc_nemo_run_dir)] + ) + == os.path.abspath(x["path"]), + self.pvcs, + ) + ), ( + f"Need to specify atleast one PVC containing {self.pvc_nemo_run_dir}. Update your PVC path or pvc_nemo_run_dir." + ) + # setting linked PVC job directory - job_subdir = self.job_dir[len(get_nemorun_home())+1:] # +1 to remove the initial backslash + job_subdir = self.job_dir[ + len(get_nemorun_home()) + 1 : + ] # +1 to remove the initial backslash self.pvc_job_dir = os.path.join(self.pvc_nemo_run_dir, job_subdir) logger.info( - "PVC job directory set as: %s", - self.pvc_job_dir, - ) + "PVC job directory set as: %s", + self.pvc_job_dir, + ) self.experiment_id = exp_id def package_configs(self, *cfgs: tuple[str, str]) -> list[str]: @@ -405,4 +416,4 @@ def _default_headers(self, token: Optional[str] = None) -> dict: } if token: headers["Authorization"] = f"Bearer {token}" - return headers \ No newline at end of file + return headers From f39964a3df8cad1f9deb772f8bef104f240265e6 Mon Sep 17 00:00:00 2001 From: Zoey Zhang Date: Wed, 9 Apr 2025 22:57:32 -0700 Subject: [PATCH 08/15] fixing DGXC Executor tests for data mover redesign Signed-off-by: Zoey Zhang --- test/core/execution/test_dgxcloud.py | 214 ++++++++++-------- .../schedulers/test_dgxcloud.py | 2 + 2 files changed, 127 insertions(+), 89 deletions(-) diff --git a/test/core/execution/test_dgxcloud.py b/test/core/execution/test_dgxcloud.py index b0181091..68583258 100644 --- a/test/core/execution/test_dgxcloud.py +++ b/test/core/execution/test_dgxcloud.py @@ -22,7 +22,7 @@ from nemo_run.core.execution.dgxcloud import DGXCloudExecutor, DGXCloudState from nemo_run.core.packaging.git import GitArchivePackager - +from nemo_run.config import set_nemorun_home class TestDGXCloudExecutor: def test_init(self): @@ -34,6 +34,7 @@ def test_init(self): container_image="nvcr.io/nvidia/test:latest", nodes=2, gpus_per_node=8, + pvc_nemo_run_dir="/workspace/nemo_run", pvcs=[{"path": "/workspace", "claimName": "test-claim"}], ) @@ -46,6 +47,7 @@ def test_init(self): assert executor.gpus_per_node == 8 assert executor.pvcs == [{"path": "/workspace", "claimName": "test-claim"}] assert executor.distributed_framework == "PyTorch" + assert executor.pvc_nemo_run_dir == "/workspace/nemo_run" @patch("requests.post") def test_get_auth_token_success(self, mock_post): @@ -59,6 +61,7 @@ def test_get_auth_token_success(self, mock_post): app_secret="test_app_secret", project_name="test_project", container_image="nvcr.io/nvidia/test:latest", + pvc_nemo_run_dir="/workspace/nemo_run", ) token = executor.get_auth_token() @@ -86,6 +89,7 @@ def test_get_auth_token_failure(self, mock_post): app_secret="test_app_secret", project_name="test_project", container_image="nvcr.io/nvidia/test:latest", + pvc_nemo_run_dir="/workspace/nemo_run", ) token = executor.get_auth_token() @@ -104,6 +108,7 @@ def test_get_project_and_cluster_id_success(self, mock_get): app_secret="test_app_secret", project_name="test_project", container_image="nvcr.io/nvidia/test:latest", + pvc_nemo_run_dir="/workspace/nemo_run", ) project_id, cluster_id = executor.get_project_and_cluster_id("test_token") @@ -129,6 +134,7 @@ def test_get_project_and_cluster_id_not_found(self, mock_get): app_secret="test_app_secret", project_name="test_project", container_image="nvcr.io/nvidia/test:latest", + pvc_nemo_run_dir="/workspace/nemo_run", ) project_id, cluster_id = executor.get_project_and_cluster_id("test_token") @@ -143,50 +149,50 @@ def test_create_distributed_job(self, mock_post): mock_response.text = '{"status": "submitted"}' mock_post.return_value = mock_response - with tempfile.TemporaryDirectory() as tmp_dir: - executor = DGXCloudExecutor( - base_url="https://dgxapi.example.com", - app_id="test_app_id", - app_secret="test_app_secret", - project_name="test_project", - container_image="nvcr.io/nvidia/test:latest", - nodes=2, - gpus_per_node=8, - pvcs=[{"path": tmp_dir, "claimName": "test-claim"}], - ) - executor.job_dir = tmp_dir - executor.env_vars = {"TEST_VAR": "test_value"} - - response = executor.create_distributed_job( - token="test_token", - project_id="proj_id", - cluster_id="cluster_id", - name="test_job", - cmd=["python", "train.py"], - ) - - assert response == mock_response - assert os.path.exists(os.path.join(tmp_dir, "launch_script.sh")) + executor = DGXCloudExecutor( + base_url="https://dgxapi.example.com", + app_id="test_app_id", + app_secret="test_app_secret", + project_name="test_project", + container_image="nvcr.io/nvidia/test:latest", + nodes=2, + gpus_per_node=8, + pvc_nemo_run_dir="/workspace/nemo_run", + pvcs=[{"path": "workspace", "claimName": "test-claim"}], + ) + executor.pvc_job_dir= "/workspace/nemo_run/job_dir" + executor.env_vars = {"TEST_VAR": "test_value"} + + response = executor.create_distributed_job( + token="test_token", + project_id="proj_id", + cluster_id="cluster_id", + name="test_job", + ) - # Check if the API call is made correctly - mock_post.assert_called_once() - # The URL is the first argument to post - args, kwargs = mock_post.call_args - assert kwargs["json"]["name"] == "test_job" - assert kwargs["json"]["projectId"] == "proj_id" - assert kwargs["json"]["clusterId"] == "cluster_id" - assert kwargs["json"]["spec"]["image"] == "nvcr.io/nvidia/test:latest" - assert kwargs["json"]["spec"]["numWorkers"] == 2 - assert kwargs["json"]["spec"]["compute"]["gpuDevicesRequest"] == 8 - assert kwargs["json"]["spec"]["environmentVariables"] == [ - {"name": "TEST_VAR", "value": "test_value"} - ] - assert kwargs["headers"] == executor._default_headers(token="test_token") + assert response == mock_response + + # Check if the API call is made correctly + mock_post.assert_called_once() + # The URL is the first argument to post + args, kwargs = mock_post.call_args + assert kwargs["json"]["name"] == "test_job" + assert kwargs["json"]["projectId"] == "proj_id" + assert kwargs["json"]["clusterId"] == "cluster_id" + assert kwargs["json"]["spec"]["image"] == "nvcr.io/nvidia/test:latest" + assert kwargs["json"]["spec"]["command"] == "/bin/bash /workspace/nemo_run/job_dir/launch_script.sh" + assert kwargs["json"]["spec"]["numWorkers"] == 2 + assert kwargs["json"]["spec"]["compute"]["gpuDevicesRequest"] == 8 + assert kwargs["json"]["spec"]["environmentVariables"] == [ + {"name": "TEST_VAR", "value": "test_value"} + ] + assert kwargs["headers"] == executor._default_headers(token="test_token") @patch.object(DGXCloudExecutor, "get_auth_token") @patch.object(DGXCloudExecutor, "get_project_and_cluster_id") + @patch.object(DGXCloudExecutor, "move_data") @patch.object(DGXCloudExecutor, "create_distributed_job") - def test_launch_success(self, mock_create_job, mock_get_ids, mock_get_token): + def test_launch_success(self, mock_create_job, mock_move_data, mock_get_ids, mock_get_token): mock_get_token.return_value = "test_token" mock_get_ids.return_value = ("proj_id", "cluster_id") @@ -195,23 +201,28 @@ def test_launch_success(self, mock_create_job, mock_get_ids, mock_get_token): mock_response.json.return_value = {"workloadId": "job123", "actualPhase": "Pending"} mock_create_job.return_value = mock_response - executor = DGXCloudExecutor( - base_url="https://dgxapi.example.com", - app_id="test_app_id", - app_secret="test_app_secret", - project_name="test_project", - container_image="nvcr.io/nvidia/test:latest", - ) + with tempfile.TemporaryDirectory() as tmp_dir: + executor = DGXCloudExecutor( + base_url="https://dgxapi.example.com", + app_id="test_app_id", + app_secret="test_app_secret", + project_name="test_project", + container_image="nvcr.io/nvidia/test:latest", + pvc_nemo_run_dir="/workspace/nemo_run", + ) + executor.job_dir = tmp_dir - job_id, status = executor.launch("test_job", ["python", "train.py"]) + job_id, status = executor.launch("test_job", ["python", "train.py"]) - assert job_id == "job123" - assert status == "Pending" - mock_get_token.assert_called_once() - mock_get_ids.assert_called_once_with("test_token") - mock_create_job.assert_called_once_with( - "test_token", "proj_id", "cluster_id", "test-job", ["python", "train.py"] - ) + assert job_id == "job123" + assert status == "Pending" + assert os.path.exists(os.path.join(tmp_dir, "launch_script.sh")) + mock_get_token.assert_called_once() + mock_get_ids.assert_called_once_with("test_token") + mock_move_data.assert_called_once_with("test_token", "proj_id", "cluster_id") + mock_create_job.assert_called_once_with( + "test_token", "proj_id", "cluster_id", "test-job" + ) @patch.object(DGXCloudExecutor, "get_auth_token") def test_launch_no_token(self, mock_get_token): @@ -223,6 +234,7 @@ def test_launch_no_token(self, mock_get_token): app_secret="test_app_secret", project_name="test_project", container_image="nvcr.io/nvidia/test:latest", + pvc_nemo_run_dir="/workspace/nemo_run", ) with pytest.raises(RuntimeError, match="Failed to get auth token"): @@ -240,6 +252,7 @@ def test_launch_no_project_id(self, mock_get_ids, mock_get_token): app_secret="test_app_secret", project_name="test_project", container_image="nvcr.io/nvidia/test:latest", + pvc_nemo_run_dir="/workspace/nemo_run", ) with pytest.raises(RuntimeError, match="Unable to determine project/cluster IDs"): @@ -247,25 +260,29 @@ def test_launch_no_project_id(self, mock_get_ids, mock_get_token): @patch.object(DGXCloudExecutor, "get_auth_token") @patch.object(DGXCloudExecutor, "get_project_and_cluster_id") + @patch.object(DGXCloudExecutor, "move_data") @patch.object(DGXCloudExecutor, "create_distributed_job") - def test_launch_job_creation_failed(self, mock_create_job, mock_get_ids, mock_get_token): + def test_launch_job_creation_failed(self, mock_create_job, mock_move_data, mock_get_ids, mock_get_token): mock_get_token.return_value = "test_token" mock_get_ids.return_value = ("proj_id", "cluster_id") mock_response = MagicMock() mock_response.status_code = 400 mock_create_job.return_value = mock_response + + with tempfile.TemporaryDirectory() as tmp_dir: + executor = DGXCloudExecutor( + base_url="https://dgxapi.example.com", + app_id="test_app_id", + app_secret="test_app_secret", + project_name="test_project", + container_image="nvcr.io/nvidia/test:latest", + pvc_nemo_run_dir="/workspace/nemo_run", + ) + executor.job_dir = tmp_dir - executor = DGXCloudExecutor( - base_url="https://dgxapi.example.com", - app_id="test_app_id", - app_secret="test_app_secret", - project_name="test_project", - container_image="nvcr.io/nvidia/test:latest", - ) - - with pytest.raises(RuntimeError, match="Failed to create job"): - executor.launch("test_job", ["python", "train.py"]) + with pytest.raises(RuntimeError, match="Failed to create job"): + executor.launch("test_job", ["python", "train.py"]) def test_nnodes(self): executor = DGXCloudExecutor( @@ -275,6 +292,7 @@ def test_nnodes(self): project_name="test_project", container_image="nvcr.io/nvidia/test:latest", nodes=3, + pvc_nemo_run_dir="/workspace/nemo_run", ) assert executor.nnodes() == 3 @@ -287,6 +305,7 @@ def test_nproc_per_node_with_gpus(self): project_name="test_project", container_image="nvcr.io/nvidia/test:latest", gpus_per_node=4, + pvc_nemo_run_dir="/workspace/nemo_run", ) assert executor.nproc_per_node() == 4 @@ -300,6 +319,7 @@ def test_nproc_per_node_with_nprocs(self): container_image="nvcr.io/nvidia/test:latest", gpus_per_node=0, nprocs_per_node=3, + pvc_nemo_run_dir="/workspace/nemo_run", ) assert executor.nproc_per_node() == 3 @@ -313,6 +333,7 @@ def test_nproc_per_node_default(self): container_image="nvcr.io/nvidia/test:latest", gpus_per_node=0, nprocs_per_node=0, + pvc_nemo_run_dir="/workspace/nemo_run", ) assert executor.nproc_per_node() == 1 @@ -321,7 +342,7 @@ def test_nproc_per_node_default(self): def test_status(self, mock_get): mock_response = MagicMock() mock_response.status_code = 200 - mock_response.json.return_value = {"actualPhase": "Running"} + mock_response.json.return_value = {"phase": "Running"} mock_get.return_value = mock_response with patch.object(DGXCloudExecutor, "get_auth_token", return_value="test_token"): @@ -331,13 +352,14 @@ def test_status(self, mock_get): app_secret="test_app_secret", project_name="test_project", container_image="nvcr.io/nvidia/test:latest", + pvc_nemo_run_dir="/workspace/nemo_run", ) status = executor.status("job123") assert status == DGXCloudState.RUNNING mock_get.assert_called_once_with( - "https://dgxapi.example.com/workloads/distributed/job123", + "https://dgxapi.example.com/workloads/job123", headers=executor._default_headers(token="test_token"), ) @@ -350,6 +372,7 @@ def test_status_no_token(self, mock_get): app_secret="test_app_secret", project_name="test_project", container_image="nvcr.io/nvidia/test:latest", + pvc_nemo_run_dir="/workspace/nemo_run", ) status = executor.status("job123") @@ -370,6 +393,7 @@ def test_status_error_response(self, mock_get): app_secret="test_app_secret", project_name="test_project", container_image="nvcr.io/nvidia/test:latest", + pvc_nemo_run_dir="/workspace/nemo_run", ) status = executor.status("job123") @@ -389,6 +413,7 @@ def test_cancel(self, mock_get): app_secret="test_app_secret", project_name="test_project", container_image="nvcr.io/nvidia/test:latest", + pvc_nemo_run_dir="/workspace/nemo_run", ) executor.cancel("job123") @@ -407,6 +432,7 @@ def test_cancel_no_token(self, mock_get): app_secret="test_app_secret", project_name="test_project", container_image="nvcr.io/nvidia/test:latest", + pvc_nemo_run_dir="/workspace/nemo_run", ) executor.cancel("job123") @@ -420,28 +446,33 @@ def test_logs(self): assert "Logs not available" in mock_warning.call_args[0][0] def test_assign(self): - with tempfile.TemporaryDirectory() as tmp_dir: - executor = DGXCloudExecutor( - base_url="https://dgxapi.example.com", - app_id="test_app_id", - app_secret="test_app_secret", - project_name="test_project", - container_image="nvcr.io/nvidia/test:latest", - pvcs=[{"path": tmp_dir, "claimName": "test-claim"}], - ) - task_dir = "test_task" - executor.assign( - exp_id="test_exp", - exp_dir=tmp_dir, - task_id="test_task", - task_dir=task_dir, - ) + set_nemorun_home("/nemo_home") + + executor = DGXCloudExecutor( + base_url="https://dgxapi.example.com", + app_id="test_app_id", + app_secret="test_app_secret", + project_name="test_project", + container_image="nvcr.io/nvidia/test:latest", + pvc_nemo_run_dir=f"/workspace/nemo_run", + pvcs=[{"path": "/workspace", "claimName": "test-claim"}], + ) + + task_dir = "test_task" + exp_dir= f"/nemo_home/experiments/experiment" + executor.assign( + exp_id="test_exp", + exp_dir=exp_dir, + task_id="test_task", + task_dir=task_dir, + ) - assert executor.job_name == "test_task" - assert executor.experiment_dir == tmp_dir - assert executor.job_dir == os.path.join(tmp_dir, task_dir) - assert executor.experiment_id == "test_exp" + assert executor.job_name == "test_task" + assert executor.experiment_dir == exp_dir + assert executor.job_dir == os.path.join(exp_dir, task_dir) + assert executor.pvc_job_dir == os.path.join("/workspace/nemo_run/experiments/experiment", task_dir) + assert executor.experiment_id == "test_exp" def test_assign_no_pvc(self): with tempfile.TemporaryDirectory() as tmp_dir: @@ -451,10 +482,11 @@ def test_assign_no_pvc(self): app_secret="test_app_secret", project_name="test_project", container_image="nvcr.io/nvidia/test:latest", + pvc_nemo_run_dir="/workspace/nemo_run", pvcs=[{"path": "/other/path", "claimName": "test-claim"}], ) - with pytest.raises(AssertionError, match="Need to specify atleast one PVC"): + with pytest.raises(AssertionError, match="Need to specify at least one PVC"): executor.assign( exp_id="test_exp", exp_dir=tmp_dir, @@ -480,6 +512,7 @@ def test_package_git_packager(self, mock_subprocess_run, mock_context_run): app_secret="test_app_secret", project_name="test_project", container_image="nvcr.io/nvidia/test:latest", + pvc_nemo_run_dir="/workspace/nemo_run", pvcs=[{"path": tmp_dir, "claimName": "test-claim"}], ) executor.experiment_id = "test_exp" @@ -505,6 +538,7 @@ def test_macro_values(self): app_secret="test_app_secret", project_name="test_project", container_image="nvcr.io/nvidia/test:latest", + pvc_nemo_run_dir="/workspace/nemo_run", ) result = executor.macro_values() @@ -518,6 +552,7 @@ def test_default_headers_without_token(self): app_secret="test_app_secret", project_name="test_project", container_image="nvcr.io/nvidia/test:latest", + pvc_nemo_run_dir="/workspace/nemo_run", ) headers = executor._default_headers() @@ -533,6 +568,7 @@ def test_default_headers_with_token(self): app_secret="test_app_secret", project_name="test_project", container_image="nvcr.io/nvidia/test:latest", + pvc_nemo_run_dir="/workspace/nemo_run", ) headers = executor._default_headers(token="test_token") diff --git a/test/run/torchx_backend/schedulers/test_dgxcloud.py b/test/run/torchx_backend/schedulers/test_dgxcloud.py index 41b30013..5a3f9ae9 100644 --- a/test/run/torchx_backend/schedulers/test_dgxcloud.py +++ b/test/run/torchx_backend/schedulers/test_dgxcloud.py @@ -42,6 +42,7 @@ def dgx_cloud_executor(): app_secret="test_secret", project_name="test_project", container_image="nvcr.io/nvidia/test:latest", + pvc_nemo_run_dir="/workspace/nemo_run", job_dir=tempfile.mkdtemp(), ) @@ -151,6 +152,7 @@ def test_save_and_get_job_dirs(): project_name="test_project", container_image="test:latest", job_dir=temp_dir, + pvc_nemo_run_dir="/workspace/nemo_run", ) _save_job_dir("test_app_id", "RUNNING", executor) From d5f3d0499f48f97caa5455418a39cc6e09035f83 Mon Sep 17 00:00:00 2001 From: Zoey Zhang Date: Wed, 9 Apr 2025 22:58:51 -0700 Subject: [PATCH 09/15] cleanup Signed-off-by: Zoey Zhang --- nemo_run/core/execution/dgxcloud.py | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/nemo_run/core/execution/dgxcloud.py b/nemo_run/core/execution/dgxcloud.py index e4db4bdf..bfc4d152 100644 --- a/nemo_run/core/execution/dgxcloud.py +++ b/nemo_run/core/execution/dgxcloud.py @@ -54,10 +54,10 @@ class DGXCloudExecutor(Executor): app_secret: str project_name: str container_image: str + pvc_nemo_run_dir: str nodes: int = 1 gpus_per_node: int = 0 nprocs_per_node: int = 1 - pvc_nemo_run_dir: str pvc_job_dir: str = field(init=False, default="") pvcs: list[dict[str, Any]] = field(default_factory=list) distributed_framework: str = "PyTorch" @@ -112,7 +112,7 @@ def copy_directory_data_command(self, local_dir_path, dest_path) -> str: def create_data_mover_workload(self, token: str, project_id: str, cluster_id: str): """ - Creates an cpu only workload to move job directory into PVC using the provided project/cluster IDs. + Creates a CPU only workload to move job directory into PVC using the provided project/cluster IDs. """ cmd = self.copy_directory_data_command(self.job_dir, self.pvc_job_dir) @@ -143,7 +143,7 @@ def create_data_mover_workload(self, token: str, project_id: str, cluster_id: st return response - def delete_workload(self, token: str, workload_id: str, sleep: int = 10): + def delete_workload(self, token: str, workload_id: str): url = f"{self.base_url}/workloads/workspaces/{workload_id}" headers = self._default_headers(token=token) @@ -171,12 +171,7 @@ def move_data(self, token: str, project_id: str, cluster_id: str, sleep: float = workload_id = resp_json["workloadId"] status = DGXCloudState(resp_json["actualPhase"]) - while ( - status is DGXCloudState.PENDING - or status is DGXCloudState.CREATING - or status is DGXCloudState.INITIALIZING - or status is DGXCloudState.RUNNING - ): + while status in [DGXCloudState.PENDING, DGXCloudState.CREATING, DGXCloudState.INITIALIZING, DGXCloudState.RUNNING]: time.sleep(sleep) status = self.status(workload_id) @@ -347,12 +342,13 @@ def assign( self.pvcs, ) ), ( - f"Need to specify atleast one PVC containing {self.pvc_nemo_run_dir}. Update your PVC path or pvc_nemo_run_dir." + f"Need to specify at least one PVC containing {self.pvc_nemo_run_dir}. Update your PVC path or pvc_nemo_run_dir." ) # setting linked PVC job directory + nemo_run_home = get_nemorun_home() job_subdir = self.job_dir[ - len(get_nemorun_home()) + 1 : + len(nemo_run_home) + 1 : ] # +1 to remove the initial backslash self.pvc_job_dir = os.path.join(self.pvc_nemo_run_dir, job_subdir) From 77bf2c861cba0da30f7b66de2f34ff8daad22b4a Mon Sep 17 00:00:00 2001 From: Zoey Zhang Date: Wed, 9 Apr 2025 23:10:49 -0700 Subject: [PATCH 10/15] test package configs Signed-off-by: Zoey Zhang --- test/core/execution/test_dgxcloud.py | 26 +++++++++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/test/core/execution/test_dgxcloud.py b/test/core/execution/test_dgxcloud.py index 68583258..73c8b655 100644 --- a/test/core/execution/test_dgxcloud.py +++ b/test/core/execution/test_dgxcloud.py @@ -16,7 +16,7 @@ import os import subprocess import tempfile -from unittest.mock import MagicMock, patch +from unittest.mock import MagicMock, mock_open, patch import pytest @@ -494,6 +494,30 @@ def test_assign_no_pvc(self): task_dir="test_task", ) + @patch("os.makedirs") + @patch("builtins.open", new_callable=mock_open) + def test_package_configs(self, mock_file, mock_makedirs): + with tempfile.TemporaryDirectory() as tmp_dir: + executor = DGXCloudExecutor( + base_url="https://dgxapi.example.com", + app_id="test_app_id", + app_secret="test_app_secret", + project_name="test_project", + container_image="nvcr.io/nvidia/test:latest", + pvc_nemo_run_dir="/workspace/nemo_run", + pvcs=[{"path": "/other/path", "claimName": "test-claim"}], + ) + + configs = [("config1.yaml", "key: value"), ("subdir/config2.yaml", "another: config")] + + filenames = executor.package_configs(*configs) + + assert len(filenames) == 2 + assert filenames[0] == "/nemo_run/configs/config1.yaml" + assert filenames[1] == "/nemo_run/configs/subdir/config2.yaml" + mock_makedirs.assert_called() + assert mock_file.call_count == 2 + @patch("invoke.context.Context.run") @patch("subprocess.run") def test_package_git_packager(self, mock_subprocess_run, mock_context_run): From 3142662d98cf88da2a31504862ba904f3f8f378a Mon Sep 17 00:00:00 2001 From: Zoey Zhang Date: Thu, 10 Apr 2025 01:04:40 -0700 Subject: [PATCH 11/15] fix formatting Signed-off-by: Zoey Zhang --- nemo_run/core/execution/dgxcloud.py | 13 +- test/core/execution/test_dgxcloud.py | 237 ++++++++++++++++++++++++--- 2 files changed, 220 insertions(+), 30 deletions(-) diff --git a/nemo_run/core/execution/dgxcloud.py b/nemo_run/core/execution/dgxcloud.py index bfc4d152..effc7766 100644 --- a/nemo_run/core/execution/dgxcloud.py +++ b/nemo_run/core/execution/dgxcloud.py @@ -96,7 +96,7 @@ def get_project_and_cluster_id(self, token: str) -> tuple[Optional[str], Optiona break return project_id, cluster_id - def copy_directory_data_command(self, local_dir_path, dest_path) -> str: + def copy_directory_data_command(self, local_dir_path: str, dest_path: str) -> str: with tempfile.TemporaryDirectory() as temp_dir: tarball_path = os.path.join(temp_dir, "archive.tar.gz") subprocess.run(f"tar -czf {tarball_path} -C {local_dir_path} .", shell=True, check=True) @@ -171,7 +171,12 @@ def move_data(self, token: str, project_id: str, cluster_id: str, sleep: float = workload_id = resp_json["workloadId"] status = DGXCloudState(resp_json["actualPhase"]) - while status in [DGXCloudState.PENDING, DGXCloudState.CREATING, DGXCloudState.INITIALIZING, DGXCloudState.RUNNING]: + while status in [ + DGXCloudState.PENDING, + DGXCloudState.CREATING, + DGXCloudState.INITIALIZING, + DGXCloudState.RUNNING, + ]: time.sleep(sleep) status = self.status(workload_id) @@ -347,9 +352,7 @@ def assign( # setting linked PVC job directory nemo_run_home = get_nemorun_home() - job_subdir = self.job_dir[ - len(nemo_run_home) + 1 : - ] # +1 to remove the initial backslash + job_subdir = self.job_dir[len(nemo_run_home) + 1 :] # +1 to remove the initial backslash self.pvc_job_dir = os.path.join(self.pvc_nemo_run_dir, job_subdir) logger.info( diff --git a/test/core/execution/test_dgxcloud.py b/test/core/execution/test_dgxcloud.py index 73c8b655..f54f9636 100644 --- a/test/core/execution/test_dgxcloud.py +++ b/test/core/execution/test_dgxcloud.py @@ -24,6 +24,7 @@ from nemo_run.core.packaging.git import GitArchivePackager from nemo_run.config import set_nemorun_home + class TestDGXCloudExecutor: def test_init(self): executor = DGXCloudExecutor( @@ -142,6 +143,187 @@ def test_get_project_and_cluster_id_not_found(self, mock_get): assert project_id is None assert cluster_id is None + @patch("subprocess.run") + @patch("builtins.open", new_callable=mock_open, read_data=b"mock tarball") + def test_copy_directory_data_command_success(self, mock_file, mock_subprocess): + local_dir_path = "/mock/local/dir" + dest_path = "/mock/destination/path" + + executor = DGXCloudExecutor( + base_url="https://dgxapi.example.com", + app_id="test_app_id", + app_secret="test_app_secret", + project_name="test_project", + container_image="nvcr.io/nvidia/test:latest", + pvc_nemo_run_dir="/workspace/nemo_run", + pvcs=[{"path": "workspace", "claimName": "test-claim"}], + ) + response = executor.copy_directory_data_command(local_dir_path, dest_path) + + mock_subprocess.assert_called_once() + mock_file.call_count == 1 + + assert ( + "rm -rf /mock/destination/path && mkdir -p /mock/destination/path && echo" in response + ) + assert ( + "base64 -d > /mock/destination/path/archive.tar.gz && tar -xzf /mock/destination/path/archive.tar.gz -C /mock/destination/path && rm /mock/destination/path/archive.tar.gz" + in response + ) + + @patch("tempfile.TemporaryDirectory") + def test_copy_directory_data_command_fails(self, mock_tempdir): + local_dir_path = "/mock/local/dir" + dest_path = "/mock/destination/path" + + mock_tempdir.side_effect = OSError("Temporary directory creation failed") + + executor = DGXCloudExecutor( + base_url="https://dgxapi.example.com", + app_id="test_app_id", + app_secret="test_app_secret", + project_name="test_project", + container_image="nvcr.io/nvidia/test:latest", + pvc_nemo_run_dir="/workspace/nemo_run", + pvcs=[{"path": "workspace", "claimName": "test-claim"}], + ) + with pytest.raises(OSError, match="Temporary directory creation failed"): + executor.copy_directory_data_command(local_dir_path, dest_path) + + @patch("requests.post") + @patch.object(DGXCloudExecutor, "copy_directory_data_command") + def test_create_data_mover_workload_success(self, mock_command, mock_post): + mock_command.return_value = "sleep infinity" + + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.text = '{"status": "submitted"}' + mock_post.return_value = mock_response + + executor = DGXCloudExecutor( + base_url="https://dgxapi.example.com", + app_id="test_app_id", + app_secret="test_app_secret", + project_name="test_project", + container_image="nvcr.io/nvidia/test:latest", + pvc_nemo_run_dir="/workspace/nemo_run", + pvcs=[{"path": "workspace", "claimName": "test-claim"}], + ) + + response = executor.create_data_mover_workload( + token="test_token", + project_id="proj_id", + cluster_id="cluster_id", + ) + + assert response == mock_response + + # Check if the API call is made correctly + mock_post.assert_called_once() + # The URL is the first argument to post + args, kwargs = mock_post.call_args + assert kwargs["json"]["projectId"] == "proj_id" + assert kwargs["json"]["clusterId"] == "cluster_id" + assert kwargs["json"]["spec"]["command"] == "sh -c" + assert kwargs["json"]["spec"]["args"] == "'sleep infinity'" + assert kwargs["headers"] == executor._default_headers(token="test_token") + + @patch("requests.delete") + def test_delete_workload(self, mock_delete): + mock_response = MagicMock() + mock_response.status_code = 200 + mock_delete.return_value = mock_response + + executor = DGXCloudExecutor( + base_url="https://dgxapi.example.com", + app_id="test_app_id", + app_secret="test_app_secret", + project_name="test_project", + container_image="nvcr.io/nvidia/test:latest", + pvc_nemo_run_dir="/workspace/nemo_run", + pvcs=[{"path": "workspace", "claimName": "test-claim"}], + ) + + response = executor.delete_workload(token="test_token", workload_id="job123") + + assert response == mock_response + mock_delete.assert_called_once_with( + "https://dgxapi.example.com/workloads/workspaces/job123", + headers=executor._default_headers(token="test_token"), + ) + + @patch.object(DGXCloudExecutor, "create_data_mover_workload") + @patch.object(DGXCloudExecutor, "status") + @patch.object(DGXCloudExecutor, "delete_workload") + def test_move_data_success(self, mock_delete, mock_status, mock_create): + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = {"workloadId": "job123", "actualPhase": "Pending"} + mock_create.return_value = mock_response + mock_delete.return_value = mock_response + + mock_status.return_value = DGXCloudState.COMPLETED + + executor = DGXCloudExecutor( + base_url="https://dgxapi.example.com", + app_id="test_app_id", + app_secret="test_app_secret", + project_name="test_project", + container_image="nvcr.io/nvidia/test:latest", + pvc_nemo_run_dir="/workspace/nemo_run", + pvcs=[{"path": "workspace", "claimName": "test-claim"}], + ) + + executor.move_data(token="test_token", project_id="proj_id", cluster_id="cluster_id") + + mock_create.assert_called_once_with("test_token", "proj_id", "cluster_id") + mock_status.assert_called() + mock_delete.assert_called_once_with("test_token", "job123") + + @patch.object(DGXCloudExecutor, "create_data_mover_workload") + def test_move_data_data_mover_fail(self, mock_create): + mock_response = MagicMock() + mock_response.status_code = 400 + + executor = DGXCloudExecutor( + base_url="https://dgxapi.example.com", + app_id="test_app_id", + app_secret="test_app_secret", + project_name="test_project", + container_image="nvcr.io/nvidia/test:latest", + pvc_nemo_run_dir="/workspace/nemo_run", + pvcs=[{"path": "workspace", "claimName": "test-claim"}], + ) + + with pytest.raises(RuntimeError, match="Failed to create data mover workload"): + executor.move_data(token="test_token", project_id="proj_id", cluster_id="cluster_id") + + @patch.object(DGXCloudExecutor, "create_data_mover_workload") + @patch.object(DGXCloudExecutor, "status") + def test_move_data_failed(self, mock_status, mock_create): + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = {"workloadId": "job123", "actualPhase": "Pending"} + mock_create.return_value = mock_response + + mock_status.return_value = DGXCloudState.FAILED + + executor = DGXCloudExecutor( + base_url="https://dgxapi.example.com", + app_id="test_app_id", + app_secret="test_app_secret", + project_name="test_project", + container_image="nvcr.io/nvidia/test:latest", + pvc_nemo_run_dir="/workspace/nemo_run", + pvcs=[{"path": "workspace", "claimName": "test-claim"}], + ) + + with pytest.raises(RuntimeError, match="Failed to move data to PVC"): + executor.move_data(token="test_token", project_id="proj_id", cluster_id="cluster_id") + + mock_create.assert_called_once_with("test_token", "proj_id", "cluster_id") + mock_status.assert_called() + @patch("requests.post") def test_create_distributed_job(self, mock_post): mock_response = MagicMock() @@ -160,7 +342,7 @@ def test_create_distributed_job(self, mock_post): pvc_nemo_run_dir="/workspace/nemo_run", pvcs=[{"path": "workspace", "claimName": "test-claim"}], ) - executor.pvc_job_dir= "/workspace/nemo_run/job_dir" + executor.pvc_job_dir = "/workspace/nemo_run/job_dir" executor.env_vars = {"TEST_VAR": "test_value"} response = executor.create_distributed_job( @@ -180,7 +362,10 @@ def test_create_distributed_job(self, mock_post): assert kwargs["json"]["projectId"] == "proj_id" assert kwargs["json"]["clusterId"] == "cluster_id" assert kwargs["json"]["spec"]["image"] == "nvcr.io/nvidia/test:latest" - assert kwargs["json"]["spec"]["command"] == "/bin/bash /workspace/nemo_run/job_dir/launch_script.sh" + assert ( + kwargs["json"]["spec"]["command"] + == "/bin/bash /workspace/nemo_run/job_dir/launch_script.sh" + ) assert kwargs["json"]["spec"]["numWorkers"] == 2 assert kwargs["json"]["spec"]["compute"]["gpuDevicesRequest"] == 8 assert kwargs["json"]["spec"]["environmentVariables"] == [ @@ -262,14 +447,16 @@ def test_launch_no_project_id(self, mock_get_ids, mock_get_token): @patch.object(DGXCloudExecutor, "get_project_and_cluster_id") @patch.object(DGXCloudExecutor, "move_data") @patch.object(DGXCloudExecutor, "create_distributed_job") - def test_launch_job_creation_failed(self, mock_create_job, mock_move_data, mock_get_ids, mock_get_token): + def test_launch_job_creation_failed( + self, mock_create_job, mock_move_data, mock_get_ids, mock_get_token + ): mock_get_token.return_value = "test_token" mock_get_ids.return_value = ("proj_id", "cluster_id") mock_response = MagicMock() mock_response.status_code = 400 mock_create_job.return_value = mock_response - + with tempfile.TemporaryDirectory() as tmp_dir: executor = DGXCloudExecutor( base_url="https://dgxapi.example.com", @@ -446,7 +633,6 @@ def test_logs(self): assert "Logs not available" in mock_warning.call_args[0][0] def test_assign(self): - set_nemorun_home("/nemo_home") executor = DGXCloudExecutor( @@ -455,12 +641,12 @@ def test_assign(self): app_secret="test_app_secret", project_name="test_project", container_image="nvcr.io/nvidia/test:latest", - pvc_nemo_run_dir=f"/workspace/nemo_run", + pvc_nemo_run_dir="/workspace/nemo_run", pvcs=[{"path": "/workspace", "claimName": "test-claim"}], ) task_dir = "test_task" - exp_dir= f"/nemo_home/experiments/experiment" + exp_dir = "/nemo_home/experiments/experiment" executor.assign( exp_id="test_exp", exp_dir=exp_dir, @@ -471,7 +657,9 @@ def test_assign(self): assert executor.job_name == "test_task" assert executor.experiment_dir == exp_dir assert executor.job_dir == os.path.join(exp_dir, task_dir) - assert executor.pvc_job_dir == os.path.join("/workspace/nemo_run/experiments/experiment", task_dir) + assert executor.pvc_job_dir == os.path.join( + "/workspace/nemo_run/experiments/experiment", task_dir + ) assert executor.experiment_id == "test_exp" def test_assign_no_pvc(self): @@ -497,26 +685,25 @@ def test_assign_no_pvc(self): @patch("os.makedirs") @patch("builtins.open", new_callable=mock_open) def test_package_configs(self, mock_file, mock_makedirs): - with tempfile.TemporaryDirectory() as tmp_dir: - executor = DGXCloudExecutor( - base_url="https://dgxapi.example.com", - app_id="test_app_id", - app_secret="test_app_secret", - project_name="test_project", - container_image="nvcr.io/nvidia/test:latest", - pvc_nemo_run_dir="/workspace/nemo_run", - pvcs=[{"path": "/other/path", "claimName": "test-claim"}], - ) + executor = DGXCloudExecutor( + base_url="https://dgxapi.example.com", + app_id="test_app_id", + app_secret="test_app_secret", + project_name="test_project", + container_image="nvcr.io/nvidia/test:latest", + pvc_nemo_run_dir="/workspace/nemo_run", + pvcs=[{"path": "/other/path", "claimName": "test-claim"}], + ) - configs = [("config1.yaml", "key: value"), ("subdir/config2.yaml", "another: config")] + configs = [("config1.yaml", "key: value"), ("subdir/config2.yaml", "another: config")] - filenames = executor.package_configs(*configs) + filenames = executor.package_configs(*configs) - assert len(filenames) == 2 - assert filenames[0] == "/nemo_run/configs/config1.yaml" - assert filenames[1] == "/nemo_run/configs/subdir/config2.yaml" - mock_makedirs.assert_called() - assert mock_file.call_count == 2 + assert len(filenames) == 2 + assert filenames[0] == "/nemo_run/configs/config1.yaml" + assert filenames[1] == "/nemo_run/configs/subdir/config2.yaml" + mock_makedirs.assert_called() + assert mock_file.call_count == 2 @patch("invoke.context.Context.run") @patch("subprocess.run") From 1b42e64e4a3343f732d4ac6c57110b0a498a3b0d Mon Sep 17 00:00:00 2001 From: Zoey Zhang Date: Thu, 10 Apr 2025 10:55:29 -0700 Subject: [PATCH 12/15] removing unreachable code Signed-off-by: Zoey Zhang --- nemo_run/core/execution/dgxcloud.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/nemo_run/core/execution/dgxcloud.py b/nemo_run/core/execution/dgxcloud.py index effc7766..71b0ecbe 100644 --- a/nemo_run/core/execution/dgxcloud.py +++ b/nemo_run/core/execution/dgxcloud.py @@ -108,8 +108,6 @@ def copy_directory_data_command(self, local_dir_path: str, dest_path: str) -> st cmd = f"rm -rf {dest_path} && mkdir -p {dest_path} && echo {encoded_data} | base64 -d > {dest_path}/archive.tar.gz && tar -xzf {dest_path}/archive.tar.gz -C {dest_path} && rm {dest_path}/archive.tar.gz" return cmd - raise RuntimeError("Failed to generate data movement command") - def create_data_mover_workload(self, token: str, project_id: str, cluster_id: str): """ Creates a CPU only workload to move job directory into PVC using the provided project/cluster IDs. From 4c1558190574c78d14939afaf0e536f1cbc615d6 Mon Sep 17 00:00:00 2001 From: Zoey Zhang Date: Mon, 14 Apr 2025 13:51:18 -0700 Subject: [PATCH 13/15] add flag for launching from inside the cluster Signed-off-by: Zoey Zhang --- nemo_run/core/execution/dgxcloud.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/nemo_run/core/execution/dgxcloud.py b/nemo_run/core/execution/dgxcloud.py index 71b0ecbe..a6d677d3 100644 --- a/nemo_run/core/execution/dgxcloud.py +++ b/nemo_run/core/execution/dgxcloud.py @@ -55,6 +55,7 @@ class DGXCloudExecutor(Executor): project_name: str container_image: str pvc_nemo_run_dir: str + launched_from_cluster: bool = False nodes: int = 1 gpus_per_node: int = 0 nprocs_per_node: int = 1 @@ -251,9 +252,10 @@ def launch(self, name: str, cmd: list[str]) -> tuple[str, str]: """ with open(os.path.join(self.job_dir, "launch_script.sh"), "w+") as f: f.write(launch_script) - - logger.info("Creating data movement workload") - self.move_data(token, project_id, cluster_id) + + if not self.launched_from_cluster: + logger.info("Creating data movement workload") + self.move_data(token, project_id, cluster_id) logger.info("Creating distributed workload") resp = self.create_distributed_job(token, project_id, cluster_id, name) From 51aa582cd571090a1b6750ea817206dfac08c949 Mon Sep 17 00:00:00 2001 From: Zoey Zhang Date: Mon, 14 Apr 2025 14:50:58 -0700 Subject: [PATCH 14/15] fix format Signed-off-by: Zoey Zhang --- nemo_run/core/execution/dgxcloud.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nemo_run/core/execution/dgxcloud.py b/nemo_run/core/execution/dgxcloud.py index a6d677d3..67dc1a3b 100644 --- a/nemo_run/core/execution/dgxcloud.py +++ b/nemo_run/core/execution/dgxcloud.py @@ -252,7 +252,7 @@ def launch(self, name: str, cmd: list[str]) -> tuple[str, str]: """ with open(os.path.join(self.job_dir, "launch_script.sh"), "w+") as f: f.write(launch_script) - + if not self.launched_from_cluster: logger.info("Creating data movement workload") self.move_data(token, project_id, cluster_id) From 0a1c2aa75a1e3db3eedad5bcb8675af34b7ed4d8 Mon Sep 17 00:00:00 2001 From: Zoey Zhang Date: Thu, 17 Apr 2025 14:59:35 -0700 Subject: [PATCH 15/15] fixing nsys prefix Signed-off-by: Zoey Zhang --- nemo_run/core/execution/dgxcloud.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/nemo_run/core/execution/dgxcloud.py b/nemo_run/core/execution/dgxcloud.py index 67dc1a3b..9dafdb20 100644 --- a/nemo_run/core/execution/dgxcloud.py +++ b/nemo_run/core/execution/dgxcloud.py @@ -361,6 +361,11 @@ def assign( ) self.experiment_id = exp_id + def get_launcher_prefix(self) -> Optional[list[str]]: + launcher = self.get_launcher() + if launcher.nsys_profile: + return launcher.get_nsys_prefix(profile_dir="/nemo_run") + def package_configs(self, *cfgs: tuple[str, str]) -> list[str]: filenames = [] basepath = os.path.join(self.job_dir, "configs")