# mcp_servers.py (FIXED: Added OpenAI & Nebius Support to get_llm_response)
import asyncio
import json
import re
import google.generativeai as genai
from anthropic import AsyncAnthropic
from openai import AsyncOpenAI
from typing import Dict, Optional, Tuple, List, Any

import config
from utils import load_prompt
from personas import PERSONAS_DATA

EVALUATION_PROMPT_TEMPLATE = load_prompt(config.PROMPT_FILES["evaluator"])

# Schema definition
EVALUATION_SCHEMA = {
    "type": "OBJECT",
    "properties": {
        "Novelty": {"type": "OBJECT", "properties": {"score": {"type": "INTEGER"}, "justification": {"type": "STRING"}}, "required": ["score", "justification"]},
        "Usefulness_Feasibility": {"type": "OBJECT", "properties": {"score": {"type": "INTEGER"}, "justification": {"type": "STRING"}}, "required": ["score", "justification"]},
        "Flexibility": {"type": "OBJECT", "properties": {"score": {"type": "INTEGER"}, "justification": {"type": "STRING"}}, "required": ["score", "justification"]},
        "Elaboration": {"type": "OBJECT", "properties": {"score": {"type": "INTEGER"}, "justification": {"type": "STRING"}}, "required": ["score", "justification"]},
        "Cultural_Appropriateness": {"type": "OBJECT", "properties": {"score": {"type": "INTEGER"}, "justification": {"type": "STRING"}}, "required": ["score", "justification"]}
    },
    "required": ["Novelty", "Usefulness_Feasibility", "Flexibility", "Elaboration", "Cultural_Appropriateness"]
}
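
# For reference, a judge response that satisfies EVALUATION_SCHEMA looks like
# the following (illustrative values only):
# {"Novelty": {"score": 4, "justification": "..."},
#  "Usefulness_Feasibility": {"score": 3, "justification": "..."},
#  "Flexibility": {"score": 3, "justification": "..."},
#  "Elaboration": {"score": 2, "justification": "..."},
#  "Cultural_Appropriateness": {"score": 5, "justification": "..."}}
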

def extract_json(text: str) -> dict:
    try:
        clean_text = text.strip()
        if "```json" in clean_text:
            clean_text = clean_text.split("```json")[1].split("```")[0].strip()
        elif "```" in clean_text:
            clean_text = clean_text.split("```")[1].split("```")[0].strip()
        return json.loads(clean_text)
    except (json.JSONDecodeError, IndexError):
        try:
            match = re.search(r'(\{[\s\S]*\})', text)
            if match: return json.loads(match.group(1))
        except json.JSONDecodeError: pass
        raise ValueError(f"Could not extract JSON from response: {text[:100]}...")
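
# Illustrative behaviour of extract_json (hypothetical inputs):
#   extract_json('```json\n{"Novelty": {"score": 4, "justification": "ok"}}\n```')
#   and extract_json('{"Novelty": {"score": 4, "justification": "ok"}}')
#   both return {"Novelty": {"score": 4, "justification": "ok"}};
#   text with no recoverable JSON raises ValueError.
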

class BusinessSolutionEvaluator:
    def __init__(self, gemini_client: Optional[genai.GenerativeModel]):
        if not gemini_client: raise ValueError("BusinessSolutionEvaluator requires a Google/Gemini client.")
        self.gemini_model = gemini_client

    async def evaluate(self, problem: str, solution_text: str) -> Tuple[dict, dict]:
        print(f"Evaluating solution (live): {solution_text[:50]}...")
        base_prompt = EVALUATION_PROMPT_TEMPLATE.format(problem=problem, solution_text=solution_text)
        schema_instruction = """
[IMPORTANT SYSTEM INSTRUCTION]
Ignore any previous examples of JSON formatting in this prompt.
You MUST strictly follow the Output Schema provided below.
For EACH of the 5 metrics, you must provide an object with TWO fields: "score" (integer) and "justification" (string).
Do not output a list. Return a single JSON object.
"""
        final_prompt = base_prompt + schema_instruction
        usage = {"model": "Gemini", "input": 0, "output": 0}
        try:
            response = await self.gemini_model.generate_content_async(
                final_prompt,
                generation_config=genai.types.GenerationConfig(
                    response_mime_type="application/json",
                    response_schema=EVALUATION_SCHEMA
                )
            )
            if hasattr(response, "usage_metadata"):
                usage["input"] = response.usage_metadata.prompt_token_count
                usage["output"] = response.usage_metadata.candidates_token_count
            v_fitness = extract_json(response.text)
            if not isinstance(v_fitness, (dict, list)): raise ValueError(f"Judge returned invalid type: {type(v_fitness)}")
            return v_fitness, usage
        except Exception as e:
            print(f"ERROR: BusinessSolutionEvaluator failed: {e}")
            return {"Novelty": {"score": 1, "justification": f"Error: {str(e)}"}}, usage

class AgentCalibrator:
    def __init__(self, api_clients: dict, evaluator: BusinessSolutionEvaluator):
        self.evaluator = evaluator
        self.api_clients = {name: client for name, client in api_clients.items() if client}
        self.sponsor_llms = list(self.api_clients.keys())
        print(f"AgentCalibrator initialized with enabled clients: {self.sponsor_llms}")

    async def calibrate_team(self, problem: str) -> Tuple[Dict[str, Any], List[str], List[Dict[str, Any]], List[Dict[str, Any]]]:
        print(f"Running LIVE calibration test for specialist team on {self.sponsor_llms}...")
        error_log = []
        detailed_results = []
        all_usage_stats = []
        if not self.sponsor_llms:
            raise Exception("AgentCalibrator cannot run: No LLM clients are configured.")
        if len(self.sponsor_llms) == 1:
            default_llm = self.sponsor_llms[0]
            print("Only one LLM available. Skipping calibration.")
            plan = {
                "Plant": {"persona": config.CALIBRATION_CONFIG["roles_to_test"]["Plant"], "llm": default_llm},
                "Implementer": {"persona": config.CALIBRATION_CONFIG["roles_to_test"]["Implementer"], "llm": default_llm},
                "Monitor": {"persona": config.CALIBRATION_CONFIG["roles_to_test"]["Monitor"], "llm": default_llm}
            }
            return plan, error_log, [], []
        roles_to_test = {role: PERSONAS_DATA[key]["description"] for role, key in config.CALIBRATION_CONFIG["roles_to_test"].items()}
        test_problem = f"For the business problem '{problem}', generate a single, brief, one-paragraph concept-level solution."
        tasks = []
        for role, persona in roles_to_test.items():
            for llm_name in self.sponsor_llms:
                tasks.append(self.run_calibration_test(problem, role, llm_name, persona, test_problem))
        results = await asyncio.gather(*tasks)
        detailed_results = results
        for res in results:
            if "usage_gen" in res: all_usage_stats.append(res["usage_gen"])
            if "usage_eval" in res: all_usage_stats.append(res["usage_eval"])
        best_llms = {}
        role_metrics = config.CALIBRATION_CONFIG["role_metrics"]
        for role in roles_to_test.keys():
            best_score = -1
            best_llm = self.sponsor_llms[0]
            for res in results:
                if res["role"] == role:
                    if res.get("error"):
                        error_log.append(f"Calibration failed for {res['llm']} (as {role}): {res['error']}")
                        continue
                    metric = role_metrics[role]
                    raw_score_data = res.get("score", {})
                    if not isinstance(raw_score_data, (dict, list)): raw_score_data = {}
                    if isinstance(raw_score_data, list): raw_score_data = raw_score_data[0] if len(raw_score_data) > 0 else {}
                    metric_data = raw_score_data.get(metric, {})
                    if not isinstance(metric_data, (dict, list)): metric_data = {}
                    if isinstance(metric_data, list): metric_data = metric_data[0] if len(metric_data) > 0 else {}
                    score = metric_data.get("score", 0)
                    if score > best_score:
                        best_score = score
                        best_llm = res["llm"]
            best_llms[role] = best_llm
        team_plan = {
            "Plant": {"persona": config.CALIBRATION_CONFIG["roles_to_test"]["Plant"], "llm": best_llms["Plant"]},
            "Implementer": {"persona": config.CALIBRATION_CONFIG["roles_to_test"]["Implementer"], "llm": best_llms["Implementer"]},
            "Monitor": {"persona": config.CALIBRATION_CONFIG["roles_to_test"]["Monitor"], "llm": best_llms["Monitor"]}
        }
        print(f"Calibration complete (live). Team plan: {team_plan}")
        return team_plan, error_log, detailed_results, all_usage_stats

    async def run_calibration_test(self, problem, role, llm_name, persona, test_problem):
        client = self.api_clients[llm_name]
        solution, gen_usage = await get_llm_response(llm_name, client, persona, test_problem)
        if "Error generating response" in solution:
            return {"role": role, "llm": llm_name, "error": solution, "output": solution, "usage_gen": gen_usage}
        score, eval_usage = await self.evaluator.evaluate(problem, solution)
        return {
            "role": role, "llm": llm_name, "score": score, "output": solution, "usage_gen": gen_usage, "usage_eval": eval_usage
        }
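
# Minimal calibration sketch. It assumes api_clients maps provider names to
# already-constructed clients (e.g. {"Gemini": gemini_model, "OpenAI": AsyncOpenAI()})
# and that config.CALIBRATION_CONFIG is populated; illustrative only.
async def _example_calibration(api_clients: dict, evaluator: BusinessSolutionEvaluator) -> None:
    calibrator = AgentCalibrator(api_clients, evaluator)
    team_plan, errors, details, usage_stats = await calibrator.calibrate_team(
        "Expand a regional coffee chain into two new cities"
    )
    print(team_plan)
    if errors:
        print("Calibration warnings:", errors)
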

# --- UPDATED: Handles OpenAI and Nebius ---
async def get_llm_response(client_name: str, client, system_prompt: str, user_prompt: str) -> Tuple[str, dict]:
    """Returns (text_response, usage_dict)"""
    usage = {"model": client_name, "input": 0, "output": 0}
    try:
        if client_name == "Gemini":
            model = client
            full_prompt = [{'role': 'user', 'parts': [system_prompt]}, {'role': 'model', 'parts': ["Understood."]}, {'role': 'user', 'parts': [user_prompt]}]
            response = await model.generate_content_async(full_prompt)
            if hasattr(response, "usage_metadata"):
                usage["input"] = response.usage_metadata.prompt_token_count
                usage["output"] = response.usage_metadata.candidates_token_count
            return response.text, usage
        elif client_name == "Anthropic":
            response = await client.messages.create(
                model=config.MODELS["Anthropic"]["default"], max_tokens=8192, system=system_prompt, messages=[{"role": "user", "content": user_prompt}]
            )
            if hasattr(response, "usage"):
                usage["input"] = response.usage.input_tokens
                usage["output"] = response.usage.output_tokens
            return response.content[0].text, usage
        elif client_name in ["SambaNova", "OpenAI", "Nebius"]:
            # SambaNova, OpenAI, and Nebius all expose an OpenAI-compatible
            # chat.completions endpoint, so one branch covers all three.
            # Dynamically get the correct model ID from config.py
            model_id = config.MODELS.get(client_name, {}).get("default", "gpt-4o-mini")
            completion = await client.chat.completions.create(
                model=model_id,
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": user_prompt}
                ]
            )
            if hasattr(completion, "usage"):
                usage["input"] = completion.usage.prompt_tokens
                usage["output"] = completion.usage.completion_tokens
            return completion.choices[0].message.content, usage
        else:
            # Unknown client names fall through to the error path below instead
            # of silently returning None.
            raise ValueError(f"Unsupported client: {client_name}")
    except Exception as e:
        error_message = f"Error generating response from {client_name}: {str(e)}"
        print(f"ERROR: API call to {client_name} failed: {e}")
        return error_message, usage
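
# Minimal usage sketch for get_llm_response with an OpenAI-compatible client.
# The API key and prompts are placeholders, and config.MODELS["OpenAI"]["default"]
# is assumed to name a valid chat model; illustrative only.
async def _example_openai_call() -> None:
    client = AsyncOpenAI(api_key="YOUR_OPENAI_API_KEY")  # placeholder key
    text, usage = await get_llm_response(
        "OpenAI",
        client,
        system_prompt="You are a concise business analyst.",
        user_prompt="Suggest one way to halve customer onboarding time.",
    )
    print(text)
    print(usage)

# To try it: asyncio.run(_example_openai_call())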