akira / modules /database.py
akra35567's picture
Update modules/database.py
280d1af
raw
history blame
13.9 kB
"""
Banco de dados SQLite para Akira IA.
Gerencia contexto, mensagens, embeddings, gírias, tom e aprendizados detalhados.
Versão 11/2025 - compatível com treinamento.py (SentenceTransformers)
"""
import sqlite3
import time
import os
from typing import Optional, List, Dict, Any, Tuple
from loguru import logger
class Database:
def __init__(self, db_path: str):
self.db_path = db_path
self.max_retries = 5
self.retry_delay = 0.1
os.makedirs(os.path.dirname(db_path), exist_ok=True)
self._init_db()
self._ensure_all_columns_and_indexes()
# ================================================================
# CONEXÃO + RETRY
# ================================================================
def _get_connection(self) -> sqlite3.Connection:
for attempt in range(self.max_retries):
try:
conn = sqlite3.connect(self.db_path, timeout=30.0, check_same_thread=False)
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA synchronous=NORMAL")
conn.execute("PRAGMA cache_size=1000")
conn.execute("PRAGMA temp_store=MEMORY")
conn.execute("PRAGMA busy_timeout=30000")
conn.execute("PRAGMA foreign_keys=ON")
return conn
except sqlite3.OperationalError as e:
if "locked" in str(e) and attempt < self.max_retries - 1:
time.sleep(self.retry_delay * (2 ** attempt))
continue
logger.error(f"Erro de conexão DB: {e}")
raise
raise sqlite3.OperationalError("Falha ao conectar ao banco após várias tentativas")
def _execute_with_retry(self, query: str, params: Optional[tuple] = None, commit: bool = False):
for attempt in range(self.max_retries):
try:
with self._get_connection() as conn:
cur = conn.cursor()
cur.execute(query, params or ())
result = cur.fetchall() if query.strip().upper().startswith("SELECT") else None
if commit:
conn.commit()
return result
except sqlite3.OperationalError as e:
if "locked" in str(e) and attempt < self.max_retries - 1:
time.sleep(self.retry_delay * (2 ** attempt))
continue
logger.error(f"Erro SQL: {e}")
raise
raise sqlite3.OperationalError("Query falhou após retries")
# ================================================================
# SCHEMA + MIGRAÇÃO
# ================================================================
def _init_db(self):
try:
with self._get_connection() as conn:
c = conn.cursor()
c.executescript("""
CREATE TABLE IF NOT EXISTS mensagens (
id INTEGER PRIMARY KEY AUTOINCREMENT,
usuario TEXT,
mensagem TEXT,
resposta TEXT,
numero TEXT,
is_reply BOOLEAN DEFAULT 0,
mensagem_original TEXT,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS embeddings (
id INTEGER PRIMARY KEY AUTOINCREMENT,
numero_usuario TEXT,
source_type TEXT,
texto TEXT,
embedding BLOB
);
CREATE TABLE IF NOT EXISTS aprendizados (
id INTEGER PRIMARY KEY AUTOINCREMENT,
numero_usuario TEXT,
chave TEXT,
valor TEXT,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS girias_aprendidas (
id INTEGER PRIMARY KEY AUTOINCREMENT,
numero_usuario TEXT,
giria TEXT,
significado TEXT,
contexto TEXT,
frequencia INTEGER DEFAULT 1,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS tom_usuario (
id INTEGER PRIMARY KEY AUTOINCREMENT,
numero_usuario TEXT,
tom_detectado TEXT,
intensidade REAL DEFAULT 0.5,
contexto TEXT,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS contexto (
user_key TEXT PRIMARY KEY,
historico TEXT,
emocao_atual TEXT,
termos TEXT,
girias TEXT,
tom TEXT
);
CREATE TABLE IF NOT EXISTS pronomes_por_tom (
tom TEXT PRIMARY KEY,
pronomes TEXT
);
""");
# Dados padrão
c.execute("INSERT OR IGNORE INTO pronomes_por_tom (tom, pronomes) VALUES (?, ?)", ('neutro', 'tu/você'))
c.execute("INSERT OR IGNORE INTO pronomes_por_tom (tom, pronomes) VALUES (?, ?)", ('formal', 'o senhor/a senhora'))
c.execute("INSERT OR IGNORE INTO pronomes_por_tom (tom, pronomes) VALUES (?, ?)", ('informal', 'puto/kota'))
conn.commit()
logger.info(f"Banco inicializado: {self.db_path}")
except Exception as e:
logger.error(f"Erro ao criar tabelas: {e}")
raise
def _ensure_all_columns_and_indexes(self):
try:
with self._get_connection() as conn:
c = conn.cursor()
migrations = {
'embeddings': [
("numero_usuario", "TEXT"),
("source_type", "TEXT")
],
'mensagens': [
("numero", "TEXT"),
("is_reply", "BOOLEAN DEFAULT 0"),
("mensagem_original", "TEXT"),
("created_at", "DATETIME DEFAULT CURRENT_TIMESTAMP")
],
'tom_usuario': [
("intensidade", "REAL DEFAULT 0.5"),
("contexto", "TEXT")
],
'contexto': [
("historico", "TEXT"),
("emocao_atual", "TEXT"),
("termos", "TEXT"),
("girias", "TEXT"),
("tom", "TEXT")
]
}
for table, cols in migrations.items():
c.execute(f"PRAGMA table_info('{table}')")
existing = {row[1] for row in c.fetchall()}
for col_name, col_def in cols:
if col_name not in existing:
try:
c.execute(f"ALTER TABLE {table} ADD COLUMN {col_name} {col_def}")
logger.info(f"Coluna '{col_name}' adicionada em '{table}'")
except Exception as e:
logger.warning(f"Erro ao adicionar coluna {col_name}: {e}")
conn.commit()
except Exception as e:
logger.error(f"Erro na migração: {e}")
# ================================================================
# FUNÇÕES PRINCIPAIS
# ================================================================
def salvar_mensagem(self, usuario, mensagem, resposta, numero=None, is_reply=False, mensagem_original=None):
try:
cols = ['usuario', 'mensagem', 'resposta']
vals = [usuario, mensagem, resposta]
if numero:
cols.append('numero')
vals.append(numero)
if is_reply is not None:
cols.append('is_reply')
vals.append(int(is_reply))
if mensagem_original:
cols.append('mensagem_original')
vals.append(mensagem_original)
placeholders = ', '.join(['?' for _ in cols])
query = f"INSERT INTO mensagens ({', '.join(cols)}) VALUES ({placeholders})"
self._execute_with_retry(query, tuple(vals), commit=True)
except Exception as e:
logger.warning(f"Erro salvar_mensagem: {e}")
self._execute_with_retry(
"INSERT INTO mensagens (usuario, mensagem, resposta) VALUES (?, ?, ?)",
(usuario, mensagem, resposta),
commit=True
)
def recuperar_mensagens(self, usuario: str, limite: int = 5):
return self._execute_with_retry(
"SELECT mensagem, resposta FROM mensagens WHERE usuario=? OR numero=? ORDER BY id DESC LIMIT ?",
(usuario, usuario, limite)
) or []
def salvar_embedding(self, numero_usuario: str, source_type: str, texto: str, embedding):
"""Compatível com paraphrase-MiniLM e numpy arrays."""
try:
if hasattr(embedding, "tobytes"):
embedding = embedding.tobytes()
self._execute_with_retry(
"INSERT INTO embeddings (numero_usuario, source_type, texto, embedding) VALUES (?, ?, ?, ?)",
(numero_usuario, source_type, texto, embedding),
commit=True
)
except Exception as e:
logger.warning(f"Erro salvar_embedding: {e}")
self._execute_with_retry(
"INSERT INTO embeddings (texto, embedding) VALUES (?, ?)",
(texto, embedding.tobytes() if hasattr(embedding, "tobytes") else embedding),
commit=True
)
# ================================================================
# CONTEXTO / TOM / GÍRIAS / APRENDIZADOS
# ================================================================
def salvar_contexto(self, user_key, historico, emocao_atual, termos, girias, tom):
self._execute_with_retry(
"""INSERT OR REPLACE INTO contexto
(user_key, historico, emocao_atual, termos, girias, tom)
VALUES (?, ?, ?, ?, ?, ?)""",
(user_key, historico, emocao_atual, termos, girias, tom),
commit=True
)
def registrar_tom_usuario(self, numero_usuario, tom_detectado, intensidade=0.5, contexto=None):
self._execute_with_retry(
"INSERT INTO tom_usuario (numero_usuario, tom_detectado, intensidade, contexto) VALUES (?, ?, ?, ?)",
(numero_usuario, tom_detectado, intensidade, contexto),
commit=True
)
def obter_tom_predominante(self, numero_usuario):
rows = self._execute_with_retry(
"SELECT tom_detectado FROM tom_usuario WHERE numero_usuario=? ORDER BY created_at DESC LIMIT 1",
(numero_usuario,)
)
return rows[0][0] if rows else None
def salvar_aprendizado_detalhado(self, numero_usuario, chave, valor):
self._execute_with_retry(
"INSERT INTO aprendizados (numero_usuario, chave, valor) VALUES (?, ?, ?)",
(numero_usuario, chave, valor),
commit=True
)
def recuperar_aprendizado_detalhado(self, numero_usuario, *args):
rows = self._execute_with_retry(
"SELECT chave, valor FROM aprendizados WHERE numero_usuario=?",
(numero_usuario,)
)
return {r[0]: r[1] for r in rows} if rows else {}
def salvar_giria_aprendida(self, numero_usuario, giria, significado, contexto=None):
existing = self._execute_with_retry(
"SELECT id FROM girias_aprendidas WHERE numero_usuario=? AND giria=?",
(numero_usuario, giria)
)
if existing:
self._execute_with_retry(
"UPDATE girias_aprendidas SET frequencia=frequencia+1, updated_at=CURRENT_TIMESTAMP WHERE id=?",
(existing[0][0],),
commit=True
)
else:
self._execute_with_retry(
"INSERT INTO girias_aprendidas (numero_usuario, giria, significado, contexto) VALUES (?, ?, ?, ?)",
(numero_usuario, giria, significado, contexto),
commit=True
)
# ================================================================
# MÉTODO ADICIONADO: recuperar_girias_usuario
# ================================================================
def recuperar_girias_usuario(self, numero_usuario: str) -> List[Dict[str, Any]]:
"""
Recupera todas as gírias aprendidas para um usuário.
Retorna: [{'giria': 'baza', 'significado': 'ir embora', 'frequencia': 5}, ...]
"""
try:
rows = self._execute_with_retry(
"""
SELECT giria, significado, frequencia, contexto
FROM girias_aprendidas
WHERE numero_usuario = ?
ORDER BY frequencia DESC, updated_at DESC
""",
(numero_usuario,)
)
return [
{
'giria': row[0],
'significado': row[1],
'frequencia': row[2],
'contexto': row[3]
} for row in rows
] if rows else []
except Exception as e:
logger.warning(f"Falha ao carregar gírias do usuário {numero_usuario}: {e}")
return []