diff --git a/nemo_run/run/torchx_backend/schedulers/dgxcloud.py b/nemo_run/run/torchx_backend/schedulers/dgxcloud.py index 3c9de2cb..fbf1253e 100644 --- a/nemo_run/run/torchx_backend/schedulers/dgxcloud.py +++ b/nemo_run/run/torchx_backend/schedulers/dgxcloud.py @@ -184,7 +184,7 @@ def _cancel_existing(self, app_id: str) -> None: executor: DGXCloudExecutor = job_info.get("executor", None) # type: ignore if not executor: return None - executor.delete(job_id) + executor.cancel(job_id) def list(self) -> list[ListAppResponse]: ... diff --git a/test/run/torchx_backend/schedulers/test_dgxcloud.py b/test/run/torchx_backend/schedulers/test_dgxcloud.py index ab724c29..41b30013 100644 --- a/test/run/torchx_backend/schedulers/test_dgxcloud.py +++ b/test/run/torchx_backend/schedulers/test_dgxcloud.py @@ -74,3 +74,87 @@ def test_dgx_cloud_scheduler_methods(dgx_cloud_scheduler): assert hasattr(dgx_cloud_scheduler, "describe") assert hasattr(dgx_cloud_scheduler, "_cancel_existing") assert hasattr(dgx_cloud_scheduler, "_validate") + + +def test_schedule(dgx_cloud_scheduler, mock_app_def, dgx_cloud_executor): + with ( + mock.patch.object(DGXCloudExecutor, "package") as mock_package, + mock.patch.object(DGXCloudExecutor, "launch") as mock_launch, + ): + mock_package.return_value = None + mock_launch.return_value = ("test_job_id", "RUNNING") + + # Set job_name and experiment_id on executor + dgx_cloud_executor.job_name = "test_job" + dgx_cloud_executor.experiment_id = "test_experiment" + + dryrun_info = dgx_cloud_scheduler._submit_dryrun(mock_app_def, dgx_cloud_executor) + app_id = dgx_cloud_scheduler.schedule(dryrun_info) + + assert app_id == "test_experiment___test_role___test_job_id" + mock_package.assert_called_once() + mock_launch.assert_called_once() + + +def test_describe(dgx_cloud_scheduler, dgx_cloud_executor): + with ( + mock.patch( + "nemo_run.run.torchx_backend.schedulers.dgxcloud._get_job_dirs" + ) as mock_get_job_dirs, + mock.patch.object(DGXCloudExecutor, "status") as mock_status, + ): + mock_get_job_dirs.return_value = { + "test_experiment___test_role___test_job_id": { + "job_status": "RUNNING", + "executor": dgx_cloud_executor, + } + } + mock_status.return_value = "RUNNING" + + response = dgx_cloud_scheduler.describe("test_experiment___test_role___test_job_id") + assert response is not None + assert response.app_id == "test_experiment___test_role___test_job_id" + assert len(response.roles) == 1 + assert response.roles[0].name == "test_role" + + +def test_cancel_existing(dgx_cloud_scheduler, dgx_cloud_executor): + with ( + mock.patch( + "nemo_run.run.torchx_backend.schedulers.dgxcloud._get_job_dirs" + ) as mock_get_job_dirs, + mock.patch.object(DGXCloudExecutor, "cancel") as mock_cancel, + ): + mock_get_job_dirs.return_value = { + "test_experiment___test_role___test_job_id": { + "job_status": "RUNNING", + "executor": dgx_cloud_executor, + } + } + + dgx_cloud_scheduler._cancel_existing("test_experiment___test_role___test_job_id") + mock_cancel.assert_called_once_with("test_job_id") + + +def test_save_and_get_job_dirs(): + with tempfile.TemporaryDirectory() as temp_dir: + from nemo_run.config import set_nemorun_home + + set_nemorun_home(temp_dir) + + from nemo_run.run.torchx_backend.schedulers.dgxcloud import _get_job_dirs, _save_job_dir + + executor = DGXCloudExecutor( + base_url="https://test.com", + app_id="test_id", + app_secret="test_secret", + project_name="test_project", + container_image="test:latest", + job_dir=temp_dir, + ) + + _save_job_dir("test_app_id", "RUNNING", executor) + job_dirs = _get_job_dirs() + + assert "test_app_id" in job_dirs + assert isinstance(job_dirs["test_app_id"]["executor"], DGXCloudExecutor) diff --git a/test/run/torchx_backend/schedulers/test_docker.py b/test/run/torchx_backend/schedulers/test_docker.py index 68168295..3055d967 100644 --- a/test/run/torchx_backend/schedulers/test_docker.py +++ b/test/run/torchx_backend/schedulers/test_docker.py @@ -22,6 +22,8 @@ from nemo_run.core.execution.docker import DockerExecutor from nemo_run.run.torchx_backend.schedulers.docker import ( + DockerContainer, + DockerJobRequest, PersistentDockerScheduler, create_scheduler, ) @@ -77,3 +79,123 @@ def test_docker_scheduler_methods(docker_scheduler): assert hasattr(docker_scheduler, "describe") assert hasattr(docker_scheduler, "log_iter") assert hasattr(docker_scheduler, "close") + + +def test_schedule(docker_scheduler, mock_app_def, docker_executor): + with ( + mock.patch.object(DockerExecutor, "package") as mock_package, + mock.patch.object(DockerContainer, "run") as mock_run, + ): + mock_package.return_value = None + mock_run.return_value = ("test_container_id", "RUNNING") + + # Set job_name on executor + docker_executor.job_name = "test_job" + + dryrun_info = docker_scheduler._submit_dryrun(mock_app_def, docker_executor) + docker_scheduler.schedule(dryrun_info) + + mock_package.assert_called_once() + mock_run.assert_called_once() + + +def test_describe(docker_scheduler, docker_executor): + with ( + mock.patch.object(DockerJobRequest, "load") as mock_load, + mock.patch.object(DockerContainer, "get_container") as mock_get_container, + ): + mock_load.return_value = DockerJobRequest( + id="test_session___test_role___test_container_id", + executor=docker_executor, + containers=[ + DockerContainer( + name="test_role", + command=["test"], + executor=docker_executor, + extra_env={}, + ) + ], + ) + mock_get_container.return_value = None + + response = docker_scheduler.describe("test_session___test_role___test_container_id") + assert response is not None + assert response.app_id == "test_session___test_role___test_container_id" + assert "SUCCEEDED" in str(response.state) + assert len(response.roles) == 1 + + +def test_save_and_get_job_dirs(): + with tempfile.TemporaryDirectory() as temp_dir: + from nemo_run.config import set_nemorun_home + + set_nemorun_home(temp_dir) + + from nemo_run.run.torchx_backend.schedulers.docker import DockerJobRequest + + executor = DockerExecutor( + container_image="test:latest", + job_dir=temp_dir, + ) + + req = DockerJobRequest( + id="test_app_id", + executor=executor, + containers=[ + DockerContainer( + name="test_role", + command=["test"], + executor=executor, + extra_env={}, + ) + ], + ) + req.save() + + loaded_req = DockerJobRequest.load("test_app_id") + assert loaded_req is not None + assert loaded_req.id == "test_app_id" + assert isinstance(loaded_req.executor, DockerExecutor) + + +def test_run_opts(docker_scheduler): + opts = docker_scheduler._run_opts() + assert "copy_env" in str(opts) + assert "env" in str(opts) + assert "privileged" in str(opts) + + +def test_log_iter(docker_scheduler, docker_executor): + with ( + mock.patch.object(DockerJobRequest, "load") as mock_load, + mock.patch.object(DockerContainer, "get_container") as mock_get_container, + ): + mock_load.return_value = DockerJobRequest( + id="test_session___test_role___test_container_id", + executor=docker_executor, + containers=[ + DockerContainer( + name="test_role", + command=["test"], + executor=docker_executor, + extra_env={}, + ) + ], + ) + container_mock = mock.Mock() + container_mock.logs = mock.Mock(return_value=["log1", "log2"]) + mock_get_container.return_value = container_mock + + logs = list( + docker_scheduler.log_iter("test_session___test_role___test_container_id", "test_role") + ) + assert logs == ["log1", "log2"] + assert mock_get_container.call_count == 1 + assert container_mock.logs.call_count == 1 + + +def test_close(docker_scheduler): + with mock.patch.object(DockerContainer, "delete") as mock_delete: + docker_scheduler._scheduled_reqs = [] # No requests to clean up + docker_scheduler.close() + mock_delete.assert_not_called() # No cleanup needed since no requests diff --git a/test/run/torchx_backend/schedulers/test_skypilot.py b/test/run/torchx_backend/schedulers/test_skypilot.py index ad2121e1..d5fc751e 100644 --- a/test/run/torchx_backend/schedulers/test_skypilot.py +++ b/test/run/torchx_backend/schedulers/test_skypilot.py @@ -14,8 +14,10 @@ # limitations under the License. import tempfile +from unittest import mock import pytest +from torchx.schedulers.api import AppDryRunInfo from torchx.specs import AppDef, Role from nemo_run.core.execution.skypilot import SkypilotExecutor @@ -57,3 +59,54 @@ def test_skypilot_scheduler_methods(skypilot_scheduler): assert hasattr(skypilot_scheduler, "schedule") assert hasattr(skypilot_scheduler, "describe") assert hasattr(skypilot_scheduler, "_validate") + + +def test_submit_dryrun(skypilot_scheduler, mock_app_def, skypilot_executor): + with mock.patch.object(SkypilotExecutor, "package") as mock_package: + mock_package.return_value = None + + dryrun_info = skypilot_scheduler._submit_dryrun(mock_app_def, skypilot_executor) + assert isinstance(dryrun_info, AppDryRunInfo) + assert dryrun_info.request is not None + + +def test_schedule(skypilot_scheduler, mock_app_def, skypilot_executor): + class MockHandle: + def get_cluster_name(self): + return "test_cluster_name" + + with ( + mock.patch.object(SkypilotExecutor, "package") as mock_package, + mock.patch.object(SkypilotExecutor, "launch") as mock_launch, + ): + mock_package.return_value = None + mock_launch.return_value = (123, MockHandle()) + + # Set job_name and experiment_id on executor + skypilot_executor.job_name = "test_job" + skypilot_executor.experiment_id = "test_session" + + dryrun_info = skypilot_scheduler._submit_dryrun(mock_app_def, skypilot_executor) + app_id = skypilot_scheduler.schedule(dryrun_info) + + assert app_id == "test_session___test_cluster_name___test_role___123" + mock_package.assert_called_once() + mock_launch.assert_called_once() + + +def test_cancel_existing(skypilot_scheduler, skypilot_executor): + with ( + mock.patch.object(SkypilotExecutor, "parse_app") as mock_parse_app, + mock.patch.object(SkypilotExecutor, "cancel") as mock_cancel, + ): + mock_parse_app.return_value = ("test_cluster_name", "test_role", 123) + + skypilot_scheduler._cancel_existing("test_session___test_cluster_name___test_role___123") + mock_cancel.assert_called_once_with( + app_id="test_session___test_cluster_name___test_role___123" + ) + + +def test_validate(skypilot_scheduler, mock_app_def): + # Test that validation doesn't raise any errors + skypilot_scheduler._validate(mock_app_def, "skypilot")