Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions src/dedalus_mcp/client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

from .connection import open_connection
from .core import ClientCapabilitiesConfig, MCPClient
from .diagnostics import ClientRequestHistory, ClientRequestRecord
from .errors import (
AuthRequiredError,
BadRequestError,
Expand All @@ -30,23 +31,22 @@
)
from .transports import lambda_http_client


__all__ = [
# Core
"MCPClient",
"ClientCapabilitiesConfig",
"open_connection",
# Auth
"BearerAuth",
"DPoPAuth",
"generate_dpop_proof",
# Transports
"lambda_http_client",
# Errors
"MCPConnectionError",
"AuthRequiredError",
"BadRequestError",
"BearerAuth",
"ClientCapabilitiesConfig",
"ClientRequestHistory",
"ClientRequestRecord",
"DPoPAuth",
"ForbiddenError",
"MCPClient",
"MCPConnectionError",
"ServerError",
"SessionExpiredError",
"TransportError",
"generate_dpop_proof",
"lambda_http_client",
"open_connection",
]
86 changes: 78 additions & 8 deletions src/dedalus_mcp/client/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from collections.abc import Awaitable, Callable, Iterable
from contextlib import AsyncExitStack
from dataclasses import dataclass
import time
from typing import Any, TypeVar
import warnings
import weakref
Expand All @@ -31,6 +32,7 @@
import httpx
from mcp.client.session import ClientSession

from .diagnostics import ClientRequestHistory, ClientRequestRecord, elapsed_ms_since
from .error_handling import (
extract_http_error,
extract_network_error,
Expand Down Expand Up @@ -135,6 +137,7 @@ def __init__(
self._root_lock = Lock()
self._roots_version = 0
self._roots: list[Root] = [self._normalize_root(root) for root in initial_roots]
self._request_history = ClientRequestHistory()

self._session: ClientSession | None = None
self.initialize_result: InitializeResult | None = None
Expand Down Expand Up @@ -210,6 +213,7 @@ async def connect(
client._root_lock = Lock()
client._roots_version = 0
client._roots = []
client._request_history = ClientRequestHistory()
client._session = _transport_override
client._closed = False
client._finalizer = weakref.finalize(client, cls._warn_unclosed, id(client))
Expand Down Expand Up @@ -423,7 +427,67 @@ async def send_request(
progress_callback: Callable[[float, float | None, str | None], Awaitable[None] | None] | None = None,
) -> T_RequestResult:
"""Forward a request to the server and await the result."""
return await self.session.send_request(request, result_type, progress_callback=progress_callback)
started_at_ns = time.perf_counter_ns()
try:
result = await self.session.send_request(request, result_type, progress_callback=progress_callback)
except Exception as exc:
self._request_history.record_error(
request=request,
result_type=result_type,
session_id=self.session_id,
started_at_ns=started_at_ns,
duration_ms=elapsed_ms_since(started_at_ns),
error=exc,
)
raise

self._request_history.record_success(
request=request,
result_type=result_type,
session_id=self.session_id,
started_at_ns=started_at_ns,
duration_ms=elapsed_ms_since(started_at_ns),
)
return result

def request_history(self) -> tuple[ClientRequestRecord, ...]:
"""Return privacy-safe metadata for recent client requests.

The history intentionally excludes request parameters and results. It is
meant for debugging slow, failed, or unexpectedly repeated MCP calls.
"""
return self._request_history.snapshot()

def clear_request_history(self) -> None:
"""Clear retained request diagnostics."""
self._request_history.clear()

async def _record_session_operation(
self, method: str, result_type: type[T_RequestResult], operation: Callable[[], Awaitable[T_RequestResult]]
) -> T_RequestResult:
"""Record metadata around a convenience ClientSession call."""
started_at_ns = time.perf_counter_ns()
try:
result = await operation()
except Exception as exc:
self._request_history.record_method_error(
method=method,
result_type=result_type,
session_id=self.session_id,
started_at_ns=started_at_ns,
duration_ms=elapsed_ms_since(started_at_ns),
error=exc,
)
raise

self._request_history.record_method_success(
method=method,
result_type=result_type,
session_id=self.session_id,
started_at_ns=started_at_ns,
duration_ms=elapsed_ms_since(started_at_ns),
)
return result

async def cancel_request(self, request_id: RequestId, *, reason: str | None = None) -> None:
"""Emit notifications/cancelled.
Expand Down Expand Up @@ -468,42 +532,48 @@ async def list_tools(self) -> ListToolsResult:

See: https://modelcontextprotocol.io/specification/2024-11-05/server/tools
"""
return await self.session.list_tools()
return await self._record_session_operation("tools/list", ListToolsResult, self.session.list_tools)

async def call_tool(self, name: str, arguments: dict[str, Any] | None = None) -> CallToolResult:
"""Call a tool on the server.

See: https://modelcontextprotocol.io/specification/2024-11-05/server/tools
"""
return await self.session.call_tool(name, arguments)
return await self._record_session_operation(
"tools/call", CallToolResult, lambda: self.session.call_tool(name, arguments)
)

async def list_resources(self) -> ListResourcesResult:
"""List all resources available on the server.

See: https://modelcontextprotocol.io/specification/2024-11-05/server/resources
"""
return await self.session.list_resources()
return await self._record_session_operation("resources/list", ListResourcesResult, self.session.list_resources)

async def read_resource(self, uri: str) -> ReadResourceResult:
"""Read a resource from the server.

See: https://modelcontextprotocol.io/specification/2024-11-05/server/resources
"""
return await self.session.read_resource(uri)
return await self._record_session_operation(
"resources/read", ReadResourceResult, lambda: self.session.read_resource(uri)
)

async def list_prompts(self) -> ListPromptsResult:
"""List all prompts available on the server.

See: https://modelcontextprotocol.io/specification/2024-11-05/server/prompts
"""
return await self.session.list_prompts()
return await self._record_session_operation("prompts/list", ListPromptsResult, self.session.list_prompts)

async def get_prompt(self, name: str, arguments: dict[str, Any] | None = None) -> GetPromptResult:
"""Get a prompt from the server.

See: https://modelcontextprotocol.io/specification/2024-11-05/server/prompts
"""
return await self.session.get_prompt(name, arguments)
return await self._record_session_operation(
"prompts/get", GetPromptResult, lambda: self.session.get_prompt(name, arguments)
)

# ------------------------------------------------------------------
# Internal helpers
Expand Down Expand Up @@ -557,4 +627,4 @@ def _normalize_root(value: Root | dict[str, Any]) -> Root:
return Root.model_validate(value)


__all__ = ["MCPClient", "ClientCapabilitiesConfig"]
__all__ = ["ClientCapabilitiesConfig", "MCPClient"]
168 changes: 168 additions & 0 deletions src/dedalus_mcp/client/diagnostics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
# Copyright (c) 2026 Dedalus Labs, Inc. and its contributors
# SPDX-License-Identifier: MIT

"""Lightweight client-side diagnostics for MCP request lifecycles."""

from __future__ import annotations

from collections import deque
from dataclasses import dataclass
import time
from typing import Any

from ..types.messages import ClientRequest


@dataclass(frozen=True, slots=True)
class ClientRequestRecord:
"""Privacy-safe metadata for a completed client request."""

sequence: int
method: str
result_type: str
session_id: str | None
started_at_ns: int
duration_ms: float
ok: bool
error_type: str | None = None
error_message: str | None = None


class ClientRequestHistory:
"""Bounded request history for debugging MCP client sessions.

The history intentionally stores only operational metadata. It does not
retain request parameters, tool arguments, resource URIs, or result payloads.
"""

def __init__(self, *, max_records: int = 128) -> None:
if max_records <= 0:
msg = "max_records must be greater than 0"
raise ValueError(msg)

self._records: deque[ClientRequestRecord] = deque(maxlen=max_records)
self._next_sequence = 1

def snapshot(self) -> tuple[ClientRequestRecord, ...]:
"""Return records in oldest-to-newest order."""
return tuple(self._records)

def clear(self) -> None:
"""Drop all retained request records."""
self._records.clear()

def record_success(
self,
*,
request: ClientRequest,
result_type: type[Any],
session_id: str | None,
started_at_ns: int,
duration_ms: float,
) -> None:
self._append(
method=request_method(request),
result_type=result_type,
session_id=session_id,
started_at_ns=started_at_ns,
duration_ms=duration_ms,
ok=True,
)

def record_method_success(
self, *, method: str, result_type: type[Any], session_id: str | None, started_at_ns: int, duration_ms: float
) -> None:
self._append(
method=method,
result_type=result_type,
session_id=session_id,
started_at_ns=started_at_ns,
duration_ms=duration_ms,
ok=True,
)

def record_error(
self,
*,
request: ClientRequest,
result_type: type[Any],
session_id: str | None,
started_at_ns: int,
duration_ms: float,
error: BaseException,
) -> None:
self._append(
method=request_method(request),
result_type=result_type,
session_id=session_id,
started_at_ns=started_at_ns,
duration_ms=duration_ms,
ok=False,
error_type=type(error).__name__,
error_message=str(error)[:500],
)

def record_method_error(
self,
*,
method: str,
result_type: type[Any],
session_id: str | None,
started_at_ns: int,
duration_ms: float,
error: BaseException,
) -> None:
self._append(
method=method,
result_type=result_type,
session_id=session_id,
started_at_ns=started_at_ns,
duration_ms=duration_ms,
ok=False,
error_type=type(error).__name__,
error_message=str(error)[:500],
)

def _append(
self,
*,
method: str,
result_type: type[Any],
session_id: str | None,
started_at_ns: int,
duration_ms: float,
ok: bool,
error_type: str | None = None,
error_message: str | None = None,
) -> None:
self._records.append(
ClientRequestRecord(
sequence=self._next_sequence,
method=method,
result_type=result_type.__name__,
session_id=session_id,
started_at_ns=started_at_ns,
duration_ms=round(duration_ms, 3),
ok=ok,
error_type=error_type,
error_message=error_message,
)
)
self._next_sequence += 1


def request_method(request: ClientRequest) -> str:
"""Return the JSON-RPC method name for a typed client request."""
root = getattr(request, "root", None)
method = getattr(root, "method", None)
if isinstance(method, str) and method:
return method
return type(root).__name__ if root is not None else type(request).__name__


def elapsed_ms_since(started_at_ns: int) -> float:
"""Return elapsed monotonic time in milliseconds."""
return (time.perf_counter_ns() - started_at_ns) / 1_000_000


__all__ = ["ClientRequestHistory", "ClientRequestRecord", "elapsed_ms_since", "request_method"]
Loading