# sql_barcelona_agent / main_agent.py
# fredcaixeta
# go
# 9b523a6
import asyncio
import os
from typing import List, Dict, Any, Optional
from pathlib import Path
from functools import partial
from dotenv import load_dotenv
from psycopg2 import pool
from psycopg2.extras import RealDictCursor
load_dotenv()
from pydantic_ai import Agent, RunContext
from pydantic_ai.providers.groq import GroqProvider
from pydantic_ai.models.groq import GroqModel
from pydantic_ai.messages import PartDeltaEvent, TextPartDelta
from pydantic_graph import End
from pydantic import BaseModel
# PROMPTS
from prompts import matches_prompt, players_prompt, player_matchid_prompt, graph_agent_prompt
MATCHES_SYSTEM_PROMPT = matches_prompt.MATCHES_SYSTEM_PROMPT
PLAYERS_SYSTEM_PROMPT = players_prompt.PLAYERS_SYSTEM_PROMPT
PLAYER_MATCHID_SYSTEM_PROMPT = player_matchid_prompt.SYSTEM_PROMPT
GRAPH_AGENT_SYSTEM_PROMPT = graph_agent_prompt.GRAPH_AGENT_SYSTEM_PROMPT # Ajustado
import pandas as pd
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import io
from PIL import Image, ImageDraw, ImageFont
from dataclasses import dataclass
SUPABASE_DB_URI = os.getenv("SUPABASE_DB_URI")
# ========================= SupabaseConnection (migrada do MCP) =========================
class SupabaseConnection:
    """Thin wrapper around a psycopg2 ``ThreadedConnectionPool``.

    Queries run synchronously on a pooled connection;
    ``execute_query_async`` hands that blocking work to the event loop's
    default executor so the calling coroutine is not blocked.
    """

    def __init__(self, dsn: str):
        """Create the pool (1 to 10 connections) and run ``SELECT 1`` once.

        Args:
            dsn: Connection string handed to the psycopg2 pool.
        """
        self.pool = pool.ThreadedConnectionPool(minconn=1, maxconn=10, dsn=dsn)
        print("✓ Pool Supabase criado")
        # Connectivity check: fail at construction time rather than on the
        # first real query.
        conn = self.pool.getconn()
        try:
            with conn.cursor() as cur:
                cur.execute("SELECT 1")
        finally:
            self.pool.putconn(conn)

    async def execute_query_async(self, query: str, parameters: Optional[tuple] = None):
        """Run ``_execute_query_sync`` in the default executor and await it.

        Args:
            query: SQL text, optionally with ``%s`` placeholders.
            parameters: Values bound to the placeholders, if any.

        Returns:
            Whatever ``_execute_query_sync`` returns (a list of row dicts).

        Raises:
            Exception: Propagated unchanged from ``_execute_query_sync``.
        """
        # get_running_loop() is the recommended call inside a coroutine;
        # get_event_loop() has policy-dependent behaviour.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None, partial(self._execute_query_sync, query, parameters)
        )

    def _execute_query_sync(self, query: str, parameters: Optional[tuple] = None):
        """Execute *query* on a pooled connection and return rows as dicts.

        Args:
            query: SQL text, optionally with ``%s`` placeholders.
            parameters: Values bound to the placeholders; when falsy the
                query is executed without parameter binding.

        Returns:
            A list with one ``dict`` per row, or ``[]`` when the statement
            produced no result set.

        Raises:
            Exception: With message ``"Erro query: <detail>"``; the original
                error is attached as ``__cause__``.
        """
        conn = None
        try:
            conn = self.pool.getconn()
            conn.autocommit = True
            with conn.cursor(cursor_factory=RealDictCursor) as cur:
                if parameters:
                    cur.execute(query, parameters)
                else:
                    cur.execute(query)
                # description is None for statements that return no rows.
                if cur.description:
                    return [dict(row) for row in cur.fetchall()]
                return []
        except Exception as e:
            # Chain the original error so its type and traceback stay visible.
            raise Exception(f"Erro query: {str(e)}") from e
        finally:
            # conn stays None when getconn() itself failed.
            if conn is not None:
                self.pool.putconn(conn)

    def close(self):
        """Close every pooled connection; calling it again is a no-op."""
        active_pool = getattr(self, 'pool', None)
        if active_pool:
            active_pool.closeall()
            # Drop the reference so a second close() does not call
            # closeall() on an already-closed pool.
            self.pool = None
            print("✓ Pool fechado")
# Module-level connection used by the tool functions in this file.
# NOTE: constructing it creates the pool and runs a test query at import time.
db = SupabaseConnection(SUPABASE_DB_URI)
# ========================= Agents (sem MCP) =========================
class Deps(BaseModel):
    """Pydantic model with a single string field, ``ai_query``.

    NOTE(review): no agent or tool visible in this section references this
    class - confirm it is still needed.
    """
    ai_query: str
# Groq credentials and models, read from the environment.
# Only `groq_model` is wired into an agent in this section; `groq_model_2` is
# constructed but not used by any code visible here.
api_key = os.getenv("GROQ_DEV_API_KEY")
groq_model = GroqModel("moonshotai/kimi-k2-instruct-0905", provider=GroqProvider(api_key=api_key))
api_key_2 = os.getenv("GROQ_DEV_API_KEY_2")
groq_model_2 = GroqModel("openai/gpt-oss-20b", provider=GroqProvider(api_key=api_key_2))
# Agent on which the tool functions below are registered via `tool_plain()`.
one_player_agent = Agent(
    model=groq_model,
    system_prompt=PLAYER_MATCHID_SYSTEM_PROMPT
)
# ========================= TOOLS DIRETAS no one_player_agent =========================
@one_player_agent.tool_plain()
async def execute_sql_query(query: str, limit: int = 100) -> str:
    """Run a read-only SQL query and return a short text preview of the rows.

    Statements containing a data-modifying keyword are rejected. When the
    statement has no LIMIT clause, ``LIMIT <limit>`` is appended. Only the
    first 10 rows are rendered; the total number of fetched rows is reported.

    Args:
        query: SQL text to execute (expected to be a SELECT).
        limit: Row cap appended when the query does not set its own LIMIT.

    Returns:
        A human-readable summary, or a message starting with "❌" when the
        query is rejected or fails.
    """
    import re  # local import: `re` is not among this module's top-level imports

    # A trailing semicolon would otherwise sit *before* the appended LIMIT
    # clause and turn the statement into a syntax error.
    statement = query.strip().rstrip("; \t\r\n")
    # Compare whole identifiers instead of substrings so column names such as
    # `created_at` or `updated_at` are not mistaken for CREATE / UPDATE.
    words = set(re.findall(r"[A-Z_][A-Z0-9_]*", statement.upper()))
    forbidden = {'DELETE', 'DROP', 'INSERT', 'UPDATE', 'ALTER', 'CREATE', 'TRUNCATE'}
    # NOTE: a keyword deny-list is best-effort only; the database role used by
    # this tool should itself be restricted to read-only access.
    if words & forbidden:
        return "❌ Apenas SELECT permitidas."
    try:
        if 'LIMIT' not in words:
            # int() guarantees that only a number is interpolated into the SQL.
            statement += f" LIMIT {int(limit)}"
        results = await db.execute_query_async(statement)
        if not results:
            return "✓ Query OK, sem resultados."
        lines = [f"📊 {len(results)} registros:\n"]
        for i, record in enumerate(results[:10], 1):
            items = [f"{k}={v}" for k, v in record.items()]
            lines.append(f"{i}. {', '.join(items)}")
        if len(results) > 10:
            lines.append(f"\n... +{len(results)-10}")
        return "\n".join(lines)
    except Exception as e:
        return f"❌ Erro: {str(e)}"
@one_player_agent.tool_plain()
async def get_player_match_history(player_name: str, limit: int = 10) -> str:
    """List a player's most recent match stat lines, newest first.

    Rows come from ``player_match_stats`` and are matched with a
    case-insensitive substring search on ``player_nickname``.

    Args:
        player_name: Full or partial player nickname.
        limit: Maximum number of matches to return.

    Returns:
        One formatted line per match, or a message starting with "❌" when
        nothing matches or the query fails.
    """
    query = """
        SELECT match_date, opponent, home_away, minutes_played, goals, assists, shots, xg, pass_completion_pct, player_nickname
        FROM player_match_stats WHERE player_nickname ILIKE %s ORDER BY match_date DESC LIMIT %s
    """
    try:
        results = await db.execute_query_async(query, (f'%{player_name}%', limit))
        if not results:
            return f"❌ Nenhum dado para '{player_name}'"
        out = [f"📊 HISTÓRICO - {player_name.upper()}\n"]
        for r in results:
            # NULL xg / pass percentage are rendered as zero.
            xg = f"{(r.get('xg') or 0):.2f}"
            pcp = f"{(r.get('pass_completion_pct') or 0):.1f}"
            out.append(f"📅 {r['match_date']} vs {r['opponent']} ({r['home_away']}) - {r['minutes_played']}min | ⚽{r['goals']}G 🤝{r['assists']}A | 🎯{r['shots']} (xG:{xg}) | 📈{pcp}%")
        return "\n".join(out)
    except Exception as e:
        return f"❌ Erro: {str(e)}"
@one_player_agent.tool_plain()
async def get_match_performances(match_date: Optional[str] = None, opponent: Optional[str] = None, limit: int = 15) -> str:
    """Rank player performances for a match selected by date and/or opponent.

    Rows come from ``player_match_stats`` and are ordered by goals plus
    assists, then xG, both descending.

    Args:
        match_date: Exact match date to filter on.
        opponent: Full or partial opponent name (case-insensitive).
        limit: Maximum number of player rows to return.

    Returns:
        One formatted line per player, or a message starting with "❌" when
        no filter is given, nothing matches, or the query fails.
    """
    if not match_date and not opponent:
        return "❌ Forneça match_date OU opponent"
    where_clauses, params = [], []
    if match_date:
        where_clauses.append("match_date = %s")
        params.append(match_date)
    if opponent:
        where_clauses.append("opponent ILIKE %s")
        params.append(f'%{opponent}%')
    where_sql = " AND ".join(where_clauses)
    params.append(limit)
    # Only fixed clause text is interpolated; every value is bound via %s.
    # NULLS LAST: PostgreSQL sorts NULLs first under DESC, which would put
    # rows with missing stats at the top of the ranking.
    query = f"""
        SELECT player_nickname, minutes_played, goals, assists, (goals + assists) as contributions, shots, xg, pass_completion_pct, touches
        FROM player_match_stats WHERE {where_sql}
        ORDER BY (goals + assists) DESC NULLS LAST, xg DESC NULLS LAST LIMIT %s
    """
    try:
        results = await db.execute_query_async(query, tuple(params))
        if not results:
            return "❌ Partida não encontrada"
        info = f"{match_date or ''} vs {opponent or ''}".strip()
        out = [f"🏟️ PERFORMANCES - {info}\n"]
        for i, r in enumerate(results, 1):
            xg = f"{(r.get('xg') or 0):.2f}"
            pcp = f"{(r.get('pass_completion_pct') or 0):.1f}"
            # `or 0` (not a .get default) so a NULL column prints 0, matching
            # how xg and pass percentage are handled.
            touches = r.get('touches') or 0
            out.append(f"{i}. {r['player_nickname']} ({r['minutes_played']}min): ⚽{r['goals']}G + 🤝{r['assists']}A = {r['contributions']} | 🎯{r['shots']} (xG:{xg}) | 📈{pcp}% | 👟{touches}")
        return "\n".join(out)
    except Exception as e:
        return f"❌ Erro: {str(e)}"
@one_player_agent.tool_plain()
async def show_available_tables() -> str:
    """List the tables in the database's ``public`` schema, sorted by name.

    Returns:
        A bulleted list of table names, or a message starting with "❌" when
        no table is found or the query fails.
    """
    # Table names are read from information_schema.
    query = """
        SELECT table_name
        FROM information_schema.tables
        WHERE table_schema = 'public'
        ORDER BY table_name
    """
    try:
        results = await db.execute_query_async(query)
        if not results:
            return "❌ Nenhuma tabela encontrada no schema public."
        return "📚 TABELAS:\n" + "\n".join([f"- {r['table_name']}" for r in results])
    except Exception as e:
        return f"❌ Erro: {str(e)}"
@one_player_agent.tool_plain()
async def get_goal_events(player_scorer: Optional[str] = None,
                          assist_from: Optional[str] = None,
                          opponent: Optional[str] = None,
                          only_crosses: bool = False,
                          only_high_crosses: bool = False,
                          limit: int = 50) -> str:
    """List Barcelona goal events, newest first, with optional filters.

    All text filters are case-insensitive substring matches; filters that
    are omitted are not applied.

    Args:
        player_scorer: Full or partial nickname of the goal scorer.
        assist_from: Full or partial nickname of the player who made the pass.
        opponent: Full or partial opponent name.
        only_crosses: Keep only goals flagged ``is_cross_pass``.
        only_high_crosses: Keep only goals flagged ``is_cross_pass_high``.
        limit: Maximum number of goal events to return.

    Returns:
        One formatted line per goal, or a message starting with "❌" when
        nothing matches or the query fails.
    """
    # IMPORTANT: replace player_goals_stats with the REAL name of your table.
    table = "player_goals_stats"
    where = ["team = 'Barcelona'"]
    params = []
    if player_scorer:
        where.append("player_scorer_nick ILIKE %s")
        params.append(f"%{player_scorer}%")
    if assist_from:
        where.append("pass_from_nick ILIKE %s")
        params.append(f"%{assist_from}%")
    if opponent:
        where.append("opponent ILIKE %s")
        params.append(f"%{opponent}%")
    if only_crosses:
        where.append("is_cross_pass = TRUE")
    if only_high_crosses:
        where.append("is_cross_pass_high = TRUE")
    params.append(limit)
    where_sql = " AND ".join(where)
    # Only the fixed table name and clause text are interpolated; every
    # caller-supplied value is bound through %s placeholders.
    query = f"""
        SELECT match_date, opponent, home_away,
               minute, second,
               player_scorer_nick, pass_from_nick,
               is_cross_pass, is_cross_pass_high,
               play_pattern, shot_type, shot_body_part,
               xg
        FROM {table}
        WHERE {where_sql}
        ORDER BY match_date DESC, minute DESC, second DESC
        LIMIT %s
    """
    try:
        results = await db.execute_query_async(query, tuple(params))
        if not results:
            return "❌ Sem gols encontrados para esse filtro."
        out = ["⚽ EVENTOS DE GOL\n"]
        for i, r in enumerate(results, 1):
            out.append(
                f"{i}. {r['match_date']} vs {r['opponent']} ({r['home_away']}) "
                f"{r.get('minute')}:{r.get('second')} - "
                f"{r.get('player_scorer_nick')} (assist: {r.get('pass_from_nick')}) | "
                f"cross={r.get('is_cross_pass')} high_cross={r.get('is_cross_pass_high')} | "
                f"{r.get('shot_body_part')} | xG={r.get('xg')}"
            )
        return "\n".join(out)
    except Exception as e:
        return f"❌ Erro: {str(e)}"
@one_player_agent.tool_plain()
async def get_top_performances(metric: str = "goals", limit: int = 10) -> str:
    """Return the best single-match performances ranked by one metric.

    Args:
        metric: One of "goals", "assists", "contributions" (goals plus
            assists), "xg" or "shots".
        limit: Maximum number of performances to return.

    Returns:
        One formatted line per performance, or a message starting with "❌"
        when the metric is unknown, there is no data, or the query fails.
    """
    # Maps each public metric name to (SQL ordering expression, display label).
    valid = {
        "goals": ("goals", "Gols"),
        "assists": ("assists", "Assistências"),
        "contributions": ("goals + assists", "Contribuições"),
        "xg": ("xg", "xG"),
        "shots": ("shots", "Finalizações"),
    }
    if metric not in valid:
        return f"❌ Métricas: {', '.join(valid)}"
    metric_sql, metric_name = valid[metric]
    # metric_sql comes from the whitelist above, so interpolating it is safe.
    # NULLS LAST: PostgreSQL sorts NULLs first under DESC, which would rank
    # rows with a missing metric above every real value.
    query = f"""
        SELECT player_nickname, match_date, opponent, home_away, goals, assists, xg, shots, pass_completion_pct
        FROM player_match_stats ORDER BY {metric_sql} DESC NULLS LAST LIMIT %s
    """
    try:
        results = await db.execute_query_async(query, (limit,))
        if not results:
            return "❌ Sem dados"
        out = [f"🏆 TOP {metric_name.upper()}\n"]
        for i, r in enumerate(results, 1):
            xg = f"{(r.get('xg') or 0):.2f}"
            out.append(f"{i}. {r['player_nickname']} - {r['match_date']} vs {r['opponent']} ({r['home_away']}): ⚽{r['goals']}G 🤝{r['assists']}A xG:{xg}")
        return "\n".join(out)
    except Exception as e:
        return f"❌ Erro: {str(e)}"
# Keep create_chart (it was already a local tool)
last_chart_image = None
# ... (create_chart code unchanged; it uses the global last_chart_image)
# Response functions (already MCP-free)
async def agent_conventional_response(user_query: str) -> str:
    """Run ``one_player_agent`` once on *user_query* and return its output."""
    return (await one_player_agent.run(user_prompt=user_query)).output
async def stream_agent_response_safe(user_query: str) -> str:
    """Iterate the agent run node by node and return the final output as text.

    The final output is also printed. Any exception is caught: its traceback
    is printed and an ``"Erro: ..."`` string is returned. If the run finishes
    without yielding a result, ``"Nenhuma resposta."`` is returned.
    """
    try:
        async with one_player_agent.iter(user_query) as agent_run:
            async for node in agent_run:
                # Only the End node, with a result available, ends the run.
                if not isinstance(node, End):
                    continue
                if not agent_run.result:
                    continue
                final_text = str(agent_run.result.output)
                print(final_text)
                return final_text
    except Exception as e:
        import traceback
        traceback.print_exc()
        return f"Erro: {e!s}"
    return "Nenhuma resposta."
# Cleanup global
async def shutdown() -> None:
    """Close the module-level database pool by calling ``db.close()``."""
    db.close()
if __name__ == "__main__":
    # Manual smoke test: send one sample question through the agent and print
    # the answer.
    try:
        answer = asyncio.run(agent_conventional_response("quantos sao os gols baseados em cruzamentos?"))
        print(answer)
    finally:
        # Release the pooled connections even when the agent run fails.
        db.close()