# web_app.py — FastAPI + Gradio web application for the Legal Deed Review system.
"""
FastAPI web application with Gradio interface for Legal Deed Review.
"""
import asyncio
import json
import logging
import os
import re
import tempfile
from pathlib import Path
from typing import Dict, Any, Optional, List

import gradio as gr
from dotenv import load_dotenv
from fastapi import FastAPI, File, UploadFile, HTTPException

# Import the functions from main.py
from main import (
    extract_text_from_deed_pdf,
    split_deed_into_clauses,
    classify_deed_type,
    analyze_deed_risks,
    generate_comprehensive_deed_report
)
load_dotenv()
app = FastAPI(
title="Legal Deed Review System",
description="Upload PDF deed documents for comprehensive legal risk analysis"
)
# Legal disclaimers
LEGAL_DISCLAIMER = """
⚖️ **LEGAL DISCLAIMER**
**This is an automated analysis tool for informational purposes only.**
- ❌ This does NOT constitute legal advice
- ❌ This does NOT replace consultation with a qualified attorney
- ❌ This analysis may NOT identify all potential legal issues
- ✅ Always have deeds reviewed by a licensed attorney before taking action
- ✅ Consult local legal professionals familiar with your jurisdiction
**By using this tool, you acknowledge these limitations.**
"""
RISK_LEVEL_COLORS = {
"LOW": "🟢",
"MEDIUM": "🟡",
"HIGH": "🔴"
}
async def process_deed_pdf(pdf_file) -> Dict[str, Any]:
    """Process a PDF deed file and return the raw report data.

    Accepts the several shapes Gradio may hand us (file-like object, path
    string, or raw bytes), writes the content to a temporary file, runs the
    full analysis pipeline on it, and returns the parsed report dict.
    On any failure, returns ``{"error": <message>}`` instead of raising.

    Fix over the original: the temporary file is now always removed via
    ``finally`` — previously it leaked when the analysis raised, or when the
    "unsupported format" branch returned early.
    """
    if pdf_file is None:
        return {"error": "Please upload a PDF deed document first."}
    tmp_path = None
    try:
        # Handle different Gradio file input formats by normalizing to a temp file.
        with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as tmp_file:
            tmp_path = tmp_file.name
            if hasattr(pdf_file, 'read'):
                # File-like object
                tmp_file.write(pdf_file.read())
            elif isinstance(pdf_file, str):
                # File path string
                with open(pdf_file, 'rb') as f:
                    tmp_file.write(f.read())
            elif isinstance(pdf_file, bytes):
                # Raw bytes
                tmp_file.write(pdf_file)
            else:
                return {"error": f"Unsupported file format: {type(pdf_file)}"}
        # Generate comprehensive report (returns a JSON string).
        report_result = await generate_comprehensive_deed_report(tmp_path)
        report_data = json.loads(report_result)
        if not report_data.get("success"):
            return {"error": f"Analysis failed: {report_data.get('error', 'Unknown error')}"}
        return report_data
    except Exception as e:
        return {"error": f"Error processing deed: {str(e)}"}
    finally:
        # Always clean up the temp file, whatever path we exited through.
        if tmp_path and os.path.exists(tmp_path):
            os.unlink(tmp_path)
def format_classification_display(classification_data: Dict) -> str:
    """Render a deed classification result as a markdown summary string.

    Accepts the classification payload as a dict, a nested dict under a
    "classification" key, or stringified JSON; falls back to the raw LLM
    analysis text when that is all we have.
    """
    if not classification_data or not classification_data.get("success"):
        return "❌ Classification failed"
    payload = classification_data.get("classification", {})
    if isinstance(payload, str):
        try:
            payload = json.loads(payload)
        except Exception:
            payload = {}
    # LLM responses sometimes nest the result one level deeper.
    if "classification" in payload:
        payload = payload["classification"]
    if "raw_analysis" in payload:
        return f"📋 **Deed Classification (LLM):**\n\n{payload['raw_analysis']}"

    lines = []

    deed_type = payload.get("deed_type") or payload.get("type")
    if deed_type:
        lines.append(f"• **Deed Type:** {deed_type}")

    jurisdiction = payload.get("jurisdiction") or payload.get("jurisdiction_hint")
    if jurisdiction:
        if isinstance(jurisdiction, dict):
            # Prefer "country, state"; fall back to country alone, then raw JSON.
            country = jurisdiction.get("country", "")
            state = jurisdiction.get("state_province", "") or jurisdiction.get("state", "")
            if country and state:
                rendered = f"{country}, {state}"
            elif country:
                rendered = country
            else:
                rendered = json.dumps(jurisdiction, indent=2)
            lines.append(f"• **Jurisdiction:** {rendered}")
        else:
            lines.append(f"• **Jurisdiction:** {jurisdiction}")

    parties = payload.get("key_parties") or payload.get("parties")
    if parties:
        if isinstance(parties, dict):
            entries = []
            for role, info in parties.items():
                # Use the "name" field when the party is a structured record.
                name = info["name"] if isinstance(info, dict) and "name" in info else info
                entries.append(f" - {role.title()}: {name}")
            if entries:
                lines.append("• **Parties:**\n" + "\n".join(entries))
            else:
                lines.append(f"• **Parties:** {json.dumps(parties, indent=2)}")
        else:
            lines.append(f"• **Parties:** {parties}")

    property_desc = (
        payload.get("property_description_and_location")
        or payload.get("property_description")
        or payload.get("property")
    )
    if property_desc:
        if isinstance(property_desc, dict):
            # Keep only non-blank fields, rendered as "Key: value" bullets.
            prop_entries = [
                f" - {key.replace('_', ' ').title()}: {value}"
                for key, value in property_desc.items()
                if value and str(value).strip()
            ]
            if prop_entries:
                lines.append("• **Property:**\n" + "\n".join(prop_entries))
            else:
                lines.append(f"• **Property:** {json.dumps(property_desc, indent=2)}")
        else:
            lines.append(f"• **Property:** {property_desc}")

    consideration = payload.get("consideration_amount") or payload.get("consideration")
    if consideration:
        lines.append(f"• **Consideration:** {consideration}")

    special = payload.get("special_conditions_or_restrictions")
    if special:
        if isinstance(special, (dict, list)):
            lines.append(f"• **Special Conditions:** {json.dumps(special, indent=2)}")
        else:
            lines.append(f"• **Special Conditions:** {special}")

    return "📋 **Deed Classification:**\n\n" + "\n".join(lines)
def _safe_preview(text: str, limit: int = 200) -> str:
return text[:limit] + ("..." if len(text) > limit else "")
def format_clause_table(clause_data: Dict) -> List[List[Any]]:
    """Prepare clause breakdown rows for a Gradio DataFrame.

    Each row is ``[id, type, word_count, text]`` matching the table headers
    ["id", "type", "words", "preview"].  Returns an empty list when the
    clause-splitting step is missing or failed.

    Fix over the original: noisy emoji ``print`` debugging replaced with
    ``logging.debug`` so production stdout stays clean.
    """
    logger = logging.getLogger(__name__)
    if not clause_data or not clause_data.get("success"):
        logger.debug("format_clause_table: no usable clause data: %r", clause_data)
        return []
    rows: List[List[Any]] = []
    for i, clause in enumerate(clause_data.get("clauses", [])):
        rows.append([
            clause.get("id", f"clause_{i+1}"),
            clause.get("type", "General"),
            clause.get("word_count", 0),
            clause.get("text", ""),  # full text, no truncation — the UI wraps it
        ])
    logger.debug("format_clause_table produced %d rows", len(rows))
    return rows
def _flatten_json(data: Any, parent_key: str = "") -> List[tuple]:
"""Flatten nested JSON into dotted keys."""
items: List[tuple] = []
if isinstance(data, dict):
for k, v in data.items():
new_key = f"{parent_key}.{k}" if parent_key else k
items.extend(_flatten_json(v, new_key))
elif isinstance(data, list):
for idx, v in enumerate(data):
new_key = f"{parent_key}[{idx}]" if parent_key else f"[{idx}]"
items.extend(_flatten_json(v, new_key))
else:
items.append((parent_key, data))
return items
def format_classification_table(classification_data: Dict) -> List[List[Any]]:
    """Prepare deed classification as table rows with readable nested formatting.

    Returns ``[field_name, pretty_value]`` rows for a Gradio DataFrame.
    Accepts a plain dict, a dict nested under a "classification" key, or a
    stringified (possibly code-fenced) JSON payload.

    Fix over the original: emoji ``print`` debugging replaced with
    ``logging.debug``; parsing logic is unchanged.
    """
    logger = logging.getLogger(__name__)
    if not classification_data:
        return []

    def _clean_json_response(response: str) -> str:
        """Strip markdown code fences and isolate the outermost JSON object."""
        cleaned = response.strip()
        if cleaned.startswith("```"):
            lines = cleaned.split("\n")
            lines = lines[1:]  # drop the opening fence line
            if lines and lines[-1].strip() == "```":
                lines = lines[:-1]  # drop the closing fence line
            cleaned = "\n".join(lines).strip()
        # Keep only the outermost {...} span, if one exists.
        start_idx = cleaned.find("{")
        end_idx = cleaned.rfind("}") + 1
        if start_idx != -1 and end_idx > start_idx:
            cleaned = cleaned[start_idx:end_idx]
        return cleaned.strip()

    # Unwrap the optional top-level "classification" envelope.
    if "classification" in classification_data:
        classification = classification_data.get("classification", {})
    else:
        classification = classification_data

    # Handle stringified JSON (including problematic / fenced formats).
    if isinstance(classification, str):
        try:
            classification = json.loads(_clean_json_response(classification))
            logger.debug("Parsed classification JSON from string: %s", type(classification))
        except Exception as e:
            logger.debug("Classification JSON parsing failed: %s", e)
            # Long strings (or ones mentioning raw_analysis) are treated as
            # free-form analysis text rather than structured data.
            if "raw_analysis" in classification or len(classification) > 500:
                classification = {"raw_analysis": classification}
            else:
                # Best-effort keyword extraction from the plain string.
                classification = _extract_basic_info_from_string(classification)

    # Unwrap a second nested "classification" level if the LLM produced one.
    if isinstance(classification, dict) and "classification" in classification:
        classification = classification["classification"]
    if not isinstance(classification, dict):
        classification = {"value": str(classification)}

    def _pretty(value: Any, indent: int = 0) -> str:
        """Render nested dicts/lists as an indented, human-readable string."""
        pad = " " * indent
        if isinstance(value, dict):
            lines = []
            for k, v in value.items():
                if isinstance(v, (dict, list)) and v:  # Only show non-empty nested items
                    lines.append(f"{pad}{k.replace('_', ' ').title()}:")
                    lines.append(_pretty(v, indent + 1))
                elif not isinstance(v, (dict, list)):  # Show simple values
                    display_val = str(v) if v not in [None, "", "N/A"] else "N/A"
                    lines.append(f"{pad}{k.replace('_', ' ').title()}: {display_val}")
            return "\n".join(lines)
        elif isinstance(value, list):
            if not value:  # Empty list
                return f"{pad}(None)"
            lines = []
            for idx, v in enumerate(value, 1):
                if isinstance(v, (dict, list)):
                    lines.append(f"{pad}{idx}.")
                    lines.append(_pretty(v, indent + 1))
                else:
                    lines.append(f"{pad}{idx}. {v}")
            return "\n".join(lines)
        return f"{pad}{value}"

    rows: List[List[Any]] = []
    for key, value in classification.items():
        rows.append([key.replace('_', ' ').title(), _pretty(value)])
    logger.debug("format_classification_table produced %d rows", len(rows))
    return rows
def _extract_basic_info_from_string(text: str) -> Dict[str, str]:
"""Extract basic info from problematic string responses."""
# Basic fallback - try to find key information
result = {}
# Try to extract deed type
if any(word in text.lower() for word in ["sale", "purchase", "buy"]):
result["deed_type"] = "sale"
elif "mortgage" in text.lower():
result["deed_type"] = "mortgage"
elif "lease" in text.lower():
result["deed_type"] = "lease"
else:
result["deed_type"] = "unknown"
# Try to find jurisdiction
if "bangladesh" in text.lower():
result["jurisdiction"] = "Bangladesh"
# If we can't extract much, show the raw text
if len(result) < 2:
result["raw_analysis"] = text[:500] + "..." if len(text) > 500 else text
return result
def _extract_risk_level(risk_text: str) -> str:
"""Extract risk level from risk analysis text using multiple patterns."""
print(f"📊 DEBUG _extract_risk_level input: {risk_text[:200]}...")
if not risk_text or not isinstance(risk_text, str):
print(f"📊 DEBUG Invalid input, returning UNKNOWN")
return "UNKNOWN"
text_upper = risk_text.upper()
# Try different patterns for risk levels
risk_patterns = [
# Direct matches
(r'\bRISK\s+LEVEL[:\s]+HIGH\b', 'HIGH'),
(r'\bRISK\s+LEVEL[:\s]+MEDIUM\b', 'MEDIUM'),
(r'\bRISK\s+LEVEL[:\s]+LOW\b', 'LOW'),
# Simple matches
(r'\bHIGH\s+RISK\b', 'HIGH'),
(r'\bMEDIUM\s+RISK\b', 'MEDIUM'),
(r'\bLOW\s+RISK\b', 'LOW'),
# Pattern: "Risk: HIGH" or "Risk Level: MEDIUM"
(r'\bRISK[:\s]+HIGH\b', 'HIGH'),
(r'\bRISK[:\s]+MEDIUM\b', 'MEDIUM'),
(r'\bRISK[:\s]+LOW\b', 'LOW'),
# Standalone mentions (fallback)
(r'\bHIGH\b', 'HIGH'),
(r'\bMEDIUM\b', 'MEDIUM'),
(r'\bLOW\b', 'LOW'),
]
import re
for pattern, level in risk_patterns:
if re.search(pattern, text_upper):
print(f"📊 DEBUG Found risk level '{level}' using pattern: {pattern}")
return level
print(f"📊 DEBUG No risk level found in text, returning UNKNOWN")
return "UNKNOWN"
def format_risk_table(risk_data: Dict) -> List[List[Any]]:
    """Prepare per-clause risk rows for a Gradio DataFrame.

    Each row is ``[clause_id, clause_type, risk_level, summary]`` matching
    the table headers.  Returns an empty list when the risk-analysis step is
    missing or failed.

    Fix over the original: noisy emoji ``print`` debugging replaced with
    ``logging.debug``.
    """
    logger = logging.getLogger(__name__)
    if not risk_data or not risk_data.get("success"):
        logger.debug("format_risk_table: no usable risk data: %r", risk_data)
        return []
    rows: List[List[Any]] = []
    for i, risk in enumerate(risk_data.get("clause_risks", [])):
        analysis = risk.get("risk_analysis", "")
        rows.append([
            risk.get("clause_id", f"clause_{i+1}"),
            risk.get("clause_type", "General"),
            _extract_risk_level(analysis),  # parse the level out of the free text
            analysis,  # full analysis text, no truncation — the UI wraps it
        ])
    logger.debug("format_risk_table produced %d rows", len(rows))
    return rows
def format_risk_overview(risk_data: Dict) -> str:
    """Format the overall risk summary (plus consistency warnings) as markdown.

    Fix over the original: guard against ``risk_data`` being ``None``, which
    previously raised ``AttributeError`` on ``.get``.
    """
    if not risk_data or not risk_data.get("success"):
        return "❌ Risk analysis failed"
    overall_summary = risk_data.get("overall_summary", "")
    disclaimer = risk_data.get("disclaimer", "")
    clause_risks = risk_data.get("clause_risks", [])
    # Cross-check the overall level against the per-clause levels.
    validation_warnings = _validate_risk_consistency(overall_summary, clause_risks)
    parts = ["⚠️ **Risk Analysis Overview:**"]
    if validation_warnings:
        parts.append("🔍 **Validation Notices:**")
        parts.extend(f"• {warning}" for warning in validation_warnings)
        parts.append("---")
    if overall_summary:
        parts.append(overall_summary)
    if disclaimer:
        parts.append(f"\n_{disclaimer}_")
    return "\n\n".join(parts)
def _validate_risk_consistency(overall_summary: str, clause_risks: List[Dict]) -> List[str]:
"""Validate consistency between overall risk summary and individual clause risks."""
warnings = []
if not overall_summary or not clause_risks:
return warnings
# Extract overall risk level from summary
overall_risk_level = "UNKNOWN"
overall_upper = overall_summary.upper()
for level in ["HIGH", "MEDIUM", "LOW"]:
if f"OVERALL RISK LEVEL: {level}" in overall_upper or f"RISK LEVEL: {level}" in overall_upper:
overall_risk_level = level
break
# If not found in structured format, try fallback patterns
if overall_risk_level == "UNKNOWN":
overall_risk_patterns = [
(r'OVERALL.*RISK.*LEVEL.*HIGH', 'HIGH'),
(r'OVERALL.*RISK.*LEVEL.*MEDIUM', 'MEDIUM'),
(r'OVERALL.*RISK.*LEVEL.*LOW', 'LOW'),
(r'RISK.*LEVEL.*HIGH', 'HIGH'),
(r'RISK.*LEVEL.*MEDIUM', 'MEDIUM'),
(r'RISK.*LEVEL.*LOW', 'LOW'),
]
for pattern, level in overall_risk_patterns:
if re.search(pattern, overall_upper):
overall_risk_level = level
break
# Extract individual clause risk levels
individual_levels = []
for clause_risk in clause_risks:
analysis = clause_risk.get("risk_analysis", "")
extracted_level = _extract_risk_level(analysis)
if extracted_level != "UNKNOWN":
individual_levels.append(extracted_level)
print(f"📊 DEBUG Risk Validation - Overall: {overall_risk_level}, Individual: {individual_levels}")
# Check for consistency
if overall_risk_level != "UNKNOWN" and individual_levels:
# Check if overall level makes sense given individual levels
has_high = "HIGH" in individual_levels
has_medium = "MEDIUM" in individual_levels
if overall_risk_level == "LOW" and has_high:
warnings.append(f"Overall risk shows {overall_risk_level} but found HIGH risk clauses")
elif overall_risk_level == "LOW" and has_medium:
warnings.append(f"Overall risk shows {overall_risk_level} but found MEDIUM risk clauses")
elif overall_risk_level == "HIGH" and not has_high and not has_medium:
warnings.append(f"Overall risk shows {overall_risk_level} but no HIGH or MEDIUM risk clauses found")
# Check for missing individual risk classifications
unknown_count = sum(1 for clause_risk in clause_risks if _extract_risk_level(clause_risk.get("risk_analysis", "")) == "UNKNOWN")
if unknown_count > 0:
warnings.append(f"{unknown_count} clause(s) have unclear risk levels")
return warnings
def extract_metadata_fields(classification_data: Dict) -> Dict[str, str]:
    """Pull key metadata fields for sidebar display.

    Accepts the classification result in any of its observed shapes (plain
    dict, doubly-nested dict, stringified JSON, or raw analysis text) and
    returns a flat dict with string values under the keys: ``deed_type``,
    ``jurisdiction``, ``parties``, ``property``, ``consideration``.
    Missing values are rendered as "N/A".
    """
    # Debug print to see what we're getting
    print(f"🔍 DEBUG extract_metadata_fields input: {classification_data}")
    # The classification data comes nested under "classification" key
    classification = classification_data.get("classification", {}) if classification_data else {}
    # If the classification is stored as a string (JSON), parse it using the same logic as the table
    if isinstance(classification, str):
        try:
            # Use the same JSON cleaning logic as format_classification_table
            cleaned = _clean_json_response_metadata(classification)
            classification = json.loads(cleaned)
            print(f"🔍 DEBUG extract_metadata_fields: Successfully parsed JSON from string")
        except Exception as e:
            print(f"🔍 DEBUG extract_metadata_fields: JSON parsing failed: {e}")
            # Try to extract from raw text
            return _extract_from_raw_text(classification)
    # If classification is still nested under another "classification" key (from LLM response)
    if isinstance(classification, dict) and "classification" in classification:
        classification = classification["classification"]
    # Handle the case where JSON parsing failed and we only have raw_analysis
    if isinstance(classification, dict) and "raw_analysis" in classification and len(classification) == 1:
        # Try to extract metadata from the raw text analysis
        raw_text = classification["raw_analysis"]
        print(f"🔍 DEBUG: Extracting from raw_analysis: {raw_text[:200]}...")
        return _extract_from_raw_text(raw_text)
    # Extract deed type (either "deed_type" or legacy "type" key)
    deed_type = classification.get("deed_type") or classification.get("type") or "N/A"
    # Extract jurisdiction - handle both dict and string formats
    jurisdiction_value = classification.get("jurisdiction") or classification.get("jurisdiction_hint") or "N/A"
    if isinstance(jurisdiction_value, dict):
        # Format nested jurisdiction nicely: "country, state" > country > raw JSON
        country = jurisdiction_value.get("country", "")
        state = jurisdiction_value.get("state_province", "") or jurisdiction_value.get("state", "")
        if country and state:
            jurisdiction = f"{country}, {state}"
        elif country:
            jurisdiction = country
        else:
            jurisdiction = json.dumps(jurisdiction_value, indent=2)
    elif isinstance(jurisdiction_value, list):
        jurisdiction = json.dumps(jurisdiction_value, indent=2)
    else:
        jurisdiction = str(jurisdiction_value)
    # Extract parties information (only grantor/grantee roles are surfaced)
    parties = classification.get("key_parties") or classification.get("parties") or {}
    if isinstance(parties, dict) and parties:
        # Format parties information nicely
        parts = []
        if "grantor" in parties:
            grantor = parties["grantor"]
            if isinstance(grantor, dict):
                name = grantor.get("name", "")
                if name:
                    parts.append(f"Grantor: {name}")
        if "grantee" in parties:
            grantee = parties["grantee"]
            if isinstance(grantee, dict):
                name = grantee.get("name", "")
                if name:
                    parts.append(f"Grantee: {name}")
        parties_str = "\n".join(parts) if parts else json.dumps(parties, indent=2)
    else:
        parties_str = "N/A"
    # Extract property description (several key spellings are accepted)
    property_desc = (
        classification.get("property_description_and_location")
        or classification.get("property_description")
        or classification.get("property")
        or "N/A"
    )
    if isinstance(property_desc, dict):
        # Format property info nicely (upazila/mouza keys suggest Bangladeshi
        # land records — TODO confirm against the classifier's schema)
        parts = []
        if "district" in property_desc:
            parts.append(f"District: {property_desc['district']}")
        if "upazila_thana" in property_desc:
            parts.append(f"Area: {property_desc['upazila_thana']}")
        if "mouza" in property_desc:
            parts.append(f"Mouza: {property_desc['mouza']}")
        if "area" in property_desc:
            parts.append(f"Size: {property_desc['area']}")
        property_str = "\n".join(parts) if parts else json.dumps(property_desc, indent=2)
    elif isinstance(property_desc, list):
        property_str = json.dumps(property_desc, indent=2)
    else:
        property_str = str(property_desc)
    # Extract consideration/price
    consideration = classification.get("consideration_amount") or classification.get("consideration") or "N/A"
    result = {
        "deed_type": deed_type,
        "jurisdiction": jurisdiction,
        "parties": parties_str,
        "property": property_str,
        "consideration": str(consideration),
    }
    print(f"🔍 DEBUG extract_metadata_fields result: {result}")
    return result
def _extract_from_raw_text(raw_text: str) -> Dict[str, str]:
"""Extract metadata from raw text analysis when JSON parsing fails."""
print(f"🔍 DEBUG: Attempting to extract from raw text: {raw_text[:300]}...")
# Initialize with default values
result = {
"deed_type": "N/A",
"jurisdiction": "N/A",
"parties": "N/A",
"property": "N/A",
"consideration": "N/A"
}
# Try to extract deed type
deed_type_patterns = [
r"deed\s+type[:\-\s]+([\w\s]+?)(?:\n|$|;|,)",
r"type\s+of\s+deed[:\-\s]+([\w\s]+?)(?:\n|$|;|,)",
r"this\s+is\s+a[n]?\s+([\w\s]+?)\s+deed",
r"(sale|mortgage|lease|gift|warranty|quitclaim)\s+deed"
]
for pattern in deed_type_patterns:
match = re.search(pattern, raw_text, re.IGNORECASE)
if match:
result["deed_type"] = match.group(1).strip().title()
break
# Try to extract jurisdiction
jurisdiction_patterns = [
r"jurisdiction[:\-\s]+([\w\s,]+?)(?:\n|$)",
r"state[:\-\s]+([\w\s,]+?)(?:\n|$)",
r"country[:\-\s]+([\w\s,]+?)(?:\n|$)",
r"location[:\-\s]+([\w\s,]+?)(?:\n|$)"
]
for pattern in jurisdiction_patterns:
match = re.search(pattern, raw_text, re.IGNORECASE)
if match:
result["jurisdiction"] = match.group(1).strip()
break
# Try to extract parties
parties_patterns = [
r"grantor[:\-\s]+([\w\s,]+?)(?:,\s*resident|$|\n)",
r"grantee[:\-\s]+([\w\s,]+?)(?:,\s*resident|$|\n)",
r"seller[:\-\s]+([\w\s,]+?)(?:,\s*resident|$|\n)",
r"buyer[:\-\s]+([\w\s,]+?)(?:,\s*resident|$|\n)"
]
parties_found = []
for pattern in parties_patterns:
matches = re.finditer(pattern, raw_text, re.IGNORECASE)
for match in matches:
# Extract role and name
full_match = match.group(0).strip()
name = match.group(1).strip()
role = full_match.split(':')[0].strip().title()
party_info = f"{role}: {name}"
if party_info not in parties_found and name:
parties_found.append(party_info)
if parties_found:
result["parties"] = "\n".join(parties_found)
# Try to extract property info
property_patterns = [
r"property[:\-\s]+([\w\s,]+?)(?:\n|$)",
r"district[:\-\s]+([\w\s]+?)(?:\n|$)",
r"area[:\-\s]+([\d\.\s\w]+?)(?:\n|$)"
]
property_found = []
for pattern in property_patterns:
matches = re.finditer(pattern, raw_text, re.IGNORECASE)
for match in matches:
prop_info = match.group(0).strip()
if prop_info not in property_found:
property_found.append(prop_info)
if property_found:
result["property"] = "\n".join(property_found)
# Try to extract consideration/amount
consideration_patterns = [
r"consideration[:\-\s]+([\d,\.\s\w]+?)(?:\n|$)",
r"amount[:\-\s]+([\d,\.\s\w]+?)(?:\n|$)",
r"price[:\-\s]+([\d,\.\s\w]+?)(?:\n|$)",
r"(\d+[,\d]*\s*(?:taka|dollars?|usd|€|£|\$))"
]
for pattern in consideration_patterns:
match = re.search(pattern, raw_text, re.IGNORECASE)
if match:
result["consideration"] = match.group(1).strip()
break
print(f"🔍 DEBUG: Extracted from raw text: {result}")
return result
def _clean_json_response_metadata(response: str) -> str:
"""Clean JSON response for metadata extraction (same as main.py logic)."""
cleaned = response.strip()
# Remove code fences if present
if cleaned.startswith("```"):
lines = cleaned.split("\n")
lines = lines[1:] # Remove first line
if lines and lines[-1].strip() == "```":
lines = lines[:-1] # Remove last line
cleaned = "\n".join(lines).strip()
# Look for JSON object boundaries
start_idx = cleaned.find("{")
end_idx = cleaned.rfind("}") + 1
if start_idx != -1 and end_idx > start_idx:
cleaned = cleaned[start_idx:end_idx]
return cleaned.strip()
def build_report_pdf(report: Dict[str, Any]) -> str:
    """Create a professional PDF report for download.

    Renders the analysis report (classification table, risk overview, clause
    breakdown, per-clause risk detail) into a temporary PDF via ReportLab and
    returns the path to that file.  The temp file is created with
    ``delete=False``, so the caller is responsible for removing it.
    """
    # ReportLab (third-party) is imported lazily so module import does not
    # require it unless PDF export is actually used.
    from reportlab.lib.pagesizes import letter
    from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
    from reportlab.lib.units import inch
    from reportlab.lib.enums import TA_LEFT, TA_CENTER
    from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle, PageBreak
    from reportlab.lib import colors
    # Create temporary PDF file
    tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".pdf")
    pdf_path = tmp.name
    tmp.close()
    # Create PDF document
    doc = SimpleDocTemplate(pdf_path, pagesize=letter,
                            rightMargin=72, leftMargin=72,
                            topMargin=72, bottomMargin=18)
    # Container for the 'Flowable' objects
    elements = []
    # Define styles
    styles = getSampleStyleSheet()
    title_style = ParagraphStyle(
        'CustomTitle',
        parent=styles['Heading1'],
        fontSize=24,
        textColor=colors.HexColor('#1f2937'),
        spaceAfter=30,
        alignment=TA_CENTER
    )
    heading_style = ParagraphStyle(
        'CustomHeading',
        parent=styles['Heading2'],
        fontSize=16,
        textColor=colors.HexColor('#374151'),
        spaceAfter=12,
        spaceBefore=12
    )
    normal_style = styles['BodyText']
    # Title
    elements.append(Paragraph("Legal Deed Analysis Report", title_style))
    elements.append(Spacer(1, 0.2*inch))
    # Disclaimer
    disclaimer_text = """
<b>LEGAL DISCLAIMER:</b> This automated analysis is for informational purposes only
and does not constitute legal advice. Always consult with a qualified attorney licensed
in your jurisdiction before making decisions based on deed documents.
"""
    elements.append(Paragraph(disclaimer_text, normal_style))
    elements.append(Spacer(1, 0.3*inch))
    # Deed Classification section: key fields rendered as a two-column table
    elements.append(Paragraph("Deed Classification", heading_style))
    classification_data = report.get("deed_classification", {})
    if classification_data.get("success"):
        classification = classification_data.get("classification", {})
        # Extract key info
        deed_type = classification.get("deed_type", "N/A")
        jurisdiction = classification.get("jurisdiction", {})
        if isinstance(jurisdiction, dict):
            jurisdiction_str = f"{jurisdiction.get('country', 'N/A')}, {jurisdiction.get('state_province', 'N/A')}"
        else:
            jurisdiction_str = str(jurisdiction)
        consideration = classification.get("consideration_amount", "N/A")
        date_exec = classification.get("date_of_execution", "N/A")
        # Create header style for bold headers
        header_style = ParagraphStyle(
            'TableHeader',
            parent=normal_style,
            fontName='Helvetica-Bold',
            fontSize=11
        )
        classification_table_data = [
            [Paragraph("Field", header_style), Paragraph("Value", header_style)],
            ["Deed Type", str(deed_type)],
            ["Jurisdiction", jurisdiction_str],
            ["Consideration", str(consideration)],
            ["Date of Execution", str(date_exec)]
        ]
        classification_table = Table(classification_table_data, colWidths=[2*inch, 4*inch])
        classification_table.setStyle(TableStyle([
            ('BACKGROUND', (0, 0), (-1, 0), colors.HexColor('#e5e7eb')),
            ('TEXTCOLOR', (0, 0), (-1, 0), colors.black),
            ('ALIGN', (0, 0), (-1, -1), 'LEFT'),
            ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
            ('FONTSIZE', (0, 0), (-1, 0), 12),
            ('BOTTOMPADDING', (0, 0), (-1, 0), 12),
            ('GRID', (0, 0), (-1, -1), 1, colors.grey),
            ('VALIGN', (0, 0), (-1, -1), 'TOP'),
        ]))
        elements.append(classification_table)
    else:
        elements.append(Paragraph("Classification data not available", normal_style))
    elements.append(Spacer(1, 0.3*inch))
    # Risk Overview - parse the structured summary text line by line
    elements.append(Paragraph("Risk Analysis Overview", heading_style))
    risk_data = report.get("risk_analysis", {})
    if risk_data.get("success"):
        overall_summary = risk_data.get("overall_summary", "No summary available")
        # Parse and format the structured risk overview
        lines = overall_summary.split('\n')
        # Create styles for different sections
        section_style = ParagraphStyle(
            'SectionHeading',
            parent=styles['Heading3'],
            fontSize=12,
            textColor=colors.HexColor('#374151'),
            spaceAfter=6,
            spaceBefore=10,
            fontName='Helvetica-Bold'
        )
        bullet_style = ParagraphStyle(
            'BulletText',
            parent=normal_style,
            fontSize=10,
            leftIndent=20,
            bulletIndent=10,
            spaceAfter=4
        )
        # Manual index loop because several branches consume extra lines
        # (bullet runs, multi-line disclaimers) before the shared `i += 1`.
        i = 0
        while i < len(lines):
            line = lines[i].strip()
            # Skip empty lines
            if not line:
                i += 1
                continue
            # Check for structured sections
            if line.startswith("OVERALL RISK LEVEL:"):
                # Extract and highlight risk level
                risk_level_text = line.replace("OVERALL RISK LEVEL:", "").strip()
                if "HIGH" in risk_level_text.upper():
                    risk_color = colors.red
                elif "MEDIUM" in risk_level_text.upper():
                    risk_color = colors.orange
                else:
                    risk_color = colors.green
                elements.append(Paragraph(
                    f"<b>OVERALL RISK LEVEL:</b> <font color='{risk_color.hexval()}'><b>{risk_level_text}</b></font>",
                    section_style
                ))
            elif line.startswith("KEY FINDINGS:"):
                elements.append(Paragraph("<b>Key Findings:</b>", section_style))
                # Collect bullet points
                i += 1
                while i < len(lines) and lines[i].strip().startswith('-'):
                    bullet_text = lines[i].strip()[1:].strip()  # Remove '-' and whitespace
                    elements.append(Paragraph(f"• {bullet_text}", bullet_style))
                    i += 1
                i -= 1  # Back up one since we'll increment at the end of loop
            elif line.startswith("RISK CATEGORIES FOUND:"):
                categories = line.replace("RISK CATEGORIES FOUND:", "").strip()
                elements.append(Paragraph(f"<b>Risk Categories Found:</b> {categories}", section_style))
            elif line.startswith("RECOMMENDATIONS:"):
                elements.append(Paragraph("<b>Recommendations:</b>", section_style))
                # Collect bullet points
                i += 1
                while i < len(lines) and lines[i].strip().startswith('-'):
                    bullet_text = lines[i].strip()[1:].strip()
                    elements.append(Paragraph(f"• {bullet_text}", bullet_style))
                    i += 1
                i -= 1
            elif line.startswith("DISCLAIMER:"):
                disclaimer_text = line.replace("DISCLAIMER:", "").strip()
                # Collect any continuation lines (until the next section header)
                full_disclaimer = [disclaimer_text]
                i += 1
                while i < len(lines) and lines[i].strip() and not any(lines[i].strip().startswith(s) for s in ["OVERALL", "KEY", "RISK", "RECOMMENDATIONS"]):
                    full_disclaimer.append(lines[i].strip())
                    i += 1
                i -= 1
                disclaimer_style = ParagraphStyle(
                    'Disclaimer',
                    parent=normal_style,
                    fontSize=9,
                    textColor=colors.HexColor('#6b7280'),
                    fontName='Helvetica-Oblique',
                    spaceAfter=10,
                    spaceBefore=10
                )
                elements.append(Paragraph(f"<b>Disclaimer:</b> {' '.join(full_disclaimer)}", disclaimer_style))
            else:
                # Regular paragraph
                if line:
                    elements.append(Paragraph(line, normal_style))
            i += 1
    else:
        elements.append(Paragraph("Risk analysis not available", normal_style))
    elements.append(Spacer(1, 0.3*inch))
    # Clause Breakdown (first 15)
    elements.append(Paragraph("Clause Breakdown (Summary)", heading_style))
    clauses = report.get("clause_breakdown", {}).get("clauses", [])
    if clauses:
        # Create small font style for table cells
        small_style = ParagraphStyle(
            'SmallText',
            parent=normal_style,
            fontSize=8,
            leading=10
        )
        clause_table_data = [[Paragraph("<b>ID</b>", normal_style),
                              Paragraph("<b>Type</b>", normal_style),
                              Paragraph("<b>Preview</b>", normal_style)]]
        for clause in clauses[:15]:
            clause_id = clause.get('id', '')
            clause_type = clause.get('type', 'General')
            clause_text = clause.get('text', '')  # NO TRUNCATION - full text
            # Use Paragraph objects for proper wrapping
            clause_table_data.append([
                Paragraph(clause_id, small_style),
                Paragraph(clause_type, small_style),
                Paragraph(clause_text, small_style)  # Full text with wrapping
            ])
        clause_table = Table(clause_table_data, colWidths=[0.6*inch, 1.2*inch, 4.2*inch])
        clause_table.setStyle(TableStyle([
            ('BACKGROUND', (0, 0), (-1, 0), colors.HexColor('#e5e7eb')),
            ('TEXTCOLOR', (0, 0), (-1, 0), colors.black),
            ('ALIGN', (0, 0), (-1, -1), 'LEFT'),
            ('VALIGN', (0, 0), (-1, -1), 'TOP'),
            ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
            ('FONTSIZE', (0, 0), (-1, 0), 10),
            ('BOTTOMPADDING', (0, 0), (-1, 0), 8),
            ('TOPPADDING', (0, 1), (-1, -1), 6),
            ('BOTTOMPADDING', (0, 1), (-1, -1), 6),
            ('GRID', (0, 0), (-1, -1), 0.5, colors.grey),
            # NOTE(review): 'WORDWRAP' does not look like a documented
            # TableStyle command — confirm against the ReportLab user guide.
            ('WORDWRAP', (0, 0), (-1, -1), True),  # Enable word wrapping
        ]))
        elements.append(clause_table)
    else:
        elements.append(Paragraph("No clauses detected", normal_style))
    elements.append(Spacer(1, 0.3*inch))
    # Page break before detailed risk analysis
    elements.append(PageBreak())
    # Detailed Risk Analysis (first 10 clauses)
    elements.append(Paragraph("Detailed Risk Analysis by Clause", heading_style))
    clause_risks = risk_data.get("clause_risks", [])
    if clause_risks:
        for idx, risk in enumerate(clause_risks[:10], 1):  # Limit to first 10
            clause_id = risk.get("clause_id", "Unknown")
            clause_type = risk.get("clause_type", "General")
            risk_analysis = risk.get("risk_analysis", "No analysis available")
            # Extract risk level (structured "RISK LEVEL: X" form only here)
            risk_level = "UNKNOWN"
            if "RISK LEVEL: HIGH" in risk_analysis.upper():
                risk_level = "HIGH"
                risk_color = colors.red
            elif "RISK LEVEL: MEDIUM" in risk_analysis.upper():
                risk_level = "MEDIUM"
                risk_color = colors.orange
            elif "RISK LEVEL: LOW" in risk_analysis.upper():
                risk_level = "LOW"
                risk_color = colors.green
            else:
                risk_color = colors.grey
            # Create risk header
            risk_header = f"<b>Clause {clause_id}</b> ({clause_type}) - <font color='{risk_color.hexval()}'>Risk: {risk_level}</font>"
            elements.append(Paragraph(risk_header, normal_style))
            elements.append(Spacer(1, 0.1*inch))
            # Add full risk analysis with proper text wrapping (no truncation)
            # Split into sections for better formatting
            sections = risk_analysis.split('\n')
            for section in sections[:10]:  # Limit to first 10 sections to prevent overly long output
                if section.strip():
                    # Use Paragraphs for proper text wrapping
                    elements.append(Paragraph(section.strip(), normal_style))
            elements.append(Spacer(1, 0.3*inch))
    else:
        elements.append(Paragraph("No detailed risk analysis available", normal_style))
    # Build PDF
    doc.build(elements)
    return pdf_path
@app.post("/analyze-deed", response_model=Dict[str, Any])
async def analyze_deed_endpoint(file: UploadFile = File(...)):
    """FastAPI endpoint: analyze an uploaded PDF deed document.

    Returns a JSON object with the upload's name/size and the full
    analysis report produced by the pipeline.

    Raises:
        HTTPException 400: the upload's declared content type is not PDF.
        HTTPException 500: any unexpected failure during analysis.
    """
    try:
        # Reject anything not declared as PDF. A missing content type is
        # covered too, since None != "application/pdf".
        if file.content_type != "application/pdf":
            raise HTTPException(status_code=400, detail="Only PDF files are supported")

        payload = await file.read()

        # The analysis pipeline works from a filesystem path, so persist
        # the upload to a temporary file first.
        with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as tmp_file:
            tmp_file.write(payload)
            tmp_path = tmp_file.name

        try:
            report_json = await generate_comprehensive_deed_report(tmp_path)
            parsed_report = json.loads(report_json)
            return {
                "success": parsed_report.get("success", False),
                "filename": file.filename,
                "file_size": len(payload),
                "report": parsed_report
            }
        finally:
            # Remove the temp copy whether or not analysis succeeded.
            os.unlink(tmp_path)
    except HTTPException:
        # Re-raise client errors untouched.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Analysis failed: {str(e)}")
# Create Gradio interface (Gradio 6 compatible)
# Note: Theme will be applied in the .launch() method below
with gr.Blocks() as gradio_app:
# Header
gr.Markdown("# ⚖️ Legal Deed Review System")
gr.Markdown("Upload a PDF deed document to receive comprehensive legal risk analysis.")
# Legal disclaimer with custom styling using HTML - formatted with checkmarks
gr.HTML("""
<div style="
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
border: 2px solid #5a67d8;
border-radius: 10px;
padding: 20px;
margin: 20px 0;
box-shadow: 0 4px 6px rgba(0, 0, 0, 0.3);
">
<h3 style="color: #fff; margin-top: 0; margin-bottom: 15px; display: flex; align-items: center;">
⚖️ LEGAL DISCLAIMER
</h3>
<div style="color: #e6e6ff; line-height: 1.8; font-size: 14px;">
<p style="margin-bottom: 12px; font-weight: 500;">This is an automated analysis tool for informational purposes only.</p>
<ul style="list-style: none; padding-left: 0; margin: 10px 0;">
<li style="margin-bottom: 8px;">✅ This does NOT constitute legal advice</li>
<li style="margin-bottom: 8px;">✅ This does NOT replace consultation with a qualified attorney</li>
<li style="margin-bottom: 8px;">✅ This analysis may NOT identify all potential legal issues</li>
<li style="margin-bottom: 8px;">✅ Always have deeds reviewed by a licensed attorney before taking action</li>
<li style="margin-bottom: 8px;">✅ Consult local legal professionals familiar with your jurisdiction</li>
</ul>
<p style="margin-top: 15px; font-weight: 500;">By using this tool, you acknowledge these limitations.</p>
</div>
</div>
""")
# --- Two-column layout: left = upload/stats/metadata, right = results ---
with gr.Row():
with gr.Column(scale=1):
gr.Markdown("## 📄 Upload & Stats")
# Sample deed section
gr.Markdown("### 💡 Try with a Sample")
with gr.Row():
# Downloadable sample deed — file is expected next to this script.
sample_download = gr.File(
value="usa_general_warranty_deed_sample.pdf",
label="📥 Download Sample Deed",
visible=True
)
# Wired below to load_sample_deed(), which feeds pdf_input.
load_sample_btn = gr.Button(
"🔗 Load Sample",
variant="secondary",
size="sm"
)
gr.Markdown("""
**Quick start options:**
1. **Easy**: Click "🔗 Load Sample" to auto-load the sample deed
2. **Manual**: Download sample above, then upload below
3. **Your own**: Upload your PDF deed document
""")
# type="binary": handlers may receive raw bytes (update_stats and
# analyze_deed_gradio both handle the bytes case explicitly).
pdf_input = gr.File(
label="Upload PDF Deed",
file_types=[".pdf"],
type="binary"
)
analyze_button = gr.Button(
"🔍 Analyze Deed",
variant="primary",
size="lg",
elem_id="analyze-btn"
)
# Add custom CSS for purple button
gr.HTML("""
<style>
#analyze-btn {
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%) !important;
border: none !important;
color: white !important;
font-weight: 600 !important;
box-shadow: 0 4px 6px rgba(102, 126, 234, 0.4) !important;
transition: all 0.3s ease !important;
}
#analyze-btn:hover {
background: linear-gradient(135deg, #5a67d8 0%, #6a3f8f 100%) !important;
box-shadow: 0 6px 12px rgba(102, 126, 234, 0.6) !important;
transform: translateY(-2px) !important;
}
</style>
""")
# Add CSS for purple tab indicators
gr.HTML("""
<style>
/* Force blue color for all tab interactions */
.tabs > button.selected,
.tab-nav > button.selected,
button[role="tab"][aria-selected="true"],
button[role="tab"].selected {
color: #667eea !important;
border-bottom-color: #667eea !important;
}
.tabs > button:hover,
.tab-nav > button:hover,
button[role="tab"]:hover {
color: #667eea !important;
border-bottom-color: #667eea !important;
}
/* Target the specific orange underline if present */
.tabs > button.selected::after,
button[role="tab"][aria-selected="true"]::after {
background-color: #667eea !important;
}
/* Custom Loading Animation - Blue Pulse */
.generating {
border-color: #667eea !important;
}
/* Override default orange spinner/loader */
.loader {
--loader-color: #667eea !important;
border-top-color: #667eea !important;
border-left-color: #667eea !important;
}
/* Add a subtle blue glow to active processing elements */
.generating::before {
background: linear-gradient(90deg, transparent, rgba(102, 126, 234, 0.2), transparent) !important;
}
</style>
""")
gr.Markdown("### 📊 Quick Stats")
# Refreshed by update_stats() on every pdf_input change (wired below).
stats_display = gr.Markdown(
value="Upload a deed to see document statistics...",
elem_id="stats"
)
gr.Markdown("### 🧭 Deed Metadata")
# Read-only boxes, filled from analyze_deed_gradio's metadata fields.
deed_type_box = gr.Textbox(label="Deed Type", interactive=False)
jurisdiction_box = gr.Textbox(label="Jurisdiction", interactive=False)
consideration_box = gr.Textbox(label="Consideration / Price", interactive=False)
parties_box = gr.Textbox(label="Parties", lines=6, interactive=False)
property_box = gr.Textbox(label="Property Description", lines=4, interactive=False)
with gr.Column(scale=2):
# Agent Internal Monologue (Terminal View) — shows the analysis log
# returned as the first element of analyze_deed_gradio's tuple.
agent_monologue = gr.Code(
label=" Reasoning Trace ",
language="shell",
interactive=False,
elem_id="agent-terminal",
lines=12,
value="⚡ Waiting for document upload......"
)
gr.Markdown("## 📋 Analysis Results")
# Result tabs — each component below is a target in the
# analyze_button.click outputs wiring further down.
with gr.Tabs():
with gr.TabItem("📝 Overview"):
classification_output = gr.DataFrame(
headers=["field", "value"],
label="Deed Classification (Table)",
interactive=False,
datatype=["str", "str"],
column_count=(2, "fixed"),
row_count=(0, "dynamic")
)
risk_overview_output = gr.Markdown(
value="Risk overview will appear here after analysis.",
label="Risk Overview"
)
# Receives the generated PDF report path (or None on error).
report_file = gr.File(label="Download Report", interactive=False)
with gr.TabItem("✂️ Clause Breakdown"):
clause_table = gr.DataFrame(
headers=["id", "type", "words", "preview"],
label="Clauses",
interactive=False,
wrap=True,
column_widths=["10%", "20%", "10%", "60%"] # ← Constrain preview to 60% width
)
with gr.TabItem("⚠️ Risk Analysis"):
risk_table = gr.DataFrame(
headers=["clause_id", "clause_type", "risk_level", "summary"],
label="Clause Risks",
interactive=False,
wrap=True,
column_widths=["10%", "20%", "10%", "60%"]
)
with gr.TabItem("📄 Extracted Text"):
# Shows up to 50,000 chars (analyze_deed_gradio truncates beyond that).
text_output = gr.Textbox(
value="Upload and analyze a deed to see extracted text...",
label="OCR Text Extraction",
lines=30,
max_lines=None, # No limit - show full text
interactive=False,
autoscroll=False # Prevent auto-scrolling to bottom
)
with gr.TabItem("🗃️ Raw JSON"):
# Full report_data dict as assembled by analyze_deed_gradio.
json_output = gr.JSON(
label="Full Response",
value=None
)
# Usage instructions
gr.Markdown("""
## 🔧 How to Use
1. **Upload** a PDF deed document using the file uploader
2. **Click** the "Analyze Deed" button to start processing
3. **Review** the results in the tabs:
- **Classification:** Deed type, parties, and key information
- **Clause Breakdown:** Identified legal clauses and sections
- **Risk Analysis:** Potential legal risks and recommendations
- **Extracted Text:** Raw text extracted from the PDF
4. **Consult** a qualified attorney for legal advice based on the analysis
### ⚡ Processing Time
- Analysis typically takes 30-60 seconds or more depending on document complexity
- Multi-page deeds may take longer for OCR processing
### 📋 Supported Documents
- Property sale deeds
- Mortgage deeds
- Lease agreements
- Gift deeds
- Warranty deeds
- Quitclaim deeds
""")
# Event handlers
def update_stats(pdf_file):
    """Render the quick-stats panel text for an uploaded file.

    Handles the input shapes Gradio may deliver — a file-like object,
    a path string, or raw bytes — and reports the file size.

    Returns:
        A markdown string for the stats_display component.
    """
    if pdf_file is None:
        return "No document uploaded"
    try:
        if hasattr(pdf_file, 'read') and hasattr(pdf_file, 'seek'):
            # File-like object: measure by reading, then rewind so the
            # analyze handler can read it again.
            file_size = len(pdf_file.read())
            pdf_file.seek(0)  # Reset file pointer
        elif isinstance(pdf_file, str):
            # File path string (os is imported at module level; the
            # redundant function-local import was removed).
            file_size = os.path.getsize(pdf_file)
        elif isinstance(pdf_file, bytes):
            # Raw bytes
            file_size = len(pdf_file)
        else:
            # Unknown shape — still acknowledge the upload.
            return f"📊 **Document Stats:**\n• File type: {type(pdf_file).__name__}\n• Status: Ready for analysis"
        return f"""📊 **Document Stats:**
• File size: {file_size:,} bytes
• Status: Ready for analysis
• Click 'Analyze Deed' to start processing"""
    except Exception:
        # FIX: narrowed from a bare `except:` so KeyboardInterrupt and
        # SystemExit are no longer swallowed.
        return "Error reading document information"
def _write_report_file(report_data: Dict[str, Any]) -> Optional[str]:
    """Build the downloadable PDF report.

    Returns the generated file's path, or None if generation failed
    (the failure is logged, not raised, so the UI can still render).
    """
    try:
        pdf_location = build_report_pdf(report_data)
    except Exception as e:
        print(f"Error generating PDF report: {e}")
        return None
    return pdf_location
async def analyze_deed_gradio(pdf_file):
    """Run the full deed-analysis pipeline for the Gradio interface.

    Pipeline: OCR text extraction -> deed classification -> clause
    breakdown -> per-clause risk analysis -> PDF report generation.

    Always returns a 13-tuple whose positions match the ``outputs`` list
    registered on ``analyze_button.click`` below.
    """
    import time

    # Placeholder used by the error path for the table outputs.
    empty_table: List[Dict[str, Any]] = []

    def get_error_return(log_msg, error_detail):
        # Keep positions in sync with the 13 registered outputs.
        return (
            log_msg,        # 1. Log
            empty_table,    # 2. Classification Table
            error_detail,   # 3. Risk Overview
            empty_table,    # 4. Clause Table
            empty_table,    # 5. Risk Table
            error_detail,   # 6. Text Output
            {},             # 7. JSON Output
            "❌ Error",     # 8. Deed Type
            "❌ Error",     # 9. Jurisdiction
            "❌ Error",     # 10. Parties
            "❌ Error",     # 11. Property
            "❌ Error",     # 12. Consideration
            None            # 13. PDF Report
        )

    if pdf_file is None:
        return get_error_return("❌ ERROR: No file uploaded.", "❌ No file uploaded")

    pdf_path = None
    try:
        # Persist the upload so the OCR step can read it from a path.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp_file:
            if hasattr(pdf_file, 'read'):
                tmp_file.write(pdf_file.read())
            else:
                tmp_file.write(pdf_file)
            pdf_path = tmp_file.name

        # --- STEP 1: Text Extraction ---
        print("🔍 Extracting text from PDF...")
        text_json = await extract_text_from_deed_pdf(pdf_path)
        text_result = json.loads(text_json)
        deed_text = text_result.get("text", "")
        if not deed_text:
            raise ValueError("Failed to extract text from PDF")

        # --- STEP 2: Classification ---
        print("🔍 Classifying deed...")
        classification_json = await classify_deed_type(deed_text)
        classification_data = json.loads(classification_json)
        # Metadata fields for the sidebar textboxes.
        metadata = extract_metadata_fields(classification_data)

        # --- STEP 3: Clause Breakdown ---
        print("🔍 Splitting into clauses...")
        clauses_json = await split_deed_into_clauses(deed_text)
        clauses_data = json.loads(clauses_json)

        # --- STEP 4: Risk Analysis ---
        print("🔍 Analyzing risks...")
        # Risk analysis takes the clause list plus classification context.
        risks_json = await analyze_deed_risks(clauses_json, classification_json)
        risks_data = json.loads(risks_json)

        # Aggregate everything for the JSON tab and the PDF report.
        report_data = {
            "deed_classification": classification_data,
            "clause_breakdown": clauses_data,
            "risk_analysis": risks_data,
            "text_preview": deed_text,
            "report_metadata": {
                "generated_at": time.time(),
                "analysis_steps": ["text_extraction", "classification", "risk_analysis"],
                "processing_method": "agentic_flow"
            }
        }

        # Generate the downloadable PDF report (None on failure).
        report_path = _write_report_file(report_data)

        # Keep the on-screen text bounded; the PDF holds the full text.
        display_text = deed_text if len(deed_text) < 50000 else deed_text[:50000] + "\n\n... (Text truncated for display. Full text available in PDF report)"

        final_log = f"""✅ Analysis Complete!
● Step 1: Text Extraction ✓
● Step 2: Classification ✓
● Step 3: Clause Breakdown ✓
● Step 4: Risk Analysis ✓
📊 Results: {len(clauses_data.get('clauses', []))} clauses analyzed
⚖️ Overall Risk Level: {risks_data.get('overall_summary', 'N/A')[:50]}..."""

        # Final results (13 items, order matches the outputs wiring).
        return (
            final_log,                                         # 1
            format_classification_table(classification_data),  # 2
            format_risk_overview(risks_data),                  # 3
            format_clause_table(clauses_data),                 # 4
            format_risk_table(risks_data),                     # 5
            display_text,                                      # 6
            report_data,                                       # 7
            metadata["deed_type"],                             # 8
            metadata["jurisdiction"],                          # 9
            metadata["parties"],                               # 10
            metadata["property"],                              # 11
            metadata["consideration"],                         # 12
            report_path                                        # 13
        )
    except Exception as e:
        print(f"Error in analysis: {e}")
        return get_error_return(f"❌ SYSTEM ERROR: {str(e)}", f"❌ Analysis failed: {str(e)}")
    finally:
        # FIX: the temporary upload copy was previously never removed,
        # leaking one temp file per analysis. The generated report PDF
        # (report_path) is intentionally left for Gradio to serve.
        if pdf_path and os.path.exists(pdf_path):
            try:
                os.unlink(pdf_path)
            except OSError:
                pass
def load_sample_deed():
    """Return the bundled sample deed's path if it exists, else None.

    Used by the "Load Sample" button to populate the file input.
    """
    sample_path = "usa_general_warranty_deed_sample.pdf"
    return sample_path if os.path.exists(sample_path) else None
# Connect event handlers
# Quick-stats panel refreshes whenever the uploaded file changes.
pdf_input.change(
fn=update_stats,
inputs=[pdf_input],
outputs=[stats_display]
)
# NOTE: this outputs list must stay in the exact order of the 13-tuple
# returned by analyze_deed_gradio (and its get_error_return helper).
analyze_button.click(
fn=analyze_deed_gradio,
inputs=[pdf_input],
outputs=[
agent_monologue,
classification_output,
risk_overview_output,
clause_table,
risk_table,
text_output,
json_output,
deed_type_box,
jurisdiction_box,
parties_box,
property_box,
consideration_box,
report_file
]
)
# Load the bundled sample PDF straight into the upload component.
load_sample_btn.click(
fn=load_sample_deed,
outputs=[pdf_input]
)
# For Hugging Face Spaces: Keep gradio_app as-is for direct deployment
# For local FastAPI: Create a separate mounted app
fastapi_app = gr.mount_gradio_app(app, gradio_app, path="/")
if __name__ == "__main__":
    import uvicorn
    print("🏛️ Starting Legal Deed Review Web Application...")
    print("📍 Server will be available at: http://localhost:8002")
    print("🔧 API endpoint: http://localhost:8002/analyze-deed")
    print("🌐 Gradio interface: http://localhost:8002/")
    print("\n⚖️ Legal Notice: This tool provides analysis only, not legal advice.")
    print("✋ Press Ctrl+C to stop the server")
    # FIX: uvicorn only supports reload=True when the app is passed as an
    # import string ("module:app"); passing the app object with reload=True
    # makes uvicorn refuse to start. Run without auto-reload here.
    uvicorn.run(fastapi_app, host="0.0.0.0", port=8002)