++ update project

This commit is contained in:
2026-03-18 17:09:11 +01:00
parent c06c916e1d
commit 405b8d24bd
9 changed files with 1134 additions and 0 deletions

477
app/main.py Normal file
View File

@@ -0,0 +1,477 @@
import json
import logging
import os
import threading
from collections import deque
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
import boto3
import requests
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from botocore.exceptions import BotoCoreError, ClientError
from flask import Flask, jsonify, render_template
# Root logging configuration; LOG_LEVEL env var controls verbosity (default INFO).
logging.basicConfig(
    level=os.getenv("LOG_LEVEL", "INFO"),
    format="%(asctime)s | %(levelname)s | %(message)s",
)
logger = logging.getLogger("s3-folder-sync")
app = Flask(__name__)
# All cron schedules are interpreted in UTC.
scheduler = BackgroundScheduler(timezone="UTC")
# Most-recent-first ring buffer of structured log events, served by /api/status.
log_buffer = deque(maxlen=int(os.getenv("LOG_BUFFER_SIZE", "500")))
# Guards job_states against concurrent access from sync threads and HTTP handlers.
status_lock = threading.Lock()
# One lock per job name; a non-blocking acquire prevents overlapping runs of a job.
job_locks: dict[str, threading.Lock] = {}
@dataclass
class JobConfig:
    """Static configuration for one S3 -> local-directory mirror job."""

    name: str
    bucket: str
    local_dir: str
    prefix: str = ""  # key prefix; normalized to end with "/" when non-empty
    schedule: str = "*/30 * * * *"  # crontab expression, evaluated in UTC
    delete_local_extras: bool = True  # remove local files absent from the bucket
    region: str | None = None  # AWS region; None lets boto3 pick its default
    endpoint_url: str | None = None  # custom S3 endpoint (e.g. MinIO); None for AWS
@dataclass
class JobState:
    """Mutable runtime status for one job, exposed via /api/status."""

    name: str
    bucket: str
    local_dir: str
    last_start: str | None = None  # ISO timestamp of the most recent start
    last_end: str | None = None  # ISO timestamp of the most recent finish
    last_status: str = "never"  # "never" | "success" | "failed"
    last_summary: dict[str, Any] = field(default_factory=dict)  # counters from the last run
    running: bool = False  # True while a sync for this job is in progress
# Populated once by initialize(); read by the scheduler and the Flask routes.
job_configs: list[JobConfig] = []
job_states: dict[str, JobState] = {}
def utc_now_iso() -> str:
    """Return the current UTC time as an ISO-8601 string."""
    now = datetime.now(timezone.utc)
    return now.isoformat()
def parse_bool(value: str | None, default: bool = False) -> bool:
if value is None:
return default
return value.strip().lower() in {"1", "true", "yes", "y", "on"}
def add_log(level: str, message: str, job_name: str | None = None, details: dict[str, Any] | None = None) -> None:
    """Record a structured event in the in-memory buffer and mirror it to the logger.

    Newest events sit at the front of log_buffer so /api/status can return
    them most-recent-first without sorting.
    """
    normalized = level.upper()
    event = {
        "time": utc_now_iso(),
        "level": normalized,
        "job": job_name,
        "message": message,
        "details": details or {},
    }
    log_buffer.appendleft(event)
    # Human-readable line for the process log.
    parts = [f"[{normalized}]"]
    if job_name:
        parts.append(f"[{job_name}]")
    line = "".join(parts) + f" {message}"
    if event["details"]:
        line += f" | {event['details']}"
    # Dispatch on the normalized level; anything unrecognized logs at INFO.
    emit = {"ERROR": logger.error, "WARNING": logger.warning}.get(normalized, logger.info)
    emit(line)
def notify_gotify(title: str, message: str, priority: int = 5) -> None:
    """Send a push notification via Gotify, if configured.

    Reads GOTIFY_URL and GOTIFY_TOKEN from the environment and silently does
    nothing when either is missing. Delivery failures — network errors AND
    non-2xx HTTP responses — are logged as warnings and never raised to
    callers, so a broken Gotify setup cannot abort a sync.
    """
    gotify_url = os.getenv("GOTIFY_URL", "").strip().rstrip("/")
    gotify_token = os.getenv("GOTIFY_TOKEN", "").strip()
    if not gotify_url or not gotify_token:
        return
    try:
        response = requests.post(
            f"{gotify_url}/message",
            json={"title": title, "message": message, "priority": priority},
            params={"token": gotify_token},
            timeout=10,
        )
        # Bug fix: a non-2xx response (e.g. 401 from a bad token) previously
        # went unnoticed; raise_for_status() turns it into a RequestException
        # so it is logged like any other delivery failure.
        response.raise_for_status()
    except requests.RequestException as exc:
        add_log("warning", "Gotify notification failed", details={"error": str(exc)})
def get_s3_client(region: str | None = None, endpoint_url: str | None = None):
    """Build a boto3 S3 client, honoring an optional region and custom endpoint."""
    client_kwargs: dict[str, Any] = {}
    if region:
        client_kwargs["region_name"] = region
    if endpoint_url:
        client_kwargs["endpoint_url"] = endpoint_url
    return boto3.client("s3", **client_kwargs)
def is_within_directory(base_dir: Path, candidate: Path) -> bool:
    """Return True when *candidate* resolves to a path inside *base_dir*.

    Both paths are resolved first, so ".." components cannot escape the base
    directory.
    """
    resolved_base = base_dir.resolve()
    resolved_candidate = candidate.resolve()
    try:
        resolved_candidate.relative_to(resolved_base)
    except ValueError:
        return False
    return True
def should_download(local_path: Path, s3_size: int, s3_mtime_ts: float) -> bool:
    """Decide whether the S3 object differs from the local copy.

    Download when the local file is missing, its size differs, or its mtime
    is more than one second away from the S3 timestamp (S3 timestamps have
    limited precision, hence the tolerance).
    """
    if not local_path.exists():
        return True
    info = local_path.stat()
    if info.st_size != s3_size:
        return True
    return abs(info.st_mtime - s3_mtime_ts) > 1
def build_job_config_from_item(
    item: dict[str, Any],
    idx: int,
    default_cron: str,
    default_delete: bool,
    default_region: str | None,
    default_endpoint: str | None,
) -> JobConfig:
    """Validate one raw job mapping and build a JobConfig, applying defaults.

    *item* values may come from SYNC_JOBS_JSON or from indexed env vars, so
    any value may be None. Raises ValueError when 'bucket' or 'local_dir'
    is missing or empty.
    """
    # Bug fix: values can legitimately be None (load_jobs_from_indexed_env
    # passes defaults straight through, and JSON may contain null); the old
    # str(item.get(..., "")) turned None into the literal string "None",
    # which then leaked into region_name/endpoint_url for boto3.
    bucket = str(item.get("bucket") or "").strip()
    local_dir = str(item.get("local_dir") or "").strip()
    if not bucket or not local_dir:
        raise ValueError(f"Job #{idx} must define 'bucket' and 'local_dir'")
    name = str(item.get("name") or f"job-{idx}")
    schedule = str(item.get("schedule") or default_cron).strip() or default_cron
    # Normalize the prefix: no leading slash, trailing slash when non-empty,
    # so relative keys can later be computed with a simple slice.
    prefix = str(item.get("prefix") or "").lstrip("/")
    if prefix and not prefix.endswith("/"):
        prefix += "/"
    delete_local_extras = item.get("delete_local_extras")
    if delete_local_extras is None:
        delete_local_extras = default_delete
    elif isinstance(delete_local_extras, str):
        delete_local_extras = parse_bool(delete_local_extras, default_delete)
    else:
        delete_local_extras = bool(delete_local_extras)
    region = str(item.get("region") or "").strip() or default_region
    endpoint_url = str(item.get("endpoint_url") or "").strip() or default_endpoint
    return JobConfig(
        name=name,
        bucket=bucket,
        local_dir=local_dir,
        prefix=prefix,
        schedule=schedule,
        delete_local_extras=delete_local_extras,
        region=region,
        endpoint_url=endpoint_url,
    )
def load_jobs_from_indexed_env(
    default_cron: str,
    default_delete: bool,
    default_region: str | None,
    default_endpoint: str | None,
) -> list[JobConfig]:
    """Read JOB_COUNT plus JOB_<i>_* environment variables into job configs.

    Returns an empty list when JOB_COUNT is unset, letting the caller fall
    back to SYNC_JOBS_JSON. Raises ValueError when JOB_COUNT is not a
    positive integer, or when any indexed job is incomplete.
    """
    raw_count = os.getenv("JOB_COUNT", "").strip()
    if not raw_count:
        return []
    try:
        count = int(raw_count)
    except ValueError as exc:
        raise ValueError("JOB_COUNT must be an integer value") from exc
    if count <= 0:
        raise ValueError("JOB_COUNT must be greater than zero")
    jobs: list[JobConfig] = []
    for index in range(1, count + 1):
        env_prefix = f"JOB_{index}_"

        def read(suffix: str, fallback: str = "") -> str:
            # Stripped env lookup for this job's variables.
            return os.getenv(f"{env_prefix}{suffix}", fallback).strip()

        item = {
            "name": read("NAME", f"job-{index}"),
            "bucket": read("BUCKET"),
            "local_dir": read("LOCAL_DIR"),
            "prefix": read("PREFIX"),
            "schedule": read("SCHEDULE", default_cron) or default_cron,
            # Left as-is (possibly None) so the builder can tell
            # "unset" apart from an explicit false.
            "delete_local_extras": os.getenv(f"{env_prefix}DELETE_LOCAL_EXTRAS"),
            "region": read("REGION") or default_region,
            "endpoint_url": read("ENDPOINT_URL") or default_endpoint,
        }
        jobs.append(
            build_job_config_from_item(
                item=item,
                idx=index,
                default_cron=default_cron,
                default_delete=default_delete,
                default_region=default_region,
                default_endpoint=default_endpoint,
            )
        )
    return jobs
def load_jobs_from_env() -> list[JobConfig]:
    """Build the full job list from the environment.

    Indexed JOB_COUNT/JOB_<i>_* variables take precedence; otherwise the
    SYNC_JOBS_JSON array is parsed. Raises ValueError when neither source
    yields at least one valid job.
    """
    default_cron = os.getenv("DEFAULT_SCHEDULE_CRON", "*/30 * * * *")
    default_delete = parse_bool(os.getenv("MIRROR_DELETE_LOCAL_EXTRAS"), True)
    default_region = os.getenv("AWS_REGION", "").strip() or None
    default_endpoint = os.getenv("S3_ENDPOINT_URL", "").strip() or None
    indexed = load_jobs_from_indexed_env(
        default_cron=default_cron,
        default_delete=default_delete,
        default_region=default_region,
        default_endpoint=default_endpoint,
    )
    if indexed:
        return indexed
    payload = os.getenv("SYNC_JOBS_JSON", "").strip()
    if not payload:
        raise ValueError("Define jobs with JOB_COUNT/JOB_1_* variables or SYNC_JOBS_JSON")
    try:
        entries = json.loads(payload)
    except json.JSONDecodeError as exc:
        raise ValueError(f"Invalid SYNC_JOBS_JSON: {exc}") from exc
    if not isinstance(entries, list) or not entries:
        raise ValueError("SYNC_JOBS_JSON must be a non-empty JSON array")
    configs: list[JobConfig] = []
    for position, entry in enumerate(entries, start=1):
        if not isinstance(entry, dict):
            raise ValueError(f"SYNC_JOBS_JSON item #{position} must be an object")
        configs.append(
            build_job_config_from_item(
                item=entry,
                idx=position,
                default_cron=default_cron,
                default_delete=default_delete,
                default_region=default_region,
                default_endpoint=default_endpoint,
            )
        )
    return configs
def sync_job(job: JobConfig, trigger: str = "scheduled") -> None:
    """Mirror objects under job.prefix from job.bucket into job.local_dir.

    Downloads new/changed objects, optionally deletes local files no longer
    present in the bucket, and records the outcome in job_states, the log
    buffer, and Gotify notifications. Expected failures are caught and
    recorded as a "failed" status rather than propagated.
    """
    # Per-job lock with a non-blocking acquire: overlapping runs become no-ops.
    lock = job_locks[job.name]
    if not lock.acquire(blocking=False):
        add_log("warning", "Sync skipped because previous run is still active", job.name)
        return
    started_at = utc_now_iso()
    with status_lock:
        state = job_states[job.name]
        state.running = True
        state.last_start = started_at
    notify_gotify(
        title=f"Sync started: {job.name}",
        message=f"Bucket '{job.bucket}' -> '{job.local_dir}' ({trigger})",
        priority=int(os.getenv("GOTIFY_PRIORITY_START", "5")),
    )
    add_log("info", "Sync started", job.name, {"trigger": trigger})
    downloaded = 0
    kept = 0
    deleted = 0
    errors = 0
    local_dir = Path(job.local_dir)
    try:
        local_dir.mkdir(parents=True, exist_ok=True)
        s3_client = get_s3_client(job.region, job.endpoint_url)
        paginator = s3_client.get_paginator("list_objects_v2")
        # Bucket-relative paths seen during listing; used afterwards to
        # detect local files that should be deleted.
        expected_rel_paths: set[str] = set()
        for page in paginator.paginate(Bucket=job.bucket, Prefix=job.prefix):
            contents = page.get("Contents", [])
            for obj in contents:
                key = obj["Key"]
                if key.endswith("/"):
                    # "Directory" placeholder objects carry no data.
                    continue
                rel_key = key[len(job.prefix) :] if job.prefix else key
                rel_key = rel_key.lstrip("/")
                if not rel_key:
                    continue
                local_path = local_dir / rel_key
                if not is_within_directory(local_dir, local_path):
                    # Defense against keys that would escape local_dir
                    # (e.g. embedded "..").
                    add_log("warning", "Skipped key due to unsafe path", job.name, {"key": key})
                    continue
                expected_rel_paths.add(rel_key.replace("\\", "/"))
                local_path.parent.mkdir(parents=True, exist_ok=True)
                s3_size = int(obj.get("Size", 0))
                s3_ts = obj["LastModified"].timestamp()
                if should_download(local_path, s3_size, s3_ts):
                    s3_client.download_file(job.bucket, key, str(local_path))
                    # Align the local mtime with S3 so the next run's
                    # should_download() comparison is meaningful.
                    os.utime(local_path, (s3_ts, s3_ts))
                    downloaded += 1
                else:
                    kept += 1
        if job.delete_local_extras:
            # Mirror semantics: drop local files absent from the bucket.
            for file_path in local_dir.rglob("*"):
                if not file_path.is_file():
                    continue
                rel_path = file_path.relative_to(local_dir).as_posix()
                if rel_path not in expected_rel_paths:
                    file_path.unlink(missing_ok=True)
                    deleted += 1
        ended_at = utc_now_iso()
        summary = {
            "downloaded": downloaded,
            "kept": kept,
            "deleted": deleted,
            "errors": errors,
            "trigger": trigger,
        }
        with status_lock:
            state = job_states[job.name]
            state.running = False
            state.last_end = ended_at
            state.last_status = "success"
            state.last_summary = summary
        add_log("info", "Sync completed", job.name, summary)
        notify_gotify(
            title=f"Sync completed: {job.name}",
            message=(
                f"ok | downloaded={downloaded}, kept={kept}, "
                f"deleted={deleted}, errors={errors}"
            ),
            priority=int(os.getenv("GOTIFY_PRIORITY_END", "5")),
        )
    except (BotoCoreError, ClientError, OSError, requests.RequestException) as exc:
        # First failure aborts the whole run; record it and move on.
        errors += 1
        ended_at = utc_now_iso()
        summary = {
            "downloaded": downloaded,
            "kept": kept,
            "deleted": deleted,
            "errors": errors,
            "trigger": trigger,
            "error": str(exc),
        }
        with status_lock:
            state = job_states[job.name]
            state.running = False
            state.last_end = ended_at
            state.last_status = "failed"
            state.last_summary = summary
        add_log("error", "Sync failed", job.name, {"error": str(exc)})
        notify_gotify(
            title=f"Sync failed: {job.name}",
            message=f"error: {exc}",
            priority=int(os.getenv("GOTIFY_PRIORITY_END", "8")),
        )
    finally:
        lock.release()
def schedule_jobs() -> None:
    """Register one UTC cron-triggered APScheduler job per configured sync job."""
    for job in job_configs:
        scheduler.add_job(
            func=sync_job,
            trigger=CronTrigger.from_crontab(job.schedule, timezone="UTC"),
            args=[job, "scheduled"],
            id=job.name,
            max_instances=1,
            replace_existing=True,
            coalesce=True,
        )
        details = {"cron": job.schedule, "bucket": job.bucket, "local_dir": job.local_dir}
        add_log("info", "Job scheduled", job.name, details)
def initialize() -> None:
    """Load job configs, create per-job locks/state, and start the scheduler.

    When RUN_ON_STARTUP is truthy (the default), every job is also kicked
    off immediately in its own daemon thread.
    """
    global job_configs
    job_configs = load_jobs_from_env()
    for job in job_configs:
        job_locks[job.name] = threading.Lock()
        job_states[job.name] = JobState(name=job.name, bucket=job.bucket, local_dir=job.local_dir)
    schedule_jobs()
    scheduler.start()
    if not parse_bool(os.getenv("RUN_ON_STARTUP"), True):
        return
    for job in job_configs:
        threading.Thread(target=sync_job, args=(job, "startup"), daemon=True).start()
@app.route("/")
def index():
    """Serve the dashboard page."""
    return render_template("index.html")
@app.route("/api/status")
def api_status():
    """Return server time, a snapshot of every job's state, and buffered logs."""
    with status_lock:
        job_snapshots = [vars(state) for state in job_states.values()]
    payload = {
        "server_time": utc_now_iso(),
        "timezone": "UTC",
        "jobs": job_snapshots,
        "logs": list(log_buffer),
    }
    return jsonify(payload)
@app.route("/api/run/<job_name>", methods=["POST"])
def api_run_job(job_name: str):
    """Trigger a single job by name in a background thread; 404 when unknown."""
    for job in job_configs:
        if job.name == job_name:
            threading.Thread(target=sync_job, args=(job, "manual"), daemon=True).start()
            return jsonify({"ok": True, "message": f"Sync started for {job_name}"})
    return jsonify({"ok": False, "error": f"Unknown job '{job_name}'"}), 404
@app.route("/api/run-all", methods=["POST"])
def api_run_all():
    """Trigger every configured job, each in its own daemon thread."""
    workers = [
        threading.Thread(target=sync_job, args=(job, "manual-all"), daemon=True)
        for job in job_configs
    ]
    for worker in workers:
        worker.start()
    return jsonify({"ok": True, "message": "Sync started for all jobs"})
if __name__ == "__main__":
    # Dev entry point: load jobs, start the scheduler, then serve the UI/API.
    # WEB_PORT env var overrides the default port 8080.
    initialize()
    web_port = int(os.getenv("WEB_PORT", "8080"))
    app.run(host="0.0.0.0", port=web_port)