"""Nextcloud Provisioning API (OCS) Client.""" from typing import Optional import requests import urllib3 class NextcloudError(Exception): pass class NextcloudClient: def __init__(self, host: str, *, admin_user: str, admin_password: str, verify: bool = True, scheme: str = "https"): if not host: raise NextcloudError("Nextcloud-Host ist leer") self.base = f"{scheme}://{host}" self.session = requests.Session() self.session.verify = verify if not verify: urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) self.session.auth = (admin_user, admin_password) self.session.headers["OCS-APIRequest"] = "true" self.session.headers["Accept"] = "application/json" def _meta(self, r: requests.Response, op: str) -> dict: if not r.ok and r.status_code not in (400, 401, 403, 404): raise NextcloudError(f"Nextcloud {op} HTTP {r.status_code}: {r.text[:300]}") try: data = r.json() except ValueError: raise NextcloudError(f"Nextcloud {op}: ungültige Antwort: {r.text[:300]}") return data.get("ocs", {}) def user_exists(self, userid: str) -> bool: r = self.session.get(f"{self.base}/ocs/v2.php/cloud/users/{userid}") ocs = self._meta(r, "user-lookup") sc = ocs.get("meta", {}).get("statuscode") return sc in (100, 200) def ensure_group(self, group: str) -> None: if not group: return r = self.session.post( f"{self.base}/ocs/v2.php/cloud/groups", data={"groupid": group}, ) ocs = self._meta(r, "group-create") sc = ocs.get("meta", {}).get("statuscode") # 100/200 = ok, 102 = exists already if sc not in (100, 200, 102): raise NextcloudError( f"Nextcloud Gruppe '{group}': {sc} {ocs.get('meta', {}).get('message')}" ) def create_user(self, *, userid: str, password: str, email: Optional[str] = None, display_name: Optional[str] = None, group: Optional[str] = None, quota_gb: Optional[int] = None) -> None: body = [("userid", userid), ("password", password)] if email: body.append(("email", email)) if display_name: body.append(("displayName", display_name)) if group: body.append(("groups[]", group)) body.append(("quota", f"{quota_gb} GB" if quota_gb else "none")) r = self.session.post( f"{self.base}/ocs/v2.php/cloud/users", data=body, ) ocs = self._meta(r, "user-create") sc = ocs.get("meta", {}).get("statuscode") if sc not in (100, 200): raise NextcloudError( f"Nextcloud user-create: {sc} {ocs.get('meta', {}).get('message')}" )