|
|
import asyncio
import base64
import json
import mimetypes
import os
import pathlib
import re
import sys
import time
from typing import Dict, List, Any, Optional

import fitz
import httpx
from dotenv import load_dotenv
from mcp.server.fastmcp import FastMCP
from openai import OpenAI
|
|
|
|
|
|
# Load variables from a local .env file (if any) before reading the API key.
load_dotenv()

# Prefer the Nebius key; fall back to OPENAI_API_KEY when only that one is set.
api_key = os.getenv("NEBIUS_API_KEY") or os.getenv("OPENAI_API_KEY")
if not api_key:
    raise RuntimeError("NEBIUS_API_KEY or OPENAI_API_KEY must be set")

# OpenAI-compatible client pointed at the Nebius endpoint; shared by all tools.
client = OpenAI(
    api_key=api_key,
    base_url="https://api.tokenfactory.nebius.com/v1/",
)

# Server instance on which the @mcp.tool() functions below are registered.
mcp = FastMCP("deed-legal-mcp")
|
|
|
|
|
|
|
|
|
# System prompt sent as the "system" message of every legal-analysis completion
# in this module (classification, per-clause risk analysis, overall summary).
SYSTEM_DEED_LAWYER = """
You are an expert conveyancing lawyer reviewing deeds.
Your role is to:
1. Identify potential legal risks and issues
2. Classify deed types and extract key information
3. Explain risks in plain language for non-lawyers
4. Provide risk scores and categorizations

IMPORTANT: You do NOT provide legal advice. You only identify potential issues for review by a qualified lawyer.
Always include appropriate disclaimers about seeking professional legal counsel.
"""
|
|
|
|
|
|
# User-prompt prefix for classify_deed_type(): asks the model for a single raw
# JSON object with the structure shown below (no fences, no commentary).
CLASSIFY_DEED_PROMPT = """
Extract information from this deed document and return ONLY a valid JSON object. Do not include any explanatory text, code fences, markdown formatting, or any other content - just the raw JSON.

Required JSON structure:
{
"deed_type": "sale|mortgage|lease|gift|warranty|quitclaim|other",
"jurisdiction": {
"country": "country name",
"state_province": "state or province name"
},
"key_parties": {
"grantor": {
"name": "grantor name",
"address": "address if available"
},
"grantee": {
"name": "grantee name",
"address": "address if available"
},
"witnesses": []
},
"property_description_and_location": {
"district": "district name",
"area": "size/area",
"description": "property description"
},
"consideration_amount": "monetary amount if specified",
"date_of_execution": "date if available",
"special_conditions_or_restrictions": []
}

CRITICAL INSTRUCTIONS:
- Return ONLY the JSON object starting with { and ending with }
- NO ```json code fences
- NO markdown formatting
- NO explanatory text before or after
- Use "N/A" for missing information
- Ensure all strings are properly quoted
- Ensure all JSON syntax is valid
"""
|
|
|
|
|
|
# User-prompt prefix for the per-clause analysis in analyze_deed_risks(). The
# "RISK LEVEL:" line it requests is what that function later parses to derive
# the overall risk level.
RISK_ANALYSIS_PROMPT = """
Analyze the deed clauses for potential legal risks. Provide your analysis in this EXACT format:

RISK LEVEL: [LOW|MEDIUM|HIGH]
RISK CATEGORY: [TITLE|ENCUMBRANCE|WARRANTY|COVENANT|EASEMENT|RESTRICTION|OTHER]
EXPLANATION: [Plain language explanation of the risk and potential consequences]
RECOMMENDATION: [Recommended actions]

Focus on common deed issues like:
- Title defects or clouds
- Undisclosed encumbrances
- Warranty limitations
- Easement problems
- Restrictive covenants
- Boundary disputes
- Missing signatures or witnesses

Important: Start your response with "RISK LEVEL:" and follow the exact format above.
"""
|
|
|
|
|
|
|
|
|
def _data_url_from_bytes(data: bytes, mime: str) -> str:
    """Return ``data`` base64-encoded as a ``data:<mime>;base64,...`` URL."""
    encoded = str(base64.b64encode(data), "ascii")
    return "data:{};base64,{}".format(mime, encoded)
|
|
|
|
|
|
|
|
|
def _clean_llm_json_response(response: str) -> str:
    """Extract the JSON object embedded in an LLM reply.

    The reply may wrap the object in a Markdown code fence and/or surround it
    with prose. The first ``{`` is located and the JSON parser itself decides
    where that object ends, so nested objects, braces inside string values and
    trailing commentary are all handled.

    Args:
        response: Raw text returned by the model (``None`` is treated as "").

    Returns:
        The text of the first complete JSON object when one can be parsed.
        If parsing fails, the span from the first ``{`` to the last ``}``
        (best effort; the caller's ``json.loads`` will report the error).
        If the reply has no usable braces, the reply with any surrounding
        code fence removed.
    """
    text = (response or "").strip()

    # Fence-stripped variant, returned when the reply holds no JSON object.
    unfenced = text
    if unfenced.startswith("```"):
        fence_lines = unfenced.split("\n")[1:]
        if fence_lines and fence_lines[-1].strip() == "```":
            fence_lines = fence_lines[:-1]
        unfenced = "\n".join(fence_lines).strip()

    # Search the un-stripped text so a fence on the same line as the JSON
    # (e.g. '```json {...} ```') does not discard the object.
    start_idx = text.find("{")
    if start_idx == -1:
        return unfenced

    try:
        # raw_decode parses one value starting at start_idx and reports where
        # it ended, ignoring whatever follows.
        _, end_idx = json.JSONDecoder().raw_decode(text, start_idx)
    except ValueError:
        end_idx = text.rfind("}") + 1
        if end_idx <= start_idx:
            return unfenced
    return text[start_idx:end_idx]
|
|
|
|
|
|
|
|
|
def _path_to_data_url(path: pathlib.Path) -> str:
    """Read the file at ``path`` and return its bytes as a base64 data URL.

    The MIME type is guessed from the file name; "image/png" is used when the
    guess yields nothing.
    """
    guessed_mime, _encoding = mimetypes.guess_type(path.name)
    payload = path.read_bytes()
    return _data_url_from_bytes(payload, guessed_mime or "image/png")
|
|
|
|
|
|
|
|
|
async def _prepare_image_payload(image_input: str) -> str:
    """
    Accept local paths, http(s) URLs, or data URLs and normalize to a data URL string.

    Args:
        image_input: A ``data:`` URL (returned unchanged), an ``http://`` or
            ``https://`` URL (downloaded), or a path to a local file.

    Returns:
        A base64 ``data:`` URL for the image.

    Raises:
        FileNotFoundError: ``image_input`` is neither a data URL, an http(s)
            URL, nor the path of an existing regular file.
        httpx.HTTPError: The download failed or returned an error status.
    """
    if image_input.startswith("data:"):
        return image_input

    # URLs are recognised before touching the filesystem: probing a long URL
    # as a path can raise OSError (name too long) instead of returning False.
    if image_input.startswith(("http://", "https://")):
        # Image hosts commonly redirect (CDNs, signed URLs); httpx does not
        # follow redirects unless asked to.
        async with httpx.AsyncClient(follow_redirects=True) as http_client:
            response = await http_client.get(image_input)
            response.raise_for_status()
        # Keep only the media type; parameters such as "; charset=..." do not
        # belong in front of ";base64," in the data URL.
        header_mime = response.headers.get("Content-Type", "").split(";", 1)[0].strip()
        mime = header_mime or mimetypes.guess_type(image_input)[0] or "image/png"
        return _data_url_from_bytes(response.content, mime)

    path = pathlib.Path(image_input).expanduser()
    try:
        # is_file() rather than exists(): a directory cannot be read as bytes.
        found = path.is_file()
    except OSError:
        found = False
    if found:
        return _path_to_data_url(path)

    raise FileNotFoundError(f"Unable to locate image at {image_input}")
|
|
|
|
|
|
|
|
|
def _postprocess_ocr_text(raw_text: str) -> str:
    """
    Lightly clean OCR output while keeping its layout.

    Trailing whitespace is removed from every line and runs of three or more
    line breaks are collapsed to a single blank line. Falsy input (empty
    string or None) yields an empty string.
    """
    if not raw_text:
        return ""

    trimmed_lines = (segment.rstrip() for segment in raw_text.split("\n"))
    joined = "\n".join(trimmed_lines)
    return re.sub(r'\n\s*\n\s*\n+', '\n\n', joined)
|
|
|
|
|
|
|
|
|
async def _run_ocr_completion(image_data_url: str) -> str:
    """Send one image to the vision model for OCR and return the cleaned text.

    The synchronous client call runs in the event loop's default executor so
    the loop is not blocked; the reply is passed through
    ``_postprocess_ocr_text`` before being returned.
    """
    ocr_messages = [
        {
            "role": "system",
            "content": (
                "You are an OCR assistant. Extract all text and keep layout if "
                "possible. Note: Don't include HTML tags in your response."
            ),
        },
        {
            "role": "user",
            "content": [
                {"type": "image_url", "image_url": {"url": image_data_url}}
            ],
        },
    ]

    def _request_transcription() -> str:
        # Blocking network call; executed on a worker thread.
        reply = client.chat.completions.create(
            model="Qwen/Qwen2.5-VL-72B-Instruct",
            messages=ocr_messages,
        )
        return reply.choices[0].message.content

    event_loop = asyncio.get_running_loop()
    transcription = await event_loop.run_in_executor(None, _request_transcription)
    return _postprocess_ocr_text(transcription)
|
|
|
|
|
|
|
|
|
@mcp.tool()
async def ocr_image(image: str) -> str:
    """
    Perform OCR on an image. Accepts a local path, http(s) URL, or data URL string.
    """
    # Normalise the input to a data URL, then hand it to the OCR completion.
    return await _run_ocr_completion(await _prepare_image_payload(image))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def _run_llm_completion(messages: List[Dict], temperature: float = 0.1) -> str:
    """Request a chat completion for ``messages`` and return the reply text.

    The synchronous client call runs in the event loop's default executor.
    Replies are capped at 4000 tokens.
    """
    request_options = {
        "model": "Qwen/Qwen2.5-VL-72B-Instruct",
        "messages": messages,
        "temperature": temperature,
        "max_tokens": 4000,
    }

    def _request_completion() -> str:
        # Blocking network call; executed on a worker thread.
        reply = client.chat.completions.create(**request_options)
        return reply.choices[0].message.content

    return await asyncio.get_running_loop().run_in_executor(None, _request_completion)
|
|
|
|
|
|
|
|
|
async def _extract_text_directly_from_pdf(pdf_path: str) -> Dict[str, Any]:
    """Fast direct text extraction from PDF (no OCR needed).

    Args:
        pdf_path: Path of the PDF to open with PyMuPDF (``fitz``).

    Returns:
        On success: ``{"success": True, "text", "pages", "metadata"}`` where
        ``text`` joins all pages, each preceded by a ``--- Page N ---``
        header, ``pages`` lists per-page text and length, and
        ``metadata["total_length"]`` counts page text only (headers excluded).
        On any failure: ``{"success": False, "text": "", "pages": [],
        "metadata": {}, "error": <message>}``. The function does not raise.
    """
    try:
        pdf_document = fitz.open(pdf_path)
        try:
            pages_data = []
            for page_num in range(pdf_document.page_count):
                page_text = pdf_document[page_num].get_text().strip()
                pages_data.append({
                    "page": page_num + 1,
                    "text": page_text,
                    "length": len(page_text),
                })
        finally:
            # Release the file handle even when reading a page fails.
            pdf_document.close()

        full_text = "\n\n".join(
            f"--- Page {entry['page']} ---\n{entry['text']}" for entry in pages_data
        )

        return {
            "success": True,
            "text": full_text,
            "pages": pages_data,
            "metadata": {
                "total_pages": len(pages_data),
                "method": "direct_text_extraction",
                "total_length": sum(entry["length"] for entry in pages_data),
                "processing_time_seconds": "< 1",
            },
        }

    except Exception as e:
        # Deliberate best-effort: the caller falls back to OCR on failure.
        return {
            "success": False,
            "text": "",
            "pages": [],
            "metadata": {},
            "error": str(e),
        }
|
|
|
|
|
|
async def _convert_pdf_pages_to_images(pdf_path: str) -> List[str]:
    """Convert PDF pages to image data URLs for use with existing OCR tool (fallback method).

    Each page is rendered with a 1.5x zoom matrix and encoded as PNG.

    Args:
        pdf_path: Path of the PDF to open with PyMuPDF (``fitz``).

    Returns:
        One ``data:image/png;base64,...`` URL per page, in page order.

    Raises:
        RuntimeError: Opening or rendering the PDF failed; the original
            exception is attached as ``__cause__``.
    """
    try:
        pdf_document = fitz.open(pdf_path)
        try:
            zoom = fitz.Matrix(1.5, 1.5)
            image_data_urls = []
            for page_num in range(pdf_document.page_count):
                pix = pdf_document[page_num].get_pixmap(matrix=zoom)
                image_data_urls.append(
                    _data_url_from_bytes(pix.tobytes("png"), "image/png")
                )
            return image_data_urls
        finally:
            # Release the file handle even when rendering a page fails.
            pdf_document.close()

    except Exception as e:
        raise RuntimeError(f"PDF to image conversion failed: {str(e)}") from e
|
|
|
|
|
|
|
|
|
def _split_deed_into_clauses(text: str) -> Dict[str, Any]:
    """Break deed text into paragraph-level clauses and label each one.

    Page markers (``--- Page N ---``) are removed, the text is split on blank
    lines, and paragraphs shorter than 20 characters are dropped. Each kept
    paragraph is labelled with the first entry of the pattern table that
    matches it (case-insensitive), or "General" when none does.

    Returns:
        A dict with ``success``, the ``clauses`` list (id, type, text, length,
        word_count), ``total_clauses`` and ``metadata``.
    """
    # Order matters: the first matching pattern decides the clause type.
    labelled_patterns = [
        (r"WITNESSETH[:\s].*?", "Recitals"),
        (r"TO HAVE AND TO HOLD.*?", "Habendum Clause"),
        (r"SUBJECT TO.*?", "Exceptions and Reservations"),
        (r"COVENANT[S]?.*?", "Covenants"),
        (r"WARRANTY.*?", "Warranty Clause"),
        (r"IN WITNESS WHEREOF.*?", "Execution Clause"),
        (r"GRANTETH.*?", "Granting Clause"),
        (r"FOR AND IN CONSIDERATION.*?", "Consideration Clause"),
        (r"EASEMENT[S]?.*?", "Easement"),
        (r"RESTRICTION[S]?.*?", "Restrictions"),
    ]

    without_page_markers = re.sub(r'--- Page \d+ ---', '\n\n', text)
    paragraphs = []
    for chunk in re.split(r'\n\s*\n', without_page_markers):
        chunk = chunk.strip()
        if chunk:
            paragraphs.append(chunk)

    # Very short fragments (headings, page numbers, ...) are not clauses.
    substantial = [body for body in paragraphs if len(body) >= 20]

    clauses = []
    for number, body in enumerate(substantial, start=1):
        label = next(
            (
                name
                for regex, name in labelled_patterns
                if re.search(regex, body, re.IGNORECASE)
            ),
            "General",
        )
        clauses.append({
            "id": f"clause_{number}",
            "type": label,
            "text": body,
            "length": len(body),
            "word_count": len(body.split()),
        })

    return {
        "success": True,
        "clauses": clauses,
        "total_clauses": len(clauses),
        "metadata": {
            "total_paragraphs": len(paragraphs),
            "processing_method": "pattern_matching",
        },
    }
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@mcp.tool()
async def extract_text_from_deed_pdf(pdf_path: str) -> str:
    """
    Extract text from a PDF deed document. Try fast direct text first, then OCR fallback.

    Args:
        pdf_path: Path to the PDF deed file

    Returns:
        JSON string with extracted text, pages, and metadata
    """
    # Progress messages go to stderr: on the stdio transport (FastMCP's
    # default for mcp.run()) stdout carries the protocol stream, so a plain
    # print() would corrupt it.
    try:
        path = pathlib.Path(pdf_path).expanduser()
        if not path.exists():
            return json.dumps({"success": False, "error": f"PDF file not found: {pdf_path}"})
        if not pdf_path.lower().endswith(".pdf"):
            return json.dumps({"success": False, "error": "File must be a PDF document"})

        direct_result = await _extract_text_directly_from_pdf(str(path))

        # Decide on the amount of real page text. The joined "text" field also
        # contains a "--- Page N ---" header per page, so a scanned PDF with
        # three or more empty pages would exceed the threshold by itself and
        # the OCR fallback would never run.
        extracted_chars = (direct_result.get("metadata") or {}).get("total_length", 0)

        if direct_result.get("success") and extracted_chars > 50:
            print("🔍 Extracting text from PDF using direct text extraction...", file=sys.stderr)
            return json.dumps(direct_result, indent=2)

        print("🔍 Using OCR processing...", file=sys.stderr)
        image_data_urls = await _convert_pdf_pages_to_images(str(path))

        all_text, pages_data = [], []
        for page_num, data_url in enumerate(image_data_urls, 1):
            print(f"📄 Processing page {page_num}/{len(image_data_urls)} with OCR...", file=sys.stderr)
            page_text = await ocr_image(data_url)
            all_text.append(f"--- Page {page_num} ---\n{page_text}")
            pages_data.append({"page": page_num, "text": page_text, "length": len(page_text)})

        full_text = "\n\n".join(all_text)
        result = {
            "success": True,
            "text": full_text,
            "pages": pages_data,
            "metadata": {
                "total_pages": len(pages_data),
                "method": "PDF_to_image_OCR_via_existing_tool",
                "total_length": len(full_text)
            }
        }
        print(f"✅ OCR processing complete! {len(full_text)} characters extracted", file=sys.stderr)
        return json.dumps(result, indent=2)
    except Exception as e:
        # Tool boundary: report the failure as JSON instead of raising.
        return json.dumps({"success": False, "error": f"PDF processing failed: {str(e)}"})
|
|
|
|
|
|
|
|
|
|
|
|
@mcp.tool()
async def split_deed_into_clauses(text: str) -> str:
    """
    Split deed text into logical clauses and sections.

    Args:
        text: The full deed text to analyze

    Returns:
        JSON string with identified clauses and their types
    """
    try:
        normalized = text.strip() if text else ""
        if normalized:
            # Success payloads are pretty-printed; error payloads are compact.
            return json.dumps(_split_deed_into_clauses(normalized), indent=2)
        return json.dumps({
            "success": False,
            "error": "No text provided for clause analysis",
        })
    except Exception as exc:
        # Tool boundary: report the failure as JSON instead of raising.
        return json.dumps({
            "success": False,
            "error": f"Clause analysis failed: {str(exc)}",
        })
|
|
|
|
|
|
|
|
|
@mcp.tool()
async def classify_deed_type(deed_text: str, metadata: Optional[str] = None) -> str:
    """
    Classify the deed type and extract key metadata.

    Args:
        deed_text: The full deed text
        metadata: Optional additional metadata about the deed

    Returns:
        JSON string with deed classification and extracted information
    """
    # Diagnostics go to stderr: on the stdio transport (FastMCP's default for
    # mcp.run()) stdout carries the protocol stream, so print() would corrupt it.
    try:
        # Only the first 3000 characters of the deed are sent to the model.
        user_content = f"{CLASSIFY_DEED_PROMPT}\n\nDEED TEXT:\n{deed_text[:3000]}"
        if metadata:
            user_content += f"\n\nADDITIONAL METADATA:\n{metadata}"

        messages = [
            {"role": "system", "content": SYSTEM_DEED_LAWYER},
            {"role": "user", "content": user_content},
        ]

        classification_result = await _run_llm_completion(messages)
        cleaned_json = _clean_llm_json_response(classification_result)

        try:
            parsed_result = json.loads(cleaned_json)
            result = {
                "success": True,
                "classification": parsed_result,
                "raw_response": classification_result
            }
        except json.JSONDecodeError as e:
            print(f"🚨 JSON parsing failed for classification: {str(e)}", file=sys.stderr)
            print(f"🔍 Original response: {classification_result[:200]}...", file=sys.stderr)
            print(f"🔍 Cleaned response: {cleaned_json[:200]}...", file=sys.stderr)
            # Deliberate fallback: still succeed, but hand back the raw reply
            # under "raw_analysis" so the caller can inspect it.
            result = {
                "success": True,
                "classification": {"raw_analysis": classification_result},
                "raw_response": classification_result
            }

        return json.dumps(result, indent=2)

    except Exception as e:
        # Tool boundary: report the failure as JSON instead of raising.
        return json.dumps({
            "success": False,
            "error": f"Deed classification failed: {str(e)}"
        })
|
|
|
|
|
|
|
|
|
def _extract_risk_level(analysis_text: Optional[str]) -> Optional[str]:
    """Return "HIGH", "MEDIUM" or "LOW" as stated in one clause analysis, else None."""
    upper_text = (analysis_text or "").upper()

    # Preferred: the level named right after the "RISK LEVEL" label, allowing
    # punctuation such as ": " or ": [" in between.
    labelled = re.search(r'RISK\s+LEVEL\W*(HIGH|MEDIUM|LOW)\b', upper_text)
    if labelled:
        return labelled.group(1)

    # Fallback: first level mentioned as a whole word, most severe first.
    # Word boundaries keep "FOLLOWING" or "HIGHLIGHT" from counting.
    for level in ("HIGH", "MEDIUM", "LOW"):
        if re.search(rf'\b{level}\b', upper_text):
            return level
    return None


def _overall_risk_level(levels: List[str]) -> str:
    """Return the most severe level present in ``levels`` ("LOW" when empty)."""
    for level in ("HIGH", "MEDIUM"):
        if level in levels:
            return level
    return "LOW"


@mcp.tool()
async def analyze_deed_risks(clauses: str, deed_classification: Optional[str] = None) -> str:
    """
    Analyze legal risks in deed clauses (rule-based approach without RAG).

    Args:
        clauses: JSON string of deed clauses from split_deed_into_clauses
        deed_classification: Optional classification data from classify_deed_type

    Returns:
        JSON string with risk analysis for each clause
    """
    # Diagnostics go to stderr: on the stdio transport (FastMCP's default for
    # mcp.run()) stdout carries the protocol stream, so print() would corrupt it.
    try:
        try:
            clauses_data = json.loads(clauses) if isinstance(clauses, str) else clauses
        except json.JSONDecodeError:
            return json.dumps({
                "success": False,
                "error": "Could not parse clauses JSON"
            })

        # Valid JSON that is not the expected object (e.g. a bare list) is
        # rejected here rather than failing later with an AttributeError.
        if (
            not isinstance(clauses_data, dict)
            or not clauses_data.get("success")
            or not clauses_data.get("clauses")
        ):
            return json.dumps({
                "success": False,
                "error": "Invalid clauses data provided"
            })

        clause_list = clauses_data["clauses"]
        risks_analysis = []

        # One model call per clause, in order.
        for position, clause in enumerate(clause_list, start=1):
            clause_id = clause.get("id", f"clause_{position}")
            clause_text = clause.get("text", "")
            clause_type = clause.get("type", "General")

            prompt = f"{RISK_ANALYSIS_PROMPT}\n\nCLAUSE TYPE: {clause_type}\nCLAUSE TEXT:\n{clause_text}"
            if deed_classification:
                prompt += f"\n\nDEED CONTEXT:\n{deed_classification}"

            messages = [
                {"role": "system", "content": SYSTEM_DEED_LAWYER},
                {"role": "user", "content": prompt}
            ]

            # The model may return no content; treat that as an empty analysis.
            risk_analysis = await _run_llm_completion(messages, temperature=0.2) or ""
            print(f"📊 DEBUG Risk Analysis for {clause_id} ({clause_type}): {risk_analysis[:200]}...", file=sys.stderr)

            risks_analysis.append({
                "clause_id": clause_id,
                "clause_type": clause_type,
                "risk_analysis": risk_analysis,
                "clause_length": clause.get("length", 0)
            })

        # Derive the overall level from the per-clause levels; analyses with
        # no recognisable level contribute nothing.
        individual_risk_levels = []
        for risk_item in risks_analysis:
            level = _extract_risk_level(risk_item.get("risk_analysis", ""))
            if level is not None:
                individual_risk_levels.append(level)

        overall_risk_level = _overall_risk_level(individual_risk_levels)

        print(f"📊 DEBUG Individual risk levels found: {individual_risk_levels}", file=sys.stderr)
        print(f"📊 DEBUG Calculated overall risk level: {overall_risk_level}", file=sys.stderr)

        # The prompt text is kept flush-left so no indentation leaks into it.
        summary_prompt = f"""
Based on the following risk analyses of individual clauses, provide an overall risk assessment for this deed.

Calculated Overall Risk Level: {overall_risk_level}
Individual Clause Risk Levels: {individual_risk_levels}

Clause Risk Analyses:
{json.dumps(risks_analysis, indent=2)}

Provide your response in this EXACT format:

OVERALL RISK LEVEL: {overall_risk_level}
KEY FINDINGS:
- [Most critical issue 1]
- [Most critical issue 2]
- [Most critical issue 3]

RISK CATEGORIES FOUND: [List categories like TITLE, WARRANTY, etc.]
RECOMMENDATIONS:
- [Recommendation 1]
- [Recommendation 2]

DISCLAIMER: This analysis is for informational purposes only and does not constitute legal advice. Consult a qualified attorney for legal guidance.

Start your response with "OVERALL RISK LEVEL: {overall_risk_level}" and follow the exact format above.
"""

        summary_messages = [
            {"role": "system", "content": SYSTEM_DEED_LAWYER},
            {"role": "user", "content": summary_prompt}
        ]

        overall_summary = await _run_llm_completion(summary_messages) or ""
        print(f"📊 DEBUG Overall Risk Summary: {overall_summary[:300]}...", file=sys.stderr)

        result = {
            "success": True,
            "clause_risks": risks_analysis,
            "overall_summary": overall_summary,
            "total_clauses_analyzed": len(clause_list),
            "analysis_method": "rule_based_llm_analysis",
            "disclaimer": "This analysis is for informational purposes only and does not constitute legal advice. Consult a qualified attorney for legal guidance."
        }

        return json.dumps(result, indent=2)

    except Exception as e:
        # Tool boundary: report the failure as JSON instead of raising.
        return json.dumps({
            "success": False,
            "error": f"Risk analysis failed: {str(e)}"
        })
|
|
|
|
|
|
|
|
|
@mcp.tool()
async def generate_comprehensive_deed_report(pdf_path: str) -> str:
    """
    Generate a complete deed review report using all analysis tools.

    Args:
        pdf_path: Path to the PDF deed document

    Returns:
        JSON string with comprehensive deed analysis report
    """
    # Progress messages go to stderr: on the stdio transport (FastMCP's
    # default for mcp.run()) stdout carries the protocol stream, so print()
    # would corrupt it.
    try:
        print("🔍 Extracting text from PDF using existing OCR system...", file=sys.stderr)
        text_result = await extract_text_from_deed_pdf(pdf_path)
        text_data = json.loads(text_result)

        # Without text nothing else can run, so this is the only step whose
        # failure aborts the report.
        if not text_data["success"]:
            return json.dumps({
                "success": False,
                "error": f"Could not extract text from PDF: {text_data.get('error')}"
            })

        deed_text = text_data["text"]

        print("📋 Classifying deed type...", file=sys.stderr)
        classification_result = await classify_deed_type(deed_text)
        classification_data = json.loads(classification_result)

        print("✂️ Splitting into clauses...", file=sys.stderr)
        clauses_result = await split_deed_into_clauses(deed_text)
        clauses_data = json.loads(clauses_result)

        print("⚠️ Analyzing legal risks...", file=sys.stderr)
        risks_result = await analyze_deed_risks(
            clauses_result,
            json.dumps(classification_data.get("classification", {}))
        )
        risks_data = json.loads(risks_result)

        # Each nested section keeps its own "success" flag, so a failed
        # classification or risk step shows up inside the report rather than
        # failing the whole call.
        report = {
            "success": True,
            "pdf_path": pdf_path,
            "extraction_metadata": text_data.get("metadata", {}),
            "deed_classification": classification_data,
            "clause_breakdown": clauses_data,
            "risk_analysis": risks_data,
            "text_preview": deed_text,
            "report_metadata": {
                # Wall-clock Unix timestamp. The event loop's clock is
                # monotonic with an arbitrary origin and says nothing about
                # when the report was produced.
                "generated_at": time.time(),
                "analysis_steps": ["text_extraction_via_ocr", "classification", "clause_parsing", "risk_analysis"],
                "processing_method": "existing_ocr_system_reused"
            },
            "legal_disclaimer": {
                "notice": "This automated analysis is for informational purposes only.",
                "warning": "This does not constitute legal advice. Always consult with a qualified attorney.",
                "scope": "This analysis may not identify all potential legal issues.",
                "recommendation": "Have this deed reviewed by a licensed attorney before taking any action."
            }
        }

        print("✅ Comprehensive deed report generated successfully", file=sys.stderr)
        return json.dumps(report, indent=2)

    except Exception as e:
        # Tool boundary: report the failure as JSON instead of raising.
        return json.dumps({
            "success": False,
            "error": f"Report generation failed: {str(e)}"
        })
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # The startup banner is written to stderr: on the stdio transport
    # (FastMCP's default for mcp.run()) stdout carries the protocol stream,
    # so anything printed there before or during the run would corrupt it.
    startup_banner = (
        "🏛️ Starting Legal Deed MCP Server...",
        "📊 Available tools:",
        " - ocr_image: Original OCR for images (unchanged)",
        " - extract_text_from_deed_pdf: Extract text from PDF deeds using existing OCR",
        " - split_deed_into_clauses: Identify and categorize deed clauses",
        " - classify_deed_type: Determine deed type and extract metadata",
        " - analyze_deed_risks: Analyze legal risks without RAG system",
        " - generate_comprehensive_deed_report: Complete deed analysis pipeline",
        "\n⚖️ Legal Notice: This tool provides analysis only, not legal advice.",
        "🚀 Server starting...",
    )
    for banner_line in startup_banner:
        print(banner_line, file=sys.stderr)

    mcp.run()
|
|
|
|