"""Certificate Authority model for PKI management.""" from datetime import datetime from enum import Enum as PyEnum from sqlalchemy import Column, Integer, String, Boolean, DateTime, ForeignKey, Enum, Text from sqlalchemy.orm import relationship from ..database import Base class CAStatus(str, PyEnum): """Certificate Authority status.""" PENDING = "pending" # DH parameters being generated ACTIVE = "active" EXPIRED = "expired" REVOKED = "revoked" class CAAlgorithm(str, PyEnum): """Key algorithm for CA.""" RSA = "rsa" ECDSA = "ecdsa" class CertificateAuthority(Base): """Certificate Authority for issuing VPN certificates.""" __tablename__ = "certificate_authorities" id = Column(Integer, primary_key=True, index=True) tenant_id = Column(Integer, ForeignKey("tenants.id"), nullable=True) # NULL for global CA name = Column(String(255), nullable=False) description = Column(Text, nullable=True) # Certificate data (PEM encoded) ca_cert = Column(Text, nullable=True) # CA certificate ca_key = Column(Text, nullable=True) # CA private key (encrypted) # DH parameters (shared across servers using this CA) dh_params = Column(Text, nullable=True) # Pre-generated DH parameters dh_generating = Column(Boolean, default=False) # DH generation in progress # Key configuration key_size = Column(Integer, default=4096) algorithm = Column(Enum(CAAlgorithm), default=CAAlgorithm.RSA) # Validity valid_from = Column(DateTime, nullable=True) valid_until = Column(DateTime, nullable=True) # Status is_default = Column(Boolean, default=False) # Default CA for new certificates status = Column(Enum(CAStatus), default=CAStatus.PENDING) # CRL (Certificate Revocation List) crl = Column(Text, nullable=True) crl_updated_at = Column(DateTime, nullable=True) # Serial number tracking next_serial = Column(Integer, default=1) # Audit created_at = Column(DateTime, default=datetime.utcnow, nullable=False) updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) created_by_id = Column(Integer, ForeignKey("users.id"), nullable=True) # Relationships tenant = relationship("Tenant", back_populates="certificate_authorities") created_by = relationship("User", foreign_keys=[created_by_id]) vpn_servers = relationship("VPNServer", back_populates="certificate_authority") vpn_profiles = relationship("VPNProfile", back_populates="certificate_authority") client_vpn_profiles = relationship("ClientVPNProfile", back_populates="certificate_authority") def __repr__(self): return f"" @property def is_ready(self) -> bool: """Check if CA is ready for issuing certificates.""" return ( self.status == CAStatus.ACTIVE and self.ca_cert is not None and self.ca_key is not None and self.dh_params is not None ) @property def is_expired(self) -> bool: """Check if CA certificate is expired.""" if self.valid_until: return datetime.utcnow() > self.valid_until return False @property def days_until_expiry(self) -> int | None: """Days until CA expires.""" if self.valid_until: delta = self.valid_until - datetime.utcnow() return max(0, delta.days) return None