Files
iLSP/ilsp/bicep_lsp/proxy.py
Henrik Jess Nielsen 6385e159ff
Some checks failed
Build and Deploy iLSP / test (push) Failing after 12s
Build and Deploy iLSP / build-and-deploy (push) Has been skipped
fix: replace asyncio subprocess proxy with thread-based Popen proxy
asyncio subprocess PIPE unreliable for long-lived stdio bridging. Use Popen + threads instead. Also fix smoke_test.sh stdin handling.
2026-05-10 13:02:52 +02:00

187 lines
5.3 KiB
Python

"""
Thread-based TCP proxy that wraps Bicep.LangServer.dll.

Architecture:
    Editor (TCP:2088) ──► BicepProxy ──► Bicep.LangServer subprocess (stdio)

Uses subprocess.Popen + threads instead of an asyncio subprocess, which is
far more reliable for stdin/stdout bridging of long-lived processes.

Inspects the responses coming back from the language server and injects LRU
Bicep module completions into completion results, giving them a higher sort
priority (sortText "0_lru_...") than the server's own items.
All other LSP messages are forwarded without semantic changes.
"""
import json
import logging
import os
import socket
import subprocess
import threading
from typing import Any
from .modules import BicepModuleCatalog
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Path of the Bicep language server assembly handed to `dotnet`; the
# BICEP_LS_PATH environment variable overrides the default location.
BICEP_LS_PATH = os.environ.get(
    "BICEP_LS_PATH",
    "/opt/bicep-langserver/Bicep.LangServer.dll",
)
def _read_message(fileobj) -> bytes:
"""Read one LSP Content-Length framed message from a file-like object."""
header = b""
while not header.endswith(b"\r\n\r\n"):
ch = fileobj.read(1)
if not ch:
raise EOFError("Stream closed")
header += ch
content_length = 0
for line in header.split(b"\r\n"):
if line.lower().startswith(b"content-length:"):
content_length = int(line.split(b":")[1].strip())
return fileobj.read(content_length)
def _frame(body: bytes) -> bytes:
return f"Content-Length: {len(body)}\r\n\r\n".encode() + body
def _inject_completions(msg: dict[str, Any]) -> bytes:
"""Inject LRU modules into completion responses."""
result = msg.get("result")
if result is None:
return json.dumps(msg).encode()
items: list | None = None
if isinstance(result, list):
items = result
elif isinstance(result, dict) and "items" in result:
items = result["items"]
if items is None:
return json.dumps(msg).encode()
lru_items = BicepModuleCatalog.as_completion_items()
if lru_items:
for item in items:
st = item.get("sortText", item.get("label", ""))
item["sortText"] = f"1_az_{st}"
if isinstance(result, list):
msg["result"] = lru_items + items
else:
result["items"] = lru_items + items
result["isIncomplete"] = True
return json.dumps(msg).encode()
def _client_to_ls(
    conn_file,
    proc_stdin,
) -> None:
    """Pump LSP messages from the client stream into the LS stdin pipe.

    Each message read from ``conn_file`` is re-framed, written to
    ``proc_stdin`` and flushed. The loop ends on the first read or write
    failure; ``proc_stdin`` is closed on the way out in every case.
    """
    try:
        while True:
            payload = _read_message(conn_file)
            logger.debug("Client→LS: %d bytes", len(payload))
            packet = _frame(payload)
            proc_stdin.write(packet)
            proc_stdin.flush()
            logger.debug("Client→LS: flushed")
    except EOFError:
        logger.debug("Client write side closed — signalling EOF to LS")
    except Exception as err:
        logger.debug("Client→LS error: %s", err)
    finally:
        # Half-close only: closing stdin tells the LS no more input is
        # coming, while it remains free to keep sending responses. The
        # process itself is not killed here.
        try:
            proc_stdin.close()
        except Exception:
            pass
def _ls_to_client(
    proc_stdout,
    conn: socket.socket,
) -> None:
    """Pump LSP messages from the LS stdout pipe to the client socket.

    Every message is decoded and passed through ``_inject_completions``
    before being re-framed and sent. If decoding or injection fails for any
    reason, the original bytes are forwarded untouched so that one odd
    message cannot drop a response or end the session.

    The loop ends when the LS closes its stdout or when sending to the
    client fails.

    Args:
        proc_stdout: Binary stream connected to the language server's stdout.
        conn: Connected client socket.
    """
    try:
        while True:
            body = _read_message(proc_stdout)
            logger.debug("LS→Client: %d bytes", len(body))
            try:
                out = _inject_completions(json.loads(body))
            except json.JSONDecodeError:
                out = body
            except Exception as exc:
                # Stay transparent: rewriting is optional, delivery is not.
                logger.warning(
                    "LS→Client: completion injection failed, "
                    "forwarding message unmodified: %s",
                    exc,
                )
                out = body
            conn.sendall(_frame(out))
    except EOFError:
        logger.debug("Bicep LS stdout closed")
    except Exception as exc:
        logger.debug("LS→Client error: %s", exc)
def _handle_client(conn: socket.socket, addr: tuple) -> None:
    """Serve one editor connection with a dedicated Bicep LS subprocess.

    Starts ``dotnet <BICEP_LS_PATH> --stdio``, bridges the socket and the
    process pipes with two daemon threads, waits for the LS→client direction
    to finish, then reaps the process and releases the connection.

    Args:
        conn: Accepted client socket; closed by this function in all cases.
        addr: Peer address, used for logging only.
    """
    logger.info("New Bicep client: %s", addr)
    try:
        proc = subprocess.Popen(
            ["dotnet", BICEP_LS_PATH, "--stdio"],
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
        )
    except OSError as exc:
        # e.g. `dotnet` not installed: do not leave the client hanging.
        logger.error("Failed to start Bicep LS for %s: %s", addr, exc)
        conn.close()
        return
    logger.info("Bicep LS subprocess started (pid=%d)", proc.pid)

    # Unbuffered read from the socket — critical for correct LSP framing
    conn_file = conn.makefile("rb", buffering=0)

    # t1: client → LS (finishes when client closes write side)
    t1 = threading.Thread(
        target=_client_to_ls,
        args=(conn_file, proc.stdin),
        daemon=True,
    )
    # t2: LS → client (finishes when LS closes stdout)
    t2 = threading.Thread(
        target=_ls_to_client,
        args=(proc.stdout, conn),
        daemon=True,
    )
    t1.start()
    t2.start()

    try:
        # Session ends when LS is done (not when client closes write side)
        t2.join()
    finally:
        # Give the LS a moment to exit by itself, then escalate. Always
        # wait() afterwards so the child is reaped and not left a zombie.
        try:
            proc.wait(timeout=3)
        except subprocess.TimeoutExpired:
            proc.terminate()
            try:
                proc.wait(timeout=3)
            except subprocess.TimeoutExpired:
                proc.kill()
                proc.wait()

        # conn.close() alone does not release the socket while the makefile
        # object is still open, so t1 would stay blocked in its read and the
        # client would never see the disconnect. shutdown() ends both.
        try:
            conn.shutdown(socket.SHUT_RDWR)
        except OSError:
            pass  # peer already gone
        t1.join(timeout=1)

        for closable in (conn_file, conn, proc.stdout):
            try:
                closable.close()
            except OSError:
                pass
        logger.info("Bicep client %s disconnected", addr)
def serve_bicep(port: int) -> None:
    """Blocking TCP server — run in a daemon thread alongside asyncio.

    Listens on all interfaces and hands every accepted connection to
    ``_handle_client`` on its own daemon thread. Does not return under
    normal operation.

    Args:
        port: TCP port to listen on.

    Raises:
        OSError: If the listening socket cannot be bound or put into
            listening mode (for example, the port is already in use).
    """
    import time  # local import: only needed for the error back-off below

    # The context manager releases the listening socket if setup fails or
    # the loop is ever left through an uncaught exception.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server.bind(("0.0.0.0", port))
        server.listen(10)
        logger.info("Bicep LSP proxy listening on TCP :%d", port)

        while True:
            try:
                conn, addr = server.accept()
            except Exception as exc:
                logger.error("Accept error: %s", exc)
                # A persistent failure (e.g. out of file descriptors) would
                # otherwise spin this loop at full speed.
                time.sleep(0.1)
                continue

            try:
                threading.Thread(
                    target=_handle_client,
                    args=(conn, addr),
                    daemon=True,
                ).start()
            except Exception as exc:
                logger.error(
                    "Failed to start handler thread for %s: %s", addr, exc
                )
                conn.close()