import json
import logging
import os
import threading
from collections import deque
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any

import boto3
import requests
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from botocore.exceptions import BotoCoreError, ClientError
from flask import Flask, jsonify, render_template

logging.basicConfig(
    level=os.getenv("LOG_LEVEL", "INFO"),
    format="%(asctime)s | %(levelname)s | %(message)s",
)
logger = logging.getLogger("s3-folder-sync")

app = Flask(__name__)
scheduler = BackgroundScheduler(timezone="UTC")
log_buffer = deque(maxlen=int(os.getenv("LOG_BUFFER_SIZE", "500")))
status_lock = threading.Lock()
job_locks: dict[str, threading.Lock] = {}


@dataclass
class JobConfig:
    name: str
    bucket: str
    local_dir: str
    prefix: str = ""
    schedule: str = "*/30 * * * *"
    delete_local_extras: bool = True
    region: str | None = None
    endpoint_url: str | None = None


@dataclass
class JobState:
    name: str
    bucket: str
    local_dir: str
    last_start: str | None = None
    last_end: str | None = None
    last_status: str = "never"
    last_summary: dict[str, Any] = field(default_factory=dict)
    running: bool = False
    current_file: str | None = None
    files_total: int = 0
    files_done: int = 0
    phase: str = ""


job_configs: list[JobConfig] = []
job_states: dict[str, JobState] = {}


def utc_now_iso() -> str:
    return datetime.now(timezone.utc).isoformat()


def parse_bool(value: str | None, default: bool = False) -> bool:
    if value is None:
        return default
    return value.strip().lower() in {"1", "true", "yes", "y", "on"}


def add_log(level: str, message: str, job_name: str | None = None, details: dict[str, Any] | None = None) -> None:
    event = {
        "time": utc_now_iso(),
        "level": level.upper(),
        "job": job_name,
        "message": message,
        "details": details or {},
    }
    log_buffer.appendleft(event)
    text = f"[{event['level']}]"
    if job_name:
        text += f"[{job_name}]"
    text += f" {message}"
    if event["details"]:
        text += f" | {event['details']}"
    if level.lower() == "error":
        logger.error(text)
    elif level.lower() == "warning":
        logger.warning(text)
    else:
        logger.info(text)


def notify_gotify(title: str, message: str, priority: int = 5) -> None:
    gotify_url = os.getenv("GOTIFY_URL", "").strip().rstrip("/")
    gotify_token = os.getenv("GOTIFY_TOKEN", "").strip()
    if not gotify_url or not gotify_token:
        return
    try:
        requests.post(
            f"{gotify_url}/message",
            json={"title": title, "message": message, "priority": priority},
            params={"token": gotify_token},
            timeout=10,
        )
    except requests.RequestException as exc:
        add_log("warning", "Gotify notification failed", details={"error": str(exc)})


def get_s3_client(region: str | None = None, endpoint_url: str | None = None):
    kwargs: dict[str, Any] = {}
    if region:
        kwargs["region_name"] = region
    if endpoint_url:
        kwargs["endpoint_url"] = endpoint_url
    return boto3.client("s3", **kwargs)


def is_within_directory(base_dir: Path, candidate: Path) -> bool:
    try:
        candidate.resolve().relative_to(base_dir.resolve())
        return True
    except ValueError:
        return False


def should_download(local_path: Path, s3_size: int, s3_mtime_ts: float) -> bool:
    if not local_path.exists():
        return True
    stat = local_path.stat()
    size_mismatch = stat.st_size != s3_size
    mtime_mismatch = abs(stat.st_mtime - s3_mtime_ts) > 1
    return size_mismatch or mtime_mismatch
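# Illustrative behaviour of the helpers above (a sketch only; the concrete
# values are made up, not taken from a real run):
#   parse_bool("Yes") -> True; parse_bool(None, default=True) -> True
#   is_within_directory(Path("/data"), Path("/data/../etc/passwd")) -> False,
#     which is what lets sync_job skip S3 keys that would escape the target dir
#   should_download(...) -> True when the local copy is missing, its size
#     differs, or its mtime differs from the S3 object's by more than 1 second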
def build_job_config_from_item(
    item: dict[str, Any],
    idx: int,
    default_cron: str,
    default_delete: bool,
    default_region: str | None,
    default_endpoint: str | None,
) -> JobConfig:
    bucket = str(item.get("bucket", "")).strip()
    local_dir = str(item.get("local_dir", "")).strip()
    if not bucket or not local_dir:
        raise ValueError(f"Job #{idx} must define 'bucket' and 'local_dir'")
    name = str(item.get("name", f"job-{idx}"))
    schedule = str(item.get("schedule", default_cron)).strip() or default_cron
    prefix = str(item.get("prefix", "")).lstrip("/")
    if prefix and not prefix.endswith("/"):
        prefix += "/"
    delete_local_extras = item.get("delete_local_extras")
    if delete_local_extras is None:
        delete_local_extras = default_delete
    elif isinstance(delete_local_extras, str):
        delete_local_extras = parse_bool(delete_local_extras, default_delete)
    else:
        delete_local_extras = bool(delete_local_extras)
    return JobConfig(
        name=name,
        bucket=bucket,
        local_dir=local_dir,
        prefix=prefix,
        schedule=schedule,
        delete_local_extras=delete_local_extras,
        region=str(item.get("region", "")).strip() or default_region,
        endpoint_url=str(item.get("endpoint_url", "")).strip() or default_endpoint,
    )


def load_jobs_from_indexed_env(
    default_cron: str,
    default_delete: bool,
    default_region: str | None,
    default_endpoint: str | None,
) -> list[JobConfig]:
    job_count_raw = os.getenv("JOB_COUNT", "").strip()
    if not job_count_raw:
        return []
    try:
        job_count = int(job_count_raw)
    except ValueError as exc:
        raise ValueError("JOB_COUNT must be an integer value") from exc
    if job_count <= 0:
        raise ValueError("JOB_COUNT must be greater than zero")
    parsed_jobs: list[JobConfig] = []
    for idx in range(1, job_count + 1):
        var_prefix = f"JOB_{idx}_"
        item = {
            "name": os.getenv(f"{var_prefix}NAME", f"job-{idx}").strip(),
            "bucket": os.getenv(f"{var_prefix}BUCKET", "").strip(),
            "local_dir": os.getenv(f"{var_prefix}LOCAL_DIR", "").strip(),
            "prefix": os.getenv(f"{var_prefix}PREFIX", "").strip(),
            "schedule": os.getenv(f"{var_prefix}SCHEDULE", default_cron).strip() or default_cron,
            "delete_local_extras": os.getenv(f"{var_prefix}DELETE_LOCAL_EXTRAS"),
            "region": os.getenv(f"{var_prefix}REGION", "").strip() or default_region,
            "endpoint_url": os.getenv(f"{var_prefix}ENDPOINT_URL", "").strip() or default_endpoint,
        }
        parsed_jobs.append(
            build_job_config_from_item(
                item=item,
                idx=idx,
                default_cron=default_cron,
                default_delete=default_delete,
                default_region=default_region,
                default_endpoint=default_endpoint,
            )
        )
    return parsed_jobs


def load_jobs_from_env() -> list[JobConfig]:
    default_cron = os.getenv("DEFAULT_SCHEDULE_CRON", "*/30 * * * *")
    default_delete = parse_bool(os.getenv("MIRROR_DELETE_LOCAL_EXTRAS"), True)
    default_region = os.getenv("AWS_REGION", "").strip() or None
    default_endpoint = os.getenv("S3_ENDPOINT_URL", "").strip() or None
    indexed_jobs = load_jobs_from_indexed_env(
        default_cron=default_cron,
        default_delete=default_delete,
        default_region=default_region,
        default_endpoint=default_endpoint,
    )
    if indexed_jobs:
        return indexed_jobs
    jobs_json = os.getenv("SYNC_JOBS_JSON", "").strip()
    if not jobs_json:
        raise ValueError("Define jobs with JOB_COUNT/JOB_1_* variables or SYNC_JOBS_JSON")
    try:
        raw_jobs = json.loads(jobs_json)
    except json.JSONDecodeError as exc:
        raise ValueError(f"Invalid SYNC_JOBS_JSON: {exc}") from exc
    if not isinstance(raw_jobs, list) or not raw_jobs:
        raise ValueError("SYNC_JOBS_JSON must be a non-empty JSON array")
    parsed_jobs: list[JobConfig] = []
    for idx, item in enumerate(raw_jobs, start=1):
        if not isinstance(item, dict):
            raise ValueError(f"SYNC_JOBS_JSON item #{idx} must be an object")
        parsed_jobs.append(
            build_job_config_from_item(
                item=item,
                idx=idx,
                default_cron=default_cron,
                default_delete=default_delete,
                default_region=default_region,
                default_endpoint=default_endpoint,
            )
        )
    return parsed_jobs
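# Example configuration read by load_jobs_from_env(); a minimal sketch only --
# the job name, bucket, and paths below are hypothetical placeholders.
# Indexed variables:
#   JOB_COUNT=1
#   JOB_1_NAME=media
#   JOB_1_BUCKET=example-bucket
#   JOB_1_LOCAL_DIR=/data/media
#   JOB_1_PREFIX=photos/
#   JOB_1_SCHEDULE=0 */6 * * *
#   JOB_1_DELETE_LOCAL_EXTRAS=true
# or, equivalently, a single JSON array:
#   SYNC_JOBS_JSON=[{"name": "media", "bucket": "example-bucket", "local_dir": "/data/media"}]
# Indexed variables take precedence whenever JOB_COUNT is set.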
def sync_job(job: JobConfig, trigger: str = "scheduled") -> None:
    lock = job_locks[job.name]
    if not lock.acquire(blocking=False):
        add_log("warning", "Sync skipped because previous run is still active", job.name)
        return
    started_at = utc_now_iso()
    with status_lock:
        state = job_states[job.name]
        state.running = True
        state.last_start = started_at
        state.current_file = None
        state.files_total = 0
        state.files_done = 0
        state.phase = "listing files"
    notify_gotify(
        title=f"Sync started: {job.name}",
        message=f"Bucket '{job.bucket}' -> '{job.local_dir}' ({trigger})",
        priority=int(os.getenv("GOTIFY_PRIORITY_START", "5")),
    )
    add_log("info", "Sync started", job.name, {"trigger": trigger})
    downloaded = 0
    kept = 0
    deleted = 0
    errors = 0
    local_dir = Path(job.local_dir)
    try:
        local_dir.mkdir(parents=True, exist_ok=True)
        s3_client = get_s3_client(job.region, job.endpoint_url)
        paginator = s3_client.get_paginator("list_objects_v2")
        expected_rel_paths: set[str] = set()
        pending_downloads: list[tuple[str, str, Path, int, float]] = []
        with status_lock:
            job_states[job.name].phase = "listing files"
        for page in paginator.paginate(Bucket=job.bucket, Prefix=job.prefix):
            contents = page.get("Contents", [])
            for obj in contents:
                key = obj["Key"]
                if key.endswith("/"):
                    continue
                rel_key = key[len(job.prefix) :] if job.prefix else key
                rel_key = rel_key.lstrip("/")
                if not rel_key:
                    continue
                local_path = local_dir / rel_key
                if not is_within_directory(local_dir, local_path):
                    add_log("warning", "Skipped key due to unsafe path", job.name, {"key": key})
                    continue
                expected_rel_paths.add(rel_key.replace("\\", "/"))
                local_path.parent.mkdir(parents=True, exist_ok=True)
                s3_size = int(obj.get("Size", 0))
                s3_ts = obj["LastModified"].timestamp()
                if should_download(local_path, s3_size, s3_ts):
                    pending_downloads.append((key, rel_key, local_path, s3_size, s3_ts))
                else:
                    kept += 1
        with status_lock:
            job_states[job.name].files_total = len(pending_downloads)
            job_states[job.name].files_done = 0
            job_states[job.name].phase = "download"
        for key, rel_key, local_path, s3_size, s3_ts in pending_downloads:
            with status_lock:
                job_states[job.name].current_file = rel_key
            s3_client.download_file(job.bucket, key, str(local_path))
            os.utime(local_path, (s3_ts, s3_ts))
            downloaded += 1
            with status_lock:
                job_states[job.name].files_done = downloaded
        if job.delete_local_extras:
            with status_lock:
                job_states[job.name].phase = "cleaning up extra files"
                job_states[job.name].current_file = None
            for file_path in local_dir.rglob("*"):
                if not file_path.is_file():
                    continue
                rel_path = file_path.relative_to(local_dir).as_posix()
                if rel_path not in expected_rel_paths:
                    file_path.unlink(missing_ok=True)
                    deleted += 1
        ended_at = utc_now_iso()
        summary = {
            "downloaded": downloaded,
            "kept": kept,
            "deleted": deleted,
            "errors": errors,
            "trigger": trigger,
        }
        with status_lock:
            state = job_states[job.name]
            state.running = False
            state.last_end = ended_at
            state.last_status = "success"
            state.last_summary = summary
            state.current_file = None
            state.files_total = 0
            state.files_done = 0
            state.phase = ""
        add_log("info", "Sync completed", job.name, summary)
        notify_gotify(
            title=f"Sync completed: {job.name}",
            message=(
                f"ok | downloaded={downloaded}, kept={kept}, "
                f"deleted={deleted}, errors={errors}"
            ),
            priority=int(os.getenv("GOTIFY_PRIORITY_END", "5")),
        )
    except (BotoCoreError, ClientError, OSError, requests.RequestException) as exc:
        errors += 1
        ended_at = utc_now_iso()
        summary = {
            "downloaded": downloaded,
            "kept": kept,
            "deleted": deleted,
            "errors": errors,
            "trigger": trigger,
            "error": str(exc),
        }
        with status_lock:
            state = job_states[job.name]
            state.running = False
            state.last_end = ended_at
            state.last_status = "failed"
            state.last_summary = summary
            state.current_file = None
            state.files_total = 0
            state.files_done = 0
            state.phase = ""
        add_log("error", "Sync failed", job.name, {"error": str(exc)})
        notify_gotify(
            title=f"Sync failed: {job.name}",
            message=f"error: {exc}",
            priority=int(os.getenv("GOTIFY_PRIORITY_END", "8")),
        )
    finally:
        lock.release()


def schedule_jobs() -> None:
    for job in job_configs:
        trigger = CronTrigger.from_crontab(job.schedule, timezone="UTC")
        scheduler.add_job(
            func=sync_job,
            trigger=trigger,
            args=[job, "scheduled"],
            id=job.name,
            max_instances=1,
            replace_existing=True,
            coalesce=True,
        )
        add_log(
            "info",
            "Job scheduled",
            job.name,
            {"cron": job.schedule, "bucket": job.bucket, "local_dir": job.local_dir},
        )


def initialize() -> None:
    global job_configs
    job_configs = load_jobs_from_env()
    for job in job_configs:
        job_locks[job.name] = threading.Lock()
        job_states[job.name] = JobState(name=job.name, bucket=job.bucket, local_dir=job.local_dir)
    schedule_jobs()
    scheduler.start()
    if parse_bool(os.getenv("RUN_ON_STARTUP"), True):
        for job in job_configs:
            thread = threading.Thread(target=sync_job, args=(job, "startup"), daemon=True)
            thread.start()


@app.route("/")
def index():
    return render_template("index.html")


@app.route("/api/status")
def api_status():
    with status_lock:
        jobs = [vars(state) for state in job_states.values()]
    return jsonify(
        {
            "server_time": utc_now_iso(),
            "timezone": "UTC",
            "jobs": jobs,
            "logs": list(log_buffer),
        }
    )


@app.route("/api/run/<job_name>", methods=["POST"])
def api_run_job(job_name: str):
    found = next((job for job in job_configs if job.name == job_name), None)
    if not found:
        return jsonify({"ok": False, "error": f"Unknown job '{job_name}'"}), 404
    thread = threading.Thread(target=sync_job, args=(found, "manual"), daemon=True)
    thread.start()
    return jsonify({"ok": True, "message": f"Sync started for {job_name}"})


@app.route("/api/run-all", methods=["POST"])
def api_run_all():
    for job in job_configs:
        thread = threading.Thread(target=sync_job, args=(job, "manual-all"), daemon=True)
        thread.start()
    return jsonify({"ok": True, "message": "Sync started for all jobs"})


if __name__ == "__main__":
    initialize()
    web_port = int(os.getenv("WEB_PORT", "8080"))
    app.run(host="0.0.0.0", port=web_port)
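# Example manual triggers against the endpoints above (host and port are
# illustrative; WEB_PORT defaults to 8080):
#   curl -X POST http://localhost:8080/api/run/<job-name>
#   curl -X POST http://localhost:8080/api/run-all
#   curl http://localhost:8080/api/status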