asyncio subprocess PIPE unreliable for long-lived stdio bridging. Use Popen + threads instead. Also fix smoke_test.sh stdin handling.
187 lines
5.3 KiB
Python
187 lines
5.3 KiB
Python
"""
|
|
Thread-based TCP proxy that wraps Bicep.LangServer.dll.
|
|
|
|
Architecture:
|
|
Editor (TCP:2088) ──► BicepProxy ──► Bicep.LangServer subprocess (stdio)
|
|
|
|
Uses subprocess.Popen + threads instead of asyncio subprocess — far more
|
|
reliable for stdin/stdout bridging of long-lived processes.
|
|
|
|
Intercepts textDocument/completion responses and injects LRU Bicep module
|
|
completions with higher sort priority (sortText "0_lru_...").
|
|
All other LSP messages are forwarded unchanged.
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
import socket
|
|
import subprocess
|
|
import threading
|
|
from typing import Any
|
|
|
|
from .modules import BicepModuleCatalog
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
BICEP_LS_PATH = os.getenv(
|
|
"BICEP_LS_PATH",
|
|
"/opt/bicep-langserver/Bicep.LangServer.dll",
|
|
)
|
|
|
|
|
|
def _read_message(fileobj) -> bytes:
|
|
"""Read one LSP Content-Length framed message from a file-like object."""
|
|
header = b""
|
|
while not header.endswith(b"\r\n\r\n"):
|
|
ch = fileobj.read(1)
|
|
if not ch:
|
|
raise EOFError("Stream closed")
|
|
header += ch
|
|
|
|
content_length = 0
|
|
for line in header.split(b"\r\n"):
|
|
if line.lower().startswith(b"content-length:"):
|
|
content_length = int(line.split(b":")[1].strip())
|
|
|
|
return fileobj.read(content_length)
|
|
|
|
|
|
def _frame(body: bytes) -> bytes:
|
|
return f"Content-Length: {len(body)}\r\n\r\n".encode() + body
|
|
|
|
|
|
def _inject_completions(msg: dict[str, Any]) -> bytes:
|
|
"""Inject LRU modules into completion responses."""
|
|
result = msg.get("result")
|
|
if result is None:
|
|
return json.dumps(msg).encode()
|
|
|
|
items: list | None = None
|
|
if isinstance(result, list):
|
|
items = result
|
|
elif isinstance(result, dict) and "items" in result:
|
|
items = result["items"]
|
|
|
|
if items is None:
|
|
return json.dumps(msg).encode()
|
|
|
|
lru_items = BicepModuleCatalog.as_completion_items()
|
|
if lru_items:
|
|
for item in items:
|
|
st = item.get("sortText", item.get("label", ""))
|
|
item["sortText"] = f"1_az_{st}"
|
|
if isinstance(result, list):
|
|
msg["result"] = lru_items + items
|
|
else:
|
|
result["items"] = lru_items + items
|
|
result["isIncomplete"] = True
|
|
|
|
return json.dumps(msg).encode()
|
|
|
|
|
|
def _client_to_ls(
|
|
conn_file,
|
|
proc_stdin,
|
|
) -> None:
|
|
try:
|
|
while True:
|
|
body = _read_message(conn_file)
|
|
logger.debug("Client→LS: %d bytes", len(body))
|
|
framed = _frame(body)
|
|
proc_stdin.write(framed)
|
|
proc_stdin.flush()
|
|
logger.debug("Client→LS: flushed")
|
|
except EOFError:
|
|
logger.debug("Client write side closed — signalling EOF to LS")
|
|
except Exception as exc:
|
|
logger.debug("Client→LS error: %s", exc)
|
|
finally:
|
|
# Half-close: tell the LS that no more input is coming.
|
|
# The LS may still send responses, so we don't kill it here.
|
|
try:
|
|
proc_stdin.close()
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
def _ls_to_client(
|
|
proc_stdout,
|
|
conn: socket.socket,
|
|
) -> None:
|
|
try:
|
|
while True:
|
|
body = _read_message(proc_stdout)
|
|
logger.debug("LS→Client: %d bytes", len(body))
|
|
try:
|
|
out = _inject_completions(json.loads(body))
|
|
except json.JSONDecodeError:
|
|
out = body
|
|
conn.sendall(_frame(out))
|
|
except EOFError:
|
|
logger.debug("Bicep LS stdout closed")
|
|
except Exception as exc:
|
|
logger.debug("LS→Client error: %s", exc)
|
|
|
|
|
|
def _handle_client(conn: socket.socket, addr: tuple) -> None:
|
|
logger.info("New Bicep client: %s", addr)
|
|
proc = subprocess.Popen(
|
|
["dotnet", BICEP_LS_PATH, "--stdio"],
|
|
stdin=subprocess.PIPE,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.DEVNULL,
|
|
)
|
|
logger.info("Bicep LS subprocess started (pid=%d)", proc.pid)
|
|
|
|
# Unbuffered read from the socket — critical for correct LSP framing
|
|
conn_file = conn.makefile("rb", buffering=0)
|
|
|
|
# t1: client → LS (finishes when client closes write side)
|
|
t1 = threading.Thread(
|
|
target=_client_to_ls,
|
|
args=(conn_file, proc.stdin),
|
|
daemon=True,
|
|
)
|
|
# t2: LS → client (finishes when LS closes stdout)
|
|
t2 = threading.Thread(
|
|
target=_ls_to_client,
|
|
args=(proc.stdout, conn),
|
|
daemon=True,
|
|
)
|
|
t1.start()
|
|
t2.start()
|
|
# Session ends when LS is done (not when client closes write side)
|
|
t2.join()
|
|
|
|
try:
|
|
proc.wait(timeout=3)
|
|
except Exception:
|
|
proc.terminate()
|
|
try:
|
|
conn.close()
|
|
except Exception:
|
|
pass
|
|
|
|
logger.info("Bicep client %s disconnected", addr)
|
|
|
|
|
|
def serve_bicep(port: int) -> None:
|
|
"""Blocking TCP server — run in a daemon thread alongside asyncio."""
|
|
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
|
server.bind(("0.0.0.0", port))
|
|
server.listen(10)
|
|
logger.info("Bicep LSP proxy listening on TCP :%d", port)
|
|
|
|
while True:
|
|
try:
|
|
conn, addr = server.accept()
|
|
threading.Thread(
|
|
target=_handle_client,
|
|
args=(conn, addr),
|
|
daemon=True,
|
|
).start()
|
|
except Exception as exc:
|
|
logger.error("Accept error: %s", exc)
|