Files
BlaaAI/fetch_dba.py

332 lines
13 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python3
"""
dba.dk universal listing monitor — works for any DBA search URL.
Usage:
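python3 fetch_dba.py [URL | UUID] [--all]
python3 fetch_dba.py --list
URL Any dba.dk search URL (mobility/cars or recommerce/general goods).
Falls back to DEFAULT_URL if omitted. Every run with a URL creates a new stored search.
UUID The UUID of a stored search (printed after each run); re-runs it and reports only listings not seen before.
--all Fetch all pages (default: page 1 only).
--list List the stored searches and exit.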
Examples:
python3 fetch_dba.py
python3 fetch_dba.py --all
python3 fetch_dba.py "https://www.dba.dk/recommerce/forsale/search?q=rtx+3090"
python3 fetch_dba.py "https://www.dba.dk/recommerce/forsale/search?q=golf+driver&price_to=3000" --all
"""
import hashlib, re, json, sys, time, math, uuid as _uuid
from datetime import datetime, timezone, timedelta
from pathlib import Path
from urllib.parse import urlparse, parse_qs
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests
# Search URL used when no URL is given on the command line.
DEFAULT_URL = (
"https://www.dba.dk/mobility/search/car"
"?mileage_to=175000&price_from=15000&price_to=110000"
"&registration_class=1&year_from=2014"
)
# Headers sent with every HTTP request (search pages and item pages).
HEADERS = {"User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36"}
# All on-disk state lives in a "data" directory next to this script.
BASE_DIR = Path(__file__).parent
# One sub-directory per stored search, named by the search's UUID.
DATA_DIR = BASE_DIR / "data"
# Cached item-page text, one "<item id>.json" file per listing.
ITEM_CACHE = BASE_DIR / "data" / "item_cache"
CACHE_TTL_H = 24 # hours before a cached item detail is re-fetched
# Matches a lowercase, hyphenated version-4 UUID; used to tell a search UUID
# apart from a URL on the command line.
UUID_RE = re.compile(r"^[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$")
# ── URL helpers ───────────────────────────────────────────────────────────────
def detect_domain(url: str) -> str:
    """Classify a DBA URL as ``'mobility'`` or ``'recommerce'``.

    Any URL containing the ``/mobility/`` path segment is a mobility URL;
    everything else is treated as recommerce.
    """
    if "/mobility/" in url:
        return "mobility"
    return "recommerce"
def url_slug(url: str) -> str:
    """Build a short, filename-safe identifier for a search URL.

    The readable part comes from the ``q`` query parameter, or from the last
    path segment when there is no ``q``. It is lower-cased, has non-word
    characters replaced by ``_`` and is capped at 30 characters. A 6-character
    MD5 prefix of the full URL is appended so different URLs with the same
    label stay distinct. If no readable part can be derived, only the hash
    prefix is returned.
    """
    parts = urlparse(url)
    query_term = parse_qs(parts.query).get("q", [""])[0]
    last_segment = parts.path.rstrip("/").split("/")[-1]
    source = query_term or last_segment
    label = re.sub(r"[^\w]", "_", source).strip("_").lower()[:30]
    digest = hashlib.md5(url.encode()).hexdigest()[:6]
    if not label:
        return digest
    return f"{label}_{digest}"
def page_url(search_url: str, page: int) -> str:
    """Return *search_url* addressed at result page *page*.

    Page 1 (or anything lower) is the bare search URL; later pages get a
    ``page=N`` parameter appended with ``?`` or ``&`` as appropriate.
    """
    if page <= 1:
        return search_url
    joiner = "?" if "?" not in search_url else "&"
    return f"{search_url}{joiner}page={page}"
# ── Search page parsing ───────────────────────────────────────────────────────
def fetch_page(search_url: str, page: int = 1) -> tuple[list[dict], int]:
    """Download one search result page and parse it.

    Returns ``(listings, total_count)`` as produced by ``parse_search_page``.
    Raises whatever ``requests`` raises on a network error or a non-2xx
    response (``raise_for_status``).
    """
    target = page_url(search_url, page)
    response = requests.get(target, headers=HEADERS, timeout=15)
    response.raise_for_status()
    html = response.text
    return parse_search_page(html)
def _as_dict(value) -> dict:
    """Return *value* if it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def parse_search_page(html: str) -> tuple[list[dict], int]:
    """Extract listings and the advertised total from a search result page.

    Listings are read from the page's JSON-LD ``CollectionPage`` object
    (``mainEntity.itemListElement``). The total is the first
    "<number> annonce…" figure in the HTML, with ``.`` treated as a
    thousands separator; it is 0 when no such figure is found.

    The parser is tolerant of JSON-LD variations: a script block may hold a
    single object or an array of objects, ``brand`` may be an object or a
    plain string, and missing or null ``offers`` / ``itemCondition`` values
    are accepted. Malformed blocks or elements are skipped individually
    instead of discarding the remainder of the page.

    Returns:
        ``(listings, total)`` where each listing is a dict with the keys
        ``id``, ``name``, ``brand``, ``model``, ``description``,
        ``price_dkk``, ``url``, ``image`` and ``condition``.
    """
    listings: list[dict] = []
    total = 0
    # Require a leading digit so a stray "." before "annonce" cannot match
    # and make int("") blow up.
    m = re.search(r"(\d[\d.]*)\s+annonce", html)
    if m:
        total = int(m.group(1).replace(".", ""))

    for block in re.findall(
        r'<script[^>]*type="application/ld\+json"[^>]*>(.*?)</script>',
        html, re.DOTALL
    ):
        try:
            data = json.loads(block)
        except json.JSONDecodeError:
            continue
        # A JSON-LD script may contain one object or an array of objects.
        nodes = data if isinstance(data, list) else [data]
        for node in nodes:
            if not isinstance(node, dict) or node.get("@type") != "CollectionPage":
                continue
            elements = _as_dict(node.get("mainEntity")).get("itemListElement")
            if not isinstance(elements, list):
                continue
            for element in elements:
                p = _as_dict(_as_dict(element).get("item"))
                if not p:
                    continue
                item_url = p.get("url") or ""
                if not isinstance(item_url, str):
                    item_url = str(item_url)

                brand = p.get("brand")
                if isinstance(brand, dict):
                    brand_name = brand.get("name")
                elif isinstance(brand, str):
                    brand_name = brand
                else:
                    brand_name = None

                condition = p.get("itemCondition")
                if not isinstance(condition, str):
                    condition = ""

                # ID is always the last numeric path segment
                item_id = re.search(r"/(\d+)/?$", item_url)
                listings.append({
                    "id": item_id.group(1) if item_id else item_url.split("/")[-1],
                    "name": p.get("name") or f"{brand_name or ''} {p.get('model') or ''}".strip(),
                    "brand": brand_name,
                    "model": p.get("model"),
                    "description": p.get("description"),
                    "price_dkk": _as_dict(p.get("offers")).get("price"),
                    "url": item_url,
                    "image": p.get("image"),
                    "condition": condition.replace("https://schema.org/", ""),
                })
    return listings, total
def fetch_all_pages(search_url: str) -> list[dict]:
    """Fetch every result page of a search and return the combined listings.

    The page count is derived from the total reported on page 1 divided by
    the number of listings on page 1 (49 is assumed when page 1 is empty).
    When the page reports no total, the size of page 1 is used as the total.
    Fetching stops early at the first empty page, and there is a 0.5 s pause
    between page requests.
    """
    collected, total = fetch_page(search_url, 1)
    first_count = len(collected)
    if total == 0:
        # Try counting items directly if total not found in HTML
        total = first_count
    per_page = first_count or 49
    pages = 1 if not total else math.ceil(total / per_page)
    print(f"Total: {total} listings across {pages} pages", file=sys.stderr)

    for page_no in range(2, pages + 1):
        print(f" Fetching page {page_no}/{pages}", file=sys.stderr)
        batch, _ = fetch_page(search_url, page_no)
        if not batch:
            break
        collected.extend(batch)
        time.sleep(0.5)
    return collected
# ── Item detail fetching ──────────────────────────────────────────────────────
def page_to_text(html: str) -> str:
    """Reduce an HTML page to its visible text with footer boilerplate removed.

    Script and style elements are dropped with their contents, remaining tags
    become spaces, and whitespace runs collapse to single spaces. The text is
    then cut at the first footer marker (tried in the order "For
    virksomheder", "Annoncens metadata", "DBA Boost") that occurs after
    character position 200.
    """
    # Drop <script>/<style> elements including their contents.
    without_code = re.sub(r"<(script|style)[^>]*>.*?</\1>", "", html, flags=re.DOTALL | re.IGNORECASE)
    # Every other tag becomes a space so adjacent words do not merge.
    without_tags = re.sub(r"<[^>]+>", " ", without_code)
    visible = re.sub(r"\s+", " ", without_tags).strip()

    # Markers are tried in priority order; one found within the first 200
    # characters is ignored and the next marker is tried instead.
    positions = (visible.find(marker) for marker in ("For virksomheder", "Annoncens metadata", "DBA Boost"))
    cut_at = next((pos for pos in positions if pos > 200), None)
    if cut_at is None:
        return visible
    return visible[:cut_at].strip()
def fetch_item_details(item: dict) -> dict:
    """Return the visible text of an item page, served from a file cache when fresh.

    Args:
        item: A listing dict; ``item["url"]`` is fetched and ``item["id"]``
            names the cache file ``ITEM_CACHE/<id>.json``.

    Returns:
        ``{"raw_text": <text>, "from_cache": True}`` when a cache entry
        younger than ``CACHE_TTL_H`` hours exists, ``{"raw_text": <text>}``
        after a successful download, and ``{"raw_text": ""}`` when the
        download fails (best effort: the failure is reported on stderr and
        never raised).

    Cache files are read and written as UTF-8 explicitly because the JSON is
    dumped with ``ensure_ascii=False``. A failure to write the cache no longer
    discards the text that was just downloaded. Items without an id are never
    cached, since they would all share the file ``.json``.
    """
    item_id = item.get("id", "")
    cache_key = ITEM_CACHE / f"{item_id}.json" if item_id else None

    # Serve from cache if fresh enough
    if cache_key is not None and cache_key.exists():
        try:
            cached = json.loads(cache_key.read_text(encoding="utf-8"))
            cached_at = datetime.fromisoformat(cached["cached_at"])
            if cached_at.tzinfo is None:
                # Timestamps without an offset are interpreted as UTC.
                cached_at = cached_at.replace(tzinfo=timezone.utc)
            age_h = (datetime.now(timezone.utc) - cached_at).total_seconds() / 3600
            if age_h < CACHE_TTL_H:
                return {"raw_text": cached["raw_text"], "from_cache": True}
        except (OSError, ValueError, KeyError, TypeError):
            pass  # unreadable or corrupt cache entry → re-fetch

    try:
        resp = requests.get(item["url"], headers=HEADERS, timeout=10)
        resp.raise_for_status()
        raw_text = page_to_text(resp.text)
    except Exception as exc:
        # Best effort: a single failed item must not abort the whole run.
        print(f"Could not fetch details for item {item_id!r}: {exc}", file=sys.stderr)
        return {"raw_text": ""}

    if cache_key is not None:
        try:
            ITEM_CACHE.mkdir(parents=True, exist_ok=True)
            cache_key.write_text(json.dumps({
                "id": item_id,
                "raw_text": raw_text,
                "cached_at": datetime.now(timezone.utc).isoformat(timespec="seconds"),
            }, ensure_ascii=False), encoding="utf-8")
        except (OSError, ValueError) as exc:
            # Caching is an optimisation; keep the text we already have.
            print(f"Could not write cache for item {item_id!r}: {exc}", file=sys.stderr)
    return {"raw_text": raw_text}
def enrich_listings(listings: list[dict], workers: int = 8) -> list[dict]:
    """Attach a ``details`` entry to every listing, fetching them concurrently.

    Each listing is passed to ``fetch_item_details`` on a thread pool of
    *workers* threads and the result is stored under ``listing["details"]``.
    The input list is modified in place and also returned. A summary of how
    many results came from the network versus the cache goes to stderr.
    """
    print(f"Fetching details for {len(listings)} items…", file=sys.stderr)
    with ThreadPoolExecutor(max_workers=workers) as pool:
        pending = {pool.submit(fetch_item_details, entry): entry for entry in listings}
        for done in as_completed(pending):
            details = done.result()
            pending[done]["details"] = details

    cached = len([entry for entry in listings if entry.get("details", {}).get("from_cache")])
    fetched = len(listings) - cached
    print(f"{fetched} hentet fra DBA, {cached} fra cache", file=sys.stderr)
    return listings
# ── Data directory helpers ────────────────────────────────────────────────────
def search_dir(search_id: str) -> Path:
    """Return the directory under ``DATA_DIR`` that holds one search's files."""
    return DATA_DIR.joinpath(search_id)
def create_search(url: str) -> str:
    """Register a new search for *url* and return its freshly generated UUID.

    Creates the search directory and writes ``meta.json`` with the id, the
    URL, the detected domain and the local creation time (second precision).
    """
    search_id = str(_uuid.uuid4())
    target = search_dir(search_id)
    target.mkdir(parents=True, exist_ok=True)

    meta = dict(
        id=search_id,
        url=url,
        domain=detect_domain(url),
        created_at=datetime.now().isoformat(timespec="seconds"),
    )
    serialized = json.dumps(meta, ensure_ascii=False, indent=2)
    (target / "meta.json").write_text(serialized)
    return search_id
def load_meta(search_id: str) -> dict:
    """Load and return the ``meta.json`` of a stored search.

    Raises:
        FileNotFoundError: if no ``meta.json`` exists for *search_id*.
    """
    meta_path = search_dir(search_id).joinpath("meta.json")
    if meta_path.exists():
        return json.loads(meta_path.read_text())
    raise FileNotFoundError(f"Ingen søgning med UUID {search_id}")
def listings_file(search_id: str) -> Path:
    """Return the path of the ``listings.json`` file of a stored search."""
    return search_dir(search_id).joinpath("listings.json")
def seen_file(search_id: str) -> Path:
    """Return the path of the ``seen.json`` file of a stored search."""
    return search_dir(search_id).joinpath("seen.json")
def list_searches() -> list[dict]:
    """Return the metadata of all stored searches, most recently modified first.

    Every sub-directory of ``DATA_DIR`` that contains a ``meta.json`` is one
    search. Its metadata dict is returned with an extra ``listing_count`` key
    holding the number of entries in its ``listings.json`` (0 when that file
    is missing).

    A search whose ``meta.json`` is unreadable or is not a JSON object is
    skipped with a warning on stderr. An unreadable ``listings.json`` is
    reported and counted as 0. This keeps a single damaged file from hiding
    all other searches.
    """
    if not DATA_DIR.exists():
        return []
    results = []
    for d in sorted(DATA_DIR.iterdir(), key=lambda p: p.stat().st_mtime, reverse=True):
        meta_path = d / "meta.json"
        if not meta_path.exists():
            continue
        try:
            meta = json.loads(meta_path.read_text())
        except (OSError, ValueError) as exc:
            print(f"Skipping {d.name}: unreadable meta.json ({exc})", file=sys.stderr)
            continue
        if not isinstance(meta, dict):
            print(f"Skipping {d.name}: meta.json is not a JSON object", file=sys.stderr)
            continue

        lf = d / "listings.json"
        try:
            meta["listing_count"] = len(json.loads(lf.read_text())) if lf.exists() else 0
        except (OSError, ValueError, TypeError) as exc:
            print(f"{d.name}: unreadable listings.json ({exc})", file=sys.stderr)
            meta["listing_count"] = 0
        results.append(meta)
    return results
def load_seen(state_file: Path) -> set[str]:
    """Read the set of already-seen listing ids from *state_file*.

    Returns an empty set when the file does not exist.
    """
    if not state_file.exists():
        return set()
    stored_ids = json.loads(state_file.read_text())
    return set(stored_ids)
def save_seen(state_file: Path, ids: set[str]) -> None:
    """Write *ids* to *state_file* as a sorted JSON array, replacing its contents."""
    ordered = list(ids)
    ordered.sort()
    state_file.write_text(json.dumps(ordered))
def find_new(listings: list[dict], seen: set[str]) -> list[dict]:
    """Return the listings whose ``id`` is not in *seen*, in their original order.

    An id that occurs more than once in *listings* is returned only for its
    first occurrence. Multi-page fetches can contain the same listing twice
    when results shift between page requests, and each new listing should be
    stored and reported once. *seen* is not modified.
    """
    emitted: set[str] = set()
    fresh: list[dict] = []
    for listing in listings:
        listing_id = listing["id"]
        if listing_id in seen or listing_id in emitted:
            continue
        emitted.add(listing_id)
        fresh.append(listing)
    return fresh
# ── Output ────────────────────────────────────────────────────────────────────
def print_listing(item: dict) -> None:
    """Print a short, human-readable summary of one listing to stdout.

    Shows id, name, price, condition, URL and a preview of the item page
    text. The preview is limited to 160 characters; a longer text is cut and
    marked with a trailing "…" so the truncation is visible (the original
    appended an empty string, leaving no marker).

    The listing must contain ``id``, ``name``, ``price_dkk`` and ``url``;
    ``condition`` and ``details`` are optional.
    """
    text = item.get("details", {}).get("raw_text", "")
    preview = (text[:160] + "…") if len(text) > 160 else text
    print(
        f"[{item['id']}] {item['name']}\n"
        f" Pris: {item['price_dkk']} DKK | {item.get('condition','')}\n"
        f" {item['url']}\n"
        f" {preview}\n"
    )
# ── Main ──────────────────────────────────────────────────────────────────────
def main() -> None:
    """Command-line entry point.

    Modes, chosen from ``sys.argv``:

    * ``--list`` or ``list``: print a table of the stored searches and exit.
    * ``<UUID>``: re-run the stored search with that UUID.
    * ``<URL>`` or no argument: create a new stored search for the URL
      (``DEFAULT_URL`` when none is given) and run it.

    ``--all`` fetches every result page instead of only page 1. Listings not
    seen before for the search are enriched with item-page text, appended to
    the search's ``listings.json``, recorded in ``seen.json`` and printed.

    Changes from the original behavior: the table separator is a visible
    rule, an unknown UUID ends with a message and exit status 1 instead of a
    traceback, a non-URL argument triggers a warning before falling back to
    ``DEFAULT_URL``, and the seen set is saved right after the listings.
    """
    argv = sys.argv[1:]
    args = [a for a in argv if not a.startswith("-")]
    flags = [a for a in argv if a.startswith("-")]
    fetch_all = "--all" in flags
    first = args[0] if args else None

    # ── list existing searches ─────────────────────────────────────────────
    if "--list" in flags or first == "list":
        searches = list_searches()
        if not searches:
            print("Ingen søgninger endnu. Kør: python fetch_dba.py <url>")
            return
        print(f"{'UUID':36} {'Oprettet':19} {'#':4} URL")
        # The original multiplied an empty string, which printed a blank line.
        print("-" * 100)
        for s in searches:
            print(f"{s['id']:36} {s['created_at']:19} {s['listing_count']:4} {s['url'][:60]}")
        return

    # ── resolve search_id or create new ────────────────────────────────────
    if first and UUID_RE.match(first):
        search_id = first
        try:
            meta = load_meta(search_id)
        except FileNotFoundError as exc:
            # A mistyped UUID is a user error, not a crash.
            print(exc, file=sys.stderr)
            sys.exit(1)
        search_url = meta["url"]
        print(f"🔄 Bruger eksisterende søgning: {search_id}", file=sys.stderr)
    else:
        if first and not first.startswith("http"):
            # Make the fallback visible instead of silently ignoring the argument.
            print(
                f"Argument {first!r} is neither a search UUID nor an http(s) URL; "
                "using DEFAULT_URL",
                file=sys.stderr,
            )
        search_url = first if first and first.startswith("http") else DEFAULT_URL
        search_id = create_search(search_url)
        print(f"✨ Ny søgning oprettet: {search_id}", file=sys.stderr)

    domain = detect_domain(search_url)
    sf = seen_file(search_id)
    lf = listings_file(search_id)
    emoji = "🚗" if domain == "mobility" else "🛒"
    print(f"{emoji} Domain: {domain} | {'All pages' if fetch_all else 'Page 1'}", file=sys.stderr)
    print(f" URL: {search_url}", file=sys.stderr)
    print(f" Dir: data/{search_id}/", file=sys.stderr)

    seen = load_seen(sf)
    listings = fetch_all_pages(search_url) if fetch_all else fetch_page(search_url, 1)[0]
    new_listings = find_new(listings, seen)
    if not new_listings:
        print("Ingen nye annoncer siden sidst.")
        return

    new_listings = enrich_listings(new_listings)
    existing = json.loads(lf.read_text()) if lf.exists() else []
    existing.extend(new_listings)
    lf.write_text(json.dumps(existing, ensure_ascii=False, indent=2))
    # Record the ids right after storing the listings. If printing below
    # fails, the next run must not append the same listings again.
    save_seen(sf, seen | {l["id"] for l in listings})
    print(f"💾 Gemt {len(new_listings)} nye → data/{search_id}/listings.json ({len(existing)} total)\n", file=sys.stderr)

    print(f"\n📋 UUID: {search_id}")
    print(f"{emoji} {len(new_listings)} ny(e) annonce(r):\n")
    for item in new_listings:
        print_listing(item)
# Run the CLI only when the file is executed as a script, not when imported.
if __name__ == "__main__":
    main()