Files
mmd/scheduler.py

87 lines
2.7 KiB
Python
Raw Permalink Normal View History

"""
scheduler.py — MoneyMaker worker daemon.

Runs the pipeline at scheduled times (Mon-Fri, UTC):
    04:00  analyze-only (NLP before Copenhagen market open 09:00 CET)
    07:30  trade window 1
    10:00  trade window 2
    12:30  trade window 3
    14:30  trade window 4
    17:00  daily P&L report

Output is appended to LOG_DIR/runner_YYYY-MM-DD.log so the dashboard can
show a live log tail. Uses only the stdlib — no extra scheduler dependency.
"""
import os
import sys
import time
import traceback
from datetime import datetime, timezone
from pathlib import Path
# Directory that receives the daily runner logs. The LOG_DIR environment
# variable overrides the default of <this file's directory>/data/logs.
LOG_DIR = Path(
    os.environ.get("LOG_DIR", str(Path(__file__).parent.joinpath("data", "logs")))
)
# Create the directory (and any missing parents) as soon as the module loads.
LOG_DIR.mkdir(parents=True, exist_ok=True)
# Each entry is (hour, minute, weekdays, analyze_only, is_report).
# Hour/minute are compared against the current UTC time; weekdays use
# datetime.weekday() numbering (0=Mon … 4=Fri).
SCHEDULE = [
    (hour, minute, frozenset(range(5)), analyze_only, is_report)
    for hour, minute, analyze_only, is_report in (
        (4, 0, True, False),    # analyze-only run
        (7, 30, False, False),  # full run
        (10, 0, False, False),  # full run
        (12, 30, False, False),  # full run
        (14, 30, False, False),  # full run
        (17, 0, False, True),   # report
    )
]
def _log_path() -> Path:
    """Return LOG_DIR/runner_YYYY-MM-DD.log for the current UTC date."""
    utc_day = datetime.now(timezone.utc).date()
    return LOG_DIR.joinpath(f"runner_{utc_day:%Y-%m-%d}.log")
def _run_job(analyze_only: bool, is_report: bool) -> None:
    """Run one scheduled job with stdout/stderr redirected to today's log file.

    Args:
        analyze_only: Forwarded to ``runner.run_pipeline``; not used when
            ``is_report`` is true.
        is_report: When true, call ``report.print_report`` instead of the
            pipeline.

    Any ``Exception`` raised by the job is written to the log file (message
    plus traceback) and swallowed, so the caller keeps running. The previous
    ``sys.stdout`` / ``sys.stderr`` objects are always restored.
    """
    log = _log_path()
    # Append, line-buffered (buffering=1) so each line reaches the file as it
    # is printed. The encoding is pinned to UTF-8 so that writing non-ASCII
    # job output does not depend on the host's locale settings.
    with open(log, "a", buffering=1, encoding="utf-8") as fh:
        old_out, old_err = sys.stdout, sys.stderr
        sys.stdout = sys.stderr = fh
        try:
            # The job modules are imported only when a job actually runs, so
            # an import failure is caught and logged like any other job error.
            if is_report:
                from report import print_report

                print_report()
            else:
                from runner import run_pipeline

                run_pipeline(analyze_only=analyze_only)
        except Exception as exc:
            # Still redirected here: both lines end up in the log file.
            print(f"[scheduler] ERROR: {exc}")
            traceback.print_exc()
        finally:
            sys.stdout = old_out
            sys.stderr = old_err
def main() -> None:
    """Run the scheduler loop forever, firing each SCHEDULE entry once per day.

    Every 30 seconds the current UTC time is compared against SCHEDULE. An
    entry fires when today's weekday is in its weekday set and the current
    hour and minute equal the entry's. A ``(hour, minute, date)`` marker keeps
    the second poll inside the same minute from firing the entry again.
    Progress lines are printed to the daemon's own stdout; the job itself is
    run through ``_run_job``.

    This function does not return.
    """
    # Markers for entries already started, as (hour, minute, date).
    fired: set = set()
    marker_day = None
    print(f"[scheduler] started — LOG_DIR={LOG_DIR}", flush=True)
    while True:
        now = datetime.now(timezone.utc)
        today = now.date()
        if today != marker_day:
            # UTC day changed: markers from earlier days can never match
            # again, so drop them to keep the set bounded in a long-lived
            # process.
            fired = {marker for marker in fired if marker[2] >= today}
            marker_day = today
        dow = now.weekday()
        for hour, minute, days, analyze_only, is_report in SCHEDULE:
            if dow not in days:
                continue
            key = (hour, minute, today)
            if key in fired:
                continue
            # NOTE(review): an entry only fires while the clock is inside its
            # exact minute. Jobs run synchronously, so if an earlier job is
            # still running through that minute the entry is skipped for the
            # day. Confirm whether a catch-up window is wanted.
            if (now.hour, now.minute) != (hour, minute):
                continue
            # Mark before running so the next poll within the same minute
            # does not start the job a second time.
            fired.add(key)
            if is_report:
                label = "report"
            elif analyze_only:
                label = "analyze-only"
            else:
                label = "full"
            # Separator added: the time and label were previously printed
            # fused together ("04:00 UTCanalyze-only").
            print(f"[scheduler] {now.strftime('%H:%M UTC')} — {label}", flush=True)
            _run_job(analyze_only, is_report)
            print(f"[scheduler] {label} done", flush=True)
        time.sleep(30)
# Script entry point: start the scheduler loop only when this file is run
# directly, not when it is imported.
if __name__ == "__main__":
    main()