minmal-file-cloud-email-pim.../backend/app/api/email.py

576 lines
19 KiB
Python

import email as email_lib
import email.header
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email import encoders
from datetime import datetime, timezone
from flask import request, jsonify, current_app
from app.api import api_bp
from app.api.auth import token_required, admin_required
from app.extensions import db
from app.models.email_account import EmailAccount
from app.models.user import User
from app.services.crypto_service import encrypt_field, decrypt_field
def _get_account_or_err(account_id, user):
account = db.session.get(EmailAccount, account_id)
if not account or account.user_id != user.id:
return None, (jsonify({'error': 'Konto nicht gefunden'}), 404)
return account, None
def _get_imap_connection(account, user_password_key):
import imapclient
password = decrypt_field(account.password_encrypted, user_password_key)
host = account.imap_host
port = account.imap_port
if account.imap_ssl:
conn = imapclient.IMAPClient(host, port=port, ssl=True)
else:
conn = imapclient.IMAPClient(host, port=port, ssl=False)
conn.starttls()
conn.login(account.username, password)
return conn
def _decode_header(header_value):
if not header_value:
return ''
decoded_parts = email.header.decode_header(header_value)
result = []
for part, charset in decoded_parts:
if isinstance(part, bytes):
result.append(part.decode(charset or 'utf-8', errors='replace'))
else:
result.append(part)
return ' '.join(result)
# --- Accounts ---
@api_bp.route('/email/accounts', methods=['GET'])
@token_required
def list_email_accounts():
user = request.current_user
accounts = EmailAccount.query.filter_by(user_id=user.id)\
.order_by(EmailAccount.sort_order).all()
return jsonify([a.to_dict() for a in accounts]), 200
@api_bp.route('/email/accounts', methods=['POST'])
@token_required
def create_email_account():
user = request.current_user
data = request.get_json()
required = ['display_name', 'email_address', 'imap_host', 'smtp_host', 'username', 'password']
for field in required:
if not data.get(field):
return jsonify({'error': f'{field} erforderlich'}), 400
# Get encryption key from header
enc_key = request.headers.get('X-Encryption-Key', '')
if not enc_key:
return jsonify({'error': 'Verschluesselungs-Key erforderlich (X-Encryption-Key Header)'}), 400
encrypted_pw = encrypt_field(data['password'], enc_key)
account = EmailAccount(
user_id=user.id,
display_name=data['display_name'],
email_address=data['email_address'],
imap_host=data['imap_host'],
imap_port=data.get('imap_port', 993),
imap_ssl=data.get('imap_ssl', True),
smtp_host=data['smtp_host'],
smtp_port=data.get('smtp_port', 587),
smtp_ssl=data.get('smtp_ssl', True),
username=data['username'],
password_encrypted=encrypted_pw,
is_default=data.get('is_default', False),
sort_order=data.get('sort_order', 0),
)
db.session.add(account)
# Update email account count
db.session.commit()
return jsonify(account.to_dict()), 201
@api_bp.route('/email/accounts/<int:account_id>', methods=['PUT'])
@token_required
def update_email_account(account_id):
user = request.current_user
account, err = _get_account_or_err(account_id, user)
if err:
return err
data = request.get_json()
for field in ['display_name', 'email_address', 'imap_host', 'imap_port',
'imap_ssl', 'smtp_host', 'smtp_port', 'smtp_ssl',
'username', 'is_default', 'sort_order']:
if field in data:
setattr(account, field, data[field])
if 'password' in data and data['password']:
enc_key = request.headers.get('X-Encryption-Key', '')
if enc_key:
account.password_encrypted = encrypt_field(data['password'], enc_key)
db.session.commit()
return jsonify(account.to_dict()), 200
@api_bp.route('/email/accounts/<int:account_id>', methods=['DELETE'])
@token_required
def delete_email_account(account_id):
user = request.current_user
account, err = _get_account_or_err(account_id, user)
if err:
return err
db.session.delete(account)
db.session.commit()
return jsonify({'message': 'E-Mail-Konto geloescht'}), 200
@api_bp.route('/email/accounts/<int:account_id>/test', methods=['POST'])
@token_required
def test_email_account(account_id):
user = request.current_user
account, err = _get_account_or_err(account_id, user)
if err:
return err
enc_key = request.headers.get('X-Encryption-Key', '')
if not enc_key:
return jsonify({'error': 'Verschluesselungs-Key erforderlich'}), 400
try:
conn = _get_imap_connection(account, enc_key)
conn.logout()
return jsonify({'message': 'Verbindung erfolgreich'}), 200
except Exception as e:
return jsonify({'error': f'Verbindungsfehler: {str(e)}'}), 400
# --- Folders ---
@api_bp.route('/email/accounts/<int:account_id>/folders', methods=['GET'])
@token_required
def list_email_folders(account_id):
user = request.current_user
account, err = _get_account_or_err(account_id, user)
if err:
return err
enc_key = request.headers.get('X-Encryption-Key', '')
if not enc_key:
return jsonify({'error': 'Verschluesselungs-Key erforderlich'}), 400
try:
conn = _get_imap_connection(account, enc_key)
folders_raw = conn.list_folders()
folders = []
for flags, delimiter, name in folders_raw:
flag_strs = [f.decode() if isinstance(f, bytes) else f for f in flags]
folders.append({
'name': name,
'delimiter': delimiter.decode() if isinstance(delimiter, bytes) else delimiter,
'flags': flag_strs,
})
conn.logout()
return jsonify(folders), 200
except Exception as e:
return jsonify({'error': str(e)}), 500
# --- Messages ---
@api_bp.route('/email/accounts/<int:account_id>/folders/<path:folder>/messages', methods=['GET'])
@token_required
def list_messages(account_id, folder):
user = request.current_user
account, err = _get_account_or_err(account_id, user)
if err:
return err
enc_key = request.headers.get('X-Encryption-Key', '')
if not enc_key:
return jsonify({'error': 'Verschluesselungs-Key erforderlich'}), 400
page = request.args.get('page', 1, type=int)
limit = request.args.get('limit', 50, type=int)
try:
conn = _get_imap_connection(account, enc_key)
conn.select_folder(folder)
# Get all UIDs, sorted newest first
uids = conn.search(['ALL'])
uids.reverse()
total = len(uids)
start = (page - 1) * limit
page_uids = uids[start:start + limit]
messages = []
if page_uids:
fetch_data = conn.fetch(page_uids, ['ENVELOPE', 'FLAGS', 'RFC822.SIZE'])
for uid in page_uids:
if uid not in fetch_data:
continue
msg_data = fetch_data[uid]
envelope = msg_data.get(b'ENVELOPE')
flags = msg_data.get(b'FLAGS', ())
size = msg_data.get(b'RFC822.SIZE', 0)
flag_strs = [f.decode() if isinstance(f, bytes) else str(f) for f in flags]
from_addr = ''
if envelope and envelope.from_:
f = envelope.from_[0]
name = f.name.decode(errors='replace') if f.name else ''
mailbox = f.mailbox.decode(errors='replace') if f.mailbox else ''
host = f.host.decode(errors='replace') if f.host else ''
from_addr = f'{name} <{mailbox}@{host}>' if name else f'{mailbox}@{host}'
subject = ''
if envelope and envelope.subject:
subject = _decode_header(envelope.subject.decode(errors='replace'))
date_str = ''
if envelope and envelope.date:
date_str = envelope.date.isoformat() if hasattr(envelope.date, 'isoformat') else str(envelope.date)
messages.append({
'uid': uid,
'subject': subject,
'from': from_addr,
'date': date_str,
'flags': flag_strs,
'size': size,
'seen': '\\Seen' in flag_strs,
})
conn.logout()
return jsonify({
'messages': messages,
'total': total,
'page': page,
'limit': limit,
}), 200
except Exception as e:
return jsonify({'error': str(e)}), 500
@api_bp.route('/email/accounts/<int:account_id>/messages/<int:uid>', methods=['GET'])
@token_required
def get_message(account_id, uid):
user = request.current_user
account, err = _get_account_or_err(account_id, user)
if err:
return err
enc_key = request.headers.get('X-Encryption-Key', '')
folder = request.args.get('folder', 'INBOX')
try:
conn = _get_imap_connection(account, enc_key)
conn.select_folder(folder)
fetch_data = conn.fetch([uid], ['RFC822', 'FLAGS'])
if uid not in fetch_data:
conn.logout()
return jsonify({'error': 'Nachricht nicht gefunden'}), 404
raw = fetch_data[uid][b'RFC822']
flags = fetch_data[uid].get(b'FLAGS', ())
# Mark as seen
conn.set_flags([uid], ['\\Seen'])
conn.logout()
msg = email_lib.message_from_bytes(raw)
# Extract body
html_body = ''
text_body = ''
attachments = []
if msg.is_multipart():
for part in msg.walk():
content_type = part.get_content_type()
disposition = str(part.get('Content-Disposition', ''))
if 'attachment' in disposition:
filename = part.get_filename() or 'attachment'
attachments.append({
'filename': _decode_header(filename),
'content_type': content_type,
'size': len(part.get_payload(decode=True) or b''),
})
elif content_type == 'text/html':
html_body = part.get_payload(decode=True).decode(errors='replace')
elif content_type == 'text/plain':
text_body = part.get_payload(decode=True).decode(errors='replace')
else:
content_type = msg.get_content_type()
payload = msg.get_payload(decode=True)
if payload:
if content_type == 'text/html':
html_body = payload.decode(errors='replace')
else:
text_body = payload.decode(errors='replace')
return jsonify({
'uid': uid,
'subject': _decode_header(msg.get('Subject', '')),
'from': _decode_header(msg.get('From', '')),
'to': _decode_header(msg.get('To', '')),
'cc': _decode_header(msg.get('Cc', '')),
'date': msg.get('Date', ''),
'html_body': html_body,
'text_body': text_body,
'attachments': attachments,
'flags': [f.decode() if isinstance(f, bytes) else str(f) for f in flags],
}), 200
except Exception as e:
return jsonify({'error': str(e)}), 500
# --- Send ---
@api_bp.route('/email/send', methods=['POST'])
@token_required
def send_email():
user = request.current_user
data = request.get_json()
account_id = data.get('account_id')
if not account_id:
return jsonify({'error': 'Konto-ID erforderlich'}), 400
account, err = _get_account_or_err(account_id, user)
if err:
return err
enc_key = request.headers.get('X-Encryption-Key', '')
if not enc_key:
return jsonify({'error': 'Verschluesselungs-Key erforderlich'}), 400
to_addr = data.get('to', '')
cc_addr = data.get('cc', '')
subject = data.get('subject', '')
body_html = data.get('body_html', '')
body_text = data.get('body_text', '')
if not to_addr:
return jsonify({'error': 'Empfaenger erforderlich'}), 400
password = decrypt_field(account.password_encrypted, enc_key)
# Build message
msg = MIMEMultipart('alternative')
msg['From'] = f'{account.display_name} <{account.email_address}>'
msg['To'] = to_addr
if cc_addr:
msg['Cc'] = cc_addr
msg['Subject'] = subject
msg['Date'] = email_lib.utils.formatdate(localtime=True)
if body_text:
msg.attach(MIMEText(body_text, 'plain', 'utf-8'))
if body_html:
msg.attach(MIMEText(body_html, 'html', 'utf-8'))
try:
if account.smtp_ssl and account.smtp_port == 465:
server = smtplib.SMTP_SSL(account.smtp_host, account.smtp_port)
else:
server = smtplib.SMTP(account.smtp_host, account.smtp_port)
server.starttls()
server.login(account.username, password)
recipients = [to_addr]
if cc_addr:
recipients.extend(cc_addr.split(','))
server.sendmail(account.email_address, recipients, msg.as_string())
server.quit()
return jsonify({'message': 'E-Mail gesendet'}), 200
except Exception as e:
return jsonify({'error': f'Sendefehler: {str(e)}'}), 500
# --- Flag / Move / Delete ---
@api_bp.route('/email/accounts/<int:account_id>/messages/<int:uid>/flag', methods=['POST'])
@token_required
def flag_message(account_id, uid):
user = request.current_user
account, err = _get_account_or_err(account_id, user)
if err:
return err
enc_key = request.headers.get('X-Encryption-Key', '')
data = request.get_json()
folder = data.get('folder', 'INBOX')
flag = data.get('flag', '\\Seen')
add = data.get('add', True)
try:
conn = _get_imap_connection(account, enc_key)
conn.select_folder(folder)
if add:
conn.add_flags([uid], [flag])
else:
conn.remove_flags([uid], [flag])
conn.logout()
return jsonify({'message': 'Flag gesetzt'}), 200
except Exception as e:
return jsonify({'error': str(e)}), 500
@api_bp.route('/email/accounts/<int:account_id>/messages/<int:uid>/move', methods=['POST'])
@token_required
def move_message(account_id, uid):
user = request.current_user
account, err = _get_account_or_err(account_id, user)
if err:
return err
enc_key = request.headers.get('X-Encryption-Key', '')
data = request.get_json()
folder = data.get('folder', 'INBOX')
target = data.get('target')
if not target:
return jsonify({'error': 'Ziel-Ordner erforderlich'}), 400
try:
conn = _get_imap_connection(account, enc_key)
conn.select_folder(folder)
conn.move([uid], target)
conn.logout()
return jsonify({'message': 'Nachricht verschoben'}), 200
except Exception as e:
return jsonify({'error': str(e)}), 500
@api_bp.route('/email/accounts/<int:account_id>/messages/<int:uid>', methods=['DELETE'])
@token_required
def delete_message(account_id, uid):
user = request.current_user
account, err = _get_account_or_err(account_id, user)
if err:
return err
enc_key = request.headers.get('X-Encryption-Key', '')
folder = request.args.get('folder', 'INBOX')
try:
conn = _get_imap_connection(account, enc_key)
conn.select_folder(folder)
conn.delete_messages([uid])
conn.expunge()
conn.logout()
return jsonify({'message': 'Nachricht geloescht'}), 200
except Exception as e:
return jsonify({'error': str(e)}), 500
# --- Admin: manage email accounts for any user ---
@api_bp.route('/admin/users/<int:user_id>/email-accounts', methods=['GET'])
@admin_required
def admin_list_email_accounts(user_id):
user = db.session.get(User, user_id)
if not user:
return jsonify({'error': 'Benutzer nicht gefunden'}), 404
accounts = EmailAccount.query.filter_by(user_id=user_id)\
.order_by(EmailAccount.sort_order).all()
return jsonify([a.to_dict() for a in accounts]), 200
@api_bp.route('/admin/users/<int:user_id>/email-accounts', methods=['POST'])
@admin_required
def admin_create_email_account(user_id):
user = db.session.get(User, user_id)
if not user:
return jsonify({'error': 'Benutzer nicht gefunden'}), 404
data = request.get_json()
required = ['display_name', 'email_address', 'imap_host', 'smtp_host', 'username', 'password']
for field in required:
if not data.get(field):
return jsonify({'error': f'{field} erforderlich'}), 400
enc_key = request.headers.get('X-Encryption-Key', '')
if not enc_key:
return jsonify({'error': 'Verschluesselungs-Key erforderlich (X-Encryption-Key Header)'}), 400
account = EmailAccount(
user_id=user_id,
display_name=data['display_name'],
email_address=data['email_address'],
imap_host=data['imap_host'],
imap_port=data.get('imap_port', 993),
imap_ssl=data.get('imap_ssl', True),
smtp_host=data['smtp_host'],
smtp_port=data.get('smtp_port', 587),
smtp_ssl=data.get('smtp_ssl', True),
username=data['username'],
password_encrypted=encrypt_field(data['password'], enc_key),
is_default=data.get('is_default', False),
sort_order=data.get('sort_order', 0),
)
db.session.add(account)
db.session.commit()
return jsonify(account.to_dict()), 201
@api_bp.route('/admin/email-accounts/<int:account_id>', methods=['PUT'])
@admin_required
def admin_update_email_account(account_id):
account = db.session.get(EmailAccount, account_id)
if not account:
return jsonify({'error': 'Konto nicht gefunden'}), 404
data = request.get_json()
for field in ['display_name', 'email_address', 'imap_host', 'imap_port',
'imap_ssl', 'smtp_host', 'smtp_port', 'smtp_ssl',
'username', 'is_default', 'sort_order']:
if field in data:
setattr(account, field, data[field])
if 'password' in data and data['password']:
enc_key = request.headers.get('X-Encryption-Key', '')
if enc_key:
account.password_encrypted = encrypt_field(data['password'], enc_key)
db.session.commit()
return jsonify(account.to_dict()), 200
@api_bp.route('/admin/email-accounts/<int:account_id>', methods=['DELETE'])
@admin_required
def admin_delete_email_account(account_id):
account = db.session.get(EmailAccount, account_id)
if not account:
return jsonify({'error': 'Konto nicht gefunden'}), 404
db.session.delete(account)
db.session.commit()
return jsonify({'message': 'E-Mail-Konto geloescht'}), 200