""" Lead Storage Service Saves customer leads collected during scenario conversations """ from typing import Dict, Optional from datetime import datetime from pymongo.collection import Collection class LeadStorageService: """ Store customer leads from scenario interactions """ def __init__(self, leads_collection: Collection): self.collection = leads_collection self._ensure_indexes() def _ensure_indexes(self): """Create indexes for leads collection""" try: self.collection.create_index("email") self.collection.create_index("phone") self.collection.create_index("created_at") print("✓ Leads indexes created") except Exception as e: print(f"Leads indexes already exist: {e}") def save_lead( self, event_name: str, email: Optional[str] = None, phone: Optional[str] = None, interests: Optional[Dict] = None, session_id: Optional[str] = None, user_id: Optional[str] = None ) -> str: """ Save customer lead Args: event_name: Event they're interested in email: Customer email phone: Customer phone interests: Additional data (group_size, etc.) session_id: Conversation session user_id: User ID if authenticated Returns: Lead ID """ lead = { "event_name": event_name, "email": email, "phone": phone, "interests": interests or {}, "session_id": session_id, "user_id": user_id, "source": "chatbot_scenario", "created_at": datetime.utcnow(), "status": "new" } result = self.collection.insert_one(lead) lead_id = str(result.inserted_id) print(f"💾 Saved lead: {lead_id} | Event: {event_name} | Email: {email} | Phone: {phone}") return lead_id def get_leads( self, event_name: Optional[str] = None, limit: int = 50, skip: int = 0 ): """Get leads with optional filtering""" query = {} if event_name: query["event_name"] = event_name leads = self.collection.find(query).sort("created_at", -1).skip(skip).limit(limit) return list(leads) def count_leads(self, event_name: Optional[str] = None) -> int: """Count total leads""" query = {} if event_name: query["event_name"] = event_name return self.collection.count_documents(query)