233 lines
7.6 KiB
Python
233 lines
7.6 KiB
Python
#!/usr/bin/env python3
"""
Extract the amount from photos of receipts.

Designed for integration with n8n (Execute Command node or HTTP endpoint).

CLI usage:
    python3 extract_receipt.py /path/to/receipt.jpg

HTTP usage (for the n8n HTTP Request node):
    python3 extract_receipt.py --serve --port 5000
    POST /extract with multipart form-data (field "file"),
    or with a JSON body carrying the base64-encoded image (field "image")
"""
|
|
|
|
import argparse
|
|
import base64
|
|
import json
|
|
import re
|
|
import sys
|
|
import os
|
|
import tempfile
|
|
from pathlib import Path
|
|
|
|
try:
|
|
from PIL import Image, ImageEnhance, ImageFilter
|
|
except ImportError:
|
|
sys.exit("Errore: installa Pillow → pip install Pillow")
|
|
|
|
try:
|
|
import pytesseract
|
|
except ImportError:
|
|
sys.exit("Errore: installa pytesseract → pip install pytesseract (e tesseract-ocr sul sistema)")
|
|
|
|
|
|
# ---------- Regex patterns for amounts on Italian receipts ----------

# A decimal amount such as "90,93" or "12.50" (captured as group 1).
# The trailing (?!\d) guard rejects longer numbers instead of truncating them:
# without it "1.234,56" would be captured as "1.23".
_AMOUNT = r'(\d{1,6}[.,]\d{2})(?!\d)'

# Whatever may sit between a keyword and its amount: an optional ":" or "."
# and an optional currency marker, in either order, so that both
# "IMPORTO EUR 90,93" and "IMPORTO: € 12,50" are accepted.
_SEPARATOR = r'\s*[:.]?\s*(?:EURO|EUR|€)?\s*[:.]?\s*'

# Ordered from most to least specific; the patterns are written in upper case.
# Every keyword is anchored with \b so that, for example, "SUBTOTALE" is not
# mistaken for "TOTALE".
AMOUNT_PATTERNS = [
    # IMPORTO EUR 90,93 / IMPORTO: € 12,50
    r'\bIMPORTO' + _SEPARATOR + _AMOUNT,
    # TOTALE EUR 90,93 / TOTALE: 12,50 / TOTALE EURO 12.50 / TOTALE COMPLESSIVO 12,50
    r'\bTOTALE\s*(?:COMPLESSIVO|GENERALE)?' + _SEPARATOR + _AMOUNT,
    # TOT. 12,50 / TOT 12.50
    r'\bTOT\.?' + _SEPARATOR + _AMOUNT,
    # TOTAL 12,50 (EN)
    r'\bTOTAL' + _SEPARATOR + _AMOUNT,
    # AMOUNT / AMOUNT EUR
    r'\bAMOUNT' + _SEPARATOR + _AMOUNT,
    # DOVUTO 12,50
    r'\bDOVUTO' + _SEPARATOR + _AMOUNT,
    # PAGAMENTO 12,50
    r'\bPAGAMENTO' + _SEPARATOR + _AMOUNT,
    # €90,93 / EUR 90,93 (standalone). Kept last: it is the least specific
    # pattern and also matches amounts already claimed by a keyword above.
    r'(?:EURO|EUR|€)\s*' + _AMOUNT,
]
|
|
|
|
|
|
def preprocess_image(image: Image.Image) -> Image.Image:
    """Return a copy of *image* prepared for OCR.

    The steps are applied in this order: conversion to 8-bit grayscale
    (mode "L"), contrast enhancement (factor 2.0), sharpness enhancement
    (factor 2.0) and a median filter with a window size of 3.
    """
    grayscale = image.convert("L")
    contrasted = ImageEnhance.Contrast(grayscale).enhance(2.0)
    sharpened = ImageEnhance.Sharpness(contrasted).enhance(2.0)
    return sharpened.filter(ImageFilter.MedianFilter(size=3))
|
|
|
|
|
|
def ocr_image(image_path: str) -> str:
    """Run OCR on the image stored at *image_path* and return the raw text.

    The image is preprocessed with ``preprocess_image`` and then passed to
    Tesseract with the Italian and English language models.

    Args:
        image_path: Path of the image file to read.

    Returns:
        The text recognised by Tesseract, unmodified.
    """
    # Tesseract with Italian + English language data.
    custom_config = r'--oem 3 --psm 6 -l ita+eng'

    # Pillow opens images lazily and may keep the underlying file open (for
    # instance with multi-frame formats). The context manager guarantees the
    # handle is released before returning, so callers can delete the file.
    with Image.open(image_path) as img:
        processed = preprocess_image(img)
        return pytesseract.image_to_string(processed, config=custom_config)
|
|
|
|
|
|
def _amount_to_float(raw: str) -> float:
    """Convert a captured amount such as "90,93", "12.50" or "1.234,56" to float.

    The right-most "," or "." is taken as the decimal separator; any earlier
    separator is treated as thousands grouping and dropped.

    Raises:
        ValueError: if what remains is not a valid number.
    """
    compact = raw.strip().replace(" ", "")
    split_at = max(compact.rfind(","), compact.rfind("."))
    if split_at == -1:
        return float(compact)
    whole = compact[:split_at].replace(",", "").replace(".", "")
    return float(f"{whole or '0'}.{compact[split_at + 1:]}")


def parse_amount(text: str, patterns=None) -> dict:
    """Search OCR text for the receipt amount using prioritised regex patterns.

    Args:
        text: Raw OCR text. ``None`` or an empty string yields a "no match"
            result instead of raising.
        patterns: Optional list of regex strings, most specific first. Each
            must be written in upper case (the text is upper-cased before
            matching) and capture the amount in group 1. Defaults to the
            module-level ``AMOUNT_PATTERNS``.

    Returns:
        A dict with the keys ``success``, ``amount`` (float or None),
        ``currency``, ``raw_match``, ``confidence``, ``all_matches`` and
        ``ocr_text``. ``confidence`` is "high" when the winning pattern is one
        of the first three in the list, "medium" for any later pattern and
        "none" when nothing matched.
    """
    if patterns is None:
        patterns = AMOUNT_PATTERNS
    text = text or ""
    haystack = text.upper()

    matches = []
    seen_spans = set()
    for index, pattern in enumerate(patterns):
        for found in re.finditer(pattern, haystack):
            raw = found.group(1)
            span = found.span(1)
            # The same digits are often captured by several patterns (e.g. a
            # keyword pattern and the bare-currency one); report them once,
            # under the highest-priority pattern that found them.
            if raw is None or span in seen_spans:
                continue
            try:
                value = _amount_to_float(raw)
            except ValueError:
                # A malformed capture must not abort the whole parse.
                continue
            seen_spans.add(span)
            matches.append({
                "raw": raw,
                "amount": value,
                "currency": "EUR",
                "pattern_index": index,
                "position": found.start(),
            })

    if not matches:
        return {
            "success": False,
            "amount": None,
            "currency": None,
            "raw_match": None,
            "confidence": "none",
            "all_matches": [],
            "ocr_text": text,
        }

    # Priority: lowest pattern index first (most specific); among matches of
    # the same pattern, the one that starts latest in the text wins.
    best = min(matches, key=lambda m: (m["pattern_index"], -m["position"]))
    confidence = "high" if best["pattern_index"] <= 2 else "medium"

    return {
        "success": True,
        "amount": best["amount"],
        "currency": best["currency"],
        "raw_match": best["raw"],
        "confidence": confidence,
        "all_matches": [{"amount": m["amount"], "raw": m["raw"]} for m in matches],
        "ocr_text": text,
    }
|
|
|
|
|
|
def extract_receipt(image_path: str) -> dict:
    """Run the full pipeline on one image: OCR, then amount parsing.

    Args:
        image_path: Path of the receipt image.

    Returns:
        The dict produced by ``parse_amount`` with an extra ``file`` key (the
        base name of *image_path*). When the file does not exist or the image
        cannot be read/OCR-ed, a dict with ``success`` set to False and an
        ``error`` message is returned instead of raising.
    """
    if not os.path.isfile(image_path):
        return {"success": False, "error": f"File non trovato: {image_path}"}

    file_name = os.path.basename(image_path)
    try:
        text = ocr_image(image_path)
    except (OSError, ValueError, RuntimeError) as exc:
        # Report unreadable images and OCR failures the same way as a missing
        # file, so CLI and HTTP callers always receive a structured result.
        # NOTE(review): Pillow's unidentified-image error and pytesseract's
        # errors are expected to derive from these types - confirm against
        # the installed versions.
        return {
            "success": False,
            "error": f"Impossibile elaborare l'immagine: {exc}",
            "file": file_name,
        }

    result = parse_amount(text)
    result["file"] = file_name
    return result
|
|
|
|
|
|
# ---------- HTTP mode (Flask) for n8n ----------
def create_app():
    """Build the Flask application (factory used by gunicorn and run_server).

    Flask and Werkzeug are imported inside the factory, so they are only
    required when the HTTP mode is actually used.

    Routes:
        POST /extract: accepts either a JSON object carrying the image as
            base64 (key "image", "file" or "data", optionally prefixed by a
            ``data:...;base64,`` header) or a multipart form-data upload in
            the field "file". Responds with the ``extract_receipt`` result as
            JSON, or with HTTP 400 and an error message for invalid requests.
        GET /health: returns ``{"status": "ok"}``.

    Returns:
        The configured Flask application.
    """
    from flask import Flask, request, jsonify
    from werkzeug.utils import secure_filename

    app = Flask(__name__)
    # NOTE(review): no MAX_CONTENT_LENGTH is configured, so the request size
    # is unbounded - consider setting one if exposed to untrusted clients.

    ALLOWED_EXTENSIONS = {"png", "jpg", "jpeg", "bmp", "tiff", "webp"}

    def allowed_file(filename):
        """Return True when *filename* ends with an accepted image extension."""
        return "." in filename and filename.rsplit(".", 1)[1].lower() in ALLOWED_EXTENSIONS

    def bad_request(message):
        """Build the HTTP 400 JSON response shared by all validation errors."""
        return jsonify({"success": False, "error": message}), 400

    def run_extraction(suffix, write_payload):
        """Store the payload in a temp file, run the pipeline, always clean up.

        *write_payload* receives the open temporary file and must write the
        image bytes into it.
        """
        tmp = tempfile.NamedTemporaryFile(suffix=suffix, delete=False)
        try:
            # Close the file before OCR so it can be reopened by path.
            with tmp:
                write_payload(tmp)
            return jsonify(extract_receipt(tmp.name))
        finally:
            # Runs even when writing the payload fails, so no file is left behind.
            os.unlink(tmp.name)

    @app.route("/extract", methods=["POST"])
    def extract():
        # --- Mode 1: JSON with base64 (handy for n8n + Telegram) ---
        if request.is_json:
            # silent=True: a malformed body yields None instead of an HTML error page.
            data = request.get_json(silent=True)
            if not isinstance(data, dict):
                return bad_request("JSON non valido: atteso un oggetto")

            image_b64 = data.get("image") or data.get("file") or data.get("data")
            if not image_b64:
                return bad_request("Campo 'image' (base64) mancante nel JSON")
            if not isinstance(image_b64, str):
                return bad_request("Base64 non valido")

            # Strip an optional "data:image/...;base64," header.
            if "," in image_b64:
                image_b64 = image_b64.split(",", 1)[1]

            try:
                img_bytes = base64.b64decode(image_b64)
            except ValueError:
                # binascii.Error (bad padding) is a ValueError subclass.
                return bad_request("Base64 non valido")
            if not img_bytes:
                return bad_request("Base64 non valido")

            return run_extraction(".jpg", lambda tmp: tmp.write(img_bytes))

        # --- Mode 2: multipart form-data (file upload) ---
        if "file" not in request.files:
            return bad_request("Nessun file inviato (campo 'file') e nessun JSON con base64")

        upload = request.files["file"]
        if not upload.filename:
            return bad_request("Filename vuoto")

        if not allowed_file(upload.filename):
            return bad_request(f"Formato non supportato. Usa: {ALLOWED_EXTENSIONS}")

        filename = secure_filename(upload.filename)
        # Write through the already-open handle instead of reopening the
        # temporary file by name while it is still open.
        return run_extraction(Path(filename).suffix, upload.save)

    @app.route("/health", methods=["GET"])
    def health():
        return jsonify({"status": "ok"})

    return app
|
|
|
|
|
|
def run_server(host: str, port: int):
    """Create the Flask app, print the startup banner and serve on host:port."""
    server = create_app()
    banner = (
        f"Server avviato su http://{host}:{port}",
        " POST /extract (multipart form-data o JSON base64)",
        " GET /health",
    )
    print("\n".join(banner))
    server.run(host=host, port=port)
|
|
|
|
|
|
# WSGI app for gunicorn: gunicorn extract_receipt:app
# Flask is only imported inside create_app(), so a missing Flask must not
# break plain CLI usage at import time. In that case ``app`` is None and
# ``--serve`` still surfaces the ImportError through run_server().
try:
    app = create_app()
except ImportError:
    app = None
|
|
|
|
|
|
# ---------- Main ----------
def main():
    """Command-line entry point.

    With ``--serve`` the HTTP server is started on the given host and port.
    Otherwise, when an image path is given, the extraction result is printed
    as JSON and the process exits with status 0 on success or 1 on failure.
    With neither, the help text is printed and the process exits with 1.
    """
    parser = argparse.ArgumentParser(description="Estrai importo da scontrino/ricevuta")
    parser.add_argument("image", nargs="?", help="Percorso immagine dello scontrino")
    parser.add_argument("--serve", action="store_true", help="Avvia server HTTP (per n8n)")
    parser.add_argument("--host", default="0.0.0.0", help="Host server (default: 0.0.0.0)")
    parser.add_argument("--port", type=int, default=5000, help="Porta server (default: 5000)")
    options = parser.parse_args()

    # Server mode takes precedence over a positional image.
    if options.serve:
        run_server(options.host, options.port)
        return

    # Nothing to do: show usage and signal failure.
    if not options.image:
        parser.print_help()
        sys.exit(1)

    outcome = extract_receipt(options.image)
    print(json.dumps(outcome, ensure_ascii=False, indent=2))
    exit_status = 0 if outcome.get("success") else 1
    sys.exit(exit_status)
|
|
|
|
|
|
# Entry-point guard: run the CLI only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|