Files
iLSP/ilsp/yaml_lsp/proxy.py
Henrik Jess Nielsen ae751f944c
All checks were successful
Build and Deploy iLSP / test (push) Successful in 25s
Build and Deploy iLSP / build-and-deploy (push) Successful in 1m37s
fix: yaml stdio per-connection, iac catalog path, makefile port
- yaml-language-server: rewrite to stdio per WebSocket (fixes crash loop)
  vscode-jsonrpc v9 createServerSocketTransport is a TCP client, not server
  now spawns yaml-language-server --stdio per connection via asyncio subprocess

- bicep/modules.py: add /iac_source_catalog.json as first path in _IAC_SOURCE_PATHS
  Dockerfile copies to /iac_source_catalog.json but path wasn't listed

- server.py: remove YAML_LSP_PORT daemon (no longer needed with stdio mode)

- Makefile: add -e HTTP_PORT=$(HEALTH_PORT) to all docker run commands
  server defaulted to :8000 but Makefile exposed :2089 with no override
2026-05-10 16:37:42 +02:00

385 lines
14 KiB
Python

"""
Asyncio-based YAML LSP WebSocket proxy.
Architecture:
Editor (WebSocket) ──► YamlWsProxy ──► yaml-language-server (--stdio subprocess, one per connection)
Intercepts:
- textDocument/didOpen + didChange → tracks document content per URI
- textDocument/completion requests → detects template context (AzDO / GHA)
- textDocument/completion responses → injects pipeline template completions
Context detection:
AzDO: scan back for '- template: PATH@ALIAS', cursor in 'parameters:' block
GHA: scan back for 'uses: ORG/REPO/.github/workflows/FILE@REF', cursor in 'with:' block
"""
import asyncio
import json
import logging
import re
from typing import Any
from aiohttp import web, WSMsgType
from .catalog import PipelineTemplateCatalog
logger = logging.getLogger(__name__)
_CHUNK = 65536
# ── LSP framing ────────────────────────────────────────────────────────────────
class _LspFrameBuffer:
    """Accumulates raw bytes and splits them into Content-Length framed bodies."""

    def __init__(self) -> None:
        # Bytes received so far that do not yet form a complete frame.
        self._buf = b""

    def feed(self, data: bytes) -> list[bytes]:
        """Append *data* and return every body that is now complete.

        Bodies are returned without their header block. A header block that
        carries no ``Content-Length`` field is treated as length 0. Incomplete
        trailing data stays buffered for the next call.
        """
        self._buf += data
        complete: list[bytes] = []
        while (header_end := self._buf.find(b"\r\n\r\n")) != -1:
            body_len = 0
            for field in self._buf[:header_end].split(b"\r\n"):
                if field.lower().startswith(b"content-length:"):
                    body_len = int(field.split(b":")[1].strip())
            payload_start = header_end + 4  # skip the blank-line separator
            payload_end = payload_start + body_len
            if len(self._buf) < payload_end:
                # Body not fully received yet; wait for more bytes.
                break
            complete.append(self._buf[payload_start:payload_end])
            self._buf = self._buf[payload_end:]
        return complete
def _lsp_frame(body: bytes) -> bytes:
    """Return *body* prefixed with its ``Content-Length`` header block."""
    header = b"Content-Length: %d\r\n\r\n" % len(body)
    return header + body
# ── Document + context tracking ────────────────────────────────────────────────
def _detect_doc_format(lines: list[str]) -> str:
    """
    Classify a document as 'gha', 'azdo' or 'unknown'.

    Only the first 80 lines are inspected. Checks run in this order and the
    first hit wins:
      1. 'gha'  – a bare 'on:' line, the text 'workflow_call', or 'runs-on:'
      2. 'azdo' – a '- template:' list entry
      3. 'azdo' – a line starting with stages:/jobs:/steps:/trigger:/pool:
    """
    head = "\n".join(lines[:80])
    probes = (
        (r"^on:\s*$|workflow_call|runs-on:", "gha"),
        (r"-\s+template\s*:", "azdo"),
        # Fallback: top-level pipeline keys with no template reference.
        (r"^stages:|^jobs:|^steps:|^trigger:|^pool:", "azdo"),
    )
    for pattern, doc_format in probes:
        if re.search(pattern, head, re.MULTILINE):
            return doc_format
    return "unknown"
def _detect_azdo_context(lines: list[str], line_idx: int, char_idx: int) -> dict:
    """
    Return the AzDO completion context at (line_idx, char_idx).

    The result always carries 'type' and 'format' ('azdo'):
      - 'template_path': the cursor line (up to the cursor) contains
        '- template:'; 'prefix' is the non-whitespace text typed after it.
      - 'param_value': a '- template: PATH@ALIAS' line lies above the cursor
        with a 'parameters:' line in between, and the cursor line looks like
        '- NAME: ...'; carries 'template_key' and 'param'.
      - 'param_name': same enclosing template / parameters lines, any other
        cursor line; carries 'template_key'.
      - 'unknown': none of the above.

    At most 40 lines above the cursor are scanned. A cursor line that is not
    present in *lines* (e.g. a fresh line at the end of the document, or an
    empty document) is treated as an empty line.
    """
    cursor_in_doc = 0 <= line_idx < len(lines)
    current = lines[line_idx][:char_idx] if cursor_in_doc else ""

    # Are we ON a '- template:' line?
    if re.search(r"-\s+template:\s*", current):
        # Extract the partial path typed so far.
        m = re.search(r"-\s+template:\s*(\S*)$", current)
        prefix = m.group(1) if m else ""
        return {"type": "template_path", "format": "azdo", "prefix": prefix}

    # Scan backwards for the enclosing template line.
    lookback = list(lines[max(0, line_idx - 40) : line_idx + 1])
    if cursor_in_doc:
        # Ignore text to the right of the cursor. Only done when the last
        # entry really is the cursor line; otherwise we would truncate an
        # unrelated line above it.
        lookback[-1] = current

    template_key = None
    in_params_block = False
    for ln in reversed(lookback):
        # e.g. '- template: tasks/k8s/deploy.yaml@pipeline-templates'
        m = re.search(r"-\s+template:\s+(\S+@\S+)", ln)
        if m:
            template_key = m.group(1)
            break
        if re.match(r"\s*parameters\s*:", ln):
            in_params_block = True

    if template_key and in_params_block:
        # Cursor after '- paramname: ' means value completion.
        value_m = re.match(r"\s*-\s+(\w+)\s*:\s*(.*)$", current)
        if value_m:
            return {
                "type": "param_value",
                "format": "azdo",
                "template_key": template_key,
                "param": value_m.group(1),
            }
        # Otherwise: parameter name completion.
        return {"type": "param_name", "format": "azdo", "template_key": template_key}
    return {"type": "unknown", "format": "azdo"}
def _detect_gha_context(lines: list[str], line_idx: int, char_idx: int) -> dict:
    """
    Return the GHA completion context at (line_idx, char_idx).

    The result always carries 'type' and 'format' ('gha'):
      - 'workflow_ref': the cursor line (up to the cursor) starts with
        'uses:'; 'prefix' is the non-whitespace text typed after it.
      - 'input_name': a 'uses: .../.github/workflows/...' line lies above the
        cursor with an indented 'with:' line in between; carries
        'template_key' (the matched reference).
      - 'unknown': none of the above.

    At most 20 lines above the cursor are scanned. A cursor line that is not
    present in *lines* (e.g. a fresh line at the end of the document, or an
    empty document) is treated as an empty line.
    """
    cursor_in_doc = 0 <= line_idx < len(lines)
    current = lines[line_idx][:char_idx] if cursor_in_doc else ""

    # Are we ON a 'uses:' line?
    if re.match(r"\s*uses\s*:", current):
        m = re.search(r"uses:\s*(\S*)$", current)
        prefix = m.group(1) if m else ""
        return {"type": "workflow_ref", "format": "gha", "prefix": prefix}

    # Scan backwards for the enclosing 'uses:' line.
    lookback = list(lines[max(0, line_idx - 20) : line_idx + 1])
    if cursor_in_doc:
        # Ignore text to the right of the cursor. Only done when the last
        # entry really is the cursor line; otherwise we would truncate an
        # unrelated line above it.
        lookback[-1] = current

    template_key = None
    in_with_block = False
    for ln in reversed(lookback):
        # e.g. 'uses: org/repo/.github/workflows/file.yml@ref'
        m = re.search(r"uses:\s+(\S+/\.github/workflows/\S+)", ln)
        if m:
            template_key = m.group(1)
            break
        if re.match(r"\s+with\s*:", ln):
            in_with_block = True

    if template_key and in_with_block:
        return {"type": "input_name", "format": "gha", "template_key": template_key}
    return {"type": "unknown", "format": "gha"}
# ── Completion injection ───────────────────────────────────────────────────────
def _inject_completions(msg: dict[str, Any], context: dict) -> bytes:
    """Serialise *msg*, prepending catalog completion items when applicable.

    *msg* is a parsed completion response whose ``result`` is either a list of
    items or a dict with an ``items`` key. *context* is the dict recorded for
    the matching request. When the context selects no catalog items, or the
    result has no item list, *msg* is serialised unchanged. Otherwise every
    existing item gets its ``sortText`` prefixed with ``9_`` and the catalog
    items are placed in front of them (``msg`` is modified in place).
    """
    result = msg.get("result")
    if isinstance(result, list):
        existing = result
    elif isinstance(result, dict) and "items" in result:
        existing = result["items"]
    else:
        existing = None
    if existing is None:
        return json.dumps(msg).encode()

    kind = (context.get("type", "unknown"), context.get("format", "unknown"))
    template_key = context.get("template_key", "")
    if kind == ("template_path", "azdo"):
        injected = PipelineTemplateCatalog.azdo_template_completion_items()
    elif kind == ("param_name", "azdo"):
        injected = PipelineTemplateCatalog.azdo_param_completion_items(template_key)
    elif kind == ("param_value", "azdo"):
        injected = PipelineTemplateCatalog.azdo_param_value_items(
            template_key, context.get("param", "")
        )
    elif kind == ("workflow_ref", "gha"):
        injected = PipelineTemplateCatalog.gha_workflow_completion_items()
    elif kind == ("input_name", "gha"):
        injected = PipelineTemplateCatalog.gha_input_completion_items(template_key)
    else:
        injected = []
    if not injected:
        return json.dumps(msg).encode()

    # Push the server's own items down so the injected ones sort first.
    for entry in existing:
        entry["sortText"] = "9_" + entry.get("sortText", entry.get("label", ""))

    merged = injected + existing
    if isinstance(result, list):
        msg["result"] = merged
    else:
        result["items"] = merged
        msg["result"] = result
    return json.dumps(msg).encode()
# ── Proxy session ─────────────────────────────────────────────────────────────
class _YamlSession:
    """State for one WebSocket connection: tracked documents and open requests."""

    def __init__(self) -> None:
        # uri → document content split into lines
        self.docs: dict[str, list[str]] = {}
        # request id → context dict computed when the request was recorded
        self.pending: dict = {}

    def update_doc(self, uri: str, text: str) -> None:
        """Replace the stored lines for *uri* with *text* split into lines."""
        self.docs[uri] = text.splitlines()

    def record_request(self, msg: dict) -> None:
        """Compute and store the completion context for request *msg*.

        Messages without an ``id`` are ignored. The document format decides
        which context detector runs; unrecognised formats store an
        'unknown' context.
        """
        request_id = msg.get("id")
        if request_id is None:
            return

        params = msg.get("params", {})
        uri = params.get("textDocument", {}).get("uri", "")
        position = params.get("position", {})
        lines = self.docs.get(uri, [])

        doc_format = _detect_doc_format(lines)
        row = position.get("line", 0)
        col = position.get("character", 0)

        detector = {
            "azdo": _detect_azdo_context,
            "gha": _detect_gha_context,
        }.get(doc_format)
        if detector is None:
            context = {"type": "unknown", "format": "unknown"}
        else:
            context = detector(lines, row, col)
        self.pending[request_id] = context

    def pop_context(self, msg_id) -> dict:
        """Remove and return the context for *msg_id* ('unknown' if absent)."""
        fallback = {"type": "unknown", "format": "unknown"}
        return self.pending.pop(msg_id, fallback)
# ── Main WS handler ───────────────────────────────────────────────────────────
def _position_to_offset(text: str, position: dict) -> int:
    """Convert an LSP ``{line, character}`` position into an index into *text*.

    Positions past the end of a line or of the document are clamped.
    NOTE(review): LSP characters are UTF-16 code units by default; this treats
    them as Python str indices, as the context detectors in this module do
    when slicing lines — confirm that is acceptable for non-BMP content.
    """
    rows = text.splitlines(keepends=True)
    line = max(0, position.get("line", 0))
    if line >= len(rows):
        return len(text)
    # Length of the line without its terminator, so we never land inside it.
    content_len = len(rows[line].splitlines()[0])
    char = max(0, position.get("character", 0))
    return sum(len(row) for row in rows[:line]) + min(char, content_len)


def _apply_content_changes(text: str, changes: list) -> str:
    """Return *text* after applying LSP ``contentChanges`` entries in order.

    An entry without ``range`` replaces the whole document; an entry with
    ``range`` replaces only that span.
    """
    for change in changes:
        replacement = change.get("text", "")
        span = change.get("range")
        if span is None:
            text = replacement
            continue
        start = _position_to_offset(text, span.get("start", {}))
        end = max(start, _position_to_offset(text, span.get("end", {})))
        text = text[:start] + replacement + text[end:]
    return text


async def yaml_ws_handler(request: web.Request, yaml_lsp_port: int = 0) -> web.WebSocketResponse:
    """
    WebSocket handler for the /yaml endpoint.

    Spawns yaml-language-server --stdio per editor connection (one process per
    session). Bridges WS ↔ process stdin/stdout, intercepting completion messages
    to inject pipeline template completions.

    Note: yaml_lsp_port is unused — kept for API compatibility.
    yaml-language-server uses --stdio so no TCP port is needed.

    Returns the (closed) WebSocketResponse. If the server binary is missing or
    cannot be started, the socket is closed with code 1011 instead.
    """
    import shutil

    ws = web.WebSocketResponse()
    await ws.prepare(request)

    async def reject(reason: bytes) -> web.WebSocketResponse:
        """Close the socket with code 1011, waiting at most 2 seconds."""
        # WebSocketResponse.close() takes no timeout argument, so the bound
        # is applied from the outside.
        try:
            await asyncio.wait_for(ws.close(code=1011, message=reason), timeout=2.0)
        except asyncio.TimeoutError:
            logger.warning("Timed out closing WebSocket after YAML LSP startup failure")
        return ws

    yaml_ls = shutil.which("yaml-language-server")
    if not yaml_ls:
        logger.error("yaml-language-server not found in PATH")
        return await reject(b"yaml-language-server not installed")

    try:
        proc = await asyncio.create_subprocess_exec(
            yaml_ls, "--stdio",
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.DEVNULL,
        )
    except Exception as exc:
        logger.error("Failed to start yaml-language-server: %s", exc)
        return await reject(b"YAML LSP failed to start")

    logger.info("yaml-language-server started (--stdio) PID=%d", proc.pid)
    session = _YamlSession()
    ws_buf = _LspFrameBuffer()
    proc_buf = _LspFrameBuffer()
    # uri → full document text; needed to apply ranged (incremental) edits.
    doc_text: dict[str, str] = {}

    def kill_proc() -> None:
        """Kill the language server if it is still running."""
        if proc.returncode is None:
            try:
                proc.kill()
            except ProcessLookupError:
                pass  # exited between the check and the kill

    def track(frame: bytes) -> None:
        """Record document text or completion context from one client message."""
        parsed = json.loads(frame)
        method = parsed.get("method", "")
        if method == "textDocument/completion":
            session.record_request(parsed)
            return
        if method not in ("textDocument/didOpen", "textDocument/didChange"):
            return
        params = parsed.get("params", {})
        doc = params.get("textDocument", {})
        uri = doc.get("uri", "")
        if not uri:
            return
        if method == "textDocument/didOpen":
            text = doc.get("text") or ""
        else:
            text = _apply_content_changes(
                doc_text.get(uri, ""), params.get("contentChanges", [])
            )
        doc_text[uri] = text
        session.update_doc(uri, text)

    async def client_to_server() -> None:
        """WS → stdin: track document content and completion requests."""
        try:
            async for msg in ws:
                if msg.type not in (WSMsgType.BINARY, WSMsgType.TEXT):
                    continue
                raw = msg.data if msg.type == WSMsgType.BINARY else msg.data.encode()
                for frame in ws_buf.feed(raw):
                    try:
                        track(frame)
                    except Exception:
                        # Tracking is best-effort; the frame is forwarded regardless.
                        logger.debug("Could not inspect client message", exc_info=True)
                    proc.stdin.write(_lsp_frame(frame))
                    await proc.stdin.drain()
        except Exception:
            logger.debug("client→server pump stopped", exc_info=True)
        finally:
            try:
                proc.stdin.close()
            except Exception:
                logger.debug("Closing yaml-language-server stdin failed", exc_info=True)
            # Killing the process ends the stdout read in server_to_client.
            kill_proc()

    async def server_to_client() -> None:
        """stdout → WS: inject completions into completion responses."""
        try:
            while True:
                data = await proc.stdout.read(_CHUNK)
                if not data:
                    break
                for frame in proc_buf.feed(data):
                    outgoing = frame
                    try:
                        parsed = json.loads(frame)
                        msg_id = parsed.get("id")
                        # Only responses (no 'method') to recorded completion
                        # requests are touched; everything else passes through
                        # byte-for-byte.
                        if (
                            msg_id is not None
                            and "method" not in parsed
                            and msg_id in session.pending
                        ):
                            # Pop even for error responses so entries do not pile up.
                            ctx = session.pop_context(msg_id)
                            if "result" in parsed:
                                outgoing = _inject_completions(parsed, ctx)
                    except Exception:
                        logger.debug("Could not inspect server message", exc_info=True)
                    # Sent outside the try so a send failure is not retried.
                    await ws.send_bytes(_lsp_frame(outgoing))
        except Exception:
            logger.debug("server→client pump stopped", exc_info=True)
        finally:
            await ws.close()

    try:
        await asyncio.gather(client_to_server(), server_to_client(), return_exceptions=True)
    finally:
        kill_proc()
    # Reap the child so it does not linger as a zombie.
    await proc.wait()
    logger.info("yaml-language-server session ended PID=%d", proc.pid)
    return ws