"""Export rows from a SQL Server Compact (.sdf) file into an MSSQL database.

For every table listed in the ``TABLES`` environment variable, all rows are
read from the SDF file and inserted into the MSSQL table of the same name.
Rows whose primary key already exists in MSSQL are skipped. Progress is
written to a daily log file, to the MSSQL ``logs`` table and reported by mail.
"""

import datetime
import os
import sys
import traceback

import pyodbc
from dotenv import load_dotenv

import mail

load_dotenv()

# Prepare the log file (one file per day, inside ./Logs of the working dir).
MAIN_DIR = os.getcwd()
LOG_DIR = os.path.join(MAIN_DIR, "Logs")
os.makedirs(LOG_DIR, exist_ok=True)
logfile_name = f"MSSQL_exporter_log_{datetime.datetime.now().strftime('%Y-%m-%d')}.txt"
logfile_path = os.path.join(LOG_DIR, logfile_name)

# Process name used in mails and log entries.
process = "SDF_to_MSSQL_Export"

# Global MSSQL cursor used for logging into the database; set by main().
mssql_cursor_global = None


def write_log(line):
    """Print *line* and append it, followed by a timestamp rule, to the log file."""
    timestamp = datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
    log_entry = f"{line}\n------------------------------{timestamp}------------------------------"
    print(line)
    with open(logfile_path, "a", encoding="utf-8") as f:
        f.write(log_entry + "\n")


def write_log_summary(table, inserted, skipped, errors, inserted_rows):
    """Write one summary row per table into the MSSQL ``logs`` table.

    Does nothing when no logging cursor is available. Failures while writing
    the summary are printed and otherwise ignored so they never abort the
    export itself.

    Args:
        table: Name of the exported table.
        inserted: Number of rows inserted.
        skipped: Number of rows skipped because their key already existed.
        errors: Number of rows that failed to insert.
        inserted_rows: Short textual summaries of the inserted rows; at most
            the first five are included in the message.
    """
    if mssql_cursor_global is None:
        return

    try:
        if inserted == 0 and errors == 0:
            message = "-"
        else:
            message = f"{inserted} eingefügt, {skipped} übersprungen, {errors} Fehler"
            if inserted_rows:
                message += " | " + "; ".join(inserted_rows[:5])

        full_message = f"Tabelle {table}: {message}"
        mssql_cursor_global.execute(
            "INSERT INTO logs (timestamp, message, process) VALUES (?, ?, ?)",
            datetime.datetime.now(), full_message, process
        )
        mssql_cursor_global.connection.commit()
    except Exception as log_db_error:
        print(f"Fehler beim Schreiben der Tabellen-Zusammenfassung in logs: {log_db_error}")


# Paths and connections.
SDF_LOCAL_PFAD = os.getenv("SDF_LOCAL_PFAD")
SDF_NAME = os.getenv("SDF_NAME")
if SDF_LOCAL_PFAD is None or SDF_NAME is None:
    # os.path.join() would raise a bare TypeError on None; report the missing
    # configuration the same way as a missing MSSQL connection string.
    error_msg = "SDF_LOCAL_PFAD oder SDF_NAME nicht in der .env gefunden."
    write_log(error_msg)
    mail.send_error_email(error_msg, process)
    sys.exit(1)
sdf_file = os.path.join(SDF_LOCAL_PFAD, SDF_NAME)

# New pyodbc driver for SQL Server Compact Edition 4.0.
sdf_connection_str = (
    r"Driver={SQL Server Compact Edition 4.0};"
    f"Data Source={sdf_file};"
)

mssql_connection_str = os.getenv("MSSQL_CONNECTION_STR")
if not mssql_connection_str:
    error_msg = "MSSQL_CONNECTION_STR nicht in der .env gefunden."
    write_log(error_msg)
    mail.send_error_email(error_msg, process)
    sys.exit(1)

# Table list from .env; an empty list only produces a hint in the log.
tables_env = os.getenv("TABLES", "")
tables = [t.strip() for t in tables_env.split(",") if t.strip()]
if not tables:
    write_log("Hinweis: Keine Tabellen in der .env gefunden (Variable TABLES fehlt oder ist leer).")


def get_pk_columns(mssql_cursor, table_name):
    """Return the primary-key column names of *table_name* in key order.

    Args:
        mssql_cursor: Open cursor on the MSSQL target database.
        table_name: Table whose primary key is looked up.

    Returns:
        List of column names; empty when the table has no primary key.
    """
    # Constraint names are only unique per schema, so the join also matches
    # the constraint schema to avoid duplicate rows from other schemas.
    pk_query = """
        SELECT KU.COLUMN_NAME
        FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS AS TC
        INNER JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE AS KU
            ON TC.CONSTRAINT_NAME = KU.CONSTRAINT_NAME
            AND TC.CONSTRAINT_SCHEMA = KU.CONSTRAINT_SCHEMA
        WHERE TC.CONSTRAINT_TYPE = 'PRIMARY KEY'
          AND KU.TABLE_NAME = ?
        ORDER BY KU.ORDINAL_POSITION;
    """
    mssql_cursor.execute(pk_query, (table_name,))
    return [row[0] for row in mssql_cursor.fetchall()]


def row_summary(columns, row):
    """Return a short ``col=value`` summary of the first three simple values.

    Only ``int``, ``float`` and ``str`` values are considered; other values
    (for example ``None`` or dates) are skipped.
    """
    summary_parts = []
    for col, val in zip(columns, row):
        if isinstance(val, (int, float, str)):
            summary_parts.append(f"{col}={repr(val)}")
            if len(summary_parts) == 3:
                break
    return ", ".join(summary_parts)


def _close_quietly(resource):
    """Close a pyodbc cursor/connection, reporting but not raising on failure."""
    if resource is None:
        return
    try:
        resource.close()
    except pyodbc.Error as close_error:
        print(f"Fehler beim Schließen einer Verbindung: {close_error}")


def _export_table(sdf_cursor, mssql_conn, mssql_cursor, table_name):
    """Copy one table from SDF to MSSQL and return its report line.

    Returns ``None`` when the table could not be read from the SDF file; in
    that case the error has already been logged and mailed.
    """
    write_log(f"\nVerarbeite Tabelle: {table_name}")

    try:
        sdf_cursor.execute(f"SELECT * FROM [{table_name}]")
        columns = [col[0] for col in sdf_cursor.description]
        rows = sdf_cursor.fetchall()
    except Exception as e:
        write_log(f"Fehler beim Lesen aus SDF-Tabelle {table_name}: {e}")
        mail.send_error_email(f"Fehler beim Lesen aus {table_name}: {e}", process)
        return None

    write_log(f"{len(rows)} Datensätze in SDF gefunden. Spalten: {columns}")

    # Determine the primary key and where its columns sit in the SDF row.
    pk_columns = get_pk_columns(mssql_cursor, table_name)
    if pk_columns:
        write_log(f"Primary Key(s): {pk_columns}")
        try:
            pk_indices = [columns.index(pk) for pk in pk_columns]
        except ValueError as ve:
            # A key column is missing in the SDF data: fall back to inserting
            # without the existence check.
            write_log(f"Fehler bei PK-Index-Ermittlung: {ve}")
            pk_indices = []
    else:
        write_log("Kein Primary Key definiert.")
        pk_indices = []

    placeholders = ", ".join("?" for _ in columns)
    insert_sql = (
        f"INSERT INTO [{table_name}] "
        f"({', '.join('[' + col + ']' for col in columns)}) VALUES ({placeholders})"
    )

    # The existence check is identical for every row, so build it once.
    select_sql = None
    if pk_indices:
        pk_clause = " AND ".join(f"[{col}] = ?" for col in pk_columns)
        select_sql = f"SELECT COUNT(*) FROM [{table_name}] WHERE {pk_clause}"

    inserted = 0
    skipped = 0
    errors = 0
    inserted_rows = []

    for row in rows:
        try:
            if select_sql is not None:
                pk_values = tuple(row[i] for i in pk_indices)
                mssql_cursor.execute(select_sql, pk_values)
                if mssql_cursor.fetchone()[0] > 0:
                    skipped += 1
                    continue

            mssql_cursor.execute(insert_sql, row)
            inserted += 1
            inserted_rows.append(row_summary(columns, row))
        except Exception as ie:
            # NOTE(review): one mail is sent per failing row; a table with
            # many bad rows will produce many mails.
            errors += 1
            err = f"Fehler beim Einfügen in Tabelle {table_name}: {ie}"
            write_log(err)
            mail.send_error_email(err, process)

    mssql_conn.commit()
    write_log(f"{inserted} eingefügt, {skipped} übersprungen, {errors} Fehler.")
    write_log_summary(table_name, inserted, skipped, errors, inserted_rows)

    return f"Tabelle {table_name}: {inserted} eingefügt, {skipped} übersprungen, {errors} Fehler."


def _write_export_marker():
    """Create the empty ``.export`` marker file in the working directory."""
    export_marker_path = os.path.join(MAIN_DIR, ".export")
    try:
        with open(export_marker_path, "w"):
            pass
        write_log(f"Leere .export-Datei erstellt: {export_marker_path}")
    except Exception as marker_err:
        write_log(f"Fehler beim Erstellen der .export-Datei: {marker_err}")


def main():
    """Run the export for all configured tables and send the final report.

    Any unexpected error is logged with its traceback and mailed. Database
    cursors and connections are always closed, even when the export fails.
    """
    global mssql_cursor_global

    sdf_conn = sdf_cursor = mssql_conn = mssql_cursor = None
    try:
        # --- Open connections ---
        sdf_conn = pyodbc.connect(sdf_connection_str)
        sdf_cursor = sdf_conn.cursor()

        mssql_conn = pyodbc.connect(mssql_connection_str)
        mssql_cursor = mssql_conn.cursor()
        mssql_cursor_global = mssql_cursor

        report_lines = []
        for table_name in tables:
            report_line = _export_table(sdf_cursor, mssql_conn, mssql_cursor, table_name)
            if report_line is not None:
                report_lines.append(report_line)

        # --- Final report ---
        report_text = "\n".join(report_lines)
        write_log("Export-Zusammenfassung:\n" + report_text)
        mail.send_report_email(report_text, process)

        # --- Create the empty .export marker file ---
        _write_export_marker()

        # --- Final log entry in the database ---
        try:
            if mssql_cursor_global is not None:
                mssql_cursor_global.execute(
                    "INSERT INTO logs (timestamp, message, process) VALUES (?, ?, ?)",
                    datetime.datetime.now(), "Export abgeschlossen (.export erzeugt)", process
                )
                mssql_cursor_global.connection.commit()
        except Exception as db_log_final_error:
            print(f"Fehler beim Schreiben des finalen Logs in DB: {db_log_final_error}")

    except Exception as e:
        error_details = f"Allgemeiner Fehler: {e}\n{traceback.format_exc()}"
        write_log(error_details)
        mail.send_error_email(error_details, process)
    finally:
        # The logging cursor must not outlive its connection.
        mssql_cursor_global = None
        # Close cursors before their connections.
        for resource in (sdf_cursor, sdf_conn, mssql_cursor, mssql_conn):
            _close_quietly(resource)


if __name__ == "__main__":
    main()