Skip to content

[Bug]: ResultAggregator._continue_consuming background task is not referenced and may be garbage collected #774

@kevinlu310

Description

@kevinlu310

What happened?

In ResultAggregator.consume_and_break_on_interrupt(), when a non-blocking message/send request interrupts early (e.g., blocking=False), the remaining event consumption is delegated to a background task via asyncio.create_task(). However, the returned Task object is not saved, making it eligible for garbage collection.

# src/a2a/server/tasks/result_aggregator.py, line 162
asyncio.create_task(  # noqa: RUF006
    self._continue_consuming(event_stream, event_callback)
)

Per the Python docs:

Important: Save a reference to the result of this function, to avoid a task disappearing mid-execution. The event loop only keeps weak references to tasks. A task that isn't referenced elsewhere may get garbage collected at any time, even before it's done.

When this task is garbage collected:

  1. The event_stream async generator is finalized
  2. Remaining events (artifacts, completed/failed status updates) are never processed
  3. The event_callback (push notification sender) never fires for the final task state
  4. The task remains stuck in working state permanently from the client's perspective

Steps to Reproduce

  1. Configure an A2A server with push notifications enabled
  2. Send a message/send request with MessageSendConfiguration(blocking=False, push_notification_config=...)
  3. Have the agent executor take a non-trivial amount of time (>10 seconds) to complete
  4. Observe that the task is acknowledged with working status but never transitions to completed
  5. The push notification webhook for the final state is never sent

The longer the agent takes, the more likely the GC collects the task. For agents running 60+ seconds, this is essentially 100% reproducible.

Expected Behavior

The background _continue_consuming task should complete, processing all remaining events and firing the push notification callback for each, including the terminal state.

Root Cause

The noqa: RUF006 comment on line 162 suppresses Ruff's asyncio-dangling-task rule, which specifically warns about this pattern. The TODO on line 161 also acknowledges the issue:

# TODO: We should track all outstanding tasks to ensure they eventually complete.

DefaultRequestHandler already has a _background_tasks: set[asyncio.Task] and _track_background_task() method for exactly this purpose, but ResultAggregator doesn't use it.

Suggested Fix

Return the background task from consume_and_break_on_interrupt() so the caller can track it, or accept a task-tracking callback. For example:

# Return the background task in the tuple
async def consume_and_break_on_interrupt(
    self, consumer, blocking=True, event_callback=None
) -> tuple[Task | Message | None, bool, asyncio.Task | None]:
    ...
    bg_task = None
    if should_interrupt:
        bg_task = asyncio.create_task(
            self._continue_consuming(event_stream, event_callback)
        )
        interrupted = True
        break
    return await self.task_manager.get_task(), interrupted, bg_task

Then in DefaultRequestHandler.on_message_send:

result, interrupted_or_non_blocking, bg_task = await result_aggregator.consume_and_break_on_interrupt(...)
if bg_task:
    bg_task.set_name(f'continue_consuming:{task_id}')
    self._track_background_task(bg_task)

Environment

  • a2a-sdk version: 0.3.22 (also present on latest main)
  • Python: 3.12.12
  • OS: macOS (ARM64)

Related

Relevant log output

Code of Conduct

  • I agree to follow this project's Code of Conduct

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions