"""
analyze.py — C25 Financial Signal Extractor

Pipeline (v2):
  0. Refresh Ground News + Danish RSS feeds (skipped in dry-run mode)
  1. Alias screen on title+desc for C25 mentions
  2. Coverage-spread filter: skip low-quality / one-sided articles
     (effectively disabled while MIN_COVERAGE_SPREAD = 0.0)
  3. NER upgrade: BERT-NER to confirm and expand matches
  4. Full-text fetch + re-screen
  5. FinBERT: quick sentiment — drop neutral < FINBERT_MIN_CONF
  6. Claude: structured extraction (tickers, magnitude, timeframe)
  7. yfinance: momentum check — direction alignment
  8. signal_score = sentiment_confidence × coverage_spread × momentum_alignment
  9. Alert if signal_score > ALERT_THRESHOLD

Usage:
  python3 analyze.py                  # analyze new articles only
  python3 analyze.py --force          # re-analyze everything
  python3 analyze.py --dry-run        # show matches without storing (alias: --dry)
  python3 analyze.py --no-claude      # skip Claude step (no API cost)
  python3 analyze.py --max-claude=20  # cap Claude calls for this run (default 50)

  The CLI does not parse a --limit flag; to cap the number of articles
  processed, call analyze_articles(limit=N) from Python.

Environment:
  ANTHROPIC_API_KEY     Claude API key. A differently-cased variable such as
                        ``anthropic_api_key`` (from .env or the shell) is
                        copied to this name at import time.
  CLAUDE_DAILY_CAP_USD  Claude spend cap in USD (default 2.0). NOTE: it is
                        compared against the cumulative ``total_cost_usd`` in
                        metrics.json, which nothing in this module resets, so
                        it behaves as a lifetime cap until metrics.json is
                        cleared.
"""
import re
import os
import sys
import json
import time
import math
import sqlite3
import logging
import warnings
from datetime import datetime
from pathlib import Path

# Silence transformer noise before importing transformers: these env vars are
# read at import time, so they must be set first.
os.environ.setdefault("TRANSFORMERS_VERBOSITY", "error")
os.environ.setdefault("HF_HUB_DISABLE_PROGRESS_BARS", "1")
warnings.filterwarnings("ignore")  # process-wide: hides *all* warnings, not only transformers'
logging.getLogger("transformers").setLevel(logging.ERROR)

import torch
from transformers import pipeline
from dotenv import load_dotenv


def _alias_anthropic_key() -> None:
    """Expose a differently-cased ``anthropic_api_key`` as ``ANTHROPIC_API_KEY``.

    claude_extract() and analyze_articles() only read the upper-case name.
    An existing ``ANTHROPIC_API_KEY`` is never overwritten. If several
    case-variants exist, the first one found in ``os.environ`` wins.
    """
    if "ANTHROPIC_API_KEY" in os.environ:
        return
    # Snapshot the items so that writing to os.environ cannot disturb iteration.
    for key, value in list(os.environ.items()):
        if key.lower() == "anthropic_api_key":
            os.environ["ANTHROPIC_API_KEY"] = value
            return


# Load .env if present. override=False keeps values already set in the real
# environment. The key aliasing runs unconditionally, so a lower-case key
# exported in the shell is picked up even when no .env file exists.
_env_file = Path(__file__).parent / ".env"
if _env_file.exists():
    load_dotenv(_env_file, override=False)
_alias_anthropic_key()

# Ground News helpers (sibling modules; make this file's directory importable)
sys.path.insert(0, str(Path(__file__).parent))
from ground_news import get_db, fetch_article_text, fetch_all
from rss_feeds import fetch_all_rss

# ---------------------------------------------------------------------------
# Config
# 
--------------------------------------------------------------------------- C25_PATH = Path(__file__).parent / "c25.json" _c25_raw = json.loads(C25_PATH.read_text()) C25: dict[str, dict] = {k: v for k, v in _c25_raw.items() if not k.startswith("_")} # Build alias → ticker lookup (lower-cased for matching) ALIAS_MAP: dict[str, str] = {} for _ticker, _data in C25.items(): for _alias in _data["aliases"]: al = _alias.lower() if al not in ALIAS_MAP: # first alias wins (most specific first in c25.json) ALIAS_MAP[al] = _ticker DEVICE = -1 # always CPU — Quadro P400 (CC 6.1) too old + too little VRAM for these models # Quality thresholds MIN_SOURCES = 1 # coverage_spread naturally weights single-source articles near zero MIN_COVERAGE_SPREAD = 0.0 # disabled: signal_score naturally zeros out single-source articles FINBERT_MIN_CONF = 0.70 # drop neutral articles below this FinBERT confidence ALERT_THRESHOLD = 0.35 # signal_score > this → alert # --------------------------------------------------------------------------- # Claude metrics # --------------------------------------------------------------------------- METRICS_FILE = Path(__file__).parent / "metrics.json" # Pricing: Claude 3 Haiku — https://www.anthropic.com/pricing _PRICE_INPUT_PER_TOKEN = 0.25 / 1_000_000 # $0.25 per MTok _PRICE_OUTPUT_PER_TOKEN = 1.25 / 1_000_000 # $1.25 per MTok def calc_cost(input_tokens: int, output_tokens: int) -> float: return round(input_tokens * _PRICE_INPUT_PER_TOKEN + output_tokens * _PRICE_OUTPUT_PER_TOKEN, 6) def update_metrics(input_tokens: int, output_tokens: int) -> None: """Accumulate Claude token usage into metrics.json.""" cost = calc_cost(input_tokens, output_tokens) data: dict = {} if METRICS_FILE.exists(): try: data = json.loads(METRICS_FILE.read_text()) except Exception: pass data["total_calls"] = data.get("total_calls", 0) + 1 data["total_input_tokens"] = data.get("total_input_tokens", 0) + input_tokens data["total_output_tokens"] = data.get("total_output_tokens", 0) + 
output_tokens data["total_cost_usd"] = round(data.get("total_cost_usd", 0.0) + cost, 6) data["last_updated"] = datetime.now().isoformat(timespec="seconds") METRICS_FILE.write_text(json.dumps(data, indent=2)) # --------------------------------------------------------------------------- # Model loading # --------------------------------------------------------------------------- _ner_model = None _finbert_model = None def get_ner(): global _ner_model if _ner_model is None: print("[analyze] Loading dslim/bert-base-NER …", flush=True) _ner_model = pipeline( "ner", model="dslim/bert-base-NER", aggregation_strategy="simple", device=DEVICE, ) return _ner_model def get_finbert(): global _finbert_model if _finbert_model is None: print("[analyze] Loading ProsusAI/finbert …", flush=True) _finbert_model = pipeline( "sentiment-analysis", model="ProsusAI/finbert", device=DEVICE, truncation=True, max_length=512, ) return _finbert_model # --------------------------------------------------------------------------- # C25 alias matching # --------------------------------------------------------------------------- def match_c25(text: str) -> dict[str, float]: """ Find C25 companies mentioned in text. Returns {ticker: confidence_score}. """ text_lower = text.lower() matches: dict[str, float] = {} for alias_lower, ticker in ALIAS_MAP.items(): if ticker in matches: continue # already found this company # Always use word boundaries — prevents "sonic" matching "hypersonic", # "net" matching "internet", "iss" matching "mission", etc. pat = r"(? dict[str, float]: """ Cross-reference NER ORG entities with alias map. Requires whole-token match to avoid 'EMA' matching 'd-ema-nt'. 
""" merged = dict(base) for ent in ner_result: if ent.get("entity_group") not in ("ORG", "PER"): continue word_tokens = set(re.split(r"[\s\-_/]+", ent["word"].lower().strip("##"))) for alias_lower, ticker in ALIAS_MAP.items(): if len(alias_lower) < 4: continue alias_tokens = set(re.split(r"[\s\-_/]+", alias_lower)) # Need significant token overlap, not just substring containment overlap = alias_tokens & word_tokens if overlap and len(overlap) / max(len(alias_tokens), len(word_tokens)) >= 0.5: score = ent.get("score", 0.7) if score > merged.get(ticker, 0): merged[ticker] = score return merged # --------------------------------------------------------------------------- # Coverage spread scoring # --------------------------------------------------------------------------- def coverage_spread_score(row) -> float: """ Quality score (0–1) based on source count and bias diversity. High = many sources from left + right + centre. Low = few or echo chamber. """ src = row["source_count"] or 0 left = row["left_src_count"] or 0 right = row["right_src_count"] or 0 ctr = row["ctr_src_count"] or 0 if src < MIN_SOURCES: return 0.0 quantity = min(1.0, math.log(max(1, src)) / math.log(50)) fl, fr, fc = left / src, right / src, ctr / src diversity = min(1.0, (fl * fr * fc) ** (1 / 3) * 9) # peaks at equal thirds return round(quantity * 0.6 + diversity * 0.4, 3) # --------------------------------------------------------------------------- # Claude structured extraction # --------------------------------------------------------------------------- def claude_extract(title: str, text: str, tickers: list[str]) -> tuple[dict, int, int]: """ Use Claude Haiku to extract structured financial signal. Returns ({"confirmed_tickers", "magnitude", "timeframe", "reasoning"}, input_tokens, output_tokens). 
""" import anthropic api_key = os.environ.get("ANTHROPIC_API_KEY") if not api_key: return {"confirmed_tickers": tickers, "magnitude": 5, "timeframe": "days", "reasoning": "(no API key)"}, 0, 0 client = anthropic.Anthropic(api_key=api_key) ticker_ctx = "\n".join( f" {t}: {C25[t]['name']} ({C25[t]['sector']})" for t in tickers if t in C25 ) prompt = f"""You are a financial analyst specializing in Scandinavian equities. Analyze this news article and assess its financial impact on the listed Danish C25 companies. ## Companies to analyze: {ticker_ctx} ## Article: Title: {title} {text[:1500]} Respond ONLY with valid JSON (no markdown fences): {{ "confirmed_tickers": ["NOVO-B"], "magnitude": 7, "timeframe": "days", "reasoning": "Two sentences max on financial impact and direction." }} Fields: - confirmed_tickers: only companies truly affected (can be []) - magnitude: 1–10 (1=irrelevant, 10=major market mover) - timeframe: "hours", "days", "weeks", or "months" - reasoning: brief analyst note""" try: msg = client.messages.create( model="claude-3-haiku-20240307", max_tokens=256, messages=[{"role": "user", "content": prompt}], ) raw = msg.content[0].text.strip() raw = re.sub(r"^```(?:json)?\n?", "", raw) raw = re.sub(r"\n?```$", "", raw) return json.loads(raw), msg.usage.input_tokens, msg.usage.output_tokens except Exception as e: print(f" [warn] Claude failed: {e}") return {"confirmed_tickers": tickers, "magnitude": 5, "timeframe": "days", "reasoning": str(e)[:120]}, 0, 0 # --------------------------------------------------------------------------- # yfinance momentum # --------------------------------------------------------------------------- _momentum_cache: dict[str, dict] = {} def momentum_check(ticker: str) -> dict: """5-day price momentum for a C25 ticker via yfinance.""" if ticker in _momentum_cache: return _momentum_cache[ticker] import yfinance as yf company = C25.get(ticker, {}) yahoo_ticker = company.get("yahoo_ticker", ticker + ".CO") result: dict = 
{"direction": "unknown", "pct_5d": 0.0, "pct_20d": 0.0} try: hist = yf.Ticker(yahoo_ticker).history(period="1mo", auto_adjust=True) if len(hist) >= 5: close = hist["Close"] pct_5d = float((close.iloc[-1] / close.iloc[-5] - 1) * 100) pct_20d = float((close.iloc[-1] / close.iloc[0] - 1) * 100) if len(hist) >= 20 else 0.0 direction = "up" if pct_5d > 1.5 else ("down" if pct_5d < -1.5 else "flat") result = {"direction": direction, "pct_5d": round(pct_5d, 2), "pct_20d": round(pct_20d, 2)} except Exception: pass _momentum_cache[ticker] = result return result # --------------------------------------------------------------------------- # Signal score # --------------------------------------------------------------------------- def calc_signal_score(sent_score: float, sentiment: str, coverage: float, momentum: dict) -> float: """signal_score = sentiment_confidence × coverage_spread × momentum_alignment""" d = momentum.get("direction", "unknown") if d == "unknown": alignment = 0.5 elif d == "flat": alignment = 0.7 elif (sentiment == "positive" and d == "up") or (sentiment == "negative" and d == "down"): alignment = 1.0 else: alignment = 0.4 # contrarian return round(sent_score * coverage * alignment, 3) # --------------------------------------------------------------------------- # DB schema migration # --------------------------------------------------------------------------- def migrate_db(db) -> None: """Apply schema migrations for SQLite. 
No-op for Postgres (schema managed by db.py).""" if hasattr(db, "db_type") and db.db_type == "postgres": return existing = {row[1] for row in db.execute("PRAGMA table_info(article_signals)").fetchall()} new_cols = [ ("coverage_spread", "REAL DEFAULT 0"), ("claude_tickers", "TEXT"), ("claude_magnitude", "INTEGER DEFAULT 5"), ("claude_timeframe", "TEXT"), ("claude_reasoning", "TEXT"), ("momentum_dir", "TEXT"), ("momentum_pct_5d", "REAL DEFAULT 0"), ("signal_score", "REAL DEFAULT 0"), ("alert", "INTEGER DEFAULT 0"), ] for col_name, col_def in new_cols: if col_name not in existing: db.execute(f"ALTER TABLE article_signals ADD COLUMN {col_name} {col_def}") db.commit() # --------------------------------------------------------------------------- # Main pipeline # --------------------------------------------------------------------------- def analyze_articles( *, force: bool = False, limit: int | None = None, dry_run: bool = False, use_claude: bool = True, auto_fetch: bool = True, max_claude_calls: int = 50, ) -> None: db = get_db() migrate_db(db) # Auto-refresh articles (respects 30-min cache TTL) if auto_fetch and not dry_run: before = db.execute("SELECT COUNT(*) AS cnt FROM articles").fetchone()["cnt"] print("[analyze] Refreshing Ground News feed …") fetch_all(db) print("[analyze] Henter danske RSS feeds …") fetch_all_rss(db) after = db.execute("SELECT COUNT(*) AS cnt FROM articles").fetchone()["cnt"] if after > before: print(f"[analyze] +{after - before} new articles") base_q = """ SELECT slug, title, description, source_count, left_src_count, right_src_count, ctr_src_count, left_pct, right_pct, ctr_pct FROM articles {where} ORDER BY source_count DESC """ rows = db.execute( base_q.format(where="" if force else "WHERE slug NOT IN (SELECT DISTINCT article_slug FROM article_signals)") ).fetchall() if limit: rows = rows[:limit] total = len(rows) print(f"[analyze] {total} articles to process (force={force} dry_run={dry_run} claude={use_claude})") if total == 0: 
print("[analyze] Nothing to do.") db.close() return # ------------------------------------------------------------------ # Phase 1 — Alias screen + coverage spread filter # ------------------------------------------------------------------ print("[analyze] Phase 1: alias screen + coverage filter …") screened: list[tuple] = [] dropped_cov = 0 for row in rows: cov = coverage_spread_score(row) if cov < MIN_COVERAGE_SPREAD: dropped_cov += 1 continue text = f"{row['title']}. {row['description'] or ''}" matches = match_c25(text) if matches: screened.append((row, matches, cov)) print(f"[analyze] {len(screened)}/{total} passed ({dropped_cov} dropped by coverage filter)") if not screened: db.close() return # ------------------------------------------------------------------ # Phase 2 — NER upgrade # ------------------------------------------------------------------ print("[analyze] Phase 2: NER upgrade …") ner = get_ner() texts = [f"{r['title']}. {r['description'] or ''}" for r, _, _ in screened] BATCH = 16 all_ner = [] for i in range(0, len(texts), BATCH): all_ner.extend(ner(texts[i : i + BATCH])) enriched = [ (row, merge_ner_matches(ner_res, base), cov) for (row, base, cov), ner_res in zip(screened, all_ner) ] # ------------------------------------------------------------------ # Phase 3 — Full text + re-screen # ------------------------------------------------------------------ print(f"[analyze] Phase 3: fetching full text for {len(enriched)} articles …") final: list[tuple] = [] for idx, (row, matches, cov) in enumerate(enriched, 1): slug = row["slug"] if idx % 5 == 0 or idx == len(enriched): print(f" {idx}/{len(enriched)}: {slug[:55]}") # RSS artikler har teksten gemt i page_cache som "rss:{slug}" cats = row["categories"] if "categories" in row.keys() else "" if cats and cats.startswith("rss:"): cache_row = db.execute( "SELECT content FROM page_cache WHERE url = ?", (f"rss:{slug}",), ).fetchone() full_text = cache_row["content"] if cache_row else f"{row['title']}. 
{row['description'] or ''}" else: full_text = fetch_article_text(slug, db) full_matches = match_c25(full_text) for ticker, score in full_matches.items(): if score > matches.get(ticker, 0): matches[ticker] = score if matches: final.append((row, matches, cov, full_text)) print(f"[analyze] {len(final)} articles with confirmed C25 mentions") # ------------------------------------------------------------------ # Phase 4 — FinBERT sentiment (confidence filter) # ------------------------------------------------------------------ print("[analyze] Phase 4: FinBERT sentiment …") finbert = get_finbert() now = int(time.time()) signals_written = 0 alerts_triggered = 0 claude_calls_this_run = 0 # Daily spend guard — read current metrics before starting _existing = {} if METRICS_FILE.exists(): try: _existing = json.loads(METRICS_FILE.read_text()) except Exception: pass DAILY_COST_CAP = float(os.getenv("CLAUDE_DAILY_CAP_USD", "2.0")) for row, matches, cov, full_text in final: slug = row["slug"] title = row["title"] try: fb = finbert(" ".join(full_text.split()[:400]))[0] sentiment = fb["label"].lower() sent_score = round(fb["score"], 4) except Exception as e: print(f" [warn] FinBERT: {e}") sentiment, sent_score = "neutral", 0.5 if sentiment == "neutral" and sent_score < FINBERT_MIN_CONF: continue # drop low-confidence neutral noise # ------------------------------------------------------------------ # Phase 5 — Claude extraction # ------------------------------------------------------------------ claude_data: dict = {} if use_claude and not dry_run and os.environ.get("ANTHROPIC_API_KEY"): if claude_calls_this_run >= max_claude_calls: print(f" [claude] ⚠️ per-run cap ({max_claude_calls}) reached — skipping remaining articles") use_claude = False elif _existing.get("total_cost_usd", 0.0) >= DAILY_COST_CAP: print(f" [claude] ⚠️ daily spend cap ${DAILY_COST_CAP} reached — skipping Claude for remaining articles") use_claude = False else: print(f" [claude] {slug[:50]}") claude_data, 
_in_tok, _out_tok = claude_extract(title, full_text, list(matches.keys())) if _in_tok: update_metrics(_in_tok, _out_tok) _existing["total_cost_usd"] = _existing.get("total_cost_usd", 0.0) + calc_cost(_in_tok, _out_tok) claude_calls_this_run += 1 # ------------------------------------------------------------------ # Phase 6 — yfinance momentum + scoring # ------------------------------------------------------------------ for ticker, entity_score in matches.items(): company = C25[ticker] full_lower = full_text.lower() mention_count = max(1, sum( len(re.findall( r"(? ALERT_THRESHOLD and sentiment != "neutral" if dry_run: print( f" DRY: {slug[:38]:<38} | {ticker:<8} | " f"{sentiment:<8} {sent_score:.2f} | cov={cov:.2f} | sig={sig_score:.3f}" f"{' ⚡' if alert else ''}" ) else: db.upsert( "article_signals", ["article_slug", "ticker"], [ "article_slug", "ticker", "company_name", "sector", "sentiment", "sentiment_score", "entity_score", "mention_count", "full_text_used", "analyzed_at", "coverage_spread", "claude_tickers", "claude_magnitude", "claude_timeframe", "claude_reasoning", "momentum_dir", "momentum_pct_5d", "signal_score", "alert", ], ( slug, ticker, company["name"], company["sector"], sentiment, float(sent_score), round(float(entity_score), 4), mention_count, 1, now, float(cov), json.dumps(claude_data.get("confirmed_tickers", [])) or None, claude_data.get("magnitude", 5), claude_data.get("timeframe", "days"), claude_data.get("reasoning", ""), momentum.get("direction", "unknown"), float(momentum.get("pct_5d", 0.0)), float(sig_score), int(alert), ), ) signals_written += 1 if alert: alerts_triggered += 1 icon = "↑" if sentiment == "positive" else "↓" print( f" ⚡ ALERT: {icon} {ticker} ({company['name']}) | " f"{sentiment} {sent_score:.2f} | sig={sig_score:.3f} | {slug[:40]}" ) if not dry_run: db.commit() print(f"[analyze] Done. {signals_written} signals written, {alerts_triggered} alerts triggered.") else: print(f"[analyze] Dry-run complete. 
{len(final)} articles matched.") db.close() # --------------------------------------------------------------------------- # CLI — brug Makefile i stedet for at huske flags # --------------------------------------------------------------------------- def main() -> None: import sys force = "--force" in sys.argv dry_run = "--dry-run" in sys.argv or "--dry" in sys.argv no_claude = "--no-claude" in sys.argv # --max-claude=N override per-run cap max_calls = 50 for arg in sys.argv: if arg.startswith("--max-claude="): try: max_calls = int(arg.split("=", 1)[1]) except ValueError: pass if force: print(f"[analyze] ⚠️ --force mode: re-analyzing ALL articles (claude cap={max_calls})") analyze_articles( force=force, dry_run=dry_run, use_claude=not no_claude, max_claude_calls=max_calls, ) if __name__ == "__main__": main()