rdp-client/files/session-watcher.py

202 lines
6.6 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Session Watcher - Überwacht Tastenkombination zum Verlassen der RDP-Sitzung
Speichere als: files/session-watcher.py
"""
import subprocess
import sys
import time
import configparser
from pathlib import Path
from Xlib import X, XK, display
from Xlib.ext import record
from Xlib.protocol import rq
CONFIG_FILE = Path.home() / ".config" / "rdp-profiles" / "profiles.ini"
class SessionWatcher:
def __init__(self):
self.display = display.Display()
self.root = self.display.screen().root
self.ctx = None
self.pressed_keys = set()
# Default hotkey
self.exit_hotkey = "Control+Alt+q"
# Parse hotkey from active profile (if any)
self.load_hotkey()
def load_hotkey(self):
"""Lädt Hotkey aus dem ersten aktiven Profil"""
if not CONFIG_FILE.exists():
return
config = configparser.ConfigParser()
config.read(CONFIG_FILE)
# Nutze ersten Profil-Hotkey als Standard
for section in config.sections():
hotkey = config[section].get('exit_hotkey', '').strip()
if hotkey:
self.exit_hotkey = hotkey
break
print(f"Exit hotkey: {self.exit_hotkey}")
def parse_hotkey(self):
"""Wandelt Hotkey-String in Keycodes um"""
parts = self.exit_hotkey.lower().split('+')
modifiers = []
key = None
for part in parts:
part = part.strip()
if part in ['control', 'ctrl']:
modifiers.append('Control_L')
modifiers.append('Control_R')
elif part in ['alt']:
modifiers.append('Alt_L')
modifiers.append('Alt_R')
elif part in ['shift']:
modifiers.append('Shift_L')
modifiers.append('Shift_R')
elif part in ['super', 'win', 'meta']:
modifiers.append('Super_L')
modifiers.append('Super_R')
else:
key = part
return modifiers, key
def check_rdp_running(self):
"""Prüft ob FreeRDP läuft"""
try:
result = subprocess.run(['pgrep', '-x', 'xfreerdp'],
capture_output=True, text=True)
return result.returncode == 0
except:
return False
def kill_rdp(self):
"""Beendet alle RDP-Verbindungen - AGGRESSIV!"""
print("Killing RDP sessions...")
try:
# Erst SIGTERM versuchen (sauber)
result = subprocess.run(['pkill', '-15', 'xfreerdp'], check=False)
# Warte kurz
time.sleep(0.5)
# Prüfe ob noch läuft
check = subprocess.run(['pgrep', '-x', 'xfreerdp'],
capture_output=True, check=False)
if check.returncode == 0:
# Immer noch da? KILL IT WITH FIRE! (SIGKILL)
print("RDP process still running, using SIGKILL...")
subprocess.run(['pkill', '-9', 'xfreerdp'], check=False)
subprocess.run(['pkill', '-9', 'xfreerdp3'], check=False)
time.sleep(0.3)
# Auch xfreerdp3 killen (falls vorhanden)
subprocess.run(['pkill', '-9', 'xfreerdp3'], check=False)
time.sleep(0.5)
# Starte Profile Manager
subprocess.Popen(['/usr/local/bin/rdp-profile-manager.py'])
except Exception as e:
print(f"Error killing RDP: {e}")
def event_handler(self, reply):
"""Behandelt Tastatur-Events"""
if reply.category != record.FromServer:
return
if reply.client_swapped:
return
if not len(reply.data) or reply.data[0] < 2:
return
data = reply.data
while len(data):
event, data = rq.EventField(None).parse_binary_value(
data, self.display.display, None, None)
if event.type == X.KeyPress:
keysym = self.display.keycode_to_keysym(event.detail, 0)
key_name = XK.keysym_to_string(keysym)
if key_name:
self.pressed_keys.add(key_name)
self.check_hotkey()
elif event.type == X.KeyRelease:
keysym = self.display.keycode_to_keysym(event.detail, 0)
key_name = XK.keysym_to_string(keysym)
if key_name and key_name in self.pressed_keys:
self.pressed_keys.discard(key_name)
def check_hotkey(self):
"""Prüft ob Exit-Hotkey gedrückt wurde"""
modifiers, key = self.parse_hotkey()
# Check if any modifier variant is pressed
modifier_pressed = False
for mod in modifiers:
if mod in self.pressed_keys:
modifier_pressed = True
break
if not modifier_pressed:
return
# Check key
if key and key.lower() in [k.lower() for k in self.pressed_keys]:
# Hotkey matched!
if self.check_rdp_running():
print("Exit hotkey detected - killing RDP session")
self.pressed_keys.clear() # Prevent multiple triggers
self.kill_rdp()
def run(self):
"""Startet Event-Loop"""
# Setup record extension
self.ctx = self.display.record_create_context(
0,
[record.AllClients],
[{
'core_requests': (0, 0),
'core_replies': (0, 0),
'ext_requests': (0, 0, 0, 0),
'ext_replies': (0, 0, 0, 0),
'delivered_events': (0, 0),
'device_events': (X.KeyPress, X.KeyRelease),
'errors': (0, 0),
'client_started': False,
'client_died': False,
}]
)
self.display.record_enable_context(self.ctx, self.event_handler)
self.display.record_free_context(self.ctx)
print("Session watcher started - monitoring for exit hotkey")
while True:
event = self.display.next_event()
if __name__ == '__main__':
try:
watcher = SessionWatcher()
watcher.run()
except KeyboardInterrupt:
print("\nSession watcher stopped")
sys.exit(0)
except Exception as e:
print(f"Error: {e}")
sys.exit(1)