belege-import/app/amazon_processor.py

1385 lines
54 KiB
Python

import asyncio
import logging
import os
import random
import re
import tempfile
from datetime import datetime, timedelta
from pathlib import Path
from app.database import get_settings, save_settings, add_log_entry, is_invoice_downloaded, mark_invoice_downloaded
from app.mail_processor import _connect_smtp, _build_forward_email, _send_with_log
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Chromium user-data directory for the persistent browser profile
# (override with the AMAZON_SESSION_DIR environment variable).
SESSION_DIR = Path(os.environ.get("AMAZON_SESSION_DIR", "/data/amazon_session"))
# Target directory for debug screenshots and HTML dumps, located below UPLOAD_DIR.
DEBUG_DIR = Path(os.environ.get("UPLOAD_DIR", "/data/uploads")) / "amazon_debug"
# Login state machine, exposed through get_login_state(). Status values assigned
# in this module: "idle", "interactive", "logging_in", "awaiting_otp",
# "logged_in" and "login_failed".
_login_state = {"status": "idle", "message": ""}
# Held by start_login() so only one automated login flow runs at a time.
_login_lock = asyncio.Lock()
# Future awaited by _do_login() while an OTP is pending; resolved by submit_otp().
_otp_future: asyncio.Future | None = None
# Lazily created persistent browser context and the Playwright driver behind it
# (managed by _get_browser_context() and close_browser_context()).
_browser_context = None
_playwright_instance = None
# Process lock to prevent concurrent runs
_process_lock = asyncio.Lock()
# Flag: True while process_amazon is actively working (page consumed but session valid)
_processing_active = False
# Interactive login session (browser page kept alive for user interaction)
_interactive_page = None
async def _human_delay(min_s: float = 1.0, max_s: float = 3.0):
    """Pause for a uniformly random number of seconds between min_s and max_s.

    Used between browser actions so the automation does not fire at a
    machine-like, constant pace.
    """
    pause_seconds = random.uniform(min_s, max_s)
    await asyncio.sleep(pause_seconds)
async def _apply_stealth_to_context(context):
    """Install playwright-stealth evasions on a browser context (best effort).

    The package is imported lazily; if it is missing, or applying it raises,
    a warning is logged and the context is left unchanged.
    """
    try:
        from playwright_stealth import Stealth

        await Stealth().apply_stealth_async(context)
        logger.info("Stealth erfolgreich auf Browser-Kontext angewendet")
    except ImportError:
        # Optional dependency: carry on without stealth.
        logger.warning("playwright-stealth nicht installiert, überspringe")
    except Exception as e:
        logger.warning(f"Stealth konnte nicht angewendet werden: {e}")
async def _add_virtual_authenticator(page):
    """Register a virtual WebAuthn authenticator for the page via CDP.

    The intent is to keep passkey dialogs from appearing. Failures are only
    logged at debug level and never raised.
    """
    authenticator_options = {
        "protocol": "ctap2",
        "transport": "internal",
        "hasResidentKey": True,
        "hasUserVerification": True,
        "isUserVerified": True,
        "automaticPresenceSimulation": True,
    }
    try:
        cdp = await page.context.new_cdp_session(page)
        await cdp.send("WebAuthn.enable")
        await cdp.send(
            "WebAuthn.addVirtualAuthenticator",
            {"options": authenticator_options},
        )
        logger.debug("Virtueller WebAuthn-Authenticator hinzugefügt")
    except Exception as e:
        logger.debug(f"Virtueller Authenticator fehlgeschlagen: {e}")
async def _get_browser_context():
    """Return the shared persistent Chromium context, launching it on first use.

    The context is cached in the module-level ``_browser_context`` and uses
    ``SESSION_DIR`` as its user-data directory. The Playwright driver is
    started once and kept in ``_playwright_instance``.

    Returns:
        The (possibly freshly launched) persistent browser context.

    Raises:
        Whatever Playwright raises when the driver or the browser cannot be
        started; nothing is caught around the launch itself.
    """
    global _browser_context, _playwright_instance
    if _browser_context is not None:
        try:
            # Probe the cached context; any exception means it is unusable
            # and a new one is launched below.
            _ = _browser_context.pages
            return _browser_context
        except Exception:
            _browser_context = None

    from playwright.async_api import async_playwright

    SESSION_DIR.mkdir(parents=True, exist_ok=True)
    # Clean up stale Chromium lock files from previous container runs.
    # Chromium creates these entries as symlinks whose targets typically no
    # longer resolve after a restart. Path.exists() follows symlinks and
    # reports a dangling link as missing, so an exists() guard would skip
    # exactly the stale locks. Unlink unconditionally and treat "not found"
    # as the normal case instead.
    for lock_file in ("SingletonLock", "SingletonSocket", "SingletonCookie"):
        try:
            (SESSION_DIR / lock_file).unlink()
        except FileNotFoundError:
            continue
        except OSError as e:
            # Best effort: the launch below reports a profile that is really locked.
            logger.debug(f"Lock-File konnte nicht entfernt werden ({lock_file}): {e}")
        else:
            logger.info(f"Stale Lock-File entfernt: {lock_file}")

    if _playwright_instance is None:
        _playwright_instance = await async_playwright().start()
    _browser_context = await _playwright_instance.chromium.launch_persistent_context(
        user_data_dir=str(SESSION_DIR),
        headless=True,
        locale="de-DE",
        user_agent=(
            "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
            "(KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36"
        ),
        viewport={"width": 1280, "height": 800},
        args=[
            "--disable-blink-features=AutomationControlled",
            "--disable-gpu",
            "--disable-dev-shm-usage",
            "--disable-extensions",
            "--disable-background-networking",
            "--disable-translate",
            "--no-first-run",
            "--no-sandbox",
        ],
    )
    await _apply_stealth_to_context(_browser_context)
    return _browser_context
async def close_browser_context():
    """Shut down the cached browser context and the Playwright driver.

    Both steps are best effort: errors while closing are ignored and the
    module-level references are reset to None either way.
    """
    global _browser_context, _playwright_instance

    if _browser_context is not None:
        try:
            await _browser_context.close()
        except Exception:
            pass
        _browser_context = None

    if _playwright_instance is None:
        return
    try:
        await _playwright_instance.stop()
    except Exception:
        pass
    _playwright_instance = None
def get_login_state() -> dict:
    """Return a shallow copy of the current login state (status and message)."""
    snapshot = {**_login_state}
    return snapshot
async def _save_debug(page, name: str):
    """Dump the page's HTML and a full-page screenshot into DEBUG_DIR.

    Before writing, the oldest entries are deleted until at most 48 remain,
    leaving room for the two new files. If preparing the directory fails,
    nothing is written. The HTML and the screenshot are saved independently,
    so a failure of one does not prevent the other.

    Args:
        page: Playwright page to capture.
        name: Prefix for the file names; a timestamp is appended.
    """
    ts = datetime.now().strftime('%Y%m%d_%H%M%S')
    try:
        DEBUG_DIR.mkdir(parents=True, exist_ok=True)
        # Oldest first, so the slice below removes the least recent files.
        by_age = sorted(DEBUG_DIR.iterdir(), key=lambda p: p.stat().st_mtime)
        surplus = max(len(by_age) - 48, 0)
        for stale in by_age[:surplus]:
            stale.unlink()
    except Exception as e:
        logger.error(f"Amazon Debug-Verzeichnis Fehler: {e}")
        return

    # HTML first (most reliable).
    try:
        html_path = DEBUG_DIR / f"{name}_{ts}.html"
        content = await page.content()
        html_path.write_text(content, encoding="utf-8")
        logger.info(f"Amazon Debug-HTML gespeichert: {html_path} ({len(content)} Bytes)")
    except Exception as e:
        logger.error(f"Amazon Debug-HTML fehlgeschlagen: {e}")

    # Then the screenshot.
    try:
        png_path = DEBUG_DIR / f"{name}_{ts}.png"
        await page.screenshot(path=str(png_path), full_page=True)
        logger.info(f"Amazon Debug-Screenshot gespeichert: {png_path}")
    except Exception as e:
        logger.error(f"Amazon Debug-Screenshot fehlgeschlagen: {e}")
async def check_session_valid() -> bool:
    """Report whether an Amazon session is considered active.

    True when an interactive page is held, when a login is in progress
    (status "interactive" or "logging_in"), or while an import run currently
    owns the page (``_processing_active``).
    """
    has_page = _interactive_page is not None
    is_logging_in = _login_state.get("status") in ("interactive", "logging_in")
    logger.info(f"Amazon Session-Check: has_page={has_page}, login_active={is_logging_in}, processing={_processing_active}")
    if has_page:
        return has_page
    if is_logging_in:
        return is_logging_in
    return _processing_active
def is_interactive_login_active() -> bool:
    """Tell whether the interactive login dialog currently occupies the browser.

    That is the case while the status is "interactive" or "logging_in", and
    also when the status is "logged_in" while the interactive page is still
    held (the user has logged in but not closed the dialog yet).
    """
    status = _login_state.get("status", "idle")
    if status == "logged_in":
        return _interactive_page is not None
    return status in ("interactive", "logging_in")
async def clear_session():
    """Discard the Amazon browser session entirely.

    Closes the interactive page and the browser context, deletes the session
    directory (a failure is only logged), recreates it empty and resets the
    login state to idle.
    """
    global _login_state, _interactive_page
    import shutil

    await close_interactive_login(force_close=True)
    await close_browser_context()

    # Wipe the stored profile, then make sure the directory exists again.
    if SESSION_DIR.exists():
        try:
            shutil.rmtree(SESSION_DIR)
        except Exception as e:
            logger.warning(f"Session-Verzeichnis konnte nicht gelöscht werden: {e}")
    SESSION_DIR.mkdir(parents=True, exist_ok=True)

    _login_state = {"status": "idle", "message": ""}
# --- Interactive Login (user solves CAPTCHAs via screenshot/click/type) ---
async def start_interactive_login():
    """Open a browser page on Amazon and keep it alive for user interaction.

    Does nothing if an interactive page already exists. Refuses to start
    (status "login_failed") while an import run holds ``_process_lock``.
    On success the page is stored in ``_interactive_page`` and the login state
    becomes "logged_in" or "interactive", depending on whether Amazon
    redirected to a login/auth URL. On failure the state becomes
    "login_failed" and the page opened by this call is closed again.
    """
    global _login_state, _interactive_page
    if _interactive_page is not None:
        # Already have an interactive session
        return
    if _process_lock.locked():
        _login_state = {"status": "login_failed", "message": "Amazon-Abruf läuft gerade. Bitte warten bis der Abruf fertig ist."}
        return
    _login_state = {"status": "interactive", "message": "Browser wird gestartet..."}
    # Tracked locally so the error path can close the page even when the
    # failure happens before it was published in _interactive_page.
    page = None
    try:
        settings = await get_settings()
        domain = settings.get("amazon_domain", "amazon.de")
        ctx = await _get_browser_context()
        page = await ctx.new_page()
        # Stealth is applied at context level
        await _add_virtual_authenticator(page)
        # Navigate to order history - Amazon redirects to login if not authenticated
        await page.goto(
            f"https://www.{domain}/gp/css/order-history",
            wait_until="domcontentloaded",
            timeout=60000,
        )
        # Wait a bit for page to settle
        await asyncio.sleep(2)
        _interactive_page = page
        # Check if already logged in (not on a login/auth page)
        url = page.url
        is_login = "signin" in url or "/ap/" in url or "/auth/" in url
        if not is_login and "amazon." in url:
            _login_state = {"status": "logged_in", "message": "Bereits angemeldet"}
        else:
            _login_state = {"status": "interactive", "message": "Bitte im Browser anmelden"}
        logger.info(f"Interaktive Login-Session gestartet, URL: {url}")
    except Exception as e:
        logger.error(f"Interaktive Login-Session fehlgeschlagen: {e}")
        _login_state = {"status": "login_failed", "message": f"Browser konnte nicht gestartet werden: {e}"}
        if page is not None:
            try:
                await page.close()
            except Exception as close_error:
                # Best effort: the page may already be gone together with the browser.
                logger.debug(f"Interaktive Login-Page konnte nicht geschlossen werden: {close_error}")
        _interactive_page = None
async def get_browser_screenshot() -> bytes | None:
    """Capture the interactive login page as PNG bytes.

    Returns None when no interactive page exists or the capture fails
    (the failure is logged).
    """
    page = _interactive_page
    if page is None:
        return None
    try:
        png_bytes = await page.screenshot(type="png")
    except Exception as e:
        logger.error(f"Screenshot fehlgeschlagen: {e}")
        return None
    return png_bytes
async def send_browser_click(x: int, y: int):
    """Replay a mouse click at (x, y) on the interactive browser page.

    After a short pause the login state is re-evaluated, because the click
    may have completed the login. Errors are logged, never raised. Without an
    interactive page this is a no-op.
    """
    page = _interactive_page
    if page is None:
        return
    try:
        await page.mouse.click(x, y)
        await asyncio.sleep(0.3)
        # The click may have submitted the form - refresh the login state.
        await _check_interactive_login_complete()
    except Exception as e:
        logger.error(f"Browser-Klick fehlgeschlagen: {e}")
async def send_browser_type(text: str):
    """Type text into whatever element has focus on the interactive page.

    Keystrokes are sent with a 50 ms delay between them. Errors are logged,
    never raised. Without an interactive page this is a no-op.
    """
    page = _interactive_page
    if page is None:
        return
    try:
        await page.keyboard.type(text, delay=50)
        await asyncio.sleep(0.2)
    except Exception as e:
        logger.error(f"Browser-Texteingabe fehlgeschlagen: {e}")
async def send_browser_key(key: str):
    """Press a single special key (e.g. Enter, Tab, Backspace, Escape).

    After a short pause the login state is re-evaluated, since a key such as
    Enter on the password field may finish the login. Errors are logged,
    never raised. Without an interactive page this is a no-op.
    """
    page = _interactive_page
    if page is None:
        return
    try:
        await page.keyboard.press(key)
        await asyncio.sleep(0.5)
        # The key press may have submitted the form - refresh the login state.
        await _check_interactive_login_complete()
    except Exception as e:
        logger.error(f"Browser-Taste fehlgeschlagen: {e}")
async def _check_interactive_login_complete():
    """Re-derive the login state from the interactive page's URL and title.

    Precedence: an error/blocked page keeps the state "interactive" with a
    warning message; a CAPTCHA URL asks the user to solve it; any other
    Amazon URL outside the sign-in flow counts as "logged_in". Anything else
    leaves the state unchanged. All exceptions are swallowed.
    """
    global _login_state
    page = _interactive_page
    if page is None:
        return
    try:
        url = page.url
        try:
            title = await page.title()
        except Exception:
            title = ""

        # Error indicators: suspicious words in the title or known error URLs.
        lowered_title = title.lower()
        error_words = (
            "tut uns leid", "sorry", "fehler", "error",
            "problem", "bot", "automated", "unusual",
        )
        is_error = any(word in lowered_title for word in error_words)
        is_blocked = "errors" in url or "/hz/approvalrequest" in url

        if is_error or is_blocked:
            _login_state = {"status": "interactive", "message": "Amazon blockiert den Zugriff. Versuchen Sie es erneut oder lösen Sie die Sicherheitsabfrage."}
            logger.warning(f"Interaktiver Login: Error-Seite erkannt. URL: {url}, Titel: {title}")
            return

        if "captcha" in url.lower():
            _login_state = {"status": "interactive", "message": "Bitte CAPTCHA lösen"}
            return

        in_login_flow = "signin" in url or "/ap/" in url or "/auth/" in url
        if not in_login_flow and "amazon." in url:
            _login_state = {"status": "logged_in", "message": "Erfolgreich angemeldet"}
            logger.info(f"Interaktiver Login erfolgreich! URL: {url}")
    except Exception:
        pass
async def close_interactive_login(force_close: bool = False):
    """Dismiss the interactive login dialog and reset the login state to idle.

    By default the browser page is kept so process_amazon can reuse it.

    Args:
        force_close: If True, the page itself is closed and dropped as well
            (e.g. on error or explicit logout).
    """
    global _interactive_page, _login_state

    must_close_page = force_close and _interactive_page is not None
    if must_close_page:
        try:
            await _interactive_page.close()
        except Exception:
            pass
        _interactive_page = None

    # The dialog is gone in either case; only the page may survive.
    _login_state = {"status": "idle", "message": ""}
    logger.info(f"Interactive Login Modal geschlossen (page={'behalten' if _interactive_page else 'geschlossen'})")
async def start_login():
    """Run the automated (credential-based) Amazon login once.

    Returns immediately if another login already holds ``_login_lock``.
    Credentials and domain come from the stored settings; missing credentials
    and any exception during the flow end in status "login_failed". The page
    opened for the login is always closed afterwards.
    """
    global _login_state, _otp_future
    if _login_lock.locked():
        return
    async with _login_lock:
        _login_state = {"status": "logging_in", "message": "Browser wird gestartet..."}
        _otp_future = None
        try:
            settings = await get_settings()
            amazon_email = settings.get("amazon_email", "")
            amazon_password = settings.get("amazon_password", "")
            domain = settings.get("amazon_domain", "amazon.de")

            if not (amazon_email and amazon_password):
                _login_state = {
                    "status": "login_failed",
                    "message": "Amazon E-Mail oder Passwort nicht konfiguriert",
                }
                return

            browser_ctx = await _get_browser_context()
            login_page = await browser_ctx.new_page()
            try:
                await _do_login(login_page, domain, amazon_email, amazon_password)
            finally:
                await login_page.close()
        except Exception as e:
            logger.error(f"Amazon-Login fehlgeschlagen: {e}")
            _login_state = {
                "status": "login_failed",
                "message": f"Login fehlgeschlagen: {e}",
            }
async def _login_first_visible(candidates):
    """Return ``.first`` of the first candidate locator that matches a visible element, or None."""
    for locator in candidates:
        try:
            if await locator.count() > 0 and await locator.first.is_visible():
                return locator.first
        except Exception:
            # A candidate that cannot be evaluated is simply skipped.
            continue
    return None


async def _login_captcha_present(page) -> bool:
    """Tell whether the page shows a CAPTCHA / identity challenge (by element or title)."""
    challenge = page.locator(
        "#auth-captcha-image, #captchacharacters, #cvf-aamation-container, "
        "#captcha-container, #aa-challenge-whole-page-iframe"
    )
    if await challenge.count() > 0:
        return True
    # Compare case-insensitively so a capitalised title is recognised too.
    title = (await page.title()).lower()
    return "captcha" in title or "bestätige deine identität" in title


async def _do_login(page, domain, email, password):
    """Execute the automated login flow using semantic Playwright locators.

    Progress and outcome are reported through the module-level
    ``_login_state``; the function itself returns None in every case.

    Steps: open the order history (returns early if already logged in),
    enter the e-mail, enter the password, wait up to 5 minutes for an OTP
    delivered via submit_otp() if an OTP field appears, poll up to 60 times
    at 2 s intervals for a device approval if an approval form appears, and
    finally verify the result from the URL and page content.
    A detected CAPTCHA aborts with status "login_failed".

    Args:
        page: Playwright page to drive; the caller closes it.
        domain: Amazon domain, e.g. "amazon.de".
        email: Account e-mail address.
        password: Account password.
    """
    global _login_state, _otp_future
    _login_state = {"status": "logging_in", "message": "Navigiere zu Amazon..."}
    # Stealth is applied at context level; only the authenticator is per page.
    await _add_virtual_authenticator(page)
    # Navigate to order history - Amazon will redirect to login if not authenticated
    await page.goto(
        f"https://www.{domain}/gp/css/order-history",
        wait_until="networkidle",
        timeout=60000,
    )
    await _save_debug(page, "login_start")
    # Check if we're already logged in (no redirect to login page)
    url = page.url
    if ("order-history" in url or "your-orders" in url) and "signin" not in url and "/ap/" not in url:
        logger.info("Amazon Login: Bereits eingeloggt!")
        _login_state = {"status": "logged_in", "message": "Bereits angemeldet"}
        return
    await _human_delay()

    # --- Step 1: Enter email ---
    _login_state = {"status": "logging_in", "message": "E-Mail wird eingegeben..."}
    email_field = await _login_first_visible([
        page.locator("#ap_email_login"),
        page.locator("#ap_email"),
        page.locator("input[name='email']"),
        page.locator("input[type='email']"),
        page.get_by_label("Mobiltelefonnummer oder E-Mail-Adresse eingeben"),
        page.get_by_label("E-Mail"),
        page.get_by_label("E-Mail-Adresse"),
        page.get_by_label("Email"),
    ])
    if not email_field:
        await _save_debug(page, "login_no_email_field")
        _login_state = {"status": "login_failed", "message": "Email-Feld nicht gefunden"}
        return
    logger.info("Amazon Login: Email-Feld gefunden")
    await email_field.fill(email)
    await _human_delay(0.5, 1.5)
    # Click continue button
    continue_btn = await _login_first_visible([
        page.get_by_role("button", name="Weiter"),
        page.get_by_role("button", name="Continue"),
        page.locator("#continue"),
        page.locator("input[type='submit']"),
    ])
    if continue_btn:
        logger.info("Amazon Login: Weiter-Button geklickt")
        await continue_btn.click()
        await page.wait_for_load_state("networkidle")
        await _human_delay()
    await _save_debug(page, "login_after_email")
    # Check for CAPTCHA
    if await _login_captcha_present(page):
        _login_state = {"status": "login_failed", "message": "CAPTCHA/Sicherheitsabfrage erkannt. Bitte über den interaktiven Browser anmelden."}
        await _save_debug(page, "login_captcha")
        return

    # --- Step 2: Enter password ---
    _login_state = {"status": "logging_in", "message": "Passwort wird eingegeben..."}
    pw_field = await _login_first_visible([
        page.get_by_label("Passwort"),
        page.get_by_label("Password"),
        page.locator("#ap_password"),
        page.locator("input[name='password']"),
        page.locator("input[type='password']"),
    ])
    if not pw_field:
        # No password prompt: fall through to the checks below, which decide
        # whether the session is already authenticated.
        logger.info("Amazon Login: Kein Passwort-Feld sichtbar, prüfe ob bereits eingeloggt...")
        await _save_debug(page, "login_no_password_field")
    else:
        logger.info("Amazon Login: Passwort-Feld gefunden")
        await pw_field.fill(password)
        await _human_delay(0.5, 1.5)
        # Click sign-in button
        signin_btn = await _login_first_visible([
            page.get_by_role("button", name="Anmelden"),
            page.get_by_role("button", name="Sign in"),
            page.locator("#signInSubmit"),
            page.locator("#auth-signin-button"),
            page.locator("input[type='submit']"),
        ])
        if signin_btn:
            logger.info("Amazon Login: Anmelden-Button geklickt")
            await signin_btn.click()
            await page.wait_for_load_state("networkidle")
            await _human_delay(1.5, 3.0)
    await _save_debug(page, "login_after_password")
    # Check for CAPTCHA again
    if await _login_captcha_present(page):
        _login_state = {"status": "login_failed", "message": "CAPTCHA/Sicherheitsabfrage erkannt. Bitte über den interaktiven Browser anmelden."}
        await _save_debug(page, "login_captcha")
        return

    # --- Step 3: Handle 2FA/OTP ---
    otp_field = page.locator("#auth-mfa-otpcode, input[name='otpCode'], #ap_dcq_hint")
    if await otp_field.count() > 0:
        _login_state = {
            "status": "awaiting_otp",
            "message": "Bitte geben Sie den Bestätigungscode ein",
        }
        # This code runs inside a coroutine, so a running loop always exists.
        _otp_future = asyncio.get_running_loop().create_future()
        try:
            otp_code = await asyncio.wait_for(_otp_future, timeout=300)
        except asyncio.TimeoutError:
            _login_state = {"status": "login_failed", "message": "OTP-Zeitüberschreitung (5 Minuten)"}
            return
        finally:
            # Never leave a stale future behind for submit_otp().
            _otp_future = None
        _login_state = {"status": "logging_in", "message": "OTP wird eingegeben..."}
        for sel in ["#auth-mfa-otpcode", "input[name='otpCode']"]:
            field = page.locator(sel)
            if await field.count() > 0:
                await field.first.fill(otp_code)
                break
        for sel in ["#auth-signin-button", "input[type='submit']", "#submitButton"]:
            btn = page.locator(sel)
            if await btn.count() > 0:
                await btn.first.click()
                break
        await page.wait_for_load_state("networkidle")
        await _human_delay(1.5, 3.0)

    # --- Step 4: Handle device approval ---
    approval = page.locator("#auth-approve-form, .cvf-widget-form-approve")
    if await approval.count() > 0:
        _login_state = {
            "status": "awaiting_otp",
            "message": "Bitte bestätigen Sie die Anmeldung auf Ihrem Gerät",
        }
        # Poll every 2 s, at most 60 times, until the page leaves the sign-in
        # flow or the approval form disappears.
        for _ in range(60):
            await asyncio.sleep(2)
            url = page.url
            if ("signin" not in url and "/ap/" not in url) or domain + "/?ref" in url:
                break
            if await approval.count() == 0:
                break

    # --- Verify login success ---
    url = page.url
    is_login_page = "signin" in url or "/ap/" in url
    page_content = await page.content()
    content_len = len(page_content)
    is_error_page = "Suchen Sie etwas" in page_content or "Seite wurde nicht gefunden" in page_content
    is_order_page = "order-history" in url or "your-orders" in url or "Meine Bestellungen" in page_content
    is_success = not is_login_page and not is_error_page and domain in url and (is_order_page or content_len > 10000)
    logger.info(f"Amazon Login: URL={url}, is_login_page={is_login_page}, is_error_page={is_error_page}, is_order_page={is_order_page}, content_len={content_len}, success={is_success}")
    await _save_debug(page, "login_result")
    if is_success:
        _login_state = {"status": "logged_in", "message": "Erfolgreich angemeldet"}
        logger.info("Amazon-Login erfolgreich")
    else:
        error_el = page.locator("#auth-error-message-box, .a-alert-content")
        error_msg = ""
        if await error_el.count() > 0:
            error_msg = await error_el.first.inner_text()
        if is_error_page:
            error_msg = "Amazon hat den Zugriff blockiert (Fehlerseite). Bitte später erneut versuchen."
        _login_state = {
            "status": "login_failed",
            "message": f"Login fehlgeschlagen. {error_msg}".strip(),
        }
async def submit_otp(code: str) -> bool:
    """Hand an OTP code from the web UI to the waiting login flow.

    Returns True if a login was waiting for the code and received it,
    False if no unresolved OTP request exists.
    """
    pending = _otp_future
    if pending is None or pending.done():
        return False
    pending.set_result(code)
    return True
async def process_amazon() -> dict:
    """Fetch Amazon invoices and forward them by e-mail (public entry point).

    The run is skipped, with an explanatory "error" entry in the result,
    when another run already holds ``_process_lock`` or while the interactive
    login is active (processing would freeze the browser the user is using).
    Otherwise the work is done by _process_amazon_inner() under the lock.
    """

    def _skip(reason: str) -> dict:
        return {"processed": 0, "errors": 0, "error": reason}

    if _process_lock.locked():
        logger.info("Amazon-Import: Läuft bereits, überspringe")
        return _skip("Amazon-Abruf läuft bereits")

    if is_interactive_login_active():
        logger.info("Amazon-Import: Interaktiver Login läuft, überspringe")
        return _skip("Bitte zuerst den Login abschließen")

    async with _process_lock:
        outcome = await _process_amazon_inner()
    return outcome
async def _process_amazon_inner() -> dict:
    """Run one import: collect orders, forward invoices, log the outcome.

    Must be called with ``_process_lock`` held (see process_amazon).

    The interactive login page is taken out of ``_interactive_page`` for the
    duration of the run (``_processing_active`` signals that the session is
    still valid meanwhile) and is put back afterwards, whatever the outcome.

    Returns:
        A dict with "processed" and "errors" counts, plus "skipped" once
        processing started and "error" (a message) when the run was skipped
        or failed.
    """
    global _interactive_page, _processing_active, _login_state
    settings = await get_settings()
    if settings.get("amazon_enabled") != "true":
        return {"processed": 0, "errors": 0}
    # Check prerequisites
    if not settings.get("smtp_server") or not settings.get("import_email"):
        logger.warning("Amazon-Import: SMTP oder Import-Email nicht konfiguriert")
        return {"processed": 0, "errors": 0, "error": "SMTP/Import-Email nicht konfiguriert"}
    if not settings.get("amazon_email") or not settings.get("amazon_password"):
        logger.warning("Amazon-Import: Zugangsdaten nicht konfiguriert")
        return {"processed": 0, "errors": 0, "error": "Amazon-Zugangsdaten nicht konfiguriert"}
    # Without interactive login page, new pages can't authenticate (session bound to page)
    if _interactive_page is None:
        logger.info("Amazon-Import: Keine aktive Login-Session, überspringe (bitte zuerst manuell anmelden)")
        return {"processed": 0, "errors": 0, "error": "Bitte zuerst unter Plattformen bei Amazon anmelden"}

    domain = settings.get("amazon_domain", "amazon.de")
    since_str = settings.get("amazon_since_date", "")
    default_since = datetime.now() - timedelta(days=30)
    if since_str:
        try:
            since_date = datetime.strptime(since_str, "%Y-%m-%d")
        except ValueError:
            logger.warning(f"Amazon: Ungültiges Startdatum: {since_str}")
            since_date = default_since
    else:
        since_date = default_since
    logger.info(f"Amazon-Import gestartet: domain={domain}, seit={since_date.strftime('%Y-%m-%d')}")

    processed = 0
    skipped = 0
    errors = 0

    # Take ownership of the interactive login page (the session is bound to it).
    # The guard above guarantees it exists, and nothing awaits in between, so
    # no fallback that opens a fresh page is needed here.
    page = _interactive_page
    _interactive_page = None
    _processing_active = True  # Signal that session is still valid while processing
    # Requires the ``global`` declaration above; without it this assignment
    # would only create a local variable and leave the module state untouched.
    _login_state = {"status": "idle", "message": ""}
    logger.info("Amazon: Verwende interaktive Login-Page für Abruf")

    smtp_conn = None
    try:
        logger.info("Amazon: SMTP-Verbindung wird hergestellt...")
        smtp_conn = _connect_smtp(settings)
        logger.info("Amazon: SMTP-Verbindung OK, verarbeite Bestellungen seitenweise...")
        import_email = settings.get("import_email_eingang") or settings.get("import_email", "")
        # Process orders PAGE BY PAGE (collect + process on same page so buttons are visible)
        result = await _collect_and_process_orders(
            page, domain, since_date, smtp_conn, settings, import_email
        )
        if result is None:
            # None means the page was not (or no longer) authenticated.
            error_detail = "Amazon-Sitzung abgelaufen. Bitte manuell unter Plattformen neu anmelden."
            logger.warning(f"Amazon-Import: {error_detail}")
            await add_log_entry(
                email_subject="Amazon-Import",
                email_from="Amazon",
                attachments_count=0,
                status="error",
                error_message=error_detail,
            )
            return {"processed": 0, "errors": 0, "error": error_detail}
        processed, skipped, errors = result["processed"], result["skipped"], result["errors"]
        # Update last sync date
        await save_settings({"amazon_last_sync": datetime.now().strftime("%Y-%m-%d %H:%M")})
        # Log summary if nothing was processed
        if processed == 0 and errors == 0:
            if skipped > 0:
                summary = f"Alle Rechnungen bereits importiert ({skipped} übersprungen)"
            else:
                summary = "Keine neuen Rechnungen gefunden"
            await add_log_entry(
                email_subject="Amazon-Import (Zusammenfassung)",
                email_from=f"Amazon ({domain})",
                attachments_count=0,
                status="success",
                error_message=summary,
                sent_to="",
            )
    except Exception as e:
        logger.error(f"Amazon-Import Fehler: {e}")
        await add_log_entry(
            email_subject="Amazon-Import",
            email_from=f"Amazon ({domain})",
            attachments_count=0,
            status="error",
            error_message=str(e),
        )
        return {"processed": processed, "skipped": skipped, "errors": errors + 1, "error": str(e)}
    finally:
        _processing_active = False
        # Keep page alive for next run instead of closing it (preserves session)
        _interactive_page = page
        logger.info("Amazon: Page zurück in Session-Pool (Session bleibt erhalten)")
        if smtp_conn:
            try:
                smtp_conn.quit()
            except Exception:
                # Best effort: the connection may already be closed.
                pass
    logger.info(f"Amazon-Import fertig: {processed} verarbeitet, {skipped} übersprungen, {errors} Fehler")
    return {"processed": processed, "skipped": skipped, "errors": errors}
async def _collect_and_process_orders(page, domain, since_date, smtp_conn, settings, import_email) -> dict | None:
    """Collect orders AND forward their invoices, one result page at a time.

    Each page's orders are processed before navigating to the next page, so
    the invoice buttons are still visible when they are needed.

    An order is marked as downloaded only when every one of its invoices was
    forwarded without error; otherwise it is picked up again on the next run.

    Args:
        page: Playwright page, expected to be logged in to Amazon.
        domain: Amazon domain, used for labels and passed to the downloader.
        since_date: Oldest order date of interest; drives the time filter.
        smtp_conn: Open SMTP connection used for forwarding.
        settings: Settings mapping ("smtp_username", "debug_save_amazon_pdfs").
        import_email: Recipient address for the forwarded invoices.

    Returns:
        None if the session is invalid (login page, or the order page cannot
        be reached), otherwise a dict with "processed", "skipped" and
        "errors" counts.
    """
    processed = 0
    skipped = 0
    errors = 0

    # Navigate to orders page if needed
    actual_url = page.url
    if "order-history" not in actual_url and "your-orders" not in actual_url:
        if "signin" in actual_url or "/ap/" in actual_url:
            return None
        logger.info("Amazon: Nicht auf Bestellseite, versuche Navigation über Link...")
        orders_link = page.locator("a[href*='order-history'], a[href*='your-orders']")
        if await orders_link.count() > 0:
            await orders_link.first.click()
            await asyncio.sleep(3)
            try:
                await page.wait_for_load_state("networkidle", timeout=15000)
            except Exception:
                # A page that never goes idle is fine; the URL check decides.
                pass
        actual_url = page.url
        if "order-history" not in actual_url and "your-orders" not in actual_url:
            return None

    # Try to set time filter
    days_back = (datetime.now() - since_date).days
    if days_back <= 30:
        desired_filter = "last30"
    elif days_back <= 90:
        desired_filter = "months-3"
    else:
        # NOTE(review): this selects only the calendar year of since_date. If
        # since_date lies in an earlier year, orders from later years do not
        # appear to be covered by this filter - confirm the intended behaviour.
        desired_filter = f"year-{since_date.year}"
    logger.info(f"Amazon: Setze Zeitfilter: {desired_filter}")
    try:
        filter_dropdown = page.locator("select[name='orderFilter'], select#orderFilter, select#time-filter")
        if await filter_dropdown.count() > 0:
            await filter_dropdown.first.select_option(desired_filter)
            await asyncio.sleep(3)
            try:
                await page.wait_for_load_state("networkidle", timeout=15000)
            except Exception:
                pass
        else:
            logger.info("Amazon: Kein Filter-Dropdown gefunden, verwende aktuelle Ansicht")
    except Exception as e:
        logger.warning(f"Amazon: Filter setzen fehlgeschlagen: {e}")
    await asyncio.sleep(2)

    seen_ids = set()
    page_num = 1
    total_orders = 0
    while True:
        logger.info(f"Amazon: Verarbeite Seite {page_num}...")
        # Check for login redirect
        if "signin" in page.url or "/ap/" in page.url:
            if total_orders > 0:
                # Keep what was processed so far instead of reporting an invalid session.
                logger.warning(f"Amazon: Login-Redirect auf Seite {page_num}, breche ab")
                break
            return None

        # Extract orders from current page
        page_orders = await _extract_orders_from_page(page, since_date)
        new_orders = [o for o in page_orders if o["id"] not in seen_ids]
        seen_ids.update(o["id"] for o in new_orders)
        logger.info(f"Amazon: Seite {page_num}: {len(page_orders)} gefunden, {len(new_orders)} neu")
        total_orders += len(new_orders)

        # Process invoices for THIS page's orders immediately (buttons are visible now)
        for order in new_orders:
            order_id = order.get("id", "?")
            try:
                if await is_invoice_downloaded(order_id, order_id):
                    skipped += 1
                    logger.debug(f"Amazon: Bestellung {order_id} bereits importiert")
                    continue
                pdf_list = await _download_order_invoices(page, domain, order_id)
                if not pdf_list:
                    logger.debug(f"Amazon: Keine Rechnung für Bestellung {order_id}")
                    continue

                order_failed = False
                for inv_idx, pdf_bytes in enumerate(pdf_list):
                    suffix = f"_{inv_idx+1}" if len(pdf_list) > 1 else ""
                    try:
                        filename = f"Amazon_Rechnung_{order_id}{suffix}.pdf"
                        if settings.get("debug_save_amazon_pdfs") == "true":
                            try:
                                tmp_dir = Path(os.environ.get("UPLOAD_DIR", "/data/uploads")) / "amazon_invoices"
                                tmp_dir.mkdir(parents=True, exist_ok=True)
                                (tmp_dir / filename).write_bytes(pdf_bytes)
                                logger.info(f"Amazon: Debug-PDF gespeichert: {tmp_dir / filename} ({len(pdf_bytes)} Bytes)")
                            except Exception as e:
                                logger.warning(f"Amazon: Debug-PDF speichern fehlgeschlagen: {e}")
                        forward_msg = _build_forward_email(
                            from_addr=settings.get("smtp_username", ""),
                            to_addr=import_email,
                            original_subject=f"Amazon Rechnung - Bestellung {order_id}{suffix}",
                            original_from=f"Amazon ({domain})",
                            attachments=[(filename, pdf_bytes)],
                        )
                        smtp_log = _send_with_log(smtp_conn, forward_msg)
                        processed += 1
                        logger.info(f"Amazon: Rechnung {inv_idx+1}/{len(pdf_list)} für {order_id} gesendet")
                        await add_log_entry(
                            email_subject=f"Amazon Rechnung - {order_id}{suffix}",
                            email_from=f"Amazon ({domain})",
                            attachments_count=1,
                            status="success",
                            sent_to=import_email,
                            smtp_log=smtp_log,
                        )
                    except Exception as e:
                        errors += 1
                        order_failed = True
                        logger.error(f"Amazon: Fehler bei Rechnung {inv_idx+1} für {order_id}: {e}")
                        await add_log_entry(
                            email_subject=f"Amazon Rechnung - {order_id}{suffix}",
                            email_from=f"Amazon ({domain})",
                            attachments_count=0,
                            status="error",
                            error_message=str(e),
                        )

                # Marking a partially failed order would make every later run
                # skip it, silently losing the invoices that were not sent.
                if not order_failed:
                    await mark_invoice_downloaded(order_id, order_id)
                await _human_delay(2.0, 4.0)
            except Exception as e:
                errors += 1
                logger.error(f"Amazon: Fehler bei Bestellung {order_id}: {e}")
                await add_log_entry(
                    email_subject=f"Amazon Rechnung - {order_id}",
                    email_from=f"Amazon ({domain})",
                    attachments_count=0,
                    status="error",
                    error_message=str(e),
                )

        # Navigate to next page
        has_next = await page.evaluate("""() => {
            const nextLink = document.querySelector('.a-pagination .a-last:not(.a-disabled) a');
            if (nextLink) {
                nextLink.scrollIntoView({behavior: 'smooth', block: 'center'});
                return true;
            }
            return false;
        }""")
        # An empty page ends the loop even if a "next" link is offered.
        if has_next and page_orders:
            logger.info("Amazon: Klicke auf nächste Seite (JS)...")
            await asyncio.sleep(0.5)
            await page.evaluate("""() => {
                const nextLink = document.querySelector('.a-pagination .a-last:not(.a-disabled) a');
                if (nextLink) nextLink.click();
            }""")
            await asyncio.sleep(3)
            try:
                await page.wait_for_load_state("networkidle", timeout=30000)
            except Exception:
                pass
            page_num += 1
            await _human_delay(1.0, 2.0)
        else:
            break

    logger.info(f"Amazon: Gesamt {total_orders} Bestellungen auf {page_num} Seite(n)")
    return {"processed": processed, "skipped": skipped, "errors": errors}
def _collect_orders_filters(since_date: datetime, now: datetime) -> list[str]:
    """Return the order-history time-filter values covering since_date..now, newest first."""
    days_back = (now - since_date).days
    if days_back <= 30:
        return ["last30"]
    if days_back <= 90:
        return ["months-3"]
    # A "year-YYYY" filter only lists orders of that calendar year, so a range
    # that crosses New Year needs one filter per year.
    return [f"year-{year}" for year in range(now.year, since_date.year - 1, -1)]


async def _collect_orders_apply_filter(page, desired_filter: str) -> bool:
    """Select ``desired_filter`` in the time-filter dropdown; return True if it was applied."""
    logger.info(f"Amazon: Setze Zeitfilter: {desired_filter}")
    try:
        filter_dropdown = page.locator("select[name='orderFilter'], select#orderFilter, select#time-filter")
        if await filter_dropdown.count() == 0:
            logger.info("Amazon: Kein Filter-Dropdown gefunden, verwende aktuelle Ansicht")
            return False
        await filter_dropdown.first.select_option(desired_filter)
        await asyncio.sleep(3)
        try:
            await page.wait_for_load_state("networkidle", timeout=15000)
        except Exception as e:
            # Best effort: a busy page may never become idle, continue anyway.
            logger.debug(f"Amazon: networkidle nach Filterwechsel nicht erreicht: {e}")
        logger.info(f"Amazon: Zeitfilter '{desired_filter}' gesetzt")
        return True
    except Exception as e:
        logger.warning(f"Amazon: Filter setzen fehlgeschlagen: {e}")
        return False


async def _collect_orders_paginate(page, since_date: datetime, orders: list[dict], seen_ids: set) -> bool:
    """Walk all result pages of the current view and append unseen orders to ``orders``.

    ``orders`` and ``seen_ids`` are updated in place. Returns False when the
    page turned into a login page (the caller decides what to return), True
    when pagination finished normally.
    """
    page_num = 1
    while True:
        logger.info(f"Amazon: Verarbeite Seite {page_num}...")
        # Check for login redirect
        if "signin" in page.url or "/ap/" in page.url:
            if orders:
                logger.warning(f"Amazon: Login-Redirect auf Seite {page_num}, verwende {len(orders)} bereits gefundene Bestellung(en)")
            else:
                logger.error("Amazon: Session ungültig!")
                await _save_debug(page, "orders_redirect_login")
            return False

        page_orders = await _extract_orders_from_page(page, since_date)
        new_orders = [o for o in page_orders if o["id"] not in seen_ids]
        seen_ids.update(o["id"] for o in new_orders)
        logger.info(f"Amazon: Seite {page_num}: {len(page_orders)} gefunden, {len(new_orders)} neu")
        orders.extend(new_orders)

        # Locate the "next" link via JavaScript (avoids Playwright visibility issues)
        has_next = await page.evaluate("""() => {
            const nextLink = document.querySelector('.a-pagination .a-last:not(.a-disabled) a');
            if (nextLink) {
                nextLink.scrollIntoView({behavior: 'smooth', block: 'center'});
                return true;
            }
            return false;
        }""")
        # An empty page means nothing (more) matched since_date, so stop paging.
        if not (has_next and page_orders):
            return True

        logger.info("Amazon: Klicke auf nächste Seite (JS)...")
        await asyncio.sleep(0.5)  # Wait for scroll
        # Use JavaScript click to bypass Playwright visibility checks
        await page.evaluate("""() => {
            const nextLink = document.querySelector('.a-pagination .a-last:not(.a-disabled) a');
            if (nextLink) nextLink.click();
        }""")
        await asyncio.sleep(3)
        try:
            await page.wait_for_load_state("networkidle", timeout=30000)
        except Exception as e:
            logger.debug(f"Amazon: networkidle nach Seitenwechsel nicht erreicht: {e}")
        page_num += 1
        await _human_delay(1.0, 2.0)


async def _collect_orders(page, domain: str, since_date: datetime) -> list[dict] | None:
    """Collect orders from the Amazon order history without using page.goto.

    The page is expected to already show Amazon (normally the orders page
    after the interactive login); navigation happens only via link clicks and
    the time-filter dropdown so the session is not lost.

    Args:
        page: Playwright page with the logged-in Amazon session.
        domain: Amazon domain; currently not used by this function.
        since_date: Orders with a parsed date older than this are ignored.

    Returns:
        A list of ``{"id": ..., "date": ...}`` dicts (possibly empty), or
        ``None`` when the session is invalid or the orders page could not be
        reached. If a login redirect appears after some orders were already
        collected, those orders are returned.
    """
    orders: list[dict] = []
    actual_url = page.url
    logger.info(f"Amazon: Aktuelle Seite: {actual_url}")

    # Check if we're on the orders page or need to navigate there
    if "order-history" not in actual_url and "your-orders" not in actual_url:
        if "signin" in actual_url or "/ap/" in actual_url:
            logger.error("Amazon: Seite ist Login-Seite - Session ungültig!")
            await _save_debug(page, "orders_not_on_orders_page")
            return None
        # Try clicking the orders link within Amazon's SPA
        logger.info("Amazon: Nicht auf Bestellseite, versuche Navigation über Link...")
        orders_link = page.locator("a[href*='order-history'], a[href*='your-orders'], a:has-text('Bestellungen'), a:has-text('Meine Bestellungen')")
        if await orders_link.count() > 0:
            await orders_link.first.click()
            await asyncio.sleep(3)
            try:
                await page.wait_for_load_state("networkidle", timeout=15000)
            except Exception as e:
                logger.debug(f"Amazon: networkidle nach Navigation nicht erreicht: {e}")
        actual_url = page.url
        if "order-history" not in actual_url and "your-orders" not in actual_url:
            logger.error(f"Amazon: Konnte nicht zur Bestellseite navigieren. URL: {actual_url}")
            await _save_debug(page, "orders_navigation_failed")
            return None

    seen_ids: set = set()
    filters = _collect_orders_filters(since_date, datetime.now())
    for index, desired_filter in enumerate(filters):
        applied = await _collect_orders_apply_filter(page, desired_filter)
        if index > 0 and not applied:
            # The view still shows the previous filter; re-scanning it is pointless.
            logger.warning(f"Amazon: Zeitfilter '{desired_filter}' nicht anwendbar, überspringe")
            continue
        # Wait for content to load
        await asyncio.sleep(2)
        if not await _collect_orders_paginate(page, since_date, orders, seen_ids):
            return orders or None

    if not orders:
        logger.warning("Amazon: Keine Bestellungen gefunden!")
        await _save_debug(page, "no_orders_found")
    return orders
async def _extract_orders_from_page(page, since_date: datetime) -> list[dict]:
    """Extract the orders shown on the current order-history page.

    Visible order cards are read via JavaScript; if none are found, every
    order-ID-shaped string in the page text is used instead (without dates,
    so no date filtering is possible in that case).

    Args:
        page: Playwright page currently showing an order list.
        since_date: Orders whose parsed date is older than this are dropped.
            Orders without a parseable date are kept.

    Returns:
        A list of ``{"id": str, "date": datetime | None}`` dicts in page order.
        IDs starting with ``000-`` are ignored.
    """
    title = await page.title()
    logger.info(f"Amazon: Seite analysieren: Titel='{title}', URL={page.url}")
    await _save_debug(page, "order_page")

    # Use JavaScript to extract only VISIBLE order cards (Amazon loads all in DOM, shows ~10 per page).
    # Raw string: the JS regex contains "\d", which is an invalid escape in a normal Python string.
    visible_orders = await page.evaluate(r"""() => {
        const results = [];
        // Try multiple selectors
        const selectors = [
            '.order-card.js-order-card',
            '.order-card',
            '.order-info',
            '.a-box-group.order',
            '.order',
        ];
        const seen = new Set();
        for (const sel of selectors) {
            for (const el of document.querySelectorAll(sel)) {
                // Only process visible elements (offsetParent !== null or check display)
                if (el.offsetParent === null && getComputedStyle(el).position !== 'fixed') continue;
                const text = el.innerText || '';
                const idMatch = text.match(/(\d{3}-\d{7}-\d{7})/);
                if (idMatch && !seen.has(idMatch[1])) {
                    seen.add(idMatch[1]);
                    results.push({id: idMatch[1], text: text.substring(0, 500)});
                }
            }
            if (results.length > 0) break;
        }
        return results;
    }""")
    logger.info(f"Amazon: Sichtbare Order-Cards gefunden: {len(visible_orders)}")

    if not visible_orders:
        # Last resort: regex fallback on visible page text
        visible_text = await page.evaluate("() => document.body.innerText")
        order_ids = re.findall(r"\b(\d{3}-\d{7}-\d{7})\b", visible_text)
        # dict.fromkeys de-duplicates while keeping the order of appearance on the page.
        unique_ids = [oid for oid in dict.fromkeys(order_ids) if not oid.startswith("000-")]
        logger.info(f"Amazon: Keine Order-Cards, Fallback-Regex: {len(unique_ids)} Bestell-ID(s) im sichtbaren Text")
        if not unique_ids:
            logger.warning(f"Amazon: Seite hat keine Bestell-IDs. Titel: '{title}', URL: {page.url}")
        return [{"id": oid, "date": None} for oid in unique_ids]

    orders = []
    for card in visible_orders:
        order_id = card["id"]
        if order_id.startswith("000-"):
            continue
        order_date = _parse_german_date(card["text"])
        if order_date and order_date < since_date:
            logger.debug(f"Amazon: Bestellung {order_id} übersprungen (Datum {order_date.strftime('%Y-%m-%d')} < {since_date.strftime('%Y-%m-%d')})")
            continue
        logger.debug(f"Amazon: Bestellung gefunden: {order_id}, Datum: {order_date}")
        orders.append({"id": order_id, "date": order_date})
    return orders
def _parse_german_date(text: str) -> datetime | None:
"""Parse German date formats from order text."""
months_de = {
"Januar": 1, "Februar": 2, "März": 3, "April": 4,
"Mai": 5, "Juni": 6, "Juli": 7, "August": 8,
"September": 9, "Oktober": 10, "November": 11, "Dezember": 12,
}
pattern = r"(\d{1,2})\.\s*(" + "|".join(months_de.keys()) + r")\s+(\d{4})"
match = re.search(pattern, text)
if match:
day = int(match.group(1))
month = months_de[match.group(2)]
year = int(match.group(3))
try:
return datetime(year, month, day)
except ValueError:
pass
match = re.search(r"(\d{2})\.(\d{2})\.(\d{4})", text)
if match:
try:
return datetime(int(match.group(3)), int(match.group(2)), int(match.group(1)))
except ValueError:
pass
return None
async def _close_all_popovers(page):
    """Close all open Amazon popovers (best effort, never raises).

    Three mechanisms are tried in the page: clicking the popovers' close
    buttons, Amazon's own ``A.popover.close`` API when ``window.P`` exists,
    and finally a click on the document body.

    IMPORTANT: Do NOT set display:none - Amazon recycles popover containers,
    so hiding them prevents future popovers from appearing.
    """
    try:
        await page.evaluate("""() => {
            // Close via close buttons
            document.querySelectorAll('.a-popover-footer button, .a-popover .a-button-close, .a-popover-close').forEach(b => {
                try { b.click(); } catch(e) {}
            });
            // Use Amazon's own popover API to close if available
            if (window.P && window.P.when) {
                try {
                    window.P.when('A').execute(function(A) {
                        if (A && A.popover) {
                            document.querySelectorAll('.a-popover:not(.a-popover-hidden)').forEach(p => {
                                const id = p.getAttribute('data-a-popover-id');
                                if (id) try { A.popover.close(id); } catch(e) {}
                            });
                        }
                    });
                } catch(e) {}
            }
            // Click outside to dismiss any remaining popovers
            document.body.click();
        }""")
        # Give the page a moment to process the close actions.
        await asyncio.sleep(0.5)
    except Exception as e:
        # Closing popovers is optional; record the failure instead of hiding it completely.
        logger.debug(f"Amazon: Popover schließen fehlgeschlagen: {e}")
# Runs in the page: finds the invoice popover URL of the order card containing
# `orderId`, loads it via XMLHttpRequest and returns the download links found.
_INVOICE_LINKS_JS = """async (orderId) => {
    // Find the order card containing this order ID
    const cards = document.querySelectorAll('.order-card, .order-info, .a-box-group, div');
    let popoverUrl = null;
    for (const card of cards) {
        if (!card.innerText || !card.innerText.includes(orderId)) continue;
        // Find the popover trigger with invoice URL
        const triggers = card.querySelectorAll('[data-a-popover*="invoice"]');
        for (const trigger of triggers) {
            try {
                const config = JSON.parse(trigger.getAttribute('data-a-popover'));
                if (config && config.url && config.url.includes(orderId)) {
                    popoverUrl = config.url;
                    break;
                }
            } catch(e) {}
        }
        if (popoverUrl) break;
    }
    if (!popoverUrl) {
        return { found: false, error: 'Kein Popover-URL gefunden' };
    }
    // Step 2: Make XMLHttpRequest with the headers of an in-page AJAX call
    try {
        const response = await new Promise((resolve, reject) => {
            const xhr = new XMLHttpRequest();
            xhr.open('GET', popoverUrl, true);
            xhr.setRequestHeader('X-Requested-With', 'XMLHttpRequest');
            xhr.setRequestHeader('Accept', 'text/html,*/*');
            xhr.onload = function() {
                resolve({ status: xhr.status, html: xhr.responseText });
            };
            xhr.onerror = function() {
                reject(new Error('XHR failed'));
            };
            xhr.send();
        });
        if (response.status !== 200) {
            return { found: false, error: 'HTTP ' + response.status, url: popoverUrl };
        }
        const html = response.html;
        // Check if response is a login page
        if (html.includes('ap_signin') || html.includes('ap_error') || html.includes('/ap/')) {
            return { found: false, error: 'Login-Seite erhalten', url: popoverUrl, isLogin: true };
        }
        // Extract PDF download links from the response HTML
        const parser = new DOMParser();
        const doc = parser.parseFromString(html, 'text/html');
        const links = doc.querySelectorAll('a[href]');
        const pdfLinks = [];
        for (const link of links) {
            const href = link.getAttribute('href') || '';
            const text = (link.innerText || '').trim();
            if (href.includes('/ap/') || href.includes('openid')) continue;
            if (href.includes('contact.html') || href.includes('help/contact')) continue;
            if (text.toLowerCase().includes('anfordern')) continue;
            if (
                href.includes('.pdf') ||
                href.includes('documents/download') ||
                href.includes('/document/') ||
                href.includes('invoice/download') ||
                href.includes('generated_invoices')
            ) {
                pdfLinks.push({ href: href, text: text.substring(0, 100) });
            }
        }
        return { found: true, url: popoverUrl, links: pdfLinks, htmlSize: html.length };
    } catch(e) {
        return { found: false, error: e.message, url: popoverUrl };
    }
}"""

# Runs in the page: downloads `url` as binary and returns it base64-encoded.
_FETCH_PDF_JS = """async (url) => {
    try {
        const resp = await new Promise((resolve, reject) => {
            const xhr = new XMLHttpRequest();
            xhr.open('GET', url, true);
            xhr.responseType = 'arraybuffer';
            xhr.onload = function() {
                const bytes = new Uint8Array(xhr.response);
                let binary = '';
                for (let i = 0; i < bytes.length; i++) {
                    binary += String.fromCharCode(bytes[i]);
                }
                resolve({
                    ok: xhr.status === 200,
                    status: xhr.status,
                    data: btoa(binary),
                    size: bytes.length,
                    contentType: xhr.getResponseHeader('content-type') || ''
                });
            };
            xhr.onerror = function() { reject(new Error('XHR failed')); };
            xhr.send();
        });
        return resp;
    } catch(e) {
        return { ok: false, error: e.message };
    }
}"""


async def _download_order_invoices(page, domain: str, order_id: str) -> list[bytes]:
    """Download the invoice PDFs of one order from the current orders page.

    Strategy: read the popover AJAX URL from the order card's
    ``data-a-popover`` attribute, load that popover HTML with an in-page
    XMLHttpRequest (``X-Requested-With`` / ``Accept`` headers), collect the
    download links from it and fetch each one in the page as base64.

    The order ID and the link URLs are handed to the page as ``evaluate``
    arguments rather than being pasted into the script source, so quotes or
    backslashes in a scraped URL cannot break or alter the JavaScript.

    Args:
        page: Playwright page showing the order list that contains ``order_id``.
        domain: Amazon domain; currently not used by this function.
        order_id: Amazon order ID to look up.

    Returns:
        The PDF contents, one ``bytes`` object per distinct download link.
        Empty if no links were found, the session expired, or no download
        produced a PDF.
    """
    import base64
    from urllib.parse import urlparse

    pdfs: list[bytes] = []
    logger.info(f"Amazon: Hole Rechnungs-Links für {order_id}")

    # Step 1+2: Extract the popover AJAX URL and the download links it offers
    invoice_result = await page.evaluate(_INVOICE_LINKS_JS, order_id)
    logger.info(f"Amazon: Invoice-Ergebnis für {order_id}: found={invoice_result.get('found')}, "
                f"links={invoice_result.get('links', [])}, error={invoice_result.get('error', '')}")
    if not invoice_result.get("found") or not invoice_result.get("links"):
        if invoice_result.get("isLogin"):
            logger.warning(f"Amazon: Session abgelaufen beim Rechnungsabruf für {order_id}")
        return pdfs

    # Step 3: Download each PDF via XMLHttpRequest as base64
    fetched: set[str] = set()
    for link_info in invoice_result["links"]:
        href = link_info["href"]
        text = link_info.get("text", "")
        # Reduce every link to a path on the current origin (the XHR runs inside the page).
        if href.startswith("/"):
            fetch_href = href
        elif href.startswith("http"):
            parsed = urlparse(href)
            fetch_href = parsed.path + ("?" + parsed.query if parsed.query else "")
        else:
            fetch_href = "/" + href
        # The same document linked twice must not be downloaded twice.
        if fetch_href in fetched:
            continue
        fetched.add(fetch_href)

        logger.info(f"Amazon: Lade PDF '{text}' -> {fetch_href[:100]}")
        try:
            pdf_result = await page.evaluate(_FETCH_PDF_JS, fetch_href)
            if pdf_result and pdf_result.get("ok") and pdf_result.get("size", 0) > 500:
                pdf_bytes = base64.b64decode(pdf_result["data"])
                content_type = pdf_result.get("contentType", "")
                if pdf_bytes[:5] == b"%PDF-" or "pdf" in content_type.lower():
                    logger.info(f"Amazon: PDF heruntergeladen für {order_id}: {len(pdf_bytes)} Bytes")
                    pdfs.append(pdf_bytes)
                else:
                    logger.debug(f"Amazon: Download kein PDF für {order_id} (type: {content_type}, size: {len(pdf_bytes)})")
            elif pdf_result:
                reason = pdf_result.get("error", "status=" + str(pdf_result.get("status")))
                logger.debug(f"Amazon: PDF-Download fehlgeschlagen für {order_id}: {reason}")
        except Exception as e:
            logger.warning(f"Amazon: PDF-Download Exception für {order_id}: {e}")

    if not pdfs:
        logger.info(f"Amazon: Keine PDFs für {order_id}")
    return pdfs