"""Comprehensive error handling with retries and fallbacks.""" from __future__ import annotations import asyncio import logging import time from functools import wraps from typing import Any, Callable, Dict, Optional, TypeVar, Union logger = logging.getLogger(__name__) T = TypeVar('T') class RetryConfig: """Configuration for retry logic.""" def __init__( self, max_retries: int = 3, initial_delay: float = 1.0, backoff_factor: float = 2.0, max_delay: float = 60.0, retryable_exceptions: tuple = (Exception,) ): self.max_retries = max_retries self.initial_delay = initial_delay self.backoff_factor = backoff_factor self.max_delay = max_delay self.retryable_exceptions = retryable_exceptions def retry_with_backoff(config: Optional[RetryConfig] = None): """Decorator for retrying functions with exponential backoff.""" if config is None: config = RetryConfig() def decorator(func: Callable[..., T]) -> Callable[..., T]: @wraps(func) def sync_wrapper(*args, **kwargs) -> T: last_exception = None delay = config.initial_delay for attempt in range(config.max_retries + 1): try: return func(*args, **kwargs) except config.retryable_exceptions as e: last_exception = e if attempt < config.max_retries: logger.warning( f"{func.__name__} failed (attempt {attempt + 1}/{config.max_retries + 1}): {e}. " f"Retrying in {delay:.1f}s..." ) time.sleep(delay) delay = min(delay * config.backoff_factor, config.max_delay) else: logger.error(f"{func.__name__} failed after {config.max_retries + 1} attempts: {e}") raise last_exception @wraps(func) async def async_wrapper(*args, **kwargs) -> T: last_exception = None delay = config.initial_delay for attempt in range(config.max_retries + 1): try: return await func(*args, **kwargs) except config.retryable_exceptions as e: last_exception = e if attempt < config.max_retries: logger.warning( f"{func.__name__} failed (attempt {attempt + 1}/{config.max_retries + 1}): {e}. " f"Retrying in {delay:.1f}s..." ) await asyncio.sleep(delay) delay = min(delay * config.backoff_factor, config.max_delay) else: logger.error(f"{func.__name__} failed after {config.max_retries + 1} attempts: {e}") raise last_exception if asyncio.iscoroutinefunction(func): return async_wrapper return sync_wrapper return decorator def safe_execute( func: Callable[..., T], *args, default: Optional[T] = None, error_message: str = "Operation failed", **kwargs ) -> Union[T, Dict[str, Any]]: """Safely execute a function with fallback.""" try: return func(*args, **kwargs) except Exception as e: logger.error(f"{error_message}: {e}") if default is not None: return default return { "success": False, "error": str(e), "error_type": type(e).__name__ } async def safe_execute_async( func: Callable[..., T], *args, default: Optional[T] = None, error_message: str = "Operation failed", **kwargs ) -> Union[T, Dict[str, Any]]: """Safely execute an async function with fallback.""" try: return await func(*args, **kwargs) except Exception as e: logger.error(f"{error_message}: {e}") if default is not None: return default return { "success": False, "error": str(e), "error_type": type(e).__name__ } class PartialResult: """Container for partial results when some operations fail.""" def __init__(self): self.results: Dict[str, Any] = {} self.errors: Dict[str, str] = {} self.success_count: int = 0 self.failure_count: int = 0 def add_result(self, key: str, value: Any): """Add a successful result.""" self.results[key] = value self.success_count += 1 def add_error(self, key: str, error: str): """Add an error.""" self.errors[key] = error self.failure_count += 1 def to_dict(self) -> Dict[str, Any]: """Convert to dictionary.""" return { "results": self.results, "errors": self.errors, "success_count": self.success_count, "failure_count": self.failure_count, "has_errors": self.failure_count > 0, "is_partial": self.failure_count > 0 and self.success_count > 0 }