Update exporter add adodbapi

master
Sebastian Serfling 2025-10-29 11:41:30 +01:00
parent ab0d40de67
commit e70cc3dd23
1 changed files with 104 additions and 96 deletions

View File

@ -1,27 +1,31 @@
import os import os
import pyodbc
import traceback import traceback
import datetime
import adodbapi
import pyodbc
from dotenv import load_dotenv from dotenv import load_dotenv
import mail import mail
import datetime
# =========================
# INITIALISIERUNG
# =========================
load_dotenv() load_dotenv()
# Logdatei vorbereiten
MAIN_DIR = os.getcwd() MAIN_DIR = os.getcwd()
LOG_DIR = os.path.join(MAIN_DIR, "Logs") LOG_DIR = os.path.join(MAIN_DIR, "Logs")
os.makedirs(LOG_DIR, exist_ok=True) os.makedirs(LOG_DIR, exist_ok=True)
logfile_name = f"MSSQL_exporter_log_{datetime.datetime.now().strftime('%Y-%m-%d')}.txt" logfile_name = f"MSSQL_exporter_log_{datetime.datetime.now().strftime('%Y-%m-%d')}.txt"
logfile_path = os.path.join(LOG_DIR, logfile_name) logfile_path = os.path.join(LOG_DIR, logfile_name)
# Prozessname für Mail und Logs
process = "SDF_to_MSSQL_Export" process = "SDF_to_MSSQL_Export"
# Globaler MSSQL-Cursor für Logging
mssql_cursor_global = None mssql_cursor_global = None
def write_log(line): # =========================
# LOGGING
# =========================
def write_log(line: str):
timestamp = datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S') timestamp = datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
log_entry = f"{line}\n------------------------------{timestamp}------------------------------" log_entry = f"{line}\n------------------------------{timestamp}------------------------------"
print(line) print(line)
@ -30,7 +34,7 @@ def write_log(line):
def write_log_summary(table, inserted, skipped, errors, inserted_rows): def write_log_summary(table, inserted, skipped, errors, inserted_rows):
"""Schreibt eine zusammenfassende Logzeile pro Tabelle in die logs-Tabelle.""" """Schreibt kurze Zusammenfassung in MSSQL-logs."""
if not mssql_cursor_global: if not mssql_cursor_global:
return return
try: try:
@ -40,43 +44,45 @@ def write_log_summary(table, inserted, skipped, errors, inserted_rows):
message = f"{inserted} eingefügt, {skipped} übersprungen, {errors} Fehler" message = f"{inserted} eingefügt, {skipped} übersprungen, {errors} Fehler"
if inserted_rows: if inserted_rows:
message += " | " + "; ".join(inserted_rows[:5]) message += " | " + "; ".join(inserted_rows[:5])
full_message = f"Tabelle {table}: {message}" msg = f"Tabelle {table}: {message}"
mssql_cursor_global.execute( mssql_cursor_global.execute(
"INSERT INTO logs (timestamp, message, process) VALUES (?, ?, ?)", "INSERT INTO logs (timestamp, message, process) VALUES (?, ?, ?)",
datetime.datetime.now(), full_message, process datetime.datetime.now(), msg, process
) )
mssql_cursor_global.connection.commit() mssql_cursor_global.connection.commit()
except Exception as log_db_error: except Exception as e:
print(f"Fehler beim Schreiben der Tabellen-Zusammenfassung in logs: {log_db_error}") print(f"Fehler beim Schreiben der Tabellen-Zusammenfassung in logs: {e}")
# Pfade und Verbindungen # =========================
# VERBINDUNGEN
# =========================
SDF_LOCAL_PFAD = os.getenv("SDF_LOCAL_PFAD") SDF_LOCAL_PFAD = os.getenv("SDF_LOCAL_PFAD")
SDF_NAME = os.getenv("SDF_NAME") SDF_NAME = os.getenv("SDF_NAME", "App.sdf")
sdf_file = os.path.join(SDF_LOCAL_PFAD, SDF_NAME) sdf_file = os.path.join(SDF_LOCAL_PFAD, SDF_NAME)
# Neuer pyodbc-Treiber für SQL Server Compact Edition 4.0 MSSQL_CONNECTION_STR = os.getenv("MSSQL_CONNECTION_STR")
sdf_connection_str = ( if not MSSQL_CONNECTION_STR:
r"Driver={SQL Server Compact Edition 4.0};" msg = "❌ MSSQL_CONNECTION_STR fehlt in .env"
f"Data Source={sdf_file};" write_log(msg)
) mail.send_error_email(msg, process)
mssql_connection_str = os.getenv("MSSQL_CONNECTION_STR")
if not mssql_connection_str:
error_msg = "MSSQL_CONNECTION_STR nicht in der .env gefunden."
write_log(error_msg)
mail.send_error_email(error_msg, process)
exit(1) exit(1)
# Tabellenliste aus .env oder Fallback sdf_connection_str = (
"Provider=Microsoft.SQLSERVER.CE.OLEDB.3.5;"
f"Data Source={sdf_file};"
"Persist Security Info=False;"
)
tables_env = os.getenv("TABLES", "") tables_env = os.getenv("TABLES", "")
tables = [t.strip() for t in tables_env.split(",") if t.strip()] tables = [t.strip() for t in tables_env.split(",") if t.strip()]
if not tables:
write_log("Hinweis: Keine Tabellen in der .env gefunden (Variable TABLES fehlt oder ist leer).")
# =========================
# HELFER
# =========================
def get_pk_columns(mssql_cursor, table_name): def get_pk_columns(mssql_cursor, table_name):
pk_query = """ query = """
SELECT KU.COLUMN_NAME SELECT KU.COLUMN_NAME
FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS AS TC FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS AS TC
INNER JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE AS KU INNER JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE AS KU
@ -85,130 +91,132 @@ def get_pk_columns(mssql_cursor, table_name):
AND KU.TABLE_NAME = ? AND KU.TABLE_NAME = ?
ORDER BY KU.ORDINAL_POSITION; ORDER BY KU.ORDINAL_POSITION;
""" """
mssql_cursor.execute(pk_query, (table_name,)) mssql_cursor.execute(query, (table_name,))
return [row[0] for row in mssql_cursor.fetchall()] return [row[0] for row in mssql_cursor.fetchall()]
def row_summary(columns, row): def row_summary(columns, row):
summary_parts = [] summary = []
for i, col in enumerate(columns): for i, col in enumerate(columns[:3]): # max. 3 Spalten
val = row[i] val = row[i]
if isinstance(val, (int, float, str)) and len(summary_parts) < 3: summary.append(f"{col}={repr(val)}")
summary_parts.append(f"{col}={repr(val)}") return ", ".join(summary)
return ", ".join(summary_parts)
# =========================
# HAUPTPROGRAMM
# =========================
def main(): def main():
global mssql_cursor_global global mssql_cursor_global
try: try:
# --- Verbindungen herstellen --- # --- MSSQL ---
sdf_conn = pyodbc.connect(sdf_connection_str) try:
sdf_cursor = sdf_conn.cursor() mssql_conn = pyodbc.connect(MSSQL_CONNECTION_STR)
mssql_cursor = mssql_conn.cursor()
mssql_cursor_global = mssql_cursor
write_log("✅ Verbindung zu MSSQL erfolgreich hergestellt.")
except Exception as mssql_err:
err_msg = f"❌ Fehler bei der Verbindung zu MSSQL: {mssql_err}"
write_log(err_msg)
mail.send_error_email(err_msg, process)
#return # abbrechen
mssql_conn = pyodbc.connect(mssql_connection_str) # --- SDF ---
mssql_cursor = mssql_conn.cursor() try:
mssql_cursor_global = mssql_cursor sdf_conn = adodbapi.connect(sdf_connection_str)
sdf_cursor = sdf_conn.cursor()
write_log("✅ Verbindung zur SDF erfolgreich geöffnet.")
except Exception as sdf_err:
err_msg = f"❌ Fehler beim Zugriff auf SDF: {sdf_err}"
write_log(err_msg)
mail.send_error_email(err_msg, process)
return
report_lines = [] report_lines = []
for table_name in tables: # ============ Tabellen-Export ============
write_log(f"\nVerarbeite Tabelle: {table_name}") for table in tables:
write_log(f"\n🔹 Verarbeite Tabelle: {table}")
try: try:
sdf_cursor.execute(f"SELECT * FROM [{table_name}]") sdf_cursor.execute(f"SELECT * FROM [{table}]")
columns = [col[0] for col in sdf_cursor.description] columns = [col[0] for col in sdf_cursor.description]
rows = sdf_cursor.fetchall() write_log(f"Spalten in {table}: {columns}")
except Exception as e: except Exception as e:
write_log(f"Fehler beim Lesen aus SDF-Tabelle {table_name}: {e}") write_log(f"❌ Fehler beim Lesen der Tabelle {table}: {e}")
mail.send_error_email(f"Fehler beim Lesen aus {table_name}: {e}", process)
continue continue
write_log(f"{len(rows)} Datensätze in SDF gefunden. Spalten: {columns}") pk_columns = get_pk_columns(mssql_cursor, table)
# Primary Keys ermitteln
pk_columns = get_pk_columns(mssql_cursor, table_name)
if pk_columns: if pk_columns:
write_log(f"Primary Key(s): {pk_columns}") write_log(f"Primary Key(s) in {table}: {pk_columns}")
try: pk_indices = [columns.index(pk) for pk in pk_columns if pk in columns]
pk_indices = [columns.index(pk) for pk in pk_columns]
except ValueError as ve:
write_log(f"Fehler bei PK-Index-Ermittlung: {ve}")
pk_indices = []
else: else:
write_log("Kein Primary Key definiert.") write_log(f"⚠️ Kein Primary Key in {table}. Alle Datensätze werden eingefügt.")
pk_indices = [] pk_indices = []
placeholders = ", ".join("?" for _ in columns) placeholders = ", ".join("?" for _ in columns)
insert_sql = f"INSERT INTO [{table_name}] ({', '.join('[' + col + ']' for col in columns)}) VALUES ({placeholders})" insert_sql = f"INSERT INTO {table} ({', '.join('[' + c + ']' for c in columns)}) VALUES ({placeholders})"
inserted = 0 inserted = 0
skipped = 0 skipped = 0
errors = 0 errors = 0
inserted_rows = [] inserted_rows = []
rows = sdf_cursor.fetchall()
write_log(f"{len(rows)} Datensätze in {table} gefunden.")
for row in rows: for row in rows:
try: try:
if pk_indices: if pk_indices:
pk_values = tuple(row[i] for i in pk_indices) pk_values = tuple(row[i] for i in pk_indices)
pk_clause = " AND ".join(f"[{col}] = ?" for col in pk_columns) pk_clause = " AND ".join(f"[{col}] = ?" for col in pk_columns)
select_sql = f"SELECT COUNT(*) FROM [{table_name}] WHERE {pk_clause}" select_sql = f"SELECT COUNT(*) FROM {table} WHERE {pk_clause}"
mssql_cursor.execute(select_sql, pk_values) mssql_cursor.execute(select_sql, pk_values)
count = mssql_cursor.fetchone()[0] if mssql_cursor.fetchone()[0] > 0:
if count > 0:
skipped += 1 skipped += 1
continue continue
mssql_cursor.execute(insert_sql, row) mssql_cursor.execute(insert_sql, row)
inserted += 1 inserted += 1
inserted_rows.append(row_summary(columns, row)) if len(inserted_rows) < 5:
inserted_rows.append(row_summary(columns, row))
except Exception as ie: except Exception as insert_err:
errors += 1 errors += 1
err = f"Fehler beim Einfügen in Tabelle {table_name}: {ie}" error_details = f"Fehler beim Einfügen in {table}: {insert_err}"
write_log(err) write_log(error_details)
mail.send_error_email(err, process) mail.send_error_email(error_details, process)
mssql_conn.commit() mssql_conn.commit()
write_log(f"{inserted} eingefügt, {skipped} übersprungen, {errors} Fehler.") write_log(f"{table}: {inserted} eingefügt, {skipped} übersprungen, {errors} Fehler.")
write_log_summary(table_name, inserted, skipped, errors, inserted_rows) write_log_summary(table, inserted, skipped, errors, inserted_rows)
report_lines.append(f"Tabelle {table_name}: {inserted} eingefügt, {skipped} übersprungen, {errors} Fehler.") report_lines.append(f"{table}: {inserted} eingefügt, {skipped} übersprungen, {errors} Fehler.")
# --- Abschlussbericht --- # --- .export-Marker ---
report_text = "\n".join(report_lines)
write_log("Export-Zusammenfassung:\n" + report_text)
mail.send_report_email(report_text, process)
# --- Leere .export-Datei erstellen ---
export_marker_path = os.path.join(MAIN_DIR, ".export") export_marker_path = os.path.join(MAIN_DIR, ".export")
try: with open(export_marker_path, "w"):
with open(export_marker_path, "w"): pass
pass write_log(f"Leere .export-Datei erstellt: {export_marker_path}")
write_log(f"Leere .export-Datei erstellt: {export_marker_path}")
except Exception as marker_err:
write_log(f"Fehler beim Erstellen der .export-Datei: {marker_err}")
# --- Abschlusslog in DB --- # --- Abschlusslog ---
try: try:
if mssql_cursor_global: mssql_cursor.execute(
mssql_cursor_global.execute( "INSERT INTO logs (timestamp, message, process) VALUES (?, ?, ?)",
"INSERT INTO logs (timestamp, message, process) VALUES (?, ?, ?)", datetime.datetime.now(), "Export abgeschlossen (.export erzeugt)", process
datetime.datetime.now(), "Export abgeschlossen (.export erzeugt)", process )
) mssql_conn.commit()
mssql_cursor_global.connection.commit() except Exception as log_err:
except Exception as db_log_final_error: print(f"Fehler beim finalen DB-Log: {log_err}")
print(f"Fehler beim Schreiben des finalen Logs in DB: {db_log_final_error}")
# Verbindungen schließen
sdf_cursor.close() sdf_cursor.close()
sdf_conn.close() sdf_conn.close()
mssql_cursor.close() mssql_cursor.close()
mssql_conn.close() mssql_conn.close()
except Exception as e: except Exception as e:
error_details = f"Allgemeiner Fehler: {e}\n{traceback.format_exc()}" err = f"Allgemeiner Fehler: {e}\n{traceback.format_exc()}"
write_log(error_details) write_log(err)
mail.send_error_email(error_details, process) mail.send_error_email(err, process)
if __name__ == "__main__": if __name__ == "__main__":