Pial2233's picture
Commit with fix
e089c13 verified
import asyncio
import base64
import json
import mimetypes
import os
import pathlib
import re
import sys
import time
from typing import Dict, List, Any, Optional

import fitz # PyMuPDF
import httpx
from dotenv import load_dotenv
from mcp.server.fastmcp import FastMCP
from openai import OpenAI
# Load settings via python-dotenv before any environment lookups happen.
load_dotenv()

# Either variable name is accepted; NEBIUS_API_KEY wins when both are set.
api_key = os.getenv("NEBIUS_API_KEY") or os.getenv("OPENAI_API_KEY")
if not api_key:
    raise RuntimeError("NEBIUS_API_KEY or OPENAI_API_KEY must be set")

# OpenAI SDK client pointed at the Nebius endpoint; shared by every LLM/OCR call.
client = OpenAI(
    api_key=api_key,
    base_url="https://api.tokenfactory.nebius.com/v1/",
)

# Server object on which the tools below are registered through @mcp.tool().
mcp = FastMCP("deed-legal-mcp")
# Legal system prompts
# System-role message shared by the LLM calls in classify_deed_type and
# analyze_deed_risks (both the per-clause requests and the final summary).
# The text is sent to the model verbatim, so any wording change alters
# model behaviour.
SYSTEM_DEED_LAWYER = """
You are an expert conveyancing lawyer reviewing deeds.
Your role is to:
1. Identify potential legal risks and issues
2. Classify deed types and extract key information
3. Explain risks in plain language for non-lawyers
4. Provide risk scores and categorizations
IMPORTANT: You do NOT provide legal advice. You only identify potential issues for review by a qualified lawyer.
Always include appropriate disclaimers about seeking professional legal counsel.
"""
# User-prompt preamble for classify_deed_type. The deed text (and optional
# caller metadata) is appended after it. The model's reply is passed through
# _clean_llm_json_response and then json.loads, which is why the prompt
# insists on a bare JSON object with no fences or commentary.
CLASSIFY_DEED_PROMPT = """
Extract information from this deed document and return ONLY a valid JSON object. Do not include any explanatory text, code fences, markdown formatting, or any other content - just the raw JSON.
Required JSON structure:
{
"deed_type": "sale|mortgage|lease|gift|warranty|quitclaim|other",
"jurisdiction": {
"country": "country name",
"state_province": "state or province name"
},
"key_parties": {
"grantor": {
"name": "grantor name",
"address": "address if available"
},
"grantee": {
"name": "grantee name",
"address": "address if available"
},
"witnesses": []
},
"property_description_and_location": {
"district": "district name",
"area": "size/area",
"description": "property description"
},
"consideration_amount": "monetary amount if specified",
"date_of_execution": "date if available",
"special_conditions_or_restrictions": []
}
CRITICAL INSTRUCTIONS:
- Return ONLY the JSON object starting with { and ending with }
- NO ```json code fences
- NO markdown formatting
- NO explanatory text before or after
- Use "N/A" for missing information
- Ensure all strings are properly quoted
- Ensure all JSON syntax is valid
"""
# Per-clause user-prompt preamble for analyze_deed_risks. The clause type,
# clause text and optional deed context are appended after it. The
# "RISK LEVEL:" line requested here is what analyze_deed_risks later extracts
# from each reply to compute the overall risk level, so the field name must
# stay in sync with that parsing.
RISK_ANALYSIS_PROMPT = """
Analyze the deed clauses for potential legal risks. Provide your analysis in this EXACT format:
RISK LEVEL: [LOW|MEDIUM|HIGH]
RISK CATEGORY: [TITLE|ENCUMBRANCE|WARRANTY|COVENANT|EASEMENT|RESTRICTION|OTHER]
EXPLANATION: [Plain language explanation of the risk and potential consequences]
RECOMMENDATION: [Recommended actions]
Focus on common deed issues like:
- Title defects or clouds
- Undisclosed encumbrances
- Warranty limitations
- Easement problems
- Restrictive covenants
- Boundary disputes
- Missing signatures or witnesses
Important: Start your response with "RISK LEVEL:" and follow the exact format above.
"""
def _data_url_from_bytes(data: bytes, mime: str) -> str:
b64 = base64.b64encode(data).decode("ascii")
return f"data:{mime};base64,{b64}"
def _clean_llm_json_response(response: str) -> str:
"""Clean LLM response to extract valid JSON."""
# Remove leading/trailing whitespace
cleaned = response.strip()
# Remove code fences if present
if cleaned.startswith("```"):
lines = cleaned.split("\n")
# Remove first line with ```json or ```
lines = lines[1:]
# Remove last line with ```
if lines and lines[-1].strip() == "```":
lines = lines[:-1]
cleaned = "\n".join(lines).strip()
# Look for JSON object boundaries
start_idx = cleaned.find("{")
end_idx = cleaned.rfind("}") + 1
if start_idx != -1 and end_idx > start_idx:
cleaned = cleaned[start_idx:end_idx]
# Remove any remaining non-JSON text before or after
lines = cleaned.split("\n")
json_lines = []
in_json = False
for line in lines:
stripped = line.strip()
if stripped.startswith("{") or in_json:
in_json = True
json_lines.append(line)
if stripped.endswith("}") and line.count("{") <= line.count("}"):
break
if json_lines:
cleaned = "\n".join(json_lines)
return cleaned.strip()
def _path_to_data_url(path: pathlib.Path) -> str:
    """Read the file at *path* and return its contents as a data URL.

    The media type is guessed from the file name; ``image/png`` is used when
    the guess comes back empty.
    """
    guessed_type, _encoding = mimetypes.guess_type(path.name)
    media_type = guessed_type if guessed_type else "image/png"
    file_bytes = path.read_bytes()
    return _data_url_from_bytes(file_bytes, media_type)
async def _prepare_image_payload(image_input: str) -> str:
"""
Accept local paths, http(s) URLs, or data URLs and normalize to a data URL string.
"""
if image_input.startswith("data:"):
return image_input
path = pathlib.Path(image_input).expanduser()
if path.exists():
return _path_to_data_url(path)
if image_input.startswith(("http://", "https://")):
async with httpx.AsyncClient() as http_client:
response = await http_client.get(image_input)
response.raise_for_status()
mime = (
response.headers.get("Content-Type")
or mimetypes.guess_type(image_input)[0]
or "image/png"
)
return _data_url_from_bytes(response.content, mime)
raise FileNotFoundError(f"Unable to locate image at {image_input}")
def _postprocess_ocr_text(raw_text: str) -> str:
"""
Postprocess OCR text with layout preservation.
Preserves document structure while doing minimal cleanup.
"""
if not raw_text:
return ""
# Preserve layout formatting while doing minimal cleanup
# Only remove excessive empty lines (more than 2 consecutive newlines)
# while preserving the overall layout structure
cleaned_text = "\n".join([
line.rstrip() for line in raw_text.split("\n")
])
# Remove excessive blank lines (keep max 2 consecutive)
cleaned_text = re.sub(r'\n\s*\n\s*\n+', '\n\n', cleaned_text)
return cleaned_text
async def _run_ocr_completion(image_data_url: str) -> str:
    """Send one image to the vision model for transcription and tidy the result.

    The synchronous SDK call is handed to the event loop's default executor
    so the coroutine does not block while the request is in flight.
    """
    system_message = {
        "role": "system",
        "content": (
            "You are an OCR assistant. Extract all text and keep layout if "
            "possible. Note: Don't include HTML tags in your response."
        ),
    }
    user_message = {
        "role": "user",
        "content": [
            {"type": "image_url", "image_url": {"url": image_data_url}}
        ],
    }

    def _request_transcription() -> str:
        response = client.chat.completions.create(
            model="Qwen/Qwen2.5-VL-72B-Instruct",
            messages=[system_message, user_message],
        )
        return response.choices[0].message.content

    event_loop = asyncio.get_running_loop()
    transcription = await event_loop.run_in_executor(None, _request_transcription)
    # Apply layout-preserving postprocessing
    return _postprocess_ocr_text(transcription)
@mcp.tool()
async def ocr_image(image: str) -> str:
    """
    Perform OCR on an image. Accepts a local path, http(s) URL, or data URL string.
    """
    # NOTE(review): FastMCP presumably exposes the docstring above as the tool
    # description, so its wording is kept unchanged - confirm before editing.
    normalized_image = await _prepare_image_payload(image)
    transcription = await _run_ocr_completion(normalized_image)
    return transcription
# Helper functions for legal processing
async def _run_llm_completion(messages: List[Dict], temperature: float = 0.1) -> str:
    """Run a chat completion for legal analysis and return the reply text.

    The blocking SDK call runs in the event loop's default executor.

    Args:
        messages: Chat messages in the OpenAI ``role``/``content`` format.
        temperature: Sampling temperature forwarded to the API.

    Returns:
        The text content of the first choice.

    Raises:
        RuntimeError: If the model returns no text content. Callers slice and
            parse the result as a string, so failing here gives them a clear
            message instead of a ``NoneType`` error further down.
    """
    loop = asyncio.get_running_loop()

    def _call_api() -> str:
        completion = client.chat.completions.create(
            model="Qwen/Qwen2.5-VL-72B-Instruct",
            messages=messages,
            temperature=temperature,
            max_tokens=4000,
        )
        content = completion.choices[0].message.content
        if content is None:
            raise RuntimeError("LLM returned no text content")
        return content

    return await loop.run_in_executor(None, _call_api)
async def _extract_text_directly_from_pdf(pdf_path: str) -> Dict[str, Any]:
"""Fast direct text extraction from PDF (no OCR needed)."""
try:
pdf_document = fitz.open(pdf_path)
all_text = []
pages_data = []
total_chars = 0
for page_num in range(pdf_document.page_count):
page = pdf_document[page_num]
page_text = page.get_text().strip()
all_text.append(f"--- Page {page_num + 1} ---\n{page_text}")
pages_data.append({
"page": page_num + 1,
"text": page_text,
"length": len(page_text)
})
total_chars += len(page_text)
pdf_document.close()
full_text = "\n\n".join(all_text)
return {
"success": True,
"text": full_text,
"pages": pages_data,
"metadata": {
"total_pages": len(pages_data),
"method": "direct_text_extraction",
"total_length": total_chars,
"processing_time_seconds": "< 1"
}
}
except Exception as e:
return {
"success": False,
"text": "",
"pages": [],
"metadata": {},
"error": str(e)
}
async def _convert_pdf_pages_to_images(pdf_path: str) -> List[str]:
"""Convert PDF pages to image data URLs for use with existing OCR tool (fallback method)."""
try:
pdf_document = fitz.open(pdf_path)
image_data_urls = []
for page_num in range(pdf_document.page_count):
page = pdf_document[page_num]
# Convert page to image with lower resolution for faster processing
pix = page.get_pixmap(matrix=fitz.Matrix(1.5, 1.5)) # Reduced from 2.0 to 1.5
img_data = pix.tobytes("png")
# Convert to data URL that the existing ocr_image tool expects
data_url = _data_url_from_bytes(img_data, "image/png")
image_data_urls.append(data_url)
pdf_document.close()
return image_data_urls
except Exception as e:
raise RuntimeError(f"PDF to image conversion failed: {str(e)}")
def _split_deed_into_clauses(text: str) -> Dict[str, Any]:
"""Split deed text into logical clauses using pattern matching."""
clauses = []
# Common deed section patterns
section_patterns = [
(r"WITNESSETH[:\s].*?", "Recitals"),
(r"TO HAVE AND TO HOLD.*?", "Habendum Clause"),
(r"SUBJECT TO.*?", "Exceptions and Reservations"),
(r"COVENANT[S]?.*?", "Covenants"),
(r"WARRANTY.*?", "Warranty Clause"),
(r"IN WITNESS WHEREOF.*?", "Execution Clause"),
(r"GRANTETH.*?", "Granting Clause"),
(r"FOR AND IN CONSIDERATION.*?", "Consideration Clause"),
(r"EASEMENT[S]?.*?", "Easement"),
(r"RESTRICTION[S]?.*?", "Restrictions")
]
# Split by paragraphs and page breaks
text = re.sub(r'--- Page \d+ ---', '\n\n', text)
paragraphs = [p.strip() for p in re.split(r'\n\s*\n', text) if p.strip()]
clause_id = 1
for paragraph in paragraphs:
if len(paragraph) < 20: # Skip very short paragraphs
continue
# Try to identify clause type
clause_type = "General"
for pattern, ctype in section_patterns:
if re.search(pattern, paragraph, re.IGNORECASE):
clause_type = ctype
break
clauses.append({
"id": f"clause_{clause_id}",
"type": clause_type,
"text": paragraph,
"length": len(paragraph),
"word_count": len(paragraph.split())
})
clause_id += 1
return {
"success": True,
"clauses": clauses,
"total_clauses": len(clauses),
"metadata": {
"total_paragraphs": len(paragraphs),
"processing_method": "pattern_matching"
}
}
# NEW LEGAL DEED PROCESSING TOOLS
@mcp.tool()
async def extract_text_from_deed_pdf(pdf_path: str) -> str:
    """
    Extract text from a PDF deed document. Try fast direct text first, then OCR fallback.

    Args:
        pdf_path: Path to the PDF deed file

    Returns:
        JSON string with extracted text, pages, and metadata
    """
    try:
        path = pathlib.Path(pdf_path).expanduser()
        if not path.exists():
            return json.dumps({"success": False, "error": f"PDF file not found: {pdf_path}"})
        if not pdf_path.lower().endswith(".pdf"):
            return json.dumps({"success": False, "error": "File must be a PDF document"})

        # Step 1: Fast direct text extraction
        direct_result = await _extract_text_directly_from_pdf(str(path))
        # Count real page text only. The combined "text" also contains the
        # "--- Page N ---" markers, which on a multi-page scanned PDF exceed
        # the threshold by themselves and would wrongly skip the OCR fallback.
        extracted_chars = sum(
            len(page.get("text", "")) for page in direct_result.get("pages", [])
        )
        if direct_result.get("success") and extracted_chars > 50:
            # Progress goes to stderr: stdout carries the MCP stdio protocol.
            print("🔍 Extracting text from PDF using direct text extraction...", file=sys.stderr)
            return json.dumps(direct_result, indent=2)

        # Step 2: OCR fallback when direct text is empty/short
        print("🔍 Using OCR processing...", file=sys.stderr)
        image_data_urls = await _convert_pdf_pages_to_images(str(path))
        all_text, pages_data = [], []
        for page_num, data_url in enumerate(image_data_urls, 1):
            print(f"📄 Processing page {page_num}/{len(image_data_urls)} with OCR...", file=sys.stderr)
            page_text = await ocr_image(data_url)
            all_text.append(f"--- Page {page_num} ---\n{page_text}")
            pages_data.append({"page": page_num, "text": page_text, "length": len(page_text)})

        full_text = "\n\n".join(all_text)
        result = {
            "success": True,
            "text": full_text,
            "pages": pages_data,
            "metadata": {
                "total_pages": len(pages_data),
                "method": "PDF_to_image_OCR_via_existing_tool",
                "total_length": len(full_text),
            },
        }
        print(f"✅ OCR processing complete! {len(full_text)} characters extracted", file=sys.stderr)
        return json.dumps(result, indent=2)
    except Exception as e:
        # Tool boundary: report every failure to the client as JSON.
        return json.dumps({"success": False, "error": f"PDF processing failed: {str(e)}"})
@mcp.tool()
async def split_deed_into_clauses(text: str) -> str:
    """
    Split deed text into logical clauses and sections.

    Args:
        text: The full deed text to analyze

    Returns:
        JSON string with identified clauses and their types
    """
    try:
        # Empty and whitespace-only input are treated the same way.
        trimmed = text.strip() if text else ""
        if not trimmed:
            failure = {
                "success": False,
                "error": "No text provided for clause analysis",
            }
            return json.dumps(failure)
        return json.dumps(_split_deed_into_clauses(trimmed), indent=2)
    except Exception as exc:
        failure = {
            "success": False,
            "error": f"Clause analysis failed: {str(exc)}",
        }
        return json.dumps(failure)
@mcp.tool()
async def classify_deed_type(deed_text: str, metadata: Optional[str] = None) -> str:
    """
    Classify the deed type and extract key metadata.

    Args:
        deed_text: The full deed text
        metadata: Optional additional metadata about the deed

    Returns:
        JSON string with deed classification and extracted information
    """
    try:
        # Only the first 3000 characters of the deed are sent to the model.
        user_content = f"{CLASSIFY_DEED_PROMPT}\n\nDEED TEXT:\n{deed_text[:3000]}"
        if metadata:
            user_content += f"\n\nADDITIONAL METADATA:\n{metadata}"
        messages = [
            {"role": "system", "content": SYSTEM_DEED_LAWYER},
            {"role": "user", "content": user_content},
        ]
        classification_result = await _run_llm_completion(messages)

        # Clean and parse the JSON response more robustly
        cleaned_json = _clean_llm_json_response(classification_result)
        try:
            classification = json.loads(cleaned_json)
        except json.JSONDecodeError as e:
            # Diagnostics go to stderr: stdout carries the MCP stdio protocol.
            print(f"🚨 JSON parsing failed for classification: {str(e)}", file=sys.stderr)
            print(f"🔍 Original response: {classification_result[:200]}...", file=sys.stderr)
            print(f"🔍 Cleaned response: {cleaned_json[:200]}...", file=sys.stderr)
            # Best effort: still hand the unparsed analysis back to the caller.
            classification = {"raw_analysis": classification_result}

        result = {
            "success": True,
            "classification": classification,
            "raw_response": classification_result,
        }
        return json.dumps(result, indent=2)
    except Exception as e:
        return json.dumps({
            "success": False,
            "error": f"Deed classification failed: {str(e)}"
        })
# Finds the level on the "RISK LEVEL: <level>" line the per-clause prompt asks
# for. \W*? tolerates decoration such as "**RISK LEVEL:** [HIGH]".
_RISK_LEVEL_LINE = re.compile(r"RISK\s+LEVEL\W*?\b(HIGH|MEDIUM|LOW)\b")
_RISK_LEVELS_BY_SEVERITY = ("HIGH", "MEDIUM", "LOW")
_RISK_DISCLAIMER = (
    "This analysis is for informational purposes only and does not constitute "
    "legal advice. Consult a qualified attorney for legal guidance."
)


def _extract_risk_level(analysis_text: str) -> Optional[str]:
    """Return HIGH/MEDIUM/LOW parsed from one clause analysis, or None."""
    upper_text = analysis_text.upper()
    labelled = _RISK_LEVEL_LINE.search(upper_text)
    if labelled:
        return labelled.group(1)
    # Fallback: most severe level mentioned as a whole word, so "FOLLOWING"
    # or "HIGHLY" no longer count as LOW / HIGH.
    for level in _RISK_LEVELS_BY_SEVERITY:
        if re.search(rf"\b{level}\b", upper_text):
            return level
    return None


def _build_risk_summary_prompt(
    overall_risk_level: str,
    individual_risk_levels: List[str],
    risks_analysis: List[Dict[str, Any]],
) -> str:
    """Assemble the user prompt that asks the model for the overall summary."""
    return "\n".join([
        "",
        "Based on the following risk analyses of individual clauses, provide an overall risk assessment for this deed.",
        f"Calculated Overall Risk Level: {overall_risk_level}",
        f"Individual Clause Risk Levels: {individual_risk_levels}",
        "Clause Risk Analyses:",
        json.dumps(risks_analysis, indent=2),
        "Provide your response in this EXACT format:",
        f"OVERALL RISK LEVEL: {overall_risk_level}",
        "KEY FINDINGS:",
        "- [Most critical issue 1]",
        "- [Most critical issue 2]",
        "- [Most critical issue 3]",
        "RISK CATEGORIES FOUND: [List categories like TITLE, WARRANTY, etc.]",
        "RECOMMENDATIONS:",
        "- [Recommendation 1]",
        "- [Recommendation 2]",
        f"DISCLAIMER: {_RISK_DISCLAIMER}",
        f'Start your response with "OVERALL RISK LEVEL: {overall_risk_level}" and follow the exact format above.',
        "",
    ])


@mcp.tool()
async def analyze_deed_risks(clauses: str, deed_classification: Optional[str] = None) -> str:
    """
    Analyze legal risks in deed clauses (rule-based approach without RAG).

    Args:
        clauses: JSON string of deed clauses from split_deed_into_clauses
        deed_classification: Optional classification data from classify_deed_type

    Returns:
        JSON string with risk analysis for each clause
    """
    try:
        # Parse clauses input
        try:
            clauses_data = json.loads(clauses) if isinstance(clauses, str) else clauses
        except json.JSONDecodeError:
            return json.dumps({
                "success": False,
                "error": "Could not parse clauses JSON"
            })
        if (
            not isinstance(clauses_data, dict)
            or not clauses_data.get("success")
            or not clauses_data.get("clauses")
        ):
            return json.dumps({
                "success": False,
                "error": "Invalid clauses data provided"
            })

        clause_list = clauses_data["clauses"]
        risks_analysis = []

        # Analyze each clause for risks (one LLM call per clause, sequentially).
        for position, clause in enumerate(clause_list, start=1):
            clause_id = clause.get("id", f"clause_{position}")
            clause_text = clause.get("text", "")
            clause_type = clause.get("type", "General")

            prompt = f"{RISK_ANALYSIS_PROMPT}\n\nCLAUSE TYPE: {clause_type}\nCLAUSE TEXT:\n{clause_text}"
            if deed_classification:
                prompt += f"\n\nDEED CONTEXT:\n{deed_classification}"
            messages = [
                {"role": "system", "content": SYSTEM_DEED_LAWYER},
                {"role": "user", "content": prompt},
            ]
            risk_analysis = await _run_llm_completion(messages, temperature=0.2)
            # Diagnostics go to stderr: stdout carries the MCP stdio protocol.
            print(
                f"📊 DEBUG Risk Analysis for {clause_id} ({clause_type}): {risk_analysis[:200]}...",
                file=sys.stderr,
            )
            risks_analysis.append({
                "clause_id": clause_id,
                "clause_type": clause_type,
                "risk_analysis": risk_analysis,
                "clause_length": clause.get("length", 0),
            })

        # Extract risk levels from individual analyses for aggregation;
        # analyses with no recognisable level contribute nothing.
        individual_risk_levels = []
        for risk_item in risks_analysis:
            level = _extract_risk_level(risk_item.get("risk_analysis", ""))
            if level is not None:
                individual_risk_levels.append(level)

        # Overall level is the most severe individual level; LOW by default.
        overall_risk_level = "LOW"
        if "HIGH" in individual_risk_levels:
            overall_risk_level = "HIGH"
        elif "MEDIUM" in individual_risk_levels:
            overall_risk_level = "MEDIUM"
        print(f"📊 DEBUG Individual risk levels found: {individual_risk_levels}", file=sys.stderr)
        print(f"📊 DEBUG Calculated overall risk level: {overall_risk_level}", file=sys.stderr)

        # Generate overall risk summary with structured format
        summary_messages = [
            {"role": "system", "content": SYSTEM_DEED_LAWYER},
            {
                "role": "user",
                "content": _build_risk_summary_prompt(
                    overall_risk_level, individual_risk_levels, risks_analysis
                ),
            },
        ]
        overall_summary = await _run_llm_completion(summary_messages)
        print(f"📊 DEBUG Overall Risk Summary: {overall_summary[:300]}...", file=sys.stderr)

        result = {
            "success": True,
            "clause_risks": risks_analysis,
            "overall_summary": overall_summary,
            "total_clauses_analyzed": len(clause_list),
            "analysis_method": "rule_based_llm_analysis",
            "disclaimer": _RISK_DISCLAIMER,
        }
        return json.dumps(result, indent=2)
    except Exception as e:
        return json.dumps({
            "success": False,
            "error": f"Risk analysis failed: {str(e)}"
        })
@mcp.tool()
async def generate_comprehensive_deed_report(pdf_path: str) -> str:
    """
    Generate a complete deed review report using all analysis tools.

    Args:
        pdf_path: Path to the PDF deed document

    Returns:
        JSON string with comprehensive deed analysis report
    """
    try:
        # Progress goes to stderr: stdout carries the MCP stdio protocol.
        print("🔍 Extracting text from PDF using existing OCR system...", file=sys.stderr)
        text_result = await extract_text_from_deed_pdf(pdf_path)
        text_data = json.loads(text_result)
        if not text_data["success"]:
            return json.dumps({
                "success": False,
                "error": f"Could not extract text from PDF: {text_data.get('error')}"
            })
        deed_text = text_data["text"]

        print("📋 Classifying deed type...", file=sys.stderr)
        classification_result = await classify_deed_type(deed_text)
        classification_data = json.loads(classification_result)

        print("✂️ Splitting into clauses...", file=sys.stderr)
        clauses_result = await split_deed_into_clauses(deed_text)
        clauses_data = json.loads(clauses_result)

        print("⚠️ Analyzing legal risks...", file=sys.stderr)
        risks_result = await analyze_deed_risks(
            clauses_result,
            json.dumps(classification_data.get("classification", {}))
        )
        risks_data = json.loads(risks_result)

        # Compile comprehensive report. Each step's own "success" flag is kept
        # inside its sub-document; a failed later step does not abort the report.
        report = {
            "success": True,
            "pdf_path": pdf_path,
            "extraction_metadata": text_data.get("metadata", {}),
            "deed_classification": classification_data,
            "clause_breakdown": clauses_data,
            "risk_analysis": risks_data,
            "text_preview": deed_text,  # Full text instead of truncated preview
            "report_metadata": {
                # Wall-clock Unix timestamp (seconds). The event loop clock
                # used before is monotonic and has no calendar meaning.
                "generated_at": time.time(),
                "analysis_steps": ["text_extraction_via_ocr", "classification", "clause_parsing", "risk_analysis"],
                "processing_method": "existing_ocr_system_reused"
            },
            "legal_disclaimer": {
                "notice": "This automated analysis is for informational purposes only.",
                "warning": "This does not constitute legal advice. Always consult with a qualified attorney.",
                "scope": "This analysis may not identify all potential legal issues.",
                "recommendation": "Have this deed reviewed by a licensed attorney before taking any action."
            }
        }
        print("✅ Comprehensive deed report generated successfully", file=sys.stderr)
        return json.dumps(report, indent=2)
    except Exception as e:
        return json.dumps({
            "success": False,
            "error": f"Report generation failed: {str(e)}"
        })
if __name__ == "__main__":
    # The banner is written to stderr: mcp.run() is called without a transport
    # argument, and on the stdio transport stdout is reserved for protocol
    # messages, so anything printed there would corrupt the stream.
    startup_banner = (
        "🏛️ Starting Legal Deed MCP Server...",
        "📊 Available tools:",
        " - ocr_image: Original OCR for images (unchanged)",
        " - extract_text_from_deed_pdf: Extract text from PDF deeds using existing OCR",
        " - split_deed_into_clauses: Identify and categorize deed clauses",
        " - classify_deed_type: Determine deed type and extract metadata",
        " - analyze_deed_risks: Analyze legal risks without RAG system",
        " - generate_comprehensive_deed_report: Complete deed analysis pipeline",
        "\n⚖️ Legal Notice: This tool provides analysis only, not legal advice.",
        "🚀 Server starting...",
    )
    for banner_line in startup_banner:
        print(banner_line, file=sys.stderr)
    mcp.run()