#!/usr/bin/env python3
"""
ImapSync Worker
---------------
Polls SQLite for queued jobs, executes imapsync, parses output,
updates run records and job status. Also handles cron schedules.
"""

import os
import re
import sqlite3
import subprocess
import time
import logging
from contextlib import closing
from datetime import datetime

DB_PATH = os.environ.get("DB_PATH", "/data/imapsync.db")
LOG_DIR = os.environ.get("LOG_DIR", "/data/logs")
POLL_INTERVAL = int(os.environ.get("POLL_INTERVAL", "15"))

# Hard upper bound for a single imapsync run (2 hours).
JOB_TIMEOUT_SEC = 7200

# Flags whose following argument is a secret and must never reach a log file.
PASSWORD_FLAGS = frozenset({"--password1", "--password2"})
REDACTED = "****"

# Patterns used to pull statistics out of the imapsync output.
_TRANSFERRED_RE = re.compile(r"Messages transferred:\s+(\d+)")
_SKIPPED_RE = re.compile(r"Messages skipped:\s+(\d+)")
_ERROR_LINE_RE = re.compile(r"(?i)^\s*(error|err)\b", re.MULTILINE)

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [WORKER] %(levelname)s %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
log = logging.getLogger(__name__)

# Best effort at import time so that merely importing this module cannot
# crash; main() repeats the call without suppression so the worker itself
# still fails fast when the log directory is unusable.
try:
    os.makedirs(LOG_DIR, exist_ok=True)
except OSError as exc:
    log.warning("Could not create LOG_DIR %s at import time: %s", LOG_DIR, exc)


def get_db():
    """Open a new SQLite connection to DB_PATH.

    Rows are returned as ``sqlite3.Row`` (access by column name) and the
    database is switched to WAL journal mode. The caller owns the connection
    and is responsible for closing it.
    """
    conn = sqlite3.connect(DB_PATH, timeout=30, check_same_thread=False)
    conn.row_factory = sqlite3.Row
    conn.execute("PRAGMA journal_mode=WAL")
    return conn


def parse_imapsync_output(text: str) -> dict:
    """Extract stats from imapsync stdout/stderr.

    Returns a dict with the integer keys ``messages_synced``,
    ``messages_skipped`` and ``errors``. Counters that cannot be found in
    *text* stay at 0. ``errors`` is the number of lines that start (after
    optional whitespace) with the word "error" or "err", case-insensitively.
    """
    stats = {"messages_synced": 0, "messages_skipped": 0, "errors": 0}

    m = _TRANSFERRED_RE.search(text)
    if m:
        stats["messages_synced"] = int(m.group(1))

    m = _SKIPPED_RE.search(text)
    if m:
        stats["messages_skipped"] = int(m.group(1))

    # Count error lines
    stats["errors"] = len(_ERROR_LINE_RE.findall(text))
    return stats


def redact_command(cmd):
    """Return a copy of *cmd* with every password value replaced by a mask.

    The argument that follows any flag in PASSWORD_FLAGS is replaced with
    REDACTED; all other arguments are copied unchanged. *cmd* itself is not
    modified.
    """
    redacted = []
    hide_next = False
    for arg in cmd:
        if hide_next:
            redacted.append(REDACTED)
            hide_next = False
            continue
        redacted.append(arg)
        hide_next = arg in PASSWORD_FLAGS
    return redacted


def check_due_schedules():
    """Queue jobs whose most recent cron occurrence is newer than their last run.

    Only enabled jobs with a non-empty schedule that are neither running nor
    already queued are considered. Does nothing when croniter is not
    installed. A schedule that fails to parse is logged and skipped so it
    cannot block the other jobs.
    """
    try:
        from croniter import croniter
    except ImportError:
        return  # croniter not installed in this image, skip

    now = datetime.utcnow()
    with closing(get_db()) as conn:
        rows = conn.execute(
            "SELECT id, schedule, last_run FROM sync_jobs "
            "WHERE enabled=1 AND schedule IS NOT NULL AND schedule != '' "
            "AND status NOT IN ('running', 'queued')"
        ).fetchall()

        for row in rows:
            try:
                # Anchor the cron iterator explicitly at "now" (naive UTC),
                # the same clock that last_run is stored in by run_job().
                cron = croniter(row["schedule"], now)
                last_run = (
                    datetime.fromisoformat(row["last_run"])
                    if row["last_run"]
                    else datetime(2000, 1, 1)
                )
                prev_due = cron.get_prev(datetime)
                # If last scheduled run is after last actual run, queue it
                if prev_due > last_run:
                    conn.execute(
                        "UPDATE sync_jobs SET status='queued' WHERE id=?",
                        (row["id"],)
                    )
                    log.info(f"Job {row['id']} scheduled run queued (cron: {row['schedule']})")
            except Exception as e:
                log.warning(f"Cron parse error for job {row['id']}: {e}")

        conn.commit()


def _build_command(job):
    """Build the imapsync argument list for *job* (a sync_jobs row)."""
    ssl1 = "--ssl1" if job["src_ssl"] else "--nossl1"
    ssl2 = "--ssl2" if job["dst_ssl"] else "--nossl2"

    cmd = [
        "imapsync",
        "--host1", job["src_host"],
        "--port1", str(job["src_port"]),
        ssl1,
        "--user1", job["src_user"],
        "--password1", job["src_password"],
        "--host2", job["dst_host"],
        "--port2", str(job["dst_port"]),
        ssl2,
        "--user2", job["dst_user"],
        "--password2", job["dst_password"],
        "--nolog",
    ]
    if job["extra_args"]:
        # Plain whitespace splitting: an extra argument cannot contain spaces,
        # and quotes or backslashes are passed through to imapsync verbatim.
        cmd += job["extra_args"].split()
    return cmd


def _append_log(log_path, text):
    """Append *text* to the job log; never raise, so the job can be finalised."""
    try:
        with open(log_path, "a", encoding="utf-8") as lf:
            lf.write(text)
    except OSError as exc:
        log.warning("Could not append to %s: %s", log_path, exc)


def run_job(job: sqlite3.Row) -> None:
    """Run imapsync for one sync_jobs row and record the result.

    Marks the job as running and inserts a job_runs row, runs imapsync with
    its combined stdout/stderr redirected to a per-run file in LOG_DIR, then
    stores the parsed statistics on the job_runs row and resets the job to
    'idle'. The run is recorded as 'done' when imapsync exits with 0 and as
    'failed' otherwise (including a timeout or a failure to start it).
    """
    job_id = job["id"]
    log.info(f"Starting job {job_id}: {job['name']}")

    timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
    log_filename = f"job_{job_id}_{timestamp}.log"
    log_path = os.path.join(LOG_DIR, log_filename)

    with closing(get_db()) as conn:
        conn.execute(
            "UPDATE sync_jobs SET status='running', last_run=? WHERE id=?",
            (datetime.utcnow().isoformat(), job_id)
        )
        cur = conn.execute(
            "INSERT INTO job_runs (job_id, log_file, status) VALUES (?, ?, 'running')",
            (job_id, log_filename)
        )
        run_id = cur.lastrowid
        conn.commit()

    started = time.monotonic()
    exit_code = 0
    output = ""

    try:
        # Built inside the try so that a malformed job row is recorded as a
        # failed run instead of leaving the job stuck in 'running'.
        cmd = _build_command(job)

        with open(log_path, "w", encoding="utf-8") as lf:
            lf.write(f"# ImapSync Job: {job['name']}\n")
            lf.write(f"# Started: {datetime.utcnow().isoformat()}\n")
            # Passwords are masked before the command line is logged.
            lf.write(f"# Command: {' '.join(redact_command(cmd)[:20])}...\n\n")
            # The child writes straight to the file descriptor; flush our
            # buffered header first or it would end up after the child output.
            lf.flush()
            result = subprocess.run(
                cmd,
                stdout=lf,
                stderr=subprocess.STDOUT,
                timeout=JOB_TIMEOUT_SEC,  # 2h max
            )
            exit_code = result.returncode

        with open(log_path, "r", encoding="utf-8", errors="replace") as lf:
            output = lf.read()

    except subprocess.TimeoutExpired:
        log.error(f"Job {job_id} timed out after 2h")
        exit_code = -1
        output = "TIMEOUT after 2 hours"
        _append_log(log_path, "\n\nTIMEOUT: Job exceeded 2 hour limit\n")
    except Exception as e:
        log.error(f"Job {job_id} exception: {e}")
        exit_code = -2
        output = str(e)
        _append_log(log_path, f"\n\nEXCEPTION: {e}\n")

    duration = int(time.monotonic() - started)
    stats = parse_imapsync_output(output)
    job_status = "done" if exit_code == 0 else "failed"

    with closing(get_db()) as conn:
        conn.execute("""
            UPDATE job_runs SET
                status=?, finished_at=?, messages_synced=?,
                messages_skipped=?, errors=?, duration_sec=?
            WHERE id=?
        """, (job_status, datetime.utcnow().isoformat(),
              stats["messages_synced"], stats["messages_skipped"],
              stats["errors"], duration, run_id))
        # The job itself always returns to 'idle'; the outcome of this run
        # lives on the job_runs row.
        conn.execute(
            "UPDATE sync_jobs SET status=? WHERE id=?",
            ("idle", job_id)
        )
        conn.commit()

    log.info(
        f"Job {job_id} finished [{job_status}] in {duration}s — "
        f"synced={stats['messages_synced']} skipped={stats['messages_skipped']} "
        f"errors={stats['errors']}"
    )


def main():
    """Worker entry point: wait for the DB file, then poll for jobs forever.

    Each iteration first queues scheduled jobs that are due, then runs the
    oldest queued, enabled job. When no job is queued, or when an iteration
    fails, the loop sleeps for POLL_INTERVAL seconds.
    """
    log.info(f"Worker started. DB={DB_PATH} LOG_DIR={LOG_DIR} POLL={POLL_INTERVAL}s")

    # Fail fast here (unlike the best-effort attempt at import time).
    os.makedirs(LOG_DIR, exist_ok=True)

    # Wait for DB to be initialized by the web container
    for i in range(30):
        if os.path.exists(DB_PATH):
            break
        log.info(f"Waiting for DB... ({i+1}/30)")
        time.sleep(2)
    else:
        log.warning("DB file %s still missing after waiting; continuing anyway", DB_PATH)

    while True:
        try:
            check_due_schedules()

            with closing(get_db()) as conn:
                job = conn.execute(
                    "SELECT * FROM sync_jobs WHERE status='queued' AND enabled=1 "
                    "ORDER BY created_at ASC LIMIT 1"
                ).fetchone()

            if job:
                run_job(job)
            else:
                time.sleep(POLL_INTERVAL)

        except Exception as e:
            log.error(f"Worker loop error: {e}", exc_info=True)
            time.sleep(POLL_INTERVAL)


if __name__ == "__main__":
    main()