Spaces:
Sleeping
Sleeping
| """ | |
| Base64 to Image Decoder | |
| Decodes Base64 data to image files and optionally converts to MOL format using MolScribe | |
| """ | |
| import gradio as gr | |
| import base64 | |
| import json | |
| import tempfile | |
| import os | |
| import logging | |
| from io import BytesIO | |
| from typing import Optional, Tuple, List | |
| import zipfile | |
| # Import required libraries | |
| try: | |
| from PIL import Image | |
| import numpy as np | |
| import torch | |
| # MolScribe will be lazy loaded | |
| except ImportError as e: | |
| logging.error(f"Required library not found: {e}") | |
| print("Please install required dependencies:") | |
| print("pip install pillow") | |
| print("pip install numpy") | |
| print('pip install "gradio[mcp]"') | |
| print("pip install MolScribe") | |
| print("pip install torch") | |
| raise | |
# Logging setup: configure the root logger at INFO level and create the
# module-scoped logger used by every function in this file.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Global variable to cache MolScribe model. It stays None until
# initialize_molscribe_model() stores the loaded instance here, so later
# calls can reuse it.
_molscribe_model = None
def initialize_molscribe_model():
    """Return the cached MolScribe model, creating it on the first call.

    On first use the checkpoint is fetched with ``hf_hub_download`` and the
    model is built on CUDA when ``torch.cuda.is_available()`` is true,
    otherwise on CPU. The instance is stored in the module-level
    ``_molscribe_model`` so later calls return it immediately.

    Returns:
        The initialized MolScribe model instance.

    Raises:
        Exception: any error raised while importing dependencies, downloading
            the checkpoint, or constructing the model is logged and re-raised.
    """
    global _molscribe_model

    if _molscribe_model is None:
        try:
            # Heavy dependencies are imported lazily, only when a model is needed.
            import numpy as np
            import torch
            from molscribe import MolScribe
            from huggingface_hub import hf_hub_download

            logger.info("Downloading MolScribe checkpoint...")
            checkpoint = hf_hub_download('yujieq/MolScribe', 'swin_base_char_aux_1m.pth')

            backend = 'cuda' if torch.cuda.is_available() else 'cpu'
            device = torch.device(backend)
            logger.info(f"Initializing MolScribe on {device}...")
            logger.info(f"NumPy version: {np.__version__}")
            logger.info(f"PyTorch version: {torch.__version__}")

            _molscribe_model = MolScribe(checkpoint, device=device)
            logger.info("MolScribe model ready")
        except Exception as e:
            logger.error(f"Failed to initialize MolScribe: {e}")
            raise

    return _molscribe_model
def run_molscribe_prediction(image_path: str):
    """Run MolScribe on an image file and return its prediction.

    Args:
        image_path: Path of the image file passed to
            ``model.predict_image_file``.

    Returns:
        Whatever ``predict_image_file`` returns on success. On any exception
        the error is logged and a dict with ``"error"`` (the exception text)
        and ``"traceback"`` (the formatted traceback) is returned instead of
        raising.
    """
    import traceback

    try:
        import numpy as np
        import torch

        logger.info("Starting MolScribe prediction...")
        logger.info(f"NumPy available: {np.__version__}")
        logger.info(f"PyTorch available: {torch.__version__}")

        predictor = initialize_molscribe_model()
        logger.info("Model initialized, running prediction...")

        prediction = predictor.predict_image_file(image_path)
        logger.info(f"Prediction completed: {prediction}")
    except Exception as e:
        logger.error(f"MolScribe prediction failed: {e}")
        formatted = traceback.format_exc()
        logger.error(f"Traceback: {formatted}")
        return {"error": str(e), "traceback": formatted}
    else:
        return prediction
def _split_data_url(base64_data):
    """Split an optional ``data:image/<fmt>;base64,`` prefix off a Base64 string.

    Returns:
        ``(payload, declared_format)``: the text after the first comma and the
        upper-cased image subtype taken from the prefix. When the input has no
        ``data:image`` prefix it is returned unchanged with ``"unknown"``.
    """
    if not base64_data.startswith('data:image'):
        return base64_data, "unknown"

    header, _, payload = base64_data.partition(',')
    declared_format = "unknown"
    if 'image/' in header:
        # An empty subtype ("data:image/;base64,") is treated as unknown so it
        # never produces a filename with a dangling dot.
        declared_format = header.split('image/')[1].split(';')[0].upper() or "unknown"
    return payload, declared_format


def _prepare_image_for_molscribe(image, min_size=224):
    """Return an RGB image at least ``min_size`` pixels on each side.

    Transparent pixels are composited onto a white background, other modes are
    converted to RGB, and images smaller than ``min_size`` in either dimension
    are upscaled with LANCZOS resampling, keeping the aspect ratio.
    """
    has_alpha = image.mode in ('RGBA', 'LA') or (
        image.mode == 'P' and 'transparency' in image.info
    )
    if has_alpha:
        # Flatten onto white; a plain convert('RGB') would discard the alpha
        # channel instead of blending it with a white background.
        rgba = image.convert('RGBA')
        prepared = Image.new('RGB', rgba.size, (255, 255, 255))
        prepared.paste(rgba, mask=rgba.split()[-1])
    elif image.mode != 'RGB':
        prepared = image.convert('RGB')
    else:
        prepared = image

    if prepared.width < min_size or prepared.height < min_size:
        scale_factor = max(min_size / prepared.width, min_size / prepared.height)
        # max() guards against float truncation leaving a side at min_size - 1.
        new_width = max(min_size, int(prepared.width * scale_factor))
        new_height = max(min_size, int(prepared.height * scale_factor))
        prepared = prepared.resize((new_width, new_height), Image.Resampling.LANCZOS)
        logger.info(f"Resized image from {image.width}x{image.height} to {new_width}x{new_height}")
    return prepared


def _molscribe_metadata(image, filename):
    """Run MolScribe on ``image`` and build the ``"molscribe"`` metadata entry.

    The image is preprocessed, written to a temporary PNG, passed to
    ``run_molscribe_prediction`` and the temporary file is always removed.
    Never raises: every failure is reported as ``{"success": False, ...}``.
    """
    temp_image_path = None
    try:
        prepared = _prepare_image_for_molscribe(image)
        with tempfile.NamedTemporaryFile(suffix='.png', delete=False) as temp_file:
            temp_image_path = temp_file.name
            # Write through the open handle rather than re-opening by name.
            prepared.save(temp_file, 'PNG')

        logger.info(f"Running MolScribe prediction for {filename}...")
        result = run_molscribe_prediction(temp_image_path)
    except Exception as e:
        logger.exception(f"MolScribe error for {filename}: {e}")
        return {
            "success": False,
            "error": f"Image preprocessing or MolScribe execution failed: {str(e)}",
            "details": "This may happen if the image doesn't contain recognizable chemical structures or if there's a model initialization issue"
        }
    finally:
        if temp_image_path is not None:
            try:
                os.remove(temp_image_path)
            except OSError as cleanup_error:
                logger.warning(f"Could not remove temporary file {temp_image_path}: {cleanup_error}")

    # Classify the result explicitly. The previous single condition mixed
    # `and`/`or` without parentheses, so an empty or non-dict result was
    # either reported as success or raised AttributeError.
    if isinstance(result, dict) and result.get("error"):
        logger.warning(f"MolScribe prediction failed for {filename}: {result.get('error')}")
        return {
            "success": False,
            "error": result.get("error", "Unknown error"),
            "details": result.get("traceback", "No traceback available")
        }

    if isinstance(result, dict) and result:
        logger.info(f"MolScribe prediction successful for {filename}")
        return {
            "success": True,
            "smiles": result.get('smiles', ''),
            "molfile": result.get('molfile', ''),
            "confidence": result.get('confidence', 0.0)
        }

    logger.warning(f"MolScribe returned empty result for {filename}")
    return {
        "success": False,
        "error": "MolScribe returned empty result",
        "details": "The model may not have detected any chemical structures in the image"
    }


def decode_single_base64_image(base64_data: str, filename: Optional[str] = None, include_molscribe: bool = False) -> Tuple[Optional[Image.Image], str, dict]:
    """Decode a single Base64 image (with optional MolScribe conversion)

    Args:
        base64_data: Base64 encoded image data, optionally prefixed with a
            ``data:image/<fmt>;base64,`` data-URL header.
        filename: Output filename (optional). When omitted, a name of the
            form ``decoded_image.<ext>`` is generated from the data-URL
            subtype, else from the detected image format, else ``png``.
        include_molscribe: Whether to run MolScribe MOL conversion

    Returns:
        Tuple[PIL.Image, filename, metadata]. On success ``metadata`` has
        ``"success": True`` plus format, mode, size and byte counts, and a
        ``"molscribe"`` entry when ``include_molscribe`` is set. On failure the
        image is ``None``, the filename falls back to ``"error.png"`` and
        ``metadata`` carries ``"success": False`` and ``"error"``. This
        function does not raise.
    """
    try:
        payload, declared_format = _split_data_url(base64_data.strip())

        image_data = base64.b64decode(payload)
        image = Image.open(BytesIO(image_data))
        # Image.open is lazy; force the pixel data to be decoded now so a
        # truncated or corrupt payload is reported here as a failure.
        image.load()

        if not filename:
            if declared_format != "unknown":
                extension = declared_format.lower()
            else:
                extension = image.format.lower() if image.format else 'png'
            filename = f"decoded_image.{extension}"

        metadata = {
            "success": True,
            "filename": filename,
            "image_format": image.format or declared_format,
            "image_mode": image.mode,
            "image_size": {
                "width": image.width,
                "height": image.height
            },
            "original_data_size": len(payload),
            "decoded_data_size": len(image_data)
        }

        if include_molscribe:
            metadata["molscribe"] = _molscribe_metadata(image, filename)

        return image, filename, metadata
    except Exception as e:
        error_metadata = {
            "success": False,
            "error": str(e),
            "filename": filename or "error.png"
        }
        if include_molscribe:
            error_metadata["molscribe"] = {
                "success": False,
                "error": "Image decoding failed, MolScribe not executed"
            }
        return None, filename or "error.png", error_metadata
def _failed_entry(filename, error, include_molscribe):
    """Build the ``(None, filename, metadata)`` triple for an undecodable entry."""
    metadata = {"success": False, "error": error, "filename": filename}
    if include_molscribe:
        metadata["molscribe"] = {
            "success": False,
            "error": "Image decoding failed, MolScribe not executed"
        }
    return None, filename, metadata


def _decode_input_images(input_data, include_molscribe):
    """Decode every image described by ``input_data``.

    Accepted inputs: a plain Base64 / data-URL string, a JSON object with an
    ``image_base64`` field, a JSON object with a ``pages`` list (DECIMER
    output format), or a JSON list of objects with ``image_base64``.

    Returns:
        A list of ``(image, filename, metadata)`` triples, one per entry
        found. ``image`` is ``None`` for entries that failed to decode.

    Raises:
        ValueError: the input looks like JSON but cannot be parsed or has
            none of the supported shapes.
    """
    text = input_data.strip()
    if not text.startswith(('{', '[')):
        # Plain Base64 string
        return [decode_single_base64_image(text, "decoded_image.png", include_molscribe)]

    try:
        payload = json.loads(text)
    except json.JSONDecodeError as exc:
        raise ValueError(f"Invalid JSON format: {exc}") from exc

    entries = []

    if isinstance(payload, list):
        for index, item in enumerate(payload, start=1):
            default_name = f"image_{index}.png"
            if isinstance(item, dict) and "image_base64" in item:
                entries.append(decode_single_base64_image(
                    item["image_base64"],
                    item.get("filename", default_name),
                    include_molscribe
                ))
            else:
                # Report the bad item instead of dropping it silently.
                entries.append(_failed_entry(
                    default_name,
                    "List item is not an object with an 'image_base64' field",
                    include_molscribe
                ))
        return entries

    if not isinstance(payload, dict) or ("image_base64" not in payload and "pages" not in payload):
        raise ValueError("Invalid JSON format. Expected 'image_base64' or 'pages' field")

    if "image_base64" in payload:
        # Single image data
        return [decode_single_base64_image(
            payload["image_base64"],
            payload.get("filename"),
            include_molscribe
        )]

    # Multi-page data (DECIMER output format)
    pages = payload["pages"]
    if not isinstance(pages, list):
        raise ValueError("Invalid JSON format. 'pages' must be a list")

    for page_index, page in enumerate(pages, start=1):
        if not isinstance(page, dict):
            logger.warning(f"Skipping page {page_index}: not a JSON object")
            continue
        structures = page.get("structures") or []
        if not isinstance(structures, list):
            logger.warning(f"Skipping page {page_index}: 'structures' is not a list")
            continue
        # Fall back to positional numbering so one missing key does not
        # abort the whole batch.
        page_number = page.get("page_number", page_index)
        for structure_index, structure in enumerate(structures, start=1):
            is_object = isinstance(structure, dict)
            segment_id = structure.get("segment_id", structure_index) if is_object else structure_index
            filename = f"page_{page_number}_structure_{segment_id}.png"
            if is_object and "image_base64" in structure:
                entries.append(decode_single_base64_image(
                    structure["image_base64"],
                    filename,
                    include_molscribe
                ))
            else:
                entries.append(_failed_entry(
                    filename,
                    "Structure is missing the 'image_base64' field",
                    include_molscribe
                ))
    return entries


def _unique_archive_stem(filename, used_stems):
    """Return a ZIP-safe, not-yet-used base name (no extension) for ``filename``.

    Directory components are dropped so a user-supplied name cannot place an
    entry outside the extraction folder, and a numeric suffix is appended when
    the stem is already taken (which would also make the ``.mol`` names clash).
    """
    base = os.path.basename(str(filename).replace("\\", "/"))
    stem = os.path.splitext(base)[0] or "image"
    candidate = stem
    suffix = 2
    while candidate in used_stems:
        candidate = f"{stem}_{suffix}"
        suffix += 1
    used_stems.add(candidate)
    return candidate


def _png_bytes(image):
    """Encode ``image`` as PNG and return the bytes."""
    buffer = BytesIO()
    try:
        image.save(buffer, format='PNG')
    except (OSError, ValueError):
        # Some modes cannot be written as PNG; retry after converting to RGB.
        buffer = BytesIO()
        image.convert('RGB').save(buffer, format='PNG')
    return buffer.getvalue()


def _build_mol_record(molscribe, source_name):
    """Return the molfile followed by SD data items and the ``$$$$`` delimiter.

    Each data item is written as a ``> <TAG>`` header, its value, and a
    terminating blank line.
    """
    try:
        confidence = float(molscribe.get("confidence") or 0.0)
    except (TypeError, ValueError):
        confidence = 0.0

    lines = [str(molscribe["molfile"]).rstrip()]
    data_items = (
        ("SMILES", molscribe.get("smiles", "")),
        ("CONFIDENCE", f"{confidence:.4f}"),
        ("SOURCE_IMAGE", source_name),
        ("GENERATED_BY", "MolScribe via Base64 Decoder"),
    )
    for tag, value in data_items:
        lines.extend([f"> <{tag}>", str(value), ""])
    lines.append("$$$$")
    return "\n".join(lines) + "\n"


def decode_base64_to_images(input_data: str, include_molscribe: bool = False) -> str:
    """Convert Base64 data to downloadable images (with optional MolScribe conversion)

    Args:
        input_data: Base64 image data or JSON string containing Base64 images
        include_molscribe: Whether to include MolScribe MOL conversion

    Returns:
        JSON string containing conversion results and download information.
        Empty or malformed input yields a JSON object with an ``"error"``
        message and zero counts; this function does not raise.
    """
    def to_json(payload):
        return json.dumps(payload, indent=2, ensure_ascii=False)

    def input_error(message):
        return to_json({
            "error": message,
            "total_processed": 0,
            "successful_conversions": 0,
            "results": []
        })

    try:
        if not input_data or not input_data.strip():
            return input_error("No input data provided")

        try:
            entries = _decode_input_images(input_data, include_molscribe)
        except ValueError as exc:
            return input_error(str(exc))

        results = [metadata for _, _, metadata in entries]
        decoded_images = sum(1 for image, _, _ in entries if image is not None)

        # Calculate successful conversions
        successful_conversions = sum(1 for r in results if r.get("success", False))
        molscribe_conversions = sum(1 for r in results if r.get("molscribe", {}).get("success", False))

        # Compile results
        final_result = {
            "total_processed": len(results),
            "successful_conversions": successful_conversions,
            "failed_conversions": len(results) - successful_conversions,
            "molscribe_enabled": include_molscribe,
            "molscribe_conversions": molscribe_conversions if include_molscribe else None,
            "download_info": {
                "total_files": decoded_images,
                "files_available": successful_conversions > 0
            },
            "results": results
        }
        logger.info(f"Conversion completed: {successful_conversions}/{len(results)} successful")
        return to_json(final_result)
    except Exception as e:
        logger.error(f"Error in conversion process: {e}")
        return to_json({
            "error": str(e),
            "total_processed": 0,
            "successful_conversions": 0,
            "failed_conversions": 1,
            "molscribe_enabled": include_molscribe,
            "molscribe_conversions": 0 if include_molscribe else None,
            "download_info": {
                "total_files": 0,
                "files_available": False
            },
            "results": []
        })


def create_zip_from_base64(input_data: str, include_molscribe: bool = False) -> Optional[str]:
    """Extract images from Base64 data and create ZIP file (including results JSON)

    The archive contains ``conversion_results.json``, one PNG per successfully
    decoded image and, when ``include_molscribe`` is set and MolScribe
    produced a molfile, a ``.mol`` file with the same base name. Images are
    always re-encoded as PNG, so entries are named ``<stem>.png``; the name
    used in the archive is recorded as ``"archive_filename"`` in the results.

    Args:
        input_data: Base64 image data or JSON string containing Base64 images
        include_molscribe: Whether to include MolScribe MOL conversion

    Returns:
        Path of the temporary ZIP file, or ``None`` when the input is empty or
        invalid, no image could be decoded, or writing the archive failed.
    """
    temp_zip_path = None
    try:
        if not input_data or not input_data.strip():
            logger.warning("Empty input data")
            return None

        try:
            entries = _decode_input_images(input_data, include_molscribe)
        except ValueError as exc:
            logger.error(f"JSON decode error: {exc}")
            return None

        decoded = [entry for entry in entries if entry[0] is not None]
        if not decoded:
            logger.warning("No images to zip")
            return None

        logger.info(f"Creating ZIP with {len(decoded)} images")

        # Resolve archive names first so the results JSON can record them.
        used_stems = set()
        archive_items = []
        for image, filename, metadata in decoded:
            stem = _unique_archive_stem(filename, used_stems)
            archive_items.append((image, stem, dict(metadata, archive_filename=f"{stem}.png")))

        # Prepare results JSON
        results_data = [metadata for _, _, metadata in archive_items]
        results_json = {
            "total_processed": len(archive_items),
            "successful_conversions": len([r for r in results_data if r.get("success", False)]),
            "molscribe_enabled": include_molscribe,
            "molscribe_conversions": len([r for r in results_data if r.get("molscribe", {}).get("success", False)]) if include_molscribe else 0,
            "results": results_data
        }

        # Create temporary ZIP file (closed immediately, then reopened by path)
        with tempfile.NamedTemporaryFile(suffix='.zip', delete=False) as temp_zip:
            temp_zip_path = temp_zip.name

        with zipfile.ZipFile(temp_zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
            zipf.writestr('conversion_results.json', json.dumps(results_json, indent=2, ensure_ascii=False))

            for image, stem, metadata in archive_items:
                png_name = f"{stem}.png"
                zipf.writestr(png_name, _png_bytes(image))

                # Add MOL file if MolScribe data exists
                molscribe = metadata.get("molscribe") or {}
                if include_molscribe and molscribe.get("success", False) and molscribe.get("molfile"):
                    zipf.writestr(f"{stem}.mol", _build_mol_record(molscribe, png_name))

        logger.info(f"ZIP file created: {temp_zip_path}")
        return temp_zip_path
    except Exception as e:
        logger.exception(f"Error creating ZIP file: {e}")
        # Do not leave a partial archive behind in the temp directory.
        if temp_zip_path is not None and os.path.exists(temp_zip_path):
            try:
                os.remove(temp_zip_path)
            except OSError as cleanup_error:
                logger.warning(f"Could not remove temporary file {temp_zip_path}: {cleanup_error}")
        return None
def create_demo():
    """Create Gradio interface for Base64 decoder

    Builds a ``gr.Blocks`` app with an input textbox, a MolScribe checkbox, a
    "Decode Images" button wired to ``decode_base64_to_images`` and a
    "Download as ZIP" button wired to ``create_zip_from_base64``, plus
    collapsible input/output format examples.

    Returns:
        The ``gr.Blocks`` instance (not yet launched).
    """
    # Example shown in the "Output Format Example" accordion; generated with
    # json.dumps so it is always valid, consistently indented JSON.
    example_output = json.dumps({
        "total_processed": 2,
        "successful_conversions": 2,
        "failed_conversions": 0,
        "molscribe_enabled": True,
        "molscribe_conversions": 2,
        "download_info": {
            "total_files": 2,
            "files_available": True
        },
        "results": [
            {
                "success": True,
                "filename": "decoded_image.png",
                "image_format": "PNG",
                "image_mode": "RGB",
                "image_size": {
                    "width": 256,
                    "height": 256
                },
                "original_data_size": 12345,
                "decoded_data_size": 8192,
                "molscribe": {
                    "success": True,
                    "smiles": "c1ccccc1",
                    "molfile": "\n Mrv2014 01011200\n\n 6 6 0 0 0 0...",
                    "confidence": 0.9876
                }
            }
        ]
    }, indent=2)

    with gr.Blocks(
        title="Base64 to Image Decoder",
        theme=gr.themes.Soft()
    ) as demo:
        gr.Markdown("""
        # ChemGrasp-OCSR (Base64 to Image Decoder and Optical Chemical Structure Recognition)

        Decode Base64-encoded image data and convert to image files.
        Optionally generate MOL files using MolScribe.

        💻 **CPU Environment**

        ## Quick Start

        1. Paste your Base64 image data or JSON in the input field
        2. (Optional) Check "Include MolScribe MOL conversion" for chemical structures
        3. Click "Decode Images" to see results, or "Download as ZIP" to get files

        ## Input Formats

        - Single Base64 image data
        - JSON format (multiple images supported)
        - DECIMER output format (structure images)

        ## Citation

        If you use MolScribe in your research, please cite:

        ```
        @article{MolScribe,
            title = {{MolScribe}: Robust Molecular Structure Recognition with Image-to-Graph Generation},
            author = {Yujie Qian and Jiang Guo and Zhengkai Tu and Zhening Li and Connor W. Coley and Regina Barzilay},
            journal = {Journal of Chemical Information and Modeling},
            publisher = {American Chemical Society ({ACS})},
            doi = {10.1021/acs.jcim.2c01480},
            year = 2023,
        }
        ```
        """)

        with gr.Row():
            with gr.Column():
                input_box = gr.Textbox(
                    label="📥 Base64 Image Data or JSON",
                    lines=10,
                    placeholder="Enter Base64 image data or JSON format data...",
                    show_copy_button=True
                )
                molscribe_checkbox = gr.Checkbox(
                    label="🧬 Include MolScribe MOL conversion",
                    value=False,
                    info="For chemical structures, also generate MOL files"
                )
                with gr.Row():
                    decode_btn = gr.Button(
                        "🖼️ Decode Images",
                        variant="primary",
                        size="lg"
                    )
                    download_btn = gr.Button(
                        "📦 Download as ZIP",
                        variant="secondary",
                        size="lg"
                    )
            with gr.Column():
                result_output = gr.Textbox(
                    label="Conversion Results",
                    lines=15,
                    show_copy_button=True,
                    placeholder="Conversion results will appear here...",
                    interactive=False
                )
                zip_download = gr.File(
                    label="📦 Download ZIP File"
                )

        with gr.Accordion("Input Format Examples", open=False):
            gr.Markdown("""
            ### Single Base64 Image

            ```
            iVBORw0KGgoAAAANSUhEUgAA...
            ```

            ### Data URL Format

            ```
            data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAA...
            ```

            ### JSON Format (Single Image)

            ```json
            {
              "image_base64": "iVBORw0KGgoAAAANSUhEUgAA...",
              "filename": "structure.png"
            }
            ```

            ### DECIMER Output Format

            ```json
            {
              "pages": [
                {
                  "page_number": 1,
                  "structures": [
                    {
                      "segment_id": 1,
                      "image_base64": "iVBORw0KGgoAAAANSUhEUgAA..."
                    }
                  ]
                }
              ]
            }
            ```
            """)

        with gr.Accordion("Output Format Example", open=False):
            gr.Markdown("""
            ### JSON Conversion Results (Display & Inside ZIP)
            """)
            gr.Code(example_output, language="json")
            gr.Markdown("""
            ### ZIP File Structure Example

            ```
            chemical_structures.zip
            ├── conversion_results.json  ← JSON data above
            ├── page_1_structure_1.png   ← Decoded image
            ├── page_1_structure_1.mol   ← MOL generated by MolScribe
            ├── page_1_structure_2.png
            ├── page_1_structure_2.mol
            └── ...
            ```
            """)

        # Event handlers
        def decode_and_show_results(input_data, include_molscribe):
            """Return the JSON report for the pasted input."""
            # NOTE(review): kept as a named wrapper so the handler's function
            # name is unchanged; confirm whether any API client relies on it.
            return decode_base64_to_images(input_data, include_molscribe)

        def create_and_show_zip(input_data, include_molscribe):
            """Create ZIP file with error handling"""
            try:
                if not input_data or not input_data.strip():
                    logger.warning("No input data provided for ZIP creation")
                    return None

                logger.info(f"Creating ZIP file... (MolScribe: {include_molscribe})")
                zip_path = create_zip_from_base64(input_data, include_molscribe)
                if not zip_path or not os.path.exists(zip_path):
                    logger.error("ZIP file creation failed or file not found")
                    return None

                file_size = os.path.getsize(zip_path)
                logger.info(f"ZIP file created successfully: {zip_path} ({file_size} bytes)")
                return zip_path
            except Exception as e:
                # logger.exception records the traceback along with the message.
                logger.exception(f"Error in create_and_show_zip: {e}")
                return None

        decode_btn.click(
            fn=decode_and_show_results,
            inputs=[input_box, molscribe_checkbox],
            outputs=[result_output],
            show_progress=True
        )
        download_btn.click(
            fn=create_and_show_zip,
            inputs=[input_box, molscribe_checkbox],
            outputs=[zip_download],
            show_progress=True
        )

    return demo
# Main execution: build the Gradio interface and start the server when this
# file is run as a script (nothing is launched on import).
if __name__ == "__main__":
    logger.info("Running in CPU environment")
    demo = create_demo()
    # The MCP server is explicitly disabled for this launch.
    demo.launch(mcp_server=False)