Files
ocr-receipt-extractor/extract_receipt.py
2026-04-02 11:41:05 +02:00

233 lines
7.6 KiB
Python

#!/usr/bin/env python3
"""
Script per estrarre l'importo da foto di scontrini/ricevute.
Pensato per integrazione con n8n (Execute Command o HTTP endpoint).
Uso CLI:
python3 extract_receipt.py /path/to/receipt.jpg
Uso HTTP (per n8n HTTP Request node):
python3 extract_receipt.py --serve --port 5000
POST /extract con multipart form-data (campo "file")
"""
import argparse
import base64
import json
import re
import sys
import os
import tempfile
from pathlib import Path
try:
from PIL import Image, ImageEnhance, ImageFilter
except ImportError:
sys.exit("Errore: installa Pillow → pip install Pillow")
try:
import pytesseract
except ImportError:
sys.exit("Errore: installa pytesseract → pip install pytesseract (e tesseract-ocr sul sistema)")
# ---------- Pattern regex per importi su scontrini italiani ----------
AMOUNT_PATTERNS = [
# IMPORTO EUR 90,93 / IMPORTO: € 12,50
r'IMPORTO\s*(?:EUR|€)?\s*[:.]?\s*(\d{1,6}[.,]\d{2})',
# TOTALE EUR 90,93 / TOTALE: 12,50 / TOTALE EURO 12.50
r'TOTALE\s*(?:COMPLESSIVO|GENERALE|EURO|EUR|€)?\s*[:.]?\s*(\d{1,6}[.,]\d{2})',
# TOT. 12,50 / TOT 12.50
r'\bTOT\.?\s*(?:EUR|€)?\s*[:.]?\s*(\d{1,6}[.,]\d{2})',
# TOTAL 12,50 (EN)
r'\bTOTAL\s*(?:EUR|€)?\s*[:.]?\s*(\d{1,6}[.,]\d{2})',
# AMOUNT / AMOUNT EUR
r'\bAMOUNT\s*(?:EUR|€)?\s*[:.]?\s*(\d{1,6}[.,]\d{2})',
# €90,93 / EUR 90,93 (standalone, ultima riga)
r'(?:EUR|€)\s*(\d{1,6}[.,]\d{2})',
# DOVUTO 12,50
r'DOVUTO\s*(?:EUR|€)?\s*[:.]?\s*(\d{1,6}[.,]\d{2})',
# PAGAMENTO 12,50
r'PAGAMENTO\s*(?:EUR|€)?\s*[:.]?\s*(\d{1,6}[.,]\d{2})',
]
def preprocess_image(image: Image.Image) -> Image.Image:
"""Migliora l'immagine per l'OCR: scala di grigi, contrasto, nitidezza."""
img = image.convert("L")
img = ImageEnhance.Contrast(img).enhance(2.0)
img = ImageEnhance.Sharpness(img).enhance(2.0)
img = img.filter(ImageFilter.MedianFilter(size=3))
return img
def ocr_image(image_path: str) -> str:
"""Esegue OCR sull'immagine e ritorna il testo estratto."""
img = Image.open(image_path)
processed = preprocess_image(img)
# Tesseract con lingua italiana + inglese
custom_config = r'--oem 3 --psm 6 -l ita+eng'
text = pytesseract.image_to_string(processed, config=custom_config)
return text
def parse_amount(text: str) -> dict:
"""Cerca l'importo nel testo OCR usando i pattern definiti."""
text_upper = text.upper()
matches = []
for i, pattern in enumerate(AMOUNT_PATTERNS):
for m in re.finditer(pattern, text_upper):
raw = m.group(1)
# Normalizza: virgola → punto
value = float(raw.replace(",", "."))
matches.append({
"raw": raw,
"amount": value,
"currency": "EUR",
"pattern_index": i,
"position": m.start(),
})
if not matches:
return {
"success": False,
"amount": None,
"currency": None,
"raw_match": None,
"confidence": "none",
"all_matches": [],
"ocr_text": text,
}
# Priorità: pattern con indice più basso (più specifico)
best = min(matches, key=lambda m: (m["pattern_index"], -m["position"]))
confidence = "high" if best["pattern_index"] <= 2 else "medium"
return {
"success": True,
"amount": best["amount"],
"currency": best["currency"],
"raw_match": best["raw"],
"confidence": confidence,
"all_matches": [{"amount": m["amount"], "raw": m["raw"]} for m in matches],
"ocr_text": text,
}
def extract_receipt(image_path: str) -> dict:
"""Pipeline completa: OCR + parsing importo."""
if not os.path.isfile(image_path):
return {"success": False, "error": f"File non trovato: {image_path}"}
text = ocr_image(image_path)
result = parse_amount(text)
result["file"] = os.path.basename(image_path)
return result
# ---------- Modalità HTTP (Flask) per n8n ----------
def create_app():
"""Factory Flask app — usata sia da gunicorn che da run_server."""
from flask import Flask, request, jsonify
from werkzeug.utils import secure_filename
app = Flask(__name__)
ALLOWED_EXTENSIONS = {"png", "jpg", "jpeg", "bmp", "tiff", "webp"}
def allowed_file(filename):
return "." in filename and filename.rsplit(".", 1)[1].lower() in ALLOWED_EXTENSIONS
@app.route("/extract", methods=["POST"])
def extract():
# --- Modalità 1: JSON con base64 (ideale per n8n + Telegram) ---
if request.is_json:
data = request.get_json()
image_b64 = data.get("image") or data.get("file") or data.get("data")
if not image_b64:
return jsonify({"success": False, "error": "Campo 'image' (base64) mancante nel JSON"}), 400
# Rimuovi eventuale header data:image/...;base64,
if "," in image_b64:
image_b64 = image_b64.split(",", 1)[1]
try:
img_bytes = base64.b64decode(image_b64)
except Exception:
return jsonify({"success": False, "error": "Base64 non valido"}), 400
with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp:
tmp.write(img_bytes)
tmp_path = tmp.name
try:
result = extract_receipt(tmp_path)
return jsonify(result)
finally:
os.unlink(tmp_path)
# --- Modalità 2: multipart form-data (upload file) ---
if "file" not in request.files:
return jsonify({"success": False, "error": "Nessun file inviato (campo 'file') e nessun JSON con base64"}), 400
file = request.files["file"]
if file.filename == "":
return jsonify({"success": False, "error": "Filename vuoto"}), 400
if not allowed_file(file.filename):
return jsonify({"success": False, "error": f"Formato non supportato. Usa: {ALLOWED_EXTENSIONS}"}), 400
filename = secure_filename(file.filename)
with tempfile.NamedTemporaryFile(suffix=Path(filename).suffix, delete=False) as tmp:
file.save(tmp.name)
tmp_path = tmp.name
try:
result = extract_receipt(tmp_path)
return jsonify(result)
finally:
os.unlink(tmp_path)
@app.route("/health", methods=["GET"])
def health():
return jsonify({"status": "ok"})
return app
def run_server(host: str, port: int):
app = create_app()
print(f"Server avviato su http://{host}:{port}")
print(f" POST /extract (multipart form-data o JSON base64)")
print(f" GET /health")
app.run(host=host, port=port)
# App WSGI per gunicorn: gunicorn extract_receipt:app
app = create_app()
# ---------- Main ----------
def main():
parser = argparse.ArgumentParser(description="Estrai importo da scontrino/ricevuta")
parser.add_argument("image", nargs="?", help="Percorso immagine dello scontrino")
parser.add_argument("--serve", action="store_true", help="Avvia server HTTP (per n8n)")
parser.add_argument("--host", default="0.0.0.0", help="Host server (default: 0.0.0.0)")
parser.add_argument("--port", type=int, default=5000, help="Porta server (default: 5000)")
args = parser.parse_args()
if args.serve:
run_server(args.host, args.port)
elif args.image:
result = extract_receipt(args.image)
print(json.dumps(result, ensure_ascii=False, indent=2))
sys.exit(0 if result.get("success") else 1)
else:
parser.print_help()
sys.exit(1)
if __name__ == "__main__":
main()