Files
deploy-email-plesk-kerio-ne…/clients/kerio.py
T
duffyduck 06d7e00e49 fix(kerio): korrekte Admin-API gemäß Delivery.idl + Pop3Account-Doku
- Methoden: Delivery.getPop3AccountList / addPop3AccountList /
  setPop3Account (vorher geraten als Pop3Accounts.set/.create →
  Method not found).
- Pop3Account-Felder mit den richtigen Namen: isActive (statt enabled),
  mode (statt sslMode), authentication (statt authType), und
  leaveOnServer.removeAfterPeriod als OptionalLong-Wrapper.
  Falsche Namen wurden von Kerio still ignoriert → Sammler war inaktiv.
- User-Struct: allowPasswordChange=false (statt mayChangePassword,
  das es nicht gibt). emailAddresses weggelassen, Kerio leitet die
  primäre Adresse aus loginName+domain ab.
- Kerio-Step in 2 Sub-Steps aufgeteilt: User (skip wenn vorhanden) +
  POP3 (upsert). Damit wird bei einem zweiten Lauf der Sammler nicht
  übersprungen, nur weil der User schon existiert.
- POP3-Sammler ist jetzt UPSERT: existierende werden via setPop3Account
  überschrieben → Selbstreparatur kaputter Einträge + Passwort-
  Änderungen aus der CSV ziehen sich von selbst nach.

GUI: 👁/🙈-Toggle pro Passwort-Feld (Klartext temporär einsehbar).

Filenames der Sammel-PDFs + Admin-Report ohne Zeitstempel –
erneuter Lauf überschreibt statt anzuhäufen.

README: Ablauf-Sektion + Idempotenz-Tabelle aktualisiert; Kerio-
Caveat ersetzt durch konkrete Methoden-/Feld-Liste mit Doku-Link.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-12 14:09:06 +02:00

281 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Kerio Connect Admin API Client (JSON-RPC).
Endpoint: POST https://<host>:4040/admin/api/jsonrpc/
Login liefert ein Token, das via X-Token-Header bei jedem Folgeaufruf gesetzt wird.
Hinweis: die exakten Methodennamen (Pop3Accounts.create vs RemotePop3.set etc.)
können sich zwischen Kerio Connect Versionen leicht unterscheiden. Die hier
verwendeten Namen entsprechen Kerio Connect 9.x. Bei Fehlern wirft der Client
die Originalmeldung mit Methodenname so siehst du sofort, was anzupassen ist.
"""
from typing import Optional
import requests
import urllib3
class KerioError(Exception):
pass
class KerioClient:
def __init__(self, host: str, *, user: str, password: str,
port: int = 4040, verify: bool = True):
if not host:
raise KerioError("Kerio-Host ist leer")
self.base = f"https://{host}:{port}"
self.session = requests.Session()
self.session.verify = verify
if not verify:
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
self.token: Optional[str] = None
self._req_id = 0
self._login(user, password)
def _next_id(self) -> int:
self._req_id += 1
return self._req_id
def _call(self, method: str, params: Optional[dict] = None) -> dict:
payload = {
"jsonrpc": "2.0",
"id": self._next_id(),
"method": method,
"params": params or {},
}
headers = {"Content-Type": "application/json"}
if self.token:
headers["X-Token"] = self.token
try:
r = self.session.post(
f"{self.base}/admin/api/jsonrpc/",
json=payload, headers=headers, timeout=30,
)
except requests.RequestException as e:
raise KerioError(f"Kerio Verbindung fehlgeschlagen: {e}")
if not r.ok:
raise KerioError(f"Kerio HTTP {r.status_code} bei {method}: {r.text[:300]}")
try:
data = r.json()
except ValueError:
raise KerioError(f"Kerio: ungültige Antwort: {r.text[:300]}")
if "error" in data:
err = data["error"]
raise KerioError(
f"Kerio {method}{err.get('code')} {err.get('message')} "
f"({err.get('data', {})})"
)
return data.get("result") or {}
def _login(self, user: str, password: str) -> None:
result = self._call("Session.login", {
"userName": user,
"password": password,
"application": {
"name": "deploy-email-plesk-kerio-nextcloud",
"vendor": "intern",
"version": "1.0",
},
})
self.token = result.get("token")
if not self.token:
raise KerioError("Kerio: Login lieferte kein Token")
def logout(self) -> None:
try:
self._call("Session.logout")
except Exception:
pass
# ----- Domains / Users -----
def get_domain_id(self, domain_name: str) -> str:
result = self._call("Domains.get", {
"query": {
"fields": ["id", "name"],
"conditions": [{
"fieldName": "name",
"comparator": "Eq",
"value": domain_name,
}],
"combining": "And",
"start": 0,
"limit": 50,
"orderBy": [],
},
})
items = result.get("list", [])
if not items:
raise KerioError(f"Kerio: Domain '{domain_name}' nicht in Kerio konfiguriert")
return items[0]["id"]
def find_user_id(self, email: str) -> Optional[str]:
"""User-ID aus E-Mail finden, oder None wenn nicht vorhanden."""
local, domain = email.split("@", 1)
try:
domain_id = self.get_domain_id(domain)
except KerioError:
return None
result = self._call("Users.get", {
"query": {
"fields": ["id", "loginName"],
"conditions": [{
"fieldName": "loginName",
"comparator": "Eq",
"value": local,
}],
"combining": "And",
"start": 0,
"limit": 5,
"orderBy": [],
},
"domainId": domain_id,
})
items = result.get("list") or []
if not items:
return None
return items[0].get("id")
def user_exists(self, email: str) -> bool:
return self.find_user_id(email) is not None
# ----- POP3 Download Accounts -----
# Korrekte API laut Kerio Connect Doku (Delivery.idl):
# Delivery.getPop3AccountList() → {"list": [Pop3Account, …]}
# Delivery.addPop3AccountList(accs) → {"errors": [...]}
# Delivery.removePop3AccountList(ids)
# Delivery.setPop3Account(id, acc)
# Delivery.runPop3Downloads()
def _pop3_get_all(self) -> list:
# SearchQuery ist Pflicht-Input ohne den liefert Kerio eine leere Liste
# (still!), und unser Existenz-Check würde immer "nicht vorhanden" sagen.
got = self._call("Delivery.getPop3AccountList", {
"query": {
"fields": [],
"start": 0,
"limit": 100000,
"orderBy": [],
"conditions": [],
"combining": "And",
},
})
return got.get("list") or []
def find_pop3_collector(self, *, deliver_to_email: str,
server: str, login_name: str) -> Optional[dict]:
"""Sucht einen Sammler über (deliveryAddress, server, userName).
Gibt das Pop3Account-Dict zurück (inkl. id) oder None."""
try:
accounts = self._pop3_get_all()
except KerioError:
return None
server_l = (server or "").lower()
login_l = (login_name or "").lower()
deliver_l = (deliver_to_email or "").lower()
for acc in accounts:
if ((acc.get("deliveryAddress") or "").lower() == deliver_l
and (acc.get("server") or "").lower() == server_l
and (acc.get("userName") or "").lower() == login_l):
return acc
return None
def pop3_collector_exists(self, **kw) -> bool:
return self.find_pop3_collector(**kw) is not None
def create_user(self, email: str, password: str, full_name: str) -> str:
local, domain = email.split("@", 1)
domain_id = self.get_domain_id(domain)
# Minimal-Set:
# - authType/role weggelassen (versions-spezifische Enums, Default reicht)
# - emailAddresses weggelassen: das ist die Alias-Liste in Kerio.
# Die primäre Adresse leitet Kerio automatisch aus loginName@domain
# ab, also `local@domain`.
# - allowPasswordChange=false: User kann sein Passwort NICHT selbst
# ändern (Feldname laut User-Struct in der Kerio-Doku).
user_def = {
"loginName": local,
"fullName": full_name,
"domainId": domain_id,
"password": password,
"isEnabled": True,
"allowPasswordChange": False,
}
result = self._call("Users.create", {"users": [user_def]})
errors = result.get("errors") or []
if errors:
raise KerioError(f"Kerio Users.create errors: {errors}")
items = result.get("result") or []
if not items:
raise KerioError("Kerio Users.create lieferte keinen Datensatz zurück")
return items[0].get("id")
# ----- POP3 Sammler -----
def upsert_pop3_collection(self, *, deliver_to_email: str,
server: str, login_name: str, password: str,
port: int = 995, ssl: bool = True,
leave_days: int = 14) -> str:
"""Legt einen POP3-Sammler an oder aktualisiert ihn (Idempotent).
Returns: "angelegt" | "aktualisiert"
"""
existing = self.find_pop3_collector(
deliver_to_email=deliver_to_email,
server=server, login_name=login_name,
)
new_account = self._build_pop3_account(
deliver_to_email=deliver_to_email, server=server,
login_name=login_name, password=password,
port=port, ssl=ssl, leave_days=leave_days,
)
if existing:
account_id = existing.get("id")
if not account_id:
raise KerioError("Kerio: vorhandener Sammler ohne id zurückgegeben")
new_account["id"] = account_id
self._call("Delivery.setPop3Account",
{"accountId": account_id, "account": new_account})
return "aktualisiert"
self._add_pop3_account_list([new_account])
return "angelegt"
def _add_pop3_account_list(self, accounts: list) -> None:
result = self._call("Delivery.addPop3AccountList",
{"accounts": accounts})
errors = result.get("errors") or []
if errors:
raise KerioError(f"Kerio Delivery.addPop3AccountList errors: {errors}")
def _build_pop3_account(self, *, deliver_to_email: str,
server: str, login_name: str, password: str,
port: int, ssl: bool, leave_days: int) -> dict:
# Pop3Account-Felder, exakt wie in der Kerio-Doku
# (struct Pop3Account + LeaveOnServer):
# isActive (bool, NICHT "enabled"!) sonst defaultet auf inaktiv
# server, port, userName, password, deliveryAddress
# mode: SslMode = NoSsl | SpecialPort | StlsCommand
# authentication: Pop3Authentication = PlainPop3 | Apop
# leaveOnServer { enabled, removeAfterPeriod: OptionalLong{enabled,value} }
new_account = {
"isActive": True,
"server": server,
"port": port,
"mode": "SpecialPort" if ssl else "NoSsl",
"authentication": "PlainPop3",
"userName": login_name,
"password": password,
"deliveryAddress": deliver_to_email,
"leaveOnServer": {
"enabled": True,
"removeAfterPeriod": {
"enabled": True,
"value": leave_days,
},
},
}
return new_account
# Backwards-kompatibler Alias
def add_pop3_collection(self, **kw) -> None:
self.upsert_pop3_collection(**kw)