Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| """ | |
| PDFOCR - Module for processing PDF files with OCR and extracting structured data. | |
| Provides robust PDF to image conversion before OCR processing. | |
| """ | |
| import json | |
| import os | |
| import tempfile | |
| import logging | |
| from pathlib import Path | |
| from typing import Optional, Dict, List, Union, Tuple, Any | |
# Configure logging.
# NOTE(review): basicConfig() runs at import time, so merely importing this
# module installs a root-logger configuration (INFO level, timestamped format)
# for the host application -- confirm that this is intended for library use.
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# Module-wide logger; every class below logs through this "pdf_ocr" logger.
logger = logging.getLogger("pdf_ocr")
| # Import StructuredOCR for OCR processing | |
| from structured_ocr import StructuredOCR | |
class PDFConversionResult:
    """Hold the outcome of a PDF-to-image conversion.

    Instances are truthy when the conversion succeeded, so callers can write
    ``if result: ...``.
    """

    def __init__(self,
                 success: bool,
                 images: Optional[List[Path]] = None,
                 error: Optional[str] = None,
                 page_count: int = 0,
                 temp_files: Optional[List[str]] = None):
        """Initialize the conversion result.

        Args:
            success: Whether the conversion was successful.
            images: Paths to the converted images (defaults to an empty list).
            error: Error message if the conversion failed.
            page_count: Total number of pages in the PDF.
            temp_files: Temporary files that ``cleanup()`` should delete
                (defaults to an empty list).
        """
        self.success = success
        # Copy the incoming lists so later mutation by the caller cannot
        # silently change this result (and vice versa).
        self.images = list(images) if images else []
        self.error = error
        self.page_count = page_count
        self.temp_files = list(temp_files) if temp_files else []

    def __repr__(self) -> str:
        """Return a compact, debugging-friendly description."""
        return (f"{type(self).__name__}(success={self.success!r}, "
                f"images={len(self.images)}, page_count={self.page_count!r}, "
                f"error={self.error!r})")

    def __bool__(self) -> bool:
        """Enable boolean evaluation of the result."""
        # __bool__ must return a real bool; coerce in case ``success`` was
        # given as another truthy/falsy value (e.g. 1 or None).
        return bool(self.success)

    def cleanup(self) -> None:
        """Delete the tracked temporary files and forget them.

        Files that no longer exist are skipped silently; any other failure is
        logged as a warning and does not stop the remaining deletions.
        """
        for temp_file in self.temp_files:
            try:
                # Attempt the delete directly instead of checking existence
                # first, which would leave a window for a race.
                os.unlink(temp_file)
            except FileNotFoundError:
                continue
            except Exception as e:
                logger.warning(f"Failed to remove temporary file {temp_file}: {e}")
            else:
                logger.debug(f"Removed temporary file: {temp_file}")
        self.temp_files = []
class PDFOCR:
    """Class for processing PDF files with OCR and extracting structured data.

    A PDF is first rendered to per-page JPEG files with pdf2image; each image
    is then passed to ``StructuredOCR.process_file``. If rendering or the
    image-based OCR fails, the PDF itself is handed to ``StructuredOCR``.
    """

    # Upper bound on the number of pages rendered by one pdf2image call, to
    # keep memory use bounded.
    _BATCH_SIZE = 5

    def __init__(self, api_key=None):
        """Initialize the PDF OCR processor.

        Args:
            api_key: Forwarded unchanged to ``StructuredOCR``.
        """
        # Set before building the processor so that __del__/cleanup() still
        # work if StructuredOCR(...) raises.
        self.temp_files = []
        self.processor = StructuredOCR(api_key=api_key)

    def __del__(self):
        """Clean up resources when object is destroyed."""
        self.cleanup()

    def cleanup(self):
        """Delete every temporary file still tracked by this instance.

        Missing files are skipped silently; other failures are logged as
        warnings and do not stop the remaining deletions.
        """
        # getattr guards against a partially constructed instance.
        for temp_file in getattr(self, "temp_files", []):
            try:
                os.unlink(temp_file)
            except FileNotFoundError:
                continue
            except Exception as e:
                logger.warning(f"Failed to remove temporary file {temp_file}: {e}")
            else:
                logger.debug(f"Removed temporary file: {temp_file}")
        self.temp_files = []

    @staticmethod
    def _count_pages(pdf_path: Path) -> int:
        """Return the PDF's page count, or 1 when it cannot be determined."""
        try:
            import pdf2image
            # Reads the document metadata only; no page is rendered.
            return int(pdf2image.pdfinfo_from_path(str(pdf_path))["Pages"])
        except Exception as e:
            logger.debug(f"pdfinfo page count unavailable: {e}")
        try:
            from pypdf import PdfReader
            return len(PdfReader(pdf_path).pages)
        except Exception as e:
            logger.warning(f"Failed to determine page count: {e}")
            logger.warning("Could not determine total page count, assuming 1 page")
            return 1

    @staticmethod
    def _select_pages(total_pages: int,
                      max_pages: Optional[int] = None,
                      page_numbers: Optional[List[int]] = None) -> Tuple[List[int], str]:
        """Choose the 1-based pages to render.

        Returns:
            ``(pages, mode)`` where ``pages`` is sorted and free of
            duplicates, and ``mode`` is ``"specified"`` (valid entries of
            ``page_numbers``), ``"first"`` (the first ``max_pages`` pages) or
            ``"all"``.
        """
        if page_numbers:
            # Out-of-range requests are dropped; if nothing valid remains the
            # selection falls through to the max_pages / all-pages rules.
            requested = sorted({p for p in page_numbers if 1 <= p <= total_pages})
            if requested:
                return requested, "specified"
        if max_pages and max_pages < total_pages:
            return list(range(1, max_pages + 1)), "first"
        return list(range(1, total_pages + 1)), "all"

    @staticmethod
    def _contiguous_runs(pages: List[int], max_len: int) -> List[List[int]]:
        """Split sorted, unique pages into runs of consecutive numbers.

        Each run holds at most ``max_len`` pages, so one first_page/last_page
        render call per run produces exactly the requested pages.
        """
        runs: List[List[int]] = []
        for page in pages:
            if runs and page == runs[-1][-1] + 1 and len(runs[-1]) < max_len:
                runs[-1].append(page)
            else:
                runs.append([page])
        return runs

    @staticmethod
    def _collect_page_output(page_result, texts: List[str], languages: Dict[str, None]) -> None:
        """Append a page's raw text and record its languages, when present.

        ``languages`` is a dict used as an insertion-ordered set of strings.
        """
        if 'ocr_contents' in page_result and 'raw_text' in page_result['ocr_contents']:
            texts.append(page_result['ocr_contents']['raw_text'])
        if 'languages' in page_result:
            for lang in page_result['languages']:
                languages.setdefault(str(lang), None)

    def convert_pdf_to_images(self,
                              pdf_path: Union[str, Path],
                              dpi: int = 200,
                              max_pages: Optional[int] = None,
                              page_numbers: Optional[List[int]] = None) -> "PDFConversionResult":
        """
        Convert a PDF file to images.

        Args:
            pdf_path: Path to the PDF file
            dpi: DPI for the output images
            max_pages: Maximum number of pages to convert (None for all)
            page_numbers: Specific page numbers to convert (1-based indexing);
                takes precedence over max_pages when any entry is in range

        Returns:
            PDFConversionResult object with conversion results. Image paths
            are in ascending page order. Errors are reported through the
            result rather than raised.
        """
        pdf_path = Path(pdf_path)
        if not pdf_path.exists():
            return PDFConversionResult(
                success=False,
                error=f"PDF file not found: {pdf_path}"
            )

        file_size_mb = pdf_path.stat().st_size / (1024 * 1024)
        logger.info(f"PDF size: {file_size_mb:.2f} MB")

        # Declared outside the try so files created before a failure are
        # still tracked for cleanup.
        temp_files: List[str] = []
        try:
            import pdf2image

            thread_count = min(4, os.cpu_count() or 2)

            logger.info("Determining PDF page count...")
            total_pages = self._count_pages(pdf_path)
            logger.info(f"PDF has {total_pages} total pages")

            pages_to_process, mode = self._select_pages(total_pages, max_pages, page_numbers)
            if mode == "specified":
                logger.info(f"Converting {len(pages_to_process)} specified pages: {pages_to_process}")
            elif mode == "first":
                logger.info(f"Converting first {max_pages} pages of {total_pages} total")
            else:
                logger.info(f"Converting all {total_pages} pages")

            image_paths: List[Path] = []
            # Runs are visited in ascending order, so image_paths ends up in
            # page order without a separate sort.
            for batch_pages in self._contiguous_runs(pages_to_process, self._BATCH_SIZE):
                logger.info(f"Converting batch of pages {batch_pages}")
                try:
                    batch_images = pdf2image.convert_from_path(
                        pdf_path,
                        dpi=dpi,
                        first_page=batch_pages[0],
                        last_page=batch_pages[-1],
                        thread_count=thread_count,
                        fmt="jpeg"
                    )
                    for page_num, image in zip(batch_pages, batch_images):
                        # mkstemp returns an open descriptor; close it at once
                        # so no handle leaks, and register the path before
                        # saving so a failed save cannot orphan the file.
                        fd, img_temp_path = tempfile.mkstemp(suffix=f'_page{page_num}.jpg')
                        os.close(fd)
                        temp_files.append(img_temp_path)
                        image.save(img_temp_path, format='JPEG', quality=95)
                        image_paths.append(Path(img_temp_path))
                except Exception as e:
                    logger.error(f"Failed to convert batch {batch_pages}: {e}")
                    # Continue with other batches

            if not image_paths:
                return PDFConversionResult(
                    success=False,
                    error="Failed to convert PDF to images",
                    page_count=total_pages,
                    temp_files=temp_files
                )

            return PDFConversionResult(
                success=True,
                images=image_paths,
                page_count=total_pages,
                temp_files=temp_files
            )
        except ImportError:
            return PDFConversionResult(
                success=False,
                error="pdf2image module not available. Please install with: pip install pdf2image"
            )
        except Exception as e:
            logger.error(f"PDF conversion error: {str(e)}")
            return PDFConversionResult(
                success=False,
                error=f"Failed to convert PDF to images: {str(e)}",
                temp_files=temp_files
            )
        finally:
            # Track on the instance as well, so cleanup()/__del__ remove the
            # files even if the caller never cleans up the result.
            self.temp_files.extend(temp_files)

    def _discard_conversion(self, conversion_result) -> None:
        """Delete a conversion's temp files and stop tracking the removed ones."""
        released = set(conversion_result.temp_files)
        conversion_result.cleanup()
        # Keep any path whose deletion failed so a later cleanup() can retry.
        self.temp_files = [
            path for path in self.temp_files
            if path not in released or os.path.exists(path)
        ]

    def _ocr_converted_images(self, pdf_path, conversion_result, page_numbers,
                              use_vision, max_pages, custom_prompt):
        """Run OCR over the converted page images and merge into one result.

        The first image is processed with the caller's ``use_vision`` setting
        and its result dict becomes the returned document; the remaining
        images are processed with ``use_vision=False`` and contribute only
        their raw text and languages.
        """
        page_count = conversion_result.page_count
        images = conversion_result.images

        # Add PDF context to the prompt unless the caller already mentions it.
        modified_prompt = custom_prompt
        if not modified_prompt:
            modified_prompt = f"This is a multi-page PDF document with {page_count} total pages, of which {len(images)} were processed."
        elif "pdf" not in modified_prompt.lower() and "multi-page" not in modified_prompt.lower():
            modified_prompt += f" This is a multi-page PDF document with {page_count} total pages, of which {len(images)} were processed."

        first_page_result = self.processor.process_file(
            file_path=images[0],
            file_type="image",
            use_vision=use_vision,
            custom_prompt=modified_prompt
        )

        all_pages_text: List[str] = []
        all_languages: Dict[str, None] = {}  # insertion-ordered set
        self._collect_page_output(first_page_result, all_pages_text, all_languages)

        for i, img_path in enumerate(images[1:], 1):
            try:
                # NOTE: i+1 is the position among the converted images; it is
                # not the PDF page number when specific pages were requested.
                page_result = self.processor.process_file(
                    file_path=img_path,
                    file_type="image",
                    use_vision=False,  # Use simpler processing for additional pages
                    custom_prompt=f"This is page {i+1} of a {page_count}-page document."
                )
                self._collect_page_output(page_result, all_pages_text, all_languages)
            except Exception as e:
                # A failing page is skipped; the remaining pages still count.
                logger.warning(f"Error processing page {i+1}: {e}")

        if 'ocr_contents' in first_page_result:
            first_page_result['ocr_contents']['raw_text'] = "\n\n".join(all_pages_text)

        if all_languages:
            # Detection order is preserved, so the output is deterministic.
            first_page_result['languages'] = list(all_languages)

        first_page_result['file_name'] = pdf_path.name
        first_page_result['file_type'] = "pdf"
        first_page_result['total_pages'] = page_count
        first_page_result['processed_pages'] = len(images)
        first_page_result['pdf_conversion'] = {
            "method": "pdf2image",
            "pages_converted": len(images),
            "pages_requested": len(page_numbers) if page_numbers else (max_pages or page_count)
        }
        return first_page_result

    def process_pdf(self, pdf_path, use_vision=True, max_pages=None, custom_pages=None, custom_prompt=None):
        """
        Process a PDF file with OCR and extract structured data.

        Args:
            pdf_path: Path to the PDF file
            use_vision: Whether to use vision model for improved analysis
            max_pages: Maximum number of pages to process
            custom_pages: Specific page numbers to process (1-based indexing),
                as a list/tuple or a comma-separated string
            custom_prompt: Custom instructions for processing

        Returns:
            Dictionary with structured OCR results

        Raises:
            FileNotFoundError: If pdf_path does not exist.
        """
        pdf_path = Path(pdf_path)
        if not pdf_path.exists():
            raise FileNotFoundError(f"PDF file not found: {pdf_path}")

        # Normalize custom_pages to a list of ints when possible.
        page_numbers = None
        if custom_pages:
            if isinstance(custom_pages, (list, tuple)):
                page_numbers = custom_pages
            else:
                try:
                    page_numbers = [int(p.strip()) for p in str(custom_pages).split(',')]
                except ValueError:
                    logger.warning(f"Invalid custom_pages format: {custom_pages}. Should be list or comma-separated string.")

        # First try our own PDF to image conversion.
        conversion_result = self.convert_pdf_to_images(
            pdf_path=pdf_path,
            max_pages=max_pages,
            page_numbers=page_numbers
        )

        try:
            if conversion_result.success and conversion_result.images:
                logger.info(f"Successfully converted PDF to {len(conversion_result.images)} images")
                return self._ocr_converted_images(
                    pdf_path, conversion_result, page_numbers,
                    use_vision, max_pages, custom_prompt
                )
        except Exception as e:
            # Deliberate best-effort: fall through to direct processing below.
            logger.error(f"Error processing converted images: {e}")
        finally:
            # The page images are never needed past this point.
            self._discard_conversion(conversion_result)

        # If conversion failed or processing the images failed, fall back to direct processing
        logger.info("Using direct StructuredOCR processing for PDF")
        return self.processor.process_file(
            file_path=pdf_path,
            file_type="pdf",
            use_vision=use_vision,
            max_pages=max_pages,
            custom_pages=custom_pages,
            custom_prompt=custom_prompt
        )

    def save_json_output(self, pdf_path, output_path, use_vision=True, max_pages=None, custom_pages=None, custom_prompt=None):
        """
        Process a PDF file and save the structured output as JSON.

        Args:
            pdf_path: Path to the PDF file
            output_path: Path where to save the JSON output; missing parent
                directories are created
            use_vision: Whether to use vision model for improved analysis
            max_pages: Maximum number of pages to process
            custom_pages: Specific page numbers to process (1-based indexing)
            custom_prompt: Custom instructions for processing

        Returns:
            Path to the saved JSON file
        """
        result = self.process_pdf(
            pdf_path,
            use_vision=use_vision,
            max_pages=max_pages,
            custom_pages=custom_pages,
            custom_prompt=custom_prompt
        )

        output_path = Path(output_path)
        output_path.parent.mkdir(parents=True, exist_ok=True)
        # Explicit encoding so the file does not depend on the platform default.
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(result, f, indent=2)
        return output_path
def parse_page_list(spec):
    """Parse a comma-separated page specification such as ``"1, 3,5"``.

    Returns:
        The page numbers as a list of ints, in the order given.

    Raises:
        ValueError: If any comma-separated item is not an integer.
    """
    return [int(p.strip()) for p in spec.split(',')]


def main(argv=None):
    """Command-line entry point for testing the module directly.

    Args:
        argv: Argument list to parse; ``None`` means ``sys.argv[1:]``.

    Prints the OCR result as JSON to stdout, or writes it to the path given
    with --output. Exits with status 1 when --pages cannot be parsed.
    """
    import sys
    import argparse

    parser = argparse.ArgumentParser(description="Process PDF files with OCR.")
    parser.add_argument("pdf_path", help="Path to the PDF file to process")
    parser.add_argument("--output", "-o", help="Path to save the output JSON")
    parser.add_argument("--no-vision", dest="use_vision", action="store_false",
                        help="Disable vision model for processing")
    parser.add_argument("--max-pages", type=int, help="Maximum number of pages to process")
    parser.add_argument("--pages", help="Specific pages to process (comma-separated)")
    parser.add_argument("--prompt", help="Custom prompt for processing")
    args = parser.parse_args(argv)

    # Validate --pages before constructing the processor, so a malformed
    # argument fails fast.
    custom_pages = None
    if args.pages:
        try:
            custom_pages = parse_page_list(args.pages)
        except ValueError:
            # Report on stderr so the message never mixes with JSON on stdout.
            print(f"Error parsing pages: {args.pages}. Should be comma-separated list of numbers.",
                  file=sys.stderr)
            sys.exit(1)

    processor = PDFOCR()
    options = dict(
        use_vision=args.use_vision,
        max_pages=args.max_pages,
        custom_pages=custom_pages,
        custom_prompt=args.prompt,
    )

    if args.output:
        result_path = processor.save_json_output(args.pdf_path, args.output, **options)
        print(f"Results saved to: {result_path}")
    else:
        result = processor.process_pdf(args.pdf_path, **options)
        print(json.dumps(result, indent=2))


# For testing directly
if __name__ == "__main__":
    main()