Add Importer.py
parent
b90726df40
commit
15f510a3b6
189
importer.py
189
importer.py
|
|
@ -1 +1,188 @@
|
|||
## Importer from MSSQL into SDF (SQL Server Compact)
|
||||
import os
|
||||
import pyodbc
|
||||
import adodbapi
|
||||
import traceback
|
||||
from dotenv import load_dotenv
|
||||
import mail
|
||||
import datetime
|
||||
|
||||
load_dotenv()

# Working directory of the script; logs go into a ./Logs subfolder.
MAIN_DIR = os.getcwd()
LOG_DIR = os.path.join(MAIN_DIR, 'Logs')
os.makedirs(LOG_DIR, exist_ok=True)
# One log file per calendar day.
logfile_name = f"SDF_importer_log_{datetime.datetime.now().strftime('%Y-%m-%d')}.txt"
logfile_path = os.path.join(LOG_DIR, logfile_name)

# Process label attached to log/mail entries.
process = 'MSSQL_to_SDF_Import'
# MSSQL cursor shared with write_log_summary(); assigned in main().
mssql_cursor_global = None
def write_log(line):
    """Echo *line* to stdout and append it to the daily log file,
    followed by a dashed timestamp separator line."""
    print(line)
    stamp = datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
    entry = f'{line}\n------------------------------{stamp}------------------------------'
    with open(logfile_path, 'a', encoding='utf-8') as log_file:
        log_file.write(entry + '\n')
def write_log_summary(table, inserted, skipped, errors, inserted_rows):
    """Persist a one-line per-table import summary into the MSSQL ``logs`` table.

    Uses the module-level cursor set up by main(); silently does nothing when
    no cursor is available. DB write failures are reported to stdout only.
    """
    global mssql_cursor_global
    cursor = mssql_cursor_global
    if not cursor:
        return
    try:
        message = '-'
        if inserted != 0 or errors != 0:
            message = f'{inserted} eingefügt, {skipped} übersprungen, {errors} Fehler'
        if inserted_rows:
            # Preview at most five inserted-row summaries.
            message += ' | ' + '; '.join(inserted_rows[:5])
        full_message = f'Tabelle {table}: {message}'
        cursor.execute(
            'INSERT INTO logs (timestamp, message, process) VALUES (?, ?, ?)',
            datetime.datetime.now(), full_message, process
        )
        cursor.connection.commit()
    except Exception as log_db_error:
        print(f'Fehler beim Schreiben der Tabellen-Zusammenfassung in logs: {log_db_error}')
def get_pk_columns(cursor, table_name):
    """Return the primary-key column names of *table_name*, in key order.

    Queries INFORMATION_SCHEMA via the supplied MSSQL cursor; returns an
    empty list when the table has no primary key.
    """
    query = """
    SELECT KU.COLUMN_NAME
    FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS AS TC
    INNER JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE AS KU
        ON TC.CONSTRAINT_NAME = KU.CONSTRAINT_NAME
    WHERE TC.CONSTRAINT_TYPE = 'PRIMARY KEY'
        AND KU.TABLE_NAME = ?
    ORDER BY KU.ORDINAL_POSITION;
    """
    cursor.execute(query, (table_name,))
    return [name for (name, *_) in cursor.fetchall()]
def escape(val):
    """Format *val* as a SQL string literal.

    Returns the bare keyword NULL for None; every other value is stringified,
    embedded single quotes are doubled, and the result is single-quoted.
    Note: numbers and dates are also rendered as quoted strings.
    """
    if val is None:
        return 'NULL'
    return "'" + str(val).replace("'", "''") + "'"
def row_summary(columns, row):
    """Build a short human-readable preview of *row*: at most three
    'col=repr(value)' pairs, considering only scalar (int/float/str) values."""
    preview = []
    idx = 0
    for col in columns:
        val = row[idx]
        idx += 1
        if len(preview) < 3 and isinstance(val, (int, float, str)):
            preview.append(f'{col}={val!r}')
    return ', '.join(preview)
def main():
    """Import rows for a fixed list of tables from MSSQL into a local
    SQL Server Compact file (App.sdf).

    Rows whose primary key already exists in the target are skipped.
    Per-table summaries are logged, written to the MSSQL logs table and
    mailed; errors are mailed but do not stop the remaining tables.
    Connection settings come from the .env file (loaded at import time).
    """
    global mssql_cursor_global

    try:
        # Connection string for the MSSQL source database (required).
        mssql_connection_str = os.getenv('MSSQL_CONNECTION_STR')
        if not mssql_connection_str:
            raise ValueError('MSSQL_CONNECTION_STR nicht in der .env gefunden.')

        # Directory that contains the target App.sdf file (required).
        SDF_LOCAL_PFAD = os.getenv('SDF_LOCAL_PFAD')
        if not SDF_LOCAL_PFAD:
            raise ValueError('SDF_LOCAL_PFAD nicht in der .env gefunden.')

        sdf_file = os.path.join(SDF_LOCAL_PFAD, 'App.sdf')
        # OLE DB provider for SQL Server Compact 3.5, accessed through adodbapi.
        sdf_connection_str = f'Provider=Microsoft.SQLSERVER.CE.OLEDB.3.5;Data Source={sdf_file};Persist Security Info=False;'

        # Tables to import, processed in this order.
        tables = [
            'Addressee', 'ADR', 'AxlesArchive', 'CardEncoding', 'Carrier',
            'Coeff', 'Conveyer', 'CustomerLDB', 'Fields', 'GeneralData',
            'PDR', 'Plate', 'Product', 'RDR', 'RDR_LDB_Weighing',
            'Reason', 'Supplier', 'Tare', 'TxWeighing', 'Weighing_LDB'
        ]

        mssql_conn = pyodbc.connect(mssql_connection_str)
        mssql_cursor = mssql_conn.cursor()
        # Share the cursor with write_log_summary() for DB-side logging.
        mssql_cursor_global = mssql_cursor

        sdf_conn = adodbapi.connect(sdf_connection_str)
        sdf_cursor = sdf_conn.cursor()

        report_lines = []

        for table in tables:
            try:
                write_log(f'\nVerarbeite Tabelle: {table}')
                mssql_cursor.execute(f'SELECT * FROM [{table}]')
                columns = [col[0] for col in mssql_cursor.description]
                rows = mssql_cursor.fetchall()

                write_log(f'In MSSQL gefunden: {len(rows)} Datensätze; Spalten: {columns}')

                # Primary-key columns decide whether a row counts as a duplicate;
                # without a PK every row is inserted unconditionally.
                pk_columns = get_pk_columns(mssql_cursor, table)
                if pk_columns:
                    write_log(f'Primary Key Spalte(n) für {table}: {pk_columns}')
                    pk_indices = [columns.index(pk) for pk in pk_columns]
                else:
                    write_log(f'Kein Primary Key für {table}. Es wird jeder Datensatz eingefügt.')
                    pk_indices = []

                inserted = 0
                skipped = 0
                errors = 0
                inserted_rows = []  # short previews of inserted rows for the summary

                for row in rows:
                    try:
                        # Duplicate check: look the row's PK values up in the SDF target.
                        record_exists = False
                        if pk_indices:
                            pk_clause = ' AND '.join(
                                f'[{col}] = {escape(row[columns.index(col)])}' for col in pk_columns
                            )
                            select_pk_sql = f'SELECT COUNT(*) FROM [{table}] WHERE {pk_clause}'
                            sdf_cursor.execute(select_pk_sql)
                            count = sdf_cursor.fetchone()[0]
                            record_exists = count > 0

                        if record_exists:
                            skipped += 1
                            continue

                        # NOTE(review): SQL is built by string interpolation; values are
                        # quote-escaped via escape(), but parameterized queries would be safer.
                        escaped_values = ', '.join(escape(v) for v in row)
                        insert_sql = f"INSERT INTO [{table}] ({', '.join('[' + c + ']' for c in columns)}) VALUES ({escaped_values})"
                        sdf_cursor.execute(insert_sql)
                        inserted += 1
                        inserted_rows.append(row_summary(columns, row))

                    except Exception as ie:
                        # Row-level failure: count it, log and mail, continue with next row.
                        errors += 1
                        error_details = f'Fehler bei Tabelle {table}: {ie}\nRow: {row}'
                        write_log(error_details)
                        mail.send_error_email(error_details, process)

                # Commit once per table; per-row failures above do not roll back prior rows.
                sdf_conn.commit()
                write_log_summary(table, inserted, skipped, errors, inserted_rows)
                report_lines.append(f'Tabelle {table}: {inserted} eingefügt, {skipped} übersprungen, {errors} Fehler.')

            except Exception as te:
                # Table-level failure: log and mail, continue with the next table.
                error_details = f'Fehler in Tabelle {table}: {te}\n{traceback.format_exc()}'
                write_log(error_details)
                mail.send_error_email(error_details, process)

        # Final report across all tables.
        report_text = '\n'.join(report_lines)
        write_log('Import-Zusammenfassung:\n' + report_text)
        mail.send_report_email(report_text, process)

        # Empty .import marker file — presumably signals completion to a
        # downstream consumer; verify against whatever watches MAIN_DIR.
        import_marker_path = os.path.join(MAIN_DIR, '.import')
        with open(import_marker_path, 'w') as f:
            f.write('')
        write_log(f'Leere .import-Datei erstellt: {import_marker_path}')

    except Exception as e:
        # Top-level failure (config, connect, unexpected): log and mail.
        error_details = f'Allgemeiner Fehler beim Import: {e}\n{traceback.format_exc()}'
        write_log(error_details)
        mail.send_error_email(error_details, process)
# Run the import when executed as a script (not on import).
if __name__ == '__main__':
    main()
|||
Loading…
Reference in New Issue