Compare commits
1 Commits
main
...
backup/mac
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0025043999 |
@@ -12,6 +12,10 @@ RUN apt-get update && apt-get install -y --no-install-recommends curl unzip ca-c
|
|||||||
COPY scripts/download_bicep_ls.sh /scripts/
|
COPY scripts/download_bicep_ls.sh /scripts/
|
||||||
RUN chmod +x /scripts/download_bicep_ls.sh && BICEP_VERSION=${BICEP_VERSION} /scripts/download_bicep_ls.sh
|
RUN chmod +x /scripts/download_bicep_ls.sh && BICEP_VERSION=${BICEP_VERSION} /scripts/download_bicep_ls.sh
|
||||||
|
|
||||||
|
# Download Azure DevOps pipeline schema for YAML task completions
|
||||||
|
RUN curl -f -o /azdo-pipeline-schema.json \
|
||||||
|
"https://raw.githubusercontent.com/microsoft/azure-pipelines-vscode/main/service-schema.json"
|
||||||
|
|
||||||
|
|
||||||
# ── Stage 2: Python wheel build ───────────────────────────────────────────────
|
# ── Stage 2: Python wheel build ───────────────────────────────────────────────
|
||||||
FROM python:3.12-slim AS builder
|
FROM python:3.12-slim AS builder
|
||||||
@@ -41,6 +45,7 @@ RUN apt-get update \
|
|||||||
|
|
||||||
# Copy Bicep Language Server (baked in at build time — no volume needed)
|
# Copy Bicep Language Server (baked in at build time — no volume needed)
|
||||||
COPY --from=bicep-downloader /opt/bicep-langserver /opt/bicep-langserver
|
COPY --from=bicep-downloader /opt/bicep-langserver /opt/bicep-langserver
|
||||||
|
COPY --from=bicep-downloader /azdo-pipeline-schema.json /azdo-pipeline-schema.json
|
||||||
|
|
||||||
# Install Python package and dependencies
|
# Install Python package and dependencies
|
||||||
COPY --from=builder /dist/*.whl /tmp/
|
COPY --from=builder /dist/*.whl /tmp/
|
||||||
|
|||||||
@@ -36,6 +36,34 @@ _IAC_SOURCE_PATHS = [
|
|||||||
pathlib.Path(__file__).parent.parent.parent / "iac_source_catalog.json", # dev
|
pathlib.Path(__file__).parent.parent.parent / "iac_source_catalog.json", # dev
|
||||||
]
|
]
|
||||||
|
|
||||||
|
# Principals catalog — known object IDs for array params (e.g. additionalAccess)
|
||||||
|
# Format: {"params": {"additionalAccess": [{"id": "<guid>", "label": "...", "description": "..."}]}}
|
||||||
|
_PRINCIPALS_CATALOG_PATHS = [
|
||||||
|
pathlib.Path("/data/principals_catalog.json"), # volume-mount (freshest)
|
||||||
|
pathlib.Path("/principals_catalog.json"), # baked into image (fallback)
|
||||||
|
pathlib.Path(__file__).parent.parent.parent / "principals_catalog.json", # dev
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def _load_principals_catalog() -> dict[str, list[dict[str, Any]]]:
|
||||||
|
"""Load principals catalog for array param completions.
|
||||||
|
|
||||||
|
Returns dict keyed by param name (e.g. 'additionalAccess') → list of
|
||||||
|
{id, label, description} entries.
|
||||||
|
"""
|
||||||
|
for path in _PRINCIPALS_CATALOG_PATHS:
|
||||||
|
if path.exists():
|
||||||
|
try:
|
||||||
|
data = json.loads(path.read_text())
|
||||||
|
params = data.get("params", {})
|
||||||
|
count = sum(len(v) for v in params.values())
|
||||||
|
logger.info("Principals catalog loaded from %s: %d entries", path, count)
|
||||||
|
return params
|
||||||
|
except Exception:
|
||||||
|
logger.exception("Failed to parse principals catalog at %s", path)
|
||||||
|
logger.debug("No principals_catalog.json found — array param completions unavailable")
|
||||||
|
return {}
|
||||||
|
|
||||||
|
|
||||||
def _load_iac_source_catalog() -> dict[str, dict[str, Any]]:
|
def _load_iac_source_catalog() -> dict[str, dict[str, Any]]:
|
||||||
"""Load IAC source catalog for enriched param descriptions.
|
"""Load IAC source catalog for enriched param descriptions.
|
||||||
@@ -94,12 +122,14 @@ class BicepModuleCatalog:
|
|||||||
|
|
||||||
_modules: list[dict[str, Any]] = []
|
_modules: list[dict[str, Any]] = []
|
||||||
_iac: dict[str, dict[str, Any]] = {} # module name → IAC source info
|
_iac: dict[str, dict[str, Any]] = {} # module name → IAC source info
|
||||||
|
_principals: dict[str, list[dict[str, Any]]] = {} # param name → [{id, label, description}]
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def load(cls) -> None:
|
def load(cls) -> None:
|
||||||
"""Load both catalogs from disk. Call once at startup."""
|
"""Load all catalogs from disk. Call once at startup."""
|
||||||
cls._modules = _load_catalog()
|
cls._modules = _load_catalog()
|
||||||
cls._iac = _load_iac_source_catalog()
|
cls._iac = _load_iac_source_catalog()
|
||||||
|
cls._principals = _load_principals_catalog()
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def _iac_param_map(cls, module_name: str) -> dict[str, dict[str, Any]]:
|
def _iac_param_map(cls, module_name: str) -> dict[str, dict[str, Any]]:
|
||||||
@@ -246,6 +276,44 @@ class BicepModuleCatalog:
|
|||||||
})
|
})
|
||||||
return items
|
return items
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def param_array_item_completion_items(
|
||||||
|
cls,
|
||||||
|
module_name: str,
|
||||||
|
version: str,
|
||||||
|
param_name: str,
|
||||||
|
has_open_quote: bool = False,
|
||||||
|
) -> list[dict[str, Any]]:
|
||||||
|
"""Completions for items inside an array param (e.g. additionalAccess objectIds).
|
||||||
|
|
||||||
|
Looks up known entries from the principals catalog keyed by param name.
|
||||||
|
"""
|
||||||
|
entries = cls._principals.get(param_name, [])
|
||||||
|
if not entries:
|
||||||
|
return []
|
||||||
|
|
||||||
|
items = []
|
||||||
|
for i, entry in enumerate(entries):
|
||||||
|
val = entry["id"]
|
||||||
|
label = entry.get("label", val)
|
||||||
|
description = entry.get("description", "")
|
||||||
|
insert = f"{val}'" if has_open_quote else f"'{val}'"
|
||||||
|
doc = f"**{label}**\n\n`{val}`"
|
||||||
|
if description:
|
||||||
|
doc += f"\n\n{description}"
|
||||||
|
items.append({
|
||||||
|
"label": label,
|
||||||
|
"kind": 12, # Value
|
||||||
|
"detail": val, # GUID shown as detail
|
||||||
|
"insertText": insert,
|
||||||
|
"sortText": f"0_lru_arr_{i:03d}_{label}",
|
||||||
|
"documentation": {
|
||||||
|
"kind": "markdown",
|
||||||
|
"value": doc,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
return items
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def param_completion_items(cls, module_name: str, version: str) -> list[dict[str, Any]]:
|
def param_completion_items(cls, module_name: str, version: str) -> list[dict[str, Any]]:
|
||||||
"""Param completions for a specific module+version combination."""
|
"""Param completions for a specific module+version combination."""
|
||||||
|
|||||||
@@ -130,6 +130,19 @@ class _ProxySession:
|
|||||||
mod_name = last_mod.group(1)
|
mod_name = last_mod.group(1)
|
||||||
mod_ver = last_mod.group(2)
|
mod_ver = last_mod.group(2)
|
||||||
|
|
||||||
|
# Array item context: cursor inside [...] for an array param.
|
||||||
|
# Must be checked BEFORE value_m, since array lines also match
|
||||||
|
# the value pattern (e.g. `additionalAccess: ['`).
|
||||||
|
array_m = re.search(r"^\s*(\w+):\s*\[", current)
|
||||||
|
if array_m and current.count("[") > current.count("]"):
|
||||||
|
return {
|
||||||
|
"type": "param_array_item",
|
||||||
|
"module": mod_name,
|
||||||
|
"version": mod_ver,
|
||||||
|
"param": array_m.group(1),
|
||||||
|
"has_open_quote": bool(re.search(r"'[^']*$", current)),
|
||||||
|
}
|
||||||
|
|
||||||
# Check if cursor is after 'paramname: ' on the current line
|
# Check if cursor is after 'paramname: ' on the current line
|
||||||
# (value context — inject enum/allowed values)
|
# (value context — inject enum/allowed values)
|
||||||
value_m = re.search(r"^\s*(\w+):\s*('?)([^'{}]*)$", current)
|
value_m = re.search(r"^\s*(\w+):\s*('?)([^'{}]*)$", current)
|
||||||
@@ -197,11 +210,18 @@ def _inject_completions(msg: dict[str, Any], context: dict | None = None) -> byt
|
|||||||
context["param"],
|
context["param"],
|
||||||
context.get("has_open_quote", False),
|
context.get("has_open_quote", False),
|
||||||
)
|
)
|
||||||
|
elif ctx_type == "param_array_item":
|
||||||
|
lru_items = BicepModuleCatalog.param_array_item_completion_items(
|
||||||
|
context["module"],
|
||||||
|
context["version"],
|
||||||
|
context["param"],
|
||||||
|
context.get("has_open_quote", False),
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
# Default: module name completions
|
# Default: module name completions
|
||||||
lru_items = BicepModuleCatalog.as_completion_items()
|
lru_items = BicepModuleCatalog.as_completion_items()
|
||||||
|
|
||||||
if ctx_type in ("version", "param", "param_value"):
|
if ctx_type in ("version", "param", "param_value", "param_array_item"):
|
||||||
# Always replace LS completions for private-registry contexts — the
|
# Always replace LS completions for private-registry contexts — the
|
||||||
# Bicep LS doesn't know about our ACR, so anything it returns is noise.
|
# Bicep LS doesn't know about our ACR, so anything it returns is noise.
|
||||||
# Even if lru_items is empty (no enum values for a param), suppress LS.
|
# Even if lru_items is empty (no enum values for a param), suppress LS.
|
||||||
|
|||||||
11
principals_catalog.json
Normal file
11
principals_catalog.json
Normal file
@@ -0,0 +1,11 @@
|
|||||||
|
{
|
||||||
|
"params": {
|
||||||
|
"additionalAccess": [
|
||||||
|
{
|
||||||
|
"id": "c88bf29d-b13a-4153-9738-8995085a451e",
|
||||||
|
"label": "LRIADMPRO-IaC-Bicep",
|
||||||
|
"description": "IaC Bicep pipeline service principal"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -31,6 +31,7 @@ BICEP_CATALOG="$DEVOPS_MCP_REPO/bicep_modules_catalog.json"
|
|||||||
IAC_CATALOG="$DEVOPS_MCP_REPO/iac_source_catalog.json"
|
IAC_CATALOG="$DEVOPS_MCP_REPO/iac_source_catalog.json"
|
||||||
ILSP_REPO="$(cd "$(dirname "$0")/.." && pwd)"
|
ILSP_REPO="$(cd "$(dirname "$0")/.." && pwd)"
|
||||||
TMPL_CATALOG="$ILSP_REPO/pipeline_templates_catalog.json"
|
TMPL_CATALOG="$ILSP_REPO/pipeline_templates_catalog.json"
|
||||||
|
PRINCIPALS_CATALOG="$ILSP_REPO/principals_catalog.json"
|
||||||
|
|
||||||
echo "── iLSP catalog push ──────────────────────────────"
|
echo "── iLSP catalog push ──────────────────────────────"
|
||||||
|
|
||||||
@@ -50,6 +51,13 @@ if [[ ! -f "$TMPL_CATALOG" ]]; then
|
|||||||
fi
|
fi
|
||||||
echo " ✓ $(basename "$TMPL_CATALOG") ($(du -sh "$TMPL_CATALOG" | cut -f1))"
|
echo " ✓ $(basename "$TMPL_CATALOG") ($(du -sh "$TMPL_CATALOG" | cut -f1))"
|
||||||
|
|
||||||
|
if [[ ! -f "$PRINCIPALS_CATALOG" ]]; then
|
||||||
|
echo " ✗ Not found: $PRINCIPALS_CATALOG"
|
||||||
|
echo " Run: python3 $ILSP_REPO/scripts/sync_principals_catalog.py"
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
|
echo " ✓ $(basename "$PRINCIPALS_CATALOG") ($(du -sh "$PRINCIPALS_CATALOG" | cut -f1))"
|
||||||
|
|
||||||
# Copy iac_source_catalog.json to iLSP repo root so it gets baked into the Docker image
|
# Copy iac_source_catalog.json to iLSP repo root so it gets baked into the Docker image
|
||||||
echo ""
|
echo ""
|
||||||
echo " → Copying iac_source_catalog.json to iLSP repo root (for Docker bake) …"
|
echo " → Copying iac_source_catalog.json to iLSP repo root (for Docker bake) …"
|
||||||
@@ -59,7 +67,7 @@ echo " ✓ Copied to $ILSP_REPO/iac_source_catalog.json"
|
|||||||
echo ""
|
echo ""
|
||||||
echo " → Copying to $AUTOBOX:$REMOTE_DIR/ …"
|
echo " → Copying to $AUTOBOX:$REMOTE_DIR/ …"
|
||||||
ssh "$AUTOBOX" "mkdir -p $REMOTE_DIR"
|
ssh "$AUTOBOX" "mkdir -p $REMOTE_DIR"
|
||||||
scp "$BICEP_CATALOG" "$IAC_CATALOG" "$TMPL_CATALOG" "$AUTOBOX:$REMOTE_DIR/"
|
scp "$BICEP_CATALOG" "$IAC_CATALOG" "$TMPL_CATALOG" "$PRINCIPALS_CATALOG" "$AUTOBOX:$REMOTE_DIR/"
|
||||||
echo " ✓ Upload done"
|
echo " ✓ Upload done"
|
||||||
|
|
||||||
if [[ "$NO_RELOAD" == "true" ]]; then
|
if [[ "$NO_RELOAD" == "true" ]]; then
|
||||||
@@ -78,5 +86,5 @@ else
|
|||||||
fi
|
fi
|
||||||
|
|
||||||
echo ""
|
echo ""
|
||||||
echo " Done. Bicep + YAML pipeline template completions updated."
|
echo " Done. Bicep + YAML pipeline template + principals completions updated."
|
||||||
echo " Note: iac_source_catalog.json was copied to repo root — commit + push to bake into next Docker image."
|
echo " Note: iac_source_catalog.json was copied to repo root — commit + push to bake into next Docker image."
|
||||||
|
|||||||
214
scripts/sync_principals_catalog.py
Normal file
214
scripts/sync_principals_catalog.py
Normal file
@@ -0,0 +1,214 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
sync_principals_catalog.py — Build principals_catalog.json by scanning .bicep files
|
||||||
|
for array param values (GUIDs) and their inline comments.
|
||||||
|
|
||||||
|
Scans configured IaC repo directories for patterns like:
|
||||||
|
|
||||||
|
additionalAccess: ['c88bf29d-...'] // LRIADMPRO-IaC-Bicep
|
||||||
|
additionalAccess: [
|
||||||
|
'c88bf29d-...' // LRIADMPRO-IaC-Bicep
|
||||||
|
'another-guid' // Another-SP
|
||||||
|
]
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
python3 scripts/sync_principals_catalog.py
|
||||||
|
python3 scripts/sync_principals_catalog.py --paths ~/IdeaProjects/Bitbucket/IaC
|
||||||
|
python3 scripts/sync_principals_catalog.py --dry-run
|
||||||
|
python3 scripts/sync_principals_catalog.py --output /path/to/principals_catalog.json
|
||||||
|
"""
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import pathlib
|
||||||
|
import re
|
||||||
|
import sys
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")
|
||||||
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
_REPO_ROOT = pathlib.Path(__file__).parent.parent
|
||||||
|
_DEFAULT_OUTPUT = _REPO_ROOT / "principals_catalog.json"
|
||||||
|
|
||||||
|
# Default paths to scan — adjust to match your IaC repo locations
|
||||||
|
_DEFAULT_SCAN_PATHS = [
|
||||||
|
"~/IdeaProjects/Bitbucket/IaC",
|
||||||
|
"~/IdeaProjects/Bitbucket/LRU",
|
||||||
|
]
|
||||||
|
|
||||||
|
# Matches a UUID/GUID
|
||||||
|
_GUID_RE = re.compile(
|
||||||
|
r"[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Matches a single quoted GUID optionally followed by an inline comment:
|
||||||
|
# 'c88bf29d-...' // Some label text
|
||||||
|
# 'c88bf29d-...' // or with hash comments
|
||||||
|
_ITEM_RE = re.compile(
|
||||||
|
r"'(" + _GUID_RE.pattern + r")'\s*(?://+\s*(.+?)\s*)?$",
|
||||||
|
re.IGNORECASE,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Matches the opening of an array param assignment:
|
||||||
|
# additionalAccess: [ or additionalAccess: ['guid'
|
||||||
|
_ARRAY_OPEN_RE = re.compile(r"^\s*(\w+)\s*:\s*\[")
|
||||||
|
|
||||||
|
|
||||||
|
def _extract_label(comment: str | None) -> str | None:
|
||||||
|
"""Clean up an inline comment to use as a display label."""
|
||||||
|
if not comment:
|
||||||
|
return None
|
||||||
|
# Strip trailing punctuation and whitespace
|
||||||
|
return comment.strip().rstrip(".,;")
|
||||||
|
|
||||||
|
|
||||||
|
def scan_file(path: pathlib.Path) -> dict[str, list[dict[str, Any]]]:
|
||||||
|
"""Scan a single .bicep file and return {param_name: [{id, label, source}]}."""
|
||||||
|
try:
|
||||||
|
text = path.read_text(encoding="utf-8")
|
||||||
|
except Exception as exc:
|
||||||
|
log.debug("Cannot read %s: %s", path, exc)
|
||||||
|
return {}
|
||||||
|
|
||||||
|
lines = text.splitlines()
|
||||||
|
results: dict[str, list[dict[str, Any]]] = {}
|
||||||
|
|
||||||
|
i = 0
|
||||||
|
while i < len(lines):
|
||||||
|
line = lines[i]
|
||||||
|
array_m = _ARRAY_OPEN_RE.match(line)
|
||||||
|
if not array_m:
|
||||||
|
i += 1
|
||||||
|
continue
|
||||||
|
|
||||||
|
param_name = array_m.group(1)
|
||||||
|
# Collect all characters on this and subsequent lines until array closes
|
||||||
|
collected = line[array_m.end() - 1:] # from '[' onwards
|
||||||
|
j = i + 1
|
||||||
|
|
||||||
|
# If the array doesn't close on the same line, keep accumulating
|
||||||
|
while collected.count("[") > collected.count("]") and j < len(lines):
|
||||||
|
collected += "\n" + lines[j]
|
||||||
|
j += 1
|
||||||
|
|
||||||
|
# Extract all GUID items from the collected block
|
||||||
|
for item_line in collected.splitlines():
|
||||||
|
m = _ITEM_RE.search(item_line)
|
||||||
|
if not m:
|
||||||
|
continue
|
||||||
|
guid = m.group(1).lower()
|
||||||
|
label = _extract_label(m.group(2))
|
||||||
|
entry: dict[str, Any] = {
|
||||||
|
"id": guid,
|
||||||
|
"label": label or guid,
|
||||||
|
"source": str(path),
|
||||||
|
}
|
||||||
|
if label:
|
||||||
|
entry["description"] = f"From {path.name}"
|
||||||
|
results.setdefault(param_name, [])
|
||||||
|
results[param_name].append(entry)
|
||||||
|
|
||||||
|
i = j
|
||||||
|
|
||||||
|
return results
|
||||||
|
|
||||||
|
|
||||||
|
def scan_paths(paths: list[pathlib.Path]) -> dict[str, list[dict[str, Any]]]:
|
||||||
|
"""Scan all .bicep files under the given paths, deduplicating GUIDs per param."""
|
||||||
|
# param_name → {guid → entry} (dict for dedup)
|
||||||
|
merged: dict[str, dict[str, dict[str, Any]]] = {}
|
||||||
|
files_scanned = 0
|
||||||
|
|
||||||
|
for base in paths:
|
||||||
|
if not base.exists():
|
||||||
|
log.warning("Path not found, skipping: %s", base)
|
||||||
|
continue
|
||||||
|
for bicep_file in sorted(base.rglob("*.bicep")):
|
||||||
|
file_results = scan_file(bicep_file)
|
||||||
|
files_scanned += 1
|
||||||
|
for param, entries in file_results.items():
|
||||||
|
bucket = merged.setdefault(param, {})
|
||||||
|
for entry in entries:
|
||||||
|
guid = entry["id"]
|
||||||
|
if guid not in bucket:
|
||||||
|
bucket[guid] = entry
|
||||||
|
else:
|
||||||
|
# Keep the entry with the most informative label
|
||||||
|
existing = bucket[guid]
|
||||||
|
if entry.get("label") and entry["label"] != guid:
|
||||||
|
if existing.get("label") == guid or not existing.get("label"):
|
||||||
|
bucket[guid] = entry
|
||||||
|
|
||||||
|
log.info("Scanned %d .bicep files across %d path(s)", files_scanned, len(paths))
|
||||||
|
|
||||||
|
# Flatten back to lists, sorted by label
|
||||||
|
return {
|
||||||
|
param: sorted(entries.values(), key=lambda e: e.get("label", e["id"]).lower())
|
||||||
|
for param, entries in sorted(merged.items())
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def build_catalog(scan_paths_list: list[pathlib.Path]) -> dict[str, Any]:
|
||||||
|
params = scan_paths(scan_paths_list)
|
||||||
|
total = sum(len(v) for v in params.values())
|
||||||
|
log.info("Found %d unique entries across %d param(s)", total, len(params))
|
||||||
|
return {
|
||||||
|
"synced_at": datetime.now(timezone.utc).isoformat(),
|
||||||
|
"entry_count": total,
|
||||||
|
"params": params,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def main() -> None:
|
||||||
|
parser = argparse.ArgumentParser(
|
||||||
|
description=__doc__,
|
||||||
|
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"--paths",
|
||||||
|
nargs="+",
|
||||||
|
default=_DEFAULT_SCAN_PATHS,
|
||||||
|
metavar="PATH",
|
||||||
|
help="Directories to scan for .bicep files (default: %(default)s)",
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"--dry-run",
|
||||||
|
action="store_true",
|
||||||
|
help="Print findings without writing the catalog",
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"--output",
|
||||||
|
default=str(_DEFAULT_OUTPUT),
|
||||||
|
help="Output JSON file (default: %(default)s)",
|
||||||
|
)
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
resolved = [pathlib.Path(p).expanduser().resolve() for p in args.paths]
|
||||||
|
catalog = build_catalog(resolved)
|
||||||
|
|
||||||
|
if args.dry_run:
|
||||||
|
print(f"\n── Principals catalog (dry-run) ──────────────────────")
|
||||||
|
if not catalog["params"]:
|
||||||
|
print(" No GUID values found in .bicep files.")
|
||||||
|
for param, entries in catalog["params"].items():
|
||||||
|
print(f"\n param: {param} ({len(entries)} entries)")
|
||||||
|
for e in entries:
|
||||||
|
print(f" {e['label']:<40} {e['id']}")
|
||||||
|
print(f"\n Total: {catalog['entry_count']} entries")
|
||||||
|
return
|
||||||
|
|
||||||
|
out = pathlib.Path(args.output)
|
||||||
|
# Strip internal 'source' field — not needed at runtime
|
||||||
|
for entries in catalog["params"].values():
|
||||||
|
for e in entries:
|
||||||
|
e.pop("source", None)
|
||||||
|
|
||||||
|
out.write_text(json.dumps(catalog, indent=2, ensure_ascii=False), encoding="utf-8")
|
||||||
|
log.info("Written: %s (%d entries)", out, catalog["entry_count"])
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
@@ -18,6 +18,7 @@ def test_frame_produces_correct_header():
|
|||||||
@pytest.fixture(autouse=True)
|
@pytest.fixture(autouse=True)
|
||||||
def reset_modules():
|
def reset_modules():
|
||||||
BicepModuleCatalog._modules = []
|
BicepModuleCatalog._modules = []
|
||||||
|
BicepModuleCatalog._principals = {}
|
||||||
yield
|
yield
|
||||||
|
|
||||||
|
|
||||||
@@ -386,6 +387,111 @@ def test_param_value_injected_in_completion_response():
|
|||||||
assert labels == ["DEV", "TEST", "PROD"]
|
assert labels == ["DEV", "TEST", "PROD"]
|
||||||
|
|
||||||
|
|
||||||
|
# ── Array item completion tests ────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def test_detect_param_array_item_context():
|
||||||
|
"""Cursor inside [...] for an array param → param_array_item context."""
|
||||||
|
lines = [
|
||||||
|
"module keyVault 'br/modules:modules/keyvault:2.1.x' = {",
|
||||||
|
" params: {",
|
||||||
|
" additionalAccess: ['", # ← cursor after opening quote inside array
|
||||||
|
" }",
|
||||||
|
"}",
|
||||||
|
]
|
||||||
|
session = _make_session_with_doc(URI, lines)
|
||||||
|
# character 24 = after the `'` (4 spaces + 16 "additionalAccess" + 2 ": " + 1 "[" + 1 "'" = 24)
|
||||||
|
ctx = session._detect_context(URI, {"line": 2, "character": 24})
|
||||||
|
assert ctx["type"] == "param_array_item"
|
||||||
|
assert ctx["module"] == "modules/keyvault"
|
||||||
|
assert ctx["param"] == "additionalAccess"
|
||||||
|
assert ctx["has_open_quote"] is True
|
||||||
|
|
||||||
|
|
||||||
|
def test_detect_param_array_item_context_no_quote():
|
||||||
|
"""Cursor at start of empty array → param_array_item, no open quote."""
|
||||||
|
lines = [
|
||||||
|
"module keyVault 'br/modules:modules/keyvault:2.1.x' = {",
|
||||||
|
" params: {",
|
||||||
|
" additionalAccess: [", # ← cursor after [
|
||||||
|
" }",
|
||||||
|
"}",
|
||||||
|
]
|
||||||
|
session = _make_session_with_doc(URI, lines)
|
||||||
|
ctx = session._detect_context(URI, {"line": 2, "character": 23})
|
||||||
|
assert ctx["type"] == "param_array_item"
|
||||||
|
assert ctx["has_open_quote"] is False
|
||||||
|
|
||||||
|
|
||||||
|
def test_detect_param_array_item_context_after_existing_item():
|
||||||
|
"""Cursor after an existing item inside array → still param_array_item."""
|
||||||
|
lines = [
|
||||||
|
"module keyVault 'br/modules:modules/keyvault:2.1.x' = {",
|
||||||
|
" params: {",
|
||||||
|
" additionalAccess: ['c88bf29d-b13a-4153-9738-8995085a451e', '",
|
||||||
|
" }",
|
||||||
|
"}",
|
||||||
|
]
|
||||||
|
session = _make_session_with_doc(URI, lines)
|
||||||
|
char = len(" additionalAccess: ['c88bf29d-b13a-4153-9738-8995085a451e', '")
|
||||||
|
ctx = session._detect_context(URI, {"line": 2, "character": char})
|
||||||
|
assert ctx["type"] == "param_array_item"
|
||||||
|
assert ctx["has_open_quote"] is True
|
||||||
|
|
||||||
|
|
||||||
|
def test_param_array_item_completion_items():
|
||||||
|
"""param_array_item_completion_items returns entries from principals catalog."""
|
||||||
|
BicepModuleCatalog._principals = {
|
||||||
|
"additionalAccess": [
|
||||||
|
{"id": "aaaa-bbbb", "label": "My-SP", "description": "Test SP"},
|
||||||
|
]
|
||||||
|
}
|
||||||
|
items = BicepModuleCatalog.param_array_item_completion_items(
|
||||||
|
"modules/keyvault", "2.1.x", "additionalAccess"
|
||||||
|
)
|
||||||
|
assert len(items) == 1
|
||||||
|
assert items[0]["label"] == "My-SP"
|
||||||
|
assert items[0]["detail"] == "aaaa-bbbb"
|
||||||
|
assert items[0]["insertText"] == "'aaaa-bbbb'"
|
||||||
|
|
||||||
|
|
||||||
|
def test_param_array_item_completion_items_open_quote():
|
||||||
|
BicepModuleCatalog._principals = {
|
||||||
|
"additionalAccess": [{"id": "aaaa-bbbb", "label": "My-SP"}]
|
||||||
|
}
|
||||||
|
items = BicepModuleCatalog.param_array_item_completion_items(
|
||||||
|
"modules/keyvault", "2.1.x", "additionalAccess", has_open_quote=True
|
||||||
|
)
|
||||||
|
assert items[0]["insertText"] == "aaaa-bbbb'"
|
||||||
|
|
||||||
|
|
||||||
|
def test_param_array_item_empty_when_no_catalog_entry():
|
||||||
|
BicepModuleCatalog._principals = {}
|
||||||
|
items = BicepModuleCatalog.param_array_item_completion_items(
|
||||||
|
"modules/keyvault", "2.1.x", "additionalAccess"
|
||||||
|
)
|
||||||
|
assert items == []
|
||||||
|
|
||||||
|
|
||||||
|
def test_param_array_item_injected_in_completion_response():
|
||||||
|
"""Full pipeline: param_array_item context injects principal completions."""
|
||||||
|
BicepModuleCatalog._principals = {
|
||||||
|
"additionalAccess": [{"id": "aaaa-1111", "label": "IaC-SP", "description": "Pipeline SP"}]
|
||||||
|
}
|
||||||
|
msg = _completion_response([{"label": "noise", "sortText": "z"}])
|
||||||
|
ctx = {
|
||||||
|
"type": "param_array_item",
|
||||||
|
"module": "modules/keyvault",
|
||||||
|
"version": "2.1.x",
|
||||||
|
"param": "additionalAccess",
|
||||||
|
"has_open_quote": False,
|
||||||
|
}
|
||||||
|
out = json.loads(_inject_completions(msg, ctx))
|
||||||
|
items = out["result"]["items"]
|
||||||
|
assert len(items) == 1
|
||||||
|
assert items[0]["label"] == "IaC-SP"
|
||||||
|
assert items[0]["detail"] == "aaaa-1111"
|
||||||
|
|
||||||
|
|
||||||
def test_detect_unknown_context_outside_module():
|
def test_detect_unknown_context_outside_module():
|
||||||
lines = ["var x = 'hello'"]
|
lines = ["var x = 'hello'"]
|
||||||
session = _make_session_with_doc(URI, lines)
|
session = _make_session_with_doc(URI, lines)
|
||||||
|
|||||||
Reference in New Issue
Block a user