# belege-import/app/amazon_api.py
#
# 516 lines
# 20 KiB
# Python

"""Amazon Business API client using SP-API (Reconciliation + Document API).
This module provides API-based invoice retrieval as an alternative to browser automation.
Uses OAuth2 with LWA (Login with Amazon) for authentication.
Document API workflow (EU):
1. POST /reports/.../reports → reportId
2. GET /reports/.../reports/{reportId} → poll until DONE → reportDocumentId
3. GET /reports/.../documents/{reportDocumentId} → presigned URL
4. Download + decompress (gzip then zip) → PDF
"""
import asyncio
import gzip
import io
import logging
import urllib.parse
import zipfile
from datetime import datetime, timedelta
from pathlib import Path
import httpx
from app.database import get_settings, save_settings, add_log_entry, is_invoice_downloaded, mark_invoice_downloaded
from app.mail_processor import _connect_smtp, _build_forward_email, _send_with_log
logger = logging.getLogger(__name__)
# Login with Amazon (LWA) token endpoint.
LWA_TOKEN_URL = "https://api.amazon.com/auth/o2/token"

# Amazon Business OAuth consent page per storefront domain
# (these are the Amazon Business URLs, NOT the Seller Central ones).
AB_OAUTH_URLS = {
    domain: f"https://www.{consent_host}/b2b/abws/oauth"
    for domain, consent_host in (
        ("amazon.de", "amazon.de"),
        ("amazon.at", "amazon.de"),  # AT uses the DE consent page
        ("amazon.fr", "amazon.fr"),
        ("amazon.it", "amazon.it"),
        ("amazon.es", "amazon.es"),
        ("amazon.co.uk", "amazon.co.uk"),
        ("amazon.com", "amazon.com"),
    )
}

# Amazon Business API base URL per region.
AB_API_ENDPOINTS = dict(
    eu="https://eu.business-api.amazon.com",
    na="https://na.business-api.amazon.com",
)

# API versions used in the request paths.
RECONCILIATION_VERSION = "2021-01-08"
REPORTS_VERSION = "2021-09-30"

# Storefront domain -> API region key (a key of AB_API_ENDPOINTS).
DOMAIN_REGION = dict.fromkeys(
    (
        "amazon.de",
        "amazon.at",
        "amazon.fr",
        "amazon.it",
        "amazon.es",
        "amazon.co.uk",
    ),
    "eu",
) | {"amazon.com": "na"}

# Storefront domain -> marketplace ID sent with report requests.
DOMAIN_MARKETPLACE = dict(
    [
        ("amazon.de", "A1PA6795UKMFR9"),
        ("amazon.at", "A2NODRKZP88ZB9"),
        ("amazon.fr", "A13V1IB3VIYZZH"),
        ("amazon.it", "APJ6JRA9NG5V4"),
        ("amazon.es", "A1RKKUPIHCS9HS"),
        ("amazon.co.uk", "A1F83G8C2ARO7P"),
        ("amazon.com", "ATVPDKIKX0DER"),
    ]
)
def get_oauth_authorize_url(application_id: str, redirect_uri: str, domain: str = "amazon.de", state: str = "") -> str:
    """Build the Amazon Business OAuth consent URL for the given storefront.

    Args:
        application_id: Sent as the ``applicationId`` query parameter.
        redirect_uri: Sent as the ``redirect_uri`` query parameter.
        domain: Storefront domain; unknown domains use the amazon.de consent page.
        state: Value for the ``state`` parameter; an empty value becomes ``"auth"``.

    Returns:
        The consent URL including the URL-encoded query string.
    """
    consent_endpoint = AB_OAUTH_URLS.get(domain, AB_OAUTH_URLS["amazon.de"])
    query_string = urllib.parse.urlencode({
        "applicationId": application_id,
        "state": state if state else "auth",
        "redirect_uri": redirect_uri,
    })
    return f"{consent_endpoint}?{query_string}"
async def exchange_auth_code(code: str, client_id: str, client_secret: str, redirect_uri: str) -> dict:
    """Trade an OAuth authorization code for LWA tokens.

    Args:
        code: Authorization code received on the redirect.
        client_id: LWA client ID.
        client_secret: LWA client secret.
        redirect_uri: Redirect URI sent along with the token request.

    Returns:
        The parsed JSON token response on HTTP 200; otherwise a dict with a
        single ``"error"`` key describing the failed exchange.
    """
    form_fields = {
        "grant_type": "authorization_code",
        "code": code,
        "client_id": client_id,
        "client_secret": client_secret,
        "redirect_uri": redirect_uri,
    }
    async with httpx.AsyncClient() as http:
        resp = await http.post(LWA_TOKEN_URL, data=form_fields)
        if resp.status_code == 200:
            token_payload = resp.json()
            logger.info("LWA Token-Exchange erfolgreich")
            return token_payload
        logger.error(f"LWA Token-Exchange fehlgeschlagen: {resp.status_code} {resp.text}")
        return {"error": f"Token-Exchange fehlgeschlagen: {resp.status_code} - {resp.text}"}
async def get_access_token(client_id: str, client_secret: str, refresh_token: str) -> str | None:
    """Get a fresh LWA access token using the refresh token.

    Args:
        client_id: LWA client ID.
        client_secret: LWA client secret.
        refresh_token: Refresh token obtained from the authorization flow.

    Returns:
        The access token, or ``None`` when the refresh fails: transport
        error, non-200 response, unparseable body, or a body without an
        ``access_token`` field. Failures are logged, never raised, because
        callers in this module treat ``None`` as "not authorized".
    """
    form_fields = {
        "grant_type": "refresh_token",
        "refresh_token": refresh_token,
        "client_id": client_id,
        "client_secret": client_secret,
    }
    try:
        async with httpx.AsyncClient() as client:
            resp = await client.post(LWA_TOKEN_URL, data=form_fields)
    except httpx.HTTPError as e:
        # Network/timeout problems must not crash the caller; report them
        # the same way as a rejected refresh.
        logger.error(f"Access-Token-Refresh fehlgeschlagen (Verbindungsfehler): {e}")
        return None

    if resp.status_code != 200:
        logger.error(f"Access-Token-Refresh fehlgeschlagen: {resp.status_code} {resp.text}")
        return None

    try:
        data = resp.json()
    except ValueError as e:  # body is not valid JSON
        logger.error(f"Access-Token-Refresh fehlgeschlagen (ungültige Antwort): {e}")
        return None

    if not isinstance(data, dict):
        logger.error("Access-Token-Refresh fehlgeschlagen (unerwartetes Antwortformat)")
        return None
    return data.get("access_token")
async def check_api_configured() -> dict:
    """Report whether API credentials are stored and currently usable.

    Returns:
        A dict with boolean ``configured`` and ``authorized`` flags. When
        either is ``False`` an ``error`` message is included as well.
    """
    settings = await get_settings()
    client_id = settings.get("amazon_client_id", "")
    client_secret = settings.get("amazon_client_secret", "")
    refresh_token = settings.get("amazon_refresh_token", "")

    if not (client_id and client_secret):
        return {"configured": False, "authorized": False, "error": "Client-ID oder Client-Secret fehlt"}

    if not refresh_token:
        return {"configured": True, "authorized": False, "error": "Noch nicht autorisiert (Refresh-Token fehlt)"}

    # A successful token refresh is the proof that the credentials work.
    if await get_access_token(client_id, client_secret, refresh_token):
        return {"configured": True, "authorized": True}

    return {"configured": True, "authorized": False, "error": "Autorisierung abgelaufen - bitte erneut autorisieren"}
async def _get_api_client(settings: dict) -> tuple[httpx.AsyncClient, str] | None:
    """Build an authenticated HTTP client for the Amazon Business API.

    Args:
        settings: Settings dict holding the ``amazon_*`` credentials and domain.

    Returns:
        ``(client, region)`` on success; ``None`` if a credential is missing
        or no access token could be obtained. The caller owns the client and
        must close it.
    """
    credentials = (
        settings.get("amazon_client_id", ""),
        settings.get("amazon_client_secret", ""),
        settings.get("amazon_refresh_token", ""),
    )
    if not all(credentials):
        return None

    token = await get_access_token(*credentials)
    if not token:
        return None

    # Unknown domains and unknown regions both fall back to the EU endpoint.
    region = DOMAIN_REGION.get(settings.get("amazon_domain", "amazon.de"), "eu")
    endpoint = AB_API_ENDPOINTS.get(region, AB_API_ENDPOINTS["eu"])
    default_headers = {
        "x-amz-access-token": token,
        "Content-Type": "application/json",
        "user-agent": "Belegimport/1.0 (Language=Python/3.12)",
    }
    api_client = httpx.AsyncClient(base_url=endpoint, headers=default_headers, timeout=30.0)
    return api_client, region
async def get_transactions(settings: dict, since_date: datetime) -> list[dict]:
    """Fetch all transactions since ``since_date`` via the Reconciliation API.

    Follows ``nextPageToken`` pagination until no token is returned.

    Args:
        settings: Settings dict used to build the authenticated client.
        since_date: Start of the query window; only its date part is used
            (the start time is always sent as 00:00:00Z).

    Returns:
        The collected transaction dicts. Returns an empty list when no API
        client could be created. If a page request fails or an exception
        occurs, the transactions gathered so far are returned and the
        problem is logged.
    """
    # Local import: the module header only imports datetime and timedelta.
    from datetime import timezone

    result = await _get_api_client(settings)
    if not result:
        return []
    client, _region = result

    transactions: list[dict] = []
    try:
        # feedEndDate must not exceed current UTC time.
        # datetime.utcnow() is deprecated since Python 3.12; an aware "now"
        # in UTC formats to the same string.
        now_utc = datetime.now(timezone.utc)
        params = {
            "feedStartDate": since_date.strftime("%Y-%m-%dT00:00:00Z"),
            "feedEndDate": now_utc.strftime("%Y-%m-%dT%H:%M:%SZ"),
        }
        page = 0
        while True:
            page += 1
            logger.info(f"Amazon API: Reconciliation-Abfrage Seite {page}...")
            resp = await client.get(
                f"/reconciliation/{RECONCILIATION_VERSION}/transactions",
                params=params,
            )
            if resp.status_code != 200:
                logger.error(f"Amazon API: Reconciliation fehlgeschlagen: {resp.status_code} {resp.text}")
                break

            data = resp.json()
            page_transactions = data.get("transactions", [])
            transactions.extend(page_transactions)
            logger.info(f"Amazon API: Seite {page}: {len(page_transactions)} Transaktionen")

            next_token = data.get("nextPageToken")
            if not next_token:
                break
            if next_token == params.get("nextPageToken"):
                # The same token again would request the same page forever.
                logger.warning("Amazon API: nextPageToken wiederholt sich - Paginierung abgebrochen")
                break
            params["nextPageToken"] = next_token
    except Exception as e:
        logger.error(f"Amazon API: Reconciliation-Fehler: {e}")
    finally:
        await client.aclose()

    logger.info(f"Amazon API: {len(transactions)} Transaktionen gesamt")
    return transactions
async def _create_invoice_report(client: httpx.AsyncClient, order_id: str, marketplace_id: str) -> str | None:
    """Step 1: request an invoice-PDF report for one order.

    Args:
        client: Authenticated API client.
        order_id: Order whose invoice is requested.
        marketplace_id: Marketplace ID sent with the request.

    Returns:
        The ``reportId`` from the response on HTTP 200/202, otherwise ``None``
        (rejected request or any exception, both logged).
    """
    payload = {
        "reportType": "GET_AB_INVOICE_PDF",
        "marketplaceIds": [marketplace_id],
        "reportOptions": {
            "orderId": order_id,
            "documentType": "Invoice",
        },
    }
    try:
        resp = await client.post(f"/reports/{REPORTS_VERSION}/reports", json=payload)
        if resp.status_code not in (200, 202):
            logger.warning(f"Amazon API: Report-Erstellung fehlgeschlagen für {order_id}: {resp.status_code} {resp.text}")
            return None
        report_id = resp.json().get("reportId")
        logger.info(f"Amazon API: Report erstellt für {order_id}: {report_id}")
        return report_id
    except Exception as e:
        logger.error(f"Amazon API: Report-Erstellung Fehler: {e}")
        return None
async def _poll_report_status(
    client: httpx.AsyncClient,
    report_id: str,
    max_wait: int = 120,
    poll_interval: int = 15,
) -> str | None:
    """Step 2: poll the report status until it is DONE.

    Args:
        client: Authenticated API client.
        report_id: Report to poll.
        max_wait: Maximum total time in seconds to keep polling.
        poll_interval: Seconds to wait between two polls (values below 1
            are treated as 1).

    Returns:
        The ``reportDocumentId`` once the report is DONE. ``None`` if the
        status request fails, the report ends as CANCELLED or FATAL, an
        exception occurs, or the report is not done within ``max_wait``.
    """
    poll_interval = max(1, poll_interval)
    attempts = max_wait // poll_interval + 1
    for attempt in range(attempts):
        try:
            resp = await client.get(f"/reports/{REPORTS_VERSION}/reports/{report_id}")
            if resp.status_code != 200:
                logger.warning(f"Amazon API: Report-Status fehlgeschlagen: {resp.status_code}")
                return None

            data = resp.json()
            status = data.get("processingStatus", "")
            if status == "DONE":
                doc_id = data.get("reportDocumentId")
                logger.info(f"Amazon API: Report {report_id} fertig: documentId={doc_id}")
                return doc_id
            if status in ("CANCELLED", "FATAL"):
                logger.warning(f"Amazon API: Report {report_id} fehlgeschlagen: {status}")
                return None

            logger.debug(f"Amazon API: Report {report_id} Status: {status}, warte...")
            # No poll follows the last attempt, so sleeping there would only
            # delay the timeout beyond max_wait.
            if attempt < attempts - 1:
                await asyncio.sleep(poll_interval)
        except Exception as e:
            logger.error(f"Amazon API: Report-Status Fehler: {e}")
            return None

    logger.warning(f"Amazon API: Report {report_id} Timeout nach {max_wait}s")
    return None
async def _download_report_document(client: httpx.AsyncClient, document_id: str) -> bytes | None:
    """Step 3: resolve the document URL, download it and unwrap the PDF.

    The payload may be gzip-compressed and/or a zip archive containing the
    PDF. Both layers are unwrapped on a best-effort basis; the result is
    only returned if it starts with the ``%PDF`` signature.

    Args:
        client: Authenticated API client used to look up the document URL.
        document_id: The ``reportDocumentId`` of a finished report.

    Returns:
        The PDF bytes, or ``None`` on any failure (logged).
    """
    try:
        resp = await client.get(f"/reports/{REPORTS_VERSION}/documents/{document_id}")
        if resp.status_code != 200:
            logger.warning(f"Amazon API: Document-URL fehlgeschlagen: {resp.status_code}")
            return None

        data = resp.json()
        url = data.get("url", "")
        compression = data.get("compressionAlgorithm", "")
        if not url:
            logger.warning(f"Amazon API: Keine Download-URL für Document {document_id}")
            return None

        # Download the document (presigned S3 URL, expires in 5 min).
        # A separate client is used so the API auth headers are not sent
        # to the download host.
        async with httpx.AsyncClient(timeout=60.0) as dl_client:
            dl_resp = await dl_client.get(url)
        if dl_resp.status_code != 200:
            logger.warning(f"Amazon API: Document-Download fehlgeschlagen: {dl_resp.status_code}")
            return None
        content = dl_resp.content

        # Layer 1: gzip, detected by the declared algorithm or the magic bytes.
        if compression == "GZIP" or content[:2] == b'\x1f\x8b':
            try:
                content = gzip.decompress(content)
            except Exception as e:
                # Best effort: the payload might not be gzipped after all.
                logger.debug(f"Amazon API: GZIP-Dekompression übersprungen: {e}")

        # Layer 2: zip archive containing the PDF.
        if content[:2] == b'PK':
            try:
                with zipfile.ZipFile(io.BytesIO(content)) as zf:
                    pdf_name = next(
                        (name for name in zf.namelist() if name.lower().endswith('.pdf')),
                        None,
                    )
                    if pdf_name is not None:
                        content = zf.read(pdf_name)
                    else:
                        logger.debug("Amazon API: ZIP-Archiv enthält keine PDF-Datei")
            except Exception as e:
                # Best effort: the payload might not be a valid zip.
                logger.debug(f"Amazon API: ZIP-Entpacken übersprungen: {e}")

        # Verify it's a PDF.
        if content[:4] == b'%PDF':
            logger.info(f"Amazon API: PDF heruntergeladen: {len(content)} Bytes")
            return content

        logger.warning(f"Amazon API: Heruntergeladenes Dokument ist kein PDF (starts: {content[:20]})")
        return None
    except Exception as e:
        logger.error(f"Amazon API: Document-Download Fehler: {e}")
        return None
async def download_invoice(settings: dict, order_id: str) -> bytes | None:
    """Fetch the invoice PDF for one order via the Document API.

    Runs the three stages in sequence: create report, poll until it is
    done, download the document. The API client is always closed.

    Args:
        settings: Settings dict with credentials and ``amazon_domain``.
        order_id: Order whose invoice should be downloaded.

    Returns:
        The PDF bytes, or ``None`` if no client could be created, any stage
        yields nothing, or an exception occurs (logged).
    """
    api = await _get_api_client(settings)
    if not api:
        return None
    client = api[0]

    # Unknown domains fall back to the amazon.de marketplace.
    storefront = settings.get("amazon_domain", "amazon.de")
    marketplace_id = DOMAIN_MARKETPLACE.get(storefront, DOMAIN_MARKETPLACE["amazon.de"])

    try:
        # Stage 1: create the report request.
        report_id = await _create_invoice_report(client, order_id, marketplace_id)
        if report_id:
            # Stage 2: wait for the report to finish.
            document_id = await _poll_report_status(client, report_id)
            if document_id:
                # Stage 3: fetch and unwrap the document.
                return await _download_report_document(client, document_id)
        return None
    except Exception as e:
        logger.error(f"Amazon API: Invoice-Download-Fehler für {order_id}: {e}")
        return None
    finally:
        await client.aclose()
def _parse_since_date(since_str: str) -> datetime:
    """Parse a ``YYYY-MM-DD`` start date; fall back to 30 days before now."""
    if since_str:
        try:
            return datetime.strptime(since_str, "%Y-%m-%d")
        except ValueError:
            pass  # malformed setting: use the default window below
    return datetime.now() - timedelta(days=30)


def _collect_orders(transactions: list[dict]) -> dict[str, dict]:
    """Map each unique order ID to info from the first transaction naming it."""
    orders: dict[str, dict] = {}
    for txn in transactions:
        # "or []" also covers an explicit null value for the key.
        line_items = txn.get("transactionLineItems") or []
        if line_items:
            order_ids = [item.get("orderId", "") for item in line_items]
        else:
            # Fallback: if no line items, use transaction-level orderId.
            order_ids = [txn.get("orderId", "")]
        for oid in order_ids:
            if oid and oid not in orders:
                orders[oid] = {
                    "orderId": oid,
                    "invoiceNumber": txn.get("invoiceNumber", ""),
                    "transactionDate": txn.get("transactionDate", ""),
                }
    return orders


async def process_amazon_api() -> dict:
    """Process Amazon invoices via API (Reconciliation + Document API).

    Workflow: load settings, verify the API authorization, determine the
    start date, open the SMTP connection, fetch transactions, and for every
    order not yet marked as downloaded fetch the invoice PDF and forward it
    by e-mail to the configured import address.

    Returns:
        A dict with the counters ``processed``, ``skipped`` and ``errors``;
        an additional ``error`` message is included when the run could not
        start (not authorized, no import address, SMTP failure).
    """
    settings = await get_settings()
    if settings.get("amazon_enabled") != "true":
        return {"processed": 0, "skipped": 0, "errors": 0}

    # Check API credentials
    status = await check_api_configured()
    if not status.get("authorized"):
        error_msg = status.get("error", "API nicht konfiguriert")
        logger.warning(f"Amazon API: {error_msg}")
        return {"processed": 0, "skipped": 0, "errors": 0, "error": error_msg}

    domain = settings.get("amazon_domain", "amazon.de")

    # Determine date range
    since_date = _parse_since_date(settings.get("amazon_since_date", ""))
    logger.info(f"Amazon API: Import gestartet: domain={domain}, seit={since_date.strftime('%Y-%m-%d')}")

    # Connect SMTP
    import_email = settings.get("import_email_eingang") or settings.get("import_email", "")
    if not import_email:
        error_msg = "Keine Import-Email für Eingangsbelege konfiguriert"
        logger.error(f"Amazon API: {error_msg}")
        await add_log_entry("Amazon-Import", f"Amazon ({domain})", 0, "error", error_msg, beleg_type="eingang")
        return {"processed": 0, "skipped": 0, "errors": 1, "error": error_msg}

    smtp = _connect_smtp(settings)
    if not smtp:
        error_msg = "SMTP-Verbindung fehlgeschlagen"
        logger.error(f"Amazon API: {error_msg}")
        await add_log_entry("Amazon-Import", f"Amazon ({domain})", 0, "error", error_msg, beleg_type="eingang")
        return {"processed": 0, "skipped": 0, "errors": 1, "error": error_msg}

    processed = 0
    skipped = 0
    errors = 0
    try:
        # Get transactions via Reconciliation API
        transactions = await get_transactions(settings, since_date)
        if not transactions:
            logger.info("Amazon API: Keine Transaktionen gefunden")
            await add_log_entry(
                "Amazon-Import (API)", f"Amazon ({domain})", 0,
                "success", "Keine neuen Rechnungen gefunden", beleg_type="eingang",
            )
            # SMTP shutdown and the last-sync timestamp are handled once,
            # in the finally block below.
            return {"processed": 0, "skipped": 0, "errors": 0}

        # Extract unique orders with their line items
        orders = _collect_orders(transactions)
        logger.info(f"Amazon API: {len(orders)} eindeutige Bestellungen gefunden")

        from_addr = settings.get("smtp_username", "belegimport@local")
        save_debug_copy = settings.get("debug_save_amazon_pdfs") == "true"

        for oid in orders:
            # Check if already downloaded
            if await is_invoice_downloaded(oid, oid):
                skipped += 1
                continue

            # Download invoice PDF
            pdf_data = await download_invoice(settings, oid)
            if not pdf_data:
                # NOTE(review): download_invoice returns None both when no
                # invoice exists and when the download failed (token error,
                # timeout, ...). Marking the order here means it is never
                # retried - confirm this is intended.
                await mark_invoice_downloaded(oid, oid)
                skipped += 1
                logger.debug(f"Amazon API: Keine Rechnung für {oid}")
                continue

            filename = f"Amazon_Rechnung_{oid}.pdf"

            # Save debug copy if enabled
            if save_debug_copy:
                debug_dir = Path("/data/uploads") / "amazon_invoices"
                debug_dir.mkdir(parents=True, exist_ok=True)
                debug_path = debug_dir / filename
                debug_path.write_bytes(pdf_data)
                logger.info(f"Amazon API: Debug-PDF gespeichert: {debug_path}")

            # Send via SMTP
            subject = f"Amazon Rechnung - {oid}"
            msg = _build_forward_email(
                from_addr=from_addr,
                to_addr=import_email,
                original_subject=subject,
                original_from=f"Amazon ({domain})",
                attachments=[(filename, pdf_data)],
            )
            smtp_log = _send_with_log(smtp, msg)
            await add_log_entry(
                subject, f"Amazon ({domain})", 1,
                "success", "", import_email, smtp_log, beleg_type="eingang",
            )
            await mark_invoice_downloaded(oid, oid)
            processed += 1
            logger.info(f"Amazon API: Rechnung für {oid} gesendet")
    except Exception as e:
        logger.error(f"Amazon API: Import-Fehler: {e}", exc_info=True)
        errors += 1
        await add_log_entry(
            "Amazon-Import (API)", f"Amazon ({domain})", 0,
            "error", str(e), beleg_type="eingang",
        )
    finally:
        try:
            smtp.quit()
        except Exception:
            pass  # best effort: the connection may already be gone
        await save_settings({"amazon_last_sync": datetime.now().strftime("%Y-%m-%d %H:%M")})

    if processed > 0 or errors > 0:
        summary = f"{processed} verarbeitet, {skipped} übersprungen, {errors} Fehler"
        await add_log_entry(
            "Amazon-Import (API, Zusammenfassung)", f"Amazon ({domain})", processed,
            "success" if errors == 0 else "warning", summary, beleg_type="eingang",
        )
    logger.info(f"Amazon API: Import fertig: {processed} verarbeitet, {skipped} übersprungen, {errors} Fehler")
    return {"processed": processed, "skipped": skipped, "errors": errors}