"""FTP / SFTP file import processor. Same design as smb_processor but for FTP (passive, unencrypted) and SFTP (SSH). Reads PDF files from a remote source folder, forwards them via SMTP, then moves them to a processed folder. """ import asyncio import ftplib import io import logging import os import posixpath import tempfile import paramiko from app.database import get_settings, add_log_entry, get_import_email from app.mail_processor import _connect_smtp, _build_forward_email, _send_with_log from app.scanner import detect_separator_pages, split_pdf logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Generic adapter interface # --------------------------------------------------------------------------- class _FtpAdapter: """Common interface for FTP and SFTP backends.""" def list_pdfs(self, path: str) -> list[str]: raise NotImplementedError def list_dirs(self, path: str, max_depth: int = 5) -> list[str]: raise NotImplementedError def read_file(self, path: str) -> bytes: raise NotImplementedError def ensure_dir(self, path: str): raise NotImplementedError def stat_exists(self, path: str) -> bool: raise NotImplementedError def rename(self, src: str, dst: str): raise NotImplementedError def close(self): pass # --------------------------------------------------------------------------- # FTP (passive, unencrypted) # --------------------------------------------------------------------------- class _PlainFtpAdapter(_FtpAdapter): def __init__(self, server: str, port: int, username: str, password: str): self.ftp = ftplib.FTP() self.ftp.connect(server, port, timeout=15) self.ftp.login(username or "anonymous", password or "") self.ftp.set_pasv(True) # Remember initial CWD - all subsequent operations should resolve relative to this try: self._initial_cwd = self.ftp.pwd() except Exception: self._initial_cwd = None logger.debug(f"FTP initial CWD: {self._initial_cwd}") def _reset_cwd(self): """Reset CWD to initial directory after stateful operations.""" if self._initial_cwd: try: self.ftp.cwd(self._initial_cwd) except Exception: pass def list_pdfs(self, path: str) -> list[str]: """List PDF files via LIST (more reliable than NLST across FTP servers).""" self._reset_cwd() target = path if path else "." lines = [] try: self.ftp.retrlines(f"LIST {target}", lines.append) except ftplib.error_perm: return [] files = [] for line in lines: if not line or line[0:1] == "d": continue # skip directories parts = line.split(maxsplit=8) if len(parts) < 9: continue name = parts[-1] if name.lower().endswith(".pdf") and not name.startswith("."): files.append(name) return sorted(files) def list_dirs(self, path: str, max_depth: int = 5) -> list[str]: self._reset_cwd() base = path or "" logger.debug(f"FTP list_dirs: base={base!r}, max_depth={max_depth}") return self._list_dirs_rec(base, max_depth, 0, "") def _list_dirs_rec(self, base: str, max_depth: int, depth: int, prefix: str) -> list[str]: result = [] try: entries = [] target = base if base else "." self.ftp.retrlines(f"LIST {target}", entries.append) logger.debug(f"FTP LIST {target!r} -> {len(entries)} entries") except ftplib.error_perm as e: logger.warning(f"FTP LIST {base!r} failed: {e}") return [] for line in entries: # Try to detect dirs (line starts with 'd') - works for unix-style listings if not line or not line[0:1] == "d": continue parts = line.split(maxsplit=8) if len(parts) < 9: continue name = parts[-1] if name in (".", "..") or name.startswith("."): continue rel = f"{prefix}/{name}" if prefix else name result.append(rel) if depth < max_depth - 1: sub = posixpath.join(base, name) if base else name result.extend(self._list_dirs_rec(sub, max_depth, depth + 1, rel)) return result def read_file(self, path: str) -> bytes: self._reset_cwd() buf = io.BytesIO() self.ftp.retrbinary(f"RETR {path}", buf.write) return buf.getvalue() def ensure_dir(self, path: str): """Create directory tree, walking step by step from initial CWD. IMPORTANT: FTP's cwd() is stateful - it changes the current working directory for ALL subsequent operations. We must walk the path one segment at a time relative to the current position, not concatenate and re-cwd from initial each iteration. """ if not path: return try: self._reset_cwd() parts = [p for p in path.split("/") if p] for p in parts: try: self.ftp.cwd(p) except ftplib.error_perm: # Doesn't exist - create and enter try: self.ftp.mkd(p) self.ftp.cwd(p) except ftplib.error_perm as e: logger.warning(f"FTP mkd({p}) failed: {e}") return finally: self._reset_cwd() def stat_exists(self, path: str) -> bool: """Check if a file or directory exists at path.""" try: self.ftp.size(path) return True except (ftplib.error_perm, ftplib.error_temp): pass # Try as directory - cwd then immediately reset try: self.ftp.cwd(path) self._reset_cwd() return True except ftplib.error_perm: self._reset_cwd() return False def rename(self, src: str, dst: str): self._reset_cwd() self.ftp.rename(src, dst) def close(self): try: self.ftp.quit() except Exception: try: self.ftp.close() except Exception: pass # --------------------------------------------------------------------------- # SFTP (paramiko) # --------------------------------------------------------------------------- class _SftpAdapter(_FtpAdapter): def __init__(self, server: str, port: int, username: str, password: str): self.transport = paramiko.Transport((server, port)) self.transport.connect(username=username, password=password) self.sftp = paramiko.SFTPClient.from_transport(self.transport) def _resolve(self, path: str) -> str: """Resolve path - empty/None means user's home/root directory.""" if not path: try: return self.sftp.normalize(".") except IOError: return "." return path def list_pdfs(self, path: str) -> list[str]: try: entries = self.sftp.listdir(self._resolve(path)) except IOError: return [] return sorted( e for e in entries if e.lower().endswith(".pdf") and not e.startswith(".") ) def list_dirs(self, path: str, max_depth: int = 5) -> list[str]: base = self._resolve(path) logger.debug(f"SFTP list_dirs: base={base!r}, max_depth={max_depth}") return self._list_dirs_rec(base, max_depth, 0, "") def _list_dirs_rec(self, base: str, max_depth: int, depth: int, prefix: str) -> list[str]: from stat import S_ISDIR result = [] try: entries = self.sftp.listdir_attr(base) logger.debug(f"SFTP listdir_attr({base!r}) -> {[e.filename for e in entries]}") except IOError as e: logger.warning(f"SFTP listdir_attr({base!r}) failed: {e}") return result for entry in entries: if entry.filename.startswith(".") or entry.filename in ("..", "."): continue if entry.st_mode and S_ISDIR(entry.st_mode): rel = f"{prefix}/{entry.filename}" if prefix else entry.filename result.append(rel) if depth < max_depth - 1: sub = posixpath.join(base, entry.filename) result.extend(self._list_dirs_rec(sub, max_depth, depth + 1, rel)) return result def read_file(self, path: str) -> bytes: with self.sftp.open(path, "rb") as f: return f.read() def ensure_dir(self, path: str): if not path: return parts = [p for p in path.split("/") if p] cur = "" for p in parts: cur = f"{cur}/{p}" if cur else p try: self.sftp.stat(cur) except IOError: try: self.sftp.mkdir(cur) except IOError: pass def stat_exists(self, path: str) -> bool: try: self.sftp.stat(path) return True except IOError: return False def rename(self, src: str, dst: str): self.sftp.rename(src, dst) def close(self): try: self.sftp.close() except Exception: pass try: self.transport.close() except Exception: pass # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _make_adapter(settings: dict) -> _FtpAdapter: protocol = settings.get("ftp_protocol", "sftp").lower() server = settings["ftp_server"] username = settings.get("ftp_username", "") password = settings.get("ftp_password", "") if protocol == "sftp": port = int(settings.get("ftp_port") or 22) return _SftpAdapter(server, port, username, password) else: port = int(settings.get("ftp_port") or 21) return _PlainFtpAdapter(server, port, username, password) def _join_path(*parts: str) -> str: """Join FTP/SFTP path segments using forward slash.""" result = "" for p in parts: if not p: continue p = p.replace("\\", "/").strip("/") if not p: continue result = f"{result}/{p}" if result else p return result def _move_with_dedup(adapter: _FtpAdapter, src: str, dest_dir: str, filename: str): """Move file to dest_dir, renaming if a duplicate exists.""" dest = _join_path(dest_dir, filename) if adapter.stat_exists(dest): name, ext = os.path.splitext(filename) counter = 1 while True: new_name = f"{name}_{counter}{ext}" new_dest = _join_path(dest_dir, new_name) if not adapter.stat_exists(new_dest): dest = new_dest break counter += 1 adapter.rename(src, dest) # --------------------------------------------------------------------------- # Processing pipeline # --------------------------------------------------------------------------- async def _process_ftp_folder( smtp_conn, settings: dict, adapter: _FtpAdapter, source_path: str, processed_path: str, import_email: str, beleg_type: str, mode: str, ) -> dict: """Process one FTP folder pair. Returns counts dict.""" smtp_from = settings.get("smtp_username", "") protocol = settings.get("ftp_protocol", "sftp").upper() processed = 0 skipped = 0 errors = 0 await asyncio.to_thread(adapter.ensure_dir, processed_path) pdf_files = await asyncio.to_thread(adapter.list_pdfs, source_path) if not pdf_files: logger.info(f"Keine PDF-Dateien im {protocol}-Ordner '{source_path}' ({beleg_type})") return {"processed": 0, "skipped": 0, "errors": 0} logger.info(f"{len(pdf_files)} PDF-Datei(en) im {protocol}-Ordner '{source_path}' ({beleg_type})") for filename in pdf_files: file_path = _join_path(source_path, filename) try: pdf_data = await asyncio.to_thread(adapter.read_file, file_path) if mode == "separator": with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as tmp: tmp.write(pdf_data) tmp_path = tmp.name try: separator_pages = await asyncio.to_thread(detect_separator_pages, tmp_path, None) documents = await asyncio.to_thread(split_pdf, tmp_path, separator_pages) finally: os.unlink(tmp_path) if not documents: skipped += 1 continue smtp_log_parts = [] for i, doc_bytes in enumerate(documents): doc_filename = f"{os.path.splitext(filename)[0]}_Teil_{i + 1}.pdf" subject = f"{protocol}-Import: {filename} (Dokument {i + 1}/{len(documents)})" msg = _build_forward_email( from_addr=smtp_from, to_addr=import_email, original_subject=subject, original_from=f"{protocol}-Import", attachments=[(doc_filename, doc_bytes)], ) smtp_log_parts.append(_send_with_log(smtp_conn, msg)) await add_log_entry( email_subject=f"{protocol}: {filename}", email_from=f"{protocol}-Import", attachments_count=len(documents), status="success", sent_to=import_email, smtp_log="\n---\n".join(smtp_log_parts), beleg_type=beleg_type, ) logger.info(f"{protocol} verarbeitet ({beleg_type}): {filename} -> {len(documents)} Dokument(e)") else: msg = _build_forward_email( from_addr=smtp_from, to_addr=import_email, original_subject=f"{protocol}-Import: {filename}", original_from=f"{protocol}-Import", attachments=[(filename, pdf_data)], ) smtp_log = _send_with_log(smtp_conn, msg) await add_log_entry( email_subject=f"{protocol}: {filename}", email_from=f"{protocol}-Import", attachments_count=1, status="success", sent_to=import_email, smtp_log=smtp_log, beleg_type=beleg_type, ) logger.info(f"{protocol} verarbeitet ({beleg_type}): {filename}") await asyncio.to_thread(_move_with_dedup, adapter, file_path, processed_path, filename) processed += 1 except Exception as e: errors += 1 logger.error(f"Fehler bei {protocol}-Datei {filename}: {e}") try: await add_log_entry( email_subject=f"{protocol}: {filename}", email_from=f"{protocol}-Import", attachments_count=0, status="error", error_message=str(e), beleg_type=beleg_type, ) except Exception: pass return {"processed": processed, "skipped": skipped, "errors": errors} async def process_ftp() -> dict: """Process PDF files from FTP/SFTP server - main pipeline.""" settings = await get_settings() if settings.get("ftp_enabled") != "true": return {"processed": 0, "skipped": 0, "errors": 0} if not settings.get("ftp_server"): return {"processed": 0, "skipped": 0, "errors": 0, "error": "FTP nicht konfiguriert"} import_email_eingang = get_import_email(settings, "eingang") if not import_email_eingang: return {"processed": 0, "skipped": 0, "errors": 0, "error": "Import-Email nicht konfiguriert"} mode = settings.get("ftp_mode", "forward") protocol = settings.get("ftp_protocol", "sftp").upper() total = {"processed": 0, "skipped": 0, "errors": 0} smtp_conn = None adapter = None try: adapter = await asyncio.to_thread(_make_adapter, settings) smtp_conn = _connect_smtp(settings) # Eingangsbelege source = settings.get("ftp_source_path", "") processed_path = settings.get("ftp_processed_path", "Verarbeitet") result = await _process_ftp_folder( smtp_conn, settings, adapter, source, processed_path, import_email_eingang, "eingang", mode, ) for k in total: total[k] += result[k] # Ausgangsbelege (optional) import_email_ausgang = get_import_email(settings, "ausgang") source_ausgang = settings.get("ftp_source_path_ausgang", "") processed_ausgang = settings.get("ftp_processed_path_ausgang", "") if import_email_ausgang and source_ausgang: if not processed_ausgang: processed_ausgang = source_ausgang + "/Verarbeitet" result = await _process_ftp_folder( smtp_conn, settings, adapter, source_ausgang, processed_ausgang, import_email_ausgang, "ausgang", mode, ) for k in total: total[k] += result[k] except Exception as e: logger.error(f"{protocol}-Verbindungsfehler: {e}") try: await add_log_entry( email_subject="", email_from=f"{protocol}-Import", attachments_count=0, status="error", error_message=f"{protocol}-Verbindungsfehler: {e}", ) except Exception: pass return {**total, "errors": total["errors"] + 1, "error": str(e)} finally: if adapter: try: await asyncio.to_thread(adapter.close) except Exception: pass if smtp_conn: try: smtp_conn.quit() except Exception: pass logger.info(f"{protocol} fertig: {total['processed']} verarbeitet, {total['skipped']} übersprungen, {total['errors']} Fehler") return total async def test_ftp_connection() -> dict: """Test FTP/SFTP connection and return TOP-LEVEL folders only (lazy loading).""" settings = await get_settings() if not settings.get("ftp_server"): return {"success": False, "error": "FTP-Server nicht konfiguriert", "folders": []} adapter = None try: adapter = await asyncio.to_thread(_make_adapter, settings) folders = await asyncio.to_thread(adapter.list_dirs, "", 1) return {"success": True, "folders": sorted(folders)} except Exception as e: logger.error(f"FTP-Test fehlgeschlagen: {e}") return {"success": False, "error": str(e), "folders": []} finally: if adapter: try: await asyncio.to_thread(adapter.close) except Exception: pass async def create_ftp_folder(folder_path: str) -> dict: """Create a folder on the FTP/SFTP server.""" settings = await get_settings() if not settings.get("ftp_server"): return {"success": False, "error": "FTP nicht konfiguriert"} if not folder_path or not folder_path.strip(): return {"success": False, "error": "Ordnername darf nicht leer sein"} folder_path = folder_path.strip().replace("\\", "/") adapter = None try: adapter = await asyncio.to_thread(_make_adapter, settings) await asyncio.to_thread(adapter.ensure_dir, folder_path) return {"success": True} except Exception as e: logger.error(f"FTP-Ordner erstellen fehlgeschlagen: {e}") return {"success": False, "error": str(e)} finally: if adapter: try: await asyncio.to_thread(adapter.close) except Exception: pass async def list_ftp_folders() -> dict: """Return TOP-LEVEL folder list from FTP/SFTP server (lazy loading).""" settings = await get_settings() if not settings.get("ftp_server"): return {"folders": []} adapter = None try: adapter = await asyncio.to_thread(_make_adapter, settings) folders = await asyncio.to_thread(adapter.list_dirs, "", 1) return {"folders": sorted(folders)} except Exception: return {"folders": []} finally: if adapter: try: await asyncio.to_thread(adapter.close) except Exception: pass async def list_ftp_subfolders(parent_path: str) -> dict: """List direct subfolders of a path (one level deep, for lazy tree expansion).""" settings = await get_settings() if not settings.get("ftp_server"): return {"success": False, "error": "FTP nicht konfiguriert", "folders": []} adapter = None try: adapter = await asyncio.to_thread(_make_adapter, settings) rel_folders = await asyncio.to_thread(adapter.list_dirs, parent_path, 1) # Prefix with parent_path so the frontend has full paths if parent_path: folders = [f"{parent_path}/{f}" for f in rel_folders] else: folders = rel_folders return {"success": True, "folders": sorted(folders)} except Exception as e: logger.error(f"FTP-Subfolder-Liste fehlgeschlagen: {e}") return {"success": False, "error": str(e), "folders": []} finally: if adapter: try: await asyncio.to_thread(adapter.close) except Exception: pass