"""
ground_news.py — Ground News article fetcher + local SQLite store

Key design:
  - RSC payload trick: send RSC: 1 header to get Next.js App Router data
  - page_cache table: raw RSC payloads with TTL (don't re-fetch fresh pages)
  - articles table: all extracted fields, categories merged across pages
  - fetch_article(slug) — single article, rich data
  - fetch_category(slug) — all stories on an interest page (~15 stories)
  - fetch_all() — all known interest categories in parallel
  - top_articles(n, days)— query DB for top-N by source_count
"""

import concurrent.futures
import json
import re
import sqlite3
import time
from pathlib import Path

import httpx

from db import get_conn, DBConn

# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------

DB_PATH = Path(__file__).parent / "ground_news.db"
BASE_URL = "https://ground.news"

# Cache lifetime in seconds, keyed by the page_type passed to fetch_cached().
CACHE_TTL = {
    "interest": 30 * 60,  # category pages: 30 min
    "article": 6 * 60 * 60,  # single articles: 6 h
}

HEADERS = {
    "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36",
    "Accept": "text/html,application/xhtml+xml",
    "RSC": "1",
    "Next-Router-State-Tree": (
        "%5B%22%22%2C%7B%22children%22%3A%5B%22__PAGE__%22%2C%7B%7D%5D%7D%2Cnull%2Cnull%2Ctrue%5D"
    ),
}

# All known interest slugs (auto-discovered from ground.news homepage 2026-05-24)
KNOWN_INTERESTS: dict[str, str] = {
    "europe": "Europe",
    "europe-economy": "Europe Economy",
    "european-politics": "European Politics",
    "european-union": "European Union",
    "european-security-and-nato": "European Security & NATO",
    "uk-politics": "UK Politics",
    "united-kingdom": "United Kingdom",
    "international": "International",
    "north-america": "North America",
    "south-america": "South America",
    "africa": "Africa",
    "asia": "Asia",
    "australia": "Australia",
    "us-politics": "US Politics",
    "united-states": "United States",
    "donald-trump": "Donald Trump",
    "trump-administration": "Trump Administration",
    "israeli-palestinian-conflict": "Israeli-Palestinian Conflict",
    "business-and-markets": "Business & Markets",
    "premier-league": "Premier League",
    "soccer": "Soccer",
    "memorial-day": "Memorial Day",
    # Financial / C25 relevant categories
    "pharma": "Pharmaceuticals",
    "energy": "Energy",
    "renewable-energy": "Renewable Energy",
    "denmark": "Denmark",
    "finance": "Finance",
    "corporate": "Corporate",
    "technology": "Technology",
    "climate-change": "Climate Change",
    "shipping": "Shipping",
    # Danish/Nordic specific
    "biotech": "Biotech",
    "healthcare": "Healthcare",
    "pharmaceutical": "Pharmaceutical",
    "nordic": "Nordic",
    "scandinavia": "Scandinavia",
    "denmark-economy": "Denmark Economy",
    "danish-economy": "Danish Economy",
    "global-economy": "Global Economy",
    "global-markets": "Global Markets",
    "stock-market": "Stock Market",
    "investing": "Investing",
    "clean-energy": "Clean Energy",
    "logistics": "Logistics",
    "diabetes": "Diabetes",
}


# ---------------------------------------------------------------------------
# Database
# ---------------------------------------------------------------------------

def get_db() -> DBConn:
    """Return a DBConn wrapper (Postgres or SQLite). Schema is managed by db.py."""
    return get_conn()


# ---------------------------------------------------------------------------
# HTTP fetch with cache
# ---------------------------------------------------------------------------

def fetch_cached(db: DBConn, url: str, page_type: str = "interest") -> tuple[str, bool]:
    """Return ``(content, from_cache)`` for *url*, re-fetching when stale.

    A cached row is served while it is younger than ``CACHE_TTL[page_type]``
    (1800 s for unknown page types). Otherwise the page is fetched with the
    RSC headers, written to ``page_cache`` and committed.

    Raises whatever ``httpx`` raises for transport errors, and
    ``httpx.HTTPStatusError`` (via ``raise_for_status``) for non-success
    responses; nothing is cached in that case.
    """
    row = db.execute(
        "SELECT content, fetched_at FROM page_cache WHERE url=?", (url,)
    ).fetchone()
    ttl = CACHE_TTL.get(page_type, 1800)
    now = int(time.time())
    if row and (now - row["fetched_at"]) < ttl:
        return row["content"], True

    r = httpx.get(url, headers=HEADERS, follow_redirects=True, timeout=20)
    r.raise_for_status()
    db.upsert(
        "page_cache", "url",
        ["url", "page_type", "fetched_at", "content"],
        (url, page_type, now, r.text),
    )
    db.commit()
    return r.text, False


# ---------------------------------------------------------------------------
# RSC payload parsers
# ---------------------------------------------------------------------------

# UUID v4 pattern
_UUID = re.compile(r"[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}")

# blindspotData — has coverageProfileStatement + coverageProfileType before the numbers
_BLIND = re.compile(
    r'"blindspotData":\{[^}]{0,400}'  # skip coverageProfileStatement, coverageProfileType
    r'"leftPercent":([\d.]+),"rightPercent":([\d.]+),"centerPercent":([\d.]+),'
    r'"leftSrcCount":(\d+),"rightSrcCount":(\d+),"cntrSrcCount":(\d+)'
)

# Story anchor: start + title + slug + factuality (field order confirmed from RSC).
# The title group is escape-aware so headlines containing \" still match;
# a plain [^"] class would silently drop every story with a quoted phrase.
_STORY = re.compile(
    r'"start":"(20\d{2}-[^"]+)",'
    r'"title":"((?:[^"\\]|\\.){10,200})",'
    r'"slug":"([a-z0-9][a-z0-9_-]{15,})",'
    r'"factuality":\{([^}]+)\}'
)

# Escaped JSON string value
_JSON_STR = re.compile(r'"((?:[^"\\]|\\.)*)"')

# Per-story field patterns, compiled once instead of on every loop iteration.
_BIAS_SRC_COUNT = re.compile(r'"biasSourceCount":(\d+)')
_OVERALL_BIAS = re.compile(r'"overallBias":([-\d.]+)')
_BLINDSPOT = re.compile(r'"blindspot":"(left|right|center|none)"')
_STORY_DESC = re.compile(r'"description":"((?:[^"\\]|\\.){0,600})"')
_SOURCE_COUNT = re.compile(r'"sourceCount":(\d+)')
_INTERESTS = re.compile(r'"interests":\[([^\]]*)\]')
_FACT_PAIR = re.compile(r'"(\w+)":(\d+)')


def _decode(s: str) -> str:
    """Decode a JSON-escaped string value; return *s* unchanged if it is not valid JSON."""
    try:
        return json.loads(f'"{s}"')
    except ValueError:  # json.JSONDecodeError is a ValueError subclass
        return s


def parse_stories(data: str, category: str) -> list[dict]:
    """Extract all story objects from an RSC payload.

    Each ``_STORY`` match is an anchor. The remaining fields are looked up in
    a fixed-size window of text before the anchor (up to 8000 characters) or
    after it (up to 6000 characters). Fields that are not found come back as
    ``None``, or ``0`` for ``source_count`` / ``bias_src_count``.

    Args:
        data: Raw RSC payload text.
        category: Interest slug recorded on every returned story.

    Returns:
        One dict per anchor, in payload order.
    """
    stories = []
    for m in _STORY.finditer(data):
        start, title, slug, fact_raw = m.group(1), m.group(2), m.group(3), m.group(4)
        before = data[max(0, m.start() - 8000): m.start()]
        after = data[m.end(): m.end() + 6000]

        # UUID — last v4 UUID found before the story anchor (the story's own id)
        uuids = _UUID.findall(before[-4000:])
        story_id = uuids[-1] if uuids else None

        # NOTE(review): the lookups below take the FIRST match inside their
        # window, whereas the UUID lookup takes the LAST. If a window can
        # reach back into the previous story these may pick up that story's
        # values — confirm against a real payload.

        # blindspotData (comes before the anchor)
        blind = _BLIND.search(before)
        left_pct = right_pct = ctr_pct = None
        left_cnt = right_cnt = ctr_cnt = None
        if blind:
            left_pct = float(blind.group(1))  # already 0-100
            right_pct = float(blind.group(2))
            ctr_pct = float(blind.group(3))
            left_cnt = int(blind.group(4))
            right_cnt = int(blind.group(5))
            ctr_cnt = int(blind.group(6))

        # biasSourceCount
        bsc = _BIAS_SRC_COUNT.search(before)
        bias_src_count = int(bsc.group(1)) if bsc else 0

        # overallBias score (-1 .. +1)
        ob = _OVERALL_BIAS.search(before)
        overall_bias = float(ob.group(1)) if ob else None

        # blindspot label ("left"/"right"/"center")
        bs = _BLINDSPOT.search(before)
        blindspot = bs.group(1) if bs else None

        # description — allow JSON-escaped content
        desc_m = _STORY_DESC.search(before[-3000:])
        description = _decode(desc_m.group(1)) if desc_m else None

        # sourceCount (comes after the anchor in sources:[...])
        sc = _SOURCE_COUNT.search(after)
        source_count = int(sc.group(1)) if sc else 0

        # factuality
        factuality = {k: int(v) for k, v in _FACT_PAIR.findall(fact_raw)}

        # Ground News interest UUIDs this story belongs to
        int_m = _INTERESTS.search(before[-2000:])
        interests = _UUID.findall(int_m.group(1)) if int_m else []

        stories.append({
            "slug": slug,
            "story_id": story_id,
            "title": _decode(title),
            "description": description,
            "start_date": start[:10],
            "source_count": source_count,
            "bias_src_count": bias_src_count,
            "left_pct": left_pct,
            "ctr_pct": ctr_pct,
            "right_pct": right_pct,
            "left_src_count": left_cnt,
            "ctr_src_count": ctr_cnt,
            "right_src_count": right_cnt,
            "overall_bias": overall_bias,
            "blindspot": blindspot,
            "factuality": factuality,
            "interests": interests,
            "category": category,
        })
    return stories


def parse_single_article(data: str, slug: str) -> dict:
    """Richer extraction for a single article page (has wireStoryRefs etc).

    Every field is the first match of its pattern in *data*; fields that are
    not found are ``None`` (``{}`` for ``factuality`` / ``bias_breakdown``).
    Unlike ``parse_stories``, ``start_date`` is the full matched ``start``
    value, not truncated to a date.
    """

    def get(pattern, cast=str):
        """Return ``cast(first capture)`` or ``None`` when absent or not castable."""
        m = re.search(pattern, data)
        try:
            return cast(m.group(1)) if m else None
        except ValueError:
            return None

    # story_id: UUID before the slug
    id_m = re.search(
        r'"id":"([0-9a-f-]{36})"[^}]{0,200}"slug":"' + re.escape(slug),
        data, re.DOTALL,
    )
    story_id = id_m.group(1) if id_m else get(r'"id":"([0-9a-f-]{36})"')

    # Title — must come before wireStoryRefs; otherwise fall back to the first
    # title in the payload. Both patterns are escape-aware and both results
    # are decoded, so headlines containing \" are handled.
    title_m = re.search(
        r'"title":"((?:[^"\\]|\\.){10,200})"[^}]{0,100}"wireStoryRefs"',
        data, re.DOTALL,
    ) or re.search(r'"title":"((?:[^"\\]|\\.){10,200})"', data)
    title = _decode(title_m.group(1)) if title_m else None

    # blindspotData
    blind = _BLIND.search(data)

    # Bias side breakdown
    bias_breakdown = {}
    for side in ("left", "center", "right"):
        bm = re.search(
            rf'"id":"{side}".*?"sourceCount":(\d+).*?"percent":(\d+)',
            data, re.DOTALL
        )
        if bm:
            bias_breakdown[side] = {"sources": int(bm.group(1)), "percent": int(bm.group(2))}

    # factuality
    fm = re.search(r'"factuality":\{([^}]+)\}', data)
    factuality = {k: int(v) for k, v in _FACT_PAIR.findall(fm.group(1))} if fm else {}

    desc_m = re.search(r'"description":"((?:[^"\\]|\\.){20,600})"', data)

    return {
        "slug": slug,
        "story_id": story_id,
        "title": title,
        "description": _decode(desc_m.group(1)) if desc_m else None,
        "start_date": get(r'"start":"(20\d{2}-[^"]+)"'),
        "source_count": get(r'"sourceCount":(\d+)', int),
        "bias_src_count": get(r'"biasSourceCount":(\d+)', int),
        "overall_bias": get(r'"overallBias":([-\d.]+)', float),
        "blindspot": get(r'"blindspot":"(left|right|center|none)"'),
        "left_pct": float(blind.group(1)) if blind else None,
        "right_pct": float(blind.group(2)) if blind else None,
        "ctr_pct": float(blind.group(3)) if blind else None,
        "left_src_count": int(blind.group(4)) if blind else None,
        "right_src_count": int(blind.group(5)) if blind else None,
        "ctr_src_count": int(blind.group(6)) if blind else None,
        "factuality": factuality,
        "bias_breakdown": bias_breakdown,
    }


# ---------------------------------------------------------------------------
# DB upsert
# ---------------------------------------------------------------------------

def upsert_articles(db: DBConn, stories: list[dict]) -> int:
    """Insert new / update existing articles. Returns count of new rows.

    Rows are keyed by ``slug``. For an existing row:

    * ``categories`` becomes the sorted, comma-joined union of the stored
      categories and this story's ``category``;
    * ``source_count`` and ``bias_src_count`` are always overwritten;
    * ``story_id`` and ``description`` keep their stored value once set;
    * the bias percentages/counts, ``overall_bias`` and ``blindspot`` are
      only overwritten when the new value is not ``None``, so a page on
      which extraction missed does not erase data captured earlier;
    * ``title``, ``start_date``, ``factuality_json``, ``interests_json`` and
      ``first_seen`` are written on insert only.

    All changes are committed once, after the last story.

    Args:
        db: Connection exposing ``execute(sql, params)`` (rows indexable by
            column name) and ``commit()``.
        stories: Dicts with the keys written below.

    Returns:
        Number of rows inserted (updates are not counted).
    """
    now = int(time.time())
    new = 0
    for s in stories:
        row = db.execute(
            "SELECT categories, first_seen FROM articles WHERE slug=?", (s["slug"],)
        ).fetchone()

        if row:
            cats = set((row["categories"] or "").split(","))
            cats.discard("")  # "".split(",") yields [""]
            cats.add(s["category"])
            db.execute(
                """UPDATE articles SET
                       story_id=COALESCE(story_id, ?),
                       source_count=?,
                       bias_src_count=?,
                       left_pct=COALESCE(?, left_pct),
                       ctr_pct=COALESCE(?, ctr_pct),
                       right_pct=COALESCE(?, right_pct),
                       left_src_count=COALESCE(?, left_src_count),
                       ctr_src_count=COALESCE(?, ctr_src_count),
                       right_src_count=COALESCE(?, right_src_count),
                       overall_bias=COALESCE(?, overall_bias),
                       blindspot=COALESCE(?, blindspot),
                       description=COALESCE(description, ?),
                       categories=?,
                       last_seen=?
                   WHERE slug=?""",
                (s["story_id"],
                 s["source_count"], s["bias_src_count"],
                 s["left_pct"], s["ctr_pct"], s["right_pct"],
                 s["left_src_count"], s["ctr_src_count"], s["right_src_count"],
                 s["overall_bias"], s["blindspot"],
                 s["description"],
                 ",".join(sorted(cats)), now,
                 s["slug"]),
            )
        else:
            db.execute(
                """INSERT INTO articles
                       (slug, story_id, title, description, start_date,
                        source_count, bias_src_count,
                        left_pct, ctr_pct, right_pct,
                        left_src_count, ctr_src_count, right_src_count,
                        overall_bias, blindspot,
                        factuality_json, interests_json,
                        categories, first_seen, last_seen)
                   VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)""",
                (s["slug"], s["story_id"], s["title"], s["description"], s["start_date"],
                 s["source_count"], s["bias_src_count"],
                 s["left_pct"], s["ctr_pct"], s["right_pct"],
                 s["left_src_count"], s["ctr_src_count"], s["right_src_count"],
                 s["overall_bias"], s["blindspot"],
                 json.dumps(s["factuality"]), json.dumps(s["interests"]),
                 s["category"], now, now),
            )
            new += 1

    db.commit()
    return new


# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------

def fetch_article_text(slug: str, db: DBConn | None = None) -> str:
    """
    Fetch full article RSC payload and return a clean text blob for NLP.
    Extracts: main title + description + all source article headlines.

    Args:
        slug: Article slug (the part after ``/article/``).
        db: Connection to use for the page cache. When omitted, one is opened
            for the duration of the fetch and closed again, including when
            the fetch raises.

    Returns:
        The distinct extracted strings (each longer than 20 characters),
        in order of first appearance, joined by single spaces.
    """
    own_db = db is None
    if own_db:
        db = get_db()
    try:
        data, _ = fetch_cached(db, f"{BASE_URL}/article/{slug}", "article")
    finally:
        if own_db:
            db.close()

    parts: list[str] = []
    seen: set[str] = set()

    def add(text: str) -> None:
        """Append *text* once, skipping short or already-seen strings."""
        if text and len(text) > 20 and text not in seen:
            seen.add(text)
            parts.append(text)

    # Main title
    for m in re.finditer(r'"title":"((?:[^"\\]|\\.){10,300})"', data):
        t = _decode(m.group(1))
        if not re.search(r'Getty|AFP|\/AFP|PHOTO-TAG', t, re.I):
            add(t)

    # Descriptions / excerpts
    for pattern in [
        r'"description":"((?:[^"\\]|\\.){20,600})"',
        r'"excerpt":"((?:[^"\\]|\\.){20,400})"',
        r'"summary":"((?:[^"\\]|\\.){20,400})"',
    ]:
        for m in re.finditer(pattern, data):
            t = _decode(m.group(1))
            if not re.search(r'Getty|AFP|PHOTO-TAG|Author:', t, re.I):
                add(t)

    # Wire story / source article headlines
    for m in re.finditer(r'"headline":"((?:[^"\\]|\\.){10,300})"', data):
        add(_decode(m.group(1)))

    return " ".join(parts)


def fetch_article(slug: str, db: DBConn | None = None) -> dict:
    """Fetch a single article page (through the page cache) and parse it.

    The parsed result is returned only; it is not written to the
    ``articles`` table. When *db* is omitted a connection is opened and
    closed around the call, including when the fetch or parse raises.
    """
    own_db = db is None
    if own_db:
        db = get_db()
    try:
        data, _ = fetch_cached(db, f"{BASE_URL}/article/{slug}", "article")
        return parse_single_article(data, slug)
    finally:
        if own_db:
            db.close()


def _http_fetch_category(
    category_slug: str,
    *,
    force: bool = False,
) -> tuple[str, list[dict], bool]:
    """
    Fetch and parse one category page; does not touch the articles table.
    Uses a per-thread DB connection (psycopg2 connections are not thread-safe)
    for the page cache only, and always closes it — also when the HTTP
    request fails, so failed categories do not leak connections.

    With ``force=True`` the cached page for this URL is deleted first, which
    forces a fresh download.

    Returns (slug, stories, from_cache).
    """
    url = f"{BASE_URL}/interest/{category_slug}"
    db = get_conn()
    try:
        if force:
            db.execute("DELETE FROM page_cache WHERE url=?", (url,))
            db.commit()
        data, from_cache = fetch_cached(db, url, "interest")
    finally:
        db.close()
    stories = parse_stories(data, category_slug)
    return category_slug, stories, from_cache


def fetch_category(
    category_slug: str,
    db: DBConn,
    *,
    force: bool = False,
) -> tuple[list[dict], bool]:
    """
    Fetch an interest category page and upsert its stories through *db*.
    Returns (stories, from_cache).
    """
    _, stories, from_cache = _http_fetch_category(category_slug, force=force)
    upsert_articles(db, stories)
    return stories, from_cache


def fetch_all(
    db: DBConn,
    slugs: list[str] | None = None,
    *,
    force: bool = False,
    workers: int = 12,
) -> dict[str, list[dict]]:
    """
    Fetch all (or given) interest categories in parallel, then upsert
    results serially into DB from the calling thread.

    A ``None`` or empty *slugs* means every key of ``KNOWN_INTERESTS``.
    A category whose fetch or upsert raises is reported on stdout and mapped
    to an empty list; it does not abort the remaining categories.

    Returns {slug: [story, ...]} mapping.
    """
    targets = slugs or list(KNOWN_INTERESTS.keys())
    results: dict[str, list[dict]] = {}

    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as ex:
        futs = {ex.submit(_http_fetch_category, s, force=force): s for s in targets}
        for f in concurrent.futures.as_completed(futs):
            slug = futs[f]
            try:
                _, stories, cached = f.result()
                upsert_articles(db, stories)  # DB write in main thread
                results[slug] = stories
                icon = "💾" if cached else "🌐"
                print(f" {icon} {slug:<38} {len(stories):2} stories")
            except Exception as e:  # best-effort boundary: report and carry on
                print(f" ✗ {slug:<38} ERROR: {e}")
                results[slug] = []

    return results


def top_articles(
    db: DBConn,
    limit: int = 30,
    days: int | None = 2,
    min_sources: int = 0,
) -> list[sqlite3.Row]:
    """Query DB for top articles by source_count.

    Args:
        db: Connection exposing ``execute(sql, params)``.
        limit: Maximum number of rows returned.
        days: Only include rows whose ``start_date`` is on or after today
            (UTC) minus this many days; ``None`` disables the date filter.
        min_sources: Minimum ``source_count`` for a row to be included.

    Returns:
        Matching ``articles`` rows ordered by ``source_count`` descending.
    """
    from datetime import datetime, timedelta, timezone

    where = "WHERE source_count >= ?"
    params: list = [min_sources]
    if days is not None:
        # Compute the cutoff here instead of with SQLite's date('now', ?):
        # same UTC calendar-date result, but it works on any SQL backend.
        cutoff = (datetime.now(timezone.utc) - timedelta(days=days)).date()
        where += " AND start_date >= ?"
        params.append(cutoff.isoformat())
    return db.execute(
        f"SELECT * FROM articles {where} ORDER BY source_count DESC LIMIT ?",
        (*params, limit),
    ).fetchall()


# ---------------------------------------------------------------------------
# Display
# ---------------------------------------------------------------------------

def print_top(rows: list[sqlite3.Row], header: str = "Top artikler") -> None:
    """Print *rows* (as returned by ``top_articles``) as a numbered list on stdout."""
    print(f"\n{'='*76}")
    print(f" {header} ({len(rows)} artikler)")
    print(f"{'='*76}\n")
    for i, a in enumerate(rows, 1):
        bias = ""
        if a["left_pct"] is not None:
            bias = f" L{a['left_pct']:.0f}% C{a['ctr_pct']:.0f}% R{a['right_pct']:.0f}%"
        cats = (a["categories"] or "").replace(",", " · ")
        ob = f" bias={a['overall_bias']:+.2f}" if a["overall_bias"] is not None else ""
        bs = f" blindspot={a['blindspot']}" if a["blindspot"] else ""
        print(f"{i:2}. [{a['source_count']:4} src{bias}{ob}{bs}] [{a['start_date']}]")
        print(f" {a['title'][:80]}")
        if a["description"]:
            print(f" {a['description'][:90]}")
        print(f" [{cats}]")
        print(f" /article/{a['slug']}")
        print()


# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------

def _main(argv: list[str] | None = None) -> int:
    """Run the command-line interface and return a process exit status.

    Commands:
        article <slug>   fetch and print one parsed article as JSON
        category <slug>  fetch one interest page and list its stories
        (anything else)  fetch every known category, then print the top 30
                         of the last 3 days; ``--force`` bypasses the cache

    Returns 2 when ``article`` / ``category`` is given without a slug.
    """
    import sys

    argv = sys.argv if argv is None else argv
    command = argv[1] if len(argv) >= 2 else ""

    # Validate before opening the DB so a usage error has no side effects.
    if command in ("article", "category") and len(argv) < 3:
        print(f"usage: {argv[0]} {command} <slug>", file=sys.stderr)
        return 2

    db = get_db()
    try:
        if command == "article":
            slug = argv[2]
            url = f"{BASE_URL}/article/{slug}"
            data, cached = fetch_cached(db, url, "article")
            result = parse_single_article(data, slug)
            print(f"({'cached' if cached else 'fetched'})")
            print(json.dumps(result, indent=2, ensure_ascii=False))

        elif command == "category":
            slug = argv[2]
            stories, cached = fetch_category(slug, db)
            print(f"({'cached' if cached else 'fetched'}) {len(stories)} stories\n")
            for s in sorted(stories, key=lambda x: x["source_count"], reverse=True):
                print(f" [{s['source_count']:4} src] {s['title'][:70]}")

        else:
            force = "--force" in argv
            days = 3
            print(f"Fetching all {len(KNOWN_INTERESTS)} categories (force={force})…\n")
            fetch_all(db, force=force)
            rows = top_articles(db, limit=30, days=days)
            print_top(rows, f"Top 30 – seneste {days} dage")
    finally:
        db.close()
    return 0


if __name__ == "__main__":
    raise SystemExit(_main())