"""ImapSync Manager API.

FastAPI application backed by a SQLite database. It exposes JSON endpoints
for authentication, user management, IMAP server definitions, sync jobs and
their run history, and serves a single-page frontend from ``static/``.

This module only records job state (``queued``, ``cancelling``, ...) in the
database; it does not execute any sync itself.
"""
import os
import sqlite3
import hashlib
import secrets
from datetime import datetime, timedelta
from typing import Optional
from contextlib import asynccontextmanager, closing

from fastapi import FastAPI, HTTPException, Depends, status, Request
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel
from jose import JWTError, jwt

DB_PATH = os.environ.get("DB_PATH", "/data/imapsync.db")
LOG_DIR = os.environ.get("LOG_DIR", "/data/logs")
# NOTE(review): the fallback value is a publicly known signing key; anyone can
# forge tokens if SECRET_KEY is not set in the environment.
SECRET_KEY = os.environ.get("SECRET_KEY", "dev-secret-change-me")
ALGORITHM = "HS256"
TOKEN_EXPIRE_HOURS = 12

os.makedirs(LOG_DIR, exist_ok=True)


# ---------------------------------------------------------------------------
# Database
# ---------------------------------------------------------------------------

def get_db():
    """Open a new SQLite connection to ``DB_PATH``.

    Rows are returned as ``sqlite3.Row`` and WAL journaling is requested on
    every connection. The caller owns the connection and must close it;
    within this module that is done with ``contextlib.closing``.
    """
    conn = sqlite3.connect(DB_PATH, check_same_thread=False)
    conn.row_factory = sqlite3.Row
    conn.execute("PRAGMA journal_mode=WAL")
    return conn


def init_db():
    """Create the schema if needed and seed a default admin account.

    If a ``sync_jobs`` table exists but no ``servers`` table does, the
    ``job_runs`` and ``sync_jobs`` tables are dropped (data is lost) before
    the schema is created. When the ``users`` table is empty, a user
    ``admin`` with password ``admin`` and role ``admin`` is inserted.
    """
    # closing() guarantees the connection is released even if a statement fails.
    with closing(get_db()) as conn:
        tables = [
            r[0]
            for r in conn.execute(
                "SELECT name FROM sqlite_master WHERE type='table'"
            ).fetchall()
        ]
        needs_migration = 'servers' not in tables and 'sync_jobs' in tables
        if needs_migration:
            conn.executescript(
                "DROP TABLE IF EXISTS job_runs; DROP TABLE IF EXISTS sync_jobs;"
            )

        conn.executescript("""
            CREATE TABLE IF NOT EXISTS servers (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL,
                host TEXT NOT NULL,
                port INTEGER DEFAULT 993,
                ssl INTEGER DEFAULT 1,
                direction TEXT NOT NULL DEFAULT 'both',
                created_at DATETIME DEFAULT CURRENT_TIMESTAMP
            );
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                username TEXT UNIQUE NOT NULL,
                password_md5 TEXT NOT NULL,
                role TEXT NOT NULL DEFAULT 'viewer',
                created_at DATETIME DEFAULT CURRENT_TIMESTAMP
            );
            CREATE TABLE IF NOT EXISTS sync_jobs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL,
                src_server_id INTEGER NOT NULL,
                src_user TEXT NOT NULL,
                src_password TEXT NOT NULL,
                dst_server_id INTEGER NOT NULL,
                dst_user TEXT NOT NULL,
                dst_password TEXT NOT NULL,
                extra_args TEXT DEFAULT '',
                schedule TEXT DEFAULT NULL,
                enabled INTEGER DEFAULT 1,
                status TEXT DEFAULT 'idle',
                last_run DATETIME,
                created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
                created_by TEXT,
                FOREIGN KEY (src_server_id) REFERENCES servers(id),
                FOREIGN KEY (dst_server_id) REFERENCES servers(id)
            );
            CREATE TABLE IF NOT EXISTS job_runs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                job_id INTEGER NOT NULL,
                started_at DATETIME DEFAULT CURRENT_TIMESTAMP,
                finished_at DATETIME,
                status TEXT DEFAULT 'running',
                log_file TEXT,
                messages_synced INTEGER DEFAULT 0,
                messages_skipped INTEGER DEFAULT 0,
                errors INTEGER DEFAULT 0,
                duration_sec INTEGER DEFAULT 0,
                FOREIGN KEY (job_id) REFERENCES sync_jobs(id)
            );
            CREATE TABLE IF NOT EXISTS sessions (
                token TEXT PRIMARY KEY,
                username TEXT NOT NULL,
                expires_at DATETIME NOT NULL
            );
        """)

        cur = conn.execute("SELECT COUNT(*) FROM users")
        if cur.fetchone()[0] == 0:
            conn.execute(
                "INSERT INTO users (username, password_md5, role) VALUES (?, ?, ?)",
                ("admin", md5_hash("admin"), "admin"),
            )
        conn.commit()


# ---------------------------------------------------------------------------
# Authentication helpers
# ---------------------------------------------------------------------------

def md5_hash(pw: str) -> str:
    """Return the hex MD5 digest of ``pw`` (encoded with the default UTF-8).

    NOTE(review): unsalted MD5 is not a safe password hash. It is kept
    because existing rows in ``users.password_md5`` depend on it; replacing
    it needs a data migration.
    """
    return hashlib.md5(pw.encode()).hexdigest()


def create_token(username: str, role: str) -> str:
    """Return a signed JWT carrying ``sub``, ``role`` and an ``exp`` claim.

    The token expires ``TOKEN_EXPIRE_HOURS`` hours after creation.
    """
    expire = datetime.utcnow() + timedelta(hours=TOKEN_EXPIRE_HOURS)
    payload = {"sub": username, "role": role, "exp": expire}
    return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)


def decode_token(token: str) -> dict:
    """Decode and verify ``token``; return its claims.

    Raises:
        HTTPException: 401 if the JWT library rejects the token.
    """
    try:
        return jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        raise HTTPException(status_code=401, detail="Token ungültig oder abgelaufen")


security = HTTPBearer()


def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Dependency: return the claims of the bearer token on the request."""
    return decode_token(credentials.credentials)


def require_admin(user=Depends(get_current_user)):
    """Dependency: allow only tokens whose ``role`` claim is ``admin`` (else 403)."""
    if user.get("role") != "admin":
        raise HTTPException(status_code=403, detail="Nur Admins erlaubt")
    return user


def require_editor(user=Depends(get_current_user)):
    """Dependency: reject tokens whose ``role`` claim is ``viewer`` (403).

    NOTE(review): this is a deny-list check, so any role other than
    ``viewer`` (including a missing or unknown one) passes. Confirm that is
    intended, since ``create_user`` accepts arbitrary role strings.
    """
    if user.get("role") == "viewer":
        raise HTTPException(status_code=403, detail="Keine Berechtigung")
    return user


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan hook: initialise the database before serving."""
    init_db()
    yield


app = FastAPI(title="ImapSync Manager", lifespan=lifespan)


# ---------------------------------------------------------------------------
# Request models
# ---------------------------------------------------------------------------

class LoginRequest(BaseModel):
    """Body of ``POST /api/auth/login``."""
    username: str
    password: str


class UserCreate(BaseModel):
    """Body of ``POST /api/users``."""
    username: str
    password: str
    role: str = "viewer"


class UserUpdate(BaseModel):
    """Body of ``PUT /api/users/{user_id}``; falsy fields are left unchanged."""
    password: Optional[str] = None
    role: Optional[str] = None


class ServerCreate(BaseModel):
    """Body of ``POST /api/servers``."""
    name: str
    host: str
    port: int = 993
    ssl: bool = True
    direction: str = "both"


class ServerUpdate(BaseModel):
    """Body of ``PUT /api/servers/{server_id}``; ``None`` fields are left unchanged."""
    name: Optional[str] = None
    host: Optional[str] = None
    port: Optional[int] = None
    ssl: Optional[bool] = None
    direction: Optional[str] = None


class SyncJobCreate(BaseModel):
    """Body of ``POST /api/jobs``."""
    name: str
    src_server_id: int
    src_user: str
    src_password: str
    dst_server_id: int
    dst_user: str
    dst_password: str
    extra_args: str = ""
    schedule: Optional[str] = None
    enabled: bool = True


class SyncJobUpdate(BaseModel):
    """Body of ``PUT /api/jobs/{job_id}``; ``None`` fields are left unchanged."""
    name: Optional[str] = None
    src_server_id: Optional[int] = None
    src_user: Optional[str] = None
    src_password: Optional[str] = None
    dst_server_id: Optional[int] = None
    dst_user: Optional[str] = None
    dst_password: Optional[str] = None
    extra_args: Optional[str] = None
    schedule: Optional[str] = None
    enabled: Optional[bool] = None


# ---------------------------------------------------------------------------
# Health and authentication
# ---------------------------------------------------------------------------

@app.get("/api/health")
def health():
    """Return a static OK status plus the current UTC time (ISO format)."""
    return {"status": "ok", "time": datetime.utcnow().isoformat()}


@app.post("/api/auth/login")
def login(req: LoginRequest):
    """Check the credentials and return a token with username and role.

    Raises:
        HTTPException: 401 if no user matches username and password hash.
    """
    with closing(get_db()) as conn:
        row = conn.execute(
            "SELECT * FROM users WHERE username = ? AND password_md5 = ?",
            (req.username, md5_hash(req.password)),
        ).fetchone()
    if not row:
        raise HTTPException(status_code=401, detail="Ungültige Zugangsdaten")
    token = create_token(row["username"], row["role"])
    return {"token": token, "username": row["username"], "role": row["role"]}


@app.get("/api/auth/me")
def me(user=Depends(get_current_user)):
    """Return the claims of the caller's token."""
    return user


# ---------------------------------------------------------------------------
# Users (admin only)
# ---------------------------------------------------------------------------

@app.get("/api/users")
def list_users(user=Depends(require_admin)):
    """List all users (without password hashes), newest first."""
    with closing(get_db()) as conn:
        rows = conn.execute(
            "SELECT id, username, role, created_at FROM users ORDER BY created_at DESC"
        ).fetchall()
    return [dict(r) for r in rows]


@app.post("/api/users", status_code=201)
def create_user(req: UserCreate, user=Depends(require_admin)):
    """Create a user.

    Raises:
        HTTPException: 409 if the insert violates an integrity constraint
            (``username`` is declared UNIQUE).
    """
    with closing(get_db()) as conn:
        try:
            conn.execute(
                "INSERT INTO users (username, password_md5, role) VALUES (?, ?, ?)",
                (req.username, md5_hash(req.password), req.role),
            )
            conn.commit()
        except sqlite3.IntegrityError:
            raise HTTPException(status_code=409, detail="Benutzername bereits vergeben")
    return {"message": f"Benutzer '{req.username}' erstellt"}


@app.put("/api/users/{user_id}")
def update_user(user_id: int, req: UserUpdate, user=Depends(require_admin)):
    """Update password and/or role of a user.

    Raises:
        HTTPException: 404 if the user does not exist.
    """
    with closing(get_db()) as conn:
        row = conn.execute("SELECT * FROM users WHERE id = ?", (user_id,)).fetchone()
        if not row:
            raise HTTPException(status_code=404, detail="Benutzer nicht gefunden")

        updates = {}
        if req.password:
            updates["password_md5"] = md5_hash(req.password)
        if req.role:
            updates["role"] = req.role

        if updates:
            # Column names are the fixed keys set above, never request data,
            # so interpolating them into the statement is safe.
            set_clause = ", ".join(f"{k} = ?" for k in updates)
            conn.execute(
                f"UPDATE users SET {set_clause} WHERE id = ?",
                (*updates.values(), user_id),
            )
            conn.commit()
    return {"message": "Benutzer aktualisiert"}


@app.delete("/api/users/{user_id}")
def delete_user(user_id: int, user=Depends(require_admin)):
    """Delete a user.

    Raises:
        HTTPException: 404 if the user does not exist; 400 if the caller
            tries to delete the account named in their own token.
    """
    with closing(get_db()) as conn:
        row = conn.execute(
            "SELECT username FROM users WHERE id = ?", (user_id,)
        ).fetchone()
        if not row:
            raise HTTPException(status_code=404, detail="Benutzer nicht gefunden")
        if row["username"] == user["sub"]:
            raise HTTPException(status_code=400, detail="Eigenen Account nicht löschbar")
        conn.execute("DELETE FROM users WHERE id = ?", (user_id,))
        conn.commit()
    return {"message": "Benutzer gelöscht"}


# ---------------------------------------------------------------------------
# Servers
# ---------------------------------------------------------------------------

@app.get("/api/servers")
def list_servers(user=Depends(get_current_user)):
    """List servers by name, with counts of jobs using each as source/destination."""
    with closing(get_db()) as conn:
        rows = conn.execute("""
            SELECT s.*,
                   (SELECT COUNT(*) FROM sync_jobs j WHERE j.src_server_id = s.id) as used_as_src,
                   (SELECT COUNT(*) FROM sync_jobs j WHERE j.dst_server_id = s.id) as used_as_dst
            FROM servers s
            ORDER BY s.name
        """).fetchall()
    return [dict(r) for r in rows]


@app.post("/api/servers", status_code=201)
def create_server(req: ServerCreate, user=Depends(require_editor)):
    """Create a server definition and return its new id."""
    with closing(get_db()) as conn:
        cur = conn.execute(
            "INSERT INTO servers (name, host, port, ssl, direction) VALUES (?, ?, ?, ?, ?)",
            (req.name, req.host, req.port, int(req.ssl), req.direction),
        )
        conn.commit()
        server_id = cur.lastrowid
    return {"message": "Server erstellt", "id": server_id}


@app.put("/api/servers/{server_id}")
def update_server(server_id: int, req: ServerUpdate, user=Depends(require_editor)):
    """Update the non-``None`` fields of a server.

    Raises:
        HTTPException: 404 if the server does not exist.
    """
    with closing(get_db()) as conn:
        row = conn.execute(
            "SELECT id FROM servers WHERE id = ?", (server_id,)
        ).fetchone()
        if not row:
            raise HTTPException(status_code=404, detail="Server nicht gefunden")

        fields = {k: v for k, v in req.model_dump().items() if v is not None}
        if "ssl" in fields:
            fields["ssl"] = int(fields["ssl"])  # stored as INTEGER 0/1

        if fields:
            # Keys are field names of ServerUpdate, not free-form input.
            set_clause = ", ".join(f"{k} = ?" for k in fields)
            conn.execute(
                f"UPDATE servers SET {set_clause} WHERE id = ?",
                (*fields.values(), server_id),
            )
            conn.commit()
    return {"message": "Server aktualisiert"}


@app.delete("/api/servers/{server_id}")
def delete_server(server_id: int, user=Depends(require_editor)):
    """Delete a server that no job references.

    Raises:
        HTTPException: 404 if the server does not exist; 409 if any sync job
            uses it as source or destination.
    """
    with closing(get_db()) as conn:
        row = conn.execute(
            "SELECT id FROM servers WHERE id = ?", (server_id,)
        ).fetchone()
        if not row:
            raise HTTPException(status_code=404, detail="Server nicht gefunden")

        in_use = conn.execute(
            "SELECT COUNT(*) FROM sync_jobs WHERE src_server_id = ? OR dst_server_id = ?",
            (server_id, server_id),
        ).fetchone()[0]
        if in_use > 0:
            raise HTTPException(status_code=409, detail="Server wird noch in Jobs verwendet")

        conn.execute("DELETE FROM servers WHERE id = ?", (server_id,))
        conn.commit()
    return {"message": "Server gelöscht"}


# ---------------------------------------------------------------------------
# Sync jobs
# ---------------------------------------------------------------------------

@app.get("/api/jobs")
def list_jobs(user=Depends(get_current_user)):
    """List all jobs (newest first) with server details and run statistics.

    ``src_password`` and ``dst_password`` are removed from every entry,
    regardless of the caller's role.
    """
    with closing(get_db()) as conn:
        rows = conn.execute("""
            SELECT j.*,
                   s1.name as src_server_name, s1.host as src_host,
                   s1.port as src_port, s1.ssl as src_ssl,
                   s2.name as dst_server_name, s2.host as dst_host,
                   s2.port as dst_port, s2.ssl as dst_ssl,
                   (SELECT COUNT(*) FROM job_runs r WHERE r.job_id = j.id) as run_count,
                   (SELECT SUM(messages_synced) FROM job_runs r WHERE r.job_id = j.id) as total_synced
            FROM sync_jobs j
            LEFT JOIN servers s1 ON j.src_server_id = s1.id
            LEFT JOIN servers s2 ON j.dst_server_id = s2.id
            ORDER BY j.created_at DESC
        """).fetchall()

    result = []
    for r in rows:
        d = dict(r)
        d.pop("src_password", None)
        d.pop("dst_password", None)
        result.append(d)
    return result


@app.post("/api/jobs", status_code=201)
def create_job(req: SyncJobCreate, user=Depends(require_editor)):
    """Create a sync job and return its new id.

    The caller's ``sub`` claim is stored as ``created_by``.

    Raises:
        HTTPException: 404 if the source or destination server id is unknown.
    """
    with closing(get_db()) as conn:
        for sid in [req.src_server_id, req.dst_server_id]:
            if not conn.execute(
                "SELECT id FROM servers WHERE id = ?", (sid,)
            ).fetchone():
                raise HTTPException(
                    status_code=404, detail=f"Server-ID {sid} nicht gefunden"
                )

        cur = conn.execute("""
            INSERT INTO sync_jobs
                (name, src_server_id, src_user, src_password,
                 dst_server_id, dst_user, dst_password,
                 extra_args, schedule, enabled, created_by)
            VALUES (?,?,?,?,?,?,?,?,?,?,?)
        """, (req.name, req.src_server_id, req.src_user, req.src_password,
              req.dst_server_id, req.dst_user, req.dst_password,
              req.extra_args, req.schedule, int(req.enabled), user["sub"]))
        conn.commit()
        job_id = cur.lastrowid
    return {"message": "Job erstellt", "id": job_id}


@app.get("/api/jobs/{job_id}")
def get_job(job_id: int, user=Depends(get_current_user)):
    """Return one job with its server details.

    Passwords are stripped only when the caller's role is ``viewer``; every
    other role receives ``src_password`` and ``dst_password`` as stored.

    Raises:
        HTTPException: 404 if the job does not exist.
    """
    with closing(get_db()) as conn:
        row = conn.execute("""
            SELECT j.*,
                   s1.name as src_server_name, s1.host as src_host,
                   s1.port as src_port, s1.ssl as src_ssl,
                   s2.name as dst_server_name, s2.host as dst_host,
                   s2.port as dst_port, s2.ssl as dst_ssl
            FROM sync_jobs j
            LEFT JOIN servers s1 ON j.src_server_id = s1.id
            LEFT JOIN servers s2 ON j.dst_server_id = s2.id
            WHERE j.id = ?
        """, (job_id,)).fetchone()
    if not row:
        raise HTTPException(status_code=404, detail="Job nicht gefunden")

    d = dict(row)
    if user.get("role") == "viewer":
        d.pop("src_password", None)
        d.pop("dst_password", None)
    return d


@app.put("/api/jobs/{job_id}")
def update_job(job_id: int, req: SyncJobUpdate, user=Depends(require_editor)):
    """Update the non-``None`` fields of a job.

    Because ``None`` values are filtered out, a field such as ``schedule``
    cannot be reset to NULL through this endpoint.

    Raises:
        HTTPException: 404 if the job, or a newly referenced source or
            destination server, does not exist.
    """
    with closing(get_db()) as conn:
        row = conn.execute(
            "SELECT id FROM sync_jobs WHERE id = ?", (job_id,)
        ).fetchone()
        if not row:
            raise HTTPException(status_code=404, detail="Job nicht gefunden")

        fields = {k: v for k, v in req.model_dump().items() if v is not None}
        if "enabled" in fields:
            fields["enabled"] = int(fields["enabled"])  # stored as INTEGER 0/1

        if "src_server_id" in fields:
            if not conn.execute(
                "SELECT id FROM servers WHERE id = ?", (fields["src_server_id"],)
            ).fetchone():
                raise HTTPException(status_code=404, detail="Quell-Server nicht gefunden")
        if "dst_server_id" in fields:
            if not conn.execute(
                "SELECT id FROM servers WHERE id = ?", (fields["dst_server_id"],)
            ).fetchone():
                raise HTTPException(status_code=404, detail="Ziel-Server nicht gefunden")

        if fields:
            # Keys are field names of SyncJobUpdate, not free-form input.
            set_clause = ", ".join(f"{k} = ?" for k in fields)
            conn.execute(
                f"UPDATE sync_jobs SET {set_clause} WHERE id = ?",
                (*fields.values(), job_id),
            )
            conn.commit()
    return {"message": "Job aktualisiert"}


@app.delete("/api/jobs/{job_id}")
def delete_job(job_id: int, user=Depends(require_editor)):
    """Delete a job and its run records.

    No existence check is made; deleting an unknown id still returns the
    success message.
    """
    with closing(get_db()) as conn:
        # Runs first: job_runs.job_id references sync_jobs(id).
        conn.execute("DELETE FROM job_runs WHERE job_id = ?", (job_id,))
        conn.execute("DELETE FROM sync_jobs WHERE id = ?", (job_id,))
        conn.commit()
    return {"message": "Job gelöscht"}


@app.post("/api/jobs/{job_id}/trigger")
def trigger_job(job_id: int, user=Depends(require_editor)):
    """Mark a job as ``queued``.

    Raises:
        HTTPException: 404 if the job does not exist; 409 if it is already
            ``queued`` or ``running``.
    """
    with closing(get_db()) as conn:
        row = conn.execute(
            "SELECT status FROM sync_jobs WHERE id = ?", (job_id,)
        ).fetchone()
        if not row:
            raise HTTPException(status_code=404, detail="Job nicht gefunden")
        if row["status"] in ("queued", "running"):
            raise HTTPException(
                status_code=409,
                detail="Job läuft bereits oder ist in der Warteschlange",
            )
        conn.execute("UPDATE sync_jobs SET status = 'queued' WHERE id = ?", (job_id,))
        conn.commit()
    return {"message": "Job in Warteschlange eingereiht"}


@app.post("/api/jobs/{job_id}/stop")
def stop_job(job_id: int, user=Depends(require_editor)):
    """Request that a job stop.

    A ``queued`` job goes back to ``idle``. A ``running`` job is set to
    ``cancelling`` and its running ``job_runs`` rows are marked
    ``cancelled`` with the current UTC time. Any other status is left
    untouched, but the same message is returned.

    Raises:
        HTTPException: 404 if the job does not exist.
    """
    with closing(get_db()) as conn:
        row = conn.execute(
            "SELECT status FROM sync_jobs WHERE id = ?", (job_id,)
        ).fetchone()
        if not row:
            raise HTTPException(status_code=404, detail="Job nicht gefunden")

        if row["status"] == "queued":
            conn.execute("UPDATE sync_jobs SET status = 'idle' WHERE id = ?", (job_id,))
        elif row["status"] == "running":
            conn.execute(
                "UPDATE sync_jobs SET status = 'cancelling' WHERE id = ?", (job_id,)
            )
            conn.execute("""
                UPDATE job_runs SET status = 'cancelled', finished_at = ?
                WHERE job_id = ? AND status = 'running'
            """, (datetime.utcnow().isoformat(), job_id))
        conn.commit()
    return {"message": "Job wird abgebrochen"}


# ---------------------------------------------------------------------------
# Runs, logs and statistics
# ---------------------------------------------------------------------------

@app.get("/api/jobs/{job_id}/runs")
def get_job_runs(job_id: int, limit: int = 50, user=Depends(get_current_user)):
    """Return up to ``limit`` runs of a job, most recently started first."""
    with closing(get_db()) as conn:
        rows = conn.execute("""
            SELECT * FROM job_runs WHERE job_id = ?
            ORDER BY started_at DESC LIMIT ?
        """, (job_id, limit)).fetchall()
    return [dict(r) for r in rows]


@app.get("/api/runs/{run_id}/log")
def get_run_log(run_id: int, user=Depends(get_current_user)):
    """Return the content of the log file recorded for a run.

    The stored ``log_file`` value is joined onto ``LOG_DIR``; undecodable
    bytes are replaced rather than raising.

    Raises:
        HTTPException: 404 if the run has no log file recorded or the file
            does not exist on disk.
    """
    with closing(get_db()) as conn:
        row = conn.execute(
            "SELECT log_file FROM job_runs WHERE id = ?", (run_id,)
        ).fetchone()
    if not row or not row["log_file"]:
        raise HTTPException(status_code=404, detail="Kein Log gefunden")

    log_path = os.path.join(LOG_DIR, row["log_file"])
    if not os.path.exists(log_path):
        raise HTTPException(status_code=404, detail="Logdatei nicht vorhanden")
    with open(log_path, "r", errors="replace") as f:
        return {"content": f.read()}


@app.get("/api/stats")
def get_stats(user=Depends(get_current_user)):
    """Return dashboard statistics.

    Includes job/run/message/server totals, per-day aggregates for runs
    started within the last 14 days, and the 10 most recently started runs.
    """
    with closing(get_db()) as conn:
        def scalar(sql):
            """Run a single-value query and return that value."""
            return conn.execute(sql).fetchone()[0]

        total_jobs = scalar("SELECT COUNT(*) FROM sync_jobs")
        active_jobs = scalar("SELECT COUNT(*) FROM sync_jobs WHERE enabled=1")
        running = scalar("SELECT COUNT(*) FROM sync_jobs WHERE status='running'")
        queued = scalar("SELECT COUNT(*) FROM sync_jobs WHERE status='queued'")
        total_runs = scalar("SELECT COUNT(*) FROM job_runs")
        failed_runs = scalar("SELECT COUNT(*) FROM job_runs WHERE status='failed'")
        total_synced = scalar("SELECT COALESCE(SUM(messages_synced),0) FROM job_runs")
        total_errors = scalar("SELECT COALESCE(SUM(errors),0) FROM job_runs")

        daily = conn.execute("""
            SELECT DATE(started_at) as day,
                   COUNT(*) as runs,
                   COALESCE(SUM(messages_synced),0) as synced,
                   COALESCE(SUM(errors),0) as errors
            FROM job_runs
            WHERE started_at >= DATE('now', '-14 days')
            GROUP BY day
            ORDER BY day
        """).fetchall()

        recent = conn.execute("""
            SELECT r.id, r.job_id, j.name as job_name,
                   r.started_at, r.finished_at, r.status,
                   r.messages_synced, r.errors, r.duration_sec
            FROM job_runs r
            JOIN sync_jobs j ON j.id = r.job_id
            ORDER BY r.started_at DESC
            LIMIT 10
        """).fetchall()

        total_servers = scalar("SELECT COUNT(*) FROM servers")

    return {
        "jobs": {"total": total_jobs, "active": active_jobs, "running": running, "queued": queued},
        "runs": {"total": total_runs, "failed": failed_runs},
        "messages": {"synced": total_synced, "errors": total_errors},
        "servers": {"total": total_servers},
        "daily": [dict(d) for d in daily],
        "recent_runs": [dict(r) for r in recent],
    }


# ---------------------------------------------------------------------------
# Frontend
# ---------------------------------------------------------------------------

app.mount("/static", StaticFiles(directory="static"), name="static")


# Catch-all route; it is registered last so the API routes above take
# precedence, and every other GET path receives the SPA entry page.
@app.get("/{full_path:path}", response_class=HTMLResponse)
async def serve_spa(full_path: str):
    """Serve ``static/index.html`` for any path not matched earlier."""
    with open("static/index.html", "r") as f:
        return HTMLResponse(f.read())