import os
import re
import shutil
import traceback
import gradio as gr
from pathlib import Path
from histopath.agent import A1
from dotenv import load_dotenv
# Load environment variables (python-dotenv) before anything reads them.
load_dotenv()

# Shared secret that gates the UI; None when GRADIO_PASSWORD is not set.
PASSCODE = os.environ.get("GRADIO_PASSWORD")

# The agent is created lazily by validate_passcode() once the passcode matches.
agent = None
def check_for_output_files():
    """Collect the files currently present in ``./output``.

    Only regular files directly inside the directory are considered; they are
    classified by (case-insensitive) suffix.

    Returns:
        A ``(images, data_files)`` tuple of lists of path strings. Both lists
        are empty when ``./output`` does not exist. Entries are sorted by path
        so repeated calls return the same order for the same directory
        contents (``iterdir`` alone gives no ordering guarantee).
    """
    output_dir = Path("./output")
    if not output_dir.exists():
        return [], []

    image_extensions = {".png", ".jpg", ".jpeg", ".svg", ".tif", ".tiff"}
    data_extensions = {".csv", ".txt", ".json", ".npy"}

    images = []
    data_files = []
    for file in sorted(output_dir.iterdir()):
        if not file.is_file():
            continue
        suffix = file.suffix.lower()
        if suffix in image_extensions:
            images.append(str(file))
        elif suffix in data_extensions:
            data_files.append(str(file))
    return images, data_files
def preview_uploaded_file(uploaded_file):
    """Build the preview for a freshly uploaded file.

    Args:
        uploaded_file: ``None``, a filesystem path string, or an object whose
            ``name`` attribute holds the path of the uploaded file.

    Returns:
        A 3-tuple ``(image_path, file_path, status)``. For image-like suffixes
        the path is returned in the first slot; for anything else it is
        returned in the second slot together with its size in KB. The unused
        slot is ``None``.

    Raises:
        OSError: if a non-image file cannot be stat'ed.
    """
    if uploaded_file is None:
        return None, None, "No file uploaded"

    # Accept both a plain path string and an object exposing the path as `.name`.
    raw_path = getattr(uploaded_file, "name", uploaded_file)
    file_path = Path(raw_path)

    # NOTE(review): .svs / .tif may not be renderable by the image preview
    # component - confirm before relying on it for whole-slide images.
    image_extensions = {".png", ".jpg", ".jpeg", ".svg", ".tif", ".tiff", ".svs"}
    if file_path.suffix.lower() in image_extensions:
        return raw_path, None, f"📷 Previewing: {file_path.name}"

    file_size = file_path.stat().st_size / 1024  # KB
    return None, raw_path, f"📄 File: {file_path.name} ({file_size:.1f} KB)"
def _clean_agent_text(text):
    """Unwrap <think> tags (keeping their content) and drop divider bars."""
    text = re.sub(r'<think>(.*?)</think>', r'\1', text, flags=re.DOTALL)
    return re.sub(r'={30,}.*?={30,}', '', text).strip()


def parse_agent_output(output):
    """Split one chunk of agent output into its tagged block and leading text.

    The first ``<execute>``, ``<observation>`` or ``<solution>`` block found
    (checked in that order) decides the result type. Text preceding the block
    is returned as ``thinking`` with ``<think>`` tags unwrapped.

    Args:
        output: Raw text emitted by the agent for one step.

    Returns:
        A dict with keys ``type`` (``"code"``, ``"observation"``,
        ``"solution"`` or ``"text"``), ``content``, ``code``, ``observation``
        and ``thinking``. For ``"solution"`` the block body is stored in
        ``content``; for ``"text"`` ``content`` is the cleaned-up output.
    """
    # Strip out the "===== Human/Ai Message =====" divider bars.
    output = re.sub(r'={30,}\s*(Human|Ai)\s+Message\s*={30,}', '', output)
    output = output.strip()

    parsed = {
        "type": "text",
        "content": output,
        "code": None,
        "observation": None,
        "thinking": None
    }

    # (tag name, resulting type, dict key that receives the block body).
    # Order is the precedence when several tags occur in the same chunk.
    tagged_blocks = (
        ("execute", "code", "code"),
        ("observation", "observation", "observation"),
        ("solution", "solution", "content"),
    )
    for tag, block_type, field in tagged_blocks:
        match = re.search(rf'<{tag}>(.*?)</{tag}>', output, re.DOTALL)
        if match is None:
            continue
        parsed["type"] = block_type
        parsed[field] = match.group(1).strip()
        # Whatever precedes the block is the agent's explanation/thinking.
        parsed["thinking"] = _clean_agent_text(output[:match.start()]) or None
        return parsed

    # No tagged block: plain text, cleaned up for display.
    parsed["content"] = _clean_agent_text(output)
    return parsed
def format_message_for_display(parsed_output):
    """Render a parsed agent step as a single markdown message.

    Args:
        parsed_output: Dict with a ``type`` key (``"code"``, ``"observation"``,
            ``"solution"`` or anything else for plain text) plus the matching
            ``code`` / ``observation`` / ``content`` entry and an optional
            ``thinking`` entry.

    Returns:
        The message parts joined by blank lines: the thinking text (if any),
        then - for tagged blocks - a horizontal rule (only when thinking text
        is present), a heading and the block body. Plain text contributes its
        ``content`` only when there is no thinking text.
    """
    headings = {
        "code": "### 💻 Executing Code\n",
        "observation": "### 📊 Observation\n",
        "solution": "### ✅ Solution\n",
    }

    thinking = parsed_output.get("thinking")
    msg_parts = [thinking] if thinking else []
    kind = parsed_output["type"]

    if kind in headings:
        # Separate the explanation from the block that follows it.
        if thinking:
            msg_parts.append("\n---\n")
        msg_parts.append(headings[kind])
        if kind == "code":
            msg_parts.append(f"```python\n{parsed_output['code']}\n```")
        elif kind == "observation":
            msg_parts.append(f"```\n{parsed_output['observation']}\n```")
        else:
            msg_parts.append(parsed_output['content'])
    elif not thinking:
        # Regular text: show the content unless thinking already covers it.
        msg_parts.append(parsed_output["content"])

    return "\n\n".join(msg_parts)
# Status-line suffix shown for each kind of parsed agent step.
_STEP_STATUS_SUFFIX = {
    "code": " - Executing code...",
    "observation": " - Processing results...",
    "solution": " - Finalizing solution...",
}


def _assistant_message(content):
    """Wrap text as an assistant entry for the messages-style chat history."""
    return {"role": "assistant", "content": content}


def _stage_uploaded_file(uploaded_file):
    """Copy the upload into ./data (created on demand) and return its new Path."""
    # Accept both a plain path string and an object exposing the path as `.name`.
    source = getattr(uploaded_file, "name", uploaded_file)
    data_dir = Path("./data")
    data_dir.mkdir(exist_ok=True)
    destination = data_dir / Path(source).name
    shutil.copy(source, destination)
    return destination


def _step_messages(parsed):
    """Return the assistant chat entries to display for one parsed agent step."""
    contents = []
    thinking = parsed.get("thinking")
    # Thinking text gets its own bubble, ahead of the tagged block.
    if thinking:
        contents.append(thinking)

    kind = parsed["type"]
    if kind == "code" and parsed["code"]:
        contents.append(f"### 💻 Executing Code\n\n```python\n{parsed['code']}\n```")
    elif kind == "observation" and parsed["observation"]:
        contents.append(f"### 📊 Observation\n\n```\n{parsed['observation']}\n```")
    elif kind == "solution":
        contents.append(f"### ✅ Solution\n\n{parsed['content']}")
    elif kind == "text" and parsed["content"] and not thinking:
        # Only add plain text if it was not already shown as thinking.
        contents.append(parsed["content"])
    return [_assistant_message(text) for text in contents]


def _download_summary(images, data_files):
    """Build the markdown bubble that lists the generated files."""
    parts = ["\n\n---\n\n### 📁 Generated Files Ready for Download\n\n"]
    if images:
        parts.append(f"**🖼️ Images ({len(images)})** - Available in the **Images** tab →\n")
        parts.extend(f"- `{Path(path).name}`\n" for path in images)
        parts.append("\n")
    if data_files:
        parts.append(f"**📄 Data Files ({len(data_files)})** - Available in the **Data** tab →\n")
        parts.extend(f"- `{Path(path).name}`\n" for path in data_files)
    parts.append("\n*Click the download button on each file in the respective tabs above.*")
    return "".join(parts)


def process_agent_response(prompt, uploaded_file, chatbot_history):
    """Run the agent on a query and stream UI updates.

    This is a generator. Every yielded value is a 6-tuple
    ``(chat_history, images, data_files, None, None, status)``; ``images`` and
    ``data_files`` are lists of paths found in ``./output`` or ``None`` when
    there are none.

    Args:
        prompt: The user's query text (may be empty if a file is uploaded).
        uploaded_file: ``None``, a path string, or an object whose ``name``
            attribute is the path of the uploaded file. It is copied into
            ``./data`` and its new path is appended to the prompt.
        chatbot_history: List of ``{"role", "content"}`` dicts; it is mutated
            in place and yielded back.

    Errors are not raised to the caller: upload failures and agent failures
    are appended to the chat as assistant messages and reported in the status.
    """
    if chatbot_history is None:
        chatbot_history = []
    prompt = prompt or ""

    if agent is None:
        chatbot_history.append(
            _assistant_message("⚠️ Please enter the passcode first to initialize the agent.")
        )
        yield chatbot_history, None, None, None, None, "⚠️ Agent not initialized"
        return

    if not prompt.strip() and uploaded_file is None:
        chatbot_history.append(
            _assistant_message("⚠️ Please provide a prompt or upload a file.")
        )
        yield chatbot_history, None, None, None, None, "⚠️ No input provided"
        return

    # Handle file upload
    file_info = ""
    if uploaded_file is not None:
        try:
            file_path = _stage_uploaded_file(uploaded_file)
        except Exception as e:  # UI boundary: report instead of crashing the stream
            error_msg = f"❌ Error handling file upload: {str(e)}"
            chatbot_history.append(_assistant_message(error_msg))
            yield chatbot_history, None, None, None, None, error_msg
            return

        file_info = f"\n\n📎 **Uploaded file:** `{file_path}`\n"
        # Augment the prompt so the agent knows where the file lives.
        if prompt.strip():
            prompt = f"{prompt}\n\nUploaded file path: {file_path}"
        else:
            prompt = f"I have uploaded a file at: {file_path}. Please analyze it."

    # Add user message to chat
    chatbot_history.append({"role": "user", "content": f"{prompt}{file_info}"})
    yield chatbot_history, None, None, None, None, "🔄 Processing..."

    try:
        # Stream agent responses
        for step_count, step in enumerate(agent.go_stream(prompt), start=1):
            output = step.get("output", "")
            # A step may carry no text; keep `parsed` defined either way.
            parsed = parse_agent_output(output) if output else None
            if parsed is not None:
                chatbot_history.extend(_step_messages(parsed))

            # Check for output files after each step
            images, data_files = check_for_output_files()

            status = f"🔄 Step {step_count}"
            if parsed is not None:
                status += _STEP_STATUS_SUFFIX.get(parsed["type"], "")

            yield (
                chatbot_history,
                images or None,
                data_files or None,
                None,
                None,
                status,
            )

        # Final check for files
        final_images, final_data = check_for_output_files()
        if final_images or final_data:
            # Announce the generated files in their own bubble.
            chatbot_history.append(
                _assistant_message(_download_summary(final_images, final_data))
            )

        status = "✅ Complete"
        if final_images:
            status += f" | {len(final_images)} image(s)"
        if final_data:
            status += f" | {len(final_data)} data file(s)"
        yield chatbot_history, final_images or None, final_data or None, None, None, status
    except Exception as e:  # UI boundary: surface the traceback in the chat
        error_msg = f"❌ Error: {str(e)}\n\n```\n{traceback.format_exc()}\n```"
        chatbot_history.append(_assistant_message(error_msg))
        yield chatbot_history, None, None, None, None, "❌ Error occurred"
def validate_passcode(passcode):
    """Check the passcode and, on success, create the global agent.

    Args:
        passcode: Text entered by the user.

    Returns:
        A 3-tuple ``(passcode_section_update, main_interface_update, status)``.
        On success the passcode section is hidden and the main interface is
        shown; on a wrong passcode or an agent construction error the
        visibility is left as "locked" and ``status`` explains why.

    Access is always denied when no passcode is configured (``PASSCODE`` is
    ``None`` or empty), so an unset GRADIO_PASSWORD can never be matched by an
    empty or null submission.
    """
    global agent
    # Function-scope import keeps this block self-contained.
    import hmac

    locked = (gr.update(visible=True), gr.update(visible=False))

    supplied = passcode if isinstance(passcode, str) else ""
    # compare_digest gives a constant-time comparison; encode first because it
    # rejects non-ASCII str arguments.
    authorised = bool(PASSCODE) and hmac.compare_digest(
        supplied.encode("utf-8"), PASSCODE.encode("utf-8")
    )
    if not authorised:
        return (*locked, "❌ Invalid passcode. Please try again.")

    # Initialize agent
    try:
        agent = A1()
    except Exception as e:
        error_trace = traceback.format_exc()
        return (*locked, f"❌ Error initializing agent:\n{str(e)}\n\n{error_trace}")

    return (
        gr.update(visible=False),  # Hide passcode section
        gr.update(visible=True),   # Show main interface
        "✅ Access granted! Agent initialized and ready.",
    )
def clear_chat():
    """Reset the conversation and remove generated/uploaded files.

    Side effects:
        * ``./output`` is deleted recursively (if present) and recreated empty.
        * Regular files directly inside ``./data`` are deleted; subdirectories
          are left untouched.

    Returns:
        ``([], None, None, None, None, status)`` - an empty chat history, four
        cleared outputs and a status string.
    """
    # Clean up output directory
    output_dir = Path("./output")
    if output_dir.exists():
        shutil.rmtree(output_dir)
    output_dir.mkdir(exist_ok=True)

    # Clean up data directory
    data_dir = Path("./data")
    if data_dir.exists():
        for file in data_dir.iterdir():
            if file.is_file():
                file.unlink()

    return [], None, None, None, None, "🗑️ Chat cleared"
# Theme for the Gradio interface: the Soft preset in blue/slate with compact
# spacing, plus a few explicit overrides for buttons and block labels/titles.
_THEME_OVERRIDES = {
    "button_primary_background_fill": "*primary_500",
    "button_primary_background_fill_hover": "*primary_600",
    "block_label_text_weight": "600",
    "block_title_text_weight": "600",
}
custom_theme = gr.themes.Soft(
    primary_hue="blue",
    secondary_hue="slate",
    spacing_size="sm",
    radius_size="md",
).set(**_THEME_OVERRIDES)
# Top-level UI definition. Layout: a header, a passcode gate, and (once
# unlocked) a two-column main interface - chat and inputs on the left, tabs
# with the input preview and generated images/data on the right.
with gr.Blocks(title="HistoPath Agent", theme=custom_theme, css="""
    .gradio-container {
        max-width: 100% !important;
    }
    .main-header {
        text-align: center;
        padding: 1.5rem 0;
        background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
        color: white;
        border-radius: 8px;
        margin-bottom: 1.5rem;
    }
    .main-header h1 {
        margin: 0;
        font-size: 2.2rem;
        font-weight: 700;
    }
    .main-header p {
        margin: 0.5rem 0 0 0;
        opacity: 0.95;
        font-size: 1.1rem;
    }
    .file-upload-box .wrap {
        min-width: 0 !important;
    }
    .file-upload-box .file-name {
        word-break: break-word !important;
        white-space: normal !important;
        overflow-wrap: break-word !important;
    }
    .tab-nav {
        margin-bottom: 0.5rem;
    }
    /* Better styling for code and observation blocks */
    .message.bot pre {
        background-color: #f6f8fa !important;
        border: 1px solid #d0d7de !important;
        border-radius: 6px !important;
        padding: 12px !important;
        margin: 8px 0 !important;
    }
    .message.bot h3 {
        margin-top: 12px !important;
        margin-bottom: 8px !important;
        font-weight: 600 !important;
    }
    .message.bot hr {
        border: none !important;
        border-top: 2px solid #e1e4e8 !important;
        margin: 16px 0 !important;
    }
""") as demo:
    # Header - the wrapper class and h1/p elements are what the
    # `.main-header` CSS rules above target.
    gr.HTML("""
        <div class="main-header">
            <h1>🔬 HistoPath Agent</h1>
            <p>AI-Powered Histopathology Analysis Assistant</p>
        </div>
    """)

    # Passcode section
    with gr.Group(visible=True) as passcode_section:
        gr.Markdown("### 🔐 Authentication Required")
        with gr.Row():
            passcode_input = gr.Textbox(
                label="Passcode",
                type="password",
                placeholder="Enter your passcode...",
                scale=3
            )
            passcode_btn = gr.Button("🔓 Unlock", variant="primary", scale=1, size="lg")
        passcode_status = gr.Textbox(
            label="Status",
            interactive=False,
            lines=2
        )

    # Main interface (hidden until validate_passcode reveals it)
    with gr.Group(visible=False) as main_interface:
        with gr.Row(equal_height=True):
            # Left column - Chat interface
            with gr.Column(scale=3):
                # NOTE(review): the bot avatar is given as an emoji string -
                # confirm the installed Gradio version accepts that.
                chatbot = gr.Chatbot(
                    label="💬 Conversation",
                    height=550,
                    type="messages",
                    show_label=True,
                    avatar_images=(None, "🤖"),
                    render_markdown=True,
                )

                # Input area
                with gr.Row():
                    with gr.Column(scale=7):
                        prompt_input = gr.Textbox(
                            label="Your Query",
                            placeholder="E.g., 'Caption the uploaded whole slide image' or 'Segment cells using instanseg model'",
                            lines=2,
                            max_lines=5,
                            show_label=False,
                        )
                    with gr.Column(scale=3):
                        file_upload = gr.File(
                            label="📎 Upload File",
                            file_types=[".svs", ".png", ".jpg", ".jpeg", ".tif", ".tiff", ".csv", ".txt", ".json", ".npy"],
                            height=75,
                            elem_classes="file-upload-box",
                        )

                with gr.Row():
                    submit_btn = gr.Button("🚀 Submit", variant="primary", scale=3, size="lg")
                    clear_btn = gr.Button("🗑️ Clear", scale=1, size="lg", variant="secondary")

                status_text = gr.Textbox(
                    label="Status",
                    interactive=False,
                    value="Ready",
                    show_label=False,
                    container=False,
                )

            # Right column - Outputs
            with gr.Column(scale=2):
                with gr.Tabs():
                    with gr.Tab("📥 Input"):
                        with gr.Column():
                            input_image_preview = gr.Image(
                                label="Input Image",
                                height=400,
                                show_label=False,
                                container=True,
                            )
                            input_file_preview = gr.File(
                                label="Input File",
                                interactive=False,
                                height=100,
                                show_label=False,
                                container=True,
                            )
                            input_status = gr.Textbox(
                                value="Upload a file to preview",
                                show_label=False,
                                interactive=False,
                                container=False,
                            )
                    with gr.Tab("🖼️ Images"):
                        output_gallery = gr.Gallery(
                            label="Generated Visualizations",
                            columns=1,
                            height=600,
                            object_fit="contain",
                            show_label=False,
                            show_download_button=True,
                        )
                    with gr.Tab("📄 Data"):
                        data_files = gr.File(
                            label="Generated Data Files",
                            file_count="multiple",
                            interactive=False,
                            height=600,
                            show_label=False,
                        )

    # Event handlers
    passcode_btn.click(
        fn=validate_passcode,
        inputs=[passcode_input],
        outputs=[passcode_section, main_interface, passcode_status]
    )

    # File upload preview
    file_upload.change(
        fn=preview_uploaded_file,
        inputs=[file_upload],
        outputs=[input_image_preview, input_file_preview, input_status]
    )

    # Components refreshed by both the agent run and the clear action.
    run_outputs = [chatbot, output_gallery, data_files, input_image_preview, input_file_preview, status_text]

    # The Submit button and the Enter key trigger the same agent run.
    run_wiring = dict(
        fn=process_agent_response,
        inputs=[prompt_input, file_upload, chatbot],
        outputs=run_outputs,
    )
    submit_btn.click(**run_wiring)
    prompt_input.submit(**run_wiring)

    clear_btn.click(
        fn=clear_chat,
        outputs=run_outputs
    )
if __name__ == "__main__":
    # Create necessary directories
    Path("./data").mkdir(exist_ok=True)
    Path("./output").mkdir(exist_ok=True)

    print("=" * 60)
    print("🔬 HistoPath Agent - Gradio Interface")
    print("=" * 60)
    # Report only whether a passcode is configured; never echo the secret
    # itself, since stdout typically ends up in logs.
    if PASSCODE:
        print("Passcode: configured (GRADIO_PASSWORD)")
    else:
        print("Passcode: NOT configured - set GRADIO_PASSWORD to enable access")
    print("Starting server...")
    print("=" * 60)

    # Launch the app, listening on all network interfaces.
    demo.launch(
        server_name="0.0.0.0",
        server_port=None,  # Let Gradio auto-pick an available port
        share=False,
        show_error=True,
    )