# Files
# s3_to_folder_sync/web/app.py
# 2026-03-18 16:35:15 +01:00
#
# 311 lines
# 10 KiB
# Python
#
#!/usr/bin/env python3
# ============================================================
# app.py - Dashboard Web per S3-to-Local Sync (Multi-Sync)
# ============================================================
# Server Flask leggero che espone una dashboard in tempo reale
# con supporto per MULTIPLE associazioni bucket:cartella (1:1).
# Usa Server-Sent Events (SSE) per lo streaming dei log.
# ============================================================
import json
import os
import sys
import time
import threading
from datetime import datetime
from pathlib import Path
from flask import Flask, render_template, Response, jsonify, stream_with_context, request
# Aggiungi la directory web al path per importare cron_helper
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
# ============================================================
# Configurazione
# ============================================================
# Directory condivisa con sync.sh per lo scambio di stato
STATE_DIR = Path(os.environ.get("STATE_DIR", "/data/state"))
WEB_PORT = int(os.environ.get("WEB_PORT", 8080))
app = Flask(__name__)
# ============================================================
# Gestione Multi-Sync
# ============================================================
def load_sync_configs():
    """Load sync configurations from the SYNC_CONFIGS env var, with fallback.

    SYNC_CONFIGS is expected to be a JSON array of objects, each with optional
    keys "id", "bucket", "prefix" and "local_path" (legacy alias: "path").
    When SYNC_CONFIGS is empty, malformed JSON, or valid JSON of the wrong
    shape, fall back to a single sync built from the legacy env vars
    S3_BUCKET / S3_PATH_PREFIX / LOCAL_PATH.

    Returns:
        dict: mapping sync_id -> {"id", "bucket", "prefix", "local_path"}.
    """
    sync_configs_json = os.environ.get("SYNC_CONFIGS", "").strip()
    if sync_configs_json:
        try:
            configs = json.loads(sync_configs_json)
            parsed = {}
            # Fix: the original only caught JSONDecodeError, so valid JSON of
            # the wrong shape (an object, or a list of non-objects) crashed at
            # import time with AttributeError/TypeError. Validate the shape
            # and skip malformed entries instead.
            if isinstance(configs, list):
                for i, cfg in enumerate(configs):
                    if not isinstance(cfg, dict):
                        continue  # ignore malformed entries
                    sync_id = cfg.get("id", f"sync{i+1}")
                    parsed[sync_id] = {
                        "id": sync_id,
                        "bucket": cfg.get("bucket", "default"),
                        "prefix": cfg.get("prefix", ""),
                        # "path" is accepted as a legacy alias of "local_path"
                        "local_path": cfg.get("local_path") or cfg.get("path") or f"/data/local{i+1}",
                    }
            if parsed:
                return parsed
        except json.JSONDecodeError:
            pass
    # Fallback: single sync (backwards compatible with the old env vars)
    return {
        "sync1": {
            "id": "sync1",
            "bucket": os.environ.get("S3_BUCKET", "default"),
            "prefix": os.environ.get("S3_PATH_PREFIX", ""),
            "local_path": os.environ.get("LOCAL_PATH", "/data/local"),
        }
    }
SYNC_CONFIGS = load_sync_configs()
def get_sync_state_dir(sync_id):
    """Return the state directory dedicated to one sync."""
    return STATE_DIR.joinpath(sync_id)
def get_sync_bucket(sync_id):
    """Return the bucket name for *sync_id*, falling back to the id itself."""
    cfg = SYNC_CONFIGS.get(sync_id)
    if not cfg:
        return sync_id
    return cfg.get("bucket", sync_id)
def get_sync_local_path(sync_id):
    """Return the local folder configured for *sync_id* (default /data/local)."""
    cfg = SYNC_CONFIGS.get(sync_id)
    return cfg.get("local_path", "/data/local") if cfg else "/data/local"
def get_default_sync_id():
    """Return the id of the first configured sync, or 'sync1' when none exist."""
    for sid in SYNC_CONFIGS:
        return sid
    return "sync1"
# ============================================================
# Funzioni di utilità
# ============================================================
def read_json_safe(filepath, default=None):
    """Parse a JSON file defensively.

    Returns *default* (or {} when default is None) if the file is missing,
    unreadable, empty, or contains invalid JSON.
    """
    fallback = {} if default is None else default
    try:
        raw = filepath.read_text().strip() if filepath.exists() else ""
    except OSError:
        return fallback
    if not raw:
        return fallback
    try:
        return json.loads(raw)
    except json.JSONDecodeError:
        return fallback
def get_folder_stats(path="/data/local"):
    """Compute statistics about the synced folder.

    Args:
        path: root directory to scan recursively.

    Returns:
        dict with keys file_count, dir_count, total_size (bytes) and
        total_size_human (formatted via format_size).
    """
    total_size = 0
    file_count = 0
    dir_count = 0
    try:
        for entry in Path(path).rglob("*"):
            # Fix: a file can vanish between listing and stat() while a sync
            # is running (or be unreadable); the original aborted the whole
            # walk on the first OSError, discarding all counts. Skip the
            # single bad entry instead.
            try:
                if entry.is_file():
                    file_count += 1
                    total_size += entry.stat().st_size
                elif entry.is_dir():
                    dir_count += 1
            except OSError:
                continue
    except OSError:
        # e.g. the root path itself does not exist / is not readable
        pass
    return {
        "file_count": file_count,
        "dir_count": dir_count,
        "total_size": total_size,
        "total_size_human": format_size(total_size),
    }
def format_size(size_bytes):
    """Convert a raw byte count into a human-readable string (e.g. '1.5 KB')."""
    units = ("B", "KB", "MB", "GB", "TB")
    value = size_bytes
    idx = 0
    while idx < len(units) and value >= 1024:
        value /= 1024
        idx += 1
    if idx < len(units):
        return f"{value:.1f} {units[idx]}"
    return f"{value:.1f} PB"
# ============================================================
# Routes
# ============================================================
@app.route("/")
def index():
    """Render the main dashboard page, with one tab per configured sync."""
    sync_interval = int(os.environ.get("SYNC_INTERVAL", 300))
    sync_schedule = os.environ.get("SYNC_SCHEDULE", "")
    default_cfg = SYNC_CONFIGS.get(get_default_sync_id(), {})
    if sync_schedule:
        # Cron mode: show a human-readable description of the schedule.
        from cron_helper import human_readable
        schedule_mode = "cron"
        schedule_display = human_readable(sync_schedule)
    else:
        schedule_mode = "interval"
        schedule_display = f"Ogni {sync_interval}s"
    config = {
        "endpoint": os.environ.get("S3_ENDPOINT", "N/A"),
        "bucket": default_cfg.get("bucket", os.environ.get("S3_BUCKET", "N/A")),
        "prefix": default_cfg.get("prefix", os.environ.get("S3_PATH_PREFIX", "")) or "(tutto il bucket)",
        "sync_mode": os.environ.get("SYNC_MODE", "mirror"),
        "sync_interval": sync_interval,
        "sync_schedule": sync_schedule,
        "schedule_mode": schedule_mode,
        "schedule_display": schedule_display,
        "sync_on_start": os.environ.get("SYNC_ON_START", "true"),
        "transfers": os.environ.get("SYNC_TRANSFERS", "4"),
        "bandwidth": os.environ.get("SYNC_BANDWIDTH", "0") or "illimitata",
        "sync_ids": list(SYNC_CONFIGS),
        "sync_buckets": {sid: get_sync_bucket(sid) for sid in SYNC_CONFIGS},
    }
    return render_template("index.html", config=config)
@app.route("/api/syncs")
def api_syncs():
    """API: enumerate every configured sync with its bucket name."""
    syncs = []
    for sid in SYNC_CONFIGS:
        syncs.append({"id": sid, "bucket": get_sync_bucket(sid)})
    return jsonify({"syncs": syncs})
@app.route("/api/status/<sync_id>")
def api_status(sync_id):
    """API: current status of one sync, enriched with live folder stats."""
    status_path = get_sync_state_dir(sync_id) / "status.json"
    fallback = {
        "state": "starting",
        "last_sync": None,
        "next_sync": None,
        "message": "In avvio...",
    }
    status = read_json_safe(status_path, fallback)
    # Enrich with information that is not stored in the status file.
    status["bucket"] = get_sync_bucket(sync_id)
    status["folder_stats"] = get_folder_stats(get_sync_local_path(sync_id))
    return jsonify(status)
@app.route("/api/status")
def api_status_default():
    """Compat route: status of the default (first configured) sync."""
    default_id = get_default_sync_id()
    return api_status(default_id)
@app.route("/api/changes/<sync_id>")
def api_changes(sync_id):
    """API: most recent file changes recorded for one sync."""
    changes_path = get_sync_state_dir(sync_id) / "recent_changes.json"
    return jsonify(read_json_safe(changes_path, []))
@app.route("/api/changes")
def api_changes_default():
    """Compat route: recent changes of the default sync."""
    default_id = get_default_sync_id()
    return api_changes(default_id)
@app.route("/api/history/<sync_id>")
def api_history(sync_id):
    """API: synchronization history for one sync."""
    history_path = get_sync_state_dir(sync_id) / "history.json"
    return jsonify(read_json_safe(history_path, []))
@app.route("/api/history")
def api_history_default():
    """Compat route: history of the default sync."""
    default_id = get_default_sync_id()
    return api_history(default_id)
@app.route("/api/stream/<sync_id>")
def api_stream(sync_id):
    """SSE endpoint: real-time stream of one sync's log.

    Emits `data: {...}` JSON events of type "log" (one log line) or
    "heartbeat" (keep-alive, roughly once per idle second).
    """
    state_dir = get_sync_state_dir(sync_id)
    log_file = state_dir / "sync.log"
    def generate():
        """SSE generator: replay recent log lines, then tail the file."""
        try:
            if log_file.exists():
                # Send the last 50 lines as initial context
                with open(log_file, "r") as f:
                    lines = f.readlines()
                for line in lines[-50:]:
                    yield f"data: {json.dumps({'type': 'log', 'message': line.strip()})}\n\n"
                # Then follow the file (tail -f style)
                with open(log_file, "r") as f:
                    f.seek(0, 2)  # jump to EOF: only stream lines written from now on
                    while True:
                        line = f.readline()
                        if line:
                            yield f"data: {json.dumps({'type': 'log', 'message': line.strip()})}\n\n"
                        else:
                            # No new data: heartbeat keeps the SSE connection alive
                            yield f"data: {json.dumps({'type': 'heartbeat'})}\n\n"
                            time.sleep(1)
            else:
                # Log file not created yet: poll until it appears, then
                # restart the generator so the replay+tail logic runs.
                while not log_file.exists():
                    yield f"data: {json.dumps({'type': 'log', 'message': 'In attesa del primo avvio sync...'})}\n\n"
                    time.sleep(2)
                yield from generate()
        except GeneratorExit:
            # Client disconnected: stop streaming quietly
            pass
    return Response(
        stream_with_context(generate()),
        mimetype="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            # Disable nginx proxy buffering so events reach the client immediately
            "X-Accel-Buffering": "no",
        },
    )
@app.route("/api/stream")
def api_stream_default():
    """Compat route: log stream of the default sync."""
    default_id = get_default_sync_id()
    return api_stream(default_id)
# ============================================================
# Avvio server
# ============================================================
if __name__ == "__main__":
    # Make sure the shared state dir and one subdirectory per sync exist
    # before the sync process or the dashboard tries to use them.
    STATE_DIR.mkdir(parents=True, exist_ok=True)
    for sid in SYNC_CONFIGS:
        get_sync_state_dir(sid).mkdir(parents=True, exist_ok=True)
    app.run(host="0.0.0.0", port=WEB_PORT, debug=False, threaded=True)