Skip to content
Draft
171 changes: 146 additions & 25 deletions src/qasync/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,14 @@
BSD License
"""

__all__ = ["QEventLoop", "QThreadExecutor", "asyncSlot", "asyncClose", "asyncWrap"]
__all__ = [
"QEventLoop",
"QThreadExecutor",
"QThreadPoolExecutor",
"asyncSlot",
"asyncClose",
"asyncWrap",
]

import asyncio
import contextlib
Expand All @@ -22,6 +29,7 @@
import time
from concurrent.futures import Future
from queue import Queue
from weakref import WeakSet

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -162,8 +170,80 @@ def wait(self):
super().wait()


def _result_or_cancel(fut, timeout=None):
try:
try:
return fut.result(timeout)
finally:
fut.cancel()
finally:
# Break a reference cycle with the exception in self._exception
del fut


class QThreadExecutorBase:
def __init__(self):
self._been_shutdown = False
self.futures = WeakSet()

def submit(self, callback, *args, **kwargs):
raise NotImplementedError()

def map(self, func, *iterables, timeout=None, chunksize=1):
"""Map the function to the iterables in a blocking way."""
# based on standard python implementation for BaseExecutor.map
end_time = time.monotonic() + timeout if timeout is not None else None
futures = [self.submit(func, *args) for args in zip(*iterables)]

# the generator must be an inner function so that map() and the submit
# occurs immediately.
def generator():
# reverse and pop to not keep future references around
# (for reference cycles in exceptions)
try:
futures.reverse()
while futures:
if end_time is not None:
yield _result_or_cancel(
futures.pop(), timeout=end_time - time.monotonic()
)
else:
yield _result_or_cancel(futures.pop())
finally:
for future in futures:
future.cancel()

return generator()

def shutdown(self, wait=True, *, cancel_futures=False):
if self._been_shutdown:
raise RuntimeError(f"{self.__class__.__name__} has been shutdown")
self._been_shutdown = True

def __enter__(self, *args):
if self._been_shutdown:
raise RuntimeError(f"{self.__class__.__name__} has been shutdown")
return self

def __exit__(self, *args):
self.shutdown()

@staticmethod
def compute_stack_size():
# Match cpython/Python/thread_pthread.h
if sys.platform.startswith("darwin"):
stack_size = 16 * 2**20
elif sys.platform.startswith("freebsd"):
stack_size = 4 * 2**20
elif sys.platform.startswith("aix"):
stack_size = 2 * 2**20
else:
stack_size = None
return stack_size


@with_logger
class QThreadExecutor:
class QThreadExecutor(QThreadExecutorBase):
"""
ThreadExecutor that produces QThreads.

Expand All @@ -181,23 +261,16 @@ def __init__(self, max_workers=10, stack_size=None):
self.__max_workers = max_workers
self.__queue = Queue()
if stack_size is None:
# Match cpython/Python/thread_pthread.h
if sys.platform.startswith("darwin"):
stack_size = 16 * 2**20
elif sys.platform.startswith("freebsd"):
stack_size = 4 * 2**20
elif sys.platform.startswith("aix"):
stack_size = 2 * 2**20
stack_size = self.compute_stack_size()
self.__workers = [
_QThreadWorker(self.__queue, i + 1, stack_size) for i in range(max_workers)
]
self.__been_shutdown = False

for w in self.__workers:
w.start()

def submit(self, callback, *args, **kwargs):
if self.__been_shutdown:
if self._been_shutdown:
raise RuntimeError("QThreadExecutor has been shutdown")

future = Future()
Expand All @@ -208,32 +281,80 @@ def submit(self, callback, *args, **kwargs):
kwargs,
)
self.__queue.put((future, callback, args, kwargs))
self.futures.add(future)
return future

def map(self, func, *iterables, timeout=None):
raise NotImplementedError("use as_completed on the event loop")

def shutdown(self, wait=True):
if self.__been_shutdown:
raise RuntimeError("QThreadExecutor has been shutdown")

self.__been_shutdown = True
def shutdown(self, wait=True, *, cancel_futures=False):
super().shutdown(wait=wait, cancel_futures=cancel_futures)

self._logger.debug("Shutting down")
for i in range(len(self.__workers)):
# Signal workers to stop
self.__queue.put(None)
if cancel_futures:
for future in self.futures:
future.cancel()
if wait:
for w in self.__workers:
w.wait()

def __enter__(self, *args):
if self.__been_shutdown:
raise RuntimeError("QThreadExecutor has been shutdown")
return self

def __exit__(self, *args):
self.shutdown()
class _QThreadPoolExecutorRunnable(QtCore.QRunnable):
def __init__(self, callback, *args, **kwargs):
super().__init__()
self._callback = callback
self._args = args
self._kwargs = kwargs
self.future = Future()

def run(self):
if self.future.set_running_or_notify_cancel():
try:
result = self._callback(*self._args, **self._kwargs)
self.future.set_result(result)
except Exception as e:
self.future.set_exception(e)


@with_logger
class QThreadPoolExecutor(QThreadExecutorBase):
"""
ThreadPoolExecutor uses a QThreadPool as the underlying implementation.

Same API as `concurrent.futures.Executor`

>>> from qasync import QThreadPoolExecutor
>>> with QThreadPoolExecutor() as executor:
... f = executor.submit(lambda x: 2 + x, 2)
... r = f.result()
... assert r == 4
"""

def __init__(self, pool=None):
super().__init__()
self.pool = pool or QtCore.QThreadPool.globalInstance()

def submit(self, callback, *args, **kwargs):
if self._been_shutdown:
raise RuntimeError(f"{self.__class__.__name__} has been shutdown")

runnable = _QThreadPoolExecutorRunnable(callback, *args, **kwargs)
self.pool.start(runnable)
self.futures.add(runnable.future)
return runnable.future

def shutdown(self, wait=True, *, cancel_futures=False):
super().shutdown(wait=wait, cancel_futures=cancel_futures)
self._logger.debug("Shutting down")
if cancel_futures:
for future in self.futures:
future.cancel()
if wait:
for w in list(self.futures):
try:
w.result()
except Exception:
pass


def _format_handle(handle: asyncio.Handle):
Expand Down
Loading
Loading