feat: context-aware Bicep completions (version + param injection)
Some checks failed
Build and Deploy iLSP / test (push) Failing after 27s
Build and Deploy iLSP / build-and-deploy (push) Has been skipped

- ProxySession tracks open documents per TCP connection
- _detect_context() identifies version, param, and module_path contexts
- version context: autocomplete versions for 'br/modules:NAME:' cursor positions
- param context: autocomplete params for specific module+version (with version fallback)
- modules.py: added get_module_by_name(), version_completion_items(), param_completion_items()
- 28/28 tests passing
This commit is contained in:
Henrik Jess Nielsen
2026-05-10 15:04:11 +02:00
parent eafddb6f4a
commit 27947e4f7f
3 changed files with 481 additions and 31 deletions

View File

@@ -4,8 +4,10 @@ LRU Bicep module catalog — loaded from the bundled catalog file at startup.
The catalog (bicep_modules_catalog.json) is baked into the Docker image at build time.
No runtime dependency on DevOpsMCP or any external service.
Provides completion items for LRU-internal Bicep modules with
higher sort priority than standard Azure modules.
Provides completion items for:
- LRU module names (when typing a module reference string)
- Module versions per module (when cursor is after 'br/modules:NAME:')
- Module params per version (when cursor is inside a params {} block)
"""
import json
@@ -24,7 +26,7 @@ _CATALOG_PATHS = [
def _load_catalog() -> list[dict[str, Any]]:
"""Load modules from the bundled catalog file."""
"""Load modules from the bundled catalog file, preserving per-version schema."""
for path in _CATALOG_PATHS:
if path.exists():
try:
@@ -32,7 +34,6 @@ def _load_catalog() -> list[dict[str, Any]]:
modules_raw = data.get("modules", {})
registry = data.get("registry", "iactemplatereg.azurecr.io")
modules = []
# modules is a dict: { "bicep/modules/appservice": { versions: [...], ... }, ... }
for mod_path, info in modules_raw.items():
versions = info.get("versions", ["latest"])
name = mod_path.split("/")[-1] if "/" in mod_path else mod_path
@@ -42,6 +43,7 @@ def _load_catalog() -> list[dict[str, Any]]:
"versions": versions,
"latest": versions[-1] if versions else "latest",
"registry": registry,
"schema": info.get("schema", {}), # per-version → params
})
logger.info("Bicep catalog loaded from %s: %d modules", path, len(modules))
return modules
@@ -65,8 +67,16 @@ class BicepModuleCatalog:
def get_modules(cls) -> list[dict[str, Any]]:
return cls._modules
@classmethod
def get_module_by_name(cls, name: str) -> dict[str, Any] | None:
for mod in cls._modules:
if mod["name"] == name:
return mod
return None
@classmethod
def as_completion_items(cls) -> list[dict[str, Any]]:
"""Module name completions — shown when typing a module reference string."""
items = []
for mod in cls._modules:
ref = f"br/modules:{mod['path']}:{mod['latest']}"
@@ -88,3 +98,80 @@ class BicepModuleCatalog:
},
})
return items
@classmethod
def version_completion_items(cls, module_name: str) -> list[dict[str, Any]]:
"""Version completions for a specific module (newest first)."""
mod = cls.get_module_by_name(module_name)
if not mod:
return []
schema = mod.get("schema", {})
items = []
for ver in reversed(mod["versions"]):
ver_schema = schema.get(ver, {})
params = list(ver_schema.get("parameters", {}).keys())
param_summary = (
f"{len(params)} params: {', '.join(params[:3])}{'...' if len(params) > 3 else ''}"
if params else "no params"
)
items.append({
"label": ver,
"kind": 12, # Value
"detail": param_summary,
"insertText": ver,
"sortText": f"0_lru_ver_{ver}",
"documentation": {
"kind": "markdown",
"value": (
f"**{module_name} `{ver}`**\n\n"
+ (
"Params: " + ", ".join(f"`{p}`" for p in params)
if params else "_No params_"
)
),
},
})
return items
@classmethod
def param_completion_items(cls, module_name: str, version: str) -> list[dict[str, Any]]:
"""Param completions for a specific module+version combination."""
mod = cls.get_module_by_name(module_name)
if not mod:
return []
schema = mod.get("schema", {})
# Exact version match first, then fall back to closest available
ver_params = schema.get(version, {}).get("parameters", {})
if not ver_params:
for v in reversed(list(schema.keys())):
candidate = schema[v].get("parameters", {})
if candidate:
ver_params = candidate
logger.debug("Param fallback: %s %s%s", module_name, version, v)
break
items = []
for param_name, param_info in ver_params.items():
ptype = param_info.get("type", "any")
description = param_info.get("description", "").strip()
allowed = param_info.get("allowed", [])
doc_lines = [f"**{param_name}** (`{ptype}`)"]
if description:
doc_lines.append(f"\n{description}")
if allowed:
shown = allowed[:5]
more = f" + {len(allowed) - 5} more" if len(allowed) > 5 else ""
doc_lines.append(f"\nAllowed: {', '.join(f'`{a}`' for a in shown)}{more}")
items.append({
"label": param_name,
"kind": 5, # Field
"detail": ptype,
"insertText": f"{param_name}: ",
"sortText": f"0_lru_param_{param_name}",
"documentation": {
"kind": "markdown",
"value": "\n".join(doc_lines),
},
})
return items

View File

@@ -7,14 +7,21 @@ Architecture:
Uses subprocess.Popen + threads instead of asyncio subprocess — far more
reliable for stdin/stdout bridging of long-lived processes.
Intercepts textDocument/completion responses and injects LRU Bicep module
completions with higher sort priority (sortText "0_lru_...").
All other LSP messages are forwarded unchanged.
Intercepts:
- textDocument/didOpen + didChange → tracks document content per URI
- textDocument/completion requests → detects cursor context (module path / version / param)
- textDocument/completion responses → injects context-appropriate completions
Context-aware injection:
- Cursor in module path string → inject LRU module names
- Cursor after 'br/modules:NAME: → inject version suggestions for that module
- Cursor inside params {} block → inject param suggestions for that module+version
"""
import json
import logging
import os
import re
import socket
import subprocess
import threading
@@ -29,6 +36,8 @@ BICEP_LS_PATH = os.getenv(
"/opt/bicep-langserver/Bicep.LangServer.dll",
)
# ── LSP framing helpers ────────────────────────────────────────────────────────
def _read_message(fileobj) -> bytes:
"""Read one LSP Content-Length framed message from a file-like object."""
@@ -51,8 +60,100 @@ def _frame(body: bytes) -> bytes:
return f"Content-Length: {len(body)}\r\n\r\n".encode() + body
def _inject_completions(msg: dict[str, Any]) -> bytes:
"""Inject LRU modules into completion responses."""
# ── Per-connection session state ───────────────────────────────────────────────
class _ProxySession:
"""
Tracks document content and pending completion requests for one editor session.
Thread safety: CPython GIL makes individual dict reads/writes atomic.
One thread writes to docs/pending (client_to_ls); one thread reads (ls_to_client).
"""
def __init__(self) -> None:
self.docs: dict[str, list[str]] = {} # uri → lines
self.pending: dict = {} # request_id → context dict
def update_doc(self, uri: str, text: str) -> None:
self.docs[uri] = text.splitlines()
def record_completion_request(self, msg: dict) -> None:
req_id = msg.get("id")
if req_id is None:
return
params = msg.get("params", {})
uri = params.get("textDocument", {}).get("uri", "")
position = params.get("position", {})
self.pending[req_id] = self._detect_context(uri, position)
def _detect_context(self, uri: str, position: dict) -> dict:
"""Determine what kind of completion is being requested based on cursor position."""
lines = self.docs.get(uri, [])
line_idx = position.get("line", 0)
char_idx = position.get("character", 0)
if not lines or line_idx >= len(lines):
return {"type": "unknown"}
# Text on the current line up to the cursor
current = lines[line_idx][:char_idx]
# 1. Version context: cursor is after 'br/modules:NAME:
m = re.search(r"'br/modules:([^:'\s]+):([^'\s]*)$", current)
if m:
return {"type": "version", "module": m.group(1), "prefix": m.group(2)}
# 2. Module path context: cursor is inside 'br/modules: (no version colon yet)
m = re.search(r"'br/modules:([^:'\s]*)$", current)
if m:
return {"type": "module_path", "prefix": m.group(1)}
# 3. Params context: walk up to find enclosing module declaration
lookback_start = max(0, line_idx - 60)
context_lines = lines[lookback_start : line_idx + 1]
context_lines = list(context_lines)
context_lines[-1] = context_lines[-1][:char_idx]
context_text = "\n".join(context_lines)
# Find the last module declaration in the lookback window
mod_matches = list(
re.finditer(r"module\s+\w+\s+'br/modules:([^:]+):([^']+)'", context_text)
)
if mod_matches:
last_mod = mod_matches[-1]
text_after_mod = context_text[last_mod.start():]
params_m = re.search(r"\bparams\s*:\s*\{", text_after_mod)
if params_m:
text_in_params = text_after_mod[params_m.start():]
if text_in_params.count("{") > text_in_params.count("}"):
return {
"type": "param",
"module": last_mod.group(1),
"version": last_mod.group(2),
}
return {"type": "unknown"}
def pop_context(self, msg_id) -> dict:
return self.pending.pop(msg_id, {"type": "unknown"})
# ── Completion injection ───────────────────────────────────────────────────────
def _inject_completions(msg: dict[str, Any], context: dict | None = None) -> bytes:
"""
Inject LRU-aware completions into completion responses.
Behaviour depends on context type:
- 'version' → inject version suggestions for the named module
- 'param' → inject param suggestions for module+version
- 'module_path' / 'unknown' / None → inject module name suggestions (legacy)
"""
if context is None:
context = {}
result = msg.get("result")
if result is None:
return json.dumps(msg).encode()
@@ -66,7 +167,18 @@ def _inject_completions(msg: dict[str, Any]) -> bytes:
if items is None:
return json.dumps(msg).encode()
lru_items = BicepModuleCatalog.as_completion_items()
ctx_type = context.get("type", "unknown")
if ctx_type == "version":
lru_items = BicepModuleCatalog.version_completion_items(context["module"])
elif ctx_type == "param":
lru_items = BicepModuleCatalog.param_completion_items(
context["module"], context["version"]
)
else:
# Default: module name completions
lru_items = BicepModuleCatalog.as_completion_items()
if lru_items:
for item in items:
st = item.get("sortText", item.get("label", ""))
@@ -83,11 +195,32 @@ def _inject_completions(msg: dict[str, Any]) -> bytes:
def _client_to_ls(
conn_file,
proc_stdin,
session: _ProxySession,
) -> None:
try:
while True:
body = _read_message(conn_file)
logger.debug("Client→LS: %d bytes", len(body))
# Track document state and completion context (never block forwarding)
try:
msg = json.loads(body)
method = msg.get("method", "")
if method == "textDocument/didOpen":
text_doc = msg.get("params", {}).get("textDocument", {})
uri, text = text_doc.get("uri", ""), text_doc.get("text", "")
if uri:
session.update_doc(uri, text or "")
elif method == "textDocument/didChange":
uri = msg.get("params", {}).get("textDocument", {}).get("uri", "")
changes = msg.get("params", {}).get("contentChanges", [])
if uri and changes:
session.update_doc(uri, changes[-1].get("text", ""))
elif method == "textDocument/completion":
session.record_completion_request(msg)
except Exception:
pass # parsing errors must never block forwarding
framed = _frame(body)
proc_stdin.write(framed)
proc_stdin.flush()
@@ -97,8 +230,6 @@ def _client_to_ls(
except Exception as exc:
logger.debug("Client→LS error: %s", exc)
finally:
# Half-close: tell the LS that no more input is coming.
# The LS may still send responses, so we don't kill it here.
try:
proc_stdin.close()
except Exception:
@@ -108,13 +239,18 @@ def _client_to_ls(
def _ls_to_client(
proc_stdout,
conn: socket.socket,
session: _ProxySession,
) -> None:
try:
while True:
body = _read_message(proc_stdout)
logger.debug("LS→Client: %d bytes", len(body))
try:
out = _inject_completions(json.loads(body))
msg = json.loads(body)
context: dict = {}
if "id" in msg and "result" in msg:
context = session.pop_context(msg["id"])
out = _inject_completions(msg, context)
except json.JSONDecodeError:
out = body
conn.sendall(_frame(out))
@@ -126,6 +262,7 @@ def _ls_to_client(
def _handle_client(conn: socket.socket, addr: tuple) -> None:
logger.info("New Bicep client: %s", addr)
session = _ProxySession()
proc = subprocess.Popen(
["dotnet", BICEP_LS_PATH, "--stdio"],
stdin=subprocess.PIPE,
@@ -140,13 +277,13 @@ def _handle_client(conn: socket.socket, addr: tuple) -> None:
# t1: client → LS (finishes when client closes write side)
t1 = threading.Thread(
target=_client_to_ls,
args=(conn_file, proc.stdin),
args=(conn_file, proc.stdin, session),
daemon=True,
)
# t2: LS → client (finishes when LS closes stdout)
t2 = threading.Thread(
target=_ls_to_client,
args=(proc.stdout, conn),
args=(proc.stdout, conn, session),
daemon=True,
)
t1.start()