#!/usr/bin/env python3
"""
Session Watcher - monitors a key combination for leaving the RDP session.

Save as: files/session-watcher.py
"""

import configparser
import subprocess
import sys
import time
from pathlib import Path

from Xlib import X, XK, display
from Xlib.ext import record
from Xlib.protocol import rq

CONFIG_FILE = Path.home() / ".config" / "rdp-profiles" / "profiles.ini"


def _build_keysym_names():
    """Return a mapping ``keysym -> frozenset of symbolic names``.

    The names are taken from the ``XK_*`` constants of ``Xlib.XK`` with the
    ``XK_`` prefix removed (e.g. ``Control_L``, ``q``, ``F12``).  A keysym
    may have several aliases, so every name is kept.
    """
    names = {}
    for attr in dir(XK):
        if not attr.startswith('XK_'):
            continue
        value = getattr(XK, attr)
        if isinstance(value, int):
            names.setdefault(value, set()).add(attr[3:])
    return {keysym: frozenset(aliases) for keysym, aliases in names.items()}


# Built once at import time; looked up on every key event.
_KEYSYM_NAMES = _build_keysym_names()


class SessionWatcher:
    """Watches the keyboard through the X RECORD extension and kills the
    running FreeRDP client when the configured exit hotkey is pressed."""

    # Process names that count as "an RDP session is running".
    RDP_PROCESS_NAMES = ('xfreerdp', 'xfreerdp3')

    # Program started after the RDP client has been terminated.
    PROFILE_MANAGER = '/usr/local/bin/rdp-profile-manager.py'

    # Hotkey token (lower case) -> keysym names of the left/right variants.
    MODIFIER_ALIASES = {
        'control': ('Control_L', 'Control_R'),
        'ctrl': ('Control_L', 'Control_R'),
        'alt': ('Alt_L', 'Alt_R'),
        'shift': ('Shift_L', 'Shift_R'),
        'super': ('Super_L', 'Super_R'),
        'win': ('Super_L', 'Super_R'),
        'meta': ('Super_L', 'Super_R'),
    }

    def __init__(self):
        """Connect to the X display and load the exit hotkey."""
        self.display = display.Display()
        self.root = self.display.screen().root
        self.ctx = None
        # Names of the keys currently held down (see _key_names).
        self.pressed_keys = set()

        # Default hotkey
        self.exit_hotkey = "Control+Alt+q"

        # Override with the hotkey from the profile configuration (if any)
        self.load_hotkey()

    def load_hotkey(self):
        """Load the exit hotkey from the profile configuration.

        The first section of CONFIG_FILE that defines a non-empty
        ``exit_hotkey`` wins.  If the file does not exist nothing happens;
        if it cannot be parsed the current hotkey is kept and the problem
        is reported on stdout.
        """
        if not CONFIG_FILE.exists():
            return

        config = configparser.ConfigParser()
        try:
            config.read(CONFIG_FILE)
            # Use the first profile hotkey as the default
            for section in config.sections():
                hotkey = config[section].get('exit_hotkey', '').strip()
                if hotkey:
                    self.exit_hotkey = hotkey
                    break
        except (configparser.Error, UnicodeDecodeError) as exc:
            # A broken profile file must not prevent the watcher from
            # starting; fall back to the hotkey that is already set.
            print(f"Could not parse {CONFIG_FILE}: {exc}")

        print(f"Exit hotkey: {self.exit_hotkey}")

    def _parse_hotkey_groups(self):
        """Split the hotkey string into modifier groups and the main key.

        Returns:
            ``(groups, key)`` where ``groups`` is a list of tuples, one per
            modifier token, each holding the keysym names of that
            modifier's variants, and ``key`` is the last non-modifier token
            (lower case) or ``None``.
        """
        groups = []
        key = None
        for part in self.exit_hotkey.lower().split('+'):
            part = part.strip()
            if not part:
                # Ignore empty tokens such as a trailing '+'.
                continue
            variants = self.MODIFIER_ALIASES.get(part)
            if variants is not None:
                groups.append(variants)
            else:
                key = part
        return groups, key

    def parse_hotkey(self):
        """Convert the hotkey string into modifier key names and a key.

        Returns:
            ``(modifiers, key)`` where ``modifiers`` is a flat list with the
            left and right keysym name of every modifier token, and ``key``
            is the last non-modifier token (lower case) or ``None``.
        """
        groups, key = self._parse_hotkey_groups()
        modifiers = [name for variants in groups for name in variants]
        return modifiers, key

    def check_rdp_running(self):
        """Return True if a FreeRDP client process is running.

        Every name in RDP_PROCESS_NAMES is checked with ``pgrep -x``.
        Returns False when ``pgrep`` itself cannot be executed.
        """
        for name in self.RDP_PROCESS_NAMES:
            try:
                result = subprocess.run(['pgrep', '-x', name],
                                        capture_output=True,
                                        text=True,
                                        check=False)
            except (OSError, subprocess.SubprocessError):
                return False
            if result.returncode == 0:
                return True
        return False

    def kill_rdp(self):
        """Terminate all RDP client processes, then start the profile manager.

        SIGTERM is sent first; anything still alive half a second later is
        sent SIGKILL.  Failures to run the external commands are reported
        on stdout and not raised.
        """
        print("Killing RDP sessions...")

        try:
            # Try SIGTERM first (clean shutdown).  Without -x the pattern
            # also matches xfreerdp3.
            subprocess.run(['pkill', '-15', 'xfreerdp'], check=False)

            # Give the client a moment to exit
            time.sleep(0.5)

            # Still there? Escalate to SIGKILL.
            if self.check_rdp_running():
                print("RDP process still running, using SIGKILL...")
                for name in self.RDP_PROCESS_NAMES:
                    subprocess.run(['pkill', '-9', name], check=False)
                time.sleep(0.3)

            time.sleep(0.5)

            # Start the profile manager
            subprocess.Popen([self.PROFILE_MANAGER])

        except (OSError, subprocess.SubprocessError) as e:
            print(f"Error killing RDP: {e}")

    def _key_names(self, keycode):
        """Return the set of names under which *keycode* is tracked.

        The keycode is translated with shift level 0 and mapped to all of
        its ``XK_*`` names.  ``XK.keysym_to_string`` alone is not enough:
        it yields ``None`` for keys outside Latin-1 such as the modifier
        keys, so those would never be registered as pressed.  Its result
        is still added as an extra alias so that a hotkey written with the
        literal character (e.g. ``-``) keeps matching.
        """
        keysym = self.display.keycode_to_keysym(keycode, 0)
        if not keysym:
            # No symbol bound to this keycode.
            return frozenset()
        names = _KEYSYM_NAMES.get(keysym, frozenset())
        char = XK.keysym_to_string(keysym)
        if char:
            names = names | {char}
        return names

    def event_handler(self, reply):
        """Handle keyboard events delivered by the RECORD extension.

        Key presses add the key's names to ``pressed_keys`` and trigger a
        hotkey check; key releases remove them again.
        """
        if reply.category != record.FromServer:
            return
        if reply.client_swapped:
            return
        # X protocol codes 0 and 1 are errors and replies, not events.
        if not len(reply.data) or reply.data[0] < 2:
            return

        event_field = rq.EventField(None)
        data = reply.data
        while len(data):
            event, data = event_field.parse_binary_value(
                data, self.display.display, None, None)

            if event.type == X.KeyPress:
                names = self._key_names(event.detail)
                if names:
                    self.pressed_keys.update(names)
                    self.check_hotkey()

            elif event.type == X.KeyRelease:
                self.pressed_keys.difference_update(
                    self._key_names(event.detail))

    def check_hotkey(self):
        """Kill the RDP session if the exit hotkey is currently held down.

        The hotkey matches when at least one variant (left or right) of
        EVERY configured modifier is pressed together with the main key.
        A hotkey without any modifier never matches.
        """
        groups, key = self._parse_hotkey_groups()
        if not groups or not key:
            return

        pressed = self.pressed_keys
        # Each modifier of the combination must be held, not just one.
        if not all(any(name in pressed for name in variants)
                   for variants in groups):
            return

        # The main key is compared case-insensitively.
        if key.lower() not in {name.lower() for name in pressed}:
            return

        # Hotkey matched!
        if self.check_rdp_running():
            print("Exit hotkey detected - killing RDP session")
            self.pressed_keys.clear()  # Prevent multiple triggers
            self.kill_rdp()

    def run(self):
        """Start the event loop.

        Creates a RECORD context for key press/release device events of
        all clients and processes them with ``event_handler``.

        Raises:
            RuntimeError: if the RECORD extension is not available.
        """
        if not self.display.has_extension("RECORD"):
            raise RuntimeError("RECORD extension not found")

        # Set up the record extension
        self.ctx = self.display.record_create_context(
            0,
            [record.AllClients],
            [{
                'core_requests': (0, 0),
                'core_replies': (0, 0),
                'ext_requests': (0, 0, 0, 0),
                'ext_replies': (0, 0, 0, 0),
                'delivered_events': (0, 0),
                'device_events': (X.KeyPress, X.KeyRelease),
                'errors': (0, 0),
                'client_started': False,
                'client_died': False,
            }]
        )

        # Announce before enabling: record_enable_context does not return
        # while the context is recording.
        print("Session watcher started - monitoring for exit hotkey")

        self.display.record_enable_context(self.ctx, self.event_handler)
        self.display.record_free_context(self.ctx)

        while True:
            self.display.next_event()


if __name__ == '__main__':
    try:
        watcher = SessionWatcher()
        watcher.run()
    except KeyboardInterrupt:
        print("\nSession watcher stopped")
        sys.exit(0)
    except Exception as e:
        print(f"Error: {e}")
        sys.exit(1)