4b9df132d7
FTP/SFTP processor:
- New ftp_processor.py with adapter pattern for FTP (passive) and SFTP
- Same design as smb_processor: read PDFs, forward via SMTP, move to processed
- Eingangs-/Ausgangsbelege with separate paths, modes (forward/separator)
- paramiko==3.5.0 for SFTP support
- Schema v9 with new ftp_* settings
- Integrated in scheduler

Tree-view folder picker (SMB + FTP):
- Reusable tree rendering from flat path lists
- Expandable/collapsible nodes with toggle arrows
- Lazy loading: only top-level folders on open, sub-folders on demand
- Auto-expand ancestors of currently selected value (with preload)
- Reload button stays for manual refresh
- Always fresh load when opening picker
- New endpoints: /api/list-smb-subfolders, /api/list-ftp-subfolders

FTP-specific fixes:
- list_pdfs uses LIST instead of NLST (more reliable across servers)
- Stateful CWD bug fixed in ensure_dir/stat_exists/rename (previously created /Buch/Buch/X instead of /Buch/X due to CWD drift)
- All operations reset CWD via _reset_cwd() before stateful calls
- _resolve() helper for SFTP to handle empty path / chroot users

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
98 lines
3.3 KiB
Python
98 lines
3.3 KiB
Python
import asyncio
|
|
import logging
|
|
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
|
from apscheduler.triggers.interval import IntervalTrigger
|
|
|
|
from app.mail_processor import process_mailbox
|
|
from app.smb_processor import process_smb_share
|
|
from app.ftp_processor import process_ftp
|
|
from app.amazon_processor import process_amazon
|
|
from app.amazon_api import process_amazon_api
|
|
from app.database import get_settings
|
|
|
|
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Identifier under which the periodic processing job is registered.
JOB_ID = "mail_processor"

# Single scheduler instance shared by every function in this module.
scheduler = AsyncIOScheduler()

# Re-entrancy flag: True while a processing run is in progress.
_is_processing = False
|
|
|
|
|
|
async def _run_stage(label, stage) -> None:
    """Run one processing stage and log its start and outcome.

    The stage is awaited inside its own ``try`` block, so an exception
    raised by it is logged here and does not prevent the caller from
    continuing with the next stage.

    Args:
        label: Name inserted into the log messages (e.g. ``"Email"``).
        stage: Zero-argument callable that returns an awaitable.
    """
    logger.info(f"Starte automatische {label}-Verarbeitung...")
    try:
        result = await stage()
    except Exception as e:
        # logger.exception keeps the traceback, which a plain error() drops.
        logger.exception(f"Fehler bei {label}-Verarbeitung: {e}")
        return
    logger.info(f"{label}-Verarbeitung abgeschlossen: {result}")


async def _run_amazon_stage() -> None:
    """Run the Amazon processor selected by the ``amazon_mode`` setting.

    The processor is wrapped in a 300 second timeout. A timeout or any
    other exception is logged and not re-raised.
    """
    logger.info("Starte automatische Amazon-Verarbeitung...")
    try:
        settings = await get_settings()
        amazon_mode = settings.get("amazon_mode", "browser")
        # Every mode other than "api" uses the browser-based processor.
        processor = process_amazon_api if amazon_mode == "api" else process_amazon
        amazon_result = await asyncio.wait_for(processor(), timeout=300)
        logger.info(f"Amazon-Verarbeitung abgeschlossen: {amazon_result}")
    except asyncio.TimeoutError:
        logger.error("Amazon-Verarbeitung nach 5 Minuten abgebrochen (Timeout)")
    except Exception as e:
        logger.exception(f"Fehler bei Amazon-Verarbeitung: {e}")


async def _run_processor():
    """Run all processing stages once, skipping the run if one is active.

    Stages run in order: email, SMB, FTP, then Amazon. Each stage handles
    its own exceptions, so a failure in one source does not stop the
    sources that come after it. ``_is_processing`` is set for the duration
    of the run and is always reset in ``finally``.
    """
    global _is_processing
    if _is_processing:
        logger.info("Verarbeitung läuft bereits, überspringe diesen Durchlauf")
        return

    _is_processing = True
    try:
        # Email, SMB and FTP first - they must not be blocked by Amazon.
        await _run_stage("Email", process_mailbox)
        await _run_stage("SMB", process_smb_share)
        await _run_stage("FTP", process_ftp)

        # Amazon last and with a timeout - must not block next scheduler runs.
        await _run_amazon_stage()
    except Exception as e:
        # Safety net; the stage helpers already handle their own errors.
        logger.exception(f"Fehler bei automatischer Verarbeitung: {e}")
    finally:
        _is_processing = False
|
|
|
|
|
|
def start_scheduler():
    """Start the shared scheduler unless it is already running."""
    if scheduler.running:
        return

    scheduler.start()
    logger.info("Scheduler gestartet")
|
|
|
|
|
|
def configure_job(interval_minutes: int, enabled: bool):
    """Replace the processing job according to the given settings.

    Any job already registered under ``JOB_ID`` is removed first. A new
    interval job is added only when ``enabled`` is true and
    ``interval_minutes`` is positive; otherwise the scheduler is left
    without the job.

    Args:
        interval_minutes: Minutes between two runs of the job.
        enabled: Whether the job should be scheduled at all.
    """
    if scheduler.get_job(JOB_ID):
        scheduler.remove_job(JOB_ID)

    should_schedule = enabled and interval_minutes > 0
    if not should_schedule:
        logger.info("Scheduler deaktiviert")
        return

    interval = IntervalTrigger(minutes=interval_minutes)
    scheduler.add_job(
        _run_processor,
        trigger=interval,
        id=JOB_ID,
        name="Email-Verarbeitung",
        replace_existing=True,
    )
    logger.info(f"Scheduler konfiguriert: alle {interval_minutes} Minuten")
|
|
|
|
|
|
def get_scheduler_status() -> dict:
    """Return the current state of the processing job.

    Returns:
        A dict with the keys ``enabled``, ``next_run`` and
        ``is_processing``. ``enabled`` is true only when the job exists
        and has a next run time; ``next_run`` is then that time formatted
        as ``DD.MM.YYYY HH:MM:SS``, otherwise ``None``.
    """
    job = scheduler.get_job(JOB_ID)
    next_run_time = job.next_run_time if job else None

    if next_run_time:
        formatted = next_run_time.strftime("%d.%m.%Y %H:%M:%S")
    else:
        formatted = None

    return {
        "enabled": bool(next_run_time),
        "next_run": formatted,
        "is_processing": _is_processing,
    }
|