import subprocess
import sys
import json
import os
import wmill

# Static hosts mapping this script appends before contacting the server.
_HOSTS_FILE = "/etc/hosts"
_HOSTS_ENTRY = "172.17.1.3 bitwarden.stines.de"


def _ensure_hosts_entry(path=_HOSTS_FILE, entry=_HOSTS_ENTRY):
    """Append *entry* to the hosts file at *path* unless it is already present."""
    # "a+" creates the file if missing, lets us read it, and always writes at
    # the end. errors="replace" keeps the read from failing on odd bytes.
    with open(path, "a+", errors="replace") as hosts:
        hosts.seek(0)
        existing = hosts.read()
        if entry in (line.strip() for line in existing.splitlines()):
            return  # already there: do not grow the file on every run
        if existing and not existing.endswith("\n"):
            # Avoid gluing our entry onto an unterminated last line.
            hosts.write("\n")
        hosts.write(entry + "\n")


def _select_item(items, search_term):
    """Return the item whose name equals *search_term*, else the first item.

    The comparison ignores case and surrounding whitespace. *items* must be
    non-empty.
    """
    wanted = search_term.strip().lower()
    for candidate in items:
        if (candidate.get("name") or "").strip().lower() == wanted:
            return candidate
    return items[0]


def _item_to_result(item):
    """Flatten a Bitwarden item dict into the result object returned by main.

    Missing or null fields are normalised to empty strings.
    """
    login = item.get("login") or {}
    uris = login.get("uris") or [{}]
    name = item.get("name") or ""
    return {
        "id": item.get("id") or "",
        "name": name,
        # Everything after the last "// " in the item name.
        "client": name.split("// ")[-1],
        "username": login.get("username") or "",
        "password": login.get("password") or "",
        "notes": item.get("notes") or "",
        "url": uris[0].get("uri") or "",
    }


def main(
    client_name: str,
    bw_url: str = "https://bitwarden.stines.de",
):
    """Look up a Bitwarden vault item by name and return its login data.

    Credentials are read from the Windmill variable
    ``f/Backup/bitwarden_api_login`` (JSON with the keys ``bw_clientid``,
    ``bw_clientsecret`` and ``bw_masterpassword``) and handed to the ``bw``
    CLI through environment variables only.

    Args:
        client_name: Search term; an item whose name matches it exactly
            (case-insensitive) is preferred over the first search hit.
        bw_url: Server URL passed to ``bw config server``.

    Returns:
        A dict with the keys ``id``, ``name``, ``client``, ``username``,
        ``password``, ``notes`` and ``url``; absent values are ``""``.

    Raises:
        SystemExit: With code 1 if login or unlock fails, or no item matches.
        subprocess.CalledProcessError: If any other checked command fails.
    """
    # ── Fetch credentials from the Windmill secret ────────────────────────────
    bw_creds = json.loads(wmill.get_variable("f/Backup/bitwarden_api_login"))

    search_term = f"{client_name}"

    env = os.environ.copy()
    env["BW_CLIENTID"] = bw_creds["bw_clientid"]
    env["BW_CLIENTSECRET"] = bw_creds["bw_clientsecret"]
    env["BW_PASSWORD"] = bw_creds["bw_masterpassword"]

    def run(cmd, check=True, capture=True):
        # Reads `env` at call time, so BW_SESSION set later is picked up.
        return subprocess.run(
            cmd, env=env, text=True, capture_output=capture, check=check
        )

    # ── 1. Check for the Bitwarden CLI ────────────────────────────────────────
    print("==> Prüfe Bitwarden CLI...", file=sys.stderr)
    if subprocess.run(["which", "bw"], capture_output=True).returncode != 0:
        print(" Installiere bw CLI...", file=sys.stderr)
        run(
            [
                "wget",
                "https://github.com/bitwarden/cli/releases/download/v1.22.1/bw-linux-1.22.1.zip",
                "-O",
                "bw.zip",
            ]
        )
        # -o: overwrite without prompting, so a leftover ./bw cannot stall us.
        run(["unzip", "-o", "bw.zip"])
        run(["chmod", "+x", "bw"])
        run(["mv", "bw", "/usr/local/bin/bw"])

    bw_version = run(["bw", "--version"]).stdout.strip()
    print(f" Bitwarden CLI Version: {bw_version}", file=sys.stderr)

    # ── Set hosts entry (idempotent) ──────────────────────────────────────────
    _ensure_hosts_entry()

    # Drop any stale session first. NOTE(review): some CLI versions appear to
    # refuse `config server` while logged in — confirm against the CLI docs.
    run(["bw", "logout"], check=False)

    # ── 2. Configure server URL ───────────────────────────────────────────────
    print(f"==> Setze Server-URL: {bw_url}", file=sys.stderr)
    run(["bw", "config", "server", bw_url])

    # ── 3. Login ──────────────────────────────────────────────────────────────
    print("==> Melde bei Bitwarden an...", file=sys.stderr)
    result = run(["bw", "login", "--apikey"], check=False)
    if result.returncode != 0:
        print(f"ERROR: API-Key-Login fehlgeschlagen.\n{result.stderr}", file=sys.stderr)
        sys.exit(1)
    print(" Login erfolgreich.", file=sys.stderr)

    # From here on we are logged in: always log out, even on errors/exit.
    try:
        # ── 4. Unlock vault ───────────────────────────────────────────────────
        print("==> Entsperre Vault...", file=sys.stderr)
        # The master password is read from the environment, never placed on
        # the command line (process list / CalledProcessError would leak it).
        unlock = run(
            ["bw", "unlock", "--passwordenv", "BW_PASSWORD", "--raw"],
            check=False,
        )
        bw_session = unlock.stdout.strip()
        if unlock.returncode != 0 or not bw_session:
            print("ERROR: Vault konnte nicht entsperrt werden.", file=sys.stderr)
            if unlock.stderr:
                print(unlock.stderr, file=sys.stderr)
            sys.exit(1)
        # The session key is passed via BW_SESSION only, not via --session.
        env["BW_SESSION"] = bw_session
        print(" Vault entsperrt.", file=sys.stderr)

        # ── 5. Sync vault ─────────────────────────────────────────────────────
        print("==> Synchronisiere Vault...", file=sys.stderr)
        run(["bw", "sync"])
        print(" Sync abgeschlossen.", file=sys.stderr)

        # ── 6. Search for the item ────────────────────────────────────────────
        print(f"==> Suche Eintrag: '{search_term}'...", file=sys.stderr)
        search = run(["bw", "list", "items", "--search", search_term])
        items = json.loads(search.stdout)

        if not items:
            print(f"ERROR: Kein Eintrag gefunden für: '{search_term}'", file=sys.stderr)
            sys.exit(1)

        print(f" {len(items)} Treffer gefunden.", file=sys.stderr)

        # Prefer an exact name match, otherwise take the first hit.
        item = _select_item(items, search_term)
    finally:
        # ── 7. Log out ────────────────────────────────────────────────────────
        run(["bw", "logout"], check=False)
        print("==> Abgemeldet.", file=sys.stderr)

    # ── 8. Return as object ───────────────────────────────────────────────────
    return _item_to_result(item)