diff --git a/snakemake_interface_executor_plugins/executors/remote.py b/snakemake_interface_executor_plugins/executors/remote.py index 74ef734..fa261a9 100644 --- a/snakemake_interface_executor_plugins/executors/remote.py +++ b/snakemake_interface_executor_plugins/executors/remote.py @@ -79,11 +79,13 @@ def __init__( self.active_jobs = list() self.lock = threading.Lock() + self.shutdown_event = threading.Event() self.wait = True self.wait_thread = threading.Thread(target=self._wait_thread) self.wait_thread.daemon = True self.wait_thread.start() + max_status_checks_frac = Fraction( self.max_status_checks_per_second ).limit_denominator() @@ -201,6 +203,7 @@ def _wait_thread(self): def shutdown(self): with self.lock: self.wait = False + self.shutdown_event.set() self.wait_thread.join() if not self.workflow.remote_execution_settings.immediate_submit: # Only delete tmpdir (containing jobscripts) if not using @@ -272,7 +275,11 @@ async def sleep(self): if self.next_seconds_between_status_checks is None else self.next_seconds_between_status_checks ) - await asyncio.sleep(duration) + loop = asyncio.get_running_loop() + await loop.run_in_executor( + None, lambda: self.shutdown_event.wait(timeout=duration) + ) + @property def next_seconds_between_status_checks(self):