From 52a0cec206e7266d72dd921c33ee701e20fe2eb5 Mon Sep 17 00:00:00 2001 From: Calvin Brown Date: Wed, 17 Sep 2025 07:55:08 -0500 Subject: [PATCH 1/3] Adding custom httpx client for streamable mcp connection --- .../adk/tools/mcp_tool/mcp_session_manager.py | 39 +++++++++++++------ .../mcp_tool/test_mcp_session_manager.py | 13 +++++++ 2 files changed, 41 insertions(+), 11 deletions(-) diff --git a/src/google/adk/tools/mcp_tool/mcp_session_manager.py b/src/google/adk/tools/mcp_tool/mcp_session_manager.py index fbe843a510..daa0a0e689 100644 --- a/src/google/adk/tools/mcp_tool/mcp_session_manager.py +++ b/src/google/adk/tools/mcp_tool/mcp_session_manager.py @@ -29,7 +29,8 @@ from typing import Union import anyio -from pydantic import BaseModel +import httpx +from pydantic import BaseModel, ConfigDict try: from mcp import ClientSession @@ -99,13 +100,16 @@ class StreamableHTTPConnectionParams(BaseModel): Streamable HTTP server. terminate_on_close: Whether to terminate the MCP Streamable HTTP server when the connection is closed. + httpx_client: httpx.AsyncClient to use for the connection. """ - + url: str headers: dict[str, Any] | None = None timeout: float = 5.0 sse_read_timeout: float = 60 * 5.0 terminate_on_close: bool = True + httpx_client: httpx.AsyncClient | None = None + model_config = ConfigDict(arbitrary_types_allowed=True) def retry_on_closed_resource(func): @@ -277,15 +281,28 @@ def _create_client(self, merged_headers: Optional[Dict[str, str]] = None): sse_read_timeout=self._connection_params.sse_read_timeout, ) elif isinstance(self._connection_params, StreamableHTTPConnectionParams): - client = streamablehttp_client( - url=self._connection_params.url, - headers=merged_headers, - timeout=timedelta(seconds=self._connection_params.timeout), - sse_read_timeout=timedelta( - seconds=self._connection_params.sse_read_timeout - ), - terminate_on_close=self._connection_params.terminate_on_close, - ) + if self._connection_params.httpx_client: + client = streamablehttp_client( + url=self._connection_params.url, + headers=merged_headers, + timeout=timedelta(seconds=self._connection_params.timeout), + sse_read_timeout=timedelta( + seconds=self._connection_params.sse_read_timeout + ), + terminate_on_close=self._connection_params.terminate_on_close, + httpx_client_factory=self._connection_params.httpx_client, + ) + else: + client = streamablehttp_client( + url=self._connection_params.url, + headers=merged_headers, + timeout=timedelta(seconds=self._connection_params.timeout), + sse_read_timeout=timedelta( + seconds=self._connection_params.sse_read_timeout + ), + terminate_on_close=self._connection_params.terminate_on_close, + ) + else: raise ValueError( 'Unable to initialize connection. Connection should be' diff --git a/tests/unittests/tools/mcp_tool/test_mcp_session_manager.py b/tests/unittests/tools/mcp_tool/test_mcp_session_manager.py index 559e51719a..12b215c9d0 100644 --- a/tests/unittests/tools/mcp_tool/test_mcp_session_manager.py +++ b/tests/unittests/tools/mcp_tool/test_mcp_session_manager.py @@ -20,6 +20,7 @@ from unittest.mock import Mock from unittest.mock import patch +import httpx import pytest # Skip all tests in this module if Python version is less than 3.10 @@ -143,6 +144,18 @@ def test_init_with_streamable_http_params(self): manager = MCPSessionManager(http_params) assert manager._connection_params == http_params + assert manager._connection_params.httpx_client is None + + def test_init_with_streamable_http_params_with_httpx_client(self): + """Test initialization with StreamableHTTPConnectionParams.""" + client = httpx.AsyncClient() + http_params = StreamableHTTPConnectionParams( + url="https://example.com/mcp", timeout=15.0, httpx_client=client + ) + manager = MCPSessionManager(http_params) + + assert manager._connection_params == http_params + assert manager._connection_params.httpx_client == client def test_generate_session_key_stdio(self): """Test session key generation for stdio connections.""" From 48930736005e2c3cddcaeacd4099de3ed9df0c21 Mon Sep 17 00:00:00 2001 From: Calvin Brown Date: Wed, 17 Sep 2025 07:58:02 -0500 Subject: [PATCH 2/3] running autoformat --- src/google/adk/tools/mcp_tool/mcp_session_manager.py | 7 ++++--- tests/unittests/tools/mcp_tool/test_mcp_session_manager.py | 2 +- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/src/google/adk/tools/mcp_tool/mcp_session_manager.py b/src/google/adk/tools/mcp_tool/mcp_session_manager.py index daa0a0e689..3bf8b8ae5c 100644 --- a/src/google/adk/tools/mcp_tool/mcp_session_manager.py +++ b/src/google/adk/tools/mcp_tool/mcp_session_manager.py @@ -30,7 +30,8 @@ import anyio import httpx -from pydantic import BaseModel, ConfigDict +from pydantic import BaseModel +from pydantic import ConfigDict try: from mcp import ClientSession @@ -102,7 +103,7 @@ class StreamableHTTPConnectionParams(BaseModel): when the connection is closed. httpx_client: httpx.AsyncClient to use for the connection. """ - + url: str headers: dict[str, Any] | None = None timeout: float = 5.0 @@ -302,7 +303,7 @@ def _create_client(self, merged_headers: Optional[Dict[str, str]] = None): ), terminate_on_close=self._connection_params.terminate_on_close, ) - + else: raise ValueError( 'Unable to initialize connection. Connection should be' diff --git a/tests/unittests/tools/mcp_tool/test_mcp_session_manager.py b/tests/unittests/tools/mcp_tool/test_mcp_session_manager.py index 12b215c9d0..fb17db78c6 100644 --- a/tests/unittests/tools/mcp_tool/test_mcp_session_manager.py +++ b/tests/unittests/tools/mcp_tool/test_mcp_session_manager.py @@ -145,7 +145,7 @@ def test_init_with_streamable_http_params(self): assert manager._connection_params == http_params assert manager._connection_params.httpx_client is None - + def test_init_with_streamable_http_params_with_httpx_client(self): """Test initialization with StreamableHTTPConnectionParams.""" client = httpx.AsyncClient() From 9c7d1200d27902fbc11c8389e6dacdc1754a2cd4 Mon Sep 17 00:00:00 2001 From: Calvin Brown Date: Wed, 17 Sep 2025 08:01:23 -0500 Subject: [PATCH 3/3] Addressing gemini code review block --- .../adk/tools/mcp_tool/mcp_session_manager.py | 31 +++++++------------ .../mcp_tool/test_mcp_session_manager.py | 2 +- 2 files changed, 12 insertions(+), 21 deletions(-) diff --git a/src/google/adk/tools/mcp_tool/mcp_session_manager.py b/src/google/adk/tools/mcp_tool/mcp_session_manager.py index 3bf8b8ae5c..fa64e3aaea 100644 --- a/src/google/adk/tools/mcp_tool/mcp_session_manager.py +++ b/src/google/adk/tools/mcp_tool/mcp_session_manager.py @@ -282,27 +282,18 @@ def _create_client(self, merged_headers: Optional[Dict[str, str]] = None): sse_read_timeout=self._connection_params.sse_read_timeout, ) elif isinstance(self._connection_params, StreamableHTTPConnectionParams): + kwargs = { + 'url': self._connection_params.url, + 'headers': merged_headers, + 'timeout': timedelta(seconds=self._connection_params.timeout), + 'sse_read_timeout': timedelta( + seconds=self._connection_params.sse_read_timeout + ), + 'terminate_on_close': self._connection_params.terminate_on_close, + } if self._connection_params.httpx_client: - client = streamablehttp_client( - url=self._connection_params.url, - headers=merged_headers, - timeout=timedelta(seconds=self._connection_params.timeout), - sse_read_timeout=timedelta( - seconds=self._connection_params.sse_read_timeout - ), - terminate_on_close=self._connection_params.terminate_on_close, - httpx_client_factory=self._connection_params.httpx_client, - ) - else: - client = streamablehttp_client( - url=self._connection_params.url, - headers=merged_headers, - timeout=timedelta(seconds=self._connection_params.timeout), - sse_read_timeout=timedelta( - seconds=self._connection_params.sse_read_timeout - ), - terminate_on_close=self._connection_params.terminate_on_close, - ) + kwargs['httpx_client_factory'] = self._connection_params.httpx_client + client = streamablehttp_client(**kwargs) else: raise ValueError( diff --git a/tests/unittests/tools/mcp_tool/test_mcp_session_manager.py b/tests/unittests/tools/mcp_tool/test_mcp_session_manager.py index fb17db78c6..ed384ffa13 100644 --- a/tests/unittests/tools/mcp_tool/test_mcp_session_manager.py +++ b/tests/unittests/tools/mcp_tool/test_mcp_session_manager.py @@ -147,7 +147,7 @@ def test_init_with_streamable_http_params(self): assert manager._connection_params.httpx_client is None def test_init_with_streamable_http_params_with_httpx_client(self): - """Test initialization with StreamableHTTPConnectionParams.""" + """Test initialization with StreamableHTTPConnectionParams with httpx client.""" client = httpx.AsyncClient() http_params = StreamableHTTPConnectionParams( url="https://example.com/mcp", timeout=15.0, httpx_client=client