""" Asyncio TCP proxy that wraps Bicep.LangServer.dll. Architecture: Editor (TCP:2088) ──► BicepProxy ──► Bicep.LangServer subprocess (stdio) The proxy intercepts textDocument/completion responses and injects LRU Bicep module completions with higher sort priority (sortText "0_lru_..."). All other LSP messages are forwarded unchanged. """ import asyncio import json import logging import os import subprocess from typing import Any from .modules import BicepModuleCatalog logger = logging.getLogger(__name__) BICEP_LS_PATH = os.getenv( "BICEP_LS_PATH", "/opt/bicep-langserver/Bicep.LangServer.dll", ) LISTEN_PORT = int(os.getenv("BICEP_LSP_PORT", "2088")) class _ContentLengthFramer: """Reads/writes LSP Content-Length framed messages.""" def __init__(self, reader: asyncio.StreamReader): self._reader = reader async def read_message(self) -> bytes: headers = b"" while not headers.endswith(b"\r\n\r\n"): chunk = await self._reader.read(1) if not chunk: raise EOFError("Connection closed") headers += chunk content_length = 0 for line in headers.split(b"\r\n"): if line.lower().startswith(b"content-length:"): content_length = int(line.split(b":")[1].strip()) body = await self._reader.readexactly(content_length) return body @staticmethod def frame(body: bytes) -> bytes: return f"Content-Length: {len(body)}\r\n\r\n".encode() + body class BicepProxy: """Per-connection proxy between one editor client and one Bicep LS process.""" def __init__(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter): self._client_reader = reader self._client_writer = writer self._proc: subprocess.Popen | None = None self._ls_reader: asyncio.StreamReader | None = None self._ls_writer: asyncio.StreamWriter | None = None async def run(self) -> None: peer = self._client_writer.get_extra_info("peername") logger.info("New Bicep client: %s", peer) self._proc = await asyncio.create_subprocess_exec( "dotnet", BICEP_LS_PATH, "--stdio", stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) self._ls_reader = self._proc.stdout self._ls_writer = self._proc.stdin try: await asyncio.gather( self._client_to_ls(), self._ls_to_client(), ) except (EOFError, ConnectionResetError, asyncio.CancelledError): pass finally: self._cleanup() async def _client_to_ls(self) -> None: framer = _ContentLengthFramer(self._client_reader) while True: body = await framer.read_message() framed = _ContentLengthFramer.frame(body) self._ls_writer.write(framed) await self._ls_writer.drain() async def _ls_to_client(self) -> None: framer = _ContentLengthFramer(self._ls_reader) while True: body = await framer.read_message() try: msg = json.loads(body) body = self._maybe_inject_completions(msg) except json.JSONDecodeError: pass framed = _ContentLengthFramer.frame( body if isinstance(body, bytes) else json.dumps(body).encode() ) self._client_writer.write(framed) await self._client_writer.drain() def _maybe_inject_completions(self, msg: dict[str, Any]) -> dict[str, Any] | bytes: """Inject LRU modules into completion responses.""" result = msg.get("result") if result is None: return json.dumps(msg).encode() # Completion result is either a list or {isIncomplete, items} items: list | None = None if isinstance(result, list): items = result elif isinstance(result, dict) and "items" in result: items = result["items"] if items is None: return json.dumps(msg).encode() lru_items = BicepModuleCatalog.as_completion_items() if lru_items: # Downgrade standard items so LRU sorts first for item in items: st = item.get("sortText", item.get("label", "")) item["sortText"] = f"1_az_{st}" if isinstance(result, list): msg["result"] = lru_items + items else: result["items"] = lru_items + items result["isIncomplete"] = True return json.dumps(msg).encode() def _cleanup(self) -> None: if self._proc and self._proc.returncode is None: self._proc.terminate() self._client_writer.close() async def serve_bicep(port: int = LISTEN_PORT) -> None: """Start the Bicep LSP TCP proxy server.""" await BicepModuleCatalog.start_background_refresh() async def _handle(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None: proxy = BicepProxy(reader, writer) await proxy.run() server = await asyncio.start_server(_handle, "0.0.0.0", port) logger.info("Bicep LSP proxy listening on TCP :%d", port) async with server: await server.serve_forever()