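"""One-way S3 folder sync: mirror buckets (or prefixes) into local directories on a cron schedule.

Jobs are declared through environment variables and run by APScheduler, while a small
Flask app serves a status page plus a JSON API for progress, logs and manual triggers.
Optional Gotify notifications report when each sync starts, completes or fails.

A minimal configuration sketch (bucket, paths and schedule below are hypothetical
placeholders; the variable names are the ones read by load_jobs_from_env):

    JOB_COUNT=1
    JOB_1_NAME=backups
    JOB_1_BUCKET=my-bucket
    JOB_1_LOCAL_DIR=/data/backups
    JOB_1_PREFIX=nightly/
    JOB_1_SCHEDULE=0 3 * * *

Other recognised variables include DEFAULT_SCHEDULE_CRON, MIRROR_DELETE_LOCAL_EXTRAS,
AWS_REGION, S3_ENDPOINT_URL, SYNC_JOBS_JSON, RUN_ON_STARTUP, GOTIFY_URL, GOTIFY_TOKEN,
GOTIFY_PRIORITY_START, GOTIFY_PRIORITY_END, LOG_LEVEL, LOG_BUFFER_SIZE and WEB_PORT.
"""
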
import json
import logging
import os
import threading
from collections import deque
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any

import boto3
import requests
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from botocore.exceptions import BotoCoreError, ClientError
from flask import Flask, jsonify, render_template

logging.basicConfig(
    level=os.getenv("LOG_LEVEL", "INFO"),
    format="%(asctime)s | %(levelname)s | %(message)s",
)
logger = logging.getLogger("s3-folder-sync")

app = Flask(__name__)
scheduler = BackgroundScheduler(timezone="UTC")

# Most recent log events, newest first, exposed through /api/status.
log_buffer = deque(maxlen=int(os.getenv("LOG_BUFFER_SIZE", "500")))

# status_lock guards job_states; job_locks holds one lock per job so that
# overlapping runs of the same job are skipped instead of stacked.
status_lock = threading.Lock()
job_locks: dict[str, threading.Lock] = {}


@dataclass
class JobConfig:
    """Static configuration for one bucket-to-folder sync job."""

    name: str
    bucket: str
    local_dir: str
    prefix: str = ""
    schedule: str = "*/30 * * * *"
    delete_local_extras: bool = True
    region: str | None = None
    endpoint_url: str | None = None


@dataclass
class JobState:
    """Mutable runtime state for one job, rendered by the status API."""

    name: str
    bucket: str
    local_dir: str
    last_start: str | None = None
    last_end: str | None = None
    last_status: str = "never"
    last_summary: dict[str, Any] = field(default_factory=dict)
    running: bool = False
    current_file: str | None = None
    files_total: int = 0
    files_done: int = 0
    phase: str = ""


job_configs: list[JobConfig] = []
job_states: dict[str, JobState] = {}


def utc_now_iso() -> str:
    """Return the current UTC time as an ISO-8601 string."""
    return datetime.now(timezone.utc).isoformat()


def parse_bool(value: str | None, default: bool = False) -> bool:
    """Parse common truthy strings ("1", "true", "yes", ...) into a bool."""
    if value is None:
        return default
    return value.strip().lower() in {"1", "true", "yes", "y", "on"}


def add_log(level: str, message: str, job_name: str | None = None, details: dict[str, Any] | None = None) -> None:
    """Record a structured event in the in-memory buffer and mirror it to the logger."""
    event = {
        "time": utc_now_iso(),
        "level": level.upper(),
        "job": job_name,
        "message": message,
        "details": details or {},
    }
    log_buffer.appendleft(event)

    text = f"[{event['level']}]"
    if job_name:
        text += f"[{job_name}]"
    text += f" {message}"
    if event["details"]:
        text += f" | {event['details']}"

    if level.lower() == "error":
        logger.error(text)
    elif level.lower() == "warning":
        logger.warning(text)
    else:
        logger.info(text)


def notify_gotify(title: str, message: str, priority: int = 5) -> None:
    """Send a push notification via Gotify; a no-op unless GOTIFY_URL and GOTIFY_TOKEN are set."""
    gotify_url = os.getenv("GOTIFY_URL", "").strip().rstrip("/")
    gotify_token = os.getenv("GOTIFY_TOKEN", "").strip()

    if not gotify_url or not gotify_token:
        return

    try:
        requests.post(
            f"{gotify_url}/message",
            json={"title": title, "message": message, "priority": priority},
            params={"token": gotify_token},
            timeout=10,
        )
    except requests.RequestException as exc:
        add_log("warning", "Gotify notification failed", details={"error": str(exc)})


def get_s3_client(region: str | None = None, endpoint_url: str | None = None):
    """Build a boto3 S3 client, honouring an optional per-job region and custom endpoint."""
    kwargs: dict[str, Any] = {}
    if region:
        kwargs["region_name"] = region
    if endpoint_url:
        kwargs["endpoint_url"] = endpoint_url
    return boto3.client("s3", **kwargs)


def is_within_directory(base_dir: Path, candidate: Path) -> bool:
    """Return True if candidate resolves to a path inside base_dir (guards against '..' in keys)."""
    try:
        candidate.resolve().relative_to(base_dir.resolve())
        return True
    except ValueError:
        return False


def should_download(local_path: Path, s3_size: int, s3_mtime_ts: float) -> bool:
    """Decide whether a key needs downloading: missing locally, or size/mtime differ."""
    if not local_path.exists():
        return True

    stat = local_path.stat()
    size_mismatch = stat.st_size != s3_size
    # Allow one second of slack because filesystem mtime resolution varies.
    mtime_mismatch = abs(stat.st_mtime - s3_mtime_ts) > 1
    return size_mismatch or mtime_mismatch


def build_job_config_from_item(
    item: dict[str, Any],
    idx: int,
    default_cron: str,
    default_delete: bool,
    default_region: str | None,
    default_endpoint: str | None,
) -> JobConfig:
    """Validate one raw job definition (from env or JSON) and turn it into a JobConfig."""
    bucket = str(item.get("bucket", "")).strip()
    local_dir = str(item.get("local_dir", "")).strip()
    if not bucket or not local_dir:
        raise ValueError(f"Job #{idx} must define 'bucket' and 'local_dir'")

    name = str(item.get("name", f"job-{idx}"))
    schedule = str(item.get("schedule", default_cron)).strip() or default_cron
    prefix = str(item.get("prefix", "")).lstrip("/")
    if prefix and not prefix.endswith("/"):
        prefix += "/"

    delete_local_extras = item.get("delete_local_extras")
    if delete_local_extras is None:
        delete_local_extras = default_delete
    elif isinstance(delete_local_extras, str):
        delete_local_extras = parse_bool(delete_local_extras, default_delete)
    else:
        delete_local_extras = bool(delete_local_extras)

    return JobConfig(
        name=name,
        bucket=bucket,
        local_dir=local_dir,
        prefix=prefix,
        schedule=schedule,
        delete_local_extras=delete_local_extras,
        region=str(item.get("region", "")).strip() or default_region,
        endpoint_url=str(item.get("endpoint_url", "")).strip() or default_endpoint,
    )


def load_jobs_from_indexed_env(
    default_cron: str,
    default_delete: bool,
    default_region: str | None,
    default_endpoint: str | None,
) -> list[JobConfig]:
    """Load jobs from JOB_COUNT plus JOB_<n>_* variables; return [] when JOB_COUNT is unset."""
    job_count_raw = os.getenv("JOB_COUNT", "").strip()
    if not job_count_raw:
        return []

    try:
        job_count = int(job_count_raw)
    except ValueError as exc:
        raise ValueError("JOB_COUNT must be an integer value") from exc

    if job_count <= 0:
        raise ValueError("JOB_COUNT must be greater than zero")

    parsed_jobs: list[JobConfig] = []
    for idx in range(1, job_count + 1):
        var_prefix = f"JOB_{idx}_"

        item = {
            "name": os.getenv(f"{var_prefix}NAME", f"job-{idx}").strip(),
            "bucket": os.getenv(f"{var_prefix}BUCKET", "").strip(),
            "local_dir": os.getenv(f"{var_prefix}LOCAL_DIR", "").strip(),
            "prefix": os.getenv(f"{var_prefix}PREFIX", "").strip(),
            "schedule": os.getenv(f"{var_prefix}SCHEDULE", default_cron).strip() or default_cron,
            "delete_local_extras": os.getenv(f"{var_prefix}DELETE_LOCAL_EXTRAS"),
            "region": os.getenv(f"{var_prefix}REGION", "").strip() or default_region,
            "endpoint_url": os.getenv(f"{var_prefix}ENDPOINT_URL", "").strip() or default_endpoint,
        }

        parsed_jobs.append(
            build_job_config_from_item(
                item=item,
                idx=idx,
                default_cron=default_cron,
                default_delete=default_delete,
                default_region=default_region,
                default_endpoint=default_endpoint,
            )
        )

    return parsed_jobs


def load_jobs_from_env() -> list[JobConfig]:
    """Load job definitions, preferring JOB_COUNT/JOB_<n>_* variables over SYNC_JOBS_JSON."""
    default_cron = os.getenv("DEFAULT_SCHEDULE_CRON", "*/30 * * * *")
    default_delete = parse_bool(os.getenv("MIRROR_DELETE_LOCAL_EXTRAS"), True)
    default_region = os.getenv("AWS_REGION", "").strip() or None
    default_endpoint = os.getenv("S3_ENDPOINT_URL", "").strip() or None

    indexed_jobs = load_jobs_from_indexed_env(
        default_cron=default_cron,
        default_delete=default_delete,
        default_region=default_region,
        default_endpoint=default_endpoint,
    )
    if indexed_jobs:
        return indexed_jobs

    jobs_json = os.getenv("SYNC_JOBS_JSON", "").strip()
    if not jobs_json:
        raise ValueError("Define jobs with JOB_COUNT/JOB_1_* variables or SYNC_JOBS_JSON")

    try:
        raw_jobs = json.loads(jobs_json)
    except json.JSONDecodeError as exc:
        raise ValueError(f"Invalid SYNC_JOBS_JSON: {exc}") from exc

    if not isinstance(raw_jobs, list) or not raw_jobs:
        raise ValueError("SYNC_JOBS_JSON must be a non-empty JSON array")

    parsed_jobs: list[JobConfig] = []
    for idx, item in enumerate(raw_jobs, start=1):
        if not isinstance(item, dict):
            raise ValueError(f"SYNC_JOBS_JSON item #{idx} must be an object")

        parsed_jobs.append(
            build_job_config_from_item(
                item=item,
                idx=idx,
                default_cron=default_cron,
                default_delete=default_delete,
                default_region=default_region,
                default_endpoint=default_endpoint,
            )
        )

    return parsed_jobs


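# As an alternative to JOB_COUNT/JOB_<n>_* variables, jobs can be passed as one JSON
# array in SYNC_JOBS_JSON. A minimal sketch with hypothetical values (the keys are the
# ones consumed by build_job_config_from_item):
#
#   SYNC_JOBS_JSON='[{"name": "media", "bucket": "my-bucket", "local_dir": "/data/media",
#                     "prefix": "photos/", "schedule": "0 3 * * *",
#                     "delete_local_extras": true}]'

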
def sync_job(job: JobConfig, trigger: str = "scheduled") -> None:
    """Mirror one bucket/prefix into its local folder and record progress in job_states."""
    lock = job_locks[job.name]
    if not lock.acquire(blocking=False):
        add_log("warning", "Sync skipped because previous run is still active", job.name)
        return

    started_at = utc_now_iso()
    with status_lock:
        state = job_states[job.name]
        state.running = True
        state.last_start = started_at
        state.current_file = None
        state.files_total = 0
        state.files_done = 0
        state.phase = "listing files"

    notify_gotify(
        title=f"Sync started: {job.name}",
        message=f"Bucket '{job.bucket}' -> '{job.local_dir}' ({trigger})",
        priority=int(os.getenv("GOTIFY_PRIORITY_START", "5")),
    )
    add_log("info", "Sync started", job.name, {"trigger": trigger})

    downloaded = 0
    kept = 0
    deleted = 0
    errors = 0
    local_dir = Path(job.local_dir)

    try:
        local_dir.mkdir(parents=True, exist_ok=True)
        s3_client = get_s3_client(job.region, job.endpoint_url)
        paginator = s3_client.get_paginator("list_objects_v2")

        expected_rel_paths: set[str] = set()
        pending_downloads: list[tuple[str, str, Path, int, float]] = []

        with status_lock:
            job_states[job.name].phase = "listing files"

        # Phase 1: list every object under the prefix and decide what needs downloading.
        for page in paginator.paginate(Bucket=job.bucket, Prefix=job.prefix):
            contents = page.get("Contents", [])
            for obj in contents:
                key = obj["Key"]
                if key.endswith("/"):
                    continue

                rel_key = key[len(job.prefix) :] if job.prefix else key
                rel_key = rel_key.lstrip("/")
                if not rel_key:
                    continue

                local_path = local_dir / rel_key
                if not is_within_directory(local_dir, local_path):
                    add_log("warning", "Skipped key due to unsafe path", job.name, {"key": key})
                    continue

                expected_rel_paths.add(rel_key.replace("\\", "/"))
                local_path.parent.mkdir(parents=True, exist_ok=True)

                s3_size = int(obj.get("Size", 0))
                s3_ts = obj["LastModified"].timestamp()

                if should_download(local_path, s3_size, s3_ts):
                    pending_downloads.append((key, rel_key, local_path, s3_size, s3_ts))
                else:
                    kept += 1

        with status_lock:
            job_states[job.name].files_total = len(pending_downloads)
            job_states[job.name].files_done = 0
            job_states[job.name].phase = "download"

        # Phase 2: download new or changed objects and mirror the S3 mtime locally.
        for key, rel_key, local_path, s3_size, s3_ts in pending_downloads:
            with status_lock:
                job_states[job.name].current_file = rel_key
            s3_client.download_file(job.bucket, key, str(local_path))
            os.utime(local_path, (s3_ts, s3_ts))
            downloaded += 1
            with status_lock:
                job_states[job.name].files_done = downloaded

        # Phase 3: optionally delete local files that no longer exist in the bucket.
        if job.delete_local_extras:
            with status_lock:
                job_states[job.name].phase = "removing extra files"
                job_states[job.name].current_file = None
            for file_path in local_dir.rglob("*"):
                if not file_path.is_file():
                    continue
                rel_path = file_path.relative_to(local_dir).as_posix()
                if rel_path not in expected_rel_paths:
                    file_path.unlink(missing_ok=True)
                    deleted += 1

        ended_at = utc_now_iso()
        summary = {
            "downloaded": downloaded,
            "kept": kept,
            "deleted": deleted,
            "errors": errors,
            "trigger": trigger,
        }

        with status_lock:
            state = job_states[job.name]
            state.running = False
            state.last_end = ended_at
            state.last_status = "success"
            state.last_summary = summary
            state.current_file = None
            state.files_total = 0
            state.files_done = 0
            state.phase = ""

        add_log("info", "Sync completed", job.name, summary)
        notify_gotify(
            title=f"Sync completed: {job.name}",
            message=(
                f"ok | downloaded={downloaded}, kept={kept}, "
                f"deleted={deleted}, errors={errors}"
            ),
            priority=int(os.getenv("GOTIFY_PRIORITY_END", "5")),
        )

    except (BotoCoreError, ClientError, OSError, requests.RequestException) as exc:
        errors += 1
        ended_at = utc_now_iso()
        summary = {
            "downloaded": downloaded,
            "kept": kept,
            "deleted": deleted,
            "errors": errors,
            "trigger": trigger,
            "error": str(exc),
        }

        with status_lock:
            state = job_states[job.name]
            state.running = False
            state.last_end = ended_at
            state.last_status = "failed"
            state.last_summary = summary
            state.current_file = None
            state.files_total = 0
            state.files_done = 0
            state.phase = ""

        add_log("error", "Sync failed", job.name, {"error": str(exc)})
        notify_gotify(
            title=f"Sync failed: {job.name}",
            message=f"error: {exc}",
            priority=int(os.getenv("GOTIFY_PRIORITY_END", "8")),
        )

    finally:
        lock.release()


def schedule_jobs() -> None:
    """Register one cron-triggered APScheduler job per configured sync job."""
    for job in job_configs:
        trigger = CronTrigger.from_crontab(job.schedule, timezone="UTC")
        scheduler.add_job(
            func=sync_job,
            trigger=trigger,
            args=[job, "scheduled"],
            id=job.name,
            max_instances=1,
            replace_existing=True,
            coalesce=True,
        )
        add_log(
            "info",
            "Job scheduled",
            job.name,
            {"cron": job.schedule, "bucket": job.bucket, "local_dir": job.local_dir},
        )


def initialize() -> None:
    """Load configuration, start the scheduler, and optionally kick off a first sync."""
    global job_configs

    job_configs = load_jobs_from_env()
    for job in job_configs:
        job_locks[job.name] = threading.Lock()
        job_states[job.name] = JobState(name=job.name, bucket=job.bucket, local_dir=job.local_dir)

    schedule_jobs()
    scheduler.start()

    # Unless RUN_ON_STARTUP is disabled, run every job once in the background immediately.
    if parse_bool(os.getenv("RUN_ON_STARTUP"), True):
        for job in job_configs:
            thread = threading.Thread(target=sync_job, args=(job, "startup"), daemon=True)
            thread.start()


@app.route("/")
def index():
    return render_template("index.html")


@app.route("/api/status")
def api_status():
    with status_lock:
        jobs = [vars(state) for state in job_states.values()]

    return jsonify(
        {
            "server_time": utc_now_iso(),
            "timezone": "UTC",
            "jobs": jobs,
            "logs": list(log_buffer),
        }
    )


@app.route("/api/run/<job_name>", methods=["POST"])
def api_run_job(job_name: str):
    found = next((job for job in job_configs if job.name == job_name), None)
    if not found:
        return jsonify({"ok": False, "error": f"Unknown job '{job_name}'"}), 404

    thread = threading.Thread(target=sync_job, args=(found, "manual"), daemon=True)
    thread.start()
    return jsonify({"ok": True, "message": f"Sync started for {job_name}"})


@app.route("/api/run-all", methods=["POST"])
def api_run_all():
    for job in job_configs:
        thread = threading.Thread(target=sync_job, args=(job, "manual-all"), daemon=True)
        thread.start()
    return jsonify({"ok": True, "message": "Sync started for all jobs"})


if __name__ == "__main__":
    initialize()
    web_port = int(os.getenv("WEB_PORT", "8080"))
    app.run(host="0.0.0.0", port=web_port)