# agent_logic.py (Fixed: OpenAI and Nebius Initialization + Robust Unpacking)
import asyncio
import datetime
import json
import os
import re
import uuid
from typing import AsyncGenerator, Dict, Optional, List, Tuple

import google.generativeai as genai
from anthropic import AsyncAnthropic
from openai import AsyncOpenAI

import config
from personas import PERSONAS_DATA
from utils import load_prompt
from mcp_servers import AgentCalibrator, BusinessSolutionEvaluator, get_llm_response
from self_correction import SelfCorrector

CLASSIFIER_SYSTEM_PROMPT = load_prompt(config.PROMPT_FILES["classifier"])
HOMOGENEOUS_MANAGER_PROMPT = load_prompt(config.PROMPT_FILES["manager_homogeneous"])
HETEROGENEOUS_MANAGER_PROMPT = load_prompt(config.PROMPT_FILES["manager_heterogeneous"])

METRIC_MAPPING = {
    "novelty": "Novelty",
    "usefulness": "Usefulness_Feasibility",
    "feasibility": "Usefulness_Feasibility",
    "usefulness_feasibility": "Usefulness_Feasibility",
    "usefulness/feasibility": "Usefulness_Feasibility",
    "flexibility": "Flexibility",
    "elaboration": "Elaboration",
    "cultural_appropriateness": "Cultural_Appropriateness",
    "cultural_sensitivity": "Cultural_Appropriateness",
    "cultural appropriateness": "Cultural_Appropriateness",
    "cultural appropriateness/sensitivity": "Cultural_Appropriateness",
}
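
# NOTE (assumption): get_llm_response, imported from mcp_servers, is used everywhere below as
#   text, usage = await get_llm_response(provider_name, client, system_prompt, user_prompt)
# where `usage` is a dict carrying "model", "input" and "output" token counts. That shape is
# inferred from how add_usage() consumes it in StrategicSelectorAgent.solve, not from
# mcp_servers itself.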
# --- HELPER CLASSES ---
class Baseline_Single_Agent:
    def __init__(self, api_clients: dict):
        self.gemini_client = api_clients.get("Gemini")

    async def solve(self, problem: str, persona_prompt: str) -> Tuple[str, dict]:
        if not self.gemini_client: raise ValueError("Single_Agent requires a Google/Gemini client.")
        return await get_llm_response("Gemini", self.gemini_client, persona_prompt, problem)


class Baseline_Static_Homogeneous:
    def __init__(self, api_clients: dict):
        self.api_clients = {name: client for name, client in api_clients.items() if client}
        self.gemini_client = api_clients.get("Gemini")

    async def solve(self, problem: str, persona_prompt: str) -> Tuple[str, List[dict]]:
        if not self.gemini_client: raise ValueError("Homogeneous_Team requires a Google/Gemini client.")
        system_prompt = persona_prompt
        user_prompt = f"As an expert Implementer, generate a detailed plan for this problem: {problem}"
        tasks = [get_llm_response(llm, client, system_prompt, user_prompt) for llm, client in self.api_clients.items()]
        results = await asyncio.gather(*tasks)
        responses = [r[0] for r in results]
        usages = [r[1] for r in results]

        manager_system_prompt = HOMOGENEOUS_MANAGER_PROMPT
        reports_str = "\n\n".join(f"Report from Team Member {i+1}:\n{resp}" for i, resp in enumerate(responses))
        manager_user_prompt = f"Original Problem: {problem}\n\n{reports_str}\n\nPlease synthesize these reports into one final, comprehensive solution."
        final_text, final_usage = await get_llm_response("Gemini", self.gemini_client, manager_system_prompt, manager_user_prompt)
        usages.append(final_usage)
        return final_text, usages
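
# NOTE (illustrative): Baseline_Static_Heterogeneous below expects `team_plan` to map each role
# to an {"llm": ..., "persona": ...} dict, e.g.
#   {"Strategist": {"llm": "Anthropic", "persona": "some_persona_key"}}
# The role name and provider in this example are hypothetical; the real plan is produced by
# AgentCalibrator.calibrate_team() and the persona key must exist in PERSONAS_DATA.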

class Baseline_Static_Heterogeneous:
    def __init__(self, api_clients: dict):
        self.api_clients = api_clients
        self.gemini_client = api_clients.get("Gemini")

    async def solve(self, problem: str, team_plan: dict) -> Tuple[str, List[dict]]:
        if not self.gemini_client: raise ValueError("Heterogeneous_Team requires a Google/Gemini client.")
        tasks = []
        for role, config_data in team_plan.items():
            llm_name = config_data["llm"]
            persona_key = config_data["persona"]
            client = self.api_clients.get(llm_name)
            if not client:
                llm_name = "Gemini"
                client = self.gemini_client
            system_prompt = PERSONAS_DATA[persona_key]["description"]
            user_prompt = f"As the team's '{role}', provide your unique perspective on how to solve this problem: {problem}"
            tasks.append(get_llm_response(llm_name, client, system_prompt, user_prompt))

        results = await asyncio.gather(*tasks)
        responses = [r[0] for r in results]
        usages = [r[1] for r in results]

        manager_system_prompt = HETEROGENEOUS_MANAGER_PROMPT
        reports_str = "\n\n".join(f"Report from {team_plan[role]['llm']} (as {role}):\n{resp}" for (role, resp) in zip(team_plan.keys(), responses))
        manager_user_prompt = f"Original Problem: {problem}\n\n{reports_str}\n\nPlease synthesize these specialist reports into one final, comprehensive solution."
        final_text, final_usage = await get_llm_response("Gemini", self.gemini_client, manager_system_prompt, manager_user_prompt)
        usages.append(final_usage)
        return final_text, usages

# --- MAIN AGENT ---
class StrategicSelectorAgent:
    def __init__(self, api_keys: Dict[str, Optional[str]]):
        self.api_keys = api_keys
        # Initialize potential clients including new providers
        self.api_clients = {
            "Gemini": None,
            "Anthropic": None,
            "SambaNova": None,
            "OpenAI": None,
            "Nebius": None
        }

        # --- INIT GEMINI ---
        if api_keys.get("google") and api_keys["google"].strip():
            try:
                genai.configure(api_key=api_keys["google"])
                self.api_clients["Gemini"] = genai.GenerativeModel(config.MODELS["Gemini"]["default"])
            except Exception as e: print(f"Warning: Gemini init failed: {e}")

        # --- INIT ANTHROPIC ---
        if api_keys.get("anthropic") and api_keys["anthropic"].strip():
            try:
                self.api_clients["Anthropic"] = AsyncAnthropic(api_key=api_keys["anthropic"])
            except Exception as e: print(f"Warning: Anthropic init failed: {e}")

        # --- INIT SAMBANOVA ---
        if api_keys.get("sambanova") and api_keys["sambanova"].strip():
            try:
                self.api_clients["SambaNova"] = AsyncOpenAI(
                    api_key=api_keys["sambanova"],
                    base_url=os.getenv("SAMBANOVA_BASE_URL", "https://api.sambanova.ai/v1")
                )
            except Exception as e: print(f"Warning: SambaNova init failed: {e}")

        # --- INIT OPENAI (NEW) ---
        if api_keys.get("openai") and api_keys["openai"].strip():
            try:
                self.api_clients["OpenAI"] = AsyncOpenAI(api_key=api_keys["openai"])
            except Exception as e: print(f"Warning: OpenAI init failed: {e}")

        # --- INIT NEBIUS (NEW) ---
        if api_keys.get("nebius") and api_keys["nebius"].strip():
            try:
                self.api_clients["Nebius"] = AsyncOpenAI(
                    api_key=api_keys["nebius"],
                    base_url="https://api.studio.nebius.ai/v1/"
                )
            except Exception as e: print(f"Warning: Nebius init failed: {e}")

        if not self.api_clients["Gemini"]:
            raise ValueError("Google API Key is required.")

        self.evaluator = BusinessSolutionEvaluator(self.api_clients["Gemini"])
        self.calibrator = AgentCalibrator(self.api_clients, self.evaluator)
        self.corrector = SelfCorrector(threshold=3.0)
        self.single_agent = Baseline_Single_Agent(self.api_clients)
        self.homo_team = Baseline_Static_Homogeneous(self.api_clients)
        self.hetero_team = Baseline_Static_Heterogeneous(self.api_clients)
        self.current_team_plan = None

        if "ERROR:" in CLASSIFIER_SYSTEM_PROMPT: raise FileNotFoundError(CLASSIFIER_SYSTEM_PROMPT)
    async def solve(self, problem: str) -> AsyncGenerator[str, None]:
        run_id = str(uuid.uuid4())[:8]

        # Initialize Financial Tracking
        financial_report = {
            "calibration_cost": 0.0,
            "generation_cost": 0.0,
            "total_cost": 0.0,
            "usage_breakdown": []
        }
        debug_log = {
            "run_id": run_id,
            "timestamp": datetime.datetime.now().isoformat(),
            "problem": problem,
            "classification": "",
            "trace": [],
            "financial_report": financial_report
        }

        # Helper to add usage and calculate cost
        def add_usage(usage_list):
            if isinstance(usage_list, dict): usage_list = [usage_list]
            current_step_cost = 0.0
            for u in usage_list:
                financial_report["usage_breakdown"].append(u)
                # Lookup pricing
                model_name = u.get("model", "Gemini")
                # Safely get pricing with default fallbacks
                pricing = config.PRICING.get(model_name, {"input": 0, "output": 0})
                # Calculate Cost
                cost = (u.get("input", 0) / 1_000_000 * pricing["input"]) + \
                       (u.get("output", 0) / 1_000_000 * pricing["output"])
                financial_report["total_cost"] += cost
                current_step_cost += cost
            return current_step_cost
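
        # Cost model (as implemented above): tokens are billed per million, so each usage record adds
        #   (input / 1e6) * PRICING[model]["input"] + (output / 1e6) * PRICING[model]["output"].
        # Hypothetical example with placeholder prices (not real config.PRICING values):
        #   2,000 input + 500 output tokens at $0.50 / $1.50 per million ≈ $0.00175.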

        try:
            yield "Classifying problem archetype (live)..."
            classification, cls_usage = await get_llm_response("Gemini", self.api_clients["Gemini"], CLASSIFIER_SYSTEM_PROMPT, problem)
            classification = classification.strip().replace("\"", "")
            yield f"Diagnosis: {classification}"
            add_usage(cls_usage)
            debug_log["classification"] = classification

            if "Error generating response" in classification:
                yield "Classifier failed. Defaulting to Single Agent."
                classification = "Direct_Procedure"

            solution_draft = ""
            v_fitness_json = {}
            scores = {}

            # --- MAIN LOOP ---
            for i in range(2):
                current_problem = problem
                if i > 0:
                    yield f"--- (Loop {i}) Score is too low. Initiating Self-Correction... ---"
                    correction_prompt_text = self.corrector.get_correction_plan(v_fitness_json)
                    yield f"Diagnosis: {correction_prompt_text.splitlines()[3].strip()}"
                    current_problem = f"{problem}\n\n{correction_prompt_text}"
                    debug_log["trace"].append({
                        "step_type": "correction_plan",
                        "loop_index": i,
                        "prompt": correction_prompt_text
                    })
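
                # Routing (per the branches below): Direct_Procedure and Holistic_Abstract_Reasoning
                # go to the single agent, Local_Geometric_Procedural to the homogeneous team,
                # Cognitive_Labyrinth to a freshly calibrated heterogeneous team, and any
                # unrecognised label falls back to the single agent.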
                # --- DEPLOY ---
                default_persona = PERSONAS_DATA[config.DEFAULT_PERSONA_KEY]["description"]
                current_usages = []

                if classification in ("Direct_Procedure", "Holistic_Abstract_Reasoning"):
                    if i == 0: yield "Deploying: Baseline Single Agent (Simplicity Hypothesis)..."
                    solution_draft, u = await self.single_agent.solve(current_problem, default_persona)
                    current_usages.append(u)
                elif classification == "Local_Geometric_Procedural":
                    if i == 0: yield "Deploying: Static Homogeneous Team (Expert Anomaly)..."
                    solution_draft, u_list = await self.homo_team.solve(current_problem, default_persona)
                    current_usages.extend(u_list)
                elif classification == "Cognitive_Labyrinth":
                    if i == 0:
                        yield "Deploying: Static Heterogeneous Team (Cognitive Diversity)..."
                        # --- UNPACK 4 VALUES FROM CALIBRATOR ---
                        # Call safely and unpack, with diagnostics if the result shape is unexpected
                        calib_result = await self.calibrator.calibrate_team(current_problem)
                        if calib_result is None:
                            raise ValueError("CRITICAL ERROR: calibrate_team returned None. Please verify mcp_servers.py on the server.")
                        if len(calib_result) != 4:
                            # Fallback logic if the server has an old mcp_servers.py (e.g. 2 or 3 values)
                            if len(calib_result) == 2:
                                team_plan, calibration_errors = calib_result
                                calib_details, calib_usage = [], []  # Defaults
                            elif len(calib_result) == 3:
                                team_plan, calibration_errors, calib_details = calib_result
                                calib_usage = []  # Default
                            else:
                                raise ValueError(f"Calibrator returned {len(calib_result)} values, expected 4.")
                        else:
                            team_plan, calibration_errors, calib_details, calib_usage = calib_result

                        # Track Calibration Cost explicitly
                        calib_step_cost = add_usage(calib_usage)
                        financial_report["calibration_cost"] += calib_step_cost

                        debug_log["trace"].append({
                            "step_type": "calibration",
                            "details": calib_details,
                            "errors": calibration_errors,
                            "selected_plan": team_plan
                        })

                        if calibration_errors:
                            yield "--- CALIBRATION WARNINGS ---"
                            for err in calibration_errors: yield err
                            yield "-----------------------------"

                        yield f"Calibration complete. Best Team: {json.dumps({k: v['llm'] for k, v in team_plan.items()})}"
                        self.current_team_plan = team_plan

                    # Calibration only runs on the first attempt; the correction loop reuses the stored plan.
                    solution_draft, u_list = await self.hetero_team.solve(current_problem, self.current_team_plan)
                    current_usages.extend(u_list)
                else:
                    if i == 0: yield f"Diagnosis '{classification}' is unknown. Defaulting to Single Agent."
                    solution_draft, u = await self.single_agent.solve(current_problem, default_persona)
                    current_usages.append(u)

                add_usage(current_usages)

                if "Error generating response" in solution_draft:
                    raise Exception(f"The specialist team failed to generate a solution. Error: {solution_draft}")

                yield f"Draft solution received: '{solution_draft[:60]}...'"

                # --- EVALUATE ---
                yield "Evaluating draft (live)..."
                v_fitness_json, eval_usage = await self.evaluator.evaluate(current_problem, solution_draft)
                add_usage(eval_usage)

                # The evaluator may return a list wrapping a single dict; unwrap it.
                if isinstance(v_fitness_json, list):
                    if len(v_fitness_json) > 0 and isinstance(v_fitness_json[0], dict):
                        v_fitness_json = v_fitness_json[0]
                    else:
                        v_fitness_json = {}

                normalized_fitness = {}
                if isinstance(v_fitness_json, dict):
                    for k, v in v_fitness_json.items():
                        canonical_key = None
                        clean_k = k.lower().strip()
                        if clean_k in METRIC_MAPPING: canonical_key = METRIC_MAPPING[clean_k]
                        if not canonical_key: continue

                        if isinstance(v, dict):
                            score_value = v.get('score')
                            justification_value = v.get('justification', str(v))
                        elif isinstance(v, list) and len(v) > 0 and isinstance(v[0], dict):
                            score_value = v[0].get('score')
                            justification_value = v[0].get('justification', str(v[0]))
                        else:
                            score_value = v
                            justification_value = "Score extracted directly."

                        if isinstance(score_value, str):
                            try:
                                match = re.search(r'\d+', score_value)
                                score_value = int(match.group()) if match else 0
                            except (ValueError, AttributeError):
                                score_value = 0
                        try:
                            score_value = int(score_value)
                        except (ValueError, TypeError):
                            score_value = 0

                        normalized_fitness[canonical_key] = {'score': score_value, 'justification': justification_value}
                else:
                    normalized_fitness = {k: {'score': 0, 'justification': "Invalid JSON structure"} for k in ["Novelty", "Usefulness_Feasibility", "Flexibility", "Elaboration", "Cultural_Appropriateness"]}

                v_fitness_json = normalized_fitness
                scores = {k: v.get('score', 0) for k, v in v_fitness_json.items()}
                yield f"Evaluation Score: {scores}"

                debug_log["trace"].append({
                    "step_type": "attempt",
                    "loop_index": i,
                    "draft": solution_draft,
                    "scores": scores,
                    "full_evaluation": v_fitness_json
                })
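
                # Surface a low-novelty warning, then apply the acceptance gate: SelfCorrector was
                # constructed with threshold=3.0 in __init__, and is_good_enough(scores) presumably
                # checks the scores against that threshold (exact semantics live in self_correction.py).
                # A passing draft ends the loop early; otherwise a second attempt runs with the
                # correction plan prepended to the problem.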
                if scores.get('Novelty', 0) <= 1:
                    yield f"⚠️ Low Score Detected. Reason: {v_fitness_json.get('Novelty', {}).get('justification', 'Unknown')}"

                if self.corrector.is_good_enough(scores):
                    yield "--- Solution approved by self-corrector. ---"
                    break
                elif i == 1:
                    yield "--- Max correction loops reached. Accepting best effort. ---"

            # --- FINALIZE ---
            financial_report["generation_cost"] = financial_report["total_cost"] - financial_report["calibration_cost"]
            debug_log["financial_report"] = financial_report

            await asyncio.sleep(0.5)
            solution_draft_json_safe = json.dumps(solution_draft)
            debug_log_json_safe = json.dumps(debug_log)
            yield f"FINAL: {{\"text\": {solution_draft_json_safe}, \"audio\": null, \"log\": {debug_log_json_safe}}}"

        except Exception as e:
            error_msg = f"An error occurred in the agent's solve loop: {e}"
            print(error_msg)
            debug_log["error"] = str(e)
            yield error_msg
        finally:
            try:
                os.makedirs("logs", exist_ok=True)
                log_path = f"logs/run_{run_id}.json"
                with open(log_path, "w", encoding="utf-8") as f:
                    json.dump(debug_log, f, indent=2)
                print(f"Detailed execution log saved to {log_path}")
            except Exception as log_err:
                print(f"Failed to save log: {log_err}")