26 lines
699 B
Python
26 lines
699 B
Python
import json
|
|
|
|
from fastapi import WebSocket
|
|
|
|
|
|
class ConnectionManager:
    """Registry of live WebSocket connections, indexed by BLE token."""

    def __init__(self) -> None:
        # Maps token -> WebSocket; a token holds at most one socket at a time.
        self._connections: dict[str, WebSocket] = {}

    async def connect(self, token: str, websocket: WebSocket) -> None:
        """Accept *websocket* and register it under *token*.

        Any socket previously stored under the same token is replaced in the
        registry; it is not closed here.
        """
        await websocket.accept()
        self._connections[token] = websocket

    def disconnect(self, token: str) -> None:
        """Remove the socket registered under *token*, if there is one."""
        try:
            del self._connections[token]
        except KeyError:
            # Unknown token: nothing to remove.
            pass

    async def send(self, token: str, message: dict) -> None:
        """Send *message*, serialized as JSON text, to the socket for *token*.

        Does nothing when no socket is registered under *token*.
        """
        target = self._connections.get(token)
        if target is None:
            return
        payload = json.dumps(message)
        await target.send_text(payload)
|
|
|
|
|
|
# Module-level ConnectionManager instance, created once at import time.
manager = ConnectionManager()
|