akira / modules /database.py
akra35567's picture
Update modules/database.py
90a3599
raw
history blame
19.5 kB
"""
Banco de dados SQLite para Akira IA.
Gerencia contexto, mensagens, embeddings, gírias, tom e aprendizados detalhados.
Versão completa 11/2025.
"""
import sqlite3
import time
import os
import json
from typing import Optional, List, Dict, Any, Tuple
from loguru import logger
class Database:
def __init__(self, db_path: str):
self.db_path = db_path
self.max_retries = 5
self.retry_delay = 0.1
os.makedirs(os.path.dirname(db_path), exist_ok=True)
self._init_db()
self._ensure_all_columns_and_indexes()
# ================================================================
# CONEXÃO COM RETRY + WAL
# ================================================================
def _get_connection(self) -> sqlite3.Connection:
for attempt in range(self.max_retries):
try:
conn = sqlite3.connect(self.db_path, timeout=30.0, check_same_thread=False)
conn.execute('PRAGMA journal_mode=WAL')
conn.execute('PRAGMA synchronous=NORMAL')
conn.execute('PRAGMA cache_size=1000')
conn.execute('PRAGMA temp_store=MEMORY')
conn.execute('PRAGMA busy_timeout=30000')
conn.execute('PRAGMA foreign_keys=ON')
return conn
except sqlite3.OperationalError as e:
if "database is locked" in str(e) and attempt < self.max_retries - 1:
time.sleep(self.retry_delay * (2 ** attempt))
continue
logger.error(f"Falha ao conectar ao banco: {e}")
raise
raise sqlite3.OperationalError("Falha ao conectar após retries")
def _execute_with_retry(self, query: str, params: Optional[tuple] = None, commit: bool = False) -> Optional[List[Tuple]]:
for attempt in range(self.max_retries):
try:
with self._get_connection() as conn:
c = conn.cursor()
if params:
c.execute(query, params)
else:
c.execute(query)
result = c.fetchall() if query.strip().upper().startswith('SELECT') else None
if commit:
conn.commit()
return result
except sqlite3.OperationalError as e:
if "database is locked" in str(e) and attempt < self.max_retries - 1:
time.sleep(self.retry_delay * (2 ** attempt))
continue
logger.error(f"Erro SQL (tentativa {attempt+1}): {e}")
raise
raise sqlite3.OperationalError("Query falhou após retries")
# ================================================================
# INICIALIZAÇÃO + MIGRAÇÃO AUTOMÁTICA
# ================================================================
def _init_db(self):
try:
with self._get_connection() as conn:
c = conn.cursor()
c.executescript('''
CREATE TABLE IF NOT EXISTS aprendizado (
id INTEGER PRIMARY KEY AUTOINCREMENT,
usuario TEXT,
dado TEXT,
valor TEXT
);
CREATE TABLE IF NOT EXISTS exemplos (
id INTEGER PRIMARY KEY AUTOINCREMENT,
tipo TEXT NOT NULL,
entrada TEXT NOT NULL,
resposta TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS info_geral (
chave TEXT PRIMARY KEY,
valor TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS estilos (
numero_usuario TEXT PRIMARY KEY,
estilo TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS preferencias_tom (
numero_usuario TEXT PRIMARY KEY,
tom TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS afinidades (
numero_usuario TEXT PRIMARY KEY,
afinidade REAL NOT NULL
);
CREATE TABLE IF NOT EXISTS termos (
id INTEGER PRIMARY KEY AUTOINCREMENT,
numero_usuario TEXT NOT NULL,
termo TEXT NOT NULL,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS aprendizados (
id INTEGER PRIMARY KEY AUTOINCREMENT,
numero_usuario TEXT NOT NULL,
chave TEXT NOT NULL,
valor TEXT NOT NULL,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS vocabulario_patenteado (
termo TEXT PRIMARY KEY,
definicao TEXT NOT NULL,
uso TEXT NOT NULL,
exemplo TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS usuarios_privilegiados (
numero_usuario TEXT PRIMARY KEY,
nome TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS whatsapp_ids (
id INTEGER PRIMARY KEY AUTOINCREMENT,
whatsapp_id TEXT NOT NULL,
sender_number TEXT NOT NULL,
UNIQUE (whatsapp_id, sender_number)
);
CREATE TABLE IF NOT EXISTS embeddings (
id INTEGER PRIMARY KEY AUTOINCREMENT,
texto TEXT NOT NULL,
embedding BLOB NOT NULL
);
CREATE TABLE IF NOT EXISTS mensagens (
id INTEGER PRIMARY KEY AUTOINCREMENT,
usuario TEXT NOT NULL,
mensagem TEXT NOT NULL,
resposta TEXT NOT NULL,
numero TEXT,
is_reply BOOLEAN DEFAULT 0,
mensagem_original TEXT,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS emocao_exemplos (
id INTEGER PRIMARY KEY AUTOINCREMENT,
emocao TEXT NOT NULL,
entrada TEXT NOT NULL,
resposta TEXT NOT NULL,
tom TEXT NOT NULL,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS girias_aprendidas (
id INTEGER PRIMARY KEY AUTOINCREMENT,
numero_usuario TEXT NOT NULL,
giria TEXT NOT NULL,
significado TEXT NOT NULL,
contexto TEXT,
frequencia INTEGER DEFAULT 1,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS tom_usuario (
id INTEGER PRIMARY KEY AUTOINCREMENT,
numero_usuario TEXT NOT NULL,
tom_detectado TEXT NOT NULL,
intensidade REAL DEFAULT 0.5,
contexto TEXT,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS adaptacao_dinamica (
id INTEGER PRIMARY KEY AUTOINCREMENT,
numero_usuario TEXT NOT NULL,
tipo_adaptacao TEXT NOT NULL,
valor_anterior TEXT,
valor_novo TEXT,
razao TEXT,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS pronomes_por_tom (
tom TEXT PRIMARY KEY,
pronomes TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS contexto (
user_key TEXT PRIMARY KEY,
historico TEXT,
emocao_atual TEXT,
termos TEXT,
girias TEXT,
tom TEXT
);
''')
c.executescript('''
INSERT OR IGNORE INTO pronomes_por_tom (tom, pronomes) VALUES
('formal', 'Sr., ilustre, boss, maior, homem'),
('rude', 'parvo, estúpido, burro, analfabeto, desperdício de esperma'),
('casual', 'mano, puto, cota, mwangolé, kota'),
('neutro', 'amigo, parceiro, camarada');
''')
conn.commit()
logger.info(f"Banco inicializado: {self.db_path}")
except Exception as e:
logger.error(f"Erro ao criar tabelas: {e}")
raise
def _ensure_all_columns_and_indexes(self):
try:
with self._get_connection() as conn:
c = conn.cursor()
migrations = {
'mensagens': [
("numero", "TEXT"),
("is_reply", "BOOLEAN DEFAULT 0"),
("mensagem_original", "TEXT"),
("created_at", "DATETIME DEFAULT CURRENT_TIMESTAMP")
],
'girias_aprendidas': [
("contexto", "TEXT"),
("frequencia", "INTEGER DEFAULT 1"),
("updated_at", "DATETIME DEFAULT CURRENT_TIMESTAMP")
],
'tom_usuario': [
("intensidade", "REAL DEFAULT 0.5"),
("contexto", "TEXT")
],
'contexto': [
("historico", "TEXT"),
("emocao_atual", "TEXT"),
("termos", "TEXT"),
("girias", "TEXT"),
("tom", "TEXT")
],
# CORREÇÃO: Adiciona as colunas que faltavam em 'embeddings'
'embeddings': [
("numero_usuario", "TEXT"),
("source_type", "TEXT")
]
}
for table, cols in migrations.items():
c.execute(f"PRAGMA table_info('{table}')")
existing = {row[1] for row in c.fetchall()}
for col_name, col_def in cols:
if col_name not in existing:
try:
c.execute(f"ALTER TABLE {table} ADD COLUMN {col_name} {col_def}")
logger.info(f"Coluna '{col_name}' adicionada em '{table}'")
except Exception as e:
logger.warning(f"Erro ao adicionar coluna {col_name}: {e}")
indexes = [
"CREATE INDEX IF NOT EXISTS idx_mensagens_numero ON mensagens(numero);",
"CREATE INDEX IF NOT EXISTS idx_mensagens_created ON mensagens(created_at DESC);",
"CREATE INDEX IF NOT EXISTS idx_girias_usuario ON girias_aprendidas(numero_usuario);",
"CREATE INDEX IF NOT EXISTS idx_girias_giria ON girias_aprendidas(giria);",
"CREATE INDEX IF NOT EXISTS idx_tom_usuario ON tom_usuario(numero_usuario);",
"CREATE INDEX IF NOT EXISTS idx_aprendizados_usuario ON aprendizados(numero_usuario);",
"CREATE INDEX IF NOT EXISTS idx_embeddings_texto ON embeddings(texto);",
"CREATE INDEX IF NOT EXISTS idx_pronomes_tom ON pronomes_por_tom(tom);",
"CREATE INDEX IF NOT EXISTS idx_contexto_user ON contexto(user_key);"
]
for idx in indexes:
try:
c.execute(idx)
except:
pass
conn.commit()
except Exception as e:
logger.error(f"Erro na migração/índices: {e}")
# ================================================================
# MÉTODOS PRINCIPAIS
# ================================================================
def salvar_mensagem(self, usuario, mensagem, resposta, numero=None, is_reply=False, mensagem_original=None):
try:
cols = ['usuario', 'mensagem', 'resposta']
vals = [usuario, mensagem, resposta]
if numero:
cols.append('numero')
vals.append(numero)
if is_reply is not None:
cols.append('is_reply')
vals.append(int(is_reply))
if mensagem_original:
cols.append('mensagem_original')
vals.append(mensagem_original)
placeholders = ', '.join(['?' for _ in cols])
query = f"INSERT INTO mensagens ({', '.join(cols)}) VALUES ({placeholders})"
self._execute_with_retry(query, tuple(vals), commit=True)
except Exception as e:
logger.warning(f"Fallback salvar_mensagem: {e}")
self._execute_with_retry(
"INSERT INTO mensagens (usuario, mensagem, resposta) VALUES (?, ?, ?)",
(usuario, mensagem, resposta),
commit=True
)
def recuperar_mensagens(self, usuario: str, limite: int = 5) -> List[Tuple]:
return self._execute_with_retry(
"SELECT mensagem, resposta FROM mensagens WHERE usuario=? OR numero=? ORDER BY id DESC LIMIT ?",
(usuario, usuario, limite)
) or []
# CORREÇÃO: Assinatura de 5 argumentos (self + 4) para corresponder ao erro do log
def salvar_embedding(self, numero_usuario: str, source_type: str, texto: str, embedding: bytes):
"""Compatível com paraphrase-MiniLM e numpy arrays."""
try:
if hasattr(embedding, "tobytes"):
embedding = embedding.tobytes()
# Inserindo com as novas colunas
self._execute_with_retry(
"INSERT INTO embeddings (numero_usuario, source_type, texto, embedding) VALUES (?, ?, ?, ?)",
(numero_usuario, source_type, texto, embedding),
commit=True
)
except Exception as e:
logger.warning(f"Erro ao salvar embedding (tentativa com 4 args): {e}. Tentando com 2 argumentos (texto, embedding).")
# Fallback para schema antigo, caso as colunas ainda não tenham migrado
self._execute_with_retry(
"INSERT INTO embeddings (texto, embedding) VALUES (?, ?)",
(texto, embedding.tobytes() if hasattr(embedding, "tobytes") else embedding),
commit=True
)
# ================================================================
# CONTEXTO / TOM / GÍRIAS / APRENDIZADOS
# ================================================================
# CORREÇÃO: Método adicionado para resolver o erro "'Database' object has no attribute 'salvar_contexto'"
def salvar_contexto(self, user_key: str, historico: str, emocao_atual: str, termos: str, girias: str, tom: str):
try:
self._execute_with_retry(
"""INSERT OR REPLACE INTO contexto
(user_key, historico, emocao_atual, termos, girias, tom)
VALUES (?, ?, ?, ?, ?, ?)""",
(user_key, historico, emocao_atual, termos, girias, tom),
commit=True
)
except Exception as e:
logger.error(f"Erro ao salvar contexto para {user_key}: {e}")
# CORREÇÃO: Aceita *args para ignorar o argumento extra (resolve "takes 2 positional arguments but 3 were given")
def recuperar_aprendizado_detalhado(self, numero_usuario: str, *args) -> Dict[str, str]:
# O argumento 'chave' (em *args) é ignorado aqui, pois a query busca todas as chaves
rows = self._execute_with_retry(
"SELECT chave, valor FROM aprendizados WHERE numero_usuario=?",
(numero_usuario,)
) or []
return {r[0]: r[1] for r in rows}
def recuperar_girias_usuario(self, numero_usuario: str) -> List[Dict[str, Any]]:
rows = self._execute_with_retry(
"SELECT giria, significado, contexto, frequencia FROM girias_aprendidas WHERE numero_usuario=?",
(numero_usuario,)
) or []
return [{'giria': r[0], 'significado': r[1], 'contexto': r[2], 'frequencia': r[3]} for r in rows]
def obter_tom_predominante(self, numero_usuario: str) -> Optional[str]:
rows = self._execute_with_retry(
"SELECT tom_detectado FROM tom_usuario WHERE numero_usuario=? ORDER BY created_at DESC LIMIT 1",
(numero_usuario,)
) or []
return rows[0][0] if rows else None
def registrar_tom_usuario(self, numero_usuario: str, tom_detectado: str, intensidade: float = 0.5, contexto: Optional[str] = None):
self._execute_with_retry(
"INSERT INTO tom_usuario (numero_usuario, tom_detectado, intensidade, contexto) VALUES (?, ?, ?, ?)",
(numero_usuario, tom_detectado, intensidade, contexto),
commit=True
)
def salvar_aprendizado_detalhado(self, numero_usuario: str, chave: str, valor: str):
self._execute_with_retry(
"INSERT OR REPLACE INTO aprendizados (numero_usuario, chave, valor) VALUES (?, ?, ?)",
(numero_usuario, chave, valor),
commit=True
)
def salvar_giria_aprendida(self, numero_usuario: str, giria: str, significado: str, contexto: Optional[str] = None):
existing = self._execute_with_retry(
"SELECT id, frequencia FROM girias_aprendidas WHERE numero_usuario=? AND giria=?",
(numero_usuario, giria)
)
if existing:
self._execute_with_retry(
"UPDATE girias_aprendidas SET frequencia=frequencia+1, updated_at=CURRENT_TIMESTAMP WHERE id=?",
(existing[0][0],),
commit=True
)
else:
self._execute_with_retry(
"INSERT INTO girias_aprendidas (numero_usuario, giria, significado, contexto) VALUES (?, ?, ?, ?)",
(numero_usuario, giria, significado, contexto),
commit=True
)
def salvar_info_geral(self, chave: str, valor: str):
self._execute_with_retry(
"INSERT OR REPLACE INTO info_geral (chave, valor) VALUES (?, ?)",
(chave, valor),
commit=True
)
def obter_info_geral(self, chave: str) -> Optional[str]:
result = self._execute_with_retry("SELECT valor FROM info_geral WHERE chave=?", (chave,))
return result[0][0] if result else None