import os import re import shutil import traceback import gradio as gr from pathlib import Path from histopath.agent import A1 from dotenv import load_dotenv # Load environment variables load_dotenv() # Get passcode from environment PASSCODE = os.getenv("GRADIO_PASSWORD") # Initialize agent (will be created after passcode validation) agent = None def check_for_output_files(): """Check for all files in the output directory and return their paths.""" output_dir = Path("./output") if not output_dir.exists(): return [], [] image_extensions = {".png", ".jpg", ".jpeg", ".svg", ".tif", ".tiff"} data_extensions = {".csv", ".txt", ".json", ".npy"} images = [] data_files = [] for file in output_dir.iterdir(): if file.is_file(): if file.suffix.lower() in image_extensions: images.append(str(file)) elif file.suffix.lower() in data_extensions: data_files.append(str(file)) return images, data_files def preview_uploaded_file(uploaded_file): """Preview the uploaded file - show image or file info.""" if uploaded_file is None: return None, None, "No file uploaded" file_path = Path(uploaded_file.name) file_ext = file_path.suffix.lower() image_extensions = {".png", ".jpg", ".jpeg", ".svg", ".tif", ".tiff", ".svs"} if file_ext in image_extensions: # Show image preview return uploaded_file.name, None, f"šŸ“· Previewing: {file_path.name}" else: # Show file info file_size = Path(uploaded_file.name).stat().st_size / 1024 # KB return None, uploaded_file.name, f"šŸ“„ File: {file_path.name} ({file_size:.1f} KB)" def parse_agent_output(output): """Parse agent output to extract code blocks, observations, and regular text.""" # Strip out the message divider bars output = re.sub(r'={30,}\s*(Human|Ai)\s+Message\s*={30,}', '', output) output = output.strip() parsed = { "type": "text", "content": output, "code": None, "observation": None, "thinking": None } # Check for code execution block execute_match = re.search(r'(.*?)', output, re.DOTALL) if execute_match: parsed["type"] = "code" parsed["code"] = execute_match.group(1).strip() # Extract text before the code block (thinking/explanation) text_before = output[:execute_match.start()].strip() # Remove any think tags but keep the content text_before = re.sub(r'(.*?)', r'\1', text_before, flags=re.DOTALL) text_before = re.sub(r'={30,}.*?={30,}', '', text_before).strip() parsed["thinking"] = text_before if text_before else None return parsed # Check for observation block observation_match = re.search(r'(.*?)', output, re.DOTALL) if observation_match: parsed["type"] = "observation" parsed["observation"] = observation_match.group(1).strip() # Extract text before observation if any text_before = output[:observation_match.start()].strip() text_before = re.sub(r'(.*?)', r'\1', text_before, flags=re.DOTALL) text_before = re.sub(r'={30,}.*?={30,}', '', text_before).strip() parsed["thinking"] = text_before if text_before else None return parsed # Check for solution block solution_match = re.search(r'(.*?)', output, re.DOTALL) if solution_match: parsed["type"] = "solution" parsed["content"] = solution_match.group(1).strip() # Get thinking before solution text_before = output[:solution_match.start()].strip() text_before = re.sub(r'(.*?)', r'\1', text_before, flags=re.DOTALL) text_before = re.sub(r'={30,}.*?={30,}', '', text_before).strip() parsed["thinking"] = text_before if text_before else None return parsed # Clean up any remaining tags for display cleaned = re.sub(r'(.*?)', r'\1', output, flags=re.DOTALL) cleaned = re.sub(r'={30,}.*?={30,}', '', cleaned).strip() parsed["content"] = cleaned return parsed def format_message_for_display(parsed_output): """Format parsed output into a readable message for the chatbot.""" msg_parts = [] # Add thinking/explanation text first if present if parsed_output.get("thinking"): msg_parts.append(parsed_output["thinking"]) if parsed_output["type"] == "code": # Add separator if there was thinking text if parsed_output.get("thinking"): msg_parts.append("\n---\n") msg_parts.append("### šŸ’» Executing Code\n") msg_parts.append(f"```python\n{parsed_output['code']}\n```") elif parsed_output["type"] == "observation": # Add separator if there was thinking text if parsed_output.get("thinking"): msg_parts.append("\n---\n") msg_parts.append("### šŸ“Š Observation\n") msg_parts.append(f"```\n{parsed_output['observation']}\n```") elif parsed_output["type"] == "solution": # Add separator if there was thinking text if parsed_output.get("thinking"): msg_parts.append("\n---\n") msg_parts.append("### āœ… Solution\n") msg_parts.append(parsed_output['content']) else: # For regular text, just add the content if thinking wasn't already set if not parsed_output.get("thinking"): msg_parts.append(parsed_output["content"]) return "\n\n".join(msg_parts) def process_agent_response(prompt, uploaded_file, chatbot_history): """Process the agent response and update chatbot.""" global agent if agent is None: chatbot_history.append({ "role": "assistant", "content": "āš ļø Please enter the passcode first to initialize the agent." }) yield chatbot_history, None, None, None, None, "āš ļø Agent not initialized" return if not prompt.strip() and uploaded_file is None: chatbot_history.append({ "role": "assistant", "content": "āš ļø Please provide a prompt or upload a file." }) yield chatbot_history, None, None, None, None, "āš ļø No input provided" return # Handle file upload file_path = None file_info = "" if uploaded_file is not None: try: # Create data directory if it doesn't exist data_dir = Path("./data") data_dir.mkdir(exist_ok=True) # Copy uploaded file to data directory file_name = Path(uploaded_file.name).name file_path = data_dir / file_name shutil.copy(uploaded_file.name, file_path) file_info = f"\n\nšŸ“Ž **Uploaded file:** `{file_path}`\n" # Augment prompt with file path if prompt.strip(): prompt = f"{prompt}\n\nUploaded file path: {file_path}" else: prompt = f"I have uploaded a file at: {file_path}. Please analyze it." except Exception as e: error_msg = f"āŒ Error handling file upload: {str(e)}" chatbot_history.append({ "role": "assistant", "content": error_msg }) yield chatbot_history, None, None, None, None, error_msg return # Add user message to chat user_message = prompt if not file_info else f"{prompt}{file_info}" chatbot_history.append({"role": "user", "content": user_message}) yield chatbot_history, None, None, None, None, "šŸ”„ Processing..." try: # Stream agent responses step_count = 0 for step in agent.go_stream(prompt): step_count += 1 output = step.get("output", "") if output: # Parse the output parsed = parse_agent_output(output) # Add thinking text as separate message if present if parsed.get("thinking"): chatbot_history.append({ "role": "assistant", "content": parsed["thinking"] }) # Add the block (code/observation/solution) as separate message if present if parsed["type"] == "code" and parsed["code"]: chatbot_history.append({ "role": "assistant", "content": f"### šŸ’» Executing Code\n\n```python\n{parsed['code']}\n```" }) elif parsed["type"] == "observation" and parsed["observation"]: chatbot_history.append({ "role": "assistant", "content": f"### šŸ“Š Observation\n\n```\n{parsed['observation']}\n```" }) elif parsed["type"] == "solution": chatbot_history.append({ "role": "assistant", "content": f"### āœ… Solution\n\n{parsed['content']}" }) elif parsed["type"] == "text" and parsed["content"]: # Only add if we haven't already added it as thinking if not parsed.get("thinking"): chatbot_history.append({ "role": "assistant", "content": parsed["content"] }) # Check for output files after each step images, data_files = check_for_output_files() # Create status message status = f"šŸ”„ Step {step_count}" if parsed["type"] == "code": status += " - Executing code..." elif parsed["type"] == "observation": status += " - Processing results..." elif parsed["type"] == "solution": status += " - Finalizing solution..." yield ( chatbot_history, images if images else None, data_files if data_files else None, None, None, status ) # Final check for files final_images, final_data = check_for_output_files() # Create download links message if files were generated if final_images or final_data: download_msg = "\n\n---\n\n### šŸ“ Generated Files Ready for Download\n\n" if final_images: download_msg += f"**šŸ–¼ļø Images ({len(final_images)})** - Available in the **Images** tab →\n" for img_path in final_images: img_name = Path(img_path).name download_msg += f"- `{img_name}`\n" download_msg += "\n" if final_data: download_msg += f"**šŸ“„ Data Files ({len(final_data)})** - Available in the **Data** tab →\n" for data_path in final_data: data_name = Path(data_path).name download_msg += f"- `{data_name}`\n" download_msg += "\n*Click the download button on each file in the respective tabs above.*" # Add download message as separate bubble chatbot_history.append({ "role": "assistant", "content": download_msg }) status = "āœ… Complete" if final_images: status += f" | {len(final_images)} image(s)" if final_data: status += f" | {len(final_data)} data file(s)" yield chatbot_history, final_images if final_images else None, final_data if final_data else None, None, None, status except Exception as e: error_msg = f"āŒ Error: {str(e)}\n\n```\n{traceback.format_exc()}\n```" chatbot_history.append({ "role": "assistant", "content": error_msg }) yield chatbot_history, None, None, None, None, "āŒ Error occurred" def validate_passcode(passcode): """Validate the passcode and initialize the agent.""" global agent if passcode == PASSCODE: # Initialize agent try: agent = A1() return ( gr.update(visible=False), # Hide passcode section gr.update(visible=True), # Show main interface "āœ… Access granted! Agent initialized and ready." ) except Exception as e: error_trace = traceback.format_exc() return ( gr.update(visible=True), gr.update(visible=False), f"āŒ Error initializing agent:\n{str(e)}\n\n{error_trace}" ) else: return ( gr.update(visible=True), gr.update(visible=False), "āŒ Invalid passcode. Please try again." ) def clear_chat(): """Clear the chat history and output files.""" # Clean up output directory output_dir = Path("./output") if output_dir.exists(): shutil.rmtree(output_dir) output_dir.mkdir(exist_ok=True) # Clean up data directory data_dir = Path("./data") if data_dir.exists(): for file in data_dir.iterdir(): if file.is_file(): file.unlink() return [], None, None, None, None, "šŸ—‘ļø Chat cleared" # Create Gradio interface with custom theme custom_theme = gr.themes.Soft( primary_hue="blue", secondary_hue="slate", spacing_size="sm", radius_size="md", ).set( button_primary_background_fill="*primary_500", button_primary_background_fill_hover="*primary_600", block_label_text_weight="600", block_title_text_weight="600", ) with gr.Blocks(title="HistoPath Agent", theme=custom_theme, css=""" .gradio-container { max-width: 100% !important; } .main-header { text-align: center; padding: 1.5rem 0; background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); color: white; border-radius: 8px; margin-bottom: 1.5rem; } .main-header h1 { margin: 0; font-size: 2.2rem; font-weight: 700; } .main-header p { margin: 0.5rem 0 0 0; opacity: 0.95; font-size: 1.1rem; } .file-upload-box .wrap { min-width: 0 !important; } .file-upload-box .file-name { word-break: break-word !important; white-space: normal !important; overflow-wrap: break-word !important; } .tab-nav { margin-bottom: 0.5rem; } /* Better styling for code and observation blocks */ .message.bot pre { background-color: #f6f8fa !important; border: 1px solid #d0d7de !important; border-radius: 6px !important; padding: 12px !important; margin: 8px 0 !important; } .message.bot h3 { margin-top: 12px !important; margin-bottom: 8px !important; font-weight: 600 !important; } .message.bot hr { border: none !important; border-top: 2px solid #e1e4e8 !important; margin: 16px 0 !important; } """) as demo: # Header gr.HTML("""

šŸ”¬ HistoPath Agent

AI-Powered Histopathology Analysis Assistant

""") # Passcode section with gr.Group(visible=True) as passcode_section: gr.Markdown("### šŸ” Authentication Required") with gr.Row(): passcode_input = gr.Textbox( label="Passcode", type="password", placeholder="Enter your passcode...", scale=3 ) passcode_btn = gr.Button("šŸ”“ Unlock", variant="primary", scale=1, size="lg") passcode_status = gr.Textbox( label="Status", interactive=False, lines=2 ) # Main interface (hidden initially) with gr.Group(visible=False) as main_interface: with gr.Row(equal_height=True): # Left column - Chat interface with gr.Column(scale=3): chatbot = gr.Chatbot( label="šŸ’¬ Conversation", height=550, type="messages", show_label=True, avatar_images=(None, "šŸ¤–"), render_markdown=True, ) # Input area with gr.Row(): with gr.Column(scale=7): prompt_input = gr.Textbox( label="Your Query", placeholder="E.g., 'Caption the uploaded whole slide image' or 'Segment cells using instanseg model'", lines=2, max_lines=5, show_label=False, ) with gr.Column(scale=3): file_upload = gr.File( label="šŸ“Ž Upload File", file_types=[".svs", ".png", ".jpg", ".jpeg", ".tif", ".tiff", ".csv", ".txt", ".json", ".npy"], height=75, elem_classes="file-upload-box", ) with gr.Row(): submit_btn = gr.Button("šŸš€ Submit", variant="primary", scale=3, size="lg") clear_btn = gr.Button("šŸ—‘ļø Clear", scale=1, size="lg", variant="secondary") status_text = gr.Textbox( label="Status", interactive=False, value="Ready", show_label=False, container=False, ) # Right column - Outputs with gr.Column(scale=2): with gr.Tabs(): with gr.Tab("šŸ“„ Input"): with gr.Column(): input_image_preview = gr.Image( label="Input Image", height=400, show_label=False, container=True, ) input_file_preview = gr.File( label="Input File", interactive=False, height=100, show_label=False, container=True, ) input_status = gr.Textbox( value="Upload a file to preview", show_label=False, interactive=False, container=False, ) with gr.Tab("šŸ–¼ļø Images"): output_gallery = gr.Gallery( label="Generated Visualizations", columns=1, height=600, object_fit="contain", show_label=False, show_download_button=True, ) with gr.Tab("šŸ“„ Data"): data_files = gr.File( label="Generated Data Files", file_count="multiple", interactive=False, height=600, show_label=False, ) # Event handlers passcode_btn.click( fn=validate_passcode, inputs=[passcode_input], outputs=[passcode_section, main_interface, passcode_status] ) # File upload preview file_upload.change( fn=preview_uploaded_file, inputs=[file_upload], outputs=[input_image_preview, input_file_preview, input_status] ) submit_btn.click( fn=process_agent_response, inputs=[prompt_input, file_upload, chatbot], outputs=[chatbot, output_gallery, data_files, input_image_preview, input_file_preview, status_text] ) clear_btn.click( fn=clear_chat, outputs=[chatbot, output_gallery, data_files, input_image_preview, input_file_preview, status_text] ) # Allow enter key to submit prompt_input.submit( fn=process_agent_response, inputs=[prompt_input, file_upload, chatbot], outputs=[chatbot, output_gallery, data_files, input_image_preview, input_file_preview, status_text] ) if __name__ == "__main__": # Create necessary directories Path("./data").mkdir(exist_ok=True) Path("./output").mkdir(exist_ok=True) print("=" * 60) print("šŸ”¬ HistoPath Agent - Gradio Interface") print("=" * 60) print(f"Passcode: {PASSCODE}") print("Starting server...") print("=" * 60) # Launch the app demo.launch( server_name="0.0.0.0", server_port=None, # Let Gradio auto-pick an available port share=False, show_error=True, )