Spaces:
Running
Running
| import os | |
| import io | |
| import cv2 | |
| import numpy as np | |
| import tempfile | |
| import time | |
| import math | |
| import json | |
| from PIL import Image, ImageEnhance, ImageFilter | |
| from pdf2image import convert_from_bytes | |
| import streamlit as st | |
| import logging | |
| import concurrent.futures | |
| from pathlib import Path | |
# Configure logging
# Module-level logger shared by all preprocessing helpers in this file.
logger = logging.getLogger("preprocessing")
logger.setLevel(logging.INFO)
def ensure_log_directory(config):
    """Create the directory for the metrics log file if it doesn't exist.

    Only acts when config["logging"]["enabled"] is truthy; the directory
    portion of the configured output path is then created, parents included.
    A bare filename (no directory component) requires no action.
    """
    logging_cfg = config.get("logging", {})
    if not logging_cfg.get("enabled", False):
        return
    log_path = logging_cfg.get("output_path", "logs/preprocessing_metrics.json")
    log_dir = os.path.dirname(log_path)
    if log_dir:
        Path(log_dir).mkdir(parents=True, exist_ok=True)
def log_preprocessing_metrics(metrics, config):
    """Append a preprocessing-metrics record to a JSON log file.

    Args:
        metrics: Dict of metric values; a "timestamp" key is added in place.
        config: The "logging" sub-config (keys: enabled, output_path).

    The log file always holds a JSON list of records. A corrupt or non-list
    existing file is recovered from instead of silently dropping the new
    record (the original aborted on JSON decode errors). All other errors
    are logged and swallowed so metrics never break the pipeline.
    """
    if not config.get("enabled", False):
        return
    log = logging.getLogger("preprocessing")
    log_path = config.get("output_path", "logs/preprocessing_metrics.json")
    # Ensure the target directory exists (inlined from ensure_log_directory
    # so this function doesn't have to build a synthetic wrapper config).
    log_dir = os.path.dirname(log_path)
    if log_dir:
        Path(log_dir).mkdir(parents=True, exist_ok=True)
    # Add timestamp
    metrics["timestamp"] = time.strftime("%Y-%m-%d %H:%M:%S")
    # Append to log file
    try:
        existing_data = []
        if os.path.exists(log_path):
            try:
                with open(log_path, 'r') as f:
                    existing_data = json.load(f)
            except ValueError:
                # json.JSONDecodeError subclasses ValueError: start a new log
                # rather than losing the record being written now.
                log.warning(f"Could not parse existing log at {log_path}; starting a new log")
                existing_data = []
            if not isinstance(existing_data, list):
                existing_data = [existing_data]
        existing_data.append(metrics)
        with open(log_path, 'w') as f:
            json.dump(existing_data, f, indent=2)
        log.info(f"Logged preprocessing metrics to {log_path}")
    except Exception as e:
        log.error(f"Error logging preprocessing metrics: {str(e)}")
def get_document_config(document_type, global_config):
    """
    Get document-specific preprocessing configuration by merging with global settings.

    Args:
        document_type: The type of document (e.g., 'standard', 'newspaper', 'handwritten')
        global_config: The global preprocessing configuration

    Returns:
        A merged configuration dictionary with document-specific overrides.
        The returned dict owns its section dicts: global_config is never
        mutated by this call.
    """
    # Copy each section dict. The original stored references to the dicts
    # inside global_config, so the .update() below mutated the shared global
    # configuration, leaking one document type's overrides into the next call.
    config = {
        "deskew": dict(global_config.get("deskew", {})),
        "thresholding": dict(global_config.get("thresholding", {})),
        "morphology": dict(global_config.get("morphology", {})),
        "performance": dict(global_config.get("performance", {})),
        "logging": dict(global_config.get("logging", {})),
    }
    # Apply document-specific overrides if they exist; unknown sections in
    # the override block are ignored, matching the original behaviour.
    overrides = global_config.get("document_types", {}).get(document_type, {})
    for section, values in overrides.items():
        if section in config:
            config[section].update(values)
    return config
def deskew_image(img_array, config):
    """
    Detect and correct skew in document images.
    Uses a combination of methods (minAreaRect and/or Hough transform)
    to estimate the skew angle more robustly.
    Args:
        img_array: Input image as numpy array, 2-D grayscale or 3-channel
            (assumed RGB order, as produced by PIL -> np.array — TODO confirm)
        config: Deskew configuration dict (keys used: enabled, angle_threshold,
            max_angle, use_hough, consensus_method, fallback)
    Returns:
        Tuple of (image as numpy array, estimated angle in degrees, success flag).
        The input image is returned unrotated when deskewing is disabled, when
        no angle could be detected, when the angle is below the threshold, or
        when the rotation itself fails.
    """
    if not config.get("enabled", False):
        return img_array, 0.0, True
    # Convert to grayscale if needed
    gray = img_array if len(img_array.shape) == 2 else cv2.cvtColor(img_array, cv2.COLOR_RGB2GRAY)
    # Start with a threshold to get binary image for angle detection
    # (inverted Otsu: text becomes white foreground for contour detection)
    _, binary = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)
    angles = []
    angle_threshold = config.get("angle_threshold", 0.1)
    max_angle = config.get("max_angle", 45.0)
    # Method 1: minAreaRect approach
    try:
        # Find all contours
        contours, _ = cv2.findContours(binary, cv2.RETR_LIST, cv2.CHAIN_APPROX_SIMPLE)
        # Filter contours by area to avoid noise
        min_area = binary.shape[0] * binary.shape[1] * 0.0001  # 0.01% of image area
        filtered_contours = [cnt for cnt in contours if cv2.contourArea(cnt) > min_area]
        # Get angles from rotated rectangles around contours
        for contour in filtered_contours:
            rect = cv2.minAreaRect(contour)
            width, height = rect[1]
            # Calculate the angle based on the longer side
            # (This is important for getting the orientation right)
            angle = rect[2]
            if width < height:
                angle += 90
            # Normalize angle to -45 to 45 range
            if angle > 45:
                angle -= 90
            if angle < -45:
                angle += 90
            # Clamp angle to max limit
            angle = max(min(angle, max_angle), -max_angle)
            angles.append(angle)
    except Exception as e:
        logger.error(f"Error in minAreaRect skew detection: {str(e)}")
    # Method 2: Hough Transform approach (if enabled)
    if config.get("use_hough", True):
        try:
            # Apply Canny edge detection
            edges = cv2.Canny(gray, 50, 150, apertureSize=3)
            # Apply Hough lines (probabilistic variant: returns line segments)
            lines = cv2.HoughLinesP(edges, 1, np.pi/180,
                                    threshold=100, minLineLength=100, maxLineGap=10)
            if lines is not None:
                for line in lines:
                    x1, y1, x2, y2 = line[0]
                    if x2 - x1 != 0:  # Avoid division by zero
                        # Calculate line angle in degrees
                        angle = math.atan2(y2 - y1, x2 - x1) * 180.0 / np.pi
                        # Normalize angle to -45 to 45 range
                        if angle > 45:
                            angle -= 90
                        if angle < -45:
                            angle += 90
                        # Clamp angle to max limit
                        angle = max(min(angle, max_angle), -max_angle)
                        angles.append(angle)
        except Exception as e:
            logger.error(f"Error in Hough transform skew detection: {str(e)}")
    # If no angles were detected, return original image
    if not angles:
        logger.warning("No skew angles detected, using original image")
        return img_array, 0.0, False
    # Combine angles using the specified consensus method
    consensus_method = config.get("consensus_method", "average")
    if consensus_method == "average":
        final_angle = sum(angles) / len(angles)
    elif consensus_method == "median":
        # NOTE: for an even count this picks the upper-middle element rather
        # than averaging the two middle values.
        final_angle = sorted(angles)[len(angles) // 2]
    elif consensus_method == "min":
        # Smallest magnitude (most conservative rotation)
        final_angle = min(angles, key=abs)
    elif consensus_method == "max":
        # Largest magnitude (most aggressive rotation)
        final_angle = max(angles, key=abs)
    else:
        final_angle = sum(angles) / len(angles)  # Default to average
    # If angle is below threshold, don't rotate
    if abs(final_angle) < angle_threshold:
        logger.info(f"Detected angle ({final_angle:.2f}°) is below threshold, skipping deskew")
        return img_array, final_angle, True
    # Log the detected angle
    logger.info(f"Deskewing image with angle: {final_angle:.2f}°")
    # Get image dimensions
    h, w = img_array.shape[:2]
    center = (w // 2, h // 2)
    # Get rotation matrix
    rotation_matrix = cv2.getRotationMatrix2D(center, final_angle, 1.0)
    # Calculate new image dimensions so the rotated image is not cropped
    abs_cos = abs(rotation_matrix[0, 0])
    abs_sin = abs(rotation_matrix[0, 1])
    new_w = int(h * abs_sin + w * abs_cos)
    new_h = int(h * abs_cos + w * abs_sin)
    # Adjust the rotation matrix to account for new dimensions
    # (shift so the rotation center maps to the center of the enlarged canvas)
    rotation_matrix[0, 2] += (new_w / 2) - center[0]
    rotation_matrix[1, 2] += (new_h / 2) - center[1]
    # Perform the rotation
    try:
        # Determine the number of channels to create the correct output array;
        # borders are filled with white (paper background).
        if len(img_array.shape) == 3:
            rotated = cv2.warpAffine(img_array, rotation_matrix, (new_w, new_h),
                                     flags=cv2.INTER_LINEAR, borderMode=cv2.BORDER_CONSTANT,
                                     borderValue=(255, 255, 255))
        else:
            rotated = cv2.warpAffine(img_array, rotation_matrix, (new_w, new_h),
                                     flags=cv2.INTER_LINEAR, borderMode=cv2.BORDER_CONSTANT,
                                     borderValue=255)
        return rotated, final_angle, True
    except Exception as e:
        logger.error(f"Error rotating image: {str(e)}")
        # Both branches return the original image; the fallback flag only
        # controls whether the extra info message is emitted.
        if config.get("fallback", {}).get("enabled", True):
            logger.info("Using original image as fallback after rotation failure")
            return img_array, final_angle, False
        return img_array, final_angle, False
def preblur(img_array, config):
    """Optionally smooth an image before thresholding.

    Smoothing stabilises threshold selection on noisy scans. Supported
    methods are "gaussian" (default) and "median"; unknown methods fall back
    to gaussian with a warning, and any OpenCV failure returns the input
    image unchanged.
    """
    if not config.get("enabled", False):
        return img_array

    method = config.get("method", "gaussian")
    ksize = config.get("kernel_size", 3)
    # OpenCV blur kernels must have odd dimensions
    if ksize % 2 == 0:
        ksize += 1

    try:
        if method == "median":
            return cv2.medianBlur(img_array, ksize)
        if method != "gaussian":
            logger.warning(f"Unknown blur method: {method}, using gaussian")
        return cv2.GaussianBlur(img_array, (ksize, ksize), 0)
    except Exception as e:
        logger.error(f"Error applying {method} blur: {str(e)}")
        return img_array
def apply_threshold(img_array, config):
    """
    Apply thresholding to create a binary image.

    Supports Otsu's method and adaptive (Gaussian) thresholding, with an
    optional pre-blur and a grayscale fallback when thresholding fails or
    produces a degenerate (almost all-black / all-white) result.

    Args:
        img_array: Input image as numpy array (grayscale or RGB)
        config: Thresholding configuration dict (keys used: method, preblur,
            adaptive_block_size, adaptive_constant, fallback)

    Returns:
        Tuple of (image as numpy array, success flag). On failure the
        grayscale image is returned with success=False.
    """
    method = config.get("method", "adaptive")
    if method == "none":
        return img_array, True

    # Convert to grayscale if needed
    gray = img_array if len(img_array.shape) == 2 else cv2.cvtColor(img_array, cv2.COLOR_RGB2GRAY)

    # Apply pre-blur if configured (stabilises threshold selection on noise)
    preblur_config = config.get("preblur", {})
    if preblur_config.get("enabled", False):
        gray = preblur(gray, preblur_config)

    # Normalise the method once instead of duplicating the adaptive-threshold
    # code in the unknown-method branch (as the original did).
    effective_method = method
    if method not in ("otsu", "adaptive"):
        logger.warning(f"Unknown thresholding method: {method}, using adaptive")
        effective_method = "adaptive"

    try:
        if effective_method == "otsu":
            # Global threshold chosen automatically by Otsu's method
            _, binary = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
        else:
            block_size = config.get("adaptive_block_size", 11)
            constant = config.get("adaptive_constant", 2)
            # adaptiveThreshold requires an odd block size
            if block_size % 2 == 0:
                block_size += 1
            binary = cv2.adaptiveThreshold(gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
                                           cv2.THRESH_BINARY, block_size, constant)
    except Exception as e:
        logger.error(f"Error applying {method} thresholding: {str(e)}")
        # The grayscale image is returned either way; the fallback flag only
        # controls whether the extra info message is emitted.
        if config.get("fallback", {}).get("enabled", True):
            logger.info("Using original grayscale image as fallback after thresholding failure")
        return gray, False

    # Calculate percentage of non-zero pixels for logging
    nonzero_pct = np.count_nonzero(binary) / binary.size * 100
    logger.info(f"Binary image has {nonzero_pct:.2f}% non-zero pixels")

    # Crude sanity check: a nearly all-black or all-white result usually means
    # thresholding destroyed the content.
    if nonzero_pct < 1 or nonzero_pct > 99:
        logger.warning(f"Thresholding produced extreme result ({nonzero_pct:.2f}% non-zero)")
        if config.get("fallback", {}).get("enabled", True):
            logger.info("Using original grayscale image as fallback after poor thresholding")
            return gray, False
    return binary, True
def apply_morphology(binary_img, config):
    """
    Apply morphological operations to clean up a binary image.

    Supported operations: "open" (remove small noise), "close" (fill small
    holes), or "both" (open then close). Unknown operations and kernel shapes
    fall back to "close" and "rect" respectively with a warning; OpenCV
    failures return the input image unchanged.

    Args:
        binary_img: Binary image as numpy array
        config: Morphology configuration dict (keys used: enabled, operation,
            kernel_size, kernel_shape)

    Returns:
        Processed binary image as numpy array
    """
    if not config.get("enabled", False):
        return binary_img

    operation = config.get("operation", "close")
    kernel_size = config.get("kernel_size", 1)
    kernel_shape = config.get("kernel_shape", "rect")

    # Map shape names to OpenCV structuring-element types via a dispatch table
    shape_map = {
        "rect": cv2.MORPH_RECT,
        "ellipse": cv2.MORPH_ELLIPSE,
        "cross": cv2.MORPH_CROSS,
    }
    if kernel_shape not in shape_map:
        logger.warning(f"Unknown kernel shape: {kernel_shape}, using rect")
    side = kernel_size * 2 + 1  # guarantees an odd kernel dimension
    kernel = cv2.getStructuringElement(shape_map.get(kernel_shape, cv2.MORPH_RECT), (side, side))

    try:
        if operation == "open":
            # Opening: erosion then dilation - removes small noise
            return cv2.morphologyEx(binary_img, cv2.MORPH_OPEN, kernel)
        if operation == "close":
            # Closing: dilation then erosion - fills small holes
            return cv2.morphologyEx(binary_img, cv2.MORPH_CLOSE, kernel)
        if operation == "both":
            # Open first, then close
            opened = cv2.morphologyEx(binary_img, cv2.MORPH_OPEN, kernel)
            return cv2.morphologyEx(opened, cv2.MORPH_CLOSE, kernel)
        logger.warning(f"Unknown morphological operation: {operation}, using close")
        return cv2.morphologyEx(binary_img, cv2.MORPH_CLOSE, kernel)
    except Exception as e:
        logger.error(f"Error applying morphological operation: {str(e)}")
        return binary_img
# Cache for 24 hours
@st.cache_data(ttl=24 * 3600)
def convert_pdf_to_images(pdf_bytes, dpi=150, rotation=0):
    """Convert PDF bytes to a list of PIL images with caching.

    The "Cache for 24 hours" comment and this docstring previously promised
    caching, but no decorator was attached; st.cache_data restores it,
    keyed on (pdf_bytes, dpi, rotation).

    Args:
        pdf_bytes: Raw PDF file content as bytes.
        dpi: Rasterisation resolution passed to pdf2image.
        rotation: Optional rotation in degrees applied to every page
            (expand=True so corners are not cropped).

    Returns:
        List of PIL.Image pages; empty list on conversion failure.
    """
    try:
        images = convert_from_bytes(pdf_bytes, dpi=dpi)
        # Apply rotation if specified
        if rotation != 0 and images:
            return [img.rotate(rotation, expand=True, resample=Image.BICUBIC)
                    for img in images]
        return images
    except Exception as e:
        st.error(f"Error converting PDF: {str(e)}")
        logger.error(f"PDF conversion error: {str(e)}")
        return []
def preprocess_image(image_bytes, preprocessing_options):
    """
    Conservative preprocessing for document images, tuned for handwritten docs.

    Light pipeline (each step optional): grayscale (with gentle CLAHE for
    handwritten documents) -> gentle denoise -> conservative contrast
    adjustment. Returns the original bytes untouched when no preprocessing
    option is selected.

    Args:
        image_bytes: Image content as bytes
        preprocessing_options: Dictionary with document_type, grayscale,
            denoise, contrast options (and optionally "filename" for logging)

    Returns:
        JPEG-encoded processed image bytes, or the original image bytes when
        nothing needs doing or when saving the processed image fails.
    """
    # Setup basic console logging. This intentionally shadows the module
    # logger so messages from this function are tagged "image_preprocessor".
    logger = logging.getLogger("image_preprocessor")
    logger.setLevel(logging.INFO)

    # Log which preprocessing options are being applied
    logger.info(f"Document type: {preprocessing_options.get('document_type', 'standard')}")

    # Check if any preprocessing is actually requested
    has_preprocessing = (
        preprocessing_options.get("grayscale", False) or
        preprocessing_options.get("denoise", False) or
        preprocessing_options.get("contrast", 0) != 0
    )

    # Fast path: with no options selected the image is passed through
    # untouched. (This check used to come after the skew probe below, which
    # made the probe's early exit redundant - it could only ever trigger in
    # this same no-preprocessing case.)
    if not has_preprocessing:
        logger.info("No preprocessing options selected, skipping preprocessing")
        return image_bytes

    # Convert bytes to PIL Image
    image = Image.open(io.BytesIO(image_bytes))

    # Informational skew probe: since preprocessing was requested above, the
    # result only affects logging here.
    try:
        from utils.image_utils import detect_skew
        skew_angle = detect_skew(image)
        if abs(skew_angle) < 0.5:
            logger.info(f"Document has minimal skew ({skew_angle:.2f}°), skipping preprocessing")
    except Exception as e:
        logger.warning(f"Error in skew detection: {str(e)}, continuing with preprocessing")

    # Initialize metrics for logging.
    # NOTE(review): these metrics are only echoed to the console log below;
    # nothing currently passes them to log_preprocessing_metrics().
    metrics = {
        "file": preprocessing_options.get("filename", "unknown"),
        "document_type": preprocessing_options.get("document_type", "standard"),
        "preprocessing_applied": []
    }
    start_time = time.time()

    # Handle RGBA images (transparency) by converting to RGB
    if image.mode == 'RGBA':
        # Composite onto a white background using the alpha channel as mask
        logger.info("Converting RGBA image to RGB")
        background = Image.new('RGB', image.size, (255, 255, 255))
        background.paste(image, mask=image.split()[3])  # 3 is the alpha channel
        image = background
        metrics["preprocessing_applied"].append("alpha_conversion")
    elif image.mode not in ('RGB', 'L'):
        # Convert other modes (e.g. palette, CMYK) to RGB
        logger.info(f"Converting {image.mode} image to RGB")
        image = image.convert('RGB')
        metrics["preprocessing_applied"].append("format_conversion")

    # Convert to NumPy array for OpenCV processing
    img_array = np.array(image)

    # Apply grayscale if requested (useful for handwritten text)
    if preprocessing_options.get("grayscale", False):
        if len(img_array.shape) == 3:  # Only convert if it's not already grayscale
            if preprocessing_options.get("document_type") == "handwritten":
                # Gentle CLAHE enhances local contrast of faint pen strokes
                img_array = cv2.cvtColor(img_array, cv2.COLOR_RGB2GRAY)
                clahe = cv2.createCLAHE(clipLimit=1.5, tileGridSize=(8, 8))  # Conservative clip limit
                img_array = clahe.apply(img_array)
            else:
                # Standard grayscale for printed documents
                img_array = cv2.cvtColor(img_array, cv2.COLOR_RGB2GRAY)
        metrics["preprocessing_applied"].append("grayscale")

    # Apply light denoising if requested
    if preprocessing_options.get("denoise", False):
        try:
            is_color = len(img_array.shape) == 3 and img_array.shape[2] == 3
            if is_color:
                # Very light color denoising with conservative parameters
                img_array = cv2.fastNlMeansDenoisingColored(img_array, None, 2, 2, 3, 7)
            else:
                # Very light grayscale denoising
                img_array = cv2.fastNlMeansDenoising(img_array, None, 2, 3, 7)
            metrics["preprocessing_applied"].append("light_denoise")
        except Exception as e:
            logger.error(f"Denoising error: {str(e)}")

    # Apply contrast adjustment if requested (conservative range)
    contrast_value = preprocessing_options.get("contrast", 0)
    if contrast_value != 0:
        # Use a gentler contrast adjustment factor
        contrast_factor = 1 + (contrast_value / 200)  # Conservative scaling factor
        # PIL's enhancer needs a PIL image; grayscale arrays become RGB first
        if len(img_array.shape) == 2:
            image = Image.fromarray(cv2.cvtColor(img_array, cv2.COLOR_GRAY2RGB))
        else:
            image = Image.fromarray(img_array)
        enhancer = ImageEnhance.Contrast(image)
        image = enhancer.enhance(contrast_factor)
        # Convert back to NumPy array
        img_array = np.array(image)
        metrics["preprocessing_applied"].append(f"contrast_{contrast_value}")

    # Convert back to PIL Image (grayscale arrays expanded to RGB for saving)
    if len(img_array.shape) == 2:
        processed_image = Image.fromarray(cv2.cvtColor(img_array, cv2.COLOR_GRAY2RGB))
    else:
        processed_image = Image.fromarray(img_array)

    # Record total processing time
    metrics["processing_time"] = (time.time() - start_time) * 1000  # ms

    # Encode as a fairly high-quality JPEG for OCR processing
    byte_io = io.BytesIO()
    try:
        # Make sure the image is in RGB mode before saving as JPEG
        if processed_image.mode not in ('RGB', 'L'):
            processed_image = processed_image.convert('RGB')
        processed_image.save(byte_io, format='JPEG', quality=92, optimize=True)
        byte_io.seek(0)
        logger.info(f"Preprocessing complete. Original image mode: {image.mode}, processed mode: {processed_image.mode}")
        logger.info(f"Original size: {len(image_bytes)/1024:.1f}KB, processed size: {len(byte_io.getvalue())/1024:.1f}KB")
        logger.info(f"Applied preprocessing steps: {', '.join(metrics['preprocessing_applied'])}")
        return byte_io.getvalue()
    except Exception as e:
        logger.error(f"Error saving processed image: {str(e)}")
        # Fallback to original image
        logger.info("Using original image as fallback")
        return image_bytes
def create_temp_file(content, suffix, temp_file_paths):
    """Write `content` to a fresh temporary file and register it for cleanup.

    The file is created with delete=False so it outlives the context manager;
    its path is appended to `temp_file_paths` for later cleanup by the
    caller, then returned.
    """
    tmp = tempfile.NamedTemporaryFile(delete=False, suffix=suffix)
    try:
        tmp.write(content)
    finally:
        tmp.close()
    path = tmp.name
    # Track temporary file for cleanup
    temp_file_paths.append(path)
    logger.info(f"Created temporary file: {path}")
    return path
def apply_preprocessing_to_file(file_bytes, file_ext, preprocessing_options, temp_file_paths):
    """
    Write `file_bytes` to a temporary file, preprocessing it first when needed.

    Preprocessing runs when the user selected any option (grayscale, denoise,
    non-zero contrast) or when a PNG/TIFF has transparency or an exotic color
    mode that must be flattened before OCR.

    Args:
        file_bytes: File content as bytes
        file_ext: File extension including the dot (e.g. '.jpg', '.pdf')
        preprocessing_options: Dictionary with document_type and preprocessing options
        temp_file_paths: List that collects temp-file paths for later cleanup

    Returns:
        Tuple of (temp_file_path, was_processed_flag)
    """
    document_type = preprocessing_options.get("document_type", "standard")

    # Any user-selected option forces the preprocessing path
    wants_preprocessing = any((
        preprocessing_options.get("grayscale", False),
        preprocessing_options.get("denoise", False),
        preprocessing_options.get("contrast", 0) != 0,
    ))

    # Transparency / non-RGB modes need flattening; only formats that can
    # carry transparency are inspected.
    format_needs_conversion = False
    if file_ext.lower() in ['.png', '.tif', '.tiff']:
        try:
            image = Image.open(io.BytesIO(file_bytes))
            # RGBA is already covered by "not in ('RGB', 'L')"; the explicit
            # check is kept to mirror the documented intent.
            if image.mode == 'RGBA' or image.mode not in ('RGB', 'L'):
                format_needs_conversion = True
        except Exception as e:
            logger.warning(f"Error checking image format: {str(e)}")

    # Guard clause: nothing to do - persist the original bytes as-is
    if not (wants_preprocessing or format_needs_conversion):
        logger.info("No preprocessing applied - using original image")
        return create_temp_file(file_bytes, file_ext, temp_file_paths), False

    logger.info(f"Applying preprocessing with options: {preprocessing_options}")
    logger.info(f"Using document type '{document_type}' with advanced preprocessing options")

    # NOTE(review): file_bytes is documented as bytes, so this attribute
    # check never fires; kept unchanged to preserve behaviour.
    if hasattr(file_bytes, 'name'):
        preprocessing_options["filename"] = file_bytes.name

    processed_bytes = preprocess_image(file_bytes, preprocessing_options)
    return create_temp_file(processed_bytes, file_ext, temp_file_paths), True