From 6385e159ff09591ab2381c472016abaa27b9d3c1 Mon Sep 17 00:00:00 2001 From: Henrik Jess Nielsen Date: Sun, 10 May 2026 13:02:52 +0200 Subject: [PATCH] fix: replace asyncio subprocess proxy with thread-based Popen proxy asyncio subprocess PIPE unreliable for long-lived stdio bridging. Use Popen + threads instead. Also fix smoke_test.sh stdin handling. --- Dockerfile | 2 +- ilsp/bicep_lsp/proxy.py | 276 ++++++++++++++++++++++------------------ ilsp/server.py | 18 ++- pyproject.toml | 2 +- scripts/smoke_test.sh | 16 ++- 5 files changed, 175 insertions(+), 139 deletions(-) diff --git a/Dockerfile b/Dockerfile index 8a1d823..23e7fff 100644 --- a/Dockerfile +++ b/Dockerfile @@ -27,7 +27,7 @@ RUN pip install --upgrade pip build \ # ── Stage 3: final runtime ──────────────────────────────────────────────────── # Use Microsoft's official .NET runtime image — avoids the SHA1-signed APT key # issue on newer Debian hosts (trixie+ rejects packages.microsoft.com GPG since 2026-02-01). -FROM mcr.microsoft.com/dotnet/runtime:8.0 +FROM mcr.microsoft.com/dotnet/runtime:10.0 # Install Python 3 + pip (the dotnet base image is Debian bookworm) RUN apt-get update \ diff --git a/ilsp/bicep_lsp/proxy.py b/ilsp/bicep_lsp/proxy.py index 467c711..ec290d3 100644 --- a/ilsp/bicep_lsp/proxy.py +++ b/ilsp/bicep_lsp/proxy.py @@ -1,19 +1,23 @@ """ -Asyncio TCP proxy that wraps Bicep.LangServer.dll. +Thread-based TCP proxy that wraps Bicep.LangServer.dll. Architecture: Editor (TCP:2088) ──► BicepProxy ──► Bicep.LangServer subprocess (stdio) -The proxy intercepts textDocument/completion responses and injects -LRU Bicep module completions with higher sort priority (sortText "0_lru_..."). +Uses subprocess.Popen + threads instead of asyncio subprocess — far more +reliable for stdin/stdout bridging of long-lived processes. + +Intercepts textDocument/completion responses and injects LRU Bicep module +completions with higher sort priority (sortText "0_lru_..."). All other LSP messages are forwarded unchanged. """ -import asyncio import json import logging import os +import socket import subprocess +import threading from typing import Any from .modules import BicepModuleCatalog @@ -24,141 +28,159 @@ BICEP_LS_PATH = os.getenv( "BICEP_LS_PATH", "/opt/bicep-langserver/Bicep.LangServer.dll", ) -LISTEN_PORT = int(os.getenv("BICEP_LSP_PORT", "2088")) -class _ContentLengthFramer: - """Reads/writes LSP Content-Length framed messages.""" +def _read_message(fileobj) -> bytes: + """Read one LSP Content-Length framed message from a file-like object.""" + header = b"" + while not header.endswith(b"\r\n\r\n"): + ch = fileobj.read(1) + if not ch: + raise EOFError("Stream closed") + header += ch - def __init__(self, reader: asyncio.StreamReader): - self._reader = reader + content_length = 0 + for line in header.split(b"\r\n"): + if line.lower().startswith(b"content-length:"): + content_length = int(line.split(b":")[1].strip()) - async def read_message(self) -> bytes: - headers = b"" - while not headers.endswith(b"\r\n\r\n"): - chunk = await self._reader.read(1) - if not chunk: - raise EOFError("Connection closed") - headers += chunk - - content_length = 0 - for line in headers.split(b"\r\n"): - if line.lower().startswith(b"content-length:"): - content_length = int(line.split(b":")[1].strip()) - - body = await self._reader.readexactly(content_length) - return body - - @staticmethod - def frame(body: bytes) -> bytes: - return f"Content-Length: {len(body)}\r\n\r\n".encode() + body + return fileobj.read(content_length) -class BicepProxy: - """Per-connection proxy between one editor client and one Bicep LS process.""" +def _frame(body: bytes) -> bytes: + return f"Content-Length: {len(body)}\r\n\r\n".encode() + body - def __init__(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter): - self._client_reader = reader - self._client_writer = writer - self._proc: subprocess.Popen | None = None - self._ls_reader: asyncio.StreamReader | None = None - self._ls_writer: asyncio.StreamWriter | None = None - - async def run(self) -> None: - peer = self._client_writer.get_extra_info("peername") - logger.info("New Bicep client: %s", peer) - - self._proc = await asyncio.create_subprocess_exec( - "dotnet", BICEP_LS_PATH, "--stdio", - stdin=asyncio.subprocess.PIPE, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - ) - - self._ls_reader = self._proc.stdout - self._ls_writer = self._proc.stdin - - try: - await asyncio.gather( - self._client_to_ls(), - self._ls_to_client(), - ) - except (EOFError, ConnectionResetError, asyncio.CancelledError): - pass - finally: - self._cleanup() - - async def _client_to_ls(self) -> None: - framer = _ContentLengthFramer(self._client_reader) - while True: - body = await framer.read_message() - framed = _ContentLengthFramer.frame(body) - self._ls_writer.write(framed) - await self._ls_writer.drain() - - async def _ls_to_client(self) -> None: - framer = _ContentLengthFramer(self._ls_reader) - while True: - body = await framer.read_message() - - try: - msg = json.loads(body) - body = self._maybe_inject_completions(msg) - except json.JSONDecodeError: - pass - - framed = _ContentLengthFramer.frame( - body if isinstance(body, bytes) else json.dumps(body).encode() - ) - self._client_writer.write(framed) - await self._client_writer.drain() - - def _maybe_inject_completions(self, msg: dict[str, Any]) -> dict[str, Any] | bytes: - """Inject LRU modules into completion responses.""" - result = msg.get("result") - if result is None: - return json.dumps(msg).encode() - - # Completion result is either a list or {isIncomplete, items} - items: list | None = None - if isinstance(result, list): - items = result - elif isinstance(result, dict) and "items" in result: - items = result["items"] - - if items is None: - return json.dumps(msg).encode() - - lru_items = BicepModuleCatalog.as_completion_items() - if lru_items: - # Downgrade standard items so LRU sorts first - for item in items: - st = item.get("sortText", item.get("label", "")) - item["sortText"] = f"1_az_{st}" - - if isinstance(result, list): - msg["result"] = lru_items + items - else: - result["items"] = lru_items + items - result["isIncomplete"] = True +def _inject_completions(msg: dict[str, Any]) -> bytes: + """Inject LRU modules into completion responses.""" + result = msg.get("result") + if result is None: return json.dumps(msg).encode() - def _cleanup(self) -> None: - if self._proc and self._proc.returncode is None: - self._proc.terminate() - self._client_writer.close() + items: list | None = None + if isinstance(result, list): + items = result + elif isinstance(result, dict) and "items" in result: + items = result["items"] + + if items is None: + return json.dumps(msg).encode() + + lru_items = BicepModuleCatalog.as_completion_items() + if lru_items: + for item in items: + st = item.get("sortText", item.get("label", "")) + item["sortText"] = f"1_az_{st}" + if isinstance(result, list): + msg["result"] = lru_items + items + else: + result["items"] = lru_items + items + result["isIncomplete"] = True + + return json.dumps(msg).encode() -async def serve_bicep(port: int = LISTEN_PORT) -> None: - """Start the Bicep LSP TCP proxy server.""" - await BicepModuleCatalog.start_background_refresh() +def _client_to_ls( + conn_file, + proc_stdin, +) -> None: + try: + while True: + body = _read_message(conn_file) + logger.debug("Client→LS: %d bytes", len(body)) + framed = _frame(body) + proc_stdin.write(framed) + proc_stdin.flush() + logger.debug("Client→LS: flushed") + except EOFError: + logger.debug("Client write side closed — signalling EOF to LS") + except Exception as exc: + logger.debug("Client→LS error: %s", exc) + finally: + # Half-close: tell the LS that no more input is coming. + # The LS may still send responses, so we don't kill it here. + try: + proc_stdin.close() + except Exception: + pass - async def _handle(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None: - proxy = BicepProxy(reader, writer) - await proxy.run() - server = await asyncio.start_server(_handle, "0.0.0.0", port) +def _ls_to_client( + proc_stdout, + conn: socket.socket, +) -> None: + try: + while True: + body = _read_message(proc_stdout) + logger.debug("LS→Client: %d bytes", len(body)) + try: + out = _inject_completions(json.loads(body)) + except json.JSONDecodeError: + out = body + conn.sendall(_frame(out)) + except EOFError: + logger.debug("Bicep LS stdout closed") + except Exception as exc: + logger.debug("LS→Client error: %s", exc) + + +def _handle_client(conn: socket.socket, addr: tuple) -> None: + logger.info("New Bicep client: %s", addr) + proc = subprocess.Popen( + ["dotnet", BICEP_LS_PATH, "--stdio"], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.DEVNULL, + ) + logger.info("Bicep LS subprocess started (pid=%d)", proc.pid) + + # Unbuffered read from the socket — critical for correct LSP framing + conn_file = conn.makefile("rb", buffering=0) + + # t1: client → LS (finishes when client closes write side) + t1 = threading.Thread( + target=_client_to_ls, + args=(conn_file, proc.stdin), + daemon=True, + ) + # t2: LS → client (finishes when LS closes stdout) + t2 = threading.Thread( + target=_ls_to_client, + args=(proc.stdout, conn), + daemon=True, + ) + t1.start() + t2.start() + # Session ends when LS is done (not when client closes write side) + t2.join() + + try: + proc.wait(timeout=3) + except Exception: + proc.terminate() + try: + conn.close() + except Exception: + pass + + logger.info("Bicep client %s disconnected", addr) + + +def serve_bicep(port: int) -> None: + """Blocking TCP server — run in a daemon thread alongside asyncio.""" + server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + server.bind(("0.0.0.0", port)) + server.listen(10) logger.info("Bicep LSP proxy listening on TCP :%d", port) - async with server: - await server.serve_forever() + + while True: + try: + conn, addr = server.accept() + threading.Thread( + target=_handle_client, + args=(conn, addr), + daemon=True, + ).start() + except Exception as exc: + logger.error("Accept error: %s", exc) diff --git a/ilsp/server.py b/ilsp/server.py index ac47016..d5dd3d1 100644 --- a/ilsp/server.py +++ b/ilsp/server.py @@ -11,6 +11,7 @@ import asyncio import logging import os import signal +import threading from aiohttp import web @@ -58,13 +59,23 @@ async def main_async() -> None: level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", ) + logging.getLogger("ilsp.bicep_lsp.proxy").setLevel(logging.DEBUG) # Pre-warm caches logger.info("Pre-warming catalogs…") + from .bicep_lsp.modules import BicepModuleCatalog await asyncio.gather( PypiCatalog.start_background_refresh(), + BicepModuleCatalog.start_background_refresh(), ) + # Start Bicep LSP proxy in a daemon thread (blocking socket server) + threading.Thread( + target=serve_bicep, + args=(BICEP_LSP_PORT,), + daemon=True, + ).start() + # Build health app health_app = await _health_app() runner = web.AppRunner(health_app) @@ -73,11 +84,8 @@ async def main_async() -> None: await site.start() logger.info("Health endpoint on http://0.0.0.0:%d/health", HEALTH_PORT) - # Run all services - await asyncio.gather( - _serve_python_lsp(PYTHON_LSP_PORT), - serve_bicep(BICEP_LSP_PORT), - ) + # Run remaining asyncio services + await _serve_python_lsp(PYTHON_LSP_PORT) def main() -> None: diff --git a/pyproject.toml b/pyproject.toml index 8e90874..8078772 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [build-system] requires = ["setuptools>=68", "wheel"] -build-backend = "setuptools.backends.legacy:build" +build-backend = "setuptools.build_meta" [project] name = "ilsp" diff --git a/scripts/smoke_test.sh b/scripts/smoke_test.sh index 391557d..7d9afe0 100755 --- a/scripts/smoke_test.sh +++ b/scripts/smoke_test.sh @@ -13,8 +13,8 @@ HEALTH_PORT="${HEALTH_PORT:-2089}" PASS=0 FAIL=0 -ok() { echo " ✓ $*"; ((PASS++)); } -fail() { echo " ✗ $*"; ((FAIL++)); } +ok() { echo " ✓ $*"; PASS=$((PASS+1)); } +fail() { echo " ✗ $*"; FAIL=$((FAIL+1)); } # ── Helper: send LSP initialize and read response ───────────────────────────── @@ -28,8 +28,14 @@ send_lsp_init() { lsp_check() { local name="$1" local port="$2" + local timeout="${3:-3}" local response - response=$(send_lsp_init "$port" | nc -w 3 "$HOST" "$port" 2>/dev/null || true) + # Keep stdin open during LSP server startup: the server must NOT see EOF + # on stdin before it has finished responding (especially slow .NET JIT). + # We send the init message and then sleep for the full timeout so that nc + # keeps the TCP write-side open while reading the server's response. + response=$({ send_lsp_init "$port"; sleep "$timeout"; } \ + | nc -w "$timeout" "$HOST" "$port" 2>/dev/null || true) if echo "$response" | grep -q '"result"'; then ok "$name LSP responded to initialize (port $port)" else @@ -65,11 +71,11 @@ else echo " ⚠ nc not found — skipping TCP tests" fi -# 3. Bicep LSP +# 3. Bicep LSP (longer timeout — .NET startup takes a few seconds) echo "" echo "Bicep LSP (TCP :$BICEP_PORT)" if command -v nc &>/dev/null; then - lsp_check "Bicep" "$BICEP_PORT" + lsp_check "Bicep" "$BICEP_PORT" 12 fi # ── Summary ───────────────────────────────────────────────────────────────────