diff --git a/src/nemo_run/core/execution/base.py b/src/nemo_run/core/execution/base.py index cd86498f..31368128 100644 --- a/src/nemo_run/core/execution/base.py +++ b/src/nemo_run/core/execution/base.py @@ -227,6 +227,9 @@ def package_configs(self, *cfgs: tuple[str, str]) -> list[str]: filenames.append(filename) return filenames + def create_job_dir(self): + os.makedirs(self.job_dir, exist_ok=True) + def cleanup(self, handle: str): ... diff --git a/src/nemo_run/core/execution/dgxcloud.py b/src/nemo_run/core/execution/dgxcloud.py index c68f893d..2c19f3f7 100644 --- a/src/nemo_run/core/execution/dgxcloud.py +++ b/src/nemo_run/core/execution/dgxcloud.py @@ -226,7 +226,6 @@ def assign( self.experiment_dir = exp_dir self.job_dir = os.path.join(exp_dir, task_dir) self.experiment_id = exp_id - os.makedirs(self.job_dir, exist_ok=True) assert any( map( lambda x: os.path.commonpath( diff --git a/src/nemo_run/core/execution/docker.py b/src/nemo_run/core/execution/docker.py index 3c6baa4c..fcf15ab7 100644 --- a/src/nemo_run/core/execution/docker.py +++ b/src/nemo_run/core/execution/docker.py @@ -162,7 +162,6 @@ def assign( self.experiment_id = exp_id self.experiment_dir = exp_dir self.job_dir = os.path.join(exp_dir, task_dir) - os.makedirs(self.job_dir, exist_ok=True) def nnodes(self) -> int: return 1 diff --git a/src/nemo_run/core/execution/local.py b/src/nemo_run/core/execution/local.py index c68260d8..31f96387 100644 --- a/src/nemo_run/core/execution/local.py +++ b/src/nemo_run/core/execution/local.py @@ -48,7 +48,6 @@ def assign( self.experiment_id = exp_id self.experiment_dir = exp_dir self.job_dir = os.path.join(exp_dir, task_dir) - os.makedirs(self.job_dir, exist_ok=True) def nnodes(self) -> int: return 1 diff --git a/src/nemo_run/core/execution/skypilot.py b/src/nemo_run/core/execution/skypilot.py index 546de627..f685c748 100644 --- a/src/nemo_run/core/execution/skypilot.py +++ b/src/nemo_run/core/execution/skypilot.py @@ -280,8 +280,6 @@ def assign( self.job_dir = os.path.join(exp_dir, task_dir) self.experiment_id = exp_id - os.makedirs(self.job_dir, exist_ok=True) - def package(self, packager: Packager, job_name: str): assert self.experiment_id, "Executor not assigned to an experiment." if isinstance(packager, GitArchivePackager): diff --git a/src/nemo_run/core/execution/slurm.py b/src/nemo_run/core/execution/slurm.py index 6ee6e67f..6e90f01e 100644 --- a/src/nemo_run/core/execution/slurm.py +++ b/src/nemo_run/core/execution/slurm.py @@ -514,7 +514,6 @@ def assign( self.job_dir = os.path.join(exp_dir, task_dir) self.experiment_id = exp_id - os.makedirs(self.job_dir, exist_ok=True) self.tunnel._set_job_dir(self.experiment_id) def get_launcher_prefix(self) -> Optional[list[str]]: diff --git a/src/nemo_run/run/experiment.py b/src/nemo_run/run/experiment.py index 8956e575..fe59e29f 100644 --- a/src/nemo_run/run/experiment.py +++ b/src/nemo_run/run/experiment.py @@ -310,10 +310,7 @@ def __init__( self._runner = get_runner() if not _reconstruct: - os.makedirs(self._exp_dir, exist_ok=False) - self.executor = executor if executor else LocalExecutor() - self._save_config() else: assert isinstance(executor, Executor) self.executor = executor @@ -334,6 +331,10 @@ def to_config(self) -> Config: log_level=self.log_level, ) + def _save_experiment(self, exist_ok: bool = False): + os.makedirs(self._exp_dir, exist_ok=exist_ok) + self._save_config() + def _save_config(self): with open(os.path.join(self._exp_dir, self.__class__._CONFIG_FILE), "w+") as f: f.write(ZlibJSONSerializer().serialize(self.to_config())) @@ -389,6 +390,13 @@ def _load_jobs(self) -> list[Job | JobGroup]: return jobs + def _prepare(self, exist_ok: bool = False): + self._save_experiment(exist_ok=exist_ok) + for job in self.jobs: + job.prepare() + + self._save_jobs() + def _add_single_job( self, task: Union[Partial, Script], @@ -434,7 +442,6 @@ def _add_single_job( plugin.assign(self._id) plugin.setup(cloned, executor) - job.prepare() self._jobs.append(job) return job.id @@ -482,7 +489,6 @@ def _add_job_group( assert isinstance(_executor, Executor) plugin.setup(task, _executor) - job_group.prepare() self._jobs.append(job_group) return job_group.id @@ -552,16 +558,17 @@ def add( dependencies=dependencies.copy() if dependencies else None, ) - self._save_jobs() return job_id - def dryrun(self, log: bool = True): + def dryrun(self, log: bool = True, exist_ok: bool = False, delete_exp_dir: bool = True): """ Logs the raw scripts that will be executed for each task. """ if log: self.console.log(f"[bold magenta]Experiment {self._id} dryrun...") + self._prepare(exist_ok=exist_ok) + for job in self.jobs: if isinstance(job, Job): if log: @@ -571,6 +578,9 @@ def dryrun(self, log: bool = True): self.console.log(f"[bold magenta]Task Group {job.id}\n") job.launch(wait=False, runner=self._runner, dryrun=True, direct=False, log_dryrun=log) + if delete_exp_dir: + shutil.rmtree(self._exp_dir) + def run( self, sequential: bool = False, @@ -614,6 +624,9 @@ def run( self.console.log("[bold magenta]Experiment in inspection mode...") return + # Prepare experiment before running + self._prepare() + if direct: self.console.log( "[bold magenta]Running the experiment with direct=True. " @@ -637,8 +650,8 @@ def run( os.path.join(job.executor.job_dir, f"log_{job.id}_direct_run.out") ): job.launch(wait=True, direct=True, runner=self._runner) - self._save_jobs() + self._save_jobs() self._launched = any(map(lambda job: job.launched, self.jobs)) self._direct = True return @@ -669,7 +682,7 @@ def run( for i in range(1, len(self.jobs)): self.jobs[i].dependencies.append(self.jobs[i - 1].id) - self.dryrun(log=False) + self.dryrun(log=False, exist_ok=True, delete_exp_dir=False) for tunnel in self.tunnels.values(): if isinstance(tunnel, SSHTunnel): tunnel.connect() @@ -746,7 +759,6 @@ def _run_dag(self, detach: bool, tail_logs: bool, executors: set[Executor]): job.executor.dependencies = deps # type: ignore job.launch(wait=False, runner=self._runner) - self._save_jobs() except Exception as e: self.console.log(f"Error running job {job.id}: {e}") raise e @@ -754,6 +766,7 @@ def _run_dag(self, detach: bool, tail_logs: bool, executors: set[Executor]): if wait: self._wait_for_jobs(jobs=[job_map[node] for node in level]) + self._save_jobs() self._launched = any(map(lambda job: job.launched, self.jobs)) self._waited = wait @@ -955,7 +968,6 @@ def reset(self) -> "Experiment": old_id, old_exp_dir, old_launched = self._id, self._exp_dir, self._launched self._id = f"{self._title}_{int(time.time())}" self._exp_dir = os.path.join(NEMORUN_HOME, "experiments", self._title, self._id) - os.makedirs(self._exp_dir, exist_ok=False) self._launched = False self._live_progress = None @@ -967,12 +979,9 @@ def reset(self) -> "Experiment": _current_experiment.set(self) _set_current_experiment = True - if "__main__.py" in os.listdir(old_exp_dir): - shutil.copy(os.path.join(old_exp_dir, "__main__.py"), self._exp_dir) - try: if "__external_main__" not in sys.modules: - maybe_load_external_main(self._exp_dir) + maybe_load_external_main(old_exp_dir) for job in jobs: if isinstance(job, Job): @@ -1022,8 +1031,6 @@ def reset(self) -> "Experiment": self._current_experiment_token = None self._reconstruct = False - self._save_config() - return self def _initialize_live_progress(self): diff --git a/src/nemo_run/run/job.py b/src/nemo_run/run/job.py index 1bd2306f..f25998ef 100644 --- a/src/nemo_run/run/job.py +++ b/src/nemo_run/run/job.py @@ -92,6 +92,7 @@ def logs(self, runner: Runner, regex: str | None = None): ) def prepare(self): + self.executor.create_job_dir() self._executable = package( self.id, self.task, executor=self.executor, serialize_to_file=True ) @@ -306,6 +307,7 @@ def logs(self, runner: Runner, regex: str | None = None): ) def prepare(self): + self.executor.create_job_dir() self._executables: list[tuple[AppDef, Executor]] = [] for i, task in enumerate(self.tasks): executor = self.executors if self._merge else self.executors[i] # type: ignore diff --git a/test/core/execution/test_local.py b/test/core/execution/test_local.py index 2553e002..e6dada28 100644 --- a/test/core/execution/test_local.py +++ b/test/core/execution/test_local.py @@ -37,7 +37,7 @@ def test_local_executor_assign(): assert executor.experiment_id == "test_exp" assert executor.job_dir == os.path.join(tmp_dir, "test_task") - assert os.path.exists(executor.job_dir) + assert not os.path.exists(executor.job_dir) def test_local_executor_nnodes():