Skip to content

Commit 55eb97f

Browse files
prekshivyasprekshivyas
authored and committed
add a grace for Jobs that may start in Unknown (NVIDIA-NeMo#291)
* add a grace for Jobs that may start in Unknown
  Signed-off-by: Prekshi Vyas <prekshivyas@gmail.com>
* add a grace for Jobs that may start in Unknown
  Signed-off-by: Prekshi Vyas <prekshivyas@gmail.com>
* add a grace for Jobs that may start in Unknown
  Signed-off-by: Prekshi Vyas <prekshivyas@gmail.com>
* fix linting
  Signed-off-by: Prekshi Vyas <prekshivyas@gmail.com>
* make the handling of Unknown job status better by polling
  Signed-off-by: prekshivyas <prekhsivyas@gmail.com>
---------
Signed-off-by: Prekshi Vyas <prekshivyas@gmail.com>
Signed-off-by: prekshivyas <prekhsivyas@gmail.com>
Co-authored-by: prekshivyas <prekhsivyas@gmail.com>
Signed-off-by: Zoey Zhang <zozhang@nvidia.com>
1 parent f964b0a commit 55eb97f

1 file changed

Lines changed: 33 additions & 8 deletions

File tree

nemo_run/core/execution/lepton.py

Lines changed: 33 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -85,7 +85,13 @@ def copy_directory_data_command(self, local_dir_path: str, dest_path: str) -> Li
8585
full_command = ["sh", "-c", cmd]
8686
return full_command
8787

88-
def move_data(self, sleep: float = 10, timeout: int = 600, poll_interval: int = 5) -> None:
88+
def move_data(
89+
self,
90+
sleep: float = 10,
91+
timeout: int = 600,
92+
poll_interval: int = 5,
93+
unknowns_grace_period: int = 60,
94+
) -> None:
8995
"""
9096
Moves job directory into remote storage and deletes the workload after completion.
9197
"""
@@ -124,20 +130,39 @@ def move_data(self, sleep: float = 10, timeout: int = 600, poll_interval: int =
124130
job_id = response.metadata.id_
125131

126132
start_time = time.time()
127-
count = 0
128133

129134
while True:
130135
if time.time() - start_time > timeout:
131136
raise TimeoutError(f"Job {job_id} did not complete within {timeout} seconds.")
137+
132138
current_job = client.job.get(job_id)
133139
current_job_status = current_job.status.state
134-
if count > 0 and current_job_status in [
135-
LeptonJobState.Completed,
136-
LeptonJobState.Failed,
137-
LeptonJobState.Unknown,
138-
]:
140+
if (
141+
current_job_status == LeptonJobState.Completed
142+
or current_job_status == LeptonJobState.Failed
143+
):
139144
break
140-
count += 1
145+
elif current_job_status == LeptonJobState.Unknown:
146+
logging.warning(
147+
f"Job {job_id} entered Unknown state, checking for up to {unknowns_grace_period} seconds every 2 seconds..."
148+
)
149+
unknown_start_time = time.time()
150+
recovered = False
151+
while time.time() - unknown_start_time < unknowns_grace_period:
152+
time.sleep(2)
153+
current_job = client.job.get(job_id)
154+
current_job_status = current_job.status.state
155+
if current_job_status != LeptonJobState.Unknown:
156+
logging.info(
157+
f"Job {job_id} recovered from Unknown state to {current_job_status}"
158+
)
159+
recovered = True
160+
break
161+
if not recovered:
162+
logging.error(
163+
f"Job {job_id} has been in Unknown state for more than {unknowns_grace_period} seconds"
164+
)
165+
break
141166
time.sleep(poll_interval)
142167

143168
if current_job_status != LeptonJobState.Completed:

0 commit comments

Comments
 (0)