minmal-file-cloud-email-pim.../backend/app/api/office.py

515 lines
18 KiB
Python

import io
import os
import hashlib
from datetime import datetime, timezone, timedelta
from pathlib import Path
from flask import request, jsonify, current_app, send_file
from app.api import api_bp
from app.api.auth import token_required
from app.api.files import _get_file_or_403
from app.extensions import db
from app.models.settings import AppSettings
@api_bp.route('/files/<int:file_id>/preview', methods=['GET'])
@token_required
def preview_file(file_id):
    """Return a JSON description of how the client should preview a file.

    The response ``type`` is chosen from the MIME type and file extension:
    ``pdf`` and ``image`` point the client at the download URL, ``html``
    (DOCX), ``spreadsheet`` (XLSX) and ``slides`` (PPTX) carry converted
    content, ``text`` carries the first 100000 characters, and everything
    else is reported as ``unsupported``. Extension checks are
    case-insensitive. Every response gets no-cache headers.

    Returns 400 for folders, 404 when the stored file is missing on disk,
    500 when an office conversion raises, or the error response produced
    by ``_get_file_or_403``.
    """
    from flask import after_this_request

    @after_this_request
    def add_no_cache(response):
        # Previews must reflect the latest saved content, never a cached copy.
        response.headers['Cache-Control'] = 'no-cache, no-store, must-revalidate'
        response.headers['Pragma'] = 'no-cache'
        return response

    user = request.current_user
    f, err = _get_file_or_403(file_id, user, 'read')
    if err:
        return err
    if f.is_folder:
        return jsonify({'error': 'Ordner haben keine Vorschau'}), 400

    mime = f.mime_type or ''
    # Compare extensions case-insensitively so "REPORT.DOCX" is recognised too.
    lower_name = f.name.lower()
    filepath = Path(current_app.config['UPLOAD_PATH']) / str(f.owner_id) / f.storage_path
    if not filepath.exists():
        return jsonify({'error': 'Datei nicht gefunden'}), 404

    # PDF -> just return URL for PDF.js to load
    if 'pdf' in mime:
        return jsonify({
            'type': 'pdf',
            'url': f'/api/files/{file_id}/download',
            'name': f.name,
        }), 200

    # Office formats, checked in this order:
    # (MIME types, extension, error label, converter, response type, payload key)
    office_previews = (
        (('application/vnd.openxmlformats-officedocument.wordprocessingml.document',
          'application/msword'),
         '.docx', 'DOCX', _convert_docx, 'html', 'content'),
        (('application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
          'application/vnd.ms-excel'),
         '.xlsx', 'XLSX', _convert_xlsx, 'spreadsheet', 'sheets'),
        (('application/vnd.openxmlformats-officedocument.presentationml.presentation',
          'application/vnd.ms-powerpoint'),
         '.pptx', 'PPTX', _convert_pptx, 'slides', 'slides'),
    )
    for mimes, suffix, label, convert, kind, payload_key in office_previews:
        if mime in mimes or lower_name.endswith(suffix):
            try:
                payload = convert(filepath)
                return jsonify({'type': kind, payload_key: payload, 'name': f.name}), 200
            except Exception as e:
                return jsonify({'error': f'{label}-Vorschau fehlgeschlagen: {str(e)}'}), 500

    # Images
    if mime.startswith('image/'):
        return jsonify({
            'type': 'image',
            'url': f'/api/files/{file_id}/download',
            'name': f.name,
        }), 200

    # Text files
    text_suffixes = ('.txt', '.md', '.json', '.xml', '.csv',
                     '.py', '.js', '.html', '.css', '.yml', '.yaml')
    if mime.startswith('text/') or lower_name.endswith(text_suffixes):
        try:
            # Read only the previewed prefix instead of loading the whole file.
            with filepath.open(encoding='utf-8', errors='replace') as fh:
                content = fh.read(100000)
            return jsonify({'type': 'text', 'content': content, 'name': f.name}), 200
        except OSError as exc:
            # Best effort: an unreadable file is reported as unsupported below.
            current_app.logger.warning('Text preview failed for file %s: %s', file_id, exc)

    return jsonify({'type': 'unsupported', 'name': f.name, 'mime_type': mime}), 200
def _convert_docx(filepath):
    """Convert a DOCX file into a simple HTML fragment for preview.

    Paragraphs styled "Heading 1" to "Heading 3" become ``<h1>``-``<h3>``,
    other paragraphs become ``<p>`` with bold/italic/underline runs wrapped
    in ``<strong>``/``<em>``/``<u>``, and blank paragraphs become ``<br/>``.
    All tables are appended after the paragraphs, with the first row
    rendered as header cells. Document text is HTML-escaped so content such
    as ``<script>`` is displayed literally instead of being interpreted.

    Args:
        filepath: Path of the DOCX file on disk.

    Returns:
        The HTML fragment as one newline-joined string.
    """
    from html import escape
    from docx import Document

    doc = Document(str(filepath))
    html_parts = []

    for para in doc.paragraphs:
        style = (para.style.name if para.style else '') or ''
        text = para.text
        if not text.strip():
            html_parts.append('<br/>')
            continue

        heading_tag = next(
            (f'h{level}' for level in (1, 2, 3) if f'Heading {level}' in style),
            None,
        )
        if heading_tag:
            html_parts.append(f'<{heading_tag}>{escape(text, quote=False)}</{heading_tag}>')
            continue

        # Rebuild the paragraph run by run to keep bold/italic/underline.
        fragments = []
        for run in para.runs:
            fragment = escape(run.text, quote=False)
            if run.bold:
                fragment = f'<strong>{fragment}</strong>'
            if run.italic:
                fragment = f'<em>{fragment}</em>'
            if run.underline:
                fragment = f'<u>{fragment}</u>'
            fragments.append(fragment)
        # If the runs produced nothing although the paragraph has text, fall
        # back to the plain text rather than emitting an empty <p>.
        run_html = ''.join(fragments) or escape(text, quote=False)
        html_parts.append(f'<p>{run_html}</p>')

    # Tables
    for table in doc.tables:
        html_parts.append('<table border="1" cellpadding="4" cellspacing="0" style="border-collapse: collapse; width: 100%">')
        for i, row in enumerate(table.rows):
            html_parts.append('<tr>')
            tag = 'th' if i == 0 else 'td'
            for cell in row.cells:
                html_parts.append(f'<{tag}>{escape(cell.text, quote=False)}</{tag}>')
            html_parts.append('</tr>')
        html_parts.append('</table>')

    return '\n'.join(html_parts)
def _convert_xlsx(filepath, max_rows=500):
    """Read an XLSX workbook into a JSON-friendly list of sheets.

    The workbook is opened read-only with ``data_only=True``. Every cell is
    converted with ``str()``; empty cells become ``''``.

    Args:
        filepath: Path of the XLSX file on disk.
        max_rows: Maximum number of rows read per worksheet (default 500).

    Returns:
        A list of ``{'name': <sheet title>, 'rows': [[str, ...], ...]}``
        dicts, one per worksheet.
    """
    from openpyxl import load_workbook

    wb = load_workbook(str(filepath), read_only=True, data_only=True)
    try:
        return [
            {
                'name': ws.title,
                'rows': [
                    ['' if cell is None else str(cell) for cell in row]
                    for row in ws.iter_rows(max_row=max_rows, values_only=True)
                ],
            }
            for ws in wb.worksheets
        ]
    finally:
        # Close the workbook even when reading a sheet raises.
        wb.close()
def _convert_pptx(filepath):
    """Convert a PPTX presentation into per-slide HTML fragments.

    For every shape with a text frame each non-blank paragraph becomes a
    ``<p>``; every shape holding a table becomes a ``<table>``. Text is
    HTML-escaped so slide content cannot inject markup into the preview.

    Args:
        filepath: Path of the PPTX file on disk.

    Returns:
        A list of ``{'index': <0-based slide number>, 'html': <fragment>}``
        dicts. Slides without any text or table get a placeholder paragraph.
    """
    from html import escape
    from pptx import Presentation

    prs = Presentation(str(filepath))
    slides = []
    for i, slide in enumerate(prs.slides):
        content_parts = []
        for shape in slide.shapes:
            if shape.has_text_frame:
                for para in shape.text_frame.paragraphs:
                    text = para.text.strip()
                    if text:
                        content_parts.append(f'<p>{escape(text, quote=False)}</p>')
            if shape.has_table:
                rows_html = ''.join(
                    '<tr>'
                    + ''.join(f'<td>{escape(cell.text, quote=False)}</td>' for cell in row.cells)
                    + '</tr>'
                    for row in shape.table.rows
                )
                content_parts.append(
                    '<table border="1" cellpadding="4" style="border-collapse: collapse">'
                    + rows_html
                    + '</table>'
                )
        slides.append({
            'index': i,
            'html': '\n'.join(content_parts) if content_parts else '<p>(Leere Folie)</p>',
        })
    return slides
# ========== Save (write back edited documents) ==========
@api_bp.route('/files/<int:file_id>/save', methods=['POST'])
@token_required
def save_file(file_id):
    """Save edited content back to the original file format.

    Expects a JSON object with a ``type`` field:

    * ``html`` with ``content`` - written as DOCX (file name must end in .docx)
    * ``spreadsheet`` with ``sheets`` - written as XLSX (.xlsx or .xls name)
    * ``text`` with ``content`` - written as UTF-8 text

    Extension checks are case-insensitive. After a successful write the
    file's size, SHA-256 checksum and ``updated_at`` are refreshed.

    Returns 400 for folders, a missing/non-object JSON body or an
    unsupported type, 500 when writing fails, or the error response produced
    by ``_get_file_or_403``.
    """
    user = request.current_user
    f, err = _get_file_or_403(file_id, user, 'write')
    if err:
        return err
    if f.is_folder:
        return jsonify({'error': 'Ordner koennen nicht gespeichert werden'}), 400

    # silent=True yields None instead of raising; anything but a JSON object
    # is rejected here rather than crashing on ``.get`` below.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': 'Ungueltige Anfrage'}), 400

    save_type = data.get('type', '')
    lower_name = f.name.lower()
    filepath = Path(current_app.config['UPLOAD_PATH']) / str(f.owner_id) / f.storage_path

    try:
        if save_type == 'html' and lower_name.endswith('.docx'):
            _save_html_to_docx(filepath, data.get('content', ''))
        elif save_type == 'spreadsheet' and lower_name.endswith(('.xlsx', '.xls')):
            _save_sheets_to_xlsx(filepath, data.get('sheets', []))
        elif save_type == 'text':
            filepath.write_text(data.get('content', ''), encoding='utf-8')
        else:
            return jsonify({'error': 'Speichern fuer diesen Typ nicht unterstuetzt'}), 400

        # Update file metadata
        f.size = os.path.getsize(str(filepath))
        h = hashlib.sha256()
        with open(str(filepath), 'rb') as fh:
            for chunk in iter(lambda: fh.read(8192), b''):
                h.update(chunk)
        f.checksum = h.hexdigest()
        f.updated_at = datetime.now(timezone.utc)
        db.session.commit()
        return jsonify({'message': 'Gespeichert', 'size': f.size}), 200
    except Exception as e:
        # Discard the pending metadata changes so the session stays usable.
        db.session.rollback()
        return jsonify({'error': f'Speichern fehlgeschlagen: {str(e)}'}), 500
def _save_html_to_docx(filepath, html_content):
    """Convert HTML content back to DOCX and write it to ``filepath``.

    Only a small HTML subset is understood: ``<p>``, ``<h1>``-``<h3>`` and
    ``<br>`` delimit blocks; ``<strong>/<b>``, ``<em>/<i>`` and ``<u>``
    (without attributes) set run formatting inside paragraphs. Any other
    tag is stripped. HTML entities such as ``&amp;`` are decoded so they are
    not stored literally in the document. Blocks without text are skipped.

    Args:
        filepath: Destination path; an existing file is overwritten.
        html_content: HTML string from the editor; ``None`` is treated as empty.
    """
    from html import unescape
    import re
    from docx import Document

    # The capturing group makes re.split keep the delimiters, so text pieces
    # (even indices) and block tags (odd indices) stay exactly aligned.
    block_delimiter = re.compile(r'(<(?:p|h[1-3]|br\s*/?)(?:\s[^>]*)?>|</(?:p|h[1-3])>)')
    inline_delimiter = re.compile(r'(</?(?:strong|b|em|i|u)>)')
    any_tag = re.compile(r'<[^>]+>')

    doc = Document()
    markup = (html_content or '').replace('\r\n', '\n').replace('\r', '\n')

    heading_level = 0  # 0 = plain paragraph, 1-3 = heading level
    for index, piece in enumerate(block_delimiter.split(markup)):
        if index % 2:
            # A block tag: only an opening <h1>-<h3> switches to heading mode;
            # closing tags, <p> and <br> all switch back to paragraph mode.
            opened = re.match(r'<h([1-3])', piece)
            heading_level = int(opened.group(1)) if opened else 0
            continue

        # Strip tags first, then decode entities, so an escaped "&lt;b&gt;"
        # survives as literal text instead of being removed as a tag.
        text = unescape(any_tag.sub('', piece)).strip()
        if not text:
            continue

        if heading_level:
            doc.add_heading(text, level=heading_level)
            continue

        para = doc.add_paragraph()
        bold = italic = underline = False
        for part in inline_delimiter.split(piece):
            if part in ('<strong>', '<b>'):
                bold = True
            elif part in ('</strong>', '</b>'):
                bold = False
            elif part in ('<em>', '<i>'):
                italic = True
            elif part in ('</em>', '</i>'):
                italic = False
            elif part == '<u>':
                underline = True
            elif part == '</u>':
                underline = False
            else:
                clean = unescape(any_tag.sub('', part))
                if clean:
                    run = para.add_run(clean)
                    run.bold = bold
                    run.italic = italic
                    run.underline = underline

    doc.save(str(filepath))
def _coerce_cell_value(value):
    """Turn a cell value from the editor into the value stored in the sheet.

    ``''`` becomes ``None`` (empty cell). Non-string values are stored as
    they are. Strings that look like plain numbers become ``int`` or
    ``float`` (float when they contain a ``.``). Strings with a leading zero
    before another digit ("007") or with underscores ("1_000") stay text,
    because converting them would silently change what the user entered.
    """
    if value == '':
        return None
    if not isinstance(value, str):
        return value

    text = value.strip()
    digits = text.lstrip('+-')
    has_leading_zero = len(digits) > 1 and digits[0] == '0' and digits[1].isdigit()
    if has_leading_zero or '_' in text:
        return value

    try:
        return float(text) if '.' in text else int(text)
    except ValueError:
        return value


def _save_sheets_to_xlsx(filepath, sheets_data):
    """Save spreadsheet data back to XLSX.

    A new workbook is built from ``sheets_data`` and written over
    ``filepath``.

    Args:
        filepath: Destination path; an existing file is overwritten.
        sheets_data: List of ``{'name': str, 'rows': [[value, ...], ...]}``
            dicts, one per worksheet.

    Raises:
        ValueError: If ``sheets_data`` is empty. A workbook needs at least
            one sheet, so this is rejected before the target is touched.
    """
    if not sheets_data:
        raise ValueError('Keine Tabellenblaetter zum Speichern')

    from openpyxl import Workbook

    wb = Workbook()
    # Remove default sheet
    wb.remove(wb.active)
    for sheet_data in sheets_data:
        ws = wb.create_sheet(title=sheet_data.get('name', 'Sheet'))
        for ri, row in enumerate(sheet_data.get('rows', []), 1):
            for ci, cell_value in enumerate(row, 1):
                ws.cell(row=ri, column=ci, value=_coerce_cell_value(cell_value))
    wb.save(str(filepath))
# ========== OnlyOffice Integration ==========
@api_bp.route('/files/<int:file_id>/onlyoffice-config', methods=['GET'])
@token_required
def onlyoffice_config(file_id):
    """Generate OnlyOffice editor config for a file.

    Responds with ``available: False`` (HTTP 200) when ``ONLYOFFICE_URL`` is
    not set or the file extension is not in the supported list. Otherwise
    the response contains the editor URL and a config whose document URL
    and callback URL carry freshly generated random keys. The editor opens
    in ``edit`` mode only when the user has write permission.

    The key-to-file mappings are stored via ``AppSettings``. The callback
    key is registered only for users with write permission: the callback
    endpoint overwrites the file for any registered key, so registering one
    for a read-only user would let that user write.
    """
    import secrets as _secrets

    user = request.current_user
    f, err = _get_file_or_403(file_id, user, 'read')
    if err:
        return err

    oo_url = os.environ.get('ONLYOFFICE_URL', '')
    if not oo_url:
        return jsonify({'error': 'OnlyOffice nicht konfiguriert', 'available': False}), 200

    # Determine document type
    ext = f.name.rsplit('.', 1)[-1].lower() if '.' in f.name else ''
    doc_type_map = {
        'docx': 'word', 'doc': 'word', 'odt': 'word', 'rtf': 'word', 'txt': 'word',
        'xlsx': 'cell', 'xls': 'cell', 'ods': 'cell', 'csv': 'cell',
        'pptx': 'slide', 'ppt': 'slide', 'odp': 'slide',
    }
    doc_type = doc_type_map.get(ext)
    if not doc_type:
        return jsonify({'error': 'Dateityp nicht von OnlyOffice unterstuetzt', 'available': False}), 200

    # Check write permission
    can_write = _get_file_or_403(file_id, user, 'write')[1] is None

    # Generate a callback key for this editing session. It is only registered
    # for writers; an unregistered key makes the callback a no-op.
    callback_key = _secrets.token_urlsafe(16)
    if can_write:
        AppSettings.set(f'oo_callback_{callback_key}', str(file_id))

    # Build the config
    internal_url = os.environ.get('ONLYOFFICE_INTERNAL_URL', 'http://minicloud:5000')

    # Random file access key so the document server can fetch the file
    # without presenting a user token.
    file_access_key = _secrets.token_urlsafe(32)
    AppSettings.set(f'oo_file_{file_access_key}', f'{file_id}:{user.id}')

    config = {
        'available': True,
        'onlyoffice_url': oo_url.rstrip('/'),
        'config': {
            'document': {
                'fileType': ext,
                'key': f'{file_id}_{int(datetime.now(timezone.utc).timestamp())}_{callback_key[:8]}',
                'title': f.name,
                'url': f'{internal_url}/api/files/oo-download/{file_access_key}',
            },
            'documentType': doc_type,
            'editorConfig': {
                'callbackUrl': f'{internal_url}/api/files/onlyoffice-callback?key={callback_key}',
                'mode': 'edit' if can_write else 'view',
                'lang': 'de',
                'user': {
                    'id': str(user.id),
                    'name': user.username,
                },
            },
        },
    }

    # Sign config with JWT for OnlyOffice validation
    jwt_secret = os.environ.get('JWT_SECRET_KEY', '')
    if jwt_secret:
        import jwt as pyjwt
        token = pyjwt.encode(config['config'], jwt_secret, algorithm='HS256')
        # PyJWT 1.x returns bytes, which jsonify cannot serialise.
        if isinstance(token, bytes):
            token = token.decode('ascii')
        config['config']['token'] = token

    return jsonify(config), 200
@api_bp.route('/files/oo-download/<access_key>', methods=['GET'])
def oo_download(access_key):
    """Dedicated download endpoint for OnlyOffice - no JWT auth, uses an access key.

    The key is looked up under ``oo_file_<access_key>`` in ``AppSettings``;
    the stored value has the form ``<file_id>:<user_id>``. Only the file id
    is used here, and the key is not invalidated after use.

    Returns 403 for an unknown or malformed key, 404 when the file record or
    the file on disk is missing, otherwise the file content sent inline.
    """
    from app.models.file import File

    data = AppSettings.get(f'oo_file_{access_key}', '')
    if not data:
        return jsonify({'error': 'Ungueltiger Zugangsschluessel'}), 403

    parts = data.split(':')
    if len(parts) != 2:
        return jsonify({'error': 'Ungueltiger Zugangsschluessel'}), 403
    try:
        file_id = int(parts[0])
    except ValueError:
        # A corrupt stored value is treated like an invalid key, not a 500.
        return jsonify({'error': 'Ungueltiger Zugangsschluessel'}), 403

    f = db.session.get(File, file_id)
    if not f:
        return jsonify({'error': 'Datei nicht gefunden'}), 404
    if not f.storage_path:
        # Without a storage path there is nothing on disk to send.
        return jsonify({'error': 'Datei nicht auf Datentraeger'}), 404

    filepath = Path(current_app.config['UPLOAD_PATH']) / str(f.owner_id) / f.storage_path
    if not filepath.exists():
        return jsonify({'error': 'Datei nicht auf Datentraeger'}), 404

    return send_file(str(filepath), mimetype=f.mime_type or 'application/octet-stream',
                     as_attachment=False, download_name=f.name)
def _oo_fetch_to_file(download_url, filepath):
    """Download ``download_url`` and atomically replace ``filepath`` with it.

    The data is streamed into a temporary file in the target directory and
    only moved over ``filepath`` once the download has completed, so a
    failed or interrupted download never truncates the stored document.

    Returns:
        ``(size_in_bytes, sha256_hexdigest)`` of the downloaded content.

    Raises:
        ValueError: If the URL scheme is not http or https (urllib would
            otherwise also open ``file://`` URLs).
    """
    import contextlib
    import shutil
    import tempfile
    import urllib.request
    from urllib.parse import urlsplit

    if urlsplit(download_url).scheme.lower() not in ('http', 'https'):
        raise ValueError(f'Unsupported download URL: {download_url!r}')

    digest = hashlib.sha256()
    size = 0
    fd, tmp_name = tempfile.mkstemp(dir=str(filepath.parent), prefix='.oo-', suffix='.tmp')
    try:
        with os.fdopen(fd, 'wb') as out, \
                urllib.request.urlopen(urllib.request.Request(download_url), timeout=30) as resp:
            for chunk in iter(lambda: resp.read(65536), b''):
                out.write(chunk)
                digest.update(chunk)
                size += len(chunk)
        if filepath.exists():
            # mkstemp creates the file owner-only; keep the old permissions.
            shutil.copymode(str(filepath), tmp_name)
        os.replace(tmp_name, str(filepath))
    except BaseException:
        with contextlib.suppress(OSError):
            os.unlink(tmp_name)
        raise
    return size, digest.hexdigest()


@api_bp.route('/files/onlyoffice-callback', methods=['POST'])
def onlyoffice_callback():
    """Callback from OnlyOffice when document is saved.

    OnlyOffice sends status codes:
    1 = editing, 2 = ready to save, 4 = closed no changes, 6 = force save

    On status 2 or 6 the document is downloaded from the ``url`` in the
    callback body and stored over the file registered for the ``key`` query
    parameter; size, checksum and ``updated_at`` are then updated. The
    callback key is removed on status 2 and 4. It is kept on status 6,
    because a force save happens while the document is still being edited
    and the closing save (status 2) needs the same key.

    Must always return {"error": 0} for success. Errors are logged and the
    same response is returned so OnlyOffice does not retry endlessly.
    """
    log = current_app.logger
    try:
        jwt_secret = os.environ.get('JWT_SECRET_KEY', '')

        # Get callback data - may be JWT-wrapped
        data = request.get_json(silent=True) or {}
        if not isinstance(data, dict):
            data = {}
        callback_key = request.args.get('key', '')
        # Log only a prefix: the full key is what authorises the write.
        log.info('[OnlyOffice Callback] Raw status=%s, key=%s...',
                 data.get('status'), callback_key[:8])

        # If body contains a JWT token, decode it to get the real data
        if 'token' in data and jwt_secret:
            try:
                import jwt as pyjwt
                data = pyjwt.decode(data['token'], jwt_secret, algorithms=['HS256'])
            except Exception as e:
                # NOTE(review): falling back to the raw body means callbacks
                # with an invalid signature are still processed - confirm
                # this is intended.
                log.warning('[OnlyOffice Callback] Body JWT decode failed (using raw data): %s', e)

        status = data.get('status', 0)

        # Status 2 or 6: save the document
        if status in (2, 6):
            file_id_str = AppSettings.get(f'oo_callback_{callback_key}', '')
            download_url = data.get('url', '')
            if file_id_str and download_url:
                from app.models.file import File
                f = db.session.get(File, int(file_id_str))
                if f and f.storage_path:
                    filepath = Path(current_app.config['UPLOAD_PATH']) / str(f.owner_id) / f.storage_path
                    log.info('[OnlyOffice Callback] Saving file %s from %s', f.name, download_url)

                    # Download saved doc from OnlyOffice
                    size, checksum = _oo_fetch_to_file(download_url, filepath)

                    # Update metadata
                    f.size = size
                    f.checksum = checksum
                    f.updated_at = datetime.now(timezone.utc)
                    db.session.commit()
                    log.info('[OnlyOffice Callback] File saved: %s (%s bytes)', f.name, f.size)

        # Status 2, 4: the editing session is over - cleanup
        if status in (2, 4):
            try:
                setting = db.session.get(AppSettings, f'oo_callback_{callback_key}')
                if setting:
                    db.session.delete(setting)
                    db.session.commit()
            except Exception:
                db.session.rollback()
                log.warning('[OnlyOffice Callback] Could not remove callback key', exc_info=True)

    except Exception:
        log.exception('[OnlyOffice Callback] ERROR')
        try:
            db.session.rollback()
        except Exception:
            pass
        # Still return error: 0 so OnlyOffice doesn't retry endlessly
        return jsonify({'error': 0}), 200

    return jsonify({'error': 0}), 200
@api_bp.route('/files/onlyoffice-status', methods=['GET'])
@token_required
def onlyoffice_status():
    """Report whether an OnlyOffice server URL is configured.

    Reads ``ONLYOFFICE_URL`` from the environment and returns it together
    with an ``available`` flag that is true when the value is non-empty.
    """
    configured_url = os.environ.get('ONLYOFFICE_URL', '')
    is_available = True if configured_url else False
    payload = {
        'available': is_available,
        'url': configured_url,
    }
    return jsonify(payload), 200