timestamp_mysql_check/mysql_timestamp_check.py

269 lines
9.8 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import mysql.connector
import datetime
import smtplib
import ssl
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import argparse
import sys
import os
from dotenv import load_dotenv
def get_tables_from_db(connection):
"""
Holt die Liste der zu überprüfenden Tabellen, die 'RAW.' im Namen enthalten
Returns:
list: Liste der Tabellennamen
"""
try:
cursor = connection.cursor()
# Holen aller Tabellen in der Kunden-Datenbank
cursor.execute("SHOW TABLES FROM Kunden")
tables = []
for (table,) in cursor.fetchall():
if 'RAW.' in table:
# Tabellennamen mit Backticks versehen
tables.append(f"Kunden.`{table}`")
cursor.close()
if not tables:
print("Warnung: Keine Tabellen gefunden, die 'RAW.' enthalten.")
else:
print(f"Gefundene zu überprüfende Tabellen: {', '.join(tables)}")
return tables
except mysql.connector.Error as err:
print(f"Fehler beim Abrufen der Tabellenliste: {err}")
sys.exit(1)
def check_timestamp(connection, tables, timestamp_column, email_config, test_mode=False):
"""
Überprüft, ob der letzte Zeitstempel in jeder angegebenen Tabelle von gestern ist.
Sendet eine E-Mail mit den Namen der Tabellen, die nicht aktuell sind oder
gibt die Nachricht auf der Konsole aus, wenn test_mode=True.
"""
cursor = connection.cursor()
# Berechne das Datum für gestern
yesterday = datetime.date.today() - datetime.timedelta(days=1)
yesterday_start = datetime.datetime.combine(yesterday, datetime.time.min)
yesterday_end = datetime.datetime.combine(yesterday, datetime.time.max)
# Liste der Tabellen, die nicht aktuell sind
outdated_tables = []
database = os.getenv("MYSQL_DATABASE", "unbekannt")
# Jede Tabelle überprüfen
for table in tables:
try:
# SQL-Abfrage, um zu prüfen, ob es Zeitstempel von gestern gibt
query = f"""
SELECT 1
FROM {table}
WHERE {timestamp_column} BETWEEN %s AND %s
LIMIT 1
"""
cursor.execute(query, (yesterday_start, yesterday_end))
result = cursor.fetchone()
# Überprüfen, ob ein Ergebnis vorhanden ist (Daten von gestern existieren)
if result is None:
print(f"Tabelle {table}: KEINE Zeitstempel von gestern gefunden.")
outdated_tables.append(f"{table} (keine Daten von gestern)")
else:
print(f"Tabelle {table}: Zeitstempel von gestern gefunden - OK.")
except mysql.connector.Error as err:
print(f"Fehler beim Überprüfen der Tabelle {table}: {err}")
outdated_tables.append(f"{table} (Fehler: {err}")
# Cursor schließen
cursor.close()
# Wenn es nicht aktuelle Tabellen gibt, sende eine E-Mail oder gib Meldung aus
if outdated_tables:
if test_mode:
print("\n--- TEST MODUS: E-Mail würde gesendet werden ---")
print(f"Betreff: Warnung: Nicht aktuelle Tabellen in Datenbank {database}")
print("Inhalt:")
print("Hallo,")
print(
f"\ndie folgenden Tabellen in der Datenbank '{database}' haben keinen aktuellen Zeitstempel von gestern:")
for table in outdated_tables:
print(f"- {table}")
print("\nBitte überprüfen Sie diese Tabellen.")
print("\nDies ist eine automatisch generierte Nachricht.")
print("--- ENDE TEST MODUS ---\n")
else:
send_email(outdated_tables, database, email_config)
return False
else:
print("Alle Tabellen sind aktuell.")
return True
def send_email(outdated_tables, database, email_config):
"""
Sendet eine E-Mail mit der Liste der nicht aktuellen Tabellen.
"""
sender_email = email_config["sender"]
receiver_email = email_config["receiver"]
smtp_server = email_config["smtp_server"]
smtp_port = email_config["smtp_port"]
smtp_user = email_config.get("username", sender_email)
smtp_password = email_config.get("password", "")
# E-Mail erstellen
message = MIMEMultipart()
message["From"] = sender_email
message["To"] = receiver_email
message["Subject"] = f"Warnung: Nicht aktuelle Tabellen in Datenbank {database}"
# E-Mail-Inhalt
body = f"""
Hallo,
die folgenden Tabellen in der Datenbank '{database}' haben keinen aktuellen Zeitstempel von gestern:
{chr(10).join('- ' + table for table in outdated_tables)}
Bitte überprüfen Sie diese Tabellen.
Dies ist eine automatisch generierte Nachricht.
"""
message.attach(MIMEText(body, "plain"))
try:
# Verbindung zum SMTP-Server herstellen mit TLS
context = ssl.create_default_context()
server = smtplib.SMTP(smtp_server, smtp_port)
server.ehlo() # Kann bei manchen Servern erforderlich sein
server.starttls(context=context) # TLS-Verschlüsselung aktivieren mit sicherem Kontext
server.ehlo() # Nach TLS erneut ehlo senden
# Anmelden (falls erforderlich)
if smtp_password:
server.login(smtp_user, smtp_password)
# E-Mail senden
server.send_message(message)
server.quit()
print(f"E-Mail an {receiver_email} gesendet.")
except Exception as e:
print(f"Fehler beim Senden der E-Mail: {e}")
def main():
# .env-Datei laden
load_dotenv()
parser = argparse.ArgumentParser(description="Überprüft MySQL-Tabellen auf aktuelle Zeitstempel.")
# Optionale DB-Parameter (werden sonst aus .env gelesen)
parser.add_argument("--host", help="MySQL-Hostadresse (Standard: aus .env MYSQL_HOST)")
parser.add_argument("--user", help="MySQL-Benutzername (Standard: aus .env MYSQL_USER)")
parser.add_argument("--password", help="MySQL-Passwort (Standard: aus .env MYSQL_PASSWORD)")
parser.add_argument("--database", help="MySQL-Datenbankname (Standard: aus .env MYSQL_DATABASE)")
# Parameter für Zeitstempel-Spalte (mit add_date als Standard)
parser.add_argument("--timestamp-column", default="add_date",
help="Name der Zeitstempel-Spalte (Standard: add_date)")
# Optionale Parameter
parser.add_argument("--test", action="store_true",
help="Testmodus: Keine E-Mail senden, nur Ausgabe auf der Konsole")
# E-Mail-Parameter
email_group = parser.add_argument_group("E-Mail-Konfiguration",
"Diese Parameter werden nicht benötigt, wenn --test verwendet wird")
email_group.add_argument("--email-sender", help="Absender-E-Mail-Adresse (Standard: aus .env EMAIL_SENDER)")
email_group.add_argument("--email-receiver", help="Empfänger-E-Mail-Adresse (Standard: aus .env EMAIL_RECEIVER)")
email_group.add_argument("--smtp-server", help="SMTP-Server-Adresse (Standard: aus .env SMTP_SERVER)")
email_group.add_argument("--smtp-port", type=int, help="SMTP-Server-Port (Standard: aus .env SMTP_PORT)")
email_group.add_argument("--smtp-user", help="SMTP-Benutzername (Standard: aus .env SMTP_USER)")
email_group.add_argument("--smtp-password", help="SMTP-Passwort (Standard: aus .env SMTP_PASSWORD)")
args = parser.parse_args()
# Datenbankverbindungsparameter aus .env oder Kommandozeile
host = args.host or os.getenv("MYSQL_HOST", "localhost")
user = args.user or os.getenv("MYSQL_USER")
password = args.password or os.getenv("MYSQL_PASSWORD")
database = args.database or os.getenv("MYSQL_DATABASE")
# Prüfe, ob erforderliche DB-Parameter vorhanden sind
if not user or not password or not database:
print(
"Fehler: MySQL-Benutzer, Passwort und Datenbankname müssen entweder in der .env-Datei oder als Parameter angegeben werden!")
sys.exit(1)
# Verbindung zur Datenbank herstellen
try:
connection = mysql.connector.connect(
host=host,
user=user,
password=password,
database=database
)
except mysql.connector.Error as err:
print(f"Fehler bei der Datenbankverbindung: {err}")
sys.exit(1)
# Testmodus aktiviert?
test_mode = args.test
# E-Mail-Konfiguration
email_config = {
"sender": args.email_sender or os.getenv("EMAIL_SENDER"),
"receiver": args.email_receiver or os.getenv("EMAIL_RECEIVER"),
"smtp_server": args.smtp_server or os.getenv("SMTP_SERVER", "localhost"),
"smtp_port": args.smtp_port or int(os.getenv("SMTP_PORT", "587")),
"username": args.smtp_user or os.getenv("SMTP_USER"),
"password": args.smtp_password or os.getenv("SMTP_PASSWORD", "")
}
# Wenn nicht im Testmodus, prüfe ob E-Mail-Parameter angegeben wurden
if not test_mode:
if not email_config["sender"] or not email_config["receiver"]:
print(
"Fehler: E-Mail-Absender und -Empfänger müssen entweder in der .env-Datei oder als Parameter angegeben werden!")
connection.close()
sys.exit(1)
# Tabellen aus der Datenbanktabelle holen
tables = get_tables_from_db(connection)
if not tables:
print("Keine zu überprüfenden Tabellen gefunden. Programm wird beendet.")
connection.close()
sys.exit(0)
# Zeitstempel-Spalte (Standard: add_date)
timestamp_column = args.timestamp_column
print(f"Überprüfe Zeitstempel in Spalte: {timestamp_column}")
# Timestamp-Überprüfung durchführen
check_timestamp(
connection,
tables,
timestamp_column,
email_config,
test_mode
)
# Verbindung schließen
connection.close()
if __name__ == "__main__":
main()