Files
mmd/db.py
Henrik Jess Nielsen cd233a0c5a
Some checks failed
Build and Deploy MoneyMaker / build-and-deploy (push) Failing after 11m37s
fix(db): pass params=None correctly to both psycopg2 and sqlite3
psycopg2 raises TypeError if params=None is passed to cur.execute()
sqlite3 raises ProgrammingError on unsupported parameter type
2026-05-27 00:11:55 +02:00

497 lines
15 KiB
Python

"""
db.py — Database abstraction layer for MoneyMaker.
Supports both PostgreSQL (production, int.i80.dk) and SQLite (local dev).
Usage:
from db import get_conn, DB_TYPE
conn = get_conn()
rows = conn.execute("SELECT * FROM articles").fetchall()
conn.commit()
conn.close()
Set DATABASE_URL in .env to use PostgreSQL:
DATABASE_URL=postgresql://moneymaker:pass@int.i80.dk:5432/moneymaker
Without DATABASE_URL, falls back to SQLite (ground_news.db).
"""
import os
import sqlite3
from pathlib import Path
from dotenv import load_dotenv
load_dotenv()
DATABASE_URL = os.getenv("DATABASE_URL", "")
SQLITE_PATH = Path(__file__).parent / "ground_news.db"
DB_TYPE = "postgres" if DATABASE_URL else "sqlite"
# ── DBConn wrapper ────────────────────────────────────────────────────────────
class DBConn:
"""
Unified connection wrapper for SQLite and PostgreSQL.
- execute(sql, params): normalises ? → %s for Postgres; returns dict-row cursor
- upsert(table, pk, cols, vals): cross-DB INSERT OR REPLACE / ON CONFLICT DO UPDATE
- commit() / rollback() / close()
"""
def __init__(self, conn, db_type: str):
self._conn = conn
self.db_type = db_type
def _sql(self, sql: str) -> str:
"""Translate ? placeholders to %s for Postgres."""
if self.db_type == "postgres":
return sql.replace("?", "%s")
return sql
def execute(self, sql: str, params=None):
sql = self._sql(sql)
if self.db_type == "postgres":
import psycopg2.extras
cur = self._conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
if params is None:
cur.execute(sql)
else:
cur.execute(sql, params)
return cur
else:
if params is None:
return self._conn.execute(sql)
return self._conn.execute(sql, params)
def executemany(self, sql: str, seq):
sql = self._sql(sql)
if self.db_type == "postgres":
cur = self._conn.cursor()
cur.executemany(sql, seq)
else:
self._conn.executemany(sql, seq)
def upsert(self, table: str, pk, cols: list, vals: tuple):
"""
Cross-DB upsert.
pk: single column name (str) or list of column names for composite PK.
SQLite: INSERT OR REPLACE INTO table (cols) VALUES (?,...)
Postgres: INSERT INTO table (cols) VALUES (%s,...) ON CONFLICT (pk) DO UPDATE SET ...
"""
if isinstance(pk, str):
pk = [pk]
if self.db_type == "postgres":
col_sql = ", ".join(cols)
val_sql = ", ".join(["%s"] * len(cols))
non_pk = [c for c in cols if c not in pk]
pk_sql = ", ".join(pk)
if non_pk:
update = ", ".join(f"{c}=EXCLUDED.{c}" for c in non_pk)
sql = (
f"INSERT INTO {table} ({col_sql}) VALUES ({val_sql}) "
f"ON CONFLICT ({pk_sql}) DO UPDATE SET {update}"
)
else:
sql = (
f"INSERT INTO {table} ({col_sql}) VALUES ({val_sql}) "
f"ON CONFLICT ({pk_sql}) DO NOTHING"
)
cur = self._conn.cursor()
cur.execute(sql, vals)
else:
col_sql = ", ".join(cols)
val_sql = ", ".join(["?"] * len(cols))
self._conn.execute(
f"INSERT OR REPLACE INTO {table} ({col_sql}) VALUES ({val_sql})",
vals,
)
def commit(self):
self._conn.commit()
def rollback(self):
self._conn.rollback()
def close(self):
self._conn.close()
def __enter__(self):
return self
def __exit__(self, exc_type, *_):
if exc_type:
self.rollback()
else:
self.commit()
self.close()
# ── Placeholder helpers (kept for legacy usage) ───────────────────────────────
def ph(n: int = 1) -> str:
"""Return query placeholder: %s for postgres, ? for sqlite."""
return "%s" if DB_TYPE == "postgres" else "?"
def placeholders(n: int) -> str:
"""Return comma-separated placeholders for n values."""
p = ph()
return ", ".join([p] * n)
# ── Connection ────────────────────────────────────────────────────────────────
def get_conn() -> DBConn:
"""Return a DBConn wrapper (Postgres or SQLite)."""
if DB_TYPE == "postgres":
import psycopg2
conn = psycopg2.connect(DATABASE_URL)
conn.autocommit = False
return DBConn(conn, "postgres")
else:
conn = sqlite3.connect(str(SQLITE_PATH))
conn.row_factory = sqlite3.Row
return DBConn(conn, "sqlite")
# ── Schema ────────────────────────────────────────────────────────────────────
SCHEMA_SQLITE = """
CREATE TABLE IF NOT EXISTS page_cache (
url TEXT PRIMARY KEY,
page_type TEXT NOT NULL,
fetched_at INTEGER NOT NULL,
content TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS rss_feed_cache (
feed_id TEXT PRIMARY KEY,
fetched_at INTEGER NOT NULL
);
CREATE TABLE IF NOT EXISTS articles (
slug TEXT PRIMARY KEY,
story_id TEXT,
title TEXT NOT NULL,
description TEXT,
start_date TEXT,
source_count INTEGER,
bias_src_count INTEGER,
left_pct REAL,
ctr_pct REAL,
right_pct REAL,
left_src_count INTEGER,
ctr_src_count INTEGER,
right_src_count INTEGER,
overall_bias REAL,
blindspot TEXT,
factuality_json TEXT,
interests_json TEXT,
categories TEXT,
first_seen INTEGER,
last_seen INTEGER
);
CREATE TABLE IF NOT EXISTS article_signals (
id INTEGER PRIMARY KEY AUTOINCREMENT,
article_slug TEXT NOT NULL,
ticker TEXT NOT NULL,
company_name TEXT NOT NULL,
sector TEXT,
sentiment TEXT,
sentiment_score REAL,
entity_score REAL,
mention_count INTEGER,
full_text_used INTEGER,
analyzed_at INTEGER NOT NULL,
coverage_spread REAL,
claude_tickers TEXT,
claude_magnitude INTEGER,
claude_timeframe TEXT,
claude_reasoning TEXT,
momentum_dir TEXT,
momentum_pct_5d REAL,
signal_score REAL,
alert INTEGER
);
CREATE TABLE IF NOT EXISTS positions (
ticker TEXT PRIMARY KEY,
shares REAL NOT NULL,
entry_price REAL NOT NULL,
entry_date TEXT NOT NULL,
stop_loss REAL NOT NULL,
take_profit REAL NOT NULL,
note TEXT
);
CREATE TABLE IF NOT EXISTS position_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ticker TEXT NOT NULL,
action TEXT NOT NULL,
shares REAL NOT NULL,
price REAL NOT NULL,
total_dkk REAL NOT NULL,
fee_dkk REAL,
pnl_dkk REAL,
signal_correct INTEGER,
event_date TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS saxo_orders (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ticker TEXT NOT NULL,
direction TEXT NOT NULL,
shares REAL NOT NULL,
price_dkk REAL,
total_dkk REAL,
fee_dkk REAL,
saxo_order_id TEXT,
status TEXT NOT NULL,
signal_score REAL,
analyst_rec TEXT,
placed_at TEXT NOT NULL,
filled_at TEXT,
note TEXT
);
"""
SCHEMA_POSTGRES = """
CREATE TABLE IF NOT EXISTS page_cache (
url TEXT PRIMARY KEY,
page_type TEXT NOT NULL,
fetched_at BIGINT NOT NULL,
content TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS rss_feed_cache (
feed_id TEXT PRIMARY KEY,
fetched_at BIGINT NOT NULL
);
CREATE TABLE IF NOT EXISTS articles (
slug TEXT PRIMARY KEY,
story_id TEXT,
title TEXT NOT NULL,
description TEXT,
start_date TEXT,
source_count INTEGER,
bias_src_count INTEGER,
left_pct REAL,
ctr_pct REAL,
right_pct REAL,
left_src_count INTEGER,
ctr_src_count INTEGER,
right_src_count INTEGER,
overall_bias REAL,
blindspot TEXT,
factuality_json TEXT,
interests_json TEXT,
categories TEXT,
first_seen BIGINT,
last_seen BIGINT
);
CREATE TABLE IF NOT EXISTS article_signals (
id SERIAL PRIMARY KEY,
article_slug TEXT NOT NULL,
ticker TEXT NOT NULL,
company_name TEXT NOT NULL,
sector TEXT,
sentiment TEXT,
sentiment_score REAL,
entity_score REAL,
mention_count INTEGER,
full_text_used INTEGER,
analyzed_at BIGINT NOT NULL,
coverage_spread REAL,
claude_tickers TEXT,
claude_magnitude INTEGER,
claude_timeframe TEXT,
claude_reasoning TEXT,
momentum_dir TEXT,
momentum_pct_5d REAL,
signal_score REAL,
alert INTEGER,
UNIQUE(article_slug, ticker)
);
CREATE TABLE IF NOT EXISTS positions (
ticker TEXT PRIMARY KEY,
shares REAL NOT NULL,
entry_price REAL NOT NULL,
entry_date TEXT NOT NULL,
stop_loss REAL NOT NULL,
take_profit REAL NOT NULL,
note TEXT
);
CREATE TABLE IF NOT EXISTS position_events (
id SERIAL PRIMARY KEY,
ticker TEXT NOT NULL,
action TEXT NOT NULL,
shares REAL NOT NULL,
price REAL NOT NULL,
total_dkk REAL NOT NULL,
fee_dkk REAL,
pnl_dkk REAL,
signal_correct SMALLINT,
event_date TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS saxo_orders (
id SERIAL PRIMARY KEY,
ticker TEXT NOT NULL,
direction TEXT NOT NULL,
shares REAL NOT NULL,
price_dkk REAL,
total_dkk REAL,
fee_dkk REAL,
saxo_order_id TEXT,
status TEXT NOT NULL,
signal_score REAL,
analyst_rec TEXT,
placed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
filled_at TIMESTAMPTZ,
note TEXT
);
"""
def init_schema():
"""Create all tables if they don't exist. Safe to run multiple times."""
if DB_TYPE == "postgres":
import psycopg2
conn = psycopg2.connect(DATABASE_URL)
cur = conn.cursor()
for stmt in SCHEMA_POSTGRES.split(";"):
stmt = stmt.strip()
if stmt:
cur.execute(stmt)
# Add UNIQUE constraint for article_signals (article_slug, ticker) if missing
cur.execute("""
SELECT 1 FROM pg_constraint
WHERE conname = 'uq_article_ticker'
AND conrelid = 'article_signals'::regclass
""")
if not cur.fetchone():
cur.execute(
"ALTER TABLE article_signals "
"ADD CONSTRAINT uq_article_ticker UNIQUE (article_slug, ticker)"
)
conn.commit()
cur.close()
conn.close()
else:
conn = sqlite3.connect(str(SQLITE_PATH))
conn.executescript(SCHEMA_SQLITE)
conn.commit()
conn.close()
print(f" Schema initialiseret ({DB_TYPE})")
# ── SQLite → Postgres migration ───────────────────────────────────────────────
def migrate_sqlite_to_postgres():
"""
One-time migration: copy all data from local SQLite to Postgres.
Safe to run multiple times (uses INSERT OR IGNORE / ON CONFLICT DO NOTHING).
"""
if DB_TYPE != "postgres":
print("DATABASE_URL ikke sat — kører ikke migration")
return
import psycopg2
sqlite_conn = sqlite3.connect(str(SQLITE_PATH))
sqlite_conn.row_factory = sqlite3.Row
pg_conn = get_conn()
pg_cur = pg_conn.cursor()
tables = [
"page_cache", "rss_feed_cache", "articles",
"article_signals", "positions", "position_events"
]
for table in tables:
rows = sqlite_conn.execute(f"SELECT * FROM {table}").fetchall()
if not rows:
print(f" {table}: 0 rækker — skip")
continue
cols = list(rows[0].keys())
cols_sql = ", ".join(cols)
vals_sql = ", ".join(["%s"] * len(cols))
# SERIAL columns (id) — let Postgres auto-assign
insert_cols = [c for c in cols if c != "id"] if "id" in cols else cols
insert_vals = ", ".join(["%s"] * len(insert_cols))
insert_cols_sql = ", ".join(insert_cols)
inserted = 0
skipped = 0
for row in rows:
# Coerce bytes (SQLite blob) to float/None for Postgres REAL columns
values = []
for c in insert_cols:
v = row[c]
if isinstance(v, (bytes, bytearray)):
try:
import struct
v = struct.unpack('d', v)[0]
except Exception:
v = None
values.append(v)
values = tuple(values)
try:
pg_cur.execute(
f"INSERT INTO {table} ({insert_cols_sql}) VALUES ({insert_vals}) "
f"ON CONFLICT DO NOTHING",
values
)
if pg_cur.rowcount > 0:
inserted += 1
else:
skipped += 1
except Exception as e:
skipped += 1
pg_conn.commit()
print(f" {table}: {inserted} indsat, {skipped} sprunget over")
sqlite_conn.close()
pg_conn.close()
print(" Migration færdig!")
if __name__ == "__main__":
import sys
cmd = sys.argv[1] if len(sys.argv) > 1 else "status"
if cmd == "init":
init_schema()
elif cmd == "migrate":
print(f"Migrerer SQLite → PostgreSQL ...")
init_schema()
migrate_sqlite_to_postgres()
elif cmd == "status":
conn = get_conn()
if DB_TYPE == "postgres":
raw = conn._conn.cursor()
raw.execute("SELECT tablename FROM pg_tables WHERE schemaname='public'")
tables = [r[0] for r in raw.fetchall()]
for t in sorted(tables):
raw.execute(f"SELECT COUNT(*) FROM {t}")
print(f" {t}: {raw.fetchone()[0]} rækker")
raw.close()
else:
tables = conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall()
for t in tables:
n = conn.execute(f"SELECT COUNT(*) AS cnt FROM {t['name']}").fetchone()["cnt"]
print(f" {t['name']}: {n} rækker")
conn.close()
print(f"\n DB type: {DB_TYPE}")
else:
print("Brug: python db.py [init|migrate|status]")