@@ -251,6 +251,13 @@ async def _handle_resumption_request(self, ctx: RequestContext) -> None:
251251 await event_source .response .aclose ()
252252 break
253253
254+ async def _send_error_response (self , ctx : RequestContext , error : Exception ) -> None :
255+ """Send an error response to the client."""
256+ error_data = ErrorData (code = 32000 , message = str (error ))
257+ jsonrpc_error = JSONRPCError (jsonrpc = "2.0" , id = ctx .session_message .message .root .id , error = error_data )
258+ session_message = SessionMessage (message = JSONRPCMessage (jsonrpc_error ))
259+ await ctx .read_stream_writer .send (session_message )
260+
254261 async def _handle_post_request (self , ctx : RequestContext ) -> None :
255262 """Handle a POST request with response processing."""
256263 headers = self ._prepare_request_headers (ctx .headers )
@@ -403,10 +410,13 @@ async def post_writer(
403410 )
404411
405412 async def handle_request_async ():
406- if is_resumption :
407- await self ._handle_resumption_request (ctx )
408- else :
409- await self ._handle_post_request (ctx )
413+ try :
414+ if is_resumption :
415+ await self ._handle_resumption_request (ctx )
416+ else :
417+ await self ._handle_post_request (ctx )
418+ except Exception as e :
419+ await self ._send_error_response (ctx , e )
410420
411421 # If this is a request, start a new task to handle it
412422 if isinstance (message .root , JSONRPCRequest ):
@@ -475,6 +485,9 @@ async def streamablehttp_client(
475485 read_stream_writer , read_stream = anyio .create_memory_object_stream [SessionMessage | Exception ](0 )
476486 write_stream , write_stream_reader = anyio .create_memory_object_stream [SessionMessage ](0 )
477487
488+ transport ._write_stream = write_stream_reader
489+ transport ._read_stream_writer = read_stream_writer
490+
478491 async with anyio .create_task_group () as tg :
479492 try :
480493 logger .debug (f"Connecting to StreamableHTTP endpoint: { url } " )
0 commit comments