diff --git a/src/a2a/server/request_handlers/default_request_handler.py b/src/a2a/server/request_handlers/default_request_handler.py index 30d1ee89..0115676e 100644 --- a/src/a2a/server/request_handlers/default_request_handler.py +++ b/src/a2a/server/request_handlers/default_request_handler.py @@ -1,4 +1,5 @@ import asyncio +import contextlib import logging from collections.abc import AsyncGenerator @@ -435,7 +436,8 @@ async def _cleanup_producer( task_id: str, ) -> None: """Cleans up the agent execution task and queue manager entry.""" - await producer_task + with contextlib.suppress(asyncio.CancelledError): + await producer_task await self._queue_manager.close(task_id) async with self._running_agents_lock: self._running_agents.pop(task_id, None) diff --git a/tests/server/request_handlers/test_default_request_handler.py b/tests/server/request_handlers/test_default_request_handler.py index 88dd77ab..e53d9070 100644 --- a/tests/server/request_handlers/test_default_request_handler.py +++ b/tests/server/request_handlers/test_default_request_handler.py @@ -1794,6 +1794,32 @@ async def dummy_coro_for_task(): # No error should be raised by pop if key is missing and default is None. +@pytest.mark.asyncio +async def test_cleanup_producer_swallows_cancelled_error(): + """Test _cleanup_producer suppresses CancelledError from producer_task.""" + mock_task_store = AsyncMock(spec=TaskStore) + mock_queue_manager = AsyncMock(spec=QueueManager) + request_handler = DefaultRequestHandler( + agent_executor=DummyAgentExecutor(), + task_store=mock_task_store, + queue_manager=mock_queue_manager, + ) + + task_id = 'task_cancelled' + + async def cancelled_coro(): + raise asyncio.CancelledError + + producer_task = asyncio.create_task(cancelled_coro()) + request_handler._running_agents[task_id] = producer_task + await asyncio.sleep(0) + + await request_handler._cleanup_producer(producer_task, task_id) + + mock_queue_manager.close.assert_awaited_once_with(task_id) + assert task_id not in request_handler._running_agents + + @pytest.mark.asyncio async def test_set_task_push_notification_config_no_notifier(): """Test on_set_task_push_notification_config when _push_config_store is None."""