#!/usr/bin/env python3 """ Enhanced Arabic Document Chatbot with Knowledge Base Automatic document loading and persistent knowledge storage. """ import os import sys import asyncio import logging from pathlib import Path # Add src directory to Python path sys.path.insert(0, str(Path(__file__).parent / "src")) from src.ui.enhanced_gradio_app import EnhancedArabicChatbot from src.utils.logger import setup_logging def main(): """Main application entry point.""" print("\n" + "=" * 70) print("Enhanced Arabic Document Chatbot with Persistent Knowledge Base") print("=" * 70) try: # Setup logging setup_logging(log_level="INFO", log_file="logs/enhanced_app.log") logger = logging.getLogger(__name__) # Check Python version if sys.version_info < (3, 8): print("ERROR: Python 3.8 or higher is required") sys.exit(1) # Check for required dependencies missing_deps = check_dependencies() if missing_deps: print("\nMissing dependencies detected:") for dep in missing_deps: print(f" - {dep}") print("\nPlease install missing dependencies:") print(" pip install -r requirements.txt") sys.exit(1) # Setup directories setup_directories() # Check for data directories check_data_directories() # Load environment variables load_environment() print("\nAll checks passed!") print("\nRegulatory AI Features:") print(" - Kuwait regulatory expertise (CBK, CMA, AML)") print(" - Definitive regulatory decisions (يُسمح/لا يُسمح/مطلوب)") print(" - Cross-regulatory framework analysis") print(" - Legal citation and compliance guidance") print(" - Advanced semantic chunking for Arabic legal documents") print(" - Persistent vector database - no re-indexing needed") print(" - Context-aware retrieval with 40% better accuracy") print(" - AI-powered query transformation with few-shot learning") print(" - Multi-dimensional example selection (semantic + category + structure)") print(" - 27 regulatory Q&A examples for enhanced context understanding") print("\nRegulatory Document Sources:") print(" - data_cmp/data_cmp/CBK/ - Central Bank of Kuwait (CBK) regulations") print(" - data_cmp/data_cmp/CMA/ - Capital Markets Authority (CMA) rules") print(" - data_cmp/data_cmp/Legal_Principles/ - Disciplinary Council legal principles") print(" - Total: 99 regulatory documents with 10,166+ semantic chunks") print("\n" + "=" * 70) print("Starting web interface...") print("=" * 70 + "\n") # Create and launch the application app = EnhancedArabicChatbot() # Launch with configuration app.launch( share=False, # Set to True for public link debug=False # Set to True for debugging ) except KeyboardInterrupt: print("\n\nApplication stopped by user") sys.exit(0) except Exception as e: logger.error(f"Application failed: {e}", exc_info=True) print(f"\nERROR: {e}") print("\nCheck logs in 'logs/' directory for details") sys.exit(1) def check_dependencies(): """Check if required dependencies are installed.""" required_packages = [ ('gradio', 'gradio'), ('google.generativeai', 'google-generativeai'), ('faiss', 'faiss-cpu'), ('sentence_transformers', 'sentence-transformers'), ('openai', 'openai'), # Added for OpenAI embeddings ('fitz', 'PyMuPDF'), ('pdfplumber', 'pdfplumber'), ('yaml', 'PyYAML'), ('numpy', 'numpy'), ('tenacity', 'tenacity'), ] missing = [] for package, pip_name in required_packages: try: __import__(package) except ImportError: missing.append(pip_name) return missing def setup_directories(): """Create necessary directories if they don't exist.""" directories = [ 'logs', 'knowledge_base', 'knowledge_base/vectors', 'knowledge_base/chunks', 'knowledge_base/metadata', 'cache', 'config' ] for directory in directories: Path(directory).mkdir(exist_ok=True, parents=True) print("Directory structure verified") def check_data_directories(): """Check if data directories exist and contain PDFs.""" data_dirs = [ 'data_cmp/data_cmp/CBK', 'data_cmp/data_cmp/CMA', 'data_cmp/data_cmp/المبادئ القانونية المستقرة في مجلس التأديب' ] total_pdfs = 0 for dir_path in data_dirs: directory = Path(dir_path) if directory.exists(): pdf_files = list(directory.glob("*.pdf")) total_pdfs += len(pdf_files) if pdf_files: # Handle Unicode in path names try: print(f"Found {len(pdf_files)} PDFs in {dir_path}") except UnicodeEncodeError: # Fallback for Windows console encoding issues safe_path = dir_path.encode('ascii', 'replace').decode('ascii') print(f"Found {len(pdf_files)} PDFs in {safe_path}") else: try: print(f"Directory not found: {dir_path}") except UnicodeEncodeError: safe_path = dir_path.encode('ascii', 'replace').decode('ascii') print(f"Directory not found: {safe_path}") if total_pdfs == 0: print("\nWARNING: No PDF files found in data directories") print(" The system will work but won't have any documents to search") print(" Add PDF files to the monitored directories to enable search") else: print(f"\nTotal PDFs found: {total_pdfs} documents ready for indexing") return total_pdfs > 0 def load_environment(): """Load environment variables from .env file if it exists.""" env_file = Path(".env") if env_file.exists(): try: # Manual .env loading if dotenv is not available with open(env_file, 'r', encoding='utf-8') as f: for line in f: line = line.strip() if line and not line.startswith('#') and '=' in line: key, value = line.split('=', 1) os.environ[key.strip()] = value.strip() print("Environment variables loaded from .env") except Exception as e: print(f"Error loading .env file: {e}") # Check for Gemini API key and set Google API key gemini_key = os.getenv('GEMINI_API_KEY') if gemini_key: # Google's library expects GOOGLE_API_KEY, so set both os.environ['GOOGLE_API_KEY'] = gemini_key print("Gemini API key found and configured") else: print("No Gemini API key found") print(" Set GEMINI_API_KEY environment variable for AI responses") print(" Without it, only search functionality will work") # Check for OpenAI API key openai_key = os.getenv('OPENAI_API_KEY') if openai_key: print("OpenAI API key found and configured") print(" Using OpenAI text-embedding-3-large for fast embeddings") else: print("No OpenAI API key found") print(" Set OPENAI_API_KEY environment variable for faster embeddings") print(" Will use local sentence-transformers as fallback") def create_default_config(): """Create a default configuration file if it doesn't exist.""" config_file = Path("config/settings.yaml") if not config_file.exists(): config_file.parent.mkdir(exist_ok=True, parents=True) default_config = """# Enhanced Arabic Document Chatbot Configuration app: name: Enhanced Arabic Document Chatbot version: 2.0.0 host: 0.0.0.0 port: 7860 ui: title: مستشار الامتثال التنظيمي الكويتي description: نظام ذكي متخصص في الوثائق التنظيمية والامتثال المالي rtl_enabled: true max_input_length: 2000 max_chat_history: 50 knowledge_base: storage_dir: knowledge_base data_directories: - data_cmp/data_cmp/CBK - data_cmp/data_cmp/CMA - data_cmp/data_cmp/المبادئ القانونية المستقرة في مجلس التأديب auto_index: false # Only index when explicitly requested check_interval_seconds: 0 # Disable background checking batch_size: 5 vector_search: enabled: true embedding_dim: 3072 # Updated for text-embedding-3-large model_name: text-embedding-3-large # OpenAI's best model cache_dir: cache/vectors # Use existing cache location batch_size: 16 # Improved similarity thresholds for Arabic content primary_threshold: 0.4 fallback_threshold: 0.3 minimum_threshold: 0.2 documents: max_file_size_mb: 50 # Advanced regulatory-optimized chunking configuration chunking_strategy: semantic # Regulatory boundary detection optimized chunk_size: 2000 # Optimized for Arabic legal document structure chunk_overlap: 300 # 15% overlap for regulatory context preservation min_chunk_size: 500 # Minimum for meaningful regulatory content similarity_threshold: 0.75 # For merging similar legal segments preserve_boundaries: true # Respect Arabic legal document boundaries regulatory_optimization: true # Enhanced for Kuwait regulatory documents extraction_backends: - pymupdf - pdfplumber # Enhanced encoding handling encoding_fix: true arabic: enable_normalization: true enable_diacritics_removal: true enable_number_conversion: true # Enhanced Arabic processing use_camel_tools: true # Advanced Arabic NLP remove_kashida: true # Handle elongated text normalization_level: 3 # Full normalization gemini: model: gemini-1.5-flash temperature: 0.7 max_output_tokens: 2048 max_context_length: 3000 timeout_seconds: 30 max_retries: 3 # Enhanced RAG System with AI-Powered Query Transformation rag_enhancements: query_transformation: enabled: true # Enable AI-powered query transformation with few-shot examples model: gemini-1.5-flash # Fast model for query transformation few_shot_enabled: true # Enable few-shot prompting with Q&A examples max_examples: 3 # Maximum number of examples to include in prompt similarity_threshold: 0.3 # Minimum similarity threshold for example selection category_boost: 0.2 # Boost factor for same-category examples qa_knowledge_base: qa_knowledge_base.json # Path to Q&A knowledge base cache_embeddings: true # Cache embeddings for performance fallback_to_expansion: true # Fall back to rule-based expansion if AI fails logging: level: INFO format: '%(asctime)s - %(name)s - %(levelname)s - %(message)s' """ with open(config_file, 'w', encoding='utf-8') as f: f.write(default_config) print(f"Created default configuration file: {config_file}") if __name__ == "__main__": # Create default config if needed create_default_config() # Parse command line arguments import argparse parser = argparse.ArgumentParser(description="Enhanced Arabic Document Chatbot") parser.add_argument("--test", action="store_true", help="Run installation test") parser.add_argument("--reindex", action="store_true", help="Force reindex all documents") parser.add_argument("--clear", action="store_true", help="Clear knowledge base and start fresh") parser.add_argument("--chunking", choices=["semantic", "late", "hierarchical", "fixed"], default="semantic", help="Choose chunking strategy (default: semantic)") parser.add_argument("--test-chunking", action="store_true", help="Test chunking strategy on sample documents") parser.add_argument("--query", type=str, help="Test a specific query through the regulatory system") parser.add_argument("--test-few-shot", action="store_true", help="Test the few-shot example selector system") parser.add_argument("--test-transformation", action="store_true", help="Test AI-powered query transformation") args = parser.parse_args() if args.test: # Test mode print("\nRunning installation test...") setup_directories() missing = check_dependencies() if missing: print(f"\nTest failed: Missing {len(missing)} dependencies") sys.exit(1) else: print("\nInstallation test passed!") sys.exit(0) elif args.reindex: # Reindex mode print("\nForcing reindex of all documents...") async def reindex(): from src.core.knowledge_base import KnowledgeBase config = {} # Will use defaults kb = KnowledgeBase(config) result = await kb.reindex_all() print(f"\nReindexing complete: {result}") asyncio.run(reindex()) sys.exit(0) elif args.clear: # Clear knowledge base print("\nClearing knowledge base...") async def clear(): from src.core.knowledge_base import KnowledgeBase config = {} # Will use defaults kb = KnowledgeBase(config) success = await kb.clear_index() if success: print("\nKnowledge base cleared successfully") else: print("\nFailed to clear knowledge base") asyncio.run(clear()) sys.exit(0) elif args.test_few_shot: # Test few-shot example selector print("\nTesting Few-Shot Example Selector...") print("=" * 70) async def test_few_shot(): from src.core.few_shot_selector import FewShotExampleSelector selector = FewShotExampleSelector() # Wait for loading print("Loading Q&A knowledge base...") await asyncio.sleep(2) # Test queries test_queries = [ "ما هي العقوبات التأديبية المطبقة على الموظفين؟", "ما هي شروط فتح حساب مصرفي؟", "كيف يتم تداول الأسهم في البورصة؟" ] for i, query in enumerate(test_queries, 1): print(f"\nTest Query {i}: {query}") print("-" * 40) # Analyze query context analysis = await selector.analyze_query_context(query) print(f"Likely categories: {analysis['likely_categories']}") print(f"Question type: {analysis['query_characteristics']['question_type']}") # Select examples examples = await selector.select_examples( query=query, category_hint=analysis['likely_categories'][0] if analysis['likely_categories'] else None ) print(f"\nSelected {len(examples)} examples:") for j, (example, scores) in enumerate(examples, 1): print(f" Example {j}: {example.category} (Score: {scores.total_score:.3f})") print(f" Question: {example.question[:80]}...") # Show formatted output if examples: formatted = selector.format_examples_for_prompt(examples[:1]) print(f"\nFormatted Example:\n{formatted[:200]}...") # Show statistics stats = selector.get_selection_stats() print(f"\n\nSystem Statistics:") print(f" Total examples: {stats['total_examples']}") print(f" Categories: {list(stats['category_distribution'].keys())}") print(f" Avg selection time: {stats['selection_stats']['avg_selection_time']:.3f}s") asyncio.run(test_few_shot()) sys.exit(0) elif args.test_transformation: # Test AI-powered query transformation print("\nTesting AI-Powered Query Transformation...") print("=" * 70) async def test_transformation(): from src.ui.enhanced_gradio_app import EnhancedArabicChatbot print("Initializing chatbot with AI transformation...") app = EnhancedArabicChatbot() # Test queries test_queries = [ "عقوبات الموظفين", "افتح حساب بنك", "شراء اسهم", "ما يُسمح في التمويل الإسلامي؟" ] for i, query in enumerate(test_queries, 1): print(f"\nTest Query {i}: {query}") print("-" * 40) try: # Test the transformation (we'll access the internal method) if hasattr(app, '_transform_query_with_ai'): transformed = await app._transform_query_with_ai(query) print(f"Original: {query}") print(f"Transformed: {transformed}") print(f"Improvement: {'Yes' if transformed != query else 'No'}") else: print("AI transformation method not available") except Exception as e: print(f"Error testing transformation: {e}") print("\n" + "=" * 70) print("Query transformation test complete!") asyncio.run(test_transformation()) sys.exit(0) elif args.test_chunking: # Test chunking strategy print(f"\nTesting {args.chunking} chunking strategy...") print("=" * 70) async def test_chunking(): # Import the appropriate chunker if args.chunking == "semantic": from semantic_chunking import AdvancedSemanticChunker chunker = AdvancedSemanticChunker( min_chunk_size=500, max_chunk_size=2000, similarity_threshold=0.75, overlap_ratio=0.15 ) # Sample Arabic legal text for testing sample_text = """ الباب الأول: التوريق المالي المادة 1: تعريف التوريق التوريق هو عملية تحويل الأصول المالية إلى أوراق مالية قابلة للتداول. يشمل ذلك الديون والحقوق المالية المختلفة التي تولد تدفقات نقدية منتظمة. المادة 2: شروط التوريق يجب أن تكون الأصول المراد توريقها ذات تدفقات نقدية منتظمة ومتوقعة. يشترط موافقة البنك المركزي على عملية التوريق قبل التنفيذ. تخضع جميع عمليات التوريق للرقابة المستمرة من الجهات المختصة. المادة 3: الضمانات والحماية يجب توفير ضمانات كافية لحماية حقوق المستثمرين في الأوراق المالية المصدرة. تشمل الضمانات التأمين ضد المخاطر والاحتياطيات النقدية الكافية. """ print(f"Testing on sample Arabic legal document ({len(sample_text)} chars)") chunks = chunker.chunk_document(sample_text, add_overlap=True) print(f"\nCreated {len(chunks)} semantic chunks:") for i, chunk in enumerate(chunks): print(f"\nChunk {i+1}:") print(f" Type: {chunk.chunk_type}") print(f" Size: {len(chunk.content)} chars") print(f" Preview: {chunk.content[:150]}...") elif args.chunking == "late": print("Late chunking requires long-context models.") print("Please ensure you have the required models installed.") from late_chunking import OptimalChunkingStrategy processor = OptimalChunkingStrategy() # Test with sample text print("Late chunking test would go here...") elif args.chunking == "hierarchical": from semantic_chunking import AdvancedSemanticChunker, HierarchicalChunker semantic_chunker = AdvancedSemanticChunker() chunker = HierarchicalChunker(semantic_chunker) print("Hierarchical chunking test would go here...") else: # fixed print("Using traditional fixed-size chunking (current method)") print("Chunk size: 800 chars, Overlap: 200 chars") print("\n" + "=" * 70) print("Chunking test complete!") asyncio.run(test_chunking()) sys.exit(0) elif args.query: # Query test mode print(f"\nTesting query through enhanced regulatory system...") print("=" * 70) async def test_query(): from src.ui.enhanced_gradio_app import EnhancedArabicChatbot # Initialize the system print("Initializing enhanced regulatory system...") app = EnhancedArabicChatbot() # Test the query print(f"\nQuery: {args.query}") print("-" * 40) try: # Process the query history = [] result_history, status = await app.process_query(args.query, history) # Display results if result_history and len(result_history) >= 2: response = result_history[-1]['content'] print(f"Status: {status}") print(f"Response:\n{response}") else: print(f"Status: {status}") print("No response generated") except Exception as e: print(f"Error processing query: {e}") import traceback traceback.print_exc() print("\n" + "=" * 70) print("Query test complete!") asyncio.run(test_query()) sys.exit(0) else: # Normal operation with selected chunking strategy if args.chunking != "fixed": print(f"\nUsing {args.chunking} chunking strategy") print("This provides better context preservation for Arabic documents") main()