# (Hugging Face Spaces page residue from scraping — kept as a comment so the
# file remains valid Python): Spaces: Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| FastAPI interface for the LangGraph cyber-legal assistant | |
| """ | |
| import os | |
| import asyncio | |
| from typing import Dict, List, Any, Optional | |
| from datetime import datetime | |
| from fastapi import FastAPI, HTTPException, BackgroundTasks | |
| from pydantic import BaseModel, Field | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import JSONResponse | |
| import uvicorn | |
| from dotenv import load_dotenv | |
| from fastapi import Depends | |
| from fastapi.security import APIKeyHeader | |
| import secrets | |
| from structured_outputs.api_models import ( | |
| Message, DocumentAnalysis, ChatRequest, ChatResponse, | |
| HealthResponse, AnalyzePDFRequest, AnalyzePDFResponse, | |
| LawyerProfile, DocCreatorRequest, DocCreatorResponse, | |
| DocumentsTree, TreeNode | |
| ) | |
| from langgraph_agent import CyberLegalAgent | |
| from utils.conversation_manager import ConversationManager | |
| from utils.utils import validate_query | |
| from utils.lightrag_client import LightRAGClient | |
| from utils import tools | |
| from subagents.lawyer_selector import LawyerSelectorAgent | |
| from subagents.lawyer_messenger import LawyerMessengerAgent | |
| from prompts.main import SYSTEM_PROMPT_CLIENT, SYSTEM_PROMPT_LAWYER | |
| from subagents.pdf_analyzer import PDFAnalyzerAgent | |
| from subagents.doc_editor import DocumentEditorAgent | |
| from langchain_openai import ChatOpenAI | |
| from mistralai import Mistral | |
| import logging | |
| import traceback | |
| import base64 | |
| import tempfile | |
| import os as pathlib | |
| import json | |
| from langchain_tavily import TavilySearch | |
| import resend | |
# Load environment variables. override=False: values already present in the
# process environment (e.g. platform/Space secrets) take precedence over .env.
load_dotenv(dotenv_path=".env", override=False)

logger = logging.getLogger(__name__)

# Initialize FastAPI app
app = FastAPI(
    title="CyberLegal AI API",
    description="LangGraph-powered cyber-legal assistant API",
    version="1.0.0"
)

# Add CORS middleware.
# NOTE(review): allow_origins=["*"] combined with allow_credentials=True —
# browsers reject credentialed requests against a wildcard origin per the
# CORS spec; confirm whether cross-origin credentials are actually needed.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Shared-password auth material read by require_password below.
API_PASSWORD = os.getenv("API_PASSWORD", "")  # set this in HF Space Secrets
# auto_error=False: a missing X-API-Key header reaches require_password as
# None instead of FastAPI rejecting the request before our check runs.
api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)
def require_password(x_api_key: str = Depends(api_key_header)):
    """FastAPI dependency enforcing the X-API-Key header.

    When API_PASSWORD is unset the check is a no-op (open access), so a
    forgotten secret never locks the deployment out. Otherwise the header
    must match the configured password via a constant-time comparison.

    Raises:
        HTTPException(401): header missing or mismatched.
    """
    if API_PASSWORD:
        supplied = x_api_key or ""
        authorized = bool(supplied) and secrets.compare_digest(supplied, API_PASSWORD)
        if not authorized:
            raise HTTPException(status_code=401, detail="Unauthorized")
# Global agent instance placeholder.
# NOTE(review): never reassigned in this file — the live instance appears to
# be the module-level `api` created after the class; confirm this is still
# needed before relying on it.
agent_instance = None
class CyberLegalAPI:
    """
    API wrapper for the LangGraph agent.

    Builds the LLM, OCR, web-search, and Resend clients once, wires the
    shared subagents into the `tools` module's globals, and creates one
    agent per audience (client vs. lawyer) plus the PDF analyzer and the
    document editor.
    """
    def __init__(self):
        # override=True here (unlike the module-level load_dotenv) lets the
        # .env file win over pre-existing environment variables.
        load_dotenv(dotenv_path=".env", override=True)
        llm_provider = os.getenv("LLM_PROVIDER", "openai").lower()
        self.llm_provider = llm_provider
        # Single shared chat model; base_url is overridable so the same code
        # can point at any OpenAI-compatible endpoint.
        llm = ChatOpenAI(
            model=os.getenv("LLM_MODEL", "gpt-5-nano-2025-08-07"),
            reasoning_effort="low",
            api_key=os.getenv("OPENAI_API_KEY"),
            base_url=os.getenv("LLM_BINDING_HOST", "https://api.openai.com/v1"),
            default_headers={
                "X-Cerebras-3rd-Party-Integration": "langgraph"
            }
        )
        # Mistral client is handed to the PDF analyzer below (used for OCR).
        mistral_client = Mistral(api_key=os.getenv("MISTRAL_API_KEY"))
        logger.info("✅ Mistral OCR client initialized")

        # Initialize subagents and set them globally in tools.py so the tool
        # functions can reach them without explicit parameter wiring.
        global lawyer_selector_agent, lawyer_messenger_agent, lightrag_client, tavily_search
        lawyer_selector_agent = LawyerSelectorAgent(llm=llm)
        tools.lawyer_selector_agent = lawyer_selector_agent
        lawyer_messenger_agent = LawyerMessengerAgent(llm=llm)
        tools.lawyer_messenger_agent = lawyer_messenger_agent
        logger.info("✅ LawyerMessengerAgent initialized")
        lightrag_client = LightRAGClient()
        tools.lightrag_client = lightrag_client
        tavily_search = TavilySearch(
            api_key=os.getenv("TAVILY_API_KEY"),
            max_results=5,
            topic="general",
            search_depth="advanced",
            include_answer=True,
            include_raw_content=False
        )
        tools.tavily_search = tavily_search
        logger.info("✅ Tavily search client initialized")

        # Initialize Resend (api_key is configured module-wide on the
        # `resend` package itself).
        resend.api_key = os.getenv("RESEND_API_KEY")
        logger.info("✅ Resend client initialized")

        # Two agents sharing the same LLM but exposing different tool sets.
        self.agent_client = CyberLegalAgent(llm=llm, tools=tools.tools_for_client, tools_facade=tools.tools_for_client_facade)
        self.agent_lawyer = CyberLegalAgent(llm=llm, tools=tools.tools_for_lawyer, tools_facade=tools.tools_for_lawyer_facade)
        self.pdf_analyzer = PDFAnalyzerAgent(llm=llm, mistral_client=mistral_client)
        self.doc_editor = DocumentEditorAgent(llm=llm)
        self.conversation_manager = ConversationManager()
        logger.info(f"🔧 CyberLegalAPI initialized with {llm_provider.upper()} provider")
| def _format_documents_tree(self, node: TreeNode, indent: int = 0) -> str: | |
| """ | |
| Format documents tree as hierarchical text with indentation. | |
| Example: | |
| - Subdirectory 1: | |
| - file11: summary | actors | key_details | |
| - Sub-sub-directory 11: | |
| - file111: summary | actors | key_details | |
| """ | |
| lines = [] | |
| indent_str = " " * indent | |
| if node.type == "folder": | |
| lines.append(f"{indent_str}- {node.name}:") | |
| if node.children: | |
| for child in node.children: | |
| lines.append(self._format_documents_tree(child, indent + 3)) | |
| elif node.type == "file" and node.analysis: | |
| analysis_parts = [] | |
| if node.analysis.summary: | |
| summary_preview = node.analysis.summary[:100] + "..." if len(node.analysis.summary) > 100 else node.analysis.summary | |
| analysis_parts.append(f"summary: {summary_preview}") | |
| if node.analysis.actors: | |
| actors_preview = node.analysis.actors[:100] + "..." if len(node.analysis.actors) > 100 else node.analysis.actors | |
| analysis_parts.append(f"actors: {actors_preview}") | |
| if node.analysis.key_details: | |
| details_preview = node.analysis.key_details[:100] + "..." if len(node.analysis.key_details) > 100 else node.analysis.key_details | |
| analysis_parts.append(f"key_details: {details_preview}") | |
| analysis_text = " | ".join(analysis_parts) if analysis_parts else "No analysis available" | |
| lines.append(f"{indent_str}- {node.name}: {analysis_text}") | |
| return "\n".join(lines) | |
| def _extract_flat_documents(self, node: TreeNode) -> List[Dict[str, Any]]: | |
| """ | |
| Recursively extract all documents with analysis from tree into flat list. | |
| Used for endpoints that expect flat document structure. | |
| """ | |
| docs = [] | |
| if node.type == "file" and node.analysis: | |
| docs.append({ | |
| "file_name": node.name, | |
| "summary": node.analysis.summary, | |
| "actors": node.analysis.actors, | |
| "key_details": node.analysis.key_details | |
| }) | |
| if node.children: | |
| for child in node.children: | |
| docs.extend(self._extract_flat_documents(child)) | |
| return docs | |
| def _build_lawyer_prompt(self, documents_tree: Optional[DocumentsTree], jurisdiction: str, lawyer_profile: Optional[LawyerProfile] = None) -> str: | |
| """Build lawyer prompt with optional document context and lawyer profile""" | |
| prompt_parts = [] | |
| # Add lawyer profile context if available | |
| if lawyer_profile: | |
| profile_text = "\n\n### Lawyer Profile Context\n" | |
| if lawyer_profile.full_name: | |
| profile_text += f"Name: {lawyer_profile.full_name}\n" | |
| if lawyer_profile.primary_specialty: | |
| profile_text += f"Primary Specialty: {lawyer_profile.primary_specialty}\n" | |
| if lawyer_profile.legal_specialties: | |
| profile_text += f"Specialties: {', '.join(lawyer_profile.legal_specialties)}\n" | |
| if lawyer_profile.experience_level: | |
| profile_text += f"Experience Level: {lawyer_profile.experience_level}\n" | |
| if lawyer_profile.languages: | |
| profile_text += f"Languages: {', '.join(lawyer_profile.languages)}\n" | |
| if lawyer_profile.lawyer_description: | |
| profile_text += f"Description: {lawyer_profile.lawyer_description}\n" | |
| profile_text += "\nWhen answering, consider this lawyer's expertise and experience level. Tailor your responses to be appropriate for their seniority and specialization.\n" | |
| prompt_parts.append(profile_text) | |
| # Add documents tree if available | |
| if documents_tree and documents_tree.children: | |
| docs_text = "\n### Documents in Lawyer's Database\n" | |
| docs_text += self._format_documents_tree(documents_tree) | |
| docs_text += "\n\nUse these documents when relevant to the question.\n" | |
| prompt_parts.append(docs_text) | |
| # Combine base prompt with context | |
| base_prompt = SYSTEM_PROMPT_LAWYER.format(jurisdiction=jurisdiction) | |
| if prompt_parts: | |
| return base_prompt + "\n".join(prompt_parts) | |
| return base_prompt | |
| async def process_request(self, request: ChatRequest) -> ChatResponse: | |
| """ | |
| Process chat request through the agent | |
| """ | |
| is_valid, error_msg = validate_query(request.message) | |
| if not is_valid: | |
| raise HTTPException(status_code=400, detail=error_msg) | |
| # Determine user type | |
| logger.info(f"Received request: {request}") | |
| # Select appropriate agent | |
| if request.userType == "lawyer": | |
| agent = self.agent_lawyer | |
| logger.info("👨⚖️ Using lawyer specialist agent") | |
| else: | |
| agent = self.agent_client | |
| logger.info("👤 Using client-friendly agent") | |
| # Convert conversation history format | |
| logger.info(f"Received this request: {request}") | |
| conversation_history = [] | |
| for msg in request.conversationHistory or []: | |
| conversation_history.append({ | |
| "role": msg.role, | |
| "content": msg.content | |
| }) | |
| logger.info(f"🚀 Starting request processing - user_type: {request.userType}, jurisdiction: {request.jurisdiction}") | |
| logger.info(f"💬 User query: {request.message}") | |
| try: | |
| # Build dynamic system prompt for lawyers with documents tree and/or lawyer profile | |
| if request.userType == "lawyer": | |
| system_prompt = self._build_lawyer_prompt( | |
| request.documents_tree, | |
| request.jurisdiction, | |
| request.lawyerProfile | |
| ) | |
| context_parts = [] | |
| if request.lawyerProfile: | |
| context_parts.append("lawyer profile") | |
| if request.documents_tree and request.documents_tree.children: | |
| # Count documents in tree | |
| doc_count = sum(1 for node in self._extract_flat_documents(request.documents_tree)) | |
| context_parts.append(f"{doc_count} documents") | |
| if context_parts: | |
| logger.info(f"📚 Using lawyer prompt with {', '.join(context_parts)}") | |
| else: | |
| logger.info(f"📝 Using default lawyer prompt with jurisdiction: {request.jurisdiction}") | |
| else: | |
| system_prompt = SYSTEM_PROMPT_CLIENT.format(jurisdiction=request.jurisdiction) | |
| logger.info(f"👤 Using client prompt with jurisdiction: {request.jurisdiction}") | |
| # Process through selected agent with raw message and conversation history | |
| logger.info(f"🤖 Calling agent.process_query with jurisdiction: {request.jurisdiction}") | |
| result = await agent.process_query( | |
| user_query=request.message, | |
| user_id=request.userId, | |
| conversation_history=conversation_history, | |
| jurisdiction=request.jurisdiction, | |
| system_prompt=system_prompt | |
| ) | |
| logger.info(f"✅ Agent processing completed successfully") | |
| # Create response | |
| response = ChatResponse( | |
| response=result["response"], | |
| processing_time=result.get("processing_time", 0.0), | |
| references=result.get("references", []), | |
| timestamp=result.get("timestamp", datetime.now().isoformat()), | |
| error=result.get("error") | |
| ) | |
| logger.info(f"📤 Returning response to user") | |
| return response | |
| except Exception as e: | |
| # Log full traceback for debugging | |
| error_traceback = traceback.format_exc() | |
| logger.error(f"❌ Request processing failed: {str(e)}") | |
| logger.error(f"🔍 Full traceback:\n{error_traceback}") | |
| raise HTTPException( | |
| status_code=500, | |
| detail={ | |
| "error": "Processing failed", | |
| "message": str(e), | |
| "traceback": error_traceback, | |
| "timestamp": datetime.now().isoformat() | |
| } | |
| ) | |
| async def health_check(self) -> HealthResponse: | |
| """ | |
| Check health status of the API and dependencies | |
| """ | |
| try: | |
| from utils.lightrag_client import LightRAGClient | |
| lightrag_client = LightRAGClient() | |
| lightrag_healthy = lightrag_client.health_check() | |
| return HealthResponse( | |
| status="healthy" if lightrag_healthy else "degraded", | |
| agent_ready=True, | |
| lightrag_healthy=lightrag_healthy, | |
| timestamp=datetime.now().isoformat() | |
| ) | |
| except Exception as e: | |
| return HealthResponse( | |
| status="unhealthy", | |
| agent_ready=False, | |
| lightrag_healthy=False, | |
| timestamp=datetime.now().isoformat() | |
| ) | |
| async def analyze_pdf(self, request: AnalyzePDFRequest) -> AnalyzePDFResponse: | |
| """ | |
| Analyze PDF document through the PDF analyzer agent | |
| """ | |
| start_time = datetime.now() | |
| try: | |
| # Decode base64 PDF content | |
| pdf_bytes = base64.b64decode(request.pdf_content) | |
| # Create temporary file to save PDF | |
| with tempfile.NamedTemporaryFile(mode='wb', suffix='.pdf', delete=False) as tmp_file: | |
| tmp_file.write(pdf_bytes) | |
| tmp_file_path = tmp_file.name | |
| logger.info(f"📄 Analyzing PDF: {request.filename}") | |
| try: | |
| # Analyze the PDF | |
| result = await self.pdf_analyzer.analyze_pdf(tmp_file_path) | |
| # Calculate processing time | |
| processing_time = (datetime.now() - start_time).total_seconds() | |
| # Create response | |
| response = AnalyzePDFResponse( | |
| actors=result.get("actors", ""), | |
| key_details=result.get("key_details", ""), | |
| summary=result.get("summary", ""), | |
| extracted_text=result.get("extracted_text", ""), | |
| processing_status=result.get("processing_status", "unknown"), | |
| processing_time=processing_time, | |
| timestamp=datetime.now().isoformat(), | |
| error=result.get("error") | |
| ) | |
| logger.info(f"✅ PDF analysis completed in {processing_time:.2f}s") | |
| return response | |
| finally: | |
| # Clean up temporary file | |
| if pathlib.path.exists(tmp_file_path): | |
| pathlib.unlink(tmp_file_path) | |
| logger.debug(f"🗑️ Cleaned up temporary file: {tmp_file_path}") | |
| except Exception as e: | |
| error_traceback = traceback.format_exc() | |
| logger.error(f"❌ PDF analysis failed: {str(e)}") | |
| logger.error(f"🔍 Full traceback:\n{error_traceback}") | |
| raise HTTPException( | |
| status_code=500, | |
| detail={ | |
| "error": "PDF analysis failed", | |
| "message": str(e), | |
| "traceback": error_traceback, | |
| "timestamp": datetime.now().isoformat() | |
| } | |
| ) | |
| async def create_or_edit_document(self, request: DocCreatorRequest) -> DocCreatorResponse: | |
| """ | |
| Create or edit an HTML document using the document editor agent | |
| Args: | |
| request: Document editing request with HTML content | |
| Returns: | |
| DocCreatorResponse with assistant's response and modified document | |
| """ | |
| start_time = datetime.now() | |
| # Log incoming request details | |
| logger.info("=" * 80) | |
| logger.info("📥 DOC_CREATOR REQUEST RECEIVED") | |
| logger.info("=" * 80) | |
| logger.info(f"👤 User ID: {request.userId}") | |
| logger.info(f"📋 Instruction: {request.instruction}") | |
| logger.info(f"📏 Document size: {len(request.documentContent)} bytes") | |
| # Count documents in tree | |
| doc_count = 0 | |
| if request.documents_tree and request.documents_tree.children: | |
| doc_count = sum(1 for node in self._extract_flat_documents(request.documents_tree)) | |
| logger.info(f"📚 Documents in tree: {doc_count}") | |
| logger.info(f"💬 Conversation history: {len(request.conversationHistory) if request.conversationHistory else 0} messages") | |
| try: | |
| # Use HTML directly (no canonicalization needed) | |
| logger.info("🔄 Using HTML document content directly...") | |
| doc_text = request.documentContent | |
| logger.info(f"✅ HTML document ready - size: {len(doc_text)} bytes") | |
| # Extract documents from tree if provided (convert to flat list for doc_editor agent) | |
| doc_summaries = [] | |
| if request.documents_tree and request.documents_tree.children: | |
| logger.info("📚 Processing documents from tree...") | |
| doc_summaries = self._extract_flat_documents(request.documents_tree) | |
| for i, doc in enumerate(doc_summaries, 1): | |
| logger.info(f" [{i}] {doc['file_name']} - {doc['summary'][:50] if doc['summary'] else 'No summary'}...") | |
| logger.info(f"✅ {len(doc_summaries)} documents loaded from tree") | |
| else: | |
| logger.info("ℹ️ No documents provided") | |
| # Convert conversation history if provided | |
| conversation_history = [] | |
| if request.conversationHistory: | |
| logger.info(f"💬 Processing conversation history ({len(request.conversationHistory)} messages)...") | |
| for i, msg in enumerate(request.conversationHistory, 1): | |
| role_emoji = "👤" if msg.role == "user" else "🤖" | |
| logger.info(f" [{i}] {role_emoji} {msg.role}: {msg.content[:50]}...") | |
| conversation_history.append({ | |
| "role": msg.role, | |
| "content": msg.content | |
| }) | |
| logger.info(f"✅ {len(conversation_history)} conversation messages loaded") | |
| else: | |
| logger.info("ℹ️ No conversation history provided") | |
| # Call document editor agent | |
| logger.info("=" * 80) | |
| logger.info("🤖 CALLING DOCUMENT EDITOR AGENT") | |
| logger.info("=" * 80) | |
| result = await self.doc_editor.edit_document( | |
| doc_text=doc_text, | |
| user_instruction=request.instruction, | |
| doc_summaries=doc_summaries, | |
| conversation_history=conversation_history, | |
| max_iterations=10 | |
| ) | |
| # Calculate processing time | |
| processing_time = (datetime.now() - start_time).total_seconds() | |
| # Log results | |
| logger.info("=" * 80) | |
| logger.info("📊 DOCUMENT EDITING RESULTS") | |
| logger.info("=" * 80) | |
| logger.info(f"✅ Success: {result['success']}") | |
| logger.info(f"🔄 Iterations: {result['iteration_count']}") | |
| logger.info(f"⏱️ Processing time: {processing_time:.2f}s") | |
| logger.info(f"💬 Message: {result['message'][:100]}...") | |
| # Prepare response - return HTML directly | |
| modified_document = result['doc_text'] if result['success'] else None | |
| if result['success']: | |
| logger.info(f"📏 Modified document size: {len(result['doc_text'])} bytes") | |
| logger.info(f"📈 Size change: {len(result['doc_text']) - len(doc_text):+d} bytes") | |
| response = DocCreatorResponse( | |
| response=result['message'], | |
| modifiedDocument=modified_document, | |
| processing_time=processing_time, | |
| timestamp=datetime.now().isoformat(), | |
| error=None if result['success'] else result.get('message') | |
| ) | |
| logger.info("=" * 80) | |
| logger.info("✅ DOCUMENT EDITING COMPLETED SUCCESSFULLY") | |
| logger.info("=" * 80) | |
| return response | |
| except Exception as e: | |
| error_traceback = traceback.format_exc() | |
| logger.error(f"❌ Document editing failed: {str(e)}") | |
| logger.error(f"🔍 Full traceback:\n{error_traceback}") | |
| processing_time = (datetime.now() - start_time).total_seconds() | |
| return DocCreatorResponse( | |
| response="", | |
| modifiedDocument=None, | |
| processing_time=processing_time, | |
| timestamp=datetime.now().isoformat(), | |
| error=str(e) | |
| ) | |
# Initialize API instance at import time.
# NOTE(review): the constructor builds LLM/OCR/search clients, so importing
# this module performs client setup — confirm that is intended.
api = CyberLegalAPI()
async def startup_event():
    """
    Print a startup banner: configured provider and available routes.

    NOTE(review): no startup hook registration is visible in this file —
    confirm it is wired to the app elsewhere.
    """
    llm_provider = os.getenv("LLM_PROVIDER", "openai").upper()
    banner_lines = (
        "🚀 Starting CyberLegal AI API...",
        f"🤖 LLM Provider: {llm_provider}",
        "🔧 Powered by: LangGraph + LightRAG",
        "📍 API endpoints:",
        " - POST /chat - Chat with the assistant",
        " - POST /doc_creator - Edit TipTap documents",
        " - POST /analyze-pdf - Analyze PDF document",
        " - GET /health - Health check",
        " - GET / - API info",
    )
    for line in banner_lines:
        print(line)
async def chat_endpoint(request: ChatRequest):
    """
    Chat with the cyber-legal assistant.

    Args:
        request: ChatRequest carrying the message, the user type
            ("client" or "lawyer"), and optional conversation history.

    Returns:
        ChatResponse with the assistant's answer and metadata.

    User Types:
        - client: general users (default) — client-friendly language,
          can be routed to a lawyer
        - lawyer: legal professionals — technical language, knowledge
          graph access only
    """
    chat_response = await api.process_request(request)
    return chat_response
async def doc_creator_endpoint(request: DocCreatorRequest):
    """
    Create or edit an HTML document.

    Request fields:
        - instruction: what to change, e.g. "Change '12 months' to
          '24 months'", "Add Article 3 about pricing after Article 2",
          "Remove the section about confidentiality", or complex edits
          such as "Add a clause about GDPR compliance in Article 1"
        - documentContent: the HTML document to edit, e.g.
          <h1>Contract</h1><h2>Article 1 - Duration</h2>
          <p>This contract shall last for 12 months.</p>
        - contentFormat: always "html"
        - documentSummaries: optional context from analyzed documents
        - conversationHistory: optional previous conversation messages
        - userId: unique user identifier (UUID)

    Returns:
        DocCreatorResponse.
        On success: response holds a completion message, modifiedDocument
        the modified HTML, error is null.
        On failure (validation error or the editor's 10-iteration cap):
        response holds the error message, modifiedDocument is null, and
        error describes the failure — check this field to detect problems.

    Error handling: the agent validates every modification with
    BeautifulSoup and retries automatically when an edit breaks the HTML
    structure.
    """
    edit_result = await api.create_or_edit_document(request)
    return edit_result
async def health_endpoint():
    """
    Health check.

    Returns:
        HealthResponse describing API and dependency status.
    """
    status_report = await api.health_check()
    return status_report
async def analyze_pdf_endpoint(request: AnalyzePDFRequest):
    """
    Analyze a document (PDF or image).

    Args:
        request: analysis request with base64-encoded content.
            Supported formats: PDF (.pdf, text-based or scanned) and
            images (.jpg, .jpeg, .png, .bmp, .tiff, .webp via Mistral OCR).

    Returns:
        AnalyzePDFResponse with actors, key_details, summary, and metadata.

    Behavior:
        - Text-based PDFs use direct extraction; scanned PDFs and images
          go through Mistral OCR.
        - The endpoint extracts the text, analyzes actors and key details,
          and generates a compact summary suitable for further processing.
    """
    analysis = await api.analyze_pdf(request)
    return analysis
async def root():
    """
    Root endpoint returning API metadata: name, version, provider,
    route index, and areas of expertise.
    """
    llm_provider = os.getenv("LLM_PROVIDER", "openai").upper()
    technology_map = {
        "OPENAI": "LangGraph + RAG + Cerebras (GPT-5-Nano)"
    }
    technology = technology_map.get(llm_provider, "LangGraph + RAG + Cerebras")
    endpoint_index = {
        "chat": "POST /chat - Chat with the assistant",
        "doc_creator": "POST /doc_creator - Edit TipTap documents",
        "analyze-pdf": "POST /analyze-pdf - Analyze PDF document",
        "health": "GET /health - Health check"
    }
    return {
        "name": "CyberLegal AI API",
        "version": "1.0.0",
        "description": "LangGraph-powered cyber-legal assistant API",
        "llm_provider": llm_provider,
        "technology": technology,
        "endpoints": endpoint_index,
        "expertise": [
            "GDPR", "NIS2", "DORA", "Cyber Resilience Act", "eIDAS 2.0"
        ]
    }
async def global_exception_handler(request, exc):
    """
    Global exception handler with full traceback for debugging.

    Fix: format the traceback from the exception object itself instead of
    traceback.format_exc(), which depends on the ambient exception context
    and yields "NoneType: None" when no exception is active in this frame.
    """
    error_traceback = "".join(
        traceback.format_exception(type(exc), exc, exc.__traceback__)
    )
    logger.error(f"❌ Unhandled exception: {str(exc)}")
    logger.error(f"🔍 Full traceback:\n{error_traceback}")
    return JSONResponse(
        status_code=500,
        content={
            "error": "Internal server error",
            "detail": str(exc),
            "traceback": error_traceback,
            "timestamp": datetime.now().isoformat()
        }
    )
if __name__ == "__main__":
    # PORT (typically injected by the hosting platform) takes precedence
    # over API_PORT; both fall back to 8000.
    fallback_port = os.getenv("API_PORT", "8000")
    port = int(os.getenv("PORT", fallback_port))
    uvicorn.run(
        "agent_api:app",
        host="0.0.0.0",
        port=port,
        reload=False,
        log_level="info"
    )