exporter.py aktualisiert
parent
57f256f1e9
commit
f65fc1168f
262
exporter.py
262
exporter.py
|
|
@ -1,13 +1,56 @@
|
||||||
import os
|
import os
|
||||||
import adodbapi
|
import adodbapi
|
||||||
import pyodbc
|
import pyodbc
|
||||||
|
import traceback
|
||||||
from dotenv import load_dotenv
|
from dotenv import load_dotenv
|
||||||
|
import mail
|
||||||
|
import datetime
|
||||||
|
|
||||||
load_dotenv()
|
load_dotenv()
|
||||||
|
|
||||||
# Lokaler Pfad und SDF-Datei definieren
|
# Logdatei vorbereiten
|
||||||
|
MAIN_DIR = os.getcwd()
|
||||||
|
LOG_DIR = os.path.join(MAIN_DIR, "Logs")
|
||||||
|
logfile_name = f"MSSQL_exporter_log_{datetime.datetime.now().strftime('%Y-%m-%d')}.txt"
|
||||||
|
logfile_path = os.path.join(LOG_DIR, logfile_name)
|
||||||
|
|
||||||
|
# Prozessname für Mail und Logs
|
||||||
|
process = "SDF_to_MSSQL_Export"
|
||||||
|
|
||||||
|
# Globaler MSSQL-Cursor für Logging
|
||||||
|
mssql_cursor_global = None
|
||||||
|
|
||||||
|
def write_log(line):
|
||||||
|
timestamp = datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
|
||||||
|
log_entry = f"{line}\n------------------------------{timestamp}------------------------------"
|
||||||
|
print(line)
|
||||||
|
with open(logfile_path, "a", encoding="utf-8") as f:
|
||||||
|
f.write(log_entry + "\n")
|
||||||
|
|
||||||
|
def write_log_summary(table, inserted, skipped, errors, inserted_rows):
|
||||||
|
"""Schreibt eine zusammenfassende Logzeile pro Tabelle in die logs-Tabelle."""
|
||||||
|
if not mssql_cursor_global:
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
if inserted == 0 and errors == 0:
|
||||||
|
message = "-"
|
||||||
|
else:
|
||||||
|
message = f"{inserted} eingefügt, {skipped} übersprungen, {errors} Fehler"
|
||||||
|
if inserted_rows:
|
||||||
|
message += " | " + "; ".join(inserted_rows[:5])
|
||||||
|
full_message = f"Tabelle {table}: {message}"
|
||||||
|
mssql_cursor_global.execute(
|
||||||
|
"INSERT INTO logs (timestamp, message, process) VALUES (?, ?, ?)",
|
||||||
|
datetime.datetime.now(), full_message, process
|
||||||
|
)
|
||||||
|
mssql_cursor_global.connection.commit()
|
||||||
|
except Exception as log_db_error:
|
||||||
|
print(f"Fehler beim Schreiben der Tabellen-Zusammenfassung in logs: {log_db_error}")
|
||||||
|
|
||||||
|
# Pfade und Verbindungen
|
||||||
SDF_LOCAL_PFAD = os.getenv("SDF_LOCAL_PFAD")
|
SDF_LOCAL_PFAD = os.getenv("SDF_LOCAL_PFAD")
|
||||||
sdf_file = os.path.join(SDF_LOCAL_PFAD, "App.sdf")
|
SDF_NAME = os.getenv("SDF_NAME")
|
||||||
|
sdf_file = os.path.join(SDF_LOCAL_PFAD, SDF_NAME)
|
||||||
|
|
||||||
sdf_connection_str = (
|
sdf_connection_str = (
|
||||||
"Provider=Microsoft.SQLSERVER.CE.OLEDB.3.5;"
|
"Provider=Microsoft.SQLSERVER.CE.OLEDB.3.5;"
|
||||||
|
|
@ -15,41 +58,20 @@ sdf_connection_str = (
|
||||||
"Persist Security Info=False;"
|
"Persist Security Info=False;"
|
||||||
)
|
)
|
||||||
|
|
||||||
# MSSQL-Verbindungszeichenfolge aus der .env
|
|
||||||
mssql_connection_str = os.getenv("MSSQL_CONNECTION_STR")
|
mssql_connection_str = os.getenv("MSSQL_CONNECTION_STR")
|
||||||
if not mssql_connection_str:
|
if not mssql_connection_str:
|
||||||
print("MSSQL_CONNECTION_STR nicht in der .env gefunden.")
|
error_msg = "MSSQL_CONNECTION_STR nicht in der .env gefunden."
|
||||||
|
write_log(error_msg)
|
||||||
|
mail.send_error_email(error_msg, process)
|
||||||
exit(1)
|
exit(1)
|
||||||
|
|
||||||
# Die Liste der Tabellen aus deiner SDF-Datenbank
|
|
||||||
tables = [
|
tables = [
|
||||||
"Addressee",
|
"Addressee", "ADR", "AxlesArchive", "CardEncoding", "Carrier", "Coeff", "Conveyer",
|
||||||
"ADR",
|
"CustomerLDB", "Fields", "GeneralData", "PDR", "Plate", "Product", "RDR",
|
||||||
"AxlesArchive",
|
"RDR_LDB_Weighing", "Reason", "Supplier", "Tare", "TxWeighing", "Weighing_LDB"
|
||||||
"CardEncoding",
|
|
||||||
"Carrier",
|
|
||||||
"Coeff",
|
|
||||||
"Conveyer",
|
|
||||||
"CustomerLDB",
|
|
||||||
"Fields",
|
|
||||||
"GeneralData",
|
|
||||||
"PDR",
|
|
||||||
"Plate",
|
|
||||||
"Product",
|
|
||||||
"RDR",
|
|
||||||
"RDR_LDB_Weighing",
|
|
||||||
"Reason",
|
|
||||||
"Supplier",
|
|
||||||
"Tare",
|
|
||||||
"TxWeighing",
|
|
||||||
"Weighing_LDB"
|
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
def get_pk_columns(mssql_cursor, table_name):
|
def get_pk_columns(mssql_cursor, table_name):
|
||||||
"""
|
|
||||||
Ermittelt die Primary-Key-Spalten für eine Tabelle in MSSQL.
|
|
||||||
"""
|
|
||||||
pk_query = """
|
pk_query = """
|
||||||
SELECT KU.COLUMN_NAME
|
SELECT KU.COLUMN_NAME
|
||||||
FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS AS TC
|
FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS AS TC
|
||||||
|
|
@ -62,90 +84,122 @@ def get_pk_columns(mssql_cursor, table_name):
|
||||||
mssql_cursor.execute(pk_query, (table_name,))
|
mssql_cursor.execute(pk_query, (table_name,))
|
||||||
return [row[0] for row in mssql_cursor.fetchall()]
|
return [row[0] for row in mssql_cursor.fetchall()]
|
||||||
|
|
||||||
try:
|
def row_summary(columns, row):
|
||||||
# Verbindung zur SDF-Datenbank herstellen
|
summary_parts = []
|
||||||
sdf_conn = adodbapi.connect(sdf_connection_str)
|
for i, col in enumerate(columns):
|
||||||
sdf_cursor = sdf_conn.cursor()
|
val = row[i]
|
||||||
|
if isinstance(val, (int, float, str)) and len(summary_parts) < 3:
|
||||||
|
summary_parts.append(f"{col}={repr(val)}")
|
||||||
|
return ", ".join(summary_parts)
|
||||||
|
|
||||||
# Verbindung zur MSSQL-Datenbank herstellen
|
def main():
|
||||||
mssql_conn = pyodbc.connect(mssql_connection_str)
|
global mssql_cursor_global
|
||||||
mssql_cursor = mssql_conn.cursor()
|
|
||||||
|
|
||||||
for table_name in tables:
|
try:
|
||||||
print(f"\nVerarbeite Tabelle: {table_name}")
|
# Verbindungen herstellen
|
||||||
|
sdf_conn = adodbapi.connect(sdf_connection_str)
|
||||||
|
sdf_cursor = sdf_conn.cursor()
|
||||||
|
mssql_conn = pyodbc.connect(mssql_connection_str)
|
||||||
|
mssql_cursor = mssql_conn.cursor()
|
||||||
|
mssql_cursor_global = mssql_cursor
|
||||||
|
|
||||||
# Spaltennamen ermitteln aus der SDF-Tabelle
|
report_lines = []
|
||||||
sdf_cursor.execute(f"SELECT * FROM [{table_name}]")
|
|
||||||
columns = [col[0] for col in sdf_cursor.description]
|
|
||||||
print(f"Gefundene Spalten: {columns}")
|
|
||||||
|
|
||||||
# Ermittlung der Primary-Key-Spalten aus MSSQL (falls vorhanden)
|
for table_name in tables:
|
||||||
pk_columns = get_pk_columns(mssql_cursor, table_name)
|
write_log(f"\nVerarbeite Tabelle: {table_name}")
|
||||||
if pk_columns:
|
sdf_cursor.execute(f"SELECT * FROM [{table_name}]")
|
||||||
print(f"Primary Key Spalte(n) für {table_name}: {pk_columns}")
|
columns = [col[0] for col in sdf_cursor.description]
|
||||||
try:
|
write_log(f"Spalten: {columns}")
|
||||||
pk_indices = [columns.index(pk) for pk in pk_columns]
|
|
||||||
except ValueError as ve:
|
# PK ermitteln
|
||||||
print(f"Fehler bei der Ermittlung der PK-Indizes für {table_name}: {ve}")
|
pk_columns = get_pk_columns(mssql_cursor, table_name)
|
||||||
|
if pk_columns:
|
||||||
|
write_log(f"Primary Key(s): {pk_columns}")
|
||||||
|
try:
|
||||||
|
pk_indices = [columns.index(pk) for pk in pk_columns]
|
||||||
|
except ValueError as ve:
|
||||||
|
write_log(f"Fehler bei PK-Index-Ermittlung: {ve}")
|
||||||
|
pk_indices = []
|
||||||
|
else:
|
||||||
|
write_log("Kein Primary Key definiert.")
|
||||||
pk_indices = []
|
pk_indices = []
|
||||||
else:
|
|
||||||
print(f"Kein Primary Key in MSSQL für {table_name} definiert.")
|
|
||||||
pk_indices = []
|
|
||||||
|
|
||||||
identity_insert = False
|
# Daten auslesen
|
||||||
if table_name == "Weighing_LDB":
|
sdf_cursor.execute(f"SELECT * FROM [{table_name}]")
|
||||||
identity_insert = True
|
rows = sdf_cursor.fetchall()
|
||||||
try:
|
write_log(f"{len(rows)} Datensätze in SDF gefunden.")
|
||||||
mssql_cursor.execute(f"SET IDENTITY_INSERT {table_name} ON")
|
|
||||||
print(f"IDENTITY_INSERT für {table_name} aktiviert.")
|
|
||||||
except Exception as ie:
|
|
||||||
print(f"Fehler beim Aktivieren von IDENTITY_INSERT für {table_name}: {ie}")
|
|
||||||
|
|
||||||
# Daten aus der SDF-Tabelle auslesen
|
# Insert vorbereiten
|
||||||
sdf_cursor.execute(f"SELECT * FROM [{table_name}]")
|
placeholders = ", ".join("?" for _ in columns)
|
||||||
rows = sdf_cursor.fetchall()
|
insert_sql = f"INSERT INTO {table_name} ({', '.join('[' + col + ']' for col in columns)}) VALUES ({placeholders})"
|
||||||
print(f"{len(rows)} Datensätze gefunden.")
|
|
||||||
|
|
||||||
# Insert-Statement für MSSQL vorbereiten
|
inserted = 0
|
||||||
placeholders = ", ".join("?" for _ in columns)
|
skipped = 0
|
||||||
insert_sql = f"INSERT INTO {table_name} ({', '.join('[' + col + ']' for col in columns)}) VALUES ({placeholders})"
|
errors = 0
|
||||||
|
inserted_rows = []
|
||||||
|
|
||||||
inserted = 0
|
for row in rows:
|
||||||
skipped = 0
|
try:
|
||||||
for row in rows:
|
if pk_indices:
|
||||||
# Falls ein Primary Key definiert ist, prüfen wir, ob der Datensatz bereits existiert
|
pk_values = tuple(row[i] for i in pk_indices)
|
||||||
if pk_indices:
|
pk_clause = " AND ".join(f"[{col}] = ?" for col in pk_columns)
|
||||||
pk_values = tuple(row[i] for i in pk_indices)
|
select_sql = f"SELECT COUNT(*) FROM {table_name} WHERE {pk_clause}"
|
||||||
pk_clause = " AND ".join(f"[{col}] = ?" for col in pk_columns)
|
mssql_cursor.execute(select_sql, pk_values)
|
||||||
select_pk_sql = f"SELECT COUNT(*) FROM {table_name} WHERE {pk_clause}"
|
count = mssql_cursor.fetchone()[0]
|
||||||
mssql_cursor.execute(select_pk_sql, pk_values)
|
if count > 0:
|
||||||
count = mssql_cursor.fetchone()[0]
|
skipped += 1
|
||||||
if count > 0:
|
continue
|
||||||
skipped += 1
|
|
||||||
continue # Datensatz existiert bereits -> überspringen
|
|
||||||
|
|
||||||
# Datensatz einfügen
|
mssql_cursor.execute(insert_sql, *row)
|
||||||
try:
|
inserted += 1
|
||||||
mssql_cursor.execute(insert_sql, *row)
|
inserted_rows.append(row_summary(columns, row))
|
||||||
inserted += 1
|
|
||||||
except Exception as ie:
|
|
||||||
print(f"Fehler beim Einfügen in Tabelle {table_name}: {ie}")
|
|
||||||
mssql_conn.commit()
|
|
||||||
print(f"Für Tabelle {table_name}: {inserted} Datensätze eingefügt, {skipped} übersprungen.")
|
|
||||||
|
|
||||||
# Falls IDENTITY_INSERT aktiviert wurde, wieder deaktivieren
|
except Exception as ie:
|
||||||
if identity_insert:
|
errors += 1
|
||||||
try:
|
error_details = f"Fehler beim Einfügen in Tabelle {table_name}: {ie}\nRow: {row}"
|
||||||
mssql_cursor.execute(f"SET IDENTITY_INSERT {table_name} OFF")
|
write_log(error_details)
|
||||||
mssql_conn.commit()
|
mail.send_error_email(error_details, process)
|
||||||
print(f"IDENTITY_INSERT für {table_name} deaktiviert.")
|
|
||||||
except Exception as ie:
|
|
||||||
print(f"Fehler beim Deaktivieren von IDENTITY_INSERT für {table_name}: {ie}")
|
|
||||||
|
|
||||||
# Verbindungen schließen
|
mssql_conn.commit()
|
||||||
sdf_cursor.close()
|
write_log(f"{inserted} eingefügt, {skipped} übersprungen, {errors} Fehler.")
|
||||||
sdf_conn.close()
|
write_log_summary(table_name, inserted, skipped, errors, inserted_rows)
|
||||||
mssql_cursor.close()
|
report_lines.append(f"Tabelle {table_name}: {inserted} eingefügt, {skipped} übersprungen, {errors} Fehler.")
|
||||||
mssql_conn.close()
|
|
||||||
except Exception as e:
|
# Abschlussbericht
|
||||||
print(f"Allgemeiner Fehler: {e}")
|
report_text = "\n".join(report_lines)
|
||||||
|
write_log("Export-Zusammenfassung:\n" + report_text)
|
||||||
|
mail.send_report_email(report_text, process)
|
||||||
|
|
||||||
|
# Leere .export-Datei erstellen
|
||||||
|
export_marker_path = os.path.join(MAIN_DIR, ".export")
|
||||||
|
try:
|
||||||
|
with open(export_marker_path, "w") as f:
|
||||||
|
pass
|
||||||
|
write_log(f"Leere .export-Datei erstellt: {export_marker_path}")
|
||||||
|
except Exception as marker_err:
|
||||||
|
write_log(f"Fehler beim Erstellen der .export-Datei: {marker_err}")
|
||||||
|
|
||||||
|
# Abschlusslog in DB
|
||||||
|
try:
|
||||||
|
if mssql_cursor_global:
|
||||||
|
mssql_cursor_global.execute(
|
||||||
|
"INSERT INTO logs (timestamp, message, process) VALUES (?, ?, ?)",
|
||||||
|
datetime.datetime.now(), "Export abgeschlossen (.export erzeugt)", process
|
||||||
|
)
|
||||||
|
mssql_cursor_global.connection.commit()
|
||||||
|
except Exception as db_log_final_error:
|
||||||
|
print(f"Fehler beim Schreiben des finalen Logs in DB: {db_log_final_error}")
|
||||||
|
|
||||||
|
# Verbindungen schließen
|
||||||
|
sdf_cursor.close()
|
||||||
|
sdf_conn.close()
|
||||||
|
mssql_cursor.close()
|
||||||
|
mssql_conn.close()
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
error_details = f"Allgemeiner Fehler: {e}\n{traceback.format_exc()}"
|
||||||
|
write_log(error_details)
|
||||||
|
mail.send_error_email(error_details, process)
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue