Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/source/guides/execution.md
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,12 @@ def your_lepton_executor(nodes: int, gpus_per_node: int, container_image: str):
mounts=[{"path": storage_path, "mount_path": mount_path}],
# Optional: Add custom environment variables or PyTorch specs if needed
env_vars=common_envs(),
# Optional: Specify a node reservation to schedule jobs with
# node_reservation="my-node-reservation",
# Optional: Specify commands to run at container launch prior to the job starting
# pre_launch_commands=["nvidia-smi"],
# Optional: Specify image pull secrets for authenticating with container registries
# image_pull_secrets=["my-image-pull-secret"],
# packager=run.GitArchivePackager() # Choose appropriate packager
)
return executor
Expand Down
14 changes: 12 additions & 2 deletions nemo_run/core/execution/lepton.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@
LeptonContainer,
Mount,
)
from leptonai.api.v1.types.job import LeptonJob, LeptonJobState, LeptonJobUserSpec
from leptonai.api.v1.types.job import (
LeptonJob,
LeptonJobState,
LeptonJobUserSpec,
ReservationConfig,
)
from leptonai.api.v1.types.replica import Replica

from nemo_run.config import get_nemorun_home
Expand Down Expand Up @@ -51,6 +56,7 @@ class LeptonExecutor(Executor):
shared_memory_size: int = 65536
resource_shape: str = ""
node_group: str = ""
node_reservation: str = ""
mounts: list[dict[str, Any]] = field(default_factory=list)
lepton_job_dir: str = field(init=False, default="")
image_pull_secrets: list[str] = field(
Expand Down Expand Up @@ -260,8 +266,12 @@ def create_lepton_job(self, name: str):
log=None,
queue_config=None,
stopped=None,
reservation_config=None,
)

if self.node_reservation:
job_spec.reservation_config = ReservationConfig(reservation_id=self.node_reservation)
job_spec.reservation_config.reservation_id = self.node_reservation

job = LeptonJob(spec=job_spec, metadata=Metadata(id=name))

created_job = client.job.create(job)
Expand Down
118 changes: 118 additions & 0 deletions test/core/execution/test_lepton.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,42 @@ def test_init(self):
assert executor.nemo_run_dir == "/workspace/nemo_run"
assert executor.mounts == [{"path": "/workspace", "mount_path": "/workspace"}]

def test_init_with_node_reservation(self):
    """A reservation id passed to the constructor is kept on the executor."""
    reservation_id = "my-reservation-id"
    init_kwargs = dict(
        resource_shape="gpu.8xh100-80gb",
        node_group="my-node-group",
        container_image="test-image",
        nodes=2,
        gpus_per_node=8,
        nemo_run_dir="/workspace/nemo_run",
        mounts=[{"path": "/workspace", "mount_path": "/workspace"}],
        node_reservation=reservation_id,
    )

    lepton_executor = LeptonExecutor(**init_kwargs)

    # The attribute must round-trip exactly what was supplied.
    assert lepton_executor.node_reservation == reservation_id

def test_init_with_empty_node_reservation(self):
    """An explicitly empty node_reservation is stored as an empty string."""
    test_mounts = [{"path": "/test", "mount_path": "/test"}]

    lepton_executor = LeptonExecutor(
        container_image="test-image",
        nemo_run_dir="/test/path",
        mounts=test_mounts,
        node_reservation="",
    )

    # Passing "" explicitly must leave the attribute as "".
    assert lepton_executor.node_reservation == ""

def test_init_without_node_reservation(self):
    """Omitting node_reservation leaves the executor with the empty-string default."""
    test_mounts = [{"path": "/test", "mount_path": "/test"}]

    lepton_executor = LeptonExecutor(
        container_image="test-image",
        nemo_run_dir="/test/path",
        mounts=test_mounts,
    )

    # No reservation argument was given, so the default value is expected.
    assert lepton_executor.node_reservation == ""

@patch("nemo_run.core.execution.lepton.APIClient")
def test_stop_job(self, mock_APIClient):
mock_instance = MagicMock()
Expand Down Expand Up @@ -344,6 +380,88 @@ def test_create_lepton_job(self, mock_APIClient_class):

mock_client.job.create.assert_called_once()

@patch("nemo_run.core.execution.lepton.APIClient")
def test_create_lepton_job_with_reservation_config(self, mock_APIClient_class):
    """create_lepton_job submits a job whose spec carries the configured reservation id."""
    reservation_id = "my-reservation-id"

    # Stub the API client so no request leaves the test process.
    api_client = mock_APIClient_class.return_value
    api_client.job.create.return_value = LeptonJob(metadata=Metadata(id="my-lepton-job"))
    api_client.nodegroup.list_all.return_value = []

    lepton_executor = LeptonExecutor(
        container_image="test-image",
        nemo_run_dir="/test/path",
        node_group="123456",
        mounts=[{"path": "/test", "mount_path": "/test"}],
        node_reservation=reservation_id,
    )
    # Replace the node/node-group lookups with canned answers.
    lepton_executor._valid_node_ids = MagicMock(return_value=["node-id-1", "node-id-2"])
    lepton_executor._node_group_id = MagicMock(
        return_value=SimpleNamespace(metadata=SimpleNamespace(id_="123456"))
    )

    lepton_executor.create_lepton_job("my-lepton-job")

    # Exactly one job is submitted; inspect its first positional argument.
    api_client.job.create.assert_called_once()
    submitted_job = api_client.job.create.call_args.args[0]
    reservation_config = submitted_job.spec.reservation_config
    assert reservation_config is not None
    assert reservation_config.reservation_id == reservation_id

@patch("nemo_run.core.execution.lepton.APIClient")
def test_create_lepton_job_without_reservation_config(self, mock_APIClient_class):
    """create_lepton_job leaves reservation_config as None when no reservation is given."""
    # Stub the API client so no request leaves the test process.
    api_client = mock_APIClient_class.return_value
    api_client.job.create.return_value = LeptonJob(metadata=Metadata(id="my-lepton-job"))
    api_client.nodegroup.list_all.return_value = []

    # node_reservation is deliberately omitted here.
    lepton_executor = LeptonExecutor(
        container_image="test-image",
        nemo_run_dir="/test/path",
        node_group="123456",
        mounts=[{"path": "/test", "mount_path": "/test"}],
    )
    # Replace the node/node-group lookups with canned answers.
    lepton_executor._valid_node_ids = MagicMock(return_value=["node-id-1", "node-id-2"])
    lepton_executor._node_group_id = MagicMock(
        return_value=SimpleNamespace(metadata=SimpleNamespace(id_="123456"))
    )

    lepton_executor.create_lepton_job("my-lepton-job")

    # Exactly one job is submitted, and it carries no reservation config.
    api_client.job.create.assert_called_once()
    submitted_job = api_client.job.create.call_args.args[0]
    assert submitted_job.spec.reservation_config is None

@patch("nemo_run.core.execution.lepton.APIClient")
def test_create_lepton_job_with_empty_reservation_config(self, mock_APIClient_class):
    """create_lepton_job leaves reservation_config as None when node_reservation is ""."""
    # Stub the API client so no request leaves the test process.
    api_client = mock_APIClient_class.return_value
    api_client.job.create.return_value = LeptonJob(metadata=Metadata(id="my-lepton-job"))
    api_client.nodegroup.list_all.return_value = []

    lepton_executor = LeptonExecutor(
        container_image="test-image",
        nemo_run_dir="/test/path",
        node_group="123456",
        mounts=[{"path": "/test", "mount_path": "/test"}],
        node_reservation="",  # Empty string
    )
    # Replace the node/node-group lookups with canned answers.
    lepton_executor._valid_node_ids = MagicMock(return_value=["node-id-1", "node-id-2"])
    lepton_executor._node_group_id = MagicMock(
        return_value=SimpleNamespace(metadata=SimpleNamespace(id_="123456"))
    )

    lepton_executor.create_lepton_job("my-lepton-job")

    # Exactly one job is submitted, and it carries no reservation config.
    api_client.job.create.assert_called_once()
    submitted_job = api_client.job.create.call_args.args[0]
    assert submitted_job.spec.reservation_config is None

def test_nnodes(self):
executor = LeptonExecutor(
container_image="nvcr.io/nvidia/test:latest",
Expand Down
Loading