import asyncio import logging import os import random import re import tempfile from datetime import datetime, timedelta from pathlib import Path from app.database import get_settings, save_settings, add_log_entry, is_invoice_downloaded, mark_invoice_downloaded from app.mail_processor import _connect_smtp, _build_forward_email, _send_with_log logger = logging.getLogger(__name__) SESSION_DIR = Path(os.environ.get("AMAZON_SESSION_DIR", "/data/amazon_session")) DEBUG_DIR = Path(os.environ.get("UPLOAD_DIR", "/data/uploads")) / "amazon_debug" # Login state machine _login_state = {"status": "idle", "message": ""} _login_lock = asyncio.Lock() _otp_future: asyncio.Future | None = None _browser_context = None _playwright_instance = None # Process lock to prevent concurrent runs _process_lock = asyncio.Lock() # Flag: True while process_amazon is actively working (page consumed but session valid) _processing_active = False # Interactive login session (browser page kept alive for user interaction) _interactive_page = None async def _human_delay(min_s: float = 1.0, max_s: float = 3.0): """Random delay to mimic human behavior.""" await asyncio.sleep(random.uniform(min_s, max_s)) async def _apply_stealth_to_context(context): """Apply stealth measures to the browser context (all pages).""" try: from playwright_stealth import Stealth stealth = Stealth() await stealth.apply_stealth_async(context) logger.info("Stealth erfolgreich auf Browser-Kontext angewendet") except ImportError: logger.warning("playwright-stealth nicht installiert, überspringe") except Exception as e: logger.warning(f"Stealth konnte nicht angewendet werden: {e}") async def _add_virtual_authenticator(page): """Add virtual WebAuthn authenticator to prevent passkey dialogs.""" try: client = await page.context.new_cdp_session(page) await client.send("WebAuthn.enable") await client.send("WebAuthn.addVirtualAuthenticator", { "options": { "protocol": "ctap2", "transport": "internal", "hasResidentKey": True, "hasUserVerification": True, "isUserVerified": True, "automaticPresenceSimulation": True, } }) logger.debug("Virtueller WebAuthn-Authenticator hinzugefügt") except Exception as e: logger.debug(f"Virtueller Authenticator fehlgeschlagen: {e}") async def _get_browser_context(): """Get or create persistent Chromium browser context.""" global _browser_context, _playwright_instance if _browser_context is not None: try: # Check if context is still alive pages = _browser_context.pages return _browser_context except Exception: _browser_context = None from playwright.async_api import async_playwright SESSION_DIR.mkdir(parents=True, exist_ok=True) # Clean up stale Chromium lock files from previous container runs for lock_file in ["SingletonLock", "SingletonSocket", "SingletonCookie"]: lock_path = SESSION_DIR / lock_file if lock_path.exists(): try: lock_path.unlink() logger.info(f"Stale Lock-File entfernt: {lock_file}") except Exception: pass if _playwright_instance is None: _playwright_instance = await async_playwright().start() _browser_context = await _playwright_instance.chromium.launch_persistent_context( user_data_dir=str(SESSION_DIR), headless=True, locale="de-DE", user_agent=( "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 " "(KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36" ), viewport={"width": 1280, "height": 800}, args=[ "--disable-blink-features=AutomationControlled", "--disable-gpu", "--disable-dev-shm-usage", "--disable-extensions", "--disable-background-networking", "--disable-translate", "--no-first-run", "--no-sandbox", ], ) await _apply_stealth_to_context(_browser_context) return _browser_context async def close_browser_context(): """Close browser context and playwright instance.""" global _browser_context, _playwright_instance if _browser_context is not None: try: await _browser_context.close() except Exception: pass _browser_context = None if _playwright_instance is not None: try: await _playwright_instance.stop() except Exception: pass _playwright_instance = None def get_login_state() -> dict: """Return current login state for polling.""" return dict(_login_state) async def _save_debug(page, name: str): """Save screenshot and HTML dump for debugging (max 50 files).""" ts = datetime.now().strftime('%Y%m%d_%H%M%S') try: DEBUG_DIR.mkdir(parents=True, exist_ok=True) # Limit to 50 files - delete oldest if over limit existing = sorted(DEBUG_DIR.iterdir(), key=lambda p: p.stat().st_mtime) while len(existing) > 48: # leave room for 2 new files existing.pop(0).unlink() except Exception as e: logger.error(f"Amazon Debug-Verzeichnis Fehler: {e}") return # Save HTML (most reliable) try: html_path = DEBUG_DIR / f"{name}_{ts}.html" content = await page.content() html_path.write_text(content, encoding="utf-8") logger.info(f"Amazon Debug-HTML gespeichert: {html_path} ({len(content)} Bytes)") except Exception as e: logger.error(f"Amazon Debug-HTML fehlgeschlagen: {e}") # Save screenshot try: png_path = DEBUG_DIR / f"{name}_{ts}.png" await page.screenshot(path=str(png_path), full_page=True) logger.info(f"Amazon Debug-Screenshot gespeichert: {png_path}") except Exception as e: logger.error(f"Amazon Debug-Screenshot fehlgeschlagen: {e}") async def check_session_valid() -> bool: """Check if an active Amazon session exists. Returns True if we have a live interactive page, or if processing is active (page consumed but still working), or if login is in progress. """ has_page = _interactive_page is not None is_logging_in = _login_state.get("status") in ("interactive", "logging_in") logger.info(f"Amazon Session-Check: has_page={has_page}, login_active={is_logging_in}, processing={_processing_active}") return has_page or is_logging_in or _processing_active def is_interactive_login_active() -> bool: """Check if interactive login modal is currently open (browser in use by user). Also returns True if user has logged in but hasn't closed the modal yet. """ status = _login_state.get("status", "idle") # Active if login dialog is open (interactive, logging_in, or logged_in but page still held) if status in ("interactive", "logging_in"): return True if status == "logged_in" and _interactive_page is not None: return True return False async def clear_session(): """Clear browser session data.""" global _login_state, _interactive_page await close_interactive_login(force_close=True) await close_browser_context() # Remove session files if SESSION_DIR.exists(): import shutil try: shutil.rmtree(SESSION_DIR) except Exception as e: logger.warning(f"Session-Verzeichnis konnte nicht gelöscht werden: {e}") SESSION_DIR.mkdir(parents=True, exist_ok=True) _login_state = {"status": "idle", "message": ""} # --- Interactive Login (user solves CAPTCHAs via screenshot/click/type) --- async def start_interactive_login(): """Open browser page to Amazon login and keep it alive for user interaction.""" global _login_state, _interactive_page if _interactive_page is not None: # Already have an interactive session return if _process_lock.locked(): _login_state = {"status": "login_failed", "message": "Amazon-Abruf läuft gerade. Bitte warten bis der Abruf fertig ist."} return _login_state = {"status": "interactive", "message": "Browser wird gestartet..."} try: settings = await get_settings() domain = settings.get("amazon_domain", "amazon.de") ctx = await _get_browser_context() page = await ctx.new_page() # Stealth is applied at context level await _add_virtual_authenticator(page) # Navigate to order history - Amazon redirects to login if not authenticated await page.goto( f"https://www.{domain}/gp/css/order-history", wait_until="domcontentloaded", timeout=60000, ) # Wait a bit for page to settle await asyncio.sleep(2) _interactive_page = page # Check if already logged in (not on a login/auth page) url = page.url is_login = "signin" in url or "/ap/" in url or "/auth/" in url if not is_login and "amazon." in url: _login_state = {"status": "logged_in", "message": "Bereits angemeldet"} else: _login_state = {"status": "interactive", "message": "Bitte im Browser anmelden"} logger.info(f"Interaktive Login-Session gestartet, URL: {url}") except Exception as e: logger.error(f"Interaktive Login-Session fehlgeschlagen: {e}") _login_state = {"status": "login_failed", "message": f"Browser konnte nicht gestartet werden: {e}"} if _interactive_page: try: await _interactive_page.close() except Exception: pass _interactive_page = None async def get_browser_screenshot() -> bytes | None: """Take a screenshot of the interactive login page.""" if _interactive_page is None: return None try: return await _interactive_page.screenshot(type="png") except Exception as e: logger.error(f"Screenshot fehlgeschlagen: {e}") return None async def send_browser_click(x: int, y: int): """Forward a mouse click to the interactive browser page.""" global _login_state if _interactive_page is None: return try: await _interactive_page.mouse.click(x, y) await asyncio.sleep(0.3) # Check if login completed after click await _check_interactive_login_complete() except Exception as e: logger.error(f"Browser-Klick fehlgeschlagen: {e}") async def send_browser_type(text: str): """Type text into the currently focused element in the browser.""" global _login_state if _interactive_page is None: return try: await _interactive_page.keyboard.type(text, delay=50) await asyncio.sleep(0.2) except Exception as e: logger.error(f"Browser-Texteingabe fehlgeschlagen: {e}") async def send_browser_key(key: str): """Send a special key (Enter, Tab, Backspace, Escape) to the browser.""" global _login_state if _interactive_page is None: return try: await _interactive_page.keyboard.press(key) await asyncio.sleep(0.5) # Check if login completed after key press (e.g. Enter on password) await _check_interactive_login_complete() except Exception as e: logger.error(f"Browser-Taste fehlgeschlagen: {e}") async def _check_interactive_login_complete(): """Check if the interactive login page has left the login flow.""" global _login_state if _interactive_page is None: return try: url = _interactive_page.url is_login = "signin" in url or "/ap/" in url or "/auth/" in url is_captcha = "captcha" in url.lower() # Check page content for error indicators try: title = await _interactive_page.title() except Exception: title = "" is_error = any(t in title.lower() for t in [ "tut uns leid", "sorry", "fehler", "error", "problem", "bot", "automated", "unusual", ]) is_blocked = "errors" in url or "/hz/approvalrequest" in url if is_error or is_blocked: _login_state = {"status": "interactive", "message": "Amazon blockiert den Zugriff. Versuchen Sie es erneut oder lösen Sie die Sicherheitsabfrage."} logger.warning(f"Interaktiver Login: Error-Seite erkannt. URL: {url}, Titel: {title}") elif is_captcha: _login_state = {"status": "interactive", "message": "Bitte CAPTCHA lösen"} elif not is_login and "amazon." in url: _login_state = {"status": "logged_in", "message": "Erfolgreich angemeldet"} logger.info(f"Interaktiver Login erfolgreich! URL: {url}") except Exception: pass async def close_interactive_login(force_close: bool = False): """Close the interactive login modal. Page stays alive for reuse by process_amazon. Args: force_close: If True, actually close the page (e.g. on error or explicit logout). """ global _interactive_page, _login_state if force_close and _interactive_page is not None: try: await _interactive_page.close() except Exception: pass _interactive_page = None # Reset login state (page stays alive for process_amazon to consume) _login_state = {"status": "idle", "message": ""} logger.info(f"Interactive Login Modal geschlossen (page={'behalten' if _interactive_page else 'geschlossen'})") async def start_login(): """Start interactive Amazon login as background task.""" global _login_state, _otp_future if _login_lock.locked(): return async with _login_lock: _login_state = {"status": "logging_in", "message": "Browser wird gestartet..."} _otp_future = None try: settings = await get_settings() amazon_email = settings.get("amazon_email", "") amazon_password = settings.get("amazon_password", "") domain = settings.get("amazon_domain", "amazon.de") if not amazon_email or not amazon_password: _login_state = { "status": "login_failed", "message": "Amazon E-Mail oder Passwort nicht konfiguriert", } return ctx = await _get_browser_context() page = await ctx.new_page() try: await _do_login(page, domain, amazon_email, amazon_password) finally: await page.close() except Exception as e: logger.error(f"Amazon-Login fehlgeschlagen: {e}") _login_state = { "status": "login_failed", "message": f"Login fehlgeschlagen: {e}", } async def _do_login(page, domain, email, password): """Execute the login flow using semantic Playwright locators.""" global _login_state, _otp_future _login_state = {"status": "logging_in", "message": "Navigiere zu Amazon..."} # Apply stealth and virtual authenticator to avoid bot detection # Stealth is applied at context level await _add_virtual_authenticator(page) # Navigate to order history - Amazon will redirect to login if not authenticated await page.goto( f"https://www.{domain}/gp/css/order-history", wait_until="networkidle", timeout=60000, ) await _save_debug(page, "login_start") # Check if we're already logged in (no redirect to login page) url = page.url if ("order-history" in url or "your-orders" in url) and "signin" not in url and "/ap/" not in url: logger.info("Amazon Login: Bereits eingeloggt!") _login_state = {"status": "logged_in", "message": "Bereits angemeldet"} return await _human_delay() # --- Step 1: Enter email --- _login_state = {"status": "logging_in", "message": "E-Mail wird eingegeben..."} email_field = None for locator in [ page.locator("#ap_email_login"), page.locator("#ap_email"), page.locator("input[name='email']"), page.locator("input[type='email']"), page.get_by_label("Mobiltelefonnummer oder E-Mail-Adresse eingeben"), page.get_by_label("E-Mail"), page.get_by_label("E-Mail-Adresse"), page.get_by_label("Email"), ]: try: if await locator.count() > 0 and await locator.first.is_visible(): email_field = locator.first logger.info("Amazon Login: Email-Feld gefunden") break except Exception: continue if not email_field: await _save_debug(page, "login_no_email_field") _login_state = {"status": "login_failed", "message": "Email-Feld nicht gefunden"} return await email_field.fill(email) await _human_delay(0.5, 1.5) # Click continue button continue_btn = None for locator in [ page.get_by_role("button", name="Weiter"), page.get_by_role("button", name="Continue"), page.locator("#continue"), page.locator("input[type='submit']"), ]: try: if await locator.count() > 0 and await locator.first.is_visible(): continue_btn = locator.first break except Exception: continue if continue_btn: logger.info("Amazon Login: Weiter-Button geklickt") await continue_btn.click() await page.wait_for_load_state("networkidle") await _human_delay() await _save_debug(page, "login_after_email") # Check for CAPTCHA if await page.locator("#auth-captcha-image, #captchacharacters, #cvf-aamation-container, #captcha-container, #aa-challenge-whole-page-iframe").count() > 0 or "captcha" in (await page.title()).lower() or "bestätige deine Identität" in (await page.title()): _login_state = {"status": "login_failed", "message": "CAPTCHA/Sicherheitsabfrage erkannt. Bitte über den interaktiven Browser anmelden."} await _save_debug(page, "login_captcha") return # --- Step 2: Enter password --- _login_state = {"status": "logging_in", "message": "Passwort wird eingegeben..."} pw_field = None for locator in [ page.get_by_label("Passwort"), page.get_by_label("Password"), page.locator("#ap_password"), page.locator("input[name='password']"), page.locator("input[type='password']"), ]: try: if await locator.count() > 0 and await locator.first.is_visible(): pw_field = locator.first logger.info("Amazon Login: Passwort-Feld gefunden") break except Exception: continue if not pw_field: logger.info("Amazon Login: Kein Passwort-Feld sichtbar, prüfe ob bereits eingeloggt...") await _save_debug(page, "login_no_password_field") else: await pw_field.fill(password) await _human_delay(0.5, 1.5) # Click sign-in button signin_btn = None for locator in [ page.get_by_role("button", name="Anmelden"), page.get_by_role("button", name="Sign in"), page.locator("#signInSubmit"), page.locator("#auth-signin-button"), page.locator("input[type='submit']"), ]: try: if await locator.count() > 0 and await locator.first.is_visible(): signin_btn = locator.first break except Exception: continue if signin_btn: logger.info("Amazon Login: Anmelden-Button geklickt") await signin_btn.click() await page.wait_for_load_state("networkidle") await _human_delay(1.5, 3.0) await _save_debug(page, "login_after_password") # Check for CAPTCHA again if await page.locator("#auth-captcha-image, #captchacharacters, #cvf-aamation-container, #captcha-container, #aa-challenge-whole-page-iframe").count() > 0 or "captcha" in (await page.title()).lower() or "bestätige deine Identität" in (await page.title()): _login_state = {"status": "login_failed", "message": "CAPTCHA/Sicherheitsabfrage erkannt. Bitte über den interaktiven Browser anmelden."} await _save_debug(page, "login_captcha") return # --- Step 3: Handle 2FA/OTP --- otp_field = page.locator("#auth-mfa-otpcode, input[name='otpCode'], #ap_dcq_hint") if await otp_field.count() > 0: _login_state = { "status": "awaiting_otp", "message": "Bitte geben Sie den Bestätigungscode ein", } loop = asyncio.get_event_loop() _otp_future = loop.create_future() try: otp_code = await asyncio.wait_for(_otp_future, timeout=300) except asyncio.TimeoutError: _login_state = {"status": "login_failed", "message": "OTP-Zeitüberschreitung (5 Minuten)"} return finally: _otp_future = None _login_state = {"status": "logging_in", "message": "OTP wird eingegeben..."} for sel in ["#auth-mfa-otpcode", "input[name='otpCode']"]: field = page.locator(sel) if await field.count() > 0: await field.first.fill(otp_code) break for sel in ["#auth-signin-button", "input[type='submit']", "#submitButton"]: btn = page.locator(sel) if await btn.count() > 0: await btn.first.click() break await page.wait_for_load_state("networkidle") await _human_delay(1.5, 3.0) # --- Step 4: Handle device approval --- approval = page.locator("#auth-approve-form, .cvf-widget-form-approve") if await approval.count() > 0: _login_state = { "status": "awaiting_otp", "message": "Bitte bestätigen Sie die Anmeldung auf Ihrem Gerät", } for _ in range(60): await asyncio.sleep(2) url = page.url if ("signin" not in url and "/ap/" not in url) or domain + "/?ref" in url: break if await approval.count() == 0: break # --- Verify login success --- url = page.url is_login_page = "signin" in url or "/ap/" in url page_content = await page.content() content_len = len(page_content) is_error_page = "Suchen Sie etwas" in page_content or "Seite wurde nicht gefunden" in page_content is_order_page = "order-history" in url or "your-orders" in url or "Meine Bestellungen" in page_content is_success = not is_login_page and not is_error_page and domain in url and (is_order_page or content_len > 10000) logger.info(f"Amazon Login: URL={url}, is_login_page={is_login_page}, is_error_page={is_error_page}, is_order_page={is_order_page}, content_len={content_len}, success={is_success}") await _save_debug(page, "login_result") if is_success: _login_state = {"status": "logged_in", "message": "Erfolgreich angemeldet"} logger.info("Amazon-Login erfolgreich") else: error_el = page.locator("#auth-error-message-box, .a-alert-content") error_msg = "" if await error_el.count() > 0: error_msg = await error_el.first.inner_text() if is_error_page: error_msg = "Amazon hat den Zugriff blockiert (Fehlerseite). Bitte später erneut versuchen." _login_state = { "status": "login_failed", "message": f"Login fehlgeschlagen. {error_msg}".strip(), } async def submit_otp(code: str) -> bool: """Submit OTP code from web UI.""" global _otp_future if _otp_future is not None and not _otp_future.done(): _otp_future.set_result(code) return True return False async def process_amazon() -> dict: """Main function: fetch Amazon invoices and forward via email.""" if _process_lock.locked(): logger.info("Amazon-Import: Läuft bereits, überspringe") return {"processed": 0, "errors": 0, "error": "Amazon-Abruf läuft bereits"} # Don't start processing while user is logging in (would freeze the browser) if is_interactive_login_active(): logger.info("Amazon-Import: Interaktiver Login läuft, überspringe") return {"processed": 0, "errors": 0, "error": "Bitte zuerst den Login abschließen"} async with _process_lock: return await _process_amazon_inner() async def _process_amazon_inner() -> dict: """Inner processing function (protected by _process_lock).""" global _interactive_page, _processing_active settings = await get_settings() if settings.get("amazon_enabled") != "true": return {"processed": 0, "errors": 0} # Check prerequisites if not settings.get("smtp_server") or not settings.get("import_email"): logger.warning("Amazon-Import: SMTP oder Import-Email nicht konfiguriert") return {"processed": 0, "errors": 0, "error": "SMTP/Import-Email nicht konfiguriert"} if not settings.get("amazon_email") or not settings.get("amazon_password"): logger.warning("Amazon-Import: Zugangsdaten nicht konfiguriert") return {"processed": 0, "errors": 0, "error": "Amazon-Zugangsdaten nicht konfiguriert"} # Without interactive login page, new pages can't authenticate (session bound to page) if _interactive_page is None: logger.info("Amazon-Import: Keine aktive Login-Session, überspringe (bitte zuerst manuell anmelden)") return {"processed": 0, "errors": 0, "error": "Bitte zuerst unter Plattformen bei Amazon anmelden"} domain = settings.get("amazon_domain", "amazon.de") since_str = settings.get("amazon_since_date", "") if since_str: try: since_date = datetime.strptime(since_str, "%Y-%m-%d") except ValueError: logger.warning(f"Amazon: Ungültiges Startdatum: {since_str}") since_date = datetime.now() - timedelta(days=30) else: since_date = datetime.now() - timedelta(days=30) logger.info(f"Amazon-Import gestartet: domain={domain}, seit={since_date.strftime('%Y-%m-%d')}") processed = 0 skipped = 0 errors = 0 # Reuse interactive login page if available (session is bound to the page) reused_page = False if _interactive_page is not None: page = _interactive_page _interactive_page = None # Take ownership _processing_active = True # Signal that session is still valid while processing _login_state = {"status": "idle", "message": ""} # Reset login state reused_page = True logger.info("Amazon: Verwende interaktive Login-Page für Abruf") else: ctx = await _get_browser_context() page = await ctx.new_page() await _add_virtual_authenticator(page) smtp_conn = None try: logger.info("Amazon: SMTP-Verbindung wird hergestellt...") smtp_conn = _connect_smtp(settings) logger.info("Amazon: SMTP-Verbindung OK, verarbeite Bestellungen seitenweise...") import_email = settings.get("import_email_eingang") or settings.get("import_email", "") # Process orders PAGE BY PAGE (collect + process on same page so buttons are visible) result = await _collect_and_process_orders( page, domain, since_date, smtp_conn, settings, import_email ) if result is None: error_detail = "Amazon-Sitzung abgelaufen. Bitte manuell unter Plattformen neu anmelden." logger.warning(f"Amazon-Import: {error_detail}") await add_log_entry( email_subject="Amazon-Import", email_from="Amazon", attachments_count=0, status="error", error_message=error_detail, ) return {"processed": 0, "errors": 0, "error": error_detail} processed, skipped, errors = result["processed"], result["skipped"], result["errors"] batch_done = result.get("batch_done", False) # Update last sync date await save_settings({"amazon_last_sync": datetime.now().strftime("%Y-%m-%d %H:%M")}) # Log summary if processed > 0 and batch_done: summary = f"{processed} Rechnung(en) importiert. Weitere beim nächsten Abruf." await add_log_entry( email_subject="Amazon-Import (Batch)", email_from=f"Amazon ({domain})", attachments_count=processed, status="success", error_message=summary, sent_to=import_email, ) elif processed == 0 and errors == 0: if skipped > 0: summary = f"Alle Rechnungen bereits importiert ({skipped} übersprungen)" else: summary = "Keine neuen Rechnungen gefunden" await add_log_entry( email_subject="Amazon-Import (Zusammenfassung)", email_from=f"Amazon ({domain})", attachments_count=0, status="success", error_message=summary, sent_to="", ) except Exception as e: logger.error(f"Amazon-Import Fehler: {e}") await add_log_entry( email_subject="Amazon-Import", email_from=f"Amazon ({domain})", attachments_count=0, status="error", error_message=str(e), ) return {"processed": processed, "skipped": skipped, "errors": errors + 1, "error": str(e)} finally: _processing_active = False # Keep page alive for next run instead of closing it (preserves session) if reused_page and page: _interactive_page = page # Return page for reuse logger.info("Amazon: Page zurück in Session-Pool (Session bleibt erhalten)") else: await page.close() if smtp_conn: try: smtp_conn.quit() except Exception: pass logger.info(f"Amazon-Import fertig: {processed} verarbeitet, {skipped} übersprungen, {errors} Fehler") return {"processed": processed, "skipped": skipped, "errors": errors} async def _collect_and_process_orders(page, domain, since_date, smtp_conn, settings, import_email) -> dict | None: """Collect orders AND process invoices page by page. Uses BATCH processing: only processes a limited number of invoices per run to avoid Amazon session degradation. The scheduler will pick up remaining orders in subsequent runs (already-imported orders are skipped automatically). Returns None if session is invalid, otherwise dict with processed/skipped/errors counts. """ MAX_INVOICES_PER_RUN = 2 # Limit to avoid Amazon session issues processed = 0 skipped = 0 errors = 0 batch_done = False # Flag: batch limit reached, stop processing # Navigate to orders page if needed actual_url = page.url if "order-history" not in actual_url and "your-orders" not in actual_url: if "signin" in actual_url or "/ap/" in actual_url: return None logger.info("Amazon: Nicht auf Bestellseite, versuche Navigation über Link...") orders_link = page.locator("a[href*='order-history'], a[href*='your-orders']") if await orders_link.count() > 0: await orders_link.first.click() await asyncio.sleep(3) try: await page.wait_for_load_state("networkidle", timeout=15000) except Exception: pass actual_url = page.url if "order-history" not in actual_url and "your-orders" not in actual_url: return None # Reset to page 1 via SPA navigation (NOT page.reload() which kills session!) # Click the "Bestellungen" tab or use the time filter to refresh the order list logger.info(f"Amazon: Refreshe Bestellliste via SPA (aktuelle URL: {actual_url})...") try: refreshed = await page.evaluate("""() => { // Strategy 1: Click the "Bestellungen" tab to reset to page 1 const tabs = document.querySelectorAll('a[href*="your-orders"], a[href*="order-history"]'); for (const tab of tabs) { const text = (tab.innerText || '').trim(); if ((text === 'Bestellungen' || text === 'Orders') && tab.offsetParent !== null) { tab.click(); return 'tab'; } } // Strategy 2: Click pagination page 1 link const page1Links = document.querySelectorAll('.a-pagination a[href*="pagination/1"], .a-pagination li:first-child a'); for (const link of page1Links) { if (link.offsetParent !== null) { link.click(); return 'pagination'; } } // Strategy 3: Click the time filter to trigger a refresh const filterSelect = document.querySelector('select[name="orderFilter"], select#orderFilter, select#time-filter'); if (filterSelect) { // Re-select the current value to trigger change event const event = new Event('change', {bubbles: true}); filterSelect.dispatchEvent(event); return 'filter'; } return null; }""") if refreshed: logger.info(f"Amazon: Bestellliste refreshed via {refreshed}") await asyncio.sleep(3) try: await page.wait_for_load_state("networkidle", timeout=15000) except Exception: pass else: logger.info("Amazon: Kein SPA-Refresh möglich, verwende aktuelle Ansicht") except Exception as e: logger.warning(f"Amazon: SPA-Refresh fehlgeschlagen: {e}") # Try to set time filter now = datetime.now() days_back = (now - since_date).days if days_back <= 30: desired_filter = "last30" elif days_back <= 90: desired_filter = "months-3" else: desired_filter = f"year-{since_date.year}" logger.info(f"Amazon: Setze Zeitfilter: {desired_filter}") try: filter_dropdown = page.locator("select[name='orderFilter'], select#orderFilter, select#time-filter") if await filter_dropdown.count() > 0: await filter_dropdown.first.select_option(desired_filter) await asyncio.sleep(3) try: await page.wait_for_load_state("networkidle", timeout=15000) except Exception: pass else: logger.info("Amazon: Kein Filter-Dropdown gefunden, verwende aktuelle Ansicht") except Exception as e: logger.warning(f"Amazon: Filter setzen fehlgeschlagen: {e}") await asyncio.sleep(2) seen_ids = set() page_num = 1 total_orders = 0 while True: logger.info(f"Amazon: Verarbeite Seite {page_num}...") # Check for login redirect if "signin" in page.url or "/ap/" in page.url: if total_orders > 0: logger.warning(f"Amazon: Login-Redirect auf Seite {page_num}, breche ab") break return None # Extract orders from current page page_orders = await _extract_orders_from_page(page, since_date) new_orders = [o for o in page_orders if o["id"] not in seen_ids] for o in new_orders: seen_ids.add(o["id"]) logger.info(f"Amazon: Seite {page_num}: {len(page_orders)} gefunden, {len(new_orders)} neu") total_orders += len(new_orders) # Process invoices for THIS page's orders immediately for order in new_orders: # Check batch limit if processed >= MAX_INVOICES_PER_RUN: batch_done = True logger.info(f"Amazon: Batch-Limit erreicht ({MAX_INVOICES_PER_RUN} Rechnungen). Rest beim nächsten Abruf.") break order_id = order.get("id", "?") try: if await is_invoice_downloaded(order_id, order_id): skipped += 1 logger.debug(f"Amazon: Bestellung {order_id} bereits importiert") continue pdf_list = await _download_order_invoices(page, domain, order_id) if not pdf_list: logger.debug(f"Amazon: Keine Rechnung für Bestellung {order_id}") continue for inv_idx, pdf_bytes in enumerate(pdf_list): suffix = f"_{inv_idx+1}" if len(pdf_list) > 1 else "" try: filename = f"Amazon_Rechnung_{order_id}{suffix}.pdf" if settings.get("debug_save_amazon_pdfs") == "true": try: tmp_dir = Path(os.environ.get("UPLOAD_DIR", "/data/uploads")) / "amazon_invoices" tmp_dir.mkdir(parents=True, exist_ok=True) (tmp_dir / filename).write_bytes(pdf_bytes) logger.info(f"Amazon: Debug-PDF gespeichert: {tmp_dir / filename} ({len(pdf_bytes)} Bytes)") except Exception as e: logger.warning(f"Amazon: Debug-PDF speichern fehlgeschlagen: {e}") forward_msg = _build_forward_email( from_addr=settings.get("smtp_username", ""), to_addr=import_email, original_subject=f"Amazon Rechnung - Bestellung {order_id}{suffix}", original_from=f"Amazon ({domain})", attachments=[(filename, pdf_bytes)], ) smtp_log = _send_with_log(smtp_conn, forward_msg) processed += 1 logger.info(f"Amazon: Rechnung {inv_idx+1}/{len(pdf_list)} für {order_id} gesendet") await add_log_entry( email_subject=f"Amazon Rechnung - {order_id}{suffix}", email_from=f"Amazon ({domain})", attachments_count=1, status="success", sent_to=import_email, smtp_log=smtp_log, ) except Exception as e: errors += 1 logger.error(f"Amazon: Fehler bei Rechnung {inv_idx+1} für {order_id}: {e}") await add_log_entry( email_subject=f"Amazon Rechnung - {order_id}{suffix}", email_from=f"Amazon ({domain})", attachments_count=0, status="error", error_message=str(e), ) await mark_invoice_downloaded(order_id, order_id) # Long delay between orders to avoid Amazon rate-limiting await _human_delay(8.0, 15.0) except Exception as e: errors += 1 logger.error(f"Amazon: Fehler bei Bestellung {order_id}: {e}") await add_log_entry( email_subject=f"Amazon Rechnung - {order_id}", email_from=f"Amazon ({domain})", attachments_count=0, status="error", error_message=str(e), ) # Stop if batch limit reached if batch_done: break # Navigate to next page has_next = await page.evaluate("""() => { const nextLink = document.querySelector('.a-pagination .a-last:not(.a-disabled) a'); if (nextLink) { nextLink.scrollIntoView({behavior: 'smooth', block: 'center'}); return true; } return false; }""") if has_next and page_orders: logger.info("Amazon: Klicke auf nächste Seite (JS)...") await asyncio.sleep(0.5) await page.evaluate("""() => { const nextLink = document.querySelector('.a-pagination .a-last:not(.a-disabled) a'); if (nextLink) nextLink.click(); }""") await asyncio.sleep(3) try: await page.wait_for_load_state("networkidle", timeout=30000) except Exception: pass page_num += 1 await _human_delay(1.0, 2.0) else: break status = "Batch-Limit" if batch_done else "komplett" logger.info(f"Amazon: Gesamt {total_orders} Bestellungen auf {page_num} Seite(n), Status: {status}") return {"processed": processed, "skipped": skipped, "errors": errors, "batch_done": batch_done} async def _collect_orders(page, domain: str, since_date: datetime) -> list[dict] | None: """Collect order IDs from Amazon order history using SPA navigation (no page.goto). The page must already be on the Amazon orders page (from interactive login). Uses dropdown/click navigation to avoid losing the session. """ orders = [] actual_url = page.url logger.info(f"Amazon: Aktuelle Seite: {actual_url}") # Check if we're on the orders page or need to navigate there if "order-history" not in actual_url and "your-orders" not in actual_url: if "signin" in actual_url or "/ap/" in actual_url: logger.error("Amazon: Seite ist Login-Seite - Session ungültig!") await _save_debug(page, "orders_not_on_orders_page") return None # Try clicking the orders link within Amazon's SPA logger.info("Amazon: Nicht auf Bestellseite, versuche Navigation über Link...") orders_link = page.locator("a[href*='order-history'], a[href*='your-orders'], a:has-text('Bestellungen'), a:has-text('Meine Bestellungen')") if await orders_link.count() > 0: await orders_link.first.click() await asyncio.sleep(3) try: await page.wait_for_load_state("networkidle", timeout=15000) except Exception: pass actual_url = page.url if "order-history" not in actual_url and "your-orders" not in actual_url: logger.error(f"Amazon: Konnte nicht zur Bestellseite navigieren. URL: {actual_url}") await _save_debug(page, "orders_navigation_failed") return None # Determine desired time filter now = datetime.now() days_back = (now - since_date).days if days_back <= 30: desired_filter = "last30" elif days_back <= 90: desired_filter = "months-3" else: desired_filter = f"year-{since_date.year}" # Try to set the time filter via the dropdown logger.info(f"Amazon: Setze Zeitfilter: {desired_filter}") try: filter_dropdown = page.locator("select[name='orderFilter'], select#orderFilter, select#time-filter") if await filter_dropdown.count() > 0: await filter_dropdown.first.select_option(desired_filter) await asyncio.sleep(3) try: await page.wait_for_load_state("networkidle", timeout=15000) except Exception: pass logger.info(f"Amazon: Zeitfilter '{desired_filter}' gesetzt") else: logger.info("Amazon: Kein Filter-Dropdown gefunden, verwende aktuelle Ansicht") except Exception as e: logger.warning(f"Amazon: Filter setzen fehlgeschlagen: {e}") # Wait for content to load await asyncio.sleep(2) seen_ids = set() page_num = 1 while True: logger.info(f"Amazon: Verarbeite Seite {page_num}...") # Check for login redirect if "signin" in page.url or "/ap/" in page.url: if orders: logger.warning(f"Amazon: Login-Redirect auf Seite {page_num}, verwende {len(orders)} bereits gefundene Bestellung(en)") return orders logger.error("Amazon: Session ungültig!") await _save_debug(page, "orders_redirect_login") return None page_orders = await _extract_orders_from_page(page, since_date) new_orders = [o for o in page_orders if o["id"] not in seen_ids] for o in new_orders: seen_ids.add(o["id"]) logger.info(f"Amazon: Seite {page_num}: {len(page_orders)} gefunden, {len(new_orders)} neu") orders.extend(new_orders) # Try to click "Next" button for pagination via JavaScript (avoids visibility issues) has_next = await page.evaluate("""() => { const nextLink = document.querySelector('.a-pagination .a-last:not(.a-disabled) a'); if (nextLink) { nextLink.scrollIntoView({behavior: 'smooth', block: 'center'}); return true; } return false; }""") if has_next and page_orders: logger.info("Amazon: Klicke auf nächste Seite (JS)...") await asyncio.sleep(0.5) # Wait for scroll # Use JavaScript click to bypass Playwright visibility checks await page.evaluate("""() => { const nextLink = document.querySelector('.a-pagination .a-last:not(.a-disabled) a'); if (nextLink) nextLink.click(); }""") await asyncio.sleep(3) try: await page.wait_for_load_state("networkidle", timeout=30000) except Exception: pass page_num += 1 await _human_delay(1.0, 2.0) else: break if not orders: logger.warning("Amazon: Keine Bestellungen gefunden!") await _save_debug(page, "no_orders_found") return orders async def _extract_orders_from_page(page, since_date: datetime) -> list[dict]: """Extract order data from the current page.""" orders = [] title = await page.title() logger.info(f"Amazon: Seite analysieren: Titel='{title}', URL={page.url}") await _save_debug(page, "order_page") # Use JavaScript to extract only VISIBLE order cards (Amazon loads all in DOM, shows ~10 per page) visible_orders = await page.evaluate("""() => { const results = []; // Try multiple selectors const selectors = [ '.order-card.js-order-card', '.order-card', '.order-info', '.a-box-group.order', '.order', ]; const seen = new Set(); for (const sel of selectors) { for (const el of document.querySelectorAll(sel)) { // Only process visible elements (offsetParent !== null or check display) if (el.offsetParent === null && getComputedStyle(el).position !== 'fixed') continue; const text = el.innerText || ''; const idMatch = text.match(/(\d{3}-\d{7}-\d{7})/); if (idMatch && !seen.has(idMatch[1])) { seen.add(idMatch[1]); results.push({id: idMatch[1], text: text.substring(0, 500)}); } } if (results.length > 0) break; } return results; }""") logger.info(f"Amazon: Sichtbare Order-Cards gefunden: {len(visible_orders)}") if not visible_orders: # Last resort: regex fallback on visible page text visible_text = await page.evaluate("() => document.body.innerText") order_ids = re.findall(r"\b(\d{3}-\d{7}-\d{7})\b", visible_text) unique_ids = {oid for oid in set(order_ids) if not oid.startswith("000-")} logger.info(f"Amazon: Keine Order-Cards, Fallback-Regex: {len(unique_ids)} Bestell-ID(s) im sichtbaren Text") if not unique_ids: logger.warning(f"Amazon: Seite hat keine Bestell-IDs. Titel: '{title}', URL: {page.url}") for oid in unique_ids: orders.append({"id": oid, "date": None}) return orders for vo in visible_orders: order_id = vo["id"] if order_id.startswith("000-"): continue order_date = _parse_german_date(vo["text"]) if order_date and order_date < since_date: logger.debug(f"Amazon: Bestellung {order_id} übersprungen (Datum {order_date.strftime('%Y-%m-%d')} < {since_date.strftime('%Y-%m-%d')})") continue logger.debug(f"Amazon: Bestellung gefunden: {order_id}, Datum: {order_date}") orders.append({"id": order_id, "date": order_date}) return orders def _parse_german_date(text: str) -> datetime | None: """Parse German date formats from order text.""" months_de = { "Januar": 1, "Februar": 2, "März": 3, "April": 4, "Mai": 5, "Juni": 6, "Juli": 7, "August": 8, "September": 9, "Oktober": 10, "November": 11, "Dezember": 12, } pattern = r"(\d{1,2})\.\s*(" + "|".join(months_de.keys()) + r")\s+(\d{4})" match = re.search(pattern, text) if match: day = int(match.group(1)) month = months_de[match.group(2)] year = int(match.group(3)) try: return datetime(year, month, day) except ValueError: pass match = re.search(r"(\d{2})\.(\d{2})\.(\d{4})", text) if match: try: return datetime(int(match.group(3)), int(match.group(2)), int(match.group(1))) except ValueError: pass return None async def _close_all_popovers(page): """Close all open Amazon popovers reliably. IMPORTANT: Do NOT set display:none - Amazon recycles popover containers, so hiding them prevents future popovers from appearing. """ try: await page.evaluate("""() => { // Close via close buttons document.querySelectorAll('.a-popover-footer button, .a-popover .a-button-close, .a-popover-close').forEach(b => { try { b.click(); } catch(e) {} }); // Use Amazon's own popover API to close if available if (window.P && window.P.when) { try { window.P.when('A').execute(function(A) { if (A && A.popover) { document.querySelectorAll('.a-popover:not(.a-popover-hidden)').forEach(p => { const id = p.getAttribute('data-a-popover-id'); if (id) try { A.popover.close(id); } catch(e) {} }); } }); } catch(e) {} } // Click outside to dismiss any remaining popovers document.body.click(); }""") await asyncio.sleep(0.5) except Exception: pass async def _download_order_invoices(page, domain: str, order_id: str) -> list[bytes]: """Download invoice PDFs for an order. Strategy: Extract popover AJAX URL from data-a-popover attribute, then use XMLHttpRequest with proper Amazon headers (anti-CSRF token, X-Requested-With) to load the invoice popover HTML. This is exactly what Amazon's own JavaScript does internally. """ import base64 pdfs = [] logger.info(f"Amazon: Hole Rechnungs-Links für {order_id}") # Step 1: Extract the popover AJAX URL and download links via XMLHttpRequest invoice_result = await page.evaluate(f"""async () => {{ // Find the order card containing this order ID const cards = document.querySelectorAll('.order-card, .order-info, .a-box-group, div'); let popoverUrl = null; for (const card of cards) {{ if (!card.innerText || !card.innerText.includes('{order_id}')) continue; // Find the popover trigger with invoice URL const triggers = card.querySelectorAll('[data-a-popover*="invoice"]'); for (const trigger of triggers) {{ try {{ const config = JSON.parse(trigger.getAttribute('data-a-popover')); if (config && config.url && config.url.includes('{order_id}')) {{ popoverUrl = config.url; break; }} }} catch(e) {{}} }} if (popoverUrl) break; }} if (!popoverUrl) {{ return {{ found: false, error: 'Kein Popover-URL gefunden' }}; }} // Step 2: Make XMLHttpRequest with proper Amazon headers try {{ const response = await new Promise((resolve, reject) => {{ const xhr = new XMLHttpRequest(); xhr.open('GET', popoverUrl, true); xhr.setRequestHeader('X-Requested-With', 'XMLHttpRequest'); xhr.setRequestHeader('Accept', 'text/html,*/*'); xhr.onload = function() {{ resolve({{ status: xhr.status, html: xhr.responseText }}); }}; xhr.onerror = function() {{ reject(new Error('XHR failed')); }}; xhr.send(); }}); if (response.status !== 200) {{ return {{ found: false, error: 'HTTP ' + response.status, url: popoverUrl }}; }} const html = response.html; // Check if response is a login page if (html.includes('ap_signin') || html.includes('ap_error') || html.includes('/ap/')) {{ return {{ found: false, error: 'Login-Seite erhalten', url: popoverUrl, isLogin: true }}; }} // Extract PDF download links from the response HTML const parser = new DOMParser(); const doc = parser.parseFromString(html, 'text/html'); const links = doc.querySelectorAll('a[href]'); const pdfLinks = []; for (const link of links) {{ const href = link.getAttribute('href') || ''; const text = (link.innerText || '').trim(); if (href.includes('/ap/') || href.includes('openid')) continue; if (href.includes('contact.html') || href.includes('help/contact')) continue; if (text.toLowerCase().includes('anfordern')) continue; if ( href.includes('.pdf') || href.includes('documents/download') || href.includes('/document/') || href.includes('invoice/download') || href.includes('generated_invoices') ) {{ pdfLinks.push({{ href: href, text: text.substring(0, 100) }}); }} }} return {{ found: true, url: popoverUrl, links: pdfLinks, htmlSize: html.length }}; }} catch(e) {{ return {{ found: false, error: e.message, url: popoverUrl }}; }} }}""") logger.info(f"Amazon: Invoice-Ergebnis für {order_id}: found={invoice_result.get('found')}, " f"links={invoice_result.get('links', [])}, error={invoice_result.get('error', '')}") if not invoice_result.get("found") or not invoice_result.get("links"): if invoice_result.get("isLogin"): logger.warning(f"Amazon: Session abgelaufen beim Rechnungsabruf für {order_id}") return pdfs # Step 3: Download each PDF via XMLHttpRequest as base64 for link_info in invoice_result["links"]: href = link_info["href"] text = link_info.get("text", "") # Make href absolute if relative if href.startswith("/"): fetch_href = href elif href.startswith("http"): from urllib.parse import urlparse parsed = urlparse(href) fetch_href = parsed.path + ("?" + parsed.query if parsed.query else "") else: fetch_href = "/" + href logger.info(f"Amazon: Lade PDF '{text}' -> {fetch_href[:100]}") try: pdf_result = await page.evaluate(f"""async () => {{ try {{ const resp = await new Promise((resolve, reject) => {{ const xhr = new XMLHttpRequest(); xhr.open('GET', '{fetch_href}', true); xhr.responseType = 'arraybuffer'; xhr.onload = function() {{ const bytes = new Uint8Array(xhr.response); let binary = ''; for (let i = 0; i < bytes.length; i++) {{ binary += String.fromCharCode(bytes[i]); }} resolve({{ ok: xhr.status === 200, status: xhr.status, data: btoa(binary), size: bytes.length, contentType: xhr.getResponseHeader('content-type') || '' }}); }}; xhr.onerror = function() {{ reject(new Error('XHR failed')); }}; xhr.send(); }}); return resp; }} catch(e) {{ return {{ ok: false, error: e.message }}; }} }}""") if pdf_result and pdf_result.get("ok") and pdf_result.get("size", 0) > 500: pdf_bytes = base64.b64decode(pdf_result["data"]) content_type = pdf_result.get("contentType", "") if pdf_bytes[:5] == b"%PDF-" or "pdf" in content_type.lower(): logger.info(f"Amazon: PDF heruntergeladen für {order_id}: {len(pdf_bytes)} Bytes") pdfs.append(pdf_bytes) else: logger.debug(f"Amazon: Download kein PDF für {order_id} (type: {content_type}, size: {len(pdf_bytes)})") elif pdf_result: logger.debug(f"Amazon: PDF-Download fehlgeschlagen für {order_id}: {pdf_result.get('error', 'status=' + str(pdf_result.get('status')))}") except Exception as e: logger.warning(f"Amazon: PDF-Download Exception für {order_id}: {e}") if not pdfs: logger.info(f"Amazon: Keine PDFs für {order_id}") return pdfs