"""MCP proxy client — calls DevOpsMCP's worklog/standup tools over HTTP MCP protocol. FastMCP 2.0 uses streamable HTTP transport (MCP spec §6.3.3): - POST to /mcp with Accept: application/json, text/event-stream - Requires MCP session initialization handshake before tool calls: 1. POST initialize (no session ID) → get mcp-session-id header 2. POST notifications/initialized with session ID 3. POST tools/call with session ID - Server responds with SSE (text/event-stream) containing JSON-RPC messages """ from __future__ import annotations import json import os from typing import Any, Dict, Optional import httpx MCP_URL = os.environ.get("DEVOPS_MCP_URL", "http://localhost:8000") _MCP_ENDPOINT = f"{MCP_URL}/mcp" _BASE_HEADERS = { "Content-Type": "application/json", "Accept": "application/json, text/event-stream", } def _parse_sse_data(text: str) -> Dict[str, Any]: """Extract first JSON-RPC result/error from SSE stream.""" for line in text.splitlines(): line = line.strip() if line.startswith("data:"): chunk = line[5:].strip() if not chunk or chunk == "[DONE]": continue try: msg = json.loads(chunk) if "result" in msg or "error" in msg: return msg except json.JSONDecodeError: continue return {} def _extract_result(data: Dict[str, Any]) -> Dict[str, Any]: """Unwrap JSON-RPC response into plain dict.""" if "error" in data: raise RuntimeError(f"MCP error: {data['error']}") result = data.get("result", {}) # Prefer structuredContent (FastMCP 2.0 returns this alongside text) if "structuredContent" in result: return result["structuredContent"] content = result.get("content", []) if content and isinstance(content[0], dict): text = content[0].get("text", "{}") try: return json.loads(text) except json.JSONDecodeError: return {"raw": text} return result async def _initialize_session(client: httpx.AsyncClient) -> str: """Perform MCP initialize handshake; returns session ID.""" init_payload = { "jsonrpc": "2.0", "method": "initialize", "params": { "protocolVersion": "2024-11-05", "capabilities": {}, "clientInfo": {"name": "devops-dash", "version": "1.0"}, }, "id": 0, } resp = await client.post(_MCP_ENDPOINT, json=init_payload, headers=_BASE_HEADERS) resp.raise_for_status() session_id = resp.headers.get("mcp-session-id", "") if not session_id: raise RuntimeError("MCP initialize did not return mcp-session-id") # Send initialized notification (no id = notification, no response expected) notif_headers = {**_BASE_HEADERS, "mcp-session-id": session_id} await client.post( _MCP_ENDPOINT, json={"jsonrpc": "2.0", "method": "notifications/initialized", "params": {}}, headers=notif_headers, ) return session_id async def _call_tool(tool_name: str, arguments: Dict[str, Any]) -> Dict[str, Any]: """Call a DevOpsMCP tool via MCP JSON-RPC 2.0 with proper session handshake.""" async with httpx.AsyncClient(timeout=60.0) as client: session_id = await _initialize_session(client) headers = {**_BASE_HEADERS, "mcp-session-id": session_id} payload = { "jsonrpc": "2.0", "method": "tools/call", "params": {"name": tool_name, "arguments": arguments}, "id": 1, } resp = await client.post(_MCP_ENDPOINT, json=payload, headers=headers) resp.raise_for_status() content_type = resp.headers.get("content-type", "") if "text/event-stream" in content_type: data = _parse_sse_data(resp.text) else: data = resp.json() return _extract_result(data) async def get_worklog( context: str = "egmont", days: int = 7, group_by: str = "repo", since_date: Optional[str] = None, until_date: Optional[str] = None, ) -> Dict[str, Any]: args: Dict[str, Any] = {"context": context, "days": days, "group_by": group_by} if since_date: args["since_date"] = since_date if until_date: args["until_date"] = until_date return await _call_tool("worklog", args) async def get_standup(days: int = 2, context: str = "egmont") -> Dict[str, Any]: return await _call_tool("generate_standup", {"days": days, "context": context}) async def list_knowledge(category: Optional[str] = None, tag: Optional[str] = None) -> Dict[str, Any]: args: Dict[str, Any] = {} if category: args["category"] = category if tag: args["tag"] = tag return await _call_tool("list_knowledge", args) async def get_knowledge(storage_filename: str) -> Dict[str, Any]: return await _call_tool("get_knowledge", {"storage_filename": storage_filename, "include_metadata": False}) async def list_howtos() -> Dict[str, Any]: return await _call_tool("list_howtos", {}) async def get_howto(filename: str) -> Dict[str, Any]: return await _call_tool("get_howto_content", {"filename": filename}) async def list_agents(domain: Optional[str] = None) -> Dict[str, Any]: args: Dict[str, Any] = {} if domain: args["domain"] = domain return await _call_tool("get_agent", args) async def get_agent(name: str) -> Dict[str, Any]: return await _call_tool("get_agent", {"name": name}) async def list_skills(domain: Optional[str] = None) -> Dict[str, Any]: args: Dict[str, Any] = {} if domain: args["domain"] = domain return await _call_tool("get_skill", args) async def get_skill(name: str) -> Dict[str, Any]: return await _call_tool("get_skill", {"name": name})