# s3_to_folder_sync/app/main.py
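"""Scheduled one-way sync of S3 buckets/prefixes into local folders.

Jobs are configured through environment variables, executed on cron schedules by
an APScheduler background scheduler, and surfaced through a small Flask dashboard
and JSON API; optional Gotify notifications report when each run starts,
succeeds, or fails.
"""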
import json
import logging
import os
import threading
from collections import deque
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
import boto3
import requests
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from botocore.exceptions import BotoCoreError, ClientError
from flask import Flask, jsonify, render_template

logging.basicConfig(
    level=os.getenv("LOG_LEVEL", "INFO"),
    format="%(asctime)s | %(levelname)s | %(message)s",
)
logger = logging.getLogger("s3-folder-sync")

app = Flask(__name__)
scheduler = BackgroundScheduler(timezone="UTC")
log_buffer = deque(maxlen=int(os.getenv("LOG_BUFFER_SIZE", "500")))
status_lock = threading.Lock()
job_locks: dict[str, threading.Lock] = {}


@dataclass
class JobConfig:
    """Static configuration for one sync job, as loaded from the environment."""

    name: str
    bucket: str
    local_dir: str
    prefix: str = ""
    schedule: str = "*/30 * * * *"
    delete_local_extras: bool = True
    region: str | None = None
    endpoint_url: str | None = None


@dataclass
class JobState:
    """Mutable runtime status for one sync job, exposed via /api/status."""

    name: str
    bucket: str
    local_dir: str
    last_start: str | None = None
    last_end: str | None = None
    last_status: str = "never"
    last_summary: dict[str, Any] = field(default_factory=dict)
    running: bool = False
    current_file: str | None = None
    files_total: int = 0
    files_done: int = 0
    phase: str = ""


job_configs: list[JobConfig] = []
job_states: dict[str, JobState] = {}


def utc_now_iso() -> str:
    return datetime.now(timezone.utc).isoformat()


def parse_bool(value: str | None, default: bool = False) -> bool:
    if value is None:
        return default
    return value.strip().lower() in {"1", "true", "yes", "y", "on"}


def add_log(level: str, message: str, job_name: str | None = None, details: dict[str, Any] | None = None) -> None:
    """Record an event in the in-memory log buffer and mirror it to the module logger."""
    event = {
        "time": utc_now_iso(),
        "level": level.upper(),
        "job": job_name,
        "message": message,
        "details": details or {},
    }
    log_buffer.appendleft(event)
    text = f"[{event['level']}]"
    if job_name:
        text += f"[{job_name}]"
    text += f" {message}"
    if event["details"]:
        text += f" | {event['details']}"
    if level.lower() == "error":
        logger.error(text)
    elif level.lower() == "warning":
        logger.warning(text)
    else:
        logger.info(text)


def notify_gotify(title: str, message: str, priority: int = 5) -> None:
    """Send a Gotify push notification; a no-op when GOTIFY_URL/GOTIFY_TOKEN are unset."""
    gotify_url = os.getenv("GOTIFY_URL", "").strip().rstrip("/")
    gotify_token = os.getenv("GOTIFY_TOKEN", "").strip()
    if not gotify_url or not gotify_token:
        return
    try:
        requests.post(
            f"{gotify_url}/message",
            json={"title": title, "message": message, "priority": priority},
            params={"token": gotify_token},
            timeout=10,
        )
    except requests.RequestException as exc:
        add_log("warning", "Gotify notification failed", details={"error": str(exc)})


def get_s3_client(region: str | None = None, endpoint_url: str | None = None):
    kwargs: dict[str, Any] = {}
    if region:
        kwargs["region_name"] = region
    if endpoint_url:
        kwargs["endpoint_url"] = endpoint_url
    return boto3.client("s3", **kwargs)


def is_within_directory(base_dir: Path, candidate: Path) -> bool:
    """Return True if candidate resolves inside base_dir (guards against path traversal via object keys)."""
    try:
        candidate.resolve().relative_to(base_dir.resolve())
        return True
    except ValueError:
        return False


def should_download(local_path: Path, s3_size: int, s3_mtime_ts: float) -> bool:
    """Download when the local copy is missing or its size/mtime differs from S3 (1 s tolerance)."""
    if not local_path.exists():
        return True
    stat = local_path.stat()
    size_mismatch = stat.st_size != s3_size
    mtime_mismatch = abs(stat.st_mtime - s3_mtime_ts) > 1
    return size_mismatch or mtime_mismatch


def build_job_config_from_item(
    item: dict[str, Any],
    idx: int,
    default_cron: str,
    default_delete: bool,
    default_region: str | None,
    default_endpoint: str | None,
) -> JobConfig:
    bucket = str(item.get("bucket", "")).strip()
    local_dir = str(item.get("local_dir", "")).strip()
    if not bucket or not local_dir:
        raise ValueError(f"Job #{idx} must define 'bucket' and 'local_dir'")
    name = str(item.get("name", f"job-{idx}"))
    schedule = str(item.get("schedule", default_cron)).strip() or default_cron
    prefix = str(item.get("prefix", "")).lstrip("/")
    if prefix and not prefix.endswith("/"):
        prefix += "/"
    delete_local_extras = item.get("delete_local_extras")
    if delete_local_extras is None:
        delete_local_extras = default_delete
    elif isinstance(delete_local_extras, str):
        delete_local_extras = parse_bool(delete_local_extras, default_delete)
    else:
        delete_local_extras = bool(delete_local_extras)
    # "region" and "endpoint_url" may arrive as None (unset environment variables),
    # so guard with `or ""` before stripping to avoid producing the literal string "None".
    return JobConfig(
        name=name,
        bucket=bucket,
        local_dir=local_dir,
        prefix=prefix,
        schedule=schedule,
        delete_local_extras=delete_local_extras,
        region=str(item.get("region") or "").strip() or default_region,
        endpoint_url=str(item.get("endpoint_url") or "").strip() or default_endpoint,
    )


def load_jobs_from_indexed_env(
    default_cron: str,
    default_delete: bool,
    default_region: str | None,
    default_endpoint: str | None,
) -> list[JobConfig]:
    job_count_raw = os.getenv("JOB_COUNT", "").strip()
    if not job_count_raw:
        return []
    try:
        job_count = int(job_count_raw)
    except ValueError as exc:
        raise ValueError("JOB_COUNT must be an integer value") from exc
    if job_count <= 0:
        raise ValueError("JOB_COUNT must be greater than zero")
    parsed_jobs: list[JobConfig] = []
    for idx in range(1, job_count + 1):
        var_prefix = f"JOB_{idx}_"
        item = {
            "name": os.getenv(f"{var_prefix}NAME", f"job-{idx}").strip(),
            "bucket": os.getenv(f"{var_prefix}BUCKET", "").strip(),
            "local_dir": os.getenv(f"{var_prefix}LOCAL_DIR", "").strip(),
            "prefix": os.getenv(f"{var_prefix}PREFIX", "").strip(),
            "schedule": os.getenv(f"{var_prefix}SCHEDULE", default_cron).strip() or default_cron,
            "delete_local_extras": os.getenv(f"{var_prefix}DELETE_LOCAL_EXTRAS"),
            "region": os.getenv(f"{var_prefix}REGION", "").strip() or default_region,
            "endpoint_url": os.getenv(f"{var_prefix}ENDPOINT_URL", "").strip() or default_endpoint,
        }
        parsed_jobs.append(
            build_job_config_from_item(
                item=item,
                idx=idx,
                default_cron=default_cron,
                default_delete=default_delete,
                default_region=default_region,
                default_endpoint=default_endpoint,
            )
        )
    return parsed_jobs
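

# Example configuration consumed by the loaders above/below (illustrative
# values only; the bucket name, path, and schedule are assumptions):
#
#   JOB_COUNT=1
#   JOB_1_NAME=media
#   JOB_1_BUCKET=my-example-bucket
#   JOB_1_PREFIX=photos/
#   JOB_1_LOCAL_DIR=/data/photos
#   JOB_1_SCHEDULE=*/15 * * * *
#   JOB_1_DELETE_LOCAL_EXTRAS=true
#
# or, equivalently, a single SYNC_JOBS_JSON variable holding a JSON array:
#
#   SYNC_JOBS_JSON=[{"name": "media", "bucket": "my-example-bucket",
#                    "prefix": "photos/", "local_dir": "/data/photos"}]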


def load_jobs_from_env() -> list[JobConfig]:
    default_cron = os.getenv("DEFAULT_SCHEDULE_CRON", "*/30 * * * *")
    default_delete = parse_bool(os.getenv("MIRROR_DELETE_LOCAL_EXTRAS"), True)
    default_region = os.getenv("AWS_REGION", "").strip() or None
    default_endpoint = os.getenv("S3_ENDPOINT_URL", "").strip() or None
    indexed_jobs = load_jobs_from_indexed_env(
        default_cron=default_cron,
        default_delete=default_delete,
        default_region=default_region,
        default_endpoint=default_endpoint,
    )
    if indexed_jobs:
        return indexed_jobs
    jobs_json = os.getenv("SYNC_JOBS_JSON", "").strip()
    if not jobs_json:
        raise ValueError("Define jobs with JOB_COUNT/JOB_1_* variables or SYNC_JOBS_JSON")
    try:
        raw_jobs = json.loads(jobs_json)
    except json.JSONDecodeError as exc:
        raise ValueError(f"Invalid SYNC_JOBS_JSON: {exc}") from exc
    if not isinstance(raw_jobs, list) or not raw_jobs:
        raise ValueError("SYNC_JOBS_JSON must be a non-empty JSON array")
    parsed_jobs: list[JobConfig] = []
    for idx, item in enumerate(raw_jobs, start=1):
        if not isinstance(item, dict):
            raise ValueError(f"SYNC_JOBS_JSON item #{idx} must be an object")
        parsed_jobs.append(
            build_job_config_from_item(
                item=item,
                idx=idx,
                default_cron=default_cron,
                default_delete=default_delete,
                default_region=default_region,
                default_endpoint=default_endpoint,
            )
        )
    return parsed_jobs
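

# sync_job mirrors one bucket/prefix into its local directory in three phases:
# list the objects under the prefix, download anything new or changed (size or
# modification time differs), then, if delete_local_extras is enabled, remove
# local files that no longer have a matching object in the bucket.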
def sync_job(job: JobConfig, trigger: str = "scheduled") -> None:
    lock = job_locks[job.name]
    if not lock.acquire(blocking=False):
        add_log("warning", "Sync skipped because previous run is still active", job.name)
        return
    started_at = utc_now_iso()
    with status_lock:
        state = job_states[job.name]
        state.running = True
        state.last_start = started_at
        state.current_file = None
        state.files_total = 0
        state.files_done = 0
        state.phase = "listing files"
    notify_gotify(
        title=f"Sync started: {job.name}",
        message=f"Bucket '{job.bucket}' -> '{job.local_dir}' ({trigger})",
        priority=int(os.getenv("GOTIFY_PRIORITY_START", "5")),
    )
    add_log("info", "Sync started", job.name, {"trigger": trigger})
    downloaded = 0
    kept = 0
    deleted = 0
    errors = 0
    local_dir = Path(job.local_dir)
    try:
        local_dir.mkdir(parents=True, exist_ok=True)
        s3_client = get_s3_client(job.region, job.endpoint_url)
        paginator = s3_client.get_paginator("list_objects_v2")
        expected_rel_paths: set[str] = set()
        pending_downloads: list[tuple[str, str, Path, int, float]] = []
        with status_lock:
            job_states[job.name].phase = "listing files"
        for page in paginator.paginate(Bucket=job.bucket, Prefix=job.prefix):
            contents = page.get("Contents", [])
            for obj in contents:
                key = obj["Key"]
                if key.endswith("/"):
                    continue
                rel_key = key[len(job.prefix):] if job.prefix else key
                rel_key = rel_key.lstrip("/")
                if not rel_key:
                    continue
                local_path = local_dir / rel_key
                if not is_within_directory(local_dir, local_path):
                    add_log("warning", "Skipped key due to unsafe path", job.name, {"key": key})
                    continue
                expected_rel_paths.add(rel_key.replace("\\", "/"))
                local_path.parent.mkdir(parents=True, exist_ok=True)
                s3_size = int(obj.get("Size", 0))
                s3_ts = obj["LastModified"].timestamp()
                if should_download(local_path, s3_size, s3_ts):
                    pending_downloads.append((key, rel_key, local_path, s3_size, s3_ts))
                else:
                    kept += 1
        with status_lock:
            job_states[job.name].files_total = len(pending_downloads)
            job_states[job.name].files_done = 0
            job_states[job.name].phase = "download"
        for key, rel_key, local_path, s3_size, s3_ts in pending_downloads:
            with status_lock:
                job_states[job.name].current_file = rel_key
            s3_client.download_file(job.bucket, key, str(local_path))
            os.utime(local_path, (s3_ts, s3_ts))
            downloaded += 1
            with status_lock:
                job_states[job.name].files_done = downloaded
        if job.delete_local_extras:
            with status_lock:
                job_states[job.name].phase = "removing extra files"
                job_states[job.name].current_file = None
            for file_path in local_dir.rglob("*"):
                if not file_path.is_file():
                    continue
                rel_path = file_path.relative_to(local_dir).as_posix()
                if rel_path not in expected_rel_paths:
                    file_path.unlink(missing_ok=True)
                    deleted += 1
        ended_at = utc_now_iso()
        summary = {
            "downloaded": downloaded,
            "kept": kept,
            "deleted": deleted,
            "errors": errors,
            "trigger": trigger,
        }
        with status_lock:
            state = job_states[job.name]
            state.running = False
            state.last_end = ended_at
            state.last_status = "success"
            state.last_summary = summary
            state.current_file = None
            state.files_total = 0
            state.files_done = 0
            state.phase = ""
        add_log("info", "Sync completed", job.name, summary)
        notify_gotify(
            title=f"Sync completed: {job.name}",
            message=(
                f"ok | downloaded={downloaded}, kept={kept}, "
                f"deleted={deleted}, errors={errors}"
            ),
            priority=int(os.getenv("GOTIFY_PRIORITY_END", "5")),
        )
    except (BotoCoreError, ClientError, OSError, requests.RequestException) as exc:
        errors += 1
        ended_at = utc_now_iso()
        summary = {
            "downloaded": downloaded,
            "kept": kept,
            "deleted": deleted,
            "errors": errors,
            "trigger": trigger,
            "error": str(exc),
        }
        with status_lock:
            state = job_states[job.name]
            state.running = False
            state.last_end = ended_at
            state.last_status = "failed"
            state.last_summary = summary
            state.current_file = None
            state.files_total = 0
            state.files_done = 0
            state.phase = ""
        add_log("error", "Sync failed", job.name, {"error": str(exc)})
        notify_gotify(
            title=f"Sync failed: {job.name}",
            message=f"error: {exc}",
            priority=int(os.getenv("GOTIFY_PRIORITY_END", "8")),
        )
    finally:
        lock.release()


def schedule_jobs() -> None:
    for job in job_configs:
        trigger = CronTrigger.from_crontab(job.schedule, timezone="UTC")
        scheduler.add_job(
            func=sync_job,
            trigger=trigger,
            args=[job, "scheduled"],
            id=job.name,
            max_instances=1,
            replace_existing=True,
            coalesce=True,
        )
        add_log(
            "info",
            "Job scheduled",
            job.name,
            {"cron": job.schedule, "bucket": job.bucket, "local_dir": job.local_dir},
        )


def initialize() -> None:
    global job_configs
    job_configs = load_jobs_from_env()
    for job in job_configs:
        job_locks[job.name] = threading.Lock()
        job_states[job.name] = JobState(name=job.name, bucket=job.bucket, local_dir=job.local_dir)
    schedule_jobs()
    scheduler.start()
    if parse_bool(os.getenv("RUN_ON_STARTUP"), True):
        for job in job_configs:
            thread = threading.Thread(target=sync_job, args=(job, "startup"), daemon=True)
            thread.start()
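

# Web layer: the dashboard page plus a small JSON API. Illustrative calls
# against the default port (WEB_PORT, 8080); the job name is whatever
# JOB_*_NAME / "name" was configured as:
#   curl http://localhost:8080/api/status
#   curl -X POST http://localhost:8080/api/run/<job-name>
#   curl -X POST http://localhost:8080/api/run-all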
@app.route("/")
def index():
return render_template("index.html")
@app.route("/api/status")
def api_status():
with status_lock:
jobs = [vars(state) for state in job_states.values()]
return jsonify(
{
"server_time": utc_now_iso(),
"timezone": "UTC",
"jobs": jobs,
"logs": list(log_buffer),
}
)
@app.route("/api/run/<job_name>", methods=["POST"])
def api_run_job(job_name: str):
found = next((job for job in job_configs if job.name == job_name), None)
if not found:
return jsonify({"ok": False, "error": f"Unknown job '{job_name}'"}), 404
thread = threading.Thread(target=sync_job, args=(found, "manual"), daemon=True)
thread.start()
return jsonify({"ok": True, "message": f"Sync started for {job_name}"})
@app.route("/api/run-all", methods=["POST"])
def api_run_all():
for job in job_configs:
thread = threading.Thread(target=sync_job, args=(job, "manual-all"), daemon=True)
thread.start()
return jsonify({"ok": True, "message": "Sync started for all jobs"})
if __name__ == "__main__":
initialize()
web_port = int(os.getenv("WEB_PORT", "8080"))
app.run(host="0.0.0.0", port=web_port)