From f600cd52f3dd0f0f6941a8bf440c262e0ddb44eb Mon Sep 17 00:00:00 2001 From: Sebastian Serfling Date: Wed, 22 Apr 2026 14:07:13 +0200 Subject: [PATCH] fix: recover stale jobs on worker restart, persist active page on reload - Worker resets running/cancelling jobs to idle on startup to fix jobs stuck after Docker restart - Frontend saves current page to localStorage so reload returns to last visited page instead of always dashboard Co-Authored-By: Claude Sonnet 4.6 --- backend/main.py | 291 ++++++++++++++++------- backend/static/index.html | 484 ++++++++++++++++++++++++++++++-------- worker/Dockerfile | 45 +++- worker/worker.py | 106 +++++++-- 4 files changed, 721 insertions(+), 205 deletions(-) diff --git a/backend/main.py b/backend/main.py index cfbf6b8..9736108 100644 --- a/backend/main.py +++ b/backend/main.py @@ -13,7 +13,6 @@ from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from pydantic import BaseModel from jose import JWTError, jwt -# ── Config ────────────────────────────────────────────────────────────────── DB_PATH = os.environ.get("DB_PATH", "/data/imapsync.db") LOG_DIR = os.environ.get("LOG_DIR", "/data/logs") SECRET_KEY = os.environ.get("SECRET_KEY", "dev-secret-change-me") @@ -22,16 +21,33 @@ TOKEN_EXPIRE_HOURS = 12 os.makedirs(LOG_DIR, exist_ok=True) -# ── DB ─────────────────────────────────────────────────────────────────────── + def get_db(): conn = sqlite3.connect(DB_PATH, check_same_thread=False) conn.row_factory = sqlite3.Row conn.execute("PRAGMA journal_mode=WAL") return conn + def init_db(): conn = get_db() + + tables = [r[0] for r in conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall()] + needs_migration = 'servers' not in tables and 'sync_jobs' in tables + if needs_migration: + conn.executescript("DROP TABLE IF EXISTS job_runs; DROP TABLE IF EXISTS sync_jobs;") + conn.executescript(""" + CREATE TABLE IF NOT EXISTS servers ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + name TEXT NOT NULL, + host TEXT NOT NULL, + port INTEGER DEFAULT 993, + ssl INTEGER DEFAULT 1, + direction TEXT NOT NULL DEFAULT 'both', + created_at DATETIME DEFAULT CURRENT_TIMESTAMP + ); + CREATE TABLE IF NOT EXISTS users ( id INTEGER PRIMARY KEY AUTOINCREMENT, username TEXT UNIQUE NOT NULL, @@ -41,25 +57,23 @@ def init_db(): ); CREATE TABLE IF NOT EXISTS sync_jobs ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - name TEXT NOT NULL, - src_host TEXT NOT NULL, - src_port INTEGER DEFAULT 993, - src_ssl INTEGER DEFAULT 1, - src_user TEXT NOT NULL, - src_password TEXT NOT NULL, - dst_host TEXT NOT NULL, - dst_port INTEGER DEFAULT 993, - dst_ssl INTEGER DEFAULT 1, - dst_user TEXT NOT NULL, - dst_password TEXT NOT NULL, - extra_args TEXT DEFAULT '', - schedule TEXT DEFAULT NULL, - enabled INTEGER DEFAULT 1, - status TEXT DEFAULT 'idle', - last_run DATETIME, - created_at DATETIME DEFAULT CURRENT_TIMESTAMP, - created_by TEXT + id INTEGER PRIMARY KEY AUTOINCREMENT, + name TEXT NOT NULL, + src_server_id INTEGER NOT NULL, + src_user TEXT NOT NULL, + src_password TEXT NOT NULL, + dst_server_id INTEGER NOT NULL, + dst_user TEXT NOT NULL, + dst_password TEXT NOT NULL, + extra_args TEXT DEFAULT '', + schedule TEXT DEFAULT NULL, + enabled INTEGER DEFAULT 1, + status TEXT DEFAULT 'idle', + last_run DATETIME, + created_at DATETIME DEFAULT CURRENT_TIMESTAMP, + created_by TEXT, + FOREIGN KEY (src_server_id) REFERENCES servers(id), + FOREIGN KEY (dst_server_id) REFERENCES servers(id) ); CREATE TABLE IF NOT EXISTS job_runs ( @@ -82,7 +96,6 @@ def init_db(): expires_at DATETIME NOT NULL ); """) - # Create default admin if no users exist cur = conn.execute("SELECT COUNT(*) FROM users") if cur.fetchone()[0] == 0: pw_md5 = hashlib.md5("admin".encode()).hexdigest() @@ -93,91 +106,115 @@ def init_db(): conn.commit() conn.close() -# ── Auth ────────────────────────────────────────────────────────────────────── + def md5_hash(pw: str) -> str: return hashlib.md5(pw.encode()).hexdigest() + def create_token(username: str, role: str) -> str: expire = datetime.utcnow() + timedelta(hours=TOKEN_EXPIRE_HOURS) payload = {"sub": username, "role": role, "exp": expire} return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM) + def decode_token(token: str) -> dict: try: return jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) except JWTError: raise HTTPException(status_code=401, detail="Token ungültig oder abgelaufen") + security = HTTPBearer() + def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)): return decode_token(credentials.credentials) + def require_admin(user=Depends(get_current_user)): if user.get("role") != "admin": raise HTTPException(status_code=403, detail="Nur Admins erlaubt") return user -# ── App ─────────────────────────────────────────────────────────────────────── + +def require_editor(user=Depends(get_current_user)): + if user.get("role") == "viewer": + raise HTTPException(status_code=403, detail="Keine Berechtigung") + return user + + @asynccontextmanager async def lifespan(app: FastAPI): init_db() yield + app = FastAPI(title="ImapSync Manager", lifespan=lifespan) -# ── Schemas ─────────────────────────────────────────────────────────────────── + class LoginRequest(BaseModel): username: str password: str + class UserCreate(BaseModel): username: str password: str - role: str = "viewer" # admin | operator | viewer + role: str = "viewer" + class UserUpdate(BaseModel): password: Optional[str] = None role: Optional[str] = None + +class ServerCreate(BaseModel): + name: str + host: str + port: int = 993 + ssl: bool = True + direction: str = "both" + + +class ServerUpdate(BaseModel): + name: Optional[str] = None + host: Optional[str] = None + port: Optional[int] = None + ssl: Optional[bool] = None + direction: Optional[str] = None + + class SyncJobCreate(BaseModel): name: str - src_host: str - src_port: int = 993 - src_ssl: bool = True + src_server_id: int src_user: str src_password: str - dst_host: str - dst_port: int = 993 - dst_ssl: bool = True + dst_server_id: int dst_user: str dst_password: str extra_args: str = "" schedule: Optional[str] = None enabled: bool = True + class SyncJobUpdate(BaseModel): name: Optional[str] = None - src_host: Optional[str] = None - src_port: Optional[int] = None - src_ssl: Optional[bool] = None + src_server_id: Optional[int] = None src_user: Optional[str] = None src_password: Optional[str] = None - dst_host: Optional[str] = None - dst_port: Optional[int] = None - dst_ssl: Optional[bool] = None + dst_server_id: Optional[int] = None dst_user: Optional[str] = None dst_password: Optional[str] = None extra_args: Optional[str] = None schedule: Optional[str] = None enabled: Optional[bool] = None -# ── Health ──────────────────────────────────────────────────────────────────── + @app.get("/api/health") def health(): return {"status": "ok", "time": datetime.utcnow().isoformat()} -# ── Auth Endpoints ──────────────────────────────────────────────────────────── + @app.post("/api/auth/login") def login(req: LoginRequest): conn = get_db() @@ -191,11 +228,12 @@ def login(req: LoginRequest): token = create_token(row["username"], row["role"]) return {"token": token, "username": row["username"], "role": row["role"]} + @app.get("/api/auth/me") def me(user=Depends(get_current_user)): return user -# ── User Endpoints ──────────────────────────────────────────────────────────── + @app.get("/api/users") def list_users(user=Depends(require_admin)): conn = get_db() @@ -205,6 +243,7 @@ def list_users(user=Depends(require_admin)): conn.close() return [dict(r) for r in rows] + @app.post("/api/users", status_code=201) def create_user(req: UserCreate, user=Depends(require_admin)): conn = get_db() @@ -220,6 +259,7 @@ def create_user(req: UserCreate, user=Depends(require_admin)): conn.close() return {"message": f"Benutzer '{req.username}' erstellt"} + @app.put("/api/users/{user_id}") def update_user(user_id: int, req: UserUpdate, user=Depends(require_admin)): conn = get_db() @@ -241,6 +281,7 @@ def update_user(user_id: int, req: UserUpdate, user=Depends(require_admin)): conn.close() return {"message": "Benutzer aktualisiert"} + @app.delete("/api/users/{user_id}") def delete_user(user_id: int, user=Depends(require_admin)): conn = get_db() @@ -254,15 +295,84 @@ def delete_user(user_id: int, user=Depends(require_admin)): conn.close() return {"message": "Benutzer gelöscht"} -# ── Sync Job Endpoints ──────────────────────────────────────────────────────── + +@app.get("/api/servers") +def list_servers(user=Depends(get_current_user)): + conn = get_db() + rows = conn.execute(""" + SELECT s.*, + (SELECT COUNT(*) FROM sync_jobs j WHERE j.src_server_id = s.id) as used_as_src, + (SELECT COUNT(*) FROM sync_jobs j WHERE j.dst_server_id = s.id) as used_as_dst + FROM servers s ORDER BY s.name + """).fetchall() + conn.close() + return [dict(r) for r in rows] + + +@app.post("/api/servers", status_code=201) +def create_server(req: ServerCreate, user=Depends(require_editor)): + conn = get_db() + cur = conn.execute( + "INSERT INTO servers (name, host, port, ssl, direction) VALUES (?, ?, ?, ?, ?)", + (req.name, req.host, req.port, int(req.ssl), req.direction) + ) + conn.commit() + server_id = cur.lastrowid + conn.close() + return {"message": "Server erstellt", "id": server_id} + + +@app.put("/api/servers/{server_id}") +def update_server(server_id: int, req: ServerUpdate, user=Depends(require_editor)): + conn = get_db() + row = conn.execute("SELECT id FROM servers WHERE id = ?", (server_id,)).fetchone() + if not row: + raise HTTPException(status_code=404, detail="Server nicht gefunden") + fields = {k: v for k, v in req.model_dump().items() if v is not None} + if "ssl" in fields: + fields["ssl"] = int(fields["ssl"]) + if fields: + set_clause = ", ".join(f"{k} = ?" for k in fields) + conn.execute( + f"UPDATE servers SET {set_clause} WHERE id = ?", + (*fields.values(), server_id) + ) + conn.commit() + conn.close() + return {"message": "Server aktualisiert"} + + +@app.delete("/api/servers/{server_id}") +def delete_server(server_id: int, user=Depends(require_editor)): + conn = get_db() + row = conn.execute("SELECT id FROM servers WHERE id = ?", (server_id,)).fetchone() + if not row: + raise HTTPException(status_code=404, detail="Server nicht gefunden") + in_use = conn.execute( + "SELECT COUNT(*) FROM sync_jobs WHERE src_server_id = ? OR dst_server_id = ?", + (server_id, server_id) + ).fetchone()[0] + if in_use > 0: + raise HTTPException(status_code=409, detail="Server wird noch in Jobs verwendet") + conn.execute("DELETE FROM servers WHERE id = ?", (server_id,)) + conn.commit() + conn.close() + return {"message": "Server gelöscht"} + + @app.get("/api/jobs") def list_jobs(user=Depends(get_current_user)): conn = get_db() rows = conn.execute(""" SELECT j.*, + s1.name as src_server_name, s1.host as src_host, s1.port as src_port, s1.ssl as src_ssl, + s2.name as dst_server_name, s2.host as dst_host, s2.port as dst_port, s2.ssl as dst_ssl, (SELECT COUNT(*) FROM job_runs r WHERE r.job_id = j.id) as run_count, (SELECT SUM(messages_synced) FROM job_runs r WHERE r.job_id = j.id) as total_synced - FROM sync_jobs j ORDER BY j.created_at DESC + FROM sync_jobs j + LEFT JOIN servers s1 ON j.src_server_id = s1.id + LEFT JOIN servers s2 ON j.dst_server_id = s2.id + ORDER BY j.created_at DESC """).fetchall() conn.close() result = [] @@ -273,29 +383,41 @@ def list_jobs(user=Depends(get_current_user)): result.append(d) return result + @app.post("/api/jobs", status_code=201) -def create_job(req: SyncJobCreate, user=Depends(get_current_user)): - if user.get("role") == "viewer": - raise HTTPException(status_code=403, detail="Keine Berechtigung") +def create_job(req: SyncJobCreate, user=Depends(require_editor)): conn = get_db() + for sid in [req.src_server_id, req.dst_server_id]: + if not conn.execute("SELECT id FROM servers WHERE id = ?", (sid,)).fetchone(): + conn.close() + raise HTTPException(status_code=404, detail=f"Server-ID {sid} nicht gefunden") cur = conn.execute(""" INSERT INTO sync_jobs - (name,src_host,src_port,src_ssl,src_user,src_password, - dst_host,dst_port,dst_ssl,dst_user,dst_password, - extra_args,schedule,enabled,created_by) - VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?) - """, (req.name, req.src_host, req.src_port, int(req.src_ssl), req.src_user, req.src_password, - req.dst_host, req.dst_port, int(req.dst_ssl), req.dst_user, req.dst_password, + (name, src_server_id, src_user, src_password, + dst_server_id, dst_user, dst_password, + extra_args, schedule, enabled, created_by) + VALUES (?,?,?,?,?,?,?,?,?,?,?) + """, (req.name, req.src_server_id, req.src_user, req.src_password, + req.dst_server_id, req.dst_user, req.dst_password, req.extra_args, req.schedule, int(req.enabled), user["sub"])) conn.commit() job_id = cur.lastrowid conn.close() return {"message": "Job erstellt", "id": job_id} + @app.get("/api/jobs/{job_id}") def get_job(job_id: int, user=Depends(get_current_user)): conn = get_db() - row = conn.execute("SELECT * FROM sync_jobs WHERE id = ?", (job_id,)).fetchone() + row = conn.execute(""" + SELECT j.*, + s1.name as src_server_name, s1.host as src_host, s1.port as src_port, s1.ssl as src_ssl, + s2.name as dst_server_name, s2.host as dst_host, s2.port as dst_port, s2.ssl as dst_ssl + FROM sync_jobs j + LEFT JOIN servers s1 ON j.src_server_id = s1.id + LEFT JOIN servers s2 ON j.dst_server_id = s2.id + WHERE j.id = ? + """, (job_id,)).fetchone() conn.close() if not row: raise HTTPException(status_code=404, detail="Job nicht gefunden") @@ -305,21 +427,24 @@ def get_job(job_id: int, user=Depends(get_current_user)): d.pop("dst_password", None) return d + @app.put("/api/jobs/{job_id}") -def update_job(job_id: int, req: SyncJobUpdate, user=Depends(get_current_user)): - if user.get("role") == "viewer": - raise HTTPException(status_code=403, detail="Keine Berechtigung") +def update_job(job_id: int, req: SyncJobUpdate, user=Depends(require_editor)): conn = get_db() row = conn.execute("SELECT id FROM sync_jobs WHERE id = ?", (job_id,)).fetchone() if not row: raise HTTPException(status_code=404, detail="Job nicht gefunden") fields = {k: v for k, v in req.model_dump().items() if v is not None} - if "src_ssl" in fields: - fields["src_ssl"] = int(fields["src_ssl"]) - if "dst_ssl" in fields: - fields["dst_ssl"] = int(fields["dst_ssl"]) if "enabled" in fields: fields["enabled"] = int(fields["enabled"]) + if "src_server_id" in fields: + if not conn.execute("SELECT id FROM servers WHERE id = ?", (fields["src_server_id"],)).fetchone(): + conn.close() + raise HTTPException(status_code=404, detail="Quell-Server nicht gefunden") + if "dst_server_id" in fields: + if not conn.execute("SELECT id FROM servers WHERE id = ?", (fields["dst_server_id"],)).fetchone(): + conn.close() + raise HTTPException(status_code=404, detail="Ziel-Server nicht gefunden") if fields: set_clause = ", ".join(f"{k} = ?" for k in fields) conn.execute( @@ -330,10 +455,9 @@ def update_job(job_id: int, req: SyncJobUpdate, user=Depends(get_current_user)): conn.close() return {"message": "Job aktualisiert"} + @app.delete("/api/jobs/{job_id}") -def delete_job(job_id: int, user=Depends(get_current_user)): - if user.get("role") == "viewer": - raise HTTPException(status_code=403, detail="Keine Berechtigung") +def delete_job(job_id: int, user=Depends(require_editor)): conn = get_db() conn.execute("DELETE FROM job_runs WHERE job_id = ?", (job_id,)) conn.execute("DELETE FROM sync_jobs WHERE id = ?", (job_id,)) @@ -341,10 +465,9 @@ def delete_job(job_id: int, user=Depends(get_current_user)): conn.close() return {"message": "Job gelöscht"} + @app.post("/api/jobs/{job_id}/trigger") -def trigger_job(job_id: int, user=Depends(get_current_user)): - if user.get("role") == "viewer": - raise HTTPException(status_code=403, detail="Keine Berechtigung") +def trigger_job(job_id: int, user=Depends(require_editor)): conn = get_db() row = conn.execute("SELECT status FROM sync_jobs WHERE id = ?", (job_id,)).fetchone() if not row: @@ -356,20 +479,27 @@ def trigger_job(job_id: int, user=Depends(get_current_user)): conn.close() return {"message": "Job in Warteschlange eingereiht"} + @app.post("/api/jobs/{job_id}/stop") -def stop_job(job_id: int, user=Depends(get_current_user)): - if user.get("role") == "viewer": - raise HTTPException(status_code=403, detail="Keine Berechtigung") +def stop_job(job_id: int, user=Depends(require_editor)): conn = get_db() - conn.execute( - "UPDATE sync_jobs SET status = 'idle' WHERE id = ? AND status = 'queued'", - (job_id,) - ) + row = conn.execute("SELECT status FROM sync_jobs WHERE id = ?", (job_id,)).fetchone() + if not row: + conn.close() + raise HTTPException(status_code=404, detail="Job nicht gefunden") + if row["status"] == "queued": + conn.execute("UPDATE sync_jobs SET status = 'idle' WHERE id = ?", (job_id,)) + elif row["status"] == "running": + conn.execute("UPDATE sync_jobs SET status = 'cancelling' WHERE id = ?", (job_id,)) + conn.execute(""" + UPDATE job_runs SET status = 'cancelled', finished_at = ? + WHERE job_id = ? AND status = 'running' + """, (datetime.utcnow().isoformat(), job_id)) conn.commit() conn.close() - return {"message": "Job aus Warteschlange entfernt (wenn möglich)"} + return {"message": "Job wird abgebrochen"} + -# ── Run / Log Endpoints ─────────────────────────────────────────────────────── @app.get("/api/jobs/{job_id}/runs") def get_job_runs(job_id: int, limit: int = 50, user=Depends(get_current_user)): conn = get_db() @@ -380,6 +510,7 @@ def get_job_runs(job_id: int, limit: int = 50, user=Depends(get_current_user)): conn.close() return [dict(r) for r in rows] + @app.get("/api/runs/{run_id}/log") def get_run_log(run_id: int, user=Depends(get_current_user)): conn = get_db() @@ -393,7 +524,7 @@ def get_run_log(run_id: int, user=Depends(get_current_user)): with open(log_path, "r", errors="replace") as f: return {"content": f.read()} -# ── Stats Endpoint ──────────────────────────────────────────────────────────── + @app.get("/api/stats") def get_stats(user=Depends(get_current_user)): conn = get_db() @@ -406,7 +537,6 @@ def get_stats(user=Depends(get_current_user)): total_synced = conn.execute("SELECT COALESCE(SUM(messages_synced),0) FROM job_runs").fetchone()[0] total_errors = conn.execute("SELECT COALESCE(SUM(errors),0) FROM job_runs").fetchone()[0] - # Last 14 days activity daily = conn.execute(""" SELECT DATE(started_at) as day, COUNT(*) as runs, @@ -417,7 +547,6 @@ def get_stats(user=Depends(get_current_user)): GROUP BY day ORDER BY day """).fetchall() - # Recent runs recent = conn.execute(""" SELECT r.id, r.job_id, j.name as job_name, r.started_at, r.finished_at, r.status, r.messages_synced, r.errors, r.duration_sec @@ -426,18 +555,22 @@ def get_stats(user=Depends(get_current_user)): ORDER BY r.started_at DESC LIMIT 10 """).fetchall() + total_servers = conn.execute("SELECT COUNT(*) FROM servers").fetchone()[0] + conn.close() return { "jobs": {"total": total_jobs, "active": active_jobs, "running": running, "queued": queued}, "runs": {"total": total_runs, "failed": failed_runs}, "messages": {"synced": total_synced, "errors": total_errors}, + "servers": {"total": total_servers}, "daily": [dict(d) for d in daily], "recent_runs": [dict(r) for r in recent], } -# ── Static Frontend ─────────────────────────────────────────────────────────── + app.mount("/static", StaticFiles(directory="static"), name="static") + @app.get("/{full_path:path}", response_class=HTMLResponse) async def serve_spa(full_path: str): with open("static/index.html", "r") as f: diff --git a/backend/static/index.html b/backend/static/index.html index d093981..bc69131 100644 --- a/backend/static/index.html +++ b/backend/static/index.html @@ -33,7 +33,7 @@ background: var(--bg); color: var(--text); min-height: 100vh; - font-size: 14px; + font-size: 15px; } /* ── Login ── */ @@ -130,10 +130,10 @@ display: flex; align-items: center; gap: 10px; - padding: 9px 20px; + padding: 10px 20px; color: var(--muted); text-decoration: none; - font-size: 13px; + font-size: 14px; font-weight: 500; border-left: 2px solid transparent; transition: all 0.15s; @@ -170,7 +170,7 @@ flex-shrink: 0; } .user-info { flex: 1; min-width: 0; } - .user-info .name { font-size: 12px; font-weight: 500; white-space: nowrap; overflow: hidden; text-overflow: ellipsis; } + .user-info .name { font-size: 14px; font-weight: 500; white-space: nowrap; overflow: hidden; text-overflow: ellipsis; } .user-info .role { font-size: 10px; font-family: var(--mono); color: var(--muted); text-transform: uppercase; letter-spacing: 1px; } .logout-btn { background: none; border: none; color: var(--muted); cursor: pointer; @@ -182,7 +182,7 @@ /* ── Main Content ── */ .main { flex: 1; display: flex; flex-direction: column; overflow: hidden; } .topbar { - height: 52px; + height: 54px; border-bottom: 1px solid var(--border); display: flex; align-items: center; @@ -192,7 +192,7 @@ } .page-title { flex: 1; - font-size: 14px; + font-size: 15px; font-weight: 600; font-family: var(--mono); letter-spacing: 0.5px; @@ -223,7 +223,7 @@ } .stat-card .value { font-family: var(--mono); - font-size: 28px; + font-size: 30px; font-weight: 600; line-height: 1; } @@ -261,7 +261,7 @@ table { width: 100%; border-collapse: collapse; } th { text-align: left; - padding: 10px 18px; + padding: 9px 18px; font-family: var(--mono); font-size: 10px; letter-spacing: 1.5px; @@ -270,7 +270,7 @@ border-bottom: 1px solid var(--border2); font-weight: 500; } - td { padding: 11px 18px; border-bottom: 1px solid var(--border2); vertical-align: middle; } + td { padding: 10px 18px; border-bottom: 1px solid var(--border2); vertical-align: middle; } tr:last-child td { border-bottom: none; } tr:hover td { background: var(--surface2); } @@ -298,11 +298,18 @@ .badge.done .badge-dot { background: var(--accent); } .badge.failed { background: rgba(248,81,73,0.15); color: var(--danger); } .badge.failed .badge-dot { background: var(--danger); } + .badge.cancelled { background: rgba(210,153,34,0.15); color: var(--warn); } + .badge.cancelled .badge-dot { background: var(--warn); } + .badge.cancelling { background: rgba(210,153,34,0.15); color: var(--warn); } + .badge.cancelling .badge-dot { background: var(--warn); animation: pulse 0.8s infinite; } .badge.admin { background: rgba(56,139,253,0.15); color: var(--blue); } .badge.operator { background: rgba(210,153,34,0.15); color: var(--warn); } .badge.viewer { background: rgba(125,133,144,0.15); color: var(--muted); } .badge.enabled { background: rgba(46,160,67,0.15); color: var(--accent); } .badge.disabled { background: rgba(125,133,144,0.15); color: var(--muted); } + .badge.source { background: rgba(56,139,253,0.15); color: var(--blue); } + .badge.destination { background: rgba(210,153,34,0.15); color: var(--warn); } + .badge.both { background: rgba(46,160,67,0.15); color: var(--accent); } /* ── Buttons ── */ .btn { @@ -313,7 +320,7 @@ border-radius: 5px; border: 1px solid transparent; font-family: var(--sans); - font-size: 13px; + font-size: 14px; font-weight: 500; cursor: pointer; transition: all 0.15s; @@ -326,14 +333,14 @@ .btn-outline:hover { background: var(--surface2); border-color: var(--muted); } .btn-danger { background: transparent; color: var(--danger); border-color: var(--danger-dim); } .btn-danger:hover { background: rgba(248,81,73,0.1); } - .btn-sm { padding: 4px 10px; font-size: 12px; } + .btn-sm { padding: 4px 10px; font-size: 13px; } .btn-icon { padding: 5px 8px; font-size: 14px; } /* ── Forms ── */ .form-grid { display: grid; grid-template-columns: 1fr 1fr; gap: 14px; } .form-group { display: flex; flex-direction: column; gap: 6px; } .form-group.full { grid-column: 1 / -1; } - label { font-size: 12px; font-weight: 500; color: var(--muted); font-family: var(--mono); letter-spacing: 0.5px; } + label { font-size: 13px; font-weight: 500; color: var(--muted); font-family: var(--mono); letter-spacing: 0.5px; } input, select, textarea { background: var(--bg); border: 1px solid var(--border); @@ -341,7 +348,7 @@ color: var(--text); padding: 8px 12px; font-family: var(--sans); - font-size: 13px; + font-size: 15px; transition: border-color 0.15s; width: 100%; } @@ -350,7 +357,7 @@ border-color: var(--blue); } input[type="checkbox"] { width: auto; } - textarea { resize: vertical; min-height: 80px; font-family: var(--mono); font-size: 12px; } + textarea { resize: vertical; min-height: 80px; font-family: var(--mono); font-size: 14px; } select option { background: var(--surface); } .form-actions { display: flex; gap: 10px; justify-content: flex-end; margin-top: 20px; padding-top: 16px; border-top: 1px solid var(--border2); } .section-divider { @@ -363,6 +370,45 @@ color: var(--muted); text-transform: uppercase; } + .section-divider-prominent { + grid-column: 1 / -1; + display: flex; + align-items: center; + gap: 0; + margin-top: 8px; + } + .section-divider-prominent::before, + .section-divider-prominent::after { + content: ''; + flex: 1; + height: 3px; + border-radius: 2px; + } + .section-divider-prominent.src::before, + .section-divider-prominent.src::after { background: var(--blue); } + .section-divider-prominent.dst::before, + .section-divider-prominent.dst::after { background: var(--danger); } + .section-divider-prominent .section-title { + padding: 0 14px; + font-family: var(--mono); + font-size: 14px; + font-weight: 600; + letter-spacing: 2px; + text-transform: uppercase; + white-space: nowrap; + } + .section-divider-prominent.src .section-title { color: var(--blue); } + .section-divider-prominent.dst .section-title { color: var(--danger); } + .server-info { + background: var(--surface2); + border: 1px solid var(--border2); + border-radius: 4px; + padding: 8px 12px; + font-family: var(--mono); + font-size: 11px; + color: var(--muted); + line-height: 1.6; + } /* ── Modal ── */ .modal-overlay { @@ -386,7 +432,7 @@ border-bottom: 1px solid var(--border2); display: flex; align-items: center; gap: 12px; } - .modal-header h3 { flex: 1; font-family: var(--mono); font-size: 14px; } + .modal-header h3 { flex: 1; font-family: var(--mono); font-size: 15px; } .modal-body { padding: 22px; } .modal-close { background: none; border: none; color: var(--muted); cursor: pointer; font-size: 18px; padding: 4px; } .modal-close:hover { color: var(--text); } @@ -443,7 +489,7 @@ /* ── Misc ── */ .mono { font-family: var(--mono); } .text-muted { color: var(--muted); } - .text-sm { font-size: 12px; } + .text-sm { font-size: 13px; } .flex { display: flex; align-items: center; } .gap-8 { gap: 8px; } .gap-12 { gap: 12px; } @@ -481,8 +527,8 @@ - -
+ +
Standard: admin / admin
@@ -501,6 +547,9 @@ Dashboard + + Server + Sync-Jobs @@ -538,10 +587,8 @@
diff --git a/worker/Dockerfile b/worker/Dockerfile index b5ea63a..cbdbf47 100644 --- a/worker/Dockerfile +++ b/worker/Dockerfile @@ -1,12 +1,51 @@ FROM debian:bookworm-slim -# Install imapsync and dependencies RUN apt-get update && apt-get install -y --no-install-recommends \ - imapsync \ + wget \ + git \ + lsb-release \ + libauthen-ntlm-perl \ + libdist-checkconflicts-perl \ + libpar-packer-perl \ + libtest-requires-perl \ + libtest-fatal-perl \ + libtest-mock-guard-perl \ + libcgi-pm-perl \ + libcrypt-openssl-rsa-perl \ + libdata-uniqid-perl \ + libencode-imaputf7-perl \ + libfile-copy-recursive-perl \ + libfile-tail-perl \ + libio-socket-inet6-perl \ + libio-socket-ssl-perl \ + libio-tee-perl \ + libhtml-parser-perl \ + libjson-webtoken-perl \ + libmail-imapclient-perl \ + libparse-recdescent-perl \ + libmodule-scandeps-perl \ + libreadonly-perl \ + libregexp-common-perl \ + libsys-meminfo-perl \ + libterm-readkey-perl \ + libtest-mockobject-perl \ + libtest-pod-perl \ + libunicode-string-perl \ + liburi-perl \ + libwww-perl \ + libtest-nowarnings-perl \ + libtest-deep-perl \ + libtest-warn-perl \ + make \ + cpanminus \ + gcc \ python3 \ - python3-pip \ + ca-certificates \ && rm -rf /var/lib/apt/lists/* +RUN wget -O /usr/local/bin/imapsync https://imapsync.lamiral.info/imapsync && \ + chmod +x /usr/local/bin/imapsync + WORKDIR /app COPY worker.py . diff --git a/worker/worker.py b/worker/worker.py index 698c068..4b7e13d 100644 --- a/worker/worker.py +++ b/worker/worker.py @@ -36,7 +36,6 @@ def get_db(): def parse_imapsync_output(text: str) -> dict: - """Extract stats from imapsync stdout/stderr.""" stats = {"messages_synced": 0, "messages_skipped": 0, "errors": 0} m = re.search(r"Messages transferred:\s+(\d+)", text) if m: @@ -44,17 +43,15 @@ def parse_imapsync_output(text: str) -> dict: m = re.search(r"Messages skipped:\s+(\d+)", text) if m: stats["messages_skipped"] = int(m.group(1)) - # Count error lines stats["errors"] = len(re.findall(r"(?i)^\s*(error|err)\b", text, re.MULTILINE)) return stats def check_due_schedules(): - """Queue jobs whose cron schedule is due (within last POLL_INTERVAL seconds).""" try: from croniter import croniter except ImportError: - return # croniter not installed in this image, skip + return conn = get_db() try: @@ -70,7 +67,6 @@ def check_due_schedules(): cron = croniter(row["schedule"]) last_run = datetime.fromisoformat(row["last_run"]) if row["last_run"] else datetime(2000, 1, 1) prev_due = cron.get_prev(datetime) - # If last scheduled run is after last actual run, queue it if prev_due > last_run: conn.execute( "UPDATE sync_jobs SET status='queued' WHERE id=?", @@ -103,7 +99,6 @@ def run_job(job: sqlite3.Row): conn.commit() conn.close() - # Build imapsync command ssl1 = "--ssl1" if job["src_ssl"] else "--nossl1" ssl2 = "--ssl2" if job["dst_ssl"] else "--nossl2" cmd = [ @@ -126,27 +121,44 @@ def run_job(job: sqlite3.Row): started = time.time() exit_code = 0 output = "" + cancelled = False try: with open(log_path, "w") as lf: lf.write(f"# ImapSync Job: {job['name']}\n") lf.write(f"# Started: {datetime.utcnow().isoformat()}\n") lf.write(f"# Command: {' '.join(cmd[:20])}...\n\n") - result = subprocess.run( + lf.flush() + proc = subprocess.Popen( cmd, stdout=lf, stderr=subprocess.STDOUT, - timeout=7200, # 2h max ) - exit_code = result.returncode + while proc.poll() is None: + conn2 = get_db() + row = conn2.execute("SELECT status FROM sync_jobs WHERE id=?", (job_id,)).fetchone() + conn2.close() + if row and row["status"] == "cancelling": + log.info(f"Job {job_id} cancel requested, terminating process") + proc.terminate() + try: + proc.wait(timeout=10) + except subprocess.TimeoutExpired: + proc.kill() + proc.wait() + cancelled = True + break + time.sleep(3) + elapsed = time.time() - started + if elapsed > 7200: + log.error(f"Job {job_id} timed out after 2h") + proc.kill() + proc.wait() + break + if not cancelled and proc.returncode is not None: + exit_code = proc.returncode with open(log_path, "r", errors="replace") as lf: output = lf.read() - except subprocess.TimeoutExpired: - log.error(f"Job {job_id} timed out after 2h") - exit_code = -1 - output = "TIMEOUT after 2 hours" - with open(log_path, "a") as lf: - lf.write("\n\nTIMEOUT: Job exceeded 2 hour limit\n") except Exception as e: log.error(f"Job {job_id} exception: {e}") exit_code = -2 @@ -156,20 +168,36 @@ def run_job(job: sqlite3.Row): duration = int(time.time() - started) stats = parse_imapsync_output(output) - job_status = "done" if exit_code == 0 else "failed" + + conn3 = get_db() + current_status = conn3.execute("SELECT status FROM sync_jobs WHERE id=?", (job_id,)).fetchone() + conn3.close() + if not cancelled and current_status and current_status["status"] == "cancelling": + cancelled = True + + if cancelled: + job_status = "cancelled" + with open(log_path, "a") as lf: + lf.write("\n\nCANCELLED: Job was cancelled by user\n") + elif exit_code == 0: + job_status = "done" + else: + job_status = "failed" conn = get_db() conn.execute(""" UPDATE job_runs SET status=?, finished_at=?, messages_synced=?, messages_skipped=?, errors=?, duration_sec=? - WHERE id=? + WHERE id=? AND status != 'cancelled' """, (job_status, datetime.utcnow().isoformat(), stats["messages_synced"], stats["messages_skipped"], stats["errors"], duration, run_id)) + if job_status == "cancelled": + conn.execute("UPDATE job_runs SET duration_sec=? WHERE id=?", (duration, run_id)) conn.execute( - "UPDATE sync_jobs SET status=? WHERE id=?", - ("idle", job_id) + "UPDATE sync_jobs SET status='idle' WHERE id=?", + (job_id,) ) conn.commit() conn.close() @@ -181,24 +209,54 @@ def run_job(job: sqlite3.Row): ) +def recover_stale_jobs(): + conn = get_db() + try: + stale = conn.execute( + "SELECT id FROM sync_jobs WHERE status IN ('running', 'cancelling')" + ).fetchall() + for row in stale: + job_id = row["id"] + conn.execute( + "UPDATE job_runs SET status='failed', finished_at=? " + "WHERE job_id=? AND status='running'", + (datetime.utcnow().isoformat(), job_id) + ) + conn.execute( + "UPDATE sync_jobs SET status='idle' WHERE id=?", + (job_id,) + ) + log.warning(f"Recovered stale job {job_id} → idle (worker restart)") + conn.commit() + finally: + conn.close() + + def main(): log.info(f"Worker started. DB={DB_PATH} LOG_DIR={LOG_DIR} POLL={POLL_INTERVAL}s") - # Wait for DB to be initialized by the web container for i in range(30): if os.path.exists(DB_PATH): break log.info(f"Waiting for DB... ({i+1}/30)") time.sleep(2) + recover_stale_jobs() + while True: try: check_due_schedules() conn = get_db() - job = conn.execute( - "SELECT * FROM sync_jobs WHERE status='queued' AND enabled=1 " - "ORDER BY created_at ASC LIMIT 1" - ).fetchone() + job = conn.execute(""" + SELECT j.*, + s1.host as src_host, s1.port as src_port, s1.ssl as src_ssl, + s2.host as dst_host, s2.port as dst_port, s2.ssl as dst_ssl + FROM sync_jobs j + JOIN servers s1 ON j.src_server_id = s1.id + JOIN servers s2 ON j.dst_server_id = s2.id + WHERE j.status='queued' AND j.enabled=1 + ORDER BY j.created_at ASC LIMIT 1 + """).fetchone() conn.close() if job: