Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions nemo_run/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,9 @@ def from_dict(raw_data: dict | list | str | float | int | bool, cls: Type[_T]) -
if isinstance(raw_data, dict):
underlying_types = get_underlying_types(cls)
underlying_types = [tp for tp in underlying_types if tp is not type(None)]
assert (
len(underlying_types) == 1
), f"Unable to load {cls}. Nested union types are not currently supported."
assert len(underlying_types) == 1, (
f"Unable to load {cls}. Nested union types are not currently supported."
)
cls = underlying_types[0] # type: ignore

if dataclasses.is_dataclass(cls):
Expand Down
6 changes: 3 additions & 3 deletions nemo_run/core/execution/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,9 @@ def get_launcher(self) -> Launcher:
self._setup_launcher()
self._launcher_setup = True

assert self.launcher is None or isinstance(
self.launcher, Launcher
), f"{self.info()} could not setup the launcher."
assert self.launcher is None or isinstance(self.launcher, Launcher), (
f"{self.info()} could not setup the launcher."
)
if self.launcher is None:
self.launcher = Launcher()

Expand Down
4 changes: 3 additions & 1 deletion nemo_run/core/execution/dgxcloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,9 @@ def assign(
== os.path.abspath(x["path"]),
self.pvcs,
)
), f"Need to specify atleast one PVC containing {self.job_dir}.\nTo update job dir to a PVC path, you can use set_nemorun_home() or the NEMORUN_HOME env var."
), (
f"Need to specify atleast one PVC containing {self.job_dir}.\nTo update job dir to a PVC path, you can use set_nemorun_home() or the NEMORUN_HOME env var."
)

def package(self, packager: Packager, job_name: str):
assert self.experiment_id, "Executor not assigned to an experiment."
Expand Down
12 changes: 6 additions & 6 deletions nemo_run/core/execution/skypilot.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,12 @@ class SkypilotExecutor(Executor):
packager: Packager = field(default_factory=lambda: GitArchivePackager()) # type: ignore # noqa: F821

def __post_init__(self):
assert (
_SKYPILOT_AVAILABLE
), "Skypilot is not installed. Please install it using `pip install nemo_run[skypilot]"
assert isinstance(
self.packager, GitArchivePackager
), "Only GitArchivePackager is currently supported for SkypilotExecutor."
assert _SKYPILOT_AVAILABLE, (
"Skypilot is not installed. Please install it using `pip install nemo_run[skypilot]"
)
assert isinstance(self.packager, GitArchivePackager), (
"Only GitArchivePackager is currently supported for SkypilotExecutor."
)

@classmethod
def parse_app(cls: Type["SkypilotExecutor"], app_id: str) -> tuple[str, str, int]:
Expand Down
59 changes: 35 additions & 24 deletions nemo_run/core/execution/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ def __repr__(self) -> str:
return f"{self.__class__.__name__}({self.folder})"


def get_packaging_job_key(experiment_id: str, job_name: str) -> str:
    """Build the key under which a packaging job is recorded.

    The key joins the experiment id and the job name with a colon, so the
    same job name used in two different experiments yields two distinct keys.

    Args:
        experiment_id: Identifier of the experiment the job belongs to.
        job_name: Name of the job within that experiment.

    Returns:
        The string ``"<experiment_id>:<job_name>"``.
    """
    key_template = "{}:{}"
    return key_template.format(experiment_id, job_name)


@dataclass(kw_only=True)
class SlurmExecutor(Executor):
"""
Expand Down Expand Up @@ -357,12 +361,12 @@ def merge(
main_executor.run_as_group = True

if main_executor.het_group_indices:
assert (
main_executor.heterogeneous
), "heterogeneous must be True if het_group_indices is provided"
assert (
len(main_executor.het_group_indices) == num_tasks
), "het_group_indices must be the same length as the number of tasks"
assert main_executor.heterogeneous, (
"heterogeneous must be True if het_group_indices is provided"
)
assert len(main_executor.het_group_indices) == num_tasks, (
"het_group_indices must be the same length as the number of tasks"
)
assert all(
x <= y
for x, y in zip(
Expand Down Expand Up @@ -566,6 +570,8 @@ def package_configs(self, *cfgs: tuple[str, str]) -> list[str]:
return filenames

def package(self, packager: Packager, job_name: str):
assert self.experiment_id, "Executor not assigned to an experiment."

if job_name in self.tunnel.packaging_jobs and not packager.symlink_from_remote_dir:
logger.info(
f"Packaging for job {job_name} in tunnel {self.tunnel.key} already done. Skipping subsequent packagings.\n"
Expand All @@ -575,16 +581,20 @@ def package(self, packager: Packager, job_name: str):

if packager.symlink_from_remote_dir:
logger.info(
f"Packager {packager} is configured to symlink from remote dir. Skipping packaging."
f"Packager {get_packaging_job_key(self.experiment_id, job_name)} is configured to symlink from remote dir. Skipping packaging."
)
if type(packager) is Packager:
self.tunnel.packaging_jobs[job_name] = PackagingJob(symlink=False)
self.tunnel.packaging_jobs[get_packaging_job_key(self.experiment_id, job_name)] = (
PackagingJob(symlink=False)
)
return

self.tunnel.packaging_jobs[job_name] = PackagingJob(
symlink=True,
src_path=packager.symlink_from_remote_dir,
dst_path=os.path.join(self.tunnel.job_dir, Path(self.job_dir).name, "code"),
self.tunnel.packaging_jobs[get_packaging_job_key(self.experiment_id, job_name)] = (
PackagingJob(
symlink=True,
src_path=packager.symlink_from_remote_dir,
dst_path=os.path.join(self.tunnel.job_dir, Path(self.job_dir).name, "code"),
)
)

# Tunnel job dir is the directory of the experiment id, so the base job dir is two levels up
Expand All @@ -599,7 +609,6 @@ def package(self, packager: Packager, job_name: str):

return

assert self.experiment_id, "Executor not assigned to an experiment."
if isinstance(packager, GitArchivePackager):
output = subprocess.run(
["git", "rev-parse", "--show-toplevel"],
Expand Down Expand Up @@ -628,11 +637,13 @@ def package(self, packager: Packager, job_name: str):
f"tar -xvzf {local_pkg} -C {local_code_extraction_path} --ignore-zeros", hide=True
)

self.tunnel.packaging_jobs[job_name] = PackagingJob(
symlink=False,
dst_path=None
if type(packager) is Packager
else os.path.join(self.tunnel.job_dir, Path(self.job_dir).name, "code"),
self.tunnel.packaging_jobs[get_packaging_job_key(self.experiment_id, job_name)] = (
PackagingJob(
symlink=False,
dst_path=None
if type(packager) is Packager
else os.path.join(self.tunnel.job_dir, Path(self.job_dir).name, "code"),
)
)

def parse_deps(self) -> list[str]:
Expand Down Expand Up @@ -826,9 +837,9 @@ def materialize(self) -> str:

sbatch_flags = []
if self.slurm_config.heterogeneous:
assert (
len(self.jobs) == len(self.slurm_config.resource_group)
), f"Number of jobs {len(self.jobs)} must match number of resource group requests {len(self.slurm_config.resource_group)}.\nIf you are just submitting a single job, make sure that heterogeneous=False in the executor."
assert len(self.jobs) == len(self.slurm_config.resource_group), (
f"Number of jobs {len(self.jobs)} must match number of resource group requests {len(self.slurm_config.resource_group)}.\nIf you are just submitting a single job, make sure that heterogeneous=False in the executor."
)
final_group_index = len(self.slurm_config.resource_group) - 1
if self.slurm_config.het_group_indices:
final_group_index = self.slurm_config.het_group_indices.index(
Expand All @@ -838,9 +849,9 @@ def materialize(self) -> str:
for i in range(len(self.slurm_config.resource_group)):
resource_req = self.slurm_config.resource_group[i]
if resource_req.het_group_index:
assert (
self.slurm_config.resource_group[i - 1].het_group_index is not None
), "het_group_index must be set for all requests in resource_group"
assert self.slurm_config.resource_group[i - 1].het_group_index is not None, (
"het_group_index must be set for all requests in resource_group"
)
if (
i > 0
and resource_req.het_group_index
Expand Down
9 changes: 4 additions & 5 deletions nemo_run/core/packaging/git.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,9 @@ def package(self, path: Path, job_dir: str, name: str) -> str:
capture_output=True,
text=True,
).stdout.strip()
assert not bool(
untracked_files
), "Your repo has untracked files. Please track your files via git or set check_untracked_files to False to proceed with packaging."
assert not bool(untracked_files), (
"Your repo has untracked files. Please track your files via git or set check_untracked_files to False to proceed with packaging."
)

ctx = Context()
# we first add git files into an uncompressed archive
Expand Down Expand Up @@ -159,8 +159,7 @@ def package(self, path: Path, job_dir: str, name: str) -> str:
)
pattern_tar_file_name = os.path.join(git_base_path, pattern_tar_file_name)
include_pattern_cmd = (
f"find {relative_include_pattern} -type f | "
f"tar -cf {pattern_tar_file_name} -T -"
f"find {relative_include_pattern} -type f | tar -cf {pattern_tar_file_name} -T -"
)

with ctx.cd(include_pattern_relative_path):
Expand Down
44 changes: 22 additions & 22 deletions nemo_run/run/experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ def __init__(
self._exp_dir = os.path.join(base_dir, "experiments", title, self._id)

self.log_level = log_level
self._runner = get_runner()
self._runner = get_runner(component_defaults=None, experiment=self)

if not _reconstruct:
self.executor = executor if executor else LocalExecutor()
Expand Down Expand Up @@ -528,9 +528,9 @@ def add(
"""
Add a configured function along with its executor config to the experiment.
"""
assert (
_current_experiment.get(None) == self
), "Using Experiment without it's context manager is not permitted."
assert _current_experiment.get(None) == self, (
"Using Experiment without it's context manager is not permitted."
)

job_ids = set([job.id for job in self.jobs])
for dep in dependencies or []:
Expand Down Expand Up @@ -612,9 +612,9 @@ def run(
tail_logs: If True, tails logs from all tasks in the experiment. If False, relies on task specific setting. Defaults to False.
direct: If True, runs all tasks in the experiment sequentially in the same process. Note that if direct=True, then sequential also will be True. Defaults to False.
"""
assert (
_current_experiment.get(None) == self
), "Using Experiment without it's context manager is not permitted."
assert _current_experiment.get(None) == self, (
"Using Experiment without it's context manager is not permitted."
)

if self._launched:
self.console.log("[bold magenta]Experiment already running...")
Expand All @@ -636,13 +636,13 @@ def run(
self.console.log("[bold red]No jobs to run in this experiment.")
return

assert all(
map(lambda job: isinstance(job, Job), self.jobs)
), "Jobs in this experiment contain JobGroup which cannot be run directly for now."
assert all(map(lambda job: isinstance(job, Job), self.jobs)), (
"Jobs in this experiment contain JobGroup which cannot be run directly for now."
)

assert all(
map(lambda job: not job.dependencies, self.jobs)
), "Jobs in this experiment contain dependencies which cannot be run directly for now."
assert all(map(lambda job: not job.dependencies, self.jobs)), (
"Jobs in this experiment contain dependencies which cannot be run directly for now."
)

for job in self.jobs:
assert isinstance(job, Job)
Expand Down Expand Up @@ -674,9 +674,9 @@ def run(
detach = False

is_dag = any(map(lambda job: len(job.dependencies) > 0, self.jobs))
assert not (
is_dag and sequential
), "Jobs in this experiment have dependencies, they cannot be run sequentially. Set sequential=False."
assert not (is_dag and sequential), (
"Jobs in this experiment have dependencies, they cannot be run sequentially. Set sequential=False."
)

if sequential:
for i in range(1, len(self.jobs)):
Expand Down Expand Up @@ -732,9 +732,9 @@ def _run_dag(self, detach: bool, tail_logs: bool, executors: set[Executor]):
)
else:
# All jobs will be executed in parallel
assert all(
map(lambda x: x in self._PARALLEL_SUPPORTED_EXECUTORS, executors)
), f"Parallel mode not supported for atleast one of {executors}. Set sequential=True."
assert all(map(lambda x: x in self._PARALLEL_SUPPORTED_EXECUTORS, executors)), (
f"Parallel mode not supported for atleast one of {executors}. Set sequential=True."
)
wait = False
self.detach = detach

Expand All @@ -751,9 +751,9 @@ def _run_dag(self, detach: bool, tail_logs: bool, executors: set[Executor]):
for dep_id in job.dependencies:
dep = job_map[dep_id]
handle = dep.handle
assert (
dep.launched and handle
), f"Dependency {dep.id} for {job.id} not yet launched."
assert dep.launched and handle, (
f"Dependency {dep.id} for {job.id} not yet launched."
)
deps.append(handle)

job.executor.dependencies = deps # type: ignore
Expand Down
6 changes: 3 additions & 3 deletions nemo_run/run/torchx_backend/schedulers/dgxcloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,9 @@ def _submit_dryrun( # type: ignore
app: AppDef,
cfg: Executor,
) -> AppDryRunInfo[DGXRequest]:
assert isinstance(
cfg, DGXCloudExecutor
), f"{cfg.__class__} not supported for skypilot scheduler."
assert isinstance(cfg, DGXCloudExecutor), (
f"{cfg.__class__} not supported for skypilot scheduler."
)
executor = cfg

assert len(app.roles) == 1, "Only single-role apps are supported."
Expand Down
6 changes: 3 additions & 3 deletions nemo_run/run/torchx_backend/schedulers/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ def __init__(self, session_name: str) -> None:
self._scheduled_reqs: list[DockerJobRequest] = []

def _submit_dryrun(self, app: AppDef, cfg: Executor) -> AppDryRunInfo[DockerJobRequest]: # type: ignore
assert isinstance(
cfg, DockerExecutor
), f"{cfg.__class__} not supported for docker scheduler."
assert isinstance(cfg, DockerExecutor), (
f"{cfg.__class__} not supported for docker scheduler."
)
executor = cfg

if len(app.roles) > 1:
Expand Down
18 changes: 9 additions & 9 deletions nemo_run/run/torchx_backend/schedulers/skypilot.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,9 @@ class SkypilotScheduler(SchedulerMixin, Scheduler[dict[str, str]]): # type: ign
def __init__(self, session_name: str) -> None:
# NOTE: make sure any new init options are supported in create_scheduler(...)
super().__init__("skypilot", session_name)
assert (
_SKYPILOT_AVAILABLE
), "Skypilot is not installed. Please install it using `pip install nemo_run[skypilot]"
assert _SKYPILOT_AVAILABLE, (
"Skypilot is not installed. Please install it using `pip install nemo_run[skypilot]"
)

def _run_opts(self) -> runopts:
opts = runopts()
Expand All @@ -108,9 +108,9 @@ def schedule(self, dryrun_info: AppDryRunInfo[SkypilotRequest]) -> str:
executor = req.executor
executor.package(executor.packager, job_name=executor.job_name)
job_id, handle = executor.launch(task)
assert (
job_id and handle
), f"Failed scheduling run on Skypilot. Job id: {job_id}, Handle: {handle}"
assert job_id and handle, (
f"Failed scheduling run on Skypilot. Job id: {job_id}, Handle: {handle}"
)
app_id = f"{executor.experiment_id}___{handle.get_cluster_name()}___{task.name}___{job_id}"
_, task_details = SkypilotExecutor.status(app_id=app_id)
if task_details:
Expand All @@ -127,9 +127,9 @@ def _submit_dryrun( # type: ignore
) -> AppDryRunInfo[SkypilotRequest]:
from sky.utils import common_utils

assert isinstance(
cfg, SkypilotExecutor
), f"{cfg.__class__} not supported for skypilot scheduler."
assert isinstance(cfg, SkypilotExecutor), (
f"{cfg.__class__} not supported for skypilot scheduler."
)
executor = cfg

assert len(app.roles) == 1, "Only 1 role supported for Skypilot executor."
Expand Down
Loading
Loading