Files
mmd/analyze.py

636 lines
24 KiB
Python
Raw Permalink Normal View History

2026-05-26 22:21:27 +02:00
"""
analyze.py C25 Financial Signal Extractor
Pipeline (v2):
1. Alias screen on title+desc for C25 mentions
2. Coverage-spread filter: skip low-quality / one-sided articles
3. NER upgrade: BERT-NER to confirm and expand matches
4. Full-text fetch + re-screen
5. FinBERT: quick sentiment drop neutral < FINBERT_MIN_CONF
6. Claude: structured extraction (tickers, magnitude, timeframe)
7. yfinance: momentum check direction alignment
8. signal_score = sentiment_confidence × coverage_spread × momentum_alignment
9. Alert if signal_score > ALERT_THRESHOLD
Usage:
python3 analyze.py # analyze new articles only
python3 analyze.py --force # re-analyze everything
python3 analyze.py --limit 20 # limit to 20 articles
python3 analyze.py --dry-run # show matches without storing
python3 analyze.py --no-claude # skip Claude step (no API cost)
"""
import re
import os
import sys
import json
import time
import math
import sqlite3
import logging
import warnings
2026-05-28 13:11:29 +02:00
from datetime import datetime
2026-05-26 22:21:27 +02:00
from pathlib import Path
# Silence transformer noise before importing.
# These statements must stay above the `transformers` import that follows:
# the env vars are presumably read when transformers / huggingface_hub are
# imported (NOTE(review): confirm for the installed versions). setdefault()
# leaves any value the user already exported untouched.
os.environ.setdefault("TRANSFORMERS_VERBOSITY", "error")
os.environ.setdefault("HF_HUB_DISABLE_PROGRESS_BARS", "1")
# NOTE(review): with no category this ignores *every* Python warning
# process-wide, not only transformers' (e.g. DeprecationWarnings too).
warnings.filterwarnings("ignore")
logging.getLogger("transformers").setLevel(logging.ERROR)
import torch
from transformers import pipeline
from dotenv import load_dotenv
# Load the .env file that sits next to this script, if there is one.
# override=False: variables already exported in the environment win.
_env_file = Path(__file__).parent / ".env"
if _env_file.exists():
    load_dotenv(_env_file, override=False)

# Accept the API key under any capitalisation (e.g. `anthropic_api_key`) by
# copying the first such variable to the canonical ANTHROPIC_API_KEY name read
# by claude_extract. Nothing happens if the canonical name is already set.
if "ANTHROPIC_API_KEY" not in os.environ:
    for _k, _v in list(os.environ.items()):
        if _k.lower() == "anthropic_api_key":
            os.environ["ANTHROPIC_API_KEY"] = _v
            break

# Ground News helpers: make sibling modules (ground_news, rss_feeds)
# importable no matter which directory the script is started from.
sys.path.insert(0, str(Path(__file__).parent))
from ground_news import get_db, fetch_article_text, fetch_all
from rss_feeds import fetch_all_rss
# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------
C25_PATH = Path(__file__).parent / "c25.json"
# Read as UTF-8 explicitly: company names/aliases may contain Danish letters
# (æ, ø, å), and Path.read_text() without an encoding uses the locale's
# default encoding (e.g. cp1252 on Windows), which garbles or rejects them.
_c25_raw = json.loads(C25_PATH.read_text(encoding="utf-8"))
# Top-level keys starting with "_" are skipped (presumably comments/metadata in
# c25.json — confirm). Every other key is a ticker mapped to its company record;
# this module reads "name", "sector", "aliases" and optionally "yahoo_ticker".
C25: dict[str, dict] = {k: v for k, v in _c25_raw.items() if not k.startswith("_")}

# alias (lower-cased) → ticker, used for case-insensitive matching.
# If two companies list the same alias, the company that appears first in
# c25.json keeps it.
ALIAS_MAP: dict[str, str] = {}
for _ticker, _data in C25.items():
    for _alias in _data["aliases"]:
        ALIAS_MAP.setdefault(_alias.lower(), _ticker)

DEVICE = -1  # always CPU — Quadro P400 (CC 6.1) too old + too little VRAM for these models

# Quality thresholds
MIN_SOURCES = 1  # fewer sources → coverage_spread 0 (a single-source article already scores 0)
MIN_COVERAGE_SPREAD = 0.0  # 0.0 disables the Phase 1 filter; signal_score already scales with coverage
FINBERT_MIN_CONF = 0.70  # drop FinBERT "neutral" results below this confidence
ALERT_THRESHOLD = 0.35  # signal_score > this (with non-neutral sentiment) → alert
2026-05-28 13:11:29 +02:00
# ---------------------------------------------------------------------------
# Claude metrics
# ---------------------------------------------------------------------------
# Cumulative Claude usage lives in $DATA_DIR/metrics.json
# (DATA_DIR defaults to the "data" directory next to this file).
METRICS_FILE = (
    Path(os.environ.get("DATA_DIR", str(Path(__file__).parent / "data")))
    / "metrics.json"
)

# Claude 3 Haiku list prices, expressed in USD per single token
# (source: https://www.anthropic.com/pricing).
_PRICE_INPUT_PER_TOKEN = 0.25 / 1_000_000  # $0.25 per million input tokens
_PRICE_OUTPUT_PER_TOKEN = 1.25 / 1_000_000  # $1.25 per million output tokens


def calc_cost(input_tokens: int, output_tokens: int) -> float:
    """Return the USD cost of one Claude call, rounded to 6 decimal places."""
    input_cost = input_tokens * _PRICE_INPUT_PER_TOKEN
    output_cost = output_tokens * _PRICE_OUTPUT_PER_TOKEN
    return round(input_cost + output_cost, 6)
def update_metrics(input_tokens: int, output_tokens: int) -> None:
    """Accumulate Claude token usage and cost into METRICS_FILE.

    Lifetime totals are kept in total_calls, total_input_tokens,
    total_output_tokens and total_cost_usd. The cost is also accumulated per
    local calendar day under "daily_cost_usd" as {"YYYY-MM-DD": usd}, so a
    daily spend cap can be enforced. Only the most recent 31 days are kept.

    Best-effort: an unreadable or malformed metrics file is reported and
    replaced by fresh totals instead of aborting the analysis run. The file is
    written via a temporary file + os.replace, so an interrupted write cannot
    leave truncated JSON behind (which would otherwise reset all totals on
    the next call).
    """
    cost = calc_cost(input_tokens, output_tokens)
    now = datetime.now()

    data: dict = {}
    if METRICS_FILE.exists():
        try:
            loaded = json.loads(METRICS_FILE.read_text())
        except (OSError, ValueError) as e:
            print(f" [warn] metrics: could not read {METRICS_FILE}: {e} — starting fresh")
        else:
            if isinstance(loaded, dict):
                data = loaded

    data["total_calls"] = data.get("total_calls", 0) + 1
    data["total_input_tokens"] = data.get("total_input_tokens", 0) + input_tokens
    data["total_output_tokens"] = data.get("total_output_tokens", 0) + output_tokens
    data["total_cost_usd"] = round(data.get("total_cost_usd", 0.0) + cost, 6)

    daily = data.get("daily_cost_usd")
    if not isinstance(daily, dict):
        daily = {}
    today = now.date().isoformat()
    daily[today] = round(float(daily.get(today, 0.0)) + cost, 6)
    # ISO dates sort chronologically, so the slice keeps the newest 31 days.
    data["daily_cost_usd"] = {day: daily[day] for day in sorted(daily)[-31:]}

    data["last_updated"] = now.isoformat(timespec="seconds")

    METRICS_FILE.parent.mkdir(parents=True, exist_ok=True)
    tmp_file = METRICS_FILE.with_name(METRICS_FILE.name + ".tmp")
    tmp_file.write_text(json.dumps(data, indent=2))
    os.replace(tmp_file, METRICS_FILE)
2026-05-26 22:21:27 +02:00
# ---------------------------------------------------------------------------
# Model loading
# ---------------------------------------------------------------------------
# Both Hugging Face pipelines are created lazily on first use and then reused
# for the rest of the process, so a run that never reaches a phase does not
# pay that model's load time.
_ner_model = None
_finbert_model = None


def get_ner():
    """Return the shared dslim/bert-base-NER pipeline, creating it on first call.

    aggregation_strategy="simple" asks the pipeline to merge sub-word pieces
    into whole entities, each carrying an ``entity_group`` label (ORG/PER/...).
    """
    global _ner_model
    if _ner_model is not None:
        return _ner_model
    print("[analyze] Loading dslim/bert-base-NER …", flush=True)
    _ner_model = pipeline(
        "ner",
        model="dslim/bert-base-NER",
        device=DEVICE,
        aggregation_strategy="simple",
    )
    return _ner_model


def get_finbert():
    """Return the shared ProsusAI/finbert sentiment pipeline, creating it on first call.

    Inputs are truncated to 512 tokens (truncation=True, max_length=512).
    """
    global _finbert_model
    if _finbert_model is not None:
        return _finbert_model
    print("[analyze] Loading ProsusAI/finbert …", flush=True)
    _finbert_model = pipeline(
        "sentiment-analysis",
        model="ProsusAI/finbert",
        device=DEVICE,
        max_length=512,
        truncation=True,
    )
    return _finbert_model
# ---------------------------------------------------------------------------
# C25 alias matching
# ---------------------------------------------------------------------------
def match_c25(text: str) -> dict[str, float]:
"""
Find C25 companies mentioned in text.
Returns {ticker: confidence_score}.
"""
text_lower = text.lower()
matches: dict[str, float] = {}
for alias_lower, ticker in ALIAS_MAP.items():
if ticker in matches:
continue # already found this company
# Always use word boundaries — prevents "sonic" matching "hypersonic",
# "net" matching "internet", "iss" matching "mission", etc.
pat = r"(?<![a-zA-Z0-9])" + re.escape(alias_lower) + r"(?![a-zA-Z0-9])"
if re.search(pat, text_lower):
# Confidence: longer alias match = more reliable
conf = min(0.99, 0.70 + len(alias_lower) * 0.01)
matches[ticker] = conf
return matches
def merge_ner_matches(ner_result: list[dict], base: dict[str, float]) -> dict[str, float]:
"""
Cross-reference NER ORG entities with alias map.
Requires whole-token match to avoid 'EMA' matching 'd-ema-nt'.
"""
merged = dict(base)
for ent in ner_result:
if ent.get("entity_group") not in ("ORG", "PER"):
continue
word_tokens = set(re.split(r"[\s\-_/]+", ent["word"].lower().strip("##")))
for alias_lower, ticker in ALIAS_MAP.items():
if len(alias_lower) < 4:
continue
alias_tokens = set(re.split(r"[\s\-_/]+", alias_lower))
# Need significant token overlap, not just substring containment
overlap = alias_tokens & word_tokens
if overlap and len(overlap) / max(len(alias_tokens), len(word_tokens)) >= 0.5:
score = ent.get("score", 0.7)
if score > merged.get(ticker, 0):
merged[ticker] = score
return merged
# ---------------------------------------------------------------------------
# Coverage spread scoring
# ---------------------------------------------------------------------------
def coverage_spread_score(row) -> float:
"""
Quality score (01) based on source count and bias diversity.
High = many sources from left + right + centre. Low = few or echo chamber.
"""
src = row["source_count"] or 0
left = row["left_src_count"] or 0
right = row["right_src_count"] or 0
ctr = row["ctr_src_count"] or 0
if src < MIN_SOURCES:
return 0.0
quantity = min(1.0, math.log(max(1, src)) / math.log(50))
fl, fr, fc = left / src, right / src, ctr / src
diversity = min(1.0, (fl * fr * fc) ** (1 / 3) * 9) # peaks at equal thirds
return round(quantity * 0.6 + diversity * 0.4, 3)
# ---------------------------------------------------------------------------
# Claude structured extraction
# ---------------------------------------------------------------------------
2026-05-28 13:11:29 +02:00
def claude_extract(title: str, text: str, tickers: list[str]) -> tuple[dict, int, int]:
2026-05-26 22:21:27 +02:00
"""
Use Claude Haiku to extract structured financial signal.
2026-05-28 13:11:29 +02:00
Returns ({"confirmed_tickers", "magnitude", "timeframe", "reasoning"}, input_tokens, output_tokens).
2026-05-26 22:21:27 +02:00
"""
import anthropic
api_key = os.environ.get("ANTHROPIC_API_KEY")
if not api_key:
2026-05-28 13:11:29 +02:00
return {"confirmed_tickers": tickers, "magnitude": 5, "timeframe": "days", "reasoning": "(no API key)"}, 0, 0
2026-05-26 22:21:27 +02:00
client = anthropic.Anthropic(api_key=api_key)
ticker_ctx = "\n".join(
f" {t}: {C25[t]['name']} ({C25[t]['sector']})" for t in tickers if t in C25
)
prompt = f"""You are a financial analyst specializing in Scandinavian equities.
Analyze this news article and assess its financial impact on the listed Danish C25 companies.
## Companies to analyze:
{ticker_ctx}
## Article:
Title: {title}
{text[:1500]}
Respond ONLY with valid JSON (no markdown fences):
{{
"confirmed_tickers": ["NOVO-B"],
"magnitude": 7,
"timeframe": "days",
"reasoning": "Two sentences max on financial impact and direction."
}}
Fields:
- confirmed_tickers: only companies truly affected (can be [])
- magnitude: 110 (1=irrelevant, 10=major market mover)
- timeframe: "hours", "days", "weeks", or "months"
- reasoning: brief analyst note"""
try:
msg = client.messages.create(
model="claude-3-haiku-20240307",
2026-05-26 22:21:27 +02:00
max_tokens=256,
messages=[{"role": "user", "content": prompt}],
)
raw = msg.content[0].text.strip()
raw = re.sub(r"^```(?:json)?\n?", "", raw)
raw = re.sub(r"\n?```$", "", raw)
2026-05-28 13:11:29 +02:00
return json.loads(raw), msg.usage.input_tokens, msg.usage.output_tokens
2026-05-26 22:21:27 +02:00
except Exception as e:
print(f" [warn] Claude failed: {e}")
2026-05-28 13:11:29 +02:00
return {"confirmed_tickers": tickers, "magnitude": 5, "timeframe": "days", "reasoning": str(e)[:120]}, 0, 0
2026-05-26 22:21:27 +02:00
# ---------------------------------------------------------------------------
# yfinance momentum
# ---------------------------------------------------------------------------
_momentum_cache: dict[str, dict] = {}
def momentum_check(ticker: str) -> dict:
"""5-day price momentum for a C25 ticker via yfinance."""
if ticker in _momentum_cache:
return _momentum_cache[ticker]
import yfinance as yf
company = C25.get(ticker, {})
yahoo_ticker = company.get("yahoo_ticker", ticker + ".CO")
result: dict = {"direction": "unknown", "pct_5d": 0.0, "pct_20d": 0.0}
try:
hist = yf.Ticker(yahoo_ticker).history(period="1mo", auto_adjust=True)
if len(hist) >= 5:
close = hist["Close"]
pct_5d = float((close.iloc[-1] / close.iloc[-5] - 1) * 100)
pct_20d = float((close.iloc[-1] / close.iloc[0] - 1) * 100) if len(hist) >= 20 else 0.0
direction = "up" if pct_5d > 1.5 else ("down" if pct_5d < -1.5 else "flat")
result = {"direction": direction, "pct_5d": round(pct_5d, 2), "pct_20d": round(pct_20d, 2)}
except Exception:
pass
_momentum_cache[ticker] = result
return result
# ---------------------------------------------------------------------------
# Signal score
# ---------------------------------------------------------------------------
def calc_signal_score(sent_score: float, sentiment: str, coverage: float, momentum: dict) -> float:
"""signal_score = sentiment_confidence × coverage_spread × momentum_alignment"""
d = momentum.get("direction", "unknown")
if d == "unknown":
alignment = 0.5
elif d == "flat":
alignment = 0.7
elif (sentiment == "positive" and d == "up") or (sentiment == "negative" and d == "down"):
alignment = 1.0
else:
alignment = 0.4 # contrarian
return round(sent_score * coverage * alignment, 3)
# ---------------------------------------------------------------------------
# DB schema migration
# ---------------------------------------------------------------------------
def migrate_db(db) -> None:
    """Add any missing v2 columns to ``article_signals`` (SQLite only).

    Connections whose ``db_type`` is "postgres" are left untouched; their
    schema is managed by db.py. For SQLite, the current column names are read
    with ``PRAGMA table_info``, each absent column is added via
    ``ALTER TABLE ... ADD COLUMN``, and the result is committed. Running it
    again is harmless because existing columns are skipped.
    """
    if getattr(db, "db_type", None) == "postgres":
        return
    present = {info[1] for info in db.execute("PRAGMA table_info(article_signals)").fetchall()}
    # column name → SQLite type/default clause (dict order = order of addition)
    wanted = {
        "coverage_spread": "REAL DEFAULT 0",
        "claude_tickers": "TEXT",
        "claude_magnitude": "INTEGER DEFAULT 5",
        "claude_timeframe": "TEXT",
        "claude_reasoning": "TEXT",
        "momentum_dir": "TEXT",
        "momentum_pct_5d": "REAL DEFAULT 0",
        "signal_score": "REAL DEFAULT 0",
        "alert": "INTEGER DEFAULT 0",
    }
    for name, ddl in wanted.items():
        if name in present:
            continue
        # Safe to interpolate: names and clauses are the constants above, not user input.
        db.execute(f"ALTER TABLE article_signals ADD COLUMN {name} {ddl}")
    db.commit()
# ---------------------------------------------------------------------------
# Main pipeline
# ---------------------------------------------------------------------------
def _headline_text(row) -> str:
    """Return the "title. description" text screened in Phases 1–2."""
    return f"{row['title']}. {row['description'] or ''}"


def _load_full_text(row, db) -> str:
    """Return the best available body text for an article row.

    RSS articles (``categories`` starting with "rss:") have their text stored in
    page_cache under the URL "rss:{slug}". Other articles are fetched with
    fetch_article_text. The headline text is used when no body is available.
    """
    slug = row["slug"]
    headline = _headline_text(row)
    cats = row["categories"] if "categories" in row.keys() else ""
    if isinstance(cats, str) and cats.startswith("rss:"):
        cache_row = db.execute(
            "SELECT content FROM page_cache WHERE url = ?",
            (f"rss:{slug}",),
        ).fetchone()
        return (cache_row["content"] if cache_row else None) or headline
    return fetch_article_text(slug, db) or headline


def _count_mentions(aliases, text_lower: str) -> int:
    """Count word-boundary occurrences of any alias in lower-cased text (at least 1)."""
    total = 0
    for alias in aliases:
        pattern = r"(?<![a-zA-Z0-9])" + re.escape(alias.lower()) + r"(?![a-zA-Z0-9])"
        total += len(re.findall(pattern, text_lower))
    return max(1, total)


def _spent_today_usd(metrics, today: str) -> float:
    """Return today's recorded Claude spend from a parsed metrics.json.

    Reads per-day totals stored under "daily_cost_usd" as {"YYYY-MM-DD": usd}.
    Returns 0.0 when no such record exists, so only this run's own calls count
    toward the daily cap in that case.
    """
    daily = metrics.get("daily_cost_usd") if isinstance(metrics, dict) else None
    if not isinstance(daily, dict):
        return 0.0
    try:
        return float(daily.get(today, 0.0))
    except (TypeError, ValueError):
        return 0.0


def analyze_articles(
    *,
    force: bool = False,
    limit: int | None = None,
    dry_run: bool = False,
    use_claude: bool = True,
    auto_fetch: bool = True,
    max_claude_calls: int = 50,
) -> None:
    """Run the C25 signal pipeline over stored articles and store per-ticker signals.

    Args:
        force: re-analyze every article, not only those without rows in article_signals.
        limit: process at most this many articles (highest source_count first);
            a falsy value means no limit.
        dry_run: print per-ticker results instead of writing them. Also skips the
            feed refresh, Claude and yfinance.
        use_claude: enable Claude extraction (also requires ANTHROPIC_API_KEY).
        auto_fetch: refresh Ground News and RSS feeds first.
        max_claude_calls: cap on Claude API calls for this run. A second cap,
            CLAUDE_DAILY_CAP_USD (default 2.0), limits today's total spend.
    """
    db = get_db()
    try:
        migrate_db(db)

        # Auto-refresh articles (respects 30-min cache TTL)
        if auto_fetch and not dry_run:
            before = db.execute("SELECT COUNT(*) AS cnt FROM articles").fetchone()["cnt"]
            print("[analyze] Refreshing Ground News feed …")
            fetch_all(db)
            print("[analyze] Henter danske RSS feeds …")
            fetch_all_rss(db)
            after = db.execute("SELECT COUNT(*) AS cnt FROM articles").fetchone()["cnt"]
            if after > before:
                print(f"[analyze] +{after - before} new articles")

        # SELECT * so the optional `categories` column reaches _load_full_text;
        # an explicit column list omitted it, which made the RSS branch dead code.
        # NOT EXISTS instead of NOT IN: a single NULL article_slug would make
        # NOT IN exclude every article.
        where = (
            ""
            if force
            else "WHERE NOT EXISTS (SELECT 1 FROM article_signals s WHERE s.article_slug = articles.slug)"
        )
        rows = db.execute(f"SELECT * FROM articles {where} ORDER BY source_count DESC").fetchall()
        if limit:
            rows = rows[:limit]
        total = len(rows)
        print(f"[analyze] {total} articles to process (force={force} dry_run={dry_run} claude={use_claude})")
        if total == 0:
            print("[analyze] Nothing to do.")
            return

        # --------------------------------------------------------------
        # Phase 1 — Alias screen + coverage spread filter
        # --------------------------------------------------------------
        print("[analyze] Phase 1: alias screen + coverage filter …")
        screened: list[tuple] = []
        dropped_cov = 0
        for row in rows:
            cov = coverage_spread_score(row)
            if cov < MIN_COVERAGE_SPREAD:
                dropped_cov += 1
                continue
            matches = match_c25(_headline_text(row))
            if matches:
                screened.append((row, matches, cov))
        print(f"[analyze] {len(screened)}/{total} passed ({dropped_cov} dropped by coverage filter)")
        if not screened:
            return

        # --------------------------------------------------------------
        # Phase 2 — NER upgrade
        # --------------------------------------------------------------
        print("[analyze] Phase 2: NER upgrade …")
        ner = get_ner()
        texts = [_headline_text(r) for r, _, _ in screened]
        batch_size = 16
        all_ner: list = []
        for i in range(0, len(texts), batch_size):
            all_ner.extend(ner(texts[i : i + batch_size]))
        enriched = [
            (row, merge_ner_matches(ner_res, base), cov)
            for (row, base, cov), ner_res in zip(screened, all_ner)
        ]

        # --------------------------------------------------------------
        # Phase 3 — Full text + re-screen
        # --------------------------------------------------------------
        print(f"[analyze] Phase 3: fetching full text for {len(enriched)} articles …")
        final: list[tuple] = []
        for idx, (row, matches, cov) in enumerate(enriched, 1):
            slug = row["slug"]
            if idx % 5 == 0 or idx == len(enriched):
                print(f" {idx}/{len(enriched)}: {slug[:55]}")
            full_text = _load_full_text(row, db)
            for ticker, score in match_c25(full_text).items():
                if score > matches.get(ticker, 0):
                    matches[ticker] = score
            if matches:
                final.append((row, matches, cov, full_text))
        print(f"[analyze] {len(final)} articles with confirmed C25 mentions")

        # --------------------------------------------------------------
        # Phase 4 — FinBERT sentiment (confidence filter)
        # --------------------------------------------------------------
        print("[analyze] Phase 4: FinBERT sentiment …")
        finbert = get_finbert()
        now = int(time.time())
        signals_written = 0
        alerts_triggered = 0
        claude_calls_this_run = 0

        # Daily spend guard: today's spend recorded in metrics.json plus this
        # run's own calls. Compare against today's spend, not the lifetime total.
        metrics: dict = {}
        if METRICS_FILE.exists():
            try:
                metrics = json.loads(METRICS_FILE.read_text())
            except (OSError, ValueError):
                metrics = {}
        spent_today = _spent_today_usd(metrics, datetime.now().date().isoformat())
        daily_cost_cap = float(os.getenv("CLAUDE_DAILY_CAP_USD", "2.0"))

        for row, matches, cov, full_text in final:
            slug = row["slug"]
            title = row["title"]
            try:
                fb = finbert(" ".join(full_text.split()[:400]))[0]
                sentiment = fb["label"].lower()
                sent_score = round(fb["score"], 4)
            except Exception as e:
                print(f" [warn] FinBERT: {e}")
                sentiment, sent_score = "neutral", 0.5
            if sentiment == "neutral" and sent_score < FINBERT_MIN_CONF:
                continue  # drop low-confidence neutral noise

            # ----------------------------------------------------------
            # Phase 5 — Claude extraction
            # ----------------------------------------------------------
            claude_data: dict = {}
            if use_claude and not dry_run and os.environ.get("ANTHROPIC_API_KEY"):
                if claude_calls_this_run >= max_claude_calls:
                    print(f" [claude] ⚠️ per-run cap ({max_claude_calls}) reached — skipping remaining articles")
                    use_claude = False
                elif spent_today >= daily_cost_cap:
                    print(f" [claude] ⚠️ daily spend cap ${daily_cost_cap} reached — skipping Claude for remaining articles")
                    use_claude = False
                else:
                    print(f" [claude] {slug[:50]}")
                    claude_data, in_tok, out_tok = claude_extract(title, full_text, list(matches.keys()))
                    if in_tok or out_tok:
                        update_metrics(in_tok, out_tok)
                        spent_today += calc_cost(in_tok, out_tok)
                    claude_calls_this_run += 1

            # NULL when Claude was not consulted; "[]" means Claude confirmed no tickers.
            claude_tickers = (
                json.dumps(claude_data["confirmed_tickers"])
                if "confirmed_tickers" in claude_data
                else None
            )

            # ----------------------------------------------------------
            # Phase 6 — yfinance momentum + scoring
            # ----------------------------------------------------------
            full_lower = full_text.lower()
            for ticker, entity_score in matches.items():
                company = C25[ticker]
                mention_count = _count_mentions(company["aliases"], full_lower)
                momentum = momentum_check(ticker) if not dry_run else {}
                sig_score = calc_signal_score(sent_score, sentiment, cov, momentum)
                alert = sig_score > ALERT_THRESHOLD and sentiment != "neutral"
                if dry_run:
                    print(
                        f" DRY: {slug[:38]:<38} | {ticker:<8} | "
                        f"{sentiment:<8} {sent_score:.2f} | cov={cov:.2f} | sig={sig_score:.3f}"
                        f"{' ⚡' if alert else ''}"
                    )
                    continue
                db.upsert(
                    "article_signals",
                    ["article_slug", "ticker"],
                    [
                        "article_slug", "ticker", "company_name", "sector",
                        "sentiment", "sentiment_score", "entity_score",
                        "mention_count", "full_text_used", "analyzed_at",
                        "coverage_spread", "claude_tickers", "claude_magnitude",
                        "claude_timeframe", "claude_reasoning",
                        "momentum_dir", "momentum_pct_5d", "signal_score", "alert",
                    ],
                    (
                        slug, ticker, company["name"], company["sector"],
                        sentiment, float(sent_score), round(float(entity_score), 4),
                        mention_count, 1, now,
                        float(cov),
                        claude_tickers,
                        claude_data.get("magnitude", 5),
                        claude_data.get("timeframe", "days"),
                        claude_data.get("reasoning", ""),
                        momentum.get("direction", "unknown"),
                        float(momentum.get("pct_5d", 0.0)),
                        float(sig_score),
                        int(alert),
                    ),
                )
                signals_written += 1
                if alert:
                    alerts_triggered += 1
                    icon = "📈" if sentiment == "positive" else "📉"
                    print(
                        f" ⚡ ALERT: {icon} {ticker} ({company['name']}) | "
                        f"{sentiment} {sent_score:.2f} | sig={sig_score:.3f} | {slug[:40]}"
                    )

        if not dry_run:
            db.commit()
            print(f"[analyze] Done. {signals_written} signals written, {alerts_triggered} alerts triggered.")
        else:
            print(f"[analyze] Dry-run complete. {len(final)} articles matched.")
    finally:
        db.close()
# ---------------------------------------------------------------------------
# CLI — prefer the Makefile targets over memorising these flags
# ---------------------------------------------------------------------------
def main() -> None:
import sys
force = "--force" in sys.argv
dry_run = "--dry-run" in sys.argv or "--dry" in sys.argv
no_claude = "--no-claude" in sys.argv
# --max-claude=N override per-run cap
max_calls = 50
for arg in sys.argv:
if arg.startswith("--max-claude="):
try:
max_calls = int(arg.split("=", 1)[1])
except ValueError:
pass
if force:
print(f"[analyze] ⚠️ --force mode: re-analyzing ALL articles (claude cap={max_calls})")
analyze_articles(
force=force,
dry_run=dry_run,
use_claude=not no_claude,
max_claude_calls=max_calls,
)
2026-05-26 22:21:27 +02:00
if __name__ == "__main__":
main()