""" LRU Bicep module catalog — loaded from the bundled catalog file at startup. The catalog (bicep_modules_catalog.json) is baked into the Docker image at build time. No runtime dependency on DevOpsMCP or any external service. Provides completion items for: - LRU module names (when typing a module reference string) - Module versions per module (when cursor is after 'br/modules:NAME:') - Module params per version (when cursor is inside a params {} block) """ import json import logging import pathlib from typing import Any logger = logging.getLogger(__name__) # Known Azure enum values not always captured in the catalog schema def _load_azure_roles() -> list[str]: """Load Azure roles from azure_roles.json (generated from bicep/lookup/rbaclookup).""" roles_file = pathlib.Path(__file__).parent / "azure_roles.json" if roles_file.exists(): try: return json.loads(roles_file.read_text()) except Exception as e: logger.warning("Failed to load azure_roles.json: %s", e) return [] _KNOWN_ENUMS: dict[str, list[str]] = { "principalType": ["User", "Group", "ServicePrincipal", "Device", "ForeignGroup"], "roles": _load_azure_roles(), "roleDefinitionIds": _load_azure_roles(), # alias for roles } # Catalog is baked into the image root at /bicep_modules_catalog.json _CATALOG_PATHS = [ pathlib.Path("/data/bicep_modules_catalog.json"), # volume-mount (freshest) pathlib.Path("/bicep_modules_catalog.json"), # baked into image (fallback) pathlib.Path(__file__).parent.parent.parent / "bicep_modules_catalog.json", # dev ] # IAC source catalog — richer param descriptions from Bicep source code _IAC_SOURCE_PATHS = [ pathlib.Path("/iac_source_catalog.json"), # baked into Docker image pathlib.Path("/data/iac_source_catalog.json"), # volume-mount (future) pathlib.Path(__file__).parent.parent.parent / "iac_source_catalog.json", # dev ] def _load_iac_source_catalog() -> dict[str, dict[str, Any]]: """Load IAC source catalog for enriched param descriptions. Returns dict keyed by module name (e.g. 'roleassignments') → module info with 'params' list containing name/description/required/type. """ for path in _IAC_SOURCE_PATHS: if path.exists(): try: data = json.loads(path.read_text()) modules = data.get("modules", {}) logger.info("IAC source catalog loaded from %s: %d modules", path, len(modules)) return modules except Exception: logger.exception("Failed to parse IAC source catalog at %s", path) logger.info("No iac_source_catalog.json found — param descriptions unavailable") return {} def _load_catalog() -> list[dict[str, Any]]: """Load modules from the bundled catalog file, preserving per-version schema.""" for path in _CATALOG_PATHS: if path.exists(): try: data = json.loads(path.read_text()) modules_raw = data.get("modules", {}) registry = data.get("registry", "iactemplatereg.azurecr.io") modules = [] for mod_path, info in modules_raw.items(): versions = info.get("versions", ["latest"]) name = mod_path.split("/")[-1] if "/" in mod_path else mod_path # ref_path: what the user writes after 'br/modules:' in a .bicep file. # bicepconfig modulePath "bicep" means Bicep prepends "bicep/" to the # registry path automatically, so strip that prefix here. ref_path = mod_path.removeprefix("bicep/") modules.append({ "name": name, "path": mod_path, "ref_path": ref_path, # e.g. "modules/appservice", "util/types" "versions": versions, "latest": versions[-1] if versions else "latest", "registry": registry, "schema": info.get("schema", {}), # per-version → params }) logger.info("Bicep catalog loaded from %s: %d modules", path, len(modules)) return modules except Exception: logger.exception("Failed to parse catalog at %s", path) logger.warning("No bicep_modules_catalog.json found — completions disabled") return [] class BicepModuleCatalog: """In-memory catalog of LRU Bicep modules, loaded once at startup.""" _modules: list[dict[str, Any]] = [] _iac: dict[str, dict[str, Any]] = {} # module name → IAC source info @classmethod def load(cls) -> None: """Load both catalogs from disk. Call once at startup.""" cls._modules = _load_catalog() cls._iac = _load_iac_source_catalog() @classmethod def _iac_param_map(cls, module_name: str) -> dict[str, dict[str, Any]]: """Return {param_name: {description, required, type}} from IAC source catalog. IAC catalog is keyed by bare module name (last path segment), so strip any path prefix (e.g. 'modules/appservice' → 'appservice'). """ bare_name = module_name.split("/")[-1] iac_mod = cls._iac.get(bare_name, {}) return {p["name"]: p for p in iac_mod.get("params", [])} @classmethod def get_modules(cls) -> list[dict[str, Any]]: return cls._modules @classmethod def get_module_by_name(cls, name: str) -> dict[str, Any] | None: for mod in cls._modules: if mod["name"] == name: return mod return None @classmethod def get_module_by_ref(cls, ref_path: str) -> dict[str, Any] | None: """Look up a module by the path that appears after 'br/modules:' in a .bicep file. Matches both new-style ('modules/appservice', 'util/types') and old-style bare names ('appservice') so completions work regardless of module version. Case-insensitive to handle catalog lowercase vs. camelCase in .bicep files. """ ref_lower = ref_path.lower() for mod in cls._modules: if mod["ref_path"].lower() == ref_lower or mod["name"].lower() == ref_lower: return mod return None @classmethod def as_completion_items(cls) -> list[dict[str, Any]]: """Module name completions — shown when typing a module reference string.""" items = [] for mod in cls._modules: ref = f"br/modules:{mod['ref_path']}:{mod['latest']}" items.append({ "label": mod["name"], "kind": 9, # Module "detail": f"LRU Bicep module — {mod['registry']}", "insertText": ref, "sortText": f"0_lru_{mod['name']}", "documentation": { "kind": "markdown", "value": ( f"**{mod['name']}** (LRU internal)\n\n" f"Registry: `{mod['registry']}`\n" f"Versions: {', '.join(mod['versions'])}\n\n" f"```bicep\nmodule {mod['name'].lower()} '{ref}' = {{\n" f" name: '{mod['name'].lower()}'\n params: {{}}\n}}\n```" ), }, }) return items @classmethod def version_completion_items(cls, module_name: str) -> list[dict[str, Any]]: """Version completions for a specific module (newest first).""" mod = cls.get_module_by_ref(module_name) if not mod: return [] schema = mod.get("schema", {}) items = [] for ver in reversed(mod["versions"]): ver_schema = schema.get(ver, {}) params = list(ver_schema.get("parameters", {}).keys()) param_summary = ( f"{len(params)} params: {', '.join(params[:3])}{'...' if len(params) > 3 else ''}" if params else "no params" ) items.append({ "label": ver, "kind": 12, # Value "detail": param_summary, "insertText": ver, "sortText": f"0_lru_ver_{ver}", "documentation": { "kind": "markdown", "value": ( f"**{module_name} `{ver}`**\n\n" + ( "Params: " + ", ".join(f"`{p}`" for p in params) if params else "_No params_" ) ), }, }) return items @classmethod def param_value_completion_items( cls, module_name: str, version: str, param_name: str, has_open_quote: bool = False, ) -> list[dict[str, Any]]: """Enum/allowed-value completions for a specific param (e.g. principalType, environmentType).""" mod = cls.get_module_by_ref(module_name) allowed: list[str] = [] if mod: schema = mod.get("schema", {}) ver_params = schema.get(version, {}).get("parameters", {}) if not ver_params: for v in reversed(list(schema.keys())): candidate = schema[v].get("parameters", {}) if candidate: ver_params = candidate break param_info = ver_params.get(param_name, {}) ptype = param_info.get("type", "") allowed = [str(a) for a in param_info.get("allowed", [])] if not allowed and ptype == "bool": allowed = ["true", "false"] # Fallback: known Azure enums not captured in catalog if not allowed: allowed = _KNOWN_ENUMS.get(param_name, []) if not allowed: return [] items = [] for i, val in enumerate(allowed): insert = f"{val}'" if has_open_quote else f"'{val}'" items.append({ "label": val, "kind": 12, # Value "detail": f"{param_name} value", "insertText": insert, "sortText": f"0_lru_val_{i:03d}_{val}", "documentation": { "kind": "markdown", "value": f"**{val}**\n\nAllowed value for `{param_name}`", }, }) return items @classmethod def param_completion_items(cls, module_name: str, version: str) -> list[dict[str, Any]]: """Param completions for a specific module+version combination.""" mod = cls.get_module_by_ref(module_name) if not mod: return [] schema = mod.get("schema", {}) # Exact version match first, then fall back to closest available ver_params = schema.get(version, {}).get("parameters", {}) if not ver_params: for v in reversed(list(schema.keys())): candidate = schema[v].get("parameters", {}) if candidate: ver_params = candidate logger.debug("Param fallback: %s %s→%s", module_name, version, v) break # IAC source catalog: richer descriptions + required flag iac_params = cls._iac_param_map(module_name) items = [] # Build snippet for full params block (shown first) snippet_params = [] tabstop = 1 for param_name, param_info in ver_params.items(): iac = iac_params.get(param_name, {}) required = iac.get("required", False) ptype = param_info.get("type", "any") allowed = param_info.get("allowed", []) # Include required params + first few optional params in snippet if required or len(snippet_params) < 5: if allowed: # Enum: use placeholder with first allowed value placeholder = f"'{allowed[0]}'" elif ptype == "bool": placeholder = "true" elif ptype == "int": placeholder = "0" elif ptype == "array": placeholder = "[]" elif ptype == "object": placeholder = "{{}}" else: placeholder = "''" snippet_params.append(f" {param_name}: ${{{tabstop}:{placeholder}}}") tabstop += 1 if snippet_params: snippet_text = "\n" + "\n".join(snippet_params) + "\n" required_count = sum(1 for p, i in ver_params.items() if iac_params.get(p, {}).get("required", False)) items.append({ "label": "⚡ Fill params block", "kind": 15, # Snippet "detail": f"{len(snippet_params)} params ({required_count} required)", "insertText": snippet_text, "insertTextFormat": 2, # Snippet "sortText": "0_lru_snippet_000", "documentation": { "kind": "markdown", "value": ( f"**Fill params block**\n\n" f"Inserts {len(snippet_params)} params for `{module_name}`.\n" f"Use Tab to navigate between fields." ), }, }) # Individual param completions for param_name, param_info in ver_params.items(): ptype = param_info.get("type", "any") allowed = param_info.get("allowed", []) # Prefer IAC source description (has human-readable text from Bicep source) iac = iac_params.get(param_name, {}) description = iac.get("description", "") or param_info.get("description", "") description = description.strip() required = iac.get("required", False) doc_lines = [f"**{param_name}** (`{ptype}`)"] if required: doc_lines[0] += " ⚠️ required" if description: doc_lines.append(f"\n{description}") if allowed: shown = allowed[:5] more = f" + {len(allowed) - 5} more" if len(allowed) > 5 else "" doc_lines.append(f"\nAllowed: {', '.join(f'`{a}`' for a in shown)}{more}") items.append({ "label": param_name, "kind": 5, # Field "detail": f"{'*' if required else ''}{ptype}", "insertText": f"{param_name}: ", "sortText": f"0_lru_param_{'0' if required else '1'}_{param_name}", "documentation": { "kind": "markdown", "value": "\n".join(doc_lines), }, }) return items