# detector-placas / app.py

import os, io, base64, urllib.request, ssl, time, json, pathlib
from typing import Optional, List
import numpy as np, cv2, torch
from ultralytics import YOLO
import easyocr
from fastapi import FastAPI, HTTPException, File, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field

# Writable directories under /tmp for the Ultralytics, EasyOCR, and matplotlib caches.
TMP_DIR = "/tmp"
paths = [
    f"{TMP_DIR}/Ultralytics",
    f"{TMP_DIR}/.EasyOCR",
    f"{TMP_DIR}/.EasyOCR/user_network",
    f"{TMP_DIR}/mplconfig",
]
for p in paths:
    os.makedirs(p, exist_ok=True)

from huggingface_hub import hf_hub_download

# 1) Try to fetch the license-plate weights through the Hugging Face Hub cache.
try:
    WEIGHTS = hf_hub_download(
        repo_id="keremberke/yolov5n-license-plate",
        filename="best.pt",
        token=os.getenv("HF_TOKEN", None),  # optional
    )
except Exception:
    # 2) Fall back to a direct download into /tmp if the cached download fails.
    local_path = os.path.join(TMP_DIR, "best.pt")
    if not os.path.exists(local_path):
        url = "https://huggingface.co/keremberke/yolov5n-license-plate/resolve/main/best.pt"
        req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"})
        with urllib.request.urlopen(req) as r, open(local_path, "wb") as f:
            f.write(r.read())
    WEIGHTS = local_path

yolo = YOLO(WEIGHTS)

# EasyOCR with GPU if available
reader = easyocr.Reader(
    ['en'],
    gpu=torch.cuda.is_available(),
    model_storage_directory=f"{TMP_DIR}/.EasyOCR",
    user_network_directory=f"{TMP_DIR}/.EasyOCR/user_network",
    download_enabled=True,
)

ALLOW = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"

def preprocess_for_ocr(plate_bgr):
    """Upscale small crops, denoise, and binarize to make OCR easier."""
    img = plate_bgr.copy()
    h, w = img.shape[:2]
    if max(h, w) < 160:
        img = cv2.resize(img, (w * 2, h * 2), interpolation=cv2.INTER_CUBIC)
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    gray = cv2.bilateralFilter(gray, 7, 50, 50)
    th = cv2.adaptiveThreshold(gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
                               cv2.THRESH_BINARY, 31, 5)
    return th

def ocr_plate(plate_bgr):
    """Read the plate text from both the preprocessed and the raw crop; return (text, confidence)."""
    img = preprocess_for_ocr(plate_bgr)
    out1 = reader.readtext(img, detail=1, allowlist=ALLOW)
    out2 = reader.readtext(plate_bgr, detail=1, allowlist=ALLOW)
    cands = []
    for out in (out1, out2):
        for _, text, score in out:
            t = "".join([c for c in text.upper() if c in ALLOW])
            if len(t) >= 4:
                cands.append((t, float(score)))
    if not cands:
        return "", 0.0
    cands.sort(key=lambda x: (x[1], len(x[0])), reverse=True)
    return cands[0]

def draw_box_text(img, xyxy, text, color=(0, 255, 0)):
    """Draw a bounding box and, if text is given, a filled label above it."""
    x1, y1, x2, y2 = [int(v) for v in xyxy]
    cv2.rectangle(img, (x1, y1), (x2, y2), color, 2)
    if text:
        tsize = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)[0]
        cv2.rectangle(img, (x1, y1 - tsize[1] - 6), (x1 + tsize[0] + 4, y1), color, -1)
        cv2.putText(img, text, (x1 + 2, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 0), 2, cv2.LINE_AA)

def detect_plates_bgr(bgr, conf=0.25, iou=0.45):
    """Run the YOLO detector on a BGR image; return (boxes_xyxy, confidences) as NumPy arrays."""
    res = yolo.predict(bgr, conf=conf, iou=iou, verbose=False)[0]
    boxes = res.boxes.xyxy.cpu().numpy() if res.boxes is not None else np.empty((0, 4))
    confs = res.boxes.conf.cpu().numpy() if res.boxes is not None else np.empty((0,))
    return boxes, confs

def run_on_image_bgr(bgr, conf=0.25, iou=0.45, with_ocr=True, annotate=True):
    """Detect plates, optionally OCR each crop, and optionally annotate a copy of the image."""
    h, w = bgr.shape[:2]
    vis = bgr.copy()
    t0 = time.time()
    boxes, confs = detect_plates_bgr(bgr, conf, iou)
    detections = []
    for xyxy, c in zip(boxes, confs):
        x1, y1, x2, y2 = [int(v) for v in xyxy]
        crop = bgr[max(0, y1):max(0, y2), max(0, x1):max(0, x2)]
        txt, s = ("", 0.0)
        if with_ocr and crop.size:
            txt, s = ocr_plate(crop)
        if annotate:
            label = f"{txt or 'plate'} {c:.2f}"
            draw_box_text(vis, xyxy, label)
        detections.append({
            "box_xyxy": [x1, y1, x2, y2],
            "det_conf": float(c),
            "ocr_text": txt,
            "ocr_conf": float(s),
        })
    dt_ms = int((time.time() - t0) * 1000)
    return vis, detections, (w, h), dt_ms

def bgr_to_jpeg_base64(bgr):
    """Encode a BGR image as JPEG and return it as a base64 string (None on failure)."""
    ok, buf = cv2.imencode(".jpg", bgr, [int(cv2.IMWRITE_JPEG_QUALITY), 90])
    if not ok:
        return None
    return base64.b64encode(buf.tobytes()).decode("ascii")

def load_image_from_url(url: str):
    """Download an image from a URL and decode it into a BGR array."""
    # Note: this disables SSL certificate verification for the download.
    ssl._create_default_https_context = ssl._create_unverified_context
    data = urllib.request.urlopen(url).read()
    arr = np.frombuffer(data, np.uint8)
    bgr = cv2.imdecode(arr, cv2.IMREAD_COLOR)
    if bgr is None:
        raise ValueError("Could not decode the image from the URL.")
    return bgr

def load_image_from_b64(b64_or_data_url: str):
    """Decode a base64 string (optionally a data URL) into a BGR array."""
    s = b64_or_data_url
    if s.startswith("data:"):
        s = s.split(",", 1)[1]
    raw = base64.b64decode(s)
    arr = np.frombuffer(raw, np.uint8)
    bgr = cv2.imdecode(arr, cv2.IMREAD_COLOR)
    if bgr is None:
        raise ValueError("Could not decode the image from base64.")
    return bgr

# --- FastAPI ---
app = FastAPI(title="Plates API (HF Space)")

ALLOWED = [
    "http://localhost:5173", "http://127.0.0.1:5173",
    "https://www.omar-cruz.com", "https://omar-cruz.com",
]
app.add_middleware(
    CORSMiddleware,
    allow_origins=ALLOWED,
    allow_origin_regex=r"^https?://([a-z0-9-]+\.)*hf\.space$",
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)

class Detection(BaseModel):
    box_xyxy: List[int]
    det_conf: float
    ocr_text: str = ""
    ocr_conf: float = 0.0

class DetectResponse(BaseModel):
    detections: List[Detection]
    count: int
    width: int
    height: int
    time_ms: int
    annotated_image_b64: Optional[str] = None

class DetectRequest(BaseModel):
    image_url: Optional[str] = None
    image_b64: Optional[str] = None
    conf: float = Field(0.25, ge=0.05, le=0.95)
    iou: float = Field(0.45, ge=0.1, le=0.9)
    ocr: bool = True
    return_image: bool = False
@app.get("/")
def health():
return {
"status": "ok",
"service": "plates-api",
"model": os.path.basename(WEIGHTS),
"ocr_gpu": torch.cuda.is_available(),
"allow_origins": ALLOWED,
}
@app.post("/detect", response_model=DetectResponse)
def detect(req: DetectRequest):
try:
if not req.image_url and not req.image_b64:
raise HTTPException(400, "Proporciona 'image_url' o 'image_b64'.")
bgr = load_image_from_url(req.image_url) if req.image_url else load_image_from_b64(req.image_b64)
vis, dets, (w, h), dt_ms = run_on_image_bgr(
bgr, conf=req.conf, iou=req.iou, with_ocr=req.ocr, annotate=req.return_image
)
b64 = bgr_to_jpeg_base64(vis) if req.return_image else None
return DetectResponse(
detections=dets, count=len(dets), width=w, height=h, time_ms=dt_ms,
annotated_image_b64=b64
)
except HTTPException:
raise
except Exception as e:
raise HTTPException(500, f"Error procesando la imagen: {e}")
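
# Example client call for /detect (a sketch, not part of the app; the base URL,
# port, and image URL below are placeholders -- adjust them to wherever this
# Space is actually served):
#
#   import requests
#   resp = requests.post(
#       "http://localhost:7860/detect",
#       json={"image_url": "https://example.com/car.jpg", "conf": 0.25, "return_image": True},
#   )
#   print(resp.json()["count"], resp.json()["detections"])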
@app.post("/detect_upload", response_model=DetectResponse)
async def detect_upload(
image: UploadFile = File(...),
conf: float = 0.25,
iou: float = 0.45,
ocr: bool = True,
return_image: bool = False,
):
try:
data = await image.read()
arr = np.frombuffer(data, np.uint8)
bgr = cv2.imdecode(arr, cv2.IMREAD_COLOR)
if bgr is None:
raise HTTPException(400, "No pude decodificar el archivo subido.")
vis, dets, (w, h), dt_ms = run_on_image_bgr(
bgr, conf=conf, iou=iou, with_ocr=ocr, annotate=return_image
)
b64 = bgr_to_jpeg_base64(vis) if return_image else None
return DetectResponse(
detections=dets, count=len(dets), width=w, height=h, time_ms=dt_ms,
annotated_image_b64=b64
)
except HTTPException:
raise
except Exception as e:
raise HTTPException(500, f"Error procesando la imagen: {e}")
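
# Optional local entrypoint -- a minimal sketch, assuming uvicorn is installed.
# On a Hugging Face Space the server is typically started by the Space's own
# launch command, so this guard only matters when running the file directly.
if __name__ == "__main__":
    import uvicorn

    # Port 7860 is the conventional HF Spaces port; change it for local development if needed.
    uvicorn.run(app, host="0.0.0.0", port=7860)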