#!/usr/bin/env python3 """ sync_pipeline_templates.py — Build pipeline_templates_catalog.json from: - AzDO template repos (parameters: list format) - GitHub Actions reusable workflows (on.workflow_call.inputs format) Usage: python3 scripts/sync_pipeline_templates.py # scan both formats python3 scripts/sync_pipeline_templates.py --mode azdo # AzDO only python3 scripts/sync_pipeline_templates.py --mode gha # GHA only python3 scripts/sync_pipeline_templates.py --dry-run # preview, no write python3 scripts/sync_pipeline_templates.py --output /path/to/catalog.json """ import argparse import json import logging import os import pathlib import sys from datetime import datetime, timezone from typing import Any import yaml logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s") log = logging.getLogger(__name__) _REPO_ROOT = pathlib.Path(__file__).parent.parent _SOURCES_FILE = pathlib.Path(__file__).parent / "template_sources.yml" _DEFAULT_OUTPUT = _REPO_ROOT / "pipeline_templates_catalog.json" # ── AzDO template scanner ──────────────────────────────────────────────────── def _parse_azdo_params(raw: dict[str, Any]) -> list[dict[str, Any]]: """Extract parameter definitions from an AzDO template dict.""" params = raw.get("parameters", []) if not isinstance(params, list): return [] result = [] for p in params: if not isinstance(p, dict) or "name" not in p: continue entry: dict[str, Any] = { "name": p["name"], "type": p.get("type", "string"), "required": "default" not in p, } if "default" in p: entry["default"] = p["default"] allowed = p.get("values", []) if isinstance(allowed, list) and allowed: entry["allowed"] = [str(v) for v in allowed] if "displayName" in p: entry["description"] = p["displayName"] result.append(entry) return result def scan_azdo_source(config: dict[str, Any]) -> dict[str, dict[str, Any]]: """Scan an AzDO template directory. Returns {key: template_entry}.""" alias = config["alias"] base = pathlib.Path(config["local_path"]).expanduser() if not base.exists(): log.warning("AzDO source '%s' not found at %s — skipping", alias, base) return {} scan_dirs = config.get("scan_dirs", []) extensions = set(config.get("extensions", [".yaml", ".yml"])) if scan_dirs: candidates = [] for d in scan_dirs: candidates.extend((base / d).rglob("*")) else: candidates = list(base.rglob("*")) results: dict[str, dict[str, Any]] = {} for fpath in candidates: if fpath.suffix not in extensions or not fpath.is_file(): continue try: raw = yaml.safe_load(fpath.read_text(encoding="utf-8")) except Exception as exc: log.debug("Cannot parse %s: %s", fpath, exc) continue if not isinstance(raw, dict): continue params = _parse_azdo_params(raw) if not params: continue # Not a template file (no parameters block) rel = fpath.relative_to(base).as_posix() key = f"{rel}@{alias}" results[key] = { "format": "azdo", "title": fpath.stem, "path": rel, "alias": alias, "parameters": params, } log.info("AzDO '%s': %d templates found", alias, len(results)) return results # ── GHA reusable workflow scanner ──────────────────────────────────────────── def _parse_gha_inputs(raw: dict[str, Any]) -> list[dict[str, Any]]: """Extract workflow_call.inputs from a GHA workflow dict.""" on = raw.get("on") or raw.get(True) # 'on' is a YAML bool alias if not isinstance(on, dict): return [] wc = on.get("workflow_call", {}) if not isinstance(wc, dict): return [] inputs = wc.get("inputs", {}) if not isinstance(inputs, dict): return [] result = [] for name, meta in inputs.items(): if not isinstance(meta, dict): meta = {} entry: dict[str, Any] = { "name": name, "type": meta.get("type", "string"), "required": meta.get("required", False), } if "default" in meta: entry["default"] = meta["default"] if "description" in meta: entry["description"] = meta["description"] # GHA doesn't have allowed values natively — skip result.append(entry) return result def scan_gha_source(config: dict[str, Any]) -> dict[str, dict[str, Any]]: """Scan GitHub Actions repos for reusable workflows. Returns {key: template_entry}.""" org = config["org"] base = pathlib.Path(config["local_base"]).expanduser() default_ref = config.get("default_ref", "main") repos_filter = config.get("repos", []) if not base.exists(): log.warning("GHA base '%s' not found at %s — skipping", org, base) return {} results: dict[str, dict[str, Any]] = {} repos = [base / r for r in repos_filter] if repos_filter else [p for p in base.iterdir() if p.is_dir()] for repo_path in repos: wf_dir = repo_path / ".github" / "workflows" if not wf_dir.is_dir(): continue repo_name = repo_path.name for fpath in wf_dir.glob("*.yml"): try: raw = yaml.safe_load(fpath.read_text(encoding="utf-8")) except Exception as exc: log.debug("Cannot parse %s: %s", fpath, exc) continue if not isinstance(raw, dict): continue params = _parse_gha_inputs(raw) if not params: continue # Not a reusable workflow rel_wf = f".github/workflows/{fpath.name}" key = f"{org}/{repo_name}/{rel_wf}@{default_ref}" results[key] = { "format": "gha", "title": fpath.stem, "org": org, "repo": repo_name, "path": rel_wf, "ref": default_ref, "parameters": params, } log.info("GHA '%s': %d reusable workflows found", org, len(results)) return results # ── Main ───────────────────────────────────────────────────────────────────── def build_catalog( sources_file: pathlib.Path, mode: str | None, ) -> dict[str, Any]: config = yaml.safe_load(sources_file.read_text(encoding="utf-8")) templates: dict[str, dict[str, Any]] = {} if mode in (None, "azdo"): for src in config.get("sources", {}).get("azdo", []): templates.update(scan_azdo_source(src)) if mode in (None, "gha"): for src in config.get("sources", {}).get("gha", []): templates.update(scan_gha_source(src)) return { "synced_at": datetime.now(timezone.utc).isoformat(), "template_count": len(templates), "templates": templates, } def main() -> None: parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter) parser.add_argument("--mode", choices=["azdo", "gha"], default=None, help="Only scan one format") parser.add_argument("--dry-run", action="store_true", help="Print summary without writing") parser.add_argument("--output", default=str(_DEFAULT_OUTPUT), help="Output JSON file") parser.add_argument("--sources", default=str(_SOURCES_FILE), help="Sources YAML config") args = parser.parse_args() sources_file = pathlib.Path(args.sources) if not sources_file.exists(): log.error("Sources file not found: %s", sources_file) sys.exit(1) catalog = build_catalog(sources_file, args.mode) if args.dry_run: print(f"\n── Pipeline template catalog (dry-run) ──") print(f" Templates found: {catalog['template_count']}") for key, tmpl in catalog["templates"].items(): nparams = len(tmpl["parameters"]) required = sum(1 for p in tmpl["parameters"] if p.get("required")) print(f" [{tmpl['format'].upper()}] {key} ({nparams} params, {required} required)") return out = pathlib.Path(args.output) out.write_text(json.dumps(catalog, indent=2, ensure_ascii=False), encoding="utf-8") log.info("Written: %s (%d templates)", out, catalog["template_count"]) if __name__ == "__main__": main()