Spaces:
Running
Running
| import streamlit as st | |
| import os | |
| import io | |
| import base64 | |
| import logging | |
| import re | |
| from datetime import datetime | |
| from pathlib import Path | |
| import json | |
| # Define exports | |
| __all__ = [ | |
| 'ProgressReporter', | |
| 'create_sidebar_options', | |
| 'create_file_uploader', | |
| 'display_document_with_images', | |
| 'display_previous_results', | |
| 'display_about_tab', | |
| 'display_results' # Re-export from utils.ui_utils | |
| ] | |
| from constants import ( | |
| DOCUMENT_TYPES, | |
| DOCUMENT_LAYOUTS, | |
| CUSTOM_PROMPT_TEMPLATES, | |
| LAYOUT_PROMPT_ADDITIONS, | |
| DEFAULT_PDF_DPI, | |
| MIN_PDF_DPI, | |
| MAX_PDF_DPI, | |
| DEFAULT_MAX_PAGES, | |
| PERFORMANCE_MODES, | |
| PREPROCESSING_DOC_TYPES, | |
| ROTATION_OPTIONS | |
| ) | |
| from utils.text_utils import format_ocr_text, clean_raw_text, format_markdown_text # Import from text_utils | |
| from utils.content_utils import ( | |
| classify_document_content, | |
| extract_document_text, | |
| extract_image_description | |
| ) | |
| from utils.ui_utils import display_results | |
| from preprocessing import preprocess_image | |
class ProgressReporter:
    """Show a progress bar and a status line inside a UI placeholder.

    The widgets are created by :meth:`setup`. Until then ``progress_bar`` and
    ``status_text`` are ``None`` and the other methods skip whichever widget
    is missing.
    """

    def __init__(self, placeholder):
        """Remember the placeholder; no widgets are created yet.

        Args:
            placeholder: Object whose ``container()`` result is used as a
                context manager in :meth:`setup` (presumably an ``st.empty()``
                slot - confirm at the call site).
        """
        self.placeholder = placeholder
        self.progress_bar = None
        self.status_text = None

    def setup(self):
        """Create the progress bar and status slot; return ``self`` for chaining."""
        with self.placeholder.container():
            self.progress_bar = st.progress(0)
            self.status_text = st.empty()
        return self

    def update(self, percent, status_text):
        """Move the bar to ``percent`` (0-100 scale) and show ``status_text``.

        Args:
            percent: Progress on a 0-100 scale. Values outside that range are
                clamped so an over- or undershooting caller cannot make the
                progress widget raise.
            status_text: Message written to the status line.
        """
        if self.progress_bar is not None:
            # The bar is driven with a 0.0-1.0 fraction; keep it inside that range.
            fraction = min(max(percent / 100, 0.0), 1.0)
            self.progress_bar.progress(fraction)
        if self.status_text is not None:
            self.status_text.text(status_text)

    def complete(self, success=True):
        """Show the final state briefly, then clear both widgets.

        Args:
            success: When true the bar is filled and "Processing complete!" is
                shown; otherwise only "Processing failed." is shown.
        """
        if success:
            if self.progress_bar is not None:
                # Integer 100 is the "full" value on the widget's 0-100 int scale.
                self.progress_bar.progress(100)
            if self.status_text is not None:
                self.status_text.text("Processing complete!")
        elif self.status_text is not None:
            self.status_text.text("Processing failed.")

        # Leave the final state visible for a moment before removing it.
        import time
        time.sleep(0.8)
        if self.progress_bar is not None:
            self.progress_bar.empty()
        if self.status_text is not None:
            self.status_text.empty()
def _preprocessing_doc_type(doc_type):
    """Map the selected UI document-type label to a preprocessing type name."""
    if "Handwritten" in doc_type:
        return "handwritten"
    if "Newspaper" in doc_type or "Magazine" in doc_type:
        return "newspaper"
    if "Book" in doc_type or "Publication" in doc_type:
        return "book"
    return "standard"


def _default_custom_prompt(doc_type, doc_layout):
    """Build the pre-filled prompt text for the selected type and layout."""
    # The first DOCUMENT_TYPES entry is the auto-detect choice: no template.
    if doc_type == DOCUMENT_TYPES[0]:
        return ""
    prompt = CUSTOM_PROMPT_TEMPLATES.get(doc_type, "")
    # The first DOCUMENT_LAYOUTS entry is the standard layout: nothing to add.
    if doc_layout != DOCUMENT_LAYOUTS[0]:
        layout_addition = LAYOUT_PROMPT_ADDITIONS.get(doc_layout, "")
        if layout_addition:
            prompt += " " + layout_addition
    return prompt


def create_sidebar_options():
    """Render the OCR settings sidebar and return the chosen options.

    Returns:
        dict with the keys:
            ``use_vision`` (always ``True``), ``perf_mode`` (always
            ``"Quality"``), ``pdf_dpi`` (``DEFAULT_PDF_DPI``), ``max_pages``,
            ``pdf_rotation`` (always ``0``), ``custom_prompt``,
            ``preprocessing_options`` (dict with ``document_type``,
            ``grayscale``, ``denoise``, ``contrast``, ``rotation``) and
            ``use_segmentation`` (always ``False``).
    """
    with st.sidebar:
        st.markdown("## OCR Settings")

        with st.container():
            doc_type = st.selectbox(
                "Document Type",
                DOCUMENT_TYPES,
                help="Select the type of document you're processing for better results",
            )
            doc_layout = st.selectbox(
                "Document Layout",
                DOCUMENT_LAYOUTS,
                help="Select the layout of your document",
            )

            # Pre-fill from the template, but always let the user edit it.
            custom_prompt = st.text_area(
                "Custom Processing Instructions",
                value=_default_custom_prompt(doc_type, doc_layout),
                help="Customize the instructions for processing this document",
                height=80,
            )

            st.markdown("### Image Preprocessing")
            grayscale = st.checkbox(
                "Convert to Grayscale",
                value=True,
                help="Convert color images to grayscale for better text recognition",
            )
            denoise = st.checkbox(
                "Light Denoising",
                value=True,
                help="Apply gentle denoising to improve text clarity",
            )
            contrast = st.slider(
                "Contrast Adjustment",
                min_value=-20,
                max_value=20,
                value=5,
                step=5,
                help="Adjust image contrast (limited range)",
            )

            st.markdown("### PDF Options")
            max_pages = st.number_input(
                "Maximum Pages to Process",
                min_value=1,
                max_value=20,
                value=DEFAULT_MAX_PAGES,
                help="Limit the number of pages to process (for multi-page PDFs)",
            )

    preprocessing_options = {
        "document_type": _preprocessing_doc_type(doc_type),
        "grayscale": grayscale,
        "denoise": denoise,
        "contrast": contrast,
        # Rotation is no longer exposed in the UI; it stays fixed at 0.
        "rotation": 0,
    }

    # Options whose controls were removed from the UI keep fixed values so the
    # returned dict still carries every key.
    return {
        "use_vision": True,
        "perf_mode": "Quality",
        "pdf_dpi": DEFAULT_PDF_DPI,
        "max_pages": max_pages,
        "pdf_rotation": 0,
        "custom_prompt": custom_prompt,
        "preprocessing_options": preprocessing_options,
        "use_segmentation": False,
    }
def create_file_uploader():
    """Render the app header, intro text and upload widget.

    Returns:
        Whatever ``st.file_uploader`` returns for the "Select file" widget
        (the uploaded file, or a falsy value when nothing is uploaded).
    """
    # App title row and attribution line.
    st.markdown(
        '<div style="display: flex; align-items: center; gap: 10px;"><div style="font-size: 32px;">📜</div><div><h2 style="margin: 0; padding: 10px 0 0 0;">Historical OCR</h2></div></div>',
        unsafe_allow_html=True,
    )
    st.markdown(
        "<p style='font-size: 0.8em; color: #666; text-align: left;'>Made possible by Mistral AI</p>",
        unsafe_allow_html=True,
    )

    # Project framing shown above the uploader.
    st.markdown("""
    This tool assists scholars in historical research by extracting text from challenging documents. While it may not achieve 100% accuracy, it helps navigate:
    - **Historical newspapers** with complex layouts
    - **Handwritten documents** from various periods
    - **Photos of archival materials**
    Upload a document to begin, or explore the examples.
    """)

    # "jpeg" is listed next to "jpg" so both spellings of the extension are
    # accepted, matching the extensions the results history treats as images.
    uploaded_file = st.file_uploader(
        "Select file",
        type=["pdf", "png", "jpg", "jpeg"],
        help="Upload a PDF or image file for OCR processing",
    )
    return uploaded_file
def _pages_from_raw_response(raw_pages):
    """Convert raw OCR response pages into the ``pages_data`` list shape."""
    if not isinstance(raw_pages, list):
        return []
    pages_data = []
    for page_idx, page in enumerate(raw_pages):
        if not isinstance(page, dict):
            continue
        images = []
        raw_images = page.get('images')
        if isinstance(raw_images, list):
            for img_idx, img in enumerate(raw_images):
                if not isinstance(img, dict):
                    continue
                # Prefer 'image_base64'; fall back to 'base64' when it is
                # missing or empty.
                img_base64 = img.get('image_base64') or img.get('base64')
                if img_base64:
                    images.append({
                        'id': img.get('id', f"img_{page_idx}_{img_idx}"),
                        'image_base64': img_base64,
                    })
        markdown = page.get('markdown', '')
        # Pages with neither text nor images are dropped.
        if markdown or images:
            pages_data.append({
                'page_number': page_idx + 1,
                'markdown': markdown,
                'images': images,
            })
    return pages_data


def _first_raw_response_image(result):
    """Return the first image payload found in the raw response, or ``None``."""
    raw_data = result.get('raw_response_data')
    if not isinstance(raw_data, dict):
        return None
    raw_pages = raw_data.get('pages')
    if not isinstance(raw_pages, list):
        return None
    for raw_page in raw_pages:
        if not isinstance(raw_page, dict):
            continue
        raw_images = raw_page.get('images')
        if not isinstance(raw_images, list):
            continue
        for img in raw_images:
            if isinstance(img, dict) and img.get('base64'):
                return img['base64']
    return None


def _leading_image_alt_text(page_text):
    """Return the ``![alt]`` text when the text starts with ``![`` and ends with ``)``."""
    if not isinstance(page_text, str):
        return ""
    if not (page_text.startswith("![") and page_text.endswith(")")):
        return ""
    closing = page_text.find("]")
    if closing == -1:
        return ""
    return page_text[2:closing]


def display_document_with_images(result):
    """Display one image per page of an OCR result, with its alt text if any.

    Page data comes from ``result['pages_data']`` when present and non-empty,
    otherwise it is rebuilt from ``result['raw_response_data']['pages']``.
    When neither yields pages, an info message is shown instead.

    Args:
        result: OCR result dict; only the keys named above are read.
    """
    raw_response = result.get('raw_response_data')
    if result.get('pages_data'):
        pages_data = result['pages_data']
    elif isinstance(raw_response, dict) and 'pages' in raw_response:
        pages_data = _pages_from_raw_response(raw_response['pages'])
    else:
        pages_data = []

    if not pages_data:
        st.info("No image data available.")
        return

    for i, page_data in enumerate(pages_data):
        st.markdown(f"### Page {i+1}")
        image_displayed = False

        # 1) A bare base64 payload under 'image_data'.
        if 'image_data' in page_data:
            try:
                image_data = base64.b64decode(page_data['image_data'])
                st.image(io.BytesIO(image_data), use_container_width=True)
                image_displayed = True
            except Exception as e:
                st.error(f"Error displaying image from image_data: {str(e)}")

        # 2) The first displayable entry of the page's 'images' list.
        if not image_displayed:
            for img in page_data.get('images') or []:
                if not (isinstance(img, dict) and 'image_base64' in img):
                    continue
                try:
                    st.image(img['image_base64'], use_container_width=True)
                    image_displayed = True
                    break
                except Exception as e:
                    st.error(f"Error displaying image from images array: {str(e)}")

        # 3) Last resort: the first image anywhere in the raw OCR response.
        if not image_displayed:
            fallback_image = _first_raw_response_image(result)
            if fallback_image:
                try:
                    st.image(fallback_image, use_container_width=True)
                    st.caption("Image from OCR response")
                    image_displayed = True
                except Exception as e:
                    st.error(f"Error displaying image from OCR response: {str(e)}")

        if not image_displayed:
            st.info("No image available for this page.")

        # Caption with the markdown alt text when the page text is an image tag.
        page_text = page_data.get('text', page_data.get('markdown', ""))
        alt_text = _leading_image_alt_text(page_text)
        if len(alt_text) > 5:  # Skip very short, uninformative alt text
            st.caption(f"Image description: {alt_text}")
def _render_results_download(results):
    """Render the "All Results" zip download link; log and skip on failure."""
    try:
        from utils.image_utils import create_results_zip_in_memory
        zip_data = create_results_zip_in_memory(results)
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        zip_filename = f"ocr_results_{timestamp}.zip"
        # The archive is embedded in the link as a base64 data URI.
        zip_b64 = base64.b64encode(zip_data).decode()
        download_html = (
            '<div style="display: flex; align-items: center; margin: 0.5rem 0; flex-wrap: wrap;">'
            '<div style="margin-right: 0.3rem; font-weight: bold;">Download:</div>'
            f'<a href="data:application/zip;base64,{zip_b64}" download="{zip_filename}" class="subject-tag tag-download">All Results</a>'
            '</div>'
        )
        st.markdown(download_html, unsafe_allow_html=True)
    except Exception:
        # Best effort: nothing is shown in the UI, but the failure is logged.
        logging.getLogger(__name__).warning(
            "Could not build the results zip download", exc_info=True
        )


def _render_result_cards(results, num_columns=2):
    """Render one card plus a "View" button per stored result, in a grid."""
    import html

    for row_start in range(0, len(results), num_columns):
        cols = st.columns(num_columns)
        for offset in range(num_columns):
            index = row_start + offset
            if index >= len(results):
                break
            result = results[index]

            file_name = str(result.get("file_name") or f"Document {index+1}")
            lowered = file_name.lower()
            if lowered.endswith(".pdf"):
                icon = "📄"
            elif lowered.endswith((".jpg", ".jpeg", ".png", ".gif")):
                icon = "🖼️"
            else:
                icon = "📝"

            # Only the first whitespace-separated token of the timestamp is shown.
            timestamp_parts = str(result.get("timestamp") or "").split()
            date_part = timestamp_parts[0] if timestamp_parts else ""

            with cols[offset]:
                with st.container():
                    # Values are escaped because the card is raw HTML.
                    st.markdown(f"""
                    <div style="padding: 10px; border: 1px solid #e0e0e0; border-radius: 6px; margin-bottom: 10px;">
                    <div style="display: flex; justify-content: space-between; align-items: center; margin-bottom: 5px;">
                    <div style="font-weight: 500; font-size: 14px; overflow: hidden; text-overflow: ellipsis; white-space: nowrap;">{icon} {html.escape(file_name)}</div>
                    <div style="color: #666; font-size: 12px;">{html.escape(date_part)}</div>
                    </div>
                    </div>
                    """, unsafe_allow_html=True)
                    if st.button("View", key=f"view_{index}", help=f"View {file_name}"):
                        st.session_state.selected_previous_result = result
                        st.rerun()


def _render_selected_metadata(selected_result):
    """Render the one-line metadata strip (timestamp, languages, pages)."""
    import html

    meta_html = '<div style="display: flex; flex-wrap: wrap; gap: 12px; margin: 8px 0 15px 0; font-size: 14px; color: #666;">'
    if 'timestamp' in selected_result:
        meta_html += f'<div>{html.escape(str(selected_result["timestamp"]))}</div>'
    if selected_result.get('languages'):
        languages = [str(lang) for lang in selected_result['languages'] if lang is not None]
        if languages:
            meta_html += f'<div>Language: {html.escape(", ".join(languages))}</div>'
    limited_pages = selected_result.get('limited_pages')
    if isinstance(limited_pages, dict) and 'processed' in limited_pages and 'total' in limited_pages:
        meta_html += f'<div>Pages: {limited_pages["processed"]}/{limited_pages["total"]}</div>'
    meta_html += '</div>'
    st.markdown(meta_html, unsafe_allow_html=True)


def _render_ocr_contents(ocr_contents):
    """Render OCR sections: priority sections first, then the remainder."""
    displayed_sections = set()
    for section in ("title", "content", "transcript", "summary"):
        content = ocr_contents.get(section)
        if isinstance(content, str) and content.strip():
            st.markdown(f"##### {section.replace('_', ' ').title()}")
            st.markdown(format_ocr_text(content, for_display=True))
            displayed_sections.add(section)

    for section, content in ocr_contents.items():
        if section in displayed_sections or section in ('error', 'partial_text') or not content:
            continue
        st.markdown(f"##### {section.replace('_', ' ').title()}")
        if isinstance(content, str):
            st.markdown(format_ocr_text(content, for_display=True))
        elif isinstance(content, list):
            for item in content:
                st.markdown(f"- {item}")
        elif isinstance(content, dict):
            for k, v in content.items():
                st.markdown(f"**{k}:** {v}")
        else:
            st.markdown(str(content))


def _render_selected_result(selected_result):
    """Render the detail view (header, metadata, tabs) for one stored result."""
    st.markdown("<hr style='margin: 20px 0 15px 0; border: none; height: 1px; background-color: #eee;'>", unsafe_allow_html=True)

    file_name = selected_result.get('file_name', 'Document')
    st.subheader(f"{file_name}")

    if st.button("← Back to Results", key="back_to_results"):
        if 'selected_previous_result' in st.session_state:
            del st.session_state.selected_previous_result
        st.session_state.perform_reset = True
        st.rerun()

    _render_selected_metadata(selected_result)

    # The Images tab only exists when the result is flagged as having images.
    has_images = selected_result.get('has_images', False)
    if has_images:
        view_tab1, view_tab2, view_tab3 = st.tabs(["Document Content", "Raw JSON", "Images"])
    else:
        view_tab1, view_tab2 = st.tabs(["Document Content", "Raw JSON"])
        view_tab3 = None

    with view_tab1:
        ocr_contents = selected_result.get('ocr_contents')
        if isinstance(ocr_contents, dict):
            _render_ocr_contents(ocr_contents)

    with view_tab2:
        json_fields = ['file_name', 'timestamp', 'processing_time', 'title',
                       'languages', 'topics', 'subjects', 'text', 'raw_text']
        json_data = {field: selected_result[field] for field in json_fields if field in selected_result}
        if 'ocr_contents' in selected_result:
            json_data['ocr_contents'] = selected_result['ocr_contents']
        # ensure_ascii=False keeps non-ASCII text readable instead of \u escapes.
        st.code(json.dumps(json_data, indent=2, ensure_ascii=False), language="json")

    if view_tab3 is not None:
        with view_tab3:
            for i, page_data in enumerate(selected_result.get('pages_data') or []):
                for img in page_data.get('images') or []:
                    if 'image_base64' in img:
                        st.image(img['image_base64'], use_container_width=True)
                page_text = page_data.get('markdown', "")
                if page_text:
                    with st.expander(f"Page {i+1} Text", expanded=False):
                        st.text(page_text)


def display_previous_results():
    """Display the "Previous Results" tab: history grid plus selected result.

    Reads ``previous_results`` and ``selected_previous_result`` from
    ``st.session_state``. With no stored results an empty-state panel is
    shown. Otherwise a zip download link, a grid of result cards and, when a
    result is selected, its detail view are rendered.
    """
    st.header("Previous Results")

    previous_results = st.session_state.get("previous_results")
    if not previous_results:
        st.markdown("""
        <div style="text-align: center; padding: 30px 20px; background-color: #f8f9fa; border-radius: 6px; margin-top: 10px;">
        <div style="font-size: 36px; margin-bottom: 15px;">📄</div>
        <h3 style="margin-bottom: 16px; font-weight: 500;">No Previous Results</h3>
        <p style="font-size: 14px; color: #666;">Process a document to see your results history.</p>
        </div>
        """, unsafe_allow_html=True)
        return

    _render_results_download(previous_results)
    _render_result_cards(previous_results, num_columns=2)

    selected_result = st.session_state.get("selected_previous_result")
    if selected_result:
        _render_selected_result(selected_result)
def display_about_tab():
    """Render the static "Learn More" tab: description, purpose, features, usage, technologies, version."""
    st.header("Learn More")

    description = """
    **Historical OCR** is a tailored academic tool for extracting text from historical documents, manuscripts, and printed materials.
    """
    purpose_text = """
    This tool is designed to assist scholars in historical research by extracting text from challenging documents.
    While it may not achieve full accuracy for all materials, it serves as a tailored research aid for navigating
    historical documents, particularly:
    """
    purpose_bullets = """
    - **Historical newspapers** with complex layouts and aged text
    - **Handwritten documents** from various time periods
    - **Photos of archival materials** that may be difficult to read
    """
    features = """
    - **Advanced Image Preprocessing**: Optimize historical documents for better OCR results
    - **Custom Document Type Processing**: Specialized handling for newspapers, letters, books, and more
    - **Editable Results**: Review and edit extracted text directly in the interface
    - **Structured Content Analysis**: Automatic organization of document content
    - **Multi-language Support**: Process documents in various languages
    - **PDF Processing**: Handle multi-page historical documents
    """
    how_to_use = """
    1. Upload a document (PDF or image)
    2. Select the document type and adjust preprocessing options if needed
    3. Add custom processing instructions for specialized documents
    4. Process the document
    5. Review, edit, and download the results
    """
    technologies = """
    - OCR processing using Mistral AI's advanced document understanding capabilities
    - Image preprocessing with OpenCV
    - PDF handling with pdf2image
    - Web interface with Streamlit
    """

    # Each entry becomes one markdown call, in this order.
    page_chunks = (
        description,
        "### Purpose",
        purpose_text,
        purpose_bullets,
        "### Features",
        features,
        "### How to Use",
        how_to_use,
        "### Technologies",
        technologies,
        "**Version:** 2.0.0",
    )
    for chunk in page_chunks:
        st.markdown(chunk)