202 lines
6.6 KiB
Python
Executable File
202 lines
6.6 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Session Watcher - Überwacht Tastenkombination zum Verlassen der RDP-Sitzung
|
|
Speichere als: files/session-watcher.py
|
|
"""
|
|
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
import configparser
|
|
from pathlib import Path
|
|
from Xlib import X, XK, display
|
|
from Xlib.ext import record
|
|
from Xlib.protocol import rq
|
|
|
|
CONFIG_FILE = Path.home() / ".config" / "rdp-profiles" / "profiles.ini"
|
|
|
|
class SessionWatcher:
|
|
def __init__(self):
|
|
self.display = display.Display()
|
|
self.root = self.display.screen().root
|
|
self.ctx = None
|
|
self.pressed_keys = set()
|
|
|
|
# Default hotkey
|
|
self.exit_hotkey = "Control+Alt+q"
|
|
|
|
# Parse hotkey from active profile (if any)
|
|
self.load_hotkey()
|
|
|
|
def load_hotkey(self):
|
|
"""Lädt Hotkey aus dem ersten aktiven Profil"""
|
|
if not CONFIG_FILE.exists():
|
|
return
|
|
|
|
config = configparser.ConfigParser()
|
|
config.read(CONFIG_FILE)
|
|
|
|
# Nutze ersten Profil-Hotkey als Standard
|
|
for section in config.sections():
|
|
hotkey = config[section].get('exit_hotkey', '').strip()
|
|
if hotkey:
|
|
self.exit_hotkey = hotkey
|
|
break
|
|
|
|
print(f"Exit hotkey: {self.exit_hotkey}")
|
|
|
|
def parse_hotkey(self):
|
|
"""Wandelt Hotkey-String in Keycodes um"""
|
|
parts = self.exit_hotkey.lower().split('+')
|
|
|
|
modifiers = []
|
|
key = None
|
|
|
|
for part in parts:
|
|
part = part.strip()
|
|
if part in ['control', 'ctrl']:
|
|
modifiers.append('Control_L')
|
|
modifiers.append('Control_R')
|
|
elif part in ['alt']:
|
|
modifiers.append('Alt_L')
|
|
modifiers.append('Alt_R')
|
|
elif part in ['shift']:
|
|
modifiers.append('Shift_L')
|
|
modifiers.append('Shift_R')
|
|
elif part in ['super', 'win', 'meta']:
|
|
modifiers.append('Super_L')
|
|
modifiers.append('Super_R')
|
|
else:
|
|
key = part
|
|
|
|
return modifiers, key
|
|
|
|
def check_rdp_running(self):
|
|
"""Prüft ob FreeRDP läuft"""
|
|
try:
|
|
result = subprocess.run(['pgrep', '-x', 'xfreerdp'],
|
|
capture_output=True, text=True)
|
|
return result.returncode == 0
|
|
except:
|
|
return False
|
|
|
|
def kill_rdp(self):
|
|
"""Beendet alle RDP-Verbindungen - AGGRESSIV!"""
|
|
print("Killing RDP sessions...")
|
|
try:
|
|
# Erst SIGTERM versuchen (sauber)
|
|
result = subprocess.run(['pkill', '-15', 'xfreerdp'], check=False)
|
|
|
|
# Warte kurz
|
|
time.sleep(0.5)
|
|
|
|
# Prüfe ob noch läuft
|
|
check = subprocess.run(['pgrep', '-x', 'xfreerdp'],
|
|
capture_output=True, check=False)
|
|
|
|
if check.returncode == 0:
|
|
# Immer noch da? KILL IT WITH FIRE! (SIGKILL)
|
|
print("RDP process still running, using SIGKILL...")
|
|
subprocess.run(['pkill', '-9', 'xfreerdp'], check=False)
|
|
subprocess.run(['pkill', '-9', 'xfreerdp3'], check=False)
|
|
time.sleep(0.3)
|
|
|
|
# Auch xfreerdp3 killen (falls vorhanden)
|
|
subprocess.run(['pkill', '-9', 'xfreerdp3'], check=False)
|
|
|
|
time.sleep(0.5)
|
|
|
|
# Starte Profile Manager
|
|
subprocess.Popen(['/usr/local/bin/rdp-profile-manager.py'])
|
|
except Exception as e:
|
|
print(f"Error killing RDP: {e}")
|
|
|
|
def event_handler(self, reply):
|
|
"""Behandelt Tastatur-Events"""
|
|
if reply.category != record.FromServer:
|
|
return
|
|
if reply.client_swapped:
|
|
return
|
|
if not len(reply.data) or reply.data[0] < 2:
|
|
return
|
|
|
|
data = reply.data
|
|
while len(data):
|
|
event, data = rq.EventField(None).parse_binary_value(
|
|
data, self.display.display, None, None)
|
|
|
|
if event.type == X.KeyPress:
|
|
keysym = self.display.keycode_to_keysym(event.detail, 0)
|
|
key_name = XK.keysym_to_string(keysym)
|
|
|
|
if key_name:
|
|
self.pressed_keys.add(key_name)
|
|
self.check_hotkey()
|
|
|
|
elif event.type == X.KeyRelease:
|
|
keysym = self.display.keycode_to_keysym(event.detail, 0)
|
|
key_name = XK.keysym_to_string(keysym)
|
|
|
|
if key_name and key_name in self.pressed_keys:
|
|
self.pressed_keys.discard(key_name)
|
|
|
|
def check_hotkey(self):
|
|
"""Prüft ob Exit-Hotkey gedrückt wurde"""
|
|
modifiers, key = self.parse_hotkey()
|
|
|
|
# Check if any modifier variant is pressed
|
|
modifier_pressed = False
|
|
for mod in modifiers:
|
|
if mod in self.pressed_keys:
|
|
modifier_pressed = True
|
|
break
|
|
|
|
if not modifier_pressed:
|
|
return
|
|
|
|
# Check key
|
|
if key and key.lower() in [k.lower() for k in self.pressed_keys]:
|
|
# Hotkey matched!
|
|
if self.check_rdp_running():
|
|
print("Exit hotkey detected - killing RDP session")
|
|
self.pressed_keys.clear() # Prevent multiple triggers
|
|
self.kill_rdp()
|
|
|
|
def run(self):
|
|
"""Startet Event-Loop"""
|
|
# Setup record extension
|
|
self.ctx = self.display.record_create_context(
|
|
0,
|
|
[record.AllClients],
|
|
[{
|
|
'core_requests': (0, 0),
|
|
'core_replies': (0, 0),
|
|
'ext_requests': (0, 0, 0, 0),
|
|
'ext_replies': (0, 0, 0, 0),
|
|
'delivered_events': (0, 0),
|
|
'device_events': (X.KeyPress, X.KeyRelease),
|
|
'errors': (0, 0),
|
|
'client_started': False,
|
|
'client_died': False,
|
|
}]
|
|
)
|
|
|
|
self.display.record_enable_context(self.ctx, self.event_handler)
|
|
self.display.record_free_context(self.ctx)
|
|
|
|
print("Session watcher started - monitoring for exit hotkey")
|
|
|
|
while True:
|
|
event = self.display.next_event()
|
|
|
|
if __name__ == '__main__':
|
|
try:
|
|
watcher = SessionWatcher()
|
|
watcher.run()
|
|
except KeyboardInterrupt:
|
|
print("\nSession watcher stopped")
|
|
sys.exit(0)
|
|
except Exception as e:
|
|
print(f"Error: {e}")
|
|
sys.exit(1)
|