Bladeren bron

fix: support multiple concurrent HTTP clients

The HTTP MCP server creates a single Transport + McpServer pair at
startup. Once the first client initializes, all subsequent clients
are rejected with "Server already initialized" — making the HTTP
mode unusable for reconnect, crash recovery, or multi-client scenarios.

Replace the singleton with a per-session architecture: each initialize
request creates its own McpServer + Transport pair, stored in a
sessions Map keyed by session ID. The shared Store (SQLite) is
stateless and safe for concurrent access.

Key changes:
- createSession() factory creates fresh McpServer + Transport per client
- POST /mcp routes by mcp-session-id header to existing sessions
- New initialize requests (no session header) create new sessions
- Unknown session IDs return 404 per MCP spec
- Missing session IDs return 400
- onsessioninitialized callback stores sessions at the right time
- transport.onclose cleans up the sessions Map
- Shutdown iterates all active sessions

Tested with 3+ concurrent clients, session cleanup via DELETE,
cross-session isolation, and rapid session creation.

Fixes #195
Closes #163

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Joel Johnson 2 maanden geleden
bovenliggende
commit
383a2e5cf1
1 gewijzigde bestanden met toevoegingen van 84 en 8 verwijderingen
  1. 84 8
      src/mcp.ts

+ 84 - 8
src/mcp.ts

@@ -14,6 +14,7 @@ import { McpServer, ResourceTemplate } from "@modelcontextprotocol/sdk/server/mc
 import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
 import { WebStandardStreamableHTTPServerTransport }
   from "@modelcontextprotocol/sdk/server/webStandardStreamableHttp.js";
+import { isInitializeRequest } from "@modelcontextprotocol/sdk/types.js";
 import { z } from "zod";
 import {
   createStore,
@@ -552,12 +553,31 @@ export type HttpServerHandle = {
  */
 export async function startMcpHttpServer(port: number, options?: { quiet?: boolean }): Promise<HttpServerHandle> {
   const store = createStore();
-  const mcpServer = createMcpServer(store);
-  const transport = new WebStandardStreamableHTTPServerTransport({
-    sessionIdGenerator: () => randomUUID(),
-    enableJsonResponse: true,
-  });
-  await mcpServer.connect(transport);
+
+  // Session map: each client gets its own McpServer + Transport pair (MCP spec requirement).
+  // The store is shared — it's stateless SQLite, safe for concurrent access.
+  const sessions = new Map<string, WebStandardStreamableHTTPServerTransport>();
+
+  async function createSession(): Promise<WebStandardStreamableHTTPServerTransport> {
+    const transport = new WebStandardStreamableHTTPServerTransport({
+      sessionIdGenerator: () => randomUUID(),
+      enableJsonResponse: true,
+      onsessioninitialized: (sessionId: string) => {
+        sessions.set(sessionId, transport);
+        log(`${ts()} New session ${sessionId} (${sessions.size} active)`);
+      },
+    });
+    const server = createMcpServer(store);
+    await server.connect(transport);
+
+    transport.onclose = () => {
+      if (transport.sessionId) {
+        sessions.delete(transport.sessionId);
+      }
+    };
+
+    return transport;
+  }
 
   const startTime = Date.now();
   const quiet = options?.quiet ?? false;
@@ -669,8 +689,38 @@ export async function startMcpHttpServer(port: number, options?: { quiet?: boole
         for (const [k, v] of Object.entries(nodeReq.headers)) {
           if (typeof v === "string") headers[k] = v;
         }
+
+        // Route to existing session or create new one on initialize
+        const sessionId = headers["mcp-session-id"];
+        let transport: WebStandardStreamableHTTPServerTransport;
+
+        if (sessionId) {
+          const existing = sessions.get(sessionId);
+          if (!existing) {
+            nodeRes.writeHead(404, { "Content-Type": "application/json" });
+            nodeRes.end(JSON.stringify({
+              jsonrpc: "2.0",
+              error: { code: -32001, message: "Session not found" },
+              id: body?.id ?? null,
+            }));
+            return;
+          }
+          transport = existing;
+        } else if (isInitializeRequest(body)) {
+          transport = await createSession();
+        } else {
+          nodeRes.writeHead(400, { "Content-Type": "application/json" });
+          nodeRes.end(JSON.stringify({
+            jsonrpc: "2.0",
+            error: { code: -32000, message: "Bad Request: Missing session ID" },
+            id: body?.id ?? null,
+          }));
+          return;
+        }
+
         const request = new Request(url, { method: "POST", headers, body: rawBody });
         const response = await transport.handleRequest(request, { parsedBody: body });
+
         nodeRes.writeHead(response.status, Object.fromEntries(response.headers));
         nodeRes.end(Buffer.from(await response.arrayBuffer()));
         log(`${ts()} POST /mcp ${label} (${Date.now() - reqStart}ms)`);
@@ -678,11 +728,34 @@ export async function startMcpHttpServer(port: number, options?: { quiet?: boole
       }
 
       if (pathname === "/mcp") {
-        const url = `http://localhost:${port}${pathname}`;
         const headers: Record<string, string> = {};
         for (const [k, v] of Object.entries(nodeReq.headers)) {
           if (typeof v === "string") headers[k] = v;
         }
+
+        // GET/DELETE must have a valid session
+        const sessionId = headers["mcp-session-id"];
+        if (!sessionId) {
+          nodeRes.writeHead(400, { "Content-Type": "application/json" });
+          nodeRes.end(JSON.stringify({
+            jsonrpc: "2.0",
+            error: { code: -32000, message: "Bad Request: Missing session ID" },
+            id: null,
+          }));
+          return;
+        }
+        const transport = sessions.get(sessionId);
+        if (!transport) {
+          nodeRes.writeHead(404, { "Content-Type": "application/json" });
+          nodeRes.end(JSON.stringify({
+            jsonrpc: "2.0",
+            error: { code: -32001, message: "Session not found" },
+            id: null,
+          }));
+          return;
+        }
+
+        const url = `http://localhost:${port}${pathname}`;
         const rawBody = nodeReq.method !== "GET" && nodeReq.method !== "HEAD" ? await collectBody(nodeReq) : undefined;
         const request = new Request(url, { method: nodeReq.method || "GET", headers, ...(rawBody ? { body: rawBody } : {}) });
         const response = await transport.handleRequest(request);
@@ -711,7 +784,10 @@ export async function startMcpHttpServer(port: number, options?: { quiet?: boole
   const stop = async () => {
     if (stopping) return;
     stopping = true;
-    await transport.close();
+    for (const transport of sessions.values()) {
+      await transport.close();
+    }
+    sessions.clear();
     httpServer.close();
     store.close();
     await disposeDefaultLlamaCpp();