193 lines
5.9 KiB
Python
193 lines
5.9 KiB
Python
import os
|
|
from pathlib import Path
|
|
from typing import Dict, List, Optional
|
|
|
|
from dotenv import load_dotenv
|
|
from flask import Flask, jsonify, request
|
|
from yt_dlp import YoutubeDL
|
|
|
|
load_dotenv()
|
|
|
|
app = Flask(__name__)
|
|
|
|
DOWNLOAD_DIR = Path(os.getenv("DOWNLOAD_DIR", "/downloads")).resolve()
|
|
MAX_DURATION_SECONDS = int(os.getenv("MAX_DURATION_SECONDS", "5400"))
|
|
|
|
DOWNLOAD_DIR.mkdir(parents=True, exist_ok=True)
|
|
|
|
|
|
def _format_size(bytes_count: Optional[int]) -> Optional[str]:
|
|
if bytes_count is None:
|
|
return None
|
|
for unit in ["B", "KB", "MB", "GB"]:
|
|
if bytes_count < 1024 or unit == "GB":
|
|
return f"{bytes_count:.1f} {unit}"
|
|
bytes_count /= 1024
|
|
return None
|
|
|
|
|
|
def _extract_info(url: str) -> Dict:
|
|
ydl_opts = {
|
|
"quiet": True,
|
|
"skip_download": True,
|
|
"noprogress": True,
|
|
"extractor_args": {"youtube": {"player_client": ["android", "web"]}},
|
|
}
|
|
with YoutubeDL(ydl_opts) as ydl:
|
|
return ydl.extract_info(url, download=False)
|
|
|
|
|
|
def _filter_video_formats(formats: List[Dict]) -> List[Dict]:
|
|
videos = []
|
|
for f in formats:
|
|
if f.get("vcodec") == "none":
|
|
continue
|
|
height = f.get("height") or 0
|
|
resolution = None
|
|
if f.get("height") and f.get("width"):
|
|
resolution = f"{f['height']}p"
|
|
videos.append(
|
|
{
|
|
"id": f.get("format_id"),
|
|
"note": f.get("format_note"),
|
|
"ext": f.get("ext"),
|
|
"resolution": resolution,
|
|
"fps": f.get("fps"),
|
|
"size": _format_size(f.get("filesize")),
|
|
"height": height,
|
|
}
|
|
)
|
|
# Sort by height descending, put formats without height at the end
|
|
videos.sort(key=lambda x: (x["height"] if x["height"] > 0 else -1), reverse=True)
|
|
return videos
|
|
|
|
|
|
def _filter_audio_formats(formats: List[Dict]) -> List[Dict]:
|
|
audios = []
|
|
for f in formats:
|
|
if f.get("acodec") == "none":
|
|
continue
|
|
audios.append(
|
|
{
|
|
"id": f.get("format_id"),
|
|
"ext": f.get("ext"),
|
|
"abr": f.get("abr"),
|
|
"size": _format_size(f.get("filesize")),
|
|
}
|
|
)
|
|
return audios
|
|
|
|
|
|
def _ensure_duration_allowed(info: Dict):
|
|
duration = info.get("duration")
|
|
if duration and duration > MAX_DURATION_SECONDS:
|
|
raise ValueError(
|
|
f"Il video dura {duration // 60} minuti, supera il limite di {MAX_DURATION_SECONDS // 60} minuti."
|
|
)
|
|
|
|
|
|
def _pick_output_path(info: Dict) -> Optional[Path]:
|
|
requested = info.get("requested_downloads") or []
|
|
for item in requested:
|
|
filepath = item.get("filepath")
|
|
if filepath:
|
|
return Path(filepath)
|
|
filename = info.get("_filename")
|
|
if filename:
|
|
return Path(filename)
|
|
return None
|
|
|
|
|
|
@app.get("/api/health")
|
|
def health():
|
|
return jsonify({"status": "ok"})
|
|
|
|
|
|
@app.post("/api/info")
|
|
def info():
|
|
data = request.get_json(force=True, silent=True) or {}
|
|
url = data.get("url")
|
|
if not url:
|
|
return jsonify({"error": "URL mancante"}), 400
|
|
try:
|
|
info = _extract_info(url)
|
|
_ensure_duration_allowed(info)
|
|
return jsonify(
|
|
{
|
|
"title": info.get("title"),
|
|
"duration": info.get("duration"),
|
|
"thumbnails": info.get("thumbnails", []),
|
|
"video_formats": _filter_video_formats(info.get("formats", [])),
|
|
"audio_formats": _filter_audio_formats(info.get("formats", [])),
|
|
}
|
|
)
|
|
except ValueError as ve:
|
|
return jsonify({"error": str(ve)}), 400
|
|
except Exception as e: # noqa: BLE001
|
|
return jsonify({"error": "Errore nel recupero informazioni", "detail": str(e)}), 500
|
|
|
|
|
|
@app.post("/api/download")
|
|
def download():
|
|
data = request.get_json(force=True, silent=True) or {}
|
|
url = data.get("url")
|
|
mode = data.get("mode", "video")
|
|
format_id = data.get("format_id")
|
|
audio_ext = data.get("audio_ext", "mp3")
|
|
|
|
if not url:
|
|
return jsonify({"error": "URL mancante"}), 400
|
|
if mode not in {"video", "audio"}:
|
|
return jsonify({"error": "Mode non valido"}), 400
|
|
|
|
try:
|
|
meta = _extract_info(url)
|
|
_ensure_duration_allowed(meta)
|
|
|
|
ydl_opts = {
|
|
"quiet": True,
|
|
"outtmpl": str(DOWNLOAD_DIR / "%(title)s.%(ext)s"),
|
|
"noprogress": True,
|
|
"restrictfilenames": True,
|
|
"merge_output_format": "mp4",
|
|
"extractor_args": {"youtube": {"player_client": ["android", "web"]}},
|
|
}
|
|
|
|
if mode == "video":
|
|
if format_id:
|
|
# Use specific format ID + best audio, then merge
|
|
ydl_opts["format"] = f"{format_id}+bestaudio/best"
|
|
else:
|
|
ydl_opts["format"] = "bestvideo*+bestaudio/best"
|
|
else:
|
|
if format_id:
|
|
ydl_opts["format"] = format_id
|
|
else:
|
|
ydl_opts["format"] = "bestaudio/best"
|
|
if audio_ext:
|
|
ydl_opts["postprocessors"] = [
|
|
{
|
|
"key": "FFmpegExtractAudio",
|
|
"preferredcodec": audio_ext,
|
|
"preferredquality": "0",
|
|
}
|
|
]
|
|
|
|
with YoutubeDL(ydl_opts) as ydl:
|
|
info = ydl.extract_info(url, download=True)
|
|
output_path = _pick_output_path(info)
|
|
|
|
if not output_path:
|
|
raise RuntimeError("Impossibile determinare il file scaricato")
|
|
|
|
download_url = f"/downloads/{output_path.name}"
|
|
return jsonify({"file": output_path.name, "url": download_url})
|
|
except ValueError as ve:
|
|
return jsonify({"error": str(ve)}), 400
|
|
except Exception as e: # noqa: BLE001
|
|
return jsonify({"error": "Download fallito", "detail": str(e)}), 500
|
|
|
|
|
|
if __name__ == "__main__":
|
|
app.run(host="0.0.0.0", port=int(os.getenv("API_PORT", "5000")))
|