From 3b2ba8e632ed2057de5a70c9858596651bfdb083 Mon Sep 17 00:00:00 2001 From: Robert Clark Date: Wed, 3 Sep 2025 14:49:24 -0500 Subject: [PATCH] Add node reservations for LeptonExecutor Allow users to specify an existing node reservation with the LeptonExecutor to be able to run on dedicated resources. Signed-Off-By: Robert Clark --- docs/source/guides/execution.md | 6 ++ nemo_run/core/execution/lepton.py | 14 +++- test/core/execution/test_lepton.py | 118 +++++++++++++++++++++++++++++ 3 files changed, 136 insertions(+), 2 deletions(-) diff --git a/docs/source/guides/execution.md b/docs/source/guides/execution.md index 1eb8d82e..c6e54fa9 100644 --- a/docs/source/guides/execution.md +++ b/docs/source/guides/execution.md @@ -295,6 +295,12 @@ def your_lepton_executor(nodes: int, gpus_per_node: int, container_image: str): mounts=[{"path": storage_path, "mount_path": mount_path}], # Optional: Add custom environment variables or PyTorch specs if needed env_vars=common_envs(), + # Optional: Specify a node reservation to schedule jobs with + # node_reservation="my-node-reservation", + # Optional: Specify commands to run at container launch prior to the job starting + # pre_launch_commands=["nvidia-smi"], + # Optional: Specify image pull secrets for authenticating with container registries + # image_pull_secrets=["my-image-pull-secret"], # packager=run.GitArchivePackager() # Choose appropriate packager ) return executor diff --git a/nemo_run/core/execution/lepton.py b/nemo_run/core/execution/lepton.py index b1d10ce6..61a70b43 100644 --- a/nemo_run/core/execution/lepton.py +++ b/nemo_run/core/execution/lepton.py @@ -20,7 +20,12 @@ LeptonContainer, Mount, ) -from leptonai.api.v1.types.job import LeptonJob, LeptonJobState, LeptonJobUserSpec +from leptonai.api.v1.types.job import ( + LeptonJob, + LeptonJobState, + LeptonJobUserSpec, + ReservationConfig, +) from leptonai.api.v1.types.replica import Replica from nemo_run.config import get_nemorun_home @@ -51,6 +56,7 @@ class LeptonExecutor(Executor): shared_memory_size: int = 65536 resource_shape: str = "" node_group: str = "" + node_reservation: str = "" mounts: list[dict[str, Any]] = field(default_factory=list) lepton_job_dir: str = field(init=False, default="") image_pull_secrets: list[str] = field( @@ -260,8 +266,12 @@ def create_lepton_job(self, name: str): log=None, queue_config=None, stopped=None, - reservation_config=None, ) + + if self.node_reservation: + job_spec.reservation_config = ReservationConfig(reservation_id=self.node_reservation) + job_spec.reservation_config.reservation_id = self.node_reservation + job = LeptonJob(spec=job_spec, metadata=Metadata(id=name)) created_job = client.job.create(job) diff --git a/test/core/execution/test_lepton.py b/test/core/execution/test_lepton.py index 0ce503f0..7fdc08cc 100644 --- a/test/core/execution/test_lepton.py +++ b/test/core/execution/test_lepton.py @@ -59,6 +59,42 @@ def test_init(self): assert executor.nemo_run_dir == "/workspace/nemo_run" assert executor.mounts == [{"path": "/workspace", "mount_path": "/workspace"}] + def test_init_with_node_reservation(self): + """Test initialization with node_reservation parameter.""" + executor = LeptonExecutor( + resource_shape="gpu.8xh100-80gb", + node_group="my-node-group", + container_image="test-image", + nodes=2, + gpus_per_node=8, + nemo_run_dir="/workspace/nemo_run", + mounts=[{"path": "/workspace", "mount_path": "/workspace"}], + node_reservation="my-reservation-id", + ) + + assert executor.node_reservation == "my-reservation-id" + + def test_init_with_empty_node_reservation(self): + """Test initialization with empty node_reservation string.""" + executor = LeptonExecutor( + container_image="test-image", + nemo_run_dir="/test/path", + mounts=[{"path": "/test", "mount_path": "/test"}], + node_reservation="", + ) + + assert executor.node_reservation == "" + + def test_init_without_node_reservation(self): + """Test initialization without node_reservation parameter (default behavior).""" + executor = LeptonExecutor( + container_image="test-image", + nemo_run_dir="/test/path", + mounts=[{"path": "/test", "mount_path": "/test"}], + ) + + assert executor.node_reservation == "" + @patch("nemo_run.core.execution.lepton.APIClient") def test_stop_job(self, mock_APIClient): mock_instance = MagicMock() @@ -344,6 +380,88 @@ def test_create_lepton_job(self, mock_APIClient_class): mock_client.job.create.assert_called_once() + @patch("nemo_run.core.execution.lepton.APIClient") + def test_create_lepton_job_with_reservation_config(self, mock_APIClient_class): + """Test create_lepton_job creates ReservationConfig when node_reservation is set.""" + mock_client = mock_APIClient_class.return_value + mock_client.job.create.return_value = LeptonJob(metadata=Metadata(id="my-lepton-job")) + node_group = SimpleNamespace(metadata=SimpleNamespace(id_="123456")) + + mock_client.nodegroup.list_all.return_value = [] + valid_node_ids = ["node-id-1", "node-id-2"] + + executor = LeptonExecutor( + container_image="test-image", + nemo_run_dir="/test/path", + node_group="123456", + mounts=[{"path": "/test", "mount_path": "/test"}], + node_reservation="my-reservation-id", + ) + executor._valid_node_ids = MagicMock(return_value=valid_node_ids) + executor._node_group_id = MagicMock(return_value=node_group) + + executor.create_lepton_job("my-lepton-job") + + # Verify that job.create was called with the correct ReservationConfig + mock_client.job.create.assert_called_once() + created_job = mock_client.job.create.call_args[0][0] + assert created_job.spec.reservation_config is not None + assert created_job.spec.reservation_config.reservation_id == "my-reservation-id" + + @patch("nemo_run.core.execution.lepton.APIClient") + def test_create_lepton_job_without_reservation_config(self, mock_APIClient_class): + """Test create_lepton_job creates no ReservationConfig when node_reservation is not set.""" + mock_client = mock_APIClient_class.return_value + mock_client.job.create.return_value = LeptonJob(metadata=Metadata(id="my-lepton-job")) + node_group = SimpleNamespace(metadata=SimpleNamespace(id_="123456")) + + mock_client.nodegroup.list_all.return_value = [] + valid_node_ids = ["node-id-1", "node-id-2"] + + executor = LeptonExecutor( + container_image="test-image", + nemo_run_dir="/test/path", + node_group="123456", + mounts=[{"path": "/test", "mount_path": "/test"}], + # No node_reservation set + ) + executor._valid_node_ids = MagicMock(return_value=valid_node_ids) + executor._node_group_id = MagicMock(return_value=node_group) + + executor.create_lepton_job("my-lepton-job") + + # Verify that job.create was called with no ReservationConfig + mock_client.job.create.assert_called_once() + created_job = mock_client.job.create.call_args[0][0] + assert created_job.spec.reservation_config is None + + @patch("nemo_run.core.execution.lepton.APIClient") + def test_create_lepton_job_with_empty_reservation_config(self, mock_APIClient_class): + """Test create_lepton_job creates no ReservationConfig when node_reservation is empty string.""" + mock_client = mock_APIClient_class.return_value + mock_client.job.create.return_value = LeptonJob(metadata=Metadata(id="my-lepton-job")) + node_group = SimpleNamespace(metadata=SimpleNamespace(id_="123456")) + + mock_client.nodegroup.list_all.return_value = [] + valid_node_ids = ["node-id-1", "node-id-2"] + + executor = LeptonExecutor( + container_image="test-image", + nemo_run_dir="/test/path", + node_group="123456", + mounts=[{"path": "/test", "mount_path": "/test"}], + node_reservation="", # Empty string + ) + executor._valid_node_ids = MagicMock(return_value=valid_node_ids) + executor._node_group_id = MagicMock(return_value=node_group) + + executor.create_lepton_job("my-lepton-job") + + # Verify that job.create was called with no ReservationConfig + mock_client.job.create.assert_called_once() + created_job = mock_client.job.create.call_args[0][0] + assert created_job.spec.reservation_config is None + def test_nnodes(self): executor = LeptonExecutor( container_image="nvcr.io/nvidia/test:latest",