"""Kerio Connect Admin API client (JSON-RPC).

Endpoint: ``POST https://<host>:4040/admin/api/jsonrpc/``

``Session.login`` returns a token which is then sent in the ``X-Token``
header on every subsequent call.

Note: the exact method names (``Pop3Accounts.create`` vs ``RemotePop3.set``
etc.) may differ slightly between Kerio Connect versions. The names used
here correspond to Kerio Connect 9.x. On failure the client raises the
original server message together with the method name, so it is immediately
visible what has to be adjusted.
"""

from typing import Optional

import requests
import urllib3


class KerioError(Exception):
    """Error raised by :class:`KerioClient` (transport, HTTP, protocol, API)."""


class KerioAddressError(KerioError, ValueError):
    """An e-mail address without an ``@`` was passed to the client.

    Also derives from ``ValueError`` so that callers which used to catch the
    bare unpacking ``ValueError`` keep working.
    """


class KerioClient:
    """Thin JSON-RPC client for the Kerio Connect admin API.

    Constructing an instance logs in immediately; the returned token is kept
    in ``self.token`` and attached to every later request.
    """

    # Default per-request timeout in seconds; overridden per instance in
    # ``__init__``.
    timeout: float = 30

    def __init__(self, host: str, *, user: str, password: str,
                 port: int = 4040, verify: bool = True,
                 timeout: float = 30):
        """Create the HTTP session and log in.

        Args:
            host: Host name of the Kerio Connect server (must be non-empty).
            user: Admin login name.
            password: Admin password.
            port: Admin API port.
            verify: Passed to ``requests.Session.verify``. When false,
                ``urllib3.disable_warnings`` is called for
                ``InsecureRequestWarning``.
            timeout: Timeout in seconds handed to every HTTP request.

        Raises:
            KerioError: If ``host`` is empty or the login fails.
        """
        if not host:
            raise KerioError("Kerio-Host ist leer")
        self.base = f"https://{host}:{port}"
        self.timeout = timeout
        self.session = requests.Session()
        self.session.verify = verify
        if not verify:
            urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
        self.token: Optional[str] = None
        self._req_id = 0
        try:
            self._login(user, password)
        except Exception:
            # Do not leak the HTTP session when the constructor fails.
            self.session.close()
            raise

    def _next_id(self) -> int:
        """Return the next JSON-RPC request id (monotonically increasing)."""
        self._req_id += 1
        return self._req_id

    def _call(self, method: str, params: Optional[dict] = None) -> dict:
        """Perform one JSON-RPC call and return its ``result`` member.

        Args:
            method: JSON-RPC method name, e.g. ``"Session.login"``.
            params: Method parameters; ``None`` is sent as ``{}``.

        Returns:
            The ``result`` value of the response, or ``{}`` when it is
            missing or falsy.

        Raises:
            KerioError: On connection failure, non-2xx HTTP status, a body
                that is not a JSON object, or a JSON-RPC ``error`` member.
        """
        payload = {
            "jsonrpc": "2.0",
            "id": self._next_id(),
            "method": method,
            "params": params or {},
        }
        headers = {"Content-Type": "application/json"}
        if self.token:
            headers["X-Token"] = self.token
        try:
            r = self.session.post(
                f"{self.base}/admin/api/jsonrpc/",
                json=payload,
                headers=headers,
                timeout=self.timeout,
            )
        except requests.RequestException as e:
            raise KerioError(f"Kerio Verbindung fehlgeschlagen: {e}") from e
        if not r.ok:
            raise KerioError(
                f"Kerio HTTP {r.status_code} bei {method}: {r.text[:300]}"
            )
        try:
            data = r.json()
        except ValueError as e:
            raise KerioError(
                f"Kerio: ungültige Antwort: {r.text[:300]}"
            ) from e
        if not isinstance(data, dict):
            # Valid JSON, but not a JSON-RPC response object.
            raise KerioError(f"Kerio: ungültige Antwort: {r.text[:300]}")
        err = data.get("error")
        if err is not None:
            if isinstance(err, dict):
                raise KerioError(
                    f"Kerio {method} → {err.get('code')} {err.get('message')} "
                    f"({err.get('data', {})})"
                )
            # Non-standard error payload: report it verbatim.
            raise KerioError(f"Kerio {method} → {err}")
        return data.get("result") or {}

    def _login(self, user: str, password: str) -> None:
        """Call ``Session.login`` and store the returned token.

        Raises:
            KerioError: If the call fails or the result carries no token.
        """
        result = self._call("Session.login", {
            "userName": user,
            "password": password,
            "application": {
                "name": "deploy-email-plesk-kerio-nextcloud",
                "vendor": "intern",
                "version": "1.0",
            },
        })
        self.token = result.get("token")
        if not self.token:
            raise KerioError("Kerio: Login lieferte kein Token")

    def logout(self) -> None:
        """Call ``Session.logout``; any failure is deliberately ignored."""
        try:
            self._call("Session.logout")
        except Exception:
            # Best effort: a failed logout must never break the caller.
            pass

    @staticmethod
    def _split_email(email: str) -> tuple:
        """Split ``email`` at the first ``@`` into ``(local, domain)``.

        Raises:
            KerioAddressError: If ``email`` contains no ``@``.
        """
        local, sep, domain = email.partition("@")
        if not sep:
            raise KerioAddressError(
                f"Kerio: ungültige E-Mail-Adresse: {email!r}"
            )
        return local, domain

    # ----- Domains / Users -----

    def get_domain_id(self, domain_name: str) -> str:
        """Return the ``id`` of the first domain whose name equals
        ``domain_name``.

        Raises:
            KerioError: If the call fails or no such domain is returned.
        """
        result = self._call("Domains.get", {
            "query": {
                "fields": ["id", "name"],
                "conditions": [{
                    "fieldName": "name",
                    "comparator": "Eq",
                    "value": domain_name,
                }],
                "combining": "And",
                "start": 0,
                "limit": 50,
                "orderBy": [],
            },
        })
        items = result.get("list") or []
        if not items:
            raise KerioError(
                f"Kerio: Domain '{domain_name}' nicht in Kerio konfiguriert"
            )
        return items[0]["id"]

    def find_user_id(self, email: str) -> Optional[str]:
        """Look up the user id for ``email``; ``None`` if not present.

        NOTE(review): every ``KerioError`` from the domain lookup, not just
        "domain not configured", is reported as ``None``.

        Raises:
            KerioAddressError: If ``email`` contains no ``@``.
            KerioError: If the ``Users.get`` call itself fails.
        """
        local, domain = self._split_email(email)
        try:
            domain_id = self.get_domain_id(domain)
        except KerioError:
            return None
        result = self._call("Users.get", {
            "query": {
                "fields": ["id", "loginName"],
                "conditions": [{
                    "fieldName": "loginName",
                    "comparator": "Eq",
                    "value": local,
                }],
                "combining": "And",
                "start": 0,
                "limit": 5,
                "orderBy": [],
            },
            "domainId": domain_id,
        })
        items = result.get("list") or []
        if not items:
            return None
        return items[0].get("id")

    def user_exists(self, email: str) -> bool:
        """Return whether :meth:`find_user_id` finds a user for ``email``."""
        return self.find_user_id(email) is not None

    # ----- POP3 download accounts -----
    # Correct API according to the Kerio Connect docs (Delivery.idl):
    #   Delivery.getPop3AccountList()       → {"list": [Pop3Account, …]}
    #   Delivery.addPop3AccountList(accs)   → {"errors": [...]}
    #   Delivery.removePop3AccountList(ids)
    #   Delivery.setPop3Account(id, acc)
    #   Delivery.runPop3Downloads()

    def _pop3_get_all(self) -> list:
        """Return all POP3 download accounts (possibly an empty list).

        Raises:
            KerioError: If the listing call fails.
        """
        # The SearchQuery is a mandatory input – without it Kerio silently
        # returns an empty list and the existence check would always report
        # "not present".
        got = self._call("Delivery.getPop3AccountList", {
            "query": {
                "fields": [],
                "start": 0,
                "limit": 100000,
                "orderBy": [],
                "conditions": [],
                "combining": "And",
            },
        })
        return got.get("list") or []

    @staticmethod
    def _match_pop3_collector(accounts: list, *, deliver_to_email: str,
                              server: str,
                              login_name: str) -> Optional[dict]:
        """Return the first account matching the key, compared
        case-insensitively on (deliveryAddress, server, userName)."""
        server_l = (server or "").lower()
        login_l = (login_name or "").lower()
        deliver_l = (deliver_to_email or "").lower()
        for acc in accounts:
            if ((acc.get("deliveryAddress") or "").lower() == deliver_l
                    and (acc.get("server") or "").lower() == server_l
                    and (acc.get("userName") or "").lower() == login_l):
                return acc
        return None

    def find_pop3_collector(self, *, deliver_to_email: str, server: str,
                            login_name: str) -> Optional[dict]:
        """Find a collector by (deliveryAddress, server, userName).

        Returns the Pop3Account dict (including ``id``) or ``None``. A failed
        listing call is also reported as ``None``.
        """
        try:
            accounts = self._pop3_get_all()
        except KerioError:
            return None
        return self._match_pop3_collector(
            accounts,
            deliver_to_email=deliver_to_email,
            server=server,
            login_name=login_name,
        )

    def pop3_collector_exists(self, **kw) -> bool:
        """Return whether :meth:`find_pop3_collector` finds a match."""
        return self.find_pop3_collector(**kw) is not None

    # ----- User creation -----

    def create_user(self, email: str, password: str, full_name: str) -> str:
        """Create a user for ``email`` and return the new user's id.

        Raises:
            KerioAddressError: If ``email`` contains no ``@``.
            KerioError: If the domain is unknown, the call fails, the server
                reports errors, or no record is returned.
        """
        local, domain = self._split_email(email)
        domain_id = self.get_domain_id(domain)
        # Minimal set:
        #  - authType/role omitted (version-specific enums, default is fine)
        #  - emailAddresses omitted: that is the alias list in Kerio. Kerio
        #    derives the primary address automatically from
        #    loginName@domain, i.e. `local@domain`.
        #  - allowPasswordChange=false: the user can NOT change the password
        #    themselves (field name per the User struct in the Kerio docs).
        user_def = {
            "loginName": local,
            "fullName": full_name,
            "domainId": domain_id,
            "password": password,
            "isEnabled": True,
            "allowPasswordChange": False,
        }
        result = self._call("Users.create", {"users": [user_def]})
        errors = result.get("errors") or []
        if errors:
            raise KerioError(f"Kerio Users.create errors: {errors}")
        items = result.get("result") or []
        if not items:
            raise KerioError(
                "Kerio Users.create lieferte keinen Datensatz zurück"
            )
        return items[0].get("id")

    # ----- POP3 collectors -----

    def upsert_pop3_collection(self, *, deliver_to_email: str, server: str,
                               login_name: str, password: str,
                               port: int = 995, ssl: bool = True,
                               leave_days: int = 14) -> str:
        """Create a POP3 collector or update the existing one (idempotent).

        Returns:
            ``"angelegt"`` when created, ``"aktualisiert"`` when updated.

        Raises:
            KerioError: If listing, updating or adding fails, or an existing
                collector is returned without an ``id``.
        """
        # List strictly: if the listing fails we must not assume "absent",
        # otherwise a duplicate collector would be added.
        existing = self._match_pop3_collector(
            self._pop3_get_all(),
            deliver_to_email=deliver_to_email,
            server=server,
            login_name=login_name,
        )
        new_account = self._build_pop3_account(
            deliver_to_email=deliver_to_email,
            server=server,
            login_name=login_name,
            password=password,
            port=port,
            ssl=ssl,
            leave_days=leave_days,
        )
        if existing:
            account_id = existing.get("id")
            if not account_id:
                raise KerioError(
                    "Kerio: vorhandener Sammler ohne id zurückgegeben"
                )
            new_account["id"] = account_id
            self._call(
                "Delivery.setPop3Account",
                {"accountId": account_id, "account": new_account},
            )
            return "aktualisiert"
        self._add_pop3_account_list([new_account])
        return "angelegt"

    def _add_pop3_account_list(self, accounts: list) -> None:
        """Add ``accounts`` via ``Delivery.addPop3AccountList``.

        Raises:
            KerioError: If the call fails or the result lists errors.
        """
        result = self._call(
            "Delivery.addPop3AccountList", {"accounts": accounts}
        )
        errors = result.get("errors") or []
        if errors:
            raise KerioError(
                f"Kerio Delivery.addPop3AccountList errors: {errors}"
            )

    def _build_pop3_account(self, *, deliver_to_email: str, server: str,
                            login_name: str, password: str, port: int,
                            ssl: bool, leave_days: int) -> dict:
        """Build the Pop3Account structure sent to Kerio."""
        # Pop3Account fields, exactly as in the Kerio docs
        # (struct Pop3Account + LeaveOnServer):
        #   isActive (bool, NOT "enabled"!) – otherwise defaults to inactive
        #   server, port, userName, password, deliveryAddress
        #   mode: SslMode = NoSsl | SpecialPort | StlsCommand
        #   authentication: Pop3Authentication = PlainPop3 | Apop
        #   leaveOnServer { enabled,
        #                   removeAfterPeriod: OptionalLong{enabled,value} }
        new_account = {
            "isActive": True,
            "server": server,
            "port": port,
            "mode": "SpecialPort" if ssl else "NoSsl",
            "authentication": "PlainPop3",
            "userName": login_name,
            "password": password,
            "deliveryAddress": deliver_to_email,
            "leaveOnServer": {
                "enabled": True,
                "removeAfterPeriod": {
                    "enabled": True,
                    "value": leave_days,
                },
            },
        }
        return new_account

    # Backwards-compatible alias
    def add_pop3_collection(self, **kw) -> None:
        """Alias for :meth:`upsert_pop3_collection`; discards its result."""
        self.upsert_pop3_collection(**kw)