""" Banco de dados SQLite para Akira IA. Gerencia contexto, mensagens, embeddings, gírias, tom e aprendizados detalhados. Versão 11/2025 - compatível com treinamento.py (SentenceTransformers) """ import sqlite3 import time import os from typing import Optional, List, Dict, Any, Tuple from loguru import logger class Database: def __init__(self, db_path: str): self.db_path = db_path self.max_retries = 5 self.retry_delay = 0.1 os.makedirs(os.path.dirname(db_path), exist_ok=True) self._init_db() self._ensure_all_columns_and_indexes() # ================================================================ # CONEXÃO + RETRY # ================================================================ def _get_connection(self) -> sqlite3.Connection: for attempt in range(self.max_retries): try: conn = sqlite3.connect(self.db_path, timeout=30.0, check_same_thread=False) conn.execute("PRAGMA journal_mode=WAL") conn.execute("PRAGMA synchronous=NORMAL") conn.execute("PRAGMA cache_size=1000") conn.execute("PRAGMA temp_store=MEMORY") conn.execute("PRAGMA busy_timeout=30000") conn.execute("PRAGMA foreign_keys=ON") return conn except sqlite3.OperationalError as e: if "locked" in str(e) and attempt < self.max_retries - 1: time.sleep(self.retry_delay * (2 ** attempt)) continue logger.error(f"Erro de conexão DB: {e}") raise raise sqlite3.OperationalError("Falha ao conectar ao banco após várias tentativas") def _execute_with_retry(self, query: str, params: Optional[tuple] = None, commit: bool = False): for attempt in range(self.max_retries): try: with self._get_connection() as conn: cur = conn.cursor() cur.execute(query, params or ()) result = cur.fetchall() if query.strip().upper().startswith("SELECT") else None if commit: conn.commit() return result except sqlite3.OperationalError as e: if "locked" in str(e) and attempt < self.max_retries - 1: time.sleep(self.retry_delay * (2 ** attempt)) continue logger.error(f"Erro SQL: {e}") raise raise sqlite3.OperationalError("Query falhou após retries") # ================================================================ # SCHEMA + MIGRAÇÃO # ================================================================ def _init_db(self): try: with self._get_connection() as conn: c = conn.cursor() c.executescript(""" CREATE TABLE IF NOT EXISTS mensagens ( id INTEGER PRIMARY KEY AUTOINCREMENT, usuario TEXT, mensagem TEXT, resposta TEXT, numero TEXT, is_reply BOOLEAN DEFAULT 0, mensagem_original TEXT, created_at DATETIME DEFAULT CURRENT_TIMESTAMP ); CREATE TABLE IF NOT EXISTS embeddings ( id INTEGER PRIMARY KEY AUTOINCREMENT, numero_usuario TEXT, source_type TEXT, texto TEXT, embedding BLOB ); CREATE TABLE IF NOT EXISTS aprendizados ( id INTEGER PRIMARY KEY AUTOINCREMENT, numero_usuario TEXT, chave TEXT, valor TEXT, created_at DATETIME DEFAULT CURRENT_TIMESTAMP ); CREATE TABLE IF NOT EXISTS girias_aprendidas ( id INTEGER PRIMARY KEY AUTOINCREMENT, numero_usuario TEXT, giria TEXT, significado TEXT, contexto TEXT, frequencia INTEGER DEFAULT 1, created_at DATETIME DEFAULT CURRENT_TIMESTAMP, updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ); CREATE TABLE IF NOT EXISTS tom_usuario ( id INTEGER PRIMARY KEY AUTOINCREMENT, numero_usuario TEXT, tom_detectado TEXT, intensidade REAL DEFAULT 0.5, contexto TEXT, created_at DATETIME DEFAULT CURRENT_TIMESTAMP ); CREATE TABLE IF NOT EXISTS contexto ( user_key TEXT PRIMARY KEY, historico TEXT, emocao_atual TEXT, termos TEXT, girias TEXT, tom TEXT ); CREATE TABLE IF NOT EXISTS pronomes_por_tom ( tom TEXT PRIMARY KEY, pronomes TEXT ); """); # Dados padrão c.execute("INSERT OR IGNORE INTO pronomes_por_tom (tom, pronomes) VALUES (?, ?)", ('neutro', 'tu/você')) c.execute("INSERT OR IGNORE INTO pronomes_por_tom (tom, pronomes) VALUES (?, ?)", ('formal', 'o senhor/a senhora')) c.execute("INSERT OR IGNORE INTO pronomes_por_tom (tom, pronomes) VALUES (?, ?)", ('informal', 'puto/kota')) conn.commit() logger.info(f"Banco inicializado: {self.db_path}") except Exception as e: logger.error(f"Erro ao criar tabelas: {e}") raise def _ensure_all_columns_and_indexes(self): try: with self._get_connection() as conn: c = conn.cursor() migrations = { 'embeddings': [ ("numero_usuario", "TEXT"), ("source_type", "TEXT") ], 'mensagens': [ ("numero", "TEXT"), ("is_reply", "BOOLEAN DEFAULT 0"), ("mensagem_original", "TEXT"), ("created_at", "DATETIME DEFAULT CURRENT_TIMESTAMP") ], 'tom_usuario': [ ("intensidade", "REAL DEFAULT 0.5"), ("contexto", "TEXT") ], 'contexto': [ ("historico", "TEXT"), ("emocao_atual", "TEXT"), ("termos", "TEXT"), ("girias", "TEXT"), ("tom", "TEXT") ] } for table, cols in migrations.items(): c.execute(f"PRAGMA table_info('{table}')") existing = {row[1] for row in c.fetchall()} for col_name, col_def in cols: if col_name not in existing: try: c.execute(f"ALTER TABLE {table} ADD COLUMN {col_name} {col_def}") logger.info(f"Coluna '{col_name}' adicionada em '{table}'") except Exception as e: logger.warning(f"Erro ao adicionar coluna {col_name}: {e}") conn.commit() except Exception as e: logger.error(f"Erro na migração: {e}") # ================================================================ # FUNÇÕES PRINCIPAIS # ================================================================ def salvar_mensagem(self, usuario, mensagem, resposta, numero=None, is_reply=False, mensagem_original=None): try: cols = ['usuario', 'mensagem', 'resposta'] vals = [usuario, mensagem, resposta] if numero: cols.append('numero') vals.append(numero) if is_reply is not None: cols.append('is_reply') vals.append(int(is_reply)) if mensagem_original: cols.append('mensagem_original') vals.append(mensagem_original) placeholders = ', '.join(['?' for _ in cols]) query = f"INSERT INTO mensagens ({', '.join(cols)}) VALUES ({placeholders})" self._execute_with_retry(query, tuple(vals), commit=True) except Exception as e: logger.warning(f"Erro salvar_mensagem: {e}") self._execute_with_retry( "INSERT INTO mensagens (usuario, mensagem, resposta) VALUES (?, ?, ?)", (usuario, mensagem, resposta), commit=True ) def recuperar_mensagens(self, usuario: str, limite: int = 5): return self._execute_with_retry( "SELECT mensagem, resposta FROM mensagens WHERE usuario=? OR numero=? ORDER BY id DESC LIMIT ?", (usuario, usuario, limite) ) or [] def salvar_embedding(self, numero_usuario: str, source_type: str, texto: str, embedding): """Compatível com paraphrase-MiniLM e numpy arrays.""" try: if hasattr(embedding, "tobytes"): embedding = embedding.tobytes() self._execute_with_retry( "INSERT INTO embeddings (numero_usuario, source_type, texto, embedding) VALUES (?, ?, ?, ?)", (numero_usuario, source_type, texto, embedding), commit=True ) except Exception as e: logger.warning(f"Erro salvar_embedding: {e}") self._execute_with_retry( "INSERT INTO embeddings (texto, embedding) VALUES (?, ?)", (texto, embedding.tobytes() if hasattr(embedding, "tobytes") else embedding), commit=True ) # ================================================================ # CONTEXTO / TOM / GÍRIAS / APRENDIZADOS # ================================================================ def salvar_contexto(self, user_key, historico, emocao_atual, termos, girias, tom): self._execute_with_retry( """INSERT OR REPLACE INTO contexto (user_key, historico, emocao_atual, termos, girias, tom) VALUES (?, ?, ?, ?, ?, ?)""", (user_key, historico, emocao_atual, termos, girias, tom), commit=True ) def registrar_tom_usuario(self, numero_usuario, tom_detectado, intensidade=0.5, contexto=None): self._execute_with_retry( "INSERT INTO tom_usuario (numero_usuario, tom_detectado, intensidade, contexto) VALUES (?, ?, ?, ?)", (numero_usuario, tom_detectado, intensidade, contexto), commit=True ) def obter_tom_predominante(self, numero_usuario): rows = self._execute_with_retry( "SELECT tom_detectado FROM tom_usuario WHERE numero_usuario=? ORDER BY created_at DESC LIMIT 1", (numero_usuario,) ) return rows[0][0] if rows else None def salvar_aprendizado_detalhado(self, numero_usuario, chave, valor): self._execute_with_retry( "INSERT INTO aprendizados (numero_usuario, chave, valor) VALUES (?, ?, ?)", (numero_usuario, chave, valor), commit=True ) def recuperar_aprendizado_detalhado(self, numero_usuario, *args): rows = self._execute_with_retry( "SELECT chave, valor FROM aprendizados WHERE numero_usuario=?", (numero_usuario,) ) return {r[0]: r[1] for r in rows} if rows else {} def salvar_giria_aprendida(self, numero_usuario, giria, significado, contexto=None): existing = self._execute_with_retry( "SELECT id FROM girias_aprendidas WHERE numero_usuario=? AND giria=?", (numero_usuario, giria) ) if existing: self._execute_with_retry( "UPDATE girias_aprendidas SET frequencia=frequencia+1, updated_at=CURRENT_TIMESTAMP WHERE id=?", (existing[0][0],), commit=True ) else: self._execute_with_retry( "INSERT INTO girias_aprendidas (numero_usuario, giria, significado, contexto) VALUES (?, ?, ?, ?)", (numero_usuario, giria, significado, contexto), commit=True ) # ================================================================ # MÉTODO ADICIONADO: recuperar_girias_usuario # ================================================================ def recuperar_girias_usuario(self, numero_usuario: str) -> List[Dict[str, Any]]: """ Recupera todas as gírias aprendidas para um usuário. Retorna: [{'giria': 'baza', 'significado': 'ir embora', 'frequencia': 5}, ...] """ try: rows = self._execute_with_retry( """ SELECT giria, significado, frequencia, contexto FROM girias_aprendidas WHERE numero_usuario = ? ORDER BY frequencia DESC, updated_at DESC """, (numero_usuario,) ) return [ { 'giria': row[0], 'significado': row[1], 'frequencia': row[2], 'contexto': row[3] } for row in rows ] if rows else [] except Exception as e: logger.warning(f"Falha ao carregar gírias do usuário {numero_usuario}: {e}") return []