import os
import base64
import hashlib
import re
import time
import logging
from datetime import datetime
from pathlib import Path
from constants import CONTENT_THEMES, PERIOD_TAGS, DEFAULT_TAGS, GENERIC_TAGS

# Configure logging
logger = logging.getLogger("utils")
logger.setLevel(logging.INFO)

# Map known file extensions to MIME types (shared by the base64 helpers below)
MIME_TYPES = {
    '.png': 'image/png',
    '.gif': 'image/gif',
    '.jpg': 'image/jpeg',
    '.jpeg': 'image/jpeg',
    '.pdf': 'application/pdf',
}

def get_base64_from_image(image_path):
    """
    Get a base64 data URL from an image file with the proper MIME type.

    Args:
        image_path: Path to the image file

    Returns:
        Base64 data URL with the appropriate MIME type prefix, or an empty
        string if the file cannot be read.
    """
    try:
        # Convert to a Path object for reliable suffix handling
        path_obj = Path(image_path)

        # Determine the MIME type from the file extension, defaulting to JPEG
        mime_type = MIME_TYPES.get(path_obj.suffix.lower(), 'image/jpeg')

        # Read and encode the file
        with open(path_obj, "rb") as file:
            encoded = base64.b64encode(file.read()).decode('utf-8')
        return f"data:{mime_type};base64,{encoded}"
    except Exception as e:
        logger.error(f"Error encoding file to base64: {str(e)}")
        return ""

def get_base64_from_bytes(file_bytes, mime_type=None, file_name=None):
    """
    Get a base64 data URL from file bytes with the proper MIME type.

    Args:
        file_bytes: Binary file data
        mime_type: MIME type of the file (optional)
        file_name: Original file name for MIME type detection (optional)

    Returns:
        Base64 data URL with the appropriate MIME type prefix, or an empty
        string if encoding fails.
    """
    try:
        # Determine the MIME type if not provided, defaulting to
        # application/octet-stream when it cannot be detected
        if mime_type is None:
            if file_name is not None:
                suffix = Path(file_name).suffix.lower()
                mime_type = MIME_TYPES.get(suffix, 'application/octet-stream')
            else:
                mime_type = 'application/octet-stream'

        # Encode and create the data URL
        encoded = base64.b64encode(file_bytes).decode('utf-8')
        return f"data:{mime_type};base64,{encoded}"
    except Exception as e:
        logger.error(f"Error encoding bytes to base64: {str(e)}")
        return ""

def timing(description):
    """Context manager for timing code execution."""
    class TimingContext:
        def __init__(self, description):
            self.description = description

        def __enter__(self):
            self.start_time = time.time()
            return self

        def __exit__(self, exc_type, exc_val, exc_tb):
            execution_time = time.time() - self.start_time
            logger.info(f"{self.description} took {execution_time:.2f} seconds")
            # Do not suppress exceptions raised inside the block
            return False

    return TimingContext(description)

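# Illustrative usage: wraps any block and logs its wall-clock duration,
# e.g. "OCR pass took 1.23 seconds" (process_document is hypothetical):
#
#     with timing("OCR pass"):
#         process_document(file_bytes)
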
def format_timestamp(timestamp=None, for_filename=False):
    """
    Format a timestamp for display or for filenames.

    Args:
        timestamp: Datetime object or string to format (defaults to the current time)
        for_filename: Whether to format for use in a filename (defaults to False)

    Returns:
        str: Formatted timestamp
    """
    if timestamp is None:
        timestamp = datetime.now()
    elif isinstance(timestamp, str):
        try:
            timestamp = datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S")
        except ValueError:
            # Fall back to the current time if the string cannot be parsed
            timestamp = datetime.now()

    if for_filename:
        # Human-readable date used in descriptive filenames: "Apr 30, 2025"
        return timestamp.strftime("%b %d, %Y")
    else:
        # Standard format for display
        return timestamp.strftime("%Y-%m-%d %H:%M")

def generate_cache_key(file_bytes, file_type, use_vision, preprocessing_options=None, pdf_rotation=0, custom_prompt=None):
    """
    Generate a cache key for OCR processing.

    Args:
        file_bytes: File content as bytes
        file_type: Type of file (pdf or image)
        use_vision: Whether to use the vision model
        preprocessing_options: Dictionary of preprocessing options
        pdf_rotation: PDF rotation value in degrees
        custom_prompt: Custom prompt for OCR

    Returns:
        str: Cache key
    """
    # Hash the file content (MD5 is acceptable here: the hash is a cache key,
    # not a security measure)
    file_hash = hashlib.md5(file_bytes).hexdigest()

    # Include preprocessing options in the cache key
    preprocessing_options_hash = ""
    if preprocessing_options:
        # Fold pdf_rotation into the options so it is part of the cache key
        if pdf_rotation != 0:
            preprocessing_options_with_rotation = preprocessing_options.copy()
            preprocessing_options_with_rotation['pdf_rotation'] = pdf_rotation
            preprocessing_str = str(sorted(preprocessing_options_with_rotation.items()))
        else:
            preprocessing_str = str(sorted(preprocessing_options.items()))
        preprocessing_options_hash = hashlib.md5(preprocessing_str.encode()).hexdigest()
    elif pdf_rotation != 0:
        # No preprocessing options, but rotation still changes the output
        preprocessing_options_hash = hashlib.md5(f"pdf_rotation_{pdf_rotation}".encode()).hexdigest()

    # Create the base cache key
    cache_key = f"{file_hash}_{file_type}_{use_vision}_{preprocessing_options_hash}"

    # Include the custom prompt in the cache key if provided
    if custom_prompt:
        custom_prompt_hash = hashlib.md5(str(custom_prompt).encode()).hexdigest()
        cache_key = f"{cache_key}_{custom_prompt_hash}"

    return cache_key

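# Illustrative behavior: the key is deterministic for identical inputs and
# changes when any option that affects OCR output changes (the bytes shown
# are placeholders):
#
#     k1 = generate_cache_key(b"%PDF-1.4 ...", "pdf", True)
#     k2 = generate_cache_key(b"%PDF-1.4 ...", "pdf", True)
#     k3 = generate_cache_key(b"%PDF-1.4 ...", "pdf", True, pdf_rotation=90)
#     assert k1 == k2 and k1 != k3
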
def handle_temp_files(temp_file_paths):
    """
    Clean up temporary files.

    Args:
        temp_file_paths: List of temporary file paths to clean up
    """
    for temp_path in temp_file_paths:
        try:
            if os.path.exists(temp_path):
                os.unlink(temp_path)
                logger.info(f"Removed temporary file: {temp_path}")
        except Exception as e:
            logger.warning(f"Failed to remove temporary file {temp_path}: {str(e)}")

def create_descriptive_filename(original_filename, result, file_ext, preprocessing_options=None):
    """
    Create a user-friendly descriptive filename for the result.

    Args:
        original_filename: Original filename
        result: OCR result dictionary
        file_ext: File extension (including the leading dot)
        preprocessing_options: Dictionary of preprocessing options

    Returns:
        str: Human-readable descriptive filename
    """
    # Get the base name without extension, make it readable by replacing
    # dashes and underscores with spaces, then capitalize each word
    original_name = Path(original_filename).stem
    readable_name = original_name.replace('-', ' ').replace('_', ' ')
    readable_name = ' '.join(word.capitalize() for word in readable_name.split())

    # Determine the document type
    doc_type = None
    if result.get('detected_document_type'):
        doc_type = result['detected_document_type'].capitalize()
    elif result.get('topics'):
        # Use the first topic as the document type if none was explicitly detected
        doc_type = result['topics'][0]

    # Find period/era information among the topics
    period_info = None
    for tag in result.get('topics') or []:
        tag_lower = tag.lower()
        if "century" in tag_lower or "pre-" in tag_lower or "era" in tag_lower:
            period_info = tag
            break

    # Format metadata within parentheses if available
    metadata = []
    if doc_type:
        metadata.append(doc_type)
    if period_info:
        metadata.append(period_info)
    metadata_str = f" ({', '.join(metadata)})" if metadata else ""

    # Add the current date for uniqueness and sorting
    date_str = f" - {format_timestamp(for_filename=True)}"

    # Generate the final user-friendly filename
    return f"{readable_name}{metadata_str}{date_str}{file_ext}"

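# Illustrative call (the result dict is hypothetical but mirrors the keys
# read above; the date is whatever the current date is):
#
#     create_descriptive_filename(
#         "old-parish_record.jpg",
#         {"detected_document_type": "letter", "topics": ["19th Century"]},
#         ".jpg",
#     )
#     # -> "Old Parish Record (Letter, 19th Century) - Apr 30, 2025.jpg"
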
def extract_subject_tags(result, raw_text, preprocessing_options=None):
    """
    Extract subject tags from an OCR result.

    Args:
        result: OCR result dictionary
        raw_text: Raw text from OCR
        preprocessing_options: Dictionary of preprocessing options

    Returns:
        list: Subject tags (padded to at least three where possible)
    """
    subject_tags = []
    try:
        # Use existing topics as a starting point if available
        if result.get('topics'):
            subject_tags = list(result['topics'])

        # Add the document type if detected
        if result.get('detected_document_type'):
            doc_type = result['detected_document_type'].capitalize()
            if doc_type not in subject_tags:
                subject_tags.append(doc_type)

        # Analyze the content for common themes based on keywords
        if raw_text:
            raw_text_lower = raw_text.lower()

            # Track the matching keywords for each theme and their frequency
            theme_matches = {}

            # First pass: find all matching keywords for each theme
            for theme, keywords in CONTENT_THEMES.items():
                matches = []
                for keyword in keywords:
                    if " " in keyword:
                        # Multi-word keywords use exact phrase matching
                        if keyword in raw_text_lower:
                            matches.append(keyword)
                    else:
                        # Single-word keywords use word-boundary matching
                        # to avoid partial matches
                        pattern = r'\b' + re.escape(keyword) + r'\b'
                        if re.search(pattern, raw_text_lower):
                            matches.append(keyword)
                if matches:
                    theme_matches[theme] = {
                        "matches": matches,
                        "count": len(matches),
                    }

            # Sort themes by match count in descending order
            # (more matches = more relevant) and keep only the top 5
            # to avoid adding too many irrelevant tags
            sorted_themes = sorted(theme_matches.keys(),
                                   key=lambda t: theme_matches[t]["count"],
                                   reverse=True)
            top_themes = sorted_themes[:5]

            # Add historical period tags first; they are often the most
            # important for historical research
            period_themes = [t for t in top_themes if t in [
                "Prehistoric", "Ancient World", "Medieval", "Renaissance",
                "Early Modern", "18th Century", "19th Century", "20th Century", "Contemporary"
            ]]
            for theme in period_themes:
                if theme not in subject_tags:
                    subject_tags.append(theme)

            # Then add the remaining top themes
            for theme in top_themes:
                if theme not in period_themes and theme not in subject_tags:
                    subject_tags.append(theme)

            # Log debug information
            if theme_matches:
                logger.info(f"Extracted themes: {', '.join(top_themes)}")
                logger.info(f"Theme match details: {theme_matches}")

        # Add a document period tag if year patterns are detected
        if raw_text:
            # Look for four-digit years (1000-2029) in the content
            year_matches = re.findall(r'\b1[0-9]{3}\b|\b20[0-2][0-9]\b', raw_text)
            if year_matches:
                # Use the earliest year to select the period tag
                earliest = min(int(y) for y in year_matches)
                for year_range, period_tag in PERIOD_TAGS.items():
                    if year_range[0] <= earliest <= year_range[1]:
                        if period_tag not in subject_tags:
                            subject_tags.append(period_tag)
                        break

        # Add languages as topics if available
        for lang in result.get('languages') or []:
            if lang:
                lang_tag = f"{lang} Language"
                if lang_tag not in subject_tags:
                    subject_tags.append(lang_tag)

        # Add preprocessing information as tags if preprocessing was applied
        if preprocessing_options:
            preprocessing_methods = []
            if preprocessing_options.get("document_type", "standard") != "standard":
                doc_type = preprocessing_options["document_type"].capitalize()
                preprocessing_tag = f"Enhanced ({doc_type})"
                if preprocessing_tag not in subject_tags:
                    subject_tags.append(preprocessing_tag)
            if preprocessing_options.get("grayscale", False):
                preprocessing_methods.append("Grayscale")
            if preprocessing_options.get("denoise", False):
                preprocessing_methods.append("Denoised")
            contrast_val = preprocessing_options.get("contrast", 0)
            if contrast_val > 0:
                preprocessing_methods.append("Contrast Enhanced")
            elif contrast_val < 0:
                preprocessing_methods.append("Contrast Reduced")
            if preprocessing_options.get("rotation", 0) != 0:
                preprocessing_methods.append("Rotated")

            # Add a combined preprocessing tag if any methods were applied
            if preprocessing_methods:
                if "Preprocessed" not in subject_tags:
                    subject_tags.append("Preprocessed")
                # Add the specific method as a tag if only one was used
                if len(preprocessing_methods) == 1 and preprocessing_methods[0] not in subject_tags:
                    subject_tags.append(preprocessing_methods[0])
    except Exception as e:
        logger.warning(f"Error generating subject tags: {str(e)}")
        # Fall back to default tags if extraction fails
        if not subject_tags:
            subject_tags = DEFAULT_TAGS.copy()

    # Ensure we have at least 3 tags
    while len(subject_tags) < 3:
        for tag in DEFAULT_TAGS:
            if tag not in subject_tags:
                subject_tags.append(tag)
                break
        else:
            # All default tags are already in use; try the generic ones
            for tag in GENERIC_TAGS:
                if tag not in subject_tags:
                    subject_tags.append(tag)
                    break
            else:
                # Nothing left to add; stop to avoid an infinite loop
                break

    return subject_tags
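
# Illustrative usage (the exact tags depend on the CONTENT_THEMES and
# PERIOD_TAGS tables in constants.py, so this output is indicative only):
#
#     extract_subject_tags(
#         {"topics": ["Letter"], "languages": ["French"]},
#         "Paris, 14 July 1892. My dear friend...",
#     )
#     # -> e.g. ["Letter", "French Language", ...] plus any matched theme
#     #    tags and a period tag derived from the year 1892; the list is
#     #    padded to at least three entries.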