#!/usr/bin/env python3 """ dba.dk universal listing monitor — works for any DBA search URL. Usage: python3 fetch_dba.py [URL] [--all] URL Any dba.dk search URL (mobility/cars or recommerce/general goods). Falls back to DEFAULT_URL if omitted. --all Fetch all pages (default: page 1 only). Examples: python3 fetch_dba.py python3 fetch_dba.py --all python3 fetch_dba.py "https://www.dba.dk/recommerce/forsale/search?q=rtx+3090" python3 fetch_dba.py "https://www.dba.dk/recommerce/forsale/search?q=golf+driver&price_to=3000" --all """ import hashlib, re, json, sys, time, math, uuid as _uuid from datetime import datetime, timezone, timedelta from pathlib import Path from urllib.parse import urlparse, parse_qs from concurrent.futures import ThreadPoolExecutor, as_completed import requests DEFAULT_URL = ( "https://www.dba.dk/mobility/search/car" "?mileage_to=175000&price_from=15000&price_to=110000" "®istration_class=1&year_from=2014" ) HEADERS = {"User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36"} BASE_DIR = Path(__file__).parent DATA_DIR = BASE_DIR / "data" ITEM_CACHE = BASE_DIR / "data" / "item_cache" CACHE_TTL_H = 24 # hours before a cached item detail is re-fetched UUID_RE = re.compile(r"^[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$") # ── URL helpers ─────────────────────────────────────────────────────────────── def detect_domain(url: str) -> str: """Return 'mobility' or 'recommerce' based on URL path.""" return "mobility" if "/mobility/" in url else "recommerce" def url_slug(url: str) -> str: """Create a short filename-safe slug from a search URL.""" parsed = urlparse(url) q = parse_qs(parsed.query).get("q", [""])[0] path_tail = parsed.path.rstrip("/").split("/")[-1] label = re.sub(r"[^\w]", "_", q or path_tail).strip("_").lower()[:30] short = hashlib.md5(url.encode()).hexdigest()[:6] return f"{label}_{short}" if label else short def page_url(search_url: str, page: int) -> str: sep = "&" if "?" in search_url else "?" return search_url + (f"{sep}page={page}" if page > 1 else "") # ── Search page parsing ─────────────────────────────────────────────────────── def fetch_page(search_url: str, page: int = 1) -> tuple[list[dict], int]: """Fetch one search result page. Returns (listings, total_count).""" resp = requests.get(page_url(search_url, page), headers=HEADERS, timeout=15) resp.raise_for_status() return parse_search_page(resp.text) def parse_search_page(html: str) -> tuple[list[dict], int]: listings: list[dict] = [] total = 0 m = re.search(r"([\d\.]+)\s+annonce", html) if m: total = int(m.group(1).replace(".", "")) for block in re.findall( r']*type="application/ld\+json"[^>]*>(.*?)', html, re.DOTALL ): try: data = json.loads(block) if data.get("@type") != "CollectionPage": continue for item in data.get("mainEntity", {}).get("itemListElement", []): p = item.get("item", {}) item_url = p.get("url", "") # ID is always the last numeric path segment item_id = re.search(r"/(\d+)/?$", item_url) listings.append({ "id": item_id.group(1) if item_id else item_url.split("/")[-1], "name": p.get("name") or f"{p.get('brand',{}).get('name','')} {p.get('model','')}".strip(), "brand": p.get("brand", {}).get("name"), "model": p.get("model"), "description": p.get("description"), "price_dkk": p.get("offers", {}).get("price"), "url": item_url, "image": p.get("image"), "condition": p.get("itemCondition", "").replace("https://schema.org/", ""), }) except (json.JSONDecodeError, KeyError): pass return listings, total def fetch_all_pages(search_url: str) -> list[dict]: first_page, total = fetch_page(search_url, 1) if total == 0: # Try counting items directly if total not found in HTML total = len(first_page) items_per_page = len(first_page) or 49 pages = math.ceil(total / items_per_page) if total else 1 print(f"Total: {total} listings across {pages} pages", file=sys.stderr) all_listings = first_page for p in range(2, pages + 1): print(f" Fetching page {p}/{pages}…", file=sys.stderr) listings, _ = fetch_page(search_url, p) all_listings.extend(listings) if not listings: break time.sleep(0.5) return all_listings # ── Item detail fetching ────────────────────────────────────────────────────── def page_to_text(html: str) -> str: """Strip HTML tags and return clean visible text, trimmed of navigation/footer noise.""" # Remove script and style blocks entirely text = re.sub(r"<(script|style)[^>]*>.*?", "", html, flags=re.DOTALL | re.IGNORECASE) # Strip all remaining tags text = re.sub(r"<[^>]+>", " ", text) text = re.sub(r"\s+", " ", text).strip() # Cut off at footer noise (everything after "For virksomheder" is boilerplate) for cutoff in ["For virksomheder", "Annoncens metadata", "DBA Boost"]: idx = text.find(cutoff) if idx > 200: text = text[:idx].strip() break return text def fetch_item_details(item: dict) -> dict: """Fetch raw visible text from an item page, using file cache.""" item_id = item.get("id", "") cache_key = ITEM_CACHE / f"{item_id}.json" ITEM_CACHE.mkdir(parents=True, exist_ok=True) # Serve from cache if fresh enough if cache_key.exists(): try: cached = json.loads(cache_key.read_text()) cached_at = datetime.fromisoformat(cached["cached_at"]).replace(tzinfo=timezone.utc) age_h = (datetime.now(timezone.utc) - cached_at).total_seconds() / 3600 if age_h < CACHE_TTL_H: return {"raw_text": cached["raw_text"], "from_cache": True} except Exception: pass # corrupt cache entry → re-fetch try: resp = requests.get(item["url"], headers=HEADERS, timeout=10) resp.raise_for_status() raw_text = page_to_text(resp.text) cache_key.write_text(json.dumps({ "id": item_id, "raw_text": raw_text, "cached_at": datetime.now(timezone.utc).isoformat(timespec="seconds"), }, ensure_ascii=False)) return {"raw_text": raw_text} except Exception: return {"raw_text": ""} def enrich_listings(listings: list[dict], workers: int = 8) -> list[dict]: print(f"Fetching details for {len(listings)} items…", file=sys.stderr) with ThreadPoolExecutor(max_workers=workers) as ex: futures = {ex.submit(fetch_item_details, l): i for i, l in enumerate(listings)} for future in as_completed(futures): result = future.result() listings[futures[future]]["details"] = result cached = sum(1 for l in listings if l.get("details", {}).get("from_cache")) fetched = len(listings) - cached print(f" ✓ {fetched} hentet fra DBA, {cached} fra cache", file=sys.stderr) return listings # ── Data directory helpers ──────────────────────────────────────────────────── def search_dir(search_id: str) -> Path: return DATA_DIR / search_id def create_search(url: str) -> str: """Create a new search directory and return its UUID.""" search_id = str(_uuid.uuid4()) d = search_dir(search_id) d.mkdir(parents=True, exist_ok=True) meta = { "id": search_id, "url": url, "domain": detect_domain(url), "created_at": datetime.now().isoformat(timespec="seconds"), } (d / "meta.json").write_text(json.dumps(meta, ensure_ascii=False, indent=2)) return search_id def load_meta(search_id: str) -> dict: p = search_dir(search_id) / "meta.json" if not p.exists(): raise FileNotFoundError(f"Ingen søgning med UUID {search_id}") return json.loads(p.read_text()) def listings_file(search_id: str) -> Path: return search_dir(search_id) / "listings.json" def seen_file(search_id: str) -> Path: return search_dir(search_id) / "seen.json" def list_searches() -> list[dict]: if not DATA_DIR.exists(): return [] results = [] for d in sorted(DATA_DIR.iterdir(), key=lambda p: p.stat().st_mtime, reverse=True): meta_path = d / "meta.json" if meta_path.exists(): meta = json.loads(meta_path.read_text()) lf = d / "listings.json" meta["listing_count"] = len(json.loads(lf.read_text())) if lf.exists() else 0 results.append(meta) return results def load_seen(state_file: Path) -> set[str]: return set(json.loads(state_file.read_text())) if state_file.exists() else set() def save_seen(state_file: Path, ids: set[str]) -> None: state_file.write_text(json.dumps(sorted(ids))) def find_new(listings: list[dict], seen: set[str]) -> list[dict]: return [l for l in listings if l["id"] not in seen] # ── Output ──────────────────────────────────────────────────────────────────── def print_listing(item: dict) -> None: text = item.get("details", {}).get("raw_text", "") preview = (text[:160] + "…") if len(text) > 160 else text print( f"[{item['id']}] {item['name']}\n" f" Pris: {item['price_dkk']} DKK | {item.get('condition','')}\n" f" {item['url']}\n" f" {preview}\n" ) # ── Main ────────────────────────────────────────────────────────────────────── def main() -> None: args = [a for a in sys.argv[1:] if not a.startswith("-")] flags = [a for a in sys.argv[1:] if a.startswith("-")] fetch_all = "--all" in flags first = args[0] if args else None # ── list existing searches ───────────────────────────────────────────────── if "--list" in flags or first == "list": searches = list_searches() if not searches: print("Ingen søgninger endnu. Kør: python fetch_dba.py ") return print(f"{'UUID':36} {'Oprettet':19} {'#':4} URL") print("─" * 100) for s in searches: print(f"{s['id']:36} {s['created_at']:19} {s['listing_count']:4} {s['url'][:60]}") return # ── resolve search_id or create new ─────────────────────────────────────── if first and UUID_RE.match(first): search_id = first meta = load_meta(search_id) search_url = meta["url"] print(f"🔄 Bruger eksisterende søgning: {search_id}", file=sys.stderr) else: search_url = first if first and first.startswith("http") else DEFAULT_URL search_id = create_search(search_url) print(f"✨ Ny søgning oprettet: {search_id}", file=sys.stderr) domain = detect_domain(search_url) sf = seen_file(search_id) lf = listings_file(search_id) emoji = "🚗" if domain == "mobility" else "🛒" print(f"{emoji} Domain: {domain} | {'All pages' if fetch_all else 'Page 1'}", file=sys.stderr) print(f" URL: {search_url}", file=sys.stderr) print(f" Dir: data/{search_id}/", file=sys.stderr) seen = load_seen(sf) listings = fetch_all_pages(search_url) if fetch_all else fetch_page(search_url, 1)[0] new_listings = find_new(listings, seen) if not new_listings: print("Ingen nye annoncer siden sidst.") return new_listings = enrich_listings(new_listings) existing = json.loads(lf.read_text()) if lf.exists() else [] existing.extend(new_listings) lf.write_text(json.dumps(existing, ensure_ascii=False, indent=2)) print(f"💾 Gemt {len(new_listings)} nye → data/{search_id}/listings.json ({len(existing)} total)\n", file=sys.stderr) print(f"\n📋 UUID: {search_id}") print(f"{emoji} {len(new_listings)} ny(e) annonce(r):\n") for item in new_listings: print_listing(item) save_seen(sf, seen | {l["id"] for l in listings}) if __name__ == "__main__": main()