Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion nemo_run/run/torchx_backend/schedulers/dgxcloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ def _cancel_existing(self, app_id: str) -> None:
executor: DGXCloudExecutor = job_info.get("executor", None) # type: ignore
if not executor:
return None
executor.delete(job_id)
executor.cancel(job_id)

def list(self) -> list[ListAppResponse]: ...

Expand Down
84 changes: 84 additions & 0 deletions test/run/torchx_backend/schedulers/test_dgxcloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,87 @@ def test_dgx_cloud_scheduler_methods(dgx_cloud_scheduler):
assert hasattr(dgx_cloud_scheduler, "describe")
assert hasattr(dgx_cloud_scheduler, "_cancel_existing")
assert hasattr(dgx_cloud_scheduler, "_validate")


def test_schedule(dgx_cloud_scheduler, mock_app_def, dgx_cloud_executor):
with (
mock.patch.object(DGXCloudExecutor, "package") as mock_package,
mock.patch.object(DGXCloudExecutor, "launch") as mock_launch,
):
mock_package.return_value = None
mock_launch.return_value = ("test_job_id", "RUNNING")

# Set job_name and experiment_id on executor
dgx_cloud_executor.job_name = "test_job"
dgx_cloud_executor.experiment_id = "test_experiment"

dryrun_info = dgx_cloud_scheduler._submit_dryrun(mock_app_def, dgx_cloud_executor)
app_id = dgx_cloud_scheduler.schedule(dryrun_info)

assert app_id == "test_experiment___test_role___test_job_id"
mock_package.assert_called_once()
mock_launch.assert_called_once()


def test_describe(dgx_cloud_scheduler, dgx_cloud_executor):
with (
mock.patch(
"nemo_run.run.torchx_backend.schedulers.dgxcloud._get_job_dirs"
) as mock_get_job_dirs,
mock.patch.object(DGXCloudExecutor, "status") as mock_status,
):
mock_get_job_dirs.return_value = {
"test_experiment___test_role___test_job_id": {
"job_status": "RUNNING",
"executor": dgx_cloud_executor,
}
}
mock_status.return_value = "RUNNING"

response = dgx_cloud_scheduler.describe("test_experiment___test_role___test_job_id")
assert response is not None
assert response.app_id == "test_experiment___test_role___test_job_id"
assert len(response.roles) == 1
assert response.roles[0].name == "test_role"


def test_cancel_existing(dgx_cloud_scheduler, dgx_cloud_executor):
with (
mock.patch(
"nemo_run.run.torchx_backend.schedulers.dgxcloud._get_job_dirs"
) as mock_get_job_dirs,
mock.patch.object(DGXCloudExecutor, "cancel") as mock_cancel,
):
mock_get_job_dirs.return_value = {
"test_experiment___test_role___test_job_id": {
"job_status": "RUNNING",
"executor": dgx_cloud_executor,
}
}

dgx_cloud_scheduler._cancel_existing("test_experiment___test_role___test_job_id")
mock_cancel.assert_called_once_with("test_job_id")


def test_save_and_get_job_dirs():
with tempfile.TemporaryDirectory() as temp_dir:
from nemo_run.config import set_nemorun_home

set_nemorun_home(temp_dir)

from nemo_run.run.torchx_backend.schedulers.dgxcloud import _get_job_dirs, _save_job_dir

executor = DGXCloudExecutor(
base_url="https://test.com",
app_id="test_id",
app_secret="test_secret",
project_name="test_project",
container_image="test:latest",
job_dir=temp_dir,
)

_save_job_dir("test_app_id", "RUNNING", executor)
job_dirs = _get_job_dirs()

assert "test_app_id" in job_dirs
assert isinstance(job_dirs["test_app_id"]["executor"], DGXCloudExecutor)
122 changes: 122 additions & 0 deletions test/run/torchx_backend/schedulers/test_docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

from nemo_run.core.execution.docker import DockerExecutor
from nemo_run.run.torchx_backend.schedulers.docker import (
DockerContainer,
DockerJobRequest,
PersistentDockerScheduler,
create_scheduler,
)
Expand Down Expand Up @@ -77,3 +79,123 @@ def test_docker_scheduler_methods(docker_scheduler):
assert hasattr(docker_scheduler, "describe")
assert hasattr(docker_scheduler, "log_iter")
assert hasattr(docker_scheduler, "close")


def test_schedule(docker_scheduler, mock_app_def, docker_executor):
with (
mock.patch.object(DockerExecutor, "package") as mock_package,
mock.patch.object(DockerContainer, "run") as mock_run,
):
mock_package.return_value = None
mock_run.return_value = ("test_container_id", "RUNNING")

# Set job_name on executor
docker_executor.job_name = "test_job"

dryrun_info = docker_scheduler._submit_dryrun(mock_app_def, docker_executor)
docker_scheduler.schedule(dryrun_info)

mock_package.assert_called_once()
mock_run.assert_called_once()


def test_describe(docker_scheduler, docker_executor):
with (
mock.patch.object(DockerJobRequest, "load") as mock_load,
mock.patch.object(DockerContainer, "get_container") as mock_get_container,
):
mock_load.return_value = DockerJobRequest(
id="test_session___test_role___test_container_id",
executor=docker_executor,
containers=[
DockerContainer(
name="test_role",
command=["test"],
executor=docker_executor,
extra_env={},
)
],
)
mock_get_container.return_value = None

response = docker_scheduler.describe("test_session___test_role___test_container_id")
assert response is not None
assert response.app_id == "test_session___test_role___test_container_id"
assert "SUCCEEDED" in str(response.state)
assert len(response.roles) == 1


def test_save_and_get_job_dirs():
with tempfile.TemporaryDirectory() as temp_dir:
from nemo_run.config import set_nemorun_home

set_nemorun_home(temp_dir)

from nemo_run.run.torchx_backend.schedulers.docker import DockerJobRequest

executor = DockerExecutor(
container_image="test:latest",
job_dir=temp_dir,
)

req = DockerJobRequest(
id="test_app_id",
executor=executor,
containers=[
DockerContainer(
name="test_role",
command=["test"],
executor=executor,
extra_env={},
)
],
)
req.save()

loaded_req = DockerJobRequest.load("test_app_id")
assert loaded_req is not None
assert loaded_req.id == "test_app_id"
assert isinstance(loaded_req.executor, DockerExecutor)


def test_run_opts(docker_scheduler):
opts = docker_scheduler._run_opts()
assert "copy_env" in str(opts)
assert "env" in str(opts)
assert "privileged" in str(opts)


def test_log_iter(docker_scheduler, docker_executor):
with (
mock.patch.object(DockerJobRequest, "load") as mock_load,
mock.patch.object(DockerContainer, "get_container") as mock_get_container,
):
mock_load.return_value = DockerJobRequest(
id="test_session___test_role___test_container_id",
executor=docker_executor,
containers=[
DockerContainer(
name="test_role",
command=["test"],
executor=docker_executor,
extra_env={},
)
],
)
container_mock = mock.Mock()
container_mock.logs = mock.Mock(return_value=["log1", "log2"])
mock_get_container.return_value = container_mock

logs = list(
docker_scheduler.log_iter("test_session___test_role___test_container_id", "test_role")
)
assert logs == ["log1", "log2"]
assert mock_get_container.call_count == 1
assert container_mock.logs.call_count == 1


def test_close(docker_scheduler):
with mock.patch.object(DockerContainer, "delete") as mock_delete:
docker_scheduler._scheduled_reqs = [] # No requests to clean up
docker_scheduler.close()
mock_delete.assert_not_called() # No cleanup needed since no requests
53 changes: 53 additions & 0 deletions test/run/torchx_backend/schedulers/test_skypilot.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
# limitations under the License.

import tempfile
from unittest import mock

import pytest
from torchx.schedulers.api import AppDryRunInfo
from torchx.specs import AppDef, Role

from nemo_run.core.execution.skypilot import SkypilotExecutor
Expand Down Expand Up @@ -57,3 +59,54 @@ def test_skypilot_scheduler_methods(skypilot_scheduler):
assert hasattr(skypilot_scheduler, "schedule")
assert hasattr(skypilot_scheduler, "describe")
assert hasattr(skypilot_scheduler, "_validate")


def test_submit_dryrun(skypilot_scheduler, mock_app_def, skypilot_executor):
with mock.patch.object(SkypilotExecutor, "package") as mock_package:
mock_package.return_value = None

dryrun_info = skypilot_scheduler._submit_dryrun(mock_app_def, skypilot_executor)
assert isinstance(dryrun_info, AppDryRunInfo)
assert dryrun_info.request is not None


def test_schedule(skypilot_scheduler, mock_app_def, skypilot_executor):
class MockHandle:
def get_cluster_name(self):
return "test_cluster_name"

with (
mock.patch.object(SkypilotExecutor, "package") as mock_package,
mock.patch.object(SkypilotExecutor, "launch") as mock_launch,
):
mock_package.return_value = None
mock_launch.return_value = (123, MockHandle())

# Set job_name and experiment_id on executor
skypilot_executor.job_name = "test_job"
skypilot_executor.experiment_id = "test_session"

dryrun_info = skypilot_scheduler._submit_dryrun(mock_app_def, skypilot_executor)
app_id = skypilot_scheduler.schedule(dryrun_info)

assert app_id == "test_session___test_cluster_name___test_role___123"
mock_package.assert_called_once()
mock_launch.assert_called_once()


def test_cancel_existing(skypilot_scheduler, skypilot_executor):
with (
mock.patch.object(SkypilotExecutor, "parse_app") as mock_parse_app,
mock.patch.object(SkypilotExecutor, "cancel") as mock_cancel,
):
mock_parse_app.return_value = ("test_cluster_name", "test_role", 123)

skypilot_scheduler._cancel_existing("test_session___test_cluster_name___test_role___123")
mock_cancel.assert_called_once_with(
app_id="test_session___test_cluster_name___test_role___123"
)


def test_validate(skypilot_scheduler, mock_app_def):
# Test that validation doesn't raise any errors
skypilot_scheduler._validate(mock_app_def, "skypilot")
Loading