Balzer-WaagenDaten/exporter.py

249 lines
8.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import os
import traceback
import datetime
import pyodbc
import adodbapi
from dotenv import load_dotenv
import mail
# =========================
# INITIALISIERUNG
# =========================
load_dotenv()
MAIN_DIR = os.getcwd()
LOG_DIR = os.path.join(MAIN_DIR, "Logs")
os.makedirs(LOG_DIR, exist_ok=True)
logfile_name = f"MSSQL_exporter_log_{datetime.datetime.now().strftime('%Y-%m-%d')}.txt"
logfile_path = os.path.join(LOG_DIR, logfile_name)
process = "SDF_to_MSSQL_Export"
mssql_cursor_global = None
global_error_log = [] # ❗ hier sammeln wir alle Fehler aus allen Tabellen
# =========================
# LOGGING-FUNKTIONEN
# =========================
def write_log(line: str):
timestamp = datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
log_entry = f"{line}\n------------------------------{timestamp}------------------------------"
line = line.encode('ascii', errors='ignore').decode()
print(line)
with open(logfile_path, "a", encoding="utf-8") as f:
f.write(log_entry + "\n")
def write_log_summary(table, inserted, skipped, errors, inserted_rows):
"""Schreibt eine kurze Zusammenfassung pro Tabelle in MSSQL-logs"""
if not mssql_cursor_global:
return
try:
if inserted == 0 and errors == 0:
message = "-"
else:
message = f"{inserted} eingefügt, {skipped} übersprungen, {errors} Fehler"
if inserted_rows:
message += " | " + "; ".join(inserted_rows[:5])
full_message = f"Tabelle {table}: {message}"
mssql_cursor_global.execute(
"INSERT INTO logs (timestamp, message, process) VALUES (?, ?, ?)",
datetime.datetime.now(), full_message, process
)
mssql_cursor_global.connection.commit()
except Exception as e:
print(f"Fehler beim Schreiben der Tabellen-Zusammenfassung in logs: {e}")
# =========================
# VERBINDUNGEN
# =========================
SDF_LOCAL_PFAD = os.getenv("SDF_LOCAL_PFAD")
SDF_NAME = os.getenv("SDF_NAME", "App.sdf")
sdf_file = os.path.join(SDF_LOCAL_PFAD, SDF_NAME)
MSSQL_CONNECTION_STR = os.getenv("MSSQL_CONNECTION_STR")
if not MSSQL_CONNECTION_STR:
msg = "❌ MSSQL_CONNECTION_STR fehlt in .env"
write_log(msg)
mail.send_error_email(msg, process)
exit(1)
# Tabellen aus .env
tables_env = os.getenv("TABLES", "")
tables = [t.strip() for t in tables_env.split(",") if t.strip()]
# =========================
# HELFER
# =========================
def get_pk_columns(mssql_cursor, table_name):
pk_query = """
SELECT KU.COLUMN_NAME
FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS AS TC
INNER JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE AS KU
ON TC.CONSTRAINT_NAME = KU.CONSTRAINT_NAME
WHERE TC.CONSTRAINT_TYPE = 'PRIMARY KEY'
AND KU.TABLE_NAME = ?
ORDER BY KU.ORDINAL_POSITION;
"""
mssql_cursor.execute(pk_query, (table_name,))
return [row[0] for row in mssql_cursor.fetchall()]
def row_summary(columns, row):
summary = []
for i, col in enumerate(columns[:3]): # max. 3 Spalten
val = row[i]
summary.append(f"{col}={repr(val)}")
return ", ".join(summary)
# =========================
# HAUPTPROGRAMM
# =========================
def main():
global mssql_cursor_global
try:
# ========================
# MSSQL-Verbindung
# ========================
try:
mssql_conn = pyodbc.connect(MSSQL_CONNECTION_STR)
mssql_cursor = mssql_conn.cursor()
mssql_cursor_global = mssql_cursor
write_log("✅ Verbindung zu MSSQL erfolgreich hergestellt.")
except Exception as mssql_err:
err_msg = f"❌ Fehler bei der Verbindung zu MSSQL: {mssql_err}"
write_log(err_msg)
mail.send_error_email(err_msg, process)
return
# ========================
# SDF-Verbindung
# ========================
try:
sdf_conn = adodbapi.connect(
f"Provider=Microsoft.SQLSERVER.CE.OLEDB.3.5;Data Source={sdf_file};Persist Security Info=False;"
)
sdf_cursor = sdf_conn.cursor()
write_log("✅ Verbindung zur SDF erfolgreich geöffnet.")
except Exception as sdf_err:
err_msg = f"❌ Fehler beim Zugriff auf SDF: {sdf_err}"
write_log(err_msg)
mail.send_error_email(err_msg, process)
return
report_lines = []
# ============ Tabellen-Export ============
for table in tables:
write_log(f"\n🔹 Verarbeite Tabelle: {table}")
error_messages = []
try:
sdf_cursor.execute(f"SELECT * FROM [{table}]")
columns = [desc[0] for desc in sdf_cursor.description]
write_log(f"Spalten in {table}: {columns}")
except Exception as e:
msg = f"❌ Fehler beim Lesen der SDF-Tabelle {table}: {e}"
write_log(msg)
global_error_log.append(msg)
continue
# Primärschlüssel aus MSSQL
pk_columns = get_pk_columns(mssql_cursor, table)
if pk_columns:
write_log(f"Primary Key(s) in {table}: {pk_columns}")
pk_indices = [columns.index(pk) for pk in pk_columns if pk in columns]
else:
write_log(f"⚠️ Kein Primary Key in {table}. Alle Datensätze werden eingefügt.")
pk_indices = []
placeholders = ", ".join("?" for _ in columns)
insert_sql = f"INSERT INTO {table} ({', '.join('[' + c + ']' for c in columns)}) VALUES ({placeholders})"
inserted = 0
skipped = 0
errors = 0
inserted_rows = []
rows = sdf_cursor.fetchall()
for row in rows:
try:
if pk_indices:
pk_values = tuple(row[i] for i in pk_indices)
pk_clause = " AND ".join(f"[{col}] = ?" for col in pk_columns)
select_sql = f"SELECT COUNT(*) FROM {table} WHERE {pk_clause}"
mssql_cursor.execute(select_sql, pk_values)
if mssql_cursor.fetchone()[0] > 0:
skipped += 1
continue
# NULL-Behandlung
row = [None if v in (None, '', 'NULL') else v for v in row]
mssql_cursor.execute(insert_sql, tuple(row))
inserted += 1
if len(inserted_rows) < 5:
inserted_rows.append(row_summary(columns, row))
except Exception as insert_err:
errors += 1
error_details = f"Fehler beim Einfügen in {table}: {insert_err}"
write_log(error_details)
error_messages.append(error_details)
mssql_conn.commit()
sdf_conn.commit()
# Tabelle fertig → loggen
write_log(f"{table}: {inserted} eingefügt, {skipped} übersprungen, {errors} Fehler.")
write_log_summary(table, inserted, skipped, errors, inserted_rows)
report_lines.append(f"{table}: {inserted} eingefügt, {skipped} übersprungen, {errors} Fehler.")
if error_messages:
global_error_log.append(f"\nTabelle {table}:\n" + "\n".join(error_messages[:20]))
# --- Abschluss ---
marker_path = os.path.join(MAIN_DIR, ".export")
with open(marker_path, "w"):
pass
write_log(f"Leere .export-Datei erstellt: {marker_path}")
# Logeintrag in MSSQL
try:
mssql_cursor.execute(
"INSERT INTO logs (timestamp, message, process) VALUES (?, ?, ?)",
datetime.datetime.now(), "Export abgeschlossen (.export erzeugt)", process
)
mssql_conn.commit()
except Exception as log_err:
print(f"Fehler beim finalen DB-Log: {log_err}")
sdf_cursor.close()
sdf_conn.close()
mssql_cursor.close()
mssql_conn.close()
# --- Sammelmail am Ende ---
if global_error_log:
combined_errors = "\n\n".join(global_error_log)
mail.send_error_email(
f"⚠️ Export abgeschlossen mit Fehlern:\n\n{combined_errors}",
process
)
else:
mail.send_success_email("✅ Export erfolgreich abgeschlossen keine Fehler.", process)
write_log("🏁 Exportprozess beendet.")
except Exception as e:
err = f"Allgemeiner Fehler: {e}\n{traceback.format_exc()}"
write_log(err)
mail.send_error_email(err, process)
if __name__ == "__main__":
main()