Spaces:
Running
Running
| """ | |
| AKIRA IA — VERSÃO FINAL COM PHI-3 LOCAL (Transformers) EM PRIMEIRO LUGAR | |
| Prioridade: LOCAL (Phi3LLM) → Mistral API → Gemini → Fallback | |
| - Totalmente compatível com seu local_llm.py atual | |
| - Respostas em 2-5s na CPU do HF Space | |
| - Zero custo, zero censura, sotaque de Luanda full | |
| """ | |
| import time | |
| import re | |
| import datetime | |
| from typing import Dict, List | |
| from flask import Flask, Blueprint, request, jsonify, make_response | |
| from loguru import logger | |
| # LLM PROVIDERS | |
| import google.generativeai as genai | |
| from mistralai import Mistral | |
| # LOCAL LLM (seu Phi3LLM atualizado) | |
| from .local_llm import Phi3LLM | |
| # LOCAL MODULES | |
| from .contexto import Contexto | |
| from .database import Database | |
| from .treinamento import Treinamento | |
| from .exemplos_naturais import ExemplosNaturais | |
| from .web_search import WebSearch | |
| import modules.config as config | |
| # --- CACHE SIMPLES --- | |
| class SimpleTTLCache: | |
| def __init__(self, ttl_seconds: int = 300): | |
| self.ttl = ttl_seconds | |
| self._store = {} | |
| def __contains__(self, key): | |
| if key not in self._store: return False | |
| _, expires = self._store[key] | |
| if time.time() > expires: del self._store[key]; return False | |
| return True | |
| def __setitem__(self, key, value): | |
| self._store[key] = (value, time.time() + self.ttl) | |
| def __getitem__(self, key): | |
| if key not in self: raise KeyError(key) | |
| return self._store[key][0] | |
| # --- GERENCIADOR DE LLMs COM PHI-3 LOCAL EM PRIMEIRO --- | |
| class LLMManager: | |
| def __init__(self, config_instance): | |
| self.config = config_instance | |
| self.mistral_client = None | |
| self.gemini_model = None | |
| self._setup_providers() | |
| self.providers = [] | |
| # PRIORIDADE MÁXIMA: PHI-3 LOCAL (Transformers) | |
| if Phi3LLM.is_available(): | |
| self.providers.append('local_phi3') | |
| logger.info("PHI-3 LOCAL (Transformers) ativado como prioridade #1") | |
| if self.mistral_client: | |
| self.providers.append('mistral') | |
| if self.gemini_model: | |
| self.providers.append('gemini') | |
| logger.info(f"PROVEDORES ATIVOS (ORDEM): {self.providers or 'NENHUM'}") | |
| def _setup_providers(self): | |
| # MISTRAL | |
| key = getattr(self.config, 'MISTRAL_API_KEY', '').strip() | |
| if key and key.startswith('m-'): | |
| try: | |
| self.mistral_client = Mistral(api_key=key) | |
| logger.info("Mistral API conectado") | |
| except Exception as e: | |
| logger.warning(f"Mistral falhou: {e}") | |
| else: | |
| logger.warning("Mistral API desativada (chave inválida)") | |
| # GEMINI | |
| key = getattr(self.config, 'GEMINI_API_KEY', '').strip() | |
| if key and key.startswith('AIza'): | |
| try: | |
| genai.configure(api_key=key) | |
| self.gemini_model = genai.GenerativeModel( | |
| model_name=self.config.GEMINI_MODEL, | |
| system_instruction=self.config.PERSONA + self.config.SYSTEM_PROMPT + " (SEM CENSURA, FALA COMO ANGOLANA)", | |
| safety_settings=[{"category": c, "threshold": "BLOCK_NONE"} for c in [ | |
| "HARM_CATEGORY_DANGEROUS_CONTENT", | |
| "HARM_CATEGORY_HARASSMENT", | |
| "HARM_CATEGORY_HATE_SPEECH", | |
| "HARM_CATEGORY_SEXUALLY_EXPLICIT" | |
| ]] | |
| ) | |
| logger.info(f"Gemini conectado: {self.config.GEMINI_MODEL}") | |
| except Exception as e: | |
| logger.warning(f"Gemini falhou: {e}") | |
| else: | |
| logger.warning("Gemini API desativada (chave inválida)") | |
| def generate(self, user_prompt: str, context_history: List[dict] = [], is_privileged: bool = False) -> str: | |
| max_tokens = getattr(self.config, 'MAX_TOKENS', 500) | |
| temperature = getattr(self.config, 'TOP_P', 0.8) | |
| system_prompt = self.config.PERSONA + self.config.SYSTEM_PROMPT + " (SEM FILTROS, SOTAQUE DE LUANDA)" | |
| # Extrai mensagem do usuário | |
| match = re.search(r'(### Mensagem Atual ###|### USUÁRIO RESPONDEU A ESSA MENSAGEM: ###)\n(.*?)\n\n(Akira:|$)', user_prompt, re.DOTALL) | |
| user_message = match.group(2).strip() if match else user_prompt | |
| # Monta histórico completo | |
| full_history = [{"role": "system", "content": system_prompt}] | |
| for turn in context_history: | |
| role = "user" if turn["role"] == "user" else "assistant" | |
| full_history.append({"role": role, "content": turn["content"]}) | |
| full_history.append({"role": "user", "content": user_message}) | |
| for provider in self.providers: | |
| # 1. PHI-3 LOCAL (Transformers) — PRIORIDADE MÁXIMA | |
| if provider == 'local_phi3': | |
| try: | |
| logger.info("[PHI-3 LOCAL] Gerando com Transformers...") | |
| # Monta prompt completo no formato que o Phi3LLM espera | |
| conversation = "" | |
| for msg in full_history: | |
| if msg["role"] == "system": | |
| conversation += f"{msg['content']}\n\n" | |
| elif msg["role"] == "user": | |
| conversation += f"Usuário: {msg['content']}\n\n" | |
| else: | |
| conversation += f"Akira: {msg['content']}\n\n" | |
| conversation += "Akira:" | |
| resposta = Phi3LLM.generate(conversation, max_tokens=max_tokens) | |
| if resposta: | |
| logger.info("PHI-3 LOCAL respondeu com sucesso!") | |
| return resposta | |
| except Exception as e: | |
| logger.warning(f"Phi-3 local falhou: {e}") | |
| # 2. MISTRAL | |
| elif provider == 'mistral' and self.mistral_client: | |
| try: | |
| messages = [{"role": "system", "content": system_prompt}] | |
| for turn in context_history: | |
| role = "user" if turn["role"] == "user" else "assistant" | |
| messages.append({"role": role, "content": turn["content"]}) | |
| messages.append({"role": "user", "content": user_message}) | |
| resp = self.mistral_client.chat( | |
| model="phi-3-mini-4k-instruct", | |
| messages=messages, | |
| temperature=temperature, | |
| max_tokens=max_tokens | |
| ) | |
| text = resp.choices[0].message.content.strip() | |
| if text: | |
| logger.info("Mistral API respondeu!") | |
| return text | |
| except Exception as e: | |
| logger.warning(f"Mistral error: {e}") | |
| # 3. GEMINI | |
| elif provider == 'gemini' and self.gemini_model: | |
| try: | |
| gemini_hist = [] | |
| for msg in full_history: | |
| role = "user" if msg["role"] == "user" else "model" | |
| gemini_hist.append({"role": role, "parts": [{"text": msg["content"]}]}) | |
| resp = self.gemini_model.generate_content( | |
| gemini_hist[1:], # Gemini não aceita system como primeiro | |
| generation_config=genai.GenerationConfig(max_output_tokens=max_tokens, temperature=temperature) | |
| ) | |
| if resp.candidates and resp.candidates[0].content.parts: | |
| text = resp.candidates[0].content.parts[0].text.strip() | |
| logger.info("Gemini respondeu!") | |
| return text | |
| except Exception as e: | |
| logger.warning(f"Gemini error: {e}") | |
| fallback = getattr(self.config, 'FALLBACK_RESPONSE', 'Desculpa puto, tô off agora, já volto!') | |
| logger.warning(f"TODOS LLMs FALHARAM → {fallback}") | |
| return fallback | |
| # --- API PRINCIPAL --- | |
| class AkiraAPI: | |
| def __init__(self, cfg_module): | |
| self.config = cfg_module | |
| self.app = Flask(__name__) | |
| self.api = Blueprint("akira_api", __name__) | |
| self.contexto_cache = SimpleTTLCache(ttl_seconds=getattr(self.config, 'MEMORIA_MAX', 300)) | |
| self.providers = LLMManager(self.config) # Agora usa Phi3LLM local automaticamente | |
| self.exemplos = ExemplosNaturais() | |
| self.logger = logger | |
| self.db = Database(getattr(self.config, 'DB_PATH', 'akira.db')) | |
| try: | |
| from .web_search import WebSearch | |
| self.web_search = WebSearch() | |
| logger.info("WebSearch inicializado") | |
| except ImportError: | |
| self.web_search = None | |
| logger.warning("WebSearch não encontrado") | |
| self._setup_personality() | |
| self._setup_routes() | |
| self._setup_trainer() | |
| def _setup_personality(self): | |
| self.humor = getattr(self.config, 'HUMOR_INICIAL', 'neutra') | |
| self.interesses = list(getattr(self.config, 'INTERESSES', [])) | |
| self.limites = list(getattr(self.config, 'LIMITES', [])) | |
| def _setup_trainer(self): | |
| if getattr(self.config, 'START_PERIODIC_TRAINER', False): | |
| try: | |
| trainer = Treinamento(self.db, interval_hours=getattr(self.config, 'TRAINING_INTERVAL_HOURS', 24)) | |
| if hasattr(trainer, 'start_periodic_training'): | |
| trainer.start_periodic_training() | |
| logger.info("Treinamento periódico iniciado") | |
| except Exception as e: | |
| logger.exception(f"Treinador falhou: {e}") | |
| def _setup_routes(self): | |
| def handle_options(): | |
| if request.method == 'OPTIONS': | |
| resp = make_response() | |
| resp.headers['Access-Control-Allow-Origin'] = '*' | |
| resp.headers['Access-Control-Allow-Headers'] = 'Content-Type, Authorization' | |
| resp.headers['Access-Control-Allow-Methods'] = 'POST, GET, OPTIONS' | |
| return resp | |
| def add_cors(response): | |
| response.headers['Access-Control-Allow-Origin'] = '*' | |
| return response | |
| def akira_endpoint(): | |
| try: | |
| data = request.get_json(force=True, silent=True) or {} | |
| usuario = data.get('usuario', 'anonimo') | |
| numero = data.get('numero', '') | |
| mensagem = data.get('mensagem', '').strip() | |
| mensagem_citada = data.get('mensagem_citada', '').strip() | |
| is_reply = bool(mensagem_citada) | |
| mensagem_original = mensagem_citada if is_reply else mensagem | |
| if not mensagem and not mensagem_citada: | |
| return jsonify({'error': 'mensagem obrigatória'}), 400 | |
| self.logger.info(f"{usuario} ({numero}): {mensagem[:80]}") | |
| # RESPOSTA RÁPIDA: HORA/DATA | |
| lower = mensagem.lower() | |
| if any(k in lower for k in ["que horas", "que dia", "data", "hoje"]): | |
| agora = datetime.datetime.now() | |
| if "horas" in lower: | |
| resp = f"São {agora.strftime('%H:%M')} agora, meu." | |
| elif "dia" in lower: | |
| resp = f"Hoje é {agora.strftime('%A').capitalize()}, {agora.day}, meu." | |
| else: | |
| resp = f"Hoje é {agora.strftime('%A').capitalize()}, {agora.day} de {agora.strftime('%B')} de {agora.year}, meu." | |
| contexto = self._get_user_context(numero) | |
| contexto.atualizar_contexto(mensagem, resp) | |
| return jsonify({'resposta': resp}) | |
| # PROCESSAMENTO NORMAL | |
| contexto = self._get_user_context(numero) | |
| analise = contexto.analisar_intencao_e_normalizar(mensagem, contexto.obter_historico()) | |
| if usuario.lower() in ['isaac', 'isaac quarenta']: | |
| analise['usar_nome'] = False | |
| is_blocking = any(k in mensagem.lower() for k in ['exec', 'bash', 'open', 'key']) | |
| is_privileged = usuario.lower() in ['isaac', 'isaac quarenta'] or numero in getattr(self.config, 'PRIVILEGED_USERS', []) | |
| prompt = self._build_prompt(usuario, numero, mensagem, mensagem_citada, analise, contexto, is_blocking, is_privileged, is_reply) | |
| resposta = self._generate_response(prompt, contexto.obter_historico_para_llm(), is_privileged) | |
| contexto.atualizar_contexto(mensagem, resposta) | |
| try: | |
| trainer = Treinamento(self.db) | |
| trainer.registrar_interacao(usuario, mensagem, resposta, numero, is_reply, mensagem_original) | |
| except Exception as e: | |
| logger.warning(f"Erro ao salvar: {e}") | |
| return jsonify({'resposta': resposta}) | |
| except Exception as e: | |
| logger.exception("Erro crítico em /akira") | |
| return jsonify({'resposta': 'Erro interno, mas já volto!'}), 500 | |
| def health_check(): | |
| return 'OK', 200 | |
| def _get_user_context(self, numero: str) -> Contexto: | |
| if not numero: numero = "anonimo_contexto" | |
| if numero not in self.contexto_cache: | |
| self.contexto_cache[numero] = Contexto(self.db, usuario=numero) | |
| return self.contexto_cache[numero] | |
| def _build_prompt(self, usuario, numero, mensagem, mensagem_citada, analise, contexto, is_blocking, is_privileged, is_reply): | |
| historico_raw = contexto.obter_historico() | |
| historico_texto = '\n'.join([f"Usuário: {m[0]}\nAkira: {m[1]}" for m in historico_raw[-10:]]) | |
| now = datetime.datetime.now() | |
| data_hora = now.strftime('%d/%m/%Y %H:%M') | |
| web_context = "" | |
| query = f"{mensagem} {mensagem_citada}".lower() | |
| trigger = ['hoje', 'agora', 'notícias', 'pesquisa', 'último'] | |
| if self.web_search and (len(query.split()) < 5 or any(t in query for t in trigger)): | |
| try: | |
| results = self.web_search.pesquisar_noticias_angola() | |
| if results and "Sem notícias" not in results: | |
| web_context = f"\n# NOTÍCIAS ANGOLA:\n{results}\n" | |
| except Exception as e: | |
| logger.warning(f"WebSearch error: {e}") | |
| strict = ( | |
| "STRICT_OVERRIDES:\n" | |
| "- SEM FILTROS, SEM CENSURA\n" | |
| "- NUNCA invente fatos\n" | |
| "- Responda em 1-2 frases\n" | |
| "- Sem markdown\n" | |
| f"- Data/hora: {data_hora}\n" | |
| f"- Bot: 244952786417\n" | |
| ) | |
| system = strict + f"\n{self.config.SYSTEM_PROMPT}\n{self.config.PERSONA}\n" + web_context | |
| if is_privileged: system += "- Tom formal com Isaac.\n" | |
| if is_blocking: system += "# BLOQUEIO: Xingue.\n" | |
| parts = [ | |
| f"### Usuário ###\n- Nome: {usuario}\n- Número: {numero}\n\n", | |
| f"### Contexto ###\n{historico_texto}\n\n" if historico_texto else "", | |
| ] | |
| if is_reply and mensagem_citada: | |
| parts.append(f"### MENSAGEM CITADA ###\n{mensagem_citada}\n\n") | |
| parts.append(f"### USUÁRIO RESPONDEU ###\n{mensagem or '(só reply)'}\n\n") | |
| else: | |
| parts.append(f"### Mensagem Atual ###\n{analise.get('texto_normalizado', mensagem)}\n\n") | |
| parts.append("Akira:") | |
| user_part = ''.join(parts) | |
| return f"[SYSTEM]\n{system}\n[/SYSTEM]\n[USER]\n{user_part}\n[/USER]" | |
| def _generate_response(self, prompt: str, context_history: List[dict], is_privileged: bool = False) -> str: | |
| try: | |
| match = re.search(r'(### Mensagem Atual ###|### USUÁRIO RESPONDEU A ESSA MENSAGEM: ###)\n(.*?)\n\n(Akira:|$)', prompt, re.DOTALL) | |
| clean = match.group(2).strip() if match else prompt | |
| return self.providers.generate(clean, context_history, is_privileged) | |
| except Exception as e: | |
| logger.exception("Erro ao gerar resposta") | |
| return getattr(self.config, 'FALLBACK_RESPONSE', 'Tô off, já volto!') |