Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -523,8 +523,8 @@ program

// Wire HTTP transport through MCPServer.handleMessage() — single source of
// truth for JSON-RPC validation, notification handling, and request routing.
httpTrans.onMessage(async (msg: Record<string, unknown>, signal?: AbortSignal) =>
server.handleMessage(msg, signal),
httpTrans.onMessage(async (msg: Record<string, unknown>, signal?: AbortSignal, context?: import('./transports').TransportMessageContext) =>
server.handleMessage(msg, signal, context),
);
server.wireRateLimiterCleanup(httpTrans);
httpTrans.start();
Expand Down
229 changes: 225 additions & 4 deletions src/mcp-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ import {
MissingSecretError,
getSecretStore,
} from './core/secrets';
import { currentRequestContext } from './observability/request-id';
import type { TransportMessageContext } from './transports';

/** Recording tools excluded from session recording to prevent infinite loops */
const SKIP_RECORDING_TOOLS = new Set([
Expand Down Expand Up @@ -288,6 +290,33 @@ export class MCPServer {
*/
private rateLimiterSweepTimer: NodeJS.Timeout | null = null;

/**
* Pending server→client request resolvers, keyed by JSON-RPC id string
* (#960). Populated by `requestFromClient`; drained by the response fork
* at the top of `handleMessage` and by `rejectAllPendingS2cRequests` on
* connection close.
*/
private pendingClientRequests: Map<
string,
{
resolve: (value: unknown) => void;
reject: (err: Error) => void;
timer: ReturnType<typeof setTimeout>;
signal?: AbortSignal;
signalListener?: () => void;
mcpSessionId?: string;
}
> = new Map();
/** Monotonic counter — prefixed `oc-s2c-` cannot collide with client ids. */
private nextS2cRequestId = 1;
/**
* Capabilities the client declared in `initialize` (#960). Downstream
* features (roots #880, sampling #876, elicitation #877) check this cache
* before issuing a server→client request and fall back when absent.
*/
private clientCapabilities: { roots?: object; sampling?: object; elicitation?: object } = {};
private clientCapabilitiesBySession: Map<string, { roots?: object; sampling?: object; elicitation?: object }> = new Map();

constructor(sessionManager?: SessionManager, options: MCPServerOptions = {}) {
this.sessionManager = sessionManager || getSessionManager();
this.options = options;
Expand Down Expand Up @@ -434,6 +463,131 @@ export class MCPServer {
this.sendResponse(notification as unknown as MCPResponse);
}

/**
* Server→client request/response primitive (#960).
*
* Allocates a unique request id (prefixed `oc-s2c-` so it cannot collide
* with client-allocated ids), serializes the request via the existing
* `sendResponse` path, and registers a one-shot resolver. The transport
* delivers the JSON-RPC envelope; the response is matched back to the
* resolver in `handleMessage` (response fork) before the regular
* client-request validation runs.
*
* Failure modes (rejection):
* - `timeoutMs` elapses → `Error('s2c_timeout:<method>')`.
* - `signal` fires → `Error('s2c_aborted')`.
* - Connection closes (`close()` / transport teardown) →
* `Error('s2c_aborted:connection_closed')`.
* - Client returns a JSON-RPC `error` → `Error(error.message)`.
*
* Concurrent in-flight requests are independent — no head-of-line blocking.
*/
protected async requestFromClient<T = unknown>(
method: string,
params?: Record<string, unknown>,
options?: { timeoutMs?: number; signal?: AbortSignal },
): Promise<T> {
const id = `oc-s2c-${this.nextS2cRequestId++}`;
const timeoutMs = options?.timeoutMs ?? 30_000;
const mcpSessionId = currentRequestContext()?.mcpSessionId;
return new Promise<T>((resolve, reject) => {
const cleanup = (): void => {
const entry = this.pendingClientRequests.get(id);
if (!entry) return;
if (entry.timer) clearTimeout(entry.timer);
if (entry.signal && entry.signalListener) {
entry.signal.removeEventListener('abort', entry.signalListener);
}
this.pendingClientRequests.delete(id);
};

const timer = setTimeout(() => {
cleanup();
reject(new Error(`s2c_timeout:${method}`));
}, timeoutMs);

let signalListener: (() => void) | undefined;
if (options?.signal) {
if (options.signal.aborted) {
clearTimeout(timer);
reject(new Error('s2c_aborted'));
return;
}
signalListener = (): void => {
cleanup();
reject(new Error('s2c_aborted'));
};
options.signal.addEventListener('abort', signalListener, { once: true });
}

this.pendingClientRequests.set(id, {
resolve: (value: unknown) => {
cleanup();
resolve(value as T);
},
reject: (err: Error) => {
cleanup();
reject(err);
},
timer,
signal: options?.signal,
signalListener,
mcpSessionId,
});

const request = {
jsonrpc: '2.0' as const,
id,
method,
...(params ? { params } : {}),
};
try {
const transport = this.transport;
if (mcpSessionId && transport && typeof transport.sendToSession === 'function') {
const sent = transport.sendToSession(mcpSessionId, request as unknown as MCPResponse);
if (!sent) {
cleanup();
Comment on lines +546 to +549
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Preserve HTTP session routing in dual-transport mode

When requestFromClient runs during an HTTP request in --transport both, this.transport is still the stdio transport (see src/index.ts where server.start() is called before wiring HTTP). Because stdio has no sendToSession, this branch falls back to sendResponse, which writes the server→client RPC onto stdio instead of the HTTP session’s SSE stream. That makes S2C methods (roots/sampling/elicitation) time out for HTTP clients in dual mode even though the request context has a valid mcpSessionId.

Useful? React with 👍 / 👎.

reject(new Error('s2c_aborted:connection_closed'));
return;
}
} else {
this.sendResponse(request as unknown as MCPResponse);
}
} catch (sendErr) {
cleanup();
reject(sendErr instanceof Error ? sendErr : new Error(String(sendErr)));
}
});
}

/**
* Reject every pending server→client request. Called when the transport
* tears down (HTTP DELETE / stdio EOF) so callers don't hang forever.
*/
private rejectAllPendingS2cRequests(reason: string): void {
for (const [id, entry] of this.pendingClientRequests) {
// Use entry.reject which also clears the timer / signal listener.
try {
entry.reject(new Error(reason));
} catch {
// best-effort
}
this.pendingClientRequests.delete(id);
}
}

private rejectPendingS2cRequestsForSession(sessionId: string, reason: string): void {
for (const [id, entry] of this.pendingClientRequests) {
if (entry.mcpSessionId !== sessionId) continue;
try {
entry.reject(new Error(reason));
} catch {
// best-effort
}
this.pendingClientRequests.delete(id);
}
}

/**
* Build a coalescing progress-reporter for a single tools/call invocation.
*
Expand Down Expand Up @@ -549,6 +703,8 @@ export class MCPServer {
// someone else's binding. sessionTenants is instead reclaimed by
// (a) the MCP `sessions/delete` handler and (b) the periodic
// `sweepSessionTenants()` tick scheduled in start().
this.rejectPendingS2cRequestsForSession(sessionId, 's2c_aborted:connection_closed');
this.clientCapabilitiesBySession.delete(sessionId);
},
);
}
Expand Down Expand Up @@ -586,13 +742,48 @@ export class MCPServer {
async handleMessage(
parsed: Record<string, unknown>,
signal?: AbortSignal,
transportContext?: TransportMessageContext,
): Promise<MCPResponse | null> {
// Record activity — every inbound MCP request flows through this method
// (stdio and HTTP transports both route here; see start()). By wiring at
// the single dispatch point we guarantee acceptance criterion 8 (issue
// #649) without having to touch every registerTool() call site.
getIdleState().notifyActive();

// #960 — Server→client response fork. If this message has an id and
// (result | error) but no method, it is a response to a request the server
// sent earlier via `requestFromClient`. Route it to the pending resolver
// and short-circuit BEFORE the regular client-request validation (which
// would otherwise reject the response as "missing method field").
if (
typeof parsed === 'object' &&
parsed !== null &&
parsed.jsonrpc === '2.0' &&
parsed.id !== undefined &&
parsed.id !== null &&
typeof parsed.method !== 'string' &&
('result' in parsed || 'error' in parsed)
) {
const idKey = String(parsed.id);
const entry = this.pendingClientRequests.get(idKey);
if (entry) {
if (entry.mcpSessionId && transportContext?.mcpSessionId !== entry.mcpSessionId) {
console.error(`[MCPServer] dropping client response for id=${idKey} from non-owner session`);
return null;
}
if ('error' in parsed) {
const err = parsed.error as { message?: string; code?: number };
entry.reject(new Error(err?.message ?? `s2c_error_code:${err?.code ?? 'unknown'}`));
} else {
entry.resolve(parsed.result);
}
} else {
// Stray response (e.g. client echoed a stale id) — log and drop.
console.error(`[MCPServer] dropping stray client response for id=${idKey}`);
}
return null;
}

// Validate JSON-RPC 2.0 envelope
if (
typeof parsed !== 'object' ||
Expand Down Expand Up @@ -715,8 +906,8 @@ export class MCPServer {
}

// Wire the transport message handler to MCPServer protocol logic
this.transport.onMessage(async (parsed: Record<string, unknown>, signal?: AbortSignal) =>
this.handleMessage(parsed, signal),
this.transport.onMessage(async (parsed: Record<string, unknown>, signal?: AbortSignal, context?: TransportMessageContext) =>
this.handleMessage(parsed, signal, context),
);

this.transport.start();
Expand Down Expand Up @@ -815,6 +1006,24 @@ export class MCPServer {
* Handle initialize request
*/
private async handleInitialize(params?: Record<string, unknown>): Promise<MCPResult> {
// #960 — capture client capabilities for downstream consumers (roots,
// sampling, elicitation). Downstream issues check
// `this.clientCapabilities.<feature>` before issuing the server→client
// request and fall back to a heuristic / error path when absent.
const caps = params?.capabilities as { roots?: object; sampling?: object; elicitation?: object } | undefined;
if (caps && typeof caps === 'object') {
const captured = {
...(caps.roots !== undefined ? { roots: caps.roots } : {}),
...(caps.sampling !== undefined ? { sampling: caps.sampling } : {}),
...(caps.elicitation !== undefined ? { elicitation: caps.elicitation } : {}),
};
this.clientCapabilities = captured;
const mcpSessionId = currentRequestContext()?.mcpSessionId;
if (mcpSessionId) {
this.clientCapabilitiesBySession.set(mcpSessionId, captured);
}
}

// Detect client identity for progressive disclosure decisions
const clientInfo = params?.clientInfo as { name?: string; version?: string } | undefined;
const rawName = clientInfo?.name ?? '';
Expand Down Expand Up @@ -2095,6 +2304,11 @@ export class MCPServer {
}

private async _stopInternal(): Promise<void> {
// #960 — reject every in-flight server→client request before the
// transport tears down so callers don't hang forever on Promises that
// can never resolve.
this.rejectAllPendingS2cRequests('s2c_aborted:connection_closed');
Comment on lines +2307 to +2310
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Reject pending S2C requests on HTTP session delete

This cleanup only runs during full server shutdown, so in HTTP mode a client DELETE /mcp (which just removes that session/SSE connection in src/transports/http.ts around handleDelete) does not reject any in-flight requestFromClient promise for that session. In that disconnect path, callers wait until timeoutMs instead of getting the documented immediate s2c_aborted:connection_closed, which can stall tool flows for up to 30s per request.

Useful? React with 👍 / 👎.


// Stop dashboard
if (this.dashboard) {
this.dashboard.stop();
Expand Down Expand Up @@ -2124,13 +2338,20 @@ export class MCPServer {
// Base 5s for session/CDP cleanup + 6s per Chrome instance (5s kill + 1s buffer)
const timeoutMs = Math.max(5000, 5000 + poolInstanceCount * 6000);

let cleanupTimeout: ReturnType<typeof setTimeout> | null = null;
await Promise.race([
this.cleanup(),
new Promise<void>((resolve) => setTimeout(() => {
new Promise<void>((resolve) => {
cleanupTimeout = setTimeout(() => {
console.error(`[MCPServer] Cleanup timed out after ${timeoutMs / 1000}s, forcing exit`);
resolve();
}, timeoutMs)),
}, timeoutMs);
cleanupTimeout.unref?.();
}),
]);
if (cleanupTimeout) {
clearTimeout(cleanupTimeout);
}
}

/**
Expand Down
2 changes: 2 additions & 0 deletions src/observability/request-id.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ export interface RequestContext {
tenantId?: string;
/** keyId (hashed) when per-tenant auth identifies a specific API key. */
keyId?: string;
/** HTTP MCP session id when the transport provides one. */
mcpSessionId?: string;
}

const requestStore = new AsyncLocalStorage<RequestContext>();
Expand Down
26 changes: 21 additions & 5 deletions src/transports/http.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import {
DEFAULT_HTTP_JSON_RPC_BATCH_MAX_SIZE,
} from '../config/defaults';
import { ClientDisconnectError } from '../errors/abort';
import { MCPTransport } from './index';
import { MCPTransport, TransportMessageContext } from './index';
import { renderPrometheusMetrics, type PrometheusMetric } from './prometheus';
import { getDashboardState } from '../desktop/dashboard-state';
import type { SessionManager } from '../session-manager';
Expand Down Expand Up @@ -185,7 +185,7 @@ export interface HTTPTransportOptions {
export class HTTPTransport implements MCPTransport {
private server: http.Server | null = null;
private messageHandler:
| ((msg: Record<string, unknown>, signal?: AbortSignal) => Promise<MCPResponse | null>)
| ((msg: Record<string, unknown>, signal?: AbortSignal, context?: TransportMessageContext) => Promise<MCPResponse | null>)
| null = null;
private port: number;
private host: string;
Expand Down Expand Up @@ -338,7 +338,7 @@ export class HTTPTransport implements MCPTransport {
}

onMessage(
handler: (msg: Record<string, unknown>, signal?: AbortSignal) => Promise<MCPResponse | null>,
handler: (msg: Record<string, unknown>, signal?: AbortSignal, context?: TransportMessageContext) => Promise<MCPResponse | null>,
): void {
this.messageHandler = handler;
}
Expand All @@ -358,6 +358,20 @@ export class HTTPTransport implements MCPTransport {
}
}

sendToSession(sessionId: string, response: MCPResponse): boolean {
let sent = false;
for (const conn of this.sseConnections) {
if (conn.sessionId !== sessionId) continue;
try {
conn.res.write(`data: ${JSON.stringify(response)}\n\n`);
sent = true;
} catch {
// Connection may have been closed
}
}
return sent;
}

start(): void {
this.server = http.createServer((req, res) => {
this.handleHTTPRequest(req, res);
Expand Down Expand Up @@ -1170,6 +1184,7 @@ export class HTTPTransport implements MCPTransport {
? principal.tenantId
: tenantId,
keyId: principal?.mode === 'api-key' ? principal.keyId : undefined,
mcpSessionId: sessionId,
},
() => this.processBatch(parsed, sessionId, tenantId, signal, principal),
Comment on lines +1187 to 1189
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Propagate assigned batch session id into request context

When a batch arrives without Mcp-Session-Id, processBatch() may assign a new session id (for initialize), but the async request context is created earlier with mcpSessionId: sessionId still undefined. Any downstream requestFromClient() call in that same batch then misses session affinity and falls back to sendResponse broadcast routing, allowing cross-session delivery/response matching instead of targeting the new session. This is observable in HTTP batch flows that combine initialize with later requests that trigger server→client RPC.

Useful? React with 👍 / 👎.

);
Expand Down Expand Up @@ -1223,8 +1238,9 @@ export class HTTPTransport implements MCPTransport {
? principal.tenantId
: tenantId,
keyId: principal?.mode === 'api-key' ? principal.keyId : undefined,
mcpSessionId: sessionId,
},
() => this.messageHandler!(msg, signal),
() => this.messageHandler!(msg, signal, { mcpSessionId: sessionId, tenantId }),
);

if (sessionId) {
Expand Down Expand Up @@ -1408,7 +1424,7 @@ export class HTTPTransport implements MCPTransport {
(record as Record<PropertyKey, unknown>)[PRINCIPAL_SYM] = principal;
}

return await handler(record, signal);
return await handler(record, signal, { mcpSessionId: sessionId, tenantId });
} catch (error) {
const id = record !== null
? ((record.id as string | number | undefined) ?? 0)
Expand Down
Loading
Loading