From 0966cf928abc99ec6944344c1c85f10761a7106c Mon Sep 17 00:00:00 2001 From: Ritwij Aryan Parmar Date: Wed, 10 Jun 2026 13:28:19 -0400 Subject: [PATCH] feat(client): add privacy-safe request history Signed-off-by: Ritwij Aryan Parmar --- src/dedalus_mcp/client/__init__.py | 24 ++-- src/dedalus_mcp/client/core.py | 86 +++++++++++-- src/dedalus_mcp/client/diagnostics.py | 168 ++++++++++++++++++++++++++ tests/test_client.py | 74 ++++++++++++ 4 files changed, 332 insertions(+), 20 deletions(-) create mode 100644 src/dedalus_mcp/client/diagnostics.py diff --git a/src/dedalus_mcp/client/__init__.py b/src/dedalus_mcp/client/__init__.py index 15952e4..40976a9 100644 --- a/src/dedalus_mcp/client/__init__.py +++ b/src/dedalus_mcp/client/__init__.py @@ -19,6 +19,7 @@ from .connection import open_connection from .core import ClientCapabilitiesConfig, MCPClient +from .diagnostics import ClientRequestHistory, ClientRequestRecord from .errors import ( AuthRequiredError, BadRequestError, @@ -30,23 +31,22 @@ ) from .transports import lambda_http_client + __all__ = [ - # Core - "MCPClient", - "ClientCapabilitiesConfig", - "open_connection", - # Auth - "BearerAuth", - "DPoPAuth", - "generate_dpop_proof", - # Transports - "lambda_http_client", - # Errors - "MCPConnectionError", "AuthRequiredError", "BadRequestError", + "BearerAuth", + "ClientCapabilitiesConfig", + "ClientRequestHistory", + "ClientRequestRecord", + "DPoPAuth", "ForbiddenError", + "MCPClient", + "MCPConnectionError", "ServerError", "SessionExpiredError", "TransportError", + "generate_dpop_proof", + "lambda_http_client", + "open_connection", ] diff --git a/src/dedalus_mcp/client/core.py b/src/dedalus_mcp/client/core.py index a0491c8..af2c893 100644 --- a/src/dedalus_mcp/client/core.py +++ b/src/dedalus_mcp/client/core.py @@ -22,6 +22,7 @@ from collections.abc import Awaitable, Callable, Iterable from contextlib import AsyncExitStack from dataclasses import dataclass +import time from typing import Any, TypeVar import warnings import weakref @@ -31,6 +32,7 @@ import httpx from mcp.client.session import ClientSession +from .diagnostics import ClientRequestHistory, ClientRequestRecord, elapsed_ms_since from .error_handling import ( extract_http_error, extract_network_error, @@ -135,6 +137,7 @@ def __init__( self._root_lock = Lock() self._roots_version = 0 self._roots: list[Root] = [self._normalize_root(root) for root in initial_roots] + self._request_history = ClientRequestHistory() self._session: ClientSession | None = None self.initialize_result: InitializeResult | None = None @@ -210,6 +213,7 @@ async def connect( client._root_lock = Lock() client._roots_version = 0 client._roots = [] + client._request_history = ClientRequestHistory() client._session = _transport_override client._closed = False client._finalizer = weakref.finalize(client, cls._warn_unclosed, id(client)) @@ -423,7 +427,67 @@ async def send_request( progress_callback: Callable[[float, float | None, str | None], Awaitable[None] | None] | None = None, ) -> T_RequestResult: """Forward a request to the server and await the result.""" - return await self.session.send_request(request, result_type, progress_callback=progress_callback) + started_at_ns = time.perf_counter_ns() + try: + result = await self.session.send_request(request, result_type, progress_callback=progress_callback) + except Exception as exc: + self._request_history.record_error( + request=request, + result_type=result_type, + session_id=self.session_id, + started_at_ns=started_at_ns, + duration_ms=elapsed_ms_since(started_at_ns), + error=exc, + ) + raise + + self._request_history.record_success( + request=request, + result_type=result_type, + session_id=self.session_id, + started_at_ns=started_at_ns, + duration_ms=elapsed_ms_since(started_at_ns), + ) + return result + + def request_history(self) -> tuple[ClientRequestRecord, ...]: + """Return privacy-safe metadata for recent client requests. + + The history intentionally excludes request parameters and results. It is + meant for debugging slow, failed, or unexpectedly repeated MCP calls. + """ + return self._request_history.snapshot() + + def clear_request_history(self) -> None: + """Clear retained request diagnostics.""" + self._request_history.clear() + + async def _record_session_operation( + self, method: str, result_type: type[T_RequestResult], operation: Callable[[], Awaitable[T_RequestResult]] + ) -> T_RequestResult: + """Record metadata around a convenience ClientSession call.""" + started_at_ns = time.perf_counter_ns() + try: + result = await operation() + except Exception as exc: + self._request_history.record_method_error( + method=method, + result_type=result_type, + session_id=self.session_id, + started_at_ns=started_at_ns, + duration_ms=elapsed_ms_since(started_at_ns), + error=exc, + ) + raise + + self._request_history.record_method_success( + method=method, + result_type=result_type, + session_id=self.session_id, + started_at_ns=started_at_ns, + duration_ms=elapsed_ms_since(started_at_ns), + ) + return result async def cancel_request(self, request_id: RequestId, *, reason: str | None = None) -> None: """Emit notifications/cancelled. @@ -468,42 +532,48 @@ async def list_tools(self) -> ListToolsResult: See: https://modelcontextprotocol.io/specification/2024-11-05/server/tools """ - return await self.session.list_tools() + return await self._record_session_operation("tools/list", ListToolsResult, self.session.list_tools) async def call_tool(self, name: str, arguments: dict[str, Any] | None = None) -> CallToolResult: """Call a tool on the server. See: https://modelcontextprotocol.io/specification/2024-11-05/server/tools """ - return await self.session.call_tool(name, arguments) + return await self._record_session_operation( + "tools/call", CallToolResult, lambda: self.session.call_tool(name, arguments) + ) async def list_resources(self) -> ListResourcesResult: """List all resources available on the server. See: https://modelcontextprotocol.io/specification/2024-11-05/server/resources """ - return await self.session.list_resources() + return await self._record_session_operation("resources/list", ListResourcesResult, self.session.list_resources) async def read_resource(self, uri: str) -> ReadResourceResult: """Read a resource from the server. See: https://modelcontextprotocol.io/specification/2024-11-05/server/resources """ - return await self.session.read_resource(uri) + return await self._record_session_operation( + "resources/read", ReadResourceResult, lambda: self.session.read_resource(uri) + ) async def list_prompts(self) -> ListPromptsResult: """List all prompts available on the server. See: https://modelcontextprotocol.io/specification/2024-11-05/server/prompts """ - return await self.session.list_prompts() + return await self._record_session_operation("prompts/list", ListPromptsResult, self.session.list_prompts) async def get_prompt(self, name: str, arguments: dict[str, Any] | None = None) -> GetPromptResult: """Get a prompt from the server. See: https://modelcontextprotocol.io/specification/2024-11-05/server/prompts """ - return await self.session.get_prompt(name, arguments) + return await self._record_session_operation( + "prompts/get", GetPromptResult, lambda: self.session.get_prompt(name, arguments) + ) # ------------------------------------------------------------------ # Internal helpers @@ -557,4 +627,4 @@ def _normalize_root(value: Root | dict[str, Any]) -> Root: return Root.model_validate(value) -__all__ = ["MCPClient", "ClientCapabilitiesConfig"] +__all__ = ["ClientCapabilitiesConfig", "MCPClient"] diff --git a/src/dedalus_mcp/client/diagnostics.py b/src/dedalus_mcp/client/diagnostics.py new file mode 100644 index 0000000..af3e5e4 --- /dev/null +++ b/src/dedalus_mcp/client/diagnostics.py @@ -0,0 +1,168 @@ +# Copyright (c) 2026 Dedalus Labs, Inc. and its contributors +# SPDX-License-Identifier: MIT + +"""Lightweight client-side diagnostics for MCP request lifecycles.""" + +from __future__ import annotations + +from collections import deque +from dataclasses import dataclass +import time +from typing import Any + +from ..types.messages import ClientRequest + + +@dataclass(frozen=True, slots=True) +class ClientRequestRecord: + """Privacy-safe metadata for a completed client request.""" + + sequence: int + method: str + result_type: str + session_id: str | None + started_at_ns: int + duration_ms: float + ok: bool + error_type: str | None = None + error_message: str | None = None + + +class ClientRequestHistory: + """Bounded request history for debugging MCP client sessions. + + The history intentionally stores only operational metadata. It does not + retain request parameters, tool arguments, resource URIs, or result payloads. + """ + + def __init__(self, *, max_records: int = 128) -> None: + if max_records <= 0: + msg = "max_records must be greater than 0" + raise ValueError(msg) + + self._records: deque[ClientRequestRecord] = deque(maxlen=max_records) + self._next_sequence = 1 + + def snapshot(self) -> tuple[ClientRequestRecord, ...]: + """Return records in oldest-to-newest order.""" + return tuple(self._records) + + def clear(self) -> None: + """Drop all retained request records.""" + self._records.clear() + + def record_success( + self, + *, + request: ClientRequest, + result_type: type[Any], + session_id: str | None, + started_at_ns: int, + duration_ms: float, + ) -> None: + self._append( + method=request_method(request), + result_type=result_type, + session_id=session_id, + started_at_ns=started_at_ns, + duration_ms=duration_ms, + ok=True, + ) + + def record_method_success( + self, *, method: str, result_type: type[Any], session_id: str | None, started_at_ns: int, duration_ms: float + ) -> None: + self._append( + method=method, + result_type=result_type, + session_id=session_id, + started_at_ns=started_at_ns, + duration_ms=duration_ms, + ok=True, + ) + + def record_error( + self, + *, + request: ClientRequest, + result_type: type[Any], + session_id: str | None, + started_at_ns: int, + duration_ms: float, + error: BaseException, + ) -> None: + self._append( + method=request_method(request), + result_type=result_type, + session_id=session_id, + started_at_ns=started_at_ns, + duration_ms=duration_ms, + ok=False, + error_type=type(error).__name__, + error_message=str(error)[:500], + ) + + def record_method_error( + self, + *, + method: str, + result_type: type[Any], + session_id: str | None, + started_at_ns: int, + duration_ms: float, + error: BaseException, + ) -> None: + self._append( + method=method, + result_type=result_type, + session_id=session_id, + started_at_ns=started_at_ns, + duration_ms=duration_ms, + ok=False, + error_type=type(error).__name__, + error_message=str(error)[:500], + ) + + def _append( + self, + *, + method: str, + result_type: type[Any], + session_id: str | None, + started_at_ns: int, + duration_ms: float, + ok: bool, + error_type: str | None = None, + error_message: str | None = None, + ) -> None: + self._records.append( + ClientRequestRecord( + sequence=self._next_sequence, + method=method, + result_type=result_type.__name__, + session_id=session_id, + started_at_ns=started_at_ns, + duration_ms=round(duration_ms, 3), + ok=ok, + error_type=error_type, + error_message=error_message, + ) + ) + self._next_sequence += 1 + + +def request_method(request: ClientRequest) -> str: + """Return the JSON-RPC method name for a typed client request.""" + root = getattr(request, "root", None) + method = getattr(root, "method", None) + if isinstance(method, str) and method: + return method + return type(root).__name__ if root is not None else type(request).__name__ + + +def elapsed_ms_since(started_at_ns: int) -> float: + """Return elapsed monotonic time in milliseconds.""" + return (time.perf_counter_ns() - started_at_ns) / 1_000_000 + + +__all__ = ["ClientRequestHistory", "ClientRequestRecord", "elapsed_ms_since", "request_method"] diff --git a/tests/test_client.py b/tests/test_client.py index be9a11b..c816362 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -38,9 +38,11 @@ def __init__( self.logging_callback = logging_callback self.client_info = client_info self.send_request_calls: list[tuple[ClientRequest, type[Any], dict[str, Any]]] = [] + self.tool_calls: list[tuple[str, dict[str, Any] | None]] = [] self.notifications: list[ClientNotification] = [] self.roots_notifications = 0 self.next_result: Any = EmptyResult() + self.next_error: BaseException | None = None async def __aenter__(self) -> FakeClientSession: return self @@ -63,11 +65,19 @@ async def send_request( progress_callback: Callable[[float, float | None, str | None], Any] | None = None, ) -> Any: self.send_request_calls.append((request, result_type, {"progress_callback": progress_callback})) + if self.next_error is not None: + raise self.next_error return self.next_result async def send_notification(self, notification: ClientNotification) -> None: self.notifications.append(notification) + async def call_tool(self, name: str, arguments: dict[str, Any] | None = None) -> Any: + self.tool_calls.append((name, arguments)) + if self.next_error is not None: + raise self.next_error + return self.next_result + async def send_roots_list_changed(self) -> None: self.roots_notifications += 1 @@ -149,6 +159,70 @@ async def test_send_request_and_ping_delegate_to_session(monkeypatch: pytest.Mon assert extras["progress_callback"] is None +@pytest.mark.anyio +async def test_send_request_records_privacy_safe_history(monkeypatch: pytest.MonkeyPatch) -> None: + factory = SessionFactory() + monkeypatch.setattr("dedalus_mcp.client.core.ClientSession", factory) + + recv, send = anyio.create_memory_object_stream(0) + async with MCPClient(recv, send, get_session_id=lambda: "session-123") as client: + session = factory.instances[0] + session.next_result = EmptyResult() + await client.ping() + + history = client.request_history() + assert len(history) == 1 + record = history[0] + assert record.method == "ping" + assert record.result_type == "EmptyResult" + assert record.session_id == "session-123" + assert record.duration_ms >= 0 + assert record.ok is True + assert record.error_type is None + + +@pytest.mark.anyio +async def test_call_tool_history_does_not_store_arguments(monkeypatch: pytest.MonkeyPatch) -> None: + factory = SessionFactory() + monkeypatch.setattr("dedalus_mcp.client.core.ClientSession", factory) + + recv, send = anyio.create_memory_object_stream(0) + async with MCPClient(recv, send) as client: + await client.call_tool("fetch_customer", {"customer_id": "cust_123", "api_token": "secret-token"}) + + record = client.request_history()[-1] + assert record.method == "tools/call" + assert record.ok is True + history_text = repr(record) + assert "cust_123" not in history_text + assert "secret-token" not in history_text + assert "api_token" not in history_text + + +@pytest.mark.anyio +async def test_send_request_records_errors_and_can_clear_history(monkeypatch: pytest.MonkeyPatch) -> None: + factory = SessionFactory() + monkeypatch.setattr("dedalus_mcp.client.core.ClientSession", factory) + + recv, send = anyio.create_memory_object_stream(0) + async with MCPClient(recv, send) as client: + session = factory.instances[0] + session.next_error = RuntimeError("server closed stream after partial response") + + with pytest.raises(RuntimeError): + await client.ping() + + history = client.request_history() + assert len(history) == 1 + assert history[0].method == "ping" + assert history[0].ok is False + assert history[0].error_type == "RuntimeError" + assert "partial response" in (history[0].error_message or "") + + client.clear_request_history() + assert client.request_history() == () + + @pytest.mark.anyio async def test_sampling_and_elicitation_handlers_wrapped(monkeypatch: pytest.MonkeyPatch) -> None: factory = SessionFactory()