""" Thread-based TCP proxy that wraps Bicep.LangServer.dll. Architecture: Editor (TCP:2088) ──► BicepProxy ──► Bicep.LangServer subprocess (stdio) Uses subprocess.Popen + threads instead of asyncio subprocess — far more reliable for stdin/stdout bridging of long-lived processes. Intercepts textDocument/completion responses and injects LRU Bicep module completions with higher sort priority (sortText "0_lru_..."). All other LSP messages are forwarded unchanged. """ import json import logging import os import socket import subprocess import threading from typing import Any from .modules import BicepModuleCatalog logger = logging.getLogger(__name__) BICEP_LS_PATH = os.getenv( "BICEP_LS_PATH", "/opt/bicep-langserver/Bicep.LangServer.dll", ) def _read_message(fileobj) -> bytes: """Read one LSP Content-Length framed message from a file-like object.""" header = b"" while not header.endswith(b"\r\n\r\n"): ch = fileobj.read(1) if not ch: raise EOFError("Stream closed") header += ch content_length = 0 for line in header.split(b"\r\n"): if line.lower().startswith(b"content-length:"): content_length = int(line.split(b":")[1].strip()) return fileobj.read(content_length) def _frame(body: bytes) -> bytes: return f"Content-Length: {len(body)}\r\n\r\n".encode() + body def _inject_completions(msg: dict[str, Any]) -> bytes: """Inject LRU modules into completion responses.""" result = msg.get("result") if result is None: return json.dumps(msg).encode() items: list | None = None if isinstance(result, list): items = result elif isinstance(result, dict) and "items" in result: items = result["items"] if items is None: return json.dumps(msg).encode() lru_items = BicepModuleCatalog.as_completion_items() if lru_items: for item in items: st = item.get("sortText", item.get("label", "")) item["sortText"] = f"1_az_{st}" if isinstance(result, list): msg["result"] = lru_items + items else: result["items"] = lru_items + items result["isIncomplete"] = True return json.dumps(msg).encode() def _client_to_ls( conn_file, proc_stdin, ) -> None: try: while True: body = _read_message(conn_file) logger.debug("Client→LS: %d bytes", len(body)) framed = _frame(body) proc_stdin.write(framed) proc_stdin.flush() logger.debug("Client→LS: flushed") except EOFError: logger.debug("Client write side closed — signalling EOF to LS") except Exception as exc: logger.debug("Client→LS error: %s", exc) finally: # Half-close: tell the LS that no more input is coming. # The LS may still send responses, so we don't kill it here. try: proc_stdin.close() except Exception: pass def _ls_to_client( proc_stdout, conn: socket.socket, ) -> None: try: while True: body = _read_message(proc_stdout) logger.debug("LS→Client: %d bytes", len(body)) try: out = _inject_completions(json.loads(body)) except json.JSONDecodeError: out = body conn.sendall(_frame(out)) except EOFError: logger.debug("Bicep LS stdout closed") except Exception as exc: logger.debug("LS→Client error: %s", exc) def _handle_client(conn: socket.socket, addr: tuple) -> None: logger.info("New Bicep client: %s", addr) proc = subprocess.Popen( ["dotnet", BICEP_LS_PATH, "--stdio"], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, ) logger.info("Bicep LS subprocess started (pid=%d)", proc.pid) # Unbuffered read from the socket — critical for correct LSP framing conn_file = conn.makefile("rb", buffering=0) # t1: client → LS (finishes when client closes write side) t1 = threading.Thread( target=_client_to_ls, args=(conn_file, proc.stdin), daemon=True, ) # t2: LS → client (finishes when LS closes stdout) t2 = threading.Thread( target=_ls_to_client, args=(proc.stdout, conn), daemon=True, ) t1.start() t2.start() # Session ends when LS is done (not when client closes write side) t2.join() try: proc.wait(timeout=3) except Exception: proc.terminate() try: conn.close() except Exception: pass logger.info("Bicep client %s disconnected", addr) def serve_bicep(port: int) -> None: """Blocking TCP server — run in a daemon thread alongside asyncio.""" server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) server.bind(("0.0.0.0", port)) server.listen(10) logger.info("Bicep LSP proxy listening on TCP :%d", port) while True: try: conn, addr = server.accept() threading.Thread( target=_handle_client, args=(conn, addr), daemon=True, ).start() except Exception as exc: logger.error("Accept error: %s", exc)