minmal-file-cloud-email-pim.../backend/app/api/files.py

707 lines
22 KiB
Python

import os
import uuid
import hashlib
import secrets
import mimetypes
from datetime import datetime, timezone
from pathlib import Path
from flask import request, jsonify, send_file, current_app
from app.api import api_bp
from app.api.auth import token_required
from app.extensions import db, bcrypt
from app.models.file import File, FilePermission, ShareLink
def _user_upload_dir(user_id):
base = Path(current_app.config['UPLOAD_PATH'])
user_dir = base / str(user_id)
user_dir.mkdir(parents=True, exist_ok=True)
return user_dir
def _check_file_access(file_obj, user, permission='read'):
"""Check if user has access to file. Owner always has full access."""
if file_obj.owner_id == user.id:
return True
perm = FilePermission.query.filter_by(
file_id=file_obj.id, user_id=user.id
).first()
if not perm:
return False
perm_levels = {'read': 0, 'write': 1, 'admin': 2}
return perm_levels.get(perm.permission, -1) >= perm_levels.get(permission, 0)
def _get_file_or_403(file_id, user, permission='read'):
f = db.session.get(File, file_id)
if not f:
return None, (jsonify({'error': 'Datei nicht gefunden'}), 404)
if not _check_file_access(f, user, permission):
return None, (jsonify({'error': 'Zugriff verweigert'}), 403)
return f, None
def _compute_checksum(filepath):
h = hashlib.sha256()
with open(filepath, 'rb') as fh:
for chunk in iter(lambda: fh.read(8192), b''):
h.update(chunk)
return h.hexdigest()
# --- Folder / File listing ---
@api_bp.route('/files', methods=['GET'])
@token_required
def list_files():
user = request.current_user
parent_id = request.args.get('parent_id', None, type=int)
# Own files in this folder
query = File.query.filter_by(owner_id=user.id, parent_id=parent_id)
files = query.order_by(File.is_folder.desc(), File.name).all()
# Shared files at root level
shared = []
if parent_id is None:
shared_perms = FilePermission.query.filter_by(user_id=user.id).all()
shared_file_ids = [p.file_id for p in shared_perms]
if shared_file_ids:
shared = File.query.filter(
File.id.in_(shared_file_ids),
File.parent_id.is_(None)
).order_by(File.is_folder.desc(), File.name).all()
result = [f.to_dict() for f in files]
for f in shared:
d = f.to_dict()
d['shared'] = True
result.append(d)
# Build breadcrumb
breadcrumb = []
if parent_id:
current = db.session.get(File, parent_id)
while current:
breadcrumb.insert(0, {'id': current.id, 'name': current.name})
current = current.parent
return jsonify({'files': result, 'breadcrumb': breadcrumb}), 200
@api_bp.route('/files/folder', methods=['POST'])
@token_required
def create_folder():
user = request.current_user
data = request.get_json()
name = data.get('name', '').strip()
parent_id = data.get('parent_id', None)
if not name:
return jsonify({'error': 'Ordnername erforderlich'}), 400
if parent_id:
parent, err = _get_file_or_403(parent_id, user, 'write')
if err:
return err
if not parent.is_folder:
return jsonify({'error': 'Uebergeordnetes Element ist kein Ordner'}), 400
existing = File.query.filter_by(
owner_id=user.id, parent_id=parent_id, name=name, is_folder=True
).first()
if existing:
return jsonify({'error': 'Ordner existiert bereits'}), 409
folder = File(
owner_id=user.id,
parent_id=parent_id,
name=name,
is_folder=True,
)
db.session.add(folder)
db.session.commit()
return jsonify(folder.to_dict()), 201
def _ensure_folder_path(user_id, parent_id, path_parts):
"""Create nested folder structure. Returns the ID of the deepest folder."""
current_parent = parent_id
for part in path_parts:
part = part.strip()
if not part:
continue
existing = File.query.filter_by(
owner_id=user_id, parent_id=current_parent, name=part, is_folder=True
).first()
if existing:
current_parent = existing.id
else:
folder = File(owner_id=user_id, parent_id=current_parent, name=part, is_folder=True)
db.session.add(folder)
db.session.flush()
current_parent = folder.id
return current_parent
@api_bp.route('/files/ensure-path', methods=['POST'])
@token_required
def ensure_folder_path():
"""Create nested folder structure from a path string like 'Docs/Work/Project'.
Returns the ID of the deepest folder."""
user = request.current_user
data = request.get_json()
path = data.get('path', '').strip().strip('/')
parent_id = data.get('parent_id', None)
if not path:
return jsonify({'folder_id': parent_id}), 200
parts = [p for p in path.split('/') if p.strip()]
folder_id = _ensure_folder_path(user.id, parent_id, parts)
db.session.commit()
return jsonify({'folder_id': folder_id}), 200
# --- Upload ---
@api_bp.route('/files/upload', methods=['POST'])
@token_required
def upload_file():
user = request.current_user
parent_id = request.form.get('parent_id', None, type=int)
if parent_id:
parent, err = _get_file_or_403(parent_id, user, 'write')
if err:
return err
if 'file' not in request.files:
return jsonify({'error': 'Keine Datei gesendet'}), 400
uploaded = request.files['file']
if not uploaded.filename:
return jsonify({'error': 'Leerer Dateiname'}), 400
filename = uploaded.filename
mime = uploaded.content_type or mimetypes.guess_type(filename)[0] or 'application/octet-stream'
# Save to disk with UUID name
storage_name = str(uuid.uuid4())
user_dir = _user_upload_dir(user.id)
storage_path = user_dir / storage_name
uploaded.save(str(storage_path))
size = os.path.getsize(str(storage_path))
checksum = _compute_checksum(str(storage_path))
# Check if file with same name exists -> overwrite
existing = File.query.filter_by(
owner_id=user.id, parent_id=parent_id, name=filename, is_folder=False
).first()
if existing:
# Remove old file from disk
old_path = Path(current_app.config['UPLOAD_PATH']) / str(user.id) / existing.storage_path
if old_path.exists():
old_path.unlink()
existing.storage_path = storage_name
existing.size = size
existing.mime_type = mime
existing.checksum = checksum
existing.updated_at = datetime.now(timezone.utc)
db.session.commit()
return jsonify(existing.to_dict()), 200
file_obj = File(
owner_id=user.id,
parent_id=parent_id,
name=filename,
is_folder=False,
mime_type=mime,
size=size,
storage_path=storage_name,
checksum=checksum,
)
db.session.add(file_obj)
db.session.commit()
return jsonify(file_obj.to_dict()), 201
# --- Download ---
@api_bp.route('/files/<int:file_id>/download', methods=['GET'])
@token_required
def download_file(file_id):
user = request.current_user
f, err = _get_file_or_403(file_id, user, 'read')
if err:
return err
if f.is_folder:
return jsonify({'error': 'Ordner koennen nicht heruntergeladen werden'}), 400
filepath = Path(current_app.config['UPLOAD_PATH']) / str(f.owner_id) / f.storage_path
if not filepath.exists():
return jsonify({'error': 'Datei auf Datentraeger nicht gefunden'}), 404
return send_file(str(filepath), mimetype=f.mime_type, as_attachment=True,
download_name=f.name)
# --- Rename / Move ---
@api_bp.route('/files/<int:file_id>', methods=['PUT'])
@token_required
def update_file(file_id):
user = request.current_user
f, err = _get_file_or_403(file_id, user, 'write')
if err:
return err
data = request.get_json()
if 'name' in data:
name = data['name'].strip()
if name:
f.name = name
if 'parent_id' in data:
new_parent = data['parent_id']
if new_parent is not None:
parent, perr = _get_file_or_403(new_parent, user, 'write')
if perr:
return perr
if not parent.is_folder:
return jsonify({'error': 'Ziel ist kein Ordner'}), 400
# Prevent moving folder into itself
if f.is_folder:
check = parent
while check:
if check.id == f.id:
return jsonify({'error': 'Ordner kann nicht in sich selbst verschoben werden'}), 400
check = check.parent
f.parent_id = new_parent
f.updated_at = datetime.now(timezone.utc)
db.session.commit()
return jsonify(f.to_dict()), 200
# --- Delete ---
@api_bp.route('/files/<int:file_id>', methods=['DELETE'])
@token_required
def delete_file(file_id):
user = request.current_user
f, err = _get_file_or_403(file_id, user, 'admin')
if err:
# Owner can always delete
f = db.session.get(File, file_id)
if not f or f.owner_id != user.id:
return jsonify({'error': 'Zugriff verweigert'}), 403
_delete_recursive(f, user.id)
db.session.commit()
return jsonify({'message': 'Geloescht'}), 200
def _delete_recursive(file_obj, user_id):
if file_obj.is_folder:
children = File.query.filter_by(parent_id=file_obj.id).all()
for child in children:
_delete_recursive(child, user_id)
else:
if file_obj.storage_path:
filepath = Path(current_app.config['UPLOAD_PATH']) / str(file_obj.owner_id) / file_obj.storage_path
if filepath.exists():
filepath.unlink()
db.session.delete(file_obj)
# --- Permissions ---
@api_bp.route('/files/<int:file_id>/permissions', methods=['GET'])
@token_required
def get_permissions(file_id):
user = request.current_user
f, err = _get_file_or_403(file_id, user, 'admin')
if err:
if not (f := db.session.get(File, file_id)) or f.owner_id != user.id:
return jsonify({'error': 'Zugriff verweigert'}), 403
perms = FilePermission.query.filter_by(file_id=file_id).all()
from app.models.user import User
result = []
for p in perms:
u = db.session.get(User, p.user_id)
result.append({
'id': p.id,
'user_id': p.user_id,
'username': u.username if u else None,
'permission': p.permission,
})
return jsonify(result), 200
@api_bp.route('/files/<int:file_id>/permissions', methods=['POST'])
@token_required
def set_permission(file_id):
user = request.current_user
f = db.session.get(File, file_id)
if not f or f.owner_id != user.id:
return jsonify({'error': 'Nur der Eigentuemer kann Berechtigungen setzen'}), 403
data = request.get_json()
target_user_id = data.get('user_id')
permission = data.get('permission', 'read')
if permission not in ('read', 'write', 'admin'):
return jsonify({'error': 'Ungueltige Berechtigung'}), 400
from app.models.user import User
target = db.session.get(User, target_user_id)
if not target:
return jsonify({'error': 'Benutzer nicht gefunden'}), 404
existing = FilePermission.query.filter_by(
file_id=file_id, user_id=target_user_id
).first()
is_new = not existing
if existing:
existing.permission = permission
else:
perm = FilePermission(file_id=file_id, user_id=target_user_id, permission=permission)
db.session.add(perm)
db.session.commit()
# Notify user via email
if is_new:
try:
from app.services.system_mail import notify_file_shared_with_user
notify_file_shared_with_user(f.name, user.username, target)
except Exception:
pass
return jsonify({'message': 'Berechtigung gesetzt'}), 200
@api_bp.route('/files/<int:file_id>/permissions/<int:perm_id>', methods=['DELETE'])
@token_required
def remove_permission(file_id, perm_id):
user = request.current_user
f = db.session.get(File, file_id)
if not f or f.owner_id != user.id:
return jsonify({'error': 'Nur der Eigentuemer kann Berechtigungen entfernen'}), 403
perm = db.session.get(FilePermission, perm_id)
if not perm or perm.file_id != file_id:
return jsonify({'error': 'Berechtigung nicht gefunden'}), 404
db.session.delete(perm)
db.session.commit()
return jsonify({'message': 'Berechtigung entfernt'}), 200
# --- Share Links ---
@api_bp.route('/files/<int:file_id>/share', methods=['POST'])
@token_required
def create_share_link(file_id):
user = request.current_user
f, err = _get_file_or_403(file_id, user, 'read')
if err:
return err
data = request.get_json() or {}
password = data.get('password')
expires_at = data.get('expires_at')
max_downloads = data.get('max_downloads')
permission = data.get('permission', 'read')
if permission not in ('read', 'write', 'upload_only'):
return jsonify({'error': 'Berechtigung muss "read", "write" oder "upload_only" sein'}), 400
token = secrets.token_urlsafe(32)
password_hash = None
if password:
password_hash = bcrypt.generate_password_hash(password).decode('utf-8')
exp_dt = None
if expires_at:
try:
exp_dt = datetime.fromisoformat(expires_at).replace(tzinfo=timezone.utc)
except ValueError:
return jsonify({'error': 'Ungueltiges Datumsformat'}), 400
link = ShareLink(
file_id=file_id,
token=token,
permission=permission,
password_hash=password_hash,
expires_at=exp_dt,
created_by=user.id,
max_downloads=max_downloads,
)
db.session.add(link)
db.session.commit()
return jsonify({
'token': token,
'url': f'/share/{token}',
'permission': permission,
'expires_at': exp_dt.isoformat() if exp_dt else None,
'has_password': bool(password),
}), 201
@api_bp.route('/files/<int:file_id>/shares', methods=['GET'])
@token_required
def list_share_links(file_id):
user = request.current_user
f, err = _get_file_or_403(file_id, user, 'read')
if err:
return err
links = ShareLink.query.filter_by(file_id=file_id).all()
return jsonify([{
'id': l.id,
'token': l.token,
'permission': l.permission,
'has_password': bool(l.password_hash),
'expires_at': l.expires_at.isoformat() if l.expires_at else None,
'download_count': l.download_count,
'max_downloads': l.max_downloads,
'created_at': l.created_at.isoformat(),
} for l in links]), 200
@api_bp.route('/share/<token>/info', methods=['GET'])
def share_info(token):
link = ShareLink.query.filter_by(token=token).first()
if not link:
return jsonify({'error': 'Link nicht gefunden'}), 404
if link.is_expired():
return jsonify({'error': 'Link abgelaufen'}), 410
if link.is_download_limit_reached():
return jsonify({'error': 'Download-Limit erreicht'}), 410
f = db.session.get(File, link.file_id)
return jsonify({
'name': f.name,
'is_folder': f.is_folder,
'size': f.size,
'mime_type': f.mime_type,
'has_password': bool(link.password_hash),
'permission': link.permission,
'upload_allowed': f.is_folder and link.permission in ('write', 'upload_only'),
'download_allowed': link.permission in ('read', 'write'),
}), 200
@api_bp.route('/share/<token>/verify', methods=['POST'])
def share_verify(token):
link = ShareLink.query.filter_by(token=token).first()
if not link:
return jsonify({'error': 'Link nicht gefunden'}), 404
if link.is_expired():
return jsonify({'error': 'Link abgelaufen'}), 410
data = request.get_json() or {}
password = data.get('password', '')
if link.password_hash:
if not bcrypt.check_password_hash(link.password_hash, password):
return jsonify({'error': 'Falsches Passwort'}), 401
# Generate temporary download token
download_token = secrets.token_urlsafe(16)
# Store in link temporarily (simple approach)
link._download_token = download_token
return jsonify({'download_token': download_token}), 200
@api_bp.route('/share/<token>/download', methods=['GET'])
def share_download(token):
link = ShareLink.query.filter_by(token=token).first()
if not link:
return jsonify({'error': 'Link nicht gefunden'}), 404
if link.permission == 'upload_only':
return jsonify({'error': 'Dieser Link erlaubt nur Upload, keinen Download'}), 403
if link.is_expired():
return jsonify({'error': 'Link abgelaufen'}), 410
if link.is_download_limit_reached():
return jsonify({'error': 'Download-Limit erreicht'}), 410
# Check password if set
if link.password_hash:
# For password-protected links, require the password as query param or header
password = request.args.get('password', '') or request.headers.get('X-Share-Password', '')
if not bcrypt.check_password_hash(link.password_hash, password):
return jsonify({'error': 'Passwort erforderlich'}), 401
f = db.session.get(File, link.file_id)
if f.is_folder:
return jsonify({'error': 'Ordner-Download noch nicht implementiert'}), 501
filepath = Path(current_app.config['UPLOAD_PATH']) / str(f.owner_id) / f.storage_path
if not filepath.exists():
return jsonify({'error': 'Datei nicht gefunden'}), 404
link.download_count += 1
db.session.commit()
# Notify creator about download
try:
from app.services.system_mail import notify_share_link_accessed
notify_share_link_accessed(link, f.name, request.remote_addr)
except Exception:
pass
return send_file(str(filepath), mimetype=f.mime_type, as_attachment=True,
download_name=f.name)
@api_bp.route('/share/<token>/upload', methods=['POST'])
def share_upload(token):
"""Upload a file via a share link (only if the shared item is a folder with write permission)."""
link = ShareLink.query.filter_by(token=token).first()
if not link:
return jsonify({'error': 'Link nicht gefunden'}), 404
if link.is_expired():
return jsonify({'error': 'Link abgelaufen'}), 410
# Check write/upload permission
if link.permission not in ('write', 'upload_only'):
return jsonify({'error': 'Dieser Link erlaubt keinen Upload'}), 403
# Check password if set
if link.password_hash:
password = request.form.get('password', '') or request.headers.get('X-Share-Password', '')
if not bcrypt.check_password_hash(link.password_hash, password):
return jsonify({'error': 'Passwort erforderlich'}), 401
f = db.session.get(File, link.file_id)
if not f.is_folder:
return jsonify({'error': 'Upload nur in freigegebene Ordner moeglich'}), 400
if 'file' not in request.files:
return jsonify({'error': 'Keine Datei gesendet'}), 400
uploaded = request.files['file']
if not uploaded.filename:
return jsonify({'error': 'Leerer Dateiname'}), 400
filename = uploaded.filename
mime = uploaded.content_type or mimetypes.guess_type(filename)[0] or 'application/octet-stream'
storage_name = str(uuid.uuid4())
user_dir = _user_upload_dir(f.owner_id)
storage_path = user_dir / storage_name
uploaded.save(str(storage_path))
size = os.path.getsize(str(storage_path))
checksum = _compute_checksum(str(storage_path))
file_obj = File(
owner_id=f.owner_id,
parent_id=f.id,
name=filename,
is_folder=False,
mime_type=mime,
size=size,
storage_path=storage_name,
checksum=checksum,
)
db.session.add(file_obj)
db.session.commit()
# Notify share link creator about the upload
try:
from app.services.system_mail import notify_share_link_upload
notify_share_link_upload(link, f.name, filename, size, request.remote_addr)
except Exception:
pass
return jsonify(file_obj.to_dict()), 201
@api_bp.route('/share/<token>', methods=['DELETE'])
@token_required
def delete_share_link(token):
user = request.current_user
link = ShareLink.query.filter_by(token=token).first()
if not link:
return jsonify({'error': 'Link nicht gefunden'}), 404
if link.created_by != user.id:
return jsonify({'error': 'Nur der Ersteller kann den Link loeschen'}), 403
db.session.delete(link)
db.session.commit()
return jsonify({'message': 'Link geloescht'}), 200
# --- Sync API ---
@api_bp.route('/sync/tree', methods=['GET'])
@token_required
def sync_tree():
"""Returns complete file tree with checksums for sync clients."""
user = request.current_user
def _build_tree(parent_id):
files = File.query.filter_by(owner_id=user.id, parent_id=parent_id)\
.order_by(File.is_folder.desc(), File.name).all()
result = []
for f in files:
entry = {
'id': f.id,
'name': f.name,
'is_folder': f.is_folder,
'size': f.size,
'checksum': f.checksum,
'updated_at': f.updated_at.isoformat() if f.updated_at else None,
}
if f.is_folder:
entry['children'] = _build_tree(f.id)
result.append(entry)
return result
return jsonify({'tree': _build_tree(None)}), 200
@api_bp.route('/sync/changes', methods=['GET'])
@token_required
def sync_changes():
"""Returns files changed since a given timestamp."""
user = request.current_user
since = request.args.get('since')
if not since:
return jsonify({'error': 'Parameter "since" erforderlich'}), 400
try:
since_dt = datetime.fromisoformat(since).replace(tzinfo=timezone.utc)
except ValueError:
return jsonify({'error': 'Ungueltiges Datumsformat'}), 400
changed = File.query.filter(
File.owner_id == user.id,
File.updated_at > since_dt
).all()
return jsonify({
'changes': [f.to_dict() for f in changed],
'server_time': datetime.now(timezone.utc).isoformat(),
}), 200