Files
mmd/saxo_auth.py

221 lines
7.3 KiB
Python
Raw Normal View History

2026-05-26 22:21:27 +02:00
"""
saxo_auth.py — OAuth2 token manager for Saxo SIM.

First run: opens a browser; you log in once.
After that: the token is refreshed automatically without user interaction.
The token is stored in .saxo_token.json (gitignored).

Usage:
    python saxo_auth.py login    # first time — opens the browser
    python saxo_auth.py refresh  # renew the access token
    python saxo_auth.py token    # print the current access token
    python saxo_auth.py status   # show token status

In code:
    from saxo_auth import get_token
    token = get_token()  # returns a valid token, refreshing automatically
"""
import os
import sys
import json
import time
import webbrowser
import urllib.parse
import http.server
import threading
import requests
from pathlib import Path
from dotenv import load_dotenv
load_dotenv()
# Application credentials, read from the environment (empty string if unset).
APP_KEY = os.getenv("SAXO_APP_KEY", "")
APP_SECRET = os.getenv("SAXO_APP_SECRET_1", "")

# OAuth2 endpoints; the defaults point at the sim.logonvalidation.net hosts.
AUTH_URL = os.getenv("SAXO_AUTH_URL", "https://sim.logonvalidation.net/authorize")
TOKEN_URL = os.getenv("SAXO_TOKEN_URL", "https://sim.logonvalidation.net/token")

# Redirect URI sent in the authorization and token requests.
REDIRECT = os.getenv("SAXO_REDIRECT", "http://localhost:8765/callback")

# Token cache, stored next to this module.
TOKEN_FILE = Path(__file__).parent / ".saxo_token.json"

# Port for the local callback server. It is derived from REDIRECT so that
# overriding SAXO_REDIRECT cannot leave the server listening on a different
# port than the one the browser is redirected to. Falls back to 8765 when the
# URI carries no explicit (or no valid) port.
try:
    REDIRECT_PORT = urllib.parse.urlparse(REDIRECT).port or 8765
except ValueError:
    REDIRECT_PORT = 8765
# ── Token storage ────────────────────────────────────────────────────────────
def _load() -> dict:
    """Read the cached token data from ``TOKEN_FILE``.

    Returns:
        The stored token dictionary, or an empty dict when the file does not
        exist, cannot be decoded, or does not contain a JSON object. Callers
        treat an empty dict as "no stored token".
    """
    try:
        # Read directly instead of checking exists() first: avoids a race
        # between the check and the read.
        data = json.loads(TOKEN_FILE.read_text(encoding="utf-8"))
    except FileNotFoundError:
        return {}
    except ValueError:
        # JSONDecodeError and UnicodeDecodeError are both ValueError
        # subclasses. A truncated or corrupt cache is treated as missing so
        # the caller can ask for a fresh login instead of crashing.
        return {}
    # Callers use dict methods on the result; reject any other JSON value.
    return data if isinstance(data, dict) else {}
def _save(data: dict):
    """Persist token data to ``TOKEN_FILE`` as indented JSON.

    The payload is written to a temporary file in the same directory and then
    moved over ``TOKEN_FILE`` with ``os.replace``. ``tempfile.mkstemp``
    creates the file readable and writable only by the owner, so the tokens
    are never on disk with wider permissions, and a failed write cannot leave
    a truncated token file behind.

    Args:
        data: JSON-serialisable token dictionary.
    """
    import tempfile  # local import: only this function needs it

    # Serialise first so a non-serialisable payload fails before any file
    # is touched.
    payload = json.dumps(data, indent=2)
    fd, tmp_name = tempfile.mkstemp(
        dir=str(TOKEN_FILE.parent),
        prefix=TOKEN_FILE.name + ".",
        suffix=".tmp",
    )
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as handle:
            handle.write(payload)
        os.replace(tmp_name, TOKEN_FILE)
    except BaseException:
        # Best-effort cleanup of the temporary file; the original error is
        # what the caller needs to see.
        try:
            os.unlink(tmp_name)
        except OSError:
            pass
        raise
def _is_expired(data: dict, margin_sec: int = 60) -> bool:
    """Tell whether the token in *data* is expired or about to expire.

    Args:
        data: Token dictionary; its ``expires_at`` entry (epoch seconds) is
            read, defaulting to 0 when absent.
        margin_sec: Safety margin in seconds; the token counts as expired
            this long before ``expires_at``.

    Returns:
        True when the current time has reached ``expires_at - margin_sec``.
    """
    cutoff = data.get("expires_at", 0) - margin_sec
    now = time.time()
    return now >= cutoff
# ── OAuth2 flow ───────────────────────────────────────────────────────────────
def _exchange_code(code: str) -> dict:
    """Trade an authorization code for a token payload.

    Sends an ``authorization_code`` grant to ``TOKEN_URL`` with a 15 second
    timeout.

    Args:
        code: Authorization code received on the redirect callback.

    Returns:
        The decoded JSON response with an added ``expires_at`` entry
        (epoch seconds), computed from ``expires_in`` or 1200 seconds when
        that field is absent.

    Raises:
        Whatever ``raise_for_status`` raises for a non-success response.
    """
    form = {
        "grant_type": "authorization_code",
        "code": code,
        "redirect_uri": REDIRECT,
        "client_id": APP_KEY,
        "client_secret": APP_SECRET,
    }
    response = requests.post(TOKEN_URL, data=form, timeout=15)
    response.raise_for_status()

    tokens = response.json()
    lifetime = tokens.get("expires_in", 1200)
    tokens["expires_at"] = time.time() + lifetime
    return tokens
def _refresh(refresh_token: str) -> dict:
    """Obtain a new token payload from a refresh token.

    Sends a ``refresh_token`` grant to ``TOKEN_URL`` with a 15 second
    timeout.

    Args:
        refresh_token: The refresh token to redeem.

    Returns:
        The decoded JSON response with an added ``expires_at`` entry
        (epoch seconds), computed from ``expires_in`` or 1200 seconds when
        that field is absent.

    Raises:
        Whatever ``raise_for_status`` raises for a non-success response.
    """
    form = {
        "grant_type": "refresh_token",
        "refresh_token": refresh_token,
        "client_id": APP_KEY,
        "client_secret": APP_SECRET,
    }
    response = requests.post(TOKEN_URL, data=form, timeout=15)
    response.raise_for_status()

    tokens = response.json()
    lifetime = tokens.get("expires_in", 1200)
    tokens["expires_at"] = time.time() + lifetime
    return tokens
def _do_browser_login() -> str:
    """Run the interactive browser login and return the authorization code.

    Starts a local HTTP server on ``REDIRECT_PORT``, opens the authorization
    URL in the browser and waits up to 120 seconds for the redirect callback.

    Only a request to the path of ``REDIRECT`` that carries the ``state``
    value generated for this attempt is accepted. Other requests (for example
    a browser's favicon fetch) are answered without touching the result, so
    they cannot overwrite an authorization code that was already received.

    Returns:
        The authorization code delivered on the callback.

    Raises:
        RuntimeError: If the callback reported an error instead of a code,
            or if no valid callback arrived within 120 seconds.
    """
    import secrets  # local import: only the interactive flow needs it

    # Fresh, unguessable state per attempt; verified on the callback so a
    # forged redirect cannot inject someone else's authorization code.
    expected_state = secrets.token_urlsafe(16)
    callback_path = urllib.parse.urlparse(REDIRECT).path or "/"
    result = {}
    done = threading.Event()

    class Handler(http.server.BaseHTTPRequestHandler):
        def _reply(self, status, body):
            self.send_response(status)
            self.send_header("Content-Type", "text/html; charset=utf-8")
            self.end_headers()
            self.wfile.write(body)

        def do_GET(self):
            parsed = urllib.parse.urlparse(self.path)
            # Ignore unrelated paths and anything after the first valid
            # callback.
            if parsed.path != callback_path or done.is_set():
                self._reply(404, b"<h1>Not found</h1>")
                return

            params = urllib.parse.parse_qs(parsed.query)
            state = params.get("state", [""])[0]
            # Compare as bytes: compare_digest rejects non-ASCII str input.
            if not secrets.compare_digest(
                state.encode("utf-8"), expected_state.encode("utf-8")
            ):
                # Do not end the wait: a bogus request must not abort a
                # login that is still in progress.
                self._reply(400, b"<h1>Ugyldig state-parameter.</h1>")
                return

            code = params.get("code", [None])[0]
            if code:
                result["code"] = code
                self._reply(
                    200,
                    b"<h1>Login lykkedes!</h1><p>Du kan lukke dette vindue.</p>",
                )
            else:
                reason = params.get("error_description") or params.get("error")
                result["error"] = reason[0] if reason else "ingen kode modtaget"
                self._reply(400, b"<h1>Login mislykkedes.</h1>")
            done.set()

        def log_message(self, *args):
            pass  # suppress access log

    server = http.server.HTTPServer(("localhost", REDIRECT_PORT), Handler)
    thread = threading.Thread(target=server.serve_forever, daemon=True)
    thread.start()
    try:
        auth_params = {
            "response_type": "code",
            "client_id": APP_KEY,
            "redirect_uri": REDIRECT,
            "state": expected_state,
        }
        url = AUTH_URL + "?" + urllib.parse.urlencode(auth_params)
        print("\n Åbner browser til Saxo login ...")
        print(f" Hvis browseren ikke åbner, gå til:\n {url}\n")
        webbrowser.open(url)
        done.wait(timeout=120)
    finally:
        # shutdown() stops serve_forever; server_close() releases the
        # listening socket so a later login in the same process can bind
        # the port again.
        server.shutdown()
        server.server_close()

    code = result.get("code")
    if code:
        return code
    if "error" in result:
        raise RuntimeError(f"Login mislykkedes: {result['error']}")
    raise RuntimeError("Login timeout — ingen callback modtaget inden 120 sekunder.")
# ── Public API ────────────────────────────────────────────────────────────────
def login():
    """Perform the full browser-based login and store the resulting tokens.

    Runs the browser flow, exchanges the authorization code for tokens,
    saves them and prints a short summary, including whether a refresh
    token was received.

    Returns:
        The new access token.
    """
    auth_code = _do_browser_login()
    tokens = _exchange_code(auth_code)
    _save(tokens)

    lifetime = tokens.get("expires_in", "?")
    print(" Login OK — token gemt i .saxo_token.json")
    print(f" Access token udløber om {lifetime} sekunder")

    if tokens.get("refresh_token"):
        notice = " Refresh token gemt — ingen manuel login fremover!"
    else:
        notice = " ADVARSEL: Ingen refresh token modtaget."
    print(notice)

    return tokens["access_token"]
def refresh():
    """Renew the access token using the stored refresh token.

    Returns:
        The new access token.

    Raises:
        RuntimeError: If no refresh token is stored.
    """
    stored_refresh_token = _load().get("refresh_token")
    if not stored_refresh_token:
        raise RuntimeError("Ingen refresh token — kør: python saxo_auth.py login")

    renewed = _refresh(stored_refresh_token)
    _save(renewed)
    print(" Token refreshet OK")
    return renewed["access_token"]
def get_token() -> str:
    """Return a usable access token, refreshing the stored one if needed.

    With no stored token data, the ``SAXO_TOKEN`` environment variable is
    used as a fallback. A stored token that is expired (per ``_is_expired``)
    is renewed with its refresh token and the result is saved.

    Returns:
        An access token string.

    Raises:
        RuntimeError: If there is neither stored token data nor a
            ``SAXO_TOKEN`` fallback, or if the stored token is expired and
            no refresh token is available.
    """
    stored = _load()

    if not stored:
        # No cache: fall back to the token supplied via the environment.
        env_token = os.getenv("SAXO_TOKEN", "")
        if not env_token:
            raise RuntimeError("Ingen token — kør: python saxo_auth.py login")
        return env_token

    if not _is_expired(stored):
        return stored["access_token"]

    stored_refresh_token = stored.get("refresh_token")
    if not stored_refresh_token:
        raise RuntimeError("Token udløbet og ingen refresh token — kør login igen.")

    renewed = _refresh(stored_refresh_token)
    _save(renewed)
    return renewed["access_token"]
# ── CLI ───────────────────────────────────────────────────────────────────────
def main():
    """Command-line entry point.

    Dispatches on the first argument: ``login``, ``refresh``, ``token`` or
    ``status``. Anything else, including no argument, prints the usage text.
    """
    arguments = sys.argv[1:]
    command = arguments[0] if arguments else "help"

    if command == "login":
        login()
        return

    if command == "refresh":
        new_token = refresh()
        # Only the first 20 characters are shown.
        print(f" Nyt token: {new_token[:20]}...")
        return

    if command == "token":
        print(get_token())
        return

    if command == "status":
        stored = _load()
        if not stored:
            print("Ingen gemt token. Kør: python saxo_auth.py login")
            return
        seconds_left = int(stored.get("expires_at", 0) - time.time())
        validity = "GYLDIG" if seconds_left > 0 else "UDLØBET"
        has_refresh = "JA" if stored.get("refresh_token") else "NEJ"
        print(f" Token status: {validity}")
        print(f" Udløber om: {seconds_left} sekunder ({seconds_left//60} min)")
        print(f" Refresh token: {has_refresh}")
        return

    usage_lines = (
        "Brug:",
        " python saxo_auth.py login # første gang — åbner browser",
        " python saxo_auth.py refresh # forny token manuelt",
        " python saxo_auth.py token # vis current token",
        " python saxo_auth.py status # vis token status",
    )
    for line in usage_lines:
        print(line)


# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()