Files
deploy-email-plesk-kerio-ne…/clients/plesk_ssh.py
T
2026-05-12 11:04:17 +02:00

113 lines
4.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Plesk SSH-Backend.
Used when the Plesk instance does not allow REST API access.
Connects to the Plesk server via SSH and runs `plesk bin mail …` there.
Also works without an API key: only an SSH login (key or password) is needed.
"""
from __future__ import annotations
import shlex
from typing import Optional
import paramiko
class PleskSshError(Exception):
    """Error raised by the Plesk SSH backend.

    Used for invalid connection settings, SSH connection or key problems,
    and failing ``plesk bin mail`` invocations.
    """
class PleskSshClient:
    """Manage Plesk mailboxes by running ``plesk bin mail`` over SSH.

    The constructor opens the SSH connection immediately. Call
    :meth:`close` (or use the instance as a context manager) to release it.
    SSH and Plesk command failures are reported as :class:`PleskSshError`.
    """

    # Timeouts in seconds for the SSH connect and for each remote command.
    _CONNECT_TIMEOUT = 15
    _EXEC_TIMEOUT = 60

    # Lower-case fragments of ``plesk bin mail --info`` output that are
    # interpreted as "this mailbox does not exist".
    _NOT_FOUND_MARKERS = (
        "does not exist", "not found", "unknown mailname",
        "no such mail", "non existent",
    )

    # Exit codes a POSIX shell uses for "cannot execute" / "command not found".
    _SHELL_FAILURE_CODES = (126, 127)

    def __init__(self, host: str, *, ssh_port: int = 22,
                 ssh_user: str, ssh_password: Optional[str] = None,
                 ssh_key: Optional[str] = None,
                 ssh_key_passphrase: Optional[str] = None,
                 use_sudo: bool = False):
        """Validate the settings and connect to the Plesk server.

        Args:
            host: Host name or address of the Plesk server.
            ssh_port: TCP port of the SSH service.
            ssh_user: Login name for the SSH session.
            ssh_password: Password for password authentication; only used
                when ``ssh_key`` is not given.
            ssh_key: Path to a private key file; takes precedence over
                ``ssh_password``.
            ssh_key_passphrase: Passphrase for ``ssh_key``; an empty value
                is treated as "no passphrase".
            use_sudo: Prefix every remote command with ``sudo -n``.

        Raises:
            PleskSshError: If a required setting is missing, the key file
                cannot be loaded, or the SSH connection/login fails.
        """
        if not host:
            raise PleskSshError("Plesk-Host ist leer")
        if not ssh_user:
            raise PleskSshError("PLESK_SSH_USER ist nicht gesetzt")
        if not (ssh_password or ssh_key):
            raise PleskSshError(
                "Weder PLESK_SSH_PASSWORD noch PLESK_SSH_KEY gesetzt"
            )
        self.host = host
        self.use_sudo = use_sudo
        self.client = paramiko.SSHClient()
        self.client.load_system_host_keys()
        # AutoAdd: convenient on the very first run. For stricter checking,
        # populate ~/.ssh/known_hosts beforehand and use RejectPolicy here.
        # NOTE(review): AutoAddPolicy trusts unknown host keys on first
        # contact; confirm this is acceptable for the deployment.
        self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        kwargs = {
            "hostname": host, "port": ssh_port, "username": ssh_user,
            "timeout": self._CONNECT_TIMEOUT,
            # Agent and default-key lookup stay enabled when an explicit
            # key is configured; they are switched off for password logins.
            "allow_agent": True, "look_for_keys": True,
        }
        if ssh_key:
            kwargs["pkey"] = self._load_key(ssh_key, ssh_key_passphrase or None)
        else:
            # The validation above guarantees a password in this branch.
            kwargs["password"] = ssh_password
            kwargs["allow_agent"] = False
            kwargs["look_for_keys"] = False
        try:
            self.client.connect(**kwargs)
        except paramiko.AuthenticationException as e:
            # Do not leave a half-open transport behind on failure.
            self.close()
            raise PleskSshError(f"SSH-Login {ssh_user}@{host}:{ssh_port} abgelehnt: {e}") from e
        except Exception as e:
            self.close()
            raise PleskSshError(f"SSH-Verbindung {ssh_user}@{host}:{ssh_port}: {e}") from e

    def __enter__(self) -> "PleskSshClient":
        """Return the connected client for use in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        """Close the SSH connection when the ``with`` block ends."""
        self.close()

    @staticmethod
    def _load_key(path: str, passphrase: Optional[str]) -> "paramiko.PKey":
        """Load a private key file, trying Ed25519, RSA and ECDSA in turn.

        Args:
            path: Path to the private key file.
            passphrase: Passphrase for the key, or ``None``.

        Returns:
            The key object produced by the first key class that accepts
            the file.

        Raises:
            PleskSshError: If the file cannot be read or none of the key
                classes can parse it.
        """
        last_err = None
        for key_class in (paramiko.Ed25519Key, paramiko.RSAKey, paramiko.ECDSAKey):
            try:
                return key_class.from_private_key_file(path, password=passphrase)
            except paramiko.SSHException as e:
                # Wrong key type (or undecryptable): try the next class.
                last_err = e
            except OSError as e:
                # Missing/unreadable file: no other key class will do better.
                raise PleskSshError(
                    f"SSH-Key {path} konnte nicht geladen werden: {e}"
                ) from e
        raise PleskSshError(
            f"SSH-Key {path} konnte nicht geladen werden: {last_err}"
        ) from last_err

    def close(self) -> None:
        """Close the SSH connection; errors while closing are ignored."""
        try:
            self.client.close()
        except Exception:
            # Deliberately best-effort: closing must never mask the error
            # that led here.
            pass

    def _run_mail(self, args: list[str]) -> tuple[int, str, str]:
        """Run ``plesk bin mail <args>`` on the server.

        Args:
            args: Arguments for ``plesk bin mail``; each one is
                shell-quoted before being sent.

        Returns:
            ``(exit_code, stdout, stderr)`` with both streams decoded as
            UTF-8 (undecodable bytes are replaced).

        Raises:
            PleskSshError: If the command cannot be started or its output
                cannot be read (including a timeout).
        """
        cmd = ["plesk", "bin", "mail", *args]
        if self.use_sudo:
            # -n: never prompt for a password, fail instead.
            cmd = ["sudo", "-n", *cmd]
        cmd_str = " ".join(shlex.quote(c) for c in cmd)
        try:
            _stdin, stdout, stderr = self.client.exec_command(
                cmd_str, timeout=self._EXEC_TIMEOUT
            )
        except Exception as e:
            raise PleskSshError(f"SSH exec fehlgeschlagen: {e}") from e
        try:
            # Drain the output BEFORE waiting for the exit status: waiting
            # first can hang once the remote output exceeds the channel
            # window, because nobody is consuming it.
            out_raw = stdout.read()
            err_raw = stderr.read()
            rc = stdout.channel.recv_exit_status()
        except Exception as e:
            # The command line is intentionally not included here; it may
            # contain a mailbox password.
            raise PleskSshError(
                f"SSH-Ausgabe konnte nicht gelesen werden: {e}"
            ) from e
        out = out_raw.decode("utf-8", "replace")
        err = err_raw.decode("utf-8", "replace")
        return rc, out, err

    def mail_exists(self, email: str) -> bool:
        """Report whether the mailbox ``email`` exists on the Plesk server.

        Args:
            email: Full mail address to look up.

        Returns:
            ``True`` if ``plesk bin mail --info`` succeeds, ``False`` if it
            fails with a message that indicates a missing mailbox.

        Raises:
            PleskSshError: For any other failure, including a shell that
                cannot find or execute the command.
        """
        rc, out, err = self._run_mail(["--info", email])
        if rc == 0:
            return True
        msg = (err + " " + out).lower()
        # A missing `plesk`/`sudo` binary also prints "... not found"; that
        # is an environment problem, not a missing mailbox.
        launch_failed = (
            rc in self._SHELL_FAILURE_CODES or "command not found" in msg
        )
        if not launch_failed and any(
            marker in msg for marker in self._NOT_FOUND_MARKERS
        ):
            return False
        raise PleskSshError(
            f"plesk bin mail --info {email} → rc={rc}: "
            f"{(err or out).strip() or '<keine Ausgabe>'}"
        )

    def create_mail(self, email: str, password: str) -> None:
        """Create the mailbox ``email`` with the given password.

        Args:
            email: Full mail address of the new mailbox.
            password: Password for the new mailbox. It is passed as a
                command-line argument to ``plesk bin mail``.

        Raises:
            PleskSshError: If the command exits with a non-zero status.
        """
        rc, out, err = self._run_mail([
            "--create", email,
            "-passwd", password,
            "-mailbox", "true",
        ])
        if rc != 0:
            raise PleskSshError(
                f"plesk bin mail --create {email} → rc={rc}: "
                f"{(err or out).strip() or '<keine Ausgabe>'}"
            )