first release

This commit is contained in:
2026-05-12 11:04:17 +02:00
parent d49cebf616
commit e8ef2081ed
13 changed files with 1403 additions and 0 deletions
+181
View File
@@ -0,0 +1,181 @@
"""Kerio Connect Admin API Client (JSON-RPC).
Endpoint: POST https://<host>:4040/admin/api/jsonrpc/
Login liefert ein Token, das via X-Token-Header bei jedem Folgeaufruf gesetzt wird.
Hinweis: die exakten Methodennamen (Pop3Accounts.create vs RemotePop3.set etc.)
können sich zwischen Kerio Connect Versionen leicht unterscheiden. Die hier
verwendeten Namen entsprechen Kerio Connect 9.x. Bei Fehlern wirft der Client
die Originalmeldung mit Methodenname so siehst du sofort, was anzupassen ist.
"""
from typing import Optional
import requests
import urllib3
class KerioError(Exception):
pass
class KerioClient:
def __init__(self, host: str, *, user: str, password: str,
port: int = 4040, verify: bool = True):
if not host:
raise KerioError("Kerio-Host ist leer")
self.base = f"https://{host}:{port}"
self.session = requests.Session()
self.session.verify = verify
if not verify:
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
self.token: Optional[str] = None
self._req_id = 0
self._login(user, password)
def _next_id(self) -> int:
self._req_id += 1
return self._req_id
def _call(self, method: str, params: Optional[dict] = None) -> dict:
payload = {
"jsonrpc": "2.0",
"id": self._next_id(),
"method": method,
"params": params or {},
}
headers = {"Content-Type": "application/json"}
if self.token:
headers["X-Token"] = self.token
try:
r = self.session.post(
f"{self.base}/admin/api/jsonrpc/",
json=payload, headers=headers, timeout=30,
)
except requests.RequestException as e:
raise KerioError(f"Kerio Verbindung fehlgeschlagen: {e}")
if not r.ok:
raise KerioError(f"Kerio HTTP {r.status_code} bei {method}: {r.text[:300]}")
try:
data = r.json()
except ValueError:
raise KerioError(f"Kerio: ungültige Antwort: {r.text[:300]}")
if "error" in data:
err = data["error"]
raise KerioError(
f"Kerio {method}{err.get('code')} {err.get('message')} "
f"({err.get('data', {})})"
)
return data.get("result") or {}
def _login(self, user: str, password: str) -> None:
result = self._call("Session.login", {
"userName": user,
"password": password,
"application": {
"name": "deploy-email-plesk-kerio-nextcloud",
"vendor": "intern",
"version": "1.0",
},
})
self.token = result.get("token")
if not self.token:
raise KerioError("Kerio: Login lieferte kein Token")
def logout(self) -> None:
try:
self._call("Session.logout")
except Exception:
pass
# ----- Domains / Users -----
def get_domain_id(self, domain_name: str) -> str:
result = self._call("Domains.get", {
"query": {
"fields": ["id", "name"],
"conditions": [{
"fieldName": "name",
"comparator": "Eq",
"value": domain_name,
}],
"combining": "And",
"start": 0,
"limit": 50,
"orderBy": [],
},
})
items = result.get("list", [])
if not items:
raise KerioError(f"Kerio: Domain '{domain_name}' nicht in Kerio konfiguriert")
return items[0]["id"]
def user_exists(self, email: str) -> bool:
local, domain = email.split("@", 1)
try:
domain_id = self.get_domain_id(domain)
except KerioError:
return False
result = self._call("Users.get", {
"query": {
"fields": ["id", "loginName"],
"conditions": [{
"fieldName": "loginName",
"comparator": "Eq",
"value": local,
}],
"combining": "And",
"start": 0,
"limit": 5,
"orderBy": [],
},
"domainId": domain_id,
})
return bool(result.get("list"))
def create_user(self, email: str, password: str, full_name: str) -> str:
local, domain = email.split("@", 1)
domain_id = self.get_domain_id(domain)
user_def = {
"loginName": local,
"fullName": full_name,
"domainId": domain_id,
"password": password,
"authType": "Internal",
"isEnabled": True,
"role": "UserRole",
"emailAddresses": [email],
# User darf sein Passwort NICHT selbst ändern
"mayChangePassword": False,
"forceChangePassword": False,
}
result = self._call("Users.create", {"users": [user_def]})
errors = result.get("errors") or []
if errors:
raise KerioError(f"Kerio Users.create errors: {errors}")
items = result.get("result") or []
if not items:
raise KerioError("Kerio Users.create lieferte keinen Datensatz zurück")
return items[0].get("id")
# ----- POP3 Sammler -----
def add_pop3_collection(self, *, kerio_user_id: str,
server: str, login_name: str, password: str,
port: int = 465, ssl: bool = True,
leave_days: int = 14) -> None:
account = {
"enabled": True,
"deliverTo": kerio_user_id,
"server": server,
"loginName": login_name,
"password": password,
"port": port,
"ssl": ssl,
"useSpecificPort": True,
"leaveOnServer": True,
"deleteOnServer": True,
"deleteOnServerDays": leave_days,
}
result = self._call("Pop3Accounts.create", {"accounts": [account]})
errors = result.get("errors") or []
if errors:
raise KerioError(f"Kerio Pop3Accounts.create errors: {errors}")