Initial commit: Backup Restore Orchestrator

Windmill-Flow + restore.sh für das automatische tägliche Backup-Verifikationssystem.
Direkter Windmill-Sync via `wmill sync push` möglich.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Sebastian Serfling
2026-04-29 21:15:42 +02:00
commit 467eb35225
28 changed files with 2632 additions and 0 deletions
@@ -0,0 +1 @@
-- Select the names of all datastores whose `restore` flag is set to 1.
SELECT datastore FROM Kunden.`bronze.backup.server.datastore` AS kbbsd WHERE kbbsd.restore = 1
@@ -0,0 +1,17 @@
# py: 3.12
anyio==4.12.1
bcrypt==5.0.0
certifi==2026.2.25
cffi==2.0.0
cryptography==46.0.5
h11==0.16.0
httpcore==1.0.9
httpx==0.28.1
idna==3.11
invoke==2.2.1
mysql-connector-python==9.6.0
paramiko==4.0.0
pycparser==3.0
pynacl==1.6.2
typing-extensions==4.15.0
wmill==1.657.2
@@ -0,0 +1,40 @@
import wmill, mysql.connector, json
def main(prev: dict):
    """Collect all active restore servers that are not bound to a job.

    Args:
        prev: Result of the previous flow step. In webhook mode
            (``prev["mode"] == "webhook"``) it is returned untouched.

    Returns:
        ``prev`` extended by ``target_servers``: the rows of
        ``bronze.restore.server`` with ``is_active = 1`` and no
        ``current_job_uuid``, ordered by ``free_space_gb`` descending.

    Raises:
        Exception: If no free server exists, or if a free server has no
            ``restore_mount`` / ``restore_path`` configured.
    """
    if prev.get("mode") == "webhook":
        return prev

    db_cfg = json.loads(wmill.get_variable("f/Backup/mysql_config"))
    conn = mysql.connector.connect(**db_cfg)
    try:
        cur = conn.cursor(dictionary=True)
        try:
            # The size limits are selected together with the connection data.
            cur.execute("""
                SELECT hostname, ip, free_space_gb,
                       script_deployed, script_version,
                       restore_mount, restore_path,
                       max_backup_size_gb,
                       min_backup_size_gb
                FROM Kunden.`bronze.restore.server`
                WHERE is_active = 1 AND current_job_uuid IS NULL
                ORDER BY free_space_gb DESC
            """)
            servers = cur.fetchall()
        finally:
            cur.close()
    finally:
        # Close the connection even when the query fails.
        conn.close()

    if not servers:
        raise Exception("Kein freier Restore-Server verfuegbar!")

    # Every server needs both paths; check mount first, then path.
    for server in servers:
        for field in ("restore_mount", "restore_path"):
            if not server.get(field):
                raise Exception(
                    f"{field} fuer '{server['hostname']}' nicht konfiguriert!"
                )

    print(f"{len(servers)} freie Restore-Server: "
          f"{[s['hostname'] for s in servers]}")
    return {**prev, "target_servers": servers}
@@ -0,0 +1,17 @@
# py: 3.12
anyio==4.13.0
bcrypt==5.0.0
certifi==2026.2.25
cffi==2.0.0
cryptography==46.0.6
h11==0.16.0
httpcore==1.0.9
httpx==0.28.1
idna==3.11
invoke==2.2.1
mysql-connector-python==9.6.0
paramiko==4.0.0
pycparser==3.0
pynacl==1.6.2
typing-extensions==4.15.0
wmill==1.666.0
@@ -0,0 +1,83 @@
import wmill, json, paramiko, mysql.connector, re, io
def _build_cleanup_script(configs):
    """Render the bash script that prunes dated restore folders.

    For every config row the script removes sub-directories of
    ``rsync_target`` that are named ``YYYY-MM-DD`` and whose name sorts
    before the cutoff date (today minus ``retention_days``, default 7).
    """
    import shlex

    lines = ["#!/bin/bash", "set -euo pipefail", ""]
    for cfg in configs:
        datastore = cfg["datastore"]
        target = cfg["rsync_target"]
        retention_days = int(cfg.get("retention_days") or 7)
        # Quote everything that comes from the database so that spaces or
        # quotes in a path cannot break (or inject into) the script.
        quoted_target = shlex.quote(target)
        headline = (f"[{datastore}] Cleanup {target} "
                    f"(Ordner aelter als {retention_days} Tage)")
        not_found = f" WARNUNG: Verzeichnis nicht gefunden: {target}"
        lines += [
            f"echo {shlex.quote(headline)}",
            f"if [ -d {quoted_target} ]; then",
            # Compare by folder date instead of relying on -mtime.
            f"  cutoff=$(date -d '{retention_days} days ago' +%Y-%m-%d)",
            f"  find {quoted_target} -maxdepth 1 -type d -name '????-??-??'"
            " | while IFS= read -r dir; do",
            '    folder_date=$(basename "$dir")',
            '    if [[ "$folder_date" < "$cutoff" ]]; then',
            '      echo " Loesche: $dir (Datum: $folder_date < Cutoff: $cutoff)"',
            '      rm -rf "$dir"',
            "    else",
            '      echo " Behalte: $dir"',
            "    fi",
            "  done",
            "else",
            f"  echo {shlex.quote(not_found)}",
            "fi",
            "",
        ]
    lines.append("echo 'Cleanup abgeschlossen.'")
    return "\n".join(lines)


def main(prev: dict):
    """Start the cleanup of old restore folders on the backup server.

    Reads all datastore configs that have an ``rsync_target``, uploads a
    generated cleanup script to ``/tmp/cleanup_restore.sh`` via SFTP and
    launches it in the background (log: ``/tmp/cleanup_restore.log``).

    Args:
        prev: Result of the previous flow step. Returned untouched in
            webhook mode or when no datastore config exists.

    Returns:
        ``prev`` extended by ``"cleanup": "started_background"``.
    """
    if prev.get("mode") == "webhook":
        return prev

    db_cfg = json.loads(wmill.get_variable("f/Backup/mysql_config"))
    conn = mysql.connector.connect(**db_cfg)
    try:
        cur = conn.cursor(dictionary=True)
        try:
            cur.execute("""
                SELECT datastore, rsync_target, retention_days
                FROM Kunden.`bronze.backup.datastore.config`
                WHERE rsync_target IS NOT NULL AND rsync_target != ''
            """)
            configs = cur.fetchall()
        finally:
            cur.close()
    finally:
        conn.close()

    if not configs:
        print("Keine Datastore-Configs - Cleanup uebersprungen.")
        return prev

    backup_server = wmill.get_variable("f/Backup/backup_server_host")
    # The variable may hold a URL; use its numeric host part when present.
    ip_match = re.search(r'https?://([0-9.]+)', backup_server)
    ip = ip_match.group(1) if ip_match else backup_server
    bw_pass = wmill.get_variable("f/Backup/backup_server_ssh_password")

    cleanup_script = _build_cleanup_script(configs)
    remote_script = "/tmp/cleanup_restore.sh"

    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        ssh.connect(ip, username="root", password=bw_pass)
        # Upload via SFTP instead of a heredoc and set the mode through the
        # same channel, so the script is executable before it is started.
        sftp = ssh.open_sftp()
        try:
            sftp.putfo(io.BytesIO(cleanup_script.encode()), remote_script)
            sftp.chmod(remote_script, 0o755)
        finally:
            sftp.close()
        # Detach stdin as well and wait for the launching shell to return,
        # so the job is really running before the connection is closed.
        _, out, _ = ssh.exec_command(
            f"nohup {remote_script} > /tmp/cleanup_restore.log 2>&1 < /dev/null &"
        )
        out.channel.recv_exit_status()
    finally:
        ssh.close()

    print(f"Cleanup auf {ip} im Hintergrund gestartet.")
    print(f"Log: /tmp/cleanup_restore.log")
    return {**prev, "cleanup": "started_background"}
@@ -0,0 +1,17 @@
# py: 3.12
anyio==4.13.0
bcrypt==5.0.0
certifi==2026.2.25
cffi==2.0.0
cryptography==46.0.5
h11==0.16.0
httpcore==1.0.9
httpx==0.28.1
idna==3.11
invoke==2.2.1
mysql-connector-python==9.6.0
paramiko==4.0.0
pycparser==3.0
pynacl==1.6.2
typing-extensions==4.15.0
wmill==1.664.0
@@ -0,0 +1,158 @@
import wmill, json, paramiko, mysql.connector
def start_restore(server, backup, job_uuid, webhook_url, webhook_tok):
    """Start restore.sh on a restore server without waiting for it.

    Args:
        server: Server row; reads ``ip``, ``hostname``, ``restore_mount``,
            ``restore_path`` and ``ssh_creds`` (``user`` / ``password``).
        backup: Queue row; reads ``client_name``, ``backup_path``,
            ``rsync_target``, ``pbs_storage_id`` and ``backup_size_bytes``.
        job_uuid: Job the restore belongs to.
        webhook_url: URL handed to restore.sh via ``--webhook-url``.
        webhook_tok: Token handed to restore.sh via ``--webhook-token``.
    """
    import shlex

    safe_client = backup["client_name"].replace("/", "_").replace(":", "_")
    log_path = f"/opt/windmill-restore/logs/{safe_client}.log"
    options = [
        ("--job-uuid", job_uuid),
        ("--backup-path", backup["backup_path"]),
        ("--client", backup["client_name"]),
        ("--restore-mount", server["restore_mount"]),
        ("--restore-path", server["restore_path"]),
        ("--rsync-target", backup["rsync_target"]),
        ("--pbs-storage", backup["pbs_storage_id"]),
        ("--webhook-url", webhook_url),
        ("--webhook-token", webhook_tok),
        ("--server-hostname", server["hostname"]),
        ("--backup-size", backup.get("backup_size_bytes") or 0),
    ]
    # shlex.quote keeps values containing quotes or spaces intact instead of
    # letting them terminate the argument or inject shell code.
    arguments = " ".join(
        f"{flag} {shlex.quote(str(value))}" for flag, value in options
    )
    cmd = (
        f"nohup /opt/windmill-restore/restore.sh {arguments}"
        f" > {shlex.quote(log_path)} 2>&1 &"
    )

    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        ssh.connect(
            server["ip"],
            username=server["ssh_creds"]["user"],
            password=server["ssh_creds"]["password"],
        )
        ssh.exec_command(cmd)
    finally:
        # Release the client even when connect/exec_command raises.
        ssh.close()
def find_backup_for_server(server, backups):
    """Find the first backup in the list that fits the server's size class.

    ``max_backup_size_gb`` of ``None`` means no upper limit and
    ``min_backup_size_gb`` of ``None`` means no lower limit. A missing or
    falsy ``backup_size_bytes`` is treated as 0 bytes.

    Returns:
        ``(index, backup)`` of the first match, or ``(None, None)`` when
        nothing fits.
    """
    gib = 1024 * 1024 * 1024
    upper_gb = server.get("max_backup_size_gb")
    lower_gb = server.get("min_backup_size_gb")
    upper_bytes = None if upper_gb is None else upper_gb * gib
    lower_bytes = None if lower_gb is None else lower_gb * gib

    def fits(candidate):
        # Both limits are inclusive.
        size = candidate.get("backup_size_bytes") or 0
        if upper_bytes is not None and size > upper_bytes:
            return False
        return lower_bytes is None or size >= lower_bytes

    matches = ((pos, item) for pos, item in enumerate(backups) if fits(item))
    return next(matches, (None, None))
def main(prev: dict):
    """Assign one queued backup to each prepared server and start it.

    Servers are processed by ``max_backup_size_gb`` descending (servers
    without a limit first). For each server the first fitting backup is
    taken from the list, recorded as ``restoring`` in
    ``bronze.restore.result``, the server is bound to the job, the queue
    row is marked ``assigned`` and restore.sh is started.

    Args:
        prev: Result of the previous flow step; reads ``target_servers``,
            ``backups`` and ``job_uuid``. Returned untouched in webhook
            mode.

    Returns:
        ``prev`` extended by ``status`` (``no_backups`` or
        ``restore_started``), and in the latter case ``started`` (list of
        client/server pairs) and ``backups`` (the not yet assigned rest).
    """
    if prev.get("mode") == "webhook":
        return prev

    servers = prev.get("target_servers", [])
    backups = list(prev.get("backups", []))
    job_uuid = prev["job_uuid"]
    if not backups:
        return {**prev, "status": "no_backups"}

    db_cfg = json.loads(wmill.get_variable("f/Backup/mysql_config"))
    conn = mysql.connector.connect(**db_cfg)
    cur = conn.cursor()

    def fail_backup(msg, bk, sv):
        """Record a backup that cannot be started and count it as failed."""
        cur.execute("""
            INSERT INTO Kunden.`bronze.restore.result`
                (job_uuid, client_name, backup_path,
                 backup_size_bytes, restore_server, status, error_message)
            VALUES (%s, %s, %s, %s, %s, 'failed', %s)
        """, (job_uuid, bk["client_name"], bk["backup_path"],
              bk.get("backup_size_bytes", 0), sv["hostname"], msg))
        cur.execute("""
            UPDATE Kunden.`bronze.restore.jobs`
            SET failed_count=failed_count+1 WHERE job_uuid=%s
        """, (job_uuid,))
        conn.commit()

    started = []
    try:
        webhook_url = wmill.get_variable("f/Backup/windmill_webhook_url")
        webhook_tok = wmill.get_variable("f/Backup/windmill_webhook_token")
        servers_sorted = sorted(
            servers,
            key=lambda s: s.get("max_backup_size_gb") or 999999,
            reverse=True,
        )

        for server in servers_sorted:
            if not backups:
                break
            idx, backup = find_backup_for_server(server, backups)
            if backup is None:
                print(f"Kein passendes Backup fuer '{server['hostname']}' "
                      f"(max: {server.get('max_backup_size_gb')} GB)")
                continue
            backups.pop(idx)

            # Checked in this order; the first missing field is reported.
            required = (
                (backup, "rsync_target"),
                (backup, "pbs_storage_id"),
                (server, "restore_mount"),
                (server, "restore_path"),
            )
            missing = next(
                (key for holder, key in required if not holder.get(key)), None
            )
            if missing:
                fail_backup(f"{missing} fehlt", backup, server)
                continue

            cur.execute("""
                INSERT INTO Kunden.`bronze.restore.result`
                    (job_uuid, client_name, backup_path,
                     backup_size_bytes, restore_server, status, started_at)
                VALUES (%s, %s, %s, %s, %s, 'restoring', NOW())
            """, (job_uuid, backup["client_name"], backup["backup_path"],
                  backup.get("backup_size_bytes", 0), server["hostname"]))
            cur.execute("""
                UPDATE Kunden.`bronze.restore.server`
                SET current_job_uuid=%s WHERE hostname=%s
            """, (job_uuid, server["hostname"]))
            # Match the exact backup_path: a prefix LIKE on the client name
            # would also hit other clients sharing the prefix (vm/10 vs
            # vm/100) and treats "_" in names as a wildcard.
            cur.execute("""
                UPDATE Kunden.`bronze.backup.queue`
                SET status='assigned'
                WHERE job_uuid=%s AND backup_path = %s
            """, (job_uuid, backup["backup_path"]))
            conn.commit()

            start_restore(server, backup, job_uuid, webhook_url, webhook_tok)

            size_gb = (backup.get("backup_size_bytes") or 0) / 1024 / 1024 / 1024
            print(f"Restore gestartet: {backup['client_name']} "
                  f"({size_gb:.1f} GB) auf {server['hostname']} "
                  f"(max: {server.get('max_backup_size_gb')} GB)")
            started.append({
                "client": backup["client_name"],
                "server": server["hostname"],
            })
    finally:
        # Release the DB handles even when a restore could not be started.
        cur.close()
        conn.close()

    return {
        **prev,
        "status": "restore_started",
        "started": started,
        "backups": backups,
    }
@@ -0,0 +1,121 @@
summary: Backup Restore Orchestrator
description: |
Startet täglich um 00:11 Uhr. Holt Backup-Liste direkt via
proxmox-backup-client, schreibt Queue nach Größe sortiert in DB,
registriert PBS-Datastores auf allen freien Restore-Servern,
startet Restores parallel auf mehreren Servern per Webhook-Chaining.
value:
modules:
- id: f
summary: Aktive Datastores aus DB holen
value:
type: rawscript
content: '!inline aktive_datastores_aus_db_holen.my.sql'
input_transforms:
database:
type: static
value: $res:u/sebastianserfling/fascinating_mysql
lock: ''
language: mysql
- id: a
summary: Job initialisieren & Backup-Queue aus PBS aufbauen
value:
type: rawscript
content: '!inline job_initialisieren_&_backup-queue_aus_pbs_aufbauen.py'
input_transforms:
datastores:
type: javascript
expr: results.f
trigger_type:
type: javascript
expr: "flow_input.job_uuid ? 'webhook' : 'schedule'"
webhook_data:
type: javascript
expr: 'flow_input.job_uuid ? flow_input : {}'
lock: '!inline job_initialisieren_&_backup-queue_aus_pbs_aufbauen.lock'
language: python3
- id: b
summary: Alle freien Restore-Server holen
value:
type: rawscript
content: '!inline alle_freien_restore-server_holen.py'
input_transforms:
prev:
type: javascript
expr: results.a
lock: '!inline alle_freien_restore-server_holen.lock'
language: python3
- id: g
summary: SSH-Credentials fuer alle Restore-Server aus Bitwarden
value:
type: rawscript
content: '!inline ssh-credentials_fuer_alle_restore-server_aus_bitwarden.py'
input_transforms:
bw_url:
type: static
value: https://bitwarden.stines.de
prev:
type: javascript
expr: results.b
lock: '!inline ssh-credentials_fuer_alle_restore-server_aus_bitwarden.lock'
language: python3
- id: c
summary: Script deployen & PBS-Datastores auf allen Servern registrieren
value:
type: rawscript
content: '!inline
script_deployen_&_pbs-datastores_auf_allen_servern_registrieren.py'
input_transforms:
bw_result:
type: javascript
expr: results.g
datastores:
type: javascript
expr: results.f
prev:
type: javascript
expr: results.g
lock: '!inline
script_deployen_&_pbs-datastores_auf_allen_servern_registrieren.lock'
language: python3
- id: h
summary: Alte Restore-Ordner auf Backup-Server loeschen
value:
type: rawscript
content: '!inline alte_restore-ordner_auf_backup-server_loeschen.py'
input_transforms:
prev:
type: javascript
expr: results.c
lock: '!inline alte_restore-ordner_auf_backup-server_loeschen.lock'
language: python3
- id: d
summary: Ersten Restore pro Server starten
value:
type: rawscript
content: '!inline ersten_restore_pro_server_starten.py'
input_transforms:
prev:
type: javascript
expr: results.h
lock: '!inline ersten_restore_pro_server_starten.lock'
language: python3
continue_on_error: false
- id: e
summary: Webhook verarbeiten & naechsten Restore auf demselben Server starten
value:
type: rawscript
content: '!inline
webhook_verarbeiten_&_naechsten_restore_auf_demselben_server_starten.py'
input_transforms:
from_init:
type: javascript
expr: results.a
lock: '!inline
webhook_verarbeiten_&_naechsten_restore_auf_demselben_server_starten.lock'
language: python3
schema:
$schema: https://json-schema.org/draft/2020-12/schema
type: object
order: []
properties: {}
@@ -0,0 +1,10 @@
# py: 3.12
anyio==4.12.1
certifi==2026.2.25
h11==0.16.0
httpcore==1.0.9
httpx==0.28.1
idna==3.11
mysql-connector-python==9.6.0
typing-extensions==4.15.0
wmill==1.662.0
@@ -0,0 +1,161 @@
import wmill, mysql.connector, json, uuid, subprocess, sys, os
from datetime import datetime
def main(
    trigger_type: str = "schedule",
    webhook_data: dict = {},
    datastores: list = [],
):
    """Initialise a restore job and build the backup queue from PBS.

    In webhook mode nothing is initialised; the payload is just wrapped.
    Otherwise the latest ``vm``/``ct`` snapshot of every backup group in
    the given datastores is queued, largest first, under a new job UUID.

    Args:
        trigger_type: ``"webhook"`` or ``"schedule"``.
        webhook_data: Webhook payload; a contained ``job_uuid`` also
            selects webhook mode.
        datastores: Rows with a ``datastore`` key to query snapshots for.
            (The mutable defaults are only read, never modified.)

    Returns:
        ``{"mode": "webhook", "data": webhook_data}`` in webhook mode,
        else ``{"mode": "schedule", "job_uuid": ..., "backups": [...]}``
        with the queued rows ordered by priority.

    Raises:
        Exception: If no snapshot was found or a job is already running.
    """
    from datetime import timezone

    # Detect a webhook: explicit trigger type OR job_uuid in the payload.
    if trigger_type == "webhook" or webhook_data.get("job_uuid"):
        return {"mode": "webhook", "data": webhook_data}

    pbs = json.loads(wmill.get_variable("f/Backup/pbs_variable"))
    port = pbs.get("port", 8007)
    env = os.environ.copy()
    env["PBS_PASSWORD"] = pbs["password"]
    if pbs.get("fingerprint"):
        env["PBS_FINGERPRINT"] = pbs["fingerprint"]

    if subprocess.run(["which", "proxmox-backup-client"],
                      capture_output=True).returncode != 0:
        print("Installiere proxmox-backup-client...", file=sys.stderr)
        subprocess.run(["bash", "-c", (
            "echo 'deb http://download.proxmox.com/debian/pbs bookworm pbs-no-subscription'"
            " > /etc/apt/sources.list.d/pbs-client.list && "
            "wget -qO /etc/apt/trusted.gpg.d/proxmox-release-bookworm.gpg "
            " https://enterprise.proxmox.com/debian/proxmox-release-bookworm.gpg && "
            "apt-get update -qq && apt-get install -y proxmox-backup-client"
        )], check=True)

    db_cfg = json.loads(wmill.get_variable("f/Backup/mysql_config"))
    conn = mysql.connector.connect(**db_cfg)
    try:
        cur = conn.cursor(dictionary=True)
        cur.execute("""
            SELECT datastore, rsync_target, pbs_storage_id
            FROM Kunden.`bronze.backup.datastore.config`
        """)
        ds_config = {row["datastore"]: row for row in cur.fetchall()}

        # PBS repository syntax is [[user@]server[:port]:]datastore; the
        # port is only spelled out when it differs from the default.
        server_part = pbs["host"] if port == 8007 else f"{pbs['host']}:{port}"

        all_snaps = []
        for row in datastores:
            datastore = row["datastore"]
            env["PBS_REPOSITORY"] = f"{pbs['user']}@{server_part}:{datastore}"
            print(f"Hole Snapshots: {datastore}...", file=sys.stderr)
            result = subprocess.run(
                ["proxmox-backup-client", "snapshots", "--output-format", "json"],
                capture_output=True, text=True, env=env,
            )
            if result.returncode != 0:
                # A failing datastore is skipped, the others are still queued.
                print(f"WARNUNG: {datastore} fehlgeschlagen:\n{result.stderr}",
                      file=sys.stderr)
                continue
            snaps = [
                s for s in json.loads(result.stdout)
                if s.get("backup-type") in ("vm", "ct")
            ]
            for s in snaps:
                s["_datastore"] = datastore
            all_snaps.extend(snaps)
            print(f" -> {len(snaps)} Snapshots.", file=sys.stderr)

        if not all_snaps:
            raise Exception("Keine Snapshots gefunden!")

        # Keep only the newest snapshot per datastore/type/id group.
        latest: dict = {}
        for snap in all_snaps:
            key = f"{snap['_datastore']}/{snap['backup-type']}/{snap['backup-id']}"
            if key not in latest or snap["backup-time"] > latest[key]["backup-time"]:
                latest[key] = snap
        sorted_snaps = sorted(
            latest.values(), key=lambda s: s.get("size", 0), reverse=True
        )
        print(f"{len(sorted_snaps)} Gruppen -> Queue.", file=sys.stderr)

        job_uuid = str(uuid.uuid4())
        cur.execute("""
            SELECT job_uuid FROM Kunden.`bronze.restore.jobs`
            WHERE status = 'running'
            LIMIT 1
        """)
        existing = cur.fetchone()
        if existing:
            raise Exception(
                f"Job bereits aktiv: {existing['job_uuid']} "
                f"kein neuer Job gestartet."
            )

        cur.execute("SET time_zone = 'Europe/Berlin'")
        cur.execute("""
            INSERT INTO Kunden.`bronze.restore.jobs`
                (job_uuid, started_at, status, restore_server)
            VALUES (%s, NOW(), 'running', '')
        """, (job_uuid,))
        cur.execute("""
            UPDATE Kunden.`bronze.backup.queue`
            SET status='obsolete' WHERE status='queued'
        """)

        # The position in the size-sorted list becomes the queue priority.
        for idx, snap in enumerate(sorted_snaps):
            backup_type = snap["backup-type"]
            backup_id = str(snap["backup-id"])
            datastore = snap["_datastore"]
            size_bytes = snap.get("size", 0)
            ts = snap["backup-time"]
            client_name = f"{datastore}:{backup_type}/{backup_id}"
            # Timezone-aware replacement for the deprecated utcfromtimestamp.
            backup_time_str = datetime.fromtimestamp(
                ts, tz=timezone.utc
            ).strftime("%Y-%m-%dT%H:%M:%SZ")
            backup_path = f"{datastore}:{backup_type}/{backup_id}/{backup_time_str}"
            cfg = ds_config.get(datastore, {})
            rsync_target = cfg.get("rsync_target")
            pbs_storage_id = cfg.get("pbs_storage_id")
            cur.execute("""
                INSERT INTO Kunden.`bronze.backup.queue`
                    (job_uuid, client_name, backup_path, backup_size_bytes,
                     encrypt_key_ref, priority, rsync_target,
                     pbs_storage_id, status, last_seen_at)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, 'queued', NOW())
            """, (
                job_uuid, client_name, backup_path, size_bytes,
                client_name, idx, rsync_target, pbs_storage_id,
            ))
            cur.execute("""
                INSERT INTO Kunden.`bronze.restore.plan`
                    (job_uuid, client_name, vm_name)
                VALUES (%s, %s, %s)
            """, (job_uuid, client_name, f"{backup_type}/{backup_id}"))

        cur.execute("""
            UPDATE Kunden.`bronze.restore.jobs`
            SET total_backups=%s WHERE job_uuid=%s
        """, (len(sorted_snaps), job_uuid))
        conn.commit()

        cur.execute("""
            SELECT client_name, backup_path, backup_size_bytes,
                   encrypt_key_ref, priority,
                   rsync_target, pbs_storage_id
            FROM Kunden.`bronze.backup.queue`
            WHERE job_uuid = %s AND status = 'queued'
            ORDER BY priority ASC
        """, (job_uuid,))
        queued = cur.fetchall()
        cur.close()
    finally:
        # Covers every raise path above, not only the "already running" one.
        conn.close()

    print(f"Queue: {len(queued)} Backups.", file=sys.stderr)
    return {
        "mode": "schedule",
        "job_uuid": job_uuid,
        "backups": queued,
    }
@@ -0,0 +1,17 @@
# py: 3.12
anyio==4.12.1
bcrypt==5.0.0
certifi==2026.2.25
cffi==2.0.0
cryptography==46.0.5
h11==0.16.0
httpcore==1.0.9
httpx==0.28.1
idna==3.11
invoke==2.2.1
mysql-connector-python==9.6.0
paramiko==4.0.0
pycparser==3.0
pynacl==1.6.2
typing-extensions==4.15.0
wmill==1.659.1
@@ -0,0 +1,245 @@
import wmill, json, paramiko, io, mysql.connector, re
GITEA_REPO = "http://172.17.1.251:8080/sebastian.serfling/BackupScript.git"
def _run_remote(ssh, command):
    """Run a command over SSH and block until it has finished.

    Returns:
        ``(exit_code, stdout, stderr)`` with both streams decoded and
        stripped.
    """
    _, out, err = ssh.exec_command(command)
    stdout_text = out.read().decode().strip()
    stderr_text = err.read().decode().strip()
    return out.channel.recv_exit_status(), stdout_text, stderr_text


def deploy_to_server(ssh, server, pbs, pbs_host, pbs_user, pbs_pass,
                     pbs_port, ds_config, datastores, gitea_token,
                     backup_server_host, job_uuid, ssh_creds, db_cfg,
                     script_version):
    """Deploy restore.sh and register the PBS datastores on one server.

    Steps: (re)deploy the script when the remote ``version.txt`` differs
    from ``script_version`` or the server is not marked as deployed, fetch
    the per-datastore keyfiles, add missing ``pbs-<datastore>`` storages
    via ``pvesm`` and persist the results plus the session credentials in
    the database.

    Args:
        ssh: Connected paramiko client for the server.
        server: Server row; reads ``hostname`` and ``script_deployed``.
        pbs: Unused here; kept for the existing call signature.
        pbs_host, pbs_user, pbs_pass, pbs_port: PBS connection data.
        ds_config: Mapping datastore name -> config row (``fingerprint``).
        datastores: Rows with a ``datastore`` key.
        gitea_token: Token embedded in the clone URL.
        backup_server_host: Written to ``backup_server_host`` on deploy.
        job_uuid: Job the session credentials belong to.
        ssh_creds: Dict with ``ip``, ``user`` and ``password``.
        db_cfg: Keyword arguments for ``mysql.connector.connect``.
        script_version: Version that should be installed.

    Returns:
        List of ``{"datastore": ..., "storage_id": ...}`` dicts.

    Raises:
        Exception: If git, the script copy, a keyfile transfer or
            ``pvesm add`` fails on the server.
    """
    import shlex

    hostname = server["hostname"]
    _, current_version, _ = _run_remote(
        ssh, "cat /opt/windmill-restore/version.txt 2>/dev/null || echo none"
    )
    needs_deploy = current_version != script_version \
        or not server.get("script_deployed")

    if needs_deploy:
        print(f"[{hostname}] Deploye Script v{script_version}...")
        # Wait for both preparations; the clone below depends on them.
        _run_remote(ssh, "mkdir -p /opt/windmill-restore/logs")
        _run_remote(ssh, "which git || apt-get install -y git 2>/dev/null")
        repo_url = GITEA_REPO.replace("http://", f"http://{gitea_token}@")
        exit_code, _, stderr_out = _run_remote(
            ssh,
            "cd /opt/windmill-restore && "
            "if [ -d 'BackupScript/.git' ]; then "
            " cd BackupScript && git pull; "
            "else "
            " rm -rf BackupScript && "
            " git clone " + shlex.quote(repo_url) + " BackupScript; "
            "fi"
        )
        if exit_code != 0:
            # git may echo the clone URL; keep the token out of the error.
            if gitea_token:
                stderr_out = stderr_out.replace(gitea_token, "***")
            raise Exception(f"[{hostname}] Git fehlgeschlagen: {stderr_out}")

        exit_code, _, stderr_out = _run_remote(
            ssh,
            "cp /opt/windmill-restore/BackupScript/restore.sh "
            " /opt/windmill-restore/restore.sh && "
            "chmod +x /opt/windmill-restore/restore.sh && "
            "echo " + shlex.quote(script_version)
            + " > /opt/windmill-restore/version.txt"
        )
        if exit_code != 0:
            raise Exception(f"[{hostname}] Script kopieren fehlgeschlagen: {stderr_out}")
        print(f"[{hostname}] Script v{script_version} deployed.")

        pbs_conf = "\n".join([
            f"PBS_HOST={pbs_host}",
            f"PBS_PORT={pbs_port}",
            f"PBS_USER={pbs_user}",
            f"PBS_PASSWORD={pbs_pass}",
        ]) + "\n"
        sftp = ssh.open_sftp()
        try:
            sftp.putfo(io.BytesIO(pbs_conf.encode()),
                       "/opt/windmill-restore/pbs.conf")
            # Restrict the password file right away over the same channel.
            sftp.chmod("/opt/windmill-restore/pbs.conf", 0o600)
            sftp.putfo(io.BytesIO(backup_server_host.encode()),
                       "/opt/windmill-restore/backup_server_host")
        finally:
            sftp.close()
        print(f"[{hostname}] backup_server_host: {backup_server_host}")
    else:
        print(f"[{hostname}] Script aktuell (v{current_version}).")

    _run_remote(
        ssh,
        "mkdir -p /opt/windmill-restore/keys && "
        "chmod 700 /opt/windmill-restore/keys"
    )
    _, storage_listing, _ = _run_remote(
        ssh, "pvesm status 2>/dev/null | awk 'NR>1{print $1}'"
    )
    existing_storages = storage_listing.splitlines()

    pbs_storages = []
    for row in datastores:
        datastore = row["datastore"]
        storage_id = "pbs-" + datastore.lower() \
            .replace(" ", "-") \
            .replace("_", "-")
        ds_cfg = ds_config.get(datastore, {})
        fingerprint = ds_cfg.get("fingerprint", "") or ""
        keyfile_path = f"/opt/windmill-restore/keys/{datastore}.keyfile"
        rsync_src = f"root@{pbs_host}:/root/Scripte/{datastore}.keyfile"
        quoted_keyfile = shlex.quote(keyfile_path)

        # Fetch the keyfile only when it is missing or empty.
        exit_code, _, stderr_out = _run_remote(
            ssh,
            "if [ -s " + quoted_keyfile + " ]; then "
            " echo 'vorhanden'; "
            "else "
            " rsync -az -e 'ssh -o StrictHostKeyChecking=no' "
            " " + shlex.quote(rsync_src) + " " + quoted_keyfile + " "
            " && chmod 600 " + quoted_keyfile + " "
            " && echo 'geholt'; "
            "fi"
        )
        if exit_code != 0:
            raise Exception(
                f"[{hostname}] Keyfile fehlgeschlagen fuer '{datastore}': {stderr_out}"
            )

        if storage_id in existing_storages:
            print(f"[{hostname}] Storage '{storage_id}' vorhanden.")
        else:
            parts = [
                "pvesm add pbs", shlex.quote(storage_id),
                "--server", shlex.quote(pbs_host),
                "--datastore", shlex.quote(datastore),
                "--username", shlex.quote(pbs_user),
                "--password", shlex.quote(pbs_pass),
                "--port", shlex.quote(str(pbs_port)),
                "--encryption-key", quoted_keyfile,
                "--content backup",
            ]
            if fingerprint:
                # Previously computed but never appended to the command.
                parts += ["--fingerprint", shlex.quote(fingerprint)]
            exit_code, _, stderr = _run_remote(ssh, " ".join(parts))
            if exit_code != 0:
                raise Exception(
                    f"[{hostname}] pvesm add '{datastore}' fehlgeschlagen: {stderr}"
                )
            print(f"[{hostname}] -> '{storage_id}' registriert")
        pbs_storages.append({"datastore": datastore, "storage_id": storage_id})

    conn = mysql.connector.connect(**db_cfg)
    try:
        cur = conn.cursor()
        if needs_deploy:
            cur.execute("""
                UPDATE Kunden.`bronze.restore.server`
                SET script_deployed=1, script_version=%s WHERE hostname=%s
            """, (script_version, hostname))
        for entry in pbs_storages:
            cur.execute("""
                UPDATE Kunden.`bronze.backup.datastore.config`
                SET pbs_storage_id=%s WHERE datastore=%s
            """, (entry["storage_id"], entry["datastore"]))
        cur.execute("""
            INSERT INTO Kunden.`bronze.restore.session`
                (job_uuid, hostname, ip, ssh_user, ssh_password)
            VALUES (%s, %s, %s, %s, %s)
            ON DUPLICATE KEY UPDATE
                ip=VALUES(ip), ssh_user=VALUES(ssh_user),
                ssh_password=VALUES(ssh_password)
        """, (
            job_uuid, hostname,
            ssh_creds["ip"], ssh_creds["user"], ssh_creds["password"],
        ))
        conn.commit()
        cur.close()
    finally:
        conn.close()
    print(f"[{hostname}] Session-Creds gespeichert.")
    return pbs_storages
def main(prev: dict, bw_result: dict = {}, datastores: list = []):
    """Prepare every target server for the restore job.

    For each server the SSH address is taken from the URL of its
    credentials entry (falling back to the server's ``ip``), then
    ``deploy_to_server`` is run over a fresh SSH connection.

    Args:
        prev: Result of the previous flow step; reads ``target_servers``,
            ``server_creds`` and ``job_uuid``. Returned untouched in
            webhook mode.
        bw_result: Not read by this function.
        datastores: Rows with a ``datastore`` key, passed on to the deploy.
            (The mutable defaults are only read, never modified.)

    Returns:
        ``prev`` with ``target_servers`` replaced by the prepared servers
        (each extended by ``ip``, ``ssh_creds`` and ``pbs_storages``) and
        ``script_version`` added.

    Raises:
        Exception: If not a single server could be prepared.
    """
    if prev.get("mode") == "webhook":
        return prev

    servers = prev["target_servers"]
    server_creds = prev.get("server_creds", {})
    job_uuid = prev["job_uuid"]

    script_version = wmill.get_variable("f/Backup/restore_version").strip()
    print(f"Script-Version aus Variable: {script_version}")

    pbs = json.loads(wmill.get_variable("f/Backup/pbs_variable"))
    pbs_host = pbs["host"]
    pbs_user = pbs["user"]
    pbs_pass = pbs["password"]
    pbs_port = str(pbs.get("port", 8007))

    db_cfg = json.loads(wmill.get_variable("f/Backup/mysql_config"))
    conn = mysql.connector.connect(**db_cfg)
    try:
        cur = conn.cursor(dictionary=True)
        try:
            cur.execute("""
                SELECT datastore, rsync_target, pbs_storage_id, fingerprint
                FROM Kunden.`bronze.backup.datastore.config`
            """)
            ds_config = {row["datastore"]: row for row in cur.fetchall()}
        finally:
            cur.close()
    finally:
        conn.close()

    gitea_token = wmill.get_variable("f/Backup/gitea_token")
    backup_server_host = wmill.get_variable("f/Backup/backup_server_host")

    target_servers_out = []
    for server in servers:
        hostname = server["hostname"]
        creds = server_creds.get(hostname, {})
        url = creds.get("url", "")
        # Prefer the numeric host from the credentials URL over the DB value.
        ip_match = re.search(r'https?://([0-9.]+)', url)
        ip = ip_match.group(1) if ip_match else server.get("ip", "")
        if not ip:
            print(f"WARNUNG: Keine IP fuer '{hostname}' uebersprungen.")
            continue

        ssh_creds_dict = {
            "ip": ip,
            "user": creds.get("username", "root"),
            "password": creds.get("password", ""),
        }
        ssh = paramiko.SSHClient()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        try:
            # connect() is inside the try so a failed login is cleaned up too.
            ssh.connect(ip, username=ssh_creds_dict["user"],
                        password=ssh_creds_dict["password"])
            pbs_storages = deploy_to_server(
                ssh, server, pbs, pbs_host, pbs_user, pbs_pass,
                pbs_port, ds_config, datastores, gitea_token,
                backup_server_host, job_uuid, ssh_creds_dict, db_cfg,
                script_version
            )
        finally:
            ssh.close()

        target_servers_out.append({
            **server,
            "ip": ip,
            "ssh_creds": ssh_creds_dict,
            "pbs_storages": pbs_storages,
        })

    if not target_servers_out:
        raise Exception("Kein Server konnte vorbereitet werden!")
    return {
        **prev,
        "target_servers": target_servers_out,
        "script_version": script_version,
    }
@@ -0,0 +1,9 @@
# py: 3.12
anyio==4.12.1
certifi==2026.2.25
h11==0.16.0
httpcore==1.0.9
httpx==0.28.1
idna==3.11
typing-extensions==4.15.0
wmill==1.657.2
@@ -0,0 +1,82 @@
import subprocess, sys, json, os, wmill
def bw_lookup(search_term, env, run):
    """Look up the login of one Bitwarden item by search term.

    The vault is synced first, then ``bw list items --search`` is tried up
    to three times with a 3 second pause between attempts. An item whose
    name equals the search term (case-insensitive, stripped) wins;
    otherwise the first hit is used.

    Args:
        search_term: Text to search for.
        env: Environment mapping; ``BW_SESSION`` is read from it.
        run: Callable ``run(cmd, check=True)`` returning an object with a
            ``stdout`` attribute that holds the command output.

    Returns:
        Dict with ``username``, ``password`` and ``url`` (first URI of the
        login, ``""`` when absent).

    Raises:
        Exception: If no item was found after all attempts.
    """
    import time

    attempts = 3
    run(["bw", "sync", "--session", env["BW_SESSION"]], check=False)

    items = []
    for attempt in range(attempts):
        result = run(
            ["bw", "list", "items", "--search", search_term,
             "--session", env["BW_SESSION"]]
        )
        items = json.loads(result.stdout)
        if items:
            break
        # Pause only between attempts, not after the last one.
        if attempt < attempts - 1:
            time.sleep(3)
    if not items:
        raise Exception(f"Kein Bitwarden-Eintrag fuer: '{search_term}'")

    wanted = search_term.strip().lower()
    exact = next(
        (i for i in items if (i.get("name") or "").strip().lower() == wanted),
        None,
    )
    # NOTE(review): falling back to the first hit can return a different
    # item than intended when no name matches exactly.
    item = exact if exact else items[0]

    login = item.get("login") or {}
    uris = login.get("uris") or [{}]
    return {
        "username": login.get("username", ""),
        "password": login.get("password", ""),
        # A null uri becomes "" so callers can always treat it as text.
        "url": uris[0].get("uri") or "",
    }
def main(
    prev: dict,
    bw_url: str = "https://bitwarden.stines.de",
):
    """Fetch the SSH credentials of all target servers from Bitwarden.

    Installs the ``bw`` CLI when it is missing, logs in with the API key
    from ``f/Backup/bitwarden_api_login``, unlocks the vault and looks up
    one item per server hostname.

    Args:
        prev: Result of the previous flow step; reads ``target_servers``.
            Returned untouched in webhook mode.
        bw_url: Base URL of the Bitwarden server.

    Returns:
        ``prev`` extended by ``server_creds`` (hostname -> credentials).

    Raises:
        Exception: If the login fails or the vault cannot be unlocked.
    """
    if prev.get("mode") == "webhook":
        return prev

    servers = prev.get("target_servers", [])
    bw_creds = json.loads(wmill.get_variable("f/Backup/bitwarden_api_login"))
    env = os.environ.copy()
    env["BW_CLIENTID"] = bw_creds["bw_clientid"]
    env["BW_CLIENTSECRET"] = bw_creds["bw_clientsecret"]
    env["BW_PASSWORD"] = bw_creds["bw_masterpassword"]

    def run(cmd, check=True):
        return subprocess.run(cmd, env=env, text=True, capture_output=True, check=check)

    if subprocess.run(["which", "bw"], capture_output=True).returncode != 0:
        run(["wget",
             "https://github.com/bitwarden/cli/releases/download/v1.22.1/bw-linux-1.22.1.zip",
             "-O", "bw.zip"])
        run(["unzip", "bw.zip"])
        run(["chmod", "+x", "bw"])
        run(["mv", "bw", "/usr/local/bin/bw"])

    # NOTE(review): the hosts entry is written idempotently so repeated
    # runs do not pile up duplicate lines - confirm the fixed IP is wanted
    # on workers that already resolve the name.
    hosts_entry = "172.17.1.3 bitwarden.stines.de\n"
    with open("/etc/hosts", "r") as f:
        entry_present = hosts_entry.strip() in f.read()
    if not entry_present:
        with open("/etc/hosts", "a") as f:
            f.write(hosts_entry)

    run(["bw", "config", "server", bw_url])
    run(["bw", "logout"], check=False)
    result = run(["bw", "login", "--apikey"], check=False)
    if result.returncode != 0:
        raise Exception(f"Bitwarden Login fehlgeschlagen: {result.stderr}")

    server_creds = {}
    try:
        # Read the master password from BW_PASSWORD instead of argv, which
        # would expose it in the process list and in CalledProcessError.
        unlock = run(["bw", "unlock", "--passwordenv", "BW_PASSWORD", "--raw"])
        bw_session = unlock.stdout.strip()
        if not bw_session:
            raise Exception("Vault konnte nicht entsperrt werden")
        env["BW_SESSION"] = bw_session

        for server in servers:
            hostname = server["hostname"]
            print(f"Hole Creds fuer: {hostname}")
            creds = bw_lookup(hostname, env, run)
            server_creds[hostname] = creds
            print(f" -> OK: {creds['username']}@{hostname}")
    finally:
        # Always end the CLI session, also when a lookup fails.
        run(["bw", "logout"], check=False)

    return {**prev, "server_creds": server_creds}
@@ -0,0 +1,17 @@
# py: 3.12
anyio==4.13.0
bcrypt==5.0.0
certifi==2026.2.25
cffi==2.0.0
cryptography==46.0.7
h11==0.16.0
httpcore==1.0.9
httpx==0.28.1
idna==3.11
invoke==3.0.3
mysql-connector-python==9.6.0
paramiko==4.0.0
pycparser==3.0
pynacl==1.6.2
typing-extensions==4.15.0
wmill==1.680.0
@@ -0,0 +1,404 @@
import wmill, json, mysql.connector, paramiko, re, base64
from datetime import datetime
import httpx
def send_nextcloud_message(message: str):
    """Post a message to the configured Nextcloud Talk room (best effort).

    URL, room, user and password are read from the ``f/Backup/nextcloud_talk_*``
    variables. Any failure is only printed; nothing is raised.

    Args:
        message: Text to post.
    """
    try:
        base_url = wmill.get_variable("f/Backup/nextcloud_talk_url").rstrip("/")
        room = wmill.get_variable("f/Backup/nextcloud_talk_room")
        user = wmill.get_variable("f/Backup/nextcloud_talk_user")
        password = wmill.get_variable("f/Backup/nextcloud_talk_password")

        # HTTP basic auth header assembled by hand.
        raw_login = f"{user}:{password}".encode()
        basic_token = base64.b64encode(raw_login).decode()

        # NOTE(review): verify=False turns off TLS certificate checks for a
        # request that carries credentials - confirm this is intended.
        response = httpx.post(
            f"{base_url}/ocs/v2.php/apps/spreed/api/v1/chat/{room}",
            headers={
                "Authorization": f"Basic {basic_token}",
                "OCS-APIREQUEST": "true",
                "Content-Type": "application/json",
                "Accept": "application/json",
            },
            json={"message": message},
            timeout=15,
            verify=False,
        )
        print(f"Nextcloud Talk: HTTP {response.status_code}")
    except Exception as e:
        # Notification problems must never fail the job itself.
        print(f"Nextcloud Talk Fehler (nicht kritisch): {e}")
def find_next_backup_for_server(cur, job_uuid, server_hostname, max_backup_size_gb):
    """Fetch the next queued backup that fits the given server.

    Selects the queued backup of the job with the lowest ``priority`` value
    that respects the server's minimum size and, when
    ``max_backup_size_gb`` is given, the maximum size (backups without a
    size always pass the maximum check). The server must currently be
    bound to the job.

    Args:
        cur: Database cursor used to run the query.
        job_uuid: Job whose queue is searched.
        server_hostname: Hostname of the restore server.
        max_backup_size_gb: Upper size limit in GB, or ``None`` for no limit.

    Returns:
        Whatever ``cur.fetchone()`` yields for the query (a row, or
        ``None`` when nothing matches).
    """
    params = [server_hostname, job_uuid, job_uuid]
    max_size_clause = ""
    if max_backup_size_gb is not None:
        # The only difference between the limited and the unlimited query.
        max_size_clause = (
            "AND (q.backup_size_bytes IS NULL OR q.backup_size_bytes <= %s)"
        )
        params.append(max_backup_size_gb * 1024 * 1024 * 1024)

    cur.execute(f"""
        SELECT q.client_name, q.backup_path, q.backup_size_bytes,
               q.rsync_target, q.pbs_storage_id,
               r.hostname AS server_hostname,
               r.ip AS server_ip,
               r.restore_mount,
               r.restore_path AS restore_path,
               r.free_space_gb,
               r.max_backup_size_gb,
               r.min_backup_size_gb
        FROM Kunden.`bronze.backup.queue` q
        JOIN Kunden.`bronze.restore.server` r
          ON r.hostname = %s
        WHERE q.job_uuid = %s AND q.status = 'queued'
          AND r.current_job_uuid = %s
          {max_size_clause}
          AND (r.min_backup_size_gb IS NULL OR q.backup_size_bytes >= r.min_backup_size_gb * 1024 * 1024 * 1024)
        ORDER BY q.priority ASC
        LIMIT 1
    """, tuple(params))
    return cur.fetchone()
def release_server_and_check_done(cur, conn, job_uuid, server_hostname):
    """Release the server and check whether the whole job is finished.

    The server's ``current_job_uuid`` is cleared. When no server is bound
    to the job any more, the job is marked ``completed``, its session rows
    are deleted and a summary is sent to Nextcloud Talk.

    Args:
        cur: Cursor whose ``fetchone()`` rows are accessed by column name.
        conn: Connection used for the commits.
        job_uuid: Job to check.
        server_hostname: Server to release.

    Returns:
        ``"all_done"`` when the job was completed, else ``"server_done"``.
    """
    cur.execute("""
        UPDATE Kunden.`bronze.restore.server`
        SET current_job_uuid=NULL
        WHERE hostname=%s
    """, (server_hostname,))
    conn.commit()
    print(f"Server '{server_hostname}' fertig.")

    cur.execute("""
        SELECT COUNT(*) AS cnt FROM Kunden.`bronze.restore.server`
        WHERE current_job_uuid = %s
    """, (job_uuid,))
    remaining = cur.fetchone()["cnt"]

    # Other servers are still busy with this job: nothing more to do.
    if remaining != 0:
        conn.commit()
        print(f"Noch {remaining} Server aktiv.")
        return "server_done"

    job_key = (job_uuid,)
    cur.execute("""
        UPDATE Kunden.`bronze.restore.jobs`
        SET status='completed', finished_at=NOW()
        WHERE job_uuid=%s
    """, job_key)
    cur.execute("""
        DELETE FROM Kunden.`bronze.restore.session`
        WHERE job_uuid=%s
    """, job_key)
    # Statistics for the final message.
    cur.execute("""
        SELECT total_backups, restored_count, failed_count,
               TIMESTAMPDIFF(MINUTE, started_at, NOW()) AS duration_min
        FROM Kunden.`bronze.restore.jobs`
        WHERE job_uuid=%s
    """, job_key)
    stats = cur.fetchone()
    conn.commit()
    print(f"Job {job_uuid} vollstaendig abgeschlossen.")

    if not stats:
        return "all_done"

    total = stats["total_backups"] or 0
    restored = stats["restored_count"] or 0
    failed = stats["failed_count"] or 0
    dur_min = stats["duration_min"] or 0
    if dur_min >= 60:
        dur_str = f"{dur_min//60}h {dur_min%60}m"
    else:
        dur_str = f"{dur_min}m"

    if failed == 0:
        msg = (
            f"✅ **Backup-Job abgeschlossen**\n"
            f"Alle {total} Backups erfolgreich | Dauer: {dur_str}"
        )
    else:
        msg = (
            f"⚠️ **Backup-Job abgeschlossen mit Fehlern**\n"
            f"{restored}/{total} erfolgreich | ❌ {failed} fehlgeschlagen | Dauer: {dur_str}"
        )
    send_nextcloud_message(msg)
    return "all_done"
def _fetch_fallback_backup(cur, job_uuid, min_bytes, max_bytes):
    """Return the first queued backup of the job (priority ascending) within the size bounds, or None."""
    conditions = ["job_uuid = %s", "status = 'queued'"]
    params = [job_uuid]
    if max_bytes is not None:
        # Rows without a recorded size pass the upper bound.
        conditions.append("(backup_size_bytes IS NULL OR backup_size_bytes <= %s)")
        params.append(max_bytes)
    if min_bytes is not None:
        conditions.append("backup_size_bytes >= %s")
        params.append(min_bytes)
    cur.execute(
        "SELECT client_name, backup_path, backup_size_bytes,"
        " rsync_target, pbs_storage_id"
        " FROM Kunden.`bronze.backup.queue`"
        " WHERE " + " AND ".join(conditions) +
        " ORDER BY priority ASC"
        " LIMIT 1",
        tuple(params),
    )
    return cur.fetchone()


def _build_restore_command(job_uuid, nxt, server_hostname, webhook_url, webhook_tok):
    """Build the shell line that starts restore.sh in the background on the restore server."""
    import shlex

    def quote(value):
        # str() keeps the previous rendering of non-string values (ints, None).
        return shlex.quote(str(value))

    safe_client = nxt["client_name"].replace("/", "_").replace(":", "_")
    log_file = f"/opt/windmill-restore/logs/{safe_client}.log"
    options = [
        ("--job-uuid", job_uuid),
        ("--backup-path", nxt["backup_path"]),
        ("--client", nxt["client_name"]),
        ("--restore-mount", nxt["restore_mount"]),
        ("--restore-path", nxt["restore_path"]),
        ("--rsync-target", nxt["rsync_target"]),
        ("--pbs-storage", nxt["pbs_storage_id"]),
        ("--webhook-url", webhook_url),
        ("--webhook-token", webhook_tok),
        ("--server-hostname", server_hostname),
        ("--backup-size", nxt.get("backup_size_bytes") or 0),
    ]
    # Every value is shell-quoted: several of them come straight from the
    # webhook payload, and a literal "'" would otherwise end the quoting.
    arguments = " ".join(f"{flag} {quote(value)}" for flag, value in options)
    return (
        f"nohup /opt/windmill-restore/restore.sh {arguments}"
        f" > {quote(log_file)} 2>&1 &"
    )


def main(from_init: dict):
    """Process one restore webhook and start the next restore on that server.

    In ``schedule`` mode nothing is done besides reporting that the flow is
    waiting for webhooks. Otherwise the webhook payload in
    ``from_init["data"]`` is written to the result, queue and job tables, a
    Nextcloud message is sent when the restore failed, and the next queued
    backup is started on the reporting server over SSH. When no queued
    backup fits, the server is released via ``release_server_and_check_done``.

    Args:
        from_init: Output of the previous flow step. Reads ``mode``,
            ``job_uuid`` and ``data`` (the webhook payload).

    Returns:
        A dict whose ``status`` is one of ``"waiting_for_webhook"``,
        ``"all_done"``, ``"server_done"`` or ``"next_restore_started"``.

    Raises:
        Exception: If the payload lacks ``job_uuid`` or ``client_name``, or if
            no session credentials exist for the server.
    """
    if from_init.get("mode") == "schedule":
        print("Restores gestartet - Flow wartet auf Webhooks.")
        return {"status": "waiting_for_webhook",
                "job_uuid": from_init.get("job_uuid")}

    # "or {}" also covers an explicit null payload.
    data = from_init.get("data") or {}
    job_uuid = data.get("job_uuid")
    client = data.get("client_name")
    status = data.get("status", "unknown")
    server_hostname = data.get("server_hostname", "")
    if not job_uuid or not client:
        raise Exception(f"Ungueltiger Webhook-Payload: {data}")
    print(f"Webhook: {client} -> {status} (Server: {server_hostname})")

    succeeded = status == "success"
    row_status = "done" if succeeded else "failed"
    # NOTE(review): prefix match; a client name that is a prefix of another
    # one (or contains % / _) would match additional rows — confirm the
    # backup_path format.
    client_like = f"{client}%"

    db_cfg = json.loads(wmill.get_variable("f/Backup/mysql_config"))
    conn = mysql.connector.connect(**db_cfg)
    try:
        cur = conn.cursor(dictionary=True)

        # Store the reported restore outcome.
        cur.execute(
            """
            UPDATE Kunden.`bronze.restore.result` SET
                vm_name = %s,
                vm_id_original = %s,
                vm_id_restored = %s,
                restore_duration_sec = %s,
                actual_disk_used_bytes = %s,
                zip_size_bytes = %s,
                zip_duration_sec = %s,
                rsync_size_bytes = %s,
                rsync_ok = %s,
                rsync_retries = %s,
                qm_agent_ok = %s,
                status = %s,
                error_message = %s,
                webhook_received_at = %s
            WHERE job_uuid = %s AND backup_path LIKE %s
            """,
            (
                data.get("vm_name", ""),
                data.get("vm_id_original"),
                data.get("vm_id_restored"),
                data.get("restore_duration_sec"),
                data.get("actual_disk_used_bytes"),
                data.get("zip_size_bytes"),
                data.get("zip_duration_sec"),
                data.get("rsync_size_bytes"),
                1 if data.get("rsync_ok") else 0,
                data.get("rsync_retries", 0),
                1 if data.get("qm_agent_ok") in (True, "true", "skipped") else 0,
                row_status,
                data.get("error_message", ""),
                datetime.now(),
                job_uuid, client_like,
            ),
        )
        cur.execute(
            """
            UPDATE Kunden.`bronze.backup.queue` SET status=%s
            WHERE job_uuid=%s AND backup_path LIKE %s
            """,
            (row_status, job_uuid, client_like),
        )
        # The column name comes from a fixed pair, never from the payload.
        field = "restored_count" if succeeded else "failed_count"
        cur.execute(
            f"UPDATE Kunden.`bronze.restore.jobs` "
            f"SET {field}={field}+1 WHERE job_uuid=%s",
            (job_uuid,),
        )
        if data.get("free_space_gb") is not None and server_hostname:
            cur.execute(
                """
                UPDATE Kunden.`bronze.restore.server`
                SET free_space_gb = %s WHERE hostname = %s
                """,
                (data.get("free_space_gb"), server_hostname),
            )
        conn.commit()

        if not succeeded:
            vm_name = data.get("vm_name") or client
            # NOTE(review): both branches yield the same empty string as
            # written — confirm whether icon characters are missing here.
            icon = "" if status == "success" else ""
            # "or ''" guards against an explicit null error_message.
            err = (data.get("error_message") or "")[:100]
            nc_msg = (
                f"{icon} **{vm_name}** ({client})\n"
                f"Server: {server_hostname} | Fehler: {err}"
            )
            send_nextcloud_message(nc_msg)

        # One read of the server row covers the size limits and the
        # connection details needed for a fallback assignment.
        cur.execute(
            """
            SELECT hostname, ip, restore_mount, restore_path,
                   free_space_gb, max_backup_size_gb, min_backup_size_gb
            FROM Kunden.`bronze.restore.server`
            WHERE hostname = %s
            """,
            (server_hostname,),
        )
        srv = cur.fetchone()
        max_backup_size_gb = srv["max_backup_size_gb"] if srv else None
        min_backup_size_gb = srv["min_backup_size_gb"] if srv else None
        max_bytes = (max_backup_size_gb * 1024 * 1024 * 1024
                     if max_backup_size_gb is not None else None)
        min_bytes = (min_backup_size_gb * 1024 * 1024 * 1024
                     if min_backup_size_gb is not None else None)

        nxt = find_next_backup_for_server(cur, job_uuid, server_hostname, max_backup_size_gb)
        conn.commit()

        if not nxt:
            next_queued = _fetch_fallback_backup(cur, job_uuid, min_bytes, max_bytes)
            if not next_queued:
                # Nothing left for this server: release it and maybe finish the job.
                result = release_server_and_check_done(cur, conn, job_uuid, server_hostname)
                if result == "all_done":
                    return {"status": "all_done", "job_uuid": job_uuid}
                return {"status": "server_done",
                        "server": server_hostname,
                        "job_uuid": job_uuid}

            print(f"Server '{server_hostname}' nimmt naechstes passendes Backup: "
                  f"{next_queued['client_name']}")
            nxt = {
                **next_queued,
                "server_hostname": server_hostname,
                "server_ip": srv["ip"] if srv else "",
                "restore_mount": srv["restore_mount"] if srv else "",
                "restore_path": srv["restore_path"] if srv else "",
                "free_space_gb": srv["free_space_gb"] if srv else 0,
                "max_backup_size_gb": srv["max_backup_size_gb"] if srv else None,
                "min_backup_size_gb": srv["min_backup_size_gb"] if srv else None,
            }

        # Look up the credentials before touching the queue, so a missing
        # session does not leave the backup marked as assigned.
        cur.execute(
            """
            SELECT ip, ssh_user, ssh_password
            FROM Kunden.`bronze.restore.session`
            WHERE job_uuid = %s AND hostname = %s
            LIMIT 1
            """,
            (job_uuid, server_hostname),
        )
        session = cur.fetchone()
        if not session:
            raise Exception(
                f"Keine Session-Creds fuer '{server_hostname}'. "
                f"Job-UUID: {job_uuid}"
            )

        cur.execute(
            """
            INSERT INTO Kunden.`bronze.restore.result`
                (job_uuid, client_name, backup_path,
                 backup_size_bytes, restore_server, status, started_at)
            VALUES (%s, %s, %s, %s, %s, 'restoring', NOW())
            """,
            (
                job_uuid, nxt["client_name"], nxt["backup_path"],
                nxt.get("backup_size_bytes", 0), nxt["server_hostname"],
            ),
        )
        cur.execute(
            """
            UPDATE Kunden.`bronze.backup.queue` SET status='assigned'
            WHERE job_uuid=%s AND backup_path LIKE %s
            """,
            (job_uuid, f"{nxt['client_name']}%"),
        )
        conn.commit()
    finally:
        # Release the connection on every path, including errors.
        conn.close()

    webhook_url = wmill.get_variable("f/Backup/windmill_webhook_url")
    webhook_tok = wmill.get_variable("f/Backup/windmill_webhook_token")
    cmd = _build_restore_command(job_uuid, nxt, server_hostname, webhook_url, webhook_tok)

    ssh = paramiko.SSHClient()
    # NOTE(review): AutoAddPolicy accepts unknown host keys without
    # verification — confirm this is acceptable for the restore network.
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        ssh.connect(
            session["ip"],
            username=session["ssh_user"],
            password=session["ssh_password"],
        )
        ssh.exec_command(cmd)
    finally:
        ssh.close()

    nxt_size = nxt.get("backup_size_bytes") or 0
    size_gb = nxt_size / 1024 / 1024 / 1024
    print(f"Naechster Restore: {nxt['client_name']} ({size_gb:.1f} GB) auf {server_hostname}")
    return {
        "status": "next_restore_started",
        "client": nxt["client_name"],
        "server": server_hostname,
        "job_uuid": job_uuid,
    }
@@ -0,0 +1,31 @@
summary: Backup Restore Report - Nextcloud Talk
description: |
Läuft täglich um 08:00 Uhr. Holt das Ergebnis des letzten
Backup-Restore-Jobs aus der DB und sendet eine Zusammenfassung
per Nextcloud Talk (Webhook/Bot).
value:
modules:
- id: a
summary: Letzten Job aus DB holen & Report zusammenbauen
value:
type: rawscript
content: '!inline letzten_job_aus_db_holen_&_report_zusammenbauen.py'
input_transforms: {}
lock: '!inline letzten_job_aus_db_holen_&_report_zusammenbauen.lock'
language: python3
- id: b
summary: Nachricht an Nextcloud Talk senden
value:
type: rawscript
content: '!inline nachricht_an_nextcloud_talk_senden.py'
input_transforms:
report:
type: javascript
expr: results.a
lock: '!inline nachricht_an_nextcloud_talk_senden.lock'
language: python3
schema:
$schema: https://json-schema.org/draft/2020-12/schema
type: object
order: []
properties: {}
@@ -0,0 +1,10 @@
# py: 3.12
anyio==4.12.1
certifi==2026.2.25
h11==0.16.0
httpcore==1.0.9
httpx==0.28.1
idna==3.11
mysql-connector-python==9.6.0
typing-extensions==4.15.0
wmill==1.659.1
@@ -0,0 +1,129 @@
import wmill, mysql.connector, json
from datetime import datetime
def fmt_bytes(b):
    """Render a byte count with one decimal and a binary-scaled unit.

    Falsy input (``None``, ``0``, ``""``) yields an empty string. The value
    is divided by 1024 per unit step and capped at ``PB``.
    """
    if not b:
        return ""
    labels = ("B", "KB", "MB", "GB", "TB", "PB")
    size = int(b)
    level = 0
    # Step up one unit per division until the value fits or PB is reached.
    while size >= 1024 and level < len(labels) - 1:
        size /= 1024
        level += 1
    return f"{size:.1f} {labels[level]}"
def fmt_dur(sec):
    """Render a duration in seconds as ``"Ns"``, ``"Nm Ns"`` or ``"Nh Nm"``.

    Falsy input (``None``, ``0``, ``""``) yields an empty string.
    """
    if not sec:
        return ""
    total = int(sec)
    if total >= 3600:
        hours, remainder = divmod(total, 3600)
        return f"{hours}h {remainder // 60}m"
    if total >= 60:
        minutes, seconds = divmod(total, 60)
        return f"{minutes}m {seconds}s"
    return f"{total}s"
def main():
    """Build the Nextcloud Talk report for the most recently started finished job.

    Reads the latest job with status ``completed`` or ``failed``, its result
    rows and the planned VMs that have no result row, and formats them as a
    Markdown-style message.

    Returns:
        ``{"message": ...}`` when no finished job exists, otherwise a dict
        with ``message``, ``job_uuid``, ``failed`` and ``skipped``.
    """
    db_cfg = json.loads(wmill.get_variable("f/Backup/mysql_config"))
    conn = mysql.connector.connect(**db_cfg)
    try:
        cur = conn.cursor(dictionary=True)
        cur.execute("""
            SELECT job_uuid, started_at, finished_at, status,
                   total_backups, restored_count, failed_count
            FROM Kunden.`bronze.restore.jobs`
            WHERE status IN ('completed', 'failed')
            ORDER BY started_at DESC
            LIMIT 1
        """)
        job = cur.fetchone()
        if not job:
            return {"message": "⚠️ Kein abgeschlossener Backup-Job gefunden."}
        job_uuid = job["job_uuid"]

        cur.execute("""
            SELECT client_name, vm_name, restore_server, status,
                   restore_duration_sec, zip_size_bytes,
                   rsync_ok, qm_agent_ok, error_message
            FROM Kunden.`bronze.restore.result`
            WHERE job_uuid = %s
            ORDER BY status ASC, client_name ASC
        """, (job_uuid,))
        results = cur.fetchall()

        # VMs that were planned but never produced a result row.
        cur.execute("""
            SELECT p.client_name, p.vm_name
            FROM Kunden.`bronze.restore.plan` p
            LEFT JOIN Kunden.`bronze.restore.result` r
                ON r.job_uuid = p.job_uuid
                AND r.client_name = p.client_name
            WHERE p.job_uuid = %s
              AND r.id IS NULL
            ORDER BY p.client_name ASC
        """, (job_uuid,))
        not_run = cur.fetchall()
    finally:
        # Release the connection on every path, including query errors.
        conn.close()

    duration = ""
    if job["started_at"] and job["finished_at"]:
        # total_seconds() includes whole days; timedelta.seconds would drop
        # them and under-report any job running longer than 24 hours.
        elapsed = job["finished_at"] - job["started_at"]
        duration = fmt_dur(int(elapsed.total_seconds()))

    total = job["total_backups"] or 0
    done = job["restored_count"] or 0
    failed = job["failed_count"] or 0
    skipped = len(not_run)
    date_str = job["started_at"].strftime("%d.%m.%Y") if job["started_at"] else "?"
    time_str = job["started_at"].strftime("%H:%M") if job["started_at"] else "?"

    # NOTE(review): the first two branches assign the same empty string as
    # written — confirm whether icon characters are missing here.
    if failed == 0 and skipped == 0:
        status_icon = ""
    elif done == 0:
        status_icon = ""
    else:
        status_icon = "⚠️"

    lines = [
        f"{status_icon} **Backup Restore Report {date_str}**",
        "",
        f"🕐 Start: {time_str} | Dauer: {duration}",
        f"📊 Gesamt: {total} | ✅ OK: {done} | ❌ Fehler: {failed} | ⏭️ Nicht gestartet: {skipped}",
        "",
    ]

    # Failed restores, with the error text capped at 80 characters.
    failed_results = [r for r in results if r["status"] == "failed"]
    if failed_results:
        lines.append("**❌ Fehlgeschlagen:**")
        for r in failed_results:
            name = r["vm_name"] or r["client_name"]
            err = r["error_message"] or "unbekannt"
            if len(err) > 80:
                err = err[:80] + "..."
            lines.append(f"{name} ({r['restore_server']}): {err}")
        lines.append("")

    # Planned but not started.
    if not_run:
        lines.append("**⏭️ Nicht gestartet:**")
        for r in not_run:
            name = r["vm_name"] or r["client_name"]
            lines.append(f"{name}")
        lines.append("")

    # Successful restores.
    done_results = [r for r in results if r["status"] == "done"]
    if done_results:
        lines.append("**✅ Erfolgreich:**")
        for r in done_results:
            name = r["vm_name"] or r["client_name"]
            dauer = fmt_dur(r["restore_duration_sec"])
            zipsize = fmt_bytes(r["zip_size_bytes"])
            # NOTE(review): both branches of the next two expressions are the
            # same empty string as written — confirm the intended markers.
            agent = "" if r["qm_agent_ok"] else ""
            rsync = "" if r["rsync_ok"] else ""
            lines.append(
                f"{name} | {r['restore_server']} | "
                f"{dauer} | 📦 {zipsize} | Agent: {agent} | Rsync: {rsync}"
            )

    message = "\n".join(lines)
    print(message)
    return {"message": message, "job_uuid": job_uuid, "failed": failed, "skipped": skipped}
@@ -0,0 +1,9 @@
# py: 3.12
anyio==4.12.1
certifi==2026.2.25
h11==0.16.0
httpcore==1.0.9
httpx==0.28.1
idna==3.11
typing-extensions==4.15.0
wmill==1.657.2
@@ -0,0 +1,44 @@
import wmill, json
import httpx
def main(report: dict):
    """Post the report text to a Nextcloud Talk room using HTTP Basic auth.

    Args:
        report: Output of the previous step; its ``message`` entry is sent,
            with a fixed fallback text when the key is absent.

    Returns:
        ``{"status": "sent", "http": <status code>}`` on HTTP 200 or 201.

    Raises:
        Exception: For any other HTTP status code.
    """
    import base64

    text = report.get("message", "Kein Bericht verfuegbar.")

    base_url = wmill.get_variable("f/Backup/nextcloud_talk_url").rstrip("/")
    room = wmill.get_variable("f/Backup/nextcloud_talk_room")
    user = wmill.get_variable("f/Backup/nextcloud_talk_user")
    password = wmill.get_variable("f/Backup/nextcloud_talk_password")

    raw_credentials = f"{user}:{password}".encode()
    token = base64.b64encode(raw_credentials).decode()

    endpoint = f"{base_url}/ocs/v2.php/apps/spreed/api/v1/chat/{room}"
    reply = httpx.post(
        endpoint,
        headers={
            "Authorization": f"Basic {token}",
            "OCS-APIREQUEST": "true",
            "Content-Type": "application/json",
            "Accept": "application/json",
        },
        json={"message": text},
        timeout=30,
        # In case of a self-signed certificate.
        # NOTE(review): this disables TLS verification while Basic
        # credentials are sent — confirm this is intended.
        verify=False,
    )
    print(f"HTTP: {reply.status_code}")
    print(f"URL: {endpoint}")

    if reply.status_code in (200, 201):
        print("Nachricht gesendet ✓")
        return {"status": "sent", "http": reply.status_code}

    raise Exception(
        f"Nextcloud Talk Fehler: {reply.status_code} {reply.text}"
    )
+6
View File
@@ -0,0 +1,6 @@
summary: ''
display_name: Backup
extra_perms:
sebastianserfling@stines.de: true
owners:
- sebastianserfling@stines.de
@@ -0,0 +1,3 @@
description: ''
value: btrv2jb9
is_secret: false
@@ -0,0 +1,3 @@
description: ''
value: https://cloudstorage.stines.de
is_secret: false
+3
View File
@@ -0,0 +1,3 @@
description: ''
value: 1.0.28
is_secret: false