""" db.py — Database abstraction layer for MoneyMaker. Supports both PostgreSQL (production, int.i80.dk) and SQLite (local dev). Usage: from db import get_conn, DB_TYPE conn = get_conn() rows = conn.execute("SELECT * FROM articles").fetchall() conn.commit() conn.close() Set DATABASE_URL in .env to use PostgreSQL: DATABASE_URL=postgresql://moneymaker:pass@int.i80.dk:5432/moneymaker Without DATABASE_URL, falls back to SQLite (ground_news.db). """ import os import sqlite3 from pathlib import Path from dotenv import load_dotenv load_dotenv() DATABASE_URL = os.getenv("DATABASE_URL", "") SQLITE_PATH = Path(__file__).parent / "ground_news.db" DB_TYPE = "postgres" if DATABASE_URL else "sqlite" # ── DBConn wrapper ──────────────────────────────────────────────────────────── class DBConn: """ Unified connection wrapper for SQLite and PostgreSQL. - execute(sql, params): normalises ? → %s for Postgres; returns dict-row cursor - upsert(table, pk, cols, vals): cross-DB INSERT OR REPLACE / ON CONFLICT DO UPDATE - commit() / rollback() / close() """ def __init__(self, conn, db_type: str): self._conn = conn self.db_type = db_type def _sql(self, sql: str) -> str: """Translate ? placeholders to %s for Postgres.""" if self.db_type == "postgres": return sql.replace("?", "%s") return sql def execute(self, sql: str, params=None): sql = self._sql(sql) if self.db_type == "postgres": import psycopg2.extras cur = self._conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) cur.execute(sql, params) return cur else: return self._conn.execute(sql, params) def executemany(self, sql: str, seq): sql = self._sql(sql) if self.db_type == "postgres": cur = self._conn.cursor() cur.executemany(sql, seq) else: self._conn.executemany(sql, seq) def upsert(self, table: str, pk, cols: list, vals: tuple): """ Cross-DB upsert. pk: single column name (str) or list of column names for composite PK. SQLite: INSERT OR REPLACE INTO table (cols) VALUES (?,...) Postgres: INSERT INTO table (cols) VALUES (%s,...) ON CONFLICT (pk) DO UPDATE SET ... """ if isinstance(pk, str): pk = [pk] if self.db_type == "postgres": col_sql = ", ".join(cols) val_sql = ", ".join(["%s"] * len(cols)) non_pk = [c for c in cols if c not in pk] pk_sql = ", ".join(pk) if non_pk: update = ", ".join(f"{c}=EXCLUDED.{c}" for c in non_pk) sql = ( f"INSERT INTO {table} ({col_sql}) VALUES ({val_sql}) " f"ON CONFLICT ({pk_sql}) DO UPDATE SET {update}" ) else: sql = ( f"INSERT INTO {table} ({col_sql}) VALUES ({val_sql}) " f"ON CONFLICT ({pk_sql}) DO NOTHING" ) cur = self._conn.cursor() cur.execute(sql, vals) else: col_sql = ", ".join(cols) val_sql = ", ".join(["?"] * len(cols)) self._conn.execute( f"INSERT OR REPLACE INTO {table} ({col_sql}) VALUES ({val_sql})", vals, ) def commit(self): self._conn.commit() def rollback(self): self._conn.rollback() def close(self): self._conn.close() def __enter__(self): return self def __exit__(self, exc_type, *_): if exc_type: self.rollback() else: self.commit() self.close() # ── Placeholder helpers (kept for legacy usage) ─────────────────────────────── def ph(n: int = 1) -> str: """Return query placeholder: %s for postgres, ? for sqlite.""" return "%s" if DB_TYPE == "postgres" else "?" def placeholders(n: int) -> str: """Return comma-separated placeholders for n values.""" p = ph() return ", ".join([p] * n) # ── Connection ──────────────────────────────────────────────────────────────── def get_conn() -> DBConn: """Return a DBConn wrapper (Postgres or SQLite).""" if DB_TYPE == "postgres": import psycopg2 conn = psycopg2.connect(DATABASE_URL) conn.autocommit = False return DBConn(conn, "postgres") else: conn = sqlite3.connect(str(SQLITE_PATH)) conn.row_factory = sqlite3.Row return DBConn(conn, "sqlite") # ── Schema ──────────────────────────────────────────────────────────────────── SCHEMA_SQLITE = """ CREATE TABLE IF NOT EXISTS page_cache ( url TEXT PRIMARY KEY, page_type TEXT NOT NULL, fetched_at INTEGER NOT NULL, content TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS rss_feed_cache ( feed_id TEXT PRIMARY KEY, fetched_at INTEGER NOT NULL ); CREATE TABLE IF NOT EXISTS articles ( slug TEXT PRIMARY KEY, story_id TEXT, title TEXT NOT NULL, description TEXT, start_date TEXT, source_count INTEGER, bias_src_count INTEGER, left_pct REAL, ctr_pct REAL, right_pct REAL, left_src_count INTEGER, ctr_src_count INTEGER, right_src_count INTEGER, overall_bias REAL, blindspot TEXT, factuality_json TEXT, interests_json TEXT, categories TEXT, first_seen INTEGER, last_seen INTEGER ); CREATE TABLE IF NOT EXISTS article_signals ( id INTEGER PRIMARY KEY AUTOINCREMENT, article_slug TEXT NOT NULL, ticker TEXT NOT NULL, company_name TEXT NOT NULL, sector TEXT, sentiment TEXT, sentiment_score REAL, entity_score REAL, mention_count INTEGER, full_text_used INTEGER, analyzed_at INTEGER NOT NULL, coverage_spread REAL, claude_tickers TEXT, claude_magnitude INTEGER, claude_timeframe TEXT, claude_reasoning TEXT, momentum_dir TEXT, momentum_pct_5d REAL, signal_score REAL, alert INTEGER ); CREATE TABLE IF NOT EXISTS positions ( ticker TEXT PRIMARY KEY, shares REAL NOT NULL, entry_price REAL NOT NULL, entry_date TEXT NOT NULL, stop_loss REAL NOT NULL, take_profit REAL NOT NULL, note TEXT ); CREATE TABLE IF NOT EXISTS position_events ( id INTEGER PRIMARY KEY AUTOINCREMENT, ticker TEXT NOT NULL, action TEXT NOT NULL, shares REAL NOT NULL, price REAL NOT NULL, total_dkk REAL NOT NULL, fee_dkk REAL, pnl_dkk REAL, signal_correct INTEGER, event_date TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS saxo_orders ( id INTEGER PRIMARY KEY AUTOINCREMENT, ticker TEXT NOT NULL, direction TEXT NOT NULL, shares REAL NOT NULL, price_dkk REAL, total_dkk REAL, fee_dkk REAL, saxo_order_id TEXT, status TEXT NOT NULL, signal_score REAL, analyst_rec TEXT, placed_at TEXT NOT NULL, filled_at TEXT, note TEXT ); """ SCHEMA_POSTGRES = """ CREATE TABLE IF NOT EXISTS page_cache ( url TEXT PRIMARY KEY, page_type TEXT NOT NULL, fetched_at BIGINT NOT NULL, content TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS rss_feed_cache ( feed_id TEXT PRIMARY KEY, fetched_at BIGINT NOT NULL ); CREATE TABLE IF NOT EXISTS articles ( slug TEXT PRIMARY KEY, story_id TEXT, title TEXT NOT NULL, description TEXT, start_date TEXT, source_count INTEGER, bias_src_count INTEGER, left_pct REAL, ctr_pct REAL, right_pct REAL, left_src_count INTEGER, ctr_src_count INTEGER, right_src_count INTEGER, overall_bias REAL, blindspot TEXT, factuality_json TEXT, interests_json TEXT, categories TEXT, first_seen BIGINT, last_seen BIGINT ); CREATE TABLE IF NOT EXISTS article_signals ( id SERIAL PRIMARY KEY, article_slug TEXT NOT NULL, ticker TEXT NOT NULL, company_name TEXT NOT NULL, sector TEXT, sentiment TEXT, sentiment_score REAL, entity_score REAL, mention_count INTEGER, full_text_used INTEGER, analyzed_at BIGINT NOT NULL, coverage_spread REAL, claude_tickers TEXT, claude_magnitude INTEGER, claude_timeframe TEXT, claude_reasoning TEXT, momentum_dir TEXT, momentum_pct_5d REAL, signal_score REAL, alert INTEGER, UNIQUE(article_slug, ticker) ); CREATE TABLE IF NOT EXISTS positions ( ticker TEXT PRIMARY KEY, shares REAL NOT NULL, entry_price REAL NOT NULL, entry_date TEXT NOT NULL, stop_loss REAL NOT NULL, take_profit REAL NOT NULL, note TEXT ); CREATE TABLE IF NOT EXISTS position_events ( id SERIAL PRIMARY KEY, ticker TEXT NOT NULL, action TEXT NOT NULL, shares REAL NOT NULL, price REAL NOT NULL, total_dkk REAL NOT NULL, fee_dkk REAL, pnl_dkk REAL, signal_correct SMALLINT, event_date TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS saxo_orders ( id SERIAL PRIMARY KEY, ticker TEXT NOT NULL, direction TEXT NOT NULL, shares REAL NOT NULL, price_dkk REAL, total_dkk REAL, fee_dkk REAL, saxo_order_id TEXT, status TEXT NOT NULL, signal_score REAL, analyst_rec TEXT, placed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), filled_at TIMESTAMPTZ, note TEXT ); """ def init_schema(): """Create all tables if they don't exist. Safe to run multiple times.""" if DB_TYPE == "postgres": import psycopg2 conn = psycopg2.connect(DATABASE_URL) cur = conn.cursor() for stmt in SCHEMA_POSTGRES.split(";"): stmt = stmt.strip() if stmt: cur.execute(stmt) # Add UNIQUE constraint for article_signals (article_slug, ticker) if missing cur.execute(""" SELECT 1 FROM pg_constraint WHERE conname = 'uq_article_ticker' AND conrelid = 'article_signals'::regclass """) if not cur.fetchone(): cur.execute( "ALTER TABLE article_signals " "ADD CONSTRAINT uq_article_ticker UNIQUE (article_slug, ticker)" ) conn.commit() cur.close() conn.close() else: conn = sqlite3.connect(str(SQLITE_PATH)) conn.executescript(SCHEMA_SQLITE) conn.commit() conn.close() print(f" Schema initialiseret ({DB_TYPE})") # ── SQLite → Postgres migration ─────────────────────────────────────────────── def migrate_sqlite_to_postgres(): """ One-time migration: copy all data from local SQLite to Postgres. Safe to run multiple times (uses INSERT OR IGNORE / ON CONFLICT DO NOTHING). """ if DB_TYPE != "postgres": print("DATABASE_URL ikke sat — kører ikke migration") return import psycopg2 sqlite_conn = sqlite3.connect(str(SQLITE_PATH)) sqlite_conn.row_factory = sqlite3.Row pg_conn = get_conn() pg_cur = pg_conn.cursor() tables = [ "page_cache", "rss_feed_cache", "articles", "article_signals", "positions", "position_events" ] for table in tables: rows = sqlite_conn.execute(f"SELECT * FROM {table}").fetchall() if not rows: print(f" {table}: 0 rækker — skip") continue cols = list(rows[0].keys()) cols_sql = ", ".join(cols) vals_sql = ", ".join(["%s"] * len(cols)) # SERIAL columns (id) — let Postgres auto-assign insert_cols = [c for c in cols if c != "id"] if "id" in cols else cols insert_vals = ", ".join(["%s"] * len(insert_cols)) insert_cols_sql = ", ".join(insert_cols) inserted = 0 skipped = 0 for row in rows: # Coerce bytes (SQLite blob) to float/None for Postgres REAL columns values = [] for c in insert_cols: v = row[c] if isinstance(v, (bytes, bytearray)): try: import struct v = struct.unpack('d', v)[0] except Exception: v = None values.append(v) values = tuple(values) try: pg_cur.execute( f"INSERT INTO {table} ({insert_cols_sql}) VALUES ({insert_vals}) " f"ON CONFLICT DO NOTHING", values ) if pg_cur.rowcount > 0: inserted += 1 else: skipped += 1 except Exception as e: skipped += 1 pg_conn.commit() print(f" {table}: {inserted} indsat, {skipped} sprunget over") sqlite_conn.close() pg_conn.close() print(" Migration færdig!") if __name__ == "__main__": import sys cmd = sys.argv[1] if len(sys.argv) > 1 else "status" if cmd == "init": init_schema() elif cmd == "migrate": print(f"Migrerer SQLite → PostgreSQL ...") init_schema() migrate_sqlite_to_postgres() elif cmd == "status": conn = get_conn() if DB_TYPE == "postgres": raw = conn._conn.cursor() raw.execute("SELECT tablename FROM pg_tables WHERE schemaname='public'") tables = [r[0] for r in raw.fetchall()] for t in sorted(tables): raw.execute(f"SELECT COUNT(*) FROM {t}") print(f" {t}: {raw.fetchone()[0]} rækker") raw.close() else: tables = conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall() for t in tables: n = conn.execute(f"SELECT COUNT(*) AS cnt FROM {t['name']}").fetchone()["cnt"] print(f" {t['name']}: {n} rækker") conn.close() print(f"\n DB type: {DB_TYPE}") else: print("Brug: python db.py [init|migrate|status]")