From 86c44799a69a44ec11a9397a0235ff64d51c9d50 Mon Sep 17 00:00:00 2001 From: Henrik Jess Nielsen Date: Sat, 9 May 2026 17:00:52 +0200 Subject: [PATCH] =?UTF-8?q?fix:=20MCP=20client=20SSE=20transport=20?= =?UTF-8?q?=E2=80=94=20add=20correct=20Accept=20headers=20and=20SSE=20resp?= =?UTF-8?q?onse=20parsing?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit FastMCP 2.0 streamable HTTP requires Accept: application/json, text/event-stream and can return text/event-stream responses with data: {...} SSE lines. --- app/mcp_client.py | 73 ++++++++++++++++++++++++++++++++++++----------- 1 file changed, 56 insertions(+), 17 deletions(-) diff --git a/app/mcp_client.py b/app/mcp_client.py index 67fd54a..6abfd45 100644 --- a/app/mcp_client.py +++ b/app/mcp_client.py @@ -1,7 +1,14 @@ -"""MCP proxy client — calls DevOpsMCP's worklog/standup tools over HTTP MCP protocol.""" +"""MCP proxy client — calls DevOpsMCP's worklog/standup tools over HTTP MCP protocol. + +FastMCP 2.0 uses streamable HTTP transport (MCP spec §6.3.3): +- POST to /mcp with Accept: application/json, text/event-stream +- Server may respond with JSON or SSE stream (text/event-stream) +- SSE responses have `data: {...}` lines containing JSON-RPC messages +""" from __future__ import annotations +import json import os from typing import Any, Dict, Optional @@ -10,31 +17,63 @@ import httpx MCP_URL = os.environ.get("DEVOPS_MCP_URL", "http://localhost:8000") _MCP_ENDPOINT = f"{MCP_URL}/mcp" +_HEADERS = { + "Content-Type": "application/json", + "Accept": "application/json, text/event-stream", +} + + +def _parse_sse_data(text: str) -> Dict[str, Any]: + """Extract JSON-RPC result from SSE stream data lines.""" + for line in text.splitlines(): + line = line.strip() + if line.startswith("data:"): + chunk = line[5:].strip() + if not chunk or chunk == "[DONE]": + continue + try: + msg = json.loads(chunk) + if "result" in msg or "error" in msg: + return msg + except json.JSONDecodeError: + continue + return {} + + +def _extract_result(data: Dict[str, Any]) -> Dict[str, Any]: + """Extract tool result content from JSON-RPC response.""" + if "error" in data: + raise RuntimeError(f"MCP error: {data['error']}") + result = data.get("result", {}) + content = result.get("content", []) + if content and isinstance(content[0], dict): + text = content[0].get("text", "{}") + try: + return json.loads(text) + except json.JSONDecodeError: + return {"raw": text} + return result + async def _call_tool(tool_name: str, arguments: Dict[str, Any]) -> Dict[str, Any]: - """Call a DevOpsMCP tool via MCP JSON-RPC 2.0 over HTTP.""" + """Call a DevOpsMCP tool via MCP JSON-RPC 2.0 streamable HTTP transport.""" payload = { "jsonrpc": "2.0", "method": "tools/call", "params": {"name": tool_name, "arguments": arguments}, "id": 1, } - async with httpx.AsyncClient(timeout=30.0) as client: - resp = await client.post(_MCP_ENDPOINT, json=payload) + async with httpx.AsyncClient(timeout=60.0) as client: + resp = await client.post(_MCP_ENDPOINT, json=payload, headers=_HEADERS) resp.raise_for_status() - data = resp.json() - if "error" in data: - raise RuntimeError(f"MCP error: {data['error']}") - result = data.get("result", {}) - content = result.get("content", []) - if content and isinstance(content[0], dict): - import json - text = content[0].get("text", "{}") - try: - return json.loads(text) - except json.JSONDecodeError: - return {"raw": text} - return result + + content_type = resp.headers.get("content-type", "") + if "text/event-stream" in content_type: + data = _parse_sse_data(resp.text) + else: + data = resp.json() + + return _extract_result(data) async def get_worklog(