Files
fil/scripts/classify_server.py

118 lines
3.7 KiB
Python
Raw Normal View History

"""classify_server.py — FastAPI service that adds taxonomy classification to kreuzberg /extract.
Exposes POST /classify with the same multipart interface as kreuzberg /extract and
returns the full kreuzberg response plus category/subcategory/confidence fields.
Usage:
uvicorn scripts.classify_server:app --host 0.0.0.0 --port 8001
"""
from __future__ import annotations
import json
import os
from contextlib import asynccontextmanager
from typing import Annotated
import httpx
from fastapi import FastAPI, File, Form, UploadFile
from fastapi.responses import JSONResponse
from taxonomy import classify_text
# Base URL of the upstream kreuzberg service. An unset or empty environment
# variable falls back to the default, and trailing slashes are stripped so
# that f"{KREUZBERG_URL}/extract" never produces a double slash.
KREUZBERG_URL = (os.getenv("KREUZBERG_URL") or "https://check.i80.dk").rstrip("/")

# Default kreuzberg config sent with every /extract call: request YAKE keyword
# extraction with at most 15 keywords.
YAKE_CONFIG = {"keywords": {"algorithm": "yake", "max_keywords": 15}}
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Create one shared httpx.AsyncClient for the lifetime of the application.

    The client (60 second timeout) is stored on ``app.state.client`` before the
    application starts serving. Using ``async with`` guarantees the client is
    closed on shutdown even when an exception is thrown into this generator
    at the ``yield``.
    """
    async with httpx.AsyncClient(timeout=60.0) as client:
        app.state.client = client
        yield
# ASGI application object; `lifespan` is registered as the lifespan handler.
app = FastAPI(
    lifespan=lifespan,
    title="kreuzberg-classify",
    version="1.0.0",
    description="Taxonomy classification on top of kreuzberg /extract",
)
@app.get("/health")
async def health():
    """Return a static health status together with the configured kreuzberg URL.

    No request is made to kreuzberg; this only reports that this service is up
    and which upstream URL it is configured to use.
    """
    payload = {"status": "healthy"}
    payload["kreuzberg_url"] = KREUZBERG_URL
    return payload
def _merge_config(config: str | None) -> dict:
    """Return the YAKE defaults overlaid with the caller's JSON config, if usable."""
    import logging  # function-scope import keeps this block self-contained

    merged = dict(YAKE_CONFIG)
    if not config:
        return merged
    try:
        caller_cfg = json.loads(config)
    except json.JSONDecodeError:
        # Best-effort: a malformed config must not fail the request, but it
        # should be visible in the logs rather than silently dropped.
        logging.getLogger(__name__).warning(
            "ignoring 'config' form field: not valid JSON"
        )
        return merged
    if isinstance(caller_cfg, dict):
        # Top-level keys from the caller replace the defaults.
        merged.update(caller_cfg)
    else:
        # Valid JSON that is not an object (list, string, null, ...) cannot be
        # merged into a dict; dict.update() would raise on it.
        logging.getLogger(__name__).warning(
            "ignoring 'config' form field: expected a JSON object"
        )
    return merged


def _keyword_strings(raw_keywords) -> list[str]:
    """Flatten a kreuzberg ``keywords`` value into a list of non-empty strings."""
    if not isinstance(raw_keywords, (list, tuple)):
        return []
    strings = []
    for item in raw_keywords:
        if isinstance(item, dict):
            text = item.get("keyword", "") or item.get("phrase", "")
        else:
            text = item
        # Entries with no usable text are dropped rather than passed on as "".
        if text:
            strings.append(str(text))
    return strings


@app.post("/classify")
async def classify(
    files: Annotated[list[UploadFile], File()],
    config: Annotated[str | None, Form()] = None,
    folder: Annotated[str | None, Form()] = None,
):
    """Extract text and keywords via kreuzberg, then classify into the taxonomy.

    Args:
        files: One or more uploaded document files, forwarded unchanged to
            kreuzberg ``/extract``.
        config: Optional JSON object for kreuzberg; its top-level keys override
            the YAKE defaults. Invalid or non-object JSON is ignored.
        folder: Optional current folder path, passed to ``classify_text`` as
            ``folder_hint`` (empty string when omitted).

    Returns:
        A list with one dict per kreuzberg result, containing every kreuzberg
        field plus whatever ``classify_text`` returns (classification keys win
        on a name clash). If the upstream call fails, the body is not JSON, or
        the payload is not a dict or a list of dicts, a 502 ``JSONResponse``
        with an ``error`` message is returned instead.
    """
    merged_config = _merge_config(config)

    # Read each upload once and build the multipart parts for the upstream call.
    upstream_files = []
    for upload in files:
        payload = await upload.read()
        mime_type = upload.content_type or "application/octet-stream"
        upstream_files.append(("files", (upload.filename, payload, mime_type)))

    try:
        response = await app.state.client.post(
            f"{KREUZBERG_URL}/extract",
            files=upstream_files,
            data={"config": json.dumps(merged_config)},
        )
        response.raise_for_status()
        kreuzberg_results = response.json()
    except (httpx.HTTPError, ValueError) as exc:
        # ValueError covers a non-JSON body from response.json(), which would
        # otherwise surface as an unhandled 500.
        return JSONResponse(status_code=502, content={"error": f"kreuzberg error: {exc}"})

    # A single result object is normalised to a one-element list.
    if isinstance(kreuzberg_results, dict):
        kreuzberg_results = [kreuzberg_results]
    if not isinstance(kreuzberg_results, list) or not all(
        isinstance(entry, dict) for entry in kreuzberg_results
    ):
        return JSONResponse(
            status_code=502,
            content={"error": "kreuzberg error: unexpected response shape"},
        )

    folder_hint = folder or ""
    results = []
    for result in kreuzberg_results:
        classification = classify_text(
            content=result.get("content", "") or "",
            keywords=_keyword_strings(result.get("keywords")),
            folder_hint=folder_hint,
        )
        results.append({**result, **classification})
    return results