Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify consumers by adding asyncio.Queue.iter() and asyncio.Queue.iter_nowait() #14

Closed
wants to merge 38 commits into from

Conversation

nineteendo
Copy link
Owner

@nineteendo nineteendo commented Jun 23, 2024

Purpose

Currently consuming items from a queue is very complex. You need to

  1. use an infinite while loop
  2. call queue.task_done() after processing each item
  3. add all worker tasks to a list
  4. join the queue
  5. cancel all worker tasks.
  6. wait until all worker tasks are cancelled

By adding asyncio.Queue.iter() and asyncio.Queue.iter_nowait(), this becomes a lot easier, you only need to call queue.shutdown().

Overview of changes

  • Calling queue.Queue.iter() returns an asynchronous generator which iterates over the queue of items
  • Calling queue.Queue.iter_nowait() returns a generator which iterates over the queue of items without blocking.
  • A private _AsyncQueueIterator has been added to handle the asynchronous iteration
  • The example in the documentation is greatly simplified by the new addition

Example

Without iteration

import asyncio

async def producer(queue):
    for i in range(5):
        print(f'{i} -> queue')
        await queue.put(i)
        await asyncio.sleep(.1)

async def consumer(queue):
    while True:
        i = await queue.get()
        print(f'queue -> {i}')
        await asyncio.sleep(.3)
        queue.task_done()

async def main():
    queue = asyncio.Queue()
    tasks = []
    for _ in range(1):
        task = asyncio.create_task(consumer(queue))
        tasks.append(task)

    async with asyncio.TaskGroup() as tg:
        tg.create_task(producer(queue))

    print('produced everything')
    await queue.join()
    for task in tasks:
        task.cancel()

    await asyncio.gather(*tasks, return_exceptions=True)
    print('consumed everything')

asyncio.run(main())

With iteration

import asyncio

async def producer(queue):
    for i in range(5):
        print(f'{i} -> queue')
        await queue.put(i)
        await asyncio.sleep(.1)

async def consumer(queue):
    async for i in queue.iter():
        print(f'queue -> {i}')
        await asyncio.sleep(.3)

async def main():
    queue = asyncio.Queue()
    async with asyncio.TaskGroup() as tg1:
        tg1.create_task(consumer(queue))
        async with asyncio.TaskGroup() as tg2:
            tg2.create_task(producer(queue))

        queue.shutdown()
        print('produced everything')

    print('consumed everything')

asyncio.run(main())

Output

0 -> queue
queue -> 0
1 -> queue
2 -> queue
queue -> 1
3 -> queue
4 -> queue
produced everything
queue -> 2
queue -> 3
queue -> 4
consumed everything

📚 Documentation preview 📚: https://cpython-previews--14.org.readthedocs.build/

@nineteendo nineteendo closed this Jun 23, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant