Initial commit: Windmill workspace sync

Scripts, flows, apps, resources and resource types from the Windmill workspace.
API token excluded via .gitignore (config/).
This commit is contained in:
Sebastian Serfling
2026-04-24 09:06:07 +02:00
commit 2b5d29ef67
302 changed files with 9229 additions and 0 deletions
@@ -0,0 +1 @@
SELECT datastore FROM Kunden.`bronze.backup.server.datastore` AS kbbsd WHERE kbbsd.restore = 1
@@ -0,0 +1,17 @@
# py: 3.12
anyio==4.12.1
bcrypt==5.0.0
certifi==2026.2.25
cffi==2.0.0
cryptography==46.0.5
h11==0.16.0
httpcore==1.0.9
httpx==0.28.1
idna==3.11
invoke==2.2.1
mysql-connector-python==9.6.0
paramiko==4.0.0
pycparser==3.0
pynacl==1.6.2
typing-extensions==4.15.0
wmill==1.657.2
@@ -0,0 +1,40 @@
import wmill, mysql.connector, json
def main(prev: dict):
if prev.get("mode") == "webhook":
return prev
db_cfg = json.loads(wmill.get_variable("f/Backup/mysql_config"))
conn = mysql.connector.connect(**db_cfg)
cur = conn.cursor(dictionary=True)
# FIX: max_backup_size_gb hinzugefügt
cur.execute("""
SELECT hostname, ip, free_space_gb,
script_deployed, script_version,
restore_mount, restore_path,
max_backup_size_gb,
min_backup_size_gb
FROM Kunden.`bronze.restore.server`
WHERE is_active = 1 AND current_job_uuid IS NULL
ORDER BY free_space_gb DESC
""")
servers = cur.fetchall()
cur.close(); conn.close()
if not servers:
raise Exception("Kein freier Restore-Server verfuegbar!")
for s in servers:
if not s.get("restore_mount"):
raise Exception(
f"restore_mount fuer '{s['hostname']}' nicht konfiguriert!"
)
if not s.get("restore_path"):
raise Exception(
f"restore_path fuer '{s['hostname']}' nicht konfiguriert!"
)
print(f"{len(servers)} freie Restore-Server: "
f"{[s['hostname'] for s in servers]}")
return {**prev, "target_servers": servers}
@@ -0,0 +1,17 @@
# py: 3.12
anyio==4.13.0
bcrypt==5.0.0
certifi==2026.2.25
cffi==2.0.0
cryptography==46.0.6
h11==0.16.0
httpcore==1.0.9
httpx==0.28.1
idna==3.11
invoke==2.2.1
mysql-connector-python==9.6.0
paramiko==4.0.0
pycparser==3.0
pynacl==1.6.2
typing-extensions==4.15.0
wmill==1.666.0
@@ -0,0 +1,83 @@
import wmill, json, paramiko, mysql.connector, re, io
def main(prev: dict):
if prev.get("mode") == "webhook":
return prev
db_cfg = json.loads(wmill.get_variable("f/Backup/mysql_config"))
conn = mysql.connector.connect(**db_cfg)
cur = conn.cursor(dictionary=True)
cur.execute("""
SELECT datastore, rsync_target, retention_days
FROM Kunden.`bronze.backup.datastore.config`
WHERE rsync_target IS NOT NULL AND rsync_target != ''
""")
configs = cur.fetchall()
cur.close(); conn.close()
if not configs:
print("Keine Datastore-Configs - Cleanup uebersprungen.")
return prev
backup_server = wmill.get_variable("f/Backup/backup_server_host")
ip_match = re.search(r'https?://([0-9.]+)', backup_server)
ip = ip_match.group(1) if ip_match else backup_server
bw_pass = wmill.get_variable("f/Backup/backup_server_ssh_password")
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(ip, username="root", password=bw_pass)
# Cleanup-Script aufbauen
# Datum-basierter Vergleich statt -mtime
script_lines = ["#!/bin/bash", "set -euo pipefail", ""]
for cfg in configs:
datastore = cfg["datastore"]
target = cfg["rsync_target"]
retention_days = cfg.get("retention_days") or 7
script_lines.append(
f"echo '[{datastore}] Cleanup {target} (Ordner aelter als {retention_days} Tage)'"
)
script_lines.append(f"if [ -d '{target}' ]; then")
script_lines.append(f" cutoff=$(date -d '{retention_days} days ago' +%Y-%m-%d)")
script_lines.append(f" find '{target}' -maxdepth 1 -type d -name '????-??-??' | while read dir; do")
script_lines.append(f" folder_date=$(basename \"$dir\")")
script_lines.append(f" if [[ \"$folder_date\" < \"$cutoff\" ]]; then")
script_lines.append(f" echo \" Loesche: $dir (Datum: $folder_date < Cutoff: $cutoff)\"")
script_lines.append(f" rm -rf \"$dir\"")
script_lines.append(f" else")
script_lines.append(f" echo \" Behalte: $dir\"")
script_lines.append(f" fi")
script_lines.append(f" done")
script_lines.append(f"else")
script_lines.append(f" echo ' WARNUNG: Verzeichnis nicht gefunden: {target}'")
script_lines.append(f"fi")
script_lines.append("")
script_lines.append("echo 'Cleanup abgeschlossen.'")
cleanup_script = "\n".join(script_lines)
# Script per SFTP hochladen statt Heredoc
sftp = ssh.open_sftp()
sftp.putfo(
io.BytesIO(cleanup_script.encode()),
"/tmp/cleanup_restore.sh"
)
sftp.close()
ssh.exec_command("chmod +x /tmp/cleanup_restore.sh")
import time; time.sleep(1)
ssh.exec_command(
"nohup /tmp/cleanup_restore.sh > /tmp/cleanup_restore.log 2>&1 &"
)
ssh.close()
print(f"Cleanup auf {ip} im Hintergrund gestartet.")
print(f"Log: /tmp/cleanup_restore.log")
return {**prev, "cleanup": "started_background"}
@@ -0,0 +1,17 @@
# py: 3.12
anyio==4.13.0
bcrypt==5.0.0
certifi==2026.2.25
cffi==2.0.0
cryptography==46.0.5
h11==0.16.0
httpcore==1.0.9
httpx==0.28.1
idna==3.11
invoke==2.2.1
mysql-connector-python==9.6.0
paramiko==4.0.0
pycparser==3.0
pynacl==1.6.2
typing-extensions==4.15.0
wmill==1.664.0
@@ -0,0 +1,158 @@
import wmill, json, paramiko, mysql.connector
def start_restore(server, backup, job_uuid, webhook_url, webhook_tok):
"""Startet restore.sh auf einem Server non-blocking."""
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(
server["ip"],
username=server["ssh_creds"]["user"],
password=server["ssh_creds"]["password"]
)
safe_client = backup["client_name"].replace("/", "_").replace(":", "_")
srv_hostname = server["hostname"]
bk_size = backup.get("backup_size_bytes") or 0
cmd = (
f"nohup /opt/windmill-restore/restore.sh"
f" --job-uuid '{job_uuid}'"
f" --backup-path '{backup['backup_path']}'"
f" --client '{backup['client_name']}'"
f" --restore-mount '{server['restore_mount']}'"
f" --restore-path '{server['restore_path']}'"
f" --rsync-target '{backup['rsync_target']}'"
f" --pbs-storage '{backup['pbs_storage_id']}'"
f" --webhook-url '{webhook_url}'"
f" --webhook-token '{webhook_tok}'"
f" --server-hostname '{srv_hostname}'"
f" --backup-size '{bk_size}'"
f" > /opt/windmill-restore/logs/{safe_client}.log 2>&1 &"
)
ssh.exec_command(cmd)
ssh.close()
def find_backup_for_server(server, backups):
"""
Sucht das erste Backup aus der Liste das zur Server-Größenklasse passt.
max_backup_size_gb = NULL -> kein oberes Limit
min_backup_size_gb = NULL -> kein unteres Limit
Gibt (index, backup) zurueck oder (None, None) wenn nichts passt.
"""
max_gb = server.get("max_backup_size_gb")
min_gb = server.get("min_backup_size_gb")
max_bytes = max_gb * 1024 * 1024 * 1024 if max_gb is not None else None
min_bytes = min_gb * 1024 * 1024 * 1024 if min_gb is not None else None
for i, backup in enumerate(backups):
size = backup.get("backup_size_bytes") or 0
if max_bytes is not None and size > max_bytes:
continue
if min_bytes is not None and size < min_bytes:
continue
return i, backup
return None, None
def main(prev: dict):
if prev.get("mode") == "webhook":
return prev
servers = prev.get("target_servers", [])
backups = list(prev.get("backups", []))
job_uuid = prev["job_uuid"]
if not backups:
return {**prev, "status": "no_backups"}
db_cfg = json.loads(wmill.get_variable("f/Backup/mysql_config"))
conn = mysql.connector.connect(**db_cfg)
cur = conn.cursor()
webhook_url = wmill.get_variable("f/Backup/windmill_webhook_url")
webhook_tok = wmill.get_variable("f/Backup/windmill_webhook_token")
servers_sorted = sorted(
servers,
key=lambda s: s.get("max_backup_size_gb") or 999999,
reverse=True
)
started = []
for server in servers_sorted:
if not backups:
break
idx, backup = find_backup_for_server(server, backups)
if backup is None:
print(f"Kein passendes Backup fuer '{server['hostname']}' "
f"(max: {server.get('max_backup_size_gb')} GB)")
continue
backups.pop(idx)
def fail_backup(msg, bk=backup, sv=server):
cur.execute("""
INSERT INTO Kunden.`bronze.restore.result`
(job_uuid, client_name, backup_path,
backup_size_bytes, restore_server, status, error_message)
VALUES (%s, %s, %s, %s, %s, 'failed', %s)
""", (job_uuid, bk["client_name"], bk["backup_path"],
bk.get("backup_size_bytes", 0), sv["hostname"], msg))
cur.execute("""
UPDATE Kunden.`bronze.restore.jobs`
SET failed_count=failed_count+1 WHERE job_uuid=%s
""", (job_uuid,))
conn.commit()
if not backup.get("rsync_target"):
fail_backup("rsync_target fehlt"); continue
if not backup.get("pbs_storage_id"):
fail_backup("pbs_storage_id fehlt"); continue
if not server.get("restore_mount"):
fail_backup("restore_mount fehlt"); continue
if not server.get("restore_path"):
fail_backup("restore_path fehlt"); continue
client_like = f"{backup['client_name']}%"
cur.execute("""
INSERT INTO Kunden.`bronze.restore.result`
(job_uuid, client_name, backup_path,
backup_size_bytes, restore_server, status, started_at)
VALUES (%s, %s, %s, %s, %s, 'restoring', NOW())
""", (job_uuid, backup["client_name"], backup["backup_path"],
backup.get("backup_size_bytes", 0), server["hostname"]))
cur.execute("""
UPDATE Kunden.`bronze.restore.server`
SET current_job_uuid=%s WHERE hostname=%s
""", (job_uuid, server["hostname"]))
cur.execute("""
UPDATE Kunden.`bronze.backup.queue`
SET status='assigned'
WHERE job_uuid=%s AND backup_path LIKE %s
""", (job_uuid, client_like))
conn.commit()
start_restore(server, backup, job_uuid, webhook_url, webhook_tok)
size_gb = (backup.get("backup_size_bytes") or 0) / 1024 / 1024 / 1024
print(f"Restore gestartet: {backup['client_name']} "
f"({size_gb:.1f} GB) auf {server['hostname']} "
f"(max: {server.get('max_backup_size_gb')} GB)")
started.append({
"client": backup["client_name"],
"server": server["hostname"],
})
cur.close(); conn.close()
return {
**prev,
"status": "restore_started",
"started": started,
"backups": backups,
}
@@ -0,0 +1,121 @@
summary: Backup Restore Orchestrator
description: |
Startet täglich um 00:11 Uhr. Holt Backup-Liste direkt via
proxmox-backup-client, schreibt Queue nach Größe sortiert in DB,
registriert PBS-Datastores auf allen freien Restore-Servern,
startet Restores parallel auf mehreren Servern per Webhook-Chaining.
value:
modules:
- id: f
summary: Aktive Datastores aus DB holen
value:
type: rawscript
content: '!inline aktive_datastores_aus_db_holen.my.sql'
input_transforms:
database:
type: static
value: $res:u/sebastianserfling/fascinating_mysql
lock: ''
language: mysql
- id: a
summary: Job initialisieren & Backup-Queue aus PBS aufbauen
value:
type: rawscript
content: '!inline job_initialisieren_&_backup-queue_aus_pbs_aufbauen.py'
input_transforms:
datastores:
type: javascript
expr: results.f
trigger_type:
type: javascript
expr: "flow_input.job_uuid ? 'webhook' : 'schedule'"
webhook_data:
type: javascript
expr: 'flow_input.job_uuid ? flow_input : {}'
lock: '!inline job_initialisieren_&_backup-queue_aus_pbs_aufbauen.lock'
language: python3
- id: b
summary: Alle freien Restore-Server holen
value:
type: rawscript
content: '!inline alle_freien_restore-server_holen.py'
input_transforms:
prev:
type: javascript
expr: results.a
lock: '!inline alle_freien_restore-server_holen.lock'
language: python3
- id: g
summary: SSH-Credentials fuer alle Restore-Server aus Bitwarden
value:
type: rawscript
content: '!inline ssh-credentials_fuer_alle_restore-server_aus_bitwarden.py'
input_transforms:
bw_url:
type: static
value: https://bitwarden.stines.de
prev:
type: javascript
expr: results.b
lock: '!inline ssh-credentials_fuer_alle_restore-server_aus_bitwarden.lock'
language: python3
- id: c
summary: Script deployen & PBS-Datastores auf allen Servern registrieren
value:
type: rawscript
content: '!inline
script_deployen_&_pbs-datastores_auf_allen_servern_registrieren.py'
input_transforms:
bw_result:
type: javascript
expr: results.g
datastores:
type: javascript
expr: results.f
prev:
type: javascript
expr: results.g
lock: '!inline
script_deployen_&_pbs-datastores_auf_allen_servern_registrieren.lock'
language: python3
- id: h
summary: Alte Restore-Ordner auf Backup-Server loeschen
value:
type: rawscript
content: '!inline alte_restore-ordner_auf_backup-server_loeschen.py'
input_transforms:
prev:
type: javascript
expr: results.c
lock: '!inline alte_restore-ordner_auf_backup-server_loeschen.lock'
language: python3
- id: d
summary: Ersten Restore pro Server starten
value:
type: rawscript
content: '!inline ersten_restore_pro_server_starten.py'
input_transforms:
prev:
type: javascript
expr: results.h
lock: '!inline ersten_restore_pro_server_starten.lock'
language: python3
continue_on_error: false
- id: e
summary: Webhook verarbeiten & naechsten Restore auf demselben Server starten
value:
type: rawscript
content: '!inline
webhook_verarbeiten_&_naechsten_restore_auf_demselben_server_starten.py'
input_transforms:
from_init:
type: javascript
expr: results.a
lock: '!inline
webhook_verarbeiten_&_naechsten_restore_auf_demselben_server_starten.lock'
language: python3
schema:
$schema: https://json-schema.org/draft/2020-12/schema
type: object
order: []
properties: {}
@@ -0,0 +1,10 @@
# py: 3.12
anyio==4.12.1
certifi==2026.2.25
h11==0.16.0
httpcore==1.0.9
httpx==0.28.1
idna==3.11
mysql-connector-python==9.6.0
typing-extensions==4.15.0
wmill==1.662.0
@@ -0,0 +1,161 @@
import wmill, mysql.connector, json, uuid, subprocess, sys, os
from datetime import datetime
def main(
trigger_type: str = "schedule",
webhook_data: dict = {},
datastores: list = [],
):
# Webhook erkennen: kind=webhook ODER job_uuid im payload
if trigger_type == "webhook" or webhook_data.get("job_uuid"):
return {"mode": "webhook", "data": webhook_data}
pbs = json.loads(wmill.get_variable("f/Backup/pbs_variable"))
port = pbs.get("port", 8007)
env = os.environ.copy()
env["PBS_PASSWORD"] = pbs["password"]
if pbs.get("fingerprint"):
env["PBS_FINGERPRINT"] = pbs["fingerprint"]
if subprocess.run(["which", "proxmox-backup-client"],
capture_output=True).returncode != 0:
print("Installiere proxmox-backup-client...", file=sys.stderr)
subprocess.run(["bash", "-c", (
"echo 'deb http://download.proxmox.com/debian/pbs bookworm pbs-no-subscription'"
" > /etc/apt/sources.list.d/pbs-client.list && "
"wget -qO /etc/apt/trusted.gpg.d/proxmox-release-bookworm.gpg "
" https://enterprise.proxmox.com/debian/proxmox-release-bookworm.gpg && "
"apt-get update -qq && apt-get install -y proxmox-backup-client"
)], check=True)
db_cfg = json.loads(wmill.get_variable("f/Backup/mysql_config"))
conn = mysql.connector.connect(**db_cfg)
cur = conn.cursor(dictionary=True)
cur.execute("""
SELECT datastore, rsync_target, pbs_storage_id
FROM Kunden.`bronze.backup.datastore.config`
""")
ds_config = {row["datastore"]: row for row in cur.fetchall()}
all_snaps = []
for row in datastores:
datastore = row["datastore"]
if port != 8007:
repo = f"{pbs['user']}@{pbs['host']}!{port}:{datastore}"
else:
repo = f"{pbs['user']}@{pbs['host']}:{datastore}"
env["PBS_REPOSITORY"] = repo
print(f"Hole Snapshots: {datastore}...", file=sys.stderr)
result = subprocess.run(
["proxmox-backup-client", "snapshots", "--output-format", "json"],
capture_output=True, text=True, env=env,
)
if result.returncode != 0:
print(f"WARNUNG: {datastore} fehlgeschlagen:\n{result.stderr}",
file=sys.stderr)
continue
snaps = json.loads(result.stdout)
snaps = [s for s in snaps if s.get("backup-type") in ("vm", "ct")]
for s in snaps:
s["_datastore"] = datastore
all_snaps.extend(snaps)
print(f" -> {len(snaps)} Snapshots.", file=sys.stderr)
if not all_snaps:
raise Exception("Keine Snapshots gefunden!")
latest: dict = {}
for snap in all_snaps:
key = f"{snap['_datastore']}/{snap['backup-type']}/{snap['backup-id']}"
if key not in latest or snap["backup-time"] > latest[key]["backup-time"]:
latest[key] = snap
sorted_snaps = sorted(latest.values(), key=lambda s: s.get("size", 0), reverse=True)
print(f"{len(sorted_snaps)} Gruppen -> Queue.", file=sys.stderr)
job_uuid = str(uuid.uuid4())
cur.execute("""
SELECT job_uuid FROM Kunden.`bronze.restore.jobs`
WHERE status = 'running'
LIMIT 1
""")
existing = cur.fetchone()
if existing:
cur.close(); conn.close()
raise Exception(
f"Job bereits aktiv: {existing['job_uuid']} "
f"kein neuer Job gestartet."
)
cur.execute("SET time_zone = 'Europe/Berlin'")
cur.execute("""
INSERT INTO Kunden.`bronze.restore.jobs`
(job_uuid, started_at, status, restore_server)
VALUES (%s, NOW(), 'running', '')
""", (job_uuid,))
cur.execute("""
UPDATE Kunden.`bronze.backup.queue`
SET status='obsolete' WHERE status='queued'
""")
for idx, snap in enumerate(sorted_snaps):
backup_type = snap["backup-type"]
backup_id = str(snap["backup-id"])
datastore = snap["_datastore"]
size_bytes = snap.get("size", 0)
ts = snap["backup-time"]
client_name = f"{datastore}:{backup_type}/{backup_id}"
backup_time_str = datetime.utcfromtimestamp(ts).strftime("%Y-%m-%dT%H:%M:%SZ")
backup_path = f"{datastore}:{backup_type}/{backup_id}/{backup_time_str}"
cfg = ds_config.get(datastore, {})
rsync_target = cfg.get("rsync_target")
pbs_storage_id = cfg.get("pbs_storage_id")
cur.execute("""
INSERT INTO Kunden.`bronze.backup.queue`
(job_uuid, client_name, backup_path, backup_size_bytes,
encrypt_key_ref, priority, rsync_target,
pbs_storage_id, status, last_seen_at)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, 'queued', NOW())
""", (
job_uuid, client_name, backup_path, size_bytes,
client_name, idx, rsync_target, pbs_storage_id,
))
cur.execute("""
INSERT INTO Kunden.`bronze.restore.plan`
(job_uuid, client_name, vm_name)
VALUES (%s, %s, %s)
""", (job_uuid, client_name, f"{backup_type}/{backup_id}"))
cur.execute("""
UPDATE Kunden.`bronze.restore.jobs`
SET total_backups=%s WHERE job_uuid=%s
""", (len(sorted_snaps), job_uuid))
conn.commit()
cur.execute("""
SELECT client_name, backup_path, backup_size_bytes,
encrypt_key_ref, priority,
rsync_target, pbs_storage_id
FROM Kunden.`bronze.backup.queue`
WHERE job_uuid = %s AND status = 'queued'
ORDER BY priority ASC
""", (job_uuid,))
queued = cur.fetchall()
cur.close(); conn.close()
print(f"Queue: {len(queued)} Backups.", file=sys.stderr)
return {
"mode": "schedule",
"job_uuid": job_uuid,
"backups": queued,
}
@@ -0,0 +1,17 @@
# py: 3.12
anyio==4.12.1
bcrypt==5.0.0
certifi==2026.2.25
cffi==2.0.0
cryptography==46.0.5
h11==0.16.0
httpcore==1.0.9
httpx==0.28.1
idna==3.11
invoke==2.2.1
mysql-connector-python==9.6.0
paramiko==4.0.0
pycparser==3.0
pynacl==1.6.2
typing-extensions==4.15.0
wmill==1.659.1
@@ -0,0 +1,245 @@
import wmill, json, paramiko, io, mysql.connector, re
GITEA_REPO = "http://172.17.1.251:8080/sebastian.serfling/BackupScript.git"
def deploy_to_server(ssh, server, pbs, pbs_host, pbs_user, pbs_pass,
pbs_port, ds_config, datastores, gitea_token,
backup_server_host, job_uuid, ssh_creds, db_cfg,
script_version):
hostname = server["hostname"]
_, out, _ = ssh.exec_command(
"cat /opt/windmill-restore/version.txt 2>/dev/null || echo none"
)
current_version = out.read().decode().strip()
needs_deploy = current_version != script_version \
or not server.get("script_deployed")
if needs_deploy:
print(f"[{hostname}] Deploye Script v{script_version}...")
ssh.exec_command("mkdir -p /opt/windmill-restore/logs")
ssh.exec_command("which git || apt-get install -y git 2>/dev/null")
repo_url = GITEA_REPO.replace("http://", f"http://{gitea_token}@")
_, out, err = ssh.exec_command(
"cd /opt/windmill-restore && "
"if [ -d 'BackupScript/.git' ]; then "
" cd BackupScript && git pull; "
"else "
" rm -rf BackupScript && "
" git clone '" + repo_url + "' BackupScript; "
"fi"
)
exit_code = out.channel.recv_exit_status()
stderr_out = err.read().decode().strip()
if exit_code != 0:
raise Exception(f"[{hostname}] Git fehlgeschlagen: {stderr_out}")
_, out, err = ssh.exec_command(
"cp /opt/windmill-restore/BackupScript/restore.sh "
" /opt/windmill-restore/restore.sh && "
"chmod +x /opt/windmill-restore/restore.sh && "
"echo '" + script_version + "' > /opt/windmill-restore/version.txt"
)
exit_code = out.channel.recv_exit_status()
stderr_out = err.read().decode().strip()
if exit_code != 0:
raise Exception(f"[{hostname}] Script kopieren fehlgeschlagen: {stderr_out}")
print(f"[{hostname}] Script v{script_version} deployed.")
pbs_conf = "\n".join([
f"PBS_HOST={pbs_host}",
f"PBS_PORT={pbs_port}",
f"PBS_USER={pbs_user}",
f"PBS_PASSWORD={pbs_pass}",
]) + "\n"
sftp = ssh.open_sftp()
sftp.putfo(io.BytesIO(pbs_conf.encode()),
"/opt/windmill-restore/pbs.conf")
sftp.putfo(io.BytesIO(backup_server_host.encode()),
"/opt/windmill-restore/backup_server_host")
sftp.close()
ssh.exec_command("chmod 600 /opt/windmill-restore/pbs.conf")
print(f"[{hostname}] backup_server_host: {backup_server_host}")
else:
print(f"[{hostname}] Script aktuell (v{current_version}).")
ssh.exec_command(
"mkdir -p /opt/windmill-restore/keys && "
"chmod 700 /opt/windmill-restore/keys"
)
_, out, _ = ssh.exec_command(
"pvesm status 2>/dev/null | awk 'NR>1{print $1}'"
)
existing_storages = out.read().decode().splitlines()
pbs_storages = []
for row in datastores:
datastore = row["datastore"]
storage_id = "pbs-" + datastore.lower() \
.replace(" ", "-") \
.replace("_", "-")
ds_cfg = ds_config.get(datastore, {})
fingerprint = ds_cfg.get("fingerprint", "") or ""
keyfile_path = f"/opt/windmill-restore/keys/{datastore}.keyfile"
rsync_src = f"root@{pbs_host}:/root/Scripte/{datastore}.keyfile"
_, out, err = ssh.exec_command(
"if [ -s '" + keyfile_path + "' ]; then "
" echo 'vorhanden'; "
"else "
" rsync -az -e 'ssh -o StrictHostKeyChecking=no' "
" '" + rsync_src + "' '" + keyfile_path + "' "
" && chmod 600 '" + keyfile_path + "' "
" && echo 'geholt'; "
"fi"
)
exit_code = out.channel.recv_exit_status()
stderr_out = err.read().decode().strip()
if exit_code != 0:
raise Exception(
f"[{hostname}] Keyfile fehlgeschlagen fuer '{datastore}': {stderr_out}"
)
if storage_id in existing_storages:
print(f"[{hostname}] Storage '{storage_id}' vorhanden.")
else:
fp_part = f"--fingerprint '{fingerprint}'" if fingerprint else ""
cmd = (
f"pvesm add pbs {storage_id} "
f"--server '{pbs_host}' "
f"--datastore '{datastore}' "
f"--username '{pbs_user}' "
f"--password '{pbs_pass}' "
f"--port {pbs_port} "
f"--encryption-key '{keyfile_path}' "
f"--content backup"
)
_, out, err = ssh.exec_command(cmd)
exit_code = out.channel.recv_exit_status()
stderr = err.read().decode().strip()
if exit_code != 0:
raise Exception(
f"[{hostname}] pvesm add '{datastore}' fehlgeschlagen: {stderr}"
)
print(f"[{hostname}] -> '{storage_id}' registriert")
pbs_storages.append({"datastore": datastore, "storage_id": storage_id})
conn = mysql.connector.connect(**db_cfg)
cur = conn.cursor()
if needs_deploy:
cur.execute("""
UPDATE Kunden.`bronze.restore.server`
SET script_deployed=1, script_version=%s WHERE hostname=%s
""", (script_version, hostname))
for entry in pbs_storages:
cur.execute("""
UPDATE Kunden.`bronze.backup.datastore.config`
SET pbs_storage_id=%s WHERE datastore=%s
""", (entry["storage_id"], entry["datastore"]))
cur.execute("""
INSERT INTO Kunden.`bronze.restore.session`
(job_uuid, hostname, ip, ssh_user, ssh_password)
VALUES (%s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
ip=VALUES(ip), ssh_user=VALUES(ssh_user),
ssh_password=VALUES(ssh_password)
""", (
job_uuid, hostname,
ssh_creds["ip"], ssh_creds["user"], ssh_creds["password"],
))
conn.commit(); cur.close(); conn.close()
print(f"[{hostname}] Session-Creds gespeichert.")
return pbs_storages
def main(prev: dict, bw_result: dict = {}, datastores: list = []):
if prev.get("mode") == "webhook":
return prev
servers = prev["target_servers"]
server_creds = prev.get("server_creds", {})
job_uuid = prev["job_uuid"]
script_version = wmill.get_variable("f/Backup/restore_version").strip()
print(f"Script-Version aus Variable: {script_version}")
pbs = json.loads(wmill.get_variable("f/Backup/pbs_variable"))
pbs_host = pbs["host"]
pbs_user = pbs["user"]
pbs_pass = pbs["password"]
pbs_port = str(pbs.get("port", 8007))
db_cfg = json.loads(wmill.get_variable("f/Backup/mysql_config"))
conn = mysql.connector.connect(**db_cfg)
cur = conn.cursor(dictionary=True)
cur.execute("""
SELECT datastore, rsync_target, pbs_storage_id, fingerprint
FROM Kunden.`bronze.backup.datastore.config`
""")
ds_config = {row["datastore"]: row for row in cur.fetchall()}
cur.close(); conn.close()
gitea_token = wmill.get_variable("f/Backup/gitea_token")
backup_server_host = wmill.get_variable("f/Backup/backup_server_host")
target_servers_out = []
for server in servers:
hostname = server["hostname"]
creds = server_creds.get(hostname, {})
url = creds.get("url", "")
ip_match = re.search(r'https?://([0-9.]+)', url)
ip = ip_match.group(1) if ip_match else server.get("ip", "")
if not ip:
print(f"WARNUNG: Keine IP fuer '{hostname}' uebersprungen.")
continue
ssh_creds_dict = {
"ip": ip,
"user": creds.get("username", "root"),
"password": creds.get("password", ""),
}
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(ip, username=ssh_creds_dict["user"],
password=ssh_creds_dict["password"])
try:
pbs_storages = deploy_to_server(
ssh, server, pbs, pbs_host, pbs_user, pbs_pass,
pbs_port, ds_config, datastores, gitea_token,
backup_server_host, job_uuid, ssh_creds_dict, db_cfg,
script_version
)
finally:
ssh.close()
target_servers_out.append({
**server,
"ip": ip,
"ssh_creds": ssh_creds_dict,
"pbs_storages": pbs_storages,
})
if not target_servers_out:
raise Exception("Kein Server konnte vorbereitet werden!")
return {
**prev,
"target_servers": target_servers_out,
"script_version": script_version,
}
@@ -0,0 +1,9 @@
# py: 3.12
anyio==4.12.1
certifi==2026.2.25
h11==0.16.0
httpcore==1.0.9
httpx==0.28.1
idna==3.11
typing-extensions==4.15.0
wmill==1.657.2
@@ -0,0 +1,82 @@
import subprocess, sys, json, os, wmill
def bw_lookup(search_term, env, run):
run(["bw", "sync", "--session", env["BW_SESSION"]], check=False)
for attempt in range(3):
result = run(
["bw", "list", "items", "--search", search_term,
"--session", env["BW_SESSION"]]
)
items = json.loads(result.stdout)
if items:
break
import time; time.sleep(3)
if not items:
raise Exception(f"Kein Bitwarden-Eintrag fuer: '{search_term}'")
exact = next(
(i for i in items
if i.get("name","").strip().lower() == search_term.strip().lower()),
None,
)
item = exact if exact else items[0]
url = ((item.get("login",{}).get("uris") or [{}])[0].get("uri","")) \
if item.get("login") else ""
return {
"username": item.get("login",{}).get("username","") if item.get("login") else "",
"password": item.get("login",{}).get("password","") if item.get("login") else "",
"url": url,
}
def main(
prev: dict,
bw_url: str = "https://bitwarden.stines.de",
):
if prev.get("mode") == "webhook":
return prev
servers = prev.get("target_servers", [])
bw_creds = json.loads(wmill.get_variable("f/Backup/bitwarden_api_login"))
env = os.environ.copy()
env["BW_CLIENTID"] = bw_creds["bw_clientid"]
env["BW_CLIENTSECRET"] = bw_creds["bw_clientsecret"]
env["BW_PASSWORD"] = bw_creds["bw_masterpassword"]
def run(cmd, check=True):
return subprocess.run(cmd, env=env, text=True, capture_output=True, check=check)
if subprocess.run(["which", "bw"], capture_output=True).returncode != 0:
run(["wget",
"https://github.com/bitwarden/cli/releases/download/v1.22.1/bw-linux-1.22.1.zip",
"-O", "bw.zip"])
run(["unzip", "bw.zip"])
run(["chmod", "+x", "bw"])
run(["mv", "bw", "/usr/local/bin/bw"])
with open("/etc/hosts", "a") as f:
f.write("172.17.1.3 bitwarden.stines.de\n")
run(["bw", "config", "server", bw_url])
run(["bw", "logout"], check=False)
result = run(["bw", "login", "--apikey"], check=False)
if result.returncode != 0:
raise Exception(f"Bitwarden Login fehlgeschlagen: {result.stderr}")
unlock = run(["bw", "unlock", bw_creds["bw_masterpassword"], "--raw"])
bw_session = unlock.stdout.strip()
if not bw_session:
raise Exception("Vault konnte nicht entsperrt werden")
env["BW_SESSION"] = bw_session
server_creds = {}
for server in servers:
hostname = server["hostname"]
print(f"Hole Creds fuer: {hostname}")
creds = bw_lookup(hostname, env, run)
server_creds[hostname] = creds
print(f" -> OK: {creds['username']}@{hostname}")
run(["bw", "logout"], check=False)
return {**prev, "server_creds": server_creds}
@@ -0,0 +1,17 @@
# py: 3.12
anyio==4.13.0
bcrypt==5.0.0
certifi==2026.2.25
cffi==2.0.0
cryptography==46.0.7
h11==0.16.0
httpcore==1.0.9
httpx==0.28.1
idna==3.11
invoke==3.0.3
mysql-connector-python==9.6.0
paramiko==4.0.0
pycparser==3.0
pynacl==1.6.2
typing-extensions==4.15.0
wmill==1.680.0
@@ -0,0 +1,404 @@
import wmill, json, mysql.connector, paramiko, re, base64
from datetime import datetime
import httpx
def send_nextcloud_message(message: str):
"""Sendet eine Nachricht an Nextcloud Talk."""
try:
nc_url = wmill.get_variable("f/Backup/nextcloud_talk_url").rstrip("/")
nc_room = wmill.get_variable("f/Backup/nextcloud_talk_room")
nc_user = wmill.get_variable("f/Backup/nextcloud_talk_user")
nc_password = wmill.get_variable("f/Backup/nextcloud_talk_password")
credentials = base64.b64encode(
f"{nc_user}:{nc_password}".encode()
).decode()
url = f"{nc_url}/ocs/v2.php/apps/spreed/api/v1/chat/{nc_room}"
headers = {
"Authorization": f"Basic {credentials}",
"OCS-APIREQUEST": "true",
"Content-Type": "application/json",
"Accept": "application/json",
}
response = httpx.post(
url,
headers=headers,
json={"message": message},
timeout=15,
verify=False,
)
print(f"Nextcloud Talk: HTTP {response.status_code}")
except Exception as e:
print(f"Nextcloud Talk Fehler (nicht kritisch): {e}")
def find_next_backup_for_server(cur, job_uuid, server_hostname, max_backup_size_gb):
if max_backup_size_gb is not None:
max_bytes = max_backup_size_gb * 1024 * 1024 * 1024
cur.execute("""
SELECT q.client_name, q.backup_path, q.backup_size_bytes,
q.rsync_target, q.pbs_storage_id,
r.hostname AS server_hostname,
r.ip AS server_ip,
r.restore_mount,
r.restore_path AS restore_path,
r.free_space_gb,
r.max_backup_size_gb,
r.min_backup_size_gb
FROM Kunden.`bronze.backup.queue` q
JOIN Kunden.`bronze.restore.server` r
ON r.hostname = %s
WHERE q.job_uuid = %s AND q.status = 'queued'
AND r.current_job_uuid = %s
AND (q.backup_size_bytes IS NULL OR q.backup_size_bytes <= %s)
AND (r.min_backup_size_gb IS NULL OR q.backup_size_bytes >= r.min_backup_size_gb * 1024 * 1024 * 1024)
ORDER BY q.priority ASC
LIMIT 1
""", (server_hostname, job_uuid, job_uuid, max_bytes))
else:
cur.execute("""
SELECT q.client_name, q.backup_path, q.backup_size_bytes,
q.rsync_target, q.pbs_storage_id,
r.hostname AS server_hostname,
r.ip AS server_ip,
r.restore_mount,
r.restore_path AS restore_path,
r.free_space_gb,
r.max_backup_size_gb,
r.min_backup_size_gb
FROM Kunden.`bronze.backup.queue` q
JOIN Kunden.`bronze.restore.server` r
ON r.hostname = %s
WHERE q.job_uuid = %s AND q.status = 'queued'
AND r.current_job_uuid = %s
AND (r.min_backup_size_gb IS NULL OR q.backup_size_bytes >= r.min_backup_size_gb * 1024 * 1024 * 1024)
ORDER BY q.priority ASC
LIMIT 1
""", (server_hostname, job_uuid, job_uuid))
return cur.fetchone()
def release_server_and_check_done(cur, conn, job_uuid, server_hostname):
"""Server freigeben und prüfen ob alle Jobs fertig sind."""
cur.execute("""
UPDATE Kunden.`bronze.restore.server`
SET current_job_uuid=NULL
WHERE hostname=%s
""", (server_hostname,))
conn.commit()
print(f"Server '{server_hostname}' fertig.")
cur.execute("""
SELECT COUNT(*) AS cnt FROM Kunden.`bronze.restore.server`
WHERE current_job_uuid = %s
""", (job_uuid,))
still_active = cur.fetchone()["cnt"]
if still_active == 0:
cur.execute("""
UPDATE Kunden.`bronze.restore.jobs`
SET status='completed', finished_at=NOW()
WHERE job_uuid=%s
""", (job_uuid,))
cur.execute("""
DELETE FROM Kunden.`bronze.restore.session`
WHERE job_uuid=%s
""", (job_uuid,))
# Statistik für Abschluss-Nachricht
cur.execute("""
SELECT total_backups, restored_count, failed_count,
TIMESTAMPDIFF(MINUTE, started_at, NOW()) AS duration_min
FROM Kunden.`bronze.restore.jobs`
WHERE job_uuid=%s
""", (job_uuid,))
job_stats = cur.fetchone()
conn.commit()
print(f"Job {job_uuid} vollstaendig abgeschlossen.")
if job_stats:
total = job_stats["total_backups"] or 0
restored = job_stats["restored_count"] or 0
failed = job_stats["failed_count"] or 0
dur_min = job_stats["duration_min"] or 0
dur_str = f"{dur_min//60}h {dur_min%60}m" if dur_min >= 60 else f"{dur_min}m"
if failed == 0:
msg = (
f"✅ **Backup-Job abgeschlossen**\n"
f"Alle {total} Backups erfolgreich | Dauer: {dur_str}"
)
else:
msg = (
f"⚠️ **Backup-Job abgeschlossen mit Fehlern**\n"
f"{restored}/{total} erfolgreich | ❌ {failed} fehlgeschlagen | Dauer: {dur_str}"
)
send_nextcloud_message(msg)
return "all_done"
else:
conn.commit()
print(f"Noch {still_active} Server aktiv.")
return "server_done"
def main(from_init: dict):
if from_init.get("mode") == "schedule":
print("Restores gestartet - Flow wartet auf Webhooks.")
return {"status": "waiting_for_webhook",
"job_uuid": from_init.get("job_uuid")}
data = from_init.get("data", {})
job_uuid = data.get("job_uuid")
client = data.get("client_name")
status = data.get("status", "unknown")
server_hostname = data.get("server_hostname", "")
if not job_uuid or not client:
raise Exception(f"Ungueltiger Webhook-Payload: {data}")
print(f"Webhook: {client} -> {status} (Server: {server_hostname})")
client_like = f"{client}%"
db_cfg = json.loads(wmill.get_variable("f/Backup/mysql_config"))
conn = mysql.connector.connect(**db_cfg)
cur = conn.cursor(dictionary=True)
cur.execute("""
UPDATE Kunden.`bronze.restore.result` SET
vm_name = %s,
vm_id_original = %s,
vm_id_restored = %s,
restore_duration_sec = %s,
actual_disk_used_bytes = %s,
zip_size_bytes = %s,
zip_duration_sec = %s,
rsync_size_bytes = %s,
rsync_ok = %s,
rsync_retries = %s,
qm_agent_ok = %s,
status = %s,
error_message = %s,
webhook_received_at = %s
WHERE job_uuid = %s AND backup_path LIKE %s
""", (
data.get("vm_name", ""),
data.get("vm_id_original"),
data.get("vm_id_restored"),
data.get("restore_duration_sec"),
data.get("actual_disk_used_bytes"),
data.get("zip_size_bytes"),
data.get("zip_duration_sec"),
data.get("rsync_size_bytes"),
1 if data.get("rsync_ok") else 0,
data.get("rsync_retries", 0),
1 if data.get("qm_agent_ok") in (True, "true", "skipped") else 0,
"done" if status == "success" else "failed",
data.get("error_message", ""),
datetime.now(),
job_uuid, client_like,
))
cur.execute("""
UPDATE Kunden.`bronze.backup.queue` SET status=%s
WHERE job_uuid=%s AND backup_path LIKE %s
""", ("done" if status == "success" else "failed", job_uuid, client_like))
field = "restored_count" if status == "success" else "failed_count"
cur.execute(
f"UPDATE Kunden.`bronze.restore.jobs` "
f"SET {field}={field}+1 WHERE job_uuid=%s",
(job_uuid,)
)
if data.get("free_space_gb") is not None and server_hostname:
cur.execute("""
UPDATE Kunden.`bronze.restore.server`
SET free_space_gb = %s WHERE hostname = %s
""", (data.get("free_space_gb"), server_hostname))
conn.commit()
vm_name = data.get("vm_name") or client
dur_sec = data.get("restore_duration_sec") or 0
dur_str = f"{dur_sec//60}m {dur_sec%60}s" if dur_sec >= 60 else f"{dur_sec}s"
zip_mb = (data.get("zip_size_bytes") or 0) // 1024 // 1024
icon = "" if status == "success" else ""
if status != "success":
err = data.get("error_message", "")[:100]
nc_msg = (
f"{icon} **{vm_name}** ({client})\n"
f"Server: {server_hostname} | Fehler: {err}"
)
send_nextcloud_message(nc_msg)
cur.execute("""
SELECT max_backup_size_gb, min_backup_size_gb, free_space_gb
FROM Kunden.`bronze.restore.server`
WHERE hostname = %s
""", (server_hostname,))
srv_cfg = cur.fetchone()
max_backup_size_gb = srv_cfg["max_backup_size_gb"] if srv_cfg else None
min_backup_size_gb = srv_cfg["min_backup_size_gb"] if srv_cfg else None
max_bytes = max_backup_size_gb * 1024 * 1024 * 1024 if max_backup_size_gb is not None else None
min_bytes = min_backup_size_gb * 1024 * 1024 * 1024 if min_backup_size_gb is not None else None
nxt = find_next_backup_for_server(cur, job_uuid, server_hostname, max_backup_size_gb)
conn.commit()
if not nxt:
if max_backup_size_gb is not None and min_backup_size_gb is not None:
cur.execute("""
SELECT client_name, backup_path, backup_size_bytes,
rsync_target, pbs_storage_id
FROM Kunden.`bronze.backup.queue`
WHERE job_uuid = %s AND status = 'queued'
AND (backup_size_bytes IS NULL OR backup_size_bytes <= %s)
AND backup_size_bytes >= %s
ORDER BY priority ASC
LIMIT 1
""", (job_uuid, max_bytes, min_bytes))
elif max_backup_size_gb is not None:
cur.execute("""
SELECT client_name, backup_path, backup_size_bytes,
rsync_target, pbs_storage_id
FROM Kunden.`bronze.backup.queue`
WHERE job_uuid = %s AND status = 'queued'
AND (backup_size_bytes IS NULL OR backup_size_bytes <= %s)
ORDER BY priority ASC
LIMIT 1
""", (job_uuid, max_bytes))
elif min_backup_size_gb is not None:
cur.execute("""
SELECT client_name, backup_path, backup_size_bytes,
rsync_target, pbs_storage_id
FROM Kunden.`bronze.backup.queue`
WHERE job_uuid = %s AND status = 'queued'
AND backup_size_bytes >= %s
ORDER BY priority ASC
LIMIT 1
""", (job_uuid, min_bytes))
else:
cur.execute("""
SELECT client_name, backup_path, backup_size_bytes,
rsync_target, pbs_storage_id
FROM Kunden.`bronze.backup.queue`
WHERE job_uuid = %s AND status = 'queued'
ORDER BY priority ASC
LIMIT 1
""", (job_uuid,))
next_queued = cur.fetchone()
if next_queued:
print(f"Server '{server_hostname}' nimmt naechstes passendes Backup: "
f"{next_queued['client_name']}")
cur.execute("""
SELECT hostname, ip, restore_mount, restore_path,
free_space_gb, max_backup_size_gb, min_backup_size_gb
FROM Kunden.`bronze.restore.server`
WHERE hostname = %s
""", (server_hostname,))
srv = cur.fetchone()
nxt = {
**next_queued,
"server_hostname": server_hostname,
"server_ip": srv["ip"] if srv else "",
"restore_mount": srv["restore_mount"] if srv else "",
"restore_path": srv["restore_path"] if srv else "",
"free_space_gb": srv["free_space_gb"] if srv else 0,
"max_backup_size_gb": srv["max_backup_size_gb"] if srv else None,
"min_backup_size_gb": srv["min_backup_size_gb"] if srv else None,
}
else:
result = release_server_and_check_done(cur, conn, job_uuid, server_hostname)
cur.close(); conn.close()
if result == "all_done":
return {"status": "all_done", "job_uuid": job_uuid}
else:
return {"status": "server_done",
"server": server_hostname,
"job_uuid": job_uuid}
cur.close(); conn.close()
# FIX: started_at hinzugefügt
conn2 = mysql.connector.connect(**db_cfg)
cur2 = conn2.cursor()
cur2.execute("""
INSERT INTO Kunden.`bronze.restore.result`
(job_uuid, client_name, backup_path,
backup_size_bytes, restore_server, status, started_at)
VALUES (%s, %s, %s, %s, %s, 'restoring', NOW())
""", (
job_uuid, nxt["client_name"], nxt["backup_path"],
nxt.get("backup_size_bytes", 0), nxt["server_hostname"],
))
cur2.execute("""
UPDATE Kunden.`bronze.backup.queue` SET status='assigned'
WHERE job_uuid=%s AND backup_path LIKE %s
""", (job_uuid, f"{nxt['client_name']}%"))
conn2.commit(); cur2.close(); conn2.close()
conn3 = mysql.connector.connect(**db_cfg)
cur3 = conn3.cursor(dictionary=True)
cur3.execute("""
SELECT ip, ssh_user, ssh_password
FROM Kunden.`bronze.restore.session`
WHERE job_uuid = %s AND hostname = %s
LIMIT 1
""", (job_uuid, server_hostname))
session = cur3.fetchone()
cur3.close(); conn3.close()
if not session:
raise Exception(
f"Keine Session-Creds fuer '{server_hostname}'. "
f"Job-UUID: {job_uuid}"
)
webhook_url = wmill.get_variable("f/Backup/windmill_webhook_url")
webhook_tok = wmill.get_variable("f/Backup/windmill_webhook_token")
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(
session["ip"],
username=session["ssh_user"],
password=session["ssh_password"]
)
safe_client = nxt["client_name"].replace("/", "_").replace(":", "_")
nxt_size = nxt.get("backup_size_bytes") or 0
cmd = (
f"nohup /opt/windmill-restore/restore.sh"
f" --job-uuid '{job_uuid}'"
f" --backup-path '{nxt['backup_path']}'"
f" --client '{nxt['client_name']}'"
f" --restore-mount '{nxt['restore_mount']}'"
f" --restore-path '{nxt['restore_path']}'"
f" --rsync-target '{nxt['rsync_target']}'"
f" --pbs-storage '{nxt['pbs_storage_id']}'"
f" --webhook-url '{webhook_url}'"
f" --webhook-token '{webhook_tok}'"
f" --server-hostname '{server_hostname}'"
f" --backup-size '{nxt_size}'"
f" > /opt/windmill-restore/logs/{safe_client}.log 2>&1 &"
)
ssh.exec_command(cmd)
ssh.close()
size_gb = nxt_size / 1024 / 1024 / 1024
print(f"Naechster Restore: {nxt['client_name']} ({size_gb:.1f} GB) auf {server_hostname}")
return {
"status": "next_restore_started",
"client": nxt["client_name"],
"server": server_hostname,
"job_uuid": job_uuid,
}
@@ -0,0 +1,31 @@
summary: Backup Restore Report - Nextcloud Talk
description: |
Läuft täglich um 08:00 Uhr. Holt das Ergebnis des letzten
Backup-Restore-Jobs aus der DB und sendet eine Zusammenfassung
per Nextcloud Talk (Webhook/Bot).
value:
modules:
- id: a
summary: Letzten Job aus DB holen & Report zusammenbauen
value:
type: rawscript
content: '!inline letzten_job_aus_db_holen_&_report_zusammenbauen.py'
input_transforms: {}
lock: '!inline letzten_job_aus_db_holen_&_report_zusammenbauen.lock'
language: python3
- id: b
summary: Nachricht an Nextcloud Talk senden
value:
type: rawscript
content: '!inline nachricht_an_nextcloud_talk_senden.py'
input_transforms:
report:
type: javascript
expr: results.a
lock: '!inline nachricht_an_nextcloud_talk_senden.lock'
language: python3
schema:
$schema: https://json-schema.org/draft/2020-12/schema
type: object
order: []
properties: {}
@@ -0,0 +1,10 @@
# py: 3.12
anyio==4.12.1
certifi==2026.2.25
h11==0.16.0
httpcore==1.0.9
httpx==0.28.1
idna==3.11
mysql-connector-python==9.6.0
typing-extensions==4.15.0
wmill==1.659.1
@@ -0,0 +1,129 @@
import wmill, mysql.connector, json
from datetime import datetime
def fmt_bytes(b):
if not b: return ""
b = int(b)
for unit in ["B","KB","MB","GB","TB"]:
if b < 1024: return f"{b:.1f} {unit}"
b /= 1024
return f"{b:.1f} PB"
def fmt_dur(sec):
if not sec: return ""
sec = int(sec)
if sec < 60: return f"{sec}s"
if sec < 3600: return f"{sec//60}m {sec%60}s"
return f"{sec//3600}h {(sec%3600)//60}m"
def main():
db_cfg = json.loads(wmill.get_variable("f/Backup/mysql_config"))
conn = mysql.connector.connect(**db_cfg)
cur = conn.cursor(dictionary=True)
cur.execute("""
SELECT job_uuid, started_at, finished_at, status,
total_backups, restored_count, failed_count
FROM Kunden.`bronze.restore.jobs`
WHERE status IN ('completed', 'failed')
ORDER BY started_at DESC
LIMIT 1
""")
job = cur.fetchone()
if not job:
cur.close(); conn.close()
return {"message": "⚠️ Kein abgeschlossener Backup-Job gefunden."}
job_uuid = job["job_uuid"]
duration = ""
if job["started_at"] and job["finished_at"]:
secs = (job["finished_at"] - job["started_at"]).seconds
duration = fmt_dur(secs)
cur.execute("""
SELECT client_name, vm_name, restore_server, status,
restore_duration_sec, zip_size_bytes,
rsync_ok, qm_agent_ok, error_message
FROM Kunden.`bronze.restore.result`
WHERE job_uuid = %s
ORDER BY status ASC, client_name ASC
""", (job_uuid,))
results = cur.fetchall()
# Nicht gelaufene VMs
cur.execute("""
SELECT p.client_name, p.vm_name
FROM Kunden.`bronze.restore.plan` p
LEFT JOIN Kunden.`bronze.restore.result` r
ON r.job_uuid = p.job_uuid
AND r.client_name = p.client_name
WHERE p.job_uuid = %s
AND r.id IS NULL
ORDER BY p.client_name ASC
""", (job_uuid,))
not_run = cur.fetchall()
cur.close(); conn.close()
total = job["total_backups"] or 0
done = job["restored_count"] or 0
failed = job["failed_count"] or 0
skipped = len(not_run)
date_str = job["started_at"].strftime("%d.%m.%Y") if job["started_at"] else "?"
time_str = job["started_at"].strftime("%H:%M") if job["started_at"] else "?"
if failed == 0 and skipped == 0:
status_icon = ""
elif done == 0:
status_icon = ""
else:
status_icon = "⚠️"
lines = [
f"{status_icon} **Backup Restore Report {date_str}**",
f"",
f"🕐 Start: {time_str} | Dauer: {duration}",
f"📊 Gesamt: {total} | ✅ OK: {done} | ❌ Fehler: {failed} | ⏭️ Nicht gestartet: {skipped}",
f"",
]
# Fehlgeschlagene
failed_results = [r for r in results if r["status"] == "failed"]
if failed_results:
lines.append("**❌ Fehlgeschlagen:**")
for r in failed_results:
name = r["vm_name"] or r["client_name"]
err = r["error_message"] or "unbekannt"
if len(err) > 80:
err = err[:80] + "..."
lines.append(f"{name} ({r['restore_server']}): {err}")
lines.append("")
# Nicht gestartet
if not_run:
lines.append("**⏭️ Nicht gestartet:**")
for r in not_run:
name = r["vm_name"] or r["client_name"]
lines.append(f"{name}")
lines.append("")
# Erfolgreich
done_results = [r for r in results if r["status"] == "done"]
if done_results:
lines.append("**✅ Erfolgreich:**")
for r in done_results:
name = r["vm_name"] or r["client_name"]
dauer = fmt_dur(r["restore_duration_sec"])
zipsize = fmt_bytes(r["zip_size_bytes"])
agent = "" if r["qm_agent_ok"] else ""
rsync = "" if r["rsync_ok"] else ""
lines.append(
f"{name} | {r['restore_server']} | "
f"{dauer} | 📦 {zipsize} | Agent: {agent} | Rsync: {rsync}"
)
message = "\n".join(lines)
print(message)
return {"message": message, "job_uuid": job_uuid, "failed": failed, "skipped": skipped}
@@ -0,0 +1,9 @@
# py: 3.12
anyio==4.12.1
certifi==2026.2.25
h11==0.16.0
httpcore==1.0.9
httpx==0.28.1
idna==3.11
typing-extensions==4.15.0
wmill==1.657.2
@@ -0,0 +1,44 @@
import wmill, json
import httpx
def main(report: dict):
import base64
message = report.get("message", "Kein Bericht verfuegbar.")
nc_url = wmill.get_variable("f/Backup/nextcloud_talk_url").rstrip("/")
nc_room = wmill.get_variable("f/Backup/nextcloud_talk_room")
nc_user = wmill.get_variable("f/Backup/nextcloud_talk_user")
nc_password = wmill.get_variable("f/Backup/nextcloud_talk_password")
credentials = base64.b64encode(
f"{nc_user}:{nc_password}".encode()
).decode()
url = f"{nc_url}/ocs/v2.php/apps/spreed/api/v1/chat/{nc_room}"
headers = {
"Authorization": f"Basic {credentials}",
"OCS-APIREQUEST": "true",
"Content-Type": "application/json",
"Accept": "application/json",
}
payload = {"message": message}
response = httpx.post(
url,
headers=headers,
json=payload,
timeout=30,
verify=False, # falls self-signed cert
)
print(f"HTTP: {response.status_code}")
print(f"URL: {url}")
if response.status_code not in (200, 201):
raise Exception(
f"Nextcloud Talk Fehler: {response.status_code} {response.text}"
)
print("Nachricht gesendet ✓")
return {"status": "sent", "http": response.status_code}
+6
View File
@@ -0,0 +1,6 @@
summary: ''
display_name: Backup
extra_perms:
sebastianserfling@stines.de: true
owners:
- sebastianserfling@stines.de
@@ -0,0 +1,3 @@
description: ''
value: btrv2jb9
is_secret: false
@@ -0,0 +1,3 @@
description: ''
value: https://cloudstorage.stines.de
is_secret: false
+3
View File
@@ -0,0 +1,3 @@
description: ''
value: 1.0.28
is_secret: false
+6
View File
@@ -0,0 +1,6 @@
summary: null
display_name: Nextcloud
extra_perms:
sebastianserfling@stines.de: true
owners:
- sebastianserfling@stines.de
+2
View File
@@ -0,0 +1,2 @@
summary: Proxmox
description: ''
@@ -0,0 +1,87 @@
summary: Proxmox Backup Webhook
description: |
Empfängt Backup-Benachrichtigungen von Proxmox via Webhook,
speichert alle VM-Backups in MySQL und sendet eine Zusammenfassung
an Nextcloud Talk.
value:
modules:
- id: a
summary: Payload parsen & aufbereiten
value:
type: rawscript
content: '!inline payload_parsen_&_aufbereiten.py'
input_transforms:
message:
type: javascript
expr: flow_input.message || ''
severity:
type: javascript
expr: flow_input.severity || ''
title:
type: javascript
expr: flow_input.title || ''
debug_mode:
type: javascript
expr: flow_input.debug_mode || false
lock: '!inline payload_parsen_&_aufbereiten.lock'
language: python3
- id: b
summary: In MySQL speichern
value:
type: rawscript
content: '!inline in_mysql_speichern.py'
input_transforms:
backups:
type: javascript
expr: results.a.backups
batch_id:
type: javascript
expr: results.a.batch_id
raw_payload:
type: javascript
expr: results.a.raw_payload
total_size:
type: javascript
expr: results.a.total_size
total_time:
type: javascript
expr: results.a.total_time
debug_mode:
type: javascript
expr: flow_input.debug_mode || false
lock: '!inline in_mysql_speichern.lock'
language: python3
- id: c
summary: Nachricht an Nextcloud Talk
value:
type: rawscript
content: '!inline nachricht_an_nextcloud_talk.py'
input_transforms:
talk_message:
type: javascript
expr: results.a.talk_message
debug_mode:
type: javascript
expr: flow_input.debug_mode || false
lock: '!inline nachricht_an_nextcloud_talk.lock'
language: python3
schema:
$schema: https://json-schema.org/draft/2020-12/schema
type: object
properties:
message:
type: string
description: Vollständige Nachricht mit Summary und Logs
default: "Details\n=======\nVMID Name Status Time Size Filename\n127 OLV-CLOUD01-IP17.2 ok 4s 50 GiB vm/127/2026-04-22T19:43:13Z\n131 OLV-WORDP01-IP17.3 ok 3s 50 GiB vm/131/2026-04-22T19:43:17Z\n132 OLV-SMTP01-IP17.1 ok 5s 50 GiB vm/132/2026-04-22T19:43:20Z\n\nTotal running time: 12s\nTotal size: 150 GiB\n\nLogs\n====\nvzdump 127 131 132 --prune-backups 'keep-all=1' --mode snapshot\n\n127: 2026-04-22 21:43:13 INFO: Starting Backup of VM 127 (qemu)\n127: 2026-04-22 21:43:17 INFO: Finished Backup of VM 127 (00:00:04)\n\n131: 2026-04-22 21:43:17 INFO: Starting Backup of VM 131 (qemu)\n131: 2026-04-22 21:43:20 INFO: Finished Backup of VM 131 (00:00:03)\n\n132: 2026-04-22 21:43:20 INFO: Starting Backup of VM 132 (qemu)\n132: 2026-04-22 21:43:25 INFO: Finished Backup of VM 132 (00:00:05)"
severity:
type: string
description: Severity-Level (info, warning, error)
default: "info"
title:
type: string
description: Titel der Proxmox-Notification
default: "vzdump backup status (proxmox.netbird.stines.de): backup successful"
debug_mode:
type: boolean
description: "Debug-Modus: überspringt DB-Insert und Nextcloud-Nachricht"
default: false
@@ -0,0 +1,10 @@
# py: 3.12
anyio==4.13.0
certifi==2026.2.25
h11==0.16.0
httpcore==1.0.9
httpx==0.28.1
idna==3.11
mysql-connector-python==9.6.0
typing-extensions==4.15.0
wmill==1.688.0
@@ -0,0 +1,51 @@
import wmill
import json
import mysql.connector
def main(backups: list, batch_id: str, total_time: str, total_size: str, raw_payload: dict, debug_mode: bool = False):
if not backups:
print("Keine Backups zum Speichern.")
return {"inserted": 0}
if debug_mode:
print(f"[DEBUG] DB-Insert übersprungen (debug_mode=True)")
print(f"[DEBUG] batch_id={batch_id} | {len(backups)} VMs | total_time={total_time!r} | total_size={total_size!r}")
for b in backups:
print(f"[DEBUG] VM {b['vmid']} ({b['vm_name']}): status={b['status']}, size={b['size']}, duration={b['duration_sec']}s")
return {"inserted": 0, "batch_id": batch_id, "debug": True}
db_cfg = json.loads(wmill.get_variable("f/Backup/mysql_config"))
conn = mysql.connector.connect(**db_cfg)
cur = conn.cursor()
inserted = 0
for b in backups:
try:
cur.execute("""
INSERT INTO Kunden.`bronze.proxmox_backup_log`
(batch_id, vmid, vm_name, status, duration_sec, size, filename, log_text, total_time, total_size, raw_payload)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
""", (
batch_id,
b['vmid'],
b['vm_name'],
b['status'],
b['duration_sec'],
b['size'],
b['filename'],
b['log_text'],
total_time,
total_size,
json.dumps(raw_payload)
))
inserted += 1
except Exception as e:
print(f"Fehler beim Insert für VM {b['vmid']}: {e}")
conn.commit()
cur.close()
conn.close()
print(f"{inserted} Backup-Einträge in DB gespeichert (batch_id: {batch_id})")
return {"inserted": inserted, "batch_id": batch_id}
@@ -0,0 +1,9 @@
# py: 3.12
anyio==4.13.0
certifi==2026.2.25
h11==0.16.0
httpcore==1.0.9
httpx==0.28.1
idna==3.11
typing-extensions==4.15.0
wmill==1.688.0
@@ -0,0 +1,46 @@
import wmill
import httpx
import base64
def main(talk_message: str, debug_mode: bool = False):
if debug_mode:
print(f"[DEBUG] Nextcloud Talk übersprungen (debug_mode=True)")
print(f"[DEBUG] Nachricht die gesendet worden wäre:\n{talk_message}")
return {"status": "skipped", "debug": True}
nc_url = wmill.get_variable("f/Backup/nextcloud_talk_url").rstrip("/")
nc_room = wmill.get_variable("f/Backup/nextcloud_talk_room")
nc_user = wmill.get_variable("f/Backup/nextcloud_talk_user")
nc_password = wmill.get_variable("f/Backup/nextcloud_talk_password")
credentials = base64.b64encode(
f"{nc_user}:{nc_password}".encode()
).decode()
url = f"{nc_url}/ocs/v2.php/apps/spreed/api/v1/chat/{nc_room}"
headers = {
"Authorization": f"Basic {credentials}",
"OCS-APIREQUEST": "true",
"Content-Type": "application/json",
"Accept": "application/json",
}
payload = {"message": talk_message}
response = httpx.post(
url,
headers=headers,
json=payload,
timeout=30,
verify=False
)
print(f"HTTP: {response.status_code}")
if response.status_code not in (200, 201):
raise Exception(
f"Nextcloud Talk Fehler: {response.status_code} {response.text}"
)
print("Nachricht gesendet ✓")
return {"status": "sent", "http": response.status_code}
@@ -0,0 +1 @@
# py: 3.12
@@ -0,0 +1,174 @@
import re
import uuid
from datetime import datetime
def parse_duration(duration_str):
"""Parse '1m 21s', '48s', '4s', '1h 58s' to seconds."""
if not duration_str:
return None
total = 0
for match in re.finditer(r'(\d+)\s*(h|m|s)', duration_str):
val, unit = int(match.group(1)), match.group(2)
if unit == 'h':
total += val * 3600
elif unit == 'm':
total += val * 60
elif unit == 's':
total += val
return total if total > 0 else None
def main(title: str = "", message: str = "", severity: str = "", debug_mode: bool = False):
batch_id = str(uuid.uuid4())[:8]
if debug_mode:
print(f"[DEBUG] title={title!r} | severity={severity!r}")
print(f"[DEBUG] message ({len(message)} Zeichen):\n{message[:2000]}")
lines = message.split('\n')
backups = []
total_time = None
total_size = None
in_details = False
in_logs = False
vm_logs = {}
current_vmid = None
for line in lines:
stripped = line.strip()
if stripped.startswith('Total running time:'):
total_time = stripped.replace('Total running time:', '').strip()
in_details = False
elif stripped.startswith('Total size:'):
total_size = stripped.replace('Total size:', '').strip()
elif stripped == 'Details':
in_details = True
continue
elif stripped == 'Logs':
in_details = False
in_logs = True
continue
elif re.match(r'^=+$', stripped):
continue
elif in_details:
# Skip header row and empty lines
if not stripped or stripped.upper().startswith('VMID'):
continue
# Split by 2+ spaces → works for both old and new Proxmox format
cols = re.split(r'\s{2,}', stripped)
if len(cols) >= 6 and cols[0].isdigit():
vmid, name, status, time_str, size_str, filename = cols[:6]
backups.append({
'vmid': int(vmid),
'vm_name': name,
'status': status.lower(),
'duration_sec': parse_duration(time_str),
'size': size_str.strip(),
'filename': filename.strip(),
'log_text': ''
})
elif in_logs:
vm_match = re.match(r'^(\d+):\s+\d{4}-\d{2}-\d{2}', line)
if vm_match:
current_vmid = vm_match.group(1)
if current_vmid not in vm_logs:
vm_logs[current_vmid] = []
if current_vmid:
vm_logs[current_vmid].append(line)
else:
# Old format without "Details" header: try direct VM row parsing
if stripped and not stripped.upper().startswith('VMID'):
cols = re.split(r'\s{2,}', stripped)
if len(cols) >= 6 and cols[0].isdigit():
vmid, name, status, time_str, size_str, filename = cols[:6]
backups.append({
'vmid': int(vmid),
'vm_name': name,
'status': status.lower(),
'duration_sec': parse_duration(time_str),
'size': size_str.strip(),
'filename': filename.strip(),
'log_text': ''
})
for backup in backups:
vmid_str = str(backup['vmid'])
if vmid_str in vm_logs:
backup['log_text'] = '\n'.join(vm_logs[vmid_str])
if debug_mode:
print(f"[DEBUG] {len(backups)} VMs geparst: {[b['vm_name'] for b in backups]}")
print(f"[DEBUG] total_time={total_time!r} | total_size={total_size!r}")
if not backups:
return {
'backups': [],
'batch_id': batch_id,
'total_time': total_time,
'total_size': total_size,
'talk_message': f"⚠️ Proxmox Webhook empfangen, aber keine VMs gefunden.\n\n```\n{message[:500]}\n```",
'raw_payload': {'title': title, 'message': message, 'severity': severity}
}
failed = [b for b in backups if b['status'] != 'ok']
ok = [b for b in backups if b['status'] == 'ok']
if len(failed) == 0:
status_icon = ""
elif len(ok) == 0:
status_icon = ""
else:
status_icon = "⚠️"
# --- Talk-Nachricht aufbauen ---
talk_lines = []
# Titel-Zeile mit Hostname und Status aus Proxmox
display_title = title if title else f"Proxmox Backup {datetime.now().strftime('%d.%m.%Y')}"
talk_lines.append(f"{status_icon} **{display_title}**")
talk_lines.append("")
# Details-Tabelle als Code-Block für korrekte Ausrichtung
talk_lines.append("```")
talk_lines.append(f"{'VMID':<6} {'Name':<22} {'Status':<8} {'Time':<6} {'Size':<10} Filename")
for b in backups:
dur = f"{b['duration_sec']}s" if b['duration_sec'] else "?"
talk_lines.append(
f"{b['vmid']:<6} {b['vm_name']:<22} {b['status']:<8} {dur:<6} {b['size']:<10} {b['filename']}"
)
talk_lines.append("```")
talk_lines.append("")
# Zusammenfassung
parts = []
if total_time:
parts.append(f"{total_time}")
if total_size:
parts.append(f"📦 {total_size}")
if parts:
talk_lines.append(" | ".join(parts))
# Fehler hervorheben
if failed:
talk_lines.append("")
talk_lines.append("**❌ Fehler bei:**")
for b in failed:
talk_lines.append(f"{b['vm_name']} ({b['vmid']})")
talk_message = '\n'.join(talk_lines)
if debug_mode:
print(f"[DEBUG] Generierte Talk-Nachricht:\n{talk_message}")
return {
'backups': backups,
'batch_id': batch_id,
'total_time': total_time,
'total_size': total_size,
'talk_message': talk_message,
'raw_payload': {'title': title, 'message': message, 'severity': severity}
}
+6
View File
@@ -0,0 +1,6 @@
summary: null
display_name: Reporting
extra_perms:
sebastianserfling@stines.de: true
owners:
- sebastianserfling@stines.de
@@ -0,0 +1,44 @@
{
"dependencies": {
"mysql2": "latest"
}
}
//bun.lock
{
"lockfileVersion": 1,
"configVersion": 1,
"workspaces": {
"": {
"dependencies": {
"mysql2": "latest",
},
},
},
"packages": {
"@types/node": ["@types/node@25.5.2", "", { "dependencies": { "undici-types": "~7.18.0" } }, "sha512-tO4ZIRKNC+MDWV4qKVZe3Ql/woTnmHDr5JD8UI5hn2pwBrHEwOEMZK7WlNb5RKB6EoJ02gwmQS9OrjuFnZYdpg=="],
"aws-ssl-profiles": ["aws-ssl-profiles@1.1.2", "", {}, "sha512-NZKeq9AfyQvEeNlN0zSYAaWrmBffJh3IELMZfRpJVWgrpEbtEpnjvzqBPf+mxoI287JohRDoa+/nsfqqiZmF6g=="],
"denque": ["denque@2.1.0", "", {}, "sha512-HVQE3AAb/pxF8fQAoiqpvg9i3evqug3hoiwakOyZAwJm+6vZehbkYXZ0l4JxS+I3QxM97v5aaRNhj8v5oBhekw=="],
"generate-function": ["generate-function@2.3.1", "", { "dependencies": { "is-property": "^1.0.2" } }, "sha512-eeB5GfMNeevm/GRYq20ShmsaGcmI81kIX2K9XQx5miC8KdHaC6Jm0qQ8ZNeGOi7wYB8OsdxKs+Y2oVuTFuVwKQ=="],
"iconv-lite": ["iconv-lite@0.7.2", "", { "dependencies": { "safer-buffer": ">= 2.1.2 < 3.0.0" } }, "sha512-im9DjEDQ55s9fL4EYzOAv0yMqmMBSZp6G0VvFyTMPKWxiSBHUj9NW/qqLmXUwXrrM7AvqSlTCfvqRb0cM8yYqw=="],
"is-property": ["is-property@1.0.2", "", {}, "sha512-Ks/IoX00TtClbGQr4TWXemAnktAQvYB7HzcCxDGqEZU6oCmb2INHuOoKxbtR+HFkmYWBKv/dOZtGRiAjDhj92g=="],
"long": ["long@5.3.2", "", {}, "sha512-mNAgZ1GmyNhD7AuqnTG3/VQ26o760+ZYBPKjPvugO8+nLbYfX6TVpJPseBvopbdY+qpZ/lKUnmEc1LeZYS3QAA=="],
"lru.min": ["lru.min@1.1.4", "", {}, "sha512-DqC6n3QQ77zdFpCMASA1a3Jlb64Hv2N2DciFGkO/4L9+q/IpIAuRlKOvCXabtRW6cQf8usbmM6BE/TOPysCdIA=="],
"mysql2": ["mysql2@3.21.0", "", { "dependencies": { "aws-ssl-profiles": "^1.1.2", "denque": "^2.1.0", "generate-function": "^2.3.1", "iconv-lite": "^0.7.2", "long": "^5.3.2", "lru.min": "^1.1.4", "named-placeholders": "^1.1.6", "sql-escaper": "^1.3.3" }, "peerDependencies": { "@types/node": ">= 8" } }, "sha512-CYNKIuhnalXHTa4gonZ+KhzLESKllvo1qQIDYUVuatpN4NgMk+lsA3WwHYno5AS4PACUiD2qEmiVD9pr3bXWOw=="],
"named-placeholders": ["named-placeholders@1.1.6", "", { "dependencies": { "lru.min": "^1.1.0" } }, "sha512-Tz09sEL2EEuv5fFowm419c1+a/jSMiBjI9gHxVLrVdbUkkNUUfjsVYs9pVZu5oCon/kmRh9TfLEObFtkVxmY0w=="],
"safer-buffer": ["safer-buffer@2.1.2", "", {}, "sha512-YZo3K82SD7Riyi0E1EQPojLz7kpepnSQI9IyPbHHg1XXXevb5dJI7tpyN2ADxGcQbHG7vcyRHk0cbwqcQriUtg=="],
"sql-escaper": ["sql-escaper@1.3.3", "", {}, "sha512-BsTCV265VpTp8tm1wyIm1xqQCS+Q9NHx2Sr+WcnUrgLrQ6yiDIvHYJV5gHxsj1lMBy2zm5twLaZao8Jd+S8JJw=="],
"undici-types": ["undici-types@7.18.2", "", {}, "sha512-AsuCzffGHJybSaRrmr5eHr81mwJU3kjw6M+uprWvCXiNeN9SOGwQ3Jn8jb8m3Z6izVgknn1R0FTCEAP2QrLY/w=="],
}
}
@@ -0,0 +1,39 @@
type Mysql = {
host: string;
port: number;
user: string;
password: string;
database: string;
};
type LoginRecord = {
username: string;
lastaccess: string;
ipaddress: string;
groups: string;
};
export async function main(
database: Mysql,
record: LoginRecord
): Promise<{ inserted: boolean }> {
const mysql2 = await import("mysql2/promise");
const conn = await mysql2.createConnection({
host: database.host,
port: database.port,
user: database.user,
password: database.password,
database: database.database,
});
try {
await conn.execute(
"INSERT INTO `bronze.services.reporting` (username, lastaccess, ipaddress, add_date, memberof) VALUES (?, ?, ?, NOW(), ?)",
[record.username, record.lastaccess, record.ipaddress, record.groups]
);
return { inserted: true };
} finally {
await conn.end();
}
}
+97
View File
@@ -0,0 +1,97 @@
summary: RDP Logins Collector
description: >
Lädt alle aktiven RDS-Server aus bronze.server, gleicht sie mit den
verbundenen rport.io Clients ab (per Hostname oder IP), führt auf jedem den
PS-Login-Collector aus und schreibt die Ergebnisse in
bronze.services.reporting.
value:
modules:
- id: find_rds_clients
summary: RDS-Server mit rport.io Clients abgleichen
value:
type: rawscript
content: '!inline rds-server_mit_rport.io_clients_abgleichen.ts'
input_transforms:
database:
type: static
value: $res:u/sebastianserfling/fascinating_mysql
rportio_api_token:
type: static
value: $var:f/Reporting/rportio_api_token
rportio_base_url:
type: static
value: $var:f/Reporting/rportio_base_url
rportio_username:
type: static
value: $var:f/Reporting/rportio_username
lock: '!inline rds-server_mit_rport.io_clients_abgleichen.lock'
language: bun
- id: process_servers
summary: Pro Server Logins sammeln und speichern
value:
type: forloopflow
modules:
- id: execute_ps
summary: PowerShell via rport.io ausführen
value:
type: rawscript
content: '!inline powershell_via_rport.io_ausführen.ts'
input_transforms:
client_id:
type: javascript
expr: flow_input.iter.value.rport_client_id
hours_back:
type: javascript
expr: flow_input.hours_back ?? 1
rportio_api_token:
type: static
value: $var:f/Reporting/rportio_api_token
rportio_base_url:
type: static
value: $var:f/Reporting/rportio_base_url
rportio_username:
type: static
value: $var:f/Reporting/rportio_username
server_ip:
type: javascript
expr: flow_input.iter.value.ipaddress
lock: '!inline powershell_via_rport.io_ausführen.lock'
language: bun
- id: insert_logins
summary: Login-Einträge in MySQL speichern
value:
type: forloopflow
modules:
- id: insert_login
summary: Einzelnen Login-Eintrag einfügen
value:
type: rawscript
content: '!inline einzelnen_login-eintrag_einfügen.ts'
input_transforms:
database:
type: static
value: $res:u/sebastianserfling/fascinating_mysql
record:
type: javascript
expr: flow_input.iter.value
lock: '!inline einzelnen_login-eintrag_einfügen.lock'
language: bun
iterator:
type: javascript
expr: results.execute_ps
parallel: false
skip_failures: false
iterator:
type: javascript
expr: results.find_rds_clients
parallel: false
skip_failures: true
schema:
$schema: https://json-schema.org/draft/2019-09/schema
type: object
properties:
hours_back:
type: integer
description: 'Wie viele Stunden zurückschauen (Standard: 1)'
default: 1
required: []
@@ -0,0 +1,5 @@
{
"dependencies": {}
}
//bun.lock
<empty>
@@ -0,0 +1,133 @@
type LoginRecord = {
username: string;
lastaccess: string;
ipaddress: string;
groups: string;
};
export async function main(
rportio_base_url: string,
rportio_username: string,
rportio_api_token: string,
client_id: string,
server_ip: string,
hours_back: number = 1
): Promise<LoginRecord[]> {
const psScript = `
$ErrorActionPreference = 'SilentlyContinue'
$startTime = (Get-Date).AddHours(-${hours_back})
$endTime = Get-Date
$filterHashTable = @{
LogName = 'Security'
Id = 4624
StartTime = $startTime
EndTime = $endTime
}
$events = Get-WinEvent -FilterHashtable $filterHashTable -ErrorAction SilentlyContinue
$userLogins = @{}
if ($events) {
foreach ($event in $events) {
$eventDetails = [xml]$event.ToXml()
$timeCreated = $event.TimeCreated
$username = $eventDetails.Event.EventData.Data | Where-Object { $_.Name -eq 'TargetUserName' } | Select-Object -ExpandProperty '#text'
$logonType = $eventDetails.Event.EventData.Data | Where-Object { $_.Name -eq 'LogonType' } | Select-Object -ExpandProperty '#text'
if ($logonType -ne '10' -or $username -like 'DWM*' -or $username -like '*UMFD*') { continue }
$ipaddress = '${server_ip}'
$formattedTime = $timeCreated.ToString('yyyy-MM-dd HH:mm:ss')
if (-not $userLogins.ContainsKey($username) -or $userLogins[$username]._raw -lt $timeCreated) {
$userLogins[$username] = [PSCustomObject]@{
lastaccess = $formattedTime
username = $username
ipaddress = $ipaddress
_raw = $timeCreated
}
}
}
}
$result = [System.Collections.Generic.List[object]]::new()
foreach ($entry in $userLogins.GetEnumerator()) {
$u = $entry.Value
$adGroups = $null
try {
$adGroups = @(Get-ADPrincipalGroupMembership -Identity $u.username -ErrorAction Stop | Select-Object -ExpandProperty Name)
} catch {}
if (-not $adGroups -or $adGroups.Count -eq 0) { $adGroups = @('G-RDP-User') }
foreach ($group in $adGroups) {
$result.Add([PSCustomObject]@{
username = $u.username
lastaccess = $u.lastaccess
ipaddress = $u.ipaddress
groups = $group
})
}
}
if ($result.Count -eq 0) { Write-Output '[]' } else { $result | ConvertTo-Json -Depth 3 -Compress }
`.trim();
// rport.io powershell interpreter executes the script directly as PS code
const command = psScript;
// API token is used as Basic Auth password (bypasses 2FA)
const auth = Buffer.from(`${rportio_username}:${rportio_api_token}`).toString("base64");
const headers: Record<string, string> = {
Authorization: `Basic ${auth}`,
"Content-Type": "application/json",
};
const tlsOpts = { tls: { rejectUnauthorized: false } };
// Submit command to rport.io
// @ts-ignore - Bun-specific TLS option for self-signed certificates
const execResp = await fetch(
`${rportio_base_url}/api/v1/clients/${client_id}/commands`,
{
method: "POST",
headers,
body: JSON.stringify({ command, interpreter: "powershell", timeout_sec: 120 }),
...tlsOpts,
}
);
if (!execResp.ok) {
const text = await execResp.text();
throw new Error(`rport.io execute failed [${execResp.status}]: ${text}`);
}
const execData = await execResp.json();
const jid: string = execData?.data?.jid;
if (!jid) throw new Error(`No job ID from rport.io: ${JSON.stringify(execData)}`);
// Poll until finished (max 120s)
let cmdResult: Record<string, unknown> | null = null;
for (let i = 0; i < 60; i++) {
await new Promise((r) => setTimeout(r, 2000));
// @ts-ignore - Bun-specific TLS option
const statusResp = await fetch(
`${rportio_base_url}/api/v1/clients/${client_id}/commands/${jid}`,
{ headers, ...tlsOpts }
);
if (!statusResp.ok) continue;
const statusData = await statusResp.json();
const cmd = statusData?.data as Record<string, unknown>;
if (cmd?.finished_at) {
cmdResult = cmd;
break;
}
}
if (!cmdResult) throw new Error("Timeout waiting for rport.io command result");
const status = cmdResult.status as string;
if (status === "failed" || status === "unknown") {
const result = cmdResult.result as Record<string, string> ?? {};
throw new Error(`PowerShell failed [${status}]: ${cmdResult.error ?? result.stderr ?? ""}`);
}
const result = cmdResult.result as Record<string, string> ?? {};
const stdout = (result.stdout ?? "").trim();
if (!stdout || stdout === "[]") return [];
try {
const parsed = JSON.parse(stdout);
return Array.isArray(parsed) ? parsed : [parsed];
} catch {
throw new Error(`Failed to parse PowerShell JSON output: ${stdout}`);
}
}
@@ -0,0 +1,44 @@
{
"dependencies": {
"mysql2": "latest"
}
}
//bun.lock
{
"lockfileVersion": 1,
"configVersion": 1,
"workspaces": {
"": {
"dependencies": {
"mysql2": "latest",
},
},
},
"packages": {
"@types/node": ["@types/node@25.5.2", "", { "dependencies": { "undici-types": "~7.18.0" } }, "sha512-tO4ZIRKNC+MDWV4qKVZe3Ql/woTnmHDr5JD8UI5hn2pwBrHEwOEMZK7WlNb5RKB6EoJ02gwmQS9OrjuFnZYdpg=="],
"aws-ssl-profiles": ["aws-ssl-profiles@1.1.2", "", {}, "sha512-NZKeq9AfyQvEeNlN0zSYAaWrmBffJh3IELMZfRpJVWgrpEbtEpnjvzqBPf+mxoI287JohRDoa+/nsfqqiZmF6g=="],
"denque": ["denque@2.1.0", "", {}, "sha512-HVQE3AAb/pxF8fQAoiqpvg9i3evqug3hoiwakOyZAwJm+6vZehbkYXZ0l4JxS+I3QxM97v5aaRNhj8v5oBhekw=="],
"generate-function": ["generate-function@2.3.1", "", { "dependencies": { "is-property": "^1.0.2" } }, "sha512-eeB5GfMNeevm/GRYq20ShmsaGcmI81kIX2K9XQx5miC8KdHaC6Jm0qQ8ZNeGOi7wYB8OsdxKs+Y2oVuTFuVwKQ=="],
"iconv-lite": ["iconv-lite@0.7.2", "", { "dependencies": { "safer-buffer": ">= 2.1.2 < 3.0.0" } }, "sha512-im9DjEDQ55s9fL4EYzOAv0yMqmMBSZp6G0VvFyTMPKWxiSBHUj9NW/qqLmXUwXrrM7AvqSlTCfvqRb0cM8yYqw=="],
"is-property": ["is-property@1.0.2", "", {}, "sha512-Ks/IoX00TtClbGQr4TWXemAnktAQvYB7HzcCxDGqEZU6oCmb2INHuOoKxbtR+HFkmYWBKv/dOZtGRiAjDhj92g=="],
"long": ["long@5.3.2", "", {}, "sha512-mNAgZ1GmyNhD7AuqnTG3/VQ26o760+ZYBPKjPvugO8+nLbYfX6TVpJPseBvopbdY+qpZ/lKUnmEc1LeZYS3QAA=="],
"lru.min": ["lru.min@1.1.4", "", {}, "sha512-DqC6n3QQ77zdFpCMASA1a3Jlb64Hv2N2DciFGkO/4L9+q/IpIAuRlKOvCXabtRW6cQf8usbmM6BE/TOPysCdIA=="],
"mysql2": ["mysql2@3.21.0", "", { "dependencies": { "aws-ssl-profiles": "^1.1.2", "denque": "^2.1.0", "generate-function": "^2.3.1", "iconv-lite": "^0.7.2", "long": "^5.3.2", "lru.min": "^1.1.4", "named-placeholders": "^1.1.6", "sql-escaper": "^1.3.3" }, "peerDependencies": { "@types/node": ">= 8" } }, "sha512-CYNKIuhnalXHTa4gonZ+KhzLESKllvo1qQIDYUVuatpN4NgMk+lsA3WwHYno5AS4PACUiD2qEmiVD9pr3bXWOw=="],
"named-placeholders": ["named-placeholders@1.1.6", "", { "dependencies": { "lru.min": "^1.1.0" } }, "sha512-Tz09sEL2EEuv5fFowm419c1+a/jSMiBjI9gHxVLrVdbUkkNUUfjsVYs9pVZu5oCon/kmRh9TfLEObFtkVxmY0w=="],
"safer-buffer": ["safer-buffer@2.1.2", "", {}, "sha512-YZo3K82SD7Riyi0E1EQPojLz7kpepnSQI9IyPbHHg1XXXevb5dJI7tpyN2ADxGcQbHG7vcyRHk0cbwqcQriUtg=="],
"sql-escaper": ["sql-escaper@1.3.3", "", {}, "sha512-BsTCV265VpTp8tm1wyIm1xqQCS+Q9NHx2Sr+WcnUrgLrQ6yiDIvHYJV5gHxsj1lMBy2zm5twLaZao8Jd+S8JJw=="],
"undici-types": ["undici-types@7.18.2", "", {}, "sha512-AsuCzffGHJybSaRrmr5eHr81mwJU3kjw6M+uprWvCXiNeN9SOGwQ3Jn8jb8m3Z6izVgknn1R0FTCEAP2QrLY/w=="],
}
}
@@ -0,0 +1,88 @@
type Mysql = {
host: string;
port: number;
user: string;
password: string;
database: string;
};
type RportClient = {
rport_client_id: string;
hostname: string;
ipaddress: string;
};
export async function main(
database: Mysql,
rportio_base_url: string,
rportio_username: string,
rportio_api_token: string
): Promise<RportClient[]> {
const mysql2 = await import("mysql2/promise");
// 1. Query MySQL for all active RDS servers
const conn = await mysql2.createConnection({
host: database.host,
port: database.port,
user: database.user,
password: database.password,
database: database.database,
});
const [rows] = await conn.execute(
"SELECT hostname, privat_ipaddress FROM `bronze.server` WHERE services LIKE '%RDS%' AND (disable_date IS NULL OR disable_date > NOW())"
);
await conn.end();
const dbServers = rows as Array<{ hostname: string; privat_ipaddress: string }>;
if (dbServers.length === 0) return [];
// Build lookup maps for fast matching (hostname → DB row)
const byHostname = new Map(dbServers.map((s) => [s.hostname.toLowerCase(), s]));
const byIp = new Map(dbServers.map((s) => [s.privat_ipaddress, s]));
// 2. Query rport.io for all connected clients
const auth = Buffer.from(`${rportio_username}:${rportio_api_token}`).toString("base64");
const headers = {
Authorization: `Basic ${auth}`,
"Content-Type": "application/json",
};
// @ts-ignore - Bun-specific TLS option to allow self-signed certificates
const resp = await fetch(
`${rportio_base_url}/api/v1/clients?filter[connection_state]=connected&fields[clients]=id,name,hostname,ipv4&page[limit]=500`,
{ headers, tls: { rejectUnauthorized: false } }
);
if (!resp.ok) {
const err = await resp.text();
throw new Error(`rport.io clients list failed [${resp.status}]: ${err}`);
}
const data = await resp.json();
const clients = data?.data ?? [];
// 3. Match rport.io clients against DB server list (hostname or IP)
const matched: RportClient[] = [];
for (const client of clients) {
const rportHostname = (client.hostname ?? "").toLowerCase();
const rportIps: string[] = client.ipv4 ?? [];
const dbRow = byHostname.get(rportHostname)
?? rportIps.map((ip) => byIp.get(ip)).find(Boolean);
if (dbRow) {
matched.push({
rport_client_id: client.id,
hostname: client.hostname ?? client.name,
ipaddress: dbRow.privat_ipaddress, // use IP from DB, not from PS script
});
}
}
console.log(
`Found ${dbServers.length} RDS servers in DB, ${clients.length} connected rport.io clients, ${matched.length} matched`
);
return matched;
}
@@ -0,0 +1,9 @@
# py: 3.12
anyio==4.12.1
certifi==2026.2.25
h11==0.16.0
httpcore==1.0.9
httpx==0.28.1
idna==3.11
typing-extensions==4.15.0
wmill==1.657.2
+127
View File
@@ -0,0 +1,127 @@
import subprocess
import sys
import json
import os
import wmill
def main(
client_name: str,
bw_url: str = "https://bitwarden.stines.de",
):
# ── Credentials aus Windmill-Secret holen ─────────────────────────────────
bw_creds = json.loads(wmill.get_variable("f/Backup/bitwarden_api_login"))
bw_clientid = bw_creds["bw_clientid"]
bw_clientsecret = bw_creds["bw_clientsecret"]
bw_password = bw_creds["bw_masterpassword"]
search_term = f"{client_name}"
env = os.environ.copy()
env["BW_CLIENTID"] = bw_clientid
env["BW_CLIENTSECRET"] = bw_clientsecret
env["BW_PASSWORD"] = bw_password
def run(cmd, check=True, capture=True):
return subprocess.run(
cmd, env=env, text=True, capture_output=capture, check=check
)
# ── 1. Bitwarden CLI prüfen ───────────────────────────────────────────────
print("==> Prüfe Bitwarden CLI...", file=sys.stderr)
if subprocess.run(["which", "bw"], capture_output=True).returncode != 0:
print(" Installiere bw CLI...", file=sys.stderr)
run(
[
"wget",
"https://github.com/bitwarden/cli/releases/download/v1.22.1/bw-linux-1.22.1.zip",
"-O",
"bw.zip",
]
)
run(["unzip", "bw.zip"])
run(["chmod", "+x", "bw"])
run(["mv", "bw", "/usr/local/bin/bw"])
bw_version = run(["bw", "--version"]).stdout.strip()
print(f" Bitwarden CLI Version: {bw_version}", file=sys.stderr)
# ── Hostfile setzen ───────────────────────────────────────────────────────
with open("/etc/hosts", "a") as f:
f.write("172.17.1.3 bitwarden.stines.de\n")
# ── 2. Server-URL konfigurieren ───────────────────────────────────────────
print(f"==> Setze Server-URL: {bw_url}", file=sys.stderr)
run(["bw", "config", "server", bw_url])
# ── 3. Login ──────────────────────────────────────────────────────────────
print("==> Melde bei Bitwarden an...", file=sys.stderr)
run(["bw", "logout"], check=False)
result = run(["bw", "login", "--apikey"], check=False)
if result.returncode != 0:
print(f"ERROR: API-Key-Login fehlgeschlagen.\n{result.stderr}", file=sys.stderr)
sys.exit(1)
print(" Login erfolgreich.", file=sys.stderr)
# ── 4. Vault entsperren ───────────────────────────────────────────────────
print("==> Entsperre Vault...", file=sys.stderr)
unlock = run(["bw", "unlock", bw_password, "--raw"])
bw_session = unlock.stdout.strip()
if not bw_session:
print("ERROR: Vault konnte nicht entsperrt werden.", file=sys.stderr)
run(["bw", "logout"], check=False)
sys.exit(1)
env["BW_SESSION"] = bw_session
print(" Vault entsperrt.", file=sys.stderr)
# ── 5. Vault synchronisieren ──────────────────────────────────────────────
print("==> Synchronisiere Vault...", file=sys.stderr)
run(["bw", "sync", "--session", bw_session])
print(" Sync abgeschlossen.", file=sys.stderr)
# ── 6. Eintrag suchen ─────────────────────────────────────────────────────
print(f"==> Suche Eintrag: '{search_term}'...", file=sys.stderr)
search = run(
["bw", "list", "items", "--search", search_term, "--session", bw_session]
)
items = json.loads(search.stdout)
if not items:
print(f"ERROR: Kein Eintrag gefunden für: '{search_term}'", file=sys.stderr)
run(["bw", "logout"], check=False)
sys.exit(1)
print(f" {len(items)} Treffer gefunden.", file=sys.stderr)
# Exakten Treffer bevorzugen, sonst ersten nehmen
exact = next(
(
i
for i in items
if i.get("name", "").strip().lower() == search_term.strip().lower()
),
None,
)
item = exact if exact else items[0]
# ── 7. Abmelden ───────────────────────────────────────────────────────────
run(["bw", "logout"], check=False)
print("==> Abgemeldet.", file=sys.stderr)
# ── 8. Rückgabe als Objekt ────────────────────────────────────────────────
return {
"id": item.get("id", ""),
"name": item.get("name", ""),
"client": item.get("name", "").split("// ")[-1],
"username": item.get("login", {}).get("username", "")
if item.get("login")
else "",
"password": item.get("login", {}).get("password", "")
if item.get("login")
else "",
"notes": item.get("notes", "") or "",
"url": ((item.get("login", {}).get("uris") or [{}])[0].get("uri", ""))
if item.get("login")
else "",
}
@@ -0,0 +1,29 @@
summary: Bitwarden Backup-Info Export
description: Sucht nach dem passenden Eintrag in der Datenbank vom Bitwarden mit
Password und Notes (Entcrypt-KEy)
value:
modules:
- id: c
value:
type: rawscript
content: '!inline c.py'
input_transforms:
bw_url:
type: static
value: https://bitwarden.stines.de
client_name:
type: javascript
expr: flow_input.search
lock: '!inline c.lock'
language: python3
schema:
$schema: https://json-schema.org/draft/2020-12/schema
type: object
order:
- search
properties:
search:
type: string
description: ''
default: ''
required: []
+6
View File
@@ -0,0 +1,6 @@
summary: ''
display_name: Server
extra_perms:
sebastianserfling@stines.de: true
owners:
- sebastianserfling@stines.de
+5
View File
@@ -0,0 +1,5 @@
summary: null
display_name: App Custom Components
extra_perms:
g/all: false
owners: []
+5
View File
@@ -0,0 +1,5 @@
summary: null
display_name: App Groups
extra_perms:
g/all: false
owners: []
+5
View File
@@ -0,0 +1,5 @@
summary: null
display_name: App Themes
extra_perms:
g/all: false
owners: []
+5
View File
@@ -0,0 +1,5 @@
description: The default app theme
value:
name: Default Theme
value: ''
resource_type: app_theme
+8
View File
@@ -0,0 +1,8 @@
summary: ''
display_name: random_stuff
extra_perms:
g/all: true
sebastianserfling@stines.de: true
owners:
- sebastianserfling@stines.de
- g/all