Files
iLSP/ilsp/bicep_lsp/modules.py

370 lines
15 KiB
Python
Raw Normal View History

"""
LRU Bicep module catalog loaded from the bundled catalog file at startup.
The catalog (bicep_modules_catalog.json) is baked into the Docker image at build time.
No runtime dependency on DevOpsMCP or any external service.
Provides completion items for:
- LRU module names (when typing a module reference string)
- Module versions per module (when cursor is after 'br/modules:NAME:')
- Module params per version (when cursor is inside a params {} block)
"""
import json
import logging
import pathlib
from typing import Any
logger = logging.getLogger(__name__)
# Known Azure enum values not always captured in the catalog schema
_KNOWN_ENUMS: dict[str, list[str]] = {
"principalType": ["User", "Group", "ServicePrincipal", "Device", "ForeignGroup"],
}
# Catalog is baked into the image root at /bicep_modules_catalog.json
_CATALOG_PATHS = [
pathlib.Path("/data/bicep_modules_catalog.json"), # volume-mount (freshest)
pathlib.Path("/bicep_modules_catalog.json"), # baked into image (fallback)
pathlib.Path(__file__).parent.parent.parent / "bicep_modules_catalog.json", # dev
]
# IAC source catalog — richer param descriptions from Bicep source code
_IAC_SOURCE_PATHS = [
pathlib.Path("/iac_source_catalog.json"), # baked into Docker image
pathlib.Path("/data/iac_source_catalog.json"), # volume-mount (future)
pathlib.Path(__file__).parent.parent.parent / "iac_source_catalog.json", # dev
]
# Principals catalog — known object IDs for array params (e.g. additionalAccess)
# Format: {"params": {"additionalAccess": [{"id": "<guid>", "label": "...", "description": "..."}]}}
_PRINCIPALS_CATALOG_PATHS = [
pathlib.Path("/data/principals_catalog.json"), # volume-mount (freshest)
pathlib.Path("/principals_catalog.json"), # baked into image (fallback)
pathlib.Path(__file__).parent.parent.parent / "principals_catalog.json", # dev
]
def _load_principals_catalog() -> dict[str, list[dict[str, Any]]]:
"""Load principals catalog for array param completions.
Returns dict keyed by param name (e.g. 'additionalAccess') list of
{id, label, description} entries.
"""
for path in _PRINCIPALS_CATALOG_PATHS:
if path.exists():
try:
data = json.loads(path.read_text())
params = data.get("params", {})
count = sum(len(v) for v in params.values())
logger.info("Principals catalog loaded from %s: %d entries", path, count)
return params
except Exception:
logger.exception("Failed to parse principals catalog at %s", path)
logger.debug("No principals_catalog.json found — array param completions unavailable")
return {}
def _load_iac_source_catalog() -> dict[str, dict[str, Any]]:
"""Load IAC source catalog for enriched param descriptions.
Returns dict keyed by module name (e.g. 'roleassignments') module info
with 'params' list containing name/description/required/type.
"""
for path in _IAC_SOURCE_PATHS:
if path.exists():
try:
data = json.loads(path.read_text())
modules = data.get("modules", {})
logger.info("IAC source catalog loaded from %s: %d modules", path, len(modules))
return modules
except Exception:
logger.exception("Failed to parse IAC source catalog at %s", path)
logger.info("No iac_source_catalog.json found — param descriptions unavailable")
return {}
def _load_catalog() -> list[dict[str, Any]]:
"""Load modules from the bundled catalog file, preserving per-version schema."""
for path in _CATALOG_PATHS:
if path.exists():
try:
data = json.loads(path.read_text())
modules_raw = data.get("modules", {})
registry = data.get("registry", "iactemplatereg.azurecr.io")
modules = []
for mod_path, info in modules_raw.items():
versions = info.get("versions", ["latest"])
name = mod_path.split("/")[-1] if "/" in mod_path else mod_path
# ref_path: what the user writes after 'br/modules:' in a .bicep file.
# bicepconfig modulePath "bicep" means Bicep prepends "bicep/" to the
# registry path automatically, so strip that prefix here.
ref_path = mod_path.removeprefix("bicep/")
modules.append({
"name": name,
"path": mod_path,
"ref_path": ref_path, # e.g. "modules/appservice", "util/types"
"versions": versions,
"latest": versions[-1] if versions else "latest",
"registry": registry,
"schema": info.get("schema", {}), # per-version → params
})
logger.info("Bicep catalog loaded from %s: %d modules", path, len(modules))
return modules
except Exception:
logger.exception("Failed to parse catalog at %s", path)
logger.warning("No bicep_modules_catalog.json found — completions disabled")
return []
class BicepModuleCatalog:
"""In-memory catalog of LRU Bicep modules, loaded once at startup."""
_modules: list[dict[str, Any]] = []
_iac: dict[str, dict[str, Any]] = {} # module name → IAC source info
_principals: dict[str, list[dict[str, Any]]] = {} # param name → [{id, label, description}]
@classmethod
def load(cls) -> None:
"""Load all catalogs from disk. Call once at startup."""
cls._modules = _load_catalog()
cls._iac = _load_iac_source_catalog()
cls._principals = _load_principals_catalog()
@classmethod
def _iac_param_map(cls, module_name: str) -> dict[str, dict[str, Any]]:
"""Return {param_name: {description, required, type}} from IAC source catalog.
IAC catalog is keyed by bare module name (last path segment), so strip any
path prefix (e.g. 'modules/appservice' 'appservice').
"""
bare_name = module_name.split("/")[-1]
iac_mod = cls._iac.get(bare_name, {})
return {p["name"]: p for p in iac_mod.get("params", [])}
@classmethod
def get_modules(cls) -> list[dict[str, Any]]:
return cls._modules
@classmethod
def get_module_by_name(cls, name: str) -> dict[str, Any] | None:
for mod in cls._modules:
if mod["name"] == name:
return mod
return None
@classmethod
def get_module_by_ref(cls, ref_path: str) -> dict[str, Any] | None:
"""Look up a module by the path that appears after 'br/modules:' in a .bicep file.
Matches both new-style ('modules/appservice', 'util/types') and old-style
bare names ('appservice') so completions work regardless of module version.
Case-insensitive to handle catalog lowercase vs. camelCase in .bicep files.
"""
ref_lower = ref_path.lower()
for mod in cls._modules:
if mod["ref_path"].lower() == ref_lower or mod["name"].lower() == ref_lower:
return mod
return None
@classmethod
def as_completion_items(cls) -> list[dict[str, Any]]:
"""Module name completions — shown when typing a module reference string."""
items = []
for mod in cls._modules:
ref = f"br/modules:{mod['ref_path']}:{mod['latest']}"
items.append({
"label": mod["name"],
"kind": 9, # Module
"detail": f"LRU Bicep module — {mod['registry']}",
"insertText": ref,
"sortText": f"0_lru_{mod['name']}",
"documentation": {
"kind": "markdown",
"value": (
f"**{mod['name']}** (LRU internal)\n\n"
f"Registry: `{mod['registry']}`\n"
f"Versions: {', '.join(mod['versions'])}\n\n"
f"```bicep\nmodule {mod['name'].lower()} '{ref}' = {{\n"
f" name: '{mod['name'].lower()}'\n params: {{}}\n}}\n```"
),
},
})
return items
@classmethod
def version_completion_items(cls, module_name: str) -> list[dict[str, Any]]:
"""Version completions for a specific module (newest first)."""
mod = cls.get_module_by_ref(module_name)
if not mod:
return []
schema = mod.get("schema", {})
items = []
for ver in reversed(mod["versions"]):
ver_schema = schema.get(ver, {})
params = list(ver_schema.get("parameters", {}).keys())
param_summary = (
f"{len(params)} params: {', '.join(params[:3])}{'...' if len(params) > 3 else ''}"
if params else "no params"
)
items.append({
"label": ver,
"kind": 12, # Value
"detail": param_summary,
"insertText": ver,
"sortText": f"0_lru_ver_{ver}",
"documentation": {
"kind": "markdown",
"value": (
f"**{module_name} `{ver}`**\n\n"
+ (
"Params: " + ", ".join(f"`{p}`" for p in params)
if params else "_No params_"
)
),
},
})
return items
@classmethod
def param_value_completion_items(
cls,
module_name: str,
version: str,
param_name: str,
has_open_quote: bool = False,
) -> list[dict[str, Any]]:
"""Enum/allowed-value completions for a specific param (e.g. principalType, environmentType)."""
mod = cls.get_module_by_ref(module_name)
allowed: list[str] = []
if mod:
schema = mod.get("schema", {})
ver_params = schema.get(version, {}).get("parameters", {})
if not ver_params:
for v in reversed(list(schema.keys())):
candidate = schema[v].get("parameters", {})
if candidate:
ver_params = candidate
break
param_info = ver_params.get(param_name, {})
ptype = param_info.get("type", "")
allowed = [str(a) for a in param_info.get("allowed", [])]
if not allowed and ptype == "bool":
allowed = ["true", "false"]
# Fallback: known Azure enums not captured in catalog
if not allowed:
allowed = _KNOWN_ENUMS.get(param_name, [])
if not allowed:
return []
items = []
for i, val in enumerate(allowed):
insert = f"{val}'" if has_open_quote else f"'{val}'"
items.append({
"label": val,
"kind": 12, # Value
"detail": f"{param_name} value",
"insertText": insert,
"sortText": f"0_lru_val_{i:03d}_{val}",
"documentation": {
"kind": "markdown",
"value": f"**{val}**\n\nAllowed value for `{param_name}`",
},
})
return items
@classmethod
def param_array_item_completion_items(
cls,
module_name: str,
version: str,
param_name: str,
has_open_quote: bool = False,
) -> list[dict[str, Any]]:
"""Completions for items inside an array param (e.g. additionalAccess objectIds).
Looks up known entries from the principals catalog keyed by param name.
"""
entries = cls._principals.get(param_name, [])
if not entries:
return []
items = []
for i, entry in enumerate(entries):
val = entry["id"]
label = entry.get("label", val)
description = entry.get("description", "")
insert = f"{val}'" if has_open_quote else f"'{val}'"
doc = f"**{label}**\n\n`{val}`"
if description:
doc += f"\n\n{description}"
items.append({
"label": label,
"kind": 12, # Value
"detail": val, # GUID shown as detail
"insertText": insert,
"sortText": f"0_lru_arr_{i:03d}_{label}",
"documentation": {
"kind": "markdown",
"value": doc,
},
})
return items
@classmethod
def param_completion_items(cls, module_name: str, version: str) -> list[dict[str, Any]]:
"""Param completions for a specific module+version combination."""
mod = cls.get_module_by_ref(module_name)
if not mod:
return []
schema = mod.get("schema", {})
# Exact version match first, then fall back to closest available
ver_params = schema.get(version, {}).get("parameters", {})
if not ver_params:
for v in reversed(list(schema.keys())):
candidate = schema[v].get("parameters", {})
if candidate:
ver_params = candidate
logger.debug("Param fallback: %s %s%s", module_name, version, v)
break
# IAC source catalog: richer descriptions + required flag
iac_params = cls._iac_param_map(module_name)
items = []
for param_name, param_info in ver_params.items():
ptype = param_info.get("type", "any")
allowed = param_info.get("allowed", [])
# Prefer IAC source description (has human-readable text from Bicep source)
iac = iac_params.get(param_name, {})
description = iac.get("description", "") or param_info.get("description", "")
description = description.strip()
required = iac.get("required", False)
doc_lines = [f"**{param_name}** (`{ptype}`)"]
if required:
doc_lines[0] += " ⚠️ required"
if description:
doc_lines.append(f"\n{description}")
if allowed:
shown = allowed[:5]
more = f" + {len(allowed) - 5} more" if len(allowed) > 5 else ""
doc_lines.append(f"\nAllowed: {', '.join(f'`{a}`' for a in shown)}{more}")
items.append({
"label": param_name,
"kind": 5, # Field
"detail": f"{'*' if required else ''}{ptype}",
"insertText": f"{param_name}: ",
"sortText": f"0_lru_param_{'0' if required else '1'}_{param_name}",
"documentation": {
"kind": "markdown",
"value": "\n".join(doc_lines),
},
})
return items