"""Export tables from a SQL Server Compact (.sdf) file into MSSQL.

Configuration is read from the environment (loaded via ``.env``):
``SDF_LOCAL_PFAD``, ``SDF_NAME``, ``MSSQL_CONNECTION_STR`` and ``TABLES``
(comma-separated table names). Progress is written to a daily log file,
a per-table summary is written to the MSSQL ``logs`` table, and a single
summary mail is sent at the end of the run.
"""

import datetime
import os
import traceback

import adodbapi
import pyodbc
from dotenv import load_dotenv

import mail

# =========================
# INITIALISATION
# =========================
load_dotenv()

MAIN_DIR = os.getcwd()
LOG_DIR = os.path.join(MAIN_DIR, "Logs")
os.makedirs(LOG_DIR, exist_ok=True)

logfile_name = f"MSSQL_exporter_log_{datetime.datetime.now().strftime('%Y-%m-%d')}.txt"
logfile_path = os.path.join(LOG_DIR, logfile_name)

process = "SDF_to_MSSQL_Export"

# Cursor used by write_log_summary(); set by main() while the MSSQL
# connection is open and reset to None once it has been closed.
mssql_cursor_global = None

# Errors from all tables are collected here for the final summary mail.
global_error_log = []


# =========================
# LOGGING FUNCTIONS
# =========================
def write_log(line: str):
    """Append *line* plus a timestamp separator to the log file and print it.

    The file receives the text unchanged (UTF-8). The console copy has all
    non-ASCII characters removed -- presumably so that consoles which cannot
    encode the emoji markers do not raise; confirm before changing.
    """
    timestamp = datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
    log_entry = f"{line}\n------------------------------{timestamp}------------------------------"
    line = line.encode('ascii', errors='ignore').decode()
    print(line)
    with open(logfile_path, "a", encoding="utf-8") as f:
        f.write(log_entry + "\n")


def write_log_summary(table, inserted, skipped, errors, inserted_rows):
    """Write a short per-table summary into the MSSQL ``logs`` table.

    Does nothing when no MSSQL cursor is currently registered. Failures
    while writing the summary are printed and otherwise ignored, so that a
    logging problem never aborts the export itself.

    Args:
        table: Name of the exported table.
        inserted: Number of rows inserted.
        skipped: Number of rows skipped because they already existed.
        errors: Number of rows that failed to insert.
        inserted_rows: Short textual summaries of inserted rows; at most
            the first five are included in the message.
    """
    if mssql_cursor_global is None:
        return
    try:
        if inserted == 0 and errors == 0:
            message = "-"
        else:
            message = f"{inserted} eingefügt, {skipped} übersprungen, {errors} Fehler"
            if inserted_rows:
                message += " | " + "; ".join(inserted_rows[:5])
        full_message = f"Tabelle {table}: {message}"
        mssql_cursor_global.execute(
            "INSERT INTO logs (timestamp, message, process) VALUES (?, ?, ?)",
            datetime.datetime.now(), full_message, process
        )
        mssql_cursor_global.connection.commit()
    except Exception as e:
        print(f"Fehler beim Schreiben der Tabellen-Zusammenfassung in logs: {e}")


# =========================
# CONNECTION SETTINGS
# =========================
def _abort_missing_setting(name):
    """Log and mail that the required setting *name* is missing, then exit."""
    msg = f"❌ {name} fehlt in .env"
    write_log(msg)
    mail.send_error_email(msg, process)
    # SystemExit instead of exit(): exit() is a helper injected by the
    # ``site`` module and is not guaranteed to exist in every runtime.
    raise SystemExit(1)


SDF_LOCAL_PFAD = os.getenv("SDF_LOCAL_PFAD")
SDF_NAME = os.getenv("SDF_NAME", "App.sdf")
if SDF_LOCAL_PFAD is None:
    # os.path.join(None, ...) would otherwise fail with a bare TypeError.
    _abort_missing_setting("SDF_LOCAL_PFAD")
sdf_file = os.path.join(SDF_LOCAL_PFAD, SDF_NAME)

MSSQL_CONNECTION_STR = os.getenv("MSSQL_CONNECTION_STR")
if not MSSQL_CONNECTION_STR:
    _abort_missing_setting("MSSQL_CONNECTION_STR")

# Tables to export, from .env (comma-separated; blanks are ignored).
tables_env = os.getenv("TABLES", "")
tables = [t.strip() for t in tables_env.split(",") if t.strip()]


# =========================
# HELPERS
# =========================
def _quote_identifier(name):
    """Return *name* as a bracket-delimited SQL identifier (``]`` doubled)."""
    return "[" + name.replace("]", "]]") + "]"


def get_pk_columns(mssql_cursor, table_name):
    """Return the primary-key column names of *table_name* in key order.

    Args:
        mssql_cursor: Open cursor on the MSSQL target database.
        table_name: Unqualified table name to look up.

    Returns:
        List of column names; empty when the table has no primary key.
    """
    # Join on schema and table as well as constraint name, so that a
    # constraint with the same name in another schema cannot contribute
    # columns to the result.
    pk_query = """
        SELECT KU.COLUMN_NAME
        FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS AS TC
        INNER JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE AS KU
            ON TC.CONSTRAINT_NAME = KU.CONSTRAINT_NAME
            AND TC.TABLE_SCHEMA = KU.TABLE_SCHEMA
            AND TC.TABLE_NAME = KU.TABLE_NAME
        WHERE TC.CONSTRAINT_TYPE = 'PRIMARY KEY'
            AND KU.TABLE_NAME = ?
        ORDER BY KU.ORDINAL_POSITION;
    """
    mssql_cursor.execute(pk_query, (table_name,))
    return [row[0] for row in mssql_cursor.fetchall()]


def row_summary(columns, row):
    """Return ``col=value`` pairs for at most the first three columns.

    Values are rendered with ``repr``. If *row* is shorter than *columns*,
    only the values that exist are listed.
    """
    return ", ".join(f"{col}={repr(val)}" for col, val in zip(columns[:3], row))


def _close_resources(resources):
    """Close and remove every entry of *resources*, most recent first.

    A failing ``close()`` is logged and does not stop the remaining
    resources from being closed.
    """
    while resources:
        resource = resources.pop()
        try:
            resource.close()
        except Exception as close_err:
            write_log(f"Fehler beim Schließen einer Verbindung: {close_err}")


def _export_table(table, sdf_cursor, sdf_conn, mssql_cursor, mssql_conn):
    """Copy all rows of *table* from the SDF source into MSSQL.

    Rows whose primary key already exists in MSSQL are skipped. Row-level
    insert errors are counted and logged; table-level read errors are
    logged, added to ``global_error_log`` and end processing of this table
    only. Commit errors are not handled here and propagate to the caller.
    """
    write_log(f"\n🔹 Verarbeite Tabelle: {table}")
    error_messages = []

    # Read column names and all rows from the source table.
    try:
        sdf_cursor.execute(f"SELECT * FROM {_quote_identifier(table)}")
        columns = [desc[0] for desc in sdf_cursor.description]
        write_log(f"Spalten in {table}: {columns}")
        rows = sdf_cursor.fetchall()
    except Exception as e:
        msg = f"❌ Fehler beim Lesen der SDF-Tabelle {table}: {e}"
        write_log(msg)
        global_error_log.append(msg)
        return

    # Primary key as defined in MSSQL.
    try:
        pk_columns = get_pk_columns(mssql_cursor, table)
    except Exception as e:
        msg = f"❌ Fehler beim Ermitteln des Primary Keys von {table}: {e}"
        write_log(msg)
        global_error_log.append(msg)
        return

    quoted_table = _quote_identifier(table)
    select_sql = None
    pk_indices = []
    missing_pk = [pk for pk in pk_columns if pk not in columns]
    if pk_columns and not missing_pk:
        write_log(f"Primary Key(s) in {table}: {pk_columns}")
        pk_indices = [columns.index(pk) for pk in pk_columns]
        # Loop-invariant: build the existence check once per table.
        pk_clause = " AND ".join(f"{_quote_identifier(col)} = ?" for col in pk_columns)
        select_sql = f"SELECT COUNT(*) FROM {quoted_table} WHERE {pk_clause}"
    elif pk_columns:
        # The WHERE clause needs one value per key column. With a key
        # column absent from the source the check cannot be built, so it
        # is skipped and MSSQL's own constraints decide.
        write_log(
            f"⚠️ Primary Key-Spalte(n) {missing_pk} fehlen in der SDF-Tabelle {table}. "
            "Duplikatprüfung wird übersprungen."
        )
    else:
        write_log(f"⚠️ Kein Primary Key in {table}. Alle Datensätze werden eingefügt.")

    placeholders = ", ".join("?" for _ in columns)
    column_list = ", ".join(_quote_identifier(c) for c in columns)
    insert_sql = f"INSERT INTO {quoted_table} ({column_list}) VALUES ({placeholders})"

    inserted = 0
    skipped = 0
    errors = 0
    inserted_rows = []

    for row in rows:
        try:
            if select_sql is not None:
                pk_values = tuple(row[i] for i in pk_indices)
                mssql_cursor.execute(select_sql, pk_values)
                if mssql_cursor.fetchone()[0] > 0:
                    skipped += 1
                    continue

            # NULL handling: empty strings and the literal text 'NULL'
            # are stored as SQL NULL.
            values = [None if v in (None, '', 'NULL') else v for v in row]
            mssql_cursor.execute(insert_sql, tuple(values))
            inserted += 1
            if len(inserted_rows) < 5:
                inserted_rows.append(row_summary(columns, values))
        except Exception as insert_err:
            errors += 1
            error_details = f"Fehler beim Einfügen in {table}: {insert_err}"
            write_log(error_details)
            error_messages.append(error_details)

    mssql_conn.commit()
    sdf_conn.commit()

    # Table finished -> log the result.
    write_log(f"✅ {table}: {inserted} eingefügt, {skipped} übersprungen, {errors} Fehler.")
    write_log_summary(table, inserted, skipped, errors, inserted_rows)

    if error_messages:
        # Only the first 20 row errors per table go into the mail.
        global_error_log.append(f"\nTabelle {table}:\n" + "\n".join(error_messages[:20]))


# =========================
# MAIN PROGRAM
# =========================
def main():
    """Run the export for all configured tables and send the summary mail.

    Connection failures are logged and mailed, after which the function
    returns. Any other unexpected exception is logged and mailed together
    with its traceback. All opened connections and cursors are closed on
    every exit path.
    """
    global mssql_cursor_global
    open_resources = []  # closed in reverse order of acquisition
    try:
        # ========================
        # MSSQL connection
        # ========================
        try:
            mssql_conn = pyodbc.connect(MSSQL_CONNECTION_STR)
            open_resources.append(mssql_conn)
            mssql_cursor = mssql_conn.cursor()
            open_resources.append(mssql_cursor)
            mssql_cursor_global = mssql_cursor
            write_log("✅ Verbindung zu MSSQL erfolgreich hergestellt.")
        except Exception as mssql_err:
            err_msg = f"❌ Fehler bei der Verbindung zu MSSQL: {mssql_err}"
            write_log(err_msg)
            mail.send_error_email(err_msg, process)
            return

        # ========================
        # SDF connection
        # ========================
        try:
            sdf_conn = adodbapi.connect(
                f"Provider=Microsoft.SQLSERVER.CE.OLEDB.3.5;Data Source={sdf_file};Persist Security Info=False;"
            )
            open_resources.append(sdf_conn)
            sdf_cursor = sdf_conn.cursor()
            open_resources.append(sdf_cursor)
            write_log("✅ Verbindung zur SDF erfolgreich geöffnet.")
        except Exception as sdf_err:
            err_msg = f"❌ Fehler beim Zugriff auf SDF: {sdf_err}"
            write_log(err_msg)
            mail.send_error_email(err_msg, process)
            return

        # ============ Table export ============
        if not tables:
            write_log("⚠️ Keine Tabellen in TABLES konfiguriert – nichts zu exportieren.")
        for table in tables:
            _export_table(table, sdf_cursor, sdf_conn, mssql_cursor, mssql_conn)

        # --- Completion: create the empty .export marker file ---
        marker_path = os.path.join(MAIN_DIR, ".export")
        with open(marker_path, "w"):
            pass
        write_log(f"Leere .export-Datei erstellt: {marker_path}")

        # Final log entry in MSSQL (best effort).
        try:
            mssql_cursor.execute(
                "INSERT INTO logs (timestamp, message, process) VALUES (?, ?, ?)",
                datetime.datetime.now(), "Export abgeschlossen (.export erzeugt)", process
            )
            mssql_conn.commit()
        except Exception as log_err:
            print(f"Fehler beim finalen DB-Log: {log_err}")

        # Release the connections before the mails are sent.
        mssql_cursor_global = None
        _close_resources(open_resources)

        # --- One combined mail at the end ---
        if global_error_log:
            combined_errors = "\n\n".join(global_error_log)
            mail.send_error_email(
                f"⚠️ Export abgeschlossen mit Fehlern:\n\n{combined_errors}", process
            )
        else:
            mail.send_success_email("✅ Export erfolgreich abgeschlossen – keine Fehler.", process)

        write_log("🏁 Exportprozess beendet.")

    except Exception as e:
        err = f"Allgemeiner Fehler: {e}\n{traceback.format_exc()}"
        write_log(err)
        mail.send_error_email(err, process)
    finally:
        # Covers the early returns and the error path; a no-op when the
        # success path has already closed everything.
        mssql_cursor_global = None
        _close_resources(open_resources)


if __name__ == "__main__":
    main()