"""3-stage LLM Council orchestration."""

import re
from collections import defaultdict
from typing import List, Dict, Any, Tuple

from .openrouter import query_models_parallel, query_model, query_model_stream
from .config import COUNCIL_MODELS, CHAIRMAN_MODEL


async def stage1_collect_responses(user_query: str) -> List[Dict[str, Any]]:
    """
    Stage 1: Collect individual responses from all council models.

    Args:
        user_query: The user's question

    Returns:
        List of dicts with 'model' and 'response' keys
    """
    print("STAGE 1: Collecting individual responses from council members...")

    messages = [{"role": "user", "content": user_query}]

    # Query all models in parallel
    responses = await query_models_parallel(COUNCIL_MODELS, messages)

    # Format results
    stage1_results = []
    for model, response in responses.items():
        if response is not None:  # Only include successful responses
            stage1_results.append({"model": model, "response": response.get("content", "")})

    print(f"STAGE 1 COMPLETE: Received {len(stage1_results)} responses.")
    return stage1_results


async def stage2_collect_rankings(
    user_query: str, stage1_results: List[Dict[str, Any]]
) -> Tuple[List[Dict[str, Any]], Dict[str, str]]:
    """
    Stage 2: Each model ranks the anonymized responses.

    Args:
        user_query: The original user query
        stage1_results: Results from Stage 1

    Returns:
        Tuple of (rankings list, label_to_model mapping)
    """
    print("STAGE 2: Council members are ranking each other's responses...")

    # Create anonymized labels for responses (Response A, Response B, etc.)
    labels = [chr(65 + i) for i in range(len(stage1_results))]  # A, B, C, ...

    # Create mapping from label to model name
    label_to_model = {f"Response {label}": result["model"] for label, result in zip(labels, stage1_results)}

    # Build the ranking prompt
    responses_text = "\n\n".join(
        [f"Response {label}:\n{result['response']}" for label, result in zip(labels, stage1_results)]
    )

    ranking_prompt = f"""You are evaluating different responses to the following question:
Question: {user_query}
Here are the responses from different models (anonymized):
{responses_text}
Your task:
1. First, evaluate each response individually. For each response, explain what it does well and what it does poorly.
2. Then, at the very end of your response, provide a final ranking.
IMPORTANT: Your final ranking MUST be formatted EXACTLY as follows:
- Start with the line "FINAL RANKING:" (all caps, with colon)
- Then list the responses from best to worst as a numbered list
- Each line should be: number, period, space, then ONLY the response label (e.g., "1. Response A")
- Do not add any other text or explanations in the ranking section
Example of the correct format for your ENTIRE response:
Response A provides good detail on X but misses Y...
Response B is accurate but lacks depth on Z...
Response C offers the most comprehensive answer...
FINAL RANKING:
1. Response C
2. Response A
3. Response B
Now provide your evaluation and ranking:"""

    messages = [{"role": "user", "content": ranking_prompt}]

    # Get rankings from all council models in parallel
    responses = await query_models_parallel(COUNCIL_MODELS, messages)

    # Format results
    stage2_results = []
    for model, response in responses.items():
        if response is not None:
            full_text = response.get("content", "")
            parsed = parse_ranking_from_text(full_text)
            stage2_results.append({"model": model, "ranking": full_text, "parsed_ranking": parsed})

    print("STAGE 2 COMPLETE: Rankings collected.")
    return stage2_results, label_to_model
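
# Illustrative shape of the Stage 2 return value (not executed; labels are
# positional, so "Response A" corresponds to the first Stage 1 result):
#     stage2_results -> [{"model": "<council-model-id>",
#                         "ranking": "<full evaluation text>",
#                         "parsed_ranking": ["Response B", "Response A", ...]}, ...]
#     label_to_model -> {"Response A": "<council-model-id>", ...}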


async def stage3_synthesize_final(
    user_query: str, stage1_results: List[Dict[str, Any]], stage2_results: List[Dict[str, Any]]
) -> Dict[str, Any]:
    """
    Stage 3: Chairman synthesizes final response.

    Args:
        user_query: The original user query
        stage1_results: Individual model responses from Stage 1
        stage2_results: Rankings from Stage 2

    Returns:
        Dict with 'model' and 'response' keys
    """
    print("STAGE 3: Chairman is synthesizing the final answer...")

    # Build comprehensive context for chairman
    stage1_text = "\n\n".join(
        [f"Model: {result['model']}\nResponse: {result['response']}" for result in stage1_results]
    )
    stage2_text = "\n\n".join(
        [f"Model: {result['model']}\nRanking: {result['ranking']}" for result in stage2_results]
    )

    chairman_prompt = f"""You are the Chairman of an LLM Council. Multiple AI models have provided responses to a user's question, and then ranked each other's responses.
Original Question: {user_query}
STAGE 1 - Individual Responses:
{stage1_text}
STAGE 2 - Peer Rankings:
{stage2_text}
Your task as Chairman is to synthesize all of this information into a single, comprehensive, accurate answer to the user's original question. Consider:
- The individual responses and their insights
- The peer rankings and what they reveal about response quality
- Any patterns of agreement or disagreement
Provide a clear, well-reasoned final answer that represents the council's collective wisdom:"""

    messages = [{"role": "user", "content": chairman_prompt}]

    # Query the chairman model
    response = await query_model(CHAIRMAN_MODEL, messages)

    if response is None:
        # Fallback if chairman fails
        print("STAGE 3 ERROR: Unable to generate final synthesis.")
        return {"model": CHAIRMAN_MODEL, "response": "Error: Unable to generate final synthesis."}

    print("STAGE 3 COMPLETE: Final answer synthesized.")
    return {"model": CHAIRMAN_MODEL, "response": response.get("content", "")}


async def stage3_synthesize_final_stream(
    user_query: str, stage1_results: List[Dict[str, Any]], stage2_results: List[Dict[str, Any]]
):
    """
    Stage 3: Chairman synthesizes final response (Streaming).

    Yields chunks of text.
    """
    print("STAGE 3: Chairman is synthesizing the final answer (Streaming)...")

    # Build comprehensive context for chairman
    stage1_text = "\n\n".join(
        [f"Model: {result['model']}\nResponse: {result['response']}" for result in stage1_results]
    )
    stage2_text = "\n\n".join(
        [f"Model: {result['model']}\nRanking: {result['ranking']}" for result in stage2_results]
    )

    chairman_prompt = f"""You are the Chairman of an LLM Council. Multiple AI models have provided responses to a user's question, and then ranked each other's responses.
Original Question: {user_query}
STAGE 1 - Individual Responses:
{stage1_text}
STAGE 2 - Peer Rankings:
{stage2_text}
Your task as Chairman is to synthesize all of this information into a single, comprehensive, accurate answer to the user's original question. Consider:
- The individual responses and their insights
- The peer rankings and what they reveal about response quality
- Any patterns of agreement or disagreement
Provide a clear, well-reasoned final answer that represents the council's collective wisdom:"""

    messages = [{"role": "user", "content": chairman_prompt}]

    # Stream the chairman model
    async for chunk in query_model_stream(CHAIRMAN_MODEL, messages):
        yield chunk

    print("STAGE 3 COMPLETE: Final answer stream finished.")


def parse_ranking_from_text(ranking_text: str) -> List[str]:
    """
    Parse the FINAL RANKING section from the model's response.

    Args:
        ranking_text: The full text response from the model

    Returns:
        List of response labels in ranked order
    """
    # Look for "FINAL RANKING:" section
    if "FINAL RANKING:" in ranking_text:
        # Extract everything after "FINAL RANKING:"
        parts = ranking_text.split("FINAL RANKING:")
        if len(parts) >= 2:
            ranking_section = parts[1]

            # Try to extract numbered list format (e.g., "1. Response A")
            # This pattern looks for: number, period, optional space, "Response X"
            numbered_matches = re.findall(r"\d+\.\s*Response [A-Z]", ranking_section)
            if numbered_matches:
                # Extract just the "Response X" part
                return [re.search(r"Response [A-Z]", m).group() for m in numbered_matches]

            # Fallback: Extract all "Response X" patterns in order
            matches = re.findall(r"Response [A-Z]", ranking_section)
            return matches

    # Fallback: try to find any "Response X" patterns in order
    matches = re.findall(r"Response [A-Z]", ranking_text)
    return matches
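
# Illustrative example for parse_ranking_from_text (not executed): a compliant
# Stage 2 reply such as
#
#     "Response B is thorough... Response A is shallower...
#      FINAL RANKING:
#      1. Response B
#      2. Response A"
#
# parses to ["Response B", "Response A"]. If the "FINAL RANKING:" header is
# missing, the final fallback still collects labels in order of appearance.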


def calculate_aggregate_rankings(
    stage2_results: List[Dict[str, Any]], label_to_model: Dict[str, str]
) -> List[Dict[str, Any]]:
    """
    Calculate aggregate rankings across all models.

    Args:
        stage2_results: Rankings from each model
        label_to_model: Mapping from anonymous labels to model names

    Returns:
        List of dicts with model name and average rank, sorted best to worst
    """
    # Track positions for each model
    model_positions = defaultdict(list)

    for ranking in stage2_results:
        ranking_text = ranking["ranking"]
        # Parse the ranking from the structured format
        parsed_ranking = parse_ranking_from_text(ranking_text)
        for position, label in enumerate(parsed_ranking, start=1):
            if label in label_to_model:
                model_name = label_to_model[label]
                model_positions[model_name].append(position)

    # Calculate average position for each model
    aggregate = []
    for model, positions in model_positions.items():
        if positions:
            avg_rank = sum(positions) / len(positions)
            aggregate.append(
                {"model": model, "average_rank": round(avg_rank, 2), "rankings_count": len(positions)}
            )

    # Sort by average rank (lower is better)
    aggregate.sort(key=lambda x: x["average_rank"])
    return aggregate
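
# Illustrative example for calculate_aggregate_rankings (not executed): with two
# council members whose parsed rankings are
#     ["Response A", "Response B"] and ["Response B", "Response A"]
# and label_to_model = {"Response A": "model-x", "Response B": "model-y"}
# (hypothetical model ids), model-x collects positions [1, 2] and model-y
# collects [2, 1], so both end up with average_rank 1.5 and rankings_count 2.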


async def generate_conversation_title(user_query: str) -> str:
    """
    Generate a short title for a conversation based on the first user message.

    Args:
        user_query: The first user message

    Returns:
        A short title (3-5 words)
    """
    title_prompt = f"""Generate a very short title (3-5 words maximum) that summarizes the following question.
The title should be concise and descriptive. Do not use quotes or punctuation in the title.
Question: {user_query}
Title:"""

    messages = [{"role": "user", "content": title_prompt}]

    # Use gemini-2.5-flash for title generation (fast and cheap)
    response = await query_model("google/gemini-2.5-flash", messages, timeout=30.0)

    if response is None:
        # Fallback to a generic title
        return "New Conversation"

    title = response.get("content", "New Conversation").strip()

    # Clean up the title - remove quotes, limit length
    title = title.strip("\"'")

    # Truncate if too long
    if len(title) > 50:
        title = title[:47] + "..."

    return title


async def run_full_council(user_query: str) -> Tuple[List, List, Dict, Dict]:
    """
    Run the complete 3-stage council process.

    Args:
        user_query: The user's question

    Returns:
        Tuple of (stage1_results, stage2_results, stage3_result, metadata)
    """
    # Stage 1: Collect individual responses
    stage1_results = await stage1_collect_responses(user_query)

    # If no models responded successfully, return error
    if not stage1_results:
        return [], [], {"model": "error", "response": "All models failed to respond. Please try again."}, {}

    # Stage 2: Collect rankings
    stage2_results, label_to_model = await stage2_collect_rankings(user_query, stage1_results)

    # Calculate aggregate rankings
    aggregate_rankings = calculate_aggregate_rankings(stage2_results, label_to_model)

    # Stage 3: Synthesize final answer
    stage3_result = await stage3_synthesize_final(user_query, stage1_results, stage2_results)

    # Prepare metadata
    metadata = {"label_to_model": label_to_model, "aggregate_rankings": aggregate_rankings}

    return stage1_results, stage2_results, stage3_result, metadata
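
# Minimal usage sketch (not executed here): drive the full pipeline from an
# async entry point. The package path "backend.council" is hypothetical; adjust
# it to wherever this module lives, and make sure the OpenRouter credentials
# expected by .openrouter/.config are set.
#
#     import asyncio
#     from backend.council import run_full_council
#
#     stage1, stage2, final, meta = asyncio.run(run_full_council("What is attention?"))
#     print(final["response"])
#     print(meta["aggregate_rankings"])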