Some checks failed
Build and Deploy MoneyMaker / build-and-deploy (push) Failing after 11m37s
psycopg2 raises TypeError if params=None is passed to cur.execute(); sqlite3 raises ProgrammingError on an unsupported parameter type.
497 lines
15 KiB
Python
497 lines
15 KiB
Python
"""
|
|
db.py — Database abstraction layer for MoneyMaker.

Supports both PostgreSQL (production, int.i80.dk) and SQLite (local dev).

Usage:
    from db import get_conn, DB_TYPE

    conn = get_conn()
    rows = conn.execute("SELECT * FROM articles").fetchall()
    conn.commit()
    conn.close()

Set DATABASE_URL in .env to use PostgreSQL:
    DATABASE_URL=postgresql://moneymaker:pass@int.i80.dk:5432/moneymaker

Without DATABASE_URL, falls back to SQLite (ground_news.db).
|
|
"""
|
|
import os
|
|
import sqlite3
|
|
from pathlib import Path
|
|
from dotenv import load_dotenv
|
|
|
|
# Load variables from a .env file (see module docstring) before reading them.
load_dotenv()

# PostgreSQL connection string; empty when the variable is not set.
DATABASE_URL = os.environ.get("DATABASE_URL", "")

# SQLite database file, located in the same directory as this module.
SQLITE_PATH = Path(__file__).parent.joinpath("ground_news.db")

# Active backend: "postgres" when DATABASE_URL is non-empty, otherwise "sqlite".
DB_TYPE = "sqlite" if not DATABASE_URL else "postgres"
|
|
|
|
|
|
# ── DBConn wrapper ────────────────────────────────────────────────────────────
|
|
|
|
class DBConn:
    """
    Unified connection wrapper for SQLite and PostgreSQL.

    Wraps a raw DB-API connection together with the backend name
    ("postgres" or anything else, treated as SQLite).

    - execute(sql, params): normalises ? → %s for Postgres; returns a cursor
      (a RealDictCursor on Postgres, the cursor returned by the wrapped
      connection's own execute() otherwise)
    - executemany(sql, seq): same placeholder handling, returns nothing
    - upsert(table, pk, cols, vals): cross-DB INSERT OR REPLACE / ON CONFLICT DO UPDATE
    - cursor(): a plain cursor from the wrapped connection
    - commit() / rollback() / close()
    - context manager: commit on success, rollback on error, always close
    """

    def __init__(self, conn, db_type: str):
        # conn: raw connection object; db_type selects the SQL dialect.
        self._conn = conn
        self.db_type = db_type

    def _sql(self, sql: str) -> str:
        """
        Translate ? placeholders to %s for Postgres.

        This is a plain text substitution: every "?" in the statement is
        replaced, including any that appear inside string literals.
        """
        if self.db_type == "postgres":
            return sql.replace("?", "%s")
        return sql

    def cursor(self):
        """Return a new cursor created by the wrapped connection."""
        return self._conn.cursor()

    def execute(self, sql: str, params=None):
        """
        Run one statement and return the cursor holding its result.

        On Postgres the caller owns the returned cursor; it is closed here
        only when the statement itself fails.
        """
        sql = self._sql(sql)
        if self.db_type == "postgres":
            import psycopg2.extras

            cur = self._conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
            try:
                # Omit the params argument entirely when none were given.
                if params is None:
                    cur.execute(sql)
                else:
                    cur.execute(sql, params)
            except Exception:
                # Do not leak the cursor when the statement fails.
                cur.close()
                raise
            return cur

        # Same rule for SQLite: never forward params=None to the driver.
        if params is None:
            return self._conn.execute(sql)
        return self._conn.execute(sql, params)

    def executemany(self, sql: str, seq):
        """Run one statement once per parameter tuple in ``seq``; returns None."""
        sql = self._sql(sql)
        if self.db_type == "postgres":
            cur = self._conn.cursor()
            try:
                cur.executemany(sql, seq)
            finally:
                cur.close()
        else:
            self._conn.executemany(sql, seq)

    def upsert(self, table: str, pk, cols: list, vals: tuple):
        """
        Cross-DB upsert.

        pk: single column name (str) or list of column names for composite PK.
        SQLite:   INSERT OR REPLACE INTO table (cols) VALUES (?,...)
        Postgres: INSERT INTO table (cols) VALUES (%s,...) ON CONFLICT (pk) DO UPDATE SET ...
                  (DO NOTHING when every column in ``cols`` is part of ``pk``)

        ``table``, ``pk`` and ``cols`` are interpolated into the SQL text
        unescaped, so they must be trusted identifiers; only ``vals`` is
        passed as bound parameters. On SQLite ``pk`` is not used: REPLACE
        relies on the table's own PRIMARY KEY / UNIQUE constraints.
        """
        if isinstance(pk, str):
            pk = [pk]
        col_sql = ", ".join(cols)

        if self.db_type != "postgres":
            val_sql = ", ".join(["?"] * len(cols))
            self._conn.execute(
                f"INSERT OR REPLACE INTO {table} ({col_sql}) VALUES ({val_sql})",
                vals,
            )
            return

        val_sql = ", ".join(["%s"] * len(cols))
        pk_sql = ", ".join(pk)
        non_pk = [c for c in cols if c not in pk]
        if non_pk:
            update = ", ".join(f"{c}=EXCLUDED.{c}" for c in non_pk)
            action = f"DO UPDATE SET {update}"
        else:
            # Nothing besides the key was supplied, so there is nothing to update.
            action = "DO NOTHING"
        sql = (
            f"INSERT INTO {table} ({col_sql}) VALUES ({val_sql}) "
            f"ON CONFLICT ({pk_sql}) {action}"
        )
        cur = self._conn.cursor()
        try:
            cur.execute(sql, vals)
        finally:
            cur.close()

    def commit(self):
        """Commit the current transaction on the wrapped connection."""
        self._conn.commit()

    def rollback(self):
        """Roll back the current transaction on the wrapped connection."""
        self._conn.rollback()

    def close(self):
        """Close the wrapped connection."""
        self._conn.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, *_):
        # Commit on a clean exit, roll back when the body raised. The
        # connection is closed even if commit()/rollback() itself fails.
        try:
            if exc_type:
                self.rollback()
            else:
                self.commit()
        finally:
            self.close()
|
|
|
|
|
|
# ── Placeholder helpers (kept for legacy usage) ───────────────────────────────
|
|
|
|
def ph(n: int = 1) -> str:
    """Return the single-value query placeholder for the active backend.

    The result is ``%s`` when DB_TYPE is "postgres" and ``?`` otherwise.
    ``n`` is accepted but not used; exactly one placeholder is returned.
    """
    if DB_TYPE == "postgres":
        return "%s"
    return "?"
|
|
|
|
|
|
def placeholders(n: int) -> str:
    """Return ``n`` backend-specific placeholders joined by ", "."""
    return ", ".join(ph() for _ in range(n))
|
|
|
|
|
|
# ── Connection ────────────────────────────────────────────────────────────────
|
|
|
|
def get_conn() -> DBConn:
    """Open a new connection to the configured backend, wrapped in DBConn.

    With DB_TYPE "postgres" this connects to DATABASE_URL with autocommit
    turned off; otherwise it opens the SQLite file at SQLITE_PATH with
    sqlite3.Row as the row factory.
    """
    if DB_TYPE != "postgres":
        raw = sqlite3.connect(str(SQLITE_PATH))
        raw.row_factory = sqlite3.Row
        return DBConn(raw, "sqlite")

    # Imported lazily so psycopg2 is only needed when Postgres is configured.
    import psycopg2

    raw = psycopg2.connect(DATABASE_URL)
    raw.autocommit = False
    return DBConn(raw, "postgres")
|
|
|
|
|
|
# ── Schema ────────────────────────────────────────────────────────────────────
|
|
|
|
# SQLite DDL, run as one script by init_schema().
# article_signals carries UNIQUE(article_slug, ticker), matching the Postgres
# schema, so INSERT OR REPLACE keyed on those two columns replaces the existing
# row instead of adding a duplicate. Because every statement is
# CREATE TABLE IF NOT EXISTS, a table that already exists is left untouched.
SCHEMA_SQLITE = """
CREATE TABLE IF NOT EXISTS page_cache (
    url TEXT PRIMARY KEY,
    page_type TEXT NOT NULL,
    fetched_at INTEGER NOT NULL,
    content TEXT NOT NULL
);

CREATE TABLE IF NOT EXISTS rss_feed_cache (
    feed_id TEXT PRIMARY KEY,
    fetched_at INTEGER NOT NULL
);

CREATE TABLE IF NOT EXISTS articles (
    slug TEXT PRIMARY KEY,
    story_id TEXT,
    title TEXT NOT NULL,
    description TEXT,
    start_date TEXT,
    source_count INTEGER,
    bias_src_count INTEGER,
    left_pct REAL,
    ctr_pct REAL,
    right_pct REAL,
    left_src_count INTEGER,
    ctr_src_count INTEGER,
    right_src_count INTEGER,
    overall_bias REAL,
    blindspot TEXT,
    factuality_json TEXT,
    interests_json TEXT,
    categories TEXT,
    first_seen INTEGER,
    last_seen INTEGER
);

CREATE TABLE IF NOT EXISTS article_signals (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    article_slug TEXT NOT NULL,
    ticker TEXT NOT NULL,
    company_name TEXT NOT NULL,
    sector TEXT,
    sentiment TEXT,
    sentiment_score REAL,
    entity_score REAL,
    mention_count INTEGER,
    full_text_used INTEGER,
    analyzed_at INTEGER NOT NULL,
    coverage_spread REAL,
    claude_tickers TEXT,
    claude_magnitude INTEGER,
    claude_timeframe TEXT,
    claude_reasoning TEXT,
    momentum_dir TEXT,
    momentum_pct_5d REAL,
    signal_score REAL,
    alert INTEGER,
    UNIQUE(article_slug, ticker)
);

CREATE TABLE IF NOT EXISTS positions (
    ticker TEXT PRIMARY KEY,
    shares REAL NOT NULL,
    entry_price REAL NOT NULL,
    entry_date TEXT NOT NULL,
    stop_loss REAL NOT NULL,
    take_profit REAL NOT NULL,
    note TEXT
);

CREATE TABLE IF NOT EXISTS position_events (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    ticker TEXT NOT NULL,
    action TEXT NOT NULL,
    shares REAL NOT NULL,
    price REAL NOT NULL,
    total_dkk REAL NOT NULL,
    fee_dkk REAL,
    pnl_dkk REAL,
    signal_correct INTEGER,
    event_date TEXT NOT NULL
);

CREATE TABLE IF NOT EXISTS saxo_orders (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    ticker TEXT NOT NULL,
    direction TEXT NOT NULL,
    shares REAL NOT NULL,
    price_dkk REAL,
    total_dkk REAL,
    fee_dkk REAL,
    saxo_order_id TEXT,
    status TEXT NOT NULL,
    signal_score REAL,
    analyst_rec TEXT,
    placed_at TEXT NOT NULL,
    filled_at TEXT,
    note TEXT
);
"""
|
|
|
|
SCHEMA_POSTGRES = """
|
|
CREATE TABLE IF NOT EXISTS page_cache (
|
|
url TEXT PRIMARY KEY,
|
|
page_type TEXT NOT NULL,
|
|
fetched_at BIGINT NOT NULL,
|
|
content TEXT NOT NULL
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS rss_feed_cache (
|
|
feed_id TEXT PRIMARY KEY,
|
|
fetched_at BIGINT NOT NULL
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS articles (
|
|
slug TEXT PRIMARY KEY,
|
|
story_id TEXT,
|
|
title TEXT NOT NULL,
|
|
description TEXT,
|
|
start_date TEXT,
|
|
source_count INTEGER,
|
|
bias_src_count INTEGER,
|
|
left_pct REAL,
|
|
ctr_pct REAL,
|
|
right_pct REAL,
|
|
left_src_count INTEGER,
|
|
ctr_src_count INTEGER,
|
|
right_src_count INTEGER,
|
|
overall_bias REAL,
|
|
blindspot TEXT,
|
|
factuality_json TEXT,
|
|
interests_json TEXT,
|
|
categories TEXT,
|
|
first_seen BIGINT,
|
|
last_seen BIGINT
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS article_signals (
|
|
id SERIAL PRIMARY KEY,
|
|
article_slug TEXT NOT NULL,
|
|
ticker TEXT NOT NULL,
|
|
company_name TEXT NOT NULL,
|
|
sector TEXT,
|
|
sentiment TEXT,
|
|
sentiment_score REAL,
|
|
entity_score REAL,
|
|
mention_count INTEGER,
|
|
full_text_used INTEGER,
|
|
analyzed_at BIGINT NOT NULL,
|
|
coverage_spread REAL,
|
|
claude_tickers TEXT,
|
|
claude_magnitude INTEGER,
|
|
claude_timeframe TEXT,
|
|
claude_reasoning TEXT,
|
|
momentum_dir TEXT,
|
|
momentum_pct_5d REAL,
|
|
signal_score REAL,
|
|
alert INTEGER,
|
|
UNIQUE(article_slug, ticker)
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS positions (
|
|
ticker TEXT PRIMARY KEY,
|
|
shares REAL NOT NULL,
|
|
entry_price REAL NOT NULL,
|
|
entry_date TEXT NOT NULL,
|
|
stop_loss REAL NOT NULL,
|
|
take_profit REAL NOT NULL,
|
|
note TEXT
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS position_events (
|
|
id SERIAL PRIMARY KEY,
|
|
ticker TEXT NOT NULL,
|
|
action TEXT NOT NULL,
|
|
shares REAL NOT NULL,
|
|
price REAL NOT NULL,
|
|
total_dkk REAL NOT NULL,
|
|
fee_dkk REAL,
|
|
pnl_dkk REAL,
|
|
signal_correct SMALLINT,
|
|
event_date TEXT NOT NULL
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS saxo_orders (
|
|
id SERIAL PRIMARY KEY,
|
|
ticker TEXT NOT NULL,
|
|
direction TEXT NOT NULL,
|
|
shares REAL NOT NULL,
|
|
price_dkk REAL,
|
|
total_dkk REAL,
|
|
fee_dkk REAL,
|
|
saxo_order_id TEXT,
|
|
status TEXT NOT NULL,
|
|
signal_score REAL,
|
|
analyst_rec TEXT,
|
|
placed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
|
filled_at TIMESTAMPTZ,
|
|
note TEXT
|
|
);
|
|
"""
|
|
|
|
|
|
def init_schema():
    """Create all tables if they don't exist. Safe to run multiple times.

    Postgres: opens its own connection to DATABASE_URL, executes every
    non-empty ";"-separated statement of SCHEMA_POSTGRES, then adds the named
    constraint uq_article_ticker on article_signals(article_slug, ticker) when
    no constraint of that name exists, and commits.
    SQLite: runs SCHEMA_SQLITE as a script against SQLITE_PATH and commits.

    The connection is closed even when a statement fails; on Postgres nothing
    is committed in that case. Prints a confirmation line on success.
    """
    if DB_TYPE == "postgres":
        import psycopg2

        conn = psycopg2.connect(DATABASE_URL)
        try:
            cur = conn.cursor()
            try:
                for stmt in SCHEMA_POSTGRES.split(";"):
                    stmt = stmt.strip()
                    if stmt:
                        cur.execute(stmt)
                # Add UNIQUE constraint for article_signals (article_slug, ticker) if missing.
                # NOTE(review): SCHEMA_POSTGRES already declares
                # UNIQUE(article_slug, ticker) inline, so on a fresh database
                # this adds a second, named constraint over the same columns —
                # confirm whether both are wanted.
                cur.execute("""
                    SELECT 1 FROM pg_constraint
                    WHERE conname = 'uq_article_ticker'
                    AND conrelid = 'article_signals'::regclass
                """)
                if not cur.fetchone():
                    cur.execute(
                        "ALTER TABLE article_signals "
                        "ADD CONSTRAINT uq_article_ticker UNIQUE (article_slug, ticker)"
                    )
                conn.commit()
            finally:
                cur.close()
        finally:
            conn.close()
    else:
        conn = sqlite3.connect(str(SQLITE_PATH))
        try:
            conn.executescript(SCHEMA_SQLITE)
            conn.commit()
        finally:
            conn.close()
    print(f" Schema initialiseret ({DB_TYPE})")
|
|
|
|
|
|
# ── SQLite → Postgres migration ───────────────────────────────────────────────
|
|
|
|
def _blob_to_float(value):
    """Decode a bytes value as one native-format double; None if it is not one."""
    import struct

    try:
        return struct.unpack("d", value)[0]
    except struct.error:
        return None


def migrate_sqlite_to_postgres():
    """
    One-time migration: copy data from the local SQLite file to Postgres.

    Copies page_cache, rss_feed_cache, articles, article_signals, positions
    and position_events (saxo_orders is not in the list). Does nothing except
    print a notice when DB_TYPE is not "postgres".

    Rows are inserted with ON CONFLICT DO NOTHING, and the "id" column is left
    out so Postgres assigns new ids. Re-running is therefore harmless only for
    tables with a primary key or unique constraint besides "id";
    position_events has none, so its rows are inserted again on every run.

    Each row is inserted inside its own savepoint: a failing row is counted
    as skipped and rolled back on its own, without aborting the transaction
    that holds the other rows of the table. The first error per table is
    printed. Each table is committed separately, and both connections are
    closed even when the migration stops with an error.
    """
    if DB_TYPE != "postgres":
        print("DATABASE_URL ikke sat — kører ikke migration")
        return

    import psycopg2

    tables = [
        "page_cache", "rss_feed_cache", "articles",
        "article_signals", "positions", "position_events"
    ]

    sqlite_conn = sqlite3.connect(str(SQLITE_PATH))
    sqlite_conn.row_factory = sqlite3.Row
    try:
        pg_conn = get_conn()
        try:
            # DBConn.execute() hands out a new dict cursor per call; the
            # migration needs one plain cursor, so take it from the wrapped
            # connection.
            pg_cur = pg_conn._conn.cursor()

            for table in tables:
                rows = sqlite_conn.execute(f"SELECT * FROM {table}").fetchall()
                if not rows:
                    print(f" {table}: 0 rækker — skip")
                    continue

                # SERIAL columns (id) — let Postgres auto-assign
                insert_cols = [c for c in rows[0].keys() if c != "id"]
                insert_cols_sql = ", ".join(insert_cols)
                insert_vals = ", ".join(["%s"] * len(insert_cols))
                insert_sql = (
                    f"INSERT INTO {table} ({insert_cols_sql}) VALUES ({insert_vals}) "
                    f"ON CONFLICT DO NOTHING"
                )

                inserted = 0
                skipped = 0
                first_error = None
                for row in rows:
                    # Coerce bytes (SQLite blob) to float/None for Postgres REAL columns
                    values = tuple(
                        _blob_to_float(row[c])
                        if isinstance(row[c], (bytes, bytearray))
                        else row[c]
                        for c in insert_cols
                    )
                    pg_cur.execute("SAVEPOINT migrate_row")
                    try:
                        pg_cur.execute(insert_sql, values)
                    except psycopg2.Error as exc:
                        # Undo only this row; the transaction stays usable.
                        pg_cur.execute("ROLLBACK TO SAVEPOINT migrate_row")
                        skipped += 1
                        if first_error is None:
                            first_error = exc
                    else:
                        # Read rowcount before the next execute() overwrites it.
                        was_inserted = pg_cur.rowcount > 0
                        pg_cur.execute("RELEASE SAVEPOINT migrate_row")
                        if was_inserted:
                            inserted += 1
                        else:
                            skipped += 1

                pg_conn.commit()
                print(f" {table}: {inserted} indsat, {skipped} sprunget over")
                if first_error is not None:
                    print(f"   første fejl: {first_error}")
        finally:
            pg_conn.close()
    finally:
        sqlite_conn.close()
    print(" Migration færdig!")
|
|
|
|
|
|
if __name__ == "__main__":
    # Command-line entry point: python db.py [init|migrate|status]
    import sys

    # The first argument selects the action; "status" is the default.
    cmd = (sys.argv[1:] or ["status"])[0]

    if cmd == "init":
        init_schema()
    elif cmd == "migrate":
        print("Migrerer SQLite → PostgreSQL ...")
        init_schema()
        migrate_sqlite_to_postgres()
    elif cmd == "status":
        # Print the row count of every table in the active database.
        conn = get_conn()
        if DB_TYPE == "postgres":
            raw = conn._conn.cursor()
            raw.execute("SELECT tablename FROM pg_tables WHERE schemaname='public'")
            tables = [record[0] for record in raw.fetchall()]
            for table_name in sorted(tables):
                raw.execute(f"SELECT COUNT(*) FROM {table_name}")
                row_count = raw.fetchone()[0]
                print(f" {table_name}: {row_count} rækker")
            raw.close()
        else:
            listing = conn.execute("SELECT name FROM sqlite_master WHERE type='table'")
            for entry in listing.fetchall():
                table_name = entry["name"]
                counted = conn.execute(f"SELECT COUNT(*) AS cnt FROM {table_name}").fetchone()
                print(f" {table_name}: {counted['cnt']} rækker")
        conn.close()
        print(f"\n DB type: {DB_TYPE}")
    else:
        print("Brug: python db.py [init|migrate|status]")
|