Files
fil/scripts/analyse_familie.py
Henrik Jess Nielsen 58210207ea
Some checks failed
Deploy classify service / build-and-deploy (push) Failing after 24s
Deploy fil (kreuzberg) / deploy (push) Successful in 53s
feat: add taxonomy classify service + /classify endpoint
- scripts/taxonomy.py: shared taxonomy with 14 categories, keyword scorer
  and classify_text() function
- scripts/classify_server.py: FastAPI service — forwards to kreuzberg /extract,
  applies taxonomy, returns category/subcategory/confidence alongside full kreuzberg response
- Dockerfile.classify: lightweight Python image for classify service
- classify.nomad: Nomad job → classify.i80.dk
- .gitea/workflows/classify.yml: CI/CD pipeline (build + deploy)
- analyse_familie.py: refactored to import from taxonomy.py (no duplication)
- .gitignore: exclude dokumenter_keywords.* and extract_all.log
2026-06-05 19:57:39 +02:00

513 lines
21 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Document keyword analyser — misplacement detection across ~/Dokumenter.
Phase 1 (extract): Hits kreuzberg API for all files, saves progress to JSON.
Phase 2 (analyse): Builds folder profiles, flags files likely in the wrong folder.
Phase 3 (classify): Deterministic taxonomy classification via keyword scoring (no ML model).
Usage:
python3 analyse_familie.py extract
python3 analyse_familie.py analyse
python3 analyse_familie.py classify
python3 analyse_familie.py extract --workers 6 --output my_results.json
python3 analyse_familie.py analyse --threshold 0.25
python3 analyse_familie.py classify --results dokumenter_keywords.json
"""
import csv
import json
import argparse
import re
import time
from pathlib import Path
from collections import defaultdict, Counter
from concurrent.futures import ThreadPoolExecutor, as_completed
import urllib.request
import urllib.error
# Endpoint every file is POSTed to for extraction.
API_URL = "https://check.i80.dk/extract"
# Defaults for the CLI: root directory to scan and the results/progress file.
DEFAULT_DIR = "/home/hjess/Dokumenter"
DEFAULT_OUTPUT = "dokumenter_keywords.json"
# Lower-cased file suffixes that are sent for extraction; everything else is skipped.
SUPPORTED_EXTS = {".pdf", ".doc", ".docx", ".odt", ".ods", ".ppt", ".pptx", ".txt", ".rtf"}

# Generic words that don't help categorise documents
STOPWORDS = {
    # Danish function words
    "den", "det", "der", "som", "til", "fra", "med", "for", "og", "i", "er",
    # NOTE(review): the empty string on the next line looks like a word that
    # lost its characters somewhere — confirm what it was meant to be.
    "en", "et", "af", "", "at", "de", "har", "ikke", "vi", "hun", "han",
    "skal", "kan", "var", "men", "når", "også", "bare", "blev",
    # Form field noise
    "navn", "dato", "side", "fulde", "nummer", "adresse", "telefon", "email",
    "cpr", "cpr-nr", "postnr", "underskrift", "forælder", "barnet", "oplysninger",
    "telefonnummer", "telefonnummer i dagtimerne",
    # Email/phone footer boilerplate
    "sendt fra", "sendt fra min", "fra min", "fra min iphone",
    "min iphone", "min iphone den", "iphone den",
    "skrev henrik", "skrev henrik jess", "henrik jess", "henrik",
    "ganstar nielsen wrote", "nielsen wrote",
    # Names — too generic across this specific corpus
    "ganstar", "nielsen", "ganstar nielsen", "tanya ganstar", "tanya ganstar nielsen",
    "henrik jess nielsen", "jess nielsen",
    # Kaiten mail system noise
    "kaiten", "kaiten mail", "med kaiten mail", "via kaiten",
}

# NOTE(review): not referenced by any code visible in this file — confirm it is still needed.
SIDER_PATTERN = "Aktindsigt/Sider"
SIDER_SAMPLE = 20  # how many files to sample from each bulk directory
BULK_DIR_THRESHOLD = 50  # dirs with at least this many files get sampled instead of fully processed
# ---------------------------------------------------------------------------
# HTTP helper (stdlib only — no requests dependency)
# ---------------------------------------------------------------------------
def _post_multipart(url, filepath: Path, config: dict, timeout=90) -> dict | None:
"""POST a file as multipart/form-data using stdlib only. Retries on 502."""
boundary = "----KreuzbergBoundary7MA4YWxkTrZu"
config_json = json.dumps(config).encode()
with open(filepath, "rb") as fh:
file_data = fh.read()
body = (
f"--{boundary}\r\n"
f'Content-Disposition: form-data; name="files"; filename="{filepath.name}"\r\n'
f"Content-Type: application/octet-stream\r\n\r\n"
).encode() + file_data + (
f"\r\n--{boundary}\r\n"
f'Content-Disposition: form-data; name="config"\r\n\r\n'
).encode() + config_json + f"\r\n--{boundary}--\r\n".encode()
req = urllib.request.Request(url, data=body)
req.add_header("Content-Type", f"multipart/form-data; boundary={boundary}")
req.add_header("Content-Length", str(len(body)))
retries = 3
for attempt in range(retries):
try:
with urllib.request.urlopen(req, timeout=timeout) as resp:
return json.loads(resp.read().decode())
except urllib.error.HTTPError as e:
if e.code == 502 and attempt < retries - 1:
wait = 5 * (2 ** attempt) # 5s, 10s, 20s
time.sleep(wait)
continue
return {"error": f"HTTP {e.code}"}
except Exception as e:
return {"error": str(e)}
return {"error": "HTTP 502 (max retries exceeded)"}
# ---------------------------------------------------------------------------
# Extraction
# ---------------------------------------------------------------------------
def extract_file(filepath: Path, max_keywords: int = 15) -> dict:
    """Send one file to the extraction API and return a flat result record.

    The record always has the keys ``file``, ``keywords`` (list of
    ``(text, score)`` pairs, score rounded to 4 decimals), ``languages``,
    ``content_length`` and ``error`` (``None`` on success).
    """
    record = {
        "file": str(filepath),
        "keywords": [],
        "languages": [],
        "content_length": 0,
        "error": None,
    }
    request_config = {"keywords": {"algorithm": "yake", "max_keywords": max_keywords}}
    response = _post_multipart(API_URL, filepath, request_config)

    # No response at all.
    if response is None:
        record["error"] = "unknown"
        return record
    # The HTTP helper reports failures as a dict carrying an "error" key.
    if isinstance(response, dict) and "error" in response:
        record["error"] = response["error"]
        return record

    # The API may answer with one object or a list of them; only the first is used.
    candidates = response if isinstance(response, list) else [response]
    if not candidates:
        record["error"] = "empty response"
        return record

    first = candidates[0]
    record["keywords"] = [
        (item["text"], round(item["score"], 4))
        for item in (first.get("extracted_keywords") or [])
    ]
    record["languages"] = first.get("detected_languages") or []
    record["content_length"] = len(first.get("content") or "")
    return record
def collect_files(directory: Path, bulk_threshold: int = BULK_DIR_THRESHOLD) -> tuple[list[Path], list[Path]]:
    """Split the supported files under *directory* into regular files and bulk dirs.

    A directory is "bulk" when it directly contains at least *bulk_threshold*
    supported files. Files inside a bulk directory, or inside any directory
    below one, are left out of the regular list, and a directory nested in a
    bulk directory is never reported as bulk itself.

    Args:
        directory: Root directory, scanned recursively.
        bulk_threshold: Minimum number of direct files that makes a directory bulk.

    Returns:
        ``(regular_files, bulk_dirs)``.
    """
    # First pass: group supported files by the directory that directly holds them.
    dir_files: dict[Path, list[Path]] = defaultdict(list)
    for f in sorted(directory.rglob("*")):
        if f.is_file() and f.suffix.lower() in SUPPORTED_EXTS:
            dir_files[f.parent].append(f)

    # Second pass: decide bulk status shallowest-first, so an ancestor is
    # always classified before its descendants regardless of how file names
    # happen to sort relative to sub-directory names.
    bulk_dirs: list[Path] = []
    bulk_set: set[Path] = set()
    for parent in sorted(dir_files, key=lambda p: (len(p.parts), p)):
        if not bulk_set.isdisjoint(parent.parents):
            continue  # already covered by a bulk ancestor
        if len(dir_files[parent]) >= bulk_threshold:
            bulk_dirs.append(parent)
            bulk_set.add(parent)

    # Third pass: keep the original (path-sorted) order for regular files.
    regular: list[Path] = []
    for parent, files in dir_files.items():
        if parent in bulk_set or not bulk_set.isdisjoint(parent.parents):
            continue
        regular.extend(files)
    return regular, bulk_dirs
def analyse_bulk_group(bulk_dir: Path) -> dict:
    """Summarise a large directory by extracting keywords from its first few files.

    The first ``SIDER_SAMPLE`` supported files (in sorted path order, found
    recursively) are extracted; the result is one group record whose keyword
    scores are the share of sampled files in which each keyword appeared.
    """
    candidates = []
    for entry in sorted(bulk_dir.rglob("*")):
        if entry.is_file() and entry.suffix.lower() in SUPPORTED_EXTS:
            candidates.append(entry)
    total = len(candidates)
    sample = candidates[:SIDER_SAMPLE]
    print(f" Sampling {len(sample)}/{total} files from {bulk_dir.name}/…")

    # Count, per lower-cased keyword, how many extracted keyword entries mention it.
    tally: Counter = Counter()
    for path in sample:
        extracted = extract_file(path, max_keywords=10)
        tally.update(text.lower() for text, _ in extracted["keywords"])

    top_keywords = [
        (text, round(hits / len(sample), 3))
        for text, hits in tally.most_common(15)
    ]
    return {
        "file": f"{bulk_dir}/ [GROUP — {total} files, sampled {len(sample)}]",
        "keywords": top_keywords,
        "languages": ["dan"],
        "content_length": -1,  # marker value; group records have no single content length
        "is_group": True,
        "error": None,
    }
def run_extract(directory: str, output_file: str, workers: int, bulk_threshold: int = BULK_DIR_THRESHOLD) -> None:
    """Phase 1: extract keywords for every supported file and save them as JSON.

    Progress is written to *output_file* every 20 files and at the end, and an
    existing *output_file* is used to resume: records already present are
    skipped unless their error is a transient HTTP 502.

    Args:
        directory: Root directory to scan.
        output_file: JSON file used both as progress store and final output.
        workers: Number of parallel extraction threads.
        bulk_threshold: Passed to ``collect_files``; directories at or above
            this size are sampled as one group instead of processed per file.
    """
    dir_path = Path(directory)
    out_path = Path(output_file)
    files, bulk_dirs = collect_files(dir_path, bulk_threshold=bulk_threshold)

    # Resume from existing output: keep successful results and permanent
    # errors (422 etc.); drop transient server errors so they are retried.
    results: dict[str, dict] = {}
    retriable_errors = {"HTTP 502", "HTTP 502 (max retries exceeded)"}
    if out_path.exists():
        # UTF-8 explicitly: the file holds unescaped non-ASCII text.
        with open(out_path, encoding="utf-8") as fh:
            for r in json.load(fh):
                if r.get("error") in retriable_errors:
                    continue
                results[r["file"]] = r
        print(f"Resuming — {len(results)} files already done")

    todo = [f for f in files if str(f) not in results]
    total = len(todo)
    print(f"Files to process: {total} (skipping {len(files) - total} already done)")
    if bulk_dirs:
        print(f"Bulk directories (sampled): {[str(d) for d in bulk_dirs]}\n")

    done = 0
    errors = 0

    def save() -> None:
        # Write to a sibling temp file and rename it over the target, so an
        # interrupted run never leaves a truncated JSON file that would
        # break the next resume.
        tmp_path = out_path.with_name(out_path.name + ".tmp")
        with open(tmp_path, "w", encoding="utf-8") as fh:
            json.dump(list(results.values()), fh, ensure_ascii=False, indent=2)
        tmp_path.replace(out_path)

    with ThreadPoolExecutor(max_workers=workers) as ex:
        futures = {ex.submit(extract_file, f): f for f in todo}
        for future in as_completed(futures):
            try:
                result = future.result()
            except Exception as exc:
                # A single unreadable file or malformed response must not
                # abort the whole run; record it as a failed file instead.
                result = {
                    "file": str(futures[future]),
                    "keywords": [],
                    "languages": [],
                    "content_length": 0,
                    "error": f"{type(exc).__name__}: {exc}",
                }
            results[result["file"]] = result
            done += 1
            if result["error"]:
                errors += 1
                print(f" ❌ [{done}/{total}] {Path(result['file']).name}: {result['error']}")
            else:
                preview = ", ".join(kw for kw, _ in result["keywords"][:3])
                print(f" ✅ [{done}/{total}] {Path(result['file']).name} → {preview}")
            if done % 20 == 0:
                save()
    save()

    # Handle bulk dirs as sampled groups
    for bulk_dir in bulk_dirs:
        # Group records are keyed "<dir>/ [GROUP — N files, sampled M]", so
        # match on the stable prefix to recognise one from an earlier run.
        group_prefix = str(bulk_dir) + "/ [GROUP"
        if any(key.startswith(group_prefix) for key in results):
            continue
        print(f"\nAnalysing bulk group: {bulk_dir.name}/")
        group = analyse_bulk_group(bulk_dir)
        results[group["file"]] = group
        save()
        print(f" Top keywords: {', '.join(kw for kw, _ in group['keywords'][:5])}")

    print(f"\n✅ Finished: {done} files, {errors} errors → {out_path}")
    print(f" Run 'python3 {__file__} analyse' to find misplaced files")
# ---------------------------------------------------------------------------
# Phase 2: Misplacement detection
# ---------------------------------------------------------------------------
def _clean_keywords(raw: list[tuple[str, float]]) -> list[str]:
    """Return the normalised keyword texts worth comparing between documents.

    Each keyword text is lower-cased and stripped; keywords shorter than four
    characters after normalisation, or listed in ``STOPWORDS``, are dropped.
    Scores are ignored. ``None`` or an empty list yields ``[]``.
    """
    # Normalise first so the length filter sees the same text the stop-word
    # filter does (surrounding whitespace must not count towards the length).
    normalised = (kw.lower().strip() for kw, _ in (raw or []))
    return [kw for kw in normalised if len(kw) >= 4 and kw not in STOPWORDS]
def _folder_key(filepath: Path, base: Path) -> str:
    """Return the folder of *filepath* relative to *base* (e.g. 'Privat/Økonomi/Gæld').

    A file sitting directly in *base* yields ``"."``; a file that is not
    located under *base* yields the empty string.
    """
    try:
        relative = filepath.relative_to(base)
    except ValueError:
        # relative_to() rejects paths that are not under base.
        return ""
    return str(relative.parent)
def run_analyse(results_file: str, base_dir: str, threshold: float, min_folder_docs: int) -> None:
    """
    Build a keyword profile per folder, then flag files whose keywords
    don't overlap well with their current folder.

    A file is flagged when the share of its keywords that occur at least
    twice in its folder's profile is below *threshold*. Only folders holding
    at least *min_folder_docs* documents are judged or suggested. A summary
    of the 40 lowest-overlap files is printed and every flagged file is
    written to a CSV next to *results_file* (suffix ``.misplaced.csv``).

    Args:
        results_file: JSON file produced by the extract phase.
        base_dir: Root directory the folder names are made relative to.
        threshold: Overlap (0..1) below which a file is flagged.
        min_folder_docs: Minimum documents for a folder to act as a reference.
    """
    # UTF-8 explicitly: the results hold unescaped non-ASCII text, and the
    # classify phase reads the same file as UTF-8.
    with open(results_file, encoding="utf-8") as fh:
        results = json.load(fh)
    base = Path(base_dir)
    separator = "─" * 65
    print(f"Analysing {len(results)} document records…")

    # Group records summarise a whole directory and are not individual files.
    documents = [r for r in results if not r.get("is_group")]

    # Build folder profiles (folder → Counter(keyword → freq)) and count
    # documents per folder in a single pass.
    folder_profiles: dict[str, Counter] = defaultdict(Counter)
    folder_doc_counts: Counter = Counter()
    file_kws: dict[str, list[str]] = {}
    for r in documents:
        kws = _clean_keywords(r.get("keywords") or [])
        file_kws[r["file"]] = kws
        folder = _folder_key(Path(r["file"]), base)
        folder_doc_counts[folder] += 1
        if folder:
            folder_profiles[folder].update(kws)

    # Filter out thin folders (too few docs to be meaningful)
    valid_folders = {f for f, c in folder_doc_counts.items() if c >= min_folder_docs}
    print(f"Folder profiles built: {len(valid_folders)} folders with {min_folder_docs}+ documents\n")

    # Suggestion candidates, sorted so ties are broken the same way on every
    # run. The root (".") and "outside base" ("") are never a sensible target.
    candidate_folders = sorted(valid_folders - {"", "."})

    def _score(folder_name: str, kws: list[str]) -> int:
        """Sum how often *kws* occur in the profile of *folder_name*."""
        profile = folder_profiles[folder_name]
        return sum(profile[kw] for kw in kws)

    # Score each file against its own folder, flag low-overlap files
    misplaced: list[dict] = []
    unclassified: list[dict] = []
    for r in documents:
        kws = file_kws.get(r["file"], [])
        if not kws:
            continue
        fp = Path(r["file"])
        folder = _folder_key(fp, base)
        if folder in ("", "."):
            # File sits directly in the root — needs a home
            unclassified.append(r)
            continue
        if folder not in valid_folders:
            continue  # too few documents to judge this folder

        profile = folder_profiles[folder]
        # Overlap: share of file's keywords that appear ≥2 times in folder profile
        shared = sum(1 for kw in kws if profile[kw] >= 2)
        overlap = shared / len(kws)
        if overlap >= threshold:
            continue

        # Find best matching alternative folder
        best_folder = max(
            (c for c in candidate_folders if c != folder),
            key=lambda fd: _score(fd, kws),
            default=None,
        )
        best_score = _score(best_folder, kws) if best_folder is not None else 0

        # Skip if both current and suggested are generic "Ukendt" dump folders —
        # moving Ukendt/PDF/2011 → Ukendt/PDF/2009 is not an improvement
        if "Ukendt" in folder and best_folder is not None and "Ukendt" in best_folder:
            continue

        misplaced.append({
            "file": r["file"],
            "filename": fp.name,
            "current_folder": folder,
            "overlap": round(overlap, 2),
            "suggested_folder": best_folder or "",
            "suggestion_score": best_score,
            "top_keywords": "; ".join(kws[:6]),
        })
    misplaced.sort(key=lambda x: x["overlap"])

    # Print summary
    print(separator)
    print(f"Potentially misplaced: {len(misplaced)} files (overlap < {threshold:.0%})\n")
    for m in misplaced[:40]:
        print(f" 📄 {m['filename']}")
        print(f" Current: {m['current_folder']}")
        print(f" Suggested: {m['suggested_folder']} (overlap={m['overlap']:.0%})")
        print(f" Keywords: {m['top_keywords']}")
        print()
    if len(misplaced) > 40:
        print(f" … and {len(misplaced) - 40} more — see CSV for full list\n")
    if unclassified:
        print(f"Unclassified (not inside any sub-folder): {len(unclassified)} files\n")

    # Save CSV for easy review / filtering in a spreadsheet
    csv_path = Path(results_file).with_suffix(".misplaced.csv")
    with open(csv_path, "w", newline="", encoding="utf-8") as csvf:
        writer = csv.DictWriter(
            csvf,
            fieldnames=["filename", "current_folder", "overlap", "suggested_folder",
                        "suggestion_score", "top_keywords", "file"],
        )
        writer.writeheader()
        writer.writerows(misplaced)
    print(separator)
    print(f"✅ Saved {len(misplaced)} misplaced records → {csv_path}")
    print(" Open in LibreOffice Calc, sort by 'overlap' asc to prioritise.")
# ---------------------------------------------------------------------------
# Phase 3: taxonomy classification via keyword scoring
# ---------------------------------------------------------------------------
# Taxonomy data and scorer live in taxonomy.py (shared with classify_server.py).
import sys as _sys
import os as _os
_sys.path.insert(0, _os.path.dirname(__file__))
from taxonomy import TAXONOMY, TAXONOMY_TO_FOLDER, keyword_score as _keyword_score # noqa: E402
def run_classify(results_file: str, base_dir: str = DEFAULT_DIR, min_score: float = 1.5) -> None:
    """Phase 3: classify each document into a taxonomy category using keyword scoring.

    Scoring is deterministic: each category has a weighted keyword list; the
    document text (filename + folder path + YAKE keywords) is scored against
    every category and the highest score wins. No ML model required.

    Documents whose best score is below *min_score* are labelled ``"Ukendt"``.
    All rows are written, lowest score first, to a CSV next to *results_file*
    (suffix ``.classified.csv``) and a label distribution is printed.

    Args:
        results_file: JSON file produced by the extract phase.
        base_dir: Root directory the ``current_folder`` column is relative to.
        min_score: Minimum score required to assign a taxonomy label.
    """
    results_path = Path(results_file)
    if not results_path.exists():
        print(f"❌ Results file not found: {results_file}")
        print(" Run 'extract' phase first.")
        return
    with open(results_path, encoding="utf-8") as fh:
        results: list[dict] = json.load(fh)

    # Include files even without YAKE keywords — filename+folder alone can classify
    classifiable = [r for r in results if not r.get("is_group") and not r.get("error")]
    print(f"Classifying {len(classifiable)} documents…")
    base = Path(base_dir)
    separator = "─" * 65

    output_rows: list[dict] = []
    for r in classifiable:
        fp = Path(r["file"])
        # Tolerate a missing or null keyword list.
        keywords = r.get("keywords") or []

        # Build document text from filename tokens + full folder path + YAKE keywords
        stem_tokens = re.sub(r"[-_.]", " ", fp.stem).lower()
        folder_tokens = " ".join(fp.parent.parts).lower()
        kw_text = " ".join(kw for kw, _ in keywords)
        doc_text = f"{stem_tokens} {folder_tokens} {kw_text}"

        scores: dict[str, float] = {
            cat: _keyword_score(doc_text, kws)
            for cat, kws in TAXONOMY.items()
        }
        # One stable descending sort gives both winner and runner-up; ties
        # keep taxonomy order, so the winner is the same one max() would pick.
        ranked = sorted(scores, key=lambda c: scores[c], reverse=True)
        best_label = ranked[0]
        best_score = scores[best_label]
        runner_up = ranked[1] if len(ranked) > 1 else ""
        runner_score = scores[runner_up] if runner_up else 0.0

        label = best_label if best_score >= min_score else "Ukendt"
        suggested_folder = TAXONOMY_TO_FOLDER.get(label, "") if label != "Ukendt" else ""
        output_rows.append({
            "filename": fp.name,
            "current_folder": _folder_key(fp, base),
            "taxonomy_label": label,
            "confidence": best_score,
            "runner_up": runner_up,
            "runner_up_score": runner_score,
            "suggested_folder": suggested_folder,
            "top_keywords": "; ".join(kw for kw, _ in keywords[:6]),
            "file": r["file"],
        })

    # Sort by confidence ascending — lowest confidence = needs most attention
    output_rows.sort(key=lambda x: x["confidence"])

    csv_path = results_path.with_suffix(".classified.csv")
    with open(csv_path, "w", newline="", encoding="utf-8") as csvf:
        writer = csv.DictWriter(
            csvf,
            fieldnames=["filename", "current_folder", "taxonomy_label", "confidence",
                        "runner_up", "runner_up_score", "suggested_folder", "top_keywords", "file"],
        )
        writer.writeheader()
        writer.writerows(output_rows)

    # Print distribution summary
    label_counts: Counter = Counter(r["taxonomy_label"] for r in output_rows)
    peak = max(label_counts.values(), default=1)
    print(f"\n{separator}")
    print(f"Taxonomy distribution ({len(output_rows)} documents):\n")
    for label, count in label_counts.most_common():
        # Bar length is proportional to the count; the largest label gets 30 cells.
        bar = "█" * (count * 30 // peak)
        print(f" {label:<30} {count:>4} {bar}")
    low_conf = sum(1 for r in output_rows if r["confidence"] < min_score)
    print(f"\n Low score (<{min_score}): {low_conf} files → labelled 'Ukendt'")
    print(f"\n{separator}")
    print(f"✅ Saved {len(output_rows)} classifications → {csv_path}")
    print(" Columns: filename, current_folder, taxonomy_label, confidence, suggested_folder")
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Document keyword extraction + misplacement detector",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=__doc__,
)
sub = parser.add_subparsers(dest="cmd", required=True)
ep = sub.add_parser("extract", help="Phase 1: extract keywords from all files")
ep.add_argument("--dir", default=DEFAULT_DIR, help="Root directory to scan")
ep.add_argument("--output", default=DEFAULT_OUTPUT, help="Output JSON file")
ep.add_argument("--workers", type=int, default=3, help="Parallel API workers (default 3 — keep low to avoid 502s)")
ep.add_argument("--bulk-threshold", type=int, default=BULK_DIR_THRESHOLD,
help=f"Dirs with this many files are sampled instead of fully processed (default {BULK_DIR_THRESHOLD}). Use 999999 to process all files.")
ap = sub.add_parser("analyse", help="Phase 2: find misplaced files using folder profiles")
ap.add_argument("--results", default=DEFAULT_OUTPUT, help="JSON file from extract phase")
ap.add_argument("--dir", default=DEFAULT_DIR, help="Root directory (same as extract)")
ap.add_argument("--threshold", type=float, default=0.25,
help="Overlap threshold below which a file is flagged (default 0.25 = 25%%)")
ap.add_argument("--min-folder-docs", type=int, default=5,
help="Minimum docs in a folder to be used as a reference profile (default 5)")
cp = sub.add_parser("classify", help="Phase 3: keyword-scoring taxonomy classification")
cp.add_argument("--results", default=DEFAULT_OUTPUT, help="JSON file from extract phase")
cp.add_argument("--dir", default=DEFAULT_DIR, help="Root directory (same as extract)")
cp.add_argument("--min-score", type=float, default=1.5,
help="Minimum keyword score to assign a label (default 1.5). Below this → 'Ukendt'.")
args = parser.parse_args()
if args.cmd == "extract":
run_extract(args.dir, args.output, args.workers, args.bulk_threshold)
elif args.cmd == "analyse":
run_analyse(args.results, args.dir, args.threshold, args.min_folder_docs)
else:
run_classify(args.results, args.dir, args.min_score)