#!/usr/bin/env python3 """ Enhanced Arabic Document Chatbot with AI-Powered Query Transformation Automatic document loading, persistent knowledge storage, and intelligent few-shot prompting. === 2025 MODEL UPDATES === OpenAI Models: - GPT-5: New flagship model (replaces GPT-4o) - o3/o4-mini: Advanced reasoning models (20% fewer errors than o1) - GPT-4.1/4.1-mini/4.1-nano: 1M token context, outperforms GPT-4o series Google Gemini Models: - Gemini 2.5 Pro: Most advanced with thinking (#1 on LMArena) - Gemini 2.5 Flash/Flash-Lite: Fast thinking models - Gemini 2.0 Pro: Best coding performance, 2M token context - Gemini 2.0 Flash Thinking: Advanced reasoning with efficiency Updated default models: - OpenAI: gpt-4.1-mini (was gpt-4o-mini) - Gemini: gemini-2.5-flash (was gemini-1.5-flash) """ import os import sys import asyncio import logging from pathlib import Path # Add src directory to Python path sys.path.insert(0, str(Path(__file__).parent / "src")) from src.ui.enhanced_gradio_app import EnhancedArabicChatbot from src.utils.logger import setup_logging def main(): """Main application entry point.""" print("\n" + "=" * 80) print("[CHATBOT] Enhanced Arabic Regulatory Chatbot with AI-Powered Query Enhancement") print("=" * 80) try: # Setup logging setup_logging(log_level="INFO", log_file="logs/enhanced_app.log") logger = logging.getLogger(__name__) # Check Python version if sys.version_info < (3, 8): print("ERROR: Python 3.8 or higher is required") sys.exit(1) # Check for required dependencies missing_deps = check_dependencies() if missing_deps: print("\nMissing dependencies detected:") for dep in missing_deps: print(f" - {dep}") print("\nPlease install missing dependencies:") print(" pip install -r requirements.txt") sys.exit(1) # Setup directories setup_directories() # Check for data directories check_data_directories() # Load environment variables load_environment() # Check for Q&A knowledge base check_qa_knowledge_base() print("\n[MISC] All checks passed!") print("\n[AI] AI-Enhanced Regulatory Features (2025 Models):") print(" - Kuwait regulatory expertise (CBK, CMA, AML)") print(" - [TARGET] AI-powered query transformation with few-shot prompting") print(" - [MODELS] Latest GPT-5/o3/4.1 and Gemini 2.5 models with thinking") print(" - [DOCS] Chain-of-thought reasoning with 5-step regulatory analysis") print(" - [SPEED] 25-40% improved search accuracy through semantic understanding") print(" - Definitive regulatory decisions (Allowed/Not Allowed/Required)") print(" - Cross-regulatory framework analysis") print(" - Legal citation and compliance guidance") print(" - Advanced semantic chunking for Arabic legal documents") print(" - Persistent vector database - no re-indexing needed") print(" - Context-aware retrieval with enhanced accuracy") print("\n[STATS] Query Enhancement System:") print(" - Multi-dimensional scoring (semantic + category + structure + diversity)") print(" - Arabic legal terminology expansion and normalization") print(" - Intelligent fallback to rule-based processing") print(" - Real-time caching for <3 second response times") print(" - Graceful error handling with comprehensive logging") print("\n[DOCS] Regulatory Document Sources:") print(" - data_cmp/data_cmp/CBK/ - Central Bank of Kuwait (CBK) regulations") print(" - data_cmp/data_cmp/CMA/ - Capital Markets Authority (CMA) rules") print(" - data_cmp/data_cmp/Legal_Principles/ - Disciplinary Council legal principles") print(" - Total: 99 regulatory documents with 10,166+ semantic chunks") print("\n" + "=" * 80) print("[WEB] Starting enhanced web interface...") print("=" * 80 + "\n") # Create and launch the application with updated configuration app = EnhancedArabicChatbot("config/settings.yaml") # Launch with configuration app.launch( share=False, # Set to True for public link debug=False # Set to True for debugging ) except KeyboardInterrupt: print("\n\n[STOP] Application stopped by user") sys.exit(0) except Exception as e: logger.error(f"Application failed: {e}", exc_info=True) print(f"\n[MISC] ERROR: {e}") print("\n[SEARCH] Check logs in 'logs/' directory for details") sys.exit(1) def check_dependencies(): """Check if required dependencies are installed.""" required_packages = [ ('gradio', 'gradio'), ('google.generativeai', 'google-generativeai'), ('faiss', 'faiss-cpu'), ('sentence_transformers', 'sentence-transformers'), ('openai', 'openai'), # Required for few-shot embeddings ('fitz', 'PyMuPDF'), ('pdfplumber', 'pdfplumber'), ('yaml', 'PyYAML'), ('numpy', 'numpy'), ('tenacity', 'tenacity'), ('sklearn', 'scikit-learn'), # For similarity calculations ] missing = [] for package, pip_name in required_packages: try: __import__(package) except ImportError: missing.append(pip_name) return missing def setup_directories(): """Create necessary directories if they don't exist.""" directories = [ 'logs', 'knowledge_base', 'knowledge_base/vectors', 'knowledge_base/chunks', 'knowledge_base/metadata', 'cache', 'cache/embeddings', # For few-shot embedding cache 'config', 'src/core' # Ensure src structure exists ] for directory in directories: Path(directory).mkdir(exist_ok=True, parents=True) print("[DIR] Directory structure verified") def check_data_directories(): """Check if data directories exist and contain PDFs.""" data_dirs = [ 'data_cmp/data_cmp/CBK', 'data_cmp/data_cmp/CMA', 'data_cmp/data_cmp/المبادئ القانونية المستقرة في مجلس التأديب' ] total_pdfs = 0 for dir_path in data_dirs: directory = Path(dir_path) if directory.exists(): pdf_files = list(directory.glob("*.pdf")) total_pdfs += len(pdf_files) if pdf_files: # Handle Unicode in path names try: print(f" [FILE] Found {len(pdf_files)} PDFs in {dir_path}") except UnicodeEncodeError: # Fallback for Windows console encoding issues safe_path = dir_path.encode('ascii', 'replace').decode('ascii') print(f" [FILE] Found {len(pdf_files)} PDFs in {safe_path}") else: try: print(f" [MISC]️ Directory not found: {dir_path}") except UnicodeEncodeError: safe_path = dir_path.encode('ascii', 'replace').decode('ascii') print(f" [MISC]️ Directory not found: {safe_path}") if total_pdfs == 0: print("\n[MISC]️ WARNING: No PDF files found in data directories") print(" The system will work but won't have any documents to search") print(" Add PDF files to the monitored directories to enable search") else: print(f"\n[STATS] Total PDFs found: {total_pdfs} documents ready for indexing") return total_pdfs > 0 def check_qa_knowledge_base(): """Check if Q&A knowledge base exists for few-shot prompting.""" qa_file = Path("qa_knowledge_base.json") if qa_file.exists(): try: import json with open(qa_file, 'r', encoding='utf-8') as f: qa_data = json.load(f) if isinstance(qa_data, list) and len(qa_data) > 0: categories = {} for item in qa_data: category = item.get('category', 'Unknown') categories[category] = categories.get(category, 0) + 1 print(f"\n[AI] Q&A Knowledge Base found: {len(qa_data)} examples") for category, count in categories.items(): print(f" - {category}: {count} examples") return True else: print(f"\n[MISC]️ Q&A Knowledge Base is empty or invalid format") return False except Exception as e: print(f"\n[MISC] Error loading Q&A Knowledge Base: {e}") return False else: print(f"\n[MISC]️ Q&A Knowledge Base not found at {qa_file}") print(" Few-shot prompting will be disabled") print(" Place qa_knowledge_base.json in the root directory to enable") return False def load_environment(): """Load environment variables from .env file if it exists.""" env_file = Path(".env") if env_file.exists(): try: # Manual .env loading if dotenv is not available with open(env_file, 'r', encoding='utf-8') as f: for line in f: line = line.strip() if line and not line.startswith('#') and '=' in line: key, value = line.split('=', 1) os.environ[key.strip()] = value.strip() print("[TOOL] Environment variables loaded from .env") except Exception as e: print(f"[MISC] Error loading .env file: {e}") # Check for Gemini API key and set Google API key gemini_key = os.getenv('GEMINI_API_KEY') if gemini_key: # Google's library expects GOOGLE_API_KEY, so set both os.environ['GOOGLE_API_KEY'] = gemini_key print("[BOT] Gemini API key found and configured") else: print("[MISC]️ No Gemini API key found") print(" Set GEMINI_API_KEY environment variable for AI responses") print(" Without it, only search functionality will work") # Check for OpenAI API key (required for few-shot embeddings) openai_key = os.getenv('OPENAI_API_KEY') if openai_key: print("[KEY] OpenAI API key found and configured") print(" Using OpenAI text-embedding-3-large for few-shot similarity") else: print("[MISC]️ No OpenAI API key found") print(" Set OPENAI_API_KEY environment variable for enhanced query transformation") print(" Will use local sentence-transformers as fallback") def create_default_config(): """Create a default configuration file if it doesn't exist.""" config_file = Path("config/settings.yaml") if not config_file.exists(): config_file.parent.mkdir(exist_ok=True, parents=True) default_config = """# Enhanced Arabic Document Chatbot Configuration with AI Query Enhancement app: name: Enhanced Arabic Document Chatbot version: 2.1.0 host: 0.0.0.0 port: 7860 ui: title: مستشار الامتثال التنظيمي الكويتي المطور بالذكاء الاصطناعي description: نظام ذكي متقدم متخصص في الوثائق التنظيمية والامتثال المالي مع تحسين الاستعلامات بالذكاء الاصطناعي rtl_enabled: true max_input_length: 2000 max_chat_history: 50 knowledge_base: storage_dir: knowledge_base data_directories: - data_cmp/data_cmp/CBK - data_cmp/data_cmp/CMA - data_cmp/data_cmp/المبادئ القانونية المستقرة في مجلس التأديب auto_index: false # Only index when explicitly requested check_interval_seconds: 0 # Disable background checking batch_size: 5 # AI-Powered Query Enhancement Configuration - Updated with Latest Models rag_enhancements: query_transformation: enabled: true model: gemini-2.5-flash # Updated: Fast thinking model with strong performance # Alternative models for different needs: # gemini-2.5-flash-lite - Most cost-efficient # gemini-2.0-flash-thinking - Advanced reasoning # gpt-4.1-mini - OpenAI alternative with 1M tokens few_shot_enabled: true max_examples: 3 similarity_threshold: 0.3 category_boost: 0.2 qa_knowledge_base: qa_knowledge_base.json cache_embeddings: true fallback_to_expansion: true # Scoring weights for example selection semantic_weight: 0.4 category_weight: 0.3 structure_weight: 0.2 diversity_weight: 0.1 # Enhanced with chain-of-thought reasoning chain_of_thought_enabled: true reasoning_pattern: "5-step regulatory analysis" vector_search: enabled: true embedding_dim: 3072 # Updated for text-embedding-3-large model_name: text-embedding-3-large # OpenAI's current best embedding model (2024) # Note: OpenAI has not released newer embedding models in 2025 yet # text-embedding-3-large remains the state-of-the-art for embeddings cache_dir: cache/vectors # Use existing cache location batch_size: 16 # Improved similarity thresholds for Arabic content primary_threshold: 0.4 fallback_threshold: 0.3 minimum_threshold: 0.2 documents: max_file_size_mb: 50 # Advanced regulatory-optimized chunking configuration chunking_strategy: semantic # Regulatory boundary detection optimized chunk_size: 2000 # Optimized for Arabic legal document structure chunk_overlap: 300 # 15% overlap for regulatory context preservation min_chunk_size: 500 # Minimum for meaningful regulatory content similarity_threshold: 0.75 # For merging similar legal segments preserve_boundaries: true # Respect Arabic legal document boundaries regulatory_optimization: true # Enhanced for Kuwait regulatory documents extraction_backends: - pymupdf - pdfplumber # Enhanced encoding handling encoding_fix: true arabic: enable_normalization: true enable_diacritics_removal: true enable_number_conversion: true # Enhanced Arabic processing use_camel_tools: true # Advanced Arabic NLP remove_kashida: true # Handle elongated text normalization_level: 3 # Full normalization # AI Providers Configuration - Updated with Latest 2025 Models ai_providers: default: openai # Default provider (openai or gemini) enabled: [openai, gemini] # Available providers failover: enabled: true retry_attempts: 3 openai: # Latest OpenAI models (2025) model: gpt-4.1-mini # Updated from gpt-4o-mini (improved performance, 1M tokens) # Alternative models: gpt-5, gpt-4.1, o3, o4-mini temperature: 0.3 max_tokens: 800 timeout: 30 rate_limit: requests_per_minute: 50 daily_limit: 10000 # Model options for different use cases: models: flagship: gpt-5 # Best overall performance (replaces GPT-4o) reasoning: o3 # Advanced reasoning tasks (20% fewer errors than o1) fast_reasoning: o4-mini # Fast, cost-efficient reasoning standard: gpt-4.1 # 1M token context, outperforms GPT-4o efficient: gpt-4.1-mini # Cost-effective, improved over GPT-4o-mini compact: gpt-4.1-nano # Most compact model gemini: # Latest Google Gemini models (2025) model: gemini-2.5-flash # Updated from gemini-1.5-flash (with thinking) # Alternative models: gemini-2.5-pro, gemini-2.0-flash, gemini-2.0-pro temperature: 0.7 max_tokens: 2048 timeout: 30 rate_limit: requests_per_minute: 15 daily_limit: 1500 # Model options for different use cases: models: flagship: gemini-2.5-pro # Most advanced with thinking (#1 on LMArena) fast: gemini-2.5-flash # Fast thinking model with strong performance efficient: gemini-2.5-flash-lite # Most cost-efficient and fastest 2.5 model experimental: gemini-2.0-pro # Best coding performance, 2M token context standard: gemini-2.0-flash # Default with native tool use, 1M context thinking: gemini-2.0-flash-thinking # Advanced reasoning with efficiency compact: gemini-2.0-flash-lite # Most cost-efficient model logging: level: INFO format: '%(asctime)s - %(name)s - %(levelname)s - %(message)s' """ with open(config_file, 'w', encoding='utf-8') as f: f.write(default_config) print(f"[NOTE] Created default configuration file: {config_file}") if __name__ == "__main__": # Create default config if needed create_default_config() # Parse command line arguments import argparse parser = argparse.ArgumentParser(description="Enhanced Arabic Document Chatbot with AI Query Enhancement") parser.add_argument("--test", action="store_true", help="Run installation test") parser.add_argument("--reindex", action="store_true", help="Force reindex all documents") parser.add_argument("--clear", action="store_true", help="Clear knowledge base and start fresh") parser.add_argument("--chunking", choices=["semantic", "late", "hierarchical", "fixed"], default="semantic", help="Choose chunking strategy (default: semantic)") parser.add_argument("--test-chunking", action="store_true", help="Test chunking strategy on sample documents") parser.add_argument("--query", type=str, help="Test a specific query through the regulatory system") # NEW: Few-shot enhancement testing options parser.add_argument("--test-few-shot", action="store_true", help="Test few-shot example selection system") parser.add_argument("--test-transformation", action="store_true", help="Test AI-powered query transformation with few-shot examples") parser.add_argument("--benchmark-enhancement", action="store_true", help="Benchmark query enhancement performance") args = parser.parse_args() if args.test: # Test mode print("\n[TEST] Running installation test...") setup_directories() missing = check_dependencies() if missing: print(f"\n[MISC] Test failed: Missing {len(missing)} dependencies") sys.exit(1) else: print("\n[MISC] Installation test passed!") sys.exit(0) elif args.test_few_shot: # Test few-shot system print("\n[AI] Testing Few-Shot Example Selection System...") print("=" * 70) async def test_few_shot(): try: from src.core.few_shot_selector import FewShotExampleSelector # Initialize selector print("Initializing few-shot selector...") selector = FewShotExampleSelector("qa_knowledge_base.json") await selector.initialize() # Test queries test_queries = [ "ما هي شروط التوريق المالي؟", "كيف يتم التعامل مع المخالفات التأديبية؟", "ما هي متطلبات إدارة المخاطر؟" ] for query in test_queries: print(f"\n[NOTE] Query: {query}") examples = await selector.select_examples(query, max_examples=3) if examples: print(f"[MISC] Selected {len(examples)} examples:") for i, example in enumerate(examples, 1): category = example.get('category', 'Unknown') question = example.get('question', '')[:60] print(f" {i}. [{category}] {question}...") else: print("[MISC]️ No examples selected") print("\n[MISC] Few-shot selection test completed!") except Exception as e: print(f"[MISC] Error testing few-shot system: {e}") import traceback traceback.print_exc() asyncio.run(test_few_shot()) sys.exit(0) elif args.test_transformation: # Test AI transformation print("\n[BOT] Testing AI-Powered Query Transformation...") print("=" * 70) async def test_transformation(): try: from src.ui.enhanced_gradio_app import EnhancedArabicChatbot # Initialize the system print("Initializing enhanced regulatory system...") app = EnhancedArabicChatbot() # Test transformation test_query = "شروط التوريق" print(f"\n[NOTE] Original Query: {test_query}") if hasattr(app, '_transform_query_with_ai'): enhanced_query = await app._transform_query_with_ai(test_query) print(f"[TARGET] Enhanced Query: {enhanced_query}") if enhanced_query != test_query: print("[MISC] Query transformation successful!") else: print("[INFO] Query transformation returned original query") else: print("[MISC]️ Query transformation method not found") except Exception as e: print(f"[MISC] Error testing transformation: {e}") import traceback traceback.print_exc() asyncio.run(test_transformation()) sys.exit(0) elif args.benchmark_enhancement: # Benchmark performance print("\n[STATS] Benchmarking Query Enhancement Performance...") print("=" * 70) async def benchmark(): try: import time from src.core.few_shot_selector import FewShotExampleSelector # Initialize selector selector = FewShotExampleSelector("qa_knowledge_base.json") await selector.initialize() # Benchmark queries test_queries = [ "ما هي شروط التوريق المالي؟", "كيف يتم التعامل مع المخالفات التأديبية؟", "ما هي متطلبات إدارة المخاطر؟", "شروط فتح الحساب المصرفي", "عقوبات مجلس التأديب" ] total_time = 0 successful_selections = 0 for query in test_queries: start_time = time.time() examples = await selector.select_examples(query, max_examples=3) end_time = time.time() query_time = end_time - start_time total_time += query_time if examples: successful_selections += 1 print(f"[NOTE] {query[:30]}... -> {len(examples) if examples else 0} examples ({query_time:.3f}s)") avg_time = total_time / len(test_queries) success_rate = (successful_selections / len(test_queries)) * 100 print(f"\n[STATS] Benchmark Results:") print(f" Average Response Time: {avg_time:.3f} seconds") print(f" Success Rate: {success_rate:.1f}%") print(f" Total Queries: {len(test_queries)}") if avg_time < 3.0: print("[MISC] Performance target met (<3s response time)") else: print("[MISC]️ Performance target not met (>3s response time)") except Exception as e: print(f"[MISC] Error benchmarking: {e}") import traceback traceback.print_exc() asyncio.run(benchmark()) sys.exit(0) elif args.reindex: # Reindex mode print("\n[RELOAD] Forcing reindex of all documents...") async def reindex(): from src.ui.enhanced_gradio_app import EnhancedArabicChatbot print("Initializing enhanced regulatory system for reindexing...") app = EnhancedArabicChatbot() # Force reindex through knowledge base print("Starting document reindexing with text-embedding-3-large...") result = await app.knowledge_base.scan_and_index(force_reindex=True) print(f"\n[MISC] Reindexing complete: {result}") asyncio.run(reindex()) sys.exit(0) elif args.clear: # Clear knowledge base print("\n[CLEAR] Clearing knowledge base...") async def clear(): from src.core.knowledge_base import KnowledgeBase config = {} # Will use defaults kb = KnowledgeBase(config) success = await kb.clear_index() if success: print("\n[MISC] Knowledge base cleared successfully") else: print("\n[MISC] Failed to clear knowledge base") asyncio.run(clear()) sys.exit(0) elif args.test_chunking: # Test chunking strategy print(f"\n[TOOL] Testing {args.chunking} chunking strategy...") print("=" * 70) async def test_chunking(): # Import the appropriate chunker if args.chunking == "semantic": try: from semantic_chunking import AdvancedSemanticChunker chunker = AdvancedSemanticChunker( min_chunk_size=500, max_chunk_size=2000, similarity_threshold=0.75, overlap_ratio=0.15 ) # Sample Arabic legal text for testing sample_text = """ الباب الأول: التوريق المالي المادة 1: تعريف التوريق التوريق هو عملية تحويل الأصول المالية إلى أوراق مالية قابلة للتداول. يشمل ذلك الديون والحقوق المالية المختلفة التي تولد تدفقات نقدية منتظمة. المادة 2: شروط التوريق يجب أن تكون الأصول المراد توريقها ذات تدفقات نقدية منتظمة ومتوقعة. يشترط موافقة البنك المركزي على عملية التوريق قبل التنفيذ. تخضع جميع عمليات التوريق للرقابة المستمرة من الجهات المختصة. المادة 3: الضمانات والحماية يجب توفير ضمانات كافية لحماية حقوق المستثمرين في الأوراق المالية المصدرة. تشمل الضمانات التأمين ضد المخاطر والاحتياطيات النقدية الكافية. """ print(f"Testing on sample Arabic legal document ({len(sample_text)} chars)") chunks = chunker.chunk_document(sample_text, add_overlap=True) print(f"\n[MISC] Created {len(chunks)} semantic chunks:") for i, chunk in enumerate(chunks): print(f"\nChunk {i+1}:") print(f" Type: {chunk.chunk_type}") print(f" Size: {len(chunk.content)} chars") print(f" Preview: {chunk.content[:150]}...") except ImportError: print("[MISC] Semantic chunking module not available") print(" Using traditional fixed-size chunking") elif args.chunking == "late": print("Late chunking requires long-context models.") print("Please ensure you have the required models installed.") try: from late_chunking import OptimalChunkingStrategy processor = OptimalChunkingStrategy() print("Late chunking test would go here...") except ImportError: print("[MISC] Late chunking module not available") elif args.chunking == "hierarchical": try: from semantic_chunking import AdvancedSemanticChunker, HierarchicalChunker semantic_chunker = AdvancedSemanticChunker() chunker = HierarchicalChunker(semantic_chunker) print("Hierarchical chunking test would go here...") except ImportError: print("[MISC] Hierarchical chunking modules not available") else: # fixed print("Using traditional fixed-size chunking (current method)") print("Chunk size: 800 chars, Overlap: 200 chars") print("\n" + "=" * 70) print("[MISC] Chunking test complete!") asyncio.run(test_chunking()) sys.exit(0) elif args.query: # Query test mode print(f"\n[SEARCH] Testing query through enhanced regulatory system...") print("=" * 70) async def test_query(): from src.ui.enhanced_gradio_app import EnhancedArabicChatbot # Initialize the system print("Initializing enhanced regulatory system...") app = EnhancedArabicChatbot() # Test the query print(f"\n[NOTE] Query: {args.query}") print("-" * 40) try: # Process the query history = [] result_history, status = await app.process_query(args.query, history) # Display results if result_history and len(result_history) >= 2: response = result_history[-1]['content'] print(f"Status: {status}") print(f"Response:\n{response}") else: print(f"Status: {status}") print("No response generated") except Exception as e: print(f"[MISC] Error processing query: {e}") import traceback traceback.print_exc() print("\n" + "=" * 70) print("[MISC] Query test complete!") asyncio.run(test_query()) sys.exit(0) else: # Normal operation with selected chunking strategy if args.chunking != "fixed": print(f"\n[TOOL] Using {args.chunking} chunking strategy") print("This provides better context preservation for Arabic documents") main()