"""OpenRouter API client with improved error handling and retry logic."""
import httpx
import asyncio
from typing import List, Dict, Any, Optional
from .config_improved import (
OPENROUTER_API_KEY,
OPENROUTER_API_URL,
DEFAULT_TIMEOUT,
MAX_RETRIES,
RETRY_DELAY
)


async def query_model(
    model: str,
    messages: List[Dict[str, str]],
    timeout: float = DEFAULT_TIMEOUT,
    max_retries: int = MAX_RETRIES
) -> Optional[Dict[str, Any]]:
    """
    Query a single model via OpenRouter API with retry logic.

    Args:
        model: OpenRouter model identifier (e.g., "openai/gpt-4o")
        messages: List of message dicts with 'role' and 'content'
        timeout: Request timeout in seconds
        max_retries: Maximum number of retry attempts

    Returns:
        Response dict with 'content' and optional 'reasoning_details', or None if failed
    """
    headers = {
        "Authorization": f"Bearer {OPENROUTER_API_KEY}",
        "Content-Type": "application/json",
        "X-Title": "LLM Council",
    }
    payload = {
        "model": model,
        "messages": messages,
    }

    for attempt in range(max_retries + 1):
        try:
            async with httpx.AsyncClient(timeout=timeout) as client:
                response = await client.post(OPENROUTER_API_URL, headers=headers, json=payload)
                response.raise_for_status()
                data = response.json()
                message = data["choices"][0]["message"]
                return {
                    "content": message.get("content"),
                    "reasoning_details": message.get("reasoning_details")
                }
        except httpx.TimeoutException as e:
            print(f"⏱️ Timeout querying model {model} (attempt {attempt + 1}/{max_retries + 1}): {e}")
            if attempt < max_retries:
                await asyncio.sleep(RETRY_DELAY * (attempt + 1))  # Linear backoff: wait longer after each failed attempt
                continue
            return None
        except httpx.HTTPStatusError as e:
            print(f"🚫 HTTP error querying model {model}: {e.response.status_code} - {e.response.text}")
            # Don't retry on 4xx errors (client errors)
            if 400 <= e.response.status_code < 500:
                return None
            # Retry on 5xx errors (server errors)
            if attempt < max_retries:
                await asyncio.sleep(RETRY_DELAY * (attempt + 1))
                continue
            return None
        except Exception as e:
            print(f"❌ Error querying model {model} (attempt {attempt + 1}/{max_retries + 1}): {e}")
            if attempt < max_retries:
                await asyncio.sleep(RETRY_DELAY)
                continue
            return None

    return None
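
# Example usage (a sketch; assumes config_improved supplies a valid
# OPENROUTER_API_KEY and that the caller is already inside an event loop):
#
#     result = await query_model("openai/gpt-4o", [{"role": "user", "content": "Hello"}])
#     if result is not None:
#         print(result["content"])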


async def query_model_stream(
    model: str,
    messages: List[Dict[str, str]],
    timeout: float = DEFAULT_TIMEOUT
):
    """
    Query a model via OpenRouter API and stream the response.

    Yields content chunks as they arrive.

    Args:
        model: OpenRouter model identifier
        messages: List of message dicts with 'role' and 'content'
        timeout: Request timeout in seconds

    Yields:
        Content chunks as strings
    """
    headers = {
        "Authorization": f"Bearer {OPENROUTER_API_KEY}",
        "Content-Type": "application/json",
        "X-Title": "LLM Council",
    }
    payload = {
        "model": model,
        "messages": messages,
        "stream": True
    }

    try:
        async with httpx.AsyncClient(timeout=timeout) as client:
            async with client.stream("POST", OPENROUTER_API_URL, headers=headers, json=payload) as response:
                response.raise_for_status()
                async for line in response.aiter_lines():
                    # OpenRouter streams Server-Sent Events; content lines look like "data: {...}"
                    if line.startswith("data: "):
                        data_str = line[6:]
                        if data_str.strip() == "[DONE]":
                            break
                        try:
                            data = json.loads(data_str)
                            delta = data["choices"][0]["delta"]
                            content = delta.get("content")
                            if content:
                                yield content
                        except (json.JSONDecodeError, KeyError, IndexError):
                            # Skip malformed or non-content chunks
                            pass
    except httpx.TimeoutException as e:
        print(f"⏱️ Timeout streaming model {model}: {e}")
        yield f"\n\n[Error: Request timed out after {timeout}s]"
    except httpx.HTTPStatusError as e:
        print(f"🚫 HTTP error streaming model {model}: {e.response.status_code}")
        yield f"\n\n[Error: HTTP {e.response.status_code}]"
    except Exception as e:
        print(f"❌ Error streaming model {model}: {e}")
        yield f"\n\n[Error: {str(e)}]"


async def query_models_parallel(
    models: List[str],
    messages: List[Dict[str, str]],
    timeout: float = DEFAULT_TIMEOUT
) -> Dict[str, Optional[Dict[str, Any]]]:
    """
    Query multiple models in parallel with individual error handling.

    Args:
        models: List of OpenRouter model identifiers
        messages: List of message dicts to send to each model
        timeout: Request timeout in seconds

    Returns:
        Dict mapping model identifier to response dict (or None if failed)
    """
    print(f"🚀 Querying {len(models)} models in parallel...")

    # Create tasks for all models
    tasks = [query_model(model, messages, timeout=timeout) for model in models]

    # Wait for all to complete
    responses = await asyncio.gather(*tasks, return_exceptions=True)

    # Map models to their responses, handling exceptions
    result = {}
    for model, response in zip(models, responses):
        if isinstance(response, Exception):
            print(f"❌ Model {model} raised exception: {response}")
            result[model] = None
        else:
            result[model] = response
            status = "✅" if response else "❌"
            print(f"{status} Model {model} completed")

    successful = sum(1 for r in result.values() if r is not None)
    print(f"📊 {successful}/{len(models)} models responded successfully")
    return result
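

# Minimal smoke test, a sketch rather than part of the original module: it
# queries two example model identifiers in parallel and prints each result.
# Assumes config_improved supplies a valid OPENROUTER_API_KEY.
if __name__ == "__main__":
    async def _demo():
        # Example model IDs; substitute models available to your OpenRouter account.
        models = ["openai/gpt-4o", "anthropic/claude-3.5-sonnet"]
        messages = [{"role": "user", "content": "Say hello in one sentence."}]
        results = await query_models_parallel(models, messages)
        for model, response in results.items():
            print(model, "->", response["content"] if response else "<failed>")

    asyncio.run(_demo())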