Files
iLSP/scripts/smoke_test_completions.py
Henrik Jess Nielsen 2a1717ff81
All checks were successful
Build and Deploy iLSP / test (push) Successful in 23s
Build and Deploy iLSP / build-and-deploy (push) Successful in 1m35s
feat: bake iac_source_catalog into Docker image; fix YAML smoke test; add .gitignore
- Dockerfile: COPY iac_source_catalog.json in builder + final stage
- push_catalogs.sh: cp iac_source_catalog.json to iLSP repo root before SCP
- smoke_test_completions.py: detect WS CLOSE in _recv_until_id; 20s YAML init timeout; clearer error message
- .gitignore: standard Python exclusions

Fixes: iac_source_modules=0 (was never baked into image)
2026-05-10 16:23:02 +02:00

427 lines
16 KiB
Python

#!/usr/bin/env python3
"""Live smoke test: verifies Bicep and YAML completions from ilsp.i80.dk.
Tests:
1. Health endpoint returns ok with bicep_modules > 0
2. /bicep WebSocket accepts LSP initialize
3. Version completion: 'br/modules:roleassignments:' → versions injected at top
4. Param completion: inside params {} of roleassignments → params injected
5. Module-path completion: 'br/modules:' → module list injected
6. YAML: /yaml WebSocket accepts LSP initialize
7. YAML: pipeline template completion → template list injected
Usage:
python3 scripts/smoke_test_completions.py [wss://ilsp.i80.dk]
"""
import asyncio
import json
import sys
import time
import aiohttp
BASE_URL = sys.argv[1] if len(sys.argv) > 1 else "https://ilsp.i80.dk"
WS_URL = BASE_URL.replace("https://", "wss://").replace("http://", "ws://") + "/bicep"
YAML_WS_URL = BASE_URL.replace("https://", "wss://").replace("http://", "ws://") + "/yaml"
HTTP_URL = BASE_URL.replace("wss://", "https://").replace("ws://", "http://")
PASS = 0
FAIL = 0
RESULTS: list[tuple[str, bool, str]] = []
def ok(name: str, detail: str = "") -> None:
global PASS
PASS += 1
RESULTS.append((name, True, detail))
print(f"{name}" + (f" ({detail})" if detail else ""))
def fail(name: str, detail: str = "") -> None:
global FAIL
FAIL += 1
RESULTS.append((name, False, detail))
print(f"{name}" + (f" ({detail})" if detail else ""))
# ── LSP helpers ───────────────────────────────────────────────────────────────
def _frame(body: dict) -> bytes:
raw = json.dumps(body).encode()
return b"Content-Length: " + str(len(raw)).encode() + b"\r\n\r\n" + raw
def _parse_frames(data: bytes) -> list[dict]:
msgs = []
while b"\r\n\r\n" in data:
header, rest = data.split(b"\r\n\r\n", 1)
length = None
for line in header.split(b"\r\n"):
if line.lower().startswith(b"content-length:"):
length = int(line.split(b":", 1)[1].strip())
if length is None or len(rest) < length:
break
msgs.append(json.loads(rest[:length]))
data = rest[length:]
return msgs
async def _recv_until_id(ws: aiohttp.ClientWebSocketResponse, req_id: int, timeout: float = 8.0) -> dict | None:
"""Collect WebSocket frames until we find a response with the given id.
Returns None if WS closes before the response arrives.
"""
buf = b""
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
remaining = deadline - time.monotonic()
try:
msg = await asyncio.wait_for(ws.receive(), timeout=max(0.5, remaining))
except (asyncio.TimeoutError, Exception):
break
if msg.type == aiohttp.WSMsgType.BINARY:
buf += msg.data
elif msg.type == aiohttp.WSMsgType.TEXT:
buf += msg.data.encode()
elif msg.type in (aiohttp.WSMsgType.CLOSE, aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.ERROR):
break # connection closed — no response coming
else:
continue
for parsed in _parse_frames(buf):
if parsed.get("id") == req_id:
return parsed
return None
# ── Bicep test document ───────────────────────────────────────────────────────
#
# One line per entry so character positions are easy to count.
BICEP_DOC = """\
module ra 'br/modules:roleassignments:2.0.x' = {
name: 'testRA'
params: {
assignments: []
environmentType: 'DEV'
}
}
"""
DOC_URI = "file:///tmp/smoke_test.bicep"
async def _open_doc(ws: aiohttp.ClientWebSocketResponse) -> None:
await ws.send_bytes(_frame({
"jsonrpc": "2.0",
"method": "textDocument/didOpen",
"params": {
"textDocument": {
"uri": DOC_URI,
"languageId": "bicep",
"version": 1,
"text": BICEP_DOC,
}
},
}))
async def _completion_request(ws: aiohttp.ClientWebSocketResponse, req_id: int, line: int, char: int) -> None:
await ws.send_bytes(_frame({
"jsonrpc": "2.0",
"id": req_id,
"method": "textDocument/completion",
"params": {
"textDocument": {"uri": DOC_URI},
"position": {"line": line, "character": char},
},
}))
# ── Individual smoke tests ─────────────────────────────────────────────────────
async def check_health(session: aiohttp.ClientSession) -> bool:
print("\n[1] Health endpoint")
try:
async with session.get(f"{HTTP_URL}/health", timeout=aiohttp.ClientTimeout(total=5)) as r:
d = await r.json()
if d.get("status") == "ok":
bicep = d.get("bicep_modules", 0)
pypi = d.get("pypi_packages", 0)
ok("Health ok", f"bicep_modules={bicep}, pypi_packages={pypi}")
if bicep > 0:
ok("Bicep catalog loaded", f"{bicep} modules")
else:
fail("Bicep catalog empty", "bicep_modules=0")
return bicep > 0
else:
fail("Health status not ok", str(d))
return False
except Exception as e:
fail("Health unreachable", str(e))
return False
async def check_initialize(ws: aiohttp.ClientWebSocketResponse) -> bool:
print("\n[2] LSP initialize handshake")
await ws.send_bytes(_frame({
"jsonrpc": "2.0", "id": 1, "method": "initialize",
"params": {"processId": None, "rootUri": None, "capabilities": {}},
}))
resp = await _recv_until_id(ws, 1, timeout=15.0)
if resp and "result" in resp and "capabilities" in resp["result"]:
ok("Initialize response received", f"serverInfo={resp['result'].get('serverInfo', {}).get('name','?')}")
# send initialized notification
await ws.send_bytes(_frame({"jsonrpc": "2.0", "method": "initialized", "params": {}}))
return True
else:
fail("No initialize response", str(resp))
return False
async def check_version_completion(ws: aiohttp.ClientWebSocketResponse) -> None:
"""Cursor just after 'br/modules:roleassignments:' — should get version completions."""
print("\n[3] Version completion (br/modules:roleassignments:<cursor>)")
await _open_doc(ws)
# Line 0: "module ra 'br/modules:roleassignments:2.0.x' = {"
# We want cursor right after the second colon → "...roleassignments:"
# Count: module ra 'br/modules:roleassignments: → char 38
line, char = 0, 38
await _completion_request(ws, 10, line, char)
resp = await _recv_until_id(ws, 10, timeout=8.0)
if resp is None:
fail("Version completion: no response")
return
items = []
result = resp.get("result")
if isinstance(result, dict):
items = result.get("items", [])
elif isinstance(result, list):
items = result
labels = [i.get("label", "") for i in items]
version_items = [l for l in labels if "." in l or l.startswith("latest") or l.startswith("2.")]
if version_items:
ok("Version completions injected", f"{len(version_items)} versions: {', '.join(version_items[:5])}")
elif items:
ok("Completion response received (non-version)", f"{len(items)} items: {', '.join(labels[:5])}")
else:
fail("Version completion: empty result", str(resp.get("result"))[:200])
async def check_param_completion(ws: aiohttp.ClientWebSocketResponse) -> None:
"""Cursor on the blank line inside 'params {' — should get param completions."""
print("\n[4] Param completion (inside params { } block)")
# Line 2: " params: {" → cursor at end of line, after opening brace
# Line 3: " assignments: []" → cursor before 'assignments' (char 4)
line, char = 3, 4
await _completion_request(ws, 11, line, char)
resp = await _recv_until_id(ws, 11, timeout=8.0)
if resp is None:
fail("Param completion: no response")
return
items = []
result = resp.get("result")
if isinstance(result, dict):
items = result.get("items", [])
elif isinstance(result, list):
items = result
labels = [i.get("label", "") for i in items]
known_params = {"assignments", "environmentType", "principalId", "principalType", "roleDefinitionIds"}
injected = [l for l in labels if l in known_params]
if injected:
ok("Param completions injected", f"found: {', '.join(injected)}")
elif items:
# Completions came back but not the expected ones — still a partial win
ok("Completion response received", f"{len(items)} items: {', '.join(labels[:5])}")
else:
fail("Param completion: empty result", str(resp.get("result"))[:200])
async def check_module_path_completion(ws: aiohttp.ClientWebSocketResponse) -> None:
"""Test module-path context: 'br/modules:' should return module list."""
print("\n[5] Module-path completion ('br/modules:<cursor>')")
# Edit the document to have a partial module ref
partial_doc = "module test 'br/modules:'\n"
await ws.send_bytes(_frame({
"jsonrpc": "2.0",
"method": "textDocument/didChange",
"params": {
"textDocument": {"uri": DOC_URI, "version": 2},
"contentChanges": [{"text": partial_doc}],
},
}))
# Cursor right after 'br/modules:' → char 24
await _completion_request(ws, 12, 0, 24)
resp = await _recv_until_id(ws, 12, timeout=8.0)
if resp is None:
fail("Module-path completion: no response")
return
items = []
result = resp.get("result")
if isinstance(result, dict):
items = result.get("items", [])
elif isinstance(result, list):
items = result
labels = [i.get("label", "") for i in items]
if items:
ok("Module-path completions returned", f"{len(items)} modules: {', '.join(labels[:5])}")
else:
fail("Module-path completion: empty result")
# ── YAML smoke tests ──────────────────────────────────────────────────────────
YAML_DOC_URI = "file:///tmp/smoke_test/azure-pipelines.yml"
# A minimal AzDO pipeline that uses @pipeline-templates — triggers YAML template context
YAML_DOC = """\
trigger:
- main
stages:
- stage: Deploy
jobs:
- job: DeployJob
steps:
- template: tasks/k8s/
"""
async def _yaml_open_doc(ws: aiohttp.ClientWebSocketResponse) -> None:
await ws.send_bytes(_frame({
"jsonrpc": "2.0",
"method": "textDocument/didOpen",
"params": {
"textDocument": {
"uri": YAML_DOC_URI,
"languageId": "yaml",
"version": 1,
"text": YAML_DOC,
}
},
}))
async def _yaml_completion_request(ws: aiohttp.ClientWebSocketResponse, req_id: int, line: int, char: int) -> None:
await ws.send_bytes(_frame({
"jsonrpc": "2.0",
"id": req_id,
"method": "textDocument/completion",
"params": {
"textDocument": {"uri": YAML_DOC_URI},
"position": {"line": line, "character": char},
},
}))
async def check_yaml_initialize(ws: aiohttp.ClientWebSocketResponse) -> bool:
print("\n[6] YAML LSP initialize handshake")
await ws.send_bytes(_frame({
"jsonrpc": "2.0", "id": 100, "method": "initialize",
"params": {"processId": None, "rootUri": None, "capabilities": {}},
}))
resp = await _recv_until_id(ws, 100, timeout=20.0)
if resp and "result" in resp and "capabilities" in resp["result"]:
name = resp["result"].get("serverInfo", {}).get("name", "?")
ok("YAML initialize response received", f"server={name}")
await ws.send_bytes(_frame({"jsonrpc": "2.0", "method": "initialized", "params": {}}))
return True
elif resp is None:
fail("YAML: no initialize response (backend may be down or WS closed)", "check yaml-language-server in container")
return False
else:
fail("YAML: unexpected initialize response", str(resp)[:200])
return False
async def check_yaml_template_completion(ws: aiohttp.ClientWebSocketResponse) -> None:
"""Cursor after 'tasks/k8s/' in a template line — expect AzDO template completions."""
print("\n[7] YAML pipeline template completion (tasks/k8s/<cursor>@pipeline-templates)")
await _yaml_open_doc(ws)
# Line 8: " - template: tasks/k8s/" → char 32 (end of line)
line, char = 8, 32
await _yaml_completion_request(ws, 101, line, char)
resp = await _recv_until_id(ws, 101, timeout=10.0)
if resp is None:
fail("YAML template completion: no response")
return
items = []
result = resp.get("result")
if isinstance(result, dict):
items = result.get("items", [])
elif isinstance(result, list):
items = result
labels = [i.get("label", "") for i in items]
# Look for any item that contains "@pipeline-templates" or is a known AzDO template key
template_items = [l for l in labels if "@pipeline-templates" in l or "tasks/" in l]
if template_items:
ok("YAML template completions injected", f"{len(template_items)} templates: {', '.join(template_items[:3])}")
elif items:
ok("YAML completion response received (generic)", f"{len(items)} items: {', '.join(labels[:5])}")
else:
fail("YAML template completion: empty result", str(resp.get("result"))[:200])
# ── Main ──────────────────────────────────────────────────────────────────────
async def main() -> int:
print(f"\niLSP completion smoke test")
print(f"Target: {BASE_URL}")
print("" * 50)
async with aiohttp.ClientSession() as session:
catalog_ok = await check_health(session)
if not catalog_ok:
print("\n⚠ Catalog empty — completion tests may fail")
print(f"\nConnecting to {WS_URL} ...")
try:
async with session.ws_connect(WS_URL, timeout=aiohttp.ClientTimeout(total=20)) as ws:
initialized = await check_initialize(ws)
if not initialized:
fail("Skipping completion tests (no initialize)", "")
else:
await check_version_completion(ws)
await check_param_completion(ws)
await check_module_path_completion(ws)
except Exception as e:
fail("WebSocket connect failed", str(e))
print(f"\nConnecting to {YAML_WS_URL} ...")
try:
async with session.ws_connect(YAML_WS_URL, timeout=aiohttp.ClientTimeout(total=20)) as ws:
yaml_ok = await check_yaml_initialize(ws)
if not yaml_ok:
fail("Skipping YAML completion tests (no initialize)", "")
else:
await check_yaml_template_completion(ws)
except Exception as e:
fail("YAML WebSocket connect failed", str(e))
print("\n" + "" * 50)
print(f"Results: {PASS} passed, {FAIL} failed")
return 0 if FAIL == 0 else 1
if __name__ == "__main__":
sys.exit(asyncio.run(main()))