#!/usr/bin/env python3
"""
ImapSync Worker
---------------
Polls SQLite for queued jobs, executes imapsync, parses output,
updates run records and job status. Also handles cron schedules.
"""

import logging
import os
import re
import sqlite3
import subprocess
import time
from contextlib import closing
from datetime import datetime, timezone

DB_PATH = os.environ.get("DB_PATH", "/data/imapsync.db")
LOG_DIR = os.environ.get("LOG_DIR", "/data/logs")
POLL_INTERVAL = int(os.environ.get("POLL_INTERVAL", "15"))

# Seconds between cancellation/timeout checks while imapsync is running.
CANCEL_POLL_SEC = 3
# Hard upper bound on a single imapsync run (2 hours).
JOB_TIMEOUT_SEC = 7200

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [WORKER] %(levelname)s %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
log = logging.getLogger(__name__)

# The label may be padded with spaces before the colon, so whitespace is
# allowed on both sides of it.
_TRANSFERRED_RE = re.compile(r"Messages transferred\s*:\s*(\d+)")
_SKIPPED_RE = re.compile(r"Messages skipped\s*:\s*(\d+)")
_ERROR_LINE_RE = re.compile(r"(?i)^\s*(error|err)\b", re.MULTILINE)

# Command-line flags whose following argument is a secret.
_SECRET_FLAGS = frozenset({"--password1", "--password2"})


def _utcnow() -> datetime:
    """Return the current UTC time as a naive datetime.

    Produces the same value as the deprecated ``datetime.utcnow()``. The
    result is kept naive so it stays comparable with the naive ISO strings
    this module stores in ``sync_jobs.last_run``.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


def _redact_command(cmd):
    """Return a copy of *cmd* with every password argument replaced by ``***``."""
    redacted = []
    hide_next = False
    for arg in cmd:
        if hide_next:
            redacted.append("***")
            hide_next = False
            continue
        redacted.append(arg)
        hide_next = arg in _SECRET_FLAGS
    return redacted


def _append_to_log(log_path: str, text: str) -> None:
    """Append *text* to the job log, logging (not raising) on I/O failure.

    Best-effort on purpose: a failed note must never prevent the final
    status update in the database.
    """
    try:
        with open(log_path, "a") as lf:
            lf.write(text)
    except OSError as e:
        log.error(f"Could not append to log {log_path}: {e}")


def get_db():
    """Open a new SQLite connection to ``DB_PATH``.

    Rows are returned as ``sqlite3.Row`` and the journal mode is set to WAL.
    The caller is responsible for closing the connection.
    """
    conn = sqlite3.connect(DB_PATH, timeout=30, check_same_thread=False)
    conn.row_factory = sqlite3.Row
    conn.execute("PRAGMA journal_mode=WAL")
    return conn


def _job_status(job_id):
    """Return the current ``sync_jobs.status`` for *job_id*, or None if absent."""
    with closing(get_db()) as conn:
        row = conn.execute(
            "SELECT status FROM sync_jobs WHERE id=?", (job_id,)
        ).fetchone()
    return row["status"] if row else None


def parse_imapsync_output(text: str) -> dict:
    """Extract run statistics from imapsync's textual output.

    Returns a dict with the keys ``messages_synced``, ``messages_skipped``
    and ``errors``. A counter stays 0 when its line is not found; ``errors``
    is the number of lines that start with "error" or "err" (any case).
    """
    stats = {"messages_synced": 0, "messages_skipped": 0, "errors": 0}
    m = _TRANSFERRED_RE.search(text)
    if m:
        stats["messages_synced"] = int(m.group(1))
    m = _SKIPPED_RE.search(text)
    if m:
        stats["messages_skipped"] = int(m.group(1))
    stats["errors"] = len(_ERROR_LINE_RE.findall(text))
    return stats


def check_due_schedules():
    """Queue every enabled, idle job whose cron schedule has come due.

    A job is due when the most recent cron occurrence before now is later
    than its ``last_run``. Does nothing if ``croniter`` is not installed.
    """
    try:
        from croniter import croniter
    except ImportError:
        return

    with closing(get_db()) as conn:
        now = _utcnow()
        rows = conn.execute(
            "SELECT id, schedule, last_run FROM sync_jobs "
            "WHERE enabled=1 AND schedule IS NOT NULL AND schedule != '' "
            "AND status NOT IN ('running', 'queued')"
        ).fetchall()
        for row in rows:
            try:
                # Explicit UTC base so prev_due uses the same clock as last_run.
                cron = croniter(row["schedule"], now)
                last_run = (
                    datetime.fromisoformat(row["last_run"])
                    if row["last_run"]
                    else datetime(2000, 1, 1)
                )
                prev_due = cron.get_prev(datetime)
                if prev_due > last_run:
                    conn.execute(
                        "UPDATE sync_jobs SET status='queued' WHERE id=?",
                        (row["id"],),
                    )
                    log.info(f"Job {row['id']} scheduled run queued (cron: {row['schedule']})")
            except Exception as e:
                # One bad schedule must not block the remaining jobs.
                log.warning(f"Cron parse error for job {row['id']}: {e}")
        conn.commit()


def run_job(job: sqlite3.Row):
    """Run one sync job with imapsync and record the outcome.

    Marks the job as running, creates a ``job_runs`` row, and launches
    imapsync with its output redirected to a per-run log file. While the
    process runs, the job status is polled for a cancel request and the
    timeout is enforced. Finally the run's statistics and status are stored
    and the job is reset to 'idle'.

    *job* must provide the job columns plus the joined ``src_*``/``dst_*``
    server fields selected in ``main``.
    """
    job_id = job["id"]
    log.info(f"Starting job {job_id}: {job['name']}")

    timestamp = _utcnow().strftime("%Y%m%d_%H%M%S")
    log_filename = f"job_{job_id}_{timestamp}.log"
    log_path = os.path.join(LOG_DIR, log_filename)

    with closing(get_db()) as conn:
        conn.execute(
            "UPDATE sync_jobs SET status='running', last_run=? WHERE id=?",
            (_utcnow().isoformat(), job_id),
        )
        cur = conn.execute(
            "INSERT INTO job_runs (job_id, log_file, status) VALUES (?, ?, 'running')",
            (job_id, log_filename),
        )
        run_id = cur.lastrowid
        conn.commit()

    started = time.time()
    exit_code = 0
    output = ""
    cancelled = False
    proc = None

    try:
        ssl1 = "--ssl1" if job["src_ssl"] else "--nossl1"
        ssl2 = "--ssl2" if job["dst_ssl"] else "--nossl2"

        cmd = [
            "imapsync",
            "--host1", job["src_host"],
            "--port1", str(job["src_port"]),
            ssl1,
            "--user1", job["src_user"],
            "--password1", job["src_password"],
            "--host2", job["dst_host"],
            "--port2", str(job["dst_port"]),
            ssl2,
            "--user2", job["dst_user"],
            "--password2", job["dst_password"],
            "--nolog",
        ]
        if job["extra_args"]:
            # NOTE: plain whitespace split; shell-style quoting is not honoured.
            cmd += job["extra_args"].split()

        os.makedirs(LOG_DIR, exist_ok=True)
        with open(log_path, "w") as lf:
            lf.write(f"# ImapSync Job: {job['name']}\n")
            lf.write(f"# Started: {_utcnow().isoformat()}\n")
            # Passwords are masked so credentials never land in the log file.
            lf.write(f"# Command: {' '.join(_redact_command(cmd[:20]))}...\n\n")
            lf.flush()

            proc = subprocess.Popen(
                cmd,
                stdout=lf,
                stderr=subprocess.STDOUT,
            )

            while proc.poll() is None:
                if _job_status(job_id) == "cancelling":
                    log.info(f"Job {job_id} cancel requested, terminating process")
                    proc.terminate()
                    try:
                        proc.wait(timeout=10)
                    except subprocess.TimeoutExpired:
                        proc.kill()
                        proc.wait()
                    cancelled = True
                    break

                time.sleep(CANCEL_POLL_SEC)

                elapsed = time.time() - started
                if elapsed > JOB_TIMEOUT_SEC:
                    log.error(f"Job {job_id} timed out after 2h")
                    proc.kill()
                    proc.wait()
                    break

            if not cancelled and proc.returncode is not None:
                exit_code = proc.returncode

        with open(log_path, "r", errors="replace") as lf:
            output = lf.read()

    except Exception as e:
        log.error(f"Job {job_id} exception: {e}")
        exit_code = -2
        output = str(e)
        # Do not leave imapsync running once the job is reported as failed.
        if proc is not None and proc.poll() is None:
            proc.kill()
            proc.wait()
        _append_to_log(log_path, f"\n\nEXCEPTION: {e}\n")

    duration = int(time.time() - started)
    stats = parse_imapsync_output(output)

    # A cancel request may have arrived just as the process exited on its own.
    current_status = _job_status(job_id)
    if not cancelled and current_status == "cancelling":
        cancelled = True

    if cancelled:
        job_status = "cancelled"
        _append_to_log(log_path, "\n\nCANCELLED: Job was cancelled by user\n")
    elif exit_code == 0:
        job_status = "done"
    else:
        job_status = "failed"

    with closing(get_db()) as conn:
        conn.execute("""
            UPDATE job_runs SET
                status=?, finished_at=?, messages_synced=?,
                messages_skipped=?, errors=?, duration_sec=?
            WHERE id=? AND status != 'cancelled'
        """, (job_status, _utcnow().isoformat(),
              stats["messages_synced"], stats["messages_skipped"],
              stats["errors"], duration, run_id))

        if job_status == "cancelled":
            # The guarded UPDATE above skips rows already marked 'cancelled',
            # so the duration is stored separately.
            conn.execute(
                "UPDATE job_runs SET duration_sec=? WHERE id=?",
                (duration, run_id),
            )

        conn.execute(
            "UPDATE sync_jobs SET status='idle' WHERE id=?", (job_id,)
        )
        conn.commit()

    log.info(
        f"Job {job_id} finished [{job_status}] in {duration}s — "
        f"synced={stats['messages_synced']} skipped={stats['messages_skipped']} "
        f"errors={stats['errors']}"
    )


def recover_stale_jobs():
    """Reset jobs left 'running' or 'cancelling' by a previous worker process.

    Their open ``job_runs`` rows are marked 'failed' and the jobs return to
    'idle' so they can be queued again.
    """
    with closing(get_db()) as conn:
        stale = conn.execute(
            "SELECT id FROM sync_jobs WHERE status IN ('running', 'cancelling')"
        ).fetchall()
        for row in stale:
            job_id = row["id"]
            conn.execute(
                "UPDATE job_runs SET status='failed', finished_at=? "
                "WHERE job_id=? AND status='running'",
                (_utcnow().isoformat(), job_id),
            )
            conn.execute(
                "UPDATE sync_jobs SET status='idle' WHERE id=?", (job_id,)
            )
            log.warning(f"Recovered stale job {job_id} → idle (worker restart)")
        conn.commit()


def main():
    """Worker entry point: wait for the DB, recover stale jobs, then poll forever."""
    log.info(f"Worker started. DB={DB_PATH} LOG_DIR={LOG_DIR} POLL={POLL_INTERVAL}s")

    os.makedirs(LOG_DIR, exist_ok=True)

    # Wait up to about a minute for the database file to appear.
    for i in range(30):
        if os.path.exists(DB_PATH):
            break
        log.info(f"Waiting for DB... ({i+1}/30)")
        time.sleep(2)

    recover_stale_jobs()

    while True:
        try:
            check_due_schedules()

            with closing(get_db()) as conn:
                job = conn.execute("""
                    SELECT j.*,
                           s1.host as src_host, s1.port as src_port, s1.ssl as src_ssl,
                           s2.host as dst_host, s2.port as dst_port, s2.ssl as dst_ssl
                    FROM sync_jobs j
                    JOIN servers s1 ON j.src_server_id = s1.id
                    JOIN servers s2 ON j.dst_server_id = s2.id
                    WHERE j.status='queued' AND j.enabled=1
                    ORDER BY j.created_at ASC
                    LIMIT 1
                """).fetchone()

            if job:
                run_job(job)
            else:
                time.sleep(POLL_INTERVAL)

        except Exception as e:
            log.error(f"Worker loop error: {e}", exc_info=True)
            time.sleep(POLL_INTERVAL)


if __name__ == "__main__":
    main()