All checks were successful
Build and Deploy MoneyMaker / build-and-deploy (push) Successful in 12m22s
597 lines
22 KiB
Python
597 lines
22 KiB
Python
"""
|
||
analyze.py — C25 Financial Signal Extractor
|
||
|
||
Pipeline (v2):
|
||
1. Alias screen on title+desc for C25 mentions
|
||
2. Coverage-spread filter: skip low-quality / one-sided articles
|
||
3. NER upgrade: BERT-NER to confirm and expand matches
|
||
4. Full-text fetch + re-screen
|
||
5. FinBERT: quick sentiment — drop neutral < FINBERT_MIN_CONF
|
||
6. Claude: structured extraction (tickers, magnitude, timeframe)
|
||
7. yfinance: momentum check — direction alignment
|
||
8. signal_score = sentiment_confidence × coverage_spread × momentum_alignment
|
||
9. Alert if signal_score > ALERT_THRESHOLD
|
||
|
||
Usage:
|
||
python3 analyze.py # analyze new articles only
|
||
python3 analyze.py --force # re-analyze everything
|
||
python3 analyze.py --limit 20 # limit to 20 articles
|
||
python3 analyze.py --dry-run # show matches without storing
|
||
python3 analyze.py --no-claude # skip Claude step (no API cost)
|
||
"""
|
||
|
||
import re
|
||
import os
|
||
import sys
|
||
import json
|
||
import time
|
||
import math
|
||
import sqlite3
|
||
import logging
|
||
import warnings
|
||
from datetime import datetime
|
||
from pathlib import Path
|
||
|
||
# Silence transformer noise before importing
|
||
os.environ.setdefault("TRANSFORMERS_VERBOSITY", "error")
|
||
os.environ.setdefault("HF_HUB_DISABLE_PROGRESS_BARS", "1")
|
||
warnings.filterwarnings("ignore")
|
||
logging.getLogger("transformers").setLevel(logging.ERROR)
|
||
|
||
import torch
|
||
from transformers import pipeline
|
||
from dotenv import load_dotenv
|
||
|
||
# Load .env (supports both ANTHROPIC_API_KEY and anthropic_api_key)
|
||
_env_file = Path(__file__).parent / ".env"
|
||
if _env_file.exists():
|
||
load_dotenv(_env_file, override=False)
|
||
for _k, _v in list(os.environ.items()):
|
||
if _k.lower() == "anthropic_api_key" and "ANTHROPIC_API_KEY" not in os.environ:
|
||
os.environ["ANTHROPIC_API_KEY"] = _v
|
||
|
||
# Ground News helpers
|
||
sys.path.insert(0, str(Path(__file__).parent))
|
||
from ground_news import get_db, fetch_article_text, fetch_all
|
||
from rss_feeds import fetch_all_rss
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Config
|
||
# ---------------------------------------------------------------------------
|
||
|
||
C25_PATH = Path(__file__).parent / "c25.json"
|
||
_c25_raw = json.loads(C25_PATH.read_text())
|
||
C25: dict[str, dict] = {k: v for k, v in _c25_raw.items() if not k.startswith("_")}
|
||
|
||
# Build alias → ticker lookup (lower-cased for matching)
|
||
ALIAS_MAP: dict[str, str] = {}
|
||
for _ticker, _data in C25.items():
|
||
for _alias in _data["aliases"]:
|
||
al = _alias.lower()
|
||
if al not in ALIAS_MAP: # first alias wins (most specific first in c25.json)
|
||
ALIAS_MAP[al] = _ticker
|
||
|
||
DEVICE = -1 # always CPU — Quadro P400 (CC 6.1) too old + too little VRAM for these models
|
||
|
||
# Quality thresholds
|
||
MIN_SOURCES = 1 # coverage_spread naturally weights single-source articles near zero
|
||
MIN_COVERAGE_SPREAD = 0.0 # disabled: signal_score naturally zeros out single-source articles
|
||
FINBERT_MIN_CONF = 0.70 # drop neutral articles below this FinBERT confidence
|
||
ALERT_THRESHOLD = 0.35 # signal_score > this → alert
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Claude metrics
|
||
# ---------------------------------------------------------------------------
|
||
|
||
METRICS_FILE = Path(__file__).parent / "metrics.json"
|
||
|
||
# Pricing: Claude Haiku 4.5 — https://www.anthropic.com/pricing
|
||
_PRICE_INPUT_PER_TOKEN = 0.80 / 1_000_000 # $0.80 per MTok
|
||
_PRICE_OUTPUT_PER_TOKEN = 4.00 / 1_000_000 # $4.00 per MTok
|
||
|
||
|
||
def calc_cost(input_tokens: int, output_tokens: int) -> float:
|
||
return round(input_tokens * _PRICE_INPUT_PER_TOKEN + output_tokens * _PRICE_OUTPUT_PER_TOKEN, 6)
|
||
|
||
|
||
def update_metrics(input_tokens: int, output_tokens: int) -> None:
|
||
"""Accumulate Claude token usage into metrics.json."""
|
||
cost = calc_cost(input_tokens, output_tokens)
|
||
data: dict = {}
|
||
if METRICS_FILE.exists():
|
||
try:
|
||
data = json.loads(METRICS_FILE.read_text())
|
||
except Exception:
|
||
pass
|
||
data["total_calls"] = data.get("total_calls", 0) + 1
|
||
data["total_input_tokens"] = data.get("total_input_tokens", 0) + input_tokens
|
||
data["total_output_tokens"] = data.get("total_output_tokens", 0) + output_tokens
|
||
data["total_cost_usd"] = round(data.get("total_cost_usd", 0.0) + cost, 6)
|
||
data["last_updated"] = datetime.now().isoformat(timespec="seconds")
|
||
METRICS_FILE.write_text(json.dumps(data, indent=2))
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Model loading
|
||
# ---------------------------------------------------------------------------
|
||
|
||
_ner_model = None
|
||
_finbert_model = None
|
||
|
||
|
||
def get_ner():
|
||
global _ner_model
|
||
if _ner_model is None:
|
||
print("[analyze] Loading dslim/bert-base-NER …", flush=True)
|
||
_ner_model = pipeline(
|
||
"ner",
|
||
model="dslim/bert-base-NER",
|
||
aggregation_strategy="simple",
|
||
device=DEVICE,
|
||
)
|
||
return _ner_model
|
||
|
||
|
||
def get_finbert():
|
||
global _finbert_model
|
||
if _finbert_model is None:
|
||
print("[analyze] Loading ProsusAI/finbert …", flush=True)
|
||
_finbert_model = pipeline(
|
||
"sentiment-analysis",
|
||
model="ProsusAI/finbert",
|
||
device=DEVICE,
|
||
truncation=True,
|
||
max_length=512,
|
||
)
|
||
return _finbert_model
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# C25 alias matching
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def match_c25(text: str) -> dict[str, float]:
|
||
"""
|
||
Find C25 companies mentioned in text.
|
||
Returns {ticker: confidence_score}.
|
||
"""
|
||
text_lower = text.lower()
|
||
matches: dict[str, float] = {}
|
||
|
||
for alias_lower, ticker in ALIAS_MAP.items():
|
||
if ticker in matches:
|
||
continue # already found this company
|
||
|
||
# Always use word boundaries — prevents "sonic" matching "hypersonic",
|
||
# "net" matching "internet", "iss" matching "mission", etc.
|
||
pat = r"(?<![a-zA-Z0-9])" + re.escape(alias_lower) + r"(?![a-zA-Z0-9])"
|
||
|
||
if re.search(pat, text_lower):
|
||
# Confidence: longer alias match = more reliable
|
||
conf = min(0.99, 0.70 + len(alias_lower) * 0.01)
|
||
matches[ticker] = conf
|
||
|
||
return matches
|
||
|
||
|
||
def merge_ner_matches(ner_result: list[dict], base: dict[str, float]) -> dict[str, float]:
|
||
"""
|
||
Cross-reference NER ORG entities with alias map.
|
||
Requires whole-token match to avoid 'EMA' matching 'd-ema-nt'.
|
||
"""
|
||
merged = dict(base)
|
||
for ent in ner_result:
|
||
if ent.get("entity_group") not in ("ORG", "PER"):
|
||
continue
|
||
word_tokens = set(re.split(r"[\s\-_/]+", ent["word"].lower().strip("##")))
|
||
for alias_lower, ticker in ALIAS_MAP.items():
|
||
if len(alias_lower) < 4:
|
||
continue
|
||
alias_tokens = set(re.split(r"[\s\-_/]+", alias_lower))
|
||
# Need significant token overlap, not just substring containment
|
||
overlap = alias_tokens & word_tokens
|
||
if overlap and len(overlap) / max(len(alias_tokens), len(word_tokens)) >= 0.5:
|
||
score = ent.get("score", 0.7)
|
||
if score > merged.get(ticker, 0):
|
||
merged[ticker] = score
|
||
return merged
|
||
|
||
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Coverage spread scoring
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def coverage_spread_score(row) -> float:
|
||
"""
|
||
Quality score (0–1) based on source count and bias diversity.
|
||
High = many sources from left + right + centre. Low = few or echo chamber.
|
||
"""
|
||
src = row["source_count"] or 0
|
||
left = row["left_src_count"] or 0
|
||
right = row["right_src_count"] or 0
|
||
ctr = row["ctr_src_count"] or 0
|
||
|
||
if src < MIN_SOURCES:
|
||
return 0.0
|
||
|
||
quantity = min(1.0, math.log(max(1, src)) / math.log(50))
|
||
fl, fr, fc = left / src, right / src, ctr / src
|
||
diversity = min(1.0, (fl * fr * fc) ** (1 / 3) * 9) # peaks at equal thirds
|
||
|
||
return round(quantity * 0.6 + diversity * 0.4, 3)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Claude structured extraction
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def claude_extract(title: str, text: str, tickers: list[str]) -> tuple[dict, int, int]:
|
||
"""
|
||
Use Claude Haiku to extract structured financial signal.
|
||
Returns ({"confirmed_tickers", "magnitude", "timeframe", "reasoning"}, input_tokens, output_tokens).
|
||
"""
|
||
import anthropic
|
||
|
||
api_key = os.environ.get("ANTHROPIC_API_KEY")
|
||
if not api_key:
|
||
return {"confirmed_tickers": tickers, "magnitude": 5, "timeframe": "days", "reasoning": "(no API key)"}, 0, 0
|
||
|
||
client = anthropic.Anthropic(api_key=api_key)
|
||
ticker_ctx = "\n".join(
|
||
f" {t}: {C25[t]['name']} ({C25[t]['sector']})" for t in tickers if t in C25
|
||
)
|
||
|
||
prompt = f"""You are a financial analyst specializing in Scandinavian equities.
|
||
|
||
Analyze this news article and assess its financial impact on the listed Danish C25 companies.
|
||
|
||
## Companies to analyze:
|
||
{ticker_ctx}
|
||
|
||
## Article:
|
||
Title: {title}
|
||
{text[:1500]}
|
||
|
||
Respond ONLY with valid JSON (no markdown fences):
|
||
{{
|
||
"confirmed_tickers": ["NOVO-B"],
|
||
"magnitude": 7,
|
||
"timeframe": "days",
|
||
"reasoning": "Two sentences max on financial impact and direction."
|
||
}}
|
||
|
||
Fields:
|
||
- confirmed_tickers: only companies truly affected (can be [])
|
||
- magnitude: 1–10 (1=irrelevant, 10=major market mover)
|
||
- timeframe: "hours", "days", "weeks", or "months"
|
||
- reasoning: brief analyst note"""
|
||
|
||
try:
|
||
msg = client.messages.create(
|
||
model="claude-haiku-4-5",
|
||
max_tokens=256,
|
||
messages=[{"role": "user", "content": prompt}],
|
||
)
|
||
raw = msg.content[0].text.strip()
|
||
raw = re.sub(r"^```(?:json)?\n?", "", raw)
|
||
raw = re.sub(r"\n?```$", "", raw)
|
||
return json.loads(raw), msg.usage.input_tokens, msg.usage.output_tokens
|
||
except Exception as e:
|
||
print(f" [warn] Claude failed: {e}")
|
||
return {"confirmed_tickers": tickers, "magnitude": 5, "timeframe": "days", "reasoning": str(e)[:120]}, 0, 0
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# yfinance momentum
|
||
# ---------------------------------------------------------------------------
|
||
|
||
_momentum_cache: dict[str, dict] = {}
|
||
|
||
|
||
def momentum_check(ticker: str) -> dict:
|
||
"""5-day price momentum for a C25 ticker via yfinance."""
|
||
if ticker in _momentum_cache:
|
||
return _momentum_cache[ticker]
|
||
|
||
import yfinance as yf
|
||
|
||
company = C25.get(ticker, {})
|
||
yahoo_ticker = company.get("yahoo_ticker", ticker + ".CO")
|
||
result: dict = {"direction": "unknown", "pct_5d": 0.0, "pct_20d": 0.0}
|
||
|
||
try:
|
||
hist = yf.Ticker(yahoo_ticker).history(period="1mo", auto_adjust=True)
|
||
if len(hist) >= 5:
|
||
close = hist["Close"]
|
||
pct_5d = float((close.iloc[-1] / close.iloc[-5] - 1) * 100)
|
||
pct_20d = float((close.iloc[-1] / close.iloc[0] - 1) * 100) if len(hist) >= 20 else 0.0
|
||
direction = "up" if pct_5d > 1.5 else ("down" if pct_5d < -1.5 else "flat")
|
||
result = {"direction": direction, "pct_5d": round(pct_5d, 2), "pct_20d": round(pct_20d, 2)}
|
||
except Exception:
|
||
pass
|
||
|
||
_momentum_cache[ticker] = result
|
||
return result
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Signal score
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def calc_signal_score(sent_score: float, sentiment: str, coverage: float, momentum: dict) -> float:
|
||
"""signal_score = sentiment_confidence × coverage_spread × momentum_alignment"""
|
||
d = momentum.get("direction", "unknown")
|
||
if d == "unknown":
|
||
alignment = 0.5
|
||
elif d == "flat":
|
||
alignment = 0.7
|
||
elif (sentiment == "positive" and d == "up") or (sentiment == "negative" and d == "down"):
|
||
alignment = 1.0
|
||
else:
|
||
alignment = 0.4 # contrarian
|
||
|
||
return round(sent_score * coverage * alignment, 3)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# DB schema migration
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def migrate_db(db) -> None:
|
||
"""Apply schema migrations for SQLite. No-op for Postgres (schema managed by db.py)."""
|
||
if hasattr(db, "db_type") and db.db_type == "postgres":
|
||
return
|
||
existing = {row[1] for row in db.execute("PRAGMA table_info(article_signals)").fetchall()}
|
||
new_cols = [
|
||
("coverage_spread", "REAL DEFAULT 0"),
|
||
("claude_tickers", "TEXT"),
|
||
("claude_magnitude", "INTEGER DEFAULT 5"),
|
||
("claude_timeframe", "TEXT"),
|
||
("claude_reasoning", "TEXT"),
|
||
("momentum_dir", "TEXT"),
|
||
("momentum_pct_5d", "REAL DEFAULT 0"),
|
||
("signal_score", "REAL DEFAULT 0"),
|
||
("alert", "INTEGER DEFAULT 0"),
|
||
]
|
||
for col_name, col_def in new_cols:
|
||
if col_name not in existing:
|
||
db.execute(f"ALTER TABLE article_signals ADD COLUMN {col_name} {col_def}")
|
||
db.commit()
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Main pipeline
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def analyze_articles(
|
||
*,
|
||
force: bool = False,
|
||
limit: int | None = None,
|
||
dry_run: bool = False,
|
||
use_claude: bool = True,
|
||
auto_fetch: bool = True,
|
||
) -> None:
|
||
db = get_db()
|
||
migrate_db(db)
|
||
|
||
# Auto-refresh articles (respects 30-min cache TTL)
|
||
if auto_fetch and not dry_run:
|
||
before = db.execute("SELECT COUNT(*) AS cnt FROM articles").fetchone()["cnt"]
|
||
print("[analyze] Refreshing Ground News feed …")
|
||
fetch_all(db)
|
||
print("[analyze] Henter danske RSS feeds …")
|
||
fetch_all_rss(db)
|
||
after = db.execute("SELECT COUNT(*) AS cnt FROM articles").fetchone()["cnt"]
|
||
if after > before:
|
||
print(f"[analyze] +{after - before} new articles")
|
||
|
||
base_q = """
|
||
SELECT slug, title, description, source_count,
|
||
left_src_count, right_src_count, ctr_src_count,
|
||
left_pct, right_pct, ctr_pct
|
||
FROM articles {where}
|
||
ORDER BY source_count DESC
|
||
"""
|
||
rows = db.execute(
|
||
base_q.format(where="" if force else
|
||
"WHERE slug NOT IN (SELECT DISTINCT article_slug FROM article_signals)")
|
||
).fetchall()
|
||
if limit:
|
||
rows = rows[:limit]
|
||
|
||
total = len(rows)
|
||
print(f"[analyze] {total} articles to process (force={force} dry_run={dry_run} claude={use_claude})")
|
||
if total == 0:
|
||
print("[analyze] Nothing to do.")
|
||
db.close()
|
||
return
|
||
|
||
# ------------------------------------------------------------------
|
||
# Phase 1 — Alias screen + coverage spread filter
|
||
# ------------------------------------------------------------------
|
||
print("[analyze] Phase 1: alias screen + coverage filter …")
|
||
screened: list[tuple] = []
|
||
dropped_cov = 0
|
||
|
||
for row in rows:
|
||
cov = coverage_spread_score(row)
|
||
if cov < MIN_COVERAGE_SPREAD:
|
||
dropped_cov += 1
|
||
continue
|
||
text = f"{row['title']}. {row['description'] or ''}"
|
||
matches = match_c25(text)
|
||
if matches:
|
||
screened.append((row, matches, cov))
|
||
|
||
print(f"[analyze] {len(screened)}/{total} passed ({dropped_cov} dropped by coverage filter)")
|
||
if not screened:
|
||
db.close()
|
||
return
|
||
|
||
# ------------------------------------------------------------------
|
||
# Phase 2 — NER upgrade
|
||
# ------------------------------------------------------------------
|
||
print("[analyze] Phase 2: NER upgrade …")
|
||
ner = get_ner()
|
||
texts = [f"{r['title']}. {r['description'] or ''}" for r, _, _ in screened]
|
||
|
||
BATCH = 16
|
||
all_ner = []
|
||
for i in range(0, len(texts), BATCH):
|
||
all_ner.extend(ner(texts[i : i + BATCH]))
|
||
|
||
enriched = [
|
||
(row, merge_ner_matches(ner_res, base), cov)
|
||
for (row, base, cov), ner_res in zip(screened, all_ner)
|
||
]
|
||
|
||
# ------------------------------------------------------------------
|
||
# Phase 3 — Full text + re-screen
|
||
# ------------------------------------------------------------------
|
||
print(f"[analyze] Phase 3: fetching full text for {len(enriched)} articles …")
|
||
final: list[tuple] = []
|
||
|
||
for idx, (row, matches, cov) in enumerate(enriched, 1):
|
||
slug = row["slug"]
|
||
if idx % 5 == 0 or idx == len(enriched):
|
||
print(f" {idx}/{len(enriched)}: {slug[:55]}")
|
||
|
||
# RSS artikler har teksten gemt i page_cache som "rss:{slug}"
|
||
cats = row["categories"] if "categories" in row.keys() else ""
|
||
if cats and cats.startswith("rss:"):
|
||
cache_row = db.execute(
|
||
"SELECT content FROM page_cache WHERE url = ?",
|
||
(f"rss:{slug}",),
|
||
).fetchone()
|
||
full_text = cache_row["content"] if cache_row else f"{row['title']}. {row['description'] or ''}"
|
||
else:
|
||
full_text = fetch_article_text(slug, db)
|
||
|
||
full_matches = match_c25(full_text)
|
||
for ticker, score in full_matches.items():
|
||
if score > matches.get(ticker, 0):
|
||
matches[ticker] = score
|
||
if matches:
|
||
final.append((row, matches, cov, full_text))
|
||
|
||
print(f"[analyze] {len(final)} articles with confirmed C25 mentions")
|
||
|
||
# ------------------------------------------------------------------
|
||
# Phase 4 — FinBERT sentiment (confidence filter)
|
||
# ------------------------------------------------------------------
|
||
print("[analyze] Phase 4: FinBERT sentiment …")
|
||
finbert = get_finbert()
|
||
now = int(time.time())
|
||
signals_written = 0
|
||
alerts_triggered = 0
|
||
|
||
for row, matches, cov, full_text in final:
|
||
slug = row["slug"]
|
||
title = row["title"]
|
||
|
||
try:
|
||
fb = finbert(" ".join(full_text.split()[:400]))[0]
|
||
sentiment = fb["label"].lower()
|
||
sent_score = round(fb["score"], 4)
|
||
except Exception as e:
|
||
print(f" [warn] FinBERT: {e}")
|
||
sentiment, sent_score = "neutral", 0.5
|
||
|
||
if sentiment == "neutral" and sent_score < FINBERT_MIN_CONF:
|
||
continue # drop low-confidence neutral noise
|
||
|
||
# ------------------------------------------------------------------
|
||
# Phase 5 — Claude extraction
|
||
# ------------------------------------------------------------------
|
||
claude_data: dict = {}
|
||
if use_claude and not dry_run and os.environ.get("ANTHROPIC_API_KEY"):
|
||
print(f" [claude] {slug[:50]}")
|
||
claude_data, _in_tok, _out_tok = claude_extract(title, full_text, list(matches.keys()))
|
||
if _in_tok:
|
||
update_metrics(_in_tok, _out_tok)
|
||
|
||
# ------------------------------------------------------------------
|
||
# Phase 6 — yfinance momentum + scoring
|
||
# ------------------------------------------------------------------
|
||
for ticker, entity_score in matches.items():
|
||
company = C25[ticker]
|
||
full_lower = full_text.lower()
|
||
mention_count = max(1, sum(
|
||
len(re.findall(
|
||
r"(?<![a-zA-Z0-9])" + re.escape(a.lower()) + r"(?![a-zA-Z0-9])",
|
||
full_lower,
|
||
))
|
||
for a in company["aliases"]
|
||
))
|
||
|
||
momentum = momentum_check(ticker) if not dry_run else {}
|
||
sig_score = calc_signal_score(sent_score, sentiment, cov, momentum)
|
||
alert = sig_score > ALERT_THRESHOLD and sentiment != "neutral"
|
||
|
||
if dry_run:
|
||
print(
|
||
f" DRY: {slug[:38]:<38} | {ticker:<8} | "
|
||
f"{sentiment:<8} {sent_score:.2f} | cov={cov:.2f} | sig={sig_score:.3f}"
|
||
f"{' ⚡' if alert else ''}"
|
||
)
|
||
else:
|
||
db.upsert(
|
||
"article_signals",
|
||
["article_slug", "ticker"],
|
||
[
|
||
"article_slug", "ticker", "company_name", "sector",
|
||
"sentiment", "sentiment_score", "entity_score",
|
||
"mention_count", "full_text_used", "analyzed_at",
|
||
"coverage_spread", "claude_tickers", "claude_magnitude",
|
||
"claude_timeframe", "claude_reasoning",
|
||
"momentum_dir", "momentum_pct_5d", "signal_score", "alert",
|
||
],
|
||
(
|
||
slug, ticker, company["name"], company["sector"],
|
||
sentiment, float(sent_score), round(float(entity_score), 4),
|
||
mention_count, 1, now,
|
||
float(cov),
|
||
json.dumps(claude_data.get("confirmed_tickers", [])) or None,
|
||
claude_data.get("magnitude", 5),
|
||
claude_data.get("timeframe", "days"),
|
||
claude_data.get("reasoning", ""),
|
||
momentum.get("direction", "unknown"),
|
||
float(momentum.get("pct_5d", 0.0)),
|
||
float(sig_score),
|
||
int(alert),
|
||
),
|
||
)
|
||
signals_written += 1
|
||
if alert:
|
||
alerts_triggered += 1
|
||
icon = "↑" if sentiment == "positive" else "↓"
|
||
print(
|
||
f" ⚡ ALERT: {icon} {ticker} ({company['name']}) | "
|
||
f"{sentiment} {sent_score:.2f} | sig={sig_score:.3f} | {slug[:40]}"
|
||
)
|
||
|
||
if not dry_run:
|
||
db.commit()
|
||
print(f"[analyze] Done. {signals_written} signals written, {alerts_triggered} alerts triggered.")
|
||
else:
|
||
print(f"[analyze] Dry-run complete. {len(final)} articles matched.")
|
||
|
||
db.close()
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# CLI — brug Makefile i stedet for at huske flags
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def main() -> None:
|
||
import sys
|
||
force = "--force" in sys.argv
|
||
dry_run = "--dry" in sys.argv
|
||
analyze_articles(force=force, dry_run=dry_run)
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|