File size: 5,272 Bytes
92f3410 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 |
"""Comprehensive error handling with retries and fallbacks."""
from __future__ import annotations
import asyncio
import logging
import time
from functools import wraps
from typing import Any, Callable, Dict, Optional, TypeVar, Union
logger = logging.getLogger(__name__)
T = TypeVar('T')
class RetryConfig:
    """Configuration for retry logic.

    Bundles the settings read by ``retry_with_backoff``: how many retries
    to make, how long to wait between attempts, and which exceptions are
    worth retrying.

    Attributes:
        max_retries: Number of retries made *after* the first attempt, so
            a call is tried at most ``max_retries + 1`` times.
        initial_delay: Seconds to wait before the first retry.
        backoff_factor: Multiplier applied to the delay after each retry.
        max_delay: Upper bound, in seconds, for the delay between retries.
        retryable_exceptions: Tuple of exception classes that trigger a
            retry; anything else propagates immediately.
    """

    def __init__(
        self,
        max_retries: int = 3,
        initial_delay: float = 1.0,
        backoff_factor: float = 2.0,
        max_delay: float = 60.0,
        retryable_exceptions: tuple = (Exception,)
    ):
        """Validate and store the retry settings.

        Args:
            max_retries: Retries after the first attempt; must be >= 0.
            initial_delay: Seconds before the first retry; must be >= 0.
            backoff_factor: Delay multiplier per retry; must be >= 0.
            max_delay: Cap on the delay in seconds; must be >= 0.
            retryable_exceptions: A tuple of exception classes. A single
                exception class or any other iterable of classes is also
                accepted and converted to a tuple.

        Raises:
            ValueError: If any numeric setting is negative.
        """
        # Fail at construction time: a negative value would otherwise
        # surface much later as an unrelated error inside the retry loop
        # (``raise None`` or ``time.sleep`` rejecting a negative delay).
        numeric_settings = (
            ("max_retries", max_retries),
            ("initial_delay", initial_delay),
            ("backoff_factor", backoff_factor),
            ("max_delay", max_delay),
        )
        for setting_name, setting_value in numeric_settings:
            if setting_value < 0:
                raise ValueError(
                    f"{setting_name} must be >= 0, got {setting_value!r}"
                )

        # An ``except`` clause accepts a class or a tuple of classes but
        # not a list/set, so normalise everything to a tuple up front.
        if isinstance(retryable_exceptions, type):
            retryable_exceptions = (retryable_exceptions,)
        elif not isinstance(retryable_exceptions, tuple):
            retryable_exceptions = tuple(retryable_exceptions)

        self.max_retries = max_retries
        self.initial_delay = initial_delay
        self.backoff_factor = backoff_factor
        self.max_delay = max_delay
        self.retryable_exceptions = retryable_exceptions

    def __repr__(self) -> str:
        """Return a constructor-style representation for debugging."""
        return (
            f"{type(self).__name__}("
            f"max_retries={self.max_retries!r}, "
            f"initial_delay={self.initial_delay!r}, "
            f"backoff_factor={self.backoff_factor!r}, "
            f"max_delay={self.max_delay!r}, "
            f"retryable_exceptions={self.retryable_exceptions!r})"
        )
def retry_with_backoff(config: Optional[RetryConfig] = None):
    """Decorator for retrying functions with exponential backoff.

    Works on both plain functions and coroutine functions: a coroutine
    function gets an ``async`` wrapper that waits with ``asyncio.sleep``,
    anything else gets a synchronous wrapper that waits with
    ``time.sleep``.

    The wrapped callable is attempted up to ``config.max_retries + 1``
    times (always at least once). After each failure with one of
    ``config.retryable_exceptions`` the wrapper logs a warning, waits, and
    multiplies the wait by ``config.backoff_factor``. Every wait, including
    the first, is kept within ``0 .. config.max_delay`` seconds. When the
    attempts are exhausted an error is logged and the last exception is
    re-raised unchanged. Exceptions not listed in
    ``config.retryable_exceptions`` propagate immediately without a retry.

    Args:
        config: Retry settings; a default ``RetryConfig()`` is used when
            omitted.

    Returns:
        A decorator that wraps a callable with the retry behaviour.
    """
    if config is None:
        config = RetryConfig()

    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        # Callables such as functools.partial have no __name__; fall back
        # to repr() so that logging a failure can never itself raise.
        func_name = getattr(func, "__name__", None) or repr(func)

        def total_attempts() -> int:
            """Return how many calls may be made, never fewer than one."""
            return max(config.max_retries, 0) + 1

        def clamp_delay(seconds: float) -> float:
            """Keep a delay within ``0 .. max_delay`` so sleeping cannot fail."""
            return max(0.0, min(seconds, config.max_delay))

        def retry_allowed(attempt: int, total: int, delay: float, exc: BaseException) -> bool:
            """Log a failed attempt and report whether another one remains."""
            if attempt < total:
                logger.warning(
                    f"{func_name} failed (attempt {attempt}/{total}): {exc}. "
                    f"Retrying in {delay:.1f}s..."
                )
                return True
            logger.error(f"{func_name} failed after {total} attempts: {exc}")
            return False

        @wraps(func)
        def sync_wrapper(*args, **kwargs) -> T:
            total = total_attempts()
            delay = clamp_delay(config.initial_delay)
            attempt = 0
            while True:
                attempt += 1
                try:
                    return func(*args, **kwargs)
                except config.retryable_exceptions as e:
                    if not retry_allowed(attempt, total, delay, e):
                        # Bare raise keeps the original traceback and
                        # avoids storing the exception in a local, which
                        # would create a frame/traceback reference cycle.
                        raise
                # Sleep outside the handler so the caught exception is
                # released before waiting.
                time.sleep(delay)
                delay = clamp_delay(delay * config.backoff_factor)

        @wraps(func)
        async def async_wrapper(*args, **kwargs) -> T:
            total = total_attempts()
            delay = clamp_delay(config.initial_delay)
            attempt = 0
            while True:
                attempt += 1
                try:
                    return await func(*args, **kwargs)
                except config.retryable_exceptions as e:
                    if not retry_allowed(attempt, total, delay, e):
                        raise
                await asyncio.sleep(delay)
                delay = clamp_delay(delay * config.backoff_factor)

        if asyncio.iscoroutinefunction(func):
            return async_wrapper
        return sync_wrapper

    return decorator
def safe_execute(
    func: Callable[..., T],
    *args,
    default: Optional[T] = None,
    error_message: str = "Operation failed",
    **kwargs
) -> Union[T, Dict[str, Any]]:
    """Call ``func`` and turn any ``Exception`` into a fallback value.

    Args:
        func: Callable to invoke.
        *args: Positional arguments forwarded to ``func``.
        default: Value returned when ``func`` raises. ``None`` means
            "no fallback", in which case an error dictionary is returned.
        error_message: Prefix for the logged error line.
        **kwargs: Keyword arguments forwarded to ``func``.

    Returns:
        The result of ``func`` on success. On failure, ``default`` when it
        is not ``None``; otherwise a dict with the keys ``"success"``
        (always ``False``), ``"error"`` (the exception text) and
        ``"error_type"`` (the exception class name).
    """
    try:
        outcome = func(*args, **kwargs)
    except Exception as e:
        logger.error(f"{error_message}: {e}")
        if default is None:
            # No fallback supplied: describe the failure to the caller.
            failure_report = {
                "success": False,
                "error": str(e),
                "error_type": type(e).__name__
            }
            return failure_report
        return default
    return outcome
async def safe_execute_async(
    func: Callable[..., T],
    *args,
    default: Optional[T] = None,
    error_message: str = "Operation failed",
    **kwargs
) -> Union[T, Dict[str, Any]]:
    """Await ``func`` and turn any ``Exception`` into a fallback value.

    Args:
        func: Callable whose return value is awaited.
        *args: Positional arguments forwarded to ``func``.
        default: Value returned when the call or the await raises.
            ``None`` means "no fallback", in which case an error
            dictionary is returned.
        error_message: Prefix for the logged error line.
        **kwargs: Keyword arguments forwarded to ``func``.

    Returns:
        The awaited result on success. On failure, ``default`` when it is
        not ``None``; otherwise a dict with the keys ``"success"``
        (always ``False``), ``"error"`` (the exception text) and
        ``"error_type"`` (the exception class name).
    """
    try:
        awaited_value = await func(*args, **kwargs)
    except Exception as e:
        logger.error(f"{error_message}: {e}")
        fallback_given = default is not None
        if fallback_given:
            return default
        # No fallback supplied: describe the failure to the caller.
        return {
            "success": False,
            "error": str(e),
            "error_type": type(e).__name__
        }
    else:
        return awaited_value
class PartialResult:
    """Collects per-key successes and failures for a batch of operations.

    ``results`` maps keys to successful values and ``errors`` maps keys to
    error strings. The two counters are incremented on every
    ``add_result`` / ``add_error`` call.
    """

    def __init__(self):
        self.results: Dict[str, Any] = {}
        self.errors: Dict[str, str] = {}
        self.success_count: int = 0
        self.failure_count: int = 0

    def add_result(self, key: str, value: Any):
        """Store ``value`` under ``key`` and count one success."""
        self.results[key] = value
        self.success_count = self.success_count + 1

    def add_error(self, key: str, error: str):
        """Store ``error`` under ``key`` and count one failure."""
        self.errors[key] = error
        self.failure_count = self.failure_count + 1

    def to_dict(self) -> Dict[str, Any]:
        """Return the collected state plus two derived flags as a dict.

        ``has_errors`` is true when at least one failure was counted;
        ``is_partial`` is true when both failures and successes were
        counted. The ``results`` and ``errors`` entries are the live
        dictionaries held by this object, not copies.
        """
        any_failed = self.failure_count > 0
        any_succeeded = self.success_count > 0
        summary: Dict[str, Any] = {
            "results": self.results,
            "errors": self.errors,
            "success_count": self.success_count,
            "failure_count": self.failure_count,
        }
        summary["has_errors"] = any_failed
        summary["is_partial"] = any_failed and any_succeeded
        return summary
|