File size: 10,626 Bytes
204d5bf 694f79d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 |
import os
import sys
import shutil
import traceback
import logging
import gradio as gr
import uuid
import cv2
import time
from gradio_app.utils import convert_to_supported_format
# Adjust sys.path to include the src directory
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', 'src', 'license_plate_detector_ocr')))
from infer import infer, is_image_file
def _failure(error_msg, image_preview=None, video_preview=None):
    """Log ``error_msg`` and build the failure tuple returned by ``gradio_process``."""
    logging.error(error_msg)
    return None, None, error_msg, image_preview, video_preview


def _preview_pair(preview_path, is_image):
    """Put ``preview_path`` in the image slot or the video slot of a preview pair."""
    return (preview_path, None) if is_image else (None, preview_path)


def gradio_process(model_path, input_file, input_type):
    """Process the input file (image or video) for license plate detection and OCR.

    The input is copied into a per-call temp directory (removed again when the
    call finishes) and a per-call preview directory (kept), validated with
    OpenCV, and then handed to ``infer``.

    Args:
        model_path: Forwarded unchanged to ``infer``.
        input_file: Path string, or an object exposing the path as ``.name``.
        input_type: Not used by this function; kept for the caller's signature.

    Returns:
        A 5-tuple ``(result_image, result_video_path, text, image_preview,
        video_preview)``. On success exactly one of the first two items is set
        and ``text`` holds the recognised plates (or "No plates detected").
        On failure the first two items are ``None`` and ``text`` holds the
        error message. Errors are reported through the tuple, not raised.
    """
    unique_id = str(uuid.uuid4())[:8]
    temp_input_dir = os.path.abspath(os.path.join("apps/gradio_app/temp_data", unique_id))
    preview_dir = os.path.abspath(os.path.join("apps/gradio_app/preview_data", unique_id))
    # Bound before the try block so the except handler can always read them,
    # even when the failure happens before the paths are computed.
    file_path = None
    preview_input_path = None
    is_image = None
    try:
        if not input_file:
            return _failure("Error: No input file provided.")

        file_path = input_file.name if hasattr(input_file, 'name') else input_file
        logging.debug(f"Input file path: {file_path}")
        print(f"Input file path: {file_path}")

        # Verify source file exists and is readable
        if not os.path.exists(file_path):
            return _failure(f"Error: Source file {file_path} does not exist.")
        if not os.access(file_path, os.R_OK):
            return _failure(f"Error: Source file {file_path} is not readable.")

        # Create unique temp and preview directories
        os.makedirs(temp_input_dir, exist_ok=True)
        os.makedirs(preview_dir, exist_ok=True)
        temp_input_path = os.path.join(temp_input_dir, os.path.basename(file_path))
        preview_input_path = os.path.join(preview_dir, os.path.basename(file_path))

        # Copy input file to temp (processing) and preview (display) with retry
        max_retries = 3
        for attempt in range(max_retries):
            try:
                shutil.copy2(file_path, temp_input_path)
                shutil.copy2(file_path, preview_input_path)
                os.chmod(temp_input_path, 0o644)
                os.chmod(preview_input_path, 0o644)
                logging.debug(f"Copied input file to: {temp_input_path} and {preview_input_path}")
                break
            except OSError as e:
                if attempt == max_retries - 1:
                    return _failure(
                        f"Error copying file {file_path} to {temp_input_path} or {preview_input_path} after {max_retries} attempts: {str(e)}"
                    )
                time.sleep(0.5)  # Brief delay before retry

        # Verify copied files
        for path in (temp_input_path, preview_input_path):
            if not os.path.exists(path):
                return _failure(f"Error: Copied file {path} does not exist.")
            if not os.access(path, os.R_OK):
                return _failure(f"Error: Copied file {path} is not readable.")
            if os.path.getsize(path) == 0:
                return _failure(f"Error: Copied file {path} is empty.")

        # Decide once whether this is an image; every later branch reuses it.
        is_image = is_image_file(temp_input_path)

        # Validate image or video
        if is_image:
            img = cv2.imread(temp_input_path)
            if img is None:
                return _failure(f"Error: Could not load image from {temp_input_path}.")
            # Read the shape defensively: a 2-D array has no channel axis.
            height, width = img.shape[:2]
            channels = img.shape[2] if img.ndim == 3 else 1
            logging.debug(f"Image properties: {width}x{height}, {channels} channels")
            if channels not in (1, 3, 4):
                return _failure(
                    f"Error: Unsupported number of channels ({channels}) in {temp_input_path}. Expected 1, 3, or 4."
                )
            if width == 0 or height == 0:
                return _failure(f"Error: Invalid image dimensions ({width}x{height}) in {temp_input_path}.")
        else:
            cap = cv2.VideoCapture(temp_input_path)
            opened = cap.isOpened()
            cap.release()
            if not opened:
                return _failure(f"Error: Could not open video at {temp_input_path}.")

        # Set output path.
        # NOTE(review): output_dir is never removed by this function, and the
        # generated name ends in "_output_output.<ext>"; both kept as-is.
        output_dir = os.path.abspath(os.path.join("apps/gradio_app/temp_data", str(uuid.uuid4())[:8]))
        os.makedirs(output_dir, exist_ok=True)
        output_filename = f"{os.path.splitext(os.path.basename(temp_input_path))[0]}_{unique_id}_output{'_output.jpg' if is_image else '_output.mp4'}"
        output_path = os.path.join(output_dir, output_filename)
        logging.debug(f"Output path: {output_path}")

        # Call the infer function
        logging.debug(f"Calling infer with input: {temp_input_path}, output: {output_path}")
        result_array, plate_texts = infer(model_path, temp_input_path, output_path)

        if is_image:
            if result_array is None:
                return _failure(
                    f"Error: Processing failed for {temp_input_path}. 'infer' returned None. Check infer.py logs for details.",
                    *_preview_pair(preview_input_path, True),
                )
            formatted_texts = "\n".join(plate_texts) if plate_texts else "No plates detected"
            logging.debug(f"Image processed successfully. Plate texts: {formatted_texts}")
            return result_array, None, formatted_texts, preview_input_path, None

        # Validate output file for videos
        if not os.path.exists(output_path):
            return _failure(
                f"Error: Output video file {output_path} was not created.",
                None,
                preview_input_path,
            )
        # Convert output video to supported format
        converted_output_path = os.path.join(output_dir, f"converted_{os.path.basename(output_path)}")
        converted_path = convert_to_supported_format(output_path, converted_output_path)
        if converted_path is None:
            return _failure(
                f"Error: Failed to convert output video {output_path} to supported format.",
                None,
                preview_input_path,
            )
        output_path = converted_path

        # One line per frame that produced text; frames are numbered from 1.
        frame_lines = [
            f"Frame {i+1}: {', '.join(texts)}"
            for i, texts in enumerate(plate_texts or [])
            if texts
        ]
        formatted_texts = "\n".join(frame_lines) if frame_lines else "No plates detected"
        logging.debug(f"Video processed successfully. Plate texts: {formatted_texts}")
        return None, output_path, formatted_texts, None, preview_input_path
    except Exception as e:
        error_message = f"Error processing {file_path}: {str(e)}\n{traceback.format_exc()}"
        logging.error(error_message)
        print(error_message)
        # Only offer a preview that was actually written to disk.
        image_preview = video_preview = None
        if preview_input_path is not None and os.path.isfile(preview_input_path):
            if is_image is None:
                is_image = is_image_file(file_path)
            image_preview, video_preview = _preview_pair(preview_input_path, is_image)
        return None, None, error_message, image_preview, video_preview
    finally:
        # Clean up temp directory after processing, but keep preview directory
        if os.path.exists(temp_input_dir):
            shutil.rmtree(temp_input_dir, ignore_errors=True)
            logging.debug(f"Cleaned up temporary directory: {temp_input_dir}")
def update_preview(file, input_type):
    """Copy the chosen file into a fresh preview directory and route its path.

    Returns an ``(image_preview, video_preview)`` pair. The copied path goes in
    the first slot for ``"Image"`` and the second for ``"Video"``. Both slots
    are ``None`` when no file is given, the file is missing, a video is not
    MP4/WebM, or the copy fails.
    """
    nothing = (None, None)

    if not file:
        logging.debug("No file provided for preview.")
        return nothing

    # Accept either an object carrying its path in ``.name`` or a plain path.
    source_path = getattr(file, 'name', file)
    logging.debug(f"Updating preview for {input_type}: {source_path}")

    if not os.path.exists(source_path):
        logging.error(f"Input file {source_path} does not exist.")
        return nothing

    wants_video = input_type == "Video"
    if wants_video and not source_path.lower().endswith(('.mp4', '.webm')):
        logging.error(f"Unsupported video format for {source_path}. Use MP4 or WebM.")
        return nothing

    # Each preview gets its own directory so the copy persists for display.
    token = str(uuid.uuid4())[:8]
    target_dir = os.path.abspath(os.path.join("apps/gradio_app/preview_data", token))
    os.makedirs(target_dir, exist_ok=True)
    target_path = os.path.join(target_dir, os.path.basename(source_path))

    try:
        shutil.copy2(source_path, target_path)
        os.chmod(target_path, 0o644)
        logging.debug(f"Copied preview file to: {target_path}")
    except Exception as e:
        logging.error(f"Error copying preview file to {target_path}: {str(e)}")
        return nothing

    if input_type == "Image":
        return target_path, None
    if wants_video:
        return None, target_path
    return nothing
def update_visibility(input_type):
    """Build four visibility updates for the image/video component pairs.

    The returned tuple alternates image, video, image, video. Image entries
    are visible only for ``"Image"`` and video entries only for ``"Video"``.
    """
    logging.debug(f"Updating visibility for input type: {input_type}")
    show_image = input_type == "Image"
    show_video = input_type == "Video"
    flags = (show_image, show_video, show_image, show_video)
    return tuple(gr.update(visible=flag) for flag in flags)
def clear_preview_data():
    """Empty the preview_data directory by removing and recreating it."""
    target = os.path.abspath("apps/gradio_app/preview_data")
    already_there = os.path.exists(target)
    if already_there:
        # Best-effort removal: errors while deleting are ignored.
        shutil.rmtree(target, ignore_errors=True)
        logging.debug(f"Cleared preview_data directory: {target}")
    # Always leave an existing directory behind for later previews.
    os.makedirs(target, exist_ok=True)