File size: 10,626 Bytes
204d5bf
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
694f79d
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
import os
import sys
import shutil
import traceback
import logging
import gradio as gr
import uuid
import cv2
import time
from gradio_app.utils import convert_to_supported_format

# Adjust sys.path to include the src directory
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', 'src', 'license_plate_detector_ocr')))
from infer import infer, is_image_file

def gradio_process(model_path, input_file, input_type):
    """Process the input file (image or video) for license plate detection and OCR."""
    unique_id = str(uuid.uuid4())[:8]
    temp_input_dir = os.path.abspath(os.path.join("apps/gradio_app/temp_data", unique_id))
    preview_dir = os.path.abspath(os.path.join("apps/gradio_app/preview_data", unique_id))
    try:
        file_path = input_file.name if hasattr(input_file, 'name') else input_file
        logging.debug(f"Input file path: {file_path}")
        print(f"Input file path: {file_path}")
        
        # Verify source file exists and is readable
        if not os.path.exists(file_path):
            error_msg = f"Error: Source file {file_path} does not exist."
            logging.error(error_msg)
            return None, None, error_msg, None, None
        if not os.access(file_path, os.R_OK):
            error_msg = f"Error: Source file {file_path} is not readable."
            logging.error(error_msg)
            return None, None, error_msg, None, None
        
        # Create unique temp and preview directories
        os.makedirs(temp_input_dir, exist_ok=True)
        os.makedirs(preview_dir, exist_ok=True)
        temp_input_path = os.path.join(temp_input_dir, os.path.basename(file_path))
        preview_input_path = os.path.join(preview_dir, os.path.basename(file_path))
        
        # Copy input file to temp and preview directories with retry
        max_retries = 3
        for attempt in range(max_retries):
            try:
                shutil.copy2(file_path, temp_input_path)  # Copy to temp for processing
                shutil.copy2(file_path, preview_input_path)  # Copy to preview for display
                os.chmod(temp_input_path, 0o644)
                os.chmod(preview_input_path, 0o644)
                logging.debug(f"Copied input file to: {temp_input_path} and {preview_input_path}")
                break
            except Exception as e:
                if attempt == max_retries - 1:
                    error_msg = f"Error copying file {file_path} to {temp_input_path} or {preview_input_path} after {max_retries} attempts: {str(e)}"
                    logging.error(error_msg)
                    return None, None, error_msg, None, None
                time.sleep(0.5)  # Brief delay before retry
        
        # Verify copied files
        for path in [temp_input_path, preview_input_path]:
            if not os.path.exists(path):
                error_msg = f"Error: Copied file {path} does not exist."
                logging.error(error_msg)
                return None, None, error_msg, None, None
            if not os.access(path, os.R_OK):
                error_msg = f"Error: Copied file {path} is not readable."
                logging.error(error_msg)
                return None, None, error_msg, None, None
            if os.path.getsize(path) == 0:
                error_msg = f"Error: Copied file {path} is empty."
                logging.error(error_msg)
                return None, None, error_msg, None, None
        
        # Validate image or video
        if is_image_file(temp_input_path):
            img = cv2.imread(temp_input_path)
            if img is None:
                error_msg = f"Error: Could not load image from {temp_input_path}."
                logging.error(error_msg)
                return None, None, error_msg, None, None
            # Check image properties
            height, width, channels = img.shape
            logging.debug(f"Image properties: {width}x{height}, {channels} channels")
            if channels not in (1, 3, 4):
                error_msg = f"Error: Unsupported number of channels ({channels}) in {temp_input_path}. Expected 1, 3, or 4."
                logging.error(error_msg)
                return None, None, error_msg, None, None
            if width == 0 or height == 0:
                error_msg = f"Error: Invalid image dimensions ({width}x{height}) in {temp_input_path}."
                logging.error(error_msg)
                return None, None, error_msg, None, None
        else:
            cap = cv2.VideoCapture(temp_input_path)
            if not cap.isOpened():
                error_msg = f"Error: Could not open video at {temp_input_path}."
                logging.error(error_msg)
                cap.release()
                return None, None, error_msg, None, None
            cap.release()
        
        # Set output path
        output_dir = os.path.abspath(os.path.join("apps/gradio_app/temp_data", str(uuid.uuid4())[:8]))
        os.makedirs(output_dir, exist_ok=True)
        output_filename = f"{os.path.splitext(os.path.basename(temp_input_path))[0]}_{unique_id}_output{'_output.jpg' if is_image_file(temp_input_path) else '_output.mp4'}"
        output_path = os.path.join(output_dir, output_filename)
        logging.debug(f"Output path: {output_path}")
        
        # Call the infer function
        logging.debug(f"Calling infer with input: {temp_input_path}, output: {output_path}")
        result_array, plate_texts = infer(model_path, temp_input_path, output_path)
        
        if result_array is None and is_image_file(temp_input_path):
            error_msg = f"Error: Processing failed for {temp_input_path}. 'infer' returned None. Check infer.py logs for details."
            logging.error(error_msg)
            return None, None, error_msg, preview_input_path if is_image_file(temp_input_path) else None, preview_input_path if not is_image_file(temp_input_path) else None
        
        # Validate output file for videos
        if not is_image_file(temp_input_path):
            if not os.path.exists(output_path):
                error_msg = f"Error: Output video file {output_path} was not created."
                logging.error(error_msg)
                return None, None, error_msg, None, preview_input_path
            # Convert output video to supported format
            converted_output_path = os.path.join(output_dir, f"converted_{os.path.basename(output_path)}")
            converted_path = convert_to_supported_format(output_path, converted_output_path)
            if converted_path is None:
                error_msg = f"Error: Failed to convert output video {output_path} to supported format."
                logging.error(error_msg)
                return None, None, error_msg, None, preview_input_path
            output_path = converted_path
        
        # Format plate texts
        if is_image_file(temp_input_path):
            formatted_texts = "\n".join(plate_texts) if plate_texts else "No plates detected"
            logging.debug(f"Image processed successfully. Plate texts: {formatted_texts}")
            return result_array, None, formatted_texts, preview_input_path, None
        else:
            formatted_texts = []
            for i, texts in enumerate(plate_texts):
                if texts:
                    formatted_texts.append(f"Frame {i+1}: {', '.join(texts)}")
            formatted_texts = "\n".join(formatted_texts) if formatted_texts else "No plates detected"
            logging.debug(f"Video processed successfully. Plate texts: {formatted_texts}")
            return None, output_path, formatted_texts, None, preview_input_path
    except Exception as e:
        error_message = f"Error processing {file_path}: {str(e)}\n{traceback.format_exc()}"
        logging.error(error_message)
        print(error_message)
        return None, None, error_message, preview_input_path if is_image_file(file_path) else None, preview_input_path if not is_image_file(file_path) else None
    finally:
        # Clean up temp directory after processing, but keep preview directory
        if os.path.exists(temp_input_dir):
            shutil.rmtree(temp_input_dir, ignore_errors=True)
            logging.debug(f"Cleaned up temporary directory: {temp_input_dir}")

def update_preview(file, input_type):
    """Return file path for the appropriate preview component based on input type."""
    if not file:
        logging.debug("No file provided for preview.")
        return None, None
    
    # Handle both file objects and string paths
    file_path = file.name if hasattr(file, 'name') else file
    logging.debug(f"Updating preview for {input_type}: {file_path}")
    
    # Verify file exists
    if not os.path.exists(file_path):
        logging.error(f"Input file {file_path} does not exist.")
        return None, None
    
    # Check if video format is supported
    if input_type == "Video" and not file_path.lower().endswith(('.mp4', '.webm')):
        logging.error(f"Unsupported video format for {file_path}. Use MP4 or WebM.")
        return None, None
    
    # Copy to preview directory for persistent display
    unique_id = str(uuid.uuid4())[:8]
    preview_dir = os.path.abspath(os.path.join("apps/gradio_app/preview_data", unique_id))
    os.makedirs(preview_dir, exist_ok=True)
    preview_input_path = os.path.join(preview_dir, os.path.basename(file_path))
    try:
        shutil.copy2(file_path, preview_input_path)
        os.chmod(preview_input_path, 0o644)
        logging.debug(f"Copied preview file to: {preview_input_path}")
    except Exception as e:
        logging.error(f"Error copying preview file to {preview_input_path}: {str(e)}")
        return None, None
    
    return preview_input_path if input_type == "Image" else None, preview_input_path if input_type == "Video" else None

def update_visibility(input_type):
    """Update visibility of input/output components based on input type."""
    logging.debug(f"Updating visibility for input type: {input_type}")
    is_image = input_type == "Image"
    is_video = input_type == "Video"
    return (
        gr.update(visible=is_image),
        gr.update(visible=is_video),
        gr.update(visible=is_image),
        gr.update(visible=is_video)
    )

def clear_preview_data():
    """Clear all files in the preview_data directory."""
    preview_data_dir = os.path.abspath("apps/gradio_app/preview_data")
    if os.path.exists(preview_data_dir):
        shutil.rmtree(preview_data_dir, ignore_errors=True)
        logging.debug(f"Cleared preview_data directory: {preview_data_dir}")
    os.makedirs(preview_data_dir, exist_ok=True)