"""
analyze.py — C25 Financial Signal Extractor

Pipeline (v2):
  1. Alias screen on title+desc for C25 mentions
  2. Coverage-spread filter: skip low-quality / one-sided articles
  3. NER upgrade: BERT-NER to confirm and expand matches
  4. Full-text fetch + re-screen
  5. FinBERT: quick sentiment — drop neutral < FINBERT_MIN_CONF
  6. Claude: structured extraction (tickers, magnitude, timeframe)
  7. yfinance: momentum check — direction alignment
  8. signal_score = sentiment_confidence × coverage_spread × momentum_alignment
  9. Alert if signal_score > ALERT_THRESHOLD

Usage:
  python3 analyze.py              # analyze new articles only
  python3 analyze.py --force      # re-analyze everything
  python3 analyze.py --limit 20   # limit to 20 articles
  python3 analyze.py --dry-run    # show matches without storing (alias: --dry)
  python3 analyze.py --no-claude  # skip Claude step (no API cost)
"""
# NOTE(review): the copy of this file under review had lost two spans of text
# (everything between a "<" and the next ">" had been stripped).  The affected
# code — the word-boundary alias regex / tail of match_c25, the signature of
# merge_ner_matches, and the mention-count / momentum / scoring lines of
# Phase 6 — was reconstructed from the surrounding code and is marked
# "RECONSTRUCTED" below.  Verify those spots against version control.

import argparse
import re
import os
import sys
import json
import time
import math
import sqlite3
import logging
import warnings
from pathlib import Path

# Silence transformer noise before importing
os.environ.setdefault("TRANSFORMERS_VERBOSITY", "error")
os.environ.setdefault("HF_HUB_DISABLE_PROGRESS_BARS", "1")
warnings.filterwarnings("ignore")
logging.getLogger("transformers").setLevel(logging.ERROR)

import torch
from transformers import pipeline
from dotenv import load_dotenv

# Load .env (supports both ANTHROPIC_API_KEY and anthropic_api_key)
_env_file = Path(__file__).parent / ".env"
if _env_file.exists():
    load_dotenv(_env_file, override=False)
# Promote a lower-case anthropic_api_key to the canonical upper-case name.
for _k, _v in list(os.environ.items()):
    if _k.lower() == "anthropic_api_key" and "ANTHROPIC_API_KEY" not in os.environ:
        os.environ["ANTHROPIC_API_KEY"] = _v

# Ground News helpers
sys.path.insert(0, str(Path(__file__).parent))
from ground_news import get_db, fetch_article_text, fetch_all
from rss_feeds import fetch_all_rss

# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------
C25_PATH = Path(__file__).parent / "c25.json"
# JSON is UTF-8 by definition; reading with the platform locale encoding would
# garble non-ASCII company names/aliases on some systems.
_c25_raw = json.loads(C25_PATH.read_text(encoding="utf-8"))
# Keys starting with "_" are metadata, not companies.
C25: dict[str, dict] = {k: v for k, v in _c25_raw.items() if not k.startswith("_")}

# Build alias → ticker lookup (lower-cased for matching)
ALIAS_MAP: dict[str, str] = {}
for _ticker, _data in C25.items():
    for _alias in _data["aliases"]:
        al = _alias.lower()
        if al not in ALIAS_MAP:  # first alias wins (most specific first in c25.json)
            ALIAS_MAP[al] = _ticker

DEVICE = -1  # always CPU — Quadro P400 (CC 6.1) too old + too little VRAM for these models

# Quality thresholds
MIN_SOURCES = 1  # coverage_spread naturally weights single-source articles near zero
MIN_COVERAGE_SPREAD = 0.0  # disabled: signal_score naturally zeros out single-source articles
FINBERT_MIN_CONF = 0.70  # drop neutral articles below this FinBERT confidence
ALERT_THRESHOLD = 0.35  # signal_score > this → alert

# RECONSTRUCTED: the confidence that match_c25 assigns to an alias hit was
# lost from the reviewed copy; 1.0 is a placeholder — verify.
ALIAS_MATCH_SCORE = 1.0

# Splits alias / entity text into tokens on whitespace, "-", "_" and "/".
_TOKEN_SPLIT = re.compile(r"[\s\-_/]+")


def _alias_pattern(alias_lower: str) -> re.Pattern[str]:
    """Compile a whole-word matcher for an already lower-cased alias.

    Negative lookarounds on word characters are used instead of a plain
    word-boundary anchor so that aliases beginning or ending in punctuation
    still match; word characters are Unicode-aware for str patterns.
    """
    # RECONSTRUCTED: original pattern text was lost; this reproduces the
    # documented intent ("always use word boundaries").
    return re.compile(r"(?<!\w)" + re.escape(alias_lower) + r"(?!\w)")


# Compiled once at import time — match_c25 runs for every article and alias.
_ALIAS_PATTERNS: dict[str, re.Pattern[str]] = {a: _alias_pattern(a) for a in ALIAS_MAP}

# Per-ticker matchers for *all* aliases (including ones shadowed in ALIAS_MAP
# by an earlier ticker), used to count mentions in the full text.
_TICKER_PATTERNS: dict[str, list[re.Pattern[str]]] = {
    t: [_alias_pattern(a.lower()) for a in d["aliases"]] for t, d in C25.items()
}

# (alias tokens, ticker) pairs for aliases long enough (>= 4 chars) to be
# compared against NER entities; precomputed instead of per entity.
_NER_ALIASES: list[tuple[set[str], str]] = [
    (set(_TOKEN_SPLIT.split(a)), t) for a, t in ALIAS_MAP.items() if len(a) >= 4
]

# ---------------------------------------------------------------------------
# Model loading
# ---------------------------------------------------------------------------
_ner_model = None
_finbert_model = None


def get_ner():
    """Return the lazily-loaded dslim/bert-base-NER pipeline (CPU, aggregation_strategy='simple')."""
    global _ner_model
    if _ner_model is None:
        print("[analyze] Loading dslim/bert-base-NER …", flush=True)
        _ner_model = pipeline(
            "ner",
            model="dslim/bert-base-NER",
            aggregation_strategy="simple",
            device=DEVICE,
        )
    return _ner_model


def get_finbert():
    """Return the lazily-loaded ProsusAI/finbert sentiment pipeline (CPU, truncated to 512)."""
    global _finbert_model
    if _finbert_model is None:
        print("[analyze] Loading ProsusAI/finbert …", flush=True)
        _finbert_model = pipeline(
            "sentiment-analysis",
            model="ProsusAI/finbert",
            device=DEVICE,
            truncation=True,
            max_length=512,
        )
    return _finbert_model


# ---------------------------------------------------------------------------
# C25 alias matching
# ---------------------------------------------------------------------------
def match_c25(text: str) -> dict[str, float]:
    """
    Find C25 companies mentioned in text.

    Returns {ticker: confidence_score}; each company appears at most once.
    Empty or missing text yields {}.
    """
    if not text:
        return {}
    text_lower = text.lower()
    matches: dict[str, float] = {}
    for alias_lower, ticker in ALIAS_MAP.items():
        if ticker in matches:
            continue  # already found this company
        # Always use word boundaries — prevents "sonic" matching "hypersonic",
        # "net" matching "internet", "iss" matching "mission", etc.
        if _ALIAS_PATTERNS[alias_lower].search(text_lower):
            matches[ticker] = ALIAS_MATCH_SCORE  # RECONSTRUCTED (see constant)
    return matches


# RECONSTRUCTED signature: parameter order/names follow the call site
# merge_ner_matches(ner_res, base) and the names used in the body.
def merge_ner_matches(ner_result: list[dict], base: dict[str, float]) -> dict[str, float]:
    """
    Cross-reference NER ORG/PER entities with the alias map.

    Requires whole-token overlap (>= 50 % of the larger token set) to avoid
    'EMA' matching 'd-ema-nt'. Returns a new dict: ``base`` plus any tickers
    found via NER, each keeping the higher of the two scores.
    """
    merged = dict(base)
    for ent in ner_result:
        if ent.get("entity_group") not in ("ORG", "PER"):
            continue
        # strip("#") removes WordPiece "##" continuation markers at either end.
        word_tokens = set(_TOKEN_SPLIT.split(ent["word"].lower().strip("#")))
        for alias_tokens, ticker in _NER_ALIASES:
            # Need significant token overlap, not just substring containment
            overlap = alias_tokens & word_tokens
            if overlap and len(overlap) / max(len(alias_tokens), len(word_tokens)) >= 0.5:
                score = ent.get("score", 0.7)
                if score > merged.get(ticker, 0):
                    merged[ticker] = score
    return merged


# ---------------------------------------------------------------------------
# Coverage spread scoring
# ---------------------------------------------------------------------------
def coverage_spread_score(row) -> float:
    """
    Quality score (0–1) based on source count and bias diversity.

    quantity  = log(sources) / log(50), capped at 1 (50+ sources → 1.0).
    diversity = 9 × geometric mean of the left/right/centre source shares,
                capped at 1 — it is 0 if any bucket is empty and saturates
                once the geometric mean reaches 1/9.
    Result    = 0.6 × quantity + 0.4 × diversity, rounded to 3 decimals.
    """
    src = row["source_count"] or 0
    left = row["left_src_count"] or 0
    right = row["right_src_count"] or 0
    ctr = row["ctr_src_count"] or 0
    if src < MIN_SOURCES:
        return 0.0
    quantity = min(1.0, math.log(max(1, src)) / math.log(50))
    fl, fr, fc = left / src, right / src, ctr / src
    diversity = min(1.0, (fl * fr * fc) ** (1 / 3) * 9)
    return round(quantity * 0.6 + diversity * 0.4, 3)


# ---------------------------------------------------------------------------
# Claude structured extraction
# ---------------------------------------------------------------------------
_CLAUDE_TIMEFRAMES = ("hours", "days", "weeks", "months")


def _claude_fallback(tickers: list[str], reasoning: str) -> dict:
    """Neutral result used when Claude is unavailable or its answer is unusable."""
    return {"confirmed_tickers": list(tickers), "magnitude": 5,
            "timeframe": "days", "reasoning": reasoning}


def _normalize_claude_result(data, tickers: list[str]) -> dict:
    """Coerce Claude's parsed JSON into the shape/types stored in article_signals."""
    if not isinstance(data, dict):
        return _claude_fallback(tickers, "(unexpected Claude response shape)")
    confirmed = data.get("confirmed_tickers", tickers)
    if not isinstance(confirmed, list):
        confirmed = list(tickers)
    try:
        magnitude = int(data.get("magnitude", 5))
    except (TypeError, ValueError):
        magnitude = 5
    timeframe = data.get("timeframe", "days")
    return {
        "confirmed_tickers": [t for t in confirmed if isinstance(t, str)],
        "magnitude": max(1, min(10, magnitude)),  # prompt asks for 1–10
        "timeframe": timeframe if timeframe in _CLAUDE_TIMEFRAMES else "days",
        "reasoning": str(data.get("reasoning", "")),
    }


def claude_extract(title: str, text: str, tickers: list[str]) -> dict:
    """
    Use Claude Haiku to extract a structured financial signal.

    Args:
        title: article title.
        text: article body; only the first 1500 characters are sent.
        tickers: candidate C25 tickers found by the earlier phases.

    Returns:
        {"confirmed_tickers", "magnitude", "timeframe", "reasoning"}. Without an
        API key, or when the call / JSON parse fails, a neutral fallback is
        returned (all candidate tickers, magnitude 5, timeframe "days").
    """
    api_key = os.environ.get("ANTHROPIC_API_KEY")
    if not api_key:
        return _claude_fallback(tickers, "(no API key)")

    import anthropic  # only needed when we actually call the API

    client = anthropic.Anthropic(api_key=api_key)
    ticker_ctx = "\n".join(
        f" {t}: {C25[t]['name']} ({C25[t]['sector']})" for t in tickers if t in C25
    )
    prompt = f"""You are a financial analyst specializing in Scandinavian equities.
Analyze this news article and assess its financial impact on the listed Danish C25 companies.

## Companies to analyze:
{ticker_ctx}

## Article:
Title: {title}

{text[:1500]}

Respond ONLY with valid JSON (no markdown fences):
{{
  "confirmed_tickers": ["NOVO-B"],
  "magnitude": 7,
  "timeframe": "days",
  "reasoning": "Two sentences max on financial impact and direction."
}}

Fields:
- confirmed_tickers: only companies truly affected (can be [])
- magnitude: 1–10 (1=irrelevant, 10=major market mover)
- timeframe: "hours", "days", "weeks", or "months"
- reasoning: brief analyst note"""
    try:
        msg = client.messages.create(
            model="claude-haiku-4-5",
            max_tokens=256,
            messages=[{"role": "user", "content": prompt}],
        )
        raw = msg.content[0].text.strip()
        # Tolerate markdown fences despite the instruction not to use them.
        raw = re.sub(r"^```(?:json)?\n?", "", raw)
        raw = re.sub(r"\n?```$", "", raw)
        return _normalize_claude_result(json.loads(raw), tickers)
    except Exception as e:  # best effort: API, network and JSON errors alike
        print(f" [warn] Claude failed: {e}")
        return _claude_fallback(tickers, str(e)[:120])


# ---------------------------------------------------------------------------
# yfinance momentum
# ---------------------------------------------------------------------------
_momentum_cache: dict[str, dict] = {}


def momentum_check(ticker: str) -> dict:
    """
    5-session price momentum for a C25 ticker via yfinance (cached per process).

    Returns {"direction": "up"|"down"|"flat"|"unknown", "pct_5d", "pct_20d"}.
    direction is "up"/"down" when the 5-session change exceeds ±1.5 %.
    Lookup failures are reported and yield direction "unknown" (best effort).
    """
    if ticker in _momentum_cache:
        return _momentum_cache[ticker]
    import yfinance as yf

    company = C25.get(ticker, {})
    yahoo_ticker = company.get("yahoo_ticker", ticker + ".CO")
    result: dict = {"direction": "unknown", "pct_5d": 0.0, "pct_20d": 0.0}
    try:
        hist = yf.Ticker(yahoo_ticker).history(period="1mo", auto_adjust=True)
        # A 5-session change compares the last close with the close 5 sessions
        # earlier, i.e. iloc[-6] — which requires at least 6 rows.
        if len(hist) >= 6:
            close = hist["Close"]
            last = close.iloc[-1]
            pct_5d = float((last / close.iloc[-6] - 1) * 100)
            pct_20d = float((last / close.iloc[0] - 1) * 100) if len(hist) >= 20 else 0.0
            direction = "up" if pct_5d > 1.5 else ("down" if pct_5d < -1.5 else "flat")
            result = {"direction": direction, "pct_5d": round(pct_5d, 2), "pct_20d": round(pct_20d, 2)}
    except Exception as e:  # network / data errors: keep "unknown", but say so
        print(f" [warn] yfinance {yahoo_ticker}: {e}")
    _momentum_cache[ticker] = result
    return result


# ---------------------------------------------------------------------------
# Signal score
# ---------------------------------------------------------------------------
def calc_signal_score(sent_score: float, sentiment: str, coverage: float, momentum: dict) -> float:
    """
    signal_score = sentiment_confidence × coverage_spread × momentum_alignment

    Alignment: 1.0 when sentiment and price direction agree, 0.7 for flat,
    0.5 for unknown, 0.4 otherwise (contrarian, including neutral sentiment
    against a moving price).
    """
    d = momentum.get("direction", "unknown")
    if d == "unknown":
        alignment = 0.5
    elif d == "flat":
        alignment = 0.7
    elif (sentiment == "positive" and d == "up") or (sentiment == "negative" and d == "down"):
        alignment = 1.0
    else:
        alignment = 0.4  # contrarian
    return round(sent_score * coverage * alignment, 3)


# ---------------------------------------------------------------------------
# DB schema migration
# ---------------------------------------------------------------------------
def migrate_db(db) -> None:
    """Apply schema migrations for SQLite. No-op for Postgres (schema managed by db.py)."""
    if hasattr(db, "db_type") and db.db_type == "postgres":
        return
    existing = {row[1] for row in db.execute("PRAGMA table_info(article_signals)").fetchall()}
    new_cols = [
        ("coverage_spread", "REAL DEFAULT 0"),
        ("claude_tickers", "TEXT"),
        ("claude_magnitude", "INTEGER DEFAULT 5"),
        ("claude_timeframe", "TEXT"),
        ("claude_reasoning", "TEXT"),
        ("momentum_dir", "TEXT"),
        ("momentum_pct_5d", "REAL DEFAULT 0"),
        ("signal_score", "REAL DEFAULT 0"),
        ("alert", "INTEGER DEFAULT 0"),
    ]
    for col_name, col_def in new_cols:
        if col_name not in existing:
            # Column names/defs are the constants above, never external input.
            db.execute(f"ALTER TABLE article_signals ADD COLUMN {col_name} {col_def}")
    db.commit()


# ---------------------------------------------------------------------------
# Main pipeline
# ---------------------------------------------------------------------------
_CANDIDATE_QUERY = """
    SELECT slug, title, description, source_count,
           left_src_count, right_src_count, ctr_src_count,
           left_pct, right_pct, ctr_pct
    FROM articles
    {where}
    ORDER BY source_count DESC
"""
_NER_BATCH = 16
_SIGNAL_COLUMNS = [
    "article_slug", "ticker", "company_name", "sector",
    "sentiment", "sentiment_score", "entity_score",
    "mention_count", "full_text_used", "analyzed_at",
    "coverage_spread", "claude_tickers", "claude_magnitude",
    "claude_timeframe", "claude_reasoning",
    "momentum_dir", "momentum_pct_5d", "signal_score", "alert",
]


def _headline(row) -> str:
    """Title + description text used for the cheap screening phases."""
    return f"{row['title']}. {row['description'] or ''}"


def _refresh_feeds(db) -> None:
    """Pull Ground News + Danish RSS feeds and report how many articles were added."""
    before = db.execute("SELECT COUNT(*) AS cnt FROM articles").fetchone()["cnt"]
    print("[analyze] Refreshing Ground News feed …")
    fetch_all(db)
    print("[analyze] Henter danske RSS feeds …")
    fetch_all_rss(db)
    after = db.execute("SELECT COUNT(*) AS cnt FROM articles").fetchone()["cnt"]
    if after > before:
        print(f"[analyze] +{after - before} new articles")


def _select_candidate_rows(db, *, force: bool, limit: int | None) -> list:
    """Articles to analyze, largest coverage first; unanalyzed ones only unless force."""
    where = "" if force else "WHERE slug NOT IN (SELECT DISTINCT article_slug FROM article_signals)"
    rows = db.execute(_CANDIDATE_QUERY.format(where=where)).fetchall()
    return rows[:limit] if limit else rows


def _screen_articles(rows) -> tuple[list[tuple], int]:
    """Phase 1: coverage filter + alias screen. Returns ([(row, matches, cov)], dropped)."""
    screened: list[tuple] = []
    dropped_cov = 0
    for row in rows:
        cov = coverage_spread_score(row)
        if cov < MIN_COVERAGE_SPREAD:
            dropped_cov += 1
            continue
        matches = match_c25(_headline(row))
        if matches:
            screened.append((row, matches, cov))
    return screened, dropped_cov


def _ner_upgrade(screened: list[tuple]) -> list[tuple]:
    """Phase 2: run NER over title+description in batches and merge its matches."""
    ner = get_ner()
    texts = [_headline(r) for r, _, _ in screened]
    all_ner = []
    for i in range(0, len(texts), _NER_BATCH):
        all_ner.extend(ner(texts[i : i + _NER_BATCH]))
    # strict=True: a length mismatch would otherwise silently mis-pair results.
    return [
        (row, merge_ner_matches(ner_res, base), cov)
        for (row, base, cov), ner_res in zip(screened, all_ner, strict=True)
    ]


def _load_full_text(db, row) -> str:
    """
    Phase 3 text source for one article.

    RSS articles keep their text in page_cache under the key "rss:{slug}".
    The candidate query does not select a `categories` column, so that entry
    is looked up directly (previously the RSS branch could never run). Falls
    back to title + description whenever no usable text is available.
    """
    slug = row["slug"]
    fallback = _headline(row)
    cache_row = db.execute(
        "SELECT content FROM page_cache WHERE url = ?",
        (f"rss:{slug}",),
    ).fetchone()
    if cache_row is not None:
        return cache_row["content"] or fallback
    cats = row["categories"] if "categories" in row.keys() else ""
    if cats and cats.startswith("rss:"):
        return fallback  # RSS article without cached text: don't hit Ground News
    # Guard against an empty result; fetch_article_text's contract isn't visible here.
    return fetch_article_text(slug, db) or fallback


def _count_mentions(text_lower: str, ticker: str) -> int:
    """Whole-word hits of all of the ticker's aliases in text_lower (minimum 1)."""
    # RECONSTRUCTED: the original regex/summation was lost; this mirrors match_c25.
    return max(1, sum(len(p.findall(text_lower)) for p in _TICKER_PATTERNS.get(ticker, ())))


def _run_pipeline(
    db,
    *,
    force: bool,
    limit: int | None,
    dry_run: bool,
    use_claude: bool,
    auto_fetch: bool,
) -> None:
    """Run all phases against an open DB handle (the caller owns closing it)."""
    # Auto-refresh articles (respects 30-min cache TTL)
    if auto_fetch and not dry_run:
        _refresh_feeds(db)

    rows = _select_candidate_rows(db, force=force, limit=limit)
    total = len(rows)
    print(f"[analyze] {total} articles to process (force={force} dry_run={dry_run} claude={use_claude})")
    if total == 0:
        print("[analyze] Nothing to do.")
        return

    # ------------------------------------------------------------------
    # Phase 1 — Alias screen + coverage spread filter
    # ------------------------------------------------------------------
    print("[analyze] Phase 1: alias screen + coverage filter …")
    screened, dropped_cov = _screen_articles(rows)
    print(f"[analyze] {len(screened)}/{total} passed ({dropped_cov} dropped by coverage filter)")
    if not screened:
        return

    # ------------------------------------------------------------------
    # Phase 2 — NER upgrade
    # ------------------------------------------------------------------
    print("[analyze] Phase 2: NER upgrade …")
    enriched = _ner_upgrade(screened)

    # ------------------------------------------------------------------
    # Phase 3 — Full text + re-screen
    # ------------------------------------------------------------------
    print(f"[analyze] Phase 3: fetching full text for {len(enriched)} articles …")
    final: list[tuple] = []
    for idx, (row, matches, cov) in enumerate(enriched, 1):
        slug = row["slug"]
        if idx % 5 == 0 or idx == len(enriched):
            print(f" {idx}/{len(enriched)}: {slug[:55]}")
        full_text = _load_full_text(db, row)
        for ticker, score in match_c25(full_text).items():
            if score > matches.get(ticker, 0):
                matches[ticker] = score
        if matches:
            final.append((row, matches, cov, full_text))
    print(f"[analyze] {len(final)} articles with confirmed C25 mentions")

    # ------------------------------------------------------------------
    # Phase 4 — FinBERT sentiment (confidence filter)
    # ------------------------------------------------------------------
    print("[analyze] Phase 4: FinBERT sentiment …")
    finbert = get_finbert()
    now = int(time.time())
    signals_written = 0
    alerts_triggered = 0

    for row, matches, cov, full_text in final:
        slug = row["slug"]
        title = row["title"]
        try:
            # ~400 words keeps FinBERT input near its 512-token window.
            fb = finbert(" ".join(full_text.split()[:400]))[0]
            sentiment = fb["label"].lower()
            sent_score = round(fb["score"], 4)
        except Exception as e:
            print(f" [warn] FinBERT: {e}")
            sentiment, sent_score = "neutral", 0.5
        if sentiment == "neutral" and sent_score < FINBERT_MIN_CONF:
            continue  # drop low-confidence neutral noise

        # --------------------------------------------------------------
        # Phase 5 — Claude extraction
        # --------------------------------------------------------------
        claude_data: dict = {}
        if use_claude and not dry_run and os.environ.get("ANTHROPIC_API_KEY"):
            print(f" [claude] {slug[:50]}")
            claude_data = claude_extract(title, full_text, list(matches.keys()))

        # --------------------------------------------------------------
        # Phase 6 — yfinance momentum + scoring
        # --------------------------------------------------------------
        full_lower = full_text.lower()
        for ticker, entity_score in matches.items():
            company = C25[ticker]
            # RECONSTRUCTED (lost span): mention count, momentum and score.
            mention_count = _count_mentions(full_lower, ticker)
            momentum = momentum_check(ticker)
            sig_score = calc_signal_score(sent_score, sentiment, cov, momentum)
            alert = sig_score > ALERT_THRESHOLD and sentiment != "neutral"

            if dry_run:
                print(
                    f" DRY: {slug[:38]:<38} | {ticker:<8} | "
                    f"{sentiment:<8} {sent_score:.2f} | cov={cov:.2f} | sig={sig_score:.3f}"
                    f"{' ⚡' if alert else ''}"
                )
                continue

            db.upsert(
                "article_signals",
                ["article_slug", "ticker"],
                _SIGNAL_COLUMNS,
                (
                    slug, ticker, company["name"], company["sector"],
                    sentiment, float(sent_score), round(float(entity_score), 4),
                    mention_count, 1, now,
                    float(cov),
                    # json.dumps never returns "", so "[]" is stored when
                    # Claude was not run (the old `or None` was dead code).
                    json.dumps(claude_data.get("confirmed_tickers", [])),
                    claude_data.get("magnitude", 5),
                    claude_data.get("timeframe", "days"),
                    claude_data.get("reasoning", ""),
                    momentum.get("direction", "unknown"),
                    float(momentum.get("pct_5d", 0.0)),
                    float(sig_score),
                    int(alert),
                ),
            )
            signals_written += 1
            if alert:
                alerts_triggered += 1
                icon = "↑" if sentiment == "positive" else "↓"
                print(
                    f" ⚡ ALERT: {icon} {ticker} ({company['name']}) | "
                    f"{sentiment} {sent_score:.2f} | sig={sig_score:.3f} | {slug[:40]}"
                )

    if not dry_run:
        db.commit()
        print(f"[analyze] Done. {signals_written} signals written, {alerts_triggered} alerts triggered.")
    else:
        print(f"[analyze] Dry-run complete. {len(final)} articles matched.")


def analyze_articles(
    *,
    force: bool = False,
    limit: int | None = None,
    dry_run: bool = False,
    use_claude: bool = True,
    auto_fetch: bool = True,
) -> None:
    """
    Run the full C25 signal pipeline and store results in article_signals.

    Args:
        force: re-analyze articles that already have signals.
        limit: process at most this many candidate articles (falsy = no limit).
        dry_run: print matches instead of writing; skips feed refresh and Claude.
        use_claude: call Claude for structured extraction when an API key exists.
        auto_fetch: refresh Ground News and RSS feeds before analyzing.
    """
    db = get_db()
    try:
        migrate_db(db)
        _run_pipeline(
            db,
            force=force,
            limit=limit,
            dry_run=dry_run,
            use_claude=use_claude,
            auto_fetch=auto_fetch,
        )
    finally:
        db.close()  # also on errors, which previously leaked the handle


# ---------------------------------------------------------------------------
# CLI — use the Makefile instead of memorizing flags
# ---------------------------------------------------------------------------
def main(argv: list[str] | None = None) -> None:
    """
    Parse the documented CLI flags and run the pipeline.

    Previously only exact "--force"/"--dry" were recognized, so the documented
    "--dry-run" silently wrote to the DB and --limit/--no-claude were ignored.
    Unknown arguments are still tolerated, as before.
    """
    parser = argparse.ArgumentParser(description="C25 financial signal extractor")
    parser.add_argument("--force", action="store_true", help="re-analyze everything")
    parser.add_argument("--dry-run", "--dry", dest="dry_run", action="store_true",
                        help="show matches without storing")
    parser.add_argument("--limit", type=int, default=None, help="limit number of articles")
    parser.add_argument("--no-claude", dest="use_claude", action="store_false",
                        help="skip Claude step (no API cost)")
    args, _unknown = parser.parse_known_args(argv)
    analyze_articles(
        force=args.force,
        limit=args.limit,
        dry_run=args.dry_run,
        use_claude=args.use_claude,
    )


if __name__ == "__main__":
    main()