""" Enhanced Message Classification System for Arabic Regulatory Chatbot Improves differentiation between casual chat and regulatory queries. """ import re import logging from typing import Dict, List, Tuple, Optional from enum import Enum from dataclasses import dataclass logger = logging.getLogger(__name__) class ClassificationConfidence(Enum): """Classification confidence levels.""" HIGH = "high" # 0.8+ MEDIUM = "medium" # 0.5-0.8 LOW = "low" # 0.3-0.5 VERY_LOW = "very_low" # <0.3 @dataclass class ClassificationResult: """Result of message classification.""" message_type: str confidence: float confidence_level: ClassificationConfidence reasoning: str alternative_types: List[Tuple[str, float]] class EnhancedMessageClassifier: """ Enhanced message classifier with improved casual vs regulatory detection. """ def __init__(self): self.setup_patterns() self.setup_weights() def setup_patterns(self): """Setup comprehensive classification patterns.""" # === GREETING PATTERNS === self.greeting_patterns = { 'arabic': [ r'(^|\s)(مرحبا|أهلا|السلام عليكم|سلام عليكم|اهلا وسهلا|مرحباً|أهلاً)(\s|$)', r'(^|\s)(كيف حالك|كيف الحال|كيف الأحوال|شلونك|كيفك)(\s|$)', r'(^|\s)(صباح الخير|مساء الخير|تحية طيبة)(\s|$)', r'(^|\s)(هلا|هالو|هلو)(\s|$)', ], 'english': [ r'(^|\s)(hello|hi|hey|greetings|good morning|good afternoon|good evening)(\s|$)', r'(^|\s)(how are you|how do you do|what\'s up|how\'s it going)(\s|$)', r'(^|\s)(hola|bonjour|guten tag)(\s|$)', ] } # === ENHANCED CASUAL PATTERNS === self.casual_patterns = { 'time_queries': [ r'(what|ما|كم).*(time|وقت|ساعة)', r'(what time|كم الساعة|ما الوقت)', r'(current time|الوقت الحالي)', r'(time.*now|الوقت.*الآن)', ], 'weather_queries': [ r'(weather|طقس|جو)', r'(temperature|درجة حرارة)', r'(rain|snow|sunny|مطر|شمس|غائم)', r'(how.*weather|كيف.*الطقس)', ], 'personal_inquiries': [ r'(how.*day|كيف.*اليوم)', r'(how.*doing|كيف.*حالك)', r'(how.*feeling|كيف.*شعورك)', r'(what.*up|شو الأخبار)', r'(how.*going|كيف.*الأمور)', ], 'general_questions': [ r'(tell me about yourself|أخبرني عن نفسك)', r'(who are you|من أنت)', r'(what.*you do|ماذا تفعل)', r'(can you help|تقدر تساعد)', r'(what.*capabilities|ما قدراتك)', ], 'social_interactions': [ r'(thank.*you|شكرا|شكراً|تسلم)', r'(you\'re welcome|عفواً|لا شكر على واجب)', r'(goodbye|bye|وداع|باي|مع السلامة)', r'(see you|أراك لاحقاً)', r'(nice.*talk|كان من الجميل)', ], 'casual_requests': [ r'(tell.*joke|احك نكتة)', r'(how.*weekend|كيف.*الأسبوع)', r'(favorite.*color|اللون المفضل)', r'(what.*think|ما رأيك)', ] } # === REGULATORY PATTERNS (More Specific) === self.regulatory_patterns = { 'licensing_keywords': [ r'(ترخيص|license|licensing|permit|تصريح)', r'(تسجيل|registration|register)', r'(موافقة|approval|اعتماد)', ], 'compliance_keywords': [ r'(امتثال|compliance|comply)', r'(متطلبات|requirements|شروط)', r'(ضوابط|regulations|أنظمة|لوائح)', r'(قوانين|laws|legislation)', ], 'financial_keywords': [ r'(مصرف|بنك|bank|banking|مصرفي)', r'(ائتمان|credit|قرض|loan)', r'(استثمار|investment|توريق)', r'(أسواق المال|capital markets|securities)', r'(صندوق|fund|محفظة|portfolio)', ], 'authorities_keywords': [ r'(البنك المركزي|central bank|cbk)', r'(هيئة أسواق المال|capital markets authority|cma)', r'(مؤسسة النقد|monetary authority)', ], 'violations_keywords': [ r'(مخالفة|violation|infringement)', r'(عقوبة|penalty|fine|جزاء)', r'(تأديب|disciplinary|تأديبي)', r'(محاكمة|hearing|جلسة)', ], 'aml_keywords': [ r'(غسل الأموال|money laundering|aml)', r'(مشبوه|suspicious|مشكوك)', r'(kyc|know your customer|اعرف عميلك)', r'(بيانات العميل|customer data)', ] } # === CONTEXT CLUES === self.context_indicators = { 'casual_indicators': [ r'^(just|فقط|بس)', r'(curious|فضول)', r'(wondering|أتساءل)', r'(by the way|بالمناسبة)', r'(quick question|سؤال سريع)', ], 'regulatory_indicators': [ r'(according to|وفقاً ل|حسب)', r'(article|مادة|فقرة)', r'(section|قسم|باب)', r'(regulation number|رقم اللائحة)', r'(what are the|ما هي)', r'(requirements for|متطلبات)', ] } def setup_weights(self): """Setup scoring weights for different pattern categories.""" self.weights = { 'greeting': 1.0, 'casual_time': 0.9, 'casual_weather': 0.9, 'casual_personal': 0.8, 'casual_general': 0.7, 'casual_social': 0.8, 'casual_requests': 0.7, 'regulatory_licensing': 0.9, 'regulatory_compliance': 0.9, 'regulatory_financial': 0.8, 'regulatory_authorities': 1.0, 'regulatory_violations': 0.9, 'regulatory_aml': 0.9, 'context_casual': 0.3, 'context_regulatory': 0.4, } def classify_message(self, message: str) -> ClassificationResult: """ Classify message with confidence scoring and reasoning. """ message_lower = message.lower().strip() # Score different categories scores = { 'greeting': self._score_greeting(message_lower), 'casual': self._score_casual(message_lower), 'regulatory': self._score_regulatory(message_lower) } # Determine best classification best_type = max(scores, key=scores.get) best_score = scores[best_type] # Get confidence level confidence_level = self._get_confidence_level(best_score) # Generate reasoning reasoning = self._generate_reasoning(message_lower, scores, best_type) # Get alternative classifications sorted_scores = sorted(scores.items(), key=lambda x: x[1], reverse=True) alternatives = [(t, s) for t, s in sorted_scores[1:] if s > 0.1] # Handle ambiguous cases with smart defaults if confidence_level == ClassificationConfidence.VERY_LOW: # If all scores are very low, check message characteristics if len(message.split()) <= 3 and not any(char in message for char in '?!.'): # Short message without punctuation - likely casual best_type = 'casual' best_score = 0.4 confidence_level = ClassificationConfidence.LOW reasoning = "Short informal message defaulted to casual" elif '?' in message: # Question format - analyze further if any(word in message_lower for word in ['what', 'how', 'when', 'where', 'why', 'ماذا', 'كيف', 'متى', 'أين', 'لماذا']): best_type = 'casual' best_score = 0.4 confidence_level = ClassificationConfidence.LOW reasoning = "Question format defaulted to casual conversation" return ClassificationResult( message_type=best_type, confidence=best_score, confidence_level=confidence_level, reasoning=reasoning, alternative_types=alternatives ) def _score_greeting(self, message: str) -> float: """Score greeting likelihood.""" score = 0.0 for lang, patterns in self.greeting_patterns.items(): for pattern in patterns: if re.search(pattern, message, re.IGNORECASE): score = max(score, self.weights['greeting']) return min(score, 1.0) def _score_casual(self, message: str) -> float: """Score casual conversation likelihood.""" score = 0.0 # Check casual patterns for category, patterns in self.casual_patterns.items(): category_weight = self.weights.get(f'casual_{category.split("_")[0]}', 0.7) for pattern in patterns: if re.search(pattern, message, re.IGNORECASE): score = max(score, category_weight) # Add context indicators for pattern in self.context_indicators['casual_indicators']: if re.search(pattern, message, re.IGNORECASE): score += self.weights['context_casual'] # Boost for common casual question words casual_words = ['how', 'what', 'when', 'where', 'كيف', 'ماذا', 'متى', 'أين'] if any(word in message for word in casual_words) and len(message.split()) <= 6: score += 0.2 return min(score, 1.0) def _score_regulatory(self, message: str) -> float: """Score regulatory query likelihood.""" score = 0.0 # Check regulatory patterns for category, patterns in self.regulatory_patterns.items(): category_weight = self.weights.get(f'regulatory_{category.split("_")[0]}', 0.8) for pattern in patterns: if re.search(pattern, message, re.IGNORECASE): score = max(score, category_weight) # Add context indicators for pattern in self.context_indicators['regulatory_indicators']: if re.search(pattern, message, re.IGNORECASE): score += self.weights['context_regulatory'] # Penalty for very short messages if len(message.split()) <= 3: score *= 0.7 return min(score, 1.0) def _get_confidence_level(self, score: float) -> ClassificationConfidence: """Convert score to confidence level.""" if score >= 0.8: return ClassificationConfidence.HIGH elif score >= 0.5: return ClassificationConfidence.MEDIUM elif score >= 0.3: return ClassificationConfidence.LOW else: return ClassificationConfidence.VERY_LOW def _generate_reasoning(self, message: str, scores: Dict[str, float], best_type: str) -> str: """Generate human-readable reasoning for classification.""" reasoning_parts = [] if best_type == 'greeting': reasoning_parts.append("Contains greeting patterns") elif best_type == 'casual': if any(word in message for word in ['time', 'وقت', 'ساعة']): reasoning_parts.append("Time-related inquiry") elif any(word in message for word in ['weather', 'طقس']): reasoning_parts.append("Weather inquiry") elif any(word in message for word in ['how', 'كيف']): reasoning_parts.append("Personal/casual question") else: reasoning_parts.append("General casual conversation") elif best_type == 'regulatory': if any(word in message for word in ['bank', 'مصرف', 'بنك']): reasoning_parts.append("Banking-related query") elif any(word in message for word in ['license', 'ترخيص']): reasoning_parts.append("Licensing inquiry") elif any(word in message for word in ['compliance', 'امتثال']): reasoning_parts.append("Compliance question") else: reasoning_parts.append("Regulatory content detected") # Add confidence information if scores[best_type] < 0.5: reasoning_parts.append("(Low confidence - ambiguous)") return "; ".join(reasoning_parts) # Test the enhanced classifier if __name__ == "__main__": classifier = EnhancedMessageClassifier() test_cases = [ "Hello", "How is the day?", "What is the time right now?", "How are you doing today?", "What are the banking license requirements?", "Tell me about compliance regulations", "مرحبا", "كيف حالك اليوم؟", "ما الوقت الآن؟", "ما هي متطلبات الترخيص المصرفي؟" ] print("Enhanced Message Classification Test Results:") print("=" * 60) for message in test_cases: result = classifier.classify_message(message) print(f"\nMessage: \"{message}\"") print(f"Classification: {result.message_type}") print(f"Confidence: {result.confidence:.2f} ({result.confidence_level.value})") print(f"Reasoning: {result.reasoning}") if result.alternative_types: alts = ", ".join([f"{t}({s:.2f})" for t, s in result.alternative_types[:2]]) print(f"Alternatives: {alts}")