belege-import/app/database.py

263 lines
8.2 KiB
Python

import logging
import os
import aiosqlite
from cryptography.fernet import Fernet
# Path of the SQLite database file; can be overridden via the DB_PATH
# environment variable.
DB_PATH = os.environ.get("DB_PATH", "/data/belegimport.db")

# Schema revision this module expects; init_db() migrates older databases.
SCHEMA_VERSION = 2

logger = logging.getLogger(__name__)

# Module-wide cache for the Fernet instance, filled lazily by _get_fernet().
_fernet = None

# Settings keys whose values are encrypted on save and decrypted on read.
ENCRYPTED_KEYS = {"imap_password", "smtp_password", "smb_password"}

# Defaults are grouped by subsystem and merged below. The merge order keeps
# the key order of the combined mapping stable.
_IMAP_DEFAULTS = {
    "imap_server": "",
    "imap_port": "993",
    "imap_ssl": "true",
    "imap_username": "",
    "imap_password": "",
}

_SMTP_DEFAULTS = {
    "smtp_server": "",
    "smtp_port": "587",
    "smtp_ssl": "starttls",
    "smtp_username": "",
    "smtp_password": "",
}

_IMPORT_DEFAULTS = {
    "import_email": "",
    "source_folder": "Rechnungen",
    "processed_folder": "Rechnungen/Verarbeitet",
    "interval_minutes": "5",
    "scheduler_enabled": "false",
    "fetch_since_date": "",
}

_SMB_DEFAULTS = {
    "smb_enabled": "false",
    "smb_server": "",
    "smb_port": "445",
    "smb_username": "",
    "smb_password": "",
    "smb_domain": "",
    "smb_share": "",
    "smb_source_path": "",
    "smb_processed_path": "Verarbeitet",
    "smb_mode": "forward",
}

# All values are strings because they are stored in a TEXT column.
DEFAULT_SETTINGS = {
    **_IMAP_DEFAULTS,
    **_SMTP_DEFAULTS,
    **_IMPORT_DEFAULTS,
    **_SMB_DEFAULTS,
}
async def _get_fernet() -> Fernet:
    """Return the module-wide Fernet cipher, creating its key on first use.

    The key is read from the ``encryption_key`` row of the ``settings``
    table. When that row is missing, a fresh key is generated, inserted and
    committed. The resulting instance is cached in the module-level
    ``_fernet`` so later calls do not touch the database.
    """
    global _fernet
    if _fernet is None:
        async with aiosqlite.connect(DB_PATH) as conn:
            result = await conn.execute(
                "SELECT value FROM settings WHERE key = 'encryption_key'"
            )
            stored = await result.fetchone()
            if not stored:
                # First run: create the key and persist it before using it.
                key = Fernet.generate_key()
                await conn.execute(
                    "INSERT INTO settings (key, value) VALUES ('encryption_key', ?)",
                    (key.decode(),),
                )
                await conn.commit()
            else:
                key = stored[0].encode()
        _fernet = Fernet(key)
    return _fernet
def _encrypt(fernet: Fernet, value: str) -> str:
    """Encrypt ``value`` with ``fernet`` and return the token as text.

    An empty (falsy) ``value`` is passed through as ``""`` without being
    encrypted.
    """
    if value:
        token = fernet.encrypt(value.encode())
        return token.decode()
    return ""
def _decrypt(fernet: Fernet, value: str) -> str:
    """Decrypt a stored token back to plain text.

    Args:
        fernet: Cipher used to decrypt the token.
        value: Token text as stored in the database; may be empty.

    Returns:
        The decrypted text. ``""`` is returned when ``value`` is empty or
        when decryption fails for any reason.
    """
    if not value:
        return ""
    try:
        return fernet.decrypt(value.encode()).decode()
    except Exception as exc:
        # Best-effort by design: callers get an empty value instead of a
        # crash. The failure is logged so a lost or changed key is visible
        # to the operator. Only the exception type is logged, never the
        # stored value.
        logger.warning(
            "Entschlüsselung eines gespeicherten Werts fehlgeschlagen (%s)",
            type(exc).__name__,
        )
        return ""
async def _get_schema_version(db: aiosqlite.Connection) -> int:
    """Return the schema version stored in the ``settings`` table.

    Returns 0 when no ``schema_version`` row exists, and also when the
    lookup or the integer conversion raises.
    """
    try:
        result = await db.execute(
            "SELECT value FROM settings WHERE key = 'schema_version'"
        )
        stored = await result.fetchone()
        if not stored:
            return 0
        return int(stored[0])
    except Exception:
        return 0
async def _set_schema_version(db: aiosqlite.Connection, version: int):
    """Write ``version`` into the ``schema_version`` settings row.

    The row is inserted or replaced. This function does not commit.
    """
    params = (str(version),)
    await db.execute(
        "INSERT OR REPLACE INTO settings (key, value) VALUES ('schema_version', ?)",
        params,
    )
async def _migrate_to_v1(db: aiosqlite.Connection):
    """v1: create the base tables (idempotent via IF NOT EXISTS)."""
    logger.info("Migration v1: Initiale Tabellenstruktur")
    await db.execute("""
        CREATE TABLE IF NOT EXISTS settings (
            key TEXT PRIMARY KEY,
            value TEXT NOT NULL DEFAULT ''
        )
    """)
    await db.execute("""
        CREATE TABLE IF NOT EXISTS processing_log (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp TEXT NOT NULL DEFAULT (datetime('now', 'localtime')),
            email_subject TEXT,
            email_from TEXT,
            attachments_count INTEGER DEFAULT 0,
            status TEXT NOT NULL,
            error_message TEXT
        )
    """)
    await db.commit()
    await _set_schema_version(db, 1)


async def _migrate_to_v2(db: aiosqlite.Connection):
    """v2: rename the ``lexoffice_email`` setting to ``import_email``."""
    logger.info("Migration v2: lexoffice_email -> import_email")
    legacy_result = await db.execute(
        "SELECT value FROM settings WHERE key = 'lexoffice_email'"
    )
    legacy = await legacy_result.fetchone()
    if legacy and legacy[0]:
        # Only carry the old value over when import_email is missing or empty.
        target_result = await db.execute(
            "SELECT value FROM settings WHERE key = 'import_email'"
        )
        target = await target_result.fetchone()
        if not (target and target[0]):
            await db.execute(
                "INSERT OR REPLACE INTO settings (key, value) VALUES ('import_email', ?)",
                (legacy[0],),
            )
            logger.info(" lexoffice_email Wert nach import_email übertragen")
    # NOTE(review): the legacy row is assumed to be removed unconditionally
    # (even when its value is empty) - confirm against the original layout.
    await db.execute("DELETE FROM settings WHERE key = 'lexoffice_email'")
    await db.commit()
    await _set_schema_version(db, 2)


async def _run_migrations(db: aiosqlite.Connection, current_version: int):
    """Run all pending migrations sequentially.

    Every step whose target version is greater than ``current_version`` is
    executed in ascending order. Each step records its own version, and a
    final commit persists the last version marker.
    """
    # --- Future migrations go here ---
    # Append (3, _migrate_to_v3) etc.; a step must end by calling
    # _set_schema_version(db, <its version>).
    steps = (
        (1, _migrate_to_v1),
        (2, _migrate_to_v2),
    )
    for target_version, step in steps:
        if current_version < target_version:
            await step(db)
    await db.commit()
async def init_db():
    """Prepare the database for use.

    Creates the database directory and the base tables, runs pending schema
    migrations, inserts default settings for keys that are not stored yet,
    and finally makes sure the encryption key exists.
    """
    db_dir = os.path.dirname(DB_PATH) or "."
    os.makedirs(db_dir, exist_ok=True)

    async with aiosqlite.connect(DB_PATH) as conn:
        # The base tables must exist before schema_version can be read.
        await conn.execute("""
            CREATE TABLE IF NOT EXISTS settings (
                key TEXT PRIMARY KEY,
                value TEXT NOT NULL DEFAULT ''
            )
        """)
        await conn.execute("""
            CREATE TABLE IF NOT EXISTS processing_log (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp TEXT NOT NULL DEFAULT (datetime('now', 'localtime')),
                email_subject TEXT,
                email_from TEXT,
                attachments_count INTEGER DEFAULT 0,
                status TEXT NOT NULL,
                error_message TEXT
            )
        """)
        await conn.commit()

        # Compare the stored version with the expected one and migrate if needed.
        current_version = await _get_schema_version(conn)
        if current_version >= SCHEMA_VERSION:
            logger.info(f"DB-Schema v{current_version} ist aktuell")
        else:
            logger.info(
                f"DB-Schema v{current_version} -> v{SCHEMA_VERSION}, starte Migrationen..."
            )
            await _run_migrations(conn, current_version)
            logger.info(f"DB-Schema auf v{SCHEMA_VERSION} aktualisiert")

        # INSERT OR IGNORE adds defaults for new keys only; existing values
        # are never overwritten.
        await conn.executemany(
            "INSERT OR IGNORE INTO settings (key, value) VALUES (?, ?)",
            list(DEFAULT_SETTINGS.items()),
        )
        await conn.commit()

    # Ensure the encryption key exists.
    await _get_fernet()
async def get_settings() -> dict:
    """Load all user-facing settings as a ``{key: value}`` dict.

    The internal ``encryption_key`` and ``schema_version`` rows are excluded.
    Values of keys listed in ``ENCRYPTED_KEYS`` are decrypted before they are
    returned.
    """
    fernet = await _get_fernet()
    async with aiosqlite.connect(DB_PATH) as conn:
        result = await conn.execute(
            "SELECT key, value FROM settings WHERE key NOT IN ('encryption_key', 'schema_version')"
        )
        stored = await result.fetchall()
    return {
        name: _decrypt(fernet, raw) if name in ENCRYPTED_KEYS else raw
        for name, raw in stored
    }
async def save_settings(data: dict):
    """Persist the given settings, encrypting the sensitive ones.

    The internal keys ``encryption_key`` and ``schema_version`` are skipped.
    Values of keys listed in ``ENCRYPTED_KEYS`` are encrypted before storage.
    Every remaining key is inserted or replaced, and the whole batch is
    committed once.
    """
    fernet = await _get_fernet()
    reserved = ("encryption_key", "schema_version")
    async with aiosqlite.connect(DB_PATH) as conn:
        rows = [
            (name, _encrypt(fernet, raw) if name in ENCRYPTED_KEYS else raw)
            for name, raw in data.items()
            if name not in reserved
        ]
        await conn.executemany(
            "INSERT OR REPLACE INTO settings (key, value) VALUES (?, ?)",
            rows,
        )
        await conn.commit()
async def add_log_entry(
    email_subject: str,
    email_from: str,
    attachments_count: int,
    status: str,
    error_message: str = "",
):
    """Insert one row into ``processing_log`` and commit it.

    The ``id`` and ``timestamp`` columns are not supplied here; they are
    filled in by the table definition.
    """
    record = (email_subject, email_from, attachments_count, status, error_message)
    async with aiosqlite.connect(DB_PATH) as conn:
        await conn.execute(
            """INSERT INTO processing_log
            (email_subject, email_from, attachments_count, status, error_message)
            VALUES (?, ?, ?, ?, ?)""",
            record,
        )
        await conn.commit()
async def get_log_entries(limit: int = 100) -> list[dict]:
    """Return up to ``limit`` rows of ``processing_log``, highest id first.

    Each row is converted to a plain dict keyed by column name.
    """
    async with aiosqlite.connect(DB_PATH) as conn:
        # Row objects allow conversion to dicts keyed by column name.
        conn.row_factory = aiosqlite.Row
        result = await conn.execute(
            "SELECT * FROM processing_log ORDER BY id DESC LIMIT ?", (limit,)
        )
        records = await result.fetchall()
        return list(map(dict, records))