"""OpenRouter API client with improved error handling and retry logic."""

import asyncio
import json
from typing import List, Dict, Any, Optional

import httpx

from .config_improved import (
    OPENROUTER_API_KEY,
    OPENROUTER_API_URL,
    DEFAULT_TIMEOUT,
    MAX_RETRIES,
    RETRY_DELAY,
)
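

# The constants imported above are expected to look roughly like this in
# config_improved.py (a sketch; every value below is an illustrative
# assumption, not the project's actual configuration):
#
#     OPENROUTER_API_KEY = os.environ["OPENROUTER_API_KEY"]
#     OPENROUTER_API_URL = "https://openrouter.ai/api/v1/chat/completions"
#     DEFAULT_TIMEOUT = 120.0   # seconds
#     MAX_RETRIES = 2
#     RETRY_DELAY = 2.0         # seconds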


async def query_model(
    model: str,
    messages: List[Dict[str, str]],
    timeout: float = DEFAULT_TIMEOUT,
    max_retries: int = MAX_RETRIES,
) -> Optional[Dict[str, Any]]:
    """
    Query a single model via the OpenRouter API with retry logic.

    Failed attempts are retried after a short delay; client errors (4xx)
    are not retried, since they will not succeed on a resend.

    Args:
        model: OpenRouter model identifier (e.g., "openai/gpt-4o")
        messages: List of message dicts with 'role' and 'content' keys
        timeout: Request timeout in seconds
        max_retries: Maximum number of retry attempts

    Returns:
        Response dict with 'content' and optional 'reasoning_details',
        or None if all attempts failed.
    """
    headers = {
        "Authorization": f"Bearer {OPENROUTER_API_KEY}",
        "Content-Type": "application/json",
        "X-Title": "LLM Council",
    }

    payload = {
        "model": model,
        "messages": messages,
    }

    for attempt in range(max_retries + 1):
        try:
            async with httpx.AsyncClient(timeout=timeout) as client:
                response = await client.post(OPENROUTER_API_URL, headers=headers, json=payload)
                response.raise_for_status()

                data = response.json()
                message = data["choices"][0]["message"]

                return {
                    "content": message.get("content"),
                    "reasoning_details": message.get("reasoning_details"),
                }

        except httpx.TimeoutException as e:
            print(f"⏱️ Timeout querying model {model} (attempt {attempt + 1}/{max_retries + 1}): {e}")
            if attempt < max_retries:
                await asyncio.sleep(RETRY_DELAY * (attempt + 1))
                continue
            return None

        except httpx.HTTPStatusError as e:
            print(f"🚫 HTTP error querying model {model}: {e.response.status_code} - {e.response.text}")
            # Client errors (4xx) indicate a problem with the request itself,
            # so fail fast instead of retrying.
            if 400 <= e.response.status_code < 500:
                return None
            if attempt < max_retries:
                await asyncio.sleep(RETRY_DELAY * (attempt + 1))
                continue
            return None

        except Exception as e:
            print(f"❌ Error querying model {model} (attempt {attempt + 1}/{max_retries + 1}): {e}")
            if attempt < max_retries:
                await asyncio.sleep(RETRY_DELAY)
                continue
            return None

    return None
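

# A minimal usage sketch for query_model (illustrative; the model name and
# prompt are placeholders, and the call must be awaited from async code):
#
#     result = await query_model(
#         "openai/gpt-4o",
#         [{"role": "user", "content": "Hello"}],
#     )
#     if result is not None:
#         print(result["content"])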


async def query_model_stream(
    model: str,
    messages: List[Dict[str, str]],
    timeout: float = DEFAULT_TIMEOUT,
):
    """
    Query a model via the OpenRouter API and stream the response.

    Yields content chunks as they arrive on the server-sent events (SSE)
    stream. On failure, a bracketed error message is yielded as a final
    chunk so callers always receive some output.

    Args:
        model: OpenRouter model identifier
        messages: List of message dicts with 'role' and 'content' keys
        timeout: Request timeout in seconds

    Yields:
        Content chunks as strings
    """
    headers = {
        "Authorization": f"Bearer {OPENROUTER_API_KEY}",
        "Content-Type": "application/json",
        "X-Title": "LLM Council",
    }

    payload = {
        "model": model,
        "messages": messages,
        "stream": True,
    }

    try:
        async with httpx.AsyncClient(timeout=timeout) as client:
            async with client.stream("POST", OPENROUTER_API_URL, headers=headers, json=payload) as response:
                response.raise_for_status()
                async for line in response.aiter_lines():
                    # SSE data lines have the form: 'data: {"choices": [...]}'
                    if not line.startswith("data: "):
                        continue
                    data_str = line[len("data: "):]
                    if data_str.strip() == "[DONE]":
                        break
                    try:
                        data = json.loads(data_str)
                        content = data["choices"][0]["delta"].get("content")
                        if content:
                            yield content
                    except (json.JSONDecodeError, KeyError, IndexError):
                        # Skip malformed chunks and chunks without content.
                        pass

    except httpx.TimeoutException as e:
        print(f"⏱️ Timeout streaming model {model}: {e}")
        yield f"\n\n[Error: Request timed out after {timeout}s]"

    except httpx.HTTPStatusError as e:
        print(f"🚫 HTTP error streaming model {model}: {e.response.status_code}")
        yield f"\n\n[Error: HTTP {e.response.status_code}]"

    except Exception as e:
        print(f"❌ Error streaming model {model}: {e}")
        yield f"\n\n[Error: {str(e)}]"
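

# A minimal streaming sketch (illustrative; run from async code, with
# `messages` built as in the query_model example above):
#
#     async for chunk in query_model_stream("openai/gpt-4o", messages):
#         print(chunk, end="", flush=True)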


async def query_models_parallel(
    models: List[str],
    messages: List[Dict[str, str]],
    timeout: float = DEFAULT_TIMEOUT,
) -> Dict[str, Optional[Dict[str, Any]]]:
    """
    Query multiple models in parallel with individual error handling.

    Args:
        models: List of OpenRouter model identifiers
        messages: List of message dicts to send to each model
        timeout: Request timeout in seconds

    Returns:
        Dict mapping model identifier to response dict (or None if failed)
    """
    print(f"🚀 Querying {len(models)} models in parallel...")

    tasks = [query_model(model, messages, timeout=timeout) for model in models]
    # return_exceptions=True keeps one failing task from cancelling the rest.
    responses = await asyncio.gather(*tasks, return_exceptions=True)

    result = {}
    for model, response in zip(models, responses):
        if isinstance(response, Exception):
            print(f"❌ Model {model} raised exception: {response}")
            result[model] = None
        else:
            result[model] = response
            status = "✅" if response else "❌"
            print(f"{status} Model {model} completed")

    successful = sum(1 for r in result.values() if r is not None)
    print(f"📊 {successful}/{len(models)} models responded successfully")

    return result
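

# A runnable smoke test. The model list and prompt are illustrative
# assumptions; because this module uses a relative import, run it as
# `python -m <package>.<this_module>` rather than as a plain script.
if __name__ == "__main__":
    async def _demo() -> None:
        models = ["openai/gpt-4o", "anthropic/claude-3.5-sonnet"]
        messages = [{"role": "user", "content": "Say hello in one sentence."}]
        results = await query_models_parallel(models, messages)
        for model, response in results.items():
            print(f"{model}: {response['content'] if response else '(no response)'}")

    asyncio.run(_demo())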