#### What this does ####
#    Class for sending Slack Alerts #
import asyncio
import datetime
import os
import random
import time
from datetime import timedelta
from typing import TYPE_CHECKING, Any, Dict, List, Literal, Optional, Union

from openai import APIError

import litellm
import litellm.litellm_core_utils
import litellm.litellm_core_utils.litellm_logging
import litellm.types
from litellm._logging import verbose_logger, verbose_proxy_logger
from litellm.caching.caching import DualCache
from litellm.constants import HOURS_IN_A_DAY
from litellm.integrations.custom_batch_logger import CustomBatchLogger
from litellm.litellm_core_utils.duration_parser import duration_in_seconds
from litellm.litellm_core_utils.exception_mapping_utils import (
    _add_key_name_and_team_to_alert,
)
from litellm.llms.custom_httpx.http_handler import (
    get_async_httpx_client,
    httpxSpecialProvider,
)
from litellm.proxy._types import AlertType, CallInfo, VirtualKeyEvent, WebhookEvent
from litellm.types.integrations.slack_alerting import *

from ..email_templates.templates import *
from .batching_handler import send_to_webhook, squash_payloads
from .utils import _add_langfuse_trace_id_to_alert, process_slack_alerting_variables

if TYPE_CHECKING:
    from litellm.router import Router as _Router

    Router = _Router
else:
    Router = Any

class SlackAlerting(CustomBatchLogger):
    """
    Class for sending Slack Alerts
    """

    # Class variables or attributes
    def __init__(
        self,
        internal_usage_cache: Optional[DualCache] = None,
        alerting_threshold: Optional[
            float
        ] = None,  # threshold for slow / hanging llm responses (in seconds)
        alerting: Optional[List] = [],
        alert_types: List[AlertType] = DEFAULT_ALERT_TYPES,
        alert_to_webhook_url: Optional[
            Dict[AlertType, Union[List[str], str]]
        ] = None,  # if user wants to separate alerts to diff channels
        alerting_args={},
        default_webhook_url: Optional[str] = None,
        **kwargs,
    ):
        if alerting_threshold is None:
            alerting_threshold = 300
        self.alerting_threshold = alerting_threshold
        self.alerting = alerting
        self.alert_types = alert_types
        self.internal_usage_cache = internal_usage_cache or DualCache()
        self.async_http_handler = get_async_httpx_client(
            llm_provider=httpxSpecialProvider.LoggingCallback
        )
        self.alert_to_webhook_url = process_slack_alerting_variables(
            alert_to_webhook_url=alert_to_webhook_url
        )
        self.is_running = False
        self.alerting_args = SlackAlertingArgs(**alerting_args)
        self.default_webhook_url = default_webhook_url
        self.flush_lock = asyncio.Lock()
        super().__init__(**kwargs, flush_lock=self.flush_lock)
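    # Example (hypothetical sketch, not part of the original module): constructing the
    # alerter with a per-alert-type webhook. The URL below is a placeholder.
    #
    #   alerter = SlackAlerting(
    #       alerting=["slack"],
    #       alerting_threshold=120,  # seconds before slow / hanging alerts fire
    #       alert_to_webhook_url={AlertType.llm_too_slow: "https://hooks.slack.com/services/..."},
    #   )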
    def update_values(
        self,
        alerting: Optional[List] = None,
        alerting_threshold: Optional[float] = None,
        alert_types: Optional[List[AlertType]] = None,
        alert_to_webhook_url: Optional[Dict[AlertType, Union[List[str], str]]] = None,
        alerting_args: Optional[Dict] = None,
        llm_router: Optional[Router] = None,
    ):
        if alerting is not None:
            self.alerting = alerting
            asyncio.create_task(self.periodic_flush())
        if alerting_threshold is not None:
            self.alerting_threshold = alerting_threshold
        if alert_types is not None:
            self.alert_types = alert_types
        if alerting_args is not None:
            self.alerting_args = SlackAlertingArgs(**alerting_args)
        if alert_to_webhook_url is not None:
            # update the dict
            if self.alert_to_webhook_url is None:
                self.alert_to_webhook_url = process_slack_alerting_variables(
                    alert_to_webhook_url=alert_to_webhook_url
                )
            else:
                _new_values = (
                    process_slack_alerting_variables(
                        alert_to_webhook_url=alert_to_webhook_url
                    )
                    or {}
                )
                self.alert_to_webhook_url.update(_new_values)
        if llm_router is not None:
            self.llm_router = llm_router
    async def deployment_in_cooldown(self):
        pass

    async def deployment_removed_from_cooldown(self):
        pass

    def _all_possible_alert_types(self):
        # used by the UI to show all supported alert types
        # Note: this is not the set of alerts the user has configured; it is every alert type a user can select
        # return a list of all values in the AlertType enum
        return list(AlertType)
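    # The returned list contains every member of the AlertType enum; the types used in
    # this module include AlertType.llm_too_slow, AlertType.llm_requests_hanging,
    # AlertType.budget_alerts, AlertType.daily_reports, AlertType.spend_reports,
    # AlertType.outage_alerts, AlertType.region_outage_alerts,
    # AlertType.failed_tracking_spend, AlertType.new_model_added, and
    # AlertType.fallback_reports.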
    def _response_taking_too_long_callback_helper(
        self,
        kwargs,  # kwargs to completion
        start_time,
        end_time,  # start/end time
    ):
        try:
            time_difference = end_time - start_time
            # Convert the timedelta to float (in seconds)
            time_difference_float = time_difference.total_seconds()
            litellm_params = kwargs.get("litellm_params", {})
            model = kwargs.get("model", "")
            api_base = litellm.get_api_base(model=model, optional_params=litellm_params)
            messages = kwargs.get("messages", None)
            # if messages does not exist fallback to "input"
            if messages is None:
                messages = kwargs.get("input", None)

            # only use first 100 chars for alerting
            _messages = str(messages)[:100]

            return time_difference_float, model, api_base, _messages
        except Exception as e:
            raise e
    def _get_deployment_latencies_to_alert(self, metadata=None):
        if metadata is None:
            return None

        if "_latency_per_deployment" in metadata:
            # Translate model_id to -> api_base
            # _latency_per_deployment is a dictionary that looks like this:
            """
            _latency_per_deployment: {
                api_base: 0.01336697916666667
            }
            """
            _message_to_send = ""
            _deployment_latencies = metadata["_latency_per_deployment"]
            if len(_deployment_latencies) == 0:
                return None
            _deployment_latency_map: Optional[dict] = None
            try:
                # try sorting deployments by latency
                _deployment_latencies = sorted(
                    _deployment_latencies.items(), key=lambda x: x[1]
                )
                _deployment_latency_map = dict(_deployment_latencies)
            except Exception:
                pass

            if _deployment_latency_map is None:
                return

            for api_base, latency in _deployment_latency_map.items():
                _message_to_send += f"\n{api_base}: {round(latency,2)}s"
            _message_to_send = "```" + _message_to_send + "```"
            return _message_to_send
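    # Example (hypothetical values): given
    #   metadata = {"_latency_per_deployment": {"https://api.openai.com/v1": 0.013}}
    # the helper above returns a Slack code block such as
    #   "```\nhttps://api.openai.com/v1: 0.01s```"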
    async def response_taking_too_long_callback(
        self,
        kwargs,  # kwargs to completion
        completion_response,  # response from completion
        start_time,
        end_time,  # start/end time
    ):
        if self.alerting is None or self.alert_types is None:
            return

        (
            time_difference_float,
            model,
            api_base,
            messages,
        ) = self._response_taking_too_long_callback_helper(
            kwargs=kwargs,
            start_time=start_time,
            end_time=end_time,
        )
        if litellm.turn_off_message_logging or litellm.redact_messages_in_exceptions:
            messages = "Message not logged. litellm.redact_messages_in_exceptions=True"
        request_info = f"\nRequest Model: `{model}`\nAPI Base: `{api_base}`\nMessages: `{messages}`"
        slow_message = f"`Responses are slow - {round(time_difference_float,2)}s response time > Alerting threshold: {self.alerting_threshold}s`"
        alerting_metadata: dict = {}
        if time_difference_float > self.alerting_threshold:
            # add deployment latencies to alert
            if (
                kwargs is not None
                and "litellm_params" in kwargs
                and "metadata" in kwargs["litellm_params"]
            ):
                _metadata: dict = kwargs["litellm_params"]["metadata"]
                request_info = _add_key_name_and_team_to_alert(
                    request_info=request_info, metadata=_metadata
                )

                _deployment_latency_map = self._get_deployment_latencies_to_alert(
                    metadata=_metadata
                )
                if _deployment_latency_map is not None:
                    request_info += (
                        f"\nAvailable Deployment Latencies\n{_deployment_latency_map}"
                    )

                if "alerting_metadata" in _metadata:
                    alerting_metadata = _metadata["alerting_metadata"]
            await self.send_alert(
                message=slow_message + request_info,
                level="Low",
                alert_type=AlertType.llm_too_slow,
                alerting_metadata=alerting_metadata,
            )
    async def async_update_daily_reports(
        self, deployment_metrics: DeploymentMetrics
    ) -> int:
        """
        Store the perf by deployment in cache
        - Number of failed requests per deployment
        - Latency / output tokens per deployment

        'deployment_id:daily_metrics:failed_requests'
        'deployment_id:daily_metrics:latency_per_output_token'

        Returns:
            int - count of metrics set (1 - if just latency, 2 - if failed + latency)
        """
        return_val = 0
        try:
            ## FAILED REQUESTS ##
            if deployment_metrics.failed_request:
                await self.internal_usage_cache.async_increment_cache(
                    key="{}:{}".format(
                        deployment_metrics.id,
                        SlackAlertingCacheKeys.failed_requests_key.value,
                    ),
                    value=1,
                    parent_otel_span=None,  # no attached request, this is a background operation
                )
                return_val += 1

            ## LATENCY ##
            if deployment_metrics.latency_per_output_token is not None:
                await self.internal_usage_cache.async_increment_cache(
                    key="{}:{}".format(
                        deployment_metrics.id, SlackAlertingCacheKeys.latency_key.value
                    ),
                    value=deployment_metrics.latency_per_output_token,
                    parent_otel_span=None,  # no attached request, this is a background operation
                )
                return_val += 1

            return return_val
        except Exception:
            return 0
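    # Example (hypothetical deployment id): for deployment_metrics.id == "model-abc-123",
    # the two counters incremented above live under keys of the form
    #   "model-abc-123:<SlackAlertingCacheKeys.failed_requests_key.value>"
    #   "model-abc-123:<SlackAlertingCacheKeys.latency_key.value>"
    # which send_daily_reports() later reads back and resets to 0.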
    async def send_daily_reports(self, router) -> bool:  # noqa: PLR0915
        """
        Send a daily report on:
        - Top 5 deployments with most failed requests
        - Top 5 slowest deployments (normalized by latency/output tokens)

        Get the value from redis cache (if available) or in-memory and send it

        Cleanup:
        - reset values in cache -> prevent memory leak

        Returns:
            True -> if successfully sent
            False -> if not sent
        """
        ids = router.get_model_ids()

        # get keys
        failed_request_keys = [
            "{}:{}".format(id, SlackAlertingCacheKeys.failed_requests_key.value)
            for id in ids
        ]
        latency_keys = [
            "{}:{}".format(id, SlackAlertingCacheKeys.latency_key.value) for id in ids
        ]

        combined_metrics_keys = failed_request_keys + latency_keys  # reduce cache calls

        combined_metrics_values = await self.internal_usage_cache.async_batch_get_cache(
            keys=combined_metrics_keys
        )  # [1, 2, None, ..]

        if combined_metrics_values is None:
            return False

        all_none = True
        for val in combined_metrics_values:
            if val is not None and val > 0:
                all_none = False
                break

        if all_none:
            return False

        failed_request_values = combined_metrics_values[
            : len(failed_request_keys)
        ]  # [1, 2, None, ..]
        latency_values = combined_metrics_values[len(failed_request_keys) :]

        # find top 5 failed
        ## Replace None values with a placeholder value (0 in this case)
        placeholder_value = 0
        replaced_failed_values = [
            value if value is not None else placeholder_value
            for value in failed_request_values
        ]

        ## Get the indices of the top 5 keys with the highest numerical values (ignoring None and 0 values)
        top_5_failed = sorted(
            range(len(replaced_failed_values)),
            key=lambda i: replaced_failed_values[i],
            reverse=True,
        )[:5]
        top_5_failed = [
            index for index in top_5_failed if replaced_failed_values[index] > 0
        ]

        # find top 5 slowest
        # Replace None values with a placeholder value (0 in this case)
        placeholder_value = 0
        replaced_slowest_values = [
            value if value is not None else placeholder_value
            for value in latency_values
        ]

        # Get the indices of the top 5 values with the highest numerical values (ignoring None and 0 values)
        top_5_slowest = sorted(
            range(len(replaced_slowest_values)),
            key=lambda i: replaced_slowest_values[i],
            reverse=True,
        )[:5]
        top_5_slowest = [
            index for index in top_5_slowest if replaced_slowest_values[index] > 0
        ]

        # format alert -> return the litellm model name + api base
        message = f"\n\nTime: `{time.time()}`s\nHere are today's key metrics 📈: \n\n"

        message += "\n\n*❗️ Top Deployments with Most Failed Requests:*\n\n"
        if not top_5_failed:
            message += "\tNone\n"
        for i in range(len(top_5_failed)):
            key = failed_request_keys[top_5_failed[i]].split(":")[0]
            _deployment = router.get_model_info(key)
            if isinstance(_deployment, dict):
                deployment_name = _deployment["litellm_params"].get("model", "")
            else:
                return False

            api_base = litellm.get_api_base(
                model=deployment_name,
                optional_params=(
                    _deployment["litellm_params"] if _deployment is not None else {}
                ),
            )
            if api_base is None:
                api_base = ""
            value = replaced_failed_values[top_5_failed[i]]
            message += f"\t{i+1}. Deployment: `{deployment_name}`, Failed Requests: `{value}`, API Base: `{api_base}`\n"

        message += "\n\n*😅 Top Slowest Deployments:*\n\n"
        if not top_5_slowest:
            message += "\tNone\n"
        for i in range(len(top_5_slowest)):
            key = latency_keys[top_5_slowest[i]].split(":")[0]
            _deployment = router.get_model_info(key)
            if _deployment is not None:
                deployment_name = _deployment["litellm_params"].get("model", "")
            else:
                deployment_name = ""
            api_base = litellm.get_api_base(
                model=deployment_name,
                optional_params=(
                    _deployment["litellm_params"] if _deployment is not None else {}
                ),
            )
            value = round(replaced_slowest_values[top_5_slowest[i]], 3)
            message += f"\t{i+1}. Deployment: `{deployment_name}`, Latency per output token: `{value}s/token`, API Base: `{api_base}`\n\n"

        # cache cleanup -> reset values to 0
        latency_cache_keys = [(key, 0) for key in latency_keys]
        failed_request_cache_keys = [(key, 0) for key in failed_request_keys]
        combined_metrics_cache_keys = latency_cache_keys + failed_request_cache_keys
        await self.internal_usage_cache.async_set_cache_pipeline(
            cache_list=combined_metrics_cache_keys
        )

        message += f"\n\nNext Run is at: `{time.time() + self.alerting_args.daily_report_frequency}`s"

        # send alert
        await self.send_alert(
            message=message,
            level="Low",
            alert_type=AlertType.daily_reports,
            alerting_metadata={},
        )

        return True
    async def response_taking_too_long(
        self,
        start_time: Optional[datetime.datetime] = None,
        end_time: Optional[datetime.datetime] = None,
        type: Literal["hanging_request", "slow_response"] = "hanging_request",
        request_data: Optional[dict] = None,
    ):
        if self.alerting is None or self.alert_types is None:
            return
        model: str = ""
        if request_data is not None:
            model = request_data.get("model", "")
            messages = request_data.get("messages", None)
            if messages is None:
                # if messages does not exist, fall back to "input"
                messages = request_data.get("input", None)

            # try casting messages to str and get the first 100 characters, else mark as None
            try:
                messages = str(messages)
                messages = messages[:100]
            except Exception:
                messages = ""

            if (
                litellm.turn_off_message_logging
                or litellm.redact_messages_in_exceptions
            ):
                messages = (
                    "Message not logged. litellm.redact_messages_in_exceptions=True"
                )
            request_info = f"\nRequest Model: `{model}`\nMessages: `{messages}`"
        else:
            request_info = ""

        if type == "hanging_request":
            await asyncio.sleep(
                self.alerting_threshold
            )  # Set it to 5 minutes - I'd imagine this might be different for streaming, non-streaming, non-completion (embedding + img) requests
            alerting_metadata: dict = {}
            if await self._request_is_completed(request_data=request_data) is True:
                return

            if request_data is not None:
                if request_data.get("deployment", None) is not None and isinstance(
                    request_data["deployment"], dict
                ):
                    _api_base = litellm.get_api_base(
                        model=model,
                        optional_params=request_data["deployment"].get(
                            "litellm_params", {}
                        ),
                    )

                    if _api_base is None:
                        _api_base = ""

                    request_info += f"\nAPI Base: {_api_base}"
                elif request_data.get("metadata", None) is not None and isinstance(
                    request_data["metadata"], dict
                ):
                    # In hanging requests, sometimes the request has not made it to the point where the deployment is passed to `request_data`;
                    # in that case we fall back to the api base set in the request metadata
                    _metadata: dict = request_data["metadata"]
                    _api_base = _metadata.get("api_base", "")

                    request_info = _add_key_name_and_team_to_alert(
                        request_info=request_info, metadata=_metadata
                    )

                    if _api_base is None:
                        _api_base = ""

                    if "alerting_metadata" in _metadata:
                        alerting_metadata = _metadata["alerting_metadata"]
                    request_info += f"\nAPI Base: `{_api_base}`"
                # only alert hanging responses if they have not been marked as success
                alerting_message = (
                    f"`Requests are hanging - {self.alerting_threshold}s+ request time`"
                )

                if "langfuse" in litellm.success_callback:
                    langfuse_url = await _add_langfuse_trace_id_to_alert(
                        request_data=request_data,
                    )

                    if langfuse_url is not None:
                        request_info += "\n🪢 Langfuse Trace: {}".format(langfuse_url)

                # add deployment latencies to alert
                _deployment_latency_map = self._get_deployment_latencies_to_alert(
                    metadata=request_data.get("metadata", {})
                )
                if _deployment_latency_map is not None:
                    request_info += f"\nDeployment Latencies\n{_deployment_latency_map}"

                await self.send_alert(
                    message=alerting_message + request_info,
                    level="Medium",
                    alert_type=AlertType.llm_requests_hanging,
                    alerting_metadata=alerting_metadata,
                )
    async def failed_tracking_alert(self, error_message: str, failing_model: str):
        """
        Raise alert when tracking failed for specific model

        Args:
            error_message (str): Error message
            failing_model (str): Model that failed tracking
        """
        if self.alerting is None or self.alert_types is None:
            # do nothing if alerting is not switched on
            return
        if "failed_tracking_spend" not in self.alert_types:
            return

        _cache: DualCache = self.internal_usage_cache
        message = "Failed Tracking Cost for " + error_message
        _cache_key = "budget_alerts:failed_tracking:{}".format(failing_model)
        result = await _cache.async_get_cache(key=_cache_key)
        if result is None:
            await self.send_alert(
                message=message,
                level="High",
                alert_type=AlertType.failed_tracking_spend,
                alerting_metadata={},
            )
            await _cache.async_set_cache(
                key=_cache_key,
                value="SENT",
                ttl=self.alerting_args.budget_alert_ttl,
            )
    async def budget_alerts(  # noqa: PLR0915
        self,
        type: Literal[
            "token_budget",
            "soft_budget",
            "user_budget",
            "team_budget",
            "proxy_budget",
            "projected_limit_exceeded",
        ],
        user_info: CallInfo,
    ):
        ## PREVENTIVE ALERTING ## - https://github.com/BerriAI/litellm/issues/2727
        # - Alert once within a 24hr period
        # - Cache this information
        # - Don't re-alert, if alert already sent
        _cache: DualCache = self.internal_usage_cache
        if self.alerting is None or self.alert_types is None:
            # do nothing if alerting is not switched on
            return
        if "budget_alerts" not in self.alert_types:
            return
        _id: Optional[str] = "default_id"  # used for caching
        user_info_json = user_info.model_dump(exclude_none=True)
        user_info_str = self._get_user_info_str(user_info)
        event: Optional[
            Literal[
                "budget_crossed",
                "threshold_crossed",
                "projected_limit_exceeded",
                "soft_budget_crossed",
            ]
        ] = None
        event_group: Optional[
            Literal["internal_user", "team", "key", "proxy", "customer"]
        ] = None
        event_message: str = ""
        webhook_event: Optional[WebhookEvent] = None
        if type == "proxy_budget":
            event_group = "proxy"
            event_message += "Proxy Budget: "
        elif type == "soft_budget":
            event_group = "proxy"
            event_message += "Soft Budget Crossed: "
        elif type == "user_budget":
            event_group = "internal_user"
            event_message += "User Budget: "
            _id = user_info.user_id or _id
        elif type == "team_budget":
            event_group = "team"
            event_message += "Team Budget: "
            _id = user_info.team_id or _id
        elif type == "token_budget":
            event_group = "key"
            event_message += "Key Budget: "
            _id = user_info.token
        elif type == "projected_limit_exceeded":
            event_group = "key"
            event_message += "Key Budget: Projected Limit Exceeded"
            event = "projected_limit_exceeded"
            _id = user_info.token

        # percent of max_budget left to spend
        if user_info.max_budget is None and user_info.soft_budget is None:
            return
        percent_left: float = 0
        if user_info.max_budget is not None:
            if user_info.max_budget > 0:
                percent_left = (
                    user_info.max_budget - user_info.spend
                ) / user_info.max_budget

        # check if crossed budget
        if user_info.max_budget is not None:
            if user_info.spend >= user_info.max_budget:
                event = "budget_crossed"
                event_message += (
                    f"Budget Crossed\n Total Budget:`{user_info.max_budget}`"
                )
            elif percent_left <= SLACK_ALERTING_THRESHOLD_5_PERCENT:
                event = "threshold_crossed"
                event_message += "5% Threshold Crossed "
            elif percent_left <= SLACK_ALERTING_THRESHOLD_15_PERCENT:
                event = "threshold_crossed"
                event_message += "15% Threshold Crossed"
        elif user_info.soft_budget is not None:
            if user_info.spend >= user_info.soft_budget:
                event = "soft_budget_crossed"

        if event is not None and event_group is not None:
            _cache_key = "budget_alerts:{}:{}".format(event, _id)
            result = await _cache.async_get_cache(key=_cache_key)
            if result is None:
                webhook_event = WebhookEvent(
                    event=event,
                    event_group=event_group,
                    event_message=event_message,
                    **user_info_json,
                )
                await self.send_alert(
                    message=event_message + "\n\n" + user_info_str,
                    level="High",
                    alert_type=AlertType.budget_alerts,
                    user_info=webhook_event,
                    alerting_metadata={},
                )
                await _cache.async_set_cache(
                    key=_cache_key,
                    value="SENT",
                    ttl=self.alerting_args.budget_alert_ttl,
                )

            return
        return
    def _get_user_info_str(self, user_info: CallInfo) -> str:
        """
        Create a standard message for a budget alert
        """
        _all_fields_as_dict = user_info.model_dump(exclude_none=True)
        _all_fields_as_dict.pop("token")
        msg = ""
        for k, v in _all_fields_as_dict.items():
            msg += f"*{k}:* `{v}`\n"

        return msg
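    # Example (hypothetical CallInfo values): for a key with spend=12.5 and
    # max_budget=20.0, the helper above produces Slack-markdown lines such as
    #   *spend:* `12.5`
    #   *max_budget:* `20.0`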
    async def customer_spend_alert(
        self,
        token: Optional[str],
        key_alias: Optional[str],
        end_user_id: Optional[str],
        response_cost: Optional[float],
        max_budget: Optional[float],
    ):
        if (
            self.alerting is not None
            and "webhook" in self.alerting
            and end_user_id is not None
            and token is not None
            and response_cost is not None
        ):
            # log customer spend
            event = WebhookEvent(
                spend=response_cost,
                max_budget=max_budget,
                token=token,
                customer_id=end_user_id,
                user_id=None,
                team_id=None,
                user_email=None,
                key_alias=key_alias,
                projected_exceeded_date=None,
                projected_spend=None,
                event="spend_tracked",
                event_group="customer",
                event_message="Customer spend tracked. Customer={}, spend={}".format(
                    end_user_id, response_cost
                ),
            )

            await self.send_webhook_alert(webhook_event=event)
    def _count_outage_alerts(self, alerts: List[int]) -> str:
        """
        Parameters:
        - alerts: List[int] -> list of error codes (either 408 or 500+)

        Returns:
        - str -> formatted string. This is an alert message, giving a human-friendly description of the errors.
        """
        error_breakdown = {"Timeout Errors": 0, "API Errors": 0, "Unknown Errors": 0}
        for alert in alerts:
            if alert == 408:
                error_breakdown["Timeout Errors"] += 1
            elif alert >= 500:
                error_breakdown["API Errors"] += 1
            else:
                error_breakdown["Unknown Errors"] += 1

        error_msg = ""
        for key, value in error_breakdown.items():
            if value > 0:
                error_msg += "\n{}: {}\n".format(key, value)

        return error_msg
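    # Example (hypothetical input): _count_outage_alerts(alerts=[408, 500, 503])
    # returns the breakdown
    #   "\nTimeout Errors: 1\n\nAPI Errors: 2\n"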
    def _outage_alert_msg_factory(
        self,
        alert_type: Literal["Major", "Minor"],
        key: Literal["Model", "Region"],
        key_val: str,
        provider: str,
        api_base: Optional[str],
        outage_value: BaseOutageModel,
    ) -> str:
        """Format an alert message for slack"""
        headers = {f"{key} Name": key_val, "Provider": provider}
        if api_base is not None:
            headers["API Base"] = api_base  # type: ignore

        headers_str = "\n"
        for k, v in headers.items():
            headers_str += f"*{k}:* `{v}`\n"

        return f"""\n\n
*⚠️ {alert_type} Service Outage*
{headers_str}
*Errors:*
{self._count_outage_alerts(alerts=outage_value["alerts"])}
*Last Check:* `{round(time.time() - outage_value["last_updated_at"], 4)}s ago`\n\n
"""
    async def region_outage_alerts(
        self,
        exception: APIError,
        deployment_id: str,
    ) -> None:
        """
        Send slack alert if specific provider region is having an outage.

        Track for 408 (Timeout) and >=500 Error codes
        """
        ## CREATE (PROVIDER+REGION) ID ##
        if self.llm_router is None:
            return

        deployment = self.llm_router.get_deployment(model_id=deployment_id)

        if deployment is None:
            return

        model = deployment.litellm_params.model
        ### GET PROVIDER ###
        provider = deployment.litellm_params.custom_llm_provider
        if provider is None:
            model, provider, _, _ = litellm.get_llm_provider(model=model)

        ### GET REGION ###
        region_name = deployment.litellm_params.region_name
        if region_name is None:
            region_name = litellm.utils._get_model_region(
                custom_llm_provider=provider, litellm_params=deployment.litellm_params
            )

        if region_name is None:
            return

        ### UNIQUE CACHE KEY ###
        cache_key = provider + region_name

        outage_value: Optional[
            ProviderRegionOutageModel
        ] = await self.internal_usage_cache.async_get_cache(key=cache_key)

        if (
            getattr(exception, "status_code", None) is None
            or (
                exception.status_code != 408  # type: ignore
                and exception.status_code < 500  # type: ignore
            )
            or self.llm_router is None
        ):
            return

        if outage_value is None:
            _deployment_set = set()
            _deployment_set.add(deployment_id)
            outage_value = ProviderRegionOutageModel(
                provider_region_id=cache_key,
                alerts=[exception.status_code],  # type: ignore
                minor_alert_sent=False,
                major_alert_sent=False,
                last_updated_at=time.time(),
                deployment_ids=_deployment_set,
            )

            ## add to cache ##
            await self.internal_usage_cache.async_set_cache(
                key=cache_key,
                value=outage_value,
                ttl=self.alerting_args.region_outage_alert_ttl,
            )
            return

        if len(outage_value["alerts"]) < self.alerting_args.max_outage_alert_list_size:
            outage_value["alerts"].append(exception.status_code)  # type: ignore
        else:  # prevent memory leaks
            pass

        _deployment_set = outage_value["deployment_ids"]
        _deployment_set.add(deployment_id)
        outage_value["deployment_ids"] = _deployment_set
        outage_value["last_updated_at"] = time.time()

        ## MINOR OUTAGE ALERT SENT ##
        if (
            outage_value["minor_alert_sent"] is False
            and len(outage_value["alerts"])
            >= self.alerting_args.minor_outage_alert_threshold
            and len(_deployment_set) > 1  # make sure it's not just 1 bad deployment
        ):
            msg = self._outage_alert_msg_factory(
                alert_type="Minor",
                key="Region",
                key_val=region_name,
                api_base=None,
                outage_value=outage_value,
                provider=provider,
            )
            # send minor alert
            await self.send_alert(
                message=msg,
                level="Medium",
                alert_type=AlertType.outage_alerts,
                alerting_metadata={},
            )
            # set to true
            outage_value["minor_alert_sent"] = True

        ## MAJOR OUTAGE ALERT SENT ##
        elif (
            outage_value["major_alert_sent"] is False
            and len(outage_value["alerts"])
            >= self.alerting_args.major_outage_alert_threshold
            and len(_deployment_set) > 1  # make sure it's not just 1 bad deployment
        ):
            msg = self._outage_alert_msg_factory(
                alert_type="Major",
                key="Region",
                key_val=region_name,
                api_base=None,
                outage_value=outage_value,
                provider=provider,
            )
            # send major alert
            await self.send_alert(
                message=msg,
                level="High",
                alert_type=AlertType.outage_alerts,
                alerting_metadata={},
            )
            # set to true
            outage_value["major_alert_sent"] = True

        ## update cache ##
        await self.internal_usage_cache.async_set_cache(
            key=cache_key, value=outage_value
        )
    async def outage_alerts(
        self,
        exception: APIError,
        deployment_id: str,
    ) -> None:
        """
        Send slack alert if model is badly configured / having an outage (408, 401, 429, >=500).

        key = model_id

        value = {
            - model_id
            - threshold
            - alerts []
        }

        ttl = 1hr
        max_alerts_size = 10
        """
        try:
            outage_value: Optional[OutageModel] = await self.internal_usage_cache.async_get_cache(key=deployment_id)  # type: ignore
            if (
                getattr(exception, "status_code", None) is None
                or (
                    exception.status_code != 408  # type: ignore
                    and exception.status_code < 500  # type: ignore
                )
                or self.llm_router is None
            ):
                return

            ### EXTRACT MODEL DETAILS ###
            deployment = self.llm_router.get_deployment(model_id=deployment_id)
            if deployment is None:
                return

            model = deployment.litellm_params.model
            provider = deployment.litellm_params.custom_llm_provider
            if provider is None:
                try:
                    model, provider, _, _ = litellm.get_llm_provider(model=model)
                except Exception:
                    provider = ""
            api_base = litellm.get_api_base(
                model=model, optional_params=deployment.litellm_params
            )

            if outage_value is None:
                outage_value = OutageModel(
                    model_id=deployment_id,
                    alerts=[exception.status_code],  # type: ignore
                    minor_alert_sent=False,
                    major_alert_sent=False,
                    last_updated_at=time.time(),
                )

                ## add to cache ##
                await self.internal_usage_cache.async_set_cache(
                    key=deployment_id,
                    value=outage_value,
                    ttl=self.alerting_args.outage_alert_ttl,
                )
                return

            if (
                len(outage_value["alerts"])
                < self.alerting_args.max_outage_alert_list_size
            ):
                outage_value["alerts"].append(exception.status_code)  # type: ignore
            else:  # prevent memory leaks
                pass

            outage_value["last_updated_at"] = time.time()

            ## MINOR OUTAGE ALERT SENT ##
            if (
                outage_value["minor_alert_sent"] is False
                and len(outage_value["alerts"])
                >= self.alerting_args.minor_outage_alert_threshold
            ):
                msg = self._outage_alert_msg_factory(
                    alert_type="Minor",
                    key="Model",
                    key_val=model,
                    api_base=api_base,
                    outage_value=outage_value,
                    provider=provider,
                )
                # send minor alert
                await self.send_alert(
                    message=msg,
                    level="Medium",
                    alert_type=AlertType.outage_alerts,
                    alerting_metadata={},
                )
                # set to true
                outage_value["minor_alert_sent"] = True

            elif (
                outage_value["major_alert_sent"] is False
                and len(outage_value["alerts"])
                >= self.alerting_args.major_outage_alert_threshold
            ):
                msg = self._outage_alert_msg_factory(
                    alert_type="Major",
                    key="Model",
                    key_val=model,
                    api_base=api_base,
                    outage_value=outage_value,
                    provider=provider,
                )
                # send major alert
                await self.send_alert(
                    message=msg,
                    level="High",
                    alert_type=AlertType.outage_alerts,
                    alerting_metadata={},
                )
                # set to true
                outage_value["major_alert_sent"] = True

            ## update cache ##
            await self.internal_usage_cache.async_set_cache(
                key=deployment_id, value=outage_value
            )
        except Exception:
            pass
    async def model_added_alert(
        self, model_name: str, litellm_model_name: str, passed_model_info: Any
    ):
        base_model_from_user = getattr(passed_model_info, "base_model", None)
        model_info = {}
        base_model = ""
        if base_model_from_user is not None:
            model_info = litellm.model_cost.get(base_model_from_user, {})
            base_model = f"Base Model: `{base_model_from_user}`\n"
        else:
            model_info = litellm.model_cost.get(litellm_model_name, {})
        model_info_str = ""
        for k, v in model_info.items():
            if k == "input_cost_per_token" or k == "output_cost_per_token":
                # when converting to string it should not be 1.63e-06
                v = "{:.8f}".format(v)

            model_info_str += f"{k}: {v}\n"

        message = f"""
*🚅 New Model Added*
Model Name: `{model_name}`
{base_model}
Usage OpenAI Python SDK:
```
import openai
client = openai.OpenAI(
    api_key="your_api_key",
    base_url={os.getenv("PROXY_BASE_URL", "http://0.0.0.0:4000")}
)
response = client.chat.completions.create(
    model="{model_name}", # model to send to the proxy
    messages = [
        {{
            "role": "user",
            "content": "this is a test request, write a short poem"
        }}
    ]
)
```
Model Info:
```
{model_info_str}
```
"""

        alert_val = self.send_alert(
            message=message,
            level="Low",
            alert_type=AlertType.new_model_added,
            alerting_metadata={},
        )

        if alert_val is not None and asyncio.iscoroutine(alert_val):
            await alert_val
    async def model_removed_alert(self, model_name: str):
        pass

    async def send_webhook_alert(self, webhook_event: WebhookEvent) -> bool:
        """
        Sends structured alert to webhook, if set.

        Currently only implemented for budget alerts

        Returns -> True if sent, False if not.

        Raises Exception
            - if WEBHOOK_URL is not set
        """
        webhook_url = os.getenv("WEBHOOK_URL", None)
        if webhook_url is None:
            raise Exception("Missing webhook_url from environment")

        payload = webhook_event.model_dump_json()
        headers = {"Content-type": "application/json"}

        response = await self.async_http_handler.post(
            url=webhook_url,
            headers=headers,
            data=payload,
        )
        if response.status_code == 200:
            return True
        else:
            print("Error sending webhook alert. Error=", response.text)  # noqa

        return False
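    # Example (hypothetical environment): the webhook target is read from the
    # WEBHOOK_URL environment variable, e.g.
    #   export WEBHOOK_URL=https://example.com/litellm/alerts
    # and the POST body is the JSON-serialized WebhookEvent (model_dump_json()).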
    async def _check_if_using_premium_email_feature(
        self,
        premium_user: bool,
        email_logo_url: Optional[str] = None,
        email_support_contact: Optional[str] = None,
    ):
        from litellm.proxy.proxy_server import CommonProxyErrors, premium_user

        if premium_user is not True:
            if email_logo_url is not None or email_support_contact is not None:
                raise ValueError(
                    f"Trying to Customize Email Alerting\n {CommonProxyErrors.not_premium_user.value}"
                )
        return
    async def send_key_created_or_user_invited_email(
        self, webhook_event: WebhookEvent
    ) -> bool:
        try:
            from litellm.proxy.utils import send_email

            if self.alerting is None or "email" not in self.alerting:
                # do nothing if user does not want email alerts
                verbose_proxy_logger.error(
                    "Error sending email alert - 'email' not in self.alerting %s",
                    self.alerting,
                )
                return False
            from litellm.proxy.proxy_server import premium_user, prisma_client

            email_logo_url = os.getenv(
                "SMTP_SENDER_LOGO", os.getenv("EMAIL_LOGO_URL", None)
            )
            email_support_contact = os.getenv("EMAIL_SUPPORT_CONTACT", None)
            await self._check_if_using_premium_email_feature(
                premium_user, email_logo_url, email_support_contact
            )
            if email_logo_url is None:
                email_logo_url = LITELLM_LOGO_URL
            if email_support_contact is None:
                email_support_contact = LITELLM_SUPPORT_CONTACT

            event_name = webhook_event.event_message
            recipient_email = webhook_event.user_email
            recipient_user_id = webhook_event.user_id
            if (
                recipient_email is None
                and recipient_user_id is not None
                and prisma_client is not None
            ):
                user_row = await prisma_client.db.litellm_usertable.find_unique(
                    where={"user_id": recipient_user_id}
                )

                if user_row is not None:
                    recipient_email = user_row.user_email

            key_token = webhook_event.token
            key_budget = webhook_event.max_budget
            base_url = os.getenv("PROXY_BASE_URL", "http://0.0.0.0:4000")

            email_html_content = "Alert from LiteLLM Server"
            if recipient_email is None:
                verbose_proxy_logger.error(
                    "Trying to send email alert to no recipient",
                    extra=webhook_event.dict(),
                )

            if webhook_event.event == "key_created":
                email_html_content = KEY_CREATED_EMAIL_TEMPLATE.format(
                    email_logo_url=email_logo_url,
                    recipient_email=recipient_email,
                    key_budget=key_budget,
                    key_token=key_token,
                    base_url=base_url,
                    email_support_contact=email_support_contact,
                )
            elif webhook_event.event == "internal_user_created":
                # GET TEAM NAME
                team_id = webhook_event.team_id
                team_name = "Default Team"
                if team_id is not None and prisma_client is not None:
                    team_row = await prisma_client.db.litellm_teamtable.find_unique(
                        where={"team_id": team_id}
                    )
                    if team_row is not None:
                        team_name = team_row.team_alias or "-"
                email_html_content = USER_INVITED_EMAIL_TEMPLATE.format(
                    email_logo_url=email_logo_url,
                    recipient_email=recipient_email,
                    team_name=team_name,
                    base_url=base_url,
                    email_support_contact=email_support_contact,
                )
            else:
                verbose_proxy_logger.error(
                    "Trying to send email alert on unknown webhook event",
                    extra=webhook_event.model_dump(),
                )

            webhook_event.model_dump_json()
            email_event = {
                "to": recipient_email,
                "subject": f"LiteLLM: {event_name}",
                "html": email_html_content,
            }

            await send_email(
                receiver_email=email_event["to"],
                subject=email_event["subject"],
                html=email_event["html"],
            )

            return True

        except Exception as e:
            verbose_proxy_logger.error("Error sending email alert %s", str(e))
            return False
    async def send_email_alert_using_smtp(
        self, webhook_event: WebhookEvent, alert_type: str
    ) -> bool:
        """
        Sends structured Email alert to an SMTP server

        Currently only implemented for budget alerts

        Returns -> True if sent, False if not.
        """
        from litellm.proxy.proxy_server import premium_user
        from litellm.proxy.utils import send_email

        email_logo_url = os.getenv(
            "SMTP_SENDER_LOGO", os.getenv("EMAIL_LOGO_URL", None)
        )
        email_support_contact = os.getenv("EMAIL_SUPPORT_CONTACT", None)
        await self._check_if_using_premium_email_feature(
            premium_user, email_logo_url, email_support_contact
        )

        if email_logo_url is None:
            email_logo_url = LITELLM_LOGO_URL
        if email_support_contact is None:
            email_support_contact = LITELLM_SUPPORT_CONTACT

        event_name = webhook_event.event_message
        recipient_email = webhook_event.user_email
        user_name = webhook_event.user_id
        max_budget = webhook_event.max_budget
        email_html_content = "Alert from LiteLLM Server"
        if recipient_email is None:
            verbose_proxy_logger.error(
                "Trying to send email alert to no recipient", extra=webhook_event.dict()
            )

        if webhook_event.event == "budget_crossed":
            email_html_content = f"""
            <img src="{email_logo_url}" alt="LiteLLM Logo" width="150" height="50" />
            <p> Hi {user_name}, <br/>
            Your LLM API usage this month has reached your account's <b> monthly budget of ${max_budget} </b> <br /> <br />
            API requests will be rejected until either (a) you increase your monthly budget or (b) your monthly usage resets at the beginning of the next calendar month. <br /> <br />
            If you have any questions, please send an email to {email_support_contact} <br /> <br />
            Best, <br />
            The LiteLLM team <br />
            """

        webhook_event.model_dump_json()
        email_event = {
            "to": recipient_email,
            "subject": f"LiteLLM: {event_name}",
            "html": email_html_content,
        }

        await send_email(
            receiver_email=email_event["to"],
            subject=email_event["subject"],
            html=email_event["html"],
        )
        if webhook_event.event_group == "team":
            from litellm.integrations.email_alerting import send_team_budget_alert

            await send_team_budget_alert(webhook_event=webhook_event)

        return False
    async def send_alert(
        self,
        message: str,
        level: Literal["Low", "Medium", "High"],
        alert_type: AlertType,
        alerting_metadata: dict,
        user_info: Optional[WebhookEvent] = None,
        **kwargs,
    ):
        """
        Alerting based on thresholds: - https://github.com/BerriAI/litellm/issues/1298

        - Responses taking too long
        - Requests are hanging
        - Calls are failing
        - DB Read/Writes are failing
        - Proxy Close to max budget
        - Key Close to max budget

        Parameters:
            level: str - Low|Medium|High - if calls might fail (Medium) or are failing (High); Currently, no alerts would be 'Low'.
            message: str - what is the alert about
        """
        if self.alerting is None:
            return

        if (
            "webhook" in self.alerting
            and alert_type == "budget_alerts"
            and user_info is not None
        ):
            await self.send_webhook_alert(webhook_event=user_info)

        if (
            "email" in self.alerting
            and alert_type == "budget_alerts"
            and user_info is not None
        ):
            # only send budget alerts over Email
            await self.send_email_alert_using_smtp(
                webhook_event=user_info, alert_type=alert_type
            )

        if "slack" not in self.alerting:
            return
        if alert_type not in self.alert_types:
            return

        from datetime import datetime

        # Get the current timestamp
        current_time = datetime.now().strftime("%H:%M:%S")
        _proxy_base_url = os.getenv("PROXY_BASE_URL", None)
        if alert_type == "daily_reports" or alert_type == "new_model_added":
            formatted_message = message
        else:
            formatted_message = (
                f"Level: `{level}`\nTimestamp: `{current_time}`\n\nMessage: {message}"
            )

        if kwargs:
            for key, value in kwargs.items():
                formatted_message += f"\n\n{key}: `{value}`\n\n"

        if alerting_metadata:
            for key, value in alerting_metadata.items():
                formatted_message += f"\n\n*Alerting Metadata*: \n{key}: `{value}`\n\n"
        if _proxy_base_url is not None:
            formatted_message += f"\n\nProxy URL: `{_proxy_base_url}`"

        # check if we find the slack webhook url in self.alert_to_webhook_url
        if (
            self.alert_to_webhook_url is not None
            and alert_type in self.alert_to_webhook_url
        ):
            slack_webhook_url: Optional[
                Union[str, List[str]]
            ] = self.alert_to_webhook_url[alert_type]
        elif self.default_webhook_url is not None:
            slack_webhook_url = self.default_webhook_url
        else:
            slack_webhook_url = os.getenv("SLACK_WEBHOOK_URL", None)

        if slack_webhook_url is None:
            raise ValueError("Missing SLACK_WEBHOOK_URL from environment")
        payload = {"text": formatted_message}
        headers = {"Content-type": "application/json"}

        if isinstance(slack_webhook_url, list):
            for url in slack_webhook_url:
                self.log_queue.append(
                    {
                        "url": url,
                        "headers": headers,
                        "payload": payload,
                        "alert_type": alert_type,
                    }
                )
        else:
            self.log_queue.append(
                {
                    "url": slack_webhook_url,
                    "headers": headers,
                    "payload": payload,
                    "alert_type": alert_type,
                }
            )

        if len(self.log_queue) >= self.batch_size:
            await self.flush_queue()
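    # Example (hypothetical call): other methods in this class enqueue alerts via
    #   await self.send_alert(
    #       message="`Responses are slow - ...`",
    #       level="Low",
    #       alert_type=AlertType.llm_too_slow,
    #       alerting_metadata={},
    #   )
    # The alert is appended to self.log_queue and flushed in batches by async_send_batch().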
    async def async_send_batch(self):
        if not self.log_queue:
            return

        squashed_queue = squash_payloads(self.log_queue)
        tasks = [
            send_to_webhook(
                slackAlertingInstance=self, item=item["item"], count=item["count"]
            )
            for item in squashed_queue.values()
        ]
        await asyncio.gather(*tasks)
        self.log_queue.clear()
    async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
        """Log deployment latency"""
        try:
            if "daily_reports" in self.alert_types:
                litellm_params = kwargs.get("litellm_params", {}) or {}
                model_info = litellm_params.get("model_info", {}) or {}
                model_id = model_info.get("id", "") or ""
                response_s: timedelta = end_time - start_time

                final_value = response_s

                if isinstance(response_obj, litellm.ModelResponse) and (
                    hasattr(response_obj, "usage")
                    and response_obj.usage is not None  # type: ignore
                    and hasattr(response_obj.usage, "completion_tokens")  # type: ignore
                ):
                    completion_tokens = response_obj.usage.completion_tokens  # type: ignore
                    if completion_tokens is not None and completion_tokens > 0:
                        final_value = float(
                            response_s.total_seconds() / completion_tokens
                        )
                if isinstance(final_value, timedelta):
                    final_value = final_value.total_seconds()

                await self.async_update_daily_reports(
                    DeploymentMetrics(
                        id=model_id,
                        failed_request=False,
                        latency_per_output_token=final_value,
                        updated_at=litellm.utils.get_utc_datetime(),
                    )
                )
        except Exception as e:
            verbose_proxy_logger.error(
                f"[Non-Blocking Error] Slack Alerting: Got error in logging LLM deployment latency: {str(e)}"
            )
            pass
    async def async_log_failure_event(self, kwargs, response_obj, start_time, end_time):
        """Log failure + deployment latency"""
        _litellm_params = kwargs.get("litellm_params", {})
        _model_info = _litellm_params.get("model_info", {}) or {}
        model_id = _model_info.get("id", "")
        try:
            if "daily_reports" in self.alert_types:
                try:
                    await self.async_update_daily_reports(
                        DeploymentMetrics(
                            id=model_id,
                            failed_request=True,
                            latency_per_output_token=None,
                            updated_at=litellm.utils.get_utc_datetime(),
                        )
                    )
                except Exception as e:
                    verbose_logger.debug(f"Exception raised - {str(e)}")
            if isinstance(kwargs.get("exception", ""), APIError):
                if "outage_alerts" in self.alert_types:
                    await self.outage_alerts(
                        exception=kwargs["exception"],
                        deployment_id=model_id,
                    )

                if "region_outage_alerts" in self.alert_types:
                    await self.region_outage_alerts(
                        exception=kwargs["exception"], deployment_id=model_id
                    )
        except Exception:
            pass
    async def _run_scheduler_helper(self, llm_router) -> bool:
        """
        Returns:
        - True -> report sent
        - False -> report not sent
        """
        report_sent_bool = False

        report_sent = await self.internal_usage_cache.async_get_cache(
            key=SlackAlertingCacheKeys.report_sent_key.value,
            parent_otel_span=None,
        )  # None | float

        current_time = time.time()

        if report_sent is None:
            await self.internal_usage_cache.async_set_cache(
                key=SlackAlertingCacheKeys.report_sent_key.value,
                value=current_time,
            )
        elif isinstance(report_sent, float):
            # Check if current time - interval >= time last sent
            interval_seconds = self.alerting_args.daily_report_frequency

            if current_time - report_sent >= interval_seconds:
                # Sneak in the reporting logic here
                await self.send_daily_reports(router=llm_router)
                # Also, don't forget to update the report_sent time after sending the report!
                await self.internal_usage_cache.async_set_cache(
                    key=SlackAlertingCacheKeys.report_sent_key.value,
                    value=current_time,
                )
                report_sent_bool = True

        return report_sent_bool

    async def _run_scheduled_daily_report(self, llm_router: Optional[Any] = None):
        """
        If 'daily_reports' is enabled:

        Ping the redis cache every 5 minutes to check if we should send the report
        If yes -> call send_daily_reports()
        """
        if llm_router is None or self.alert_types is None:
            return

        if "daily_reports" in self.alert_types:
            while True:
                await self._run_scheduler_helper(llm_router=llm_router)
                interval = random.randint(
                    self.alerting_args.report_check_interval - 3,
                    self.alerting_args.report_check_interval + 3,
                )  # shuffle to prevent collisions
                await asyncio.sleep(interval)
        return
    async def send_weekly_spend_report(
        self,
        time_range: str = "7d",
    ):
        """
        Send a spend report for a configurable time range.

        Args:
            time_range: A string specifying the time range for the report, e.g., "1d", "7d", "30d"
        """
        if self.alerting is None or "spend_reports" not in self.alert_types:
            return

        try:
            from litellm.proxy.spend_tracking.spend_management_endpoints import (
                _get_spend_report_for_time_range,
            )

            # Parse the time range
            days = int(time_range[:-1])
            if time_range[-1].lower() != "d":
                raise ValueError("Time range must be specified in days, e.g., '7d'")

            todays_date = datetime.datetime.now().date()
            start_date = todays_date - datetime.timedelta(days=days)

            _event_cache_key = f"weekly_spend_report_sent_{start_date.strftime('%Y-%m-%d')}_{todays_date.strftime('%Y-%m-%d')}"
            if await self.internal_usage_cache.async_get_cache(key=_event_cache_key):
                return

            _resp = await _get_spend_report_for_time_range(
                start_date=start_date.strftime("%Y-%m-%d"),
                end_date=todays_date.strftime("%Y-%m-%d"),
            )
            if _resp is None or _resp == ([], []):
                return

            spend_per_team, spend_per_tag = _resp

            _spend_message = f"*💸 Spend Report for `{start_date.strftime('%m-%d-%Y')} - {todays_date.strftime('%m-%d-%Y')}` ({days} days)*\n"

            if spend_per_team is not None:
                _spend_message += "\n*Team Spend Report:*\n"
                for spend in spend_per_team:
                    _team_spend = round(float(spend["total_spend"]), 4)
                    _spend_message += (
                        f"Team: `{spend['team_alias']}` | Spend: `${_team_spend}`\n"
                    )

            if spend_per_tag is not None:
                _spend_message += "\n*Tag Spend Report:*\n"
                for spend in spend_per_tag:
                    _tag_spend = round(float(spend["total_spend"]), 4)
                    _spend_message += f"Tag: `{spend['individual_request_tag']}` | Spend: `${_tag_spend}`\n"

            await self.send_alert(
                message=_spend_message,
                level="Low",
                alert_type=AlertType.spend_reports,
                alerting_metadata={},
            )

            await self.internal_usage_cache.async_set_cache(
                key=_event_cache_key,
                value="SENT",
                ttl=duration_in_seconds(time_range),
            )

        except ValueError as ve:
            verbose_proxy_logger.error(f"Invalid time range format: {ve}")
        except Exception as e:
            verbose_proxy_logger.error(f"Error sending spend report: {e}")
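    # Example (hypothetical schedule): a proxy background job could send a rolling
    # 7-day report once per week with
    #   await slack_alerting_instance.send_weekly_spend_report(time_range="7d")
    # The "<N>d" suffix is required; passing another unit raises a ValueError above.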
    async def send_monthly_spend_report(self):
        """Send a spend report for the current calendar month, broken down by team and tag."""
        try:
            from calendar import monthrange

            from litellm.proxy.spend_tracking.spend_management_endpoints import (
                _get_spend_report_for_time_range,
            )

            todays_date = datetime.datetime.now().date()
            first_day_of_month = todays_date.replace(day=1)
            _, last_day_of_month = monthrange(todays_date.year, todays_date.month)
            last_day_of_month = first_day_of_month + datetime.timedelta(
                days=last_day_of_month - 1
            )

            _event_cache_key = f"monthly_spend_report_sent_{first_day_of_month.strftime('%Y-%m-%d')}_{last_day_of_month.strftime('%Y-%m-%d')}"
            if await self.internal_usage_cache.async_get_cache(key=_event_cache_key):
                return

            _resp = await _get_spend_report_for_time_range(
                start_date=first_day_of_month.strftime("%Y-%m-%d"),
                end_date=last_day_of_month.strftime("%Y-%m-%d"),
            )
            if _resp is None or _resp == ([], []):
                return

            monthly_spend_per_team, monthly_spend_per_tag = _resp

            _spend_message = f"*💸 Monthly Spend Report for `{first_day_of_month.strftime('%m-%d-%Y')} - {last_day_of_month.strftime('%m-%d-%Y')}` *\n"

            if monthly_spend_per_team is not None:
                _spend_message += "\n*Team Spend Report:*\n"
                for spend in monthly_spend_per_team:
                    _team_spend = spend["total_spend"]
                    _team_spend = float(_team_spend)
                    # round to 4 decimal places
                    _team_spend = round(_team_spend, 4)
                    _spend_message += (
                        f"Team: `{spend['team_alias']}` | Spend: `${_team_spend}`\n"
                    )

            if monthly_spend_per_tag is not None:
                _spend_message += "\n*Tag Spend Report:*\n"
                for spend in monthly_spend_per_tag:
                    _tag_spend = spend["total_spend"]
                    _tag_spend = float(_tag_spend)
                    # round to 4 decimal places
                    _tag_spend = round(_tag_spend, 4)
                    _spend_message += f"Tag: `{spend['individual_request_tag']}` | Spend: `${_tag_spend}`\n"

            await self.send_alert(
                message=_spend_message,
                level="Low",
                alert_type=AlertType.spend_reports,
                alerting_metadata={},
            )

            await self.internal_usage_cache.async_set_cache(
                key=_event_cache_key,
                value="SENT",
                ttl=(30 * HOURS_IN_A_DAY * 60 * 60),  # 1 month
            )

        except Exception as e:
            verbose_proxy_logger.exception("Error sending monthly spend report %s", e)
    async def send_fallback_stats_from_prometheus(self):
        """
        Helper to send fallback statistics from the prometheus server -> to slack

        This runs once per day and sends an overview of all the fallback statistics
        """
        try:
            from litellm.integrations.prometheus_helpers.prometheus_api import (
                get_fallback_metric_from_prometheus,
            )

            # call the prometheus logger
            fallback_success_info_prometheus = (
                await get_fallback_metric_from_prometheus()
            )

            fallback_message = (
                f"*Fallback Statistics:*\n{fallback_success_info_prometheus}"
            )

            await self.send_alert(
                message=fallback_message,
                level="Low",
                alert_type=AlertType.fallback_reports,
                alerting_metadata={},
            )

        except Exception as e:
            verbose_proxy_logger.error(
                "Error sending fallback statistics from prometheus %s", e
            )
            pass
    async def send_virtual_key_event_slack(
        self,
        key_event: VirtualKeyEvent,
        alert_type: AlertType,
        event_name: str,
    ):
        """
        Handles sending Virtual Key related alerts

        Example:
        - New Virtual Key Created
        - Internal User Updated
        - Team Created, Updated, Deleted
        """
        try:
            message = f"`{event_name}`\n"

            key_event_dict = key_event.model_dump()

            # Add Created by information first
            message += "*Action Done by:*\n"
            for key, value in key_event_dict.items():
                if "created_by" in key:
                    message += f"{key}: `{value}`\n"

            # Add args sent to function in the alert
            message += "\n*Arguments passed:*\n"
            request_kwargs = key_event.request_kwargs
            for key, value in request_kwargs.items():
                if key == "user_api_key_dict":
                    continue
                message += f"{key}: `{value}`\n"

            await self.send_alert(
                message=message,
                level="High",
                alert_type=alert_type,
                alerting_metadata={},
            )
        except Exception as e:
            verbose_proxy_logger.error(
                "Error sending send_virtual_key_event_slack %s", e
            )

        return
    async def _request_is_completed(self, request_data: Optional[dict]) -> bool:
        """
        Returns True if the request is completed - either as a success or failure
        """
        if request_data is None:
            return False

        if (
            request_data.get("litellm_status", "") != "success"
            and request_data.get("litellm_status", "") != "fail"
        ):
            ## CHECK IF CACHE IS UPDATED
            litellm_call_id = request_data.get("litellm_call_id", "")
            status: Optional[str] = await self.internal_usage_cache.async_get_cache(
                key="request_status:{}".format(litellm_call_id), local_only=True
            )
            if status is not None and (status == "success" or status == "fail"):
                return True
        return False
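# Example (hypothetical wiring, not part of the original module): the proxy typically
# constructs one SlackAlerting instance, attaches the router once it exists, and lets
# the callback hooks above enqueue alerts. `my_router` is an assumed litellm Router.
#
#   alerter = SlackAlerting(alerting=["slack"])
#   alerter.update_values(llm_router=my_router)
#   asyncio.create_task(alerter._run_scheduled_daily_report(llm_router=my_router))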