Spaces:
Sleeping
Sleeping
| """ | |
| Database utilities for SQLite persistence | |
| Simple key-value storage with JSON serialization | |
| """ | |
| import json | |
| import sqlite3 | |
| from pathlib import Path | |
| from typing import Optional, Any, List, Dict | |
| from datetime import datetime | |
| from src.config import config | |
| class Database: | |
| """Simple SQLite database for campaign data""" | |
| def __init__(self, db_path: Optional[Path] = None): | |
| """Initialize database connection""" | |
| self.db_path = db_path or config.database.db_path | |
| self.db_path.parent.mkdir(parents=True, exist_ok=True) | |
| self.conn: Optional[sqlite3.Connection] = None | |
| self._initialize() | |
| def _initialize(self): | |
| """Initialize database and create tables""" | |
| self.conn = sqlite3.connect(str(self.db_path), check_same_thread=False) | |
| self.conn.row_factory = sqlite3.Row | |
| # Create tables | |
| self._create_tables() | |
| def _create_tables(self): | |
| """Create database tables""" | |
| cursor = self.conn.cursor() | |
| # Generic key-value store with type | |
| cursor.execute(""" | |
| CREATE TABLE IF NOT EXISTS entities ( | |
| id TEXT PRIMARY KEY, | |
| type TEXT NOT NULL, | |
| data TEXT NOT NULL, | |
| created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, | |
| updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP | |
| ) | |
| """) | |
| # Index for faster lookups | |
| cursor.execute(""" | |
| CREATE INDEX IF NOT EXISTS idx_entities_type ON entities(type) | |
| """) | |
| # Campaign events for memory | |
| cursor.execute(""" | |
| CREATE TABLE IF NOT EXISTS campaign_events ( | |
| id TEXT PRIMARY KEY, | |
| campaign_id TEXT NOT NULL, | |
| session_number INTEGER NOT NULL, | |
| event_type TEXT NOT NULL, | |
| data TEXT NOT NULL, | |
| importance INTEGER DEFAULT 3, | |
| timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP, | |
| FOREIGN KEY (campaign_id) REFERENCES entities(id) | |
| ) | |
| """) | |
| cursor.execute(""" | |
| CREATE INDEX IF NOT EXISTS idx_events_campaign ON campaign_events(campaign_id) | |
| """) | |
| self.conn.commit() | |
| def save(self, entity_id: str, entity_type: str, data: Dict[str, Any]): | |
| """Save entity to database""" | |
| cursor = self.conn.cursor() | |
| # Serialize data to JSON | |
| json_data = json.dumps(data, default=str) | |
| cursor.execute(""" | |
| INSERT OR REPLACE INTO entities (id, type, data, updated_at) | |
| VALUES (?, ?, ?, CURRENT_TIMESTAMP) | |
| """, (entity_id, entity_type, json_data)) | |
| self.conn.commit() | |
| def load(self, entity_id: str, entity_type: Optional[str] = None) -> Optional[Dict[str, Any]]: | |
| """Load entity from database""" | |
| cursor = self.conn.cursor() | |
| if entity_type: | |
| cursor.execute(""" | |
| SELECT data FROM entities WHERE id = ? AND type = ? | |
| """, (entity_id, entity_type)) | |
| else: | |
| cursor.execute(""" | |
| SELECT data FROM entities WHERE id = ? | |
| """, (entity_id,)) | |
| row = cursor.fetchone() | |
| if row: | |
| return json.loads(row['data']) | |
| return None | |
| def load_all(self, entity_type: str) -> List[Dict[str, Any]]: | |
| """Load all entities of a specific type""" | |
| cursor = self.conn.cursor() | |
| cursor.execute(""" | |
| SELECT data FROM entities WHERE type = ? | |
| ORDER BY updated_at DESC | |
| """, (entity_type,)) | |
| return [json.loads(row['data']) for row in cursor.fetchall()] | |
| def delete(self, entity_id: str): | |
| """Delete entity from database""" | |
| cursor = self.conn.cursor() | |
| cursor.execute(""" | |
| DELETE FROM entities WHERE id = ? | |
| """, (entity_id,)) | |
| self.conn.commit() | |
| def search(self, entity_type: str, query: str, limit: int = 10) -> List[Dict[str, Any]]: | |
| """Simple text search in entity data""" | |
| cursor = self.conn.cursor() | |
| cursor.execute(""" | |
| SELECT data FROM entities | |
| WHERE type = ? AND data LIKE ? | |
| ORDER BY updated_at DESC | |
| LIMIT ? | |
| """, (entity_type, f"%{query}%", limit)) | |
| return [json.loads(row['data']) for row in cursor.fetchall()] | |
| # Campaign Events specific methods | |
| def save_campaign_event(self, event_data: Dict[str, Any]): | |
| """Save campaign event for memory""" | |
| cursor = self.conn.cursor() | |
| cursor.execute(""" | |
| INSERT INTO campaign_events | |
| (id, campaign_id, session_number, event_type, data, importance) | |
| VALUES (?, ?, ?, ?, ?, ?) | |
| """, ( | |
| event_data['id'], | |
| event_data['campaign_id'], | |
| event_data['session_number'], | |
| event_data['event_type'], | |
| json.dumps(event_data, default=str), | |
| event_data.get('importance', 3) | |
| )) | |
| self.conn.commit() | |
| def load_campaign_events( | |
| self, | |
| campaign_id: str, | |
| session_number: Optional[int] = None, | |
| limit: Optional[int] = None | |
| ) -> List[Dict[str, Any]]: | |
| """Load campaign events for memory context""" | |
| cursor = self.conn.cursor() | |
| query = """ | |
| SELECT data FROM campaign_events | |
| WHERE campaign_id = ? | |
| """ | |
| params = [campaign_id] | |
| if session_number: | |
| query += " AND session_number = ?" | |
| params.append(session_number) | |
| query += " ORDER BY timestamp DESC" | |
| if limit: | |
| query += " LIMIT ?" | |
| params.append(limit) | |
| cursor.execute(query, params) | |
| return [json.loads(row['data']) for row in cursor.fetchall()] | |
| def get_campaign_context(self, campaign_id: str, max_events: int = 50) -> str: | |
| """Get formatted campaign context for AI""" | |
| events = self.load_campaign_events(campaign_id, limit=max_events) | |
| if not events: | |
| return "No campaign history yet." | |
| context_parts = ["# Campaign History\n"] | |
| for event in reversed(events): # Chronological order | |
| context_parts.append(f"## Session {event['session_number']}: {event['title']}") | |
| context_parts.append(f"**Type:** {event['event_type']}") | |
| context_parts.append(f"{event['description']}\n") | |
| return "\n".join(context_parts) | |
| def close(self): | |
| """Close database connection""" | |
| if self.conn: | |
| self.conn.close() | |
| def __enter__(self): | |
| """Context manager entry""" | |
| return self | |
| def __exit__(self, exc_type, exc_val, exc_tb): | |
| """Context manager exit""" | |
| self.close() | |
| # Global database instance | |
| _database: Optional[Database] = None | |
| def get_database() -> Database: | |
| """Get or create global database instance""" | |
| global _database | |
| if _database is None: | |
| _database = Database() | |
| return _database | |