Files
mmd/analyze.py
Henrik Jess Nielsen 026b470b31
All checks were successful
Build and Deploy MoneyMaker / build-and-deploy (push) Successful in 12m21s
Fix metrics.json path to persistent data volume (/app/data)
2026-05-28 13:42:36 +02:00

636 lines
24 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
analyze.py — C25 Financial Signal Extractor
Pipeline (v2):
1. Alias screen on title+desc for C25 mentions
2. Coverage-spread filter: skip low-quality / one-sided articles
3. NER upgrade: BERT-NER to confirm and expand matches
4. Full-text fetch + re-screen
5. FinBERT: quick sentiment — drop neutral < FINBERT_MIN_CONF
6. Claude: structured extraction (tickers, magnitude, timeframe)
7. yfinance: momentum check — direction alignment
8. signal_score = sentiment_confidence × coverage_spread × momentum_alignment
9. Alert if signal_score > ALERT_THRESHOLD
Usage:
python3 analyze.py # analyze new articles only
python3 analyze.py --force # re-analyze everything
python3 analyze.py --limit 20 # limit to 20 articles
python3 analyze.py --dry-run # show matches without storing
python3 analyze.py --no-claude # skip Claude step (no API cost)
"""
import re
import os
import sys
import json
import time
import math
import sqlite3
import logging
import warnings
from datetime import datetime
from pathlib import Path
# Silence transformer noise before importing torch/transformers.
# setdefault() means values already present in the environment are kept.
for _var, _default in (
    ("TRANSFORMERS_VERBOSITY", "error"),
    ("HF_HUB_DISABLE_PROGRESS_BARS", "1"),
):
    os.environ.setdefault(_var, _default)
del _var, _default
# Equivalent to filterwarnings("ignore"): ignore every warning category.
warnings.simplefilter("ignore")
logging.getLogger("transformers").setLevel(logging.ERROR)
import torch
from transformers import pipeline
from dotenv import load_dotenv
# Load the .env file next to this script if it exists. override=False means
# variables already set in the process environment take precedence.
_env_file = Path(__file__).parent / ".env"
if _env_file.exists():
    load_dotenv(_env_file, override=False)
# Accept the API key under any capitalisation (e.g. `anthropic_api_key`) by
# copying the first such variable to the canonical ANTHROPIC_API_KEY name.
# This runs whether or not .env exists, so keys injected directly into the
# environment (e.g. by a container runtime) are normalised too.
if "ANTHROPIC_API_KEY" not in os.environ:
    _alt_key = next(
        (value for name, value in os.environ.items() if name.lower() == "anthropic_api_key"),
        None,
    )
    if _alt_key is not None:
        os.environ["ANTHROPIC_API_KEY"] = _alt_key
# Put this file's directory first on sys.path so the sibling helper modules
# (ground_news, rss_feeds) import regardless of the current working directory.
sys.path.insert(0, os.fspath(Path(__file__).parent))
from ground_news import get_db, fetch_article_text, fetch_all
from rss_feeds import fetch_all_rss
# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------
C25_PATH = Path(__file__).parent / "c25.json"
# JSON is UTF-8 by specification; decode it explicitly instead of relying on
# the host's locale encoding, so non-ASCII text (e.g. æ/ø/å) decodes the same
# on every machine.
_c25_raw = json.loads(C25_PATH.read_text(encoding="utf-8"))
# Top-level keys starting with "_" are skipped; every other key is a ticker
# whose value must at least provide an "aliases" list (used below).
C25: dict[str, dict] = {k: v for k, v in _c25_raw.items() if not k.startswith("_")}

# Lower-cased alias -> ticker lookup used for matching. If the same alias is
# listed under several tickers, the ticker that appears first in c25.json keeps
# it (setdefault never overwrites). Per-ticker alias order is preserved, which
# matters to match_c25: the first alias of a ticker that matches sets its
# confidence.
ALIAS_MAP: dict[str, str] = {}
for _ticker, _data in C25.items():
    for _alias in _data["aliases"]:
        ALIAS_MAP.setdefault(_alias.lower(), _ticker)

DEVICE = -1  # always CPU — Quadro P400 (CC 6.1) too old + too little VRAM for these models

# Quality thresholds
MIN_SOURCES = 1  # coverage_spread naturally weights single-source articles near zero
MIN_COVERAGE_SPREAD = 0.0  # disabled: signal_score naturally zeros out single-source articles
FINBERT_MIN_CONF = 0.70  # drop neutral articles below this FinBERT confidence
ALERT_THRESHOLD = 0.35  # signal_score > this → alert
# ---------------------------------------------------------------------------
# Claude metrics
# ---------------------------------------------------------------------------
# Cumulative Claude usage is persisted here. DATA_DIR (when set) selects the
# directory; otherwise a "data" folder next to this file is used.
METRICS_FILE = Path(os.environ.get("DATA_DIR", str(Path(__file__).parent / "data"))).joinpath("metrics.json")
# Claude 3 Haiku list prices in USD per token — https://www.anthropic.com/pricing
_PRICE_INPUT_PER_TOKEN = 0.25 / 1e6  # $0.25 per million input tokens
_PRICE_OUTPUT_PER_TOKEN = 1.25 / 1e6  # $1.25 per million output tokens
def calc_cost(input_tokens: int, output_tokens: int) -> float:
    """Return the USD cost of one Claude call, rounded to 6 decimal places.

    Uses the module-level per-token prices _PRICE_INPUT_PER_TOKEN and
    _PRICE_OUTPUT_PER_TOKEN.
    """
    input_usd = input_tokens * _PRICE_INPUT_PER_TOKEN
    output_usd = output_tokens * _PRICE_OUTPUT_PER_TOKEN
    return round(input_usd + output_usd, 6)
def update_metrics(input_tokens: int, output_tokens: int) -> None:
    """Accumulate one Claude call's token usage and cost into METRICS_FILE.

    The JSON object maintained in the file contains:
      total_calls, total_input_tokens, total_output_tokens, total_cost_usd
          running totals across all runs (never reset here);
      cost_by_day
          {"YYYY-MM-DD" (local date): cost_usd} for the most recent 31 days,
          so callers can enforce a genuine per-day spend cap;
      last_updated
          local timestamp of this update (seconds precision).

    An unreadable or non-object file is reported and replaced with fresh
    totals. The parent directory is created when missing, and the file is
    written to a temporary sibling and then moved into place with os.replace,
    so an interrupted write cannot leave a truncated metrics file behind.
    """
    cost = calc_cost(input_tokens, output_tokens)
    data: dict = {}
    if METRICS_FILE.exists():
        try:
            loaded = json.loads(METRICS_FILE.read_text(encoding="utf-8"))
        except (OSError, ValueError) as exc:
            print(f"  [warn] metrics file unreadable, starting fresh: {exc}")
        else:
            if isinstance(loaded, dict):
                data = loaded
            else:
                print("  [warn] metrics file is not a JSON object, starting fresh")

    now = datetime.now()
    data["total_calls"] = data.get("total_calls", 0) + 1
    data["total_input_tokens"] = data.get("total_input_tokens", 0) + input_tokens
    data["total_output_tokens"] = data.get("total_output_tokens", 0) + output_tokens
    data["total_cost_usd"] = round(data.get("total_cost_usd", 0.0) + cost, 6)

    by_day = data.get("cost_by_day")
    if not isinstance(by_day, dict):
        by_day = {}
    today = now.date().isoformat()
    by_day[today] = round(by_day.get(today, 0.0) + cost, 6)
    # ISO dates sort chronologically as strings; keep only the latest 31 days.
    data["cost_by_day"] = dict(sorted(by_day.items())[-31:])

    data["last_updated"] = now.isoformat(timespec="seconds")

    METRICS_FILE.parent.mkdir(parents=True, exist_ok=True)
    tmp_file = METRICS_FILE.with_name(METRICS_FILE.name + ".tmp")
    tmp_file.write_text(json.dumps(data, indent=2), encoding="utf-8")
    os.replace(tmp_file, METRICS_FILE)
# ---------------------------------------------------------------------------
# Model loading
# ---------------------------------------------------------------------------
# Hugging Face pipelines, created on first use by the getters below and then
# reused for the rest of the process.
_ner_model = _finbert_model = None


def get_ner():
    """Return the shared dslim/bert-base-NER pipeline, building it on first call.

    Entities are grouped with aggregation_strategy="simple" and the pipeline
    runs on DEVICE.
    """
    global _ner_model
    if _ner_model is not None:
        return _ner_model
    print("[analyze] Loading dslim/bert-base-NER …", flush=True)
    _ner_model = pipeline(
        task="ner",
        model="dslim/bert-base-NER",
        device=DEVICE,
        aggregation_strategy="simple",
    )
    return _ner_model


def get_finbert():
    """Return the shared ProsusAI/finbert sentiment pipeline, building it on first call.

    Inputs are truncated to 512 tokens by the pipeline; it runs on DEVICE.
    """
    global _finbert_model
    if _finbert_model is not None:
        return _finbert_model
    print("[analyze] Loading ProsusAI/finbert …", flush=True)
    _finbert_model = pipeline(
        task="sentiment-analysis",
        model="ProsusAI/finbert",
        device=DEVICE,
        max_length=512,
        truncation=True,
    )
    return _finbert_model
# ---------------------------------------------------------------------------
# C25 alias matching
# ---------------------------------------------------------------------------
# A "word character" for alias boundaries: any Unicode letter or digit
# (Python's Unicode-aware \w minus the underscore). The previous ASCII-only
# class [a-zA-Z0-9] treated Danish letters (æ, ø, å) as boundaries, so a short
# alias such as "gn" could match inside a Danish word like "døgn".
_WORD_CHAR = r"[^\W_]"
_alias_regex_cache: dict[str, re.Pattern[str]] = {}


def _alias_regex(alias_lower: str) -> re.Pattern[str]:
    """Return a compiled (and cached) whole-word pattern for a lower-cased alias."""
    compiled = _alias_regex_cache.get(alias_lower)
    if compiled is None:
        compiled = re.compile(
            rf"(?<!{_WORD_CHAR}){re.escape(alias_lower)}(?!{_WORD_CHAR})"
        )
        _alias_regex_cache[alias_lower] = compiled
    return compiled


def match_c25(text: str) -> dict[str, float]:
    """Find C25 companies mentioned in ``text`` via the alias map.

    Matching is case-insensitive and whole-word: an alias must not be directly
    preceded or followed by a letter or digit (in any script), which prevents
    "sonic" matching "hypersonic", "net" matching "internet", "iss" matching
    "mission", and the like.

    Returns {ticker: confidence}. For each ticker, the first of its aliases
    (in ALIAS_MAP order) that matches determines the confidence, computed as
    min(0.99, 0.70 + 0.01 * len(alias)) so longer aliases count as more
    reliable.
    """
    text_lower = text.lower()
    matches: dict[str, float] = {}
    for alias_lower, ticker in ALIAS_MAP.items():
        if ticker in matches:
            continue  # already found this company
        if _alias_regex(alias_lower).search(text_lower):
            matches[ticker] = min(0.99, 0.70 + len(alias_lower) * 0.01)
    return matches
def merge_ner_matches(ner_result: list[dict], base: dict[str, float]) -> dict[str, float]:
    """Upgrade alias matches with NER entities that correspond to C25 aliases.

    Only entities tagged ORG or PER are considered. An entity's text and each
    alias (of 4+ characters) are split into tokens on whitespace, "-", "_"
    and "/". They correspond when they share at least one token and the
    shared tokens make up at least half of the larger token set. That avoids
    substring hits such as "EMA" inside "d-ema-nt".

    Returns a new dict (``base`` is not modified). A ticker's score is raised
    to the entity's NER score (default 0.7) when that is higher than the
    current one.
    """

    def _tokens(s: str) -> set[str]:
        # Drop empty strings produced by leading/trailing separators; otherwise
        # two empty tokens count as a spurious "overlap".
        return {tok for tok in re.split(r"[\s\-_/]+", s) if tok}

    # Tokenise each eligible alias once per call instead of once per entity.
    alias_tokens = [
        (ticker, _tokens(alias_lower))
        for alias_lower, ticker in ALIAS_MAP.items()
        if len(alias_lower) >= 4
    ]

    merged = dict(base)
    for ent in ner_result:
        if ent.get("entity_group") not in ("ORG", "PER"):
            continue
        # Strip stray WordPiece "##" continuation markers from the entity text.
        word_tokens = _tokens(ent["word"].lower().strip("#"))
        if not word_tokens:
            continue
        score = ent.get("score", 0.7)
        for ticker, a_tokens in alias_tokens:
            overlap = a_tokens & word_tokens
            # Need significant token overlap, not just substring containment.
            if overlap and len(overlap) / max(len(a_tokens), len(word_tokens)) >= 0.5:
                if score > merged.get(ticker, 0):
                    merged[ticker] = score
    return merged
# ---------------------------------------------------------------------------
# Coverage spread scoring
# ---------------------------------------------------------------------------
def coverage_spread_score(row) -> float:
    """Return a 0–1 quality score from an article's source count and bias mix.

    score = 0.6 * quantity + 0.4 * diversity, rounded to 3 decimals, where
      quantity  = ln(source_count) / ln(50), capped at 1
                  (1 source -> 0, 50 or more sources -> 1);
      diversity = 9 * geometric mean of the left/right/centre source shares,
                  capped at 1. It is 0 when any of the three sides has no
                  sources, and saturates at 1 once the geometric mean reaches
                  1/9. Equal thirds (geometric mean 1/3) are well past that.

    ``row`` must support ``row[key]`` for source_count, left_src_count,
    right_src_count and ctr_src_count; None counts are treated as 0.
    Articles with zero sources, or fewer than MIN_SOURCES, score 0.0.
    """
    src = row["source_count"] or 0
    left = row["left_src_count"] or 0
    right = row["right_src_count"] or 0
    ctr = row["ctr_src_count"] or 0
    # `src <= 0` also protects the share divisions below if MIN_SOURCES is
    # ever tuned down to 0.
    if src <= 0 or src < MIN_SOURCES:
        return 0.0
    quantity = min(1.0, math.log(max(1, src)) / math.log(50))
    fl, fr, fc = left / src, right / src, ctr / src
    diversity = min(1.0, (fl * fr * fc) ** (1 / 3) * 9)
    return round(quantity * 0.6 + diversity * 0.4, 3)
# ---------------------------------------------------------------------------
# Claude structured extraction
# ---------------------------------------------------------------------------
def claude_extract(title: str, text: str, tickers: list[str]) -> tuple[dict, int, int]:
    """Use Claude Haiku to extract a structured financial signal for ``tickers``.

    Returns (result, input_tokens, output_tokens). ``result`` is the model's
    JSON object, expected to carry "confirmed_tickers", "magnitude",
    "timeframe" and "reasoning".

    On any failure a fallback object is returned instead: confirmed_tickers =
    ``tickers``, magnitude 5, timeframe "days", with the reason in "reasoning".
    The token counts are the real usage whenever the API call itself
    succeeded, including when the reply could not be parsed, so cost tracking
    stays accurate. They are 0 when no call was made or the call failed.
    The ``anthropic`` package is only imported once an API key is present.
    """
    fallback = {"confirmed_tickers": tickers, "magnitude": 5, "timeframe": "days"}
    api_key = os.environ.get("ANTHROPIC_API_KEY")
    if not api_key:
        return {**fallback, "reasoning": "(no API key)"}, 0, 0
    ticker_ctx = "\n".join(
        f" {t}: {C25[t]['name']} ({C25[t]['sector']})" for t in tickers if t in C25
    )
    prompt = f"""You are a financial analyst specializing in Scandinavian equities.
Analyze this news article and assess its financial impact on the listed Danish C25 companies.
## Companies to analyze:
{ticker_ctx}
## Article:
Title: {title}
{text[:1500]}
Respond ONLY with valid JSON (no markdown fences):
{{
"confirmed_tickers": ["NOVO-B"],
"magnitude": 7,
"timeframe": "days",
"reasoning": "Two sentences max on financial impact and direction."
}}
Fields:
- confirmed_tickers: only companies truly affected (can be [])
- magnitude: 1–10 (1=irrelevant, 10=major market mover)
- timeframe: "hours", "days", "weeks", or "months"
- reasoning: brief analyst note"""
    # Step 1: the API call. Nothing has been billed if this fails.
    try:
        import anthropic

        client = anthropic.Anthropic(api_key=api_key)
        msg = client.messages.create(
            model="claude-3-haiku-20240307",
            max_tokens=256,
            messages=[{"role": "user", "content": prompt}],
        )
    except Exception as e:
        print(f" [warn] Claude failed: {e}")
        return {**fallback, "reasoning": str(e)[:120]}, 0, 0

    in_tok, out_tok = msg.usage.input_tokens, msg.usage.output_tokens
    # Step 2: parse the reply. Tokens were spent, so report them even on failure.
    try:
        raw = msg.content[0].text.strip()
        # Tolerate ```json fences despite the prompt asking for none.
        raw = re.sub(r"^```(?:json)?\n?", "", raw)
        raw = re.sub(r"\n?```$", "", raw)
        data = json.loads(raw)
        if not isinstance(data, dict):
            raise ValueError(f"expected a JSON object, got {type(data).__name__}")
    except (IndexError, AttributeError, ValueError) as e:
        print(f" [warn] Claude failed: {e}")
        return {**fallback, "reasoning": str(e)[:120]}, in_tok, out_tok
    return data, in_tok, out_tok
# ---------------------------------------------------------------------------
# yfinance momentum
# ---------------------------------------------------------------------------
# ticker -> momentum result; filled by momentum_check for the process lifetime.
_momentum_cache: dict[str, dict] = {}


def momentum_check(ticker: str) -> dict:
    """Return recent price momentum for a C25 ticker via yfinance.

    Result: {"direction": "up" | "down" | "flat" | "unknown",
             "pct_5d": float, "pct_20d": float}
    pct_Nd is the % change of the (adjusted) close over the last N trading
    sessions, rounded to 2 decimals. pct_20d is 0.0 when fewer than 21 closes
    are available. direction is "up"/"down" when pct_5d exceeds +1.5 / -1.5 %,
    else "flat". "unknown" (with zeros) means fewer than 6 closes or the
    lookup failed.

    The Yahoo symbol comes from C25[ticker]["yahoo_ticker"], defaulting to
    ticker + ".CO". Every result, including failures, is cached per ticker.
    """
    cached = _momentum_cache.get(ticker)
    if cached is not None:
        return cached
    company = C25.get(ticker, {})
    yahoo_ticker = company.get("yahoo_ticker", ticker + ".CO")
    result: dict = {"direction": "unknown", "pct_5d": 0.0, "pct_20d": 0.0}
    try:
        import yfinance as yf

        # 3 months of history guarantees the 21 closes a 20-session change needs.
        hist = yf.Ticker(yahoo_ticker).history(period="3mo", auto_adjust=True)
        close = hist["Close"].dropna()
        # An N-session change compares the last close with the close N sessions
        # earlier, i.e. iloc[-1] vs iloc[-(N + 1)].
        if len(close) >= 6:
            last = close.iloc[-1]
            pct_5d = float((last / close.iloc[-6] - 1) * 100)
            pct_20d = float((last / close.iloc[-21] - 1) * 100) if len(close) >= 21 else 0.0
            direction = "up" if pct_5d > 1.5 else ("down" if pct_5d < -1.5 else "flat")
            result = {"direction": direction, "pct_5d": round(pct_5d, 2), "pct_20d": round(pct_20d, 2)}
    except Exception as exc:  # network/data/import problems: keep "unknown"
        print(f"  [warn] momentum {yahoo_ticker}: {exc}")
    _momentum_cache[ticker] = result
    return result
# ---------------------------------------------------------------------------
# Signal score
# ---------------------------------------------------------------------------
def calc_signal_score(sent_score: float, sentiment: str, coverage: float, momentum: dict) -> float:
"""signal_score = sentiment_confidence × coverage_spread × momentum_alignment"""
d = momentum.get("direction", "unknown")
if d == "unknown":
alignment = 0.5
elif d == "flat":
alignment = 0.7
elif (sentiment == "positive" and d == "up") or (sentiment == "negative" and d == "down"):
alignment = 1.0
else:
alignment = 0.4 # contrarian
return round(sent_score * coverage * alignment, 3)
# ---------------------------------------------------------------------------
# DB schema migration
# ---------------------------------------------------------------------------
def migrate_db(db) -> None:
    """Add any missing v2 columns to article_signals (SQLite only).

    Returns immediately when ``db.db_type == "postgres"`` (that schema is
    managed by db.py). Otherwise it reads the current columns with
    PRAGMA table_info, issues one ALTER TABLE ... ADD COLUMN per missing
    column (in the order listed below), then commits. Running it again adds
    nothing.
    """
    if getattr(db, "db_type", None) == "postgres":
        return
    present = {info[1] for info in db.execute("PRAGMA table_info(article_signals)").fetchall()}
    wanted = (
        ("coverage_spread", "REAL DEFAULT 0"),
        ("claude_tickers", "TEXT"),
        ("claude_magnitude", "INTEGER DEFAULT 5"),
        ("claude_timeframe", "TEXT"),
        ("claude_reasoning", "TEXT"),
        ("momentum_dir", "TEXT"),
        ("momentum_pct_5d", "REAL DEFAULT 0"),
        ("signal_score", "REAL DEFAULT 0"),
        ("alert", "INTEGER DEFAULT 0"),
    )
    # Column names/definitions are the fixed constants above, never user input.
    missing = [(name, ddl) for name, ddl in wanted if name not in present]
    for name, ddl in missing:
        db.execute(f"ALTER TABLE article_signals ADD COLUMN {name} {ddl}")
    db.commit()
# ---------------------------------------------------------------------------
# Main pipeline
# ---------------------------------------------------------------------------
def _claude_spent_today(metrics: dict) -> float:
    """Return the Claude spend (USD) to compare against the daily cap.

    If metrics.json carries a per-day "cost_by_day" ledger ({"YYYY-MM-DD":
    usd}, local date), today's entry is used (0.0 on a new day). Without such
    a ledger only the cumulative "total_cost_usd" is available. It is
    returned as a conservative stand-in, since it can only over-estimate
    today's spend.
    """
    by_day = metrics.get("cost_by_day")
    if isinstance(by_day, dict):
        return float(by_day.get(datetime.now().date().isoformat(), 0.0))
    return float(metrics.get("total_cost_usd", 0.0))


def _load_full_text(db, row) -> str:
    """Return the best available body text for an article row.

    RSS articles (a string `categories` value starting with "rss:") have their
    text stored in page_cache under the key "rss:{slug}". All other articles
    go through fetch_article_text. When no text comes back, the article's
    "title. description" is used instead.
    """
    slug = row["slug"]
    fallback = f"{row['title']}. {row['description'] or ''}"
    cats = row["categories"] if "categories" in row.keys() else ""
    if isinstance(cats, str) and cats.startswith("rss:"):
        cache_row = db.execute(
            "SELECT content FROM page_cache WHERE url = ?",
            (f"rss:{slug}",),
        ).fetchone()
        return (cache_row["content"] if cache_row else None) or fallback
    return fetch_article_text(slug, db) or fallback


def _count_mentions(aliases, text_lower: str) -> int:
    """Count whole-word occurrences of all ``aliases`` in lower-cased text (at least 1).

    Word boundaries treat any Unicode letter/digit as part of a word, so
    Danish letters (æ, ø, å) next to an alias prevent a match.
    """
    total = sum(
        len(re.findall(rf"(?<![^\W_]){re.escape(alias.lower())}(?![^\W_])", text_lower))
        for alias in aliases
    )
    return max(1, total)


def analyze_articles(
    *,
    force: bool = False,
    limit: int | None = None,
    dry_run: bool = False,
    use_claude: bool = True,
    auto_fetch: bool = True,
    max_claude_calls: int = 50,
) -> None:
    """Run the C25 signal pipeline and store one signal row per (article, ticker).

    Args:
        force: re-analyze every article, not only those without signals yet.
        limit: process at most this many candidates (highest source_count first).
        dry_run: print per-ticker results instead of storing them. Also skips
            the feed refresh, Claude and yfinance lookups.
        use_claude: allow Claude extraction (additionally requires
            ANTHROPIC_API_KEY in the environment).
        auto_fetch: refresh Ground News and RSS feeds before analysing.
        max_claude_calls: per-run cap on Claude calls.

    The environment variable CLAUDE_DAILY_CAP_USD (default 2.0) caps Claude
    spend per day. The database connection is always closed, even on error.
    """
    db = get_db()
    try:
        migrate_db(db)

        # Auto-refresh articles (respects 30-min cache TTL)
        if auto_fetch and not dry_run:
            before = db.execute("SELECT COUNT(*) AS cnt FROM articles").fetchone()["cnt"]
            print("[analyze] Refreshing Ground News feed …")
            fetch_all(db)
            print("[analyze] Henter danske RSS feeds …")
            fetch_all_rss(db)
            after = db.execute("SELECT COUNT(*) AS cnt FROM articles").fetchone()["cnt"]
            if after > before:
                print(f"[analyze] +{after - before} new articles")

        # SELECT * so optional columns such as `categories` (used by
        # _load_full_text to detect RSS articles) are present on the row
        # whenever the table has them. An explicit column list silently
        # dropped it.
        where = "" if force else "WHERE slug NOT IN (SELECT DISTINCT article_slug FROM article_signals)"
        rows = db.execute(f"SELECT * FROM articles {where} ORDER BY source_count DESC").fetchall()
        if limit:
            rows = rows[:limit]
        total = len(rows)
        print(f"[analyze] {total} articles to process (force={force} dry_run={dry_run} claude={use_claude})")
        if total == 0:
            print("[analyze] Nothing to do.")
            return

        # ------------------------------------------------------------------
        # Phase 1 — Alias screen + coverage spread filter
        # ------------------------------------------------------------------
        print("[analyze] Phase 1: alias screen + coverage filter …")
        screened: list[tuple] = []
        dropped_cov = 0
        for row in rows:
            cov = coverage_spread_score(row)
            if cov < MIN_COVERAGE_SPREAD:
                dropped_cov += 1
                continue
            matches = match_c25(f"{row['title']}. {row['description'] or ''}")
            if matches:
                screened.append((row, matches, cov))
        print(f"[analyze] {len(screened)}/{total} passed ({dropped_cov} dropped by coverage filter)")
        if not screened:
            return

        # ------------------------------------------------------------------
        # Phase 2 — NER upgrade
        # ------------------------------------------------------------------
        print("[analyze] Phase 2: NER upgrade …")
        ner = get_ner()
        texts = [f"{r['title']}. {r['description'] or ''}" for r, _, _ in screened]
        BATCH = 16
        all_ner = []
        for i in range(0, len(texts), BATCH):
            all_ner.extend(ner(texts[i : i + BATCH]))
        enriched = [
            (row, merge_ner_matches(ner_res, base), cov)
            for (row, base, cov), ner_res in zip(screened, all_ner)
        ]

        # ------------------------------------------------------------------
        # Phase 3 — Full text + re-screen
        # ------------------------------------------------------------------
        print(f"[analyze] Phase 3: fetching full text for {len(enriched)} articles …")
        final: list[tuple] = []
        for idx, (row, matches, cov) in enumerate(enriched, 1):
            slug = row["slug"]
            if idx % 5 == 0 or idx == len(enriched):
                print(f" {idx}/{len(enriched)}: {slug[:55]}")
            full_text = _load_full_text(db, row)
            for ticker, score in match_c25(full_text).items():
                if score > matches.get(ticker, 0):
                    matches[ticker] = score
            if matches:
                final.append((row, matches, cov, full_text))
        print(f"[analyze] {len(final)} articles with confirmed C25 mentions")

        # ------------------------------------------------------------------
        # Phase 4 — FinBERT sentiment (confidence filter)
        # ------------------------------------------------------------------
        print("[analyze] Phase 4: FinBERT sentiment …")
        finbert = get_finbert()
        now = int(time.time())
        signals_written = 0
        alerts_triggered = 0
        claude_calls_this_run = 0

        # Spend guard: read the persisted metrics once before the loop, then
        # track this run's additional spend in memory.
        metrics: dict = {}
        if METRICS_FILE.exists():
            try:
                loaded = json.loads(METRICS_FILE.read_text())
            except (OSError, ValueError):
                loaded = {}
            if isinstance(loaded, dict):
                metrics = loaded
        spent_today = _claude_spent_today(metrics)
        daily_cost_cap = float(os.getenv("CLAUDE_DAILY_CAP_USD", "2.0"))

        for row, matches, cov, full_text in final:
            slug = row["slug"]
            title = row["title"]
            try:
                fb = finbert(" ".join(full_text.split()[:400]))[0]
                sentiment = fb["label"].lower()
                sent_score = round(fb["score"], 4)
            except Exception as e:
                print(f" [warn] FinBERT: {e}")
                sentiment, sent_score = "neutral", 0.5
            if sentiment == "neutral" and sent_score < FINBERT_MIN_CONF:
                continue  # drop low-confidence neutral noise

            # --------------------------------------------------------------
            # Phase 5 — Claude extraction
            # --------------------------------------------------------------
            claude_data: dict = {}
            if use_claude and not dry_run and os.environ.get("ANTHROPIC_API_KEY"):
                if claude_calls_this_run >= max_claude_calls:
                    print(f" [claude] ⚠️ per-run cap ({max_claude_calls}) reached — skipping remaining articles")
                    use_claude = False
                elif spent_today >= daily_cost_cap:
                    print(f" [claude] ⚠️ daily spend cap ${daily_cost_cap} reached — skipping Claude for remaining articles")
                    use_claude = False
                else:
                    print(f" [claude] {slug[:50]}")
                    claude_data, in_tok, out_tok = claude_extract(title, full_text, list(matches))
                    if not isinstance(claude_data, dict):
                        claude_data = {}
                    if in_tok:
                        update_metrics(in_tok, out_tok)
                        spent_today += calc_cost(in_tok, out_tok)
                    claude_calls_this_run += 1

            # --------------------------------------------------------------
            # Phase 6 — yfinance momentum + scoring
            # --------------------------------------------------------------
            full_lower = full_text.lower()  # same for every ticker of this article
            for ticker, entity_score in matches.items():
                company = C25[ticker]
                mention_count = _count_mentions(company["aliases"], full_lower)
                momentum = momentum_check(ticker) if not dry_run else {}
                sig_score = calc_signal_score(sent_score, sentiment, cov, momentum)
                alert = sig_score > ALERT_THRESHOLD and sentiment != "neutral"
                if dry_run:
                    print(
                        f" DRY: {slug[:38]:<38} | {ticker:<8} | "
                        f"{sentiment:<8} {sent_score:.2f} | cov={cov:.2f} | sig={sig_score:.3f}"
                        f"{'  ⚡ ALERT' if alert else ''}"
                    )
                    continue
                db.upsert(
                    "article_signals",
                    ["article_slug", "ticker"],
                    [
                        "article_slug", "ticker", "company_name", "sector",
                        "sentiment", "sentiment_score", "entity_score",
                        "mention_count", "full_text_used", "analyzed_at",
                        "coverage_spread", "claude_tickers", "claude_magnitude",
                        "claude_timeframe", "claude_reasoning",
                        "momentum_dir", "momentum_pct_5d", "signal_score", "alert",
                    ],
                    (
                        slug, ticker, company["name"], company["sector"],
                        sentiment, float(sent_score), round(float(entity_score), 4),
                        mention_count, 1, now,
                        float(cov),
                        # json.dumps never returns "", so this is "[]" when
                        # Claude did not run.
                        json.dumps(claude_data.get("confirmed_tickers", [])),
                        claude_data.get("magnitude", 5),
                        claude_data.get("timeframe", "days"),
                        claude_data.get("reasoning", ""),
                        momentum.get("direction", "unknown"),
                        float(momentum.get("pct_5d", 0.0)),
                        float(sig_score),
                        int(alert),
                    ),
                )
                signals_written += 1
                if alert:
                    alerts_triggered += 1
                    icon = "▲" if sentiment == "positive" else "▼"
                    print(
                        f" ⚡ ALERT: {icon} {ticker} ({company['name']}) | "
                        f"{sentiment} {sent_score:.2f} | sig={sig_score:.3f} | {slug[:40]}"
                    )

        if not dry_run:
            db.commit()
            print(f"[analyze] Done. {signals_written} signals written, {alerts_triggered} alerts triggered.")
        else:
            print(f"[analyze] Dry-run complete. {len(final)} articles matched.")
    finally:
        db.close()
# ---------------------------------------------------------------------------
# CLI — brug Makefile i stedet for at huske flags
# ---------------------------------------------------------------------------
def main() -> None:
    """Parse command-line flags and run analyze_articles.

    Flags (unrecognised arguments are ignored):
      --force               re-analyze all articles, not only new ones
      --dry-run / --dry     show matches without storing
      --no-claude           skip the Claude step (no API cost)
      --limit N / --limit=N process at most N articles
      --max-claude=N        per-run cap on Claude calls (default 50)
    Invalid numeric values are reported and the default is kept.
    """
    args = sys.argv[1:]
    force = "--force" in args
    dry_run = "--dry-run" in args or "--dry" in args
    no_claude = "--no-claude" in args
    max_calls = 50
    limit: int | None = None
    for pos, arg in enumerate(args):
        if arg.startswith("--max-claude="):
            raw = arg.split("=", 1)[1]
            try:
                max_calls = int(raw)
            except ValueError:
                print(f"[analyze] ignoring invalid --max-claude value {raw!r}")
        elif arg == "--limit" or arg.startswith("--limit="):
            # Accept both "--limit=20" and the documented "--limit 20".
            if "=" in arg:
                raw = arg.split("=", 1)[1]
            else:
                raw = args[pos + 1] if pos + 1 < len(args) else ""
            try:
                limit = int(raw)
            except ValueError:
                print(f"[analyze] ignoring invalid --limit value {raw!r}")
    if force:
        print(f"[analyze] ⚠️ --force mode: re-analyzing ALL articles (claude cap={max_calls})")
    analyze_articles(
        force=force,
        limit=limit,
        dry_run=dry_run,
        use_claude=not no_claude,
        max_claude_calls=max_calls,
    )


if __name__ == "__main__":
    main()