From 15f510a3b6f36bf47d732e46e4f8b1317f0f6fee Mon Sep 17 00:00:00 2001 From: Sebastian Serfling Date: Wed, 29 Oct 2025 09:51:34 +0100 Subject: [PATCH] Add Importer.py --- importer.py | 189 +++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 188 insertions(+), 1 deletion(-) diff --git a/importer.py b/importer.py index 047402d..56064a9 100644 --- a/importer.py +++ b/importer.py @@ -1 +1,188 @@ -## Importer aus MSSQL in SDF \ No newline at end of file +import os +import pyodbc +import adodbapi +import traceback +from dotenv import load_dotenv +import mail +import datetime + +load_dotenv() + +MAIN_DIR = os.getcwd() +LOG_DIR = os.path.join(MAIN_DIR, 'Logs') +os.makedirs(LOG_DIR, exist_ok=True) +logfile_name = f"SDF_importer_log_{datetime.datetime.now().strftime('%Y-%m-%d')}.txt" +logfile_path = os.path.join(LOG_DIR, logfile_name) + +process = 'MSSQL_to_SDF_Import' +mssql_cursor_global = None + + +def write_log(line): + timestamp = datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S') + log_entry = f'{line}\n------------------------------{timestamp}------------------------------' + print(line) + with open(logfile_path, 'a', encoding='utf-8') as f: + f.write(log_entry + '\n') + + +def write_log_summary(table, inserted, skipped, errors, inserted_rows): + """Zusammenfassung pro Tabelle in DB schreiben""" + global mssql_cursor_global + if not mssql_cursor_global: + return + try: + if inserted == 0 and errors == 0: + message = '-' + else: + message = f'{inserted} eingefügt, {skipped} übersprungen, {errors} Fehler' + if inserted_rows: + message += ' | ' + '; '.join(inserted_rows[:5]) + full_message = f'Tabelle {table}: {message}' + mssql_cursor_global.execute( + 'INSERT INTO logs (timestamp, message, process) VALUES (?, ?, ?)', + datetime.datetime.now(), full_message, process + ) + mssql_cursor_global.connection.commit() + except Exception as log_db_error: + print(f'Fehler beim Schreiben der Tabellen-Zusammenfassung in logs: {log_db_error}') + + +def get_pk_columns(cursor, table_name): + pk_query = """ + SELECT KU.COLUMN_NAME + FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS AS TC + INNER JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE AS KU + ON TC.CONSTRAINT_NAME = KU.CONSTRAINT_NAME + WHERE TC.CONSTRAINT_TYPE = 'PRIMARY KEY' + AND KU.TABLE_NAME = ? + ORDER BY KU.ORDINAL_POSITION; + """ + cursor.execute(pk_query, (table_name,)) + return [row[0] for row in cursor.fetchall()] + + +def escape(val): + if val is None: + return 'NULL' + val = str(val).replace("'", "''") + return f"'{val}'" + + +def row_summary(columns, row): + summary_parts = [] + for i, col in enumerate(columns): + val = row[i] + if isinstance(val, (int, float, str)) and len(summary_parts) < 3: + summary_parts.append(f'{col}={repr(val)}') + return ', '.join(summary_parts) + + +def main(): + global mssql_cursor_global + + try: + mssql_connection_str = os.getenv('MSSQL_CONNECTION_STR') + if not mssql_connection_str: + raise ValueError('MSSQL_CONNECTION_STR nicht in der .env gefunden.') + + SDF_LOCAL_PFAD = os.getenv('SDF_LOCAL_PFAD') + if not SDF_LOCAL_PFAD: + raise ValueError('SDF_LOCAL_PFAD nicht in der .env gefunden.') + + sdf_file = os.path.join(SDF_LOCAL_PFAD, 'App.sdf') + sdf_connection_str = f'Provider=Microsoft.SQLSERVER.CE.OLEDB.3.5;Data Source={sdf_file};Persist Security Info=False;' + + tables = [ + 'Addressee', 'ADR', 'AxlesArchive', 'CardEncoding', 'Carrier', + 'Coeff', 'Conveyer', 'CustomerLDB', 'Fields', 'GeneralData', + 'PDR', 'Plate', 'Product', 'RDR', 'RDR_LDB_Weighing', + 'Reason', 'Supplier', 'Tare', 'TxWeighing', 'Weighing_LDB' + ] + + mssql_conn = pyodbc.connect(mssql_connection_str) + mssql_cursor = mssql_conn.cursor() + mssql_cursor_global = mssql_cursor + + sdf_conn = adodbapi.connect(sdf_connection_str) + sdf_cursor = sdf_conn.cursor() + + report_lines = [] + + for table in tables: + try: + write_log(f'\nVerarbeite Tabelle: {table}') + mssql_cursor.execute(f'SELECT * FROM [{table}]') + columns = [col[0] for col in mssql_cursor.description] + rows = mssql_cursor.fetchall() + + write_log(f'In MSSQL gefunden: {len(rows)} Datensätze; Spalten: {columns}') + + pk_columns = get_pk_columns(mssql_cursor, table) + if pk_columns: + write_log(f'Primary Key Spalte(n) für {table}: {pk_columns}') + pk_indices = [columns.index(pk) for pk in pk_columns] + else: + write_log(f'Kein Primary Key für {table}. Es wird jeder Datensatz eingefügt.') + pk_indices = [] + + inserted = 0 + skipped = 0 + errors = 0 + inserted_rows = [] + + for row in rows: + try: + record_exists = False + if pk_indices: + pk_clause = ' AND '.join( + f'[{col}] = {escape(row[columns.index(col)])}' for col in pk_columns + ) + select_pk_sql = f'SELECT COUNT(*) FROM [{table}] WHERE {pk_clause}' + sdf_cursor.execute(select_pk_sql) + count = sdf_cursor.fetchone()[0] + record_exists = count > 0 + + if record_exists: + skipped += 1 + continue + + escaped_values = ', '.join(escape(v) for v in row) + insert_sql = f"INSERT INTO [{table}] ({', '.join('[' + c + ']' for c in columns)}) VALUES ({escaped_values})" + sdf_cursor.execute(insert_sql) + inserted += 1 + inserted_rows.append(row_summary(columns, row)) + + except Exception as ie: + errors += 1 + error_details = f'Fehler bei Tabelle {table}: {ie}\nRow: {row}' + write_log(error_details) + mail.send_error_email(error_details, process) + + sdf_conn.commit() + write_log_summary(table, inserted, skipped, errors, inserted_rows) + report_lines.append(f'Tabelle {table}: {inserted} eingefügt, {skipped} übersprungen, {errors} Fehler.') + + except Exception as te: + error_details = f'Fehler in Tabelle {table}: {te}\n{traceback.format_exc()}' + write_log(error_details) + mail.send_error_email(error_details, process) + + # Abschlussbericht + report_text = '\n'.join(report_lines) + write_log('Import-Zusammenfassung:\n' + report_text) + mail.send_report_email(report_text, process) + + import_marker_path = os.path.join(MAIN_DIR, '.import') + with open(import_marker_path, 'w') as f: + f.write('') + write_log(f'Leere .import-Datei erstellt: {import_marker_path}') + + except Exception as e: + error_details = f'Allgemeiner Fehler beim Import: {e}\n{traceback.format_exc()}' + write_log(error_details) + mail.send_error_email(error_details, process) + + +if __name__ == '__main__': + main()