Skip to content

New asyncWrap helper to execute blocking calls on the native Qt event loop #139

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jul 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions examples/modal_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import asyncio
import sys

# from PyQt6.QtWidgets import
from PySide6.QtWidgets import QApplication, QMessageBox, QProgressBar

from qasync import QEventLoop, asyncWrap


async def master():
progress = QProgressBar()
progress.setRange(0, 99)
progress.show()
await first_50(progress)


async def first_50(progress):
for i in range(50):
progress.setValue(i)
await asyncio.sleep(0.1)

# Schedule the last 50% to run asynchronously
asyncio.create_task(last_50(progress))

# create a notification box, use helper to make entering event loop safe.
result = await asyncWrap(
lambda: QMessageBox.information(
None, "Task Completed", "The first 50% of the task is completed."
)
)
assert result == QMessageBox.StandardButton.Ok


async def last_50(progress):
for i in range(50, 100):
progress.setValue(i)
await asyncio.sleep(0.1)


if __name__ == "__main__":
app = QApplication(sys.argv)

event_loop = QEventLoop(app)
asyncio.set_event_loop(event_loop)

event_loop.run_until_complete(master())
event_loop.close()
40 changes: 39 additions & 1 deletion qasync/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
BSD License
"""

__all__ = ["QEventLoop", "QThreadExecutor", "asyncSlot", "asyncClose"]
__all__ = ["QEventLoop", "QThreadExecutor", "asyncSlot", "asyncClose", "asyncWrap"]

import asyncio
import contextlib
Expand Down Expand Up @@ -849,6 +849,44 @@ def wrapper(*args, **kwargs):
return outer_decorator


async def asyncWrap(fn, *args, **kwargs):
"""
Wrap a blocking function as an asynchronous and run it on the native Qt event loop.
The function will be scheduled using a one shot QTimer which prevents blocking the
QEventLoop. An example usage of this is raising a modal dialogue inside an asyncSlot.
```python
async def before_shutdown(self):
await asyncio.sleep(2)

@asyncSlot()
async def shutdown_clicked(self):
# do some work async
asyncio.create_task(self.before_shutdown())

# run on the native Qt loop, not blocking the QEventLoop
result = await asyncWrap(
lambda: QMessageBox.information(None, "Done", "It is now safe to shutdown.")
)
if result == QMessageBox.StandardButton.Ok:
app.exit(0)
```
"""
future = asyncio.Future()

@functools.wraps(fn)
def helper():
try:
result = fn(*args, **kwargs)
except Exception as e:
future.set_exception(e)
else:
future.set_result(result)

# Schedule the helper to run in the next event loop iteration
QtCore.QTimer.singleShot(0, helper)
return await future


def _get_qevent_loop():
return QEventLoop(QApplication.instance() or QApplication(sys.argv))

Expand Down
53 changes: 53 additions & 0 deletions tests/test_qeventloop.py
Original file line number Diff line number Diff line change
Expand Up @@ -793,6 +793,59 @@ async def mycoro():
assert not loop.is_running()


@pytest.mark.parametrize(
"async_wrap, expect_async_called, expect_exception",
[(False, False, True), (True, True, False)],
)
def test_async_wrap(
loop, application, async_wrap, expect_async_called, expect_exception
):
"""
Re-entering the event loop from a Task will fail if there is another
runnable task.
"""
async_called = False
main_called = False

async def async_job():
nonlocal async_called
async_called = True

def sync_callback():
coro = async_job()
asyncio.create_task(coro)
assert not async_called
application.processEvents()
assert async_called if expect_async_called else not async_called
return 1, coro

async def main():
nonlocal main_called
if async_wrap:
res, coro = await qasync.asyncWrap(sync_callback)
else:
res, coro = sync_callback()
if expect_exception:
await coro # avoid warnings about unawaited coroutines
assert res == 1
main_called = True


exceptions = []
loop.set_exception_handler(lambda loop, context: exceptions.append(context))

loop.run_until_complete(main())
assert main_called, "The main function should have been called"

if expect_exception:
# We will now have an error in there, because the task 'async_job' could not
# be entered, because the task 'main' was still being executed by the event loop.
assert len(exceptions) == 1
assert isinstance(exceptions[0]["exception"], RuntimeError)
else:
assert len(exceptions) == 0


def test_slow_callback_duration_logging(loop, caplog):
async def mycoro():
time.sleep(1)
Expand Down
Loading