"""Leichtgewichtiger SNTP-Client zum Pruefen des Zeit-Offsets. Im Container koennen wir die Systemzeit nicht wirklich setzen (braucht CAP_SYS_TIME). Aber wir koennen den Offset ermitteln und loggen, damit der Admin weiss, ob der Host driftet. Fuer einen harten Sync muss auf dem Host selbst ein NTP-Daemon laufen. """ from __future__ import annotations import socket import struct import time _NTP_EPOCH_OFFSET = 2208988800 # Sekunden zwischen 1900 und 1970 def query_ntp(server: str, timeout: float = 3.0, port: int = 123) -> float | None: """Fragt einen NTP-Server und gibt das Offset (Server - Local) in Sekunden zurueck, oder None bei Fehler.""" packet = b'\x1b' + 47 * b'\0' # LI=0, VN=3, Mode=3 (client) sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock.settimeout(timeout) try: t0 = time.time() sock.sendto(packet, (server, port)) data, _ = sock.recvfrom(1024) t3 = time.time() except (socket.gaierror, socket.timeout, OSError): return None finally: sock.close() if len(data) < 48: return None # Transmit timestamp: Offset 40, 8 bytes, fixed point 32.32 secs, frac = struct.unpack('!II', data[40:48]) if secs == 0: return None t2 = secs - _NTP_EPOCH_OFFSET + frac / 2**32 # Einfacher Offset (sans roundtrip): (t2 - ((t0 + t3) / 2)) return t2 - (t0 + t3) / 2 def check_and_log(server: str, logger=None) -> float | None: import logging log = logger or logging.getLogger('ntp') offset = query_ntp(server) if offset is None: log.warning('NTP-Check: Server %s nicht erreichbar', server) return None if abs(offset) > 5.0: log.warning('NTP-Check: Systemzeit weicht um %.2fs von %s ab -> Host-Uhr synchronisieren!', offset, server) else: log.info('NTP-Check: Offset %.3fs gegen %s (ok)', offset, server) return offset