|
|
""" |
|
|
FastAPI web application with Gradio interface for Legal Deed Review. |
|
|
""" |
|
|
import asyncio |
|
|
import json |
|
|
import os |
|
|
import re |
|
|
import tempfile |
|
|
import re |
|
|
from pathlib import Path |
|
|
from typing import Dict, Any, Optional, List |
|
|
|
|
|
import gradio as gr |
|
|
from dotenv import load_dotenv |
|
|
from fastapi import FastAPI, File, UploadFile, HTTPException |
|
|
|
|
|
|
|
|
from main import ( |
|
|
extract_text_from_deed_pdf, |
|
|
split_deed_into_clauses, |
|
|
classify_deed_type, |
|
|
analyze_deed_risks, |
|
|
generate_comprehensive_deed_report |
|
|
) |
|
|
|
|
|
load_dotenv()  # load API keys / settings from a local .env file before anything uses them

# FastAPI application object; REST endpoints are registered on it below.
app = FastAPI(
    title="Legal Deed Review System",
    description="Upload PDF deed documents for comprehensive legal risk analysis"
)

# Markdown disclaimer shown to users.
# NOTE(review): not referenced anywhere in the visible portion of this file
# (the Gradio UI embeds its own HTML disclaimer) — confirm usage before removing.
LEGAL_DISCLAIMER = """
⚖️ **LEGAL DISCLAIMER**

**This is an automated analysis tool for informational purposes only.**

- ❌ This does NOT constitute legal advice
- ❌ This does NOT replace consultation with a qualified attorney
- ❌ This analysis may NOT identify all potential legal issues
- ✅ Always have deeds reviewed by a licensed attorney before taking action
- ✅ Consult local legal professionals familiar with your jurisdiction

**By using this tool, you acknowledge these limitations.**
"""

# Emoji badge per risk level for UI rendering.
# NOTE(review): not referenced in the visible portion of this file — verify before removing.
RISK_LEVEL_COLORS = {
    "LOW": "🟢",
    "MEDIUM": "🟡",
    "HIGH": "🔴"
}
|
|
|
|
|
|
|
|
async def process_deed_pdf(pdf_file) -> Dict[str, Any]:
    """Process a PDF deed file and return the raw report data.

    Args:
        pdf_file: The uploaded deed — anything with a ``.read()`` method
            (Gradio file object), a filesystem path string, or raw ``bytes``.

    Returns:
        The parsed report dict from ``generate_comprehensive_deed_report``
        on success, or ``{"error": <message>}`` on any failure.
    """
    if pdf_file is None:
        return {"error": "Please upload a PDF deed document first."}

    tmp_path = None
    try:
        # The analysis pipeline works on filesystem paths, so spool the
        # upload into a named temp file first.
        with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as tmp_file:
            tmp_path = tmp_file.name
            if hasattr(pdf_file, 'read'):
                tmp_file.write(pdf_file.read())
            elif isinstance(pdf_file, str):
                with open(pdf_file, 'rb') as f:
                    tmp_file.write(f.read())
            elif isinstance(pdf_file, bytes):
                tmp_file.write(pdf_file)
            else:
                return {"error": f"Unsupported file format: {type(pdf_file)}"}

        report_result = await generate_comprehensive_deed_report(tmp_path)
        report_data = json.loads(report_result)

        if not report_data.get("success"):
            error_msg = f"Analysis failed: {report_data.get('error', 'Unknown error')}"
            return {"error": error_msg}

        return report_data

    except Exception as e:
        return {"error": f"Error processing deed: {str(e)}"}
    finally:
        # Always remove the temp file — the previous version leaked it when
        # the report call raised or when the input type was unsupported.
        if tmp_path and os.path.exists(tmp_path):
            os.unlink(tmp_path)
|
|
|
|
|
|
|
|
def _jurisdiction_display(jurisdiction: Any) -> str:
    """Render a jurisdiction entry (dict or scalar) as a short display string."""
    if isinstance(jurisdiction, dict):
        country = jurisdiction.get("country", "")
        state = jurisdiction.get("state_province", "") or jurisdiction.get("state", "")
        if country and state:
            return f"{country}, {state}"
        if country:
            return country
        # Unrecognized dict shape: fall back to pretty-printed JSON.
        return json.dumps(jurisdiction, indent=2)
    return f"{jurisdiction}"


def _parties_bullet(parties: Any) -> str:
    """Build the '• **Parties:**' bullet for dict or scalar party data."""
    if isinstance(parties, dict):
        party_lines = []
        for role, party_info in parties.items():
            if isinstance(party_info, dict) and "name" in party_info:
                party_lines.append(f" - {role.title()}: {party_info['name']}")
            else:
                party_lines.append(f" - {role.title()}: {party_info}")
        if party_lines:
            return "• **Parties:**\n" + "\n".join(party_lines)
        return f"• **Parties:** {json.dumps(parties, indent=2)}"
    return f"• **Parties:** {parties}"


def _property_bullet(property_desc: Any) -> str:
    """Build the '• **Property:**' bullet for dict or scalar property data."""
    if isinstance(property_desc, dict):
        prop_lines = []
        for key, value in property_desc.items():
            # Skip empty / whitespace-only values.
            if value and str(value).strip():
                prop_lines.append(f" - {key.replace('_', ' ').title()}: {value}")
        if prop_lines:
            return "• **Property:**\n" + "\n".join(prop_lines)
        return f"• **Property:** {json.dumps(property_desc, indent=2)}"
    return f"• **Property:** {property_desc}"


def format_classification_display(classification_data: Dict) -> str:
    """Format deed classification for display.

    Args:
        classification_data: Result dict from the classifier; expected to
            carry ``success`` and a ``classification`` payload that may be a
            dict, a JSON string, or a nested ``{"classification": {...}}``.

    Returns:
        A Markdown string of classification bullets, or an error banner when
        the classification failed.
    """
    if not classification_data or not classification_data.get("success"):
        return "❌ Classification failed"

    classification = classification_data.get("classification", {})
    # The classifier sometimes returns a JSON string instead of a dict.
    if isinstance(classification, str):
        try:
            classification = json.loads(classification)
        except Exception:
            classification = {}

    # Unwrap a doubly-nested {"classification": {...}} payload.
    if "classification" in classification:
        classification = classification["classification"]

    # Unstructured LLM output: show it verbatim.
    if "raw_analysis" in classification:
        return f"📋 **Deed Classification (LLM):**\n\n{classification['raw_analysis']}"

    parts = []

    deed_type = classification.get("deed_type") or classification.get("type")
    if deed_type:
        parts.append(f"• **Deed Type:** {deed_type}")

    jurisdiction = classification.get("jurisdiction") or classification.get("jurisdiction_hint")
    if jurisdiction:
        parts.append(f"• **Jurisdiction:** {_jurisdiction_display(jurisdiction)}")

    parties = classification.get("key_parties") or classification.get("parties")
    if parties:
        parts.append(_parties_bullet(parties))

    property_desc = (
        classification.get("property_description_and_location")
        or classification.get("property_description")
        or classification.get("property")
    )
    if property_desc:
        parts.append(_property_bullet(property_desc))

    consideration = classification.get("consideration_amount") or classification.get("consideration")
    if consideration:
        parts.append(f"• **Consideration:** {consideration}")

    special = classification.get("special_conditions_or_restrictions")
    if special:
        if isinstance(special, (dict, list)):
            parts.append(f"• **Special Conditions:** {json.dumps(special, indent=2)}")
        else:
            parts.append(f"• **Special Conditions:** {special}")

    return "📋 **Deed Classification:**\n\n" + "\n".join(parts)
|
|
|
|
|
|
|
|
def _safe_preview(text: str, limit: int = 200) -> str: |
|
|
return text[:limit] + ("..." if len(text) > limit else "") |
|
|
|
|
|
|
|
|
def format_clause_table(clause_data: Dict) -> List[List[Any]]:
    """Prepare clause breakdown as table rows in list format for Gradio DataFrame.

    Args:
        clause_data: Splitter result dict with ``success`` and a ``clauses``
            list of ``{"id", "type", "word_count", "text"}`` entries.

    Returns:
        One row per clause as ``[id, type, word_count, text]``; an empty list
        when data is missing or marked unsuccessful.
    """
    # Previous version print()-ed the entire payload on every call; the
    # debug noise has been removed — returned rows are unchanged.
    if not clause_data or not clause_data.get("success"):
        return []

    rows: List[List[Any]] = []
    for i, clause in enumerate(clause_data.get("clauses", [])):
        rows.append([
            clause.get("id", f"clause_{i+1}"),
            clause.get("type", "General"),
            clause.get("word_count", 0),
            clause.get("text", ""),  # full text; the UI handles display truncation
        ])
    return rows
|
|
|
|
|
|
|
|
def _flatten_json(data: Any, parent_key: str = "") -> List[tuple]: |
|
|
"""Flatten nested JSON into dotted keys.""" |
|
|
items: List[tuple] = [] |
|
|
if isinstance(data, dict): |
|
|
for k, v in data.items(): |
|
|
new_key = f"{parent_key}.{k}" if parent_key else k |
|
|
items.extend(_flatten_json(v, new_key)) |
|
|
elif isinstance(data, list): |
|
|
for idx, v in enumerate(data): |
|
|
new_key = f"{parent_key}[{idx}]" if parent_key else f"[{idx}]" |
|
|
items.extend(_flatten_json(v, new_key)) |
|
|
else: |
|
|
items.append((parent_key, data)) |
|
|
return items |
|
|
|
|
|
|
|
|
def format_classification_table(classification_data: Dict) -> List[List[Any]]:
    """Prepare deed classification as table rows with readable nested formatting.

    Args:
        classification_data: Classifier result; the ``classification`` payload
            may be a dict, a raw/fenced JSON string, or doubly nested under
            another "classification" key.

    Returns:
        Rows of ``[Display Key, pretty-printed value]`` for a Gradio table;
        empty list when no data is supplied.
    """
    if not classification_data:
        return []

    def _clean_json_response(response: str) -> str:
        """Clean JSON response similar to main.py function."""
        cleaned = response.strip()

        # Strip a markdown code fence (```json ... ```) if present.
        if cleaned.startswith("```"):
            lines = cleaned.split("\n")
            lines = lines[1:]
            if lines and lines[-1].strip() == "```":
                lines = lines[:-1]
            cleaned = "\n".join(lines).strip()

        # Keep only the outermost {...} span, dropping surrounding chatter.
        start_idx = cleaned.find("{")
        end_idx = cleaned.rfind("}") + 1

        if start_idx != -1 and end_idx > start_idx:
            cleaned = cleaned[start_idx:end_idx]

        return cleaned.strip()

    print(f"🔍 DEBUG format_classification_table input: {classification_data}")

    # Unwrap the outer result envelope if present.
    if "classification" in classification_data:
        classification = classification_data.get("classification", {})
    else:
        classification = classification_data

    # The payload may arrive as a JSON string; try to parse it, and on
    # failure fall back to keyword extraction or a raw-text passthrough.
    if isinstance(classification, str):
        try:
            cleaned = _clean_json_response(classification)
            classification = json.loads(cleaned)
            print(f"🔍 DEBUG Successfully parsed JSON from string: {type(classification)}")
        except Exception as e:
            print(f"🚨 DEBUG JSON parsing failed: {e}")
            # Long or explicitly raw responses are shown verbatim; short ones
            # get a best-effort keyword scan.  (Here `classification` is still
            # a string, so `in` is a substring test and len() is char count.)
            if "raw_analysis" in classification or len(classification) > 500:
                classification = {"raw_analysis": classification}
            else:
                classification = _extract_basic_info_from_string(classification)

    # Unwrap a second level of nesting, if any.
    if isinstance(classification, dict) and "classification" in classification:
        classification = classification["classification"]

    # Last resort: wrap any non-dict value so the table loop below works.
    if not isinstance(classification, dict):
        classification = {"value": str(classification)}

    def _pretty(value: Any, indent: int = 0) -> str:
        # Recursively render nested dicts/lists as an indented outline.
        pad = " " * indent
        if isinstance(value, dict):
            lines = []
            for k, v in value.items():
                if isinstance(v, (dict, list)) and v:
                    lines.append(f"{pad}{k.replace('_', ' ').title()}:")
                    lines.append(_pretty(v, indent + 1))
                elif not isinstance(v, (dict, list)):
                    display_val = str(v) if v not in [None, "", "N/A"] else "N/A"
                    lines.append(f"{pad}{k.replace('_', ' ').title()}: {display_val}")
            return "\n".join(lines)
        elif isinstance(value, list):
            if not value:
                return f"{pad}(None)"
            lines = []
            for idx, v in enumerate(value, 1):
                if isinstance(v, (dict, list)):
                    lines.append(f"{pad}{idx}.")
                    lines.append(_pretty(v, indent + 1))
                else:
                    lines.append(f"{pad}{idx}. {v}")
            return "\n".join(lines)
        return f"{pad}{value}"

    rows: List[List[Any]] = []
    for key, value in classification.items():
        display_key = key.replace('_', ' ').title()
        rows.append([display_key, _pretty(value)])

    print(f"🔍 DEBUG format_classification_table output: {len(rows)} rows")
    return rows
|
|
|
|
|
|
|
|
def _extract_basic_info_from_string(text: str) -> Dict[str, str]: |
|
|
"""Extract basic info from problematic string responses.""" |
|
|
|
|
|
result = {} |
|
|
|
|
|
|
|
|
if any(word in text.lower() for word in ["sale", "purchase", "buy"]): |
|
|
result["deed_type"] = "sale" |
|
|
elif "mortgage" in text.lower(): |
|
|
result["deed_type"] = "mortgage" |
|
|
elif "lease" in text.lower(): |
|
|
result["deed_type"] = "lease" |
|
|
else: |
|
|
result["deed_type"] = "unknown" |
|
|
|
|
|
|
|
|
if "bangladesh" in text.lower(): |
|
|
result["jurisdiction"] = "Bangladesh" |
|
|
|
|
|
|
|
|
if len(result) < 2: |
|
|
result["raw_analysis"] = text[:500] + "..." if len(text) > 500 else text |
|
|
|
|
|
return result |
|
|
|
|
|
|
|
|
def _extract_risk_level(risk_text: str) -> str: |
|
|
"""Extract risk level from risk analysis text using multiple patterns.""" |
|
|
print(f"📊 DEBUG _extract_risk_level input: {risk_text[:200]}...") |
|
|
|
|
|
if not risk_text or not isinstance(risk_text, str): |
|
|
print(f"📊 DEBUG Invalid input, returning UNKNOWN") |
|
|
return "UNKNOWN" |
|
|
|
|
|
text_upper = risk_text.upper() |
|
|
|
|
|
|
|
|
risk_patterns = [ |
|
|
|
|
|
(r'\bRISK\s+LEVEL[:\s]+HIGH\b', 'HIGH'), |
|
|
(r'\bRISK\s+LEVEL[:\s]+MEDIUM\b', 'MEDIUM'), |
|
|
(r'\bRISK\s+LEVEL[:\s]+LOW\b', 'LOW'), |
|
|
|
|
|
|
|
|
(r'\bHIGH\s+RISK\b', 'HIGH'), |
|
|
(r'\bMEDIUM\s+RISK\b', 'MEDIUM'), |
|
|
(r'\bLOW\s+RISK\b', 'LOW'), |
|
|
|
|
|
|
|
|
(r'\bRISK[:\s]+HIGH\b', 'HIGH'), |
|
|
(r'\bRISK[:\s]+MEDIUM\b', 'MEDIUM'), |
|
|
(r'\bRISK[:\s]+LOW\b', 'LOW'), |
|
|
|
|
|
|
|
|
(r'\bHIGH\b', 'HIGH'), |
|
|
(r'\bMEDIUM\b', 'MEDIUM'), |
|
|
(r'\bLOW\b', 'LOW'), |
|
|
] |
|
|
|
|
|
import re |
|
|
for pattern, level in risk_patterns: |
|
|
if re.search(pattern, text_upper): |
|
|
print(f"📊 DEBUG Found risk level '{level}' using pattern: {pattern}") |
|
|
return level |
|
|
|
|
|
print(f"📊 DEBUG No risk level found in text, returning UNKNOWN") |
|
|
return "UNKNOWN" |
|
|
|
|
|
|
|
|
def format_risk_table(risk_data: Dict) -> List[List[Any]]:
    """Prepare risk analysis table rows in list format for Gradio DataFrame.

    Args:
        risk_data: Risk analyzer result with ``success`` and a
            ``clause_risks`` list of ``{"clause_id", "clause_type",
            "risk_analysis"}`` entries.

    Returns:
        One row per clause risk as ``[clause_id, clause_type, level,
        analysis_text]``, where ``level`` is parsed by
        ``_extract_risk_level``; empty list when data is missing or failed.
    """
    # Previous version print()-ed the entire payload on every call; the
    # debug noise has been removed — returned rows are unchanged.
    if not risk_data or not risk_data.get("success"):
        return []

    rows: List[List[Any]] = []
    for i, risk in enumerate(risk_data.get("clause_risks", [])):
        analysis = risk.get("risk_analysis", "")
        rows.append([
            risk.get("clause_id", f"clause_{i+1}"),
            risk.get("clause_type", "General"),
            _extract_risk_level(analysis),  # HIGH/MEDIUM/LOW/UNKNOWN from the text
            analysis,
        ])
    return rows
|
|
|
|
|
|
|
|
def format_risk_overview(risk_data: Dict) -> str:
    """Format overall risk summary with validation.

    Prepends any consistency warnings from ``_validate_risk_consistency``,
    then the model's overall summary and its disclaimer.

    Args:
        risk_data: Risk analyzer result dict (may be None).

    Returns:
        A Markdown overview string, or an error banner on failure.
    """
    # Guard against None as well as a failed analysis — the other formatters
    # in this module tolerate None; this one previously raised AttributeError.
    if not risk_data or not risk_data.get("success"):
        return "❌ Risk analysis failed"

    overall_summary = risk_data.get("overall_summary", "")
    disclaimer = risk_data.get("disclaimer", "")
    clause_risks = risk_data.get("clause_risks", [])

    # Cross-check the stated overall level against per-clause levels.
    validation_warnings = _validate_risk_consistency(overall_summary, clause_risks)

    parts = ["⚠️ **Risk Analysis Overview:**"]

    if validation_warnings:
        parts.append("🔍 **Validation Notices:**")
        parts.extend([f"• {warning}" for warning in validation_warnings])
        parts.append("---")

    if overall_summary:
        parts.append(overall_summary)
    if disclaimer:
        parts.append(f"\n_{disclaimer}_")
    return "\n\n".join(parts)
|
|
|
|
|
|
|
|
def _validate_risk_consistency(overall_summary: str, clause_risks: List[Dict]) -> List[str]:
    """Validate consistency between overall risk summary and individual clause risks.

    Args:
        overall_summary: Free-form overall risk text from the analyzer.
        clause_risks: Per-clause dicts carrying a "risk_analysis" string.

    Returns:
        Human-readable warning strings; empty when inputs are missing or no
        inconsistency is detected.
    """
    warnings = []

    if not overall_summary or not clause_risks:
        return warnings

    # Step 1: find the stated overall level — exact label match first.
    overall_risk_level = "UNKNOWN"
    overall_upper = overall_summary.upper()
    for level in ["HIGH", "MEDIUM", "LOW"]:
        if f"OVERALL RISK LEVEL: {level}" in overall_upper or f"RISK LEVEL: {level}" in overall_upper:
            overall_risk_level = level
            break

    # Step 2: fall back to looser regexes, ordered most specific first.
    if overall_risk_level == "UNKNOWN":
        overall_risk_patterns = [
            (r'OVERALL.*RISK.*LEVEL.*HIGH', 'HIGH'),
            (r'OVERALL.*RISK.*LEVEL.*MEDIUM', 'MEDIUM'),
            (r'OVERALL.*RISK.*LEVEL.*LOW', 'LOW'),
            (r'RISK.*LEVEL.*HIGH', 'HIGH'),
            (r'RISK.*LEVEL.*MEDIUM', 'MEDIUM'),
            (r'RISK.*LEVEL.*LOW', 'LOW'),
        ]

        for pattern, level in overall_risk_patterns:
            if re.search(pattern, overall_upper):
                overall_risk_level = level
                break

    # Step 3: collect the recognizable per-clause levels.
    individual_levels = []
    for clause_risk in clause_risks:
        analysis = clause_risk.get("risk_analysis", "")
        extracted_level = _extract_risk_level(analysis)
        if extracted_level != "UNKNOWN":
            individual_levels.append(extracted_level)

    print(f"📊 DEBUG Risk Validation - Overall: {overall_risk_level}, Individual: {individual_levels}")

    # Step 4: flag contradictions between the overall level and clause levels.
    if overall_risk_level != "UNKNOWN" and individual_levels:
        has_high = "HIGH" in individual_levels
        has_medium = "MEDIUM" in individual_levels

        if overall_risk_level == "LOW" and has_high:
            warnings.append(f"Overall risk shows {overall_risk_level} but found HIGH risk clauses")
        elif overall_risk_level == "LOW" and has_medium:
            warnings.append(f"Overall risk shows {overall_risk_level} but found MEDIUM risk clauses")
        elif overall_risk_level == "HIGH" and not has_high and not has_medium:
            warnings.append(f"Overall risk shows {overall_risk_level} but no HIGH or MEDIUM risk clauses found")

    # Step 5: surface clauses whose level could not be determined at all.
    unknown_count = sum(1 for clause_risk in clause_risks if _extract_risk_level(clause_risk.get("risk_analysis", "")) == "UNKNOWN")
    if unknown_count > 0:
        warnings.append(f"{unknown_count} clause(s) have unclear risk levels")

    return warnings
|
|
|
|
|
|
|
|
def extract_metadata_fields(classification_data: Dict) -> Dict[str, str]:
    """Pull key metadata fields for sidebar display.

    Args:
        classification_data: Classifier result; the "classification" payload
            may be a dict, a JSON string, doubly nested, or raw LLM text.

    Returns:
        Dict with string values for keys: deed_type, jurisdiction, parties,
        property, consideration ("N/A" where unavailable).
    """
    print(f"🔍 DEBUG extract_metadata_fields input: {classification_data}")

    classification = classification_data.get("classification", {}) if classification_data else {}

    # Payload may be a (possibly fenced) JSON string; parse it, and on
    # failure fall back to regex extraction from the raw text.
    if isinstance(classification, str):
        try:
            cleaned = _clean_json_response_metadata(classification)
            classification = json.loads(cleaned)
            print(f"🔍 DEBUG extract_metadata_fields: Successfully parsed JSON from string")
        except Exception as e:
            print(f"🔍 DEBUG extract_metadata_fields: JSON parsing failed: {e}")
            return _extract_from_raw_text(classification)

    # Unwrap a doubly-nested {"classification": {...}} payload.
    if isinstance(classification, dict) and "classification" in classification:
        classification = classification["classification"]

    # Pure raw-text payload: extract fields with regexes instead.
    if isinstance(classification, dict) and "raw_analysis" in classification and len(classification) == 1:
        raw_text = classification["raw_analysis"]
        print(f"🔍 DEBUG: Extracting from raw_analysis: {raw_text[:200]}...")
        return _extract_from_raw_text(raw_text)

    # Deed type: accept either of the two key spellings seen in responses.
    deed_type = classification.get("deed_type") or classification.get("type") or "N/A"

    # Jurisdiction: dicts get "Country, State" formatting; lists/scalars are
    # rendered as-is.
    jurisdiction_value = classification.get("jurisdiction") or classification.get("jurisdiction_hint") or "N/A"
    if isinstance(jurisdiction_value, dict):
        country = jurisdiction_value.get("country", "")
        state = jurisdiction_value.get("state_province", "") or jurisdiction_value.get("state", "")
        if country and state:
            jurisdiction = f"{country}, {state}"
        elif country:
            jurisdiction = country
        else:
            jurisdiction = json.dumps(jurisdiction_value, indent=2)
    elif isinstance(jurisdiction_value, list):
        jurisdiction = json.dumps(jurisdiction_value, indent=2)
    else:
        jurisdiction = str(jurisdiction_value)

    # Parties: prefer named grantor/grantee entries; otherwise dump the dict.
    parties = classification.get("key_parties") or classification.get("parties") or {}
    if isinstance(parties, dict) and parties:
        parts = []
        if "grantor" in parties:
            grantor = parties["grantor"]
            if isinstance(grantor, dict):
                name = grantor.get("name", "")
                if name:
                    parts.append(f"Grantor: {name}")
        if "grantee" in parties:
            grantee = parties["grantee"]
            if isinstance(grantee, dict):
                name = grantee.get("name", "")
                if name:
                    parts.append(f"Grantee: {name}")
        parties_str = "\n".join(parts) if parts else json.dumps(parties, indent=2)
    else:
        parties_str = "N/A"

    # Property: pick out well-known Bangladeshi land-record keys when present.
    property_desc = (
        classification.get("property_description_and_location")
        or classification.get("property_description")
        or classification.get("property")
        or "N/A"
    )
    if isinstance(property_desc, dict):
        parts = []
        if "district" in property_desc:
            parts.append(f"District: {property_desc['district']}")
        if "upazila_thana" in property_desc:
            parts.append(f"Area: {property_desc['upazila_thana']}")
        if "mouza" in property_desc:
            parts.append(f"Mouza: {property_desc['mouza']}")
        if "area" in property_desc:
            parts.append(f"Size: {property_desc['area']}")
        property_str = "\n".join(parts) if parts else json.dumps(property_desc, indent=2)
    elif isinstance(property_desc, list):
        property_str = json.dumps(property_desc, indent=2)
    else:
        property_str = str(property_desc)

    consideration = classification.get("consideration_amount") or classification.get("consideration") or "N/A"

    result = {
        "deed_type": deed_type,
        "jurisdiction": jurisdiction,
        "parties": parties_str,
        "property": property_str,
        "consideration": str(consideration),
    }

    print(f"🔍 DEBUG extract_metadata_fields result: {result}")
    return result
|
|
|
|
|
|
|
|
def _extract_from_raw_text(raw_text: str) -> Dict[str, str]:
    """Extract metadata from raw text analysis when JSON parsing fails.

    Runs a series of case-insensitive regexes over the free-form text and
    fills in whichever sidebar fields it can recognize.

    Args:
        raw_text: Unparseable classifier output.

    Returns:
        Dict with deed_type, jurisdiction, parties, property, consideration
        keys ("N/A" where nothing matched).
    """
    print(f"🔍 DEBUG: Attempting to extract from raw text: {raw_text[:300]}...")

    # Defaults for every field; overwritten below where a pattern hits.
    result = {
        "deed_type": "N/A",
        "jurisdiction": "N/A",
        "parties": "N/A",
        "property": "N/A",
        "consideration": "N/A"
    }

    # Deed type: ordered most explicit ("deed type: X") to most generic
    # ("sale deed"); first match wins.
    deed_type_patterns = [
        r"deed\s+type[:\-\s]+([\w\s]+?)(?:\n|$|;|,)",
        r"type\s+of\s+deed[:\-\s]+([\w\s]+?)(?:\n|$|;|,)",
        r"this\s+is\s+a[n]?\s+([\w\s]+?)\s+deed",
        r"(sale|mortgage|lease|gift|warranty|quitclaim)\s+deed"
    ]

    for pattern in deed_type_patterns:
        match = re.search(pattern, raw_text, re.IGNORECASE)
        if match:
            result["deed_type"] = match.group(1).strip().title()
            break

    # Jurisdiction: first label-style match wins.
    jurisdiction_patterns = [
        r"jurisdiction[:\-\s]+([\w\s,]+?)(?:\n|$)",
        r"state[:\-\s]+([\w\s,]+?)(?:\n|$)",
        r"country[:\-\s]+([\w\s,]+?)(?:\n|$)",
        r"location[:\-\s]+([\w\s,]+?)(?:\n|$)"
    ]

    for pattern in jurisdiction_patterns:
        match = re.search(pattern, raw_text, re.IGNORECASE)
        if match:
            result["jurisdiction"] = match.group(1).strip()
            break

    # Parties: collect EVERY distinct role/name pair (no early break).
    parties_patterns = [
        r"grantor[:\-\s]+([\w\s,]+?)(?:,\s*resident|$|\n)",
        r"grantee[:\-\s]+([\w\s,]+?)(?:,\s*resident|$|\n)",
        r"seller[:\-\s]+([\w\s,]+?)(?:,\s*resident|$|\n)",
        r"buyer[:\-\s]+([\w\s,]+?)(?:,\s*resident|$|\n)"
    ]

    parties_found = []
    for pattern in parties_patterns:
        matches = re.finditer(pattern, raw_text, re.IGNORECASE)
        for match in matches:
            full_match = match.group(0).strip()
            name = match.group(1).strip()
            # NOTE(review): assumes the matched label used ':' as separator;
            # for '-'/space separators split(':') keeps the whole match as role.
            role = full_match.split(':')[0].strip().title()
            party_info = f"{role}: {name}"
            if party_info not in parties_found and name:
                parties_found.append(party_info)

    if parties_found:
        result["parties"] = "\n".join(parties_found)

    # Property: accumulate every matching label line (keeps the full match,
    # label included).
    property_patterns = [
        r"property[:\-\s]+([\w\s,]+?)(?:\n|$)",
        r"district[:\-\s]+([\w\s]+?)(?:\n|$)",
        r"area[:\-\s]+([\d\.\s\w]+?)(?:\n|$)"
    ]

    property_found = []
    for pattern in property_patterns:
        matches = re.finditer(pattern, raw_text, re.IGNORECASE)
        for match in matches:
            prop_info = match.group(0).strip()
            if prop_info not in property_found:
                property_found.append(prop_info)

    if property_found:
        result["property"] = "\n".join(property_found)

    # Consideration: label-style patterns first, then a bare amount+currency.
    consideration_patterns = [
        r"consideration[:\-\s]+([\d,\.\s\w]+?)(?:\n|$)",
        r"amount[:\-\s]+([\d,\.\s\w]+?)(?:\n|$)",
        r"price[:\-\s]+([\d,\.\s\w]+?)(?:\n|$)",
        r"(\d+[,\d]*\s*(?:taka|dollars?|usd|€|£|\$))"
    ]

    for pattern in consideration_patterns:
        match = re.search(pattern, raw_text, re.IGNORECASE)
        if match:
            result["consideration"] = match.group(1).strip()
            break

    print(f"🔍 DEBUG: Extracted from raw text: {result}")
    return result
|
|
|
|
|
|
|
|
def _clean_json_response_metadata(response: str) -> str: |
|
|
"""Clean JSON response for metadata extraction (same as main.py logic).""" |
|
|
cleaned = response.strip() |
|
|
|
|
|
|
|
|
if cleaned.startswith("```"): |
|
|
lines = cleaned.split("\n") |
|
|
lines = lines[1:] |
|
|
if lines and lines[-1].strip() == "```": |
|
|
lines = lines[:-1] |
|
|
cleaned = "\n".join(lines).strip() |
|
|
|
|
|
|
|
|
start_idx = cleaned.find("{") |
|
|
end_idx = cleaned.rfind("}") + 1 |
|
|
|
|
|
if start_idx != -1 and end_idx > start_idx: |
|
|
cleaned = cleaned[start_idx:end_idx] |
|
|
|
|
|
return cleaned.strip() |
|
|
|
|
|
|
|
|
def build_report_pdf(report: Dict[str, Any]) -> str:
    """Create a professional PDF report for download.

    Args:
        report: Full report dict containing "deed_classification",
            "risk_analysis" and "clause_breakdown" sections.

    Returns:
        Filesystem path of the generated PDF.  The caller owns the temp
        file and is responsible for deleting it.
    """
    # Imported lazily so the app can start even if reportlab is only needed
    # when a download is actually requested.
    from reportlab.lib.pagesizes import letter
    from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
    from reportlab.lib.units import inch
    from reportlab.lib.enums import TA_LEFT, TA_CENTER
    from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle, PageBreak
    from reportlab.lib import colors

    # Reserve a temp path for the PDF (close immediately; reportlab writes it).
    tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".pdf")
    pdf_path = tmp.name
    tmp.close()

    doc = SimpleDocTemplate(pdf_path, pagesize=letter,
                            rightMargin=72, leftMargin=72,
                            topMargin=72, bottomMargin=18)

    # Flowables accumulated top-to-bottom, then rendered once at the end.
    elements = []

    # --- Shared styles ---
    styles = getSampleStyleSheet()
    title_style = ParagraphStyle(
        'CustomTitle',
        parent=styles['Heading1'],
        fontSize=24,
        textColor=colors.HexColor('#1f2937'),
        spaceAfter=30,
        alignment=TA_CENTER
    )
    heading_style = ParagraphStyle(
        'CustomHeading',
        parent=styles['Heading2'],
        fontSize=16,
        textColor=colors.HexColor('#374151'),
        spaceAfter=12,
        spaceBefore=12
    )
    normal_style = styles['BodyText']

    # --- Title and legal disclaimer ---
    elements.append(Paragraph("Legal Deed Analysis Report", title_style))
    elements.append(Spacer(1, 0.2*inch))

    disclaimer_text = """
    <b>LEGAL DISCLAIMER:</b> This automated analysis is for informational purposes only
    and does not constitute legal advice. Always consult with a qualified attorney licensed
    in your jurisdiction before making decisions based on deed documents.
    """
    elements.append(Paragraph(disclaimer_text, normal_style))
    elements.append(Spacer(1, 0.3*inch))

    # --- Deed classification summary table ---
    elements.append(Paragraph("Deed Classification", heading_style))
    classification_data = report.get("deed_classification", {})
    if classification_data.get("success"):
        classification = classification_data.get("classification", {})

        deed_type = classification.get("deed_type", "N/A")
        jurisdiction = classification.get("jurisdiction", {})
        if isinstance(jurisdiction, dict):
            jurisdiction_str = f"{jurisdiction.get('country', 'N/A')}, {jurisdiction.get('state_province', 'N/A')}"
        else:
            jurisdiction_str = str(jurisdiction)

        consideration = classification.get("consideration_amount", "N/A")
        date_exec = classification.get("date_of_execution", "N/A")

        header_style = ParagraphStyle(
            'TableHeader',
            parent=normal_style,
            fontName='Helvetica-Bold',
            fontSize=11
        )

        classification_table_data = [
            [Paragraph("Field", header_style), Paragraph("Value", header_style)],
            ["Deed Type", str(deed_type)],
            ["Jurisdiction", jurisdiction_str],
            ["Consideration", str(consideration)],
            ["Date of Execution", str(date_exec)]
        ]

        classification_table = Table(classification_table_data, colWidths=[2*inch, 4*inch])
        classification_table.setStyle(TableStyle([
            ('BACKGROUND', (0, 0), (-1, 0), colors.HexColor('#e5e7eb')),
            ('TEXTCOLOR', (0, 0), (-1, 0), colors.black),
            ('ALIGN', (0, 0), (-1, -1), 'LEFT'),
            ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
            ('FONTSIZE', (0, 0), (-1, 0), 12),
            ('BOTTOMPADDING', (0, 0), (-1, 0), 12),
            ('GRID', (0, 0), (-1, -1), 1, colors.grey),
            ('VALIGN', (0, 0), (-1, -1), 'TOP'),
        ]))

        elements.append(classification_table)
    else:
        elements.append(Paragraph("Classification data not available", normal_style))

    elements.append(Spacer(1, 0.3*inch))

    # --- Risk overview: parse the LLM's plain-text summary line by line ---
    elements.append(Paragraph("Risk Analysis Overview", heading_style))
    risk_data = report.get("risk_analysis", {})
    if risk_data.get("success"):
        overall_summary = risk_data.get("overall_summary", "No summary available")

        lines = overall_summary.split('\n')

        section_style = ParagraphStyle(
            'SectionHeading',
            parent=styles['Heading3'],
            fontSize=12,
            textColor=colors.HexColor('#374151'),
            spaceAfter=6,
            spaceBefore=10,
            fontName='Helvetica-Bold'
        )

        bullet_style = ParagraphStyle(
            'BulletText',
            parent=normal_style,
            fontSize=10,
            leftIndent=20,
            bulletIndent=10,
            spaceAfter=4
        )

        # Manual index loop because section headers consume their following
        # "-" bullet lines; the trailing i -= 1 offsets the i += 1 at the end.
        i = 0
        while i < len(lines):
            line = lines[i].strip()

            if not line:
                i += 1
                continue

            if line.startswith("OVERALL RISK LEVEL:"):
                # Color-code the level: red/orange/green.
                risk_level_text = line.replace("OVERALL RISK LEVEL:", "").strip()
                if "HIGH" in risk_level_text.upper():
                    risk_color = colors.red
                elif "MEDIUM" in risk_level_text.upper():
                    risk_color = colors.orange
                else:
                    risk_color = colors.green

                elements.append(Paragraph(
                    f"<b>OVERALL RISK LEVEL:</b> <font color='{risk_color.hexval()}'><b>{risk_level_text}</b></font>",
                    section_style
                ))

            elif line.startswith("KEY FINDINGS:"):
                elements.append(Paragraph("<b>Key Findings:</b>", section_style))

                # Consume the following "-" bullet lines.
                i += 1
                while i < len(lines) and lines[i].strip().startswith('-'):
                    bullet_text = lines[i].strip()[1:].strip()
                    elements.append(Paragraph(f"• {bullet_text}", bullet_style))
                    i += 1
                i -= 1

            elif line.startswith("RISK CATEGORIES FOUND:"):
                categories = line.replace("RISK CATEGORIES FOUND:", "").strip()
                elements.append(Paragraph(f"<b>Risk Categories Found:</b> {categories}", section_style))

            elif line.startswith("RECOMMENDATIONS:"):
                elements.append(Paragraph("<b>Recommendations:</b>", section_style))

                # Consume the following "-" bullet lines.
                i += 1
                while i < len(lines) and lines[i].strip().startswith('-'):
                    bullet_text = lines[i].strip()[1:].strip()
                    elements.append(Paragraph(f"• {bullet_text}", bullet_style))
                    i += 1
                i -= 1

            elif line.startswith("DISCLAIMER:"):
                disclaimer_text = line.replace("DISCLAIMER:", "").strip()

                # A disclaimer may wrap over several lines; gather until a
                # blank line or the next section header.
                full_disclaimer = [disclaimer_text]
                i += 1
                while i < len(lines) and lines[i].strip() and not any(lines[i].strip().startswith(s) for s in ["OVERALL", "KEY", "RISK", "RECOMMENDATIONS"]):
                    full_disclaimer.append(lines[i].strip())
                    i += 1
                i -= 1

                disclaimer_style = ParagraphStyle(
                    'Disclaimer',
                    parent=normal_style,
                    fontSize=9,
                    textColor=colors.HexColor('#6b7280'),
                    fontName='Helvetica-Oblique',
                    spaceAfter=10,
                    spaceBefore=10
                )
                elements.append(Paragraph(f"<b>Disclaimer:</b> {' '.join(full_disclaimer)}", disclaimer_style))

            else:
                # Any other non-empty line is rendered as plain body text.
                if line:
                    elements.append(Paragraph(line, normal_style))

            i += 1
    else:
        elements.append(Paragraph("Risk analysis not available", normal_style))

    elements.append(Spacer(1, 0.3*inch))

    # --- Clause breakdown summary table (first 15 clauses only) ---
    elements.append(Paragraph("Clause Breakdown (Summary)", heading_style))
    clauses = report.get("clause_breakdown", {}).get("clauses", [])
    if clauses:
        small_style = ParagraphStyle(
            'SmallText',
            parent=normal_style,
            fontSize=8,
            leading=10
        )

        clause_table_data = [[Paragraph("<b>ID</b>", normal_style),
                              Paragraph("<b>Type</b>", normal_style),
                              Paragraph("<b>Preview</b>", normal_style)]]

        for clause in clauses[:15]:
            clause_id = clause.get('id', '')
            clause_type = clause.get('type', 'General')
            clause_text = clause.get('text', '')

            clause_table_data.append([
                Paragraph(clause_id, small_style),
                Paragraph(clause_type, small_style),
                Paragraph(clause_text, small_style)
            ])

        clause_table = Table(clause_table_data, colWidths=[0.6*inch, 1.2*inch, 4.2*inch])
        clause_table.setStyle(TableStyle([
            ('BACKGROUND', (0, 0), (-1, 0), colors.HexColor('#e5e7eb')),
            ('TEXTCOLOR', (0, 0), (-1, 0), colors.black),
            ('ALIGN', (0, 0), (-1, -1), 'LEFT'),
            ('VALIGN', (0, 0), (-1, -1), 'TOP'),
            ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
            ('FONTSIZE', (0, 0), (-1, 0), 10),
            ('BOTTOMPADDING', (0, 0), (-1, 0), 8),
            ('TOPPADDING', (0, 1), (-1, -1), 6),
            ('BOTTOMPADDING', (0, 1), (-1, -1), 6),
            ('GRID', (0, 0), (-1, -1), 0.5, colors.grey),
            ('WORDWRAP', (0, 0), (-1, -1), True),
        ]))

        elements.append(clause_table)
    else:
        elements.append(Paragraph("No clauses detected", normal_style))

    elements.append(Spacer(1, 0.3*inch))

    elements.append(PageBreak())

    # --- Per-clause detailed risk analysis (first 10 clauses only) ---
    elements.append(Paragraph("Detailed Risk Analysis by Clause", heading_style))
    clause_risks = risk_data.get("clause_risks", [])
    if clause_risks:
        for idx, risk in enumerate(clause_risks[:10], 1):
            clause_id = risk.get("clause_id", "Unknown")
            clause_type = risk.get("clause_type", "General")
            risk_analysis = risk.get("risk_analysis", "No analysis available")

            # Literal "RISK LEVEL: X" label match only (simpler than the
            # pattern set used by _extract_risk_level in the UI tables).
            risk_level = "UNKNOWN"
            if "RISK LEVEL: HIGH" in risk_analysis.upper():
                risk_level = "HIGH"
                risk_color = colors.red
            elif "RISK LEVEL: MEDIUM" in risk_analysis.upper():
                risk_level = "MEDIUM"
                risk_color = colors.orange
            elif "RISK LEVEL: LOW" in risk_analysis.upper():
                risk_level = "LOW"
                risk_color = colors.green
            else:
                risk_color = colors.grey

            risk_header = f"<b>Clause {clause_id}</b> ({clause_type}) - <font color='{risk_color.hexval()}'>Risk: {risk_level}</font>"
            elements.append(Paragraph(risk_header, normal_style))
            elements.append(Spacer(1, 0.1*inch))

            # Render at most the first 10 non-empty lines of the analysis.
            sections = risk_analysis.split('\n')
            for section in sections[:10]:
                if section.strip():
                    elements.append(Paragraph(section.strip(), normal_style))

            elements.append(Spacer(1, 0.3*inch))
    else:
        elements.append(Paragraph("No detailed risk analysis available", normal_style))

    # Render all accumulated flowables into the PDF file.
    doc.build(elements)

    return pdf_path
|
|
|
|
|
|
|
|
|
|
|
@app.post("/analyze-deed", response_model=Dict[str, Any])
async def analyze_deed_endpoint(file: UploadFile = File(...)):
    """FastAPI endpoint for deed analysis.

    Saves the uploaded PDF to a temporary file, runs the full analysis
    pipeline via ``generate_comprehensive_deed_report``, and returns the
    parsed report along with basic upload metadata.

    Raises:
        HTTPException: 400 for non-PDF uploads, 500 if analysis fails.
    """
    try:
        # Reject anything not declared as a PDF. A missing/None content
        # type also fails this comparison, so no separate check is needed.
        if file.content_type != "application/pdf":
            raise HTTPException(status_code=400, detail="Only PDF files are supported")

        content = await file.read()

        # The analysis pipeline works on filesystem paths, so persist the
        # upload to a temporary file first (delete=False: we unlink it
        # ourselves after analysis).
        with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as tmp_file:
            tmp_file.write(content)
            tmp_path = tmp_file.name

        try:
            report_result = await generate_comprehensive_deed_report(tmp_path)
            report_data = json.loads(report_result)

            return {
                "success": report_data.get("success", False),
                "filename": file.filename,
                "file_size": len(content),
                "report": report_data
            }
        finally:
            # Always remove the temp file, even when analysis raises.
            os.unlink(tmp_path)

    except HTTPException:
        # Re-raise our own 400 untouched rather than wrapping it in a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Analysis failed: {str(e)}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# --- Gradio UI -------------------------------------------------------------
# The interface built here is mounted onto the FastAPI app at "/" further
# below via gr.mount_gradio_app.
with gr.Blocks() as gradio_app:

    gr.Markdown("# ⚖️ Legal Deed Review System")
    gr.Markdown("Upload a PDF deed document to receive comprehensive legal risk analysis.")
|
|
|
|
|
|
|
|
gr.HTML(""" |
|
|
<div style=" |
|
|
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); |
|
|
border: 2px solid #5a67d8; |
|
|
border-radius: 10px; |
|
|
padding: 20px; |
|
|
margin: 20px 0; |
|
|
box-shadow: 0 4px 6px rgba(0, 0, 0, 0.3); |
|
|
"> |
|
|
<h3 style="color: #fff; margin-top: 0; margin-bottom: 15px; display: flex; align-items: center;"> |
|
|
⚖️ LEGAL DISCLAIMER |
|
|
</h3> |
|
|
<div style="color: #e6e6ff; line-height: 1.8; font-size: 14px;"> |
|
|
<p style="margin-bottom: 12px; font-weight: 500;">This is an automated analysis tool for informational purposes only.</p> |
|
|
<ul style="list-style: none; padding-left: 0; margin: 10px 0;"> |
|
|
<li style="margin-bottom: 8px;">✅ This does NOT constitute legal advice</li> |
|
|
<li style="margin-bottom: 8px;">✅ This does NOT replace consultation with a qualified attorney</li> |
|
|
<li style="margin-bottom: 8px;">✅ This analysis may NOT identify all potential legal issues</li> |
|
|
<li style="margin-bottom: 8px;">✅ Always have deeds reviewed by a licensed attorney before taking action</li> |
|
|
<li style="margin-bottom: 8px;">✅ Consult local legal professionals familiar with your jurisdiction</li> |
|
|
</ul> |
|
|
<p style="margin-top: 15px; font-weight: 500;">By using this tool, you acknowledge these limitations.</p> |
|
|
</div> |
|
|
</div> |
|
|
""") |
|
|
|
|
|
    with gr.Row():
        # Left column: upload controls, sample loader, and quick stats.
        with gr.Column(scale=1):
            gr.Markdown("## 📄 Upload & Stats")

            gr.Markdown("### 💡 Try with a Sample")
            with gr.Row():
                # Downloadable copy of the bundled sample deed PDF.
                sample_download = gr.File(
                    value="usa_general_warranty_deed_sample.pdf",
                    label="📥 Download Sample Deed",
                    visible=True
                )
                # One-click loader; wired to load_sample_deed() below.
                load_sample_btn = gr.Button(
                    "🔗 Load Sample",
                    variant="secondary",
                    size="sm"
                )

            gr.Markdown("""
            **Quick start options:**
            1. **Easy**: Click "🔗 Load Sample" to auto-load the sample deed
            2. **Manual**: Download sample above, then upload below
            3. **Your own**: Upload your PDF deed document
            """)

            # type="binary" hands the handlers raw bytes rather than a path.
            pdf_input = gr.File(
                label="Upload PDF Deed",
                file_types=[".pdf"],
                type="binary"
            )

            # Main trigger; styled via the #analyze-btn CSS injected below.
            analyze_button = gr.Button(
                "🔍 Analyze Deed",
                variant="primary",
                size="lg",
                elem_id="analyze-btn"
            )
|
|
|
|
|
|
|
|
            # Inline CSS: gradient styling + hover effect for the analyze
            # button (targets elem_id="analyze-btn" set above).
            gr.HTML("""
            <style>
            #analyze-btn {
                background: linear-gradient(135deg, #667eea 0%, #764ba2 100%) !important;
                border: none !important;
                color: white !important;
                font-weight: 600 !important;
                box-shadow: 0 4px 6px rgba(102, 126, 234, 0.4) !important;
                transition: all 0.3s ease !important;
            }
            #analyze-btn:hover {
                background: linear-gradient(135deg, #5a67d8 0%, #6a3f8f 100%) !important;
                box-shadow: 0 6px 12px rgba(102, 126, 234, 0.6) !important;
                transform: translateY(-2px) !important;
            }
            </style>
            """)

            # Inline CSS: re-theme Gradio's default orange accents (selected
            # tabs, hover states, loading spinner) to the app's blue palette.
            gr.HTML("""
            <style>
            /* Force blue color for all tab interactions */
            .tabs > button.selected,
            .tab-nav > button.selected,
            button[role="tab"][aria-selected="true"],
            button[role="tab"].selected {
                color: #667eea !important;
                border-bottom-color: #667eea !important;
            }

            .tabs > button:hover,
            .tab-nav > button:hover,
            button[role="tab"]:hover {
                color: #667eea !important;
                border-bottom-color: #667eea !important;
            }

            /* Target the specific orange underline if present */
            .tabs > button.selected::after,
            button[role="tab"][aria-selected="true"]::after {
                background-color: #667eea !important;
            }

            /* Custom Loading Animation - Blue Pulse */
            .generating {
                border-color: #667eea !important;
            }

            /* Override default orange spinner/loader */
            .loader {
                --loader-color: #667eea !important;
                border-top-color: #667eea !important;
                border-left-color: #667eea !important;
            }

            /* Add a subtle blue glow to active processing elements */
            .generating::before {
                background: linear-gradient(90deg, transparent, rgba(102, 126, 234, 0.2), transparent) !important;
            }
            </style>
            """)
|
|
|
|
|
            gr.Markdown("### 📊 Quick Stats")
            # Refreshed by update_stats() on every pdf_input change.
            stats_display = gr.Markdown(
                value="Upload a deed to see document statistics...",
                elem_id="stats"
            )

            gr.Markdown("### 🧭 Deed Metadata")
            # Read-only fields populated from the classification step of
            # analyze_deed_gradio().
            deed_type_box = gr.Textbox(label="Deed Type", interactive=False)
            jurisdiction_box = gr.Textbox(label="Jurisdiction", interactive=False)
            consideration_box = gr.Textbox(label="Consideration / Price", interactive=False)
            parties_box = gr.Textbox(label="Parties", lines=6, interactive=False)
            property_box = gr.Textbox(label="Property Description", lines=4, interactive=False)
|
|
|
|
|
        # Right column: pipeline progress trace plus tabbed analysis results.
        with gr.Column(scale=2):
            # Terminal-style log of the analysis steps (written by
            # analyze_deed_gradio's final_log / error messages).
            agent_monologue = gr.Code(
                label=" Reasoning Trace ",
                language="shell",
                interactive=False,
                elem_id="agent-terminal",
                lines=12,
                value="⚡ Waiting for document upload......"
            )

            gr.Markdown("## 📋 Analysis Results")

            with gr.Tabs():
                with gr.TabItem("📝 Overview"):
                    # Two-column field/value table of the deed classification.
                    classification_output = gr.DataFrame(
                        headers=["field", "value"],
                        label="Deed Classification (Table)",
                        interactive=False,
                        datatype=["str", "str"],
                        column_count=(2, "fixed"),
                        row_count=(0, "dynamic")
                    )
                    risk_overview_output = gr.Markdown(
                        value="Risk overview will appear here after analysis.",
                        label="Risk Overview"
                    )
                    # Generated PDF report (path from _write_report_file).
                    report_file = gr.File(label="Download Report", interactive=False)

                with gr.TabItem("✂️ Clause Breakdown"):
                    clause_table = gr.DataFrame(
                        headers=["id", "type", "words", "preview"],
                        label="Clauses",
                        interactive=False,
                        wrap=True,
                        column_widths=["10%", "20%", "10%", "60%"]
                    )

                with gr.TabItem("⚠️ Risk Analysis"):
                    risk_table = gr.DataFrame(
                        headers=["clause_id", "clause_type", "risk_level", "summary"],
                        label="Clause Risks",
                        interactive=False,
                        wrap=True,
                        column_widths=["10%", "20%", "10%", "60%"]
                    )

                with gr.TabItem("📄 Extracted Text"):
                    # Raw OCR output; truncated to 50k chars by the handler.
                    text_output = gr.Textbox(
                        value="Upload and analyze a deed to see extracted text...",
                        label="OCR Text Extraction",
                        lines=30,
                        max_lines=None,
                        interactive=False,
                        autoscroll=False
                    )

                with gr.TabItem("🗃️ Raw JSON"):
                    # Full report_data dict for debugging/inspection.
                    json_output = gr.JSON(
                        label="Full Response",
                        value=None
                    )
|
|
|
|
|
|
|
|
    # Static help/footer section.
    gr.Markdown("""
    ## 🔧 How to Use

    1. **Upload** a PDF deed document using the file uploader
    2. **Click** the "Analyze Deed" button to start processing
    3. **Review** the results in the tabs:
       - **Classification:** Deed type, parties, and key information
       - **Clause Breakdown:** Identified legal clauses and sections
       - **Risk Analysis:** Potential legal risks and recommendations
       - **Extracted Text:** Raw text extracted from the PDF
    4. **Consult** a qualified attorney for legal advice based on the analysis

    ### ⚡ Processing Time
    - Analysis typically takes 30-60 seconds or more depending on document complexity
    - Multi-page deeds may take longer for OCR processing

    ### 📋 Supported Documents
    - Property sale deeds
    - Mortgage deeds
    - Lease agreements
    - Gift deeds
    - Warranty deeds
    - Quitclaim deeds
    """)
|
|
|
|
|
|
|
|
def update_stats(pdf_file): |
|
|
"""Update quick stats display.""" |
|
|
if pdf_file is None: |
|
|
return "No document uploaded" |
|
|
|
|
|
try: |
|
|
|
|
|
if hasattr(pdf_file, 'read') and hasattr(pdf_file, 'seek'): |
|
|
|
|
|
file_size = len(pdf_file.read()) |
|
|
pdf_file.seek(0) |
|
|
elif isinstance(pdf_file, str): |
|
|
|
|
|
import os |
|
|
file_size = os.path.getsize(pdf_file) |
|
|
elif isinstance(pdf_file, bytes): |
|
|
|
|
|
file_size = len(pdf_file) |
|
|
else: |
|
|
return f"📊 **Document Stats:**\n• File type: {type(pdf_file).__name__}\n• Status: Ready for analysis" |
|
|
|
|
|
return f"""📊 **Document Stats:** |
|
|
• File size: {file_size:,} bytes |
|
|
• Status: Ready for analysis |
|
|
• Click 'Analyze Deed' to start processing""" |
|
|
except: |
|
|
return "Error reading document information" |
|
|
|
|
|
def _write_report_file(report_data: Dict[str, Any]) -> Optional[str]: |
|
|
"""Generate and return PDF report file path.""" |
|
|
try: |
|
|
return build_report_pdf(report_data) |
|
|
except Exception as e: |
|
|
print(f"Error generating PDF report: {e}") |
|
|
return None |
|
|
|
|
|
async def analyze_deed_gradio(pdf_file): |
|
|
"""Main analysis function for Gradio interface - simplified non-streaming version.""" |
|
|
import time |
|
|
|
|
|
|
|
|
empty_table: List[Dict[str, Any]] = [] |
|
|
empty_text = "No data" |
|
|
|
|
|
|
|
|
def get_error_return(log_msg, error_detail): |
|
|
return ( |
|
|
log_msg, |
|
|
empty_table, |
|
|
error_detail, |
|
|
empty_table, |
|
|
empty_table, |
|
|
error_detail, |
|
|
{}, |
|
|
"❌ Error", |
|
|
"❌ Error", |
|
|
"❌ Error", |
|
|
"❌ Error", |
|
|
"❌ Error", |
|
|
None |
|
|
) |
|
|
|
|
|
if pdf_file is None: |
|
|
return get_error_return("❌ ERROR: No file uploaded.", "❌ No file uploaded") |
|
|
|
|
|
try: |
|
|
|
|
|
with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp_file: |
|
|
if hasattr(pdf_file, 'read'): |
|
|
tmp_file.write(pdf_file.read()) |
|
|
else: |
|
|
tmp_file.write(pdf_file) |
|
|
pdf_path = tmp_file.name |
|
|
|
|
|
|
|
|
print("🔍 Extracting text from PDF...") |
|
|
|
|
|
text_json = await extract_text_from_deed_pdf(pdf_path) |
|
|
|
|
|
text_result = json.loads(text_json) |
|
|
deed_text = text_result.get("text", "") |
|
|
|
|
|
if not deed_text: |
|
|
raise ValueError("Failed to extract text from PDF") |
|
|
|
|
|
|
|
|
print("🔍 Classifying deed...") |
|
|
|
|
|
classification_json = await classify_deed_type(deed_text) |
|
|
classification_data = json.loads(classification_json) |
|
|
|
|
|
|
|
|
metadata = extract_metadata_fields(classification_data) |
|
|
|
|
|
|
|
|
print("🔍 Splitting into clauses...") |
|
|
|
|
|
clauses_json = await split_deed_into_clauses(deed_text) |
|
|
clauses_data = json.loads(clauses_json) |
|
|
|
|
|
|
|
|
print("🔍 Analyzing risks...") |
|
|
|
|
|
|
|
|
risks_json = await analyze_deed_risks(clauses_json, classification_json) |
|
|
risks_data = json.loads(risks_json) |
|
|
|
|
|
|
|
|
report_data = { |
|
|
"deed_classification": classification_data, |
|
|
"clause_breakdown": clauses_data, |
|
|
"risk_analysis": risks_data, |
|
|
"text_preview": deed_text, |
|
|
"report_metadata": { |
|
|
"generated_at": time.time(), |
|
|
"analysis_steps": ["text_extraction", "classification", "risk_analysis"], |
|
|
"processing_method": "agentic_flow" |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
report_path = _write_report_file(report_data) |
|
|
|
|
|
|
|
|
display_text = deed_text if len(deed_text) < 50000 else deed_text[:50000] + "\n\n... (Text truncated for display. Full text available in PDF report)" |
|
|
|
|
|
|
|
|
final_log = f"""✅ Analysis Complete! |
|
|
|
|
|
● Step 1: Text Extraction ✓ |
|
|
● Step 2: Classification ✓ |
|
|
● Step 3: Clause Breakdown ✓ |
|
|
● Step 4: Risk Analysis ✓ |
|
|
|
|
|
📊 Results: {len(clauses_data.get('clauses', []))} clauses analyzed |
|
|
⚖️ Overall Risk Level: {risks_data.get('overall_summary', 'N/A')[:50]}...""" |
|
|
|
|
|
|
|
|
return ( |
|
|
final_log, |
|
|
format_classification_table(classification_data), |
|
|
format_risk_overview(risks_data), |
|
|
format_clause_table(clauses_data), |
|
|
format_risk_table(risks_data), |
|
|
display_text, |
|
|
report_data, |
|
|
metadata["deed_type"], |
|
|
metadata["jurisdiction"], |
|
|
metadata["parties"], |
|
|
metadata["property"], |
|
|
metadata["consideration"], |
|
|
report_path |
|
|
) |
|
|
|
|
|
except Exception as e: |
|
|
print(f"Error in analysis: {e}") |
|
|
return get_error_return(f"❌ SYSTEM ERROR: {str(e)}", f"❌ Analysis failed: {str(e)}") |
|
|
|
|
|
def load_sample_deed(): |
|
|
"""Load the sample deed file into the file input.""" |
|
|
import os |
|
|
sample_path = "usa_general_warranty_deed_sample.pdf" |
|
|
if os.path.exists(sample_path): |
|
|
return sample_path |
|
|
else: |
|
|
return None |
|
|
|
|
|
|
|
|
    # Refresh the quick-stats panel whenever the uploaded file changes.
    pdf_input.change(
        fn=update_stats,
        inputs=[pdf_input],
        outputs=[stats_display]
    )

    # Main pipeline trigger. Output order must exactly match the 13-element
    # tuple returned by analyze_deed_gradio.
    analyze_button.click(
        fn=analyze_deed_gradio,
        inputs=[pdf_input],
        outputs=[
            agent_monologue,
            classification_output,
            risk_overview_output,
            clause_table,
            risk_table,
            text_output,
            json_output,
            deed_type_box,
            jurisdiction_box,
            parties_box,
            property_box,
            consideration_box,
            report_file
        ]
    )

    # Copy the bundled sample deed into the uploader (None if missing).
    load_sample_btn.click(
        fn=load_sample_deed,
        outputs=[pdf_input]
    )
|
|
|
|
|
|
|
|
|
|
|
# Combined application: the Gradio UI is mounted at "/" on the FastAPI
# instance, so the UI and the /analyze-deed API share one server.
fastapi_app = gr.mount_gradio_app(app, gradio_app, path="/")


if __name__ == "__main__":
    import uvicorn
    print("🏛️ Starting Legal Deed Review Web Application...")
    print("📍 Server will be available at: http://localhost:8002")
    print("🔧 API endpoint: http://localhost:8002/analyze-deed")
    print("🌐 Gradio interface: http://localhost:8002/")
    print("\n⚖️ Legal Notice: This tool provides analysis only, not legal advice.")
    print("✋ Press Ctrl+C to stop the server")

    # BUGFIX: `reload=True` was dropped — uvicorn only supports reload when
    # the app is passed as an import string ("module:attr"), not as an app
    # object; with an object it refuses to enable reload and aborts startup.
    uvicorn.run(fastapi_app, host="0.0.0.0", port=8002)
|
|
|
|
|
|