"""Asyncio-based YAML LSP WebSocket proxy.

Architecture:
    Editor (WebSocket) ──► yaml_ws_handler ──► yaml-language-server (--stdio)

One yaml-language-server child process is spawned per WebSocket connection.
Both directions carry LSP messages framed with ``Content-Length`` headers.

Intercepts:
    - initialize / initialized                      → injects the AzDO pipeline schema
    - textDocument/didOpen + didChange + didClose   → tracks document content per URI
    - textDocument/completion requests              → detects template context (AzDO / GHA)
    - textDocument/completion responses             → injects pipeline template completions
    - workspace/configuration requests (from server) → answered by the proxy

Context detection:
    AzDO: scan back for '- template: PATH@ALIAS', cursor in 'parameters:' block
    GHA:  scan back for 'uses: ORG/REPO/.github/workflows/FILE@REF', cursor in 'with:' block
"""

import asyncio
import contextlib
import json
import logging
import pathlib
import re
import shutil
from typing import Any

from aiohttp import web, WSMsgType

from .catalog import PipelineTemplateCatalog

logger = logging.getLogger(__name__)

_CHUNK = 65536

# Upper bound (seconds) for the close handshake on the early error paths.
_CLOSE_TIMEOUT = 2.0

# AzDO pipeline schema — baked into the Docker image; falls back to upstream URL
_AZDO_SCHEMA_PATH = pathlib.Path("/azdo-pipeline-schema.json")
_AZDO_SCHEMA_URL = (
    "https://raw.githubusercontent.com/microsoft/azure-pipelines-vscode"
    "/main/service-schema.json"
)

# Glob patterns that yaml-language-server matches against the file path
_AZDO_SCHEMA_GLOBS = [
    "azure-pipelines.yml",
    "azure-pipelines.yaml",
    "*azure-pipelines.yml",
    "*azure-pipelines.yaml",
]

# How many lines above the cursor are scanned for the enclosing template line.
_AZDO_LOOKBACK_LINES = 40
_GHA_LOOKBACK_LINES = 20

# Separator between the LSP header section and the JSON body.
_HEADER_END = b"\r\n\r\n"

# Document-format signals (applied to the joined head of the document).
_GHA_SIGNAL_RE = re.compile(r"^on:\s*$|workflow_call|runs-on:", re.MULTILINE)
_AZDO_TEMPLATE_SIGNAL_RE = re.compile(r"-\s+template\s*:", re.MULTILINE)
_AZDO_KEYS_SIGNAL_RE = re.compile(
    r"^stages:|^jobs:|^steps:|^trigger:|^pool:", re.MULTILINE
)

# AzDO cursor-context patterns (applied to single lines).
_AZDO_ON_TEMPLATE_RE = re.compile(r"-\s+template:\s*")
_AZDO_TEMPLATE_PREFIX_RE = re.compile(r"-\s+template:\s*(\S*)$")
_AZDO_ALIASED_TEMPLATE_RE = re.compile(r"-\s+template:\s+(\S+@\S+)")
_AZDO_ANY_TEMPLATE_RE = re.compile(r"-\s+template:\s+\S")
_AZDO_PARAMS_HEADER_RE = re.compile(r"\s*parameters\s*:")
_AZDO_PARAM_VALUE_RE = re.compile(r"\s*(?:-\s+)?(\w+)\s*:\s*(.*)$")

# GHA cursor-context patterns (applied to single lines).
_GHA_ON_USES_RE = re.compile(r"\s*uses\s*:")
_GHA_USES_PREFIX_RE = re.compile(r"uses:\s*(\S*)$")
_GHA_WORKFLOW_USES_RE = re.compile(r"uses:\s+(\S+/\.github/workflows/\S+)")
_GHA_ANY_USES_RE = re.compile(r"\s*(?:-\s+)?uses:\s+\S")
_GHA_WITH_HEADER_RE = re.compile(r"\s+with\s*:")


# ── AzDO schema configuration ──────────────────────────────────────────────────


def _get_azdo_schema_uri() -> str:
    """Return the best available URI for the AzDO pipeline schema.

    Prefers the local schema file when it exists, otherwise the upstream URL.
    """
    if _AZDO_SCHEMA_PATH.exists():
        return _AZDO_SCHEMA_PATH.as_uri()
    return _AZDO_SCHEMA_URL


def _azdo_yaml_settings(schema_uri: str) -> dict[str, Any]:
    """Return the ``yaml`` settings section that enables the AzDO schema."""
    return {
        "schemas": {schema_uri: _AZDO_SCHEMA_GLOBS},
        "completion": True,
        "validate": True,
        "hover": True,
    }


def _ensure_dict(parent: dict, key: str) -> dict:
    """Return ``parent[key]`` as a dict, replacing a missing or non-dict value.

    Editors may send explicit ``null`` for optional objects (for example
    ``"initializationOptions": null``); ``dict.setdefault`` would hand that
    ``None`` back and break the nested assignment that follows.
    """
    value = parent.get(key)
    if not isinstance(value, dict):
        value = {}
        parent[key] = value
    return value


def _inject_azdo_init_options(msg: dict) -> dict:
    """Inject AzDO schema into yaml-language-server's initializationOptions.

    yaml-language-server reads schemas from
    initializationOptions.settings.yaml.schemas at startup — this is more
    reliable than a post-init didChangeConfiguration.

    The message is modified in place and also returned. Existing
    ``completion`` / ``validate`` / ``hover`` values supplied by the editor are
    kept; the AzDO schema entry is always (re)written.
    """
    schema_uri = _get_azdo_schema_uri()
    params = _ensure_dict(msg, "params")
    init_opts = _ensure_dict(params, "initializationOptions")
    settings = _ensure_dict(init_opts, "settings")
    yaml_cfg = _ensure_dict(settings, "yaml")
    schemas = _ensure_dict(yaml_cfg, "schemas")

    schemas[schema_uri] = _AZDO_SCHEMA_GLOBS
    for feature in ("completion", "validate", "hover"):
        yaml_cfg.setdefault(feature, True)

    logger.debug(
        "Injected AzDO schema into initializationOptions (uri=%s)", schema_uri
    )
    return msg


async def _inject_azdo_schema_config(proc: asyncio.subprocess.Process) -> None:
    """Send workspace/didChangeConfiguration to load the AzDO pipeline schema.

    Called immediately after the editor's 'initialized' notification as a
    belt-and-suspenders complement to the initializationOptions injection.
    """
    schema_uri = _get_azdo_schema_uri()
    config = {
        "jsonrpc": "2.0",
        "method": "workspace/didChangeConfiguration",
        "params": {"settings": {"yaml": _azdo_yaml_settings(schema_uri)}},
    }
    proc.stdin.write(_lsp_frame(json.dumps(config).encode()))
    await proc.stdin.drain()
    logger.info("Injected AzDO pipeline schema config (source: %s)", schema_uri)


async def _respond_workspace_configuration(
    proc: asyncio.subprocess.Process, request: dict
) -> None:
    """Reply to yaml-language-server's workspace/configuration pull request.

    yaml-language-server asks the client for settings via this request. We
    respond on behalf of the client with the AzDO schema config so that
    task:, inputs:, steps: and pipeline structure completions work even if
    the editor doesn't handle workspace/configuration responses.

    Raises:
        KeyError: if ``request`` carries no ``id`` (it is then not a request).
    """
    schema_uri = _get_azdo_schema_uri()
    yaml_settings = _azdo_yaml_settings(schema_uri)
    yaml_settings["schemaStore"] = {"enable": False}

    # NOTE(review): proxyStrictSSL=False presumably relaxes TLS verification for
    # the server's HTTP fetches — confirm this is intended for production.
    http_settings = {"proxy": None, "proxyStrictSSL": False}
    by_section = {"yaml": yaml_settings, "http": http_settings}

    # request.params.items is a list of {section: "yaml"} etc.
    # We reply with one result entry per requested item, in order.
    items = (request.get("params") or {}).get("items") or []
    sections = [
        item.get("section", "") if isinstance(item, dict) else "" for item in items
    ]
    result = [by_section.get(section) for section in sections]

    response = {"jsonrpc": "2.0", "id": request["id"], "result": result}
    proc.stdin.write(_lsp_frame(json.dumps(response).encode()))
    await proc.stdin.drain()
    logger.debug(
        "Responded to workspace/configuration (id=%s, sections=%s)",
        request.get("id"),
        sections,
    )


# ── LSP framing ────────────────────────────────────────────────────────────────


class _LspFrameBuffer:
    """Reassembles LSP Content-Length framed messages from a stream of bytes."""

    def __init__(self) -> None:
        # bytearray lets consumed frames be dropped in place instead of
        # re-copying the remaining tail on every frame.
        self._buf = bytearray()

    def feed(self, data: bytes) -> list[bytes]:
        """Feed bytes and return complete frames (as raw bytes, without header).

        Incomplete trailing data is kept for the next call. A header section
        without a usable Content-Length is discarded (and logged) rather than
        being reported as an empty frame.
        """
        self._buf += data
        frames: list[bytes] = []
        while True:
            sep = self._buf.find(_HEADER_END)
            if sep == -1:
                break
            body_start = sep + len(_HEADER_END)
            length = self._content_length(bytes(self._buf[:sep]))
            if length is None:
                logger.warning("Dropping LSP header without valid Content-Length")
                del self._buf[:body_start]
                continue
            body_end = body_start + length
            if len(self._buf) < body_end:
                break  # body not fully received yet
            frames.append(bytes(self._buf[body_start:body_end]))
            del self._buf[:body_end]
        return frames

    @staticmethod
    def _content_length(header: bytes) -> int | None:
        """Return the Content-Length declared in ``header``, or None if unusable."""
        length: int | None = None
        for part in header.split(b"\r\n"):
            name, colon, value = part.partition(b":")
            if colon and name.strip().lower() == b"content-length":
                try:
                    length = int(value.strip())
                except ValueError:
                    return None
        if length is None or length < 0:
            return None
        return length


def _lsp_frame(body: bytes) -> bytes:
    """Prefix ``body`` with an LSP Content-Length header."""
    return f"Content-Length: {len(body)}\r\n\r\n".encode() + body


def _parse_json_object(frame: bytes) -> dict | None:
    """Decode ``frame`` as a JSON object; return None for anything else."""
    try:
        parsed = json.loads(frame)
    except ValueError:  # includes JSONDecodeError and UnicodeDecodeError
        return None
    return parsed if isinstance(parsed, dict) else None


# ── Document + context tracking ────────────────────────────────────────────────


def _position_to_offset(text: str, line: int, character: int) -> int:
    """Convert an LSP (line, character) position into an index into ``text``.

    Positions past the end of a line or of the document are clamped.
    NOTE: LSP counts characters in UTF-16 code units by default while Python
    indexes code points, so columns after non-BMP characters are approximate.
    """
    offset = 0
    for _ in range(max(line, 0)):
        newline = text.find("\n", offset)
        if newline == -1:
            return len(text)
        offset = newline + 1
    line_end = text.find("\n", offset)
    if line_end == -1:
        line_end = len(text)
    return min(offset + max(character, 0), line_end)


def _apply_content_changes(text: str, changes: list) -> str:
    """Apply LSP TextDocumentContentChangeEvents to ``text`` in order.

    A change without ``range`` replaces the whole document; a change with
    ``range`` replaces just that span.
    """
    for change in changes:
        if not isinstance(change, dict):
            continue
        new_text = change.get("text", "")
        span = change.get("range")
        if span is None:
            text = new_text
            continue
        start_pos = span.get("start", {})
        end_pos = span.get("end", {})
        start = _position_to_offset(
            text, start_pos.get("line", 0), start_pos.get("character", 0)
        )
        end = _position_to_offset(
            text, end_pos.get("line", 0), end_pos.get("character", 0)
        )
        text = text[:start] + new_text + text[end:]
    return text


def _cursor_prefix(lines: list[str], line_idx: int, char_idx: int) -> str:
    """Return the text left of the cursor on its line ('' if out of range)."""
    if 0 <= line_idx < len(lines):
        return lines[line_idx][:char_idx]
    return ""


def _lookback_window(
    lines: list[str], line_idx: int, char_idx: int, span: int
) -> list[str]:
    """Return up to ``span`` lines above the cursor plus the cursor-line prefix.

    The cursor line is cut at the cursor so text to its right is ignored. When
    the cursor lies beyond the tracked document only existing lines are
    returned (possibly none) instead of truncating an unrelated line.
    """
    if line_idx < 0:
        return []
    window = list(lines[max(0, line_idx - span) : line_idx])
    if line_idx < len(lines):
        window.append(lines[line_idx][:char_idx])
    return window


def _detect_doc_format(lines: list[str]) -> str:
    """Return 'azdo', 'gha', or 'unknown' based on document signals.

    GHA:  a bare 'on:' line, 'workflow_call', or 'runs-on:' (checked first)
    AzDO: a '- template:' entry, or top-level stages/jobs/steps/trigger/pool keys
    """
    text = "\n".join(lines[:80])  # only scan first 80 lines for speed
    if _GHA_SIGNAL_RE.search(text):
        return "gha"
    if _AZDO_TEMPLATE_SIGNAL_RE.search(text):
        return "azdo"
    # Fallback: any yaml with azure pipeline stage/step/job keys → azdo
    if _AZDO_KEYS_SIGNAL_RE.search(text):
        return "azdo"
    return "unknown"


def _detect_azdo_context(lines: list[str], line_idx: int, char_idx: int) -> dict:
    """Return context dict for AzDO completion at (line_idx, char_idx).

    Detects:
      - 'template_path': cursor is on a '- template:' line value
      - 'param_value':   cursor follows 'name:' inside a template's 'parameters:'
      - 'param_name':    cursor is elsewhere inside that 'parameters:' block
      - 'unknown'
    """
    current = _cursor_prefix(lines, line_idx, char_idx)

    # Are we ON a '- template:' line?
    if _AZDO_ON_TEMPLATE_RE.search(current):
        # Extract partial path typed so far
        typed = _AZDO_TEMPLATE_PREFIX_RE.search(current)
        prefix = typed.group(1) if typed else ""
        return {"type": "template_path", "format": "azdo", "prefix": prefix}

    # Are we in a 'parameters:' block? Scan backwards for the enclosing
    # template line, noting whether a 'parameters:' header lies in between.
    template_key = None
    in_params_block = False
    window = _lookback_window(lines, line_idx, char_idx, _AZDO_LOOKBACK_LINES)
    for ln in reversed(window):
        # Detect '- template: tasks/k8s/deploy.yaml@pipeline-templates'
        aliased = _AZDO_ALIASED_TEMPLATE_RE.search(ln)
        if aliased:
            template_key = aliased.group(1)
            break
        # The nearest template has no @alias: stop here rather than attribute
        # this block to an earlier, unrelated template.
        if _AZDO_ANY_TEMPLATE_RE.search(ln):
            break
        if _AZDO_PARAMS_HEADER_RE.match(ln):
            in_params_block = True

    if not (template_key and in_params_block):
        return {"type": "unknown", "format": "azdo"}

    # Cursor after 'paramname: ' means value context. Call-site parameters are
    # mapping entries, so the leading '- ' is optional. The 'parameters:'
    # header itself is not a parameter.
    if not _AZDO_PARAMS_HEADER_RE.match(current):
        value_m = _AZDO_PARAM_VALUE_RE.match(current)
        if value_m:
            return {
                "type": "param_value",
                "format": "azdo",
                "template_key": template_key,
                "param": value_m.group(1),
            }

    # Otherwise: parameter name completion
    return {"type": "param_name", "format": "azdo", "template_key": template_key}


def _detect_gha_context(lines: list[str], line_idx: int, char_idx: int) -> dict:
    """Return context dict for GHA completion at (line_idx, char_idx).

    Detects:
      - 'workflow_ref': cursor is on a 'uses:' line value
      - 'input_name':   cursor is inside 'with:' block below a known 'uses:' line
      - 'unknown'
    """
    current = _cursor_prefix(lines, line_idx, char_idx)

    # Are we ON a 'uses:' line?
    if _GHA_ON_USES_RE.match(current):
        typed = _GHA_USES_PREFIX_RE.search(current)
        prefix = typed.group(1) if typed else ""
        return {"type": "workflow_ref", "format": "gha", "prefix": prefix}

    # Scan back for the enclosing 'uses:' line
    template_key = None
    in_with_block = False
    window = _lookback_window(lines, line_idx, char_idx, _GHA_LOOKBACK_LINES)
    for ln in reversed(window):
        # 'uses: org/repo/.github/workflows/file.yml@ref'
        workflow = _GHA_WORKFLOW_USES_RE.search(ln)
        if workflow:
            template_key = workflow.group(1)
            break
        # The nearest 'uses:' is a plain action, not a reusable workflow: its
        # 'with:' block must not be attributed to an earlier workflow.
        if _GHA_ANY_USES_RE.match(ln):
            break
        if _GHA_WITH_HEADER_RE.match(ln):
            in_with_block = True

    if template_key and in_with_block:
        return {"type": "input_name", "format": "gha", "template_key": template_key}
    return {"type": "unknown", "format": "gha"}


# ── Completion injection ───────────────────────────────────────────────────────


def _catalog_items_for(context: dict) -> list[dict[str, Any]]:
    """Look up catalog completion items for a detected cursor context."""
    ctx_type = context.get("type", "unknown")
    fmt = context.get("format", "unknown")
    key = context.get("template_key", "")

    if fmt == "azdo":
        if ctx_type == "template_path":
            return PipelineTemplateCatalog.azdo_template_completion_items()
        if ctx_type == "param_name":
            return PipelineTemplateCatalog.azdo_param_completion_items(key)
        if ctx_type == "param_value":
            param = context.get("param", "")
            return PipelineTemplateCatalog.azdo_param_value_items(key, param)
    elif fmt == "gha":
        if ctx_type == "workflow_ref":
            return PipelineTemplateCatalog.gha_workflow_completion_items()
        if ctx_type == "input_name":
            return PipelineTemplateCatalog.gha_input_completion_items(key)
    return []


def _inject_completions(msg: dict[str, Any], context: dict) -> bytes:
    """Inject template-aware items at the top of a completion response.

    Args:
        msg: decoded JSON-RPC response; modified in place when items are added.
        context: cursor context recorded for the matching request.

    Returns:
        The (possibly extended) response serialised as JSON bytes. The response
        is returned unchanged when it has no item list or the catalog offers
        nothing for ``context``.
    """
    result = msg.get("result")
    items: list | None = None
    if isinstance(result, list):
        items = result
    elif isinstance(result, dict) and "items" in result:
        items = result["items"]
    if items is None:
        return json.dumps(msg).encode()

    lru_items = _catalog_items_for(context)
    if not lru_items:
        return json.dumps(msg).encode()

    # Downgrade existing items' sortText so LRU items appear first
    for item in items:
        if isinstance(item, dict):
            item["sortText"] = "9_" + item.get("sortText", item.get("label", ""))

    merged = lru_items + items
    if isinstance(result, list):
        msg["result"] = merged
    else:
        result["items"] = merged
    return json.dumps(msg).encode()


# ── Proxy session ──────────────────────────────────────────────────────────────


class _YamlSession:
    """Per-WebSocket state: document lines and pending completion requests."""

    def __init__(self) -> None:
        self.docs: dict[str, list[str]] = {}  # uri → lines
        self.pending: dict = {}  # request_id → context dict
        # uri → full text; kept so incremental edits can be applied exactly
        self._texts: dict[str, str] = {}

    def update_doc(self, uri: str, text: str) -> None:
        """Replace the tracked content of ``uri`` with ``text``."""
        self._texts[uri] = text
        self.docs[uri] = text.splitlines()

    def apply_changes(self, uri: str, changes: list) -> None:
        """Apply didChange content changes (full or incremental) to ``uri``."""
        if not changes:
            return
        base = self._texts.get(uri)
        if base is None:
            base = "\n".join(self.docs.get(uri, []))
        self.update_doc(uri, _apply_content_changes(base, changes))

    def close_doc(self, uri: str) -> None:
        """Forget the tracked content of ``uri``."""
        self.docs.pop(uri, None)
        self._texts.pop(uri, None)

    def record_request(self, msg: dict) -> None:
        """Detect and remember the cursor context of a completion request."""
        req_id = msg.get("id")
        if req_id is None:
            return
        params = msg.get("params") or {}
        uri = (params.get("textDocument") or {}).get("uri", "")
        position = params.get("position") or {}
        line_idx = position.get("line", 0)
        char_idx = position.get("character", 0)

        lines = self.docs.get(uri, [])
        doc_format = _detect_doc_format(lines)
        if doc_format == "azdo":
            ctx = _detect_azdo_context(lines, line_idx, char_idx)
        elif doc_format == "gha":
            ctx = _detect_gha_context(lines, line_idx, char_idx)
        else:
            ctx = {"type": "unknown", "format": "unknown"}
        self.pending[req_id] = ctx

    def pop_context(self, msg_id) -> dict:
        """Remove and return the context recorded for ``msg_id`` (or 'unknown')."""
        return self.pending.pop(msg_id, {"type": "unknown", "format": "unknown"})


def _process_client_frame(session: _YamlSession, frame: bytes) -> tuple[bytes, str]:
    """Inspect one editor → server frame and update ``session`` accordingly.

    Returns:
        ``(frame_to_forward, method)``. The frame is rewritten only for
        ``initialize``; ``method`` is '' when the frame is not a JSON object
        carrying a string method. Tracking failures are logged and never stop
        the frame from being forwarded.
    """
    parsed = _parse_json_object(frame)
    if parsed is None:
        return frame, ""
    method = parsed.get("method", "")
    if not isinstance(method, str):
        return frame, ""

    try:
        if method == "initialize":
            # Inject AzDO schema into initializationOptions so
            # yaml-language-server loads it from the very start.
            frame = json.dumps(_inject_azdo_init_options(parsed)).encode()
        elif method == "textDocument/completion":
            session.record_request(parsed)
        elif method in (
            "textDocument/didOpen",
            "textDocument/didChange",
            "textDocument/didClose",
        ):
            params = parsed.get("params") or {}
            text_document = params.get("textDocument") or {}
            uri = text_document.get("uri", "")
            if not uri:
                pass
            elif method == "textDocument/didOpen":
                text = text_document.get("text")
                if isinstance(text, str):
                    session.update_doc(uri, text)
            elif method == "textDocument/didChange":
                session.apply_changes(uri, params.get("contentChanges") or [])
            else:
                session.close_doc(uri)
    except (AttributeError, IndexError, KeyError, TypeError, ValueError):
        logger.debug("Could not track client message %r", method, exc_info=True)
    return frame, method


# ── Main WS handler ────────────────────────────────────────────────────────────


async def _close_with_error(ws: web.WebSocketResponse, message: bytes) -> None:
    """Close ``ws`` with code 1011, bounding the handshake by _CLOSE_TIMEOUT."""
    try:
        await asyncio.wait_for(
            ws.close(code=1011, message=message), timeout=_CLOSE_TIMEOUT
        )
    except asyncio.TimeoutError:
        logger.debug("Timed out closing WebSocket after startup failure")


async def yaml_ws_handler(request: web.Request, yaml_lsp_port: int = 0) -> web.WebSocketResponse:
    """WebSocket handler for the /yaml endpoint.

    Spawns yaml-language-server --stdio per editor connection (one process per
    session). Bridges WS ↔ process stdin/stdout, intercepting completion
    messages to inject pipeline template completions.

    Note: yaml_lsp_port is unused — kept for API compatibility.
    yaml-language-server uses --stdio so no TCP port is needed.

    Returns:
        The prepared WebSocketResponse, closed by the time this returns.
    """
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    yaml_ls = shutil.which("yaml-language-server")
    if not yaml_ls:
        logger.error("yaml-language-server not found in PATH")
        await _close_with_error(ws, b"yaml-language-server not installed")
        return ws

    try:
        proc = await asyncio.create_subprocess_exec(
            yaml_ls,
            "--stdio",
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.DEVNULL,
        )
    except Exception as exc:
        logger.error("Failed to start yaml-language-server: %s", exc)
        await _close_with_error(ws, b"YAML LSP failed to start")
        return ws

    logger.info("yaml-language-server started (--stdio) PID=%d", proc.pid)

    session = _YamlSession()
    ws_buf = _LspFrameBuffer()
    proc_buf = _LspFrameBuffer()

    async def client_to_server() -> None:
        """WS → stdin: track document content and completion requests."""
        try:
            async for msg in ws:
                if msg.type == WSMsgType.BINARY:
                    raw = msg.data
                elif msg.type == WSMsgType.TEXT:
                    raw = msg.data.encode()
                else:
                    continue
                for frame in ws_buf.feed(raw):
                    outgoing, method = _process_client_frame(session, frame)
                    proc.stdin.write(_lsp_frame(outgoing))
                    # After 'initialized', inject AzDO pipeline schema config so
                    # yaml-language-server provides task:/inputs: completions.
                    if method == "initialized":
                        await _inject_azdo_schema_config(proc)
                    await proc.stdin.drain()
        except ConnectionError:
            logger.debug("yaml-language-server stdin closed", exc_info=True)
        except Exception:
            logger.exception("Editor → yaml-language-server bridge failed")
        finally:
            # Stopping the server makes server_to_client see EOF and finish.
            with contextlib.suppress(Exception):
                proc.stdin.close()
            with contextlib.suppress(ProcessLookupError):
                proc.kill()

    async def server_to_client() -> None:
        """stdout → WS: inject completions into completion responses."""
        try:
            while True:
                data = await proc.stdout.read(_CHUNK)
                if not data:
                    break
                for frame in proc_buf.feed(data):
                    outgoing = frame
                    parsed = _parse_json_object(frame)
                    try:
                        if parsed is not None:
                            msg_id = parsed.get("id")
                            method = parsed.get("method")
                            if method == "workspace/configuration":
                                # yaml-language-server uses the PULL model: it
                                # asks the client for settings. Respond directly
                                # from the proxy so the schema is always applied
                                # regardless of editor support. The request is
                                # still forwarded below so the editor can see
                                # and optionally answer it too.
                                await _respond_workspace_configuration(proc, parsed)
                            elif method is None and msg_id is not None:
                                # Any response (result or error) retires the
                                # recorded context so `pending` cannot grow.
                                ctx = session.pop_context(msg_id)
                                if "result" in parsed and ctx.get("type") != "unknown":
                                    outgoing = _inject_completions(parsed, ctx)
                    except Exception:
                        # Never drop a server message because interception
                        # failed — fall back to forwarding it untouched.
                        logger.exception("Failed to intercept server message")
                        outgoing = frame
                    await ws.send_bytes(_lsp_frame(outgoing))
        except ConnectionError:
            logger.debug("Editor WebSocket closed while sending", exc_info=True)
        except Exception:
            logger.exception("yaml-language-server → editor bridge failed")
        finally:
            await ws.close()

    try:
        await asyncio.gather(
            client_to_server(), server_to_client(), return_exceptions=True
        )
    finally:
        with contextlib.suppress(ProcessLookupError):
            proc.kill()
        # Reap the child so it does not linger as a zombie.
        await proc.wait()
        logger.info("yaml-language-server session ended PID=%d", proc.pid)
    return ws