From 27947e4f7f0000d1511ac64c4ce6fbc8bd8a4f25 Mon Sep 17 00:00:00 2001 From: Henrik Jess Nielsen Date: Sun, 10 May 2026 15:04:11 +0200 Subject: [PATCH] feat: context-aware Bicep completions (version + param injection) - ProxySession tracks open documents per TCP connection - _detect_context() identifies version, param, and module_path contexts - version context: autocomplete versions for 'br/modules:NAME:' cursor positions - param context: autocomplete params for specific module+version (with version fallback) - modules.py: added get_module_by_name(), version_completion_items(), param_completion_items() - 28/28 tests passing --- ilsp/bicep_lsp/modules.py | 95 +++++++++++++- ilsp/bicep_lsp/proxy.py | 159 +++++++++++++++++++++-- tests/test_proxy.py | 258 +++++++++++++++++++++++++++++++++++--- 3 files changed, 481 insertions(+), 31 deletions(-) diff --git a/ilsp/bicep_lsp/modules.py b/ilsp/bicep_lsp/modules.py index 605ba2c..e08781d 100644 --- a/ilsp/bicep_lsp/modules.py +++ b/ilsp/bicep_lsp/modules.py @@ -4,8 +4,10 @@ LRU Bicep module catalog — loaded from the bundled catalog file at startup. The catalog (bicep_modules_catalog.json) is baked into the Docker image at build time. No runtime dependency on DevOpsMCP or any external service. -Provides completion items for LRU-internal Bicep modules with -higher sort priority than standard Azure modules. +Provides completion items for: +- LRU module names (when typing a module reference string) +- Module versions per module (when cursor is after 'br/modules:NAME:') +- Module params per version (when cursor is inside a params {} block) """ import json @@ -24,7 +26,7 @@ _CATALOG_PATHS = [ def _load_catalog() -> list[dict[str, Any]]: - """Load modules from the bundled catalog file.""" + """Load modules from the bundled catalog file, preserving per-version schema.""" for path in _CATALOG_PATHS: if path.exists(): try: @@ -32,7 +34,6 @@ def _load_catalog() -> list[dict[str, Any]]: modules_raw = data.get("modules", {}) registry = data.get("registry", "iactemplatereg.azurecr.io") modules = [] - # modules is a dict: { "bicep/modules/appservice": { versions: [...], ... }, ... } for mod_path, info in modules_raw.items(): versions = info.get("versions", ["latest"]) name = mod_path.split("/")[-1] if "/" in mod_path else mod_path @@ -42,6 +43,7 @@ def _load_catalog() -> list[dict[str, Any]]: "versions": versions, "latest": versions[-1] if versions else "latest", "registry": registry, + "schema": info.get("schema", {}), # per-version → params }) logger.info("Bicep catalog loaded from %s: %d modules", path, len(modules)) return modules @@ -65,8 +67,16 @@ class BicepModuleCatalog: def get_modules(cls) -> list[dict[str, Any]]: return cls._modules + @classmethod + def get_module_by_name(cls, name: str) -> dict[str, Any] | None: + for mod in cls._modules: + if mod["name"] == name: + return mod + return None + @classmethod def as_completion_items(cls) -> list[dict[str, Any]]: + """Module name completions — shown when typing a module reference string.""" items = [] for mod in cls._modules: ref = f"br/modules:{mod['path']}:{mod['latest']}" @@ -88,3 +98,80 @@ class BicepModuleCatalog: }, }) return items + + @classmethod + def version_completion_items(cls, module_name: str) -> list[dict[str, Any]]: + """Version completions for a specific module (newest first).""" + mod = cls.get_module_by_name(module_name) + if not mod: + return [] + schema = mod.get("schema", {}) + items = [] + for ver in reversed(mod["versions"]): + ver_schema = schema.get(ver, {}) + params = list(ver_schema.get("parameters", {}).keys()) + param_summary = ( + f"{len(params)} params: {', '.join(params[:3])}{'...' if len(params) > 3 else ''}" + if params else "no params" + ) + items.append({ + "label": ver, + "kind": 12, # Value + "detail": param_summary, + "insertText": ver, + "sortText": f"0_lru_ver_{ver}", + "documentation": { + "kind": "markdown", + "value": ( + f"**{module_name} `{ver}`**\n\n" + + ( + "Params: " + ", ".join(f"`{p}`" for p in params) + if params else "_No params_" + ) + ), + }, + }) + return items + + @classmethod + def param_completion_items(cls, module_name: str, version: str) -> list[dict[str, Any]]: + """Param completions for a specific module+version combination.""" + mod = cls.get_module_by_name(module_name) + if not mod: + return [] + schema = mod.get("schema", {}) + + # Exact version match first, then fall back to closest available + ver_params = schema.get(version, {}).get("parameters", {}) + if not ver_params: + for v in reversed(list(schema.keys())): + candidate = schema[v].get("parameters", {}) + if candidate: + ver_params = candidate + logger.debug("Param fallback: %s %s→%s", module_name, version, v) + break + + items = [] + for param_name, param_info in ver_params.items(): + ptype = param_info.get("type", "any") + description = param_info.get("description", "").strip() + allowed = param_info.get("allowed", []) + doc_lines = [f"**{param_name}** (`{ptype}`)"] + if description: + doc_lines.append(f"\n{description}") + if allowed: + shown = allowed[:5] + more = f" + {len(allowed) - 5} more" if len(allowed) > 5 else "" + doc_lines.append(f"\nAllowed: {', '.join(f'`{a}`' for a in shown)}{more}") + items.append({ + "label": param_name, + "kind": 5, # Field + "detail": ptype, + "insertText": f"{param_name}: ", + "sortText": f"0_lru_param_{param_name}", + "documentation": { + "kind": "markdown", + "value": "\n".join(doc_lines), + }, + }) + return items diff --git a/ilsp/bicep_lsp/proxy.py b/ilsp/bicep_lsp/proxy.py index ec290d3..8b295f5 100644 --- a/ilsp/bicep_lsp/proxy.py +++ b/ilsp/bicep_lsp/proxy.py @@ -7,14 +7,21 @@ Architecture: Uses subprocess.Popen + threads instead of asyncio subprocess — far more reliable for stdin/stdout bridging of long-lived processes. -Intercepts textDocument/completion responses and injects LRU Bicep module -completions with higher sort priority (sortText "0_lru_..."). -All other LSP messages are forwarded unchanged. +Intercepts: +- textDocument/didOpen + didChange → tracks document content per URI +- textDocument/completion requests → detects cursor context (module path / version / param) +- textDocument/completion responses → injects context-appropriate completions + +Context-aware injection: +- Cursor in module path string → inject LRU module names +- Cursor after 'br/modules:NAME: → inject version suggestions for that module +- Cursor inside params {} block → inject param suggestions for that module+version """ import json import logging import os +import re import socket import subprocess import threading @@ -29,6 +36,8 @@ BICEP_LS_PATH = os.getenv( "/opt/bicep-langserver/Bicep.LangServer.dll", ) +# ── LSP framing helpers ──────────────────────────────────────────────────────── + def _read_message(fileobj) -> bytes: """Read one LSP Content-Length framed message from a file-like object.""" @@ -51,8 +60,100 @@ def _frame(body: bytes) -> bytes: return f"Content-Length: {len(body)}\r\n\r\n".encode() + body -def _inject_completions(msg: dict[str, Any]) -> bytes: - """Inject LRU modules into completion responses.""" +# ── Per-connection session state ─────────────────────────────────────────────── + + +class _ProxySession: + """ + Tracks document content and pending completion requests for one editor session. + + Thread safety: CPython GIL makes individual dict reads/writes atomic. + One thread writes to docs/pending (client_to_ls); one thread reads (ls_to_client). + """ + + def __init__(self) -> None: + self.docs: dict[str, list[str]] = {} # uri → lines + self.pending: dict = {} # request_id → context dict + + def update_doc(self, uri: str, text: str) -> None: + self.docs[uri] = text.splitlines() + + def record_completion_request(self, msg: dict) -> None: + req_id = msg.get("id") + if req_id is None: + return + params = msg.get("params", {}) + uri = params.get("textDocument", {}).get("uri", "") + position = params.get("position", {}) + self.pending[req_id] = self._detect_context(uri, position) + + def _detect_context(self, uri: str, position: dict) -> dict: + """Determine what kind of completion is being requested based on cursor position.""" + lines = self.docs.get(uri, []) + line_idx = position.get("line", 0) + char_idx = position.get("character", 0) + + if not lines or line_idx >= len(lines): + return {"type": "unknown"} + + # Text on the current line up to the cursor + current = lines[line_idx][:char_idx] + + # 1. Version context: cursor is after 'br/modules:NAME: + m = re.search(r"'br/modules:([^:'\s]+):([^'\s]*)$", current) + if m: + return {"type": "version", "module": m.group(1), "prefix": m.group(2)} + + # 2. Module path context: cursor is inside 'br/modules: (no version colon yet) + m = re.search(r"'br/modules:([^:'\s]*)$", current) + if m: + return {"type": "module_path", "prefix": m.group(1)} + + # 3. Params context: walk up to find enclosing module declaration + lookback_start = max(0, line_idx - 60) + context_lines = lines[lookback_start : line_idx + 1] + context_lines = list(context_lines) + context_lines[-1] = context_lines[-1][:char_idx] + context_text = "\n".join(context_lines) + + # Find the last module declaration in the lookback window + mod_matches = list( + re.finditer(r"module\s+\w+\s+'br/modules:([^:]+):([^']+)'", context_text) + ) + if mod_matches: + last_mod = mod_matches[-1] + text_after_mod = context_text[last_mod.start():] + params_m = re.search(r"\bparams\s*:\s*\{", text_after_mod) + if params_m: + text_in_params = text_after_mod[params_m.start():] + if text_in_params.count("{") > text_in_params.count("}"): + return { + "type": "param", + "module": last_mod.group(1), + "version": last_mod.group(2), + } + + return {"type": "unknown"} + + def pop_context(self, msg_id) -> dict: + return self.pending.pop(msg_id, {"type": "unknown"}) + + +# ── Completion injection ─────────────────────────────────────────────────────── + + +def _inject_completions(msg: dict[str, Any], context: dict | None = None) -> bytes: + """ + Inject LRU-aware completions into completion responses. + + Behaviour depends on context type: + - 'version' → inject version suggestions for the named module + - 'param' → inject param suggestions for module+version + - 'module_path' / 'unknown' / None → inject module name suggestions (legacy) + """ + if context is None: + context = {} + result = msg.get("result") if result is None: return json.dumps(msg).encode() @@ -66,7 +167,18 @@ def _inject_completions(msg: dict[str, Any]) -> bytes: if items is None: return json.dumps(msg).encode() - lru_items = BicepModuleCatalog.as_completion_items() + ctx_type = context.get("type", "unknown") + + if ctx_type == "version": + lru_items = BicepModuleCatalog.version_completion_items(context["module"]) + elif ctx_type == "param": + lru_items = BicepModuleCatalog.param_completion_items( + context["module"], context["version"] + ) + else: + # Default: module name completions + lru_items = BicepModuleCatalog.as_completion_items() + if lru_items: for item in items: st = item.get("sortText", item.get("label", "")) @@ -83,11 +195,32 @@ def _inject_completions(msg: dict[str, Any]) -> bytes: def _client_to_ls( conn_file, proc_stdin, + session: _ProxySession, ) -> None: try: while True: body = _read_message(conn_file) logger.debug("Client→LS: %d bytes", len(body)) + + # Track document state and completion context (never block forwarding) + try: + msg = json.loads(body) + method = msg.get("method", "") + if method == "textDocument/didOpen": + text_doc = msg.get("params", {}).get("textDocument", {}) + uri, text = text_doc.get("uri", ""), text_doc.get("text", "") + if uri: + session.update_doc(uri, text or "") + elif method == "textDocument/didChange": + uri = msg.get("params", {}).get("textDocument", {}).get("uri", "") + changes = msg.get("params", {}).get("contentChanges", []) + if uri and changes: + session.update_doc(uri, changes[-1].get("text", "")) + elif method == "textDocument/completion": + session.record_completion_request(msg) + except Exception: + pass # parsing errors must never block forwarding + framed = _frame(body) proc_stdin.write(framed) proc_stdin.flush() @@ -97,8 +230,6 @@ def _client_to_ls( except Exception as exc: logger.debug("Client→LS error: %s", exc) finally: - # Half-close: tell the LS that no more input is coming. - # The LS may still send responses, so we don't kill it here. try: proc_stdin.close() except Exception: @@ -108,13 +239,18 @@ def _client_to_ls( def _ls_to_client( proc_stdout, conn: socket.socket, + session: _ProxySession, ) -> None: try: while True: body = _read_message(proc_stdout) logger.debug("LS→Client: %d bytes", len(body)) try: - out = _inject_completions(json.loads(body)) + msg = json.loads(body) + context: dict = {} + if "id" in msg and "result" in msg: + context = session.pop_context(msg["id"]) + out = _inject_completions(msg, context) except json.JSONDecodeError: out = body conn.sendall(_frame(out)) @@ -126,6 +262,7 @@ def _ls_to_client( def _handle_client(conn: socket.socket, addr: tuple) -> None: logger.info("New Bicep client: %s", addr) + session = _ProxySession() proc = subprocess.Popen( ["dotnet", BICEP_LS_PATH, "--stdio"], stdin=subprocess.PIPE, @@ -140,13 +277,13 @@ def _handle_client(conn: socket.socket, addr: tuple) -> None: # t1: client → LS (finishes when client closes write side) t1 = threading.Thread( target=_client_to_ls, - args=(conn_file, proc.stdin), + args=(conn_file, proc.stdin, session), daemon=True, ) # t2: LS → client (finishes when LS closes stdout) t2 = threading.Thread( target=_ls_to_client, - args=(proc.stdout, conn), + args=(proc.stdout, conn, session), daemon=True, ) t1.start() diff --git a/tests/test_proxy.py b/tests/test_proxy.py index 88253e5..fb189b6 100644 --- a/tests/test_proxy.py +++ b/tests/test_proxy.py @@ -5,7 +5,7 @@ import json import pytest from ilsp.bicep_lsp.modules import BicepModuleCatalog -from ilsp.bicep_lsp.proxy import _frame, _inject_completions +from ilsp.bicep_lsp.proxy import _ProxySession, _frame, _inject_completions def test_frame_produces_correct_header(): @@ -29,22 +29,29 @@ def _completion_response(items: list) -> dict: } +def _make_module(name, versions=None, schema=None): + versions = versions or ["1.0.0", "latest"] + return { + "name": name, + "path": f"bicep/modules/{name}", + "versions": versions, + "latest": versions[-1], + "registry": "iactemplatereg.azurecr.io", + "schema": schema or {}, + } + + +# ── Existing injection tests (unchanged behaviour) ───────────────────────────── + def test_standard_items_not_downgraded_without_lru(): """Without LRU modules, standard items keep their original sortText.""" msg = _completion_response([{"label": "Microsoft.Storage", "sortText": "az"}]) out = json.loads(_inject_completions(msg)) - # No LRU modules → no downgrade, original sortText preserved assert out["result"]["items"][0]["sortText"] == "az" def test_lru_modules_injected_at_top(): - BicepModuleCatalog._modules = [{ - "name": "appservice", - "path": "bicep/modules/appservice", - "versions": ["2.3.0", "latest"], - "latest": "latest", - "registry": "iactemplatereg.azurecr.io", - }] + BicepModuleCatalog._modules = [_make_module("appservice", ["2.3.0", "latest"])] msg = _completion_response([{"label": "Microsoft.Web/sites", "sortText": "az"}]) out = json.loads(_inject_completions(msg)) @@ -57,13 +64,7 @@ def test_lru_modules_injected_at_top(): def test_list_result_also_handled(): - BicepModuleCatalog._modules = [{ - "name": "roleassignments", - "path": "bicep/modules/roleassignments", - "versions": ["2.0.0"], - "latest": "2.0.0", - "registry": "iactemplatereg.azurecr.io", - }] + BicepModuleCatalog._modules = [_make_module("roleassignments", ["2.0.0"])] msg = {"jsonrpc": "2.0", "id": 2, "result": [{"label": "az-item", "sortText": "az"}]} out = json.loads(_inject_completions(msg)) @@ -75,3 +76,228 @@ def test_non_completion_message_passthrough(): msg = {"jsonrpc": "2.0", "method": "initialized", "params": {}} out = json.loads(_inject_completions(msg)) assert out["method"] == "initialized" + + +# ── Version completion tests ─────────────────────────────────────────────────── + +def test_version_completions_injected_on_version_context(): + BicepModuleCatalog._modules = [_make_module( + "roleassignments", + versions=["1.1.x", "2.0.x", "latest"], + schema={ + "1.1.x": {"parameters": {"principalId": {"type": "string", "description": ""}}}, + "2.0.x": {"parameters": {"assignments": {"type": "array", "description": ""}}}, + "latest": {"parameters": {"assignments": {"type": "array", "description": ""}}}, + }, + )] + + msg = _completion_response([{"label": "az-builtin", "sortText": "az"}]) + context = {"type": "version", "module": "roleassignments", "prefix": ""} + out = json.loads(_inject_completions(msg, context)) + items = out["result"]["items"] + + labels = [i["label"] for i in items] + assert "1.1.x" in labels + assert "2.0.x" in labels + assert "latest" in labels + # LRU versions come first + assert items[0]["sortText"].startswith("0_lru_ver_") + assert items[-1]["sortText"].startswith("1_az_") + + +def test_version_items_include_param_detail(): + BicepModuleCatalog._modules = [_make_module( + "appservice", + versions=["2.3.2"], + schema={"2.3.2": {"parameters": { + "projectName": {"type": "string", "description": ""}, + "environmentType": {"type": "string", "description": ""}, + }}}, + )] + + items = BicepModuleCatalog.version_completion_items("appservice") + assert len(items) == 1 + assert items[0]["label"] == "2.3.2" + assert "projectName" in items[0]["detail"] + assert items[0]["kind"] == 12 # Value + + +def test_version_completions_unknown_module_returns_empty(): + items = BicepModuleCatalog.version_completion_items("nonexistent") + assert items == [] + + +# ── Param completion tests ───────────────────────────────────────────────────── + +_ROLEASSIGNMENTS_SCHEMA = { + "1.1.x": { + "parameters": { + "environmentType": { + "type": "string", + "description": "", + "allowed": ["DEV", "TEST", "PROD"], + }, + "roleDefinitionIds": { + "type": "array", + "description": "The role definition ID.", + }, + "principalId": { + "type": "string", + "description": "The principal ID.", + }, + "principalType": { + "type": "string", + "description": "The principal type.", + }, + } + } +} + + +def test_param_completions_injected_on_param_context(): + BicepModuleCatalog._modules = [_make_module( + "roleassignments", + versions=["1.1.x"], + schema=_ROLEASSIGNMENTS_SCHEMA, + )] + + msg = _completion_response([]) + context = {"type": "param", "module": "roleassignments", "version": "1.1.x"} + out = json.loads(_inject_completions(msg, context)) + items = out["result"]["items"] + + labels = [i["label"] for i in items] + assert "environmentType" in labels + assert "roleDefinitionIds" in labels + assert "principalId" in labels + assert "principalType" in labels + assert items[0]["sortText"].startswith("0_lru_param_") + assert items[0]["kind"] == 5 # Field + + +def test_param_completion_items_have_insert_text(): + BicepModuleCatalog._modules = [_make_module( + "roleassignments", versions=["1.1.x"], schema=_ROLEASSIGNMENTS_SCHEMA + )] + items = BicepModuleCatalog.param_completion_items("roleassignments", "1.1.x") + for item in items: + assert item["insertText"].endswith(": ") + + +def test_param_completions_fallback_to_closest_version(): + """When the exact version isn't in schema, fall back to any available version.""" + BicepModuleCatalog._modules = [_make_module( + "roleassignments", + versions=["1.1.x", "1.1.4"], + schema=_ROLEASSIGNMENTS_SCHEMA, # only has "1.1.x" + )] + items = BicepModuleCatalog.param_completion_items("roleassignments", "1.1.4") + assert any(i["label"] == "principalId" for i in items) + + +def test_param_completions_unknown_module_returns_empty(): + items = BicepModuleCatalog.param_completion_items("nonexistent", "1.0.0") + assert items == [] + + +# ── _ProxySession context detection tests ───────────────────────────────────── + +def _make_session_with_doc(uri: str, lines: list[str]) -> _ProxySession: + session = _ProxySession() + session.update_doc(uri, "\n".join(lines)) + return session + + +URI = "file:///test/main.bicep" + + +def test_detect_version_context(): + lines = ["module x 'br/modules:roleassignments:' = {}"] + session = _make_session_with_doc(URI, lines) + ctx = session._detect_context(URI, {"line": 0, "character": 37}) + assert ctx["type"] == "version" + assert ctx["module"] == "roleassignments" + + +def test_detect_version_context_partial_prefix(): + lines = ["module x 'br/modules:roleassignments:1.1' = {}"] + session = _make_session_with_doc(URI, lines) + ctx = session._detect_context(URI, {"line": 0, "character": 40}) + assert ctx["type"] == "version" + assert ctx["module"] == "roleassignments" + assert ctx["prefix"] == "1.1" + + +def test_detect_module_path_context(): + lines = ["module x 'br/modules:role' = {}"] + session = _make_session_with_doc(URI, lines) + ctx = session._detect_context(URI, {"line": 0, "character": 25}) + assert ctx["type"] == "module_path" + + +def test_detect_param_context(): + lines = [ + "module myMod 'br/modules:roleassignments:1.1.x' = {", + " name: 'test'", + " params: {", + " ", # ← cursor here + " }", + "}", + ] + session = _make_session_with_doc(URI, lines) + ctx = session._detect_context(URI, {"line": 3, "character": 4}) + assert ctx["type"] == "param" + assert ctx["module"] == "roleassignments" + assert ctx["version"] == "1.1.x" + + +def test_detect_unknown_context_outside_module(): + lines = ["var x = 'hello'"] + session = _make_session_with_doc(URI, lines) + ctx = session._detect_context(URI, {"line": 0, "character": 10}) + assert ctx["type"] == "unknown" + + +def test_detect_no_param_context_after_params_closed(): + """Cursor after the closing brace of params should NOT be param context.""" + lines = [ + "module myMod 'br/modules:roleassignments:1.1.x' = {", + " params: {", + " }", + " ", # ← cursor here (outside params block) + "}", + ] + session = _make_session_with_doc(URI, lines) + ctx = session._detect_context(URI, {"line": 3, "character": 2}) + # params block is closed, so should NOT be param context + assert ctx["type"] != "param" + + +# ── Session request tracking ─────────────────────────────────────────────────── + +def test_session_records_and_pops_context(): + session = _ProxySession() + session.update_doc(URI, "\n".join([ + "module m 'br/modules:appservice:2.3.0' = {", + " params: {", + " ", + " }", + "}", + ])) + msg = { + "jsonrpc": "2.0", + "id": 42, + "method": "textDocument/completion", + "params": { + "textDocument": {"uri": URI}, + "position": {"line": 2, "character": 4}, + }, + } + session.record_completion_request(msg) + ctx = session.pop_context(42) + assert ctx["type"] == "param" + assert ctx["module"] == "appservice" + + # Second pop returns unknown + assert session.pop_context(42)["type"] == "unknown" +