Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| # -*- coding: utf-8 -*- | |
| """ | |
| Enhanced CMA RAG Chatbot for Hugging Face Spaces | |
| Conversational AI with memory for Kuwait Capital Markets Authority documents | |
| Using OpenAI text-embedding-3-large and gpt-4.1-mini with robust error handling | |
| """ | |
| import os | |
| import json | |
| import logging | |
| import gradio as gr | |
| from typing import List, Tuple, Dict, Any | |
| import uuid | |
| import traceback | |
| # Configure logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| # Try to import required packages with fallbacks | |
| try: | |
| from langchain_community.vectorstores import FAISS | |
| from langchain_openai.embeddings import OpenAIEmbeddings | |
| from langchain.text_splitter import RecursiveCharacterTextSplitter | |
| from langchain.schema import Document | |
| from langchain_openai import ChatOpenAI | |
| from langchain.prompts import ChatPromptTemplate | |
| from langchain.schema.runnable import RunnablePassthrough | |
| from langchain.schema.output_parser import StrOutputParser | |
| LANGCHAIN_AVAILABLE = True | |
| logger.info("LangChain packages imported successfully") | |
| except ImportError as e: | |
| logger.error(f"Failed to import LangChain packages: {e}") | |
| LANGCHAIN_AVAILABLE = False | |
| class EnhancedCMARAGBot: | |
| def __init__(self): | |
| """Initialize the Enhanced CMA RAG Bot with conversational capabilities""" | |
| logger.info("🚀 Starting Enhanced CMA Conversational RAG Chatbot...") | |
| self.initialized = False | |
| self.chat_sessions: Dict[str, List[Dict[str, str]]] = {} | |
| # Check if OpenAI API key is available | |
| self.openai_api_key = os.getenv("OPENAI_API_KEY") | |
| if not self.openai_api_key: | |
| logger.error("OPENAI_API_KEY not found in environment variables") | |
| return | |
| if not LANGCHAIN_AVAILABLE: | |
| logger.error("LangChain packages not available") | |
| return | |
| try: | |
| self._initialize_components() | |
| self.initialized = True | |
| logger.info("Enhanced CMA RAG Bot initialized successfully") | |
| except Exception as e: | |
| logger.error(f"Failed to initialize bot: {e}") | |
| logger.error(traceback.format_exc()) | |
| def _initialize_components(self): | |
| """Initialize all components with error handling""" | |
| # Initialize OpenAI embedding model | |
| try: | |
| self.embedding_model = OpenAIEmbeddings( | |
| model="text-embedding-3-large", | |
| openai_api_key=self.openai_api_key | |
| ) | |
| logger.info("OpenAI embedding model initialized (text-embedding-3-large)") | |
| except Exception as e: | |
| logger.error(f"Failed to initialize embedding model: {e}") | |
| # Fallback to a simpler embedding model | |
| try: | |
| self.embedding_model = OpenAIEmbeddings( | |
| model="text-embedding-ada-002", | |
| openai_api_key=self.openai_api_key | |
| ) | |
| logger.info("Fallback: Using text-embedding-ada-002") | |
| except Exception as e2: | |
| logger.error(f"Failed to initialize fallback embedding model: {e2}") | |
| raise | |
| # Load vector store | |
| try: | |
| self.vector_store = self._load_vector_store() | |
| logger.info("Vector store loaded successfully") | |
| except Exception as e: | |
| logger.error(f"Failed to load vector store: {e}") | |
| raise | |
| # Initialize OpenAI LLM | |
| try: | |
| self.llm = ChatOpenAI( | |
| model="gpt-4o-mini", # Use gpt-4o-mini as fallback if gpt-4.1-mini not available | |
| temperature=0.1, | |
| max_tokens=1000, | |
| openai_api_key=self.openai_api_key | |
| ) | |
| logger.info("OpenAI LLM initialized with gpt-4o-mini") | |
| except Exception as e: | |
| logger.error(f"Failed to initialize LLM: {e}") | |
| raise | |
| # Setup retrieval chain | |
| try: | |
| self._setup_retrieval_chain() | |
| logger.info("Retrieval chain setup completed") | |
| except Exception as e: | |
| logger.error(f"Failed to setup retrieval chain: {e}") | |
| raise | |
| def _load_vector_store(self): | |
| """Load the pre-built FAISS vector store or create new one with OpenAI embeddings""" | |
| try: | |
| # Try to load existing vector store | |
| vector_store = FAISS.load_local( | |
| "faiss_index", | |
| self.embedding_model, | |
| allow_dangerous_deserialization=True | |
| ) | |
| logger.info("Existing vector store loaded") | |
| return vector_store | |
| except Exception as e: | |
| logger.warning(f"Could not load existing vector store: {e}") | |
| logger.info("Creating new vector store with OpenAI embeddings...") | |
| # Create from processed documents with OpenAI embeddings | |
| return self._create_vector_store_from_documents() | |
| def _create_vector_store_from_documents(self): | |
| """Create vector store from processed documents using OpenAI embeddings""" | |
| try: | |
| with open('processed_documents.json', 'r', encoding='utf-8') as f: | |
| processed_docs = json.load(f) | |
| documents = [] | |
| # Limit to first 100 documents for faster initialization in case of issues | |
| limited_docs = processed_docs[:100] if len(processed_docs) > 100 else processed_docs | |
| for doc_data in limited_docs: | |
| doc = Document( | |
| page_content=doc_data['content'], | |
| metadata=doc_data['metadata'] | |
| ) | |
| documents.append(doc) | |
| logger.info(f"Creating vector store from {len(documents)} documents...") | |
| # Create vector store with OpenAI embeddings | |
| vector_store = FAISS.from_documents(documents, self.embedding_model) | |
| # Save for future use | |
| try: | |
| vector_store.save_local("faiss_index") | |
| logger.info("Vector store created and saved with OpenAI embeddings") | |
| except Exception as e: | |
| logger.warning(f"Could not save vector store: {e}") | |
| return vector_store | |
| except Exception as e: | |
| logger.error(f"Error creating vector store from documents: {e}") | |
| raise | |
| def _setup_retrieval_chain(self): | |
| """Setup the retrieval chain with enhanced prompts""" | |
| # Create retriever | |
| self.retriever = self.vector_store.as_retriever( | |
| search_type="similarity", | |
| search_kwargs={"k": 3} # Reduced from 5 to 3 for better performance | |
| ) | |
| # Enhanced system prompt for conversational AI | |
| system_prompt = """أنت "مستشار CMA"، مساعد ذكي متخصص ومتفهم في قوانين ولوائح هيئة أسواق المال الكويتية. | |
| شخصيتك: | |
| - محترف ومتفهم ومساعد | |
| - تتحدث بطريقة ودودة وطبيعية | |
| - تتذكر السياق والمحادثات السابقة بدقة | |
| - تجيب على الأسئلة العامة والمحادثة العادية بطريقة مهذبة | |
| - تفهم الأسئلة التتابعية والمتابعة بناءً على السياق السابق | |
| قواعد الإجابة: | |
| - أجب باللغة العربية دائماً | |
| - للأسئلة القانونية: استخدم المعلومات من السياق المتوفر فقط | |
| - للمحادثة العامة (مثل "كيف حالك؟"): أجب بطريقة ودودة ومهذبة | |
| - عند طلب "مزيد من التفاصيل" أو "توضيح أكثر": ارجع للموضوع السابق في المحادثة وقدم تفاصيل إضافية | |
| - إذا لم تجد معلومات قانونية في السياق، قل "لا توجد معلومات كافية في الوثائق المتاحة" | |
| - اذكر مصدر المعلومة (اسم الكتاب ورقم المادة) عند الإمكان | |
| - كن مفصلاً ودقيقاً في الإجابات القانونية | |
| - اربط الأسئلة التتابعية بالسياق السابق في المحادثة | |
| السياق المتاح من وثائق هيئة أسواق المال: | |
| {context}""" | |
| qa_prompt = ChatPromptTemplate.from_messages([ | |
| ("system", system_prompt), | |
| ("human", "تاريخ المحادثة:\n{chat_history}\n\nالسؤال الحالي: {question}") | |
| ]) | |
| # Create the chain | |
| self.rag_chain = ( | |
| { | |
| "context": self.retriever | self._format_docs, | |
| "question": RunnablePassthrough(), | |
| "chat_history": RunnablePassthrough() | |
| } | |
| | qa_prompt | |
| | self.llm | |
| | StrOutputParser() | |
| ) | |
| def _format_docs(self, docs): | |
| """Format retrieved documents for context""" | |
| formatted = [] | |
| for doc in docs: | |
| source = doc.metadata.get('source', 'غير محدد') | |
| content = doc.page_content | |
| formatted.append(f"المصدر: {source}\nالمحتوى: {content}\n") | |
| return "\n---\n".join(formatted) | |
| def get_chat_history(self, user_id: str) -> List[Dict[str, str]]: | |
| """Get chat history for a user""" | |
| return self.chat_sessions.get(user_id, []) | |
| def add_to_chat_history(self, user_id: str, user_message: str, bot_response: str): | |
| """Add message to chat history""" | |
| if user_id not in self.chat_sessions: | |
| self.chat_sessions[user_id] = [] | |
| chat_history = self.chat_sessions[user_id] | |
| chat_history.append({"role": "user", "content": user_message}) | |
| chat_history.append({"role": "assistant", "content": bot_response}) | |
| # Keep only last 10 messages to manage memory | |
| if len(chat_history) > 10: | |
| self.chat_sessions[user_id] = chat_history[-10:] | |
| def is_legal_question(self, user_input: str) -> bool: | |
| """Determine if the question is legal/regulatory in nature""" | |
| legal_keywords = [ | |
| 'قانون', 'لائحة', 'نظام', 'قاعدة', 'متطلب', 'شرط', 'إجراء', | |
| 'هيئة', 'أسواق', 'مال', 'استثمار', 'إدراج', 'تداول', 'حوكمة', | |
| 'امتثال', 'غسل', 'أموال', 'تقنيات', 'مالية', 'ترخيص', 'رقابة', | |
| 'عقوبة', 'مخالفة', 'تفتيش', 'بورصة', 'وساطة', 'صندوق', | |
| 'ما هي', 'عرف', 'اشرح', 'وضح', 'متى', 'كيف', 'أين', | |
| 'تفاصيل', 'مزيد', 'أكثر', 'توضيح', 'شرح', 'تفسير', | |
| 'نعم', 'أريد', 'أرغب', 'أود', 'هل يمكن', 'كيفية', | |
| 'تسهيلات', 'تركز', 'تمويلي', 'أطراف', 'مترابطة', 'احتساب', | |
| 'حد', 'أعلى', 'نقدية', 'غير نقدية', 'مستخدم', 'ممنوح' | |
| ] | |
| # Also check if it's a follow-up question | |
| followup_phrases = [ | |
| 'مزيد من التفاصيل', 'تفاصيل أكثر', 'وضح أكثر', 'اشرح أكثر', | |
| 'نعم أريد', 'نعم أرغب', 'أريد تفاصيل', 'أود معرفة', | |
| 'هل يمكن توضيح', 'كيف يتم', 'ما هو الإجراء' | |
| ] | |
| user_input_lower = user_input.lower().strip() | |
| # Check for follow-up phrases | |
| if any(phrase in user_input_lower for phrase in followup_phrases): | |
| return True | |
| return any(keyword in user_input for keyword in legal_keywords) | |
| def get_conversational_response(self, user_input: str) -> str: | |
| """Generate conversational responses for non-legal questions""" | |
| greetings = ['سلام', 'أهلا', 'مرحبا', 'صباح', 'مساء'] | |
| how_are_you = ['كيف حالك', 'كيف الحال', 'شلونك', 'كيفك'] | |
| thanks = ['شكرا', 'شكراً', 'مشكور', 'تسلم'] | |
| goodbye = ['وداع', 'سلامة', 'باي', 'مع السلامة'] | |
| user_lower = user_input.lower() | |
| if any(greeting in user_lower for greeting in greetings): | |
| return """وعليكم السلام ورحمة الله وبركاته. أهلاً وسهلاً بك في مستشار هيئة أسواق المال الكويتية. | |
| أنا مستشار ذكي متخصص في قوانين ولوائح هيئة أسواق المال الكويتية، مدرب على جميع الوثائق الرسمية باستخدام تقنيات الذكاء الاصطناعي المتقدمة. | |
| يمكنني مساعدتك في: | |
| • قوانين ولوائح الأوراق المالية | |
| • أنظمة الاستثمار الجماعي | |
| • قواعد الإدراج والتداول | |
| • متطلبات الحوكمة والامتثال | |
| • أحكام مكافحة غسل الأموال | |
| • التقنيات المالية والابتكار | |
| كيف يمكنني مساعدتك اليوم؟ 😊""" | |
| elif any(how in user_lower for how in how_are_you): | |
| return """أنا بخير شكراً لسؤالك! 😊 كيف يمكنني مساعدتك اليوم في موضوع يتعلق بهيئة أسواق المال الكويتية؟ | |
| أنا جاهز للإجابة على أي استفسار قانوني أو تنظيمي، سأكون سعيداً جداً بمساعدتك. | |
| يمكنك سؤالي عن أي موضوع متعلق بأسواق المال الكويتية! 📚""" | |
| elif any(thank in user_lower for thank in thanks): | |
| return "العفو! أنا سعيد لمساعدتك. إذا كان لديك أي استفسار آخر حول قوانين ولوائح هيئة أسواق المال، لا تتردد في السؤال! 😊" | |
| elif any(bye in user_lower for bye in goodbye): | |
| return "مع السلامة! كان من دواعي سروري مساعدتك. أتمنى لك يوماً سعيداً، وأراك قريباً! 👋" | |
| else: | |
| return """أقدر تفاعلك معي! | |
| أنا متخصص في الإجابة على الأسئلة المتعلقة بقوانين ولوائح هيئة أسواق المال الكويتية. إذا كان لديك أي استفسار قانوني أو تنظيمي، سأكون سعيداً جداً بمساعدتك. | |
| يمكنك سؤالي عن أي موضوع متعلق بأسواق المال الكويتية! 📚""" | |
| def format_chat_history_for_prompt(self, chat_history: List[Dict[str, str]]) -> str: | |
| """Format chat history for the prompt""" | |
| if not chat_history: | |
| return "لا يوجد تاريخ محادثة سابق." | |
| formatted_history = [] | |
| for message in chat_history[-6:]: # Last 6 messages for context | |
| role = "المستخدم" if message["role"] == "user" else "المستشار" | |
| formatted_history.append(f"{role}: {message['content']}") | |
| return "\n".join(formatted_history) | |
| def get_response(self, user_input: str, user_id: str = None) -> str: | |
| """Get response from the RAG system with conversational capabilities""" | |
| if not self.initialized: | |
| return """عذراً، النظام قيد التهيئة. يرجى التأكد من: | |
| 1. إعداد OPENAI_API_KEY في متغيرات البيئة | |
| 2. تحميل جميع الملفات المطلوبة | |
| 3. المحاولة مرة أخرى خلال دقائق قليلة""" | |
| try: | |
| if user_id is None: | |
| user_id = str(uuid.uuid4()) | |
| # Get chat history | |
| chat_history = self.get_chat_history(user_id) | |
| formatted_history = self.format_chat_history_for_prompt(chat_history) | |
| # Check if it's a legal question or general conversation | |
| if self.is_legal_question(user_input): | |
| # Use RAG for legal questions | |
| response = self.rag_chain.invoke({ | |
| "question": user_input, | |
| "chat_history": formatted_history | |
| }) | |
| else: | |
| # Use conversational responses for general chat | |
| response = self.get_conversational_response(user_input) | |
| # Add to chat history | |
| self.add_to_chat_history(user_id, user_input, response) | |
| return response | |
| except Exception as e: | |
| logger.error(f"Error getting response: {e}") | |
| logger.error(traceback.format_exc()) | |
| return f"عذراً، حدث خطأ أثناء معالجة استفسارك: {str(e)}\nيرجى المحاولة مرة أخرى." | |
| # Initialize the bot | |
| logger.info("Initializing bot...") | |
| try: | |
| bot = EnhancedCMARAGBot() | |
| if bot.initialized: | |
| logger.info("Bot initialized successfully") | |
| else: | |
| logger.error("Bot initialization failed") | |
| bot = None | |
| except Exception as e: | |
| logger.error(f"Failed to create bot instance: {e}") | |
| logger.error(traceback.format_exc()) | |
| bot = None | |
| def chat_interface(message, history, user_id_state): | |
| """Gradio chat interface""" | |
| if bot is None or not bot.initialized: | |
| error_msg = """عذراً، النظام غير متاح حالياً. الأسباب المحتملة: | |
| 🔑 **مفتاح OpenAI مفقود**: تأكد من إعداد OPENAI_API_KEY | |
| 📁 **ملفات مفقودة**: تأكد من تحميل processed_documents.json | |
| 🔧 **خطأ في التهيئة**: راجع سجلات البناء | |
| يرجى التحقق من الإعدادات والمحاولة مرة أخرى.""" | |
| history.append((message, error_msg)) | |
| return "", history, user_id_state | |
| # Generate user ID if not exists | |
| if user_id_state is None: | |
| user_id_state = str(uuid.uuid4()) | |
| # Get response | |
| response = bot.get_response(message, user_id_state) | |
| # Update history | |
| history.append((message, response)) | |
| return "", history, user_id_state | |
| def get_stats(): | |
| """Get system statistics""" | |
| try: | |
| with open('processed_documents.json', 'r', encoding='utf-8') as f: | |
| docs = json.load(f) | |
| return len(docs), 19, "GPT-4o Mini", "تفاعلية مع ذاكرة" | |
| except: | |
| return "2,091", "19", "GPT-4o Mini", "تفاعلية مع ذاكرة" | |
| # Create Gradio interface | |
| def create_interface(): | |
| """Create the Gradio interface""" | |
| # Get stats | |
| doc_count, source_count, model_name, chat_type = get_stats() | |
| # Check bot status for display | |
| bot_status = "🟢 متاح" if (bot and bot.initialized) else "🔴 غير متاح" | |
| with gr.Blocks( | |
| title="مستشار هيئة أسواق المال الكويتية - نظام RAG التفاعلي", | |
| theme=gr.themes.Soft(), | |
| css=""" | |
| .rtl { direction: rtl; text-align: right; } | |
| .main-header { background: linear-gradient(135deg, #1e3c72 0%, #2a5298 100%); color: white; padding: 20px; border-radius: 10px; margin-bottom: 20px; } | |
| .stats-container { display: flex; gap: 10px; margin-bottom: 20px; } | |
| .stat-card { flex: 1; padding: 15px; border-radius: 8px; text-align: center; } | |
| .stat-card.docs { background-color: #e3f2fd; } | |
| .stat-card.sources { background-color: #e8f5e8; } | |
| .stat-card.model { background-color: #fff3e0; } | |
| .stat-card.chat { background-color: #f3e5f5; } | |
| .examples-container { margin-top: 15px; } | |
| .example-section { margin-bottom: 15px; } | |
| .example-buttons { display: flex; flex-wrap: wrap; gap: 8px; margin-top: 8px; } | |
| .example-btn { padding: 8px 12px; border: 1px solid #ddd; border-radius: 15px; background: #f8f9fa; cursor: pointer; font-size: 12px; } | |
| .example-btn:hover { background: #e9ecef; } | |
| """ | |
| ) as interface: | |
| # Header | |
| gr.HTML(f""" | |
| <div class="main-header rtl"> | |
| <h1>🤖 مستشار هيئة أسواق المال الكويتية</h1> | |
| <p>نظام RAG تفاعلي مع ذاكرة محادثة وذكاء اصطناعي محسّن</p> | |
| <p>مدعوم بـ OpenAI text-embedding-3-large و GPT-4o Mini</p> | |
| <p><strong>حالة النظام: {bot_status}</strong></p> | |
| </div> | |
| """) | |
| # Statistics | |
| gr.HTML(f""" | |
| <div class="stats-container"> | |
| <div class="stat-card docs"> | |
| <h3>📄 المستندات</h3> | |
| <h2>{doc_count}</h2> | |
| </div> | |
| <div class="stat-card sources"> | |
| <h3>📚 المصادر</h3> | |
| <h2>{source_count}</h2> | |
| </div> | |
| <div class="stat-card model"> | |
| <h3>🧠 النموذج</h3> | |
| <h2>{model_name}</h2> | |
| </div> | |
| <div class="stat-card chat"> | |
| <h3>💬 المحادثة</h3> | |
| <h2>{chat_type}</h2> | |
| </div> | |
| </div> | |
| """) | |
| # User ID state (hidden) | |
| user_id_state = gr.State(None) | |
| # Chat interface | |
| with gr.Row(): | |
| with gr.Column(): | |
| chatbot = gr.Chatbot( | |
| label="💬 المحادثة مع مستشار CMA", | |
| height=400, | |
| rtl=True, | |
| show_label=True, | |
| container=True, | |
| bubble_full_width=False | |
| ) | |
| with gr.Row(): | |
| msg = gr.Textbox( | |
| label="✍️ اكتب رسالتك هنا", | |
| placeholder="يمكنك سؤالي عن القوانين أو حتى المحادثة العامة...", | |
| rtl=True, | |
| scale=4 | |
| ) | |
| send_btn = gr.Button("📤 إرسال", scale=1) | |
| # Examples | |
| gr.HTML(""" | |
| <div class="examples-container rtl"> | |
| <div class="example-section"> | |
| <h4>🗣️ محادثة عامة</h4> | |
| <div class="example-buttons"> | |
| <span class="example-btn">كيف حالك؟</span> | |
| <span class="example-btn">شكراً لك</span> | |
| <span class="example-btn">مع السلامة</span> | |
| </div> | |
| </div> | |
| <div class="example-section"> | |
| <h4>⚖️ أسئلة قانونية</h4> | |
| <div class="example-buttons"> | |
| <span class="example-btn">ما هي أنظمة الاستثمار الجماعي؟</span> | |
| <span class="example-btn">عرف قواعد الإدراج</span> | |
| <span class="example-btn">ما هي متطلبات الحوكمة؟</span> | |
| </div> | |
| </div> | |
| </div> | |
| """) | |
| # Event handlers | |
| def respond(message, history, user_id_state): | |
| return chat_interface(message, history, user_id_state) | |
| msg.submit(respond, [msg, chatbot, user_id_state], [msg, chatbot, user_id_state]) | |
| send_btn.click(respond, [msg, chatbot, user_id_state], [msg, chatbot, user_id_state]) | |
| return interface | |
| # Launch the interface | |
| if __name__ == "__main__": | |
| interface = create_interface() | |
| interface.launch( | |
| server_name="0.0.0.0", | |
| server_port=7860, | |
| share=False, | |
| show_error=True | |
| ) |