-
Notifications
You must be signed in to change notification settings - Fork 14
Add SSE streaming support (message/stream) #26
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
- Add abstract streaming methods to Broker base class: - send_stream_event() for publishing events - subscribe_to_stream() for receiving events - Implement pub-sub pattern in InMemoryBroker: - Track subscribers with _event_subscribers dict - Use anyio.Lock for thread-safe subscription management - Handle automatic cleanup of disconnected subscribers - Support final status updates to complete streams - Add StreamEvent type union and TypeAdapter to schema Co-Authored-By: Claude <[email protected]>
- Rewrite stream_message to return AsyncGenerator[StreamEvent, None] - Implement streaming workflow: - Create and submit task, yielding it immediately - Start task execution asynchronously in background - Subscribe to broker event stream and forward all events - Handle context_id generation and history_length parameter - Clean up worker imports for better organization
- Add streaming parameter to FastA2A constructor - Implement message/stream handler that: - Parses streaming requests with proper validation - Wraps task manager events in JSON-RPC responses - Returns Server-Sent Events (SSE) response - Update agent capabilities to advertise streaming support - Use camelCase serialization for protocol compliance
- Add sse-starlette>=2.0.0 for Server-Sent Events support - Add dev dependencies: - httpx-sse for testing SSE functionality - pytest-asyncio for async test support - Add comprehensive coverage configuration: - Enable branch coverage - Configure source paths and omit patterns - Add exclusion patterns for abstract methods and TYPE_CHECKING These dependencies enable the streaming implementation and improve the development/testing experience.
- Test agent card with all configuration parameters - Test HTTP HEAD and OPTIONS methods support - Test caching behavior with ETag and Last-Modified headers - Test documentation endpoint functionality These tests ensure the agent card endpoints behave correctly according to the A2A protocol specification.
- Add test_streaming_integration.py with end-to-end streaming tests - Add test_broker.py to test pub-sub functionality and edge cases - Add test_task_manager.py to test streaming message handling - Test concurrent subscribers, disconnection handling, error cases - Verify SSE response format and JSON-RPC compliance
|
cc @Kludex |
|
@Kludex any thoughts on this? |
|
@physicsrob Thanks for this, I used this code in my A2A PoC in my company and was pretty happy with it. |
|
@Kludex @samuelcolvin is anyone able to have a look at this? |
|
@Coding-Crashkurse Thanks! Unfortunately I'm in the same boat. I need to be able to use pydantic-ai agents with A2A (including streaming, deps, etc). I'm going to start investigating other strategies soon. |
|
I'll check it today. Sorry the delay. |
|
Any update on getting this in? @physicsrob @Kludex |
This PR implements streaming support for FastA2A using Server-Sent Events (SSE), adding support for the message/stream endpoint.
I plan on following this up with a PR targeting pydantic-ai, which will use this support to add message/stream support for pydantic-ai's
Agent.to_a2a2()method.Core Implementation:
Dependencies:
Configuration:
Testing:
The implementation follows the A2A protocol v0.2.5 specification for streaming responses. Workers can emit real-time status updates and artifact chunks during task execution using the broker's
send_stream_event() method.