fix: handle ClosedResourceError in StreamableHTTP message router #1384
Conversation
Hi,

In anyio's implementation:

**1. Conditions for Iteration Termination**

As we can see in the implementation of `__anext__`:

```python
async def __anext__(self) -> T_co:
    try:
        return await self.receive()
    except EndOfStream:
        raise StopAsyncIteration from None
```

That is, the `async for` loop only terminates cleanly when `receive()` raises `EndOfStream`; any other exception, including `ClosedResourceError`, propagates out of the loop.

**2. When to Raise EndOfStream or ClosedResourceError**

```python
def receive_nowait(self) -> T_co:
    """
    Receive the next item if it can be done without waiting.

    :return: the received item
    :raises ~anyio.ClosedResourceError: if this send stream has been closed
    :raises ~anyio.EndOfStream: if the buffer is empty and this stream has been
        closed from the sending end
    :raises ~anyio.WouldBlock: if there are no items in the buffer and no tasks
        waiting to send
    """
```

All receive paths first check whether the receive end itself has been closed:

```python
if self._closed:
    raise ClosedResourceError
```

And of course, closing the receive end is what sets that flag:

```python
class MemoryObjectReceiveStream:
    ...

    def close(self) -> None:
        """
        Close the stream.

        This works the exact same way as :meth:`aclose`, but is provided as a special
        case for the benefit of synchronous callbacks.
        """
        if not self._closed:
            self._closed = True
            self._state.open_receive_channels -= 1
            if self._state.open_receive_channels == 0:
                send_events = list(self._state.waiting_senders.keys())
                for event in send_events:
                    event.set()
```

**Review of Known Issues**

In issue #1219, the debug information clearly shows a `ClosedResourceError` raised from the message router. The traceback in issue #1190 also points to the same root cause: the error occurs when the receive end of the stream is closed while the message router is still iterating over it. In fact, looking at the anyio implementation above, it's very clear that closing the receive end makes the next `receive()` raise `ClosedResourceError` rather than `EndOfStream`.

**Why This Implementation is Appropriate**

This implementation is not "silencing the error". In fact, in scenarios where multiple coroutines operate on the same stream simultaneously, checking whether the stream has been closed is a necessary operation. Since the stream can be closed at any suspension point, an external "is it closed?" check is inherently racy: we would have to guard against the stream being closed between the check and the `receive()` call. Catching `ClosedResourceError` at the point of use is the reliable form of that check.
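The difference between the two exceptions can be observed directly with anyio's memory object streams. A minimal sketch (stream names are illustrative):

```python
import anyio

async def demo() -> tuple[list[str], str]:
    # Closing the *send* end raises EndOfStream inside __anext__, so the
    # async-for loop terminates cleanly via StopAsyncIteration.
    send, recv = anyio.create_memory_object_stream(1)
    await send.send("hello")
    await send.aclose()
    items = [item async for item in recv]

    # Closing the *receive* end makes any further receive() raise
    # ClosedResourceError, because the closed flag is checked first.
    send2, recv2 = anyio.create_memory_object_stream(1)
    await recv2.aclose()
    try:
        await recv2.receive()
        outcome = "no error"
    except anyio.ClosedResourceError:
        outcome = "ClosedResourceError"
    return items, outcome

print(anyio.run(demo))  # (['hello'], 'ClosedResourceError')
```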
Force-pushed from 126d1ed to 2c02f06
I describe an easy way to reproduce this issue here: #1190 (comment). Your test case is quite similar to the underlying implementation of that example.
**ofek** left a comment:

The test file needs a newline at the end.
Hi, all! I recently tried to review the exception occurrence situation again. Here are some additional insights I'd like to share. Any feedback and guidance is welcome!

**Core Problem**

Synchronous flow execution completes and terminates the transport before the message router gets a chance to resume, so the router's stream is closed out from under it.

**Test Case Analysis**

Based on the three test scenarios in the added test file:

**Execution Flow Analysis**

**1. System Startup Phase**

```python
# streamable_http_manager.py:170-187
async def run_stateless_server():
    async with http_transport.connect() as streams:
        # Start message router task
        tg.start_soon(message_router)
        # Start MCP server
        await self.app.run(streams)
```

**2. Message Router Suspension**

```python
# streamable_http.py:831
async def message_router():
    async for session_message in write_stream_reader:  # Key point
        # Process message routing
        # After processing, return to loop start
        # Call checkpoint() again to suspend and wait for next message
        ...
```

Key Mechanism: each iteration of the `async for` loop goes through `receive()`, which hits a checkpoint and suspends the router until the next message arrives.

**3. Request Processing Phase**

**3.1 Invalid Request Headers Scenario (Fast Failure)**

```python
# streamable_http.py:315-323
async def _handle_post_request():
    # Synchronous Accept header validation
    has_json, has_sse = self._check_accept_headers(request)  # Synchronous
    if not (has_json and has_sse):
        response = self._create_error_response(...)  # Synchronous
        await response(scope, receive, send)  # Only yield point
        return  # Immediate return
```

**3.2 JSON Response Mode Scenario (After Processing Response)**

```python
# streamable_http.py:397-439
if self.is_json_response_enabled:
    # Wait for response
    async for event_message in request_stream_reader:  # Line 408
        if isinstance(event_message.message.root, JSONRPCResponse | JSONRPCError):
            response_message = event_message.message
            break
    # Send response
    response = self._create_json_response(response_message)
    await response(scope, receive, send)
    # Clean up resources
    await self._clean_up_memory_streams(request_id)  # Synchronous
```
**4. Transport Termination Phase**

```python
# streamable_http_manager.py:189-193
# Immediately terminate after processing request
await http_transport.handle_request(scope, receive, send)
await http_transport.terminate()  # Immediately close streams
```

```python
# streamable_http.py:623-653
async def terminate(self):
    self._terminated = True
    # Close all streams
    if self._write_stream_reader is not None:
        await self._write_stream_reader.aclose()  # Close stream used by message router
```

**Precise Timing of the Race Condition**

**Key Timeline**

**Core Problem**

1. When validation fails early (T3), the main coroutine executes synchronous error handling and immediately terminates the transport, causing the same race condition as described in the test cases.
2. After T4, the main coroutine executes all synchronous code with no opportunity to yield control, preventing the message router from completing its current iteration before the stream is closed.

**Root Cause**

The message router suspends at a checkpoint inside `receive()`; by the time it resumes, the main coroutine has already closed `write_stream_reader`, so the next receive raises `ClosedResourceError`.

**Solution: Direct ClosedResourceError Handling**

Referencing the existing handling practice for the request streams, the appropriate solution is to directly catch the error at the point of use:

```python
# streamable_http.py:862-871 (existing code)
if request_stream_id in self._request_streams:
    try:
        # Send both the message and the event ID
        await self._request_streams[request_stream_id][0].send(EventMessage(message, event_id))
    except (
        anyio.BrokenResourceError,
        anyio.ClosedResourceError,  # Already catching this error
    ):
        # Stream might be closed, remove from registry
        self._request_streams.pop(request_stream_id, None)
```

Solution: directly catch `ClosedResourceError` in the message router:

```python
# streamable_http.py:829-887
async def message_router():
    try:
        async for session_message in write_stream_reader:
            # Process message routing logic
            ...
    except anyio.ClosedResourceError:
        # Stream closed, graceful exit
        if self._terminated:
            logger.debug("Read stream closed by client")
        else:
            logger.debug("Read stream closed unexpectedly")
    except Exception:
        logger.exception("Error in message router")
```

This way, when the main coroutine closes the stream, the message router will gracefully catch the exception and exit instead of letting it propagate to the task group.

I would like to emphasize once again that directly catching the error is appropriate and correct. Of course, I also welcome more guidance on this solution, and I'm very happy to learn from everyone. 😊😊
Notes:

- Disable returning JSON responses when running in stateless mode. This change prevents the `anyio.ClosedResourceError` that can occur when the response stream is closed unexpectedly.
- This resolves an observed `anyio.ClosedResourceError`. More discussion and context are available in the linked threads.
- An alternative is to catch this error inside `message_router`, handle it gracefully, and log it instead of letting it propagate to the top level; that approach is left for a follow-up.

GitHub Issue Threads:

- jlowin/fastmcp#2083
- modelcontextprotocol/python-sdk#1384 (comment)
Is this blocked on reviews? |
I think we need to stop using uvicorn in every test in this repository. It kills me (and the test suite...).
We shouldn't rely on network requests in the test suite.
That said, the change seems fine.
@maxisbey Can you please decide what you want to do with the test here?
I think we should merge for now since it's a high-priority issue and fixes a bug affecting people. Fixing the uvicorn usage in the test suite is something that's been mentioned a few times and we should address in follow-ups.
It looks like the merge commit that was just introduced impacted coverage and now CI is failing because a handful of lines are no longer covered. |
Head branch was pushed to by a user without write access
Fix Race Condition in StreamableHTTP Transport (Closes #1363)
Motivation and Context
Starting from v1.12.0, MCP servers in HTTP Streamable mode experience a race condition that causes `ClosedResourceError` exceptions when requests fail validation early (e.g., due to incorrect Accept headers). This issue affects server reliability and can be reproduced consistently with fast-failing requests.

The race condition occurs because:

1. The message router suspends in an `async for` loop over `write_stream_reader`.
2. `write_stream_reader` calls `checkpoint()` in `receive()`, yielding control.
3. The main coroutine handles the failed request synchronously and closes `write_stream_reader` when it terminates the transport.
4. When the message router resumes, the pending receive raises `ClosedResourceError`.

This fix ensures graceful handling of stream closure scenarios without propagating exceptions that could destabilize the server.
How Has This Been Tested?
Test Suite
Added a comprehensive test suite in `tests/issues/test_1363_race_condition_streamable_http.py` that reproduces the race condition:

- Invalid Accept Headers Test: requests missing `application/json` in the Accept header, and requests missing `text/event-stream` in the Accept header
- Invalid Content-Type Test
- Log Analysis: verifies that no `ClosedResourceError` exceptions occur

Test Execution
Breaking Changes
None. This is a bug fix that maintains full backward compatibility.
Types of changes
Checklist
Additional context
Implementation Details
The fix adds explicit exception handling for
`anyio.ClosedResourceError` in the message router loop:

This approach:
Related Issues