79 lines
2.9 KiB
Python
79 lines
2.9 KiB
Python
"""Nextcloud Provisioning API (OCS) Client."""
|
|
from typing import Optional
|
|
import requests
|
|
import urllib3
|
|
|
|
|
|
class NextcloudError(Exception):
    """Error raised when a Nextcloud OCS request fails or its reply is unusable."""
|
|
|
|
|
|
class NextcloudClient:
    """Small client for the Nextcloud Provisioning API (OCS v2 endpoints).

    All requests go through one ``requests.Session`` that carries the admin
    credentials (HTTP basic auth), the ``OCS-APIRequest: true`` header and an
    ``Accept: application/json`` header.
    """

    # HTTP error statuses whose body is still parsed, so that the OCS
    # ``meta.statuscode`` / ``meta.message`` can be inspected by the caller
    # instead of failing on the bare HTTP status.
    _OCS_ERROR_STATUSES = (400, 401, 403, 404)

    # Default per-request timeout in seconds; without a timeout ``requests``
    # waits indefinitely on an unresponsive server.
    DEFAULT_TIMEOUT = 30.0

    def __init__(self, host: str, *, admin_user: str, admin_password: str,
                 verify: bool = True, scheme: str = "https",
                 timeout: Optional[float] = DEFAULT_TIMEOUT):
        """Create a client bound to one Nextcloud instance.

        Args:
            host: Host part of the base URL (combined as ``scheme://host``).
            admin_user: User name used for HTTP basic auth.
            admin_password: Password used for HTTP basic auth.
            verify: Passed to ``Session.verify``. When false, urllib3's
                ``InsecureRequestWarning`` is disabled as well (this is a
                process-wide warning filter).
            scheme: URL scheme, ``"https"`` by default.
            timeout: Timeout in seconds applied to every request; ``None``
                disables the timeout.

        Raises:
            NextcloudError: If ``host`` is empty.
        """
        if not host:
            raise NextcloudError("Nextcloud-Host ist leer")
        self.base = f"{scheme}://{host}"
        self.timeout = timeout
        self.session = requests.Session()
        self.session.verify = verify
        if not verify:
            urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
        self.session.auth = (admin_user, admin_password)
        self.session.headers["OCS-APIRequest"] = "true"
        self.session.headers["Accept"] = "application/json"

    def _meta(self, r: "requests.Response", op: str) -> dict:
        """Validate a response and return its ``ocs`` envelope.

        Args:
            r: The HTTP response to inspect.
            op: Short operation label used in error messages.

        Returns:
            The ``ocs`` object of the JSON body, or ``{}`` when it is missing
            or not a JSON object.

        Raises:
            NextcloudError: On an HTTP error outside 400/401/403/404, or when
                the body is not a JSON object.
        """
        if not r.ok and r.status_code not in self._OCS_ERROR_STATUSES:
            raise NextcloudError(f"Nextcloud {op} HTTP {r.status_code}: {r.text[:300]}")
        try:
            data = r.json()
        except ValueError as err:
            raise NextcloudError(
                f"Nextcloud {op}: ungültige Antwort: {r.text[:300]}"
            ) from err
        # Valid JSON that is not an object (list, string, null) cannot carry
        # an OCS envelope; report it instead of failing on ``.get`` later.
        if not isinstance(data, dict):
            raise NextcloudError(f"Nextcloud {op}: ungültige Antwort: {r.text[:300]}")
        ocs = data.get("ocs")
        return ocs if isinstance(ocs, dict) else {}

    @staticmethod
    def _status(ocs: dict) -> tuple:
        """Return ``(statuscode, message)`` from an ``ocs`` envelope.

        Both values are ``None`` when ``meta`` is absent or not an object.
        """
        meta = ocs.get("meta")
        if not isinstance(meta, dict):
            return None, None
        return meta.get("statuscode"), meta.get("message")

    def user_exists(self, userid: str) -> bool:
        """Return whether the user lookup reports OCS status 100 or 200.

        Any other status (including the tolerated 400/401/403/404 replies)
        yields ``False``.
        """
        from urllib.parse import quote

        # Escape the id so characters such as space, '/', '?' or '#' cannot
        # alter the request path.
        r = self.session.get(
            f"{self.base}/ocs/v2.php/cloud/users/{quote(userid, safe='')}",
            timeout=self.timeout,
        )
        sc, _ = self._status(self._meta(r, "user-lookup"))
        return sc in (100, 200)

    def ensure_group(self, group: str) -> None:
        """Create ``group``, treating "already exists" as success.

        An empty ``group`` is a no-op.

        Raises:
            NextcloudError: If the OCS status is not 100, 200 or 102.
        """
        if not group:
            return
        r = self.session.post(
            f"{self.base}/ocs/v2.php/cloud/groups",
            data={"groupid": group},
            timeout=self.timeout,
        )
        sc, message = self._status(self._meta(r, "group-create"))
        # 100/200 = ok, 102 = exists already
        if sc not in (100, 200, 102):
            raise NextcloudError(f"Nextcloud Gruppe '{group}': {sc} {message}")

    def create_user(self, *, userid: str, password: str,
                    email: Optional[str] = None,
                    display_name: Optional[str] = None,
                    group: Optional[str] = None,
                    quota_gb: Optional[int] = None) -> None:
        """Create a user via ``POST /ocs/v2.php/cloud/users``.

        Args:
            userid: Id of the new user.
            password: Initial password.
            email: Sent as ``email`` when non-empty.
            display_name: Sent as ``displayName`` when non-empty.
            group: Sent as a single ``groups[]`` entry when non-empty.
            quota_gb: Sent as ``"<n> GB"``; a falsy value (``None`` or ``0``)
                sends ``"none"`` instead.

        Raises:
            NextcloudError: If the OCS status is not 100 or 200.
        """
        body = [("userid", userid), ("password", password)]
        if email:
            body.append(("email", email))
        if display_name:
            body.append(("displayName", display_name))
        if group:
            body.append(("groups[]", group))
        body.append(("quota", f"{quota_gb} GB" if quota_gb else "none"))

        r = self.session.post(
            f"{self.base}/ocs/v2.php/cloud/users",
            data=body,
            timeout=self.timeout,
        )
        sc, message = self._status(self._meta(r, "user-create"))
        if sc not in (100, 200):
            raise NextcloudError(f"Nextcloud user-create: {sc} {message}")
|