fix: replace asyncio subprocess proxy with thread-based Popen proxy
Some checks failed
Build and Deploy iLSP / test (push) Failing after 12s
Build and Deploy iLSP / build-and-deploy (push) Has been skipped

asyncio subprocess PIPE unreliable for long-lived stdio bridging. Use Popen + threads instead. Also fix smoke_test.sh stdin handling.
This commit is contained in:
Henrik Jess Nielsen
2026-05-10 13:02:52 +02:00
parent c550a4963e
commit 6385e159ff
5 changed files with 175 additions and 139 deletions

View File

@@ -1,19 +1,23 @@
"""
Asyncio TCP proxy that wraps Bicep.LangServer.dll.
Thread-based TCP proxy that wraps Bicep.LangServer.dll.
Architecture:
Editor (TCP:2088) ──► BicepProxy ──► Bicep.LangServer subprocess (stdio)
The proxy intercepts textDocument/completion responses and injects
LRU Bicep module completions with higher sort priority (sortText "0_lru_...").
Uses subprocess.Popen + threads instead of asyncio subprocess — far more
reliable for stdin/stdout bridging of long-lived processes.
Intercepts textDocument/completion responses and injects LRU Bicep module
completions with higher sort priority (sortText "0_lru_...").
All other LSP messages are forwarded unchanged.
"""
import asyncio
import json
import logging
import os
import socket
import subprocess
import threading
from typing import Any
from .modules import BicepModuleCatalog
@@ -24,141 +28,159 @@ BICEP_LS_PATH = os.getenv(
"BICEP_LS_PATH",
"/opt/bicep-langserver/Bicep.LangServer.dll",
)
LISTEN_PORT = int(os.getenv("BICEP_LSP_PORT", "2088"))
class _ContentLengthFramer:
"""Reads/writes LSP Content-Length framed messages."""
def _read_message(fileobj) -> bytes:
"""Read one LSP Content-Length framed message from a file-like object."""
header = b""
while not header.endswith(b"\r\n\r\n"):
ch = fileobj.read(1)
if not ch:
raise EOFError("Stream closed")
header += ch
def __init__(self, reader: asyncio.StreamReader):
self._reader = reader
content_length = 0
for line in header.split(b"\r\n"):
if line.lower().startswith(b"content-length:"):
content_length = int(line.split(b":")[1].strip())
async def read_message(self) -> bytes:
headers = b""
while not headers.endswith(b"\r\n\r\n"):
chunk = await self._reader.read(1)
if not chunk:
raise EOFError("Connection closed")
headers += chunk
content_length = 0
for line in headers.split(b"\r\n"):
if line.lower().startswith(b"content-length:"):
content_length = int(line.split(b":")[1].strip())
body = await self._reader.readexactly(content_length)
return body
@staticmethod
def frame(body: bytes) -> bytes:
return f"Content-Length: {len(body)}\r\n\r\n".encode() + body
return fileobj.read(content_length)
class BicepProxy:
"""Per-connection proxy between one editor client and one Bicep LS process."""
def _frame(body: bytes) -> bytes:
return f"Content-Length: {len(body)}\r\n\r\n".encode() + body
def __init__(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
self._client_reader = reader
self._client_writer = writer
self._proc: subprocess.Popen | None = None
self._ls_reader: asyncio.StreamReader | None = None
self._ls_writer: asyncio.StreamWriter | None = None
async def run(self) -> None:
peer = self._client_writer.get_extra_info("peername")
logger.info("New Bicep client: %s", peer)
self._proc = await asyncio.create_subprocess_exec(
"dotnet", BICEP_LS_PATH, "--stdio",
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
self._ls_reader = self._proc.stdout
self._ls_writer = self._proc.stdin
try:
await asyncio.gather(
self._client_to_ls(),
self._ls_to_client(),
)
except (EOFError, ConnectionResetError, asyncio.CancelledError):
pass
finally:
self._cleanup()
async def _client_to_ls(self) -> None:
framer = _ContentLengthFramer(self._client_reader)
while True:
body = await framer.read_message()
framed = _ContentLengthFramer.frame(body)
self._ls_writer.write(framed)
await self._ls_writer.drain()
async def _ls_to_client(self) -> None:
framer = _ContentLengthFramer(self._ls_reader)
while True:
body = await framer.read_message()
try:
msg = json.loads(body)
body = self._maybe_inject_completions(msg)
except json.JSONDecodeError:
pass
framed = _ContentLengthFramer.frame(
body if isinstance(body, bytes) else json.dumps(body).encode()
)
self._client_writer.write(framed)
await self._client_writer.drain()
def _maybe_inject_completions(self, msg: dict[str, Any]) -> dict[str, Any] | bytes:
"""Inject LRU modules into completion responses."""
result = msg.get("result")
if result is None:
return json.dumps(msg).encode()
# Completion result is either a list or {isIncomplete, items}
items: list | None = None
if isinstance(result, list):
items = result
elif isinstance(result, dict) and "items" in result:
items = result["items"]
if items is None:
return json.dumps(msg).encode()
lru_items = BicepModuleCatalog.as_completion_items()
if lru_items:
# Downgrade standard items so LRU sorts first
for item in items:
st = item.get("sortText", item.get("label", ""))
item["sortText"] = f"1_az_{st}"
if isinstance(result, list):
msg["result"] = lru_items + items
else:
result["items"] = lru_items + items
result["isIncomplete"] = True
def _inject_completions(msg: dict[str, Any]) -> bytes:
"""Inject LRU modules into completion responses."""
result = msg.get("result")
if result is None:
return json.dumps(msg).encode()
def _cleanup(self) -> None:
if self._proc and self._proc.returncode is None:
self._proc.terminate()
self._client_writer.close()
items: list | None = None
if isinstance(result, list):
items = result
elif isinstance(result, dict) and "items" in result:
items = result["items"]
if items is None:
return json.dumps(msg).encode()
lru_items = BicepModuleCatalog.as_completion_items()
if lru_items:
for item in items:
st = item.get("sortText", item.get("label", ""))
item["sortText"] = f"1_az_{st}"
if isinstance(result, list):
msg["result"] = lru_items + items
else:
result["items"] = lru_items + items
result["isIncomplete"] = True
return json.dumps(msg).encode()
async def serve_bicep(port: int = LISTEN_PORT) -> None:
"""Start the Bicep LSP TCP proxy server."""
await BicepModuleCatalog.start_background_refresh()
def _client_to_ls(
conn_file,
proc_stdin,
) -> None:
try:
while True:
body = _read_message(conn_file)
logger.debug("Client→LS: %d bytes", len(body))
framed = _frame(body)
proc_stdin.write(framed)
proc_stdin.flush()
logger.debug("Client→LS: flushed")
except EOFError:
logger.debug("Client write side closed — signalling EOF to LS")
except Exception as exc:
logger.debug("Client→LS error: %s", exc)
finally:
# Half-close: tell the LS that no more input is coming.
# The LS may still send responses, so we don't kill it here.
try:
proc_stdin.close()
except Exception:
pass
async def _handle(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
proxy = BicepProxy(reader, writer)
await proxy.run()
server = await asyncio.start_server(_handle, "0.0.0.0", port)
def _ls_to_client(
proc_stdout,
conn: socket.socket,
) -> None:
try:
while True:
body = _read_message(proc_stdout)
logger.debug("LS→Client: %d bytes", len(body))
try:
out = _inject_completions(json.loads(body))
except json.JSONDecodeError:
out = body
conn.sendall(_frame(out))
except EOFError:
logger.debug("Bicep LS stdout closed")
except Exception as exc:
logger.debug("LS→Client error: %s", exc)
def _handle_client(conn: socket.socket, addr: tuple) -> None:
logger.info("New Bicep client: %s", addr)
proc = subprocess.Popen(
["dotnet", BICEP_LS_PATH, "--stdio"],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
)
logger.info("Bicep LS subprocess started (pid=%d)", proc.pid)
# Unbuffered read from the socket — critical for correct LSP framing
conn_file = conn.makefile("rb", buffering=0)
# t1: client → LS (finishes when client closes write side)
t1 = threading.Thread(
target=_client_to_ls,
args=(conn_file, proc.stdin),
daemon=True,
)
# t2: LS → client (finishes when LS closes stdout)
t2 = threading.Thread(
target=_ls_to_client,
args=(proc.stdout, conn),
daemon=True,
)
t1.start()
t2.start()
# Session ends when LS is done (not when client closes write side)
t2.join()
try:
proc.wait(timeout=3)
except Exception:
proc.terminate()
try:
conn.close()
except Exception:
pass
logger.info("Bicep client %s disconnected", addr)
def serve_bicep(port: int) -> None:
"""Blocking TCP server — run in a daemon thread alongside asyncio."""
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(("0.0.0.0", port))
server.listen(10)
logger.info("Bicep LSP proxy listening on TCP :%d", port)
async with server:
await server.serve_forever()
while True:
try:
conn, addr = server.accept()
threading.Thread(
target=_handle_client,
args=(conn, addr),
daemon=True,
).start()
except Exception as exc:
logger.error("Accept error: %s", exc)

View File

@@ -11,6 +11,7 @@ import asyncio
import logging
import os
import signal
import threading
from aiohttp import web
@@ -58,13 +59,23 @@ async def main_async() -> None:
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
logging.getLogger("ilsp.bicep_lsp.proxy").setLevel(logging.DEBUG)
# Pre-warm caches
logger.info("Pre-warming catalogs…")
from .bicep_lsp.modules import BicepModuleCatalog
await asyncio.gather(
PypiCatalog.start_background_refresh(),
BicepModuleCatalog.start_background_refresh(),
)
# Start Bicep LSP proxy in a daemon thread (blocking socket server)
threading.Thread(
target=serve_bicep,
args=(BICEP_LSP_PORT,),
daemon=True,
).start()
# Build health app
health_app = await _health_app()
runner = web.AppRunner(health_app)
@@ -73,11 +84,8 @@ async def main_async() -> None:
await site.start()
logger.info("Health endpoint on http://0.0.0.0:%d/health", HEALTH_PORT)
# Run all services
await asyncio.gather(
_serve_python_lsp(PYTHON_LSP_PORT),
serve_bicep(BICEP_LSP_PORT),
)
# Run remaining asyncio services
await _serve_python_lsp(PYTHON_LSP_PORT)
def main() -> None: