diff --git a/src/nimcp/mummy_transport.nim b/src/nimcp/mummy_transport.nim index 46d04d1..7d18982 100644 --- a/src/nimcp/mummy_transport.nim +++ b/src/nimcp/mummy_transport.nim @@ -72,7 +72,7 @@ type sessionId*: string isActive*: bool lastActivity*: times.Time - + MummyTransport* = ref object base*: HttpServerBase connections*: Table[string, StreamingConnection] # Active streaming connections @@ -91,15 +91,15 @@ proc httpNotificationWrapper(ctx: McpRequestContext, notificationType: string, d echo fmt"Data: {data}" echo fmt"Current request context exists: {currentHTTPRequest != nil}" echo fmt"Session ID: {ctx.sessionId}" - + let transport = cast[MummyTransport](ctx.transport.httpTransport) - + # Try to send to specific session first if we have one from context var targetSessionId = ctx.sessionId if targetSessionId.len == 0: targetSessionId = currentSessionId echo fmt"Using thread-local session ID: {targetSessionId}" - + if targetSessionId.len > 0 and targetSessionId in transport.connections: let connection = transport.connections[targetSessionId] if connection.isActive and connection.sseConnection.active: @@ -118,7 +118,7 @@ proc httpNotificationWrapper(ctx: McpRequestContext, notificationType: string, d return else: echo fmt"Connection for session {targetSessionId} is inactive or SSE closed" - + # For non-session requests, we can't send real-time notifications echo "WARNING: HTTP notification requested but no active SSE connection available" echo fmt"Available sessions: {transport.connections.len}" @@ -160,14 +160,14 @@ proc parseAcceptHeader(acceptHeader: string): seq[tuple[mediaType: string, quali ## Parse Accept header with quality values result = @[] let parts = acceptHeader.split(",") - + for part in parts: let trimmed = part.strip() let components = trimmed.split(";") if components.len > 0: let mediaType = components[0].strip() var quality = 1.0 - + # Parse quality value if present if components.len > 1: for component in components[1..^1]: @@ -177,24 +177,24 @@ proc parseAcceptHeader(acceptHeader: string): seq[tuple[mediaType: string, quali quality = parseFloat(qPair[1].strip()) except: quality = 1.0 - + result.add((mediaType, quality)) proc clientSupportsStreaming(request: Request): bool = ## Check if client supports SSE streaming (regardless of preference) if "Accept" notin request.headers: return false - + let acceptHeader = request.headers["Accept"] let acceptedTypes = parseAcceptHeader(acceptHeader) - + # Check if text/event-stream is accepted at all for (mediaType, quality) in acceptedTypes: if mediaType == "text/event-stream" and quality > 0.0: return true elif mediaType == "*/*" and quality > 0.0: return true - + return false proc supportsStreaming(request: Request): bool = @@ -205,21 +205,21 @@ proc supportsStreaming(request: Request): bool = if "Accept" notin request.headers: echo "No Accept header found - defaulting to JSON mode" return false - + let acceptHeader = request.headers["Accept"] echo fmt"Accept header: {acceptHeader}" - + # Parse the Accept header properly let acceptedTypes = parseAcceptHeader(acceptHeader) echo "Parsed Accept types:" for (mediaType, quality) in acceptedTypes: echo fmt" {mediaType} (q={quality})" - + # Find quality values for both JSON and SSE var jsonQuality = 0.0 var sseQuality = 0.0 var hasWildcard = false - + for (mediaType, quality) in acceptedTypes: case mediaType: of "application/json": @@ -229,15 +229,15 @@ proc supportsStreaming(request: Request): bool = of "*/*": hasWildcard = true # Wildcards support both, but we prefer JSON by default - + echo fmt"Quality scores: JSON={jsonQuality}, SSE={sseQuality}, Wildcard={hasWildcard}" - + # Decision logic: # 1. If SSE quality > JSON quality, use streaming - # 2. If only SSE is present (no JSON), use streaming + # 2. If only SSE is present (no JSON), use streaming # 3. If both are equal or JSON is higher, use JSON # 4. If wildcard only, use JSON (more compatible) - + if sseQuality > 0.0 and jsonQuality == 0.0: echo "Streaming mode: SSE requested, no JSON preference" return true @@ -267,33 +267,33 @@ proc handleJsonRequest(transport: MummyTransport, server: McpServer, request: Re echo "\n=== JSON REQUEST HANDLER ===" echo fmt"Request ID: {jsonRpcRequest.id}" echo fmt"Method: {jsonRpcRequest.`method`}" - + var headers = corsHeadersFor("POST, GET, OPTIONS") headers["Content-Type"] = "application/json" - + if sessionId != "": echo fmt"Using session ID: {sessionId}" headers["Mcp-Session-Id"] = sessionId else: echo "No session ID in JSON mode" - + # Set thread-local request context for notifications currentHTTPRequest = request echo "Set current HTTP request context for notifications" - + # Use the existing server's request handler with transport access echo "=== TRANSPORT CONFIGURATION ===" let capabilities = {tcEvents, tcUnicast} echo fmt"Transport capabilities: {capabilities}" - let mcpTransport = McpTransport(kind: tkHttp, capabilities: capabilities, + let mcpTransport = McpTransport(kind: tkHttp, capabilities: capabilities, httpTransport: cast[pointer](transport), httpSendNotification: httpNotificationWrapper) echo fmt"Created transport with kind: {mcpTransport.kind}" - + echo "=== REQUEST PROCESSING ===" echo "Calling server.handleRequest..." let response = server.handleRequest(mcpTransport, jsonRpcRequest) echo fmt"Response received - ID: {response.id}, Error present: {response.error.isSome}" - + # Only send a response if it's not empty (i.e., not a notification) if response.id.kind != jridString or response.id.str != "": echo fmt"Sending 200 response with body length: {($response).len}" @@ -309,14 +309,14 @@ proc handleStreamingRequest(transport: MummyTransport, server: McpServer, reques echo "\n=== STREAMING REQUEST HANDLER ===" echo fmt"Request ID: {jsonRpcRequest.id}" echo fmt"Method: {jsonRpcRequest.`method`}" - + # Create SSE connection for this POST request echo "Creating SSE connection using Mummy's respondSSE()" let sseConnection = request.respondSSE() - - # Generate session ID for tracking (optional for single-request streams) + + # Generate session ID for tracking (optional for single-request streams) let actualSessionId = if sessionId != "": sessionId else: "sse-" & $sseConnection.clientId - + # Track this connection temporarily (for notifications during processing) let connection = StreamingConnection( request: request, @@ -328,26 +328,26 @@ proc handleStreamingRequest(transport: MummyTransport, server: McpServer, reques transport.connections[actualSessionId] = connection echo fmt"Created SSE connection for POST request: {actualSessionId}" echo fmt"SSE connection active: {sseConnection.active}" - + # Set thread-local request context for notifications currentHTTPRequest = request currentSessionId = actualSessionId echo fmt"Set request context for notifications: {actualSessionId}" - + try: # Handle the request with transport access echo "=== STREAMING TRANSPORT CONFIGURATION ===" let capabilities = {tcEvents, tcUnicast} echo fmt"Transport capabilities: {capabilities}" - let mcpTransport = McpTransport(kind: tkHttp, capabilities: capabilities, + let mcpTransport = McpTransport(kind: tkHttp, capabilities: capabilities, httpTransport: cast[pointer](transport), httpSendNotification: httpNotificationWrapper) echo fmt"Created streaming transport with kind: {mcpTransport.kind}" - + echo "=== STREAMING REQUEST PROCESSING ===" echo "Calling server.handleRequest for streaming..." let response = server.handleRequest(mcpTransport, jsonRpcRequest) echo fmt"Streaming response received - ID: {response.id}, Error present: {response.error.isSome}" - + # Send the final response as an SSE event, then close the stream per MCP spec if response.id.kind != jridString or response.id.str != "": echo fmt"Sending final response as SSE event for request ID: {response.id}" @@ -361,13 +361,13 @@ proc handleStreamingRequest(transport: MummyTransport, server: McpServer, reques echo "Sending completion event for notification-only response" let completeEventId = fmt"complete-{getTime().toUnix()}" writeSSEEvent(sseConnection, "complete", "{\"status\": \"complete\"}", completeEventId) - + # Per MCP spec: "After all JSON-RPC responses have been sent, the server SHOULD close the SSE stream" echo "All responses sent - closing SSE stream per MCP specification" - + except Exception as e: echo fmt"Streaming request error: {e.msg}" - # Send error as SSE event, then close stream + # Send error as SSE event, then close stream if sseConnection.active: let errorEvent = %*{ "jsonrpc": "2.0", @@ -380,15 +380,15 @@ proc handleStreamingRequest(transport: MummyTransport, server: McpServer, reques } writeSSEEvent(sseConnection, "error", $errorEvent) echo "Error sent via SSE - stream will close" - + # Clean up and let stream close naturally - + finally: - # Clean up connection tracking + # Clean up connection tracking if actualSessionId in transport.connections: transport.connections.del(actualSessionId) echo fmt"Removed connection: {actualSessionId}" - + echo "SSE stream closed - POST request completed" proc handleMcpRequest(transport: MummyTransport, server: McpServer, request: Request) {.gcsafe.} = @@ -401,16 +401,16 @@ proc handleMcpRequest(transport: MummyTransport, server: McpServer, request: Req echo fmt"Body length: {request.body.len}" if request.body.len > 0 and request.body.len < 200: echo fmt"Body preview: {request.body}" - + var headers = corsHeadersFor("POST, GET, OPTIONS") - + # DNS rebinding protection (skip for OPTIONS requests) if request.httpMethod != "OPTIONS" and not transport.validateOrigin(request): echo "Origin validation failed" headers["Content-Type"] = "text/plain" request.respond(403, headers, "Forbidden: Invalid origin") return - + # Authentication validation (skip for OPTIONS requests) if request.httpMethod != "OPTIONS": let authResult = validateAuthentication(transport, request) @@ -424,22 +424,22 @@ proc handleMcpRequest(transport: MummyTransport, server: McpServer, request: Req request.respond(authResult.errorCode, headers, errorMsg) return echo "Authentication successful" - + # Determine response mode based on Accept header let streamingSupported = supportsStreaming(request) let clientCanStream = clientSupportsStreaming(request) let sessionId = getSessionId(request) - + case request.httpMethod: of "OPTIONS": request.respond(204, headers, "") - + of "GET": # Check if this is an SSE resume request if "Last-Event-ID" in request.headers: let lastEventId = request.headers["Last-Event-ID"] echo fmt"SSE resume request with Last-Event-ID: {lastEventId}" - + # Per MCP spec: streams are closed after responses are sent # Resume requests for completed streams should return 404 echo "Stream no longer available (POST request completed)" @@ -461,22 +461,22 @@ proc handleMcpRequest(transport: MummyTransport, server: McpServer, request: Req if sessionId != "": headers["Mcp-Session-Id"] = sessionId request.respond(200, headers, $info) - + of "POST": try: if request.body.len == 0: headers["Content-Type"] = "application/json" let errorResponse = createJsonRpcError( - JsonRpcId(kind: jridString, str: ""), - InvalidRequest, + JsonRpcId(kind: jridString, str: ""), + InvalidRequest, "Empty request body" ) request.respond(400, headers, $(%errorResponse)) return - + # Parse the JSON-RPC request using the existing protocol parser let jsonRpcRequest = parseJsonRpcMessage(request.body) - + # Handle based on streaming support and request type echo "=== RESPONSE MODE SELECTION ===" echo fmt"Streaming supported: {streamingSupported}" @@ -486,7 +486,7 @@ proc handleMcpRequest(transport: MummyTransport, server: McpServer, request: Req echo fmt"Active connections: {transport.connections.len}" if sessionId != "" and sessionId in transport.connections: echo fmt"Found existing connection for session: {sessionId}" - + # Force streaming mode for context-aware tool calls if client supports it # Context-aware tools are designed to send notifications during processing var forceStreaming = false @@ -499,14 +499,14 @@ proc handleMcpRequest(transport: MummyTransport, server: McpServer, request: Req echo fmt"Forcing streaming mode for context-aware tool: {toolName}" else: echo fmt"Regular tool call, not forcing streaming: {toolName}" - + if forceStreaming or streamingSupported: echo "Using streaming mode (per-POST SSE stream)" handleStreamingRequest(transport, server, request, jsonRpcRequest, sessionId) else: echo "Using regular JSON response mode" handleJsonRequest(transport, server, request, jsonRpcRequest, sessionId) - + except JsonParsingError as e: headers["Content-Type"] = "application/json" let errorResponse = createParseError(details = e.msg) @@ -515,7 +515,7 @@ proc handleMcpRequest(transport: MummyTransport, server: McpServer, request: Req headers["Content-Type"] = "application/json" let errorResponse = createInternalError(JsonRpcId(kind: jridString, str: ""), e.msg) request.respond(500, headers, $(%errorResponse)) - + else: request.respond(405, headers, "Method not allowed") @@ -526,7 +526,7 @@ proc sendNotification(transport: MummyTransport, sseConnection: SSEConnection, n echo fmt"Data: {data}" echo fmt"SSE connection active: {sseConnection.active}" echo fmt"Transport connections count: {transport.connections.len}" - + # Log all current connections if transport.connections.len > 0: echo "Active connections:" @@ -534,7 +534,7 @@ proc sendNotification(transport: MummyTransport, sseConnection: SSEConnection, n echo fmt" Session: {sessionId} (active: {conn.isActive}, SSE active: {conn.sseConnection.active})" else: echo "No active connections for notifications" - + # Create MCP notification in proper format per specification let notification = %*{ "jsonrpc": "2.0", @@ -545,7 +545,7 @@ proc sendNotification(transport: MummyTransport, sseConnection: SSEConnection, n "data": data } } - + # Send notification via SSE with event ID let eventId = fmt"notify-{getTime().toUnix()}-{rand(1000)}" echo "Sending notification via SSE event" @@ -560,23 +560,23 @@ proc sendProgressNotification(transport: MummyTransport, sseConnection: SSEConne echo fmt"Progress: {progress}, Token: {progressToken}" if total != nil: echo fmt"Total: {total}" if message.len > 0: echo fmt"Message: {message}" - + let params = %*{ "progress": progress, "progressToken": progressToken } - + if total != nil: params["total"] = total if message.len > 0: params["message"] = %message - + let notification = %*{ "jsonrpc": "2.0", "method": "notifications/progress", "params": params } - + let eventId = fmt"progress-{getTime().toUnix()}-{rand(1000)}" writeSSENotification(sseConnection, notification, eventId) echo "Progress notification sent via SSE" @@ -588,21 +588,21 @@ proc sendLoggingNotification(transport: MummyTransport, sseConnection: SSEConnec echo "\n=== SEND LOGGING NOTIFICATION ===" echo fmt"Data: {data}, Level: {level}" if logger.len > 0: echo fmt"Logger: {logger}" - + let params = %*{ "data": data, "level": level } - + if logger.len > 0: params["logger"] = %logger - + let notification = %*{ "jsonrpc": "2.0", "method": "notifications/message", "params": params } - + let eventId = fmt"log-{getTime().toUnix()}-{rand(1000)}" writeSSENotification(sseConnection, notification, eventId) echo "Logging notification sent via SSE" @@ -614,7 +614,7 @@ proc sendNotificationToSession*(transport: MummyTransport, sessionId: string, no echo fmt"Notification type: {notificationType}" echo fmt"Data: {data}" echo fmt"Transport connections count: {transport.connections.len}" - + # Check if we have an active connection for this session if sessionId in transport.connections: let connection = transport.connections[sessionId] @@ -624,7 +624,7 @@ proc sendNotificationToSession*(transport: MummyTransport, sessionId: string, no echo fmt"Connection path: {connection.request.path}" echo fmt"SSE connection active: {connection.sseConnection.active}" echo "Sending notification via SSE to active connection" - + # Create MCP notification in proper format per specification let notification = %*{ "jsonrpc": "2.0", @@ -635,7 +635,7 @@ proc sendNotificationToSession*(transport: MummyTransport, sessionId: string, no "data": data } } - + # Send as SSE event with event ID let eventId = fmt"session-notify-{getTime().toUnix()}-{rand(1000)}" writeSSENotification(connection.sseConnection, notification, eventId) @@ -657,17 +657,17 @@ proc cleanupInactiveConnections*(transport: MummyTransport) = ## Clean up connections that are no longer active or have timed out let cutoff = getTime() - initDuration(minutes = 30) # 30 minute timeout var toRemove: seq[string] = @[] - + echo "=== CONNECTION CLEANUP ===" for sessionId, connection in transport.connections: if not connection.isActive or not connection.sseConnection.active or connection.lastActivity < cutoff: echo fmt"Marking connection {sessionId} for removal - active: {connection.isActive}, SSE active: {connection.sseConnection.active}, last activity: {connection.lastActivity}" toRemove.add(sessionId) - + for sessionId in toRemove: transport.connections.del(sessionId) echo fmt"Removed inactive connection: {sessionId}" - + echo fmt"Cleanup complete - {transport.connections.len} connections remaining" proc markConnectionInactive*(transport: MummyTransport, sessionId: string) = @@ -676,13 +676,12 @@ proc markConnectionInactive*(transport: MummyTransport, sessionId: string) = transport.connections[sessionId].isActive = false echo fmt"Marked connection {sessionId} as inactive" -proc setupRoutes(transport: MummyTransport, server: McpServer) = - # Handle all MCP requests on the root path - transport.base.router.get("/", proc(request: Request) {.gcsafe.} = transport.handleMcpRequest(server, request)) - transport.base.router.post("/", proc(request: Request) {.gcsafe.} = transport.handleMcpRequest(server, request)) - transport.base.router.options("/", proc(request: Request) {.gcsafe.} = transport.handleMcpRequest(server, request)) +proc setupRoutes(transport: MummyTransport, server: McpServer, path: string) = + transport.base.router.get(path, proc(request: Request) {.gcsafe.} = transport.handleMcpRequest(server, request)) + transport.base.router.post(path, proc(request: Request) {.gcsafe.} = transport.handleMcpRequest(server, request)) + transport.base.router.options(path, proc(request: Request) {.gcsafe.} = transport.handleMcpRequest(server, request)) -proc serve*(transport: MummyTransport, server: McpServer) = +proc serve*(transport: MummyTransport, server: McpServer, path: string = "/") = ## Serve the HTTP server and serve MCP requests echo "\n=== SERVER STARTUP ===" echo fmt"Server name: {server.serverInfo.name}" @@ -694,19 +693,19 @@ proc serve*(transport: MummyTransport, server: McpServer) = if transport.base.authConfig.enabled: echo "Bearer token configured: [REDACTED]" echo fmt"CORS allowed origins: {transport.base.allowedOrigins}" - - transport.setupRoutes(server) + + transport.setupRoutes(server, path) transport.base.httpServer = newServer(transport.base.router) - + # Start connection cleanup task echo "Starting connection cleanup task" # Note: In a production implementation, you'd want to run this in a separate thread # For now, we'll just add the cleanup capability - + echo fmt"Starting MCP HTTP server at http://{transport.base.host}:{transport.base.port}" echo "Server supports both JSON and SSE streaming modes" echo "Press Ctrl+C to stop the server" echo "=== SERVER READY ===" - + transport.base.httpServer.serve(Port(transport.base.port), transport.base.host)