"""Amazon Business API client using SP-API (Reconciliation + Document API).

This module provides API-based invoice retrieval as an alternative to browser
automation. Uses OAuth2 with LWA (Login with Amazon) for authentication.

Document API workflow (EU):
1. POST /reports/.../reports → reportId
2. GET /reports/.../reports/{reportId} → poll until DONE → reportDocumentId
3. GET /reports/.../documents/{reportDocumentId} → presigned URL
4. Download + decompress (gzip then zip) → PDF
"""
import asyncio
import gzip
import io
import logging
import urllib.parse
import zipfile
from datetime import datetime, timedelta, timezone
from pathlib import Path

import httpx

from app.database import (
    get_settings,
    save_settings,
    add_log_entry,
    is_invoice_downloaded,
    mark_invoice_downloaded,
)
from app.mail_processor import _connect_smtp, _build_forward_email, _send_with_log

logger = logging.getLogger(__name__)

# Amazon LWA (Login with Amazon) endpoints
LWA_TOKEN_URL = "https://api.amazon.com/auth/o2/token"

# Amazon Business OAuth consent URLs per domain (NOT sellercentral!)
AB_OAUTH_URLS = {
    "amazon.de": "https://www.amazon.de/b2b/abws/oauth",
    "amazon.at": "https://www.amazon.de/b2b/abws/oauth",  # AT uses DE
    "amazon.fr": "https://www.amazon.fr/b2b/abws/oauth",
    "amazon.it": "https://www.amazon.it/b2b/abws/oauth",
    "amazon.es": "https://www.amazon.es/b2b/abws/oauth",
    "amazon.co.uk": "https://www.amazon.co.uk/b2b/abws/oauth",
    "amazon.com": "https://www.amazon.com/b2b/abws/oauth",
}

# Amazon Business API endpoints per region
AB_API_ENDPOINTS = {
    "eu": "https://eu.business-api.amazon.com",
    "na": "https://na.business-api.amazon.com",
}

# API versions
RECONCILIATION_VERSION = "2021-01-08"
REPORTS_VERSION = "2021-09-30"

# Domain to region mapping
DOMAIN_REGION = {
    "amazon.de": "eu",
    "amazon.at": "eu",
    "amazon.fr": "eu",
    "amazon.it": "eu",
    "amazon.es": "eu",
    "amazon.co.uk": "eu",
    "amazon.com": "na",
}

# Domain to marketplace ID
DOMAIN_MARKETPLACE = {
    "amazon.de": "A1PA6795UKMFR9",
    "amazon.at": "A2NODRKZP88ZB9",
    "amazon.fr": "A13V1IB3VIYZZH",
    "amazon.it": "APJ6JRA9NG5V4",
    "amazon.es": "A1RKKUPIHCS9HS",
    "amazon.co.uk": "A1F83G8C2ARO7P",
    "amazon.com": "ATVPDKIKX0DER",
}


def get_oauth_authorize_url(
    application_id: str,
    redirect_uri: str,
    domain: str = "amazon.de",
    state: str = "",
) -> str:
    """Generate the OAuth authorization URL for Amazon Business API consent.

    Args:
        application_id: Value sent as the ``applicationId`` query parameter.
        redirect_uri: Value sent as the ``redirect_uri`` query parameter.
        domain: Key into ``AB_OAUTH_URLS``; unknown domains fall back to the
            ``amazon.de`` consent URL.
        state: Value sent as ``state``; an empty value is replaced by ``"auth"``.

    Returns:
        The consent URL with URL-encoded query parameters.
    """
    base_url = AB_OAUTH_URLS.get(domain, AB_OAUTH_URLS["amazon.de"])
    params = {
        "applicationId": application_id,
        "state": state or "auth",
        "redirect_uri": redirect_uri,
    }
    return f"{base_url}?{urllib.parse.urlencode(params)}"


async def exchange_auth_code(code: str, client_id: str, client_secret: str, redirect_uri: str) -> dict:
    """Exchange authorization code for refresh token via LWA.

    Returns:
        The decoded JSON body of the token response on HTTP 200, otherwise a
        dict with a single ``"error"`` key describing the failed response.
    """
    async with httpx.AsyncClient() as client:
        resp = await client.post(LWA_TOKEN_URL, data={
            "grant_type": "authorization_code",
            "code": code,
            "client_id": client_id,
            "client_secret": client_secret,
            "redirect_uri": redirect_uri,
        })

    if resp.status_code != 200:
        logger.error(f"LWA Token-Exchange fehlgeschlagen: {resp.status_code} {resp.text}")
        return {"error": f"Token-Exchange fehlgeschlagen: {resp.status_code} - {resp.text}"}

    data = resp.json()
    logger.info("LWA Token-Exchange erfolgreich")
    return data


async def get_access_token(client_id: str, client_secret: str, refresh_token: str) -> str | None:
    """Get a fresh access token using the refresh token.

    Returns:
        The ``access_token`` field of the LWA response, or ``None`` when the
        response status is not 200 or the field is missing.
    """
    async with httpx.AsyncClient() as client:
        resp = await client.post(LWA_TOKEN_URL, data={
            "grant_type": "refresh_token",
            "refresh_token": refresh_token,
            "client_id": client_id,
            "client_secret": client_secret,
        })

    if resp.status_code != 200:
        logger.error(f"Access-Token-Refresh fehlgeschlagen: {resp.status_code} {resp.text}")
        return None

    data = resp.json()
    return data.get("access_token")


async def check_api_configured() -> dict:
    """Check if API credentials are configured and valid.

    Returns:
        A dict with boolean ``configured`` and ``authorized`` keys and, when
        not authorized, an ``error`` message.
    """
    settings = await get_settings()
    client_id = settings.get("amazon_client_id", "")
    client_secret = settings.get("amazon_client_secret", "")
    refresh_token = settings.get("amazon_refresh_token", "")

    if not client_id or not client_secret:
        return {"configured": False, "authorized": False, "error": "Client-ID oder Client-Secret fehlt"}

    if not refresh_token:
        return {"configured": True, "authorized": False, "error": "Noch nicht autorisiert (Refresh-Token fehlt)"}

    # Try to get an access token to verify credentials
    access_token = await get_access_token(client_id, client_secret, refresh_token)
    if not access_token:
        return {"configured": True, "authorized": False, "error": "Autorisierung abgelaufen - bitte erneut autorisieren"}

    return {"configured": True, "authorized": True}


async def _get_api_client(settings: dict) -> tuple[httpx.AsyncClient, str] | None:
    """Create an authenticated API client.

    Returns (client, region) or None when credentials are incomplete or no
    access token could be obtained. The caller is responsible for closing the
    returned client.
    """
    client_id = settings.get("amazon_client_id", "")
    client_secret = settings.get("amazon_client_secret", "")
    refresh_token = settings.get("amazon_refresh_token", "")

    if not all([client_id, client_secret, refresh_token]):
        return None

    access_token = await get_access_token(client_id, client_secret, refresh_token)
    if not access_token:
        return None

    domain = settings.get("amazon_domain", "amazon.de")
    region = DOMAIN_REGION.get(domain, "eu")

    client = httpx.AsyncClient(
        base_url=AB_API_ENDPOINTS.get(region, AB_API_ENDPOINTS["eu"]),
        headers={
            "x-amz-access-token": access_token,
            "Content-Type": "application/json",
            "user-agent": "Belegimport/1.0 (Language=Python/3.12)",
        },
        timeout=30.0,
    )
    return client, region


async def get_transactions(settings: dict, since_date: datetime) -> list[dict]:
    """Get transactions via Reconciliation API.

    Follows ``nextPageToken`` until the API stops returning one. On a non-200
    response or an exception, paging stops and the transactions collected so
    far are returned.

    Args:
        settings: Settings dict holding the Amazon credentials and domain.
        since_date: Start of the feed window; only its date part is used.

    Returns:
        The collected transaction dicts; empty when no API client could be
        created.
    """
    result = await _get_api_client(settings)
    if not result:
        return []

    client, _region = result
    transactions: list[dict] = []

    try:
        # feedEndDate must not exceed current UTC time.
        # datetime.utcnow() is deprecated since Python 3.12; the aware
        # equivalent formats to the same string.
        now_utc = datetime.now(timezone.utc)
        params = {
            "feedStartDate": since_date.strftime("%Y-%m-%dT00:00:00Z"),
            "feedEndDate": now_utc.strftime("%Y-%m-%dT%H:%M:%SZ"),
        }

        next_token = None
        page = 0
        while True:
            page += 1
            if next_token:
                params["nextPageToken"] = next_token

            logger.info(f"Amazon API: Reconciliation-Abfrage Seite {page}...")
            resp = await client.get(
                f"/reconciliation/{RECONCILIATION_VERSION}/transactions",
                params=params,
            )

            if resp.status_code != 200:
                logger.error(f"Amazon API: Reconciliation fehlgeschlagen: {resp.status_code} {resp.text}")
                break

            data = resp.json()
            page_transactions = data.get("transactions", [])
            transactions.extend(page_transactions)
            logger.info(f"Amazon API: Seite {page}: {len(page_transactions)} Transaktionen")

            next_token = data.get("nextPageToken")
            if not next_token:
                break
    except Exception as e:
        logger.error(f"Amazon API: Reconciliation-Fehler: {e}")
    finally:
        await client.aclose()

    logger.info(f"Amazon API: {len(transactions)} Transaktionen gesamt")
    return transactions


async def _create_invoice_report(client: httpx.AsyncClient, order_id: str, marketplace_id: str) -> str | None:
    """Step 1: Create a report request for invoice PDF.

    Returns the ``reportId`` from the response on HTTP 200/202, otherwise None.
    """
    body = {
        "reportType": "GET_AB_INVOICE_PDF",
        "marketplaceIds": [marketplace_id],
        "reportOptions": {
            "orderId": order_id,
            "documentType": "Invoice",
        },
    }

    try:
        resp = await client.post(f"/reports/{REPORTS_VERSION}/reports", json=body)
        if resp.status_code in (200, 202):
            data = resp.json()
            report_id = data.get("reportId")
            logger.info(f"Amazon API: Report erstellt für {order_id}: {report_id}")
            return report_id

        logger.warning(f"Amazon API: Report-Erstellung fehlgeschlagen für {order_id}: {resp.status_code} {resp.text}")
        return None
    except Exception as e:
        logger.error(f"Amazon API: Report-Erstellung Fehler: {e}")
        return None


async def _poll_report_status(
    client: httpx.AsyncClient,
    report_id: str,
    max_wait: int = 120,
    poll_interval: int = 15,
) -> str | None:
    """Step 2: Poll report status until DONE. Returns reportDocumentId.

    Args:
        client: Authenticated API client.
        report_id: Report to poll.
        max_wait: Polling budget in seconds.
        poll_interval: Seconds to wait between two polls (values below 1 are
            treated as 1).

    Returns:
        The ``reportDocumentId`` once the status is ``DONE``; None on a
        non-200 response, a ``CANCELLED``/``FATAL`` status, an exception, or
        when the budget is exhausted.
    """
    poll_interval = max(1, poll_interval)
    attempts = max_wait // poll_interval + 1

    for attempt in range(attempts):
        try:
            resp = await client.get(f"/reports/{REPORTS_VERSION}/reports/{report_id}")
            if resp.status_code != 200:
                logger.warning(f"Amazon API: Report-Status fehlgeschlagen: {resp.status_code}")
                return None

            data = resp.json()
            status = data.get("processingStatus", "")

            if status == "DONE":
                doc_id = data.get("reportDocumentId")
                logger.info(f"Amazon API: Report {report_id} fertig: documentId={doc_id}")
                return doc_id

            if status in ("CANCELLED", "FATAL"):
                logger.warning(f"Amazon API: Report {report_id} fehlgeschlagen: {status}")
                return None

            logger.debug(f"Amazon API: Report {report_id} Status: {status}, warte...")
            # No point in sleeping after the final poll: nothing is checked
            # afterwards, it would only delay the timeout result.
            if attempt < attempts - 1:
                await asyncio.sleep(poll_interval)
        except Exception as e:
            logger.error(f"Amazon API: Report-Status Fehler: {e}")
            return None

    logger.warning(f"Amazon API: Report {report_id} Timeout nach {max_wait}s")
    return None


def _unpack_pdf(content: bytes, compression: str) -> bytes | None:
    """Strip the optional gzip and zip layers and return the PDF bytes, or None."""
    # Decompress: EU documents are gzip-compressed, then the content is a zip file
    if compression == "GZIP" or content[:2] == b'\x1f\x8b':
        try:
            content = gzip.decompress(content)
        except Exception as exc:
            # Best effort: the payload might not be gzipped; continue with it as-is.
            logger.debug(f"Amazon API: GZIP-Dekomprimierung übersprungen: {exc}")

    # Check if it's a zip file containing the PDF
    if content[:2] == b'PK':
        try:
            with zipfile.ZipFile(io.BytesIO(content)) as zf:
                for name in zf.namelist():
                    if name.lower().endswith('.pdf'):
                        content = zf.read(name)
                        break
        except Exception as exc:
            # Best effort: might not be a zip; the PDF check below decides.
            logger.debug(f"Amazon API: ZIP-Entpacken übersprungen: {exc}")

    # Verify it's a PDF
    if content[:4] == b'%PDF':
        logger.info(f"Amazon API: PDF heruntergeladen: {len(content)} Bytes")
        return content

    logger.warning(f"Amazon API: Heruntergeladenes Dokument ist kein PDF (starts: {content[:20]})")
    return None


async def _download_report_document(client: httpx.AsyncClient, document_id: str) -> bytes | None:
    """Step 3: Get presigned URL and download + decompress the PDF.

    Returns the PDF bytes, or None when the document lookup or download fails
    or the payload is not a PDF after unpacking.
    """
    try:
        resp = await client.get(f"/reports/{REPORTS_VERSION}/documents/{document_id}")
        if resp.status_code != 200:
            logger.warning(f"Amazon API: Document-URL fehlgeschlagen: {resp.status_code}")
            return None

        data = resp.json()
        url = data.get("url", "")
        compression = data.get("compressionAlgorithm", "")

        if not url:
            logger.warning(f"Amazon API: Keine Download-URL für Document {document_id}")
            return None

        # Download the document (presigned S3 URL, expires in 5 min).
        # A separate client is used so the API auth headers are not sent along.
        async with httpx.AsyncClient(timeout=60.0) as dl_client:
            dl_resp = await dl_client.get(url)

        if dl_resp.status_code != 200:
            logger.warning(f"Amazon API: Document-Download fehlgeschlagen: {dl_resp.status_code}")
            return None

        return _unpack_pdf(dl_resp.content, compression)
    except Exception as e:
        logger.error(f"Amazon API: Document-Download Fehler: {e}")
        return None


async def download_invoice(settings: dict, order_id: str) -> bytes | None:
    """Download invoice PDF via Document API (3-step async process).

    Returns the PDF bytes, or None when no API client could be created or any
    of the three steps fails.
    """
    result = await _get_api_client(settings)
    if not result:
        return None

    client, _region = result
    domain = settings.get("amazon_domain", "amazon.de")
    marketplace_id = DOMAIN_MARKETPLACE.get(domain, DOMAIN_MARKETPLACE["amazon.de"])

    try:
        # Step 1: Create report
        report_id = await _create_invoice_report(client, order_id, marketplace_id)
        if not report_id:
            return None

        # Step 2: Poll until done
        document_id = await _poll_report_status(client, report_id)
        if not document_id:
            return None

        # Step 3: Download document
        return await _download_report_document(client, document_id)
    except Exception as e:
        logger.error(f"Amazon API: Invoice-Download-Fehler für {order_id}: {e}")
        return None
    finally:
        await client.aclose()


def _extract_orders(transactions: list[dict]) -> dict[str, dict]:
    """Collect unique orders (first occurrence wins) from reconciliation transactions."""
    orders: dict[str, dict] = {}
    for txn in transactions:
        line_items = txn.get("transactionLineItems") or []
        if line_items:
            order_ids = [item.get("orderId", "") for item in line_items]
        else:
            # Fallback: if no line items, use transaction-level orderId
            order_ids = [txn.get("orderId", "")]

        for oid in order_ids:
            if oid and oid not in orders:
                orders[oid] = {
                    "orderId": oid,
                    "invoiceNumber": txn.get("invoiceNumber", ""),
                    "transactionDate": txn.get("transactionDate", ""),
                }
    return orders


async def process_amazon_api() -> dict:
    """Process Amazon invoices via API (Reconciliation + Document API).

    Fetches transactions since the configured start date (default: 30 days
    back), downloads the invoice PDF for every order not yet marked as
    downloaded and forwards it by mail to the configured import address.

    Returns:
        A dict with ``processed``, ``skipped`` and ``errors`` counters and,
        for configuration/connection failures, an ``error`` message.
    """
    settings = await get_settings()

    if settings.get("amazon_enabled") != "true":
        return {"processed": 0, "skipped": 0, "errors": 0}

    # Check API credentials
    status = await check_api_configured()
    if not status.get("authorized"):
        error_msg = status.get("error", "API nicht konfiguriert")
        logger.warning(f"Amazon API: {error_msg}")
        return {"processed": 0, "skipped": 0, "errors": 0, "error": error_msg}

    domain = settings.get("amazon_domain", "amazon.de")

    # Determine date range
    since_str = settings.get("amazon_since_date", "")
    if since_str:
        try:
            since_date = datetime.strptime(since_str, "%Y-%m-%d")
        except ValueError:
            since_date = datetime.now() - timedelta(days=30)
    else:
        since_date = datetime.now() - timedelta(days=30)

    logger.info(f"Amazon API: Import gestartet: domain={domain}, seit={since_date.strftime('%Y-%m-%d')}")

    # Connect SMTP
    import_email = settings.get("import_email_eingang") or settings.get("import_email", "")
    if not import_email:
        error_msg = "Keine Import-Email für Eingangsbelege konfiguriert"
        logger.error(f"Amazon API: {error_msg}")
        await add_log_entry("Amazon-Import", f"Amazon ({domain})", 0, "error", error_msg, beleg_type="eingang")
        return {"processed": 0, "skipped": 0, "errors": 1, "error": error_msg}

    smtp = _connect_smtp(settings)
    if not smtp:
        error_msg = "SMTP-Verbindung fehlgeschlagen"
        logger.error(f"Amazon API: {error_msg}")
        await add_log_entry("Amazon-Import", f"Amazon ({domain})", 0, "error", error_msg, beleg_type="eingang")
        return {"processed": 0, "skipped": 0, "errors": 1, "error": error_msg}

    processed = 0
    skipped = 0
    errors = 0

    try:
        # Get transactions via Reconciliation API
        transactions = await get_transactions(settings, since_date)

        if not transactions:
            logger.info("Amazon API: Keine Transaktionen gefunden")
            await save_settings({"amazon_last_sync": datetime.now().strftime("%Y-%m-%d %H:%M")})
            await add_log_entry(
                "Amazon-Import (API)", f"Amazon ({domain})", 0, "success",
                "Keine neuen Rechnungen gefunden", beleg_type="eingang",
            )
            # The SMTP connection is closed by the finally clause below.
            return {"processed": 0, "skipped": 0, "errors": 0}

        # Extract unique orders with their line items
        orders = _extract_orders(transactions)
        logger.info(f"Amazon API: {len(orders)} eindeutige Bestellungen gefunden")

        for oid in orders:
            # Check if already downloaded
            if await is_invoice_downloaded(oid, oid):
                skipped += 1
                continue

            # Download invoice PDF
            pdf_data = await download_invoice(settings, oid)

            if pdf_data:
                # Save debug copy if enabled
                if settings.get("debug_save_amazon_pdfs") == "true":
                    debug_dir = Path("/data/uploads") / "amazon_invoices"
                    debug_dir.mkdir(parents=True, exist_ok=True)
                    debug_path = debug_dir / f"Amazon_Rechnung_{oid}.pdf"
                    debug_path.write_bytes(pdf_data)
                    logger.info(f"Amazon API: Debug-PDF gespeichert: {debug_path}")

                # Send via SMTP
                filename = f"Amazon_Rechnung_{oid}.pdf"
                subject = f"Amazon Rechnung - {oid}"
                from_addr = settings.get("smtp_username", "belegimport@local")

                msg = _build_forward_email(
                    from_addr=from_addr,
                    to_addr=import_email,
                    original_subject=subject,
                    original_from=f"Amazon ({domain})",
                    attachments=[(filename, pdf_data)],
                )
                smtp_log = _send_with_log(smtp, msg)

                await add_log_entry(
                    subject, f"Amazon ({domain})", 1, "success",
                    "", import_email, smtp_log, beleg_type="eingang",
                )
                await mark_invoice_downloaded(oid, oid)
                processed += 1
                logger.info(f"Amazon API: Rechnung für {oid} gesendet")
            else:
                # No invoice available for this order.
                # NOTE(review): download_invoice also returns None on token,
                # HTTP, and timeout failures, so a transient error permanently
                # marks the order as downloaded — confirm this is intended.
                await mark_invoice_downloaded(oid, oid)
                skipped += 1
                logger.debug(f"Amazon API: Keine Rechnung für {oid}")

    except Exception as e:
        logger.error(f"Amazon API: Import-Fehler: {e}", exc_info=True)
        errors += 1
        await add_log_entry(
            "Amazon-Import (API)", f"Amazon ({domain})", 0, "error",
            str(e), beleg_type="eingang",
        )
    finally:
        # Closing is best effort; the connection may already be gone.
        try:
            smtp.quit()
        except Exception:
            pass

    await save_settings({"amazon_last_sync": datetime.now().strftime("%Y-%m-%d %H:%M")})

    if processed > 0 or errors > 0:
        summary = f"{processed} verarbeitet, {skipped} übersprungen, {errors} Fehler"
        await add_log_entry(
            "Amazon-Import (API, Zusammenfassung)", f"Amazon ({domain})", processed,
            "success" if errors == 0 else "warning",
            summary, beleg_type="eingang",
        )

    logger.info(f"Amazon API: Import fertig: {processed} verarbeitet, {skipped} übersprungen, {errors} Fehler")
    return {"processed": processed, "skipped": skipped, "errors": errors}