Nomad changes
All checks were successful
Deploy fil (kreuzberg) / deploy (push) Successful in 49s

This commit is contained in:
Henrik Jess Nielsen
2026-06-01 23:40:55 +02:00
parent 72b1a0a6ed
commit b4c07d3693
5723 changed files with 1130655 additions and 0 deletions

View File

@@ -0,0 +1,6 @@
.venv/
__pycache__/
*.egg-info/
.pytest_cache/
dist/
build/

View File

@@ -0,0 +1,82 @@
# generate-test-fixtures
Deterministic fixture-generation toolkit for kreuzberg integration tests.
Produces real on-disk DOCX / ODT / XLSX / PPTX / PDF documents that exercise
track-changes / revisions / comments / incremental-update / diff / security
code paths in `kreuzberg::extract` and `kreuzberg::diff::compare`. Every
binary fixture is paired with a `<stem>.gt.json` ground-truth sidecar that
integration tests load to assert structured expectations.
The generated fixtures fill the gap left by `test_documents/`, whose existing
corpus of ~200 real-world documents does not contain track-changes, comments,
incremental xref chains, or paired diff inputs.
## Layout
```text
tools/generate_test_fixtures/
pyproject.toml
src/generate_test_fixtures/
__init__.py
__main__.py argparse entry point
gt_schema.py GroundTruth dataclass + JSON writer
docx_revisions.py DOCX w:ins / w:del / w:rPrChange fixtures
odt_revisions.py ODT text:tracked-changes fixtures
xlsx_revisions.py XLSX xl/revisions/revisionHeaders.xml fixtures
pptx_comments.py PPTX ppt/comments/comment{N}.xml fixtures
pdf_incremental.py PDF base + incremental xref chain fixtures
diff_pairs.py paired v1/v2 inputs for kreuzberg::diff::compare
security_fixtures.py DDE / oversized embed / zip-bomb fixtures
tests/
test_generation.py smoke test: each generator runs + GT JSON parses
```
## Usage
From the kreuzberg repo root:
```bash
uv run --directory tools/generate_test_fixtures \
python -m generate_test_fixtures all
```
Or per format:
```bash
uv run --directory tools/generate_test_fixtures \
python -m generate_test_fixtures docx odt xlsx pptx pdf diff-pairs security
```
Default output: `test_documents/generated/<format>/<stem>.{ext,gt.json}`.
Override with `--output-dir <PATH>` (resolved relative to the cwd).
## Ground-truth schema
See `src/generate_test_fixtures/gt_schema.py`. Every sidecar is a JSON object
of the shape:
```json
{
"fixture_path": "test_documents/generated/docx/docx_track_changes_basic.docx",
"format": "docx",
"feature": "revisions",
"expectations": { ... feature-specific shape ... },
"generated_by": "generate-test-fixtures 0.1.0 (docx_revisions)"
}
```
## Determinism
Every generator pins timestamps to fixed ISO-8601 strings (no `now()`), uses
hardcoded author names, and seeds any randomness with `random.Random(42)`.
Re-running the generator on the same source code produces byte-identical
outputs. The one value that would otherwise vary between runs, the mtime of
each ZIP archive entry, is pinned to `2024-01-01T00:00:00Z` via `zipfile.ZipInfo`.
## Why not check binaries in?
The user owns the call on whether these belong in the `test_documents/` git
submodule. The generator scripts are committed; the binary outputs are not.
The integration test scaffold (`crates/kreuzberg/tests/`) is marked
`#[ignore]` until the binaries land.

View File

@@ -0,0 +1,52 @@
[build-system]
build-backend = "hatchling.build"
requires = [ "hatchling" ]
[project]
name = "generate-test-fixtures"
version = "0.1.0"
description = """\
Deterministic fixture generator producing track-changes / revisions / diff / security documents with ground-truth \
JSON sidecars for kreuzberg integration tests.\
"""
readme = "README.md"
license = { text = "MIT OR Apache-2.0" }
authors = [ { name = "Kreuzberg Contributors", email = "hello@kreuzberg.dev" } ]
requires-python = ">=3.11"
classifiers = [
"Development Status :: 4 - Beta",
"Intended Audience :: Developers",
"License :: OSI Approved :: Apache Software License",
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
"Programming Language :: Python :: 3 :: Only",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Programming Language :: Python :: 3.13",
"Programming Language :: Python :: 3.14",
"Topic :: Software Development :: Testing",
]
dependencies = [
"odfpy>=1.4.1",
"openpyxl>=3.1.2",
"pikepdf>=8",
"python-docx>=1.1",
"python-pptx>=0.6.23",
"reportlab>=4",
]
optional-dependencies.dev = [ "pytest>=7.4", "ruff>=0.4" ]
scripts.generate-test-fixtures = "generate_test_fixtures.__main__:main"
[tool.hatch]
build.targets.wheel.packages = [ "src/generate_test_fixtures" ]
[tool.ruff]
target-version = "py311"
line-length = 120
lint.select = [ "B", "E", "F", "I", "RUF", "UP", "W" ]
lint.ignore = [ "E501" ]
[tool.pytest]
ini_options.minversion = "7.0"
ini_options.testpaths = [ "tests" ]
ini_options.python_files = "test_*.py"

View File

@@ -0,0 +1,10 @@
"""Deterministic fixture generator for kreuzberg integration tests.
Each submodule produces a single category of on-disk fixture (DOCX
track-changes, ODT tracked changes, XLSX revision headers, PPTX comments,
PDF incremental updates, paired diff inputs, security edge cases) together
with a ``<stem>.gt.json`` ground-truth sidecar that integration tests load
to assert structured expectations.
"""
__version__ = "0.1.0"

View File

@@ -0,0 +1,115 @@
"""CLI entry point for the fixture generator.
Run as ``python -m generate_test_fixtures <command> [...]``. Commands map
one-to-one onto the per-format submodules. ``all`` runs every generator
in a deterministic order.
"""
from __future__ import annotations
import argparse
import sys
from collections.abc import Callable
from pathlib import Path
# Each generator exposes ``generate(output_root: Path, repo_root: Path) -> list[Path]``
# returning the list of files (binary + sidecars) it wrote. This keeps the
# dispatch table trivial and the smoke test predictable.
GeneratorFn = Callable[[Path, Path], list[Path]]
def _generators() -> dict[str, GeneratorFn]:
    """Build the command-name -> generator dispatch table.

    The per-format modules are imported inside the function rather than at
    module import time, so a partial dependency install doesn't break
    ``--help`` (argument parsing happens before this function is called).

    Returns:
        Mapping from CLI command name to that module's ``generate`` callable.
    """
    from . import (
        diff_pairs,
        docx_revisions,
        odt_revisions,
        pdf_incremental,
        pptx_comments,
        security_fixtures,
        xlsx_revisions,
    )

    # (command, module) pairs; the dict below keeps this insertion order.
    registry = (
        ("docx", docx_revisions),
        ("odt", odt_revisions),
        ("xlsx", xlsx_revisions),
        ("pptx", pptx_comments),
        ("pdf", pdf_incremental),
        ("diff-pairs", diff_pairs),
        ("security", security_fixtures),
    )
    return {command: module.generate for command, module in registry}
def _default_repo_root() -> Path:
"""Walk upward from this file to find the kreuzberg repo root.
Anchored on the presence of ``Cargo.toml`` + ``test_documents``. Falls
back to the current working directory when those markers are absent
(e.g. when the package is installed elsewhere).
"""
here = Path(__file__).resolve()
for ancestor in [here, *here.parents]:
if (ancestor / "Cargo.toml").is_file() and (ancestor / "test_documents").is_dir():
return ancestor
return Path.cwd()
def main(argv: list[str] | None = None) -> int:
    """Parse the command line, run the selected generators, print a report.

    Args:
        argv: Argument vector to parse. ``None`` lets ``argparse`` fall back
            to the process arguments.

    Returns:
        Process exit code; ``0`` once every selected generator has run.
    """
    every_command = ["docx", "odt", "xlsx", "pptx", "pdf", "diff-pairs", "security"]

    cli = argparse.ArgumentParser(
        prog="generate-test-fixtures",
        description="Generate deterministic test fixtures for kreuzberg integration tests.",
    )
    cli.add_argument(
        "commands",
        nargs="+",
        choices=["all", *every_command],
        help="One or more fixture categories to generate. 'all' runs every generator.",
    )
    cli.add_argument(
        "--output-dir",
        type=Path,
        default=None,
        help=(
            "Output root directory. Defaults to "
            "<repo-root>/test_documents/generated. Per-format subdirectories "
            "are created automatically."
        ),
    )
    cli.add_argument(
        "--repo-root",
        type=Path,
        default=None,
        help="Repository root override. Auto-detected when omitted.",
    )
    options = cli.parse_args(argv)

    repo_root = (options.repo_root or _default_repo_root()).resolve()
    output_root = (options.output_dir or (repo_root / "test_documents" / "generated")).resolve()
    output_root.mkdir(parents=True, exist_ok=True)

    selected: list[str]
    if "all" in options.commands:
        selected = list(every_command)
    else:
        # dict keys keep first-seen order, which drops duplicates while
        # preserving the order the user typed.
        selected = list(dict.fromkeys(options.commands))

    dispatch = _generators()
    file_count = 0
    for command in selected:
        produced = dispatch[command](output_root, repo_root)
        file_count += len(produced)
        print(f"[{command}] wrote {len(produced)} files")
        for item in produced:
            # Show paths relative to the output root when they live under it.
            shown = item.relative_to(output_root) if item.is_relative_to(output_root) else item
            print(f" - {shown}")
    print(f"Done. Total files written: {file_count}")
    return 0
# Script entry: propagate main()'s return value as the process exit code.
if __name__ == "__main__":
    raise SystemExit(main())

View File

@@ -0,0 +1,188 @@
"""Paired ``v1`` / ``v2`` fixtures for ``kreuzberg::diff::compare``.
Two scenarios:
- ``docx_memo_v1.docx`` vs ``docx_memo_v2.docx`` — same memo with one
paragraph removed, one paragraph added, and one paragraph rewritten.
Exercises ``ExtractionDiff.content_diff`` (DiffLine::Added / Removed).
- ``xlsx_budget_v1.xlsx`` vs ``xlsx_budget_v2.xlsx`` — a 3x3 budget table
with one cell value changed in v2. Exercises ``ExtractionDiff.tables_changed``
(the per-cell ``CellChange`` payload).
GT sidecars carry the same shape from ``gt_schema.diff_expectation``: the
relative paths to both halves of the pair, the substring assertions for
added/removed lines, and the expected ``CellChange`` entries (row/col/
from/to). Integration tests load BOTH halves, run extraction
independently, then call ``kreuzberg::diff::compare`` and assert against
the GT.
"""
from __future__ import annotations
import io
from pathlib import Path
from docx import Document # type: ignore[import-untyped]
from openpyxl import Workbook # type: ignore[import-untyped]
from .gt_schema import diff_expectation, write_ground_truth
ZIP_MTIME = (2024, 1, 1, 0, 0, 0)
# DOCX content. Each entry is a single paragraph.
DOCX_V1 = [
"Subject: Q2 planning meeting.",
"Date: 2024-04-15.",
"Attendees: Alice, Bob, Carol.",
"Agenda item one: review last quarter's revenue.",
"Agenda item two: discuss Q2 product launches.",
"Action items will be circulated by Friday.",
]
DOCX_V2 = [
"Subject: Q2 planning meeting.",
"Date: 2024-04-15.",
# "Attendees" line dropped in v2.
"Agenda item one: review last quarter's revenue and margin.", # rewritten
"Agenda item two: discuss Q2 product launches.",
"Agenda item three: hiring plan for engineering.", # new
"Action items will be circulated by Friday.",
]
# XLSX content. v2 changes B2 from 100 to 150.
XLSX_HEADER = ["Department", "Q1 Budget", "Q2 Budget"]
XLSX_V1_ROWS = [
["Engineering", 100, 120],
["Marketing", 50, 60],
["Operations", 80, 90],
]
XLSX_V2_ROWS = [
["Engineering", 150, 120], # B2: 100 -> 150
["Marketing", 50, 60],
["Operations", 80, 90],
]
def _save_docx(paragraphs: list[str]) -> bytes:
    """Serialise a DOCX with one paragraph per entry.

    The archive written by ``python-docx`` is re-packed so every entry
    carries the module's pinned ``ZIP_MTIME``, the same approach
    ``docx_revisions._replace_in_zip`` takes. Without this step the pair
    fixtures are returned exactly as the library wrote them and
    ``ZIP_MTIME`` is never applied.

    Args:
        paragraphs: Paragraph texts, written in order.

    Returns:
        The bytes of the re-packed ``.docx`` archive.
    """
    import zipfile  # local import: this module's header does not import zipfile

    doc = Document()
    for text in paragraphs:
        doc.add_paragraph(text)
    raw = io.BytesIO()
    doc.save(raw)

    pinned = io.BytesIO()
    with zipfile.ZipFile(raw, "r") as src, zipfile.ZipFile(pinned, "w", zipfile.ZIP_DEFLATED) as dst:
        # namelist() preserves the original entry order.
        for name in src.namelist():
            info = zipfile.ZipInfo(name, ZIP_MTIME)
            info.compress_type = zipfile.ZIP_DEFLATED
            # ZipInfo defaults create_system from sys.platform; pin it so the
            # bytes do not depend on the OS running the generator.
            info.create_system = 3
            dst.writestr(info, src.read(name))
    return pinned.getvalue()
def _save_xlsx(header: list[str], rows: list[list[str | int]]) -> bytes:
    """Serialise a single-sheet workbook with ``header`` + ``rows``.

    The archive written by ``openpyxl`` is re-packed so every entry carries
    the module's pinned ``ZIP_MTIME``; previously the constant was defined
    but never applied to these fixtures.

    Args:
        header: Column titles, written as the first row.
        rows: Data rows appended below the header, in order.

    Returns:
        The bytes of the re-packed ``.xlsx`` archive.
    """
    import zipfile  # local import: this module's header does not import zipfile

    wb = Workbook()
    ws = wb.active
    ws.title = "Budget"
    ws.append(header)
    for row in rows:
        ws.append(row)
    raw = io.BytesIO()
    wb.save(raw)

    # NOTE(review): this pins only the ZIP container metadata. openpyxl
    # presumably also stamps save-time values into docProps/core.xml —
    # confirm, and pin those too if byte-identical output is required.
    pinned = io.BytesIO()
    with zipfile.ZipFile(raw, "r") as src, zipfile.ZipFile(pinned, "w", zipfile.ZIP_DEFLATED) as dst:
        for name in src.namelist():
            info = zipfile.ZipInfo(name, ZIP_MTIME)
            info.compress_type = zipfile.ZIP_DEFLATED
            # ZipInfo defaults create_system from sys.platform; pin it so the
            # bytes do not depend on the OS running the generator.
            info.create_system = 3
            dst.writestr(info, src.read(name))
    return pinned.getvalue()
def _emit_docx_pair(output_dir: Path, repo_root: Path) -> list[Path]:
    """Write the memo v1/v2 DOCX pair plus the diff sidecar describing them.

    Args:
        output_dir: Directory receiving both documents and the sidecar.
        repo_root: Repository root used to relativise the recorded paths.

    Returns:
        ``[v1, v2, sidecar]`` paths, in that order.
    """
    before_file = output_dir / "docx_memo_v1.docx"
    after_file = output_dir / "docx_memo_v2.docx"
    gt_file = output_dir / "docx_memo_diff.gt.json"

    for target, paragraphs in ((before_file, DOCX_V1), (after_file, DOCX_V2)):
        target.write_bytes(_save_docx(paragraphs))

    root = repo_root.resolve()

    def _repo_relative(candidate: Path) -> str:
        # The sidecar names both halves of the pair; fall back to the
        # absolute path when the file lives outside the repo.
        absolute = candidate.resolve()
        try:
            shown = absolute.relative_to(root)
        except ValueError:
            shown = absolute
        return str(shown).replace("\\", "/")

    expectations = diff_expectation(
        before_path=_repo_relative(before_file),
        after_path=_repo_relative(after_file),
        content_changed=True,
        # Substrings that MUST appear in some DiffLine::Added entry.
        expected_added_lines=[
            "review last quarter's revenue and margin.",
            "Agenda item three: hiring plan for engineering.",
        ],
        # Substrings that MUST appear in some DiffLine::Removed entry.
        expected_removed_lines=[
            "Attendees: Alice, Bob, Carol.",
            "review last quarter's revenue.",
        ],
        notes=(
            "v2 drops the 'Attendees' line, rewrites agenda item one, and inserts "
            "agenda item three. Content paragraphs unchanged on either side stay "
            "in DiffLine::Context entries (not asserted)."
        ),
    )
    write_ground_truth(
        gt_file,
        before_file,
        repo_root,
        document_format="docx",
        feature="diff",
        expectations=expectations,
        generator="diff_pairs",
    )
    return [before_file, after_file, gt_file]
def _emit_xlsx_pair(output_dir: Path, repo_root: Path) -> list[Path]:
    """Write the budget v1/v2 XLSX pair plus the diff sidecar describing them.

    Args:
        output_dir: Directory receiving both workbooks and the sidecar.
        repo_root: Repository root used to relativise the recorded paths.

    Returns:
        ``[v1, v2, sidecar]`` paths, in that order.
    """
    before_file = output_dir / "xlsx_budget_v1.xlsx"
    after_file = output_dir / "xlsx_budget_v2.xlsx"
    gt_file = output_dir / "xlsx_budget_diff.gt.json"

    for target, rows in ((before_file, XLSX_V1_ROWS), (after_file, XLSX_V2_ROWS)):
        target.write_bytes(_save_xlsx(XLSX_HEADER, rows))

    root = repo_root.resolve()

    def _repo_relative(candidate: Path) -> str:
        # Fall back to the absolute path when the file is outside the repo.
        absolute = candidate.resolve()
        try:
            shown = absolute.relative_to(root)
        except ValueError:
            shown = absolute
        return str(shown).replace("\\", "/")

    # Row 1 = Engineering row (header is row 0), col 1 = Q1 Budget.
    engineering_q1 = {"row": 1, "col": 1, "from": "100", "to": "150"}
    expectations = diff_expectation(
        before_path=_repo_relative(before_file),
        after_path=_repo_relative(after_file),
        content_changed=True,
        expected_added_lines=["150"],
        expected_removed_lines=["100"],
        table_cell_changes=[engineering_q1],
        notes=(
            "Single cell change in B2 (Engineering / Q1 Budget): 100 -> 150. "
            "All other cells identical. ExtractionDiff.tables_changed should "
            "carry exactly one TableDiff with one CellChange entry."
        ),
    )
    write_ground_truth(
        gt_file,
        before_file,
        repo_root,
        document_format="xlsx",
        feature="diff",
        expectations=expectations,
        generator="diff_pairs",
    )
    return [before_file, after_file, gt_file]
def generate(output_root: Path, repo_root: Path) -> list[Path]:
    """Write the DOCX and XLSX diff pairs into ``output_root/diff/``.

    Returns:
        Every file written: the DOCX pair and its sidecar first, then the
        XLSX pair and its sidecar.
    """
    pair_dir = output_root / "diff"
    pair_dir.mkdir(parents=True, exist_ok=True)
    return [
        *_emit_docx_pair(pair_dir, repo_root),
        *_emit_xlsx_pair(pair_dir, repo_root),
    ]

View File

@@ -0,0 +1,210 @@
"""DOCX track-changes fixture generator.
``python-docx`` doesn't author ``w:ins`` / ``w:del`` / ``w:rPrChange``
elements natively — they're considered "revision marks" that Word inserts
when track-changes mode is on. We sidestep that by authoring a vanilla
document with ``python-docx``, then post-processing ``word/document.xml``
inside the zip: parse the XML, splice change elements around target runs,
write the archive back out with deterministic ZIP metadata.
The on-disk XML matches what Word produces, which is what
``crates/kreuzberg/src/extractors/docx`` (the path that populates
``ExtractionResult.revisions``) consumes.
"""
from __future__ import annotations
import io
import zipfile
from collections.abc import Iterable
from pathlib import Path
from docx import Document # type: ignore[import-untyped]
from .gt_schema import revisions_expectation, write_ground_truth
W_NS = "http://schemas.openxmlformats.org/wordprocessingml/2006/main"
# Pinned timestamps. Determinism > realism — these are fixtures, not real
# documents.
TS_ALICE_INS_1 = "2024-03-15T10:30:00Z"
TS_ALICE_INS_2 = "2024-03-15T10:35:00Z"
TS_BOB_DEL = "2024-03-15T11:00:00Z"
TS_CAROL_FMT = "2024-03-15T12:00:00Z"
TS_DAVE_INS = "2024-03-15T12:15:00Z"
# Deterministic mtime for every zip entry so fixtures hash stably across
# runs and CI. (1980-01-01 is the ZIP epoch — using the start of 2024 is
# arbitrary but visible in `unzip -v` output.)
ZIP_MTIME = (2024, 1, 1, 0, 0, 0)
def _read_document_xml(docx_bytes: bytes) -> str:
with zipfile.ZipFile(io.BytesIO(docx_bytes), "r") as zf:
return zf.read("word/document.xml").decode("utf-8")
def _replace_in_zip(docx_bytes: bytes, replacements: dict[str, bytes]) -> bytes:
    """Return a new docx with ``replacements`` patched in.

    Every entry is re-written so mtime, compression and host-system
    metadata are under our control for hash stability. Entries not named in
    ``replacements`` keep their original payload; keys of ``replacements``
    that do not exist in the source archive are ignored.

    Args:
        docx_bytes: Bytes of the source ``.docx`` archive.
        replacements: Archive member name -> replacement payload.

    Returns:
        Bytes of the rebuilt archive, with entries in their original order.
    """
    buf = io.BytesIO()
    with zipfile.ZipFile(io.BytesIO(docx_bytes), "r") as src, zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as dst:
        for name in src.namelist():
            # Only read the original payload when it is actually kept.
            data = replacements[name] if name in replacements else src.read(name)
            info = zipfile.ZipInfo(name, ZIP_MTIME)
            info.compress_type = zipfile.ZIP_DEFLATED
            # ZipInfo derives create_system from sys.platform (0 on Windows,
            # 3 elsewhere); pin it so the bytes match across operating systems.
            info.create_system = 3
            dst.writestr(info, data)
    return buf.getvalue()
def _ins_block(author: str, date: str, rev_id: str, text: str) -> str:
    """An entire ``<w:p>`` block carrying a single ``<w:ins>`` run.

    DOCX extractor anchors revisions on paragraph index, so each
    insertion lives in its own paragraph for unambiguous expectations.

    All four values are XML-escaped before interpolation, so an author or
    text containing ``&``, ``<``, ``>`` or ``"`` still yields well-formed
    markup. Values without those characters are emitted unchanged.

    Args:
        author: Value of the ``w:author`` attribute.
        date: Value of the ``w:date`` attribute.
        rev_id: Value of the ``w:id`` attribute.
        text: Inserted run text.

    Returns:
        The serialised ``<w:p>`` element.
    """
    from xml.sax.saxutils import escape  # local import keeps the block self-contained

    in_attr = {'"': "&quot;"}  # attribute values are double-quoted below
    return (
        f'<w:p xmlns:w="{W_NS}">'
        f'<w:ins w:id="{escape(rev_id, in_attr)}" w:author="{escape(author, in_attr)}" w:date="{escape(date, in_attr)}">'
        f'<w:r><w:t xml:space="preserve">{escape(text)}</w:t></w:r>'
        "</w:ins>"
        "</w:p>"
    )
def _del_block(author: str, date: str, rev_id: str, text: str) -> str:
    """An entire ``<w:p>`` block carrying a single ``<w:del>`` run.

    The deleted text is emitted as ``<w:delText>``. All four values are
    XML-escaped before interpolation, so ``&``, ``<``, ``>`` or ``"`` in
    them still yields well-formed markup; other values are emitted unchanged.

    Args:
        author: Value of the ``w:author`` attribute.
        date: Value of the ``w:date`` attribute.
        rev_id: Value of the ``w:id`` attribute.
        text: Deleted run text.

    Returns:
        The serialised ``<w:p>`` element.
    """
    from xml.sax.saxutils import escape  # local import keeps the block self-contained

    in_attr = {'"': "&quot;"}  # attribute values are double-quoted below
    return (
        f'<w:p xmlns:w="{W_NS}">'
        f'<w:del w:id="{escape(rev_id, in_attr)}" w:author="{escape(author, in_attr)}" w:date="{escape(date, in_attr)}">'
        f'<w:r><w:delText xml:space="preserve">{escape(text)}</w:delText></w:r>'
        "</w:del>"
        "</w:p>"
    )
def _format_change_block(author: str, date: str, rev_id: str, text: str) -> str:
    """Paragraph carrying a ``w:rPrChange`` — run-level formatting revision.

    The run is bold (``<w:b/>``) and its ``w:rPrChange`` records an empty
    previous ``<w:rPr/>``. All four values are XML-escaped before
    interpolation, so ``&``, ``<``, ``>`` or ``"`` in them still yields
    well-formed markup; other values are emitted unchanged.

    Args:
        author: Value of the ``w:author`` attribute.
        date: Value of the ``w:date`` attribute.
        rev_id: Value of the ``w:id`` attribute.
        text: Run text carrying the formatting change.

    Returns:
        The serialised ``<w:p>`` element.
    """
    from xml.sax.saxutils import escape  # local import keeps the block self-contained

    in_attr = {'"': "&quot;"}  # attribute values are double-quoted below
    return (
        f'<w:p xmlns:w="{W_NS}">'
        "<w:r>"
        f'<w:rPr><w:b/><w:rPrChange w:id="{escape(rev_id, in_attr)}" w:author="{escape(author, in_attr)}" w:date="{escape(date, in_attr)}"><w:rPr/></w:rPrChange></w:rPr>'
        f'<w:t xml:space="preserve">{escape(text)}</w:t>'
        "</w:r>"
        "</w:p>"
    )
def _splice_blocks_into_body(document_xml: str, blocks: Iterable[str]) -> str:
"""Insert ``blocks`` immediately before ``</w:body>``.
We deliberately do NOT parse with ``lxml`` — string splicing keeps the
output stable across Python / lxml versions and avoids namespace-
declaration reshuffling that can confuse downstream diff tools.
"""
marker = "</w:body>"
insert_at = document_xml.rfind(marker)
if insert_at == -1:
raise RuntimeError("word/document.xml is missing </w:body>; cannot splice revisions")
head = document_xml[:insert_at]
tail = document_xml[insert_at:]
return head + "".join(blocks) + tail
def _build_base_docx(paragraphs: list[str]) -> bytes:
    """Create a plain DOCX via ``python-docx``, one paragraph per entry, as bytes."""
    document = Document()
    for body_text in paragraphs:
        document.add_paragraph(body_text)
    sink = io.BytesIO()
    document.save(sink)
    serialised = sink.getvalue()
    return serialised
def _emit_basic(output_dir: Path, repo_root: Path) -> list[Path]:
    """Write the basic fixture: three paragraphs, two insertions (Alice), one deletion (Bob).

    Returns:
        ``[fixture, sidecar]`` paths.
    """
    # One row per revision: (kind, author, timestamp, revision id, run text).
    # The XML blocks and the sidecar expectations are both derived from it.
    specs = (
        ("Insertion", "Alice", TS_ALICE_INS_1, "100", "Inserted by Alice (first)."),
        ("Insertion", "Alice", TS_ALICE_INS_2, "101", "Inserted by Alice (second)."),
        ("Deletion", "Bob", TS_BOB_DEL, "102", "Deleted by Bob."),
    )
    builders = {"Insertion": _ins_block, "Deletion": _del_block}

    baseline = _build_base_docx(
        [
            "Original paragraph one — kept as-is.",
            "Original paragraph two — kept as-is.",
            "Original paragraph three — kept as-is.",
        ]
    )
    revision_xml = [builders[kind](author, stamp, rev_id, text) for kind, author, stamp, rev_id, text in specs]
    spliced = _splice_blocks_into_body(_read_document_xml(baseline), revision_xml)
    archive = _replace_in_zip(baseline, {"word/document.xml": spliced.encode("utf-8")})

    docx_file = output_dir / "docx_track_changes_basic.docx"
    gt_file = output_dir / "docx_track_changes_basic.gt.json"
    docx_file.write_bytes(archive)

    expected = [
        {"kind": kind, "author": author, "timestamp": stamp, "revision_id": rev_id}
        for kind, author, stamp, rev_id, _text in specs
    ]
    write_ground_truth(
        gt_file,
        docx_file,
        repo_root,
        document_format="docx",
        feature="revisions",
        expectations=revisions_expectation(expected_count=3, revisions=expected),
        generator="docx_revisions",
    )
    return [docx_file, gt_file]
def _emit_multi_author(output_dir: Path, repo_root: Path) -> list[Path]:
    """Write the multi-author fixture: five paragraphs, four authors, mixed revision kinds.

    Returns:
        ``[fixture, sidecar]`` paths.
    """
    # One row per revision: (kind, author, timestamp, revision id, run text).
    # The XML blocks and the sidecar expectations are both derived from it.
    specs = (
        ("Insertion", "Alice", TS_ALICE_INS_1, "200", "Alice inserts here."),
        ("Deletion", "Bob", TS_BOB_DEL, "201", "Bob deletes this."),
        ("FormatChange", "Carol", TS_CAROL_FMT, "202", "Carol changes formatting."),
        ("Insertion", "Dave", TS_DAVE_INS, "203", "Dave inserts a closing line."),
    )
    builders = {
        "Insertion": _ins_block,
        "Deletion": _del_block,
        "FormatChange": _format_change_block,
    }

    baseline = _build_base_docx([f"Baseline paragraph {i}." for i in range(5)])
    revision_xml = [builders[kind](author, stamp, rev_id, text) for kind, author, stamp, rev_id, text in specs]
    spliced = _splice_blocks_into_body(_read_document_xml(baseline), revision_xml)
    archive = _replace_in_zip(baseline, {"word/document.xml": spliced.encode("utf-8")})

    docx_file = output_dir / "docx_track_changes_multi_author.docx"
    gt_file = output_dir / "docx_track_changes_multi_author.gt.json"
    docx_file.write_bytes(archive)

    expected = [
        {"kind": kind, "author": author, "timestamp": stamp, "revision_id": rev_id}
        for kind, author, stamp, rev_id, _text in specs
    ]
    write_ground_truth(
        gt_file,
        docx_file,
        repo_root,
        document_format="docx",
        feature="revisions",
        expectations=revisions_expectation(
            expected_count=4,
            revisions=expected,
            notes="Four distinct authors; mixed kinds exercise the per-kind branches in extractors/docx.",
        ),
        generator="docx_revisions",
    )
    return [docx_file, gt_file]
def generate(output_root: Path, repo_root: Path) -> list[Path]:
    """Write both DOCX track-changes fixtures into ``output_root/docx/``.

    Returns:
        Every file written: the basic fixture and sidecar first, then the
        multi-author fixture and sidecar.
    """
    docx_dir = output_root / "docx"
    docx_dir.mkdir(parents=True, exist_ok=True)
    return [
        *_emit_basic(docx_dir, repo_root),
        *_emit_multi_author(docx_dir, repo_root),
    ]

View File

@@ -0,0 +1,180 @@
"""Ground-truth sidecar schema.
Every binary fixture produced by this toolkit ships with a JSON sidecar of
the same stem (``foo.docx`` -> ``foo.gt.json``). Integration tests load the
pair and assert ``ExtractionResult`` / ``ExtractionDiff`` fields against the
``expectations`` dict.
The schema is intentionally feature-shaped rather than format-shaped: a
``revisions`` fixture's expectations look the same whether the underlying
file is DOCX, ODT, XLSX, or PPTX. This keeps the integration-test asserter
generic.
"""
from __future__ import annotations
import hashlib
import json
from dataclasses import asdict, dataclass, field
from pathlib import Path
from typing import Any
from . import __version__
# Tool name stamped into every sidecar's ``generated_by`` field. The package
# version is appended to it, and ``write_ground_truth`` additionally appends
# the generator module name, which makes regressions easy to triage by-eye
# in test failures.
TOOL_NAME = "generate-test-fixtures"
@dataclass
class GroundTruth:
    """One sidecar record: the expectations attached to a single binary fixture.

    Attributes:
        fixture_path: Location of the binary fixture, relative to the
            kreuzberg repository root (for example
            ``test_documents/generated/docx/foo.docx``). Integration tests
            join it with the repo root to open the file.
        format: Document format, one of
            ``"docx" | "odt" | "xlsx" | "pptx" | "pdf"``.
        feature: Feature under test, one of
            ``"revisions" | "diff" | "security" | "embedded"``; selects the
            assertion helper the integration test runs.
        expectations: Feature-specific payload, normally built by
            ``revisions_expectation``, ``diff_expectation`` or
            ``security_expectation``.
        generated_by: Provenance string. Defaults to
            ``"<tool-name> <version>"``; ``write_ground_truth`` passes
            ``"<tool-name> <version> (<generator-module>)"``.
    """

    fixture_path: str
    format: str
    feature: str
    expectations: dict[str, Any]
    # Evaluated per instance, so the default reflects the module globals at
    # construction time.
    generated_by: str = field(default_factory=lambda: f"{TOOL_NAME} {__version__}")
def write_ground_truth(
    sidecar_path: Path,
    fixture_path: Path,
    repo_root: Path,
    document_format: str,
    feature: str,
    expectations: dict[str, Any],
    generator: str,
) -> None:
    """Write the ``GroundTruth`` JSON sidecar for a binary fixture.

    Args:
        sidecar_path: Destination ``*.gt.json`` path.
        fixture_path: Path of the companion binary fixture.
        repo_root: Repository root; the fixture path is recorded relative
            to it so integration tests can resolve it portably.
        document_format: Canonical format string (see ``GroundTruth.format``).
        feature: Canonical feature string (see ``GroundTruth.feature``).
        expectations: Feature-specific payload.
        generator: Name of the producing module, e.g. ``"docx_revisions"``;
            recorded inside ``generated_by``.
    """
    try:
        recorded = fixture_path.resolve().relative_to(repo_root.resolve())
    except ValueError:
        # The fixture is outside the repo (e.g. tmp_path in tests). Keep the
        # absolute path so the loader at least surfaces a useful error.
        recorded = fixture_path.resolve()

    record = GroundTruth(
        fixture_path=str(recorded).replace("\\", "/"),
        format=document_format,
        feature=feature,
        expectations=expectations,
        generated_by=f"{TOOL_NAME} {__version__} ({generator})",
    )
    # Sorted keys + fixed indent keep the sidecar text stable across runs.
    rendered = json.dumps(asdict(record), indent=2, sort_keys=True)
    sidecar_path.write_text(rendered + "\n", encoding="utf-8")
# ── Expectation builders ─────────────────────────────────────────────────────
def revisions_expectation(
    *,
    expected_count: int,
    revisions: list[dict[str, Any]],
    notes: str | None = None,
) -> dict[str, Any]:
    """Build the expectations payload for ``feature="revisions"`` fixtures.

    Each ``revisions`` entry mirrors the ``DocumentRevision`` fields a test
    should assert: ``kind`` ("Insertion" | "Deletion" | "FormatChange" |
    "Comment"), ``author``, ``timestamp``, ``revision_id``. Integration
    tests assert ``len(result.revisions) == expected_count`` plus per-entry
    kind/author matching.

    Returns:
        A dict with ``expected_count`` and ``revisions``, plus ``notes``
        when one was supplied.
    """
    shape: dict[str, Any] = {}
    shape["expected_count"] = expected_count
    shape["revisions"] = revisions
    if notes is None:
        return shape
    shape["notes"] = notes
    return shape
def diff_expectation(
    *,
    before_path: str,
    after_path: str,
    content_changed: bool,
    expected_added_lines: list[str],
    expected_removed_lines: list[str],
    table_cell_changes: list[dict[str, Any]] | None = None,
    notes: str | None = None,
) -> dict[str, Any]:
    """Build the expectations payload for ``feature="diff"`` fixtures.

    ``before_path`` and ``after_path`` are repo-root-relative paths that the
    integration test extracts independently before calling
    ``kreuzberg::diff::compare``. The ``expected_added_lines`` /
    ``expected_removed_lines`` entries are substrings that MUST appear in
    some ``DiffLine::Added`` / ``DiffLine::Removed`` entry. The match is by
    substring, not equality, since the extractor may add framing whitespace.

    Returns:
        A dict carrying every argument under its own name;
        ``table_cell_changes`` falls back to ``[]`` when omitted or empty,
        and ``notes`` is present only when supplied.
    """
    cell_changes = table_cell_changes if table_cell_changes else []
    shape: dict[str, Any] = dict(
        before_path=before_path,
        after_path=after_path,
        content_changed=content_changed,
        expected_added_lines=expected_added_lines,
        expected_removed_lines=expected_removed_lines,
        table_cell_changes=cell_changes,
    )
    if notes is None:
        return shape
    shape["notes"] = notes
    return shape
def security_expectation(
    *,
    should_extract: bool,
    expected_warnings: list[str],
    notes: str | None = None,
) -> dict[str, Any]:
    """Build the expectations payload for ``feature="security"`` fixtures.

    ``should_extract = False`` means extraction MUST return an error (for
    instance, the zip-bomb guard rejects the file). ``expected_warnings``
    holds case-insensitive substrings, each of which must match at least
    one warning surfaced by the extractor.

    Returns:
        A dict with ``should_extract`` and ``expected_warnings``, plus
        ``notes`` when one was supplied.
    """
    shape: dict[str, Any] = dict(
        should_extract=should_extract,
        expected_warnings=expected_warnings,
    )
    if notes is None:
        return shape
    shape["notes"] = notes
    return shape
def file_sha256(path: Path) -> str:
    """Compute the lowercase hex SHA-256 digest of the file at ``path``.

    Handy when an integration test wants to assert that the generator
    produced a byte-identical fixture across runs. The file is hashed in
    64 KiB chunks rather than loaded whole.
    """
    chunk_size = 64 * 1024
    hasher = hashlib.sha256()
    with path.open("rb") as stream:
        while block := stream.read(chunk_size):
            hasher.update(block)
    return hasher.hexdigest()

View File

@@ -0,0 +1,170 @@
"""ODT tracked-changes fixture generator.
``odfpy`` authors valid ODT containers but does not expose helpers for
``<text:tracked-changes>`` — those are the OpenDocument equivalent of
Word's ``w:ins`` / ``w:del``. We author a baseline body with ``odfpy``,
then post-process ``content.xml`` to splice a ``<text:tracked-changes>``
block (with ``<text:changed-region>`` children for each revision) into
``<office:text>`` plus matching ``<text:change-start>`` / ``<text:change-end>``
markers around the live insertion text.
The shape mirrors what
``crates/kreuzberg/src/extractors/odt.rs::parse_tracked_changes`` consumes:
``office:change-info`` -> ``dc:creator`` + ``dc:date``, child element
``insertion`` / ``deletion`` / ``format-change`` drives ``RevisionKind``.
"""
from __future__ import annotations
import io
import zipfile
from pathlib import Path
from odf.opendocument import OpenDocumentText # type: ignore[import-untyped]
from odf.text import H, P # type: ignore[import-untyped]
from .gt_schema import revisions_expectation, write_ground_truth
ZIP_MTIME = (2024, 1, 1, 0, 0, 0)
TS_ALICE = "2024-04-01T09:00:00Z"
TS_BOB = "2024-04-01T09:15:00Z"
# Pre-built tracked-changes block. Two changed-regions: ct1 = insertion by
# Alice, ct2 = deletion by Bob. The matching <text:change-start text:change-id="ct1"/>
# / <text:change-end text:change-id="ct1"/> markers are spliced into body
# paragraphs below.
#
# The block declares the text / office / dc namespaces on its own root
# element, so it does not rely on the prefixes declared by the host
# content.xml. ``_splice_tracked_changes`` inserts it right after the opening
# ``<office:text>`` tag.
#
# NOTE(review): the insertion region (ct1) carries a <text:p> copy of the
# inserted text next to <office:change-info>. In ODF the inserted content
# normally lives only in the body between the change-start / change-end
# markers — confirm the extractor expects (or at least tolerates) this child.
TRACKED_CHANGES_XML = (
    '<text:tracked-changes xmlns:text="urn:oasis:names:tc:opendocument:xmlns:text:1.0" '
    'xmlns:office="urn:oasis:names:tc:opendocument:xmlns:office:1.0" '
    'xmlns:dc="http://purl.org/dc/elements/1.1/">'
    '<text:changed-region text:id="ct1">'
    "<text:insertion>"
    "<office:change-info>"
    f"<dc:creator>Alice</dc:creator><dc:date>{TS_ALICE}</dc:date>"
    "</office:change-info>"
    "<text:p>Alice inserted this paragraph.</text:p>"
    "</text:insertion>"
    "</text:changed-region>"
    '<text:changed-region text:id="ct2">'
    "<text:deletion>"
    "<office:change-info>"
    f"<dc:creator>Bob</dc:creator><dc:date>{TS_BOB}</dc:date>"
    "</office:change-info>"
    "<text:p>Bob deleted this paragraph.</text:p>"
    "</text:deletion>"
    "</text:changed-region>"
    "</text:tracked-changes>"
)
# Body fragment that references the change-regions. The extractor walks
# body paragraphs and translates change-start/change-end markers into the
# matching revisions, so we include both insertion live text and a point-
# deletion marker.
BODY_REVISION_MARKERS = (
'<text:p xmlns:text="urn:oasis:names:tc:opendocument:xmlns:text:1.0">'
'<text:change-start text:change-id="ct1"/>'
"Alice inserted this paragraph."
'<text:change-end text:change-id="ct1"/>'
"</text:p>"
'<text:p xmlns:text="urn:oasis:names:tc:opendocument:xmlns:text:1.0">'
'<text:change text:change-id="ct2"/>'
"</text:p>"
)
def _build_baseline_odt() -> bytes:
    """Create a plain ODT (one heading, three paragraphs) with ``odfpy`` and return its bytes."""
    document = OpenDocumentText()
    body = document.text
    body.addElement(H(outlinelevel=1, text="ODT tracked-changes fixture"))
    for ordinal in ("one", "two", "three"):
        body.addElement(P(text=f"Baseline paragraph {ordinal} — kept as-is."))
    sink = io.BytesIO()
    document.write(sink)
    return sink.getvalue()
def _splice_tracked_changes(content_xml: str) -> str:
    """Return ``content_xml`` with the tracked-changes block and body markers added.

    ``TRACKED_CHANGES_XML`` goes immediately after the opening
    ``<office:text>`` tag; ``BODY_REVISION_MARKERS`` goes immediately before
    the last ``</office:text>`` closing tag.

    Raises:
        RuntimeError: If the opening or closing ``office:text`` tag is absent.
    """
    bare_open = "<office:text>"
    exact_at = content_xml.find(bare_open)
    if exact_at != -1:
        body_start = exact_at + len(bare_open)
    else:
        # Some odfpy versions emit ``<office:text ...>`` with attributes;
        # in that case the body starts after the first ``>`` of that tag.
        tag_at = content_xml.find("<office:text")
        if tag_at == -1:
            raise RuntimeError("content.xml missing <office:text> element")
        body_start = content_xml.find(">", tag_at) + 1

    body_end = content_xml.rfind("</office:text>")
    if body_end == -1:
        raise RuntimeError("content.xml missing </office:text> close tag")

    pieces = (
        content_xml[:body_start],
        TRACKED_CHANGES_XML,
        content_xml[body_start:body_end],
        BODY_REVISION_MARKERS,
        content_xml[body_end:],
    )
    return "".join(pieces)
def _replace_in_zip(src_bytes: bytes, replacements: dict[str, bytes]) -> bytes:
    """Rewrite ``src_bytes`` (an ODT zip) with deterministic entry metadata.

    Entries are copied in their original order; any entry whose name is a key
    of ``replacements`` gets that payload instead of its original content.
    Every entry is stamped with ``ZIP_MTIME``.

    Returns:
        The bytes of the rewritten archive.
    """
    buf = io.BytesIO()
    with zipfile.ZipFile(io.BytesIO(src_bytes), "r") as src, zipfile.ZipFile(
        buf, "w", zipfile.ZIP_DEFLATED
    ) as dst:
        for name in src.namelist():
            info = zipfile.ZipInfo(name, ZIP_MTIME)
            # ZipInfo picks create_system from the host platform (0 on
            # Windows, 3 elsewhere); pin it so the archive header bytes do
            # not depend on where the generator runs.
            info.create_system = 3
            # ODT requires ``mimetype`` to be the first entry and stored
            # without compression. Order is inherited from the source
            # archive; the storage method is enforced here.
            if name == "mimetype":
                info.compress_type = zipfile.ZIP_STORED
            else:
                info.compress_type = zipfile.ZIP_DEFLATED
            # Only read the original entry when it is actually kept.
            data = replacements[name] if name in replacements else src.read(name)
            dst.writestr(info, data)
    return buf.getvalue()
def generate(output_root: Path, repo_root: Path) -> list[Path]:
    """Write odt_tracked_changes_basic.odt and its sidecar into ``output_root/odt/``.

    Returns the fixture path followed by the ground-truth sidecar path.
    """
    target_dir = output_root / "odt"
    target_dir.mkdir(parents=True, exist_ok=True)

    # Build the plain document, then patch only its content.xml.
    baseline = _build_baseline_odt()
    with zipfile.ZipFile(io.BytesIO(baseline), "r") as archive:
        original_xml = archive.read("content.xml").decode("utf-8")
    patched_xml = _splice_tracked_changes(original_xml).encode("utf-8")
    fixture_bytes = _replace_in_zip(baseline, {"content.xml": patched_xml})

    fixture_path = target_dir / "odt_tracked_changes_basic.odt"
    sidecar_path = target_dir / "odt_tracked_changes_basic.gt.json"
    fixture_path.write_bytes(fixture_bytes)

    expected_revisions = [
        {"kind": "Insertion", "author": "Alice", "timestamp": TS_ALICE, "revision_id": "ct1"},
        {"kind": "Deletion", "author": "Bob", "timestamp": TS_BOB, "revision_id": "ct2"},
    ]
    notes = (
        "<text:changed-region> ids are 'ct1' (insertion) and 'ct2' (deletion). "
        "Body markers exercise both <text:change-start>/<text:change-end> pair "
        "and the point-marker <text:change> form."
    )
    write_ground_truth(
        sidecar_path,
        fixture_path,
        repo_root,
        document_format="odt",
        feature="revisions",
        expectations=revisions_expectation(
            expected_count=2,
            revisions=expected_revisions,
            notes=notes,
        ),
        generator="odt_revisions",
    )
    return [fixture_path, sidecar_path]

View File

@@ -0,0 +1,197 @@
"""PDF incremental-update fixture generator.
Produces a PDF with multiple historical ``xref`` sections, each carrying a
``trailer << /Prev <previous-xref-offset> >>``. The kreuzberg PDF revisions
walker (``crates/kreuzberg/src/pdf/xref_revisions.rs``) discovers them by
scanning backwards for ``%%EOF`` markers and following ``/Prev`` from the
latest xref.
We use ``reportlab`` for the base document (a single page that ``lopdf``
will happily load) and then append two incremental-update sections by hand.
The append technique matches the ``build_incremental_pdf`` helper used in
the Rust extractor's own unit tests:
<new object>
xref
<subsection-header>
<new-object-offset> 00000 n
trailer << /Size N /Root <root> /Prev <previous-xref> /Info <info> >>
startxref
<new-xref-offset>
%%EOF
The trailer keeps ``/Root`` and ``/Info`` references from the base so the
PDF is still a valid single-revision document for any tool that ignores
the ``/Prev`` chain.
"""
from __future__ import annotations
import io
import re
from pathlib import Path
from reportlab.lib.pagesizes import LETTER # type: ignore[import-untyped]
from reportlab.pdfgen import canvas # type: ignore[import-untyped]
from .gt_schema import revisions_expectation, write_ground_truth
def _build_baseline_pdf() -> bytes:
    """Render the single-page base document with reportlab and return its bytes.

    The canvas is created with ``invariant=True``, which is relied on here to
    make reportlab emit stable bytes (fixed creation date) from run to run.
    """
    stream = io.BytesIO()
    page = canvas.Canvas(stream, pagesize=LETTER, invariant=True)

    # Document-information metadata, applied in a fixed order.
    for apply, value in (
        (page.setAuthor, "Alice"),
        (page.setTitle, "PDF incremental-updates fixture"),
        (page.setSubject, "Three-revision xref chain"),
        (page.setCreator, "generate-test-fixtures"),
    ):
        apply(value)

    # One line of body text, then close the page and serialise the document.
    page.drawString(72, 720, "Original revision (base save).")
    page.showPage()
    page.save()
    return stream.getvalue()
def _parse_last_startxref(pdf_bytes: bytes) -> int:
"""Return the byte offset stored in the trailing ``startxref\\n<N>``.
Used to populate the ``/Prev`` value of the first incremental update.
"""
# Search the last 1024 bytes — every PDF should have its startxref well
# within the trailer window.
window = pdf_bytes[-1024:]
match = re.search(rb"startxref\s+(\d+)", window)
if not match:
raise RuntimeError("baseline PDF missing trailing startxref")
return int(match.group(1))
def _find_root_ref(pdf_bytes: bytes) -> str:
"""Locate the ``/Root <obj-num> <gen-num> R`` reference in the trailer.
We need it to keep ``/Root`` populated in the new trailer of each
incremental update.
"""
match = re.search(rb"/Root\s+(\d+\s+\d+\s+R)", pdf_bytes)
if not match:
raise RuntimeError("baseline PDF missing /Root in trailer")
return match.group(1).decode("ascii")
def _find_size(pdf_bytes: bytes) -> int:
"""Read ``/Size N`` from the baseline trailer (highest object number + 1)."""
match = re.search(rb"/Size\s+(\d+)", pdf_bytes)
if not match:
raise RuntimeError("baseline PDF missing /Size in trailer")
return int(match.group(1))
def _append_incremental_update(
pdf_bytes: bytes,
*,
new_object_number: int,
new_object_body: bytes,
previous_xref_offset: int,
new_size: int,
root_ref: str,
) -> tuple[bytes, int]:
"""Append a single incremental-update section.
Returns the new PDF bytes plus the byte offset of the new xref (useful
as the ``/Prev`` value when chaining a second update).
"""
# Ensure baseline ends with a newline so our appended section starts on
# a fresh line — some validators reject `%%EOF<obj>`.
if not pdf_bytes.endswith(b"\n"):
pdf_bytes += b"\n"
# New object definition.
obj_offset = len(pdf_bytes)
obj_block = f"{new_object_number} 0 obj\n".encode("ascii") + new_object_body + b"\nendobj\n"
pdf_bytes += obj_block
# xref subsection for the new object.
xref_offset = len(pdf_bytes)
xref_block = (
b"xref\n"
+ f"{new_object_number} 1\n".encode("ascii")
+ f"{obj_offset:010d} 00000 n \n".encode("ascii")
)
pdf_bytes += xref_block
# New trailer with /Prev pointing to the previous xref offset.
trailer = (
f"trailer\n<</Size {new_size} /Root {root_ref} /Prev {previous_xref_offset}>>\n"
f"startxref\n{xref_offset}\n%%EOF\n"
)
pdf_bytes += trailer.encode("ascii")
return pdf_bytes, xref_offset
def generate(output_root: Path, repo_root: Path) -> list[Path]:
    """Write pdf_incremental_basic.pdf and its sidecar into ``output_root/pdf/``.

    Returns the fixture path followed by the ground-truth sidecar path.
    """
    pdf_dir = output_root / "pdf"
    pdf_dir.mkdir(parents=True, exist_ok=True)

    document = _build_baseline_pdf()
    prev_xref = _parse_last_startxref(document)
    catalog_ref = _find_root_ref(document)
    # /Size = highest+1, so the first appended object reuses Size as its number.
    first_free = _find_size(document)

    # Chain two incremental updates; each one's /Prev is the xref offset of
    # the section written just before it.
    update_bodies = (
        b"<</Update 1 /Note (first incremental save)>>",
        b"<</Update 2 /Note (second incremental save)>>",
    )
    for step, body in enumerate(update_bodies):
        document, prev_xref = _append_incremental_update(
            document,
            new_object_number=first_free + step,
            new_object_body=body,
            previous_xref_offset=prev_xref,
            new_size=first_free + step + 1,
            root_ref=catalog_ref,
        )

    fixture_path = pdf_dir / "pdf_incremental_basic.pdf"
    sidecar_path = pdf_dir / "pdf_incremental_basic.gt.json"
    fixture_path.write_bytes(document)

    # The PDF extractor emits historical xref offsets oldest-first,
    # with revision_id = "xref-offset-{N}". The exact offsets vary
    # with reportlab's output size, so integration tests assert
    # count + revision_id PREFIX rather than exact offsets.
    expected_revisions = [
        {"kind": "Insertion", "revision_id_prefix": "xref-offset-", "author": "Alice"},
        {"kind": "Insertion", "revision_id_prefix": "xref-offset-", "author": "Alice"},
    ]
    notes = (
        "Three xref sections (base + two incremental updates). The PDF revisions "
        "walker emits 2 historical revisions (the latest xref represents the live "
        "state and is excluded). RevisionKind is always Insertion for PDFs — there "
        "is no DOCX-style typed change classification at the xref level. The two "
        "/Prev offsets vary with reportlab's output size; assert by prefix."
    )
    write_ground_truth(
        sidecar_path,
        fixture_path,
        repo_root,
        document_format="pdf",
        feature="revisions",
        expectations=revisions_expectation(
            expected_count=2,
            revisions=expected_revisions,
            notes=notes,
        ),
        generator="pdf_incremental",
    )
    return [fixture_path, sidecar_path]

View File

@@ -0,0 +1,181 @@
"""PPTX comments fixture generator.
``python-pptx`` doesn't author comment parts. The fixture is produced by
building a vanilla 3-slide deck with ``python-pptx`` and then injecting
``ppt/commentAuthors.xml`` plus ``ppt/comments/comment{N}.xml`` parts
into the resulting zip (one comments file per slide that has comments).
Shape mirrors what ``crates/kreuzberg/src/extraction/pptx/comments.rs``
consumes: ``<p:cmAuthor id="" name=""/>`` for authors,
``<p:cm authorId="" dt="" idx=""><p:text>…</p:text></p:cm>`` for
comments. The extractor anchors the resulting ``DocumentRevision`` on
``RevisionAnchor::Slide { index }`` where index is the zero-based slide
ordinal (so ``comment1.xml`` -> slide index 0, ``comment3.xml`` -> 2).
"""
from __future__ import annotations
import io
import zipfile
from pathlib import Path
from pptx import Presentation # type: ignore[import-untyped]
from .gt_schema import revisions_expectation, write_ground_truth
# Timestamp tuple stamped on every entry when the deck is re-zipped
# (passed to ``zipfile.ZipInfo`` in ``_rewrite_zip``).
ZIP_MTIME = (2024, 1, 1, 0, 0, 0)
# Each row: (author_id, name). The id is what a comment's ``authorId``
# attribute refers to.
AUTHORS = [
    (0, "Alice"),
    (1, "Bob"),
]
# Each row: (slide_index_zero_based, idx, author_id, dt, text)
COMMENTS = [
    (0, 1, 0, "2024-06-01T10:00:00Z", "Alice: opening question on slide 1"),
    (0, 2, 1, "2024-06-01T10:15:00Z", "Bob: follow-up on slide 1"),
    (2, 1, 0, "2024-06-01T11:30:00Z", "Alice: closing comment on slide 3"),
]
def _build_baseline_pptx() -> bytes:
    """Return the bytes of a plain three-slide deck, one text box per slide."""
    deck = Presentation()
    layout = deck.slide_layouts[6]  # blank layout
    unit = 914400  # shared length unit for the text box position and size

    for number in (1, 2, 3):
        # Each slide gets a minimal text frame so it carries body text.
        shapes = deck.slides.add_slide(layout).shapes
        box = shapes.add_textbox(left=unit, top=unit, width=unit * 4, height=unit)
        box.text_frame.text = f"Slide {number} body"

    stream = io.BytesIO()
    deck.save(stream)
    return stream.getvalue()
def _comment_authors_xml() -> bytes:
    """Build ``ppt/commentAuthors.xml`` from ``AUTHORS``.

    Emits one ``<p:cmAuthor>`` per ``(id, name)`` row. Names are XML-escaped
    before being placed in attribute values, and the ``initials`` attribute is
    the first character of the name (empty for an empty name).

    Returns:
        The UTF-8 encoded XML document.
    """
    from xml.sax.saxutils import escape

    # Attribute values are double-quoted, so '"' must be escaped as well as
    # the default &, < and >.
    attr_entities = {'"': "&quot;"}
    authors_xml = "".join(
        f'<p:cmAuthor id="{aid}" name="{escape(name, attr_entities)}" '
        f'initials="{escape(name[:1], attr_entities)}" lastIdx="0" clrIdx="0"/>'
        for aid, name in AUTHORS
    )
    xml = (
        '<?xml version="1.0" encoding="UTF-8" standalone="yes"?>'
        '<p:cmAuthorLst xmlns:p="http://schemas.openxmlformats.org/presentationml/2006/main">'
        f"{authors_xml}"
        "</p:cmAuthorLst>"
    )
    return xml.encode("utf-8")
def _comments_for_slide(slide_index: int) -> bytes | None:
    """Return ``ppt/comments/comment{slide_index+1}.xml`` bytes, or ``None``.

    Collects every ``COMMENTS`` row whose zero-based slide index equals
    ``slide_index`` and renders each as a ``<p:cm>`` element. The comment text
    and the ``dt`` attribute are XML-escaped so that characters such as ``&``
    or ``<`` in a comment cannot produce a malformed part.

    Returns:
        The UTF-8 encoded XML document, or ``None`` when the slide has no
        comments.
    """
    from xml.sax.saxutils import escape

    entries = [c for c in COMMENTS if c[0] == slide_index]
    if not entries:
        return None

    # Attribute values are double-quoted, so '"' must be escaped too.
    attr_entities = {'"': "&quot;"}
    inner = "".join(
        f'<p:cm authorId="{aid}" dt="{escape(dt, attr_entities)}" idx="{idx}">'
        f"<p:text>{escape(text)}</p:text></p:cm>"
        for (_, idx, aid, dt, text) in entries
    )
    xml = (
        '<?xml version="1.0" encoding="UTF-8" standalone="yes"?>'
        '<p:cmLst xmlns:p="http://schemas.openxmlformats.org/presentationml/2006/main">'
        f"{inner}"
        "</p:cmLst>"
    )
    return xml.encode("utf-8")
def _patch_content_types(original: bytes, comment_slide_indices: list[int]) -> bytes:
"""Register commentAuthors + per-slide comments content-types."""
text = original.decode("utf-8")
overrides: list[str] = [
'<Override PartName="/ppt/commentAuthors.xml" '
'ContentType="application/vnd.openxmlformats-officedocument.presentationml.commentAuthors+xml"/>'
]
for slide_idx in comment_slide_indices:
overrides.append(
f'<Override PartName="/ppt/comments/comment{slide_idx + 1}.xml" '
'ContentType="application/vnd.openxmlformats-officedocument.presentationml.comments+xml"/>'
)
addition = "".join(o for o in overrides if o not in text)
if not addition:
return original
return text.replace("</Types>", f"{addition}</Types>").encode("utf-8")
def _rewrite_zip(src_bytes: bytes, additions: dict[str, bytes], replacements: dict[str, bytes]) -> bytes:
    """Re-zip with deterministic entry metadata; additions follow the original entries.

    Args:
        src_bytes: the source archive.
        additions: new entries appended after the originals, in dict order.
            A name that already exists in the source is skipped.
        replacements: payloads substituted for existing entries of the same
            name.

    Returns:
        The bytes of the rewritten archive. Every entry is deflated and
        stamped with ``ZIP_MTIME``.
    """

    def _entry(name: str) -> zipfile.ZipInfo:
        info = zipfile.ZipInfo(name, ZIP_MTIME)
        # ZipInfo picks create_system from the host platform (0 on Windows,
        # 3 elsewhere); pin it so header bytes are platform-independent.
        info.create_system = 3
        info.compress_type = zipfile.ZIP_DEFLATED
        return info

    buf = io.BytesIO()
    with zipfile.ZipFile(io.BytesIO(src_bytes), "r") as src, zipfile.ZipFile(
        buf, "w", zipfile.ZIP_DEFLATED
    ) as dst:
        seen = set(src.namelist())
        for name in src.namelist():
            # Only read the original entry when it is actually kept.
            data = replacements[name] if name in replacements else src.read(name)
            dst.writestr(_entry(name), data)
        for name, data in additions.items():
            if name not in seen:
                dst.writestr(_entry(name), data)
    return buf.getvalue()
def generate(output_root: Path, repo_root: Path) -> list[Path]:
    """Write pptx_comments_basic.pptx and its sidecar into ``output_root/pptx/``.

    Returns the fixture path followed by the ground-truth sidecar path.
    """
    pptx_dir = output_root / "pptx"
    pptx_dir.mkdir(parents=True, exist_ok=True)

    baseline = _build_baseline_pptx()
    commented_slides = sorted({row[0] for row in COMMENTS})

    # New parts: the author list plus one comments file per commented slide.
    new_parts: dict[str, bytes] = {"ppt/commentAuthors.xml": _comment_authors_xml()}
    for slide_idx in commented_slides:
        part = _comments_for_slide(slide_idx)
        assert part is not None  # by construction
        new_parts[f"ppt/comments/comment{slide_idx + 1}.xml"] = part

    with zipfile.ZipFile(io.BytesIO(baseline), "r") as archive:
        original_types = archive.read("[Content_Types].xml")
    patched_parts = {"[Content_Types].xml": _patch_content_types(original_types, commented_slides)}

    fixture_path = pptx_dir / "pptx_comments_basic.pptx"
    sidecar_path = pptx_dir / "pptx_comments_basic.gt.json"
    fixture_path.write_bytes(_rewrite_zip(baseline, additions=new_parts, replacements=patched_parts))

    # One expected revision per comment row, in COMMENTS order.
    author_names = dict(AUTHORS)
    expected_revisions = []
    for slide_idx, _idx, author_id, dt, _text in COMMENTS:
        expected_revisions.append(
            {
                "kind": "Comment",
                "author": author_names[author_id],
                "timestamp": dt,
                "slide_index": slide_idx,
            }
        )

    notes = (
        "Three slides; comments on slide 1 (two by Alice and Bob) and slide 3 "
        "(one by Alice). RevisionKind = Comment for every entry; anchor is "
        "RevisionAnchor::Slide with zero-based index. Note that this fixture "
        "intentionally does not include the slide -> comments .rels link — the "
        "extractor walks the comments directory by filename pattern, not via the "
        "relationship graph."
    )
    write_ground_truth(
        sidecar_path,
        fixture_path,
        repo_root,
        document_format="pptx",
        feature="revisions",
        expectations=revisions_expectation(
            expected_count=len(COMMENTS),
            revisions=expected_revisions,
            notes=notes,
        ),
        generator="pptx_comments",
    )
    return [fixture_path, sidecar_path]

View File

@@ -0,0 +1,303 @@
"""Security edge-case fixtures.
Five fixtures exercising the OOXML-extractor guards:
- ``xlsx_dde_formula.xlsx`` — workbook carrying ``=HYPERLINK(…)`` and
``=DDE(…)`` formula calls. GT asserts extraction succeeds and surfaces
a warning naming the dangerous formula type.
- ``xlsx_safe_formulas.xlsx`` — control workbook with only ``=SUM(A1:A2)``.
GT asserts no DDE / HYPERLINK warning.
- ``docx_oversized_embedded.docx`` — a DOCX whose embedded part is
declared at 100 MiB (a synthetic stream of zeros). GT asserts that with
``max_embedded_file_bytes = 50 MiB`` the extractor skips the child and
emits a size-limit warning.
- ``zip_bomb_xlsx.xlsx`` — 50:1 compression ratio. GT asserts extraction
SUCCEEDS — the guard tolerates legitimate compression.
- ``zip_bomb_xlsx_pathological.xlsx`` — 200:1 ratio. GT asserts extraction
is REJECTED by the zip-bomb guard.
All zip archives use a fixed mtime for hash-stable output.
"""
from __future__ import annotations
import io
import zipfile
from pathlib import Path
from openpyxl import Workbook # type: ignore[import-untyped]
from .gt_schema import security_expectation, write_ground_truth
# Timestamp tuple stamped on every entry when an archive is re-zipped
# (passed to ``zipfile.ZipInfo`` in ``_rewrite_zip``).
ZIP_MTIME = (2024, 1, 1, 0, 0, 0)
# A 100 MiB synthetic payload used by docx_oversized_embedded.
# NOTE(review): the generator builds this payload as a single in-memory
# ``bytes`` object (``b"\x00" * OVERSIZED_BYTES``); it is not streamed in
# chunks, so expect ~100 MiB of peak memory for that fixture.
ONE_MIB = 1024 * 1024
OVERSIZED_BYTES = 100 * ONE_MIB
# Compression ratios for the two zip-bomb fixtures. The guard threshold
# we're targeting is documented at >= 100:1 in the cloud security model;
# 50:1 must pass, 200:1 must fail.
SAFE_COMPRESSION_RATIO = 50
PATHOLOGICAL_COMPRESSION_RATIO = 200
# Compressed entry size used as the "input" side of the ratio. 64 KiB is
# small enough that the corresponding uncompressed-zero payload at the
# pathological ratio fits in well under 16 MiB.
COMPRESSED_ENTRY_BYTES = 64 * 1024
def _rewrite_zip(src_bytes: bytes, additions: dict[str, bytes], replacements: dict[str, bytes]) -> bytes:
    """Re-zip ``src_bytes`` with deterministic entry metadata; additions appended.

    Args:
        src_bytes: the source archive.
        additions: new entries appended after the originals, in dict order.
            A name that already exists in the source is skipped.
        replacements: payloads substituted for existing entries of the same
            name.

    Returns:
        The bytes of the rewritten archive. Every entry is deflated and
        stamped with ``ZIP_MTIME``.
    """

    def _entry(name: str) -> zipfile.ZipInfo:
        info = zipfile.ZipInfo(name, ZIP_MTIME)
        # ZipInfo picks create_system from the host platform (0 on Windows,
        # 3 elsewhere); pin it so header bytes are platform-independent.
        info.create_system = 3
        info.compress_type = zipfile.ZIP_DEFLATED
        return info

    buf = io.BytesIO()
    with zipfile.ZipFile(io.BytesIO(src_bytes), "r") as src, zipfile.ZipFile(
        buf, "w", zipfile.ZIP_DEFLATED
    ) as dst:
        seen = set(src.namelist())
        for name in src.namelist():
            # Only read the original entry when it is actually kept.
            data = replacements[name] if name in replacements else src.read(name)
            dst.writestr(_entry(name), data)
        for name, data in additions.items():
            if name not in seen:
                dst.writestr(_entry(name), data)
    return buf.getvalue()
# ── DDE / HYPERLINK formulas ──────────────────────────────────────────────────
def _emit_xlsx_dde(output_dir: Path, repo_root: Path) -> list[Path]:
    """Write a workbook holding one HYPERLINK and one DDE formula cell.

    Returns the fixture path followed by the ground-truth sidecar path.
    """
    workbook = Workbook()
    sheet = workbook.active
    sheet.title = "danger"
    # Column A is plain text; column B holds the two formula strings. A string
    # starting with "=" is what marks a cell as a formula for the extractor to
    # flag.
    cells = {
        "A1": "label",
        "A2": "ok",
        "B1": '=HYPERLINK("https://example.com/evil", "click me")',
        "B2": '=DDE("cmd","/c calc.exe","_")',
    }
    for coordinate, value in cells.items():
        sheet[coordinate] = value

    stream = io.BytesIO()
    workbook.save(stream)

    fixture_path = output_dir / "xlsx_dde_formula.xlsx"
    sidecar_path = output_dir / "xlsx_dde_formula.gt.json"
    fixture_path.write_bytes(stream.getvalue())

    expectations = security_expectation(
        should_extract=True,
        expected_warnings=["dde", "hyperlink"],
        notes=(
            "Two dangerous formula calls in B1/B2. Extraction must succeed but the "
            "warnings stream must mention DDE and HYPERLINK (case-insensitive substring "
            "match — exact wording is up to the extractor's audit emitter)."
        ),
    )
    write_ground_truth(
        sidecar_path,
        fixture_path,
        repo_root,
        document_format="xlsx",
        feature="security",
        expectations=expectations,
        generator="security_fixtures",
    )
    return [fixture_path, sidecar_path]
def _emit_xlsx_safe(output_dir: Path, repo_root: Path) -> list[Path]:
    """Write the control workbook: two numbers and a SUM formula, no warnings expected.

    Returns the fixture path followed by the ground-truth sidecar path.
    """
    workbook = Workbook()
    sheet = workbook.active
    sheet.title = "safe"
    for coordinate, value in {"A1": 10, "A2": 32, "A3": "=SUM(A1:A2)"}.items():
        sheet[coordinate] = value

    stream = io.BytesIO()
    workbook.save(stream)

    fixture_path = output_dir / "xlsx_safe_formulas.xlsx"
    sidecar_path = output_dir / "xlsx_safe_formulas.gt.json"
    fixture_path.write_bytes(stream.getvalue())

    expectations = security_expectation(
        should_extract=True,
        expected_warnings=[],
        notes=(
            "Control workbook. Asserts the DDE/HYPERLINK warning path does NOT trigger on "
            "ordinary arithmetic formulas — guards against false positives."
        ),
    )
    write_ground_truth(
        sidecar_path,
        fixture_path,
        repo_root,
        document_format="xlsx",
        feature="security",
        expectations=expectations,
        generator="security_fixtures",
    )
    return [fixture_path, sidecar_path]
# ── Oversized embedded binary inside a DOCX ──────────────────────────────────
def _emit_docx_oversized_embedded(output_dir: Path, repo_root: Path) -> list[Path]:
    """Write a DOCX whose ``word/embeddings/oversized.bin`` is a 100 MiB zero stream.

    A one-paragraph document is authored with python-docx, then the zero
    payload is added as a new part and registered in ``[Content_Types].xml``
    and ``word/_rels/document.xml.rels``.

    Returns the fixture path followed by the ground-truth sidecar path.
    """
    from docx import Document  # type: ignore[import-untyped]

    document = Document()
    document.add_paragraph("Document carrying an oversized embedded part.")
    stream = io.BytesIO()
    document.save(stream)
    shell = stream.getvalue()

    with zipfile.ZipFile(io.BytesIO(shell), "r") as archive:
        types_xml = archive.read("[Content_Types].xml")
        rels_xml = archive.read("word/_rels/document.xml.rels")

    # Registration snippets, each inserted just before the closing tag of the
    # part it extends.
    override = (
        b'<Override PartName="/word/embeddings/oversized.bin" '
        b'ContentType="application/octet-stream"/>'
    )
    relationship = (
        b'<Relationship Id="rIdOversized" '
        b'Type="http://schemas.openxmlformats.org/officeDocument/2006/relationships/oleObject" '
        b'Target="embeddings/oversized.bin"/>'
    )
    patched_parts = {
        "[Content_Types].xml": types_xml.replace(b"</Types>", override + b"</Types>"),
        "word/_rels/document.xml.rels": rels_xml.replace(
            b"</Relationships>", relationship + b"</Relationships>"
        ),
    }

    # The all-zero payload deflates extremely well, so the file on disk is
    # small even though the embedded part is huge once decompressed.
    fixture_bytes = _rewrite_zip(
        shell,
        additions={"word/embeddings/oversized.bin": bytes(OVERSIZED_BYTES)},
        replacements=patched_parts,
    )

    fixture_path = output_dir / "docx_oversized_embedded.docx"
    sidecar_path = output_dir / "docx_oversized_embedded.gt.json"
    fixture_path.write_bytes(fixture_bytes)

    expectations = security_expectation(
        should_extract=True,
        expected_warnings=["embed", "size", "skip"],
        notes=(
            "word/embeddings/oversized.bin carries 100 MiB of zeros. With "
            "max_embedded_file_bytes = 50 MiB the extractor must skip the child "
            "and emit a warning mentioning the embed + size + skip. The base "
            "document is extracted normally."
        ),
    )
    write_ground_truth(
        sidecar_path,
        fixture_path,
        repo_root,
        document_format="docx",
        feature="security",
        expectations=expectations,
        generator="security_fixtures",
    )
    return [fixture_path, sidecar_path]
# ── Zip-bomb fixtures ────────────────────────────────────────────────────────
def _build_zip_bomb_xlsx(compression_ratio: int) -> bytes:
"""Author an XLSX whose embedded /xl/payload.bin has the requested ratio.
Implemented by writing ``compression_ratio * COMPRESSED_ENTRY_BYTES``
bytes of zeros into a part that compresses down to roughly
``COMPRESSED_ENTRY_BYTES``. The XLSX shell is otherwise a valid one-
sheet workbook so the zip-bomb guard is what triggers (or doesn't),
not a malformed-archive code path.
"""
wb = Workbook()
ws = wb.active
ws["A1"] = f"Compression ratio target: {compression_ratio}:1"
base_buf = io.BytesIO()
wb.save(base_buf)
base_bytes = base_buf.getvalue()
uncompressed_size = compression_ratio * COMPRESSED_ENTRY_BYTES
payload = b"\x00" * uncompressed_size
return _rewrite_zip(
base_bytes,
additions={"xl/payload.bin": payload},
replacements={},
)
def _emit_zip_bomb_pair(output_dir: Path, repo_root: Path) -> list[Path]:
    """Write the tolerated and the pathological zip-bomb workbooks with their sidecars.

    Returns the four written paths: safe fixture, safe sidecar, pathological
    fixture, pathological sidecar.
    """
    # (file stem, ratio, should_extract, expected_warnings, notes)
    specs = (
        (
            "zip_bomb_xlsx",
            SAFE_COMPRESSION_RATIO,
            True,
            [],
            (
                f"{SAFE_COMPRESSION_RATIO}:1 compression ratio — legitimately compressible "
                "content (zero-filled stream). The zip-bomb guard must NOT trigger; this "
                "fixture verifies the guard tolerates real-world compression."
            ),
        ),
        (
            "zip_bomb_xlsx_pathological",
            PATHOLOGICAL_COMPRESSION_RATIO,
            False,
            ["zip", "bomb"],
            (
                f"{PATHOLOGICAL_COMPRESSION_RATIO}:1 compression ratio — the zip-bomb guard "
                "MUST reject the file. Extraction returns an error whose message mentions "
                "zip/bomb (case-insensitive substring match)."
            ),
        ),
    )

    written: list[Path] = []
    for stem, ratio, should_extract, warnings, notes in specs:
        fixture_path = output_dir / f"{stem}.xlsx"
        sidecar_path = output_dir / f"{stem}.gt.json"
        fixture_path.write_bytes(_build_zip_bomb_xlsx(ratio))
        write_ground_truth(
            sidecar_path,
            fixture_path,
            repo_root,
            document_format="xlsx",
            feature="security",
            expectations=security_expectation(
                should_extract=should_extract,
                expected_warnings=warnings,
                notes=notes,
            ),
            generator="security_fixtures",
        )
        written += [fixture_path, sidecar_path]
    return written
def generate(output_root: Path, repo_root: Path) -> list[Path]:
    """Write every security fixture into ``output_root/security/``.

    Returns all written paths, grouped per emitter in the order the emitters
    run.
    """
    security_dir = output_root / "security"
    security_dir.mkdir(parents=True, exist_ok=True)

    emitters = (
        _emit_xlsx_dde,
        _emit_xlsx_safe,
        _emit_docx_oversized_embedded,
        _emit_zip_bomb_pair,
    )
    written: list[Path] = []
    for emit in emitters:
        written.extend(emit(security_dir, repo_root))
    return written

View File

@@ -0,0 +1,164 @@
"""XLSX revision-headers fixture generator.
Authors a baseline workbook with ``openpyxl``, then injects the
``xl/revisions/revisionHeaders.xml`` part (legacy shared-workbook
collaborative-edit metadata) into the zip alongside the required
``[Content_Types].xml`` registration and a relationship from
``xl/_rels/workbook.xml.rels``.
The on-disk shape matches what
``crates/kreuzberg/src/extraction/excel.rs::parse_revision_headers_xml``
consumes: ``<header guid="{...}" userName="..." dateTime="..."/>`` under
``<headers xmlns="…spreadsheetml/2006/main">``.
"""
from __future__ import annotations
import io
import re
import zipfile
from pathlib import Path
from openpyxl import Workbook # type: ignore[import-untyped]
from .gt_schema import revisions_expectation, write_ground_truth
# Timestamp tuple stamped on every entry when the workbook is re-zipped
# (passed to ``zipfile.ZipInfo`` in ``_rewrite_zip``).
ZIP_MTIME = (2024, 1, 1, 0, 0, 0)
# One row per <header> element written to revisionHeaders.xml. The guid is
# stored without braces; ``_revision_headers_xml`` adds them when rendering.
REV_HEADERS = [
    # (guid, userName, dateTime)
    ("11111111-1111-1111-1111-111111111111", "Alice", "2024-05-01T08:00:00Z"),
    ("22222222-2222-2222-2222-222222222222", "Bob", "2024-05-01T09:30:00Z"),
    ("33333333-3333-3333-3333-333333333333", "Carol", "2024-05-01T11:00:00Z"),
]
# Relationship id used in xl/_rels/workbook.xml.rels for the revisions part.
REVISION_HEADERS_RELID = "rIdRevHeaders"
# Archive path of the injected part (no leading slash).
REVISION_HEADERS_PATH = "xl/revisions/revisionHeaders.xml"
# Content type registered for that part in [Content_Types].xml.
REVISION_HEADERS_CT = "application/vnd.openxmlformats-officedocument.spreadsheetml.revisionHeaders+xml"
def _build_baseline_xlsx() -> bytes:
    """Return the bytes of a one-sheet workbook: a header row and two data rows.

    NOTE(review): openpyxl appears to stamp document properties with the
    current time on save, which would make these bytes differ between runs —
    confirm whether byte-stable output is required here.
    """
    workbook = Workbook()
    sheet = workbook.active
    sheet.title = "Sheet1"

    # Cells are written row by row, left to right.
    cells = {
        "A1": "Item",
        "B1": "Qty",
        "A2": "Widgets",
        "B2": 42,
        "A3": "Gadgets",
        "B3": 7,
    }
    for coordinate, value in cells.items():
        sheet[coordinate] = value

    stream = io.BytesIO()
    workbook.save(stream)
    return stream.getvalue()
def _revision_headers_xml() -> bytes:
    """Render ``REV_HEADERS`` as the revisionHeaders.xml document (UTF-8 bytes).

    Each row becomes one ``<header>`` element whose ``guid`` attribute is the
    row's guid wrapped in braces.
    """
    elements = []
    for guid, user, dt in REV_HEADERS:
        elements.append(
            f'<header guid="{{{guid}}}" dateTime="{dt}" userName="{user}" maxSheetId="1"/>'
        )

    parts = [
        '<?xml version="1.0" encoding="UTF-8" standalone="yes"?>',
        '<headers xmlns="http://schemas.openxmlformats.org/spreadsheetml/2006/main">',
        *elements,
        "</headers>",
    ]
    return "".join(parts).encode("utf-8")
def _patch_content_types(original: bytes) -> bytes:
    """Register the revisionHeaders content-type as an Override.

    Returns:
        The patched bytes, or ``original`` itself when the Override is
        already present.

    Raises:
        RuntimeError: if there is no ``</Types>`` close tag to insert the
            Override before.
    """
    text = original.decode("utf-8")
    override = (
        f'<Override PartName="/{REVISION_HEADERS_PATH}" ContentType="{REVISION_HEADERS_CT}"/>'
    )
    if override in text:
        return original

    head, closing, tail = text.rpartition("</Types>")
    if not closing:
        # Returning the input unchanged here would produce a workbook whose
        # revisions part is silently unregistered.
        raise RuntimeError("[Content_Types].xml missing </Types> close tag")
    return f"{head}{override}{closing}{tail}".encode("utf-8")
def _patch_workbook_rels(original: bytes) -> bytes:
    """Add a relationship from workbook -> revisionHeaders.

    Returns:
        The patched bytes, or ``original`` itself when a relationship with
        id ``REVISION_HEADERS_RELID`` is already present.

    Raises:
        RuntimeError: if there is no ``</Relationships>`` close tag to
            insert the relationship before.
    """
    text = original.decode("utf-8")
    if REVISION_HEADERS_RELID in text:
        return original

    rel = (
        f'<Relationship Id="{REVISION_HEADERS_RELID}" '
        'Type="http://schemas.openxmlformats.org/officeDocument/2006/relationships/revisionHeaders" '
        'Target="revisions/revisionHeaders.xml"/>'
    )
    # Plain string splitting on the last close tag: the XML is never treated
    # as a regex replacement template, and a missing tag is reported instead
    # of silently returning the input unpatched.
    head, closing, tail = text.rpartition("</Relationships>")
    if not closing:
        raise RuntimeError("workbook.xml.rels missing </Relationships> close tag")
    return f"{head}{rel}{closing}{tail}".encode("utf-8")
def _rewrite_zip(src_bytes: bytes, additions: dict[str, bytes], replacements: dict[str, bytes]) -> bytes:
    """Re-zip ``src_bytes`` with replacements applied and additions appended.

    Args:
        src_bytes: the source archive.
        additions: new entries appended after the originals, in dict order.
            A name that already exists in the source is skipped.
        replacements: payloads substituted for existing entries of the same
            name.

    Returns:
        The bytes of the rewritten archive. Every entry is deflated and
        stamped with ``ZIP_MTIME``.
    """

    def _entry(name: str) -> zipfile.ZipInfo:
        info = zipfile.ZipInfo(name, ZIP_MTIME)
        # ZipInfo picks create_system from the host platform (0 on Windows,
        # 3 elsewhere); pin it so header bytes are platform-independent.
        info.create_system = 3
        info.compress_type = zipfile.ZIP_DEFLATED
        return info

    buf = io.BytesIO()
    with zipfile.ZipFile(io.BytesIO(src_bytes), "r") as src, zipfile.ZipFile(
        buf, "w", zipfile.ZIP_DEFLATED
    ) as dst:
        seen = set(src.namelist())
        for name in src.namelist():
            # Only read the original entry when it is actually kept.
            data = replacements[name] if name in replacements else src.read(name)
            dst.writestr(_entry(name), data)
        for name, data in additions.items():
            if name not in seen:
                dst.writestr(_entry(name), data)
    return buf.getvalue()
def generate(output_root: Path, repo_root: Path) -> list[Path]:
    """Write xlsx_revisions_basic.xlsx and its sidecar into ``output_root/xlsx/``.

    Returns the fixture path followed by the ground-truth sidecar path.
    """
    xlsx_dir = output_root / "xlsx"
    xlsx_dir.mkdir(parents=True, exist_ok=True)

    baseline = _build_baseline_xlsx()
    with zipfile.ZipFile(io.BytesIO(baseline), "r") as archive:
        original_types = archive.read("[Content_Types].xml")
        original_rels = archive.read("xl/_rels/workbook.xml.rels")

    # Inject the revisions part and register it in both bookkeeping files.
    patched_parts = {
        "[Content_Types].xml": _patch_content_types(original_types),
        "xl/_rels/workbook.xml.rels": _patch_workbook_rels(original_rels),
    }
    fixture_bytes = _rewrite_zip(
        baseline,
        additions={REVISION_HEADERS_PATH: _revision_headers_xml()},
        replacements=patched_parts,
    )

    fixture_path = xlsx_dir / "xlsx_revisions_basic.xlsx"
    sidecar_path = xlsx_dir / "xlsx_revisions_basic.gt.json"
    fixture_path.write_bytes(fixture_bytes)

    # One expected revision per header row, in REV_HEADERS order.
    expected_revisions = []
    for guid, user, dt in REV_HEADERS:
        expected_revisions.append(
            {
                "kind": "FormatChange",
                "author": user,
                "timestamp": dt,
                "revision_id": guid,
            }
        )

    notes = (
        "xl/revisions/revisionHeaders.xml carries shared-workbook collaborative-edit "
        "headers. The extractor maps each <header> to a DocumentRevision with kind = "
        "FormatChange (the closest neutral variant — header file does not record the "
        "kind of change). guid braces are stripped from revision_id."
    )
    write_ground_truth(
        sidecar_path,
        fixture_path,
        repo_root,
        document_format="xlsx",
        feature="revisions",
        expectations=revisions_expectation(
            expected_count=len(REV_HEADERS),
            revisions=expected_revisions,
            notes=notes,
        ),
        generator="xlsx_revisions",
    )
    return [fixture_path, sidecar_path]

View File

@@ -0,0 +1,67 @@
"""Smoke test: every generator runs end-to-end into a tmp dir.
Asserts that the generator produces non-empty binary fixtures and that
every ``*.gt.json`` sidecar parses to a dict with the expected keys.
"""
from __future__ import annotations
import json
from pathlib import Path
import pytest
from generate_test_fixtures import (
diff_pairs,
docx_revisions,
odt_revisions,
pdf_incremental,
pptx_comments,
security_fixtures,
xlsx_revisions,
)
# Generator modules exercised by the smoke test, one parametrized case each.
# No minimum file count is enforced: a generator that is still a stub may
# return an empty list, and the test only checks the shape of whatever files
# a generator reports having written.
GENERATORS = [
    docx_revisions,
    odt_revisions,
    xlsx_revisions,
    pptx_comments,
    pdf_incremental,
    diff_pairs,
    security_fixtures,
]
@pytest.fixture()
def repo_root(tmp_path: Path) -> Path:
    """Turn ``tmp_path`` into a fake repo root and return it.

    A stub ``Cargo.toml`` and an empty ``test_documents/`` directory are
    created so relative-path resolution in the ground-truth writer succeeds.
    """
    manifest = tmp_path / "Cargo.toml"
    manifest.write_text("# stub for fixture tests\n", encoding="utf-8")
    tmp_path.joinpath("test_documents").mkdir()
    return tmp_path
@pytest.mark.parametrize("module", GENERATORS, ids=lambda m: m.__name__.rsplit(".", 1)[-1])
def test_generator_runs_and_emits_well_formed_outputs(module, tmp_path: Path, repo_root: Path) -> None:
    """Run one generator and check every file it reports.

    Each reported path must exist and be non-empty; each ``.json`` file must
    parse to an object carrying the required sidecar keys.
    """
    destination = tmp_path / "out"
    destination.mkdir()

    produced = module.generate(destination, repo_root)

    # Stubs may legitimately write nothing while still being valid, so only
    # the files that ARE reported get checked.
    assert isinstance(produced, list)
    required_keys = ("fixture_path", "format", "feature", "expectations", "generated_by")
    for artefact in produced:
        assert artefact.exists(), f"{module.__name__} reported {artefact} but it does not exist"
        assert artefact.stat().st_size > 0, f"{artefact} is zero-length"
        if artefact.suffix != ".json":
            continue
        document = json.loads(artefact.read_text(encoding="utf-8"))
        assert isinstance(document, dict), f"{artefact} is not a JSON object"
        for key in required_keys:
            assert key in document, f"{artefact} missing {key!r}"