Spaces:
Runtime error
Runtime error
| import os, io, base64, urllib.request, ssl, time, json, pathlib | |
| from typing import Optional, List | |
| import numpy as np, cv2, torch | |
| from ultralytics import YOLO | |
| import easyocr | |
| from fastapi import FastAPI, HTTPException, File, UploadFile | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from pydantic import BaseModel, Field | |
# Writable scratch root used for caches and for the fallback weights download.
TMP_DIR = "/tmp"

# Directories created up front. The two EasyOCR ones are handed to the OCR
# reader; "Ultralytics" and "mplconfig" are presumably cache/config locations
# for those libraries, but nothing visible here points the libraries at them
# (TODO confirm).
paths = [
    f"{TMP_DIR}/{name}"
    for name in ("Ultralytics", ".EasyOCR", ".EasyOCR/user_network", "mplconfig")
]
for p in paths:
    os.makedirs(p, exist_ok=True)
import shutil

from huggingface_hub import hf_hub_download

# Resolve the detector weights file.
# 1) Preferred path: download through the Hugging Face Hub cache.
try:
    WEIGHTS = hf_hub_download(
        repo_id="keremberke/yolov5n-license-plate",
        filename="best.pt",
        token=os.getenv("HF_TOKEN", None),  # optional
    )
except Exception:
    # 2) Fallback: download straight into TMP_DIR if the cached download fails.
    local_path = os.path.join(TMP_DIR, "best.pt")
    if not os.path.exists(local_path):
        url = "https://huggingface.co/keremberke/yolov5n-license-plate/resolve/main/best.pt"
        req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"})
        # Stream into a sibling ".part" file and rename only once the transfer
        # completed: an interrupted download must not leave a truncated
        # best.pt behind, because the existence check above would reuse it.
        partial_path = local_path + ".part"
        try:
            with urllib.request.urlopen(req, timeout=60) as r, open(partial_path, "wb") as f:
                shutil.copyfileobj(r, f)
            os.replace(partial_path, local_path)
        finally:
            # Only present here if the download or the rename failed.
            if os.path.exists(partial_path):
                os.remove(partial_path)
    WEIGHTS = local_path

# Detector instance shared by the whole module.
yolo = YOLO(WEIGHTS)
# Characters a plate reading may contain: uppercase A-Z followed by digits.
ALLOW = "ABCDEFGHIJKLMNOPQRSTUVWXYZ" "0123456789"

# EasyOCR reader for English, on the GPU when CUDA is available. Model files
# live under TMP_DIR and downloading them is enabled.
reader = easyocr.Reader(
    ["en"],
    download_enabled=True,
    user_network_directory=f"{TMP_DIR}/.EasyOCR/user_network",
    model_storage_directory=f"{TMP_DIR}/.EasyOCR",
    gpu=torch.cuda.is_available(),
)
def preprocess_for_ocr(plate_bgr):
    """Return a binarized version of a BGR plate crop for OCR.

    Crops whose longer side is below 160 pixels are upscaled 2x with bicubic
    interpolation. The image is then converted to grayscale, smoothed with a
    bilateral filter and binarized with a Gaussian adaptive threshold. The
    input array is copied first and is not modified.
    """
    work = plate_bgr.copy()
    height, width = work.shape[:2]
    needs_upscale = max(height, width) < 160
    if needs_upscale:
        work = cv2.resize(work, (width * 2, height * 2), interpolation=cv2.INTER_CUBIC)
    smoothed = cv2.bilateralFilter(cv2.cvtColor(work, cv2.COLOR_BGR2GRAY), 7, 50, 50)
    return cv2.adaptiveThreshold(
        smoothed, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, 31, 5
    )
def ocr_plate(plate_bgr):
    """Read the text of a plate crop and return ``(text, confidence)``.

    OCR runs twice: on the preprocessed crop and on the raw crop. Every result
    is upper-cased and stripped of characters outside ``ALLOW``; readings with
    fewer than four remaining characters are discarded. The reading with the
    highest confidence wins, with the longer text breaking ties. Returns
    ``("", 0.0)`` when no reading qualifies.
    """
    passes = (
        reader.readtext(preprocess_for_ocr(plate_bgr), detail=1, allowlist=ALLOW),
        reader.readtext(plate_bgr, detail=1, allowlist=ALLOW),
    )
    candidates = []
    for results in passes:
        for _, raw_text, score in results:
            cleaned = "".join(ch for ch in raw_text.upper() if ch in ALLOW)
            if len(cleaned) >= 4:
                candidates.append((cleaned, float(score)))
    if not candidates:
        return "", 0.0
    # max() keeps the first of several equal-ranked candidates, exactly like
    # taking element 0 of a stable descending sort.
    return max(candidates, key=lambda cand: (cand[1], len(cand[0])))
def draw_box_text(img, xyxy, text, color=(0, 255, 0)):
    """Draw a bounding box, and optionally a label, on ``img`` in place.

    Args:
        img: Image array to draw on (modified in place).
        xyxy: Box corners ``(x1, y1, x2, y2)``; values are truncated to int.
        text: Label to render; nothing but the box is drawn when it is empty.
        color: Color of the box outline and of the filled label background.

    The label is a filled strip with black text. It normally sits directly
    above the box; when that would place it above the top edge of the image
    it is drawn just inside the top of the box instead so it stays visible.
    """
    x1, y1, x2, y2 = [int(v) for v in xyxy]
    cv2.rectangle(img, (x1, y1), (x2, y2), color, 2)
    if not text:
        return

    font, scale, thickness = cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2
    text_w, text_h = cv2.getTextSize(text, font, scale, thickness)[0]
    label_h = text_h + 6  # text height plus vertical padding

    # Preferred placement is above the box; fall back to inside the box when
    # there is not enough room above it.
    top = y1 - label_h
    if top < 0:
        top = max(y1, 0)
    bottom = top + label_h

    cv2.rectangle(img, (x1, top), (x1 + text_w + 4, bottom), color, -1)
    cv2.putText(
        img, text, (x1 + 2, bottom - 5), font, scale, (0, 0, 0), thickness, cv2.LINE_AA
    )
def detect_plates_bgr(bgr, conf=0.25, iou=0.45):
    """Run the detector on one image and return ``(boxes, confs)``.

    ``conf`` and ``iou`` are forwarded to ``yolo.predict``. ``boxes`` holds the
    ``xyxy`` coordinates and ``confs`` the matching confidences, both as NumPy
    arrays. When the result carries no boxes object, empty arrays of shape
    ``(0, 4)`` and ``(0,)`` are returned.
    """
    result = yolo.predict(bgr, conf=conf, iou=iou, verbose=False)[0]
    found = result.boxes
    if found is None:
        return np.empty((0, 4)), np.empty((0,))
    return found.xyxy.cpu().numpy(), found.conf.cpu().numpy()
def run_on_image_bgr(bgr, conf=0.25, iou=0.45, with_ocr=True, annotate=True):
    """Detect plates in a BGR image, optionally OCR and annotate them.

    Args:
        bgr: Input image array; it is not modified.
        conf: Detection confidence threshold passed to the detector.
        iou: IoU threshold passed to the detector.
        with_ocr: Run OCR on every non-empty plate crop.
        annotate: Draw each box and its label on the returned copy.

    Returns:
        ``(vis, detections, (width, height), dt_ms)`` where ``vis`` is a copy
        of the input (annotated when requested), ``detections`` is a list of
        dicts with keys ``box_xyxy``, ``det_conf``, ``ocr_text`` and
        ``ocr_conf``, and ``dt_ms`` is the elapsed detection + OCR time in
        milliseconds.
    """
    h, w = bgr.shape[:2]
    vis = bgr.copy()
    # perf_counter is monotonic, so the measured duration cannot go negative
    # or jump when the system clock is adjusted.
    t0 = time.perf_counter()
    boxes, confs = detect_plates_bgr(bgr, conf, iou)
    detections = []
    for xyxy, c in zip(boxes, confs):
        x1, y1, x2, y2 = [int(v) for v in xyxy]
        # Clamp negative coordinates to 0 so they are not interpreted as
        # "count from the end" indices; slicing already clips the upper end.
        crop = bgr[max(0, y1):max(0, y2), max(0, x1):max(0, x2)]
        txt, s = ("", 0.0)
        if with_ocr and crop.size:
            txt, s = ocr_plate(crop)
        if annotate:
            label = f"{txt or 'plate'} {c:.2f}"
            draw_box_text(vis, xyxy, label)
        detections.append({
            "box_xyxy": [x1, y1, x2, y2],
            "det_conf": float(c),
            "ocr_text": txt,
            "ocr_conf": float(s),
        })
    dt_ms = int((time.perf_counter() - t0) * 1000)
    return vis, detections, (w, h), dt_ms
def bgr_to_jpeg_base64(bgr):
    """Encode an image as JPEG (quality 90) and return it as base64 text.

    Returns ``None`` when the encoder reports failure.
    """
    encode_params = [int(cv2.IMWRITE_JPEG_QUALITY), 90]
    success, encoded = cv2.imencode(".jpg", bgr, encode_params)
    if success:
        return base64.b64encode(encoded.tobytes()).decode("ascii")
    return None
def load_image_from_url(url: str, timeout: float = 10.0, max_bytes: int = 20 * 1024 * 1024):
    """Download an image over HTTP(S) and decode it to a BGR array.

    Args:
        url: Address of the image; only ``http`` and ``https`` are accepted.
        timeout: Socket timeout in seconds for the request.
        max_bytes: Largest response body that will be accepted.

    Returns:
        The decoded image as returned by ``cv2.imdecode`` with ``IMREAD_COLOR``.

    Raises:
        ValueError: Unsupported scheme, oversized body, or undecodable data.
        urllib.error.URLError: The request itself failed.
    """
    from urllib.parse import urlsplit

    # The URL comes from the client: refuse file://, ftp:// and friends so the
    # service cannot be used to read local files.
    scheme = urlsplit(url).scheme.lower()
    if scheme not in ("http", "https"):
        raise ValueError("Solo se admiten URLs http(s).")

    # NOTE(review): certificate verification is still disabled, as before, but
    # only for this request instead of being switched off for the whole
    # process. Prefer ssl.create_default_context() once the hosts serving the
    # images are known to have valid certificates. Requests to internal
    # addresses are not filtered here.
    lenient_ctx = ssl._create_unverified_context()
    with urllib.request.urlopen(url, timeout=timeout, context=lenient_ctx) as resp:
        # Read one byte past the cap so an oversized body can be detected
        # without downloading all of it.
        data = resp.read(max_bytes + 1)
    if len(data) > max_bytes:
        raise ValueError("La imagen remota es demasiado grande.")

    arr = np.frombuffer(data, np.uint8)
    bgr = cv2.imdecode(arr, cv2.IMREAD_COLOR)
    if bgr is None:
        raise ValueError("No pude decodificar la imagen desde URL.")
    return bgr
def load_image_from_b64(b64_or_data_url: str):
    """Decode a base64 string, or a ``data:`` URL, into a BGR image array.

    Args:
        b64_or_data_url: Raw base64 text, or a data URL whose payload follows
            the first comma.

    Returns:
        The decoded image as returned by ``cv2.imdecode`` with ``IMREAD_COLOR``.

    Raises:
        ValueError: The data URL has no payload separator, the base64 is
            invalid (``binascii.Error`` is a ``ValueError``), the payload is
            empty, or the bytes are not a decodable image.
    """
    payload = b64_or_data_url
    if payload.startswith("data:"):
        # partition() never raises; a missing comma is reported explicitly
        # instead of surfacing as an IndexError.
        _, separator, payload = payload.partition(",")
        if not separator:
            raise ValueError("Data URL sin datos base64.")
    raw = base64.b64decode(payload)
    if not raw:
        # An empty buffer must not reach the image decoder.
        raise ValueError("No pude decodificar la imagen desde base64.")
    arr = np.frombuffer(raw, np.uint8)
    bgr = cv2.imdecode(arr, cv2.IMREAD_COLOR)
    if bgr is None:
        raise ValueError("No pude decodificar la imagen desde base64.")
    return bgr
# --- FastAPI application ---
app = FastAPI(title="Plates API (HF Space)")

# Origins that are explicitly allowed to call the API from a browser.
ALLOWED = [
    "http://localhost:5173",
    "http://127.0.0.1:5173",
    "https://www.omar-cruz.com",
    "https://omar-cruz.com",
]

app.add_middleware(
    CORSMiddleware,
    allow_origins=ALLOWED,
    # Also accept hf.space itself and any of its subdomains, over http or https.
    allow_origin_regex=r"^https?://([a-z0-9-]+\.)*hf\.space$",
    allow_methods=["*"],
    allow_headers=["*"],
    allow_credentials=False,
)
# One detected plate: its box, the detector confidence and the OCR reading.
class Detection(BaseModel):
    # Box corners in the order x1, y1, x2, y2.
    box_xyxy: List[int]
    # Confidence reported by the detector for this box.
    det_conf: float
    # OCR text; stays empty when OCR was skipped or produced nothing usable.
    ocr_text: str = ""
    # Confidence of the OCR reading; 0.0 when there is no text.
    ocr_conf: float = 0.0
# Response body shared by the detection endpoints.
class DetectResponse(BaseModel):
    # All plates found in the image.
    detections: List[Detection]
    # Number of entries in `detections`.
    count: int
    # Size of the processed image.
    width: int
    height: int
    # Processing time in milliseconds.
    time_ms: int
    # Base64 JPEG of the annotated image; only set when the caller asked for it.
    annotated_image_b64: Optional[str] = None
# JSON request body for detection: the image is given by URL or as base64.
class DetectRequest(BaseModel):
    # Image source; the model itself does not require either field.
    image_url: Optional[str] = None
    image_b64: Optional[str] = None
    # Detection confidence threshold, limited to [0.05, 0.95].
    conf: float = Field(default=0.25, ge=0.05, le=0.95)
    # IoU threshold, limited to [0.1, 0.9].
    iou: float = Field(default=0.45, ge=0.1, le=0.9)
    # Run OCR on the detected plates.
    ocr: bool = True
    # Include the annotated image in the response.
    return_image: bool = False
def health():
    """Return a status payload describing the running service.

    The payload reports a fixed status and service name, the file name of the
    loaded weights, whether CUDA is available, and the explicitly allowed CORS
    origins.
    """
    payload = {"status": "ok", "service": "plates-api"}
    payload["model"] = os.path.basename(WEIGHTS)
    payload["ocr_gpu"] = torch.cuda.is_available()
    payload["allow_origins"] = ALLOWED
    return payload
def detect(req: DetectRequest):
    """Detect plates in an image referenced by URL or supplied as base64.

    ``image_url`` takes precedence when both sources are present.

    Raises:
        HTTPException: 400 when no image source is given or the supplied image
            data is invalid; 500 for any other failure while loading or
            processing the image.
    """
    if not req.image_url and not req.image_b64:
        raise HTTPException(400, "Proporciona 'image_url' o 'image_b64'.")

    # Loading is kept separate from processing so that bad client input
    # (ValueError from the loaders) is reported as 400 rather than 500.
    try:
        if req.image_url:
            bgr = load_image_from_url(req.image_url)
        else:
            bgr = load_image_from_b64(req.image_b64)
    except ValueError as e:
        raise HTTPException(400, f"Error procesando la imagen: {e}") from e
    except Exception as e:
        raise HTTPException(500, f"Error procesando la imagen: {e}") from e

    try:
        vis, dets, (w, h), dt_ms = run_on_image_bgr(
            bgr, conf=req.conf, iou=req.iou, with_ocr=req.ocr, annotate=req.return_image
        )
        b64 = bgr_to_jpeg_base64(vis) if req.return_image else None
        return DetectResponse(
            detections=dets, count=len(dets), width=w, height=h, time_ms=dt_ms,
            annotated_image_b64=b64
        )
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(500, f"Error procesando la imagen: {e}") from e
async def detect_upload(
    image: UploadFile = File(...),
    conf: float = 0.25,
    iou: float = 0.45,
    ocr: bool = True,
    return_image: bool = False,
):
    """Detect plates in an uploaded image file.

    Args:
        image: The uploaded file.
        conf: Detection confidence threshold.
        iou: IoU threshold.
        ocr: Run OCR on the detected plates.
        return_image: Include the annotated image, as base64 JPEG, in the response.

    Raises:
        HTTPException: 400 when the upload is empty or cannot be decoded as an
            image; 500 for any other processing failure.
    """
    # Imported locally so the block does not depend on the file-level imports.
    from fastapi.concurrency import run_in_threadpool

    try:
        data = await image.read()
        if not data:
            # An empty buffer must not reach the image decoder.
            raise HTTPException(400, "No pude decodificar el archivo subido.")
        arr = np.frombuffer(data, np.uint8)
        bgr = cv2.imdecode(arr, cv2.IMREAD_COLOR)
        if bgr is None:
            raise HTTPException(400, "No pude decodificar el archivo subido.")
        # Detection and OCR are blocking; run them in the worker threadpool so
        # this coroutine does not stall the event loop for other requests.
        vis, dets, (w, h), dt_ms = await run_in_threadpool(
            run_on_image_bgr, bgr, conf=conf, iou=iou, with_ocr=ocr, annotate=return_image
        )
        b64 = bgr_to_jpeg_base64(vis) if return_image else None
        return DetectResponse(
            detections=dets, count=len(dets), width=w, height=h, time_ms=dt_ms,
            annotated_image_b64=b64
        )
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(500, f"Error procesando la imagen: {e}") from e