# What does this do?
## Gets a key's redis cache and stores it in memory for 1 minute.
## This reduces the number of redis GET requests the proxy makes during high traffic.
### [BETA] This is in beta and might change.
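#
# Example key layout (illustrative values, not taken from this module): a proxy
# key "sk-123" making "acompletion" calls yields redis keys of the form
# "litellm:sk-123:acompletion:<hash>". This hook scans that prefix once,
# batch-fetches the values, and serves repeat lookups from the in-memory
# cache for 60 seconds.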
import traceback
from typing import Literal, Optional

from fastapi import HTTPException

import litellm
from litellm._logging import verbose_proxy_logger
from litellm.caching.caching import DualCache, InMemoryCache, RedisCache
from litellm.integrations.custom_logger import CustomLogger
from litellm.proxy._types import UserAPIKeyAuth


class _PROXY_BatchRedisRequests(CustomLogger):
    # Class variables or attributes
    in_memory_cache: Optional[InMemoryCache] = None

    def __init__(self):
        if litellm.cache is not None:
            litellm.cache.async_get_cache = (
                self.async_get_cache
            )  # map the litellm 'get_cache' function to our custom function

    def print_verbose(
        self, print_statement, debug_level: Literal["INFO", "DEBUG"] = "DEBUG"
    ):
        if debug_level == "DEBUG":
            verbose_proxy_logger.debug(print_statement)
        elif debug_level == "INFO":
            verbose_proxy_logger.info(print_statement)
        if litellm.set_verbose is True:
            print(print_statement)  # noqa

    async def async_pre_call_hook(
        self,
        user_api_key_dict: UserAPIKeyAuth,
        cache: DualCache,
        data: dict,
        call_type: str,
    ):
        try:
            """
            Get the user key.
            Check if a key starting with `litellm:<api_key>:<call_type>:` exists in-memory.
            If not, fetch the relevant cache entries from redis.
            """
            api_key = user_api_key_dict.api_key
            cache_key_name = f"litellm:{api_key}:{call_type}"
            self.in_memory_cache = cache.in_memory_cache

            key_value_dict = {}
            in_memory_cache_exists = False
            for key in cache.in_memory_cache.cache_dict.keys():
                if isinstance(key, str) and key.startswith(cache_key_name):
                    in_memory_cache_exists = True
                    break

            if in_memory_cache_exists is False and litellm.cache is not None:
                """
                - Check if `litellm.Cache` is redis
                - Get the relevant values
                """
                if litellm.cache.type is not None and isinstance(
                    litellm.cache.cache, RedisCache
                ):
                    self.print_verbose(f"cache_key_name: {cache_key_name}")

                    # Use the SCAN iterator to fetch keys matching the pattern
                    keys = await litellm.cache.cache.async_scan_iter(
                        pattern=cache_key_name, count=100
                    )
                    self.print_verbose(f"redis keys: {keys}")

                    if len(keys) > 0:
                        key_value_dict = (
                            await litellm.cache.cache.async_batch_get_cache(
                                key_list=keys
                            )
                        )

                        ## Add the fetched values to the in-memory cache (1 minute TTL)
                        if len(key_value_dict) > 0:
                            await cache.in_memory_cache.async_set_cache_pipeline(
                                cache_list=list(key_value_dict.items()), ttl=60
                            )

                    ## On an in-memory miss, set the cache namespace so subsequent
                    ## cache keys are built under this prefix
                    data["metadata"]["redis_namespace"] = cache_key_name
        except HTTPException as e:
            raise e
        except Exception as e:
            verbose_proxy_logger.error(
                "litellm.proxy.hooks.batch_redis_get.py::async_pre_call_hook(): Exception occurred - {}".format(
                    str(e)
                )
            )
            verbose_proxy_logger.debug(traceback.format_exc())

    async def async_get_cache(self, *args, **kwargs):
        """
        - Check if the cache key is in-memory
        - Else:
            - fetch the missing key from redis
            - update the in-memory cache
            - return the redis result
        """
        try:  # never block execution
            cache_key: Optional[str] = None
            if "cache_key" in kwargs:
                cache_key = kwargs["cache_key"]
            elif litellm.cache is not None:
                cache_key = litellm.cache.get_cache_key(
                    *args, **kwargs
                )  # returns "<cache_key_name>:<hash>" - we pass redis_namespace in async_pre_call_hook. Done to avoid rewriting the async_set_cache logic

            if (
                cache_key is not None
                and self.in_memory_cache is not None
                and litellm.cache is not None
            ):
                # honor the caller's cache-control max age, if one was passed
                cache_control_args = kwargs.get("cache", {})
                max_age = cache_control_args.get(
                    "s-max-age", cache_control_args.get("s-maxage", float("inf"))
                )

                cached_result = self.in_memory_cache.get_cache(
                    cache_key, *args, **kwargs
                )
                if cached_result is None:
                    # in-memory miss -> fall back to redis, then warm the in-memory cache
                    cached_result = await litellm.cache.cache.async_get_cache(
                        cache_key, *args, **kwargs
                    )
                    if cached_result is not None:
                        await self.in_memory_cache.async_set_cache(
                            cache_key, cached_result, ttl=60
                        )
                return litellm.cache._get_cache_logic(
                    cached_result=cached_result, max_age=max_age
                )
        except Exception:
            return None
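

# --- Usage sketch (illustrative; not part of this module) ---
# A minimal sketch of how this hook might be wired into the proxy, assuming
# hooks are registered via `litellm.callbacks` (the exact wiring lives in the
# proxy server and may differ):
#
#   import litellm
#   from litellm.proxy.hooks.batch_redis_get import _PROXY_BatchRedisRequests
#
#   batch_redis_obj = _PROXY_BatchRedisRequests()  # patches litellm.cache.async_get_cache
#   litellm.callbacks.append(batch_redis_obj)      # proxy invokes async_pre_call_hook per request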