""" Banco de dados SQLite para Akira IA. Gerencia contexto, mensagens, embeddings, gírias, tom e aprendizados detalhados. Versão completa 11/2025. """ import sqlite3 import time import os import json from typing import Optional, List, Dict, Any, Tuple from loguru import logger class Database: def __init__(self, db_path: str): self.db_path = db_path self.max_retries = 5 self.retry_delay = 0.1 os.makedirs(os.path.dirname(db_path), exist_ok=True) self._init_db() self._ensure_all_columns_and_indexes() # ================================================================ # CONEXÃO COM RETRY + WAL # ================================================================ def _get_connection(self) -> sqlite3.Connection: for attempt in range(self.max_retries): try: conn = sqlite3.connect(self.db_path, timeout=30.0, check_same_thread=False) conn.execute('PRAGMA journal_mode=WAL') conn.execute('PRAGMA synchronous=NORMAL') conn.execute('PRAGMA cache_size=1000') conn.execute('PRAGMA temp_store=MEMORY') conn.execute('PRAGMA busy_timeout=30000') conn.execute('PRAGMA foreign_keys=ON') return conn except sqlite3.OperationalError as e: if "database is locked" in str(e) and attempt < self.max_retries - 1: time.sleep(self.retry_delay * (2 ** attempt)) continue logger.error(f"Falha ao conectar ao banco: {e}") raise raise sqlite3.OperationalError("Falha ao conectar após retries") def _execute_with_retry(self, query: str, params: Optional[tuple] = None, commit: bool = False) -> Optional[List[Tuple]]: for attempt in range(self.max_retries): try: with self._get_connection() as conn: c = conn.cursor() if params: c.execute(query, params) else: c.execute(query) result = c.fetchall() if query.strip().upper().startswith('SELECT') else None if commit: conn.commit() return result except sqlite3.OperationalError as e: if "database is locked" in str(e) and attempt < self.max_retries - 1: time.sleep(self.retry_delay * (2 ** attempt)) continue logger.error(f"Erro SQL (tentativa {attempt+1}): {e}") raise raise sqlite3.OperationalError("Query falhou após retries") # ================================================================ # INICIALIZAÇÃO + MIGRAÇÃO AUTOMÁTICA # ================================================================ def _init_db(self): try: with self._get_connection() as conn: c = conn.cursor() c.executescript(''' CREATE TABLE IF NOT EXISTS aprendizado ( id INTEGER PRIMARY KEY AUTOINCREMENT, usuario TEXT, dado TEXT, valor TEXT ); CREATE TABLE IF NOT EXISTS exemplos ( id INTEGER PRIMARY KEY AUTOINCREMENT, tipo TEXT NOT NULL, entrada TEXT NOT NULL, resposta TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS info_geral ( chave TEXT PRIMARY KEY, valor TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS estilos ( numero_usuario TEXT PRIMARY KEY, estilo TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS preferencias_tom ( numero_usuario TEXT PRIMARY KEY, tom TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS afinidades ( numero_usuario TEXT PRIMARY KEY, afinidade REAL NOT NULL ); CREATE TABLE IF NOT EXISTS termos ( id INTEGER PRIMARY KEY AUTOINCREMENT, numero_usuario TEXT NOT NULL, termo TEXT NOT NULL, created_at DATETIME DEFAULT CURRENT_TIMESTAMP ); CREATE TABLE IF NOT EXISTS aprendizados ( id INTEGER PRIMARY KEY AUTOINCREMENT, numero_usuario TEXT NOT NULL, chave TEXT NOT NULL, valor TEXT NOT NULL, created_at DATETIME DEFAULT CURRENT_TIMESTAMP ); CREATE TABLE IF NOT EXISTS vocabulario_patenteado ( termo TEXT PRIMARY KEY, definicao TEXT NOT NULL, uso TEXT NOT NULL, exemplo TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS usuarios_privilegiados ( numero_usuario TEXT PRIMARY KEY, nome TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS whatsapp_ids ( id INTEGER PRIMARY KEY AUTOINCREMENT, whatsapp_id TEXT NOT NULL, sender_number TEXT NOT NULL, UNIQUE (whatsapp_id, sender_number) ); CREATE TABLE IF NOT EXISTS embeddings ( id INTEGER PRIMARY KEY AUTOINCREMENT, texto TEXT NOT NULL, embedding BLOB NOT NULL ); CREATE TABLE IF NOT EXISTS mensagens ( id INTEGER PRIMARY KEY AUTOINCREMENT, usuario TEXT NOT NULL, mensagem TEXT NOT NULL, resposta TEXT NOT NULL, numero TEXT, is_reply BOOLEAN DEFAULT 0, mensagem_original TEXT, created_at DATETIME DEFAULT CURRENT_TIMESTAMP ); CREATE TABLE IF NOT EXISTS emocao_exemplos ( id INTEGER PRIMARY KEY AUTOINCREMENT, emocao TEXT NOT NULL, entrada TEXT NOT NULL, resposta TEXT NOT NULL, tom TEXT NOT NULL, created_at DATETIME DEFAULT CURRENT_TIMESTAMP ); CREATE TABLE IF NOT EXISTS girias_aprendidas ( id INTEGER PRIMARY KEY AUTOINCREMENT, numero_usuario TEXT NOT NULL, giria TEXT NOT NULL, significado TEXT NOT NULL, contexto TEXT, frequencia INTEGER DEFAULT 1, created_at DATETIME DEFAULT CURRENT_TIMESTAMP, updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ); CREATE TABLE IF NOT EXISTS tom_usuario ( id INTEGER PRIMARY KEY AUTOINCREMENT, numero_usuario TEXT NOT NULL, tom_detectado TEXT NOT NULL, intensidade REAL DEFAULT 0.5, contexto TEXT, created_at DATETIME DEFAULT CURRENT_TIMESTAMP ); CREATE TABLE IF NOT EXISTS adaptacao_dinamica ( id INTEGER PRIMARY KEY AUTOINCREMENT, numero_usuario TEXT NOT NULL, tipo_adaptacao TEXT NOT NULL, valor_anterior TEXT, valor_novo TEXT, razao TEXT, created_at DATETIME DEFAULT CURRENT_TIMESTAMP ); CREATE TABLE IF NOT EXISTS pronomes_por_tom ( tom TEXT PRIMARY KEY, pronomes TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS contexto ( user_key TEXT PRIMARY KEY, historico TEXT, emocao_atual TEXT, termos TEXT, girias TEXT, tom TEXT ); ''') c.executescript(''' INSERT OR IGNORE INTO pronomes_por_tom (tom, pronomes) VALUES ('formal', 'Sr., ilustre, boss, maior, homem'), ('rude', 'parvo, estúpido, burro, analfabeto, desperdício de esperma'), ('casual', 'mano, puto, cota, mwangolé, kota'), ('neutro', 'amigo, parceiro, camarada'); ''') conn.commit() logger.info(f"Banco inicializado: {self.db_path}") except Exception as e: logger.error(f"Erro ao criar tabelas: {e}") raise def _ensure_all_columns_and_indexes(self): try: with self._get_connection() as conn: c = conn.cursor() migrations = { 'mensagens': [ ("numero", "TEXT"), ("is_reply", "BOOLEAN DEFAULT 0"), ("mensagem_original", "TEXT"), ("created_at", "DATETIME DEFAULT CURRENT_TIMESTAMP") ], 'girias_aprendidas': [ ("contexto", "TEXT"), ("frequencia", "INTEGER DEFAULT 1"), ("updated_at", "DATETIME DEFAULT CURRENT_TIMESTAMP") ], 'tom_usuario': [ ("intensidade", "REAL DEFAULT 0.5"), ("contexto", "TEXT") ], 'contexto': [ ("historico", "TEXT"), ("emocao_atual", "TEXT"), ("termos", "TEXT"), ("girias", "TEXT"), ("tom", "TEXT") ], # CORREÇÃO: Adiciona as colunas que faltavam em 'embeddings' 'embeddings': [ ("numero_usuario", "TEXT"), ("source_type", "TEXT") ] } for table, cols in migrations.items(): c.execute(f"PRAGMA table_info('{table}')") existing = {row[1] for row in c.fetchall()} for col_name, col_def in cols: if col_name not in existing: try: c.execute(f"ALTER TABLE {table} ADD COLUMN {col_name} {col_def}") logger.info(f"Coluna '{col_name}' adicionada em '{table}'") except Exception as e: logger.warning(f"Erro ao adicionar coluna {col_name}: {e}") indexes = [ "CREATE INDEX IF NOT EXISTS idx_mensagens_numero ON mensagens(numero);", "CREATE INDEX IF NOT EXISTS idx_mensagens_created ON mensagens(created_at DESC);", "CREATE INDEX IF NOT EXISTS idx_girias_usuario ON girias_aprendidas(numero_usuario);", "CREATE INDEX IF NOT EXISTS idx_girias_giria ON girias_aprendidas(giria);", "CREATE INDEX IF NOT EXISTS idx_tom_usuario ON tom_usuario(numero_usuario);", "CREATE INDEX IF NOT EXISTS idx_aprendizados_usuario ON aprendizados(numero_usuario);", "CREATE INDEX IF NOT EXISTS idx_embeddings_texto ON embeddings(texto);", "CREATE INDEX IF NOT EXISTS idx_pronomes_tom ON pronomes_por_tom(tom);", "CREATE INDEX IF NOT EXISTS idx_contexto_user ON contexto(user_key);" ] for idx in indexes: try: c.execute(idx) except: pass conn.commit() except Exception as e: logger.error(f"Erro na migração/índices: {e}") # ================================================================ # MÉTODOS PRINCIPAIS # ================================================================ def salvar_mensagem(self, usuario, mensagem, resposta, numero=None, is_reply=False, mensagem_original=None): try: cols = ['usuario', 'mensagem', 'resposta'] vals = [usuario, mensagem, resposta] if numero: cols.append('numero') vals.append(numero) if is_reply is not None: cols.append('is_reply') vals.append(int(is_reply)) if mensagem_original: cols.append('mensagem_original') vals.append(mensagem_original) placeholders = ', '.join(['?' for _ in cols]) query = f"INSERT INTO mensagens ({', '.join(cols)}) VALUES ({placeholders})" self._execute_with_retry(query, tuple(vals), commit=True) except Exception as e: logger.warning(f"Fallback salvar_mensagem: {e}") self._execute_with_retry( "INSERT INTO mensagens (usuario, mensagem, resposta) VALUES (?, ?, ?)", (usuario, mensagem, resposta), commit=True ) def recuperar_mensagens(self, usuario: str, limite: int = 5) -> List[Tuple]: return self._execute_with_retry( "SELECT mensagem, resposta FROM mensagens WHERE usuario=? OR numero=? ORDER BY id DESC LIMIT ?", (usuario, usuario, limite) ) or [] # CORREÇÃO: Assinatura de 5 argumentos (self + 4) para corresponder ao erro do log def salvar_embedding(self, numero_usuario: str, source_type: str, texto: str, embedding: bytes): """Compatível com paraphrase-MiniLM e numpy arrays.""" try: if hasattr(embedding, "tobytes"): embedding = embedding.tobytes() # Inserindo com as novas colunas self._execute_with_retry( "INSERT INTO embeddings (numero_usuario, source_type, texto, embedding) VALUES (?, ?, ?, ?)", (numero_usuario, source_type, texto, embedding), commit=True ) except Exception as e: logger.warning(f"Erro ao salvar embedding (tentativa com 4 args): {e}. Tentando com 2 argumentos (texto, embedding).") # Fallback para schema antigo, caso as colunas ainda não tenham migrado self._execute_with_retry( "INSERT INTO embeddings (texto, embedding) VALUES (?, ?)", (texto, embedding.tobytes() if hasattr(embedding, "tobytes") else embedding), commit=True ) # ================================================================ # CONTEXTO / TOM / GÍRIAS / APRENDIZADOS # ================================================================ # CORREÇÃO: Método adicionado para resolver o erro "'Database' object has no attribute 'salvar_contexto'" def salvar_contexto(self, user_key: str, historico: str, emocao_atual: str, termos: str, girias: str, tom: str): try: self._execute_with_retry( """INSERT OR REPLACE INTO contexto (user_key, historico, emocao_atual, termos, girias, tom) VALUES (?, ?, ?, ?, ?, ?)""", (user_key, historico, emocao_atual, termos, girias, tom), commit=True ) except Exception as e: logger.error(f"Erro ao salvar contexto para {user_key}: {e}") # CORREÇÃO: Aceita *args para ignorar o argumento extra (resolve "takes 2 positional arguments but 3 were given") def recuperar_aprendizado_detalhado(self, numero_usuario: str, *args) -> Dict[str, str]: # O argumento 'chave' (em *args) é ignorado aqui, pois a query busca todas as chaves rows = self._execute_with_retry( "SELECT chave, valor FROM aprendizados WHERE numero_usuario=?", (numero_usuario,) ) or [] return {r[0]: r[1] for r in rows} def recuperar_girias_usuario(self, numero_usuario: str) -> List[Dict[str, Any]]: rows = self._execute_with_retry( "SELECT giria, significado, contexto, frequencia FROM girias_aprendidas WHERE numero_usuario=?", (numero_usuario,) ) or [] return [{'giria': r[0], 'significado': r[1], 'contexto': r[2], 'frequencia': r[3]} for r in rows] def obter_tom_predominante(self, numero_usuario: str) -> Optional[str]: rows = self._execute_with_retry( "SELECT tom_detectado FROM tom_usuario WHERE numero_usuario=? ORDER BY created_at DESC LIMIT 1", (numero_usuario,) ) or [] return rows[0][0] if rows else None def registrar_tom_usuario(self, numero_usuario: str, tom_detectado: str, intensidade: float = 0.5, contexto: Optional[str] = None): self._execute_with_retry( "INSERT INTO tom_usuario (numero_usuario, tom_detectado, intensidade, contexto) VALUES (?, ?, ?, ?)", (numero_usuario, tom_detectado, intensidade, contexto), commit=True ) def salvar_aprendizado_detalhado(self, numero_usuario: str, chave: str, valor: str): self._execute_with_retry( "INSERT OR REPLACE INTO aprendizados (numero_usuario, chave, valor) VALUES (?, ?, ?)", (numero_usuario, chave, valor), commit=True ) def salvar_giria_aprendida(self, numero_usuario: str, giria: str, significado: str, contexto: Optional[str] = None): existing = self._execute_with_retry( "SELECT id, frequencia FROM girias_aprendidas WHERE numero_usuario=? AND giria=?", (numero_usuario, giria) ) if existing: self._execute_with_retry( "UPDATE girias_aprendidas SET frequencia=frequencia+1, updated_at=CURRENT_TIMESTAMP WHERE id=?", (existing[0][0],), commit=True ) else: self._execute_with_retry( "INSERT INTO girias_aprendidas (numero_usuario, giria, significado, contexto) VALUES (?, ?, ?, ?)", (numero_usuario, giria, significado, contexto), commit=True ) def salvar_info_geral(self, chave: str, valor: str): self._execute_with_retry( "INSERT OR REPLACE INTO info_geral (chave, valor) VALUES (?, ?)", (chave, valor), commit=True ) def obter_info_geral(self, chave: str) -> Optional[str]: result = self._execute_with_retry("SELECT valor FROM info_geral WHERE chave=?", (chave,)) return result[0][0] if result else None