File size: 5,272 Bytes
92f3410 |
|
"""Comprehensive error handling with retries and fallbacks."""
from __future__ import annotations
import asyncio
import logging
import time
from functools import wraps
from typing import Any, Callable, Dict, Optional, TypeVar, Union
logger = logging.getLogger(__name__)
T = TypeVar('T')
class RetryConfig:
"""Configuration for retry logic."""
def __init__(
self,
max_retries: int = 3,
initial_delay: float = 1.0,
backoff_factor: float = 2.0,
max_delay: float = 60.0,
retryable_exceptions: tuple = (Exception,)
):
self.max_retries = max_retries
self.initial_delay = initial_delay
self.backoff_factor = backoff_factor
self.max_delay = max_delay
self.retryable_exceptions = retryable_exceptions
def retry_with_backoff(config: Optional[RetryConfig] = None):
"""Decorator for retrying functions with exponential backoff."""
if config is None:
config = RetryConfig()
def decorator(func: Callable[..., T]) -> Callable[..., T]:
@wraps(func)
def sync_wrapper(*args, **kwargs) -> T:
last_exception = None
delay = config.initial_delay
for attempt in range(config.max_retries + 1):
try:
return func(*args, **kwargs)
except config.retryable_exceptions as e:
last_exception = e
if attempt < config.max_retries:
logger.warning(
f"{func.__name__} failed (attempt {attempt + 1}/{config.max_retries + 1}): {e}. "
f"Retrying in {delay:.1f}s..."
)
time.sleep(delay)
delay = min(delay * config.backoff_factor, config.max_delay)
else:
logger.error(f"{func.__name__} failed after {config.max_retries + 1} attempts: {e}")
raise last_exception
@wraps(func)
async def async_wrapper(*args, **kwargs) -> T:
last_exception = None
delay = config.initial_delay
for attempt in range(config.max_retries + 1):
try:
return await func(*args, **kwargs)
except config.retryable_exceptions as e:
last_exception = e
if attempt < config.max_retries:
logger.warning(
f"{func.__name__} failed (attempt {attempt + 1}/{config.max_retries + 1}): {e}. "
f"Retrying in {delay:.1f}s..."
)
await asyncio.sleep(delay)
delay = min(delay * config.backoff_factor, config.max_delay)
else:
logger.error(f"{func.__name__} failed after {config.max_retries + 1} attempts: {e}")
raise last_exception
if asyncio.iscoroutinefunction(func):
return async_wrapper
return sync_wrapper
return decorator
def safe_execute(
func: Callable[..., T],
*args,
default: Optional[T] = None,
error_message: str = "Operation failed",
**kwargs
) -> Union[T, Dict[str, Any]]:
"""Safely execute a function with fallback."""
try:
return func(*args, **kwargs)
except Exception as e:
logger.error(f"{error_message}: {e}")
if default is not None:
return default
return {
"success": False,
"error": str(e),
"error_type": type(e).__name__
}
async def safe_execute_async(
func: Callable[..., T],
*args,
default: Optional[T] = None,
error_message: str = "Operation failed",
**kwargs
) -> Union[T, Dict[str, Any]]:
"""Safely execute an async function with fallback."""
try:
return await func(*args, **kwargs)
except Exception as e:
logger.error(f"{error_message}: {e}")
if default is not None:
return default
return {
"success": False,
"error": str(e),
"error_type": type(e).__name__
}
class PartialResult:
"""Container for partial results when some operations fail."""
def __init__(self):
self.results: Dict[str, Any] = {}
self.errors: Dict[str, str] = {}
self.success_count: int = 0
self.failure_count: int = 0
def add_result(self, key: str, value: Any):
"""Add a successful result."""
self.results[key] = value
self.success_count += 1
def add_error(self, key: str, error: str):
"""Add an error."""
self.errors[key] = error
self.failure_count += 1
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary."""
return {
"results": self.results,
"errors": self.errors,
"success_count": self.success_count,
"failure_count": self.failure_count,
"has_errors": self.failure_count > 0,
"is_partial": self.failure_count > 0 and self.success_count > 0
}
|