215 lines
7.1 KiB
Python
215 lines
7.1 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
sync_principals_catalog.py — Build principals_catalog.json by scanning .bicep files
|
|
for array param values (GUIDs) and their inline comments.
|
|
|
|
Scans configured IaC repo directories for patterns like:
|
|
|
|
additionalAccess: ['c88bf29d-...'] // LRIADMPRO-IaC-Bicep
|
|
additionalAccess: [
|
|
'c88bf29d-...' // LRIADMPRO-IaC-Bicep
|
|
'another-guid' // Another-SP
|
|
]
|
|
|
|
Usage:
|
|
python3 scripts/sync_principals_catalog.py
|
|
python3 scripts/sync_principals_catalog.py --paths ~/IdeaProjects/Bitbucket/IaC
|
|
python3 scripts/sync_principals_catalog.py --dry-run
|
|
python3 scripts/sync_principals_catalog.py --output /path/to/principals_catalog.json
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import logging
|
|
import pathlib
|
|
import re
|
|
import sys
|
|
from datetime import datetime, timezone
|
|
from typing import Any
|
|
|
|
logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")
|
|
log = logging.getLogger(__name__)
|
|
|
|
_REPO_ROOT = pathlib.Path(__file__).parent.parent
|
|
_DEFAULT_OUTPUT = _REPO_ROOT / "principals_catalog.json"
|
|
|
|
# Default paths to scan — adjust to match your IaC repo locations
|
|
_DEFAULT_SCAN_PATHS = [
|
|
"~/IdeaProjects/Bitbucket/IaC",
|
|
"~/IdeaProjects/Bitbucket/LRU",
|
|
]
|
|
|
|
# Matches a UUID/GUID
|
|
_GUID_RE = re.compile(
|
|
r"[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}"
|
|
)
|
|
|
|
# Matches a single quoted GUID optionally followed by an inline comment:
|
|
# 'c88bf29d-...' // Some label text
|
|
# 'c88bf29d-...' // or with hash comments
|
|
_ITEM_RE = re.compile(
|
|
r"'(" + _GUID_RE.pattern + r")'\s*(?://+\s*(.+?)\s*)?$",
|
|
re.IGNORECASE,
|
|
)
|
|
|
|
# Matches the opening of an array param assignment:
|
|
# additionalAccess: [ or additionalAccess: ['guid'
|
|
_ARRAY_OPEN_RE = re.compile(r"^\s*(\w+)\s*:\s*\[")
|
|
|
|
|
|
def _extract_label(comment: str | None) -> str | None:
|
|
"""Clean up an inline comment to use as a display label."""
|
|
if not comment:
|
|
return None
|
|
# Strip trailing punctuation and whitespace
|
|
return comment.strip().rstrip(".,;")
|
|
|
|
|
|
# A single-quoted GUID literal such as 'c88bf29d-...'; group 1 is the bare GUID.
_QUOTED_GUID_RE = re.compile(r"'(" + _GUID_RE.pattern + r")'")


def _split_comment(line: str) -> tuple[str, str | None]:
    """Split *line* at the first ``//`` that is not inside a single-quoted string.

    Returns ``(code, comment)``. ``comment`` has the slashes and surrounding
    whitespace removed; it is ``None`` when the line has no ``//`` marker.
    """
    in_string = False
    pos = 0
    while pos < len(line):
        ch = line[pos]
        if in_string:
            if ch == "\\":
                pos += 2  # skip the escaped character (e.g. \')
                continue
            if ch == "'":
                in_string = False
        elif ch == "'":
            in_string = True
        elif line.startswith("//", pos):
            return line[:pos], line[pos:].lstrip("/").strip()
        pos += 1
    return line, None


def scan_file(path: pathlib.Path) -> dict[str, list[dict[str, Any]]]:
    """Scan a single .bicep file and return {param_name: [{id, label, source}]}.

    Every line of the form ``name: [`` opens an array block that runs until its
    brackets balance (or the file ends). Each single-quoted GUID inside the
    block becomes one entry:

    * ``id``          – the GUID, lower-cased
    * ``label``       – the cleaned inline ``//`` comment, or the GUID itself
    * ``source``      – ``str(path)``
    * ``description`` – ``"From <file name>"``, only present when a label exists

    Both the multi-line form (one GUID per line) and the single-line form
    (``name: ['guid'] // label``, optionally with several comma-separated
    GUIDs) are recognised. A comment is used as a label only when its line
    holds exactly one GUID, because otherwise it is ambiguous which GUID it
    describes.

    Returns an empty dict when the file cannot be read or decoded; the failure
    is logged as a warning so an incomplete catalog does not go unnoticed.
    """
    try:
        text = path.read_text(encoding="utf-8")
    except (OSError, UnicodeError) as exc:
        log.warning("Cannot read %s: %s", path, exc)
        return {}

    lines = text.splitlines()
    results: dict[str, list[dict[str, Any]]] = {}

    i = 0
    while i < len(lines):
        array_m = _ARRAY_OPEN_RE.match(lines[i])
        if not array_m:
            i += 1
            continue

        param_name = array_m.group(1)

        # First segment starts at the '[' that the opening regex ended on.
        code, comment = _split_comment(lines[i][array_m.end() - 1:])
        block = [(code, comment)]
        # Running nesting depth, counted on code only so that brackets inside
        # comments cannot keep the block open (or close it early).
        depth = code.count("[") - code.count("]")
        j = i + 1
        while depth > 0 and j < len(lines):
            code, comment = _split_comment(lines[j])
            block.append((code, comment))
            depth += code.count("[") - code.count("]")
            j += 1

        for code, comment in block:
            guids = _QUOTED_GUID_RE.findall(code)
            label = _extract_label(comment) if len(guids) == 1 else None
            for raw_guid in guids:
                guid = raw_guid.lower()
                entry: dict[str, Any] = {
                    "id": guid,
                    "label": label or guid,
                    "source": str(path),
                }
                if label:
                    entry["description"] = f"From {path.name}"
                results.setdefault(param_name, []).append(entry)

        # Resume after the block; arrays nested inside it are attributed to
        # the outer param and are not scanned again.
        i = j

    return results
|
|
|
|
|
|
def scan_paths(paths: list[pathlib.Path]) -> dict[str, list[dict[str, Any]]]:
    """Collect GUID entries from every .bicep file below *paths*.

    Entries are deduplicated per param by GUID. When the same GUID shows up
    more than once, the first occurrence wins unless it has no real label
    (label missing or equal to the GUID) and a later occurrence does.

    Paths that do not exist are skipped with a warning. The result maps each
    param name (in sorted order) to its entries sorted case-insensitively by
    label.
    """
    # param name -> guid -> entry
    by_param: dict[str, dict[str, dict[str, Any]]] = {}
    scanned = 0

    for root in paths:
        if not root.exists():
            log.warning("Path not found, skipping: %s", root)
            continue

        for bicep_path in sorted(root.rglob("*.bicep")):
            found = scan_file(bicep_path)
            scanned += 1

            for param_name, candidates in found.items():
                by_guid = by_param.setdefault(param_name, {})
                for candidate in candidates:
                    guid = candidate["id"]
                    current = by_guid.get(guid)
                    if current is None:
                        by_guid[guid] = candidate
                        continue

                    # Replace only an unlabeled entry with a labeled one.
                    new_label = candidate.get("label")
                    old_label = current.get("label")
                    candidate_named = bool(new_label) and new_label != guid
                    current_unnamed = not old_label or old_label == guid
                    if candidate_named and current_unnamed:
                        by_guid[guid] = candidate

    log.info("Scanned %d .bicep files across %d path(s)", scanned, len(paths))

    def _by_label(entry: dict[str, Any]) -> str:
        return entry.get("label", entry["id"]).lower()

    # Flatten the per-GUID dicts back into label-sorted lists.
    flattened: dict[str, list[dict[str, Any]]] = {}
    for param_name in sorted(by_param):
        flattened[param_name] = sorted(by_param[param_name].values(), key=_by_label)
    return flattened
|
|
|
|
|
|
def build_catalog(scan_paths_list: list[pathlib.Path]) -> dict[str, Any]:
    """Scan *scan_paths_list* and wrap the findings in the catalog structure.

    The returned dict has three keys, in this order:
    ``synced_at`` (current UTC time, ISO 8601), ``entry_count`` (total number
    of entries over all params) and ``params`` (the ``scan_paths`` result).
    """
    by_param = scan_paths(scan_paths_list)

    entry_total = 0
    for entries in by_param.values():
        entry_total += len(entries)

    log.info("Found %d unique entries across %d param(s)", entry_total, len(by_param))

    catalog: dict[str, Any] = {}
    catalog["synced_at"] = datetime.now(timezone.utc).isoformat()
    catalog["entry_count"] = entry_total
    catalog["params"] = by_param
    return catalog
|
|
|
|
|
|
def main() -> None:
    """Command-line entry point: scan, then print (``--dry-run``) or write JSON.

    Options:
        --paths PATH [PATH ...]  Directories to scan; ``~`` is expanded and the
                                 paths are resolved before scanning.
        --dry-run                Print the findings to stdout and write nothing.
        --output FILE            Destination JSON file; ``~`` is expanded and
                                 missing parent directories are created.

    The internal ``source`` field of every entry is removed before the catalog
    is written.
    """
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument(
        "--paths",
        nargs="+",
        default=_DEFAULT_SCAN_PATHS,
        metavar="PATH",
        help="Directories to scan for .bicep files (default: %(default)s)",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Print findings without writing the catalog",
    )
    parser.add_argument(
        "--output",
        default=str(_DEFAULT_OUTPUT),
        help="Output JSON file (default: %(default)s)",
    )
    args = parser.parse_args()

    resolved = [pathlib.Path(p).expanduser().resolve() for p in args.paths]
    catalog = build_catalog(resolved)

    if args.dry_run:
        print("\n── Principals catalog (dry-run) ──────────────────────")
        if not catalog["params"]:
            print(" No GUID values found in .bicep files.")
        for param, entries in catalog["params"].items():
            print(f"\n param: {param} ({len(entries)} entries)")
            for e in entries:
                print(f" {e['label']:<40} {e['id']}")
        print(f"\n Total: {catalog['entry_count']} entries")
        return

    # Expand '~' the same way the scan paths are expanded, so a quoted
    # "--output ~/catalog.json" does not create a literal '~' directory.
    out = pathlib.Path(args.output).expanduser()

    # Strip internal 'source' field — not needed at runtime
    for entries in catalog["params"].values():
        for e in entries:
            e.pop("source", None)

    # Create the target directory up front rather than failing on a missing
    # folder after the whole scan has already been done.
    out.parent.mkdir(parents=True, exist_ok=True)
    out.write_text(json.dumps(catalog, indent=2, ensure_ascii=False), encoding="utf-8")
    log.info("Written: %s (%d entries)", out, catalog["entry_count"])
|
|
|
|
|
|
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
|