Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -555,24 +555,28 @@

if task.status.state in TERMINAL_TASK_STATES:
raise ServerError(
error=InvalidParamsError(
error=UnsupportedOperationError(
message=f'Task {task.id} is in terminal state: {task.status.state}'
)
)

# The operation MUST return a Task object as the first event in the stream
# https://a2a-protocol.org/latest/specification/#316-subscribe-to-task
yield task

task_manager = TaskManager(
task_id=task.id,
context_id=task.context_id,
task_store=self.task_store,
initial_message=None,
context=context,
)

result_aggregator = ResultAggregator(task_manager)

queue = await self._queue_manager.tap(task.id)
if not queue:
raise ServerError(error=TaskNotFoundError())

Check notice on line 579 in src/a2a/server/request_handlers/default_request_handler.py

View workflow job for this annotation

GitHub Actions / Lint Code Base

Copy/pasted code

see src/a2a/server/request_handlers/default_request_handler.py (172-183)

consumer = EventConsumer(queue)
async for event in result_aggregator.consume_and_emit(consumer):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1499,7 +1499,10 @@ async def exec_side_effect(_request, queue: EventQueue):
# Allow producer to emit the next event
allow_second_event.set()

received = await resub_gen.__anext__()
first_subscribe_event = await anext(resub_gen)
assert first_subscribe_event == task_for_resub

received = await anext(resub_gen)
assert received == second_event

# Finish producer to allow cleanup paths to complete
Expand Down Expand Up @@ -2706,7 +2709,7 @@ async def test_on_subscribe_to_task_in_terminal_state(terminal_state):
async for _ in request_handler.on_subscribe_to_task(params, context):
pass # pragma: no cover

assert isinstance(exc_info.value.error, InvalidParamsError)
assert isinstance(exc_info.value.error, UnsupportedOperationError)
assert exc_info.value.error.message
assert (
f'Task {task_id} is in terminal state: {terminal_state}'
Expand Down
4 changes: 3 additions & 1 deletion tests/server/request_handlers/test_jsonrpc_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,9 @@ async def streaming_coro():
collected_events: list[Any] = []
async for event in response:
collected_events.append(event)
assert len(collected_events) == len(events)
assert (
len(collected_events) == len(events) + 1
) # First event is task itself
assert mock_task.history is not None and len(mock_task.history) == 0

async def test_on_subscribe_no_existing_task_error(self) -> None:
Expand Down
Loading