Files
SigHej/backend/app/services/session_service.py

40 lines
1.2 KiB
Python
Raw Normal View History

2026-05-12 18:21:25 +02:00
import json
import uuid
from datetime import UTC, datetime, timedelta
from app.core.config import settings
from app.core.redis import get_client
async def create_session(ble_token: str, interests: list[str]) -> dict:
    """Store an ephemeral BLE session in Redis with a TTL.

    The session is serialized as JSON and written under the key
    ``session:{ble_token}``; Redis drops it after
    ``settings.session_ttl_seconds`` seconds.

    Args:
        ble_token: Token used to build the Redis key for this session.
        interests: Interest values stored in the session payload.

    Returns:
        A dict with ``session_id`` (a freshly generated UUID4 string) and
        ``expires_at`` (a timezone-aware UTC datetime computed locally as
        creation time plus the configured TTL).
    """
    client = get_client()
    ttl_seconds = settings.session_ttl_seconds
    session_id = str(uuid.uuid4())
    # Read the clock once so the stored created_at and the returned
    # expires_at are exactly one TTL apart.
    now = datetime.now(UTC)
    payload = json.dumps({
        "session_id": session_id,
        "interests": interests,
        "created_at": now.isoformat(),
    })
    await client.setex(f"session:{ble_token}", ttl_seconds, payload)
    return {
        "session_id": session_id,
        "expires_at": now + timedelta(seconds=ttl_seconds),
    }
async def get_session(ble_token: str) -> dict | None:
    """Look up the session stored for a BLE token.

    Args:
        ble_token: Token used to build the ``session:{ble_token}`` Redis key.

    Returns:
        The decoded JSON payload, or ``None`` when Redis holds no value
        under that key (the session expired or was never created).
    """
    payload = await get_client().get(f"session:{ble_token}")
    return None if payload is None else json.loads(payload)
async def count_active_sessions() -> int:
    """Return the number of active (non-expired) sessions.

    Counts the Redis keys matching ``session:*``. The keyspace is walked
    incrementally with SCAN rather than KEYS, because KEYS examines every
    key in a single blocking command and can stall the server on a large
    database.

    Returns:
        The number of distinct ``session:*`` keys seen during the scan.
        Keys that are created or that expire while the scan is running may
        or may not be included.
    """
    client = get_client()
    # SCAN may report the same key more than once, so collect into a set
    # before counting.
    # NOTE(review): assumes get_client() returns a redis-py asyncio client
    # exposing scan_iter() as an async iterator - confirm.
    session_keys = {key async for key in client.scan_iter(match="session:*")}
    return len(session_keys)