From 685e4271c1dff9c3b7757202787b6746937b0590 Mon Sep 17 00:00:00 2001 From: Henrik Jess Nielsen Date: Sat, 9 May 2026 17:08:36 +0200 Subject: [PATCH] fix(mcp): add MCP session initialize handshake before tool calls MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit FastMCP 2.0 requires: 1. POST initialize → get mcp-session-id header 2. POST notifications/initialized with session ID 3. POST tools/call with session ID Also prefer structuredContent over text/content in result extraction. --- app/mcp_client.py | 61 +++++++++++++++++++++++++++++++++++++---------- 1 file changed, 49 insertions(+), 12 deletions(-) diff --git a/app/mcp_client.py b/app/mcp_client.py index 6abfd45..9ce2c2c 100644 --- a/app/mcp_client.py +++ b/app/mcp_client.py @@ -2,8 +2,11 @@ FastMCP 2.0 uses streamable HTTP transport (MCP spec §6.3.3): - POST to /mcp with Accept: application/json, text/event-stream -- Server may respond with JSON or SSE stream (text/event-stream) -- SSE responses have `data: {...}` lines containing JSON-RPC messages +- Requires MCP session initialization handshake before tool calls: + 1. POST initialize (no session ID) → get mcp-session-id header + 2. POST notifications/initialized with session ID + 3. POST tools/call with session ID +- Server responds with SSE (text/event-stream) containing JSON-RPC messages """ from __future__ import annotations @@ -17,14 +20,14 @@ import httpx MCP_URL = os.environ.get("DEVOPS_MCP_URL", "http://localhost:8000") _MCP_ENDPOINT = f"{MCP_URL}/mcp" -_HEADERS = { +_BASE_HEADERS = { "Content-Type": "application/json", "Accept": "application/json, text/event-stream", } def _parse_sse_data(text: str) -> Dict[str, Any]: - """Extract JSON-RPC result from SSE stream data lines.""" + """Extract first JSON-RPC result/error from SSE stream.""" for line in text.splitlines(): line = line.strip() if line.startswith("data:"): @@ -41,10 +44,13 @@ def _parse_sse_data(text: str) -> Dict[str, Any]: def _extract_result(data: Dict[str, Any]) -> Dict[str, Any]: - """Extract tool result content from JSON-RPC response.""" + """Unwrap JSON-RPC response into plain dict.""" if "error" in data: raise RuntimeError(f"MCP error: {data['error']}") result = data.get("result", {}) + # Prefer structuredContent (FastMCP 2.0 returns this alongside text) + if "structuredContent" in result: + return result["structuredContent"] content = result.get("content", []) if content and isinstance(content[0], dict): text = content[0].get("text", "{}") @@ -55,16 +61,47 @@ def _extract_result(data: Dict[str, Any]) -> Dict[str, Any]: return result -async def _call_tool(tool_name: str, arguments: Dict[str, Any]) -> Dict[str, Any]: - """Call a DevOpsMCP tool via MCP JSON-RPC 2.0 streamable HTTP transport.""" - payload = { +async def _initialize_session(client: httpx.AsyncClient) -> str: + """Perform MCP initialize handshake; returns session ID.""" + init_payload = { "jsonrpc": "2.0", - "method": "tools/call", - "params": {"name": tool_name, "arguments": arguments}, - "id": 1, + "method": "initialize", + "params": { + "protocolVersion": "2024-11-05", + "capabilities": {}, + "clientInfo": {"name": "devops-dash", "version": "1.0"}, + }, + "id": 0, } + resp = await client.post(_MCP_ENDPOINT, json=init_payload, headers=_BASE_HEADERS) + resp.raise_for_status() + session_id = resp.headers.get("mcp-session-id", "") + if not session_id: + raise RuntimeError("MCP initialize did not return mcp-session-id") + + # Send initialized notification (no id = notification, no response expected) + notif_headers = {**_BASE_HEADERS, "mcp-session-id": session_id} + await client.post( + _MCP_ENDPOINT, + json={"jsonrpc": "2.0", "method": "notifications/initialized", "params": {}}, + headers=notif_headers, + ) + return session_id + + +async def _call_tool(tool_name: str, arguments: Dict[str, Any]) -> Dict[str, Any]: + """Call a DevOpsMCP tool via MCP JSON-RPC 2.0 with proper session handshake.""" async with httpx.AsyncClient(timeout=60.0) as client: - resp = await client.post(_MCP_ENDPOINT, json=payload, headers=_HEADERS) + session_id = await _initialize_session(client) + headers = {**_BASE_HEADERS, "mcp-session-id": session_id} + + payload = { + "jsonrpc": "2.0", + "method": "tools/call", + "params": {"name": tool_name, "arguments": arguments}, + "id": 1, + } + resp = await client.post(_MCP_ENDPOINT, json=payload, headers=headers) resp.raise_for_status() content_type = resp.headers.get("content-type", "")