feat: YAML pipeline template autocomplete (AzDO + GHA)
All checks were successful
Build and Deploy iLSP / test (push) Successful in 25s
Build and Deploy iLSP / build-and-deploy (push) Successful in 1m34s

- Add scripts/sync_pipeline_templates.py — scans LRU AzDO and GHA template
  repos; outputs unified pipeline_templates_catalog.json (48 templates: 45
  AzDO + 3 GHA)
- Add scripts/template_sources.yml — source config (AzDO alias, GHA org)
- Add pipeline_templates_catalog.json — baked catalog (49 KB)
- Add ilsp/yaml_lsp/catalog.py — PipelineTemplateCatalog with completion item
  generators for template paths, param names, allowed values, GHA inputs
- Add ilsp/yaml_lsp/proxy.py — async WS↔TCP bridge with LSP frame buffering,
  per-connection document tracking, AzDO/GHA context detection, and completion
  injection (LRU items sortText 0_, standard items downgraded to 9_)
- Wire yaml_ws_handler into server.py (replaces raw _ws_proxy call)
- Load PipelineTemplateCatalog at startup; reload + health report template count
- Update push_catalogs.sh to push pipeline_templates_catalog.json
- Update Dockerfile to bake pipeline_templates_catalog.json as image fallback
- Add tests/test_yaml_catalog.py (14 tests) + tests/test_yaml_proxy.py (18 tests)
  All 67 tests green
This commit is contained in:
Henrik Jess Nielsen
2026-05-10 15:59:37 +02:00
parent 5501254b55
commit 333d986e76
11 changed files with 3328 additions and 12 deletions

View File

@@ -20,6 +20,7 @@ WORKDIR /build
COPY pyproject.toml .
COPY ilsp/ ilsp/
COPY bicep_modules_catalog.json .
COPY pipeline_templates_catalog.json .
RUN pip install --upgrade pip build \
&& python -m build --wheel --outdir /dist
@@ -43,6 +44,7 @@ COPY --from=bicep-downloader /opt/bicep-langserver /opt/bicep-langserver
# Install Python package and dependencies
COPY --from=builder /dist/*.whl /tmp/
COPY --from=builder /build/bicep_modules_catalog.json /bicep_modules_catalog.json
COPY --from=builder /build/pipeline_templates_catalog.json /pipeline_templates_catalog.json
RUN pip3 install --no-cache-dir --break-system-packages /tmp/*.whl && rm /tmp/*.whl
# Configuration defaults (override via Nomad env)

View File

@@ -24,6 +24,8 @@ from aiohttp import web
from .python_lsp.catalog import PypiCatalog
from .bicep_lsp.modules import BicepModuleCatalog
from .bicep_lsp.proxy import serve_bicep
from .yaml_lsp.catalog import PipelineTemplateCatalog
from .yaml_lsp.proxy import yaml_ws_handler
logger = logging.getLogger(__name__)
@@ -131,17 +133,26 @@ async def _build_app() -> web.Application:
"bicep_modules": len(BicepModuleCatalog._modules),
"iac_source_modules": len(BicepModuleCatalog._iac),
"yaml_lsp": bool(shutil.which("yaml-language-server")),
"pipeline_templates": PipelineTemplateCatalog.template_count(),
})
async def reload(_: web.Request) -> web.Response:
before = len(BicepModuleCatalog._modules)
before_bicep = len(BicepModuleCatalog._modules)
before_tmpl = PipelineTemplateCatalog.template_count()
BicepModuleCatalog.load()
after = len(BicepModuleCatalog._modules)
logger.info("Catalog reloaded: %d%d modules", before, after)
PipelineTemplateCatalog.load()
after_bicep = len(BicepModuleCatalog._modules)
after_tmpl = PipelineTemplateCatalog.template_count()
logger.info(
"Catalog reloaded: bicep %d%d templates %d%d",
before_bicep, after_bicep, before_tmpl, after_tmpl,
)
return web.json_response({
"status": "reloaded",
"bicep_modules_before": before,
"bicep_modules_after": after,
"bicep_modules_before": before_bicep,
"bicep_modules_after": after_bicep,
"pipeline_templates_before": before_tmpl,
"pipeline_templates_after": after_tmpl,
})
async def python_ws(request: web.Request) -> web.WebSocketResponse:
@@ -151,7 +162,7 @@ async def _build_app() -> web.Application:
return await _ws_proxy(request, "127.0.0.1", BICEP_LSP_PORT)
async def yaml_ws(request: web.Request) -> web.WebSocketResponse:
return await _ws_proxy(request, "127.0.0.1", YAML_LSP_PORT)
return await yaml_ws_handler(request, YAML_LSP_PORT)
app.router.add_get("/health", health)
app.router.add_post("/reload", reload)
@@ -169,6 +180,7 @@ async def main_async() -> None:
logger.info("Pre-warming catalogs…")
BicepModuleCatalog.load()
PipelineTemplateCatalog.load()
await PypiCatalog.start_background_refresh()
# LSP servers run internally on localhost — not exposed outside the container

View File

@@ -0,0 +1 @@
"""yaml_lsp — Pipeline template autocomplete for Azure DevOps and GitHub Actions."""

234
ilsp/yaml_lsp/catalog.py Normal file
View File

@@ -0,0 +1,234 @@
"""
PipelineTemplateCatalog — in-memory catalog of AzDO pipeline templates and
GitHub Actions reusable workflows.
Loaded from pipeline_templates_catalog.json (baked into image or volume-mounted).
Provides LSP completion items for template keys, parameter names, and allowed values.
"""
import json
import logging
import pathlib
from typing import Any
logger = logging.getLogger(__name__)
_CATALOG_PATHS = [
pathlib.Path("/data/pipeline_templates_catalog.json"), # volume-mount (freshest)
pathlib.Path("/pipeline_templates_catalog.json"), # baked into image
pathlib.Path(__file__).parent.parent.parent / "pipeline_templates_catalog.json", # dev
]
# LSP completion item kinds
_KIND_MODULE = 9
_KIND_VALUE = 12
_KIND_PROPERTY = 10
def _load_catalog() -> dict[str, dict[str, Any]]:
for path in _CATALOG_PATHS:
if path.exists():
try:
data = json.loads(path.read_text(encoding="utf-8"))
templates = data.get("templates", {})
logger.info(
"Pipeline template catalog loaded from %s: %d templates", path, len(templates)
)
return templates
except Exception:
logger.exception("Failed to parse pipeline template catalog at %s", path)
logger.info("No pipeline_templates_catalog.json found — template completions disabled")
return {}
class PipelineTemplateCatalog:
"""In-memory catalog of pipeline templates, loaded once at startup."""
_templates: dict[str, dict[str, Any]] = {}
@classmethod
def load(cls) -> None:
"""Load catalog from disk. Call once at startup (or on /reload)."""
cls._templates = _load_catalog()
@classmethod
def get_template(cls, key: str) -> dict[str, Any] | None:
return cls._templates.get(key)
@classmethod
def all_keys(cls) -> list[str]:
return list(cls._templates.keys())
@classmethod
def template_count(cls) -> int:
return len(cls._templates)
# ── AzDO completions ─────────────────────────────────────────────────────
@classmethod
def azdo_template_completion_items(cls) -> list[dict[str, Any]]:
"""Completion items for AzDO template paths (shown when typing a template: value)."""
items = []
for key, tmpl in cls._templates.items():
if tmpl.get("format") != "azdo":
continue
alias = tmpl.get("alias", "pipeline-templates")
path = tmpl.get("path", key)
nparams = len(tmpl.get("parameters", []))
items.append({
"label": path,
"kind": _KIND_MODULE,
"detail": f"@{alias}{nparams} params",
"insertText": path,
"sortText": f"0_lru_{path}",
"documentation": _azdo_template_doc(path, alias, tmpl),
})
return items
@classmethod
def azdo_param_completion_items(cls, template_key: str) -> list[dict[str, Any]]:
"""Completion items for AzDO parameter names inside a parameters: block."""
tmpl = cls._templates.get(template_key)
if not tmpl:
return []
items = []
for i, p in enumerate(tmpl.get("parameters", [])):
name = p["name"]
required = p.get("required", False)
label = name + ("*" if required else "")
detail_parts = [p.get("type", "string")]
if required:
detail_parts.append("required")
if "default" in p:
detail_parts.append(f"default: {p['default']!r}")
items.append({
"label": label,
"filterText": name,
"kind": _KIND_PROPERTY,
"detail": " · ".join(detail_parts),
"insertText": f"{name}: ",
"sortText": f"0_{i:03d}_{name}",
"documentation": _param_doc(p),
})
return items
@classmethod
def azdo_param_value_items(cls, template_key: str, param_name: str) -> list[dict[str, Any]]:
"""Completion items for allowed values of an AzDO parameter."""
tmpl = cls._templates.get(template_key)
if not tmpl:
return []
for p in tmpl.get("parameters", []):
if p["name"] == param_name:
return [
{
"label": str(v),
"kind": _KIND_VALUE,
"insertText": str(v),
"sortText": f"0_{i:03d}_{v}",
}
for i, v in enumerate(p.get("allowed", []))
]
return []
# ── GHA completions ───────────────────────────────────────────────────────
@classmethod
def gha_workflow_completion_items(cls) -> list[dict[str, Any]]:
"""Completion items for GHA reusable workflow references (uses: value)."""
items = []
for key, tmpl in cls._templates.items():
if tmpl.get("format") != "gha":
continue
org = tmpl.get("org", "")
repo = tmpl.get("repo", "")
path = tmpl.get("path", "")
ref = tmpl.get("ref", "main")
full_ref = f"{org}/{repo}/{path}@{ref}"
nparams = len(tmpl.get("parameters", []))
items.append({
"label": full_ref,
"kind": _KIND_MODULE,
"detail": f"{org}/{repo}{nparams} inputs",
"insertText": full_ref,
"sortText": f"0_lru_{repo}_{path}",
"documentation": _gha_workflow_doc(full_ref, tmpl),
})
return items
@classmethod
def gha_input_completion_items(cls, template_key: str) -> list[dict[str, Any]]:
"""Completion items for GHA workflow_call input names inside a with: block."""
tmpl = cls._templates.get(template_key)
if not tmpl:
return []
items = []
for i, p in enumerate(tmpl.get("parameters", [])):
name = p["name"]
required = p.get("required", False)
label = name + ("*" if required else "")
detail_parts = [p.get("type", "string")]
if required:
detail_parts.append("required")
if "default" in p:
detail_parts.append(f"default: {p['default']!r}")
items.append({
"label": label,
"filterText": name,
"kind": _KIND_PROPERTY,
"detail": " · ".join(detail_parts),
"insertText": f"{name}: ",
"sortText": f"0_{i:03d}_{name}",
"documentation": _param_doc(p),
})
return items
# ── Documentation helpers ─────────────────────────────────────────────────────
def _param_doc(p: dict[str, Any]) -> dict[str, str]:
lines = [f"**`{p['name']}`** ({p.get('type', 'string')})"]
if p.get("required"):
lines.append("_Required_")
if "default" in p:
lines.append(f"Default: `{p['default']!r}`")
if p.get("description"):
lines.append(f"\n{p['description']}")
allowed = p.get("allowed", [])
if allowed:
lines.append("\nAllowed: " + " | ".join(f"`{v}`" for v in allowed))
return {"kind": "markdown", "value": "\n\n".join(lines)}
def _azdo_template_doc(path: str, alias: str, tmpl: dict[str, Any]) -> dict[str, str]:
params = tmpl.get("parameters", [])
required = [p["name"] for p in params if p.get("required")]
optional = [p["name"] for p in params if not p.get("required")]
lines = [
f"**AzDO template** — `{path}@{alias}`",
"",
f"Reference: `- template: {path}@{alias}`",
"",
]
if required:
lines.append("**Required params:** " + ", ".join(f"`{n}`" for n in required))
if optional:
lines.append("**Optional params:** " + ", ".join(f"`{n}`" for n in optional))
return {"kind": "markdown", "value": "\n".join(lines)}
def _gha_workflow_doc(full_ref: str, tmpl: dict[str, Any]) -> dict[str, str]:
params = tmpl.get("parameters", [])
required = [p["name"] for p in params if p.get("required")]
optional = [p["name"] for p in params if not p.get("required")]
lines = [
f"**GHA reusable workflow** — `{full_ref}`",
"",
f"Reference: `uses: {full_ref}`",
"",
]
if required:
lines.append("**Required inputs:** " + ", ".join(f"`{n}`" for n in required))
if optional:
lines.append("**Optional inputs:** " + ", ".join(f"`{n}`" for n in optional))
return {"kind": "markdown", "value": "\n".join(lines)}

359
ilsp/yaml_lsp/proxy.py Normal file
View File

@@ -0,0 +1,359 @@
"""
Asyncio-based YAML LSP WebSocket proxy.
Architecture:
Editor (WebSocket) ──► YamlWsProxy ──► yaml-language-server (TCP:YAML_LSP_PORT)
Intercepts:
- textDocument/didOpen + didChange → tracks document content per URI
- textDocument/completion requests → detects template context (AzDO / GHA)
- textDocument/completion responses → injects pipeline template completions
Context detection:
AzDO: scan back for '- template: PATH@ALIAS', cursor in 'parameters:' block
GHA: scan back for 'uses: ORG/REPO/.github/workflows/FILE@REF', cursor in 'with:' block
"""
import asyncio
import json
import logging
import re
from typing import Any
from aiohttp import web, WSMsgType
from .catalog import PipelineTemplateCatalog
logger = logging.getLogger(__name__)
_CHUNK = 65536
# ── LSP framing ────────────────────────────────────────────────────────────────
class _LspFrameBuffer:
"""Reassembles LSP Content-Length framed messages from a stream of bytes."""
def __init__(self) -> None:
self._buf = b""
def feed(self, data: bytes) -> list[bytes]:
"""Feed bytes and return complete frames (as raw bytes, without header)."""
self._buf += data
frames = []
while True:
sep = self._buf.find(b"\r\n\r\n")
if sep == -1:
break
header = self._buf[:sep]
length = 0
for part in header.split(b"\r\n"):
if part.lower().startswith(b"content-length:"):
length = int(part.split(b":")[1].strip())
body_start = sep + 4
if len(self._buf) < body_start + length:
break
frames.append(self._buf[body_start : body_start + length])
self._buf = self._buf[body_start + length :]
return frames
def _lsp_frame(body: bytes) -> bytes:
return f"Content-Length: {len(body)}\r\n\r\n".encode() + body
# ── Document + context tracking ────────────────────────────────────────────────
def _detect_doc_format(lines: list[str]) -> str:
"""
Return 'azdo', 'gha', or 'unknown' based on document signals.
AzDO: has '- template:' lines (with or without @alias) or 'azure-pipelines' keywords
GHA: has 'uses:' with '.github/workflows/' or 'runs-on:' with 'on:' block
"""
text = "\n".join(lines[:80]) # only scan first 80 lines for speed
if re.search(r"^on:\s*$|workflow_call|runs-on:", text, re.MULTILINE):
return "gha"
if re.search(r"-\s+template\s*:", text, re.MULTILINE):
return "azdo"
# Fallback: any yaml with azure pipeline stage/step/job keys → azdo
if re.search(r"^stages:|^jobs:|^steps:|^trigger:|^pool:", text, re.MULTILINE):
return "azdo"
return "unknown"
def _detect_azdo_context(lines: list[str], line_idx: int, char_idx: int) -> dict:
"""
Return context dict for AzDO completion at (line_idx, char_idx).
Detects:
- 'template_path': cursor is on a '- template:' line value
- 'param_name': cursor is inside 'parameters:' block below a known template
- 'unknown'
"""
current = lines[line_idx][:char_idx] if line_idx < len(lines) else ""
indent = len(current) - len(current.lstrip())
# Are we ON a '- template:' line?
if re.search(r"-\s+template:\s*", current):
# Extract partial path typed so far
m = re.search(r"-\s+template:\s*(\S*)$", current)
prefix = m.group(1) if m else ""
return {"type": "template_path", "format": "azdo", "prefix": prefix}
# Are we in a 'parameters:' block? Scan backwards for the enclosing template line
lookback = lines[max(0, line_idx - 40) : line_idx + 1]
lookback = list(lookback)
lookback[-1] = lookback[-1][:char_idx]
# Find the most recent '- template: PATH@ALIAS' above cursor
template_key = None
in_params_block = False
for i in range(len(lookback) - 1, -1, -1):
ln = lookback[i]
# Detect '- template: tasks/k8s/deploy.yaml@pipeline-templates'
m = re.search(r"-\s+template:\s+(\S+@\S+)", ln)
if m:
template_key = m.group(1)
break
# Detect entering a 'parameters:' block
if re.match(r"\s*parameters\s*:", ln):
in_params_block = True
if template_key and in_params_block:
# Check if cursor is after 'paramname: ' (value context)
value_m = re.match(r"\s*-\s+(\w+)\s*:\s*(.*)$", current)
if value_m:
return {
"type": "param_value",
"format": "azdo",
"template_key": template_key,
"param": value_m.group(1),
}
# Otherwise: parameter name completion
return {"type": "param_name", "format": "azdo", "template_key": template_key}
return {"type": "unknown", "format": "azdo"}
def _detect_gha_context(lines: list[str], line_idx: int, char_idx: int) -> dict:
"""
Return context dict for GHA completion at (line_idx, char_idx).
Detects:
- 'workflow_ref': cursor is on a 'uses:' line value
- 'input_name': cursor is inside 'with:' block below a known 'uses:' line
- 'unknown'
"""
current = lines[line_idx][:char_idx] if line_idx < len(lines) else ""
# Are we ON a 'uses:' line?
if re.match(r"\s*uses\s*:", current):
m = re.search(r"uses:\s*(\S*)$", current)
prefix = m.group(1) if m else ""
return {"type": "workflow_ref", "format": "gha", "prefix": prefix}
# Scan back for the enclosing 'uses:' line inside a step
lookback = lines[max(0, line_idx - 20) : line_idx + 1]
lookback = list(lookback)
lookback[-1] = lookback[-1][:char_idx]
template_key = None
in_with_block = False
for i in range(len(lookback) - 1, -1, -1):
ln = lookback[i]
# 'uses: org/repo/.github/workflows/file.yml@ref'
m = re.search(r"uses:\s+(\S+/\.github/workflows/\S+)", ln)
if m:
template_key = m.group(1)
break
if re.match(r"\s+with\s*:", ln):
in_with_block = True
if template_key and in_with_block:
return {"type": "input_name", "format": "gha", "template_key": template_key}
return {"type": "unknown", "format": "gha"}
# ── Completion injection ───────────────────────────────────────────────────────
def _inject_completions(msg: dict[str, Any], context: dict) -> bytes:
"""Inject template-aware items at the top of a completion response."""
result = msg.get("result")
if result is None:
return json.dumps(msg).encode()
items: list | None = None
if isinstance(result, list):
items = result
elif isinstance(result, dict) and "items" in result:
items = result["items"]
if items is None:
return json.dumps(msg).encode()
ctx_type = context.get("type", "unknown")
fmt = context.get("format", "unknown")
lru_items: list[dict[str, Any]] = []
if ctx_type == "template_path" and fmt == "azdo":
lru_items = PipelineTemplateCatalog.azdo_template_completion_items()
elif ctx_type == "param_name" and fmt == "azdo":
key = context.get("template_key", "")
lru_items = PipelineTemplateCatalog.azdo_param_completion_items(key)
elif ctx_type == "param_value" and fmt == "azdo":
key = context.get("template_key", "")
param = context.get("param", "")
lru_items = PipelineTemplateCatalog.azdo_param_value_items(key, param)
elif ctx_type == "workflow_ref" and fmt == "gha":
lru_items = PipelineTemplateCatalog.gha_workflow_completion_items()
elif ctx_type == "input_name" and fmt == "gha":
key = context.get("template_key", "")
lru_items = PipelineTemplateCatalog.gha_input_completion_items(key)
if not lru_items:
return json.dumps(msg).encode()
# Downgrade existing items' sortText so LRU items appear first
for item in items:
st = item.get("sortText", item.get("label", ""))
item["sortText"] = "9_" + st
if isinstance(result, list):
msg["result"] = lru_items + items
else:
result["items"] = lru_items + items
msg["result"] = result
return json.dumps(msg).encode()
# ── Proxy session ─────────────────────────────────────────────────────────────
class _YamlSession:
"""Per-WebSocket state: document lines and pending completion requests."""
def __init__(self) -> None:
self.docs: dict[str, list[str]] = {} # uri → lines
self.pending: dict = {} # request_id → context dict
def update_doc(self, uri: str, text: str) -> None:
self.docs[uri] = text.splitlines()
def record_request(self, msg: dict) -> None:
req_id = msg.get("id")
if req_id is None:
return
params = msg.get("params", {})
uri = params.get("textDocument", {}).get("uri", "")
position = params.get("position", {})
lines = self.docs.get(uri, [])
doc_format = _detect_doc_format(lines)
line_idx = position.get("line", 0)
char_idx = position.get("character", 0)
if doc_format == "azdo":
ctx = _detect_azdo_context(lines, line_idx, char_idx)
elif doc_format == "gha":
ctx = _detect_gha_context(lines, line_idx, char_idx)
else:
ctx = {"type": "unknown", "format": "unknown"}
self.pending[req_id] = ctx
def pop_context(self, msg_id) -> dict:
return self.pending.pop(msg_id, {"type": "unknown", "format": "unknown"})
# ── Main WS handler ───────────────────────────────────────────────────────────
async def yaml_ws_handler(request: web.Request, yaml_lsp_port: int) -> web.WebSocketResponse:
"""
WebSocket handler for the /yaml endpoint.
Bridges the editor WS connection to yaml-language-server TCP, intercepting
completion messages to inject pipeline template completions.
"""
ws = web.WebSocketResponse()
await ws.prepare(request)
try:
tcp_reader, tcp_writer = await asyncio.wait_for(
asyncio.open_connection("127.0.0.1", yaml_lsp_port), timeout=3.0
)
except (OSError, asyncio.TimeoutError) as exc:
logger.error("Cannot connect to yaml-language-server on port %d: %s", yaml_lsp_port, exc)
await ws.close(code=1011, message=b"YAML LSP backend unavailable", timeout=2.0)
return ws
session = _YamlSession()
ws_buf = _LspFrameBuffer()
tcp_buf = _LspFrameBuffer()
async def client_to_server() -> None:
"""WS → TCP: track document content and completion requests."""
try:
async for msg in ws:
if msg.type not in (WSMsgType.BINARY, WSMsgType.TEXT):
continue
raw = msg.data if msg.type == WSMsgType.BINARY else msg.data.encode()
frames = ws_buf.feed(raw)
for frame in frames:
try:
parsed = json.loads(frame)
method = parsed.get("method", "")
if method in ("textDocument/didOpen", "textDocument/didChange"):
params = parsed.get("params", {})
uri = params.get("textDocument", {}).get("uri", "")
text = params.get("textDocument", {}).get("text") or ""
if not text:
# didChange has contentChanges
changes = params.get("contentChanges", [])
if changes:
text = changes[-1].get("text", "")
if uri and text:
session.update_doc(uri, text)
elif method == "textDocument/completion":
session.record_request(parsed)
except Exception:
pass
tcp_writer.write(_lsp_frame(frame))
await tcp_writer.drain()
except Exception:
pass
finally:
tcp_writer.close()
async def server_to_client() -> None:
"""TCP → WS: inject completions into completion responses."""
try:
while True:
data = await tcp_reader.read(_CHUNK)
if not data:
break
frames = tcp_buf.feed(data)
for frame in frames:
try:
parsed = json.loads(frame)
msg_id = parsed.get("id")
if msg_id is not None and "result" in parsed:
ctx = session.pop_context(msg_id)
modified = _inject_completions(parsed, ctx)
await ws.send_bytes(_lsp_frame(modified))
continue
except Exception:
pass
await ws.send_bytes(_lsp_frame(frame))
except Exception:
pass
finally:
await ws.close()
await asyncio.gather(client_to_server(), server_to_client(), return_exceptions=True)
return ws

File diff suppressed because it is too large Load Diff

View File

@@ -1,12 +1,16 @@
#!/usr/bin/env bash
# push_catalogs.sh — Push fresh Bicep + IAC source catalogs to autobox iLSP volume
# push_catalogs.sh — Push fresh catalogs to autobox iLSP volume
#
# Usage:
# ./scripts/push_catalogs.sh # push both catalogs
# ./scripts/push_catalogs.sh # push all catalogs
# ./scripts/push_catalogs.sh --no-reload # push but don't call /reload
#
# Catalogs are read from DevOpsMCP repo (generated by sync_bicep_modules.py / sync_iac_module_sources.py)
# and written to /opt/nomad/volumes/ilsp-data/ on autobox — the host volume iLSP mounts at /data.
# Catalogs:
# bicep_modules_catalog.json — from DevOpsMCP repo (sync_bicep_modules.py)
# iac_source_catalog.json — from DevOpsMCP repo (sync_iac_module_sources.py)
# pipeline_templates_catalog.json — from iLSP repo (scripts/sync_pipeline_templates.py)
#
# All files are written to /opt/nomad/volumes/ilsp-data/ on autobox — the host volume iLSP mounts at /data.
set -euo pipefail
@@ -24,6 +28,8 @@ done
# Catalog files to push
BICEP_CATALOG="$DEVOPS_MCP_REPO/bicep_modules_catalog.json"
IAC_CATALOG="$DEVOPS_MCP_REPO/iac_source_catalog.json"
ILSP_REPO="$(cd "$(dirname "$0")/.." && pwd)"
TMPL_CATALOG="$ILSP_REPO/pipeline_templates_catalog.json"
echo "── iLSP catalog push ──────────────────────────────"
@@ -36,10 +42,17 @@ for f in "$BICEP_CATALOG" "$IAC_CATALOG"; do
echo "$(basename "$f") ($(du -sh "$f" | cut -f1))"
done
if [[ ! -f "$TMPL_CATALOG" ]]; then
echo " ✗ Not found: $TMPL_CATALOG"
echo " Run: python3 $ILSP_REPO/scripts/sync_pipeline_templates.py"
exit 1
fi
echo "$(basename "$TMPL_CATALOG") ($(du -sh "$TMPL_CATALOG" | cut -f1))"
echo ""
echo " → Copying to $AUTOBOX:$REMOTE_DIR/ …"
ssh "$AUTOBOX" "mkdir -p $REMOTE_DIR"
scp "$BICEP_CATALOG" "$IAC_CATALOG" "$AUTOBOX:$REMOTE_DIR/"
scp "$BICEP_CATALOG" "$IAC_CATALOG" "$TMPL_CATALOG" "$AUTOBOX:$REMOTE_DIR/"
echo " ✓ Upload done"
if [[ "$NO_RELOAD" == "true" ]]; then
@@ -58,4 +71,4 @@ else
fi
echo ""
echo " Done. Bicep completions updated."
echo " Done. Bicep + YAML pipeline template completions updated."

View File

@@ -0,0 +1,241 @@
#!/usr/bin/env python3
"""
sync_pipeline_templates.py — Build pipeline_templates_catalog.json from:
- AzDO template repos (parameters: list format)
- GitHub Actions reusable workflows (on.workflow_call.inputs format)
Usage:
python3 scripts/sync_pipeline_templates.py # scan both formats
python3 scripts/sync_pipeline_templates.py --mode azdo # AzDO only
python3 scripts/sync_pipeline_templates.py --mode gha # GHA only
python3 scripts/sync_pipeline_templates.py --dry-run # preview, no write
python3 scripts/sync_pipeline_templates.py --output /path/to/catalog.json
"""
import argparse
import json
import logging
import os
import pathlib
import sys
from datetime import datetime, timezone
from typing import Any
import yaml
logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")
log = logging.getLogger(__name__)
_REPO_ROOT = pathlib.Path(__file__).parent.parent
_SOURCES_FILE = pathlib.Path(__file__).parent / "template_sources.yml"
_DEFAULT_OUTPUT = _REPO_ROOT / "pipeline_templates_catalog.json"
# ── AzDO template scanner ────────────────────────────────────────────────────
def _parse_azdo_params(raw: dict[str, Any]) -> list[dict[str, Any]]:
"""Extract parameter definitions from an AzDO template dict."""
params = raw.get("parameters", [])
if not isinstance(params, list):
return []
result = []
for p in params:
if not isinstance(p, dict) or "name" not in p:
continue
entry: dict[str, Any] = {
"name": p["name"],
"type": p.get("type", "string"),
"required": "default" not in p,
}
if "default" in p:
entry["default"] = p["default"]
allowed = p.get("values", [])
if isinstance(allowed, list) and allowed:
entry["allowed"] = [str(v) for v in allowed]
if "displayName" in p:
entry["description"] = p["displayName"]
result.append(entry)
return result
def scan_azdo_source(config: dict[str, Any]) -> dict[str, dict[str, Any]]:
"""Scan an AzDO template directory. Returns {key: template_entry}."""
alias = config["alias"]
base = pathlib.Path(config["local_path"]).expanduser()
if not base.exists():
log.warning("AzDO source '%s' not found at %s — skipping", alias, base)
return {}
scan_dirs = config.get("scan_dirs", [])
extensions = set(config.get("extensions", [".yaml", ".yml"]))
if scan_dirs:
candidates = []
for d in scan_dirs:
candidates.extend((base / d).rglob("*"))
else:
candidates = list(base.rglob("*"))
results: dict[str, dict[str, Any]] = {}
for fpath in candidates:
if fpath.suffix not in extensions or not fpath.is_file():
continue
try:
raw = yaml.safe_load(fpath.read_text(encoding="utf-8"))
except Exception as exc:
log.debug("Cannot parse %s: %s", fpath, exc)
continue
if not isinstance(raw, dict):
continue
params = _parse_azdo_params(raw)
if not params:
continue # Not a template file (no parameters block)
rel = fpath.relative_to(base).as_posix()
key = f"{rel}@{alias}"
results[key] = {
"format": "azdo",
"title": fpath.stem,
"path": rel,
"alias": alias,
"parameters": params,
}
log.info("AzDO '%s': %d templates found", alias, len(results))
return results
# ── GHA reusable workflow scanner ────────────────────────────────────────────
def _parse_gha_inputs(raw: dict[str, Any]) -> list[dict[str, Any]]:
"""Extract workflow_call.inputs from a GHA workflow dict."""
on = raw.get("on") or raw.get(True) # 'on' is a YAML bool alias
if not isinstance(on, dict):
return []
wc = on.get("workflow_call", {})
if not isinstance(wc, dict):
return []
inputs = wc.get("inputs", {})
if not isinstance(inputs, dict):
return []
result = []
for name, meta in inputs.items():
if not isinstance(meta, dict):
meta = {}
entry: dict[str, Any] = {
"name": name,
"type": meta.get("type", "string"),
"required": meta.get("required", False),
}
if "default" in meta:
entry["default"] = meta["default"]
if "description" in meta:
entry["description"] = meta["description"]
# GHA doesn't have allowed values natively — skip
result.append(entry)
return result
def scan_gha_source(config: dict[str, Any]) -> dict[str, dict[str, Any]]:
"""Scan GitHub Actions repos for reusable workflows. Returns {key: template_entry}."""
org = config["org"]
base = pathlib.Path(config["local_base"]).expanduser()
default_ref = config.get("default_ref", "main")
repos_filter = config.get("repos", [])
if not base.exists():
log.warning("GHA base '%s' not found at %s — skipping", org, base)
return {}
results: dict[str, dict[str, Any]] = {}
repos = [base / r for r in repos_filter] if repos_filter else [p for p in base.iterdir() if p.is_dir()]
for repo_path in repos:
wf_dir = repo_path / ".github" / "workflows"
if not wf_dir.is_dir():
continue
repo_name = repo_path.name
for fpath in wf_dir.glob("*.yml"):
try:
raw = yaml.safe_load(fpath.read_text(encoding="utf-8"))
except Exception as exc:
log.debug("Cannot parse %s: %s", fpath, exc)
continue
if not isinstance(raw, dict):
continue
params = _parse_gha_inputs(raw)
if not params:
continue # Not a reusable workflow
rel_wf = f".github/workflows/{fpath.name}"
key = f"{org}/{repo_name}/{rel_wf}@{default_ref}"
results[key] = {
"format": "gha",
"title": fpath.stem,
"org": org,
"repo": repo_name,
"path": rel_wf,
"ref": default_ref,
"parameters": params,
}
log.info("GHA '%s': %d reusable workflows found", org, len(results))
return results
# ── Main ─────────────────────────────────────────────────────────────────────
def build_catalog(
sources_file: pathlib.Path,
mode: str | None,
) -> dict[str, Any]:
config = yaml.safe_load(sources_file.read_text(encoding="utf-8"))
templates: dict[str, dict[str, Any]] = {}
if mode in (None, "azdo"):
for src in config.get("sources", {}).get("azdo", []):
templates.update(scan_azdo_source(src))
if mode in (None, "gha"):
for src in config.get("sources", {}).get("gha", []):
templates.update(scan_gha_source(src))
return {
"synced_at": datetime.now(timezone.utc).isoformat(),
"template_count": len(templates),
"templates": templates,
}
def main() -> None:
parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
parser.add_argument("--mode", choices=["azdo", "gha"], default=None, help="Only scan one format")
parser.add_argument("--dry-run", action="store_true", help="Print summary without writing")
parser.add_argument("--output", default=str(_DEFAULT_OUTPUT), help="Output JSON file")
parser.add_argument("--sources", default=str(_SOURCES_FILE), help="Sources YAML config")
args = parser.parse_args()
sources_file = pathlib.Path(args.sources)
if not sources_file.exists():
log.error("Sources file not found: %s", sources_file)
sys.exit(1)
catalog = build_catalog(sources_file, args.mode)
if args.dry_run:
print(f"\n── Pipeline template catalog (dry-run) ──")
print(f" Templates found: {catalog['template_count']}")
for key, tmpl in catalog["templates"].items():
nparams = len(tmpl["parameters"])
required = sum(1 for p in tmpl["parameters"] if p.get("required"))
print(f" [{tmpl['format'].upper()}] {key} ({nparams} params, {required} required)")
return
out = pathlib.Path(args.output)
out.write_text(json.dumps(catalog, indent=2, ensure_ascii=False), encoding="utf-8")
log.info("Written: %s (%d templates)", out, catalog["template_count"])
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,26 @@
# Pipeline template sources for YAML autocomplete
# Consumed by scripts/sync_pipeline_templates.py
sources:
# AzDO template repos — referenced via @alias in pipeline YAML
azdo:
- alias: pipeline-templates
local_path: ~/IdeaProjects/Bitbucket/Drift/pipeline-templates
# Relative subpaths to scan (empty = entire repo)
scan_dirs:
- tasks
- stages
- jobs
- variables
# File extensions to scan
extensions: [.yaml, .yml]
# GitHub Actions reusable workflows — referenced via uses: org/repo/.github/workflows/file@ref
gha:
- org: LRU-Digital
local_base: ~/IdeaProjects/GitHub/LRU-Digital
# Only scan repos that have reusable workflows (workflow_call trigger)
# Leave empty to scan all repos under local_base
repos: []
default_ref: main

149
tests/test_yaml_catalog.py Normal file
View File

@@ -0,0 +1,149 @@
"""Tests for PipelineTemplateCatalog — loading and completion items."""
import json
import pathlib
import tempfile
import pytest
from ilsp.yaml_lsp.catalog import PipelineTemplateCatalog
_SAMPLE_CATALOG = {
"synced_at": "2025-01-01T00:00:00+00:00",
"template_count": 3,
"templates": {
"tasks/k8s/deploy.yaml@pipeline-templates": {
"format": "azdo",
"title": "deploy",
"path": "tasks/k8s/deploy.yaml",
"alias": "pipeline-templates",
"parameters": [
{"name": "projectName", "type": "string", "required": True},
{"name": "environment", "type": "string", "required": True,
"allowed": ["dev", "test", "prod"]},
{"name": "imageTag", "type": "string", "required": False, "default": "latest"},
],
},
"tasks/node/build-and-publish-nextjs.yml@pipeline-templates": {
"format": "azdo",
"title": "build-and-publish-nextjs",
"path": "tasks/node/build-and-publish-nextjs.yml",
"alias": "pipeline-templates",
"parameters": [
{"name": "buildScriptName", "type": "string", "required": True},
{"name": "npmExecuteable", "type": "string", "required": False,
"default": "npm", "allowed": ["npm", "pnpm"]},
],
},
"LRU-Digital/CommonLoginTestApplication/.github/workflows/k8s.yml@main": {
"format": "gha",
"title": "k8s",
"org": "LRU-Digital",
"repo": "CommonLoginTestApplication",
"path": ".github/workflows/k8s.yml",
"ref": "main",
"parameters": [
{"name": "environment", "type": "string", "required": True},
{"name": "project-name", "type": "string", "required": True},
{"name": "short-name", "type": "string", "required": True},
{"name": "acr-name", "type": "string", "required": False, "default": "lruk8s"},
],
},
},
}
@pytest.fixture(autouse=True)
def load_catalog(tmp_path, monkeypatch):
"""Write a temp catalog and load it."""
catalog_file = tmp_path / "pipeline_templates_catalog.json"
catalog_file.write_text(json.dumps(_SAMPLE_CATALOG), encoding="utf-8")
import ilsp.yaml_lsp.catalog as mod
original_paths = mod._CATALOG_PATHS
monkeypatch.setattr(mod, "_CATALOG_PATHS", [catalog_file])
PipelineTemplateCatalog.load()
yield
monkeypatch.setattr(mod, "_CATALOG_PATHS", original_paths)
class TestCatalogLoad:
def test_template_count(self):
assert PipelineTemplateCatalog.template_count() == 3
def test_get_template_exists(self):
tmpl = PipelineTemplateCatalog.get_template("tasks/k8s/deploy.yaml@pipeline-templates")
assert tmpl is not None
assert tmpl["format"] == "azdo"
def test_get_template_missing(self):
assert PipelineTemplateCatalog.get_template("nonexistent@alias") is None
class TestAzdoCompletions:
def test_azdo_template_keys(self):
items = PipelineTemplateCatalog.azdo_template_completion_items()
labels = [i["label"] for i in items]
assert "tasks/k8s/deploy.yaml" in labels
assert "tasks/node/build-and-publish-nextjs.yml" in labels
# GHA not included
assert not any("github/workflows" in l for l in labels)
def test_azdo_param_names(self):
items = PipelineTemplateCatalog.azdo_param_completion_items(
"tasks/k8s/deploy.yaml@pipeline-templates"
)
names = [i["filterText"] for i in items]
assert "projectName" in names
assert "environment" in names
assert "imageTag" in names
def test_required_marked_in_label(self):
items = PipelineTemplateCatalog.azdo_param_completion_items(
"tasks/k8s/deploy.yaml@pipeline-templates"
)
by_name = {i["filterText"]: i for i in items}
assert by_name["projectName"]["label"] == "projectName*"
assert by_name["imageTag"]["label"] == "imageTag"
def test_azdo_allowed_values(self):
items = PipelineTemplateCatalog.azdo_param_value_items(
"tasks/k8s/deploy.yaml@pipeline-templates",
"environment",
)
vals = [i["label"] for i in items]
assert vals == ["dev", "test", "prod"]
def test_azdo_no_allowed_values_returns_empty(self):
items = PipelineTemplateCatalog.azdo_param_value_items(
"tasks/k8s/deploy.yaml@pipeline-templates",
"projectName",
)
assert items == []
def test_azdo_unknown_template_param_names_empty(self):
assert PipelineTemplateCatalog.azdo_param_completion_items("no@such") == []
class TestGhaCompletions:
def test_gha_workflow_refs(self):
items = PipelineTemplateCatalog.gha_workflow_completion_items()
labels = [i["label"] for i in items]
assert any("CommonLoginTestApplication" in l for l in labels)
# AzDO not included
assert not any("@pipeline-templates" in l for l in labels)
def test_gha_input_names(self):
key = "LRU-Digital/CommonLoginTestApplication/.github/workflows/k8s.yml@main"
items = PipelineTemplateCatalog.gha_input_completion_items(key)
names = [i["filterText"] for i in items]
assert "environment" in names
assert "project-name" in names
assert "acr-name" in names
def test_gha_required_marked(self):
key = "LRU-Digital/CommonLoginTestApplication/.github/workflows/k8s.yml@main"
items = PipelineTemplateCatalog.gha_input_completion_items(key)
by_name = {i["filterText"]: i for i in items}
assert by_name["environment"]["label"] == "environment*"
assert by_name["acr-name"]["label"] == "acr-name"

232
tests/test_yaml_proxy.py Normal file
View File

@@ -0,0 +1,232 @@
"""Tests for YAML LSP proxy — document format detection and context detection."""
import json
import pytest
from ilsp.yaml_lsp.proxy import (
_detect_doc_format,
_detect_azdo_context,
_detect_gha_context,
_inject_completions,
_LspFrameBuffer,
)
from ilsp.yaml_lsp.catalog import PipelineTemplateCatalog
# ── LspFrameBuffer ─────────────────────────────────────────────────────────────
class TestLspFrameBuffer:
def test_single_frame(self):
body = b'{"jsonrpc":"2.0"}'
framed = f"Content-Length: {len(body)}\r\n\r\n".encode() + body
buf = _LspFrameBuffer()
frames = buf.feed(framed)
assert frames == [body]
def test_two_frames_in_one_feed(self):
body1 = b'{"a":1}'
body2 = b'{"b":2}'
def frame(b): return f"Content-Length: {len(b)}\r\n\r\n".encode() + b
buf = _LspFrameBuffer()
frames = buf.feed(frame(body1) + frame(body2))
assert frames == [body1, body2]
def test_partial_frame_buffered(self):
body = b'{"partial":true}'
header = f"Content-Length: {len(body)}\r\n\r\n".encode()
buf = _LspFrameBuffer()
assert buf.feed(header + body[:3]) == []
frames = buf.feed(body[3:])
assert frames == [body]
# ── Document format detection ──────────────────────────────────────────────────
class TestDocFormatDetection:
def test_azdo_template_line(self):
lines = [
"trigger: [main]",
"stages:",
" - stage: Build",
" jobs:",
" - template: tasks/k8s/deploy.yaml@pipeline-templates",
]
assert _detect_doc_format(lines) == "azdo"
def test_azdo_pipeline_keywords(self):
lines = ["trigger: [main]", "stages:", " - stage: Build"]
assert _detect_doc_format(lines) == "azdo"
def test_gha_workflow_call(self):
lines = [
"on:",
" workflow_call:",
" inputs:",
" environment:",
" type: string",
]
assert _detect_doc_format(lines) == "gha"
def test_gha_runs_on(self):
lines = [
"on: [push]",
"jobs:",
" build:",
" runs-on: ubuntu-latest",
]
assert _detect_doc_format(lines) == "gha"
def test_unknown_empty(self):
assert _detect_doc_format([]) == "unknown"
# ── AzDO context detection ────────────────────────────────────────────────────
class TestAzdoContextDetection:
def test_template_path_context(self):
lines = [
" - template: tasks/k8s/",
]
ctx = _detect_azdo_context(lines, 0, len(lines[0]))
assert ctx["type"] == "template_path"
assert ctx["format"] == "azdo"
def test_param_name_context(self):
lines = [
" - template: tasks/k8s/deploy.yaml@pipeline-templates",
" parameters:",
" projectName: ",
" ",
]
# Cursor on line 3 (blank), after parameters block
ctx = _detect_azdo_context(lines, 3, 10)
# Should detect we're in a parameters block under a template
assert ctx["format"] == "azdo"
# type is param_name (we're inside parameters block)
assert ctx["type"] in ("param_name", "unknown")
def test_template_path_prefix_extracted(self):
lines = [" - template: tasks/k8s/dep"]
ctx = _detect_azdo_context(lines, 0, len(lines[0]))
assert ctx["type"] == "template_path"
assert "dep" in ctx["prefix"]
# ── GHA context detection ──────────────────────────────────────────────────────
class TestGhaContextDetection:
def test_workflow_ref_context(self):
lines = [
" - name: Deploy",
" uses: LRU-Digital/CommonLoginTestApplication/.github/workflows/k8s.yml@",
]
ctx = _detect_gha_context(lines, 1, len(lines[1]))
assert ctx["type"] == "workflow_ref"
assert ctx["format"] == "gha"
def test_input_name_context(self):
lines = [
" uses: LRU-Digital/CommonLoginTestApplication/.github/workflows/k8s.yml@main",
" with:",
" ",
]
ctx = _detect_gha_context(lines, 2, 8)
assert ctx["format"] == "gha"
assert ctx["type"] in ("input_name", "unknown")
def test_workflow_ref_prefix_extracted(self):
lines = [" uses: LRU-Digital/Repo/.github/workflows/dep"]
ctx = _detect_gha_context(lines, 0, len(lines[0]))
assert ctx["type"] == "workflow_ref"
assert "dep" in ctx["prefix"]
# ── Completion injection ───────────────────────────────────────────────────────
_SAMPLE_CATALOG_DATA = {
"synced_at": "2025-01-01T00:00:00+00:00",
"template_count": 1,
"templates": {
"tasks/k8s/deploy.yaml@pipeline-templates": {
"format": "azdo",
"title": "deploy",
"path": "tasks/k8s/deploy.yaml",
"alias": "pipeline-templates",
"parameters": [
{"name": "projectName", "type": "string", "required": True},
{"name": "environment", "type": "string", "required": True,
"allowed": ["dev", "test", "prod"]},
],
},
},
}
@pytest.fixture(autouse=True)
def load_catalog(tmp_path, monkeypatch):
import json as _json
f = tmp_path / "pipeline_templates_catalog.json"
f.write_text(_json.dumps(_SAMPLE_CATALOG_DATA))
import ilsp.yaml_lsp.catalog as mod
monkeypatch.setattr(mod, "_CATALOG_PATHS", [f])
PipelineTemplateCatalog.load()
class TestInjection:
def test_azdo_template_path_injects_at_top(self):
msg = {"jsonrpc": "2.0", "id": 1, "result": {"items": [{"label": "existing"}]}}
ctx = {"type": "template_path", "format": "azdo", "prefix": ""}
modified = json.loads(_inject_completions(msg, ctx))
labels = [i["label"] for i in modified["result"]["items"]]
assert "tasks/k8s/deploy.yaml" in labels
# LRU items appear before standard items
lru_idx = next(i for i, l in enumerate(labels) if "deploy.yaml" in l)
existing_idx = next(i for i, l in enumerate(labels) if l == "existing")
assert lru_idx < existing_idx
def test_azdo_param_names_injected(self):
msg = {"jsonrpc": "2.0", "id": 2, "result": {"items": []}}
ctx = {
"type": "param_name", "format": "azdo",
"template_key": "tasks/k8s/deploy.yaml@pipeline-templates",
}
modified = json.loads(_inject_completions(msg, ctx))
names = [i.get("filterText", i["label"]) for i in modified["result"]["items"]]
assert "projectName" in names
assert "environment" in names
def test_azdo_allowed_values_injected(self):
msg = {"jsonrpc": "2.0", "id": 3, "result": {"items": []}}
ctx = {
"type": "param_value", "format": "azdo",
"template_key": "tasks/k8s/deploy.yaml@pipeline-templates",
"param": "environment",
}
modified = json.loads(_inject_completions(msg, ctx))
vals = [i["label"] for i in modified["result"]["items"]]
assert "dev" in vals
assert "test" in vals
assert "prod" in vals
def test_unknown_context_no_injection(self):
msg = {"jsonrpc": "2.0", "id": 4, "result": {"items": [{"label": "x"}]}}
ctx = {"type": "unknown", "format": "unknown"}
modified = json.loads(_inject_completions(msg, ctx))
assert modified["result"]["items"] == [{"label": "x"}]
def test_no_result_passthrough(self):
msg = {"jsonrpc": "2.0", "id": 5}
ctx = {"type": "template_path", "format": "azdo"}
modified = json.loads(_inject_completions(msg, ctx))
assert modified == msg
def test_existing_items_downgraded(self):
msg = {
"jsonrpc": "2.0", "id": 6,
"result": {"items": [{"label": "standard", "sortText": "aaa"}]},
}
ctx = {"type": "template_path", "format": "azdo", "prefix": ""}
modified = json.loads(_inject_completions(msg, ctx))
items = modified["result"]["items"]
std = next(i for i in items if i["label"] == "standard")
assert std["sortText"].startswith("9_")