Files
mmd/ground_news.py
Henrik Jess Nielsen 05eed51e7d First commit
2026-05-26 22:21:27 +02:00

560 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
ground_news.py — Ground News article fetcher + local SQLite store
Key design:
- RSC payload trick: send RSC: 1 header to get Next.js App Router data
- page_cache table: raw RSC payloads with TTL (don't re-fetch fresh pages)
- articles table: all extracted fields, categories merged across pages
- fetch_article(slug) — single article, rich data
- fetch_category(slug) — all stories on an interest page (~15 stories)
- fetch_all() — all known interest categories in parallel
- top_articles(n, days) — query DB for top-N by source_count
"""
import re
import json
import time
import sqlite3
import httpx
import concurrent.futures
from pathlib import Path
from db import get_conn, DBConn
# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------
# Path of a SQLite file next to this module. Nothing else in the visible part
# of this file reads it; connections are obtained through db.get_conn().
DB_PATH = Path(__file__).parent / "ground_news.db"
# Site root; "/article/<slug>" and "/interest/<slug>" are appended to it.
BASE_URL = "https://ground.news"
# Maximum age in seconds of a page_cache row, keyed by page type.
# fetch_cached() falls back to 1800 s for page types not listed here.
CACHE_TTL = {
    "interest": 30 * 60, # category pages: 30 min
    "article": 6 * 60 * 60, # single articles: 6 h
}
# Request headers sent with every fetch. "RSC": "1" is the header the module
# docstring describes as the trick for getting the Next.js App Router payload.
# The Next-Router-State-Tree value URL-decodes to
#   ["",{"children":["__PAGE__",{}]},null,null,true]
HEADERS = {
    "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36",
    "Accept": "text/html,application/xhtml+xml",
    "RSC": "1",
    "Next-Router-State-Tree": (
        "%5B%22%22%2C%7B%22children%22%3A%5B%22__PAGE__%22%2C%7B%7D%5D%7D%2Cnull%2Cnull%2Ctrue%5D"
    ),
}
# All known interest slugs (auto-discovered from ground.news homepage 2026-05-24)
# Maps the URL slug used in "/interest/<slug>" to a human-readable label.
# Only the keys (and the dict's length) are read in the visible part of this
# file; the labels are not used here.
# NOTE(review): several slugs look like near-duplicates ("pharma" /
# "pharmaceutical", "denmark-economy" / "danish-economy") — confirm which of
# them actually exist upstream.
KNOWN_INTERESTS: dict[str, str] = {
    "europe": "Europe",
    "europe-economy": "Europe Economy",
    "european-politics": "European Politics",
    "european-union": "European Union",
    "european-security-and-nato": "European Security & NATO",
    "uk-politics": "UK Politics",
    "united-kingdom": "United Kingdom",
    "international": "International",
    "north-america": "North America",
    "south-america": "South America",
    "africa": "Africa",
    "asia": "Asia",
    "australia": "Australia",
    "us-politics": "US Politics",
    "united-states": "United States",
    "donald-trump": "Donald Trump",
    "trump-administration": "Trump Administration",
    "israeli-palestinian-conflict": "Israeli-Palestinian Conflict",
    "business-and-markets": "Business & Markets",
    "premier-league": "Premier League",
    "soccer": "Soccer",
    "memorial-day": "Memorial Day",
    # Financial / C25 relevant categories
    "pharma": "Pharmaceuticals",
    "energy": "Energy",
    "renewable-energy": "Renewable Energy",
    "denmark": "Denmark",
    "finance": "Finance",
    "corporate": "Corporate",
    "technology": "Technology",
    "climate-change": "Climate Change",
    "shipping": "Shipping",
    # Danish/Nordic specific
    "biotech": "Biotech",
    "healthcare": "Healthcare",
    "pharmaceutical": "Pharmaceutical",
    "nordic": "Nordic",
    "scandinavia": "Scandinavia",
    "denmark-economy": "Denmark Economy",
    "danish-economy": "Danish Economy",
    "global-economy": "Global Economy",
    "global-markets": "Global Markets",
    "stock-market": "Stock Market",
    "investing": "Investing",
    "clean-energy": "Clean Energy",
    "logistics": "Logistics",
    "diabetes": "Diabetes",
}
# ---------------------------------------------------------------------------
# Database
# ---------------------------------------------------------------------------
def get_db() -> DBConn:
    """Return a DBConn wrapper (Postgres or SQLite). Schema is managed by db.py.

    Thin alias for ``get_conn()`` imported from the ``db`` module; every call
    asks ``get_conn()`` for a connection, and the caller is expected to
    ``close()`` it when done (as the other functions in this file do).
    """
    return get_conn()
# ---------------------------------------------------------------------------
# HTTP fetch with cache
# ---------------------------------------------------------------------------
def fetch_cached(db: DBConn, url: str, page_type: str = "interest") -> tuple[str, bool]:
    """Return ``(content, from_cache)`` for *url*.

    A row in ``page_cache`` younger than the TTL configured for *page_type*
    (``CACHE_TTL``, default 1800 s) is served as-is. Otherwise the page is
    downloaded with the RSC headers, written back to ``page_cache`` and
    committed.

    Raises whatever ``httpx`` raises for transport errors, and
    ``httpx.HTTPStatusError`` (via ``raise_for_status``) for 4xx/5xx replies.
    """
    cached = db.execute(
        "SELECT content, fetched_at FROM page_cache WHERE url=?", (url,)
    ).fetchone()
    max_age = CACHE_TTL.get(page_type, 1800)
    timestamp = int(time.time())

    # Fresh cache entry: no network traffic at all.
    if cached:
        age = timestamp - cached["fetched_at"]
        if age < max_age:
            return cached["content"], True

    # Missing or stale: download and replace the cached copy.
    response = httpx.get(url, headers=HEADERS, follow_redirects=True, timeout=20)
    response.raise_for_status()
    body = response.text
    db.upsert(
        "page_cache", "url",
        ["url", "page_type", "fetched_at", "content"],
        (url, page_type, timestamp, body),
    )
    db.commit()
    return body, False
# ---------------------------------------------------------------------------
# RSC payload parsers
# ---------------------------------------------------------------------------
# UUID v4 pattern: lowercase hex only, version nibble fixed to 4 and variant
# nibble restricted to 8/9/a/b. Not anchored, so it can be used with findall()
# on arbitrary text.
_UUID = re.compile(r"[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}")
# blindspotData — has coverageProfileStatement + coverageProfileType before the numbers
# Groups: 1-3 = left / right / center percent, 4-6 = left / right / center
# source counts. At most 400 non-"}" characters may precede "leftPercent".
_BLIND = re.compile(
    r'"blindspotData":\{[^}]{0,400}' # skip coverageProfileStatement, coverageProfileType
    r'"leftPercent":([\d.]+),"rightPercent":([\d.]+),"centerPercent":([\d.]+),'
    r'"leftSrcCount":(\d+),"rightSrcCount":(\d+),"cntrSrcCount":(\d+)'
)
# Story anchor: start + title + slug + factuality (field order confirmed from RSC)
# Groups: 1 = start timestamp (must begin with "20YY-"), 2 = title (10-200
# characters, no double quote), 3 = slug (at least 16 characters of
# [a-z0-9_-], first one alphanumeric), 4 = raw body of the factuality object.
_STORY = re.compile(
    r'"start":"(20\d{2}-[^"]+)",'
    r'"title":"([^"]{10,200})",'
    r'"slug":"([a-z0-9][a-z0-9_-]{15,})",'
    r'"factuality":\{([^}]+)\}'
)
# Escaped JSON string value
# Matches a double-quoted string in which backslash escapes are allowed;
# group 1 is the still-escaped content. Not referenced elsewhere in the
# visible part of this file.
_JSON_STR = re.compile(r'"((?:[^"\\]|\\.)*)"')
def _decode(s: str) -> str:
"""Decode a JSON-escaped string value."""
try:
return json.loads(f'"{s}"')
except Exception:
return s
def parse_stories(data: str, category: str) -> list[dict]:
    """Extract every story found in an RSC payload.

    Each ``_STORY`` match (start / title / slug / factuality, in that order)
    anchors one story. The remaining fields are looked up in fixed-size text
    windows around the anchor: up to 8000 characters before it and up to 6000
    characters after it.

    Returns one dict per anchor, in payload order, each tagged with
    *category*. Fields that cannot be found are ``None`` (the two plain
    counts default to ``0``, ``interests`` to ``[]``).

    NOTE(review): apart from ``story_id`` (last UUID in its window) every
    look-behind field takes the *first* match inside its window, which could
    belong to an earlier story when stories are packed closely — confirm
    against a real payload.
    """

    def first_group(pattern: str, text: str) -> str | None:
        # Group 1 of the first match of *pattern* in *text*, or None.
        hit = re.search(pattern, text)
        return hit.group(1) if hit else None

    parsed: list[dict] = []
    for anchor in _STORY.finditer(data):
        start, raw_title, slug, fact_raw = anchor.groups()
        lo, hi = anchor.start(), anchor.end()
        behind = data[max(0, lo - 8000):lo]   # at most 8000 chars before the anchor
        ahead = data[hi:hi + 6000]

        # Story id: the last v4 UUID within 4000 chars before the anchor.
        nearby_ids = _UUID.findall(behind[-4000:])
        story_id = nearby_ids[-1] if nearby_ids else None

        # blindspotData sits before the anchor; percentages are already 0-100.
        blind = _BLIND.search(behind)
        if blind:
            left_pct, right_pct, ctr_pct = (float(v) for v in blind.group(1, 2, 3))
            left_cnt, right_cnt, ctr_cnt = (int(v) for v in blind.group(4, 5, 6))
        else:
            left_pct = right_pct = ctr_pct = None
            left_cnt = right_cnt = ctr_cnt = None

        raw_bsc = first_group(r'"biasSourceCount":(\d+)', behind)
        bias_src_count = 0 if raw_bsc is None else int(raw_bsc)

        # overallBias score (-1 .. +1)
        raw_ob = first_group(r'"overallBias":([-\d.]+)', behind)
        overall_bias = None if raw_ob is None else float(raw_ob)

        blindspot = first_group(r'"blindspot":"(left|right|center|none)"', behind)

        # Description may contain JSON escapes, hence the decode step.
        raw_desc = first_group(r'"description":"((?:[^"\\]|\\.){0,600})"', behind[-3000:])
        description = None if raw_desc is None else _decode(raw_desc)

        # sourceCount follows the anchor (inside sources:[...]).
        raw_sc = first_group(r'"sourceCount":(\d+)', ahead)
        source_count = 0 if raw_sc is None else int(raw_sc)

        factuality = {}
        for key, value in re.findall(r'"(\w+)":(\d+)', fact_raw):
            factuality[key] = int(value)

        # Ground News interest UUIDs this story belongs to.
        raw_interests = first_group(r'"interests":\[([^\]]*)\]', behind[-2000:])
        interests = [] if raw_interests is None else _UUID.findall(raw_interests)

        parsed.append({
            "slug": slug,
            "story_id": story_id,
            "title": _decode(raw_title),
            "description": description,
            "start_date": start[:10],
            "source_count": source_count,
            "bias_src_count": bias_src_count,
            "left_pct": left_pct,
            "ctr_pct": ctr_pct,
            "right_pct": right_pct,
            "left_src_count": left_cnt,
            "ctr_src_count": ctr_cnt,
            "right_src_count": right_cnt,
            "overall_bias": overall_bias,
            "blindspot": blindspot,
            "factuality": factuality,
            "interests": interests,
            "category": category,
        })
    return parsed
def parse_single_article(data: str, slug: str) -> dict:
    """Extract the fields of one article from its own page payload.

    Unlike ``parse_stories`` this searches the whole of *data* (no windows)
    and additionally returns ``bias_breakdown``: per side ("left", "center",
    "right") the source count and integer percentage, when present.

    Every scalar field is ``None`` when its pattern does not match.
    """

    def first(pattern, cast=str):
        # cast(group 1) of the first match in the payload; None when there is
        # no match or the cast fails.
        hit = re.search(pattern, data)
        if not hit:
            return None
        try:
            return cast(hit.group(1))
        except Exception:
            return None

    # Prefer the "id" that is followed (within 200 non-"}" chars) by this slug;
    # otherwise fall back to the first 36-character id in the payload.
    anchored_id = re.search(
        r'"id":"([0-9a-f-]{36})"[^}]{0,200}"slug":"' + re.escape(slug), data, re.DOTALL
    )
    if anchored_id:
        story_id = anchored_id.group(1)
    else:
        story_id = first(r'"id":"([0-9a-f-]{36})"')

    # Prefer the title that directly precedes wireStoryRefs; else any title.
    anchored_title = re.search(
        r'"title":"([^"]{10,200})"[^}]{0,100}"wireStoryRefs"', data, re.DOTALL
    )
    if anchored_title:
        title = _decode(anchored_title.group(1))
    else:
        title = first(r'"title":"([^"]{10,200})"')

    # blindspotData: percentages and per-side source counts.
    blind = _BLIND.search(data)
    if blind:
        left_pct, right_pct, ctr_pct = (float(v) for v in blind.group(1, 2, 3))
        left_cnt, right_cnt, ctr_cnt = (int(v) for v in blind.group(4, 5, 6))
    else:
        left_pct = right_pct = ctr_pct = None
        left_cnt = right_cnt = ctr_cnt = None

    # Per-side breakdown; a side is omitted when its pattern is absent.
    bias_breakdown = {}
    for side in ("left", "center", "right"):
        side_hit = re.search(
            rf'"id":"{side}".*?"sourceCount":(\d+).*?"percent":(\d+)',
            data, re.DOTALL
        )
        if not side_hit:
            continue
        bias_breakdown[side] = {
            "sources": int(side_hit.group(1)),
            "percent": int(side_hit.group(2)),
        }

    factuality = {}
    fact_hit = re.search(r'"factuality":\{([^}]+)\}', data)
    if fact_hit:
        for key, value in re.findall(r'"(\w+)":(\d+)', fact_hit.group(1)):
            factuality[key] = int(value)

    desc_hit = re.search(r'"description":"((?:[^"\\]|\\.){20,600})"', data)
    description = _decode(desc_hit.group(1)) if desc_hit else None

    return {
        "slug": slug,
        "story_id": story_id,
        "title": title,
        "description": description,
        "start_date": first(r'"start":"(20\d{2}-[^"]+)"'),
        "source_count": first(r'"sourceCount":(\d+)', int),
        "bias_src_count": first(r'"biasSourceCount":(\d+)', int),
        "overall_bias": first(r'"overallBias":([-\d.]+)', float),
        "blindspot": first(r'"blindspot":"(left|right|center|none)"'),
        "left_pct": left_pct,
        "right_pct": right_pct,
        "ctr_pct": ctr_pct,
        "left_src_count": left_cnt,
        "right_src_count": right_cnt,
        "ctr_src_count": ctr_cnt,
        "factuality": factuality,
        "bias_breakdown": bias_breakdown,
    }
# ---------------------------------------------------------------------------
# DB upsert
# ---------------------------------------------------------------------------
def upsert_articles(db: DBConn, stories: list[dict]) -> int:
    """Write parsed stories to the ``articles`` table; return how many were new.

    Rows are keyed by ``slug``. A new slug is inserted with all fields and
    ``first_seen == last_seen``. For a known slug the counts, percentages,
    bias fields, merged category list and ``last_seen`` are overwritten,
    while ``story_id`` and ``description`` are only filled in when the stored
    value is NULL; title, start date, factuality and interests are left
    untouched. One commit is issued after all stories are processed.
    """
    # Columns that are written verbatim by both the INSERT and the UPDATE,
    # in the order both statements expect them.
    metric_keys = (
        "source_count", "bias_src_count",
        "left_pct", "ctr_pct", "right_pct",
        "left_src_count", "ctr_src_count", "right_src_count",
        "overall_bias", "blindspot",
    )
    stamp = int(time.time())
    inserted = 0

    for story in stories:
        existing = db.execute(
            "SELECT categories, first_seen FROM articles WHERE slug=?", (story["slug"],)
        ).fetchone()
        metrics = tuple(story[key] for key in metric_keys)

        if not existing:
            db.execute(
                """INSERT INTO articles
                (slug, story_id, title, description, start_date,
                source_count, bias_src_count,
                left_pct, ctr_pct, right_pct,
                left_src_count, ctr_src_count, right_src_count,
                overall_bias, blindspot,
                factuality_json, interests_json,
                categories, first_seen, last_seen)
                VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)""",
                (
                    story["slug"], story["story_id"], story["title"],
                    story["description"], story["start_date"],
                    *metrics,
                    json.dumps(story["factuality"]), json.dumps(story["interests"]),
                    story["category"], stamp, stamp,
                ),
            )
            inserted += 1
            continue

        # Known slug: merge this page's category into the stored
        # comma-separated list (kept sorted, empty entries dropped).
        merged = set((existing["categories"] or "").split(","))
        merged.discard("")
        merged.add(story["category"])
        db.execute(
            """UPDATE articles SET
            story_id=COALESCE(story_id, ?),
            source_count=?, bias_src_count=?,
            left_pct=?, ctr_pct=?, right_pct=?,
            left_src_count=?, ctr_src_count=?, right_src_count=?,
            overall_bias=?, blindspot=?,
            description=COALESCE(description, ?),
            categories=?, last_seen=?
            WHERE slug=?""",
            (
                story["story_id"],
                *metrics,
                story["description"],
                ",".join(sorted(merged)), stamp,
                story["slug"],
            ),
        )

    db.commit()
    return inserted
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def fetch_article_text(slug: str, db: DBConn | None = None) -> str:
    """
    Fetch full article RSC payload and return a clean text blob for NLP.
    Extracts: all titles + descriptions/excerpts/summaries + all source
    article headlines, de-duplicated, joined by single spaces.

    Args:
        slug: article slug as used in ``/article/<slug>``.
        db: connection to use for the page cache. When omitted, a connection
            is opened for this call and closed again — also when the fetch
            fails.

    Raises:
        Whatever ``fetch_cached`` raises (HTTP / transport / DB errors).
    """
    own_db = db is None
    if own_db:
        db = get_db()
    try:
        url = f"{BASE_URL}/article/{slug}"
        data, _ = fetch_cached(db, url, "article")
    finally:
        # Release a connection we opened ourselves even if the fetch raised.
        if own_db:
            db.close()

    parts: list[str] = []
    seen: set[str] = set()

    def add(text: str) -> None:
        # Keep only snippets longer than 20 characters, each exact text once.
        if text and len(text) > 20 and text not in seen:
            seen.add(text)
            parts.append(text)

    # Every "title" value in the payload, minus photo-credit style entries.
    for m in re.finditer(r'"title":"((?:[^"\\]|\\.){10,300})"', data):
        t = _decode(m.group(1))
        if not re.search(r'Getty|AFP|\/AFP|PHOTO-TAG', t, re.I):
            add(t)

    # Descriptions / excerpts
    for pattern in [
        r'"description":"((?:[^"\\]|\\.){20,600})"',
        r'"excerpt":"((?:[^"\\]|\\.){20,400})"',
        r'"summary":"((?:[^"\\]|\\.){20,400})"',
    ]:
        for m in re.finditer(pattern, data):
            t = _decode(m.group(1))
            if not re.search(r'Getty|AFP|PHOTO-TAG|Author:', t, re.I):
                add(t)

    # Wire story / source article headlines
    for m in re.finditer(r'"headline":"((?:[^"\\]|\\.){10,300})"', data):
        add(_decode(m.group(1)))

    return " ".join(parts)
def fetch_article(slug: str, db: DBConn | None = None) -> dict:
    """Fetch a single article page and return its parsed fields.

    The raw payload goes through the page cache (``fetch_cached``); the
    parsed result is returned to the caller and is NOT written to the
    ``articles`` table.

    Args:
        slug: article slug as used in ``/article/<slug>``.
        db: connection to use for the page cache. When omitted, a connection
            is opened for this call and closed again — also when the fetch
            fails.

    Returns:
        The dict produced by ``parse_single_article``.

    Raises:
        Whatever ``fetch_cached`` raises (HTTP / transport / DB errors).
    """
    own_db = db is None
    if own_db:
        db = get_db()
    try:
        url = f"{BASE_URL}/article/{slug}"
        data, _ = fetch_cached(db, url, "article")
    finally:
        # Release a connection we opened ourselves even if the fetch raised.
        if own_db:
            db.close()
    return parse_single_article(data, slug)
def _http_fetch_category(
    category_slug: str,
    *,
    force: bool = False,
) -> tuple[str, list[dict], bool]:
    """
    Fetch one category page via HTTP only.
    Uses a per-thread DB connection (psycopg2 connections are not thread-safe).
    Returns (slug, stories, from_cache).

    With ``force=True`` the cached copy of the page is deleted first, so the
    page is always downloaded again. The connection opened here is closed on
    every path, including when the download or the cache write fails, so
    failing worker threads do not leak connections. Parsed stories are NOT
    written to the ``articles`` table by this function.
    """
    url = f"{BASE_URL}/interest/{category_slug}"
    db = get_conn()
    try:
        if force:
            db.execute("DELETE FROM page_cache WHERE url=?", (url,))
            db.commit()
        data, from_cache = fetch_cached(db, url, "interest")
    finally:
        db.close()
    stories = parse_stories(data, category_slug)
    return category_slug, stories, from_cache
def fetch_category(
    category_slug: str,
    db: DBConn,
    *,
    force: bool = False,
) -> tuple[list[dict], bool]:
    """
    Download (or read from cache) one interest page and store its stories.

    The page itself is fetched by ``_http_fetch_category`` on its own
    connection; *db* is only used to upsert the parsed stories.
    Returns (stories, from_cache).
    """
    fetched = _http_fetch_category(category_slug, force=force)
    stories, from_cache = fetched[1], fetched[2]
    upsert_articles(db, stories)
    return stories, from_cache
def fetch_all(
    db: DBConn,
    slugs: list[str] | None = None,
    *,
    force: bool = False,
    workers: int = 12,
) -> dict[str, list[dict]]:
    """
    Download many interest pages concurrently and store their stories.

    The HTTP work runs in a thread pool of *workers* threads; upserts into
    *db* happen one at a time in the calling thread as downloads complete.
    When *slugs* is None or empty, every key of KNOWN_INTERESTS is fetched.
    A category that fails is reported on stdout and mapped to an empty list.
    Returns {slug: [story, ...]}.
    """
    wanted = slugs if slugs else list(KNOWN_INTERESTS)
    collected: dict[str, list[dict]] = {}

    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as pool:
        # Map each future back to the slug it was submitted for.
        pending = {}
        for name in wanted:
            pending[pool.submit(_http_fetch_category, name, force=force)] = name

        for done in concurrent.futures.as_completed(pending):
            slug = pending[done]
            try:
                _, stories, cached = done.result()
                upsert_articles(db, stories)  # DB write stays in the calling thread
                collected[slug] = stories
                icon = "💾" if cached else "🌐"
                print(f" {icon} {slug:<38} {len(stories):2} stories")
            except Exception as e:
                print(f"{slug:<38} ERROR: {e}")
                collected[slug] = []

    return collected
def top_articles(
db: DBConn,
limit: int = 30,
days: int | None = 2,
min_sources: int = 0,
) -> list[sqlite3.Row]:
"""Query DB for top articles by source_count."""
where = "WHERE source_count >= ?"
params: list = [min_sources]
if days is not None:
where += " AND start_date >= date('now', ?)"
params.append(f"-{days} days")
return db.execute(
f"SELECT * FROM articles {where} ORDER BY source_count DESC LIMIT ?",
(*params, limit),
).fetchall()
# ---------------------------------------------------------------------------
# Display
# ---------------------------------------------------------------------------
def print_top(rows: list[sqlite3.Row], header: str = "Top artikler") -> None:
    """Print a ranked, human-readable listing of article rows to stdout.

    *rows* are indexed by column name (as returned by ``top_articles``).
    Per article: rank, source count, optional left/center/right split,
    optional overall bias and blindspot, start date, title (max 80 chars),
    description (max 90 chars, if any), categories and the article path.
    """
    rule = "=" * 76
    print(f"\n{rule}")
    print(f" {header} ({len(rows)} artikler)")
    print(f"{rule}\n")

    for rank, art in enumerate(rows, start=1):
        # Each optional fragment is empty when its column is missing a value.
        if art["left_pct"] is None:
            split = ""
        else:
            split = (
                f" L{art['left_pct']:.0f}%"
                f" C{art['ctr_pct']:.0f}%"
                f" R{art['right_pct']:.0f}%"
            )
        if art["overall_bias"] is None:
            lean = ""
        else:
            lean = f" bias={art['overall_bias']:+.2f}"
        spot = f" blindspot={art['blindspot']}" if art["blindspot"] else ""
        categories = (art["categories"] or "").replace(",", " · ")

        print(f"{rank:2}. [{art['source_count']:4} src{split}{lean}{spot}] [{art['start_date']}]")
        print(f" {art['title'][:80]}")
        if art["description"]:
            print(f" {art['description'][:90]}")
        print(f" [{categories}]")
        print(f" /article/{art['slug']}")
        print()
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import sys

    # Command-line entry point:
    #   ground_news.py article <slug>    print one parsed article as JSON
    #   ground_news.py category <slug>   fetch + store one interest page
    #   ground_news.py [--force]         fetch all known interests, then list
    #                                    the top 30 articles of the last 3 days
    db = get_db()
    try:
        mode = sys.argv[1] if len(sys.argv) >= 2 else None
        if mode in ("article", "category") and len(sys.argv) < 3:
            # Missing slug: exit with a usage line instead of an IndexError.
            raise SystemExit(f"usage: {sys.argv[0]} {mode} <slug>")

        if mode == "article":
            slug = sys.argv[2]
            url = f"{BASE_URL}/article/{slug}"
            data, cached = fetch_cached(db, url, "article")
            result = parse_single_article(data, slug)
            print(f"({'cached' if cached else 'fetched'})")
            print(json.dumps(result, indent=2, ensure_ascii=False))
        elif mode == "category":
            slug = sys.argv[2]
            stories, cached = fetch_category(slug, db)
            print(f"({'cached' if cached else 'fetched'}) {len(stories)} stories\n")
            for s in sorted(stories, key=lambda x: x["source_count"], reverse=True):
                print(f" [{s['source_count']:4} src] {s['title'][:70]}")
        else:
            force = "--force" in sys.argv
            days = 3
            print(f"Fetching all {len(KNOWN_INTERESTS)} categories (force={force})…\n")
            fetch_all(db, force=force)
            rows = top_articles(db, limit=30, days=days)
            print_top(rows, f"Top 30 seneste {days} dage")
    finally:
        # Close the connection on every path, including errors and early exit.
        db.close()