# mcp_servers.py (FIXED: Added OpenAI & Nebius Support to get_llm_response)
import asyncio
import json
import re
import google.generativeai as genai
from anthropic import AsyncAnthropic
from openai import AsyncOpenAI
from typing import Dict, Optional, Tuple, List, Any
import config
from utils import load_prompt
from personas import PERSONAS_DATA

EVALUATION_PROMPT_TEMPLATE = load_prompt(config.PROMPT_FILES["evaluator"])

# Schema definition
EVALUATION_SCHEMA = {
    "type": "OBJECT",
    "properties": {
        "Novelty": {"type": "OBJECT", "properties": {"score": {"type": "INTEGER"}, "justification": {"type": "STRING"}}, "required": ["score", "justification"]},
        "Usefulness_Feasibility": {"type": "OBJECT", "properties": {"score": {"type": "INTEGER"}, "justification": {"type": "STRING"}}, "required": ["score", "justification"]},
        "Flexibility": {"type": "OBJECT", "properties": {"score": {"type": "INTEGER"}, "justification": {"type": "STRING"}}, "required": ["score", "justification"]},
        "Elaboration": {"type": "OBJECT", "properties": {"score": {"type": "INTEGER"}, "justification": {"type": "STRING"}}, "required": ["score", "justification"]},
        "Cultural_Appropriateness": {"type": "OBJECT", "properties": {"score": {"type": "INTEGER"}, "justification": {"type": "STRING"}}, "required": ["score", "justification"]}
    },
    "required": ["Novelty", "Usefulness_Feasibility", "Flexibility", "Elaboration", "Cultural_Appropriateness"]
}
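
# Illustrative only (not used at runtime): a judge response that satisfies
# EVALUATION_SCHEMA has this shape:
#   {
#     "Novelty": {"score": 4, "justification": "..."},
#     "Usefulness_Feasibility": {"score": 3, "justification": "..."},
#     "Flexibility": {"score": 3, "justification": "..."},
#     "Elaboration": {"score": 5, "justification": "..."},
#     "Cultural_Appropriateness": {"score": 4, "justification": "..."}
#   }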


def extract_json(text: str) -> dict:
    """Extract a JSON object from an LLM response, tolerating ``` fences and surrounding prose."""
    try:
        clean_text = text.strip()
        if "```json" in clean_text:
            clean_text = clean_text.split("```json")[1].split("```")[0].strip()
        elif "```" in clean_text:
            clean_text = clean_text.split("```")[1].split("```")[0].strip()
        return json.loads(clean_text)
    except (json.JSONDecodeError, IndexError):
        # Fallback: grab the outermost {...} block and try to parse that.
        try:
            match = re.search(r'(\{[\s\S]*\})', text)
            if match:
                return json.loads(match.group(1))
        except json.JSONDecodeError:
            pass
    raise ValueError(f"Could not extract JSON from response: {text[:100]}...")


class BusinessSolutionEvaluator:
    """LLM-as-judge that scores a solution against the five evaluation metrics using Gemini."""

    def __init__(self, gemini_client: Optional[genai.GenerativeModel]):
        if not gemini_client:
            raise ValueError("BusinessSolutionEvaluator requires a Google/Gemini client.")
        self.gemini_model = gemini_client

    async def evaluate(self, problem: str, solution_text: str) -> Tuple[dict, dict]:
        """Return (scores_dict, usage_dict) for a single solution."""
        print(f"Evaluating solution (live): {solution_text[:50]}...")
        base_prompt = EVALUATION_PROMPT_TEMPLATE.format(problem=problem, solution_text=solution_text)
        schema_instruction = """
[IMPORTANT SYSTEM INSTRUCTION]
Ignore any previous examples of JSON formatting in this prompt.
You MUST strictly follow the Output Schema provided below.
For EACH of the 5 metrics, you must provide an object with TWO fields: "score" (integer) and "justification" (string).
Do not output a list. Return a single JSON object.
"""
        final_prompt = base_prompt + schema_instruction
        usage = {"model": "Gemini", "input": 0, "output": 0}
        try:
            response = await self.gemini_model.generate_content_async(
                final_prompt,
                generation_config=genai.types.GenerationConfig(
                    response_mime_type="application/json",
                    response_schema=EVALUATION_SCHEMA
                )
            )
            if hasattr(response, "usage_metadata"):
                usage["input"] = response.usage_metadata.prompt_token_count
                usage["output"] = response.usage_metadata.candidates_token_count
            v_fitness = extract_json(response.text)
            if not isinstance(v_fitness, (dict, list)):
                raise ValueError(f"Judge returned invalid type: {type(v_fitness)}")
            return v_fitness, usage
        except Exception as e:
            print(f"ERROR: BusinessSolutionEvaluator failed: {e}")
            # Degrade gracefully: return a minimal score object so callers can keep running.
            return {"Novelty": {"score": 1, "justification": f"Error: {str(e)}"}}, usage


class AgentCalibrator:
    """Runs a quick head-to-head test of the configured LLMs and assigns the best one to each team role."""

    def __init__(self, api_clients: dict, evaluator: BusinessSolutionEvaluator):
        self.evaluator = evaluator
        # Keep only the clients that were actually configured (non-None).
        self.api_clients = {name: client for name, client in api_clients.items() if client}
        self.sponsor_llms = list(self.api_clients.keys())
        print(f"AgentCalibrator initialized with enabled clients: {self.sponsor_llms}")

    async def calibrate_team(self, problem: str) -> Tuple[Dict[str, Any], List[str], List[Dict[str, Any]], List[Dict[str, Any]]]:
        """Return (team_plan, error_log, detailed_results, usage_stats) for the given problem."""
        print(f"Running LIVE calibration test for specialist team on {self.sponsor_llms}...")
        error_log = []
        detailed_results = []
        all_usage_stats = []
        if not self.sponsor_llms:
            raise Exception("AgentCalibrator cannot run: No LLM clients are configured.")
        if len(self.sponsor_llms) == 1:
            # With a single provider there is nothing to compare, so every role gets the same LLM.
            default_llm = self.sponsor_llms[0]
            print("Only one LLM available. Skipping calibration.")
            plan = {
                "Plant": {"persona": config.CALIBRATION_CONFIG["roles_to_test"]["Plant"], "llm": default_llm},
                "Implementer": {"persona": config.CALIBRATION_CONFIG["roles_to_test"]["Implementer"], "llm": default_llm},
                "Monitor": {"persona": config.CALIBRATION_CONFIG["roles_to_test"]["Monitor"], "llm": default_llm}
            }
            return plan, error_log, [], []
        roles_to_test = {role: PERSONAS_DATA[key]["description"] for role, key in config.CALIBRATION_CONFIG["roles_to_test"].items()}
        test_problem = f"For the business problem '{problem}', generate a single, brief, one-paragraph concept-level solution."
        # Run every (role, LLM) combination concurrently.
        tasks = []
        for role, persona in roles_to_test.items():
            for llm_name in self.sponsor_llms:
                tasks.append(self.run_calibration_test(problem, role, llm_name, persona, test_problem))
        results = await asyncio.gather(*tasks)
        detailed_results = results
        for res in results:
            if "usage_gen" in res:
                all_usage_stats.append(res["usage_gen"])
            if "usage_eval" in res:
                all_usage_stats.append(res["usage_eval"])
        # Pick, for each role, the LLM with the highest score on that role's key metric.
        best_llms = {}
        role_metrics = config.CALIBRATION_CONFIG["role_metrics"]
        for role in roles_to_test.keys():
            best_score = -1
            best_llm = self.sponsor_llms[0]
            for res in results:
                if res["role"] != role:
                    continue
                if res.get("error"):
                    error_log.append(f"Calibration failed for {res['llm']} (as {role}): {res['error']}")
                    continue
                metric = role_metrics[role]
                # The judge occasionally returns a list or a malformed payload; normalise defensively.
                raw_score_data = res.get("score", {})
                if not isinstance(raw_score_data, (dict, list)):
                    raw_score_data = {}
                if isinstance(raw_score_data, list):
                    raw_score_data = raw_score_data[0] if len(raw_score_data) > 0 else {}
                metric_data = raw_score_data.get(metric, {})
                if not isinstance(metric_data, (dict, list)):
                    metric_data = {}
                if isinstance(metric_data, list):
                    metric_data = metric_data[0] if len(metric_data) > 0 else {}
                score = metric_data.get("score", 0)
                if score > best_score:
                    best_score = score
                    best_llm = res["llm"]
            best_llms[role] = best_llm
        team_plan = {
            "Plant": {"persona": config.CALIBRATION_CONFIG["roles_to_test"]["Plant"], "llm": best_llms["Plant"]},
            "Implementer": {"persona": config.CALIBRATION_CONFIG["roles_to_test"]["Implementer"], "llm": best_llms["Implementer"]},
            "Monitor": {"persona": config.CALIBRATION_CONFIG["roles_to_test"]["Monitor"], "llm": best_llms["Monitor"]}
        }
        print(f"Calibration complete (live). Team plan: {team_plan}")
        return team_plan, error_log, detailed_results, all_usage_stats

    async def run_calibration_test(self, problem, role, llm_name, persona, test_problem):
        """Generate one test solution with the given LLM/persona, then have the judge score it."""
        client = self.api_clients[llm_name]
        solution, gen_usage = await get_llm_response(llm_name, client, persona, test_problem)
        if "Error generating response" in solution:
            # Generation failed; record the error so calibrate_team can log it and skip scoring.
            return {"role": role, "llm": llm_name, "error": solution, "output": solution, "usage_gen": gen_usage}
        score, eval_usage = await self.evaluator.evaluate(problem, solution)
        return {
            "role": role, "llm": llm_name, "score": score, "output": solution, "usage_gen": gen_usage, "usage_eval": eval_usage
        }
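
# Usage sketch (illustrative; client construction and API keys are assumed to happen elsewhere, e.g. in app setup):
#   calibrator = AgentCalibrator({"Gemini": gemini_model, "OpenAI": openai_client}, judge)
#   team_plan, errors, details, usage = await calibrator.calibrate_team(problem)
#   # team_plan maps each role to {"persona": <persona key>, "llm": <provider name>}, e.g.
#   #   {"Plant": {"persona": "...", "llm": "OpenAI"}, ...}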


# --- Provider dispatch: Gemini, Anthropic, and OpenAI-compatible APIs (SambaNova, OpenAI, Nebius) ---
async def get_llm_response(client_name: str, client, system_prompt: str, user_prompt: str) -> Tuple[str, dict]:
    """Return (text_response, usage_dict) for the named provider."""
    usage = {"model": client_name, "input": 0, "output": 0}
    try:
        if client_name == "Gemini":
            # The system prompt is injected as a leading user/model exchange.
            model = client
            full_prompt = [{'role': 'user', 'parts': [system_prompt]}, {'role': 'model', 'parts': ["Understood."]}, {'role': 'user', 'parts': [user_prompt]}]
            response = await model.generate_content_async(full_prompt)
            if hasattr(response, "usage_metadata"):
                usage["input"] = response.usage_metadata.prompt_token_count
                usage["output"] = response.usage_metadata.candidates_token_count
            return response.text, usage
        elif client_name == "Anthropic":
            response = await client.messages.create(
                model=config.MODELS["Anthropic"]["default"],
                max_tokens=8192,
                system=system_prompt,
                messages=[{"role": "user", "content": user_prompt}]
            )
            if hasattr(response, "usage"):
                usage["input"] = response.usage.input_tokens
                usage["output"] = response.usage.output_tokens
            return response.content[0].text, usage
        elif client_name in ["SambaNova", "OpenAI", "Nebius"]:
            # These providers all speak the OpenAI chat-completions API;
            # dynamically get the correct model ID from config.py.
            model_id = config.MODELS.get(client_name, {}).get("default", "gpt-4o-mini")
            completion = await client.chat.completions.create(
                model=model_id,
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": user_prompt}
                ]
            )
            if hasattr(completion, "usage"):
                usage["input"] = completion.usage.prompt_tokens
                usage["output"] = completion.usage.completion_tokens
            return completion.choices[0].message.content, usage
        else:
            # Unknown provider name: surface it as an error instead of silently returning None.
            raise ValueError(f"Unsupported client: {client_name}")
    except Exception as e:
        error_message = f"Error generating response from {client_name}: {str(e)}"
        print(f"ERROR: API call to {client_name} failed: {e}")
        return error_message, usage
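
# Usage sketch (illustrative; assumes the relevant API key is configured for AsyncOpenAI and that
# config.MODELS["OpenAI"]["default"] is set):
#   client = AsyncOpenAI()
#   text, usage = await get_llm_response("OpenAI", client, "You are a strategist.", "Suggest one idea.")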