Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
180 changes: 162 additions & 18 deletions nemo_run/core/execution/dgxcloud.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import base64
import json
import logging
import os
import subprocess
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
import time
import tempfile
from typing import Any, Optional, Type

import requests
Expand All @@ -13,6 +16,7 @@
from nemo_run.core.execution.base import Executor, ExecutorMacros
from nemo_run.core.packaging.base import Packager
from nemo_run.core.packaging.git import GitArchivePackager
from nemo_run.config import get_nemorun_home

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -50,9 +54,12 @@
app_secret: str
project_name: str
container_image: str
pvc_nemo_run_dir: str
launched_from_cluster: bool = False
nodes: int = 1
gpus_per_node: int = 0
nprocs_per_node: int = 1
pvc_job_dir: str = field(init=False, default="")
pvcs: list[dict[str, Any]] = field(default_factory=list)
distributed_framework: str = "PyTorch"
custom_spec: dict[str, Any] = field(default_factory=dict)
Expand Down Expand Up @@ -90,29 +97,121 @@
break
return project_id, cluster_id

def create_distributed_job(
self, token: str, project_id: str, cluster_id: str, name: str, cmd: list[str]
):
def copy_directory_data_command(self, local_dir_path: str, dest_path: str) -> str:
    """Build a single shell command that recreates ``local_dir_path`` at ``dest_path``.

    The local directory is tar-gzipped, base64-encoded, and embedded directly in
    the returned command string so it can be executed inside a pod that has no
    other data channel (rsync cannot target a Kubernetes pod).

    Args:
        local_dir_path: Directory on the local machine to copy.
        dest_path: Absolute directory path to materialize inside the pod.

    Returns:
        A shell command string that wipes ``dest_path``, recreates it, decodes the
        embedded archive into a tarball, extracts it there, and removes the tarball.
    """
    with tempfile.TemporaryDirectory() as temp_dir:
        tarball_path = os.path.join(temp_dir, "archive.tar.gz")
        # argv-list form (no shell=True): robust to spaces/special characters in
        # the paths and immune to shell injection via local_dir_path.
        subprocess.run(
            ["tar", "-czf", tarball_path, "-C", local_dir_path, "."],
            check=True,
        )
        with open(tarball_path, "rb") as file:
            file_data = file.read()
        encoded_data = base64.b64encode(file_data).decode("utf-8")

    # Delete and recreate directory if it already exists; command decodes the
    # base64 data, saves it to a file, and extracts it inside the pod.
    cmd = f"rm -rf {dest_path} && mkdir -p {dest_path} && echo {encoded_data} | base64 -d > {dest_path}/archive.tar.gz && tar -xzf {dest_path}/archive.tar.gz -C {dest_path} && rm {dest_path}/archive.tar.gz"
    return cmd

def create_data_mover_workload(self, token: str, project_id: str, cluster_id: str):
    """Submit a CPU-only "data-mover" workspace workload that copies the local
    job directory into the PVC for the given project/cluster.

    Returns the raw ``requests.Response`` from the workspaces endpoint so the
    caller can inspect status code and body.
    """
    copy_cmd = self.copy_directory_data_command(self.job_dir, self.pvc_job_dir)

    body = {
        "name": "data-mover",
        "useGivenNameAsPrefix": True,
        "projectId": project_id,
        "clusterId": cluster_id,
        "spec": {
            "command": "sh -c",
            "args": f"'{copy_cmd}'",
            "image": "busybox:1.37.0",
            "storage": {"pvc": self.pvcs},
        },
    }

    response = requests.post(
        f"{self.base_url}/workloads/workspaces",
        json=body,
        headers=self._default_headers(token=token),
    )

    logger.debug(
        "Created workload; response code=%s, content=%s",
        response.status_code,
        response.text.strip(),
    )

    return response

def delete_workload(self, token: str, workload_id: str):
    """Delete the workspace workload identified by ``workload_id``.

    Returns the raw ``requests.Response`` so callers can check the status code.
    """
    endpoint = f"{self.base_url}/workloads/workspaces/{workload_id}"
    resp = requests.delete(endpoint, headers=self._default_headers(token=token))

    logger.debug(
        "Delete interactive workspace; response code=%s, content=%s",
        resp.status_code,
        resp.text.strip(),
    )
    return resp

def move_data(self, token: str, project_id: str, cluster_id: str, sleep: float = 10) -> None:
    """Move the job directory into the PVC, then delete the mover workload.

    Blocks until the data-mover workload leaves an in-progress phase, polling
    its status every ``sleep`` seconds.

    Raises:
        RuntimeError: if the workload cannot be created, or finishes in any
            phase other than ``Completed``.
    """
    creation = self.create_data_mover_workload(token, project_id, cluster_id)
    if creation.status_code not in [200, 202]:
        raise RuntimeError(
            f"Failed to create data mover workload, status_code={creation.status_code}"
        )

    details = creation.json()
    workload_id = details["workloadId"]
    phase = DGXCloudState(details["actualPhase"])

    # Poll until the workload settles into a terminal phase.
    in_progress = (
        DGXCloudState.PENDING,
        DGXCloudState.CREATING,
        DGXCloudState.INITIALIZING,
        DGXCloudState.RUNNING,
    )
    while phase in in_progress:
        time.sleep(sleep)
        phase = self.status(workload_id)

    if phase is not DGXCloudState.COMPLETED:
        raise RuntimeError("Failed to move data to PVC")

    deletion = self.delete_workload(token, workload_id)
    if 200 <= deletion.status_code < 300:
        logger.info(
            "Successfully deleted data movement workload %s on DGXCloud with response code %d",
            workload_id,
            deletion.status_code,
        )
    else:
        logger.error(
            "Failed to delete data movement workload %s, response code=%d, reason=%s",
            workload_id,
            deletion.status_code,
            deletion.text,
        )

def create_distributed_job(self, token: str, project_id: str, cluster_id: str, name: str):
"""
Creates a distributed PyTorch job using the provided project/cluster IDs.
"""

url = f"{self.base_url}/workloads/distributed"
headers = self._default_headers(token=token)
launch_script = f"""
ln -s {self.job_dir} /nemo_run
cd /nemo_run/code
{" ".join(cmd)}
"""
with open(os.path.join(self.job_dir, "launch_script.sh"), "w+") as f:
f.write(launch_script)

payload = {
"name": name,
"useGivenNameAsPrefix": True,
"projectId": project_id,
"clusterId": cluster_id,
"spec": {
"command": f"/bin/bash {self.job_dir}/launch_script.sh",
"command": f"/bin/bash {self.pvc_job_dir}/launch_script.sh",
"image": self.container_image,
"distributedFramework": self.distributed_framework,
"minReplicas": self.nodes,
Expand Down Expand Up @@ -145,7 +244,21 @@
if not project_id or not cluster_id:
raise RuntimeError("Unable to determine project/cluster IDs for job submission")

resp = self.create_distributed_job(token, project_id, cluster_id, name, cmd)
# prepare launch script and move data to PVC
launch_script = f"""
ln -s {self.pvc_job_dir}/ /nemo_run
cd /nemo_run/code
{" ".join(cmd)}
"""
with open(os.path.join(self.job_dir, "launch_script.sh"), "w+") as f:
f.write(launch_script)

if not self.launched_from_cluster:
logger.info("Creating data movement workload")
self.move_data(token, project_id, cluster_id)

logger.info("Creating distributed workload")
resp = self.create_distributed_job(token, project_id, cluster_id, name)
if resp.status_code not in [200, 202]:
raise RuntimeError(f"Failed to create job, status_code={resp.status_code}")

Expand All @@ -167,10 +280,10 @@
return 1

def status(self, job_id: str) -> Optional[DGXCloudState]:
url = f"{self.base_url}/workloads/distributed/{job_id}"
url = f"{self.base_url}/workloads/{job_id}"
token = self.get_auth_token()
if not token:
logger.error("Failed to retrieve auth token for cancellation request.")
logger.error("Failed to retrieve auth token for status request.")
return None

headers = self._default_headers(token=token)
Expand All @@ -179,7 +292,7 @@
return DGXCloudState("Unknown")

r_json = response.json()
return DGXCloudState(r_json["actualPhase"])
return DGXCloudState(r_json["phase"])

def cancel(self, job_id: str):
# Retrieve the authentication token for the REST calls
Expand Down Expand Up @@ -225,18 +338,49 @@
self.job_name = task_id
self.experiment_dir = exp_dir
self.job_dir = os.path.join(exp_dir, task_dir)
self.experiment_id = exp_id
assert any(
map(
lambda x: os.path.commonpath(
[os.path.abspath(x["path"]), os.path.abspath(self.job_dir)]
[os.path.abspath(x["path"]), os.path.abspath(self.pvc_nemo_run_dir)]
)
== os.path.abspath(x["path"]),
self.pvcs,
)
), (
f"Need to specify atleast one PVC containing {self.job_dir}.\nTo update job dir to a PVC path, you can use set_nemorun_home() or the NEMORUN_HOME env var."
f"Need to specify at least one PVC containing {self.pvc_nemo_run_dir}. Update your PVC path or pvc_nemo_run_dir."
)

# setting linked PVC job directory
nemo_run_home = get_nemorun_home()
job_subdir = self.job_dir[len(nemo_run_home) + 1 :] # +1 to remove the initial backslash
self.pvc_job_dir = os.path.join(self.pvc_nemo_run_dir, job_subdir)

logger.info(
"PVC job directory set as: %s",
self.pvc_job_dir,
)
self.experiment_id = exp_id

def get_launcher_prefix(self) -> Optional[list[str]]:
    """Return the nsys profiling command prefix for this executor's launcher,
    or ``None`` when nsys profiling is disabled.
    """
    launcher = self.get_launcher()
    if launcher.nsys_profile:
        return launcher.get_nsys_prefix(profile_dir="/nemo_run")
    # Explicit return: avoids mixing implicit fall-through and explicit
    # returns (flagged by CodeQL on this function).
    return None

def package_configs(self, *cfgs: tuple[str, str]) -> list[str]:
    """Write each ``(name, content)`` config under ``<job_dir>/configs`` and
    return the corresponding in-container paths under ``/nemo_run/configs``.
    """
    mounted_paths = []
    config_root = os.path.join(self.job_dir, "configs")
    for cfg_name, cfg_body in cfgs:
        target = os.path.join(config_root, cfg_name)
        os.makedirs(os.path.dirname(target), exist_ok=True)
        with open(target, "w") as fh:
            fh.write(cfg_body)
        mounted_paths.append(os.path.join("/nemo_run/configs", cfg_name))
    return mounted_paths

def package(self, packager: Packager, job_name: str):
assert self.experiment_id, "Executor not assigned to an experiment."
Expand Down
2 changes: 1 addition & 1 deletion nemo_run/run/torchx_backend/schedulers/dgxcloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def _submit_dryrun( # type: ignore
cfg: Executor,
) -> AppDryRunInfo[DGXRequest]:
assert isinstance(cfg, DGXCloudExecutor), (
f"{cfg.__class__} not supported for skypilot scheduler."
f"{cfg.__class__} not supported for DGXCloud scheduler."
)
executor = cfg

Expand Down
Loading
Loading