CMP_AI_RAG / app.py
AhmedEwis's picture
Update app.py
367a238 verified
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Enhanced CMA RAG Chatbot for Hugging Face Spaces
Conversational AI with memory for Kuwait Capital Markets Authority documents
Using OpenAI text-embedding-3-large and gpt-4.1-mini with robust error handling
"""
import os
import json
import logging
import gradio as gr
from typing import List, Tuple, Dict, Any
import uuid
import traceback
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Try to import required packages with fallbacks
try:
from langchain_community.vectorstores import FAISS
from langchain_openai.embeddings import OpenAIEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.schema import Document
from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from langchain.schema.runnable import RunnablePassthrough
from langchain.schema.output_parser import StrOutputParser
LANGCHAIN_AVAILABLE = True
logger.info("LangChain packages imported successfully")
except ImportError as e:
logger.error(f"Failed to import LangChain packages: {e}")
LANGCHAIN_AVAILABLE = False
class EnhancedCMARAGBot:
def __init__(self):
"""Initialize the Enhanced CMA RAG Bot with conversational capabilities"""
logger.info("🚀 Starting Enhanced CMA Conversational RAG Chatbot...")
self.initialized = False
self.chat_sessions: Dict[str, List[Dict[str, str]]] = {}
# Check if OpenAI API key is available
self.openai_api_key = os.getenv("OPENAI_API_KEY")
if not self.openai_api_key:
logger.error("OPENAI_API_KEY not found in environment variables")
return
if not LANGCHAIN_AVAILABLE:
logger.error("LangChain packages not available")
return
try:
self._initialize_components()
self.initialized = True
logger.info("Enhanced CMA RAG Bot initialized successfully")
except Exception as e:
logger.error(f"Failed to initialize bot: {e}")
logger.error(traceback.format_exc())
def _initialize_components(self):
"""Initialize all components with error handling"""
# Initialize OpenAI embedding model
try:
self.embedding_model = OpenAIEmbeddings(
model="text-embedding-3-large",
openai_api_key=self.openai_api_key
)
logger.info("OpenAI embedding model initialized (text-embedding-3-large)")
except Exception as e:
logger.error(f"Failed to initialize embedding model: {e}")
# Fallback to a simpler embedding model
try:
self.embedding_model = OpenAIEmbeddings(
model="text-embedding-ada-002",
openai_api_key=self.openai_api_key
)
logger.info("Fallback: Using text-embedding-ada-002")
except Exception as e2:
logger.error(f"Failed to initialize fallback embedding model: {e2}")
raise
# Load vector store
try:
self.vector_store = self._load_vector_store()
logger.info("Vector store loaded successfully")
except Exception as e:
logger.error(f"Failed to load vector store: {e}")
raise
# Initialize OpenAI LLM
try:
self.llm = ChatOpenAI(
model="gpt-4o-mini", # Use gpt-4o-mini as fallback if gpt-4.1-mini not available
temperature=0.1,
max_tokens=1000,
openai_api_key=self.openai_api_key
)
logger.info("OpenAI LLM initialized with gpt-4o-mini")
except Exception as e:
logger.error(f"Failed to initialize LLM: {e}")
raise
# Setup retrieval chain
try:
self._setup_retrieval_chain()
logger.info("Retrieval chain setup completed")
except Exception as e:
logger.error(f"Failed to setup retrieval chain: {e}")
raise
def _load_vector_store(self):
"""Load the pre-built FAISS vector store or create new one with OpenAI embeddings"""
try:
# Try to load existing vector store
vector_store = FAISS.load_local(
"faiss_index",
self.embedding_model,
allow_dangerous_deserialization=True
)
logger.info("Existing vector store loaded")
return vector_store
except Exception as e:
logger.warning(f"Could not load existing vector store: {e}")
logger.info("Creating new vector store with OpenAI embeddings...")
# Create from processed documents with OpenAI embeddings
return self._create_vector_store_from_documents()
def _create_vector_store_from_documents(self):
"""Create vector store from processed documents using OpenAI embeddings"""
try:
with open('processed_documents.json', 'r', encoding='utf-8') as f:
processed_docs = json.load(f)
documents = []
# Limit to first 100 documents for faster initialization in case of issues
limited_docs = processed_docs[:100] if len(processed_docs) > 100 else processed_docs
for doc_data in limited_docs:
doc = Document(
page_content=doc_data['content'],
metadata=doc_data['metadata']
)
documents.append(doc)
logger.info(f"Creating vector store from {len(documents)} documents...")
# Create vector store with OpenAI embeddings
vector_store = FAISS.from_documents(documents, self.embedding_model)
# Save for future use
try:
vector_store.save_local("faiss_index")
logger.info("Vector store created and saved with OpenAI embeddings")
except Exception as e:
logger.warning(f"Could not save vector store: {e}")
return vector_store
except Exception as e:
logger.error(f"Error creating vector store from documents: {e}")
raise
def _setup_retrieval_chain(self):
"""Setup the retrieval chain with enhanced prompts"""
# Create retriever
self.retriever = self.vector_store.as_retriever(
search_type="similarity",
search_kwargs={"k": 3} # Reduced from 5 to 3 for better performance
)
# Enhanced system prompt for conversational AI
system_prompt = """أنت "مستشار CMA"، مساعد ذكي متخصص ومتفهم في قوانين ولوائح هيئة أسواق المال الكويتية.
شخصيتك:
- محترف ومتفهم ومساعد
- تتحدث بطريقة ودودة وطبيعية
- تتذكر السياق والمحادثات السابقة بدقة
- تجيب على الأسئلة العامة والمحادثة العادية بطريقة مهذبة
- تفهم الأسئلة التتابعية والمتابعة بناءً على السياق السابق
قواعد الإجابة:
- أجب باللغة العربية دائماً
- للأسئلة القانونية: استخدم المعلومات من السياق المتوفر فقط
- للمحادثة العامة (مثل "كيف حالك؟"): أجب بطريقة ودودة ومهذبة
- عند طلب "مزيد من التفاصيل" أو "توضيح أكثر": ارجع للموضوع السابق في المحادثة وقدم تفاصيل إضافية
- إذا لم تجد معلومات قانونية في السياق، قل "لا توجد معلومات كافية في الوثائق المتاحة"
- اذكر مصدر المعلومة (اسم الكتاب ورقم المادة) عند الإمكان
- كن مفصلاً ودقيقاً في الإجابات القانونية
- اربط الأسئلة التتابعية بالسياق السابق في المحادثة
السياق المتاح من وثائق هيئة أسواق المال:
{context}"""
qa_prompt = ChatPromptTemplate.from_messages([
("system", system_prompt),
("human", "تاريخ المحادثة:\n{chat_history}\n\nالسؤال الحالي: {question}")
])
# Create the chain
self.rag_chain = (
{
"context": self.retriever | self._format_docs,
"question": RunnablePassthrough(),
"chat_history": RunnablePassthrough()
}
| qa_prompt
| self.llm
| StrOutputParser()
)
def _format_docs(self, docs):
"""Format retrieved documents for context"""
formatted = []
for doc in docs:
source = doc.metadata.get('source', 'غير محدد')
content = doc.page_content
formatted.append(f"المصدر: {source}\nالمحتوى: {content}\n")
return "\n---\n".join(formatted)
def get_chat_history(self, user_id: str) -> List[Dict[str, str]]:
"""Get chat history for a user"""
return self.chat_sessions.get(user_id, [])
def add_to_chat_history(self, user_id: str, user_message: str, bot_response: str):
"""Add message to chat history"""
if user_id not in self.chat_sessions:
self.chat_sessions[user_id] = []
chat_history = self.chat_sessions[user_id]
chat_history.append({"role": "user", "content": user_message})
chat_history.append({"role": "assistant", "content": bot_response})
# Keep only last 10 messages to manage memory
if len(chat_history) > 10:
self.chat_sessions[user_id] = chat_history[-10:]
def is_legal_question(self, user_input: str) -> bool:
"""Determine if the question is legal/regulatory in nature"""
legal_keywords = [
'قانون', 'لائحة', 'نظام', 'قاعدة', 'متطلب', 'شرط', 'إجراء',
'هيئة', 'أسواق', 'مال', 'استثمار', 'إدراج', 'تداول', 'حوكمة',
'امتثال', 'غسل', 'أموال', 'تقنيات', 'مالية', 'ترخيص', 'رقابة',
'عقوبة', 'مخالفة', 'تفتيش', 'بورصة', 'وساطة', 'صندوق',
'ما هي', 'عرف', 'اشرح', 'وضح', 'متى', 'كيف', 'أين',
'تفاصيل', 'مزيد', 'أكثر', 'توضيح', 'شرح', 'تفسير',
'نعم', 'أريد', 'أرغب', 'أود', 'هل يمكن', 'كيفية',
'تسهيلات', 'تركز', 'تمويلي', 'أطراف', 'مترابطة', 'احتساب',
'حد', 'أعلى', 'نقدية', 'غير نقدية', 'مستخدم', 'ممنوح'
]
# Also check if it's a follow-up question
followup_phrases = [
'مزيد من التفاصيل', 'تفاصيل أكثر', 'وضح أكثر', 'اشرح أكثر',
'نعم أريد', 'نعم أرغب', 'أريد تفاصيل', 'أود معرفة',
'هل يمكن توضيح', 'كيف يتم', 'ما هو الإجراء'
]
user_input_lower = user_input.lower().strip()
# Check for follow-up phrases
if any(phrase in user_input_lower for phrase in followup_phrases):
return True
return any(keyword in user_input for keyword in legal_keywords)
def get_conversational_response(self, user_input: str) -> str:
"""Generate conversational responses for non-legal questions"""
greetings = ['سلام', 'أهلا', 'مرحبا', 'صباح', 'مساء']
how_are_you = ['كيف حالك', 'كيف الحال', 'شلونك', 'كيفك']
thanks = ['شكرا', 'شكراً', 'مشكور', 'تسلم']
goodbye = ['وداع', 'سلامة', 'باي', 'مع السلامة']
user_lower = user_input.lower()
if any(greeting in user_lower for greeting in greetings):
return """وعليكم السلام ورحمة الله وبركاته. أهلاً وسهلاً بك في مستشار هيئة أسواق المال الكويتية.
أنا مستشار ذكي متخصص في قوانين ولوائح هيئة أسواق المال الكويتية، مدرب على جميع الوثائق الرسمية باستخدام تقنيات الذكاء الاصطناعي المتقدمة.
يمكنني مساعدتك في:
• قوانين ولوائح الأوراق المالية
• أنظمة الاستثمار الجماعي
• قواعد الإدراج والتداول
• متطلبات الحوكمة والامتثال
• أحكام مكافحة غسل الأموال
• التقنيات المالية والابتكار
كيف يمكنني مساعدتك اليوم؟ 😊"""
elif any(how in user_lower for how in how_are_you):
return """أنا بخير شكراً لسؤالك! 😊 كيف يمكنني مساعدتك اليوم في موضوع يتعلق بهيئة أسواق المال الكويتية؟
أنا جاهز للإجابة على أي استفسار قانوني أو تنظيمي، سأكون سعيداً جداً بمساعدتك.
يمكنك سؤالي عن أي موضوع متعلق بأسواق المال الكويتية! 📚"""
elif any(thank in user_lower for thank in thanks):
return "العفو! أنا سعيد لمساعدتك. إذا كان لديك أي استفسار آخر حول قوانين ولوائح هيئة أسواق المال، لا تتردد في السؤال! 😊"
elif any(bye in user_lower for bye in goodbye):
return "مع السلامة! كان من دواعي سروري مساعدتك. أتمنى لك يوماً سعيداً، وأراك قريباً! 👋"
else:
return """أقدر تفاعلك معي!
أنا متخصص في الإجابة على الأسئلة المتعلقة بقوانين ولوائح هيئة أسواق المال الكويتية. إذا كان لديك أي استفسار قانوني أو تنظيمي، سأكون سعيداً جداً بمساعدتك.
يمكنك سؤالي عن أي موضوع متعلق بأسواق المال الكويتية! 📚"""
def format_chat_history_for_prompt(self, chat_history: List[Dict[str, str]]) -> str:
"""Format chat history for the prompt"""
if not chat_history:
return "لا يوجد تاريخ محادثة سابق."
formatted_history = []
for message in chat_history[-6:]: # Last 6 messages for context
role = "المستخدم" if message["role"] == "user" else "المستشار"
formatted_history.append(f"{role}: {message['content']}")
return "\n".join(formatted_history)
def get_response(self, user_input: str, user_id: str = None) -> str:
"""Get response from the RAG system with conversational capabilities"""
if not self.initialized:
return """عذراً، النظام قيد التهيئة. يرجى التأكد من:
1. إعداد OPENAI_API_KEY في متغيرات البيئة
2. تحميل جميع الملفات المطلوبة
3. المحاولة مرة أخرى خلال دقائق قليلة"""
try:
if user_id is None:
user_id = str(uuid.uuid4())
# Get chat history
chat_history = self.get_chat_history(user_id)
formatted_history = self.format_chat_history_for_prompt(chat_history)
# Check if it's a legal question or general conversation
if self.is_legal_question(user_input):
# Use RAG for legal questions
response = self.rag_chain.invoke({
"question": user_input,
"chat_history": formatted_history
})
else:
# Use conversational responses for general chat
response = self.get_conversational_response(user_input)
# Add to chat history
self.add_to_chat_history(user_id, user_input, response)
return response
except Exception as e:
logger.error(f"Error getting response: {e}")
logger.error(traceback.format_exc())
return f"عذراً، حدث خطأ أثناء معالجة استفسارك: {str(e)}\nيرجى المحاولة مرة أخرى."
# Initialize the bot
logger.info("Initializing bot...")
try:
bot = EnhancedCMARAGBot()
if bot.initialized:
logger.info("Bot initialized successfully")
else:
logger.error("Bot initialization failed")
bot = None
except Exception as e:
logger.error(f"Failed to create bot instance: {e}")
logger.error(traceback.format_exc())
bot = None
def chat_interface(message, history, user_id_state):
"""Gradio chat interface"""
if bot is None or not bot.initialized:
error_msg = """عذراً، النظام غير متاح حالياً. الأسباب المحتملة:
🔑 **مفتاح OpenAI مفقود**: تأكد من إعداد OPENAI_API_KEY
📁 **ملفات مفقودة**: تأكد من تحميل processed_documents.json
🔧 **خطأ في التهيئة**: راجع سجلات البناء
يرجى التحقق من الإعدادات والمحاولة مرة أخرى."""
history.append((message, error_msg))
return "", history, user_id_state
# Generate user ID if not exists
if user_id_state is None:
user_id_state = str(uuid.uuid4())
# Get response
response = bot.get_response(message, user_id_state)
# Update history
history.append((message, response))
return "", history, user_id_state
def get_stats():
"""Get system statistics"""
try:
with open('processed_documents.json', 'r', encoding='utf-8') as f:
docs = json.load(f)
return len(docs), 19, "GPT-4o Mini", "تفاعلية مع ذاكرة"
except:
return "2,091", "19", "GPT-4o Mini", "تفاعلية مع ذاكرة"
# Create Gradio interface
def create_interface():
"""Create the Gradio interface"""
# Get stats
doc_count, source_count, model_name, chat_type = get_stats()
# Check bot status for display
bot_status = "🟢 متاح" if (bot and bot.initialized) else "🔴 غير متاح"
with gr.Blocks(
title="مستشار هيئة أسواق المال الكويتية - نظام RAG التفاعلي",
theme=gr.themes.Soft(),
css="""
.rtl { direction: rtl; text-align: right; }
.main-header { background: linear-gradient(135deg, #1e3c72 0%, #2a5298 100%); color: white; padding: 20px; border-radius: 10px; margin-bottom: 20px; }
.stats-container { display: flex; gap: 10px; margin-bottom: 20px; }
.stat-card { flex: 1; padding: 15px; border-radius: 8px; text-align: center; }
.stat-card.docs { background-color: #e3f2fd; }
.stat-card.sources { background-color: #e8f5e8; }
.stat-card.model { background-color: #fff3e0; }
.stat-card.chat { background-color: #f3e5f5; }
.examples-container { margin-top: 15px; }
.example-section { margin-bottom: 15px; }
.example-buttons { display: flex; flex-wrap: wrap; gap: 8px; margin-top: 8px; }
.example-btn { padding: 8px 12px; border: 1px solid #ddd; border-radius: 15px; background: #f8f9fa; cursor: pointer; font-size: 12px; }
.example-btn:hover { background: #e9ecef; }
"""
) as interface:
# Header
gr.HTML(f"""
<div class="main-header rtl">
<h1>🤖 مستشار هيئة أسواق المال الكويتية</h1>
<p>نظام RAG تفاعلي مع ذاكرة محادثة وذكاء اصطناعي محسّن</p>
<p>مدعوم بـ OpenAI text-embedding-3-large و GPT-4o Mini</p>
<p><strong>حالة النظام: {bot_status}</strong></p>
</div>
""")
# Statistics
gr.HTML(f"""
<div class="stats-container">
<div class="stat-card docs">
<h3>📄 المستندات</h3>
<h2>{doc_count}</h2>
</div>
<div class="stat-card sources">
<h3>📚 المصادر</h3>
<h2>{source_count}</h2>
</div>
<div class="stat-card model">
<h3>🧠 النموذج</h3>
<h2>{model_name}</h2>
</div>
<div class="stat-card chat">
<h3>💬 المحادثة</h3>
<h2>{chat_type}</h2>
</div>
</div>
""")
# User ID state (hidden)
user_id_state = gr.State(None)
# Chat interface
with gr.Row():
with gr.Column():
chatbot = gr.Chatbot(
label="💬 المحادثة مع مستشار CMA",
height=400,
rtl=True,
show_label=True,
container=True,
bubble_full_width=False
)
with gr.Row():
msg = gr.Textbox(
label="✍️ اكتب رسالتك هنا",
placeholder="يمكنك سؤالي عن القوانين أو حتى المحادثة العامة...",
rtl=True,
scale=4
)
send_btn = gr.Button("📤 إرسال", scale=1)
# Examples
gr.HTML("""
<div class="examples-container rtl">
<div class="example-section">
<h4>🗣️ محادثة عامة</h4>
<div class="example-buttons">
<span class="example-btn">كيف حالك؟</span>
<span class="example-btn">شكراً لك</span>
<span class="example-btn">مع السلامة</span>
</div>
</div>
<div class="example-section">
<h4>⚖️ أسئلة قانونية</h4>
<div class="example-buttons">
<span class="example-btn">ما هي أنظمة الاستثمار الجماعي؟</span>
<span class="example-btn">عرف قواعد الإدراج</span>
<span class="example-btn">ما هي متطلبات الحوكمة؟</span>
</div>
</div>
</div>
""")
# Event handlers
def respond(message, history, user_id_state):
return chat_interface(message, history, user_id_state)
msg.submit(respond, [msg, chatbot, user_id_state], [msg, chatbot, user_id_state])
send_btn.click(respond, [msg, chatbot, user_id_state], [msg, chatbot, user_id_state])
return interface
# Launch the interface
if __name__ == "__main__":
interface = create_interface()
interface.launch(
server_name="0.0.0.0",
server_port=7860,
share=False,
show_error=True
)