diff --git a/qasync/__init__.py b/qasync/__init__.py index 3363e97..f42ced3 100644 --- a/qasync/__init__.py +++ b/qasync/__init__.py @@ -28,6 +28,7 @@ import time from concurrent.futures import Future from queue import Queue +from collections import deque logger = logging.getLogger(__name__) @@ -294,6 +295,79 @@ def __log_debug(self, *args, **kwargs): self._logger.debug(*args, **kwargs) +@with_logger +class _CallSoonQueue(QtCore.QObject): + def __init__(self): + super().__init__() + # Contains asyncio.Handle objects + # Use a deque instead of Queue, as we don't require + # synchronization between threads here. + self.__callbacks = deque() + + # Keep track of the current timer. + # The queue can only have a single timer that services it. + # Once fired, all pending callbacks will be processed. + self.__timer_id = None + self.__stopped = False + self.__debug_enabled = False + + def add_callback(self, handle): + # handle must be an asyncio.Handle + self.__callbacks.append(handle) + self.__log_debug("Registering call_soon handle %s", id(handle)) + + # Create a timer if it doesn't yet exist + if self.__timer_id is None: + # Set a 0-delay timer on itself, this will ensure thats + # it gets fired immediately after window events are processed the next time. + # See https://doc.qt.io/qt-6/qtimer.html#interval-prop + self.__timer_id = self.startTimer(0) + self.__log_debug("Registering call_soon timer %s", self.__timer_id) + return handle + + def timerEvent(self, event): + timerId = event.timerId() + # We should have only one timer active at the same time, so + # this assert will get hit only when something's very bad + assert timerId == self.__timer_id + + # Stop timer if stopped + if self.__stopped: + self.__log_debug("call_soon queue stopped, clearing handles") + # TODO: Do we need to del the handles or somehow invalidate them? + self.__callbacks.clear() + self.killTimer(timerId) + self.__timer_id = None + return + + # Iterate over pending callbacks + # TODO: Runtime deadline, don't process the entire queue if it takes too long? + while len(self.__callbacks) > 0: + handle = self.__callbacks.popleft() + self.__log_debug("Calling call_soon handle %s", id(handle)) + handle._run() + + # No more callbacks exist, we can dispose this timer. + # It will be recreated once a callback is registered again. + # It's should be safe to assume that another thread isn't calling + # add_callback during the lifetime of timerEvent + self.__log_debug("Stopping call_soon timer %s", timerId) + self.killTimer(timerId) + self.__timer_id = None + assert len(self.__callbacks) == 0 + + def stop(self): + self.__log_debug("Stopping call_soon queue") + self.__stopped = True + + def set_debug(self, enabled): + self.__debug_enabled = enabled + + def __log_debug(self, *args, **kwargs): + if self.__debug_enabled: + self._logger.debug(*args, **kwargs) + + def _fileno(fd): if isinstance(fd, int): return fd @@ -339,6 +413,7 @@ def __init__(self, app=None, set_running_loop=False, already_running=False): self._read_notifiers = {} self._write_notifiers = {} self._timer = _SimpleTimer() + self._call_soon_queue = _CallSoonQueue() self.__call_soon_signaller = signaller = _make_signaller(QtCore, object, tuple) self.__call_soon_signal = signaller.signal @@ -441,6 +516,7 @@ def close(self): super().close() self._timer.stop() + self._call_soon_queue.stop() self.__app = None for notifier in itertools.chain( @@ -474,6 +550,11 @@ def call_later(self, delay, callback, *args, context=None): return self._add_callback(asyncio.Handle(callback, args, self), delay) def _add_callback(self, handle, delay=0): + if delay == 0: + # To ensure that we can guarantee the execution order of + # 0-delay callbacks, add them to a special queue, rather than + # assume that Qt will fire the timerEvents in order + return self._call_soon_queue.add_callback(handle) return self._timer.add_callback(handle, delay) def call_soon(self, callback, *args, context=None): @@ -717,6 +798,7 @@ def set_debug(self, enabled): super().set_debug(enabled) self.__debug_enabled = enabled self._timer.set_debug(enabled) + self._call_soon_queue.set_debug(enabled) def __enter__(self): return self