113 lines
4.1 KiB
Python
113 lines
4.1 KiB
Python
"""Plesk SSH-Backend.
|
||
|
||
Wird genutzt, wenn die Plesk-Instanz keinen REST-API-Zugriff erlaubt.
|
||
Verbindet sich per SSH zum Plesk-Server und ruft `plesk bin mail …` auf.
|
||
Funktioniert auch ohne API-Key – nur SSH-Login (Key oder Passwort) nötig.
|
||
"""
|
||
from __future__ import annotations

import os
import shlex
from typing import Optional

import paramiko
|
||
|
||
|
||
class PleskSshError(Exception):
    """Raised for every failure of the Plesk SSH backend.

    Covers invalid configuration, SSH connection/login problems and
    unexpected results of the remote ``plesk bin mail`` command.
    """
|
||
|
||
|
||
class PleskSshClient:
    """SSH client that runs ``plesk bin mail`` on a remote Plesk server.

    The SSH connection is opened in ``__init__`` and stays open until
    ``close()`` is called. The instance can also be used as a context
    manager, which closes the connection on exit.
    """

    # Lower-case fragments of the utility's output that are treated as
    # "mailbox does not exist" rather than as a real failure.
    _NOT_FOUND_MARKERS = (
        "does not exist", "not found", "unknown mailname",
        "no such mail", "non existent",
    )

    def __init__(self, host: str, *, ssh_port: int = 22,
                 ssh_user: str, ssh_password: Optional[str] = None,
                 ssh_key: Optional[str] = None,
                 ssh_key_passphrase: Optional[str] = None,
                 use_sudo: bool = False):
        """Validate the settings and open the SSH connection.

        Args:
            host: Hostname or address of the Plesk server.
            ssh_port: SSH port on the server.
            ssh_user: Login name for SSH.
            ssh_password: Password for password authentication. Only used
                when no ``ssh_key`` is given.
            ssh_key: Path to a private key file; takes precedence over
                ``ssh_password``. A leading ``~`` is expanded.
            ssh_key_passphrase: Passphrase for ``ssh_key``; an empty value
                is treated like ``None``.
            use_sudo: Prefix every remote command with ``sudo -n``.

        Raises:
            PleskSshError: If a required setting is missing, the key cannot
                be loaded, or the connection/login fails.
        """
        if not host:
            raise PleskSshError("Plesk-Host ist leer")
        if not ssh_user:
            raise PleskSshError("PLESK_SSH_USER ist nicht gesetzt")
        if not (ssh_password or ssh_key):
            raise PleskSshError(
                "Weder PLESK_SSH_PASSWORD noch PLESK_SSH_KEY gesetzt"
            )
        self.host = host
        self.use_sudo = use_sudo
        self.client = paramiko.SSHClient()
        self.client.load_system_host_keys()
        # AutoAdd is convenient on the first run, but it trusts any unknown
        # host key. For stricter checking, populate ~/.ssh/known_hosts
        # beforehand and set RejectPolicy here instead.
        self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        kwargs = {
            "hostname": host, "port": ssh_port, "username": ssh_user,
            "timeout": 15, "allow_agent": True, "look_for_keys": True,
        }
        if ssh_key:
            kwargs["pkey"] = self._load_key(ssh_key, ssh_key_passphrase or None)
        elif ssh_password:
            kwargs["password"] = ssh_password
            # Pure password login: do not let agent/default keys interfere.
            kwargs["allow_agent"] = False
            kwargs["look_for_keys"] = False
        try:
            self.client.connect(**kwargs)
        except paramiko.AuthenticationException as e:
            # Release the half-open transport before reporting the failure.
            self.close()
            raise PleskSshError(f"SSH-Login {ssh_user}@{host}:{ssh_port} abgelehnt: {e}") from e
        except Exception as e:
            self.close()
            raise PleskSshError(f"SSH-Verbindung {ssh_user}@{host}:{ssh_port}: {e}") from e

    def __enter__(self) -> "PleskSshClient":
        """Return the client itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        """Close the SSH connection when the ``with`` block ends."""
        self.close()

    @staticmethod
    def _load_key(path: str, passphrase: Optional[str]):
        """Load a private key file, trying Ed25519, RSA and ECDSA in turn.

        Args:
            path: Path to the private key file; a leading ``~`` is expanded.
            passphrase: Passphrase for the key, or ``None``.

        Returns:
            The key object of the first key class that accepts the file.

        Raises:
            PleskSshError: If the file cannot be read or no key class can
                parse it.
        """
        key_path = os.path.expanduser(path)
        last_err = None
        for key_class in (paramiko.Ed25519Key, paramiko.RSAKey, paramiko.ECDSAKey):
            try:
                return key_class.from_private_key_file(key_path, password=passphrase)
            except OSError as e:
                # Missing/unreadable file: no other key class will do better.
                last_err = e
                break
            except paramiko.SSHException as e:
                # Wrong key type (or wrong passphrase): try the next class.
                last_err = e
        raise PleskSshError(
            f"SSH-Key {path} konnte nicht geladen werden: {last_err}"
        ) from last_err

    def close(self) -> None:
        """Close the SSH connection; errors while closing are ignored."""
        try:
            self.client.close()
        except Exception:
            # Deliberate best effort: closing must never mask the real error.
            pass

    @staticmethod
    def _failure_text(out: str, err: str) -> str:
        """Return the most useful non-empty output for an error message.

        Prefers stderr, falls back to stdout, then to a fixed placeholder.
        """
        return err.strip() or out.strip() or "<keine Ausgabe>"

    def _run_mail(self, args: list) -> tuple:
        """Run ``plesk bin mail <args>`` remotely.

        Args:
            args: Arguments for the ``mail`` utility; each one is
                shell-quoted before being sent.

        Returns:
            ``(exit_status, stdout_text, stderr_text)``; the texts are
            decoded as UTF-8 with undecodable bytes replaced.

        Raises:
            PleskSshError: If the command cannot be started or its output
                cannot be read (for example on a timeout).
        """
        cmd = ["plesk", "bin", "mail", *args]
        if self.use_sudo:
            cmd = ["sudo", "-n", *cmd]
        cmd_str = " ".join(shlex.quote(c) for c in cmd)
        try:
            _stdin, stdout, stderr = self.client.exec_command(cmd_str, timeout=60)
            # Drain the output BEFORE asking for the exit status: waiting
            # for the status first can hang when the remote output exceeds
            # the channel window.
            raw_out = stdout.read()
            raw_err = stderr.read()
            rc = stdout.channel.recv_exit_status()
        except Exception as e:
            raise PleskSshError(f"SSH exec fehlgeschlagen: {e}") from e
        return rc, raw_out.decode("utf-8", "replace"), raw_err.decode("utf-8", "replace")

    def mail_exists(self, email: str) -> bool:
        """Report whether the mailbox ``email`` exists.

        Returns:
            ``True`` if ``--info`` exits with status 0, ``False`` if the
            output contains one of the known "not found" fragments.

        Raises:
            PleskSshError: For any other non-zero result.
        """
        rc, out, err = self._run_mail(["--info", email])
        if rc == 0:
            return True
        msg = (err + " " + out).lower()
        if any(marker in msg for marker in self._NOT_FOUND_MARKERS):
            return False
        raise PleskSshError(
            f"plesk bin mail --info {email} → rc={rc}: "
            f"{self._failure_text(out, err)}"
        )

    def create_mail(self, email: str, password: str) -> None:
        """Create the mailbox ``email`` with the given password.

        Raises:
            PleskSshError: If the remote command exits with a non-zero status.
        """
        # NOTE(review): the password travels as a command-line argument and
        # may be visible in the remote process list while the command runs.
        # Plesk's CLI reference appears to offer an environment-variable
        # alternative - confirm before changing this.
        rc, out, err = self._run_mail([
            "--create", email,
            "-passwd", password,
            "-mailbox", "true",
        ])
        if rc != 0:
            raise PleskSshError(
                f"plesk bin mail --create {email} → rc={rc}: "
                f"{self._failure_text(out, err)}"
            )
|