#!/usr/bin/env python3 """Live smoke test: verifies Bicep and YAML completions from ilsp.i80.dk. Tests: 1. Health endpoint returns ok with bicep_modules > 0 2. /bicep WebSocket accepts LSP initialize 3. Version completion: 'br/modules:roleassignments:' → versions injected at top 4. Param completion: inside params {} of roleassignments → params injected 5. Module-path completion: 'br/modules:' → module list injected 6. YAML: /yaml WebSocket accepts LSP initialize 7. YAML: pipeline template completion → template list injected 8. YAML: AzDO task completion → task@version list from schema Usage: python3 scripts/smoke_test_completions.py [wss://ilsp.i80.dk] """ import asyncio import json import sys import time import aiohttp BASE_URL = sys.argv[1] if len(sys.argv) > 1 else "https://ilsp.i80.dk" WS_URL = BASE_URL.replace("https://", "wss://").replace("http://", "ws://") + "/bicep" YAML_WS_URL = BASE_URL.replace("https://", "wss://").replace("http://", "ws://") + "/yaml" HTTP_URL = BASE_URL.replace("wss://", "https://").replace("ws://", "http://") PASS = 0 FAIL = 0 RESULTS: list[tuple[str, bool, str]] = [] def ok(name: str, detail: str = "") -> None: global PASS PASS += 1 RESULTS.append((name, True, detail)) print(f" ✓ {name}" + (f" ({detail})" if detail else "")) def fail(name: str, detail: str = "") -> None: global FAIL FAIL += 1 RESULTS.append((name, False, detail)) print(f" ✗ {name}" + (f" ({detail})" if detail else "")) # ── LSP helpers ─────────────────────────────────────────────────────────────── def _frame(body: dict) -> bytes: raw = json.dumps(body).encode() return b"Content-Length: " + str(len(raw)).encode() + b"\r\n\r\n" + raw def _parse_frames(data: bytes) -> list[dict]: msgs = [] while b"\r\n\r\n" in data: header, rest = data.split(b"\r\n\r\n", 1) length = None for line in header.split(b"\r\n"): if line.lower().startswith(b"content-length:"): length = int(line.split(b":", 1)[1].strip()) if length is None or len(rest) < length: break msgs.append(json.loads(rest[:length])) data = rest[length:] return msgs async def _recv_until_id(ws: aiohttp.ClientWebSocketResponse, req_id: int, timeout: float = 8.0) -> dict | None: """Collect WebSocket frames until we find a response with the given id. Returns None if WS closes before the response arrives. """ buf = b"" deadline = time.monotonic() + timeout while time.monotonic() < deadline: remaining = deadline - time.monotonic() try: msg = await asyncio.wait_for(ws.receive(), timeout=max(0.5, remaining)) except (asyncio.TimeoutError, Exception): break if msg.type == aiohttp.WSMsgType.BINARY: buf += msg.data elif msg.type == aiohttp.WSMsgType.TEXT: buf += msg.data.encode() elif msg.type in (aiohttp.WSMsgType.CLOSE, aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.ERROR): break # connection closed — no response coming else: continue for parsed in _parse_frames(buf): if parsed.get("id") == req_id: return parsed return None # ── Bicep test document ─────────────────────────────────────────────────────── # # One line per entry so character positions are easy to count. BICEP_DOC = """\ module ra 'br/modules:roleassignments:2.0.x' = { name: 'testRA' params: { assignments: [] environmentType: 'DEV' } } """ DOC_URI = "file:///tmp/smoke_test.bicep" async def _open_doc(ws: aiohttp.ClientWebSocketResponse) -> None: await ws.send_bytes(_frame({ "jsonrpc": "2.0", "method": "textDocument/didOpen", "params": { "textDocument": { "uri": DOC_URI, "languageId": "bicep", "version": 1, "text": BICEP_DOC, } }, })) async def _completion_request(ws: aiohttp.ClientWebSocketResponse, req_id: int, line: int, char: int) -> None: await ws.send_bytes(_frame({ "jsonrpc": "2.0", "id": req_id, "method": "textDocument/completion", "params": { "textDocument": {"uri": DOC_URI}, "position": {"line": line, "character": char}, }, })) # ── Individual smoke tests ───────────────────────────────────────────────────── async def check_health(session: aiohttp.ClientSession) -> bool: print("\n[1] Health endpoint") try: async with session.get(f"{HTTP_URL}/health", timeout=aiohttp.ClientTimeout(total=5)) as r: d = await r.json() if d.get("status") == "ok": bicep = d.get("bicep_modules", 0) pypi = d.get("pypi_packages", 0) ok("Health ok", f"bicep_modules={bicep}, pypi_packages={pypi}") if bicep > 0: ok("Bicep catalog loaded", f"{bicep} modules") else: fail("Bicep catalog empty", "bicep_modules=0") return bicep > 0 else: fail("Health status not ok", str(d)) return False except Exception as e: fail("Health unreachable", str(e)) return False async def check_initialize(ws: aiohttp.ClientWebSocketResponse) -> bool: print("\n[2] LSP initialize handshake") await ws.send_bytes(_frame({ "jsonrpc": "2.0", "id": 1, "method": "initialize", "params": {"processId": None, "rootUri": None, "capabilities": {}}, })) resp = await _recv_until_id(ws, 1, timeout=15.0) if resp and "result" in resp and "capabilities" in resp["result"]: ok("Initialize response received", f"serverInfo={resp['result'].get('serverInfo', {}).get('name','?')}") # send initialized notification await ws.send_bytes(_frame({"jsonrpc": "2.0", "method": "initialized", "params": {}})) return True else: fail("No initialize response", str(resp)) return False async def check_version_completion(ws: aiohttp.ClientWebSocketResponse) -> None: """Cursor just after 'br/modules:roleassignments:' — should get version completions.""" print("\n[3] Version completion (br/modules:roleassignments:)") await _open_doc(ws) # Line 0: "module ra 'br/modules:roleassignments:2.0.x' = {" # We want cursor right after the second colon → "...roleassignments:" # Count: module ra 'br/modules:roleassignments: → char 38 line, char = 0, 38 await _completion_request(ws, 10, line, char) resp = await _recv_until_id(ws, 10, timeout=8.0) if resp is None: fail("Version completion: no response") return items = [] result = resp.get("result") if isinstance(result, dict): items = result.get("items", []) elif isinstance(result, list): items = result labels = [i.get("label", "") for i in items] version_items = [l for l in labels if "." in l or l.startswith("latest") or l.startswith("2.")] if version_items: ok("Version completions injected", f"{len(version_items)} versions: {', '.join(version_items[:5])}") elif items: ok("Completion response received (non-version)", f"{len(items)} items: {', '.join(labels[:5])}") else: fail("Version completion: empty result", str(resp.get("result"))[:200]) async def check_param_completion(ws: aiohttp.ClientWebSocketResponse) -> None: """Cursor on the blank line inside 'params {' — should get param completions.""" print("\n[4] Param completion (inside params { } block)") # Line 2: " params: {" → cursor at end of line, after opening brace # Line 3: " assignments: []" → cursor before 'assignments' (char 4) line, char = 3, 4 await _completion_request(ws, 11, line, char) resp = await _recv_until_id(ws, 11, timeout=8.0) if resp is None: fail("Param completion: no response") return items = [] result = resp.get("result") if isinstance(result, dict): items = result.get("items", []) elif isinstance(result, list): items = result labels = [i.get("label", "") for i in items] known_params = {"assignments", "environmentType", "principalId", "principalType", "roleDefinitionIds"} injected = [l for l in labels if l in known_params] if injected: ok("Param completions injected", f"found: {', '.join(injected)}") elif items: # Completions came back but not the expected ones — still a partial win ok("Completion response received", f"{len(items)} items: {', '.join(labels[:5])}") else: fail("Param completion: empty result", str(resp.get("result"))[:200]) async def check_module_path_completion(ws: aiohttp.ClientWebSocketResponse) -> None: """Test module-path context: 'br/modules:' should return module list.""" print("\n[5] Module-path completion ('br/modules:')") # Edit the document to have a partial module ref partial_doc = "module test 'br/modules:'\n" await ws.send_bytes(_frame({ "jsonrpc": "2.0", "method": "textDocument/didChange", "params": { "textDocument": {"uri": DOC_URI, "version": 2}, "contentChanges": [{"text": partial_doc}], }, })) # Cursor right after 'br/modules:' → char 24 await _completion_request(ws, 12, 0, 24) resp = await _recv_until_id(ws, 12, timeout=8.0) if resp is None: fail("Module-path completion: no response") return items = [] result = resp.get("result") if isinstance(result, dict): items = result.get("items", []) elif isinstance(result, list): items = result labels = [i.get("label", "") for i in items] if items: ok("Module-path completions returned", f"{len(items)} modules: {', '.join(labels[:5])}") else: fail("Module-path completion: empty result") # ── YAML smoke tests ────────────────────────────────────────────────────────── YAML_DOC_URI = "file:///tmp/smoke_test/azure-pipelines.yml" # A minimal AzDO pipeline that uses @pipeline-templates — triggers YAML template context YAML_DOC = """\ trigger: - main stages: - stage: Deploy jobs: - job: DeployJob steps: - template: tasks/k8s/ """ async def _yaml_open_doc(ws: aiohttp.ClientWebSocketResponse) -> None: await ws.send_bytes(_frame({ "jsonrpc": "2.0", "method": "textDocument/didOpen", "params": { "textDocument": { "uri": YAML_DOC_URI, "languageId": "yaml", "version": 1, "text": YAML_DOC, } }, })) async def _yaml_completion_request(ws: aiohttp.ClientWebSocketResponse, req_id: int, line: int, char: int) -> None: await ws.send_bytes(_frame({ "jsonrpc": "2.0", "id": req_id, "method": "textDocument/completion", "params": { "textDocument": {"uri": YAML_DOC_URI}, "position": {"line": line, "character": char}, }, })) async def check_yaml_initialize(ws: aiohttp.ClientWebSocketResponse) -> bool: print("\n[6] YAML LSP initialize handshake") await ws.send_bytes(_frame({ "jsonrpc": "2.0", "id": 100, "method": "initialize", "params": { "processId": None, "rootUri": None, "capabilities": { "workspace": {"configuration": True}, }, }, })) resp = await _recv_until_id(ws, 100, timeout=20.0) if resp and "result" in resp and "capabilities" in resp["result"]: name = resp["result"].get("serverInfo", {}).get("name", "?") ok("YAML initialize response received", f"server={name}") await ws.send_bytes(_frame({"jsonrpc": "2.0", "method": "initialized", "params": {}})) return True elif resp is None: fail("YAML: no initialize response (backend may be down or WS closed)", "check yaml-language-server in container") return False else: fail("YAML: unexpected initialize response", str(resp)[:200]) return False async def check_yaml_template_completion(ws: aiohttp.ClientWebSocketResponse) -> None: """Cursor after 'tasks/k8s/' in a template line — expect AzDO template completions.""" print("\n[7] YAML pipeline template completion (tasks/k8s/@pipeline-templates)") await _yaml_open_doc(ws) # Line 8: " - template: tasks/k8s/" → char 32 (end of line) line, char = 8, 32 await _yaml_completion_request(ws, 101, line, char) resp = await _recv_until_id(ws, 101, timeout=10.0) if resp is None: fail("YAML template completion: no response") return items = [] result = resp.get("result") if isinstance(result, dict): items = result.get("items", []) elif isinstance(result, list): items = result labels = [i.get("label", "") for i in items] # Look for any item that contains "@pipeline-templates" or is a known AzDO template key template_items = [l for l in labels if "@pipeline-templates" in l or "tasks/" in l] if template_items: ok("YAML template completions injected", f"{len(template_items)} templates: {', '.join(template_items[:3])}") elif items: ok("YAML completion response received (generic)", f"{len(items)} items: {', '.join(labels[:5])}") else: fail("YAML template completion: empty result", str(resp.get("result"))[:200]) async def check_azdo_task_completion(ws: aiohttp.ClientWebSocketResponse) -> None: """Cursor after '- task: ' — expect AzDO task completions from schema.""" print("\n[8] AzDO task completion (- task: @azdo-schema)") task_doc_uri = "file:///tmp/smoke_test/plain-pipeline.yml" task_doc = "steps:\n - task: " await ws.send_bytes(_frame({ "jsonrpc": "2.0", "method": "textDocument/didOpen", "params": { "textDocument": { "uri": task_doc_uri, "languageId": "yaml", "version": 2, "text": task_doc, } }, })) await asyncio.sleep(0.8) lines = task_doc.split("\n") last_line = len(lines) - 1 last_char = len(lines[-1]) await ws.send_bytes(_frame({ "jsonrpc": "2.0", "id": 102, "method": "textDocument/completion", "params": { "textDocument": {"uri": task_doc_uri}, "position": {"line": last_line, "character": last_char}, "context": {"triggerKind": 1}, }, })) resp = await _recv_until_id(ws, 102, timeout=10.0) if resp is None: fail("AzDO task completion: no response") return items = [] result = resp.get("result") if isinstance(result, dict): items = result.get("items", []) elif isinstance(result, list): items = result labels = [i.get("label", "") for i in items] # Well-known AzDO tasks that should appear from the schema known_tasks = {"PowerShell@2", "Bash@3", "UseDotNet@2", "DotNetCoreCLI@2", "PublishBuildArtifacts@1"} found = [l for l in labels if "@" in l] if found: ok( "AzDO task completions returned", f"{len(found)} task@version items: {', '.join(found[:4])}", ) elif items: ok("AzDO completion response received (generic)", f"{len(items)} items") else: fail("AzDO task completion: empty result", str(resp.get("result"))[:200]) # ── Main ────────────────────────────────────────────────────────────────────── async def main() -> int: print(f"\niLSP completion smoke test") print(f"Target: {BASE_URL}") print("═" * 50) async with aiohttp.ClientSession() as session: catalog_ok = await check_health(session) if not catalog_ok: print("\n⚠ Catalog empty — completion tests may fail") print(f"\nConnecting to {WS_URL} ...") try: async with session.ws_connect(WS_URL, timeout=aiohttp.ClientTimeout(total=20)) as ws: initialized = await check_initialize(ws) if not initialized: fail("Skipping completion tests (no initialize)", "") else: await check_version_completion(ws) await check_param_completion(ws) await check_module_path_completion(ws) except Exception as e: fail("WebSocket connect failed", str(e)) print(f"\nConnecting to {YAML_WS_URL} ...") try: async with session.ws_connect(YAML_WS_URL, timeout=aiohttp.ClientTimeout(total=20)) as ws: yaml_ok = await check_yaml_initialize(ws) if not yaml_ok: fail("Skipping YAML completion tests (no initialize)", "") else: await check_yaml_template_completion(ws) await check_azdo_task_completion(ws) except Exception as e: fail("YAML WebSocket connect failed", str(e)) print("\n" + "═" * 50) print(f"Results: {PASS} passed, {FAIL} failed") return 0 if FAIL == 0 else 1 if __name__ == "__main__": sys.exit(asyncio.run(main()))