Files
s3_to_folder_sync/web/app.py
2026-03-18 13:49:16 +01:00

213 lines
7.4 KiB
Python

#!/usr/bin/env python3
# ============================================================
# app.py - Dashboard Web per S3-to-Local Sync
# ============================================================
# Server Flask leggero che espone una dashboard in tempo reale
# con Server-Sent Events (SSE) per lo streaming dei log e dello
# stato della sincronizzazione. Ispirato all'UI di Syncthing.
# ============================================================
import json
import os
import sys
import time
import threading
from datetime import datetime
from pathlib import Path
from flask import Flask, render_template, Response, jsonify, stream_with_context
# Aggiungi la directory web al path per importare cron_helper
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
# ============================================================
# Configurazione
# ============================================================
# Directory condivisa con sync.sh per lo scambio di stato
STATE_DIR = Path(os.environ.get("STATE_DIR", "/data/state"))
# Porta del web server (configurabile da .env)
WEB_PORT = int(os.environ.get("WEB_PORT", 8080))
# File di stato scritti da sync.sh
STATUS_FILE = STATE_DIR / "status.json" # Stato corrente della sync
HISTORY_FILE = STATE_DIR / "history.json" # Storico delle sincronizzazioni
LOG_FILE = STATE_DIR / "sync.log" # Log in tempo reale
CHANGES_FILE = STATE_DIR / "recent_changes.json" # Ultime modifiche ai file
app = Flask(__name__)
# ============================================================
# Funzioni di utilità
# ============================================================
def read_json_safe(filepath, default=None):
"""Legge un file JSON in modo sicuro, ritorna default se non esiste o è corrotto."""
try:
if filepath.exists():
text = filepath.read_text().strip()
if text:
return json.loads(text)
except (json.JSONDecodeError, OSError):
pass
return default if default is not None else {}
def get_folder_stats(path="/data/local"):
"""Calcola statistiche sulla cartella sincronizzata (numero file, dimensione totale)."""
total_size = 0
file_count = 0
dir_count = 0
try:
for entry in Path(path).rglob("*"):
if entry.is_file():
file_count += 1
total_size += entry.stat().st_size
elif entry.is_dir():
dir_count += 1
except OSError:
pass
return {
"file_count": file_count,
"dir_count": dir_count,
"total_size": total_size,
"total_size_human": format_size(total_size),
}
def format_size(size_bytes):
"""Converte bytes in formato leggibile (es. 1.5 GB)."""
for unit in ["B", "KB", "MB", "GB", "TB"]:
if size_bytes < 1024:
return f"{size_bytes:.1f} {unit}"
size_bytes /= 1024
return f"{size_bytes:.1f} PB"
# ============================================================
# Routes
# ============================================================
@app.route("/")
def index():
"""Pagina principale della dashboard."""
# Passa la configurazione al template per il riepilogo
# Determina la modalità di pianificazione
sync_schedule = os.environ.get("SYNC_SCHEDULE", "")
sync_interval = int(os.environ.get("SYNC_INTERVAL", 300))
if sync_schedule:
# Modalità cron: mostra l'espressione e la descrizione leggibile
from cron_helper import human_readable
schedule_display = human_readable(sync_schedule)
schedule_mode = "cron"
else:
schedule_display = f"Ogni {sync_interval}s"
schedule_mode = "interval"
config = {
"endpoint": os.environ.get("S3_ENDPOINT", "N/A"),
"bucket": os.environ.get("S3_BUCKET", "N/A"),
"prefix": os.environ.get("S3_PATH_PREFIX", "") or "(tutto il bucket)",
"sync_mode": os.environ.get("SYNC_MODE", "mirror"),
"sync_interval": sync_interval,
"sync_schedule": sync_schedule,
"schedule_mode": schedule_mode,
"schedule_display": schedule_display,
"sync_on_start": os.environ.get("SYNC_ON_START", "true"),
"transfers": os.environ.get("SYNC_TRANSFERS", "4"),
"bandwidth": os.environ.get("SYNC_BANDWIDTH", "0") or "illimitata",
}
return render_template("index.html", config=config)
@app.route("/api/status")
def api_status():
"""API: restituisce lo stato corrente della sincronizzazione."""
status = read_json_safe(STATUS_FILE, {
"state": "starting",
"last_sync": None,
"next_sync": None,
"message": "In avvio...",
})
# Aggiungi statistiche della cartella locale
status["folder_stats"] = get_folder_stats()
return jsonify(status)
@app.route("/api/history")
def api_history():
"""API: restituisce lo storico delle ultime sincronizzazioni."""
history = read_json_safe(HISTORY_FILE, [])
return jsonify(history)
@app.route("/api/changes")
def api_changes():
"""API: restituisce le ultime modifiche ai file."""
changes = read_json_safe(CHANGES_FILE, [])
return jsonify(changes)
@app.route("/api/stream")
def api_stream():
"""SSE: stream in tempo reale dei log della sincronizzazione."""
def generate():
"""Generatore SSE: legge il file di log e invia nuove righe al client."""
# Inizia dalla fine del file se esiste
try:
if LOG_FILE.exists():
# Invia le ultime 50 righe come contesto iniziale
with open(LOG_FILE, "r") as f:
lines = f.readlines()
for line in lines[-50:]:
yield f"data: {json.dumps({'type': 'log', 'message': line.strip()})}\n\n"
# Poi segui il file (tail -f style)
with open(LOG_FILE, "r") as f:
f.seek(0, 2) # Vai alla fine del file
while True:
line = f.readline()
if line:
yield f"data: {json.dumps({'type': 'log', 'message': line.strip()})}\n\n"
else:
# Nessuna nuova riga: invia heartbeat per mantenere la connessione
yield f"data: {json.dumps({'type': 'heartbeat'})}\n\n"
time.sleep(1)
else:
# File non ancora creato: attendi
while not LOG_FILE.exists():
yield f"data: {json.dumps({'type': 'log', 'message': 'In attesa del primo avvio sync...'})}\n\n"
time.sleep(2)
# Ricomincia leggendo il file appena creato
yield from generate()
except GeneratorExit:
pass
return Response(
stream_with_context(generate()),
mimetype="text/event-stream",
headers={
"Cache-Control": "no-cache",
"X-Accel-Buffering": "no", # Disabilita buffering nginx (se presente)
},
)
# ============================================================
# Avvio server
# ============================================================
if __name__ == "__main__":
# Assicura che la directory di stato esista
STATE_DIR.mkdir(parents=True, exist_ok=True)
app.run(
host="0.0.0.0",
port=WEB_PORT,
debug=False,
# threaded=True per gestire più client SSE contemporaneamente
threaded=True,
)