Compare commits

1 Commits

Author SHA1 Message Date
Henrik Jess Nielsen
0025043999 backup: uncommitted changes from MAC-M9FQ0900T3 2026-05-17 15:52:31 2026-05-17 15:52:31 +02:00
7 changed files with 436 additions and 4 deletions

View File

@@ -12,6 +12,10 @@ RUN apt-get update && apt-get install -y --no-install-recommends curl unzip ca-c
COPY scripts/download_bicep_ls.sh /scripts/
RUN chmod +x /scripts/download_bicep_ls.sh && BICEP_VERSION=${BICEP_VERSION} /scripts/download_bicep_ls.sh
# Download Azure DevOps pipeline schema for YAML task completions
RUN curl -f -o /azdo-pipeline-schema.json \
"https://raw.githubusercontent.com/microsoft/azure-pipelines-vscode/main/service-schema.json"
# ── Stage 2: Python wheel build ───────────────────────────────────────────────
FROM python:3.12-slim AS builder
@@ -41,6 +45,7 @@ RUN apt-get update \
# Copy Bicep Language Server (baked in at build time — no volume needed)
COPY --from=bicep-downloader /opt/bicep-langserver /opt/bicep-langserver
COPY --from=bicep-downloader /azdo-pipeline-schema.json /azdo-pipeline-schema.json
# Install Python package and dependencies
COPY --from=builder /dist/*.whl /tmp/

View File

@@ -36,6 +36,34 @@ _IAC_SOURCE_PATHS = [
pathlib.Path(__file__).parent.parent.parent / "iac_source_catalog.json", # dev
]
# Principals catalog — known object IDs for array params (e.g. additionalAccess)
# Format: {"params": {"additionalAccess": [{"id": "<guid>", "label": "...", "description": "..."}]}}
_PRINCIPALS_CATALOG_PATHS = [
pathlib.Path("/data/principals_catalog.json"), # volume-mount (freshest)
pathlib.Path("/principals_catalog.json"), # baked into image (fallback)
pathlib.Path(__file__).parent.parent.parent / "principals_catalog.json", # dev
]
def _load_principals_catalog() -> dict[str, list[dict[str, Any]]]:
"""Load principals catalog for array param completions.
Returns dict keyed by param name (e.g. 'additionalAccess') → list of
{id, label, description} entries.
"""
for path in _PRINCIPALS_CATALOG_PATHS:
if path.exists():
try:
data = json.loads(path.read_text())
params = data.get("params", {})
count = sum(len(v) for v in params.values())
logger.info("Principals catalog loaded from %s: %d entries", path, count)
return params
except Exception:
logger.exception("Failed to parse principals catalog at %s", path)
logger.debug("No principals_catalog.json found — array param completions unavailable")
return {}
def _load_iac_source_catalog() -> dict[str, dict[str, Any]]:
"""Load IAC source catalog for enriched param descriptions.
@@ -94,12 +122,14 @@ class BicepModuleCatalog:
_modules: list[dict[str, Any]] = []
_iac: dict[str, dict[str, Any]] = {} # module name → IAC source info
_principals: dict[str, list[dict[str, Any]]] = {} # param name → [{id, label, description}]
@classmethod
def load(cls) -> None:
"""Load both catalogs from disk. Call once at startup."""
"""Load all catalogs from disk. Call once at startup."""
cls._modules = _load_catalog()
cls._iac = _load_iac_source_catalog()
cls._principals = _load_principals_catalog()
@classmethod
def _iac_param_map(cls, module_name: str) -> dict[str, dict[str, Any]]:
@@ -246,6 +276,44 @@ class BicepModuleCatalog:
})
return items
@classmethod
def param_array_item_completion_items(
cls,
module_name: str,
version: str,
param_name: str,
has_open_quote: bool = False,
) -> list[dict[str, Any]]:
"""Completions for items inside an array param (e.g. additionalAccess objectIds).
Looks up known entries from the principals catalog keyed by param name.
"""
entries = cls._principals.get(param_name, [])
if not entries:
return []
items = []
for i, entry in enumerate(entries):
val = entry["id"]
label = entry.get("label", val)
description = entry.get("description", "")
insert = f"{val}'" if has_open_quote else f"'{val}'"
doc = f"**{label}**\n\n`{val}`"
if description:
doc += f"\n\n{description}"
items.append({
"label": label,
"kind": 12, # Value
"detail": val, # GUID shown as detail
"insertText": insert,
"sortText": f"0_lru_arr_{i:03d}_{label}",
"documentation": {
"kind": "markdown",
"value": doc,
},
})
return items
@classmethod
def param_completion_items(cls, module_name: str, version: str) -> list[dict[str, Any]]:
"""Param completions for a specific module+version combination."""

View File

@@ -130,6 +130,19 @@ class _ProxySession:
mod_name = last_mod.group(1)
mod_ver = last_mod.group(2)
# Array item context: cursor inside [...] for an array param.
# Must be checked BEFORE value_m, since array lines also match
# the value pattern (e.g. `additionalAccess: ['`).
array_m = re.search(r"^\s*(\w+):\s*\[", current)
if array_m and current.count("[") > current.count("]"):
return {
"type": "param_array_item",
"module": mod_name,
"version": mod_ver,
"param": array_m.group(1),
"has_open_quote": bool(re.search(r"'[^']*$", current)),
}
# Check if cursor is after 'paramname: ' on the current line
# (value context — inject enum/allowed values)
value_m = re.search(r"^\s*(\w+):\s*('?)([^'{}]*)$", current)
@@ -197,11 +210,18 @@ def _inject_completions(msg: dict[str, Any], context: dict | None = None) -> byt
context["param"],
context.get("has_open_quote", False),
)
elif ctx_type == "param_array_item":
lru_items = BicepModuleCatalog.param_array_item_completion_items(
context["module"],
context["version"],
context["param"],
context.get("has_open_quote", False),
)
else:
# Default: module name completions
lru_items = BicepModuleCatalog.as_completion_items()
if ctx_type in ("version", "param", "param_value"):
if ctx_type in ("version", "param", "param_value", "param_array_item"):
# Always replace LS completions for private-registry contexts — the
# Bicep LS doesn't know about our ACR, so anything it returns is noise.
# Even if lru_items is empty (no enum values for a param), suppress LS.

11
principals_catalog.json Normal file
View File

@@ -0,0 +1,11 @@
{
"params": {
"additionalAccess": [
{
"id": "c88bf29d-b13a-4153-9738-8995085a451e",
"label": "LRIADMPRO-IaC-Bicep",
"description": "IaC Bicep pipeline service principal"
}
]
}
}

View File

@@ -31,6 +31,7 @@ BICEP_CATALOG="$DEVOPS_MCP_REPO/bicep_modules_catalog.json"
IAC_CATALOG="$DEVOPS_MCP_REPO/iac_source_catalog.json"
ILSP_REPO="$(cd "$(dirname "$0")/.." && pwd)"
TMPL_CATALOG="$ILSP_REPO/pipeline_templates_catalog.json"
PRINCIPALS_CATALOG="$ILSP_REPO/principals_catalog.json"
echo "── iLSP catalog push ──────────────────────────────"
@@ -50,6 +51,13 @@ if [[ ! -f "$TMPL_CATALOG" ]]; then
fi
echo "$(basename "$TMPL_CATALOG") ($(du -sh "$TMPL_CATALOG" | cut -f1))"
if [[ ! -f "$PRINCIPALS_CATALOG" ]]; then
echo " ✗ Not found: $PRINCIPALS_CATALOG"
echo " Run: python3 $ILSP_REPO/scripts/sync_principals_catalog.py"
exit 1
fi
echo "$(basename "$PRINCIPALS_CATALOG") ($(du -sh "$PRINCIPALS_CATALOG" | cut -f1))"
# Copy iac_source_catalog.json to iLSP repo root so it gets baked into the Docker image
echo ""
echo " → Copying iac_source_catalog.json to iLSP repo root (for Docker bake) …"
@@ -59,7 +67,7 @@ echo " ✓ Copied to $ILSP_REPO/iac_source_catalog.json"
echo ""
echo " → Copying to $AUTOBOX:$REMOTE_DIR/ …"
ssh "$AUTOBOX" "mkdir -p $REMOTE_DIR"
scp "$BICEP_CATALOG" "$IAC_CATALOG" "$TMPL_CATALOG" "$AUTOBOX:$REMOTE_DIR/"
scp "$BICEP_CATALOG" "$IAC_CATALOG" "$TMPL_CATALOG" "$PRINCIPALS_CATALOG" "$AUTOBOX:$REMOTE_DIR/"
echo " ✓ Upload done"
if [[ "$NO_RELOAD" == "true" ]]; then
@@ -78,5 +86,5 @@ else
fi
echo ""
echo " Done. Bicep + YAML pipeline template completions updated."
echo " Done. Bicep + YAML pipeline template + principals completions updated."
echo " Note: iac_source_catalog.json was copied to repo root — commit + push to bake into next Docker image."

View File

@@ -0,0 +1,214 @@
#!/usr/bin/env python3
"""
sync_principals_catalog.py — Build principals_catalog.json by scanning .bicep files
for array param values (GUIDs) and their inline comments.
Scans configured IaC repo directories for patterns like:
additionalAccess: ['c88bf29d-...'] // LRIADMPRO-IaC-Bicep
additionalAccess: [
'c88bf29d-...' // LRIADMPRO-IaC-Bicep
'another-guid' // Another-SP
]
Usage:
python3 scripts/sync_principals_catalog.py
python3 scripts/sync_principals_catalog.py --paths ~/IdeaProjects/Bitbucket/IaC
python3 scripts/sync_principals_catalog.py --dry-run
python3 scripts/sync_principals_catalog.py --output /path/to/principals_catalog.json
"""
import argparse
import json
import logging
import pathlib
import re
import sys
from datetime import datetime, timezone
from typing import Any
logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")
log = logging.getLogger(__name__)
_REPO_ROOT = pathlib.Path(__file__).parent.parent
_DEFAULT_OUTPUT = _REPO_ROOT / "principals_catalog.json"
# Default paths to scan — adjust to match your IaC repo locations
_DEFAULT_SCAN_PATHS = [
"~/IdeaProjects/Bitbucket/IaC",
"~/IdeaProjects/Bitbucket/LRU",
]
# Matches a UUID/GUID
_GUID_RE = re.compile(
r"[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}"
)
# Matches a single quoted GUID optionally followed by an inline comment:
# 'c88bf29d-...' // Some label text
# 'c88bf29d-...' // or with hash comments
_ITEM_RE = re.compile(
r"'(" + _GUID_RE.pattern + r")'\s*(?://+\s*(.+?)\s*)?$",
re.IGNORECASE,
)
# Matches the opening of an array param assignment:
# additionalAccess: [ or additionalAccess: ['guid'
_ARRAY_OPEN_RE = re.compile(r"^\s*(\w+)\s*:\s*\[")
def _extract_label(comment: str | None) -> str | None:
"""Clean up an inline comment to use as a display label."""
if not comment:
return None
# Strip trailing punctuation and whitespace
return comment.strip().rstrip(".,;")
def scan_file(path: pathlib.Path) -> dict[str, list[dict[str, Any]]]:
"""Scan a single .bicep file and return {param_name: [{id, label, source}]}."""
try:
text = path.read_text(encoding="utf-8")
except Exception as exc:
log.debug("Cannot read %s: %s", path, exc)
return {}
lines = text.splitlines()
results: dict[str, list[dict[str, Any]]] = {}
i = 0
while i < len(lines):
line = lines[i]
array_m = _ARRAY_OPEN_RE.match(line)
if not array_m:
i += 1
continue
param_name = array_m.group(1)
# Collect all characters on this and subsequent lines until array closes
collected = line[array_m.end() - 1:] # from '[' onwards
j = i + 1
# If the array doesn't close on the same line, keep accumulating
while collected.count("[") > collected.count("]") and j < len(lines):
collected += "\n" + lines[j]
j += 1
# Extract all GUID items from the collected block
for item_line in collected.splitlines():
m = _ITEM_RE.search(item_line)
if not m:
continue
guid = m.group(1).lower()
label = _extract_label(m.group(2))
entry: dict[str, Any] = {
"id": guid,
"label": label or guid,
"source": str(path),
}
if label:
entry["description"] = f"From {path.name}"
results.setdefault(param_name, [])
results[param_name].append(entry)
i = j
return results
def scan_paths(paths: list[pathlib.Path]) -> dict[str, list[dict[str, Any]]]:
"""Scan all .bicep files under the given paths, deduplicating GUIDs per param."""
# param_name → {guid → entry} (dict for dedup)
merged: dict[str, dict[str, dict[str, Any]]] = {}
files_scanned = 0
for base in paths:
if not base.exists():
log.warning("Path not found, skipping: %s", base)
continue
for bicep_file in sorted(base.rglob("*.bicep")):
file_results = scan_file(bicep_file)
files_scanned += 1
for param, entries in file_results.items():
bucket = merged.setdefault(param, {})
for entry in entries:
guid = entry["id"]
if guid not in bucket:
bucket[guid] = entry
else:
# Keep the entry with the most informative label
existing = bucket[guid]
if entry.get("label") and entry["label"] != guid:
if existing.get("label") == guid or not existing.get("label"):
bucket[guid] = entry
log.info("Scanned %d .bicep files across %d path(s)", files_scanned, len(paths))
# Flatten back to lists, sorted by label
return {
param: sorted(entries.values(), key=lambda e: e.get("label", e["id"]).lower())
for param, entries in sorted(merged.items())
}
def build_catalog(scan_paths_list: list[pathlib.Path]) -> dict[str, Any]:
params = scan_paths(scan_paths_list)
total = sum(len(v) for v in params.values())
log.info("Found %d unique entries across %d param(s)", total, len(params))
return {
"synced_at": datetime.now(timezone.utc).isoformat(),
"entry_count": total,
"params": params,
}
def main() -> None:
parser = argparse.ArgumentParser(
description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter,
)
parser.add_argument(
"--paths",
nargs="+",
default=_DEFAULT_SCAN_PATHS,
metavar="PATH",
help="Directories to scan for .bicep files (default: %(default)s)",
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Print findings without writing the catalog",
)
parser.add_argument(
"--output",
default=str(_DEFAULT_OUTPUT),
help="Output JSON file (default: %(default)s)",
)
args = parser.parse_args()
resolved = [pathlib.Path(p).expanduser().resolve() for p in args.paths]
catalog = build_catalog(resolved)
if args.dry_run:
print(f"\n── Principals catalog (dry-run) ──────────────────────")
if not catalog["params"]:
print(" No GUID values found in .bicep files.")
for param, entries in catalog["params"].items():
print(f"\n param: {param} ({len(entries)} entries)")
for e in entries:
print(f" {e['label']:<40} {e['id']}")
print(f"\n Total: {catalog['entry_count']} entries")
return
out = pathlib.Path(args.output)
# Strip internal 'source' field — not needed at runtime
for entries in catalog["params"].values():
for e in entries:
e.pop("source", None)
out.write_text(json.dumps(catalog, indent=2, ensure_ascii=False), encoding="utf-8")
log.info("Written: %s (%d entries)", out, catalog["entry_count"])
if __name__ == "__main__":
main()

View File

@@ -18,6 +18,7 @@ def test_frame_produces_correct_header():
@pytest.fixture(autouse=True)
def reset_modules():
BicepModuleCatalog._modules = []
BicepModuleCatalog._principals = {}
yield
@@ -386,6 +387,111 @@ def test_param_value_injected_in_completion_response():
assert labels == ["DEV", "TEST", "PROD"]
# ── Array item completion tests ────────────────────────────────────────────────
def test_detect_param_array_item_context():
"""Cursor inside [...] for an array param → param_array_item context."""
lines = [
"module keyVault 'br/modules:modules/keyvault:2.1.x' = {",
" params: {",
" additionalAccess: ['", # ← cursor after opening quote inside array
" }",
"}",
]
session = _make_session_with_doc(URI, lines)
# character 24 = after the `'` (4 spaces + 16 "additionalAccess" + 2 ": " + 1 "[" + 1 "'" = 24)
ctx = session._detect_context(URI, {"line": 2, "character": 24})
assert ctx["type"] == "param_array_item"
assert ctx["module"] == "modules/keyvault"
assert ctx["param"] == "additionalAccess"
assert ctx["has_open_quote"] is True
def test_detect_param_array_item_context_no_quote():
"""Cursor at start of empty array → param_array_item, no open quote."""
lines = [
"module keyVault 'br/modules:modules/keyvault:2.1.x' = {",
" params: {",
" additionalAccess: [", # ← cursor after [
" }",
"}",
]
session = _make_session_with_doc(URI, lines)
ctx = session._detect_context(URI, {"line": 2, "character": 23})
assert ctx["type"] == "param_array_item"
assert ctx["has_open_quote"] is False
def test_detect_param_array_item_context_after_existing_item():
"""Cursor after an existing item inside array → still param_array_item."""
lines = [
"module keyVault 'br/modules:modules/keyvault:2.1.x' = {",
" params: {",
" additionalAccess: ['c88bf29d-b13a-4153-9738-8995085a451e', '",
" }",
"}",
]
session = _make_session_with_doc(URI, lines)
char = len(" additionalAccess: ['c88bf29d-b13a-4153-9738-8995085a451e', '")
ctx = session._detect_context(URI, {"line": 2, "character": char})
assert ctx["type"] == "param_array_item"
assert ctx["has_open_quote"] is True
def test_param_array_item_completion_items():
"""param_array_item_completion_items returns entries from principals catalog."""
BicepModuleCatalog._principals = {
"additionalAccess": [
{"id": "aaaa-bbbb", "label": "My-SP", "description": "Test SP"},
]
}
items = BicepModuleCatalog.param_array_item_completion_items(
"modules/keyvault", "2.1.x", "additionalAccess"
)
assert len(items) == 1
assert items[0]["label"] == "My-SP"
assert items[0]["detail"] == "aaaa-bbbb"
assert items[0]["insertText"] == "'aaaa-bbbb'"
def test_param_array_item_completion_items_open_quote():
BicepModuleCatalog._principals = {
"additionalAccess": [{"id": "aaaa-bbbb", "label": "My-SP"}]
}
items = BicepModuleCatalog.param_array_item_completion_items(
"modules/keyvault", "2.1.x", "additionalAccess", has_open_quote=True
)
assert items[0]["insertText"] == "aaaa-bbbb'"
def test_param_array_item_empty_when_no_catalog_entry():
BicepModuleCatalog._principals = {}
items = BicepModuleCatalog.param_array_item_completion_items(
"modules/keyvault", "2.1.x", "additionalAccess"
)
assert items == []
def test_param_array_item_injected_in_completion_response():
"""Full pipeline: param_array_item context injects principal completions."""
BicepModuleCatalog._principals = {
"additionalAccess": [{"id": "aaaa-1111", "label": "IaC-SP", "description": "Pipeline SP"}]
}
msg = _completion_response([{"label": "noise", "sortText": "z"}])
ctx = {
"type": "param_array_item",
"module": "modules/keyvault",
"version": "2.1.x",
"param": "additionalAccess",
"has_open_quote": False,
}
out = json.loads(_inject_completions(msg, ctx))
items = out["result"]["items"]
assert len(items) == 1
assert items[0]["label"] == "IaC-SP"
assert items[0]["detail"] == "aaaa-1111"
def test_detect_unknown_context_outside_module():
lines = ["var x = 'hello'"]
session = _make_session_with_doc(URI, lines)