Balzer-WaagenDaten/exporter.py

224 lines
7.6 KiB
Python

import os
import traceback
import datetime
import adodbapi
import pyodbc
from dotenv import load_dotenv
import mail
# =========================
# INITIALISIERUNG
# =========================
load_dotenv()
MAIN_DIR = os.getcwd()
LOG_DIR = os.path.join(MAIN_DIR, "Logs")
os.makedirs(LOG_DIR, exist_ok=True)
logfile_name = f"MSSQL_exporter_log_{datetime.datetime.now().strftime('%Y-%m-%d')}.txt"
logfile_path = os.path.join(LOG_DIR, logfile_name)
process = "SDF_to_MSSQL_Export"
mssql_cursor_global = None
# =========================
# LOGGING
# =========================
def write_log(line: str):
timestamp = datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
log_entry = f"{line}\n------------------------------{timestamp}------------------------------"
print(line)
with open(logfile_path, "a", encoding="utf-8") as f:
f.write(log_entry + "\n")
def write_log_summary(table, inserted, skipped, errors, inserted_rows):
"""Schreibt kurze Zusammenfassung in MSSQL-logs."""
if not mssql_cursor_global:
return
try:
if inserted == 0 and errors == 0:
message = "-"
else:
message = f"{inserted} eingefügt, {skipped} übersprungen, {errors} Fehler"
if inserted_rows:
message += " | " + "; ".join(inserted_rows[:5])
msg = f"Tabelle {table}: {message}"
mssql_cursor_global.execute(
"INSERT INTO logs (timestamp, message, process) VALUES (?, ?, ?)",
datetime.datetime.now(), msg, process
)
mssql_cursor_global.connection.commit()
except Exception as e:
print(f"Fehler beim Schreiben der Tabellen-Zusammenfassung in logs: {e}")
# =========================
# VERBINDUNGEN
# =========================
SDF_LOCAL_PFAD = os.getenv("SDF_LOCAL_PFAD")
SDF_NAME = os.getenv("SDF_NAME", "App.sdf")
sdf_file = os.path.join(SDF_LOCAL_PFAD, SDF_NAME)
MSSQL_CONNECTION_STR = os.getenv("MSSQL_CONNECTION_STR")
if not MSSQL_CONNECTION_STR:
msg = "❌ MSSQL_CONNECTION_STR fehlt in .env"
write_log(msg)
mail.send_error_email(msg, process)
exit(1)
sdf_connection_str = (
"Provider=Microsoft.SQLSERVER.CE.OLEDB.3.5;"
f"Data Source={sdf_file};"
"Persist Security Info=False;"
)
tables_env = os.getenv("TABLES", "")
tables = [t.strip() for t in tables_env.split(",") if t.strip()]
# =========================
# HELFER
# =========================
def get_pk_columns(mssql_cursor, table_name):
query = """
SELECT KU.COLUMN_NAME
FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS AS TC
INNER JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE AS KU
ON TC.CONSTRAINT_NAME = KU.CONSTRAINT_NAME
WHERE TC.CONSTRAINT_TYPE = 'PRIMARY KEY'
AND KU.TABLE_NAME = ?
ORDER BY KU.ORDINAL_POSITION;
"""
mssql_cursor.execute(query, (table_name,))
return [row[0] for row in mssql_cursor.fetchall()]
def row_summary(columns, row):
summary = []
for i, col in enumerate(columns[:3]): # max. 3 Spalten
val = row[i]
summary.append(f"{col}={repr(val)}")
return ", ".join(summary)
# =========================
# HAUPTPROGRAMM
# =========================
def main():
global mssql_cursor_global
try:
# --- MSSQL ---
try:
mssql_conn = pyodbc.connect(MSSQL_CONNECTION_STR)
mssql_cursor = mssql_conn.cursor()
mssql_cursor_global = mssql_cursor
write_log("✅ Verbindung zu MSSQL erfolgreich hergestellt.")
except Exception as mssql_err:
err_msg = f"❌ Fehler bei der Verbindung zu MSSQL: {mssql_err}"
write_log(err_msg)
mail.send_error_email(err_msg, process)
#return # abbrechen
# --- SDF ---
try:
sdf_conn = adodbapi.connect(sdf_connection_str)
sdf_cursor = sdf_conn.cursor()
write_log("✅ Verbindung zur SDF erfolgreich geöffnet.")
except Exception as sdf_err:
err_msg = f"❌ Fehler beim Zugriff auf SDF: {sdf_err}"
write_log(err_msg)
mail.send_error_email(err_msg, process)
return
report_lines = []
# ============ Tabellen-Export ============
for table in tables:
write_log(f"\n🔹 Verarbeite Tabelle: {table}")
try:
sdf_cursor.execute(f"SELECT * FROM [{table}]")
columns = [col[0] for col in sdf_cursor.description]
write_log(f"Spalten in {table}: {columns}")
except Exception as e:
write_log(f"❌ Fehler beim Lesen der Tabelle {table}: {e}")
continue
pk_columns = get_pk_columns(mssql_cursor, table)
if pk_columns:
write_log(f"Primary Key(s) in {table}: {pk_columns}")
pk_indices = [columns.index(pk) for pk in pk_columns if pk in columns]
else:
write_log(f"⚠️ Kein Primary Key in {table}. Alle Datensätze werden eingefügt.")
pk_indices = []
placeholders = ", ".join("?" for _ in columns)
insert_sql = f"INSERT INTO {table} ({', '.join('[' + c + ']' for c in columns)}) VALUES ({placeholders})"
inserted = 0
skipped = 0
errors = 0
inserted_rows = []
rows = sdf_cursor.fetchall()
write_log(f"{len(rows)} Datensätze in {table} gefunden.")
for row in rows:
try:
if pk_indices:
pk_values = tuple(row[i] for i in pk_indices)
pk_clause = " AND ".join(f"[{col}] = ?" for col in pk_columns)
select_sql = f"SELECT COUNT(*) FROM {table} WHERE {pk_clause}"
mssql_cursor.execute(select_sql, pk_values)
if mssql_cursor.fetchone()[0] > 0:
skipped += 1
continue
mssql_cursor.execute(insert_sql, row)
inserted += 1
if len(inserted_rows) < 5:
inserted_rows.append(row_summary(columns, row))
except Exception as insert_err:
errors += 1
error_details = f"Fehler beim Einfügen in {table}: {insert_err}"
write_log(error_details)
mail.send_error_email(error_details, process)
mssql_conn.commit()
write_log(f"{table}: {inserted} eingefügt, {skipped} übersprungen, {errors} Fehler.")
write_log_summary(table, inserted, skipped, errors, inserted_rows)
report_lines.append(f"{table}: {inserted} eingefügt, {skipped} übersprungen, {errors} Fehler.")
# --- .export-Marker ---
export_marker_path = os.path.join(MAIN_DIR, ".export")
with open(export_marker_path, "w"):
pass
write_log(f"Leere .export-Datei erstellt: {export_marker_path}")
# --- Abschlusslog ---
try:
mssql_cursor.execute(
"INSERT INTO logs (timestamp, message, process) VALUES (?, ?, ?)",
datetime.datetime.now(), "Export abgeschlossen (.export erzeugt)", process
)
mssql_conn.commit()
except Exception as log_err:
print(f"Fehler beim finalen DB-Log: {log_err}")
sdf_cursor.close()
sdf_conn.close()
mssql_cursor.close()
mssql_conn.close()
except Exception as e:
err = f"Allgemeiner Fehler: {e}\n{traceback.format_exc()}"
write_log(err)
mail.send_error_email(err, process)
if __name__ == "__main__":
main()