From c99ddc1adecd7b8d0652df3fc83a8764a94864b2 Mon Sep 17 00:00:00 2001 From: Prekshi Vyas Date: Tue, 15 Jul 2025 22:29:46 +0000 Subject: [PATCH 1/5] add a grace for Jobs that may start in Unknown Signed-off-by: Prekshi Vyas --- nemo_run/core/execution/lepton.py | 139 +++++++++++++++++------------- 1 file changed, 77 insertions(+), 62 deletions(-) diff --git a/nemo_run/core/execution/lepton.py b/nemo_run/core/execution/lepton.py index f3cd2c92..6409aebd 100644 --- a/nemo_run/core/execution/lepton.py +++ b/nemo_run/core/execution/lepton.py @@ -82,69 +82,84 @@ def copy_directory_data_command(self, local_dir_path: str, dest_path: str) -> Li full_command = ["sh", "-c", cmd] return full_command - def move_data(self, sleep: float = 10, timeout: int = 600, poll_interval: int = 5) -> None: - """ - Moves job directory into remote storage and deletes the workload after completion. - """ - client = APIClient() - cmd = self.copy_directory_data_command(self.job_dir, self.lepton_job_dir) - node_group_id = self._node_group_id(client) - valid_node_ids = self._valid_node_ids(node_group_id, client) - - job_spec = LeptonJobUserSpec( - resource_shape="cpu.small", - affinity=LeptonResourceAffinity( - allowed_dedicated_node_groups=[node_group_id.metadata.id_], - allowed_nodes_in_node_group=valid_node_ids, - ), - container=LeptonContainer( - image="busybox:1.37.0", - command=cmd, - ), - completions=1, - parallelism=1, - mounts=[Mount(**mount) for mount in self.mounts], - ) - - custom_name = f"data-mover-{int(datetime.now().timestamp())}" - - job = LeptonJob( - metadata=Metadata( - id=custom_name, - name=custom_name, - visibility=LeptonVisibility("private"), - ), - spec=job_spec, - ) - - response = client.job.create(job) - job_id = response.metadata.id_ - - start_time = time.time() - count = 0 - - while True: - if time.time() - start_time > timeout: - raise TimeoutError(f"Job {job_id} did not complete within {timeout} seconds.") - current_job = client.job.get(job_id) - current_job_status = current_job.status.state - if count > 0 and current_job_status in [ - LeptonJobState.Completed, - LeptonJobState.Failed, - LeptonJobState.Unknown, - ]: + def move_data(self, sleep: float = 10, timeout: int = 600, poll_interval: int = 5, unknowns_grace_period: int = 60) -> None: + """ + Moves job directory into remote storage and deletes the workload after completion. + """ + client = APIClient() + cmd = self.copy_directory_data_command(self.job_dir, self.lepton_job_dir) + node_group_id = self._node_group_id(client) + valid_node_ids = self._valid_node_ids(node_group_id, client) + + job_spec = LeptonJobUserSpec( + resource_shape="cpu.small", + affinity=LeptonResourceAffinity( + allowed_dedicated_node_groups=[node_group_id.metadata.id_], + allowed_nodes_in_node_group=valid_node_ids, + ), + container=LeptonContainer( + image="busybox:1.37.0", + command=cmd, + ), + completions=1, + parallelism=1, + mounts=[Mount(**mount) for mount in self.mounts], + ) + + custom_name = f"data-mover-{int(datetime.now().timestamp())}" + + job = LeptonJob( + metadata=Metadata( + id=custom_name, + name=custom_name, + visibility=LeptonVisibility("private"), + ), + spec=job_spec, + ) + + response = client.job.create(job) + job_id = response.metadata.id_ + + start_time = time.time() + unknown_start_time = None + count = 0 + + while True: + if time.time() - start_time > timeout: + raise TimeoutError(f"Job {job_id} did not complete within {timeout} seconds.") + + current_job = client.job.get(job_id) + current_job_status = current_job.status.state + + if count > 0: + if current_job_status == LeptonJobState.Completed: break - count += 1 - time.sleep(poll_interval) - - if current_job_status != LeptonJobState.Completed: - raise RuntimeError(f"Job {job_id} failed with status: {current_job_status}") - - delete_success = client.job.delete(job_id) - if delete_success: - logging.info(f"Successfully deleted job {job_id}") - else: - logging.error(f"Failed to delete job {job_id}") + elif current_job_status == LeptonJobState.Failed: + break + elif current_job_status == LeptonJobState.Unknown: + if unknown_start_time is None: + unknown_start_time = time.time() + logging.warning(f"Job {job_id} entered Unknown state, giving it {unknowns_grace_period} seconds to recover...") + + elif time.time() - unknown_start_time > unknowns_grace_period: + logging.error(f"Job {job_id} has been in Unknown state for more than {unknowns_grace_period} seconds") + break + else: + if unknown_start_time is not None: + logging.info(f"Job {job_id} recovered from Unknown state to {current_job_status}") + unknown_start_time = None + + count += 1 + time.sleep(poll_interval) + + if current_job_status != LeptonJobState.Completed: + raise RuntimeError(f"Job {job_id} failed with status: {current_job_status}") + + delete_success = client.job.delete(job_id) + if delete_success: + logging.info(f"Successfully deleted job {job_id}") + else: + logging.error(f"Failed to delete job {job_id}") def _node_group_id(self, client: APIClient) -> DedicatedNodeGroup: """ From 748885ec9db35adc2e9e45be69dc528d452d2eec Mon Sep 17 00:00:00 2001 From: Prekshi Vyas Date: Tue, 15 Jul 2025 22:37:11 +0000 Subject: [PATCH 2/5] add a grace for Jobs that may start in Unknown Signed-off-by: Prekshi Vyas --- nemo_run/core/execution/lepton.py | 150 +++++++++++++++--------------- 1 file changed, 75 insertions(+), 75 deletions(-) diff --git a/nemo_run/core/execution/lepton.py b/nemo_run/core/execution/lepton.py index 6409aebd..17bcc29b 100644 --- a/nemo_run/core/execution/lepton.py +++ b/nemo_run/core/execution/lepton.py @@ -83,83 +83,83 @@ def copy_directory_data_command(self, local_dir_path: str, dest_path: str) -> Li return full_command def move_data(self, sleep: float = 10, timeout: int = 600, poll_interval: int = 5, unknowns_grace_period: int = 60) -> None: - """ - Moves job directory into remote storage and deletes the workload after completion. - """ - client = APIClient() - cmd = self.copy_directory_data_command(self.job_dir, self.lepton_job_dir) - node_group_id = self._node_group_id(client) - valid_node_ids = self._valid_node_ids(node_group_id, client) - - job_spec = LeptonJobUserSpec( - resource_shape="cpu.small", - affinity=LeptonResourceAffinity( - allowed_dedicated_node_groups=[node_group_id.metadata.id_], - allowed_nodes_in_node_group=valid_node_ids, - ), - container=LeptonContainer( - image="busybox:1.37.0", - command=cmd, - ), - completions=1, - parallelism=1, - mounts=[Mount(**mount) for mount in self.mounts], - ) - - custom_name = f"data-mover-{int(datetime.now().timestamp())}" - - job = LeptonJob( - metadata=Metadata( - id=custom_name, - name=custom_name, - visibility=LeptonVisibility("private"), - ), - spec=job_spec, - ) - - response = client.job.create(job) - job_id = response.metadata.id_ - - start_time = time.time() - unknown_start_time = None - count = 0 - - while True: - if time.time() - start_time > timeout: - raise TimeoutError(f"Job {job_id} did not complete within {timeout} seconds.") + """ + Moves job directory into remote storage and deletes the workload after completion. + """ + client = APIClient() + cmd = self.copy_directory_data_command(self.job_dir, self.lepton_job_dir) + node_group_id = self._node_group_id(client) + valid_node_ids = self._valid_node_ids(node_group_id, client) + + job_spec = LeptonJobUserSpec( + resource_shape="cpu.small", + affinity=LeptonResourceAffinity( + allowed_dedicated_node_groups=[node_group_id.metadata.id_], + allowed_nodes_in_node_group=valid_node_ids, + ), + container=LeptonContainer( + image="busybox:1.37.0", + command=cmd, + ), + completions=1, + parallelism=1, + mounts=[Mount(**mount) for mount in self.mounts], + ) + + custom_name = f"data-mover-{int(datetime.now().timestamp())}" + + job = LeptonJob( + metadata=Metadata( + id=custom_name, + name=custom_name, + visibility=LeptonVisibility("private"), + ), + spec=job_spec, + ) + + response = client.job.create(job) + job_id = response.metadata.id_ + + start_time = time.time() + unknown_start_time = None + count = 0 + + while True: + if time.time() - start_time > timeout: + raise TimeoutError(f"Job {job_id} did not complete within {timeout} seconds.") + + current_job = client.job.get(job_id) + current_job_status = current_job.status.state - current_job = client.job.get(job_id) - current_job_status = current_job.status.state - - if count > 0: - if current_job_status == LeptonJobState.Completed: - break - elif current_job_status == LeptonJobState.Failed: - break - elif current_job_status == LeptonJobState.Unknown: - if unknown_start_time is None: - unknown_start_time = time.time() - logging.warning(f"Job {job_id} entered Unknown state, giving it {unknowns_grace_period} seconds to recover...") - - elif time.time() - unknown_start_time > unknowns_grace_period: - logging.error(f"Job {job_id} has been in Unknown state for more than {unknowns_grace_period} seconds") + if count > 0: + if current_job_status == LeptonJobState.Completed: + break + elif current_job_status == LeptonJobState.Failed: break - else: - if unknown_start_time is not None: - logging.info(f"Job {job_id} recovered from Unknown state to {current_job_status}") - unknown_start_time = None - - count += 1 - time.sleep(poll_interval) - - if current_job_status != LeptonJobState.Completed: - raise RuntimeError(f"Job {job_id} failed with status: {current_job_status}") - - delete_success = client.job.delete(job_id) - if delete_success: - logging.info(f"Successfully deleted job {job_id}") - else: - logging.error(f"Failed to delete job {job_id}") + elif current_job_status == LeptonJobState.Unknown: + if unknown_start_time is None: + unknown_start_time = time.time() + logging.warning(f"Job {job_id} entered Unknown state, giving it {unknowns_grace_period} seconds to recover...") + + elif time.time() - unknown_start_time > unknowns_grace_period: + logging.error(f"Job {job_id} has been in Unknown state for more than {unknowns_grace_period} seconds") + break + else: + if unknown_start_time is not None: + logging.info(f"Job {job_id} recovered from Unknown state to {current_job_status}") + unknown_start_time = None + + count += 1 + time.sleep(poll_interval) + + if current_job_status != LeptonJobState.Completed: + raise RuntimeError(f"Job {job_id} failed with status: {current_job_status}") + + delete_success = client.job.delete(job_id) + if delete_success: + logging.info(f"Successfully deleted job {job_id}") + else: + logging.error(f"Failed to delete job {job_id}") def _node_group_id(self, client: APIClient) -> DedicatedNodeGroup: """ From 99c11fda3fb6f95a601d8388a207d1e5ed28e672 Mon Sep 17 00:00:00 2001 From: Prekshi Vyas Date: Tue, 15 Jul 2025 22:39:07 +0000 Subject: [PATCH 3/5] add a grace for Jobs that may start in Unknown Signed-off-by: Prekshi Vyas --- nemo_run/core/execution/lepton.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/nemo_run/core/execution/lepton.py b/nemo_run/core/execution/lepton.py index 17bcc29b..eeb39eac 100644 --- a/nemo_run/core/execution/lepton.py +++ b/nemo_run/core/execution/lepton.py @@ -132,9 +132,7 @@ def move_data(self, sleep: float = 10, timeout: int = 600, poll_interval: int = current_job_status = current_job.status.state if count > 0: - if current_job_status == LeptonJobState.Completed: - break - elif current_job_status == LeptonJobState.Failed: + if current_job_status == LeptonJobState.Completed or current_job_status == LeptonJobState.Failed: break elif current_job_status == LeptonJobState.Unknown: if unknown_start_time is None: From e5a60a1c88853577b8803f402e8fc82bec81e706 Mon Sep 17 00:00:00 2001 From: Prekshi Vyas Date: Tue, 15 Jul 2025 23:53:34 +0000 Subject: [PATCH 4/5] fix linting Signed-off-by: Prekshi Vyas --- nemo_run/core/execution/lepton.py | 30 ++++++++++++++++++++++-------- 1 file changed, 22 insertions(+), 8 deletions(-) diff --git a/nemo_run/core/execution/lepton.py b/nemo_run/core/execution/lepton.py index eeb39eac..e14a02cb 100644 --- a/nemo_run/core/execution/lepton.py +++ b/nemo_run/core/execution/lepton.py @@ -82,7 +82,13 @@ def copy_directory_data_command(self, local_dir_path: str, dest_path: str) -> Li full_command = ["sh", "-c", cmd] return full_command - def move_data(self, sleep: float = 10, timeout: int = 600, poll_interval: int = 5, unknowns_grace_period: int = 60) -> None: + def move_data( + self, + sleep: float = 10, + timeout: int = 600, + poll_interval: int = 5, + unknowns_grace_period: int = 60, + ) -> None: """ Moves job directory into remote storage and deletes the workload after completion. """ @@ -127,26 +133,34 @@ def move_data(self, sleep: float = 10, timeout: int = 600, poll_interval: int = while True: if time.time() - start_time > timeout: raise TimeoutError(f"Job {job_id} did not complete within {timeout} seconds.") - + current_job = client.job.get(job_id) current_job_status = current_job.status.state - if count > 0: - if current_job_status == LeptonJobState.Completed or current_job_status == LeptonJobState.Failed: + if ( + current_job_status == LeptonJobState.Completed + or current_job_status == LeptonJobState.Failed + ): break elif current_job_status == LeptonJobState.Unknown: if unknown_start_time is None: unknown_start_time = time.time() - logging.warning(f"Job {job_id} entered Unknown state, giving it {unknowns_grace_period} seconds to recover...") + logging.warning( + f"Job {job_id} entered Unknown state, giving it {unknowns_grace_period} seconds to recover..." + ) elif time.time() - unknown_start_time > unknowns_grace_period: - logging.error(f"Job {job_id} has been in Unknown state for more than {unknowns_grace_period} seconds") + logging.error( + f"Job {job_id} has been in Unknown state for more than {unknowns_grace_period} seconds" + ) break else: if unknown_start_time is not None: - logging.info(f"Job {job_id} recovered from Unknown state to {current_job_status}") + logging.info( + f"Job {job_id} recovered from Unknown state to {current_job_status}" + ) unknown_start_time = None - + count += 1 time.sleep(poll_interval) From b228cd8f4b9fd7c60a43cebde32eadbff35d1be0 Mon Sep 17 00:00:00 2001 From: prekshivyas Date: Wed, 23 Jul 2025 21:22:03 -0700 Subject: [PATCH 5/5] make the handling of Unknown job status better by polling Signed-off-by: prekshivyas --- nemo_run/core/execution/lepton.py | 48 +++++++++++++++---------------- 1 file changed, 23 insertions(+), 25 deletions(-) diff --git a/nemo_run/core/execution/lepton.py b/nemo_run/core/execution/lepton.py index e14a02cb..7c1fe83f 100644 --- a/nemo_run/core/execution/lepton.py +++ b/nemo_run/core/execution/lepton.py @@ -127,8 +127,6 @@ def move_data( job_id = response.metadata.id_ start_time = time.time() - unknown_start_time = None - count = 0 while True: if time.time() - start_time > timeout: @@ -136,32 +134,32 @@ def move_data( current_job = client.job.get(job_id) current_job_status = current_job.status.state - if count > 0: - if ( - current_job_status == LeptonJobState.Completed - or current_job_status == LeptonJobState.Failed - ): - break - elif current_job_status == LeptonJobState.Unknown: - if unknown_start_time is None: - unknown_start_time = time.time() - logging.warning( - f"Job {job_id} entered Unknown state, giving it {unknowns_grace_period} seconds to recover..." - ) - - elif time.time() - unknown_start_time > unknowns_grace_period: - logging.error( - f"Job {job_id} has been in Unknown state for more than {unknowns_grace_period} seconds" - ) - break - else: - if unknown_start_time is not None: + if ( + current_job_status == LeptonJobState.Completed + or current_job_status == LeptonJobState.Failed + ): + break + elif current_job_status == LeptonJobState.Unknown: + logging.warning( + f"Job {job_id} entered Unknown state, checking for up to {unknowns_grace_period} seconds every 2 seconds..." + ) + unknown_start_time = time.time() + recovered = False + while time.time() - unknown_start_time < unknowns_grace_period: + time.sleep(2) + current_job = client.job.get(job_id) + current_job_status = current_job.status.state + if current_job_status != LeptonJobState.Unknown: logging.info( f"Job {job_id} recovered from Unknown state to {current_job_status}" ) - unknown_start_time = None - - count += 1 + recovered = True + break + if not recovered: + logging.error( + f"Job {job_id} has been in Unknown state for more than {unknowns_grace_period} seconds" + ) + break time.sleep(poll_interval) if current_job_status != LeptonJobState.Completed: