timestamp_mysql_check/mysql_timestamp_check.py

267 lines
9.9 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import mysql.connector
import datetime
import smtplib
import ssl
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import argparse
import sys
import os
from dotenv import load_dotenv
def get_tables_from_db(connection):
"""
Holt die Liste der zu überprüfenden Tabellen, die 'history.' im Namen enthalten
Returns:
list: Liste der Tabellennamen
"""
try:
cursor = connection.cursor()
# Holen aller Tabellen in der Kunden-Datenbank
cursor.execute("SHOW TABLES FROM Kunden")
tables = []
for (table,) in cursor.fetchall():
if 'history.' in table:
# Tabellennamen mit Backticks versehen
tables.append(f"Kunden.`{table}`")
cursor.close()
if not tables:
print("Warnung: Keine Tabellen gefunden, die 'history.' enthalten.")
else:
print(f"Gefundene zu überprüfende Tabellen: {', '.join(tables)}")
return tables
except mysql.connector.Error as err:
print(f"Fehler beim Abrufen der Tabellenliste: {err}")
sys.exit(1)
def check_timestamp(connection, tables, timestamp_column, email_config, test_mode=False):
"""
Überprüft, ob der letzte Zeitstempel in jeder angegebenen Tabelle von gestern ist.
Sendet eine E-Mail mit den Namen der Tabellen, die nicht aktuell sind oder
eine Bestätigungsmail, wenn alle Tabellen aktuell sind. Im Testmodus wird
nur auf der Konsole ausgegeben.
"""
cursor = connection.cursor()
# Berechne das Datum für gestern
yesterday = datetime.date.today() - datetime.timedelta(days=1)
yesterday_start = datetime.datetime.combine(yesterday, datetime.time.min)
yesterday_end = datetime.datetime.combine(yesterday, datetime.time.max)
# Liste der Tabellen, die nicht aktuell sind
outdated_tables = []
database = os.getenv("MYSQL_DATABASE", "unbekannt")
# Jede Tabelle überprüfen
for table in tables:
try:
# SQL-Abfrage, um zu prüfen, ob es Zeitstempel von gestern gibt
query = f"""
SELECT 1
FROM {table}
WHERE {timestamp_column} BETWEEN %s AND %s
LIMIT 1
"""
cursor.execute(query, (yesterday_start, yesterday_end))
result = cursor.fetchone()
# Überprüfen, ob ein Ergebnis vorhanden ist (Daten von gestern existieren)
if result is None:
print(f"Tabelle {table}: KEINE Zeitstempel von gestern gefunden.")
outdated_tables.append(f"{table} (keine Daten von gestern)")
else:
print(f"Tabelle {table}: Zeitstempel von gestern gefunden - OK.")
except mysql.connector.Error as err:
print(f"Fehler beim Überprüfen der Tabelle {table}: {err}")
outdated_tables.append(f"{table} (Fehler: {err})")
# Cursor schließen
cursor.close()
# E-Mail Inhalt und Betreff anpassen, je nachdem ob es fehlerhafte Tabellen gibt oder nicht
if outdated_tables:
subject = f"Warnung: Nicht aktuelle Tabellen in Datenbank {database}"
body_intro = (
f"Hallo,\n\n"
f"die folgenden Tabellen in der Datenbank '{database}' haben keinen aktuellen Zeitstempel von gestern:\n"
)
body_tables = "\n".join(f"- {table}" for table in outdated_tables)
body_footer = "\n\nBitte überprüfen Sie diese Tabellen.\n\nDies ist eine automatisch generierte Nachricht."
else:
subject = f"Bestätigung: Alle Tabellen in Datenbank {database} sind aktuell"
body_intro = (
f"Hallo,\n\n"
f"alle zu überprüfenden Tabellen in der Datenbank '{database}' enthalten aktuelle Zeitstempel von gestern.\n"
)
body_tables = ""
body_footer = "\n\nDies ist eine automatisch generierte Nachricht."
# Wenn Testmodus aktiviert, Ausgabe auf Konsole
if test_mode:
print("\n--- TEST MODUS: E-Mail würde gesendet werden ---")
print(f"Betreff: {subject}")
print("Inhalt:")
print(body_intro + body_tables + body_footer)
print("--- ENDE TEST MODUS ---\n")
else:
send_email(subject, body_intro, body_tables, body_footer, email_config)
return not outdated_tables
def send_email(subject, body_intro, body_tables, body_footer, email_config):
"""
Sendet eine E-Mail mit dem angegebenen Betreff und Inhalt.
"""
sender_email = email_config["sender"]
receiver_email = email_config["receiver"]
smtp_server = email_config["smtp_server"]
smtp_port = email_config["smtp_port"]
smtp_user = email_config.get("username", sender_email)
smtp_password = email_config.get("password", "")
# E-Mail erstellen
message = MIMEMultipart()
message["From"] = sender_email
message["To"] = receiver_email
message["Subject"] = subject
# E-Mail-Inhalt zusammenfügen
body = f"{body_intro}{chr(10) if body_tables else ''}{body_tables}{body_footer}"
message.attach(MIMEText(body, "plain"))
try:
# Verbindung zum SMTP-Server herstellen mit TLS
context = ssl.create_default_context()
server = smtplib.SMTP(smtp_server, smtp_port)
server.ehlo()
server.starttls(context=context)
server.ehlo()
# Anmelden (falls erforderlich)
if smtp_password:
server.login(smtp_user, smtp_password)
# E-Mail senden
server.send_message(message)
server.quit()
print(f"E-Mail an {receiver_email} gesendet.")
except Exception as e:
print(f"Fehler beim Senden der E-Mail: {e}")
def main():
# .env-Datei laden
load_dotenv()
parser = argparse.ArgumentParser(description="Überprüft MySQL-Tabellen auf aktuelle Zeitstempel.")
# Optionale DB-Parameter (werden sonst aus .env gelesen)
parser.add_argument("--host", help="MySQL-Hostadresse (Standard: aus .env MYSQL_HOST)")
parser.add_argument("--user", help="MySQL-Benutzername (Standard: aus .env MYSQL_USER)")
parser.add_argument("--password", help="MySQL-Passwort (Standard: aus .env MYSQL_PASSWORD)")
parser.add_argument("--database", help="MySQL-Datenbankname (Standard: aus .env MYSQL_DATABASE)")
# Parameter für Zeitstempel-Spalte (mit add_date als Standard)
parser.add_argument("--timestamp-column", default="add_date",
help="Name der Zeitstempel-Spalte (Standard: add_date)")
# Optionale Parameter
parser.add_argument("--test", action="store_true",
help="Testmodus: Keine E-Mail senden, nur Ausgabe auf der Konsole")
# E-Mail-Parameter
email_group = parser.add_argument_group("E-Mail-Konfiguration",
"Diese Parameter werden nicht benötigt, wenn --test verwendet wird")
email_group.add_argument("--email-sender", help="Absender-E-Mail-Adresse (Standard: aus .env EMAIL_SENDER)")
email_group.add_argument("--email-receiver", help="Empfänger-E-Mail-Adresse (Standard: aus .env EMAIL_RECEIVER)")
email_group.add_argument("--smtp-server", help="SMTP-Server-Adresse (Standard: aus .env SMTP_SERVER)")
email_group.add_argument("--smtp-port", type=int, help="SMTP-Server-Port (Standard: aus .env SMTP_PORT)")
email_group.add_argument("--smtp-user", help="SMTP-Benutzername (Standard: aus .env SMTP_USER)")
email_group.add_argument("--smtp-password", help="SMTP-Passwort (Standard: aus .env SMTP_PASSWORD)")
args = parser.parse_args()
# Datenbankverbindungsparameter aus .env oder Kommandozeile
host = args.host or os.getenv("MYSQL_HOST", "localhost")
user = args.user or os.getenv("MYSQL_USER")
password = args.password or os.getenv("MYSQL_PASSWORD")
database = args.database or os.getenv("MYSQL_DATABASE")
# Prüfe, ob erforderliche DB-Parameter vorhanden sind
if not user or not password or not database:
print("Fehler: MySQL-Benutzer, Passwort und Datenbankname müssen entweder in der .env-Datei oder als Parameter angegeben werden!")
sys.exit(1)
# Verbindung zur Datenbank herstellen
try:
connection = mysql.connector.connect(
host=host,
user=user,
password=password,
database=database
)
except mysql.connector.Error as err:
print(f"Fehler bei der Datenbankverbindung: {err}")
sys.exit(1)
# Testmodus aktiviert?
test_mode = args.test
# E-Mail-Konfiguration
email_config = {
"sender": args.email_sender or os.getenv("EMAIL_SENDER"),
"receiver": args.email_receiver or os.getenv("EMAIL_RECEIVER"),
"smtp_server": args.smtp_server or os.getenv("SMTP_SERVER", "localhost"),
"smtp_port": args.smtp_port or int(os.getenv("SMTP_PORT", "587")),
"username": args.smtp_user or os.getenv("SMTP_USER"),
"password": args.smtp_password or os.getenv("SMTP_PASSWORD", "")
}
# Wenn nicht im Testmodus, prüfe ob E-Mail-Parameter angegeben wurden
if not test_mode:
if not email_config["sender"] or not email_config["receiver"]:
print("Fehler: E-Mail-Absender und -Empfänger müssen entweder in der .env-Datei oder als Parameter angegeben werden!")
connection.close()
sys.exit(1)
# Tabellen aus der Datenbank holen
tables = get_tables_from_db(connection)
if not tables:
print("Keine zu überprüfenden Tabellen gefunden. Programm wird beendet.")
connection.close()
sys.exit(0)
# Zeitstempel-Spalte (Standard: add_date)
timestamp_column = args.timestamp_column
print(f"Überprüfe Zeitstempel in Spalte: {timestamp_column}")
# Timestamp-Überprüfung durchführen und entsprechende E-Mail senden
check_timestamp(
connection,
tables,
timestamp_column,
email_config,
test_mode
)
# Verbindung schließen
connection.close()
if __name__ == "__main__":
main()