import logging import os import aiosqlite from cryptography.fernet import Fernet DB_PATH = os.environ.get("DB_PATH", "/data/belegimport.db") SCHEMA_VERSION = 2 logger = logging.getLogger(__name__) _fernet = None ENCRYPTED_KEYS = {"imap_password", "smtp_password", "smb_password"} DEFAULT_SETTINGS = { "imap_server": "", "imap_port": "993", "imap_ssl": "true", "imap_username": "", "imap_password": "", "smtp_server": "", "smtp_port": "587", "smtp_ssl": "starttls", "smtp_username": "", "smtp_password": "", "import_email": "", "source_folder": "Rechnungen", "processed_folder": "Rechnungen/Verarbeitet", "interval_minutes": "5", "scheduler_enabled": "false", "fetch_since_date": "", # SMB "smb_enabled": "false", "smb_server": "", "smb_port": "445", "smb_username": "", "smb_password": "", "smb_domain": "", "smb_share": "", "smb_source_path": "", "smb_processed_path": "Verarbeitet", "smb_mode": "forward", } async def _get_fernet() -> Fernet: global _fernet if _fernet is not None: return _fernet async with aiosqlite.connect(DB_PATH) as db: cursor = await db.execute( "SELECT value FROM settings WHERE key = 'encryption_key'" ) row = await cursor.fetchone() if row: key = row[0].encode() else: key = Fernet.generate_key() await db.execute( "INSERT INTO settings (key, value) VALUES ('encryption_key', ?)", (key.decode(),), ) await db.commit() _fernet = Fernet(key) return _fernet def _encrypt(fernet: Fernet, value: str) -> str: if not value: return "" return fernet.encrypt(value.encode()).decode() def _decrypt(fernet: Fernet, value: str) -> str: if not value: return "" try: return fernet.decrypt(value.encode()).decode() except Exception: return "" async def _get_schema_version(db: aiosqlite.Connection) -> int: """Read current schema version from DB. Returns 0 if not set.""" try: cursor = await db.execute( "SELECT value FROM settings WHERE key = 'schema_version'" ) row = await cursor.fetchone() return int(row[0]) if row else 0 except Exception: return 0 async def _set_schema_version(db: aiosqlite.Connection, version: int): await db.execute( "INSERT OR REPLACE INTO settings (key, value) VALUES ('schema_version', ?)", (str(version),), ) async def _run_migrations(db: aiosqlite.Connection, current_version: int): """Run all pending migrations sequentially.""" if current_version < 1: logger.info("Migration v1: Initiale Tabellenstruktur") # v1: Base tables (idempotent via IF NOT EXISTS) await db.execute(""" CREATE TABLE IF NOT EXISTS settings ( key TEXT PRIMARY KEY, value TEXT NOT NULL DEFAULT '' ) """) await db.execute(""" CREATE TABLE IF NOT EXISTS processing_log ( id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp TEXT NOT NULL DEFAULT (datetime('now', 'localtime')), email_subject TEXT, email_from TEXT, attachments_count INTEGER DEFAULT 0, status TEXT NOT NULL, error_message TEXT ) """) await db.commit() await _set_schema_version(db, 1) if current_version < 2: logger.info("Migration v2: lexoffice_email -> import_email") # v2: Rename lexoffice_email -> import_email cursor = await db.execute( "SELECT value FROM settings WHERE key = 'lexoffice_email'" ) row = await cursor.fetchone() if row and row[0]: # Copy value to import_email if it's empty cursor2 = await db.execute( "SELECT value FROM settings WHERE key = 'import_email'" ) row2 = await cursor2.fetchone() if not row2 or not row2[0]: await db.execute( "INSERT OR REPLACE INTO settings (key, value) VALUES ('import_email', ?)", (row[0],), ) logger.info(" lexoffice_email Wert nach import_email übertragen") await db.execute("DELETE FROM settings WHERE key = 'lexoffice_email'") await db.commit() await _set_schema_version(db, 2) # --- Future migrations go here --- # if current_version < 3: # logger.info("Migration v3: ...") # await _set_schema_version(db, 3) await db.commit() async def init_db(): os.makedirs(os.path.dirname(DB_PATH) or ".", exist_ok=True) async with aiosqlite.connect(DB_PATH) as db: # Ensure base tables exist (needed before we can read schema_version) await db.execute(""" CREATE TABLE IF NOT EXISTS settings ( key TEXT PRIMARY KEY, value TEXT NOT NULL DEFAULT '' ) """) await db.execute(""" CREATE TABLE IF NOT EXISTS processing_log ( id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp TEXT NOT NULL DEFAULT (datetime('now', 'localtime')), email_subject TEXT, email_from TEXT, attachments_count INTEGER DEFAULT 0, status TEXT NOT NULL, error_message TEXT ) """) await db.commit() # Check version and run migrations current_version = await _get_schema_version(db) if current_version < SCHEMA_VERSION: logger.info( f"DB-Schema v{current_version} -> v{SCHEMA_VERSION}, starte Migrationen..." ) await _run_migrations(db, current_version) logger.info(f"DB-Schema auf v{SCHEMA_VERSION} aktualisiert") else: logger.info(f"DB-Schema v{current_version} ist aktuell") # Insert default settings for any new keys (never overwrites existing values) for key, value in DEFAULT_SETTINGS.items(): await db.execute( "INSERT OR IGNORE INTO settings (key, value) VALUES (?, ?)", (key, value), ) await db.commit() # Ensure encryption key exists await _get_fernet() async def get_settings() -> dict: fernet = await _get_fernet() async with aiosqlite.connect(DB_PATH) as db: cursor = await db.execute( "SELECT key, value FROM settings WHERE key NOT IN ('encryption_key', 'schema_version')" ) rows = await cursor.fetchall() settings = {} for key, value in rows: if key in ENCRYPTED_KEYS: settings[key] = _decrypt(fernet, value) else: settings[key] = value return settings async def save_settings(data: dict): fernet = await _get_fernet() async with aiosqlite.connect(DB_PATH) as db: for key, value in data.items(): if key in ("encryption_key", "schema_version"): continue store_value = _encrypt(fernet, value) if key in ENCRYPTED_KEYS else value await db.execute( "INSERT OR REPLACE INTO settings (key, value) VALUES (?, ?)", (key, store_value), ) await db.commit() async def add_log_entry( email_subject: str, email_from: str, attachments_count: int, status: str, error_message: str = "", ): async with aiosqlite.connect(DB_PATH) as db: await db.execute( """INSERT INTO processing_log (email_subject, email_from, attachments_count, status, error_message) VALUES (?, ?, ?, ?, ?)""", (email_subject, email_from, attachments_count, status, error_message), ) await db.commit() async def get_log_entries(limit: int = 100) -> list[dict]: async with aiosqlite.connect(DB_PATH) as db: db.row_factory = aiosqlite.Row cursor = await db.execute( "SELECT * FROM processing_log ORDER BY id DESC LIMIT ?", (limit,) ) rows = await cursor.fetchall() return [dict(row) for row in rows]