Spaces:
Paused
Paused
| """ | |
| KEY MANAGEMENT | |
| All /key management endpoints | |
| /key/generate | |
| /key/info | |
| /key/update | |
| /key/delete | |
| """ | |
| import asyncio | |
| import copy | |
| import json | |
| import secrets | |
| import traceback | |
| import uuid | |
| from datetime import datetime, timedelta, timezone | |
| from typing import List, Literal, Optional, Tuple, cast | |
| import fastapi | |
| from fastapi import APIRouter, Depends, Header, HTTPException, Query, Request, status | |
| import litellm | |
| from litellm._logging import verbose_proxy_logger | |
| from litellm.caching import DualCache | |
| from litellm.constants import LENGTH_OF_LITELLM_GENERATED_KEY, UI_SESSION_TOKEN_TEAM_ID | |
| from litellm.litellm_core_utils.duration_parser import duration_in_seconds | |
| from litellm.proxy._types import * | |
| from litellm.proxy.auth.auth_checks import ( | |
| _cache_key_object, | |
| _delete_cache_key_object, | |
| get_key_object, | |
| get_team_object, | |
| ) | |
| from litellm.proxy.auth.user_api_key_auth import user_api_key_auth | |
| from litellm.proxy.hooks.key_management_event_hooks import KeyManagementEventHooks | |
| from litellm.proxy.management_endpoints.common_utils import ( | |
| _is_user_team_admin, | |
| _set_object_metadata_field, | |
| ) | |
| from litellm.proxy.management_endpoints.model_management_endpoints import ( | |
| _add_model_to_db, | |
| ) | |
| from litellm.proxy.management_helpers.team_member_permission_checks import ( | |
| TeamMemberPermissionChecks, | |
| ) | |
| from litellm.proxy.management_helpers.utils import management_endpoint_wrapper | |
| from litellm.proxy.spend_tracking.spend_tracking_utils import _is_master_key | |
| from litellm.proxy.utils import ( | |
| PrismaClient, | |
| _hash_token_if_needed, | |
| handle_exception_on_proxy, | |
| jsonify_object, | |
| ) | |
| from litellm.router import Router | |
| from litellm.secret_managers.main import get_secret | |
| from litellm.types.router import Deployment | |
| from litellm.types.utils import ( | |
| BudgetConfig, | |
| PersonalUIKeyGenerationConfig, | |
| TeamUIKeyGenerationConfig, | |
| ) | |
| def _is_team_key(data: Union[GenerateKeyRequest, LiteLLM_VerificationToken]): | |
| return data.team_id is not None | |
| def _get_user_in_team( | |
| team_table: LiteLLM_TeamTableCachedObj, user_id: Optional[str] | |
| ) -> Optional[Member]: | |
| if user_id is None: | |
| return None | |
| for member in team_table.members_with_roles: | |
| if member.user_id is not None and member.user_id == user_id: | |
| return member | |
| return None | |
| def _is_allowed_to_make_key_request( | |
| user_api_key_dict: UserAPIKeyAuth, user_id: Optional[str], team_id: Optional[str] | |
| ) -> bool: | |
| """ | |
| Assert user only creates keys for themselves | |
| Relevant issue: https://github.com/BerriAI/litellm/issues/7336 | |
| """ | |
| ## BASE CASE - PROXY ADMIN | |
| if ( | |
| user_api_key_dict.user_role is not None | |
| and user_api_key_dict.user_role == LitellmUserRoles.PROXY_ADMIN.value | |
| ): | |
| return True | |
| if user_id is not None: | |
| assert ( | |
| user_id == user_api_key_dict.user_id | |
| ), "User can only create keys for themselves. Got user_id={}, Your ID={}".format( | |
| user_id, user_api_key_dict.user_id | |
| ) | |
| if team_id is not None: | |
| if ( | |
| user_api_key_dict.team_id is not None | |
| and user_api_key_dict.team_id == UI_TEAM_ID | |
| ): | |
| return True # handle https://github.com/BerriAI/litellm/issues/7482 | |
| return True | |
| def _team_key_operation_team_member_check( | |
| assigned_user_id: Optional[str], | |
| team_table: LiteLLM_TeamTableCachedObj, | |
| user_api_key_dict: UserAPIKeyAuth, | |
| team_key_generation: TeamUIKeyGenerationConfig, | |
| route: KeyManagementRoutes, | |
| ): | |
| if assigned_user_id is not None: | |
| key_assigned_user_in_team = _get_user_in_team( | |
| team_table=team_table, user_id=assigned_user_id | |
| ) | |
| if key_assigned_user_in_team is None: | |
| raise HTTPException( | |
| status_code=400, | |
| detail=f"User={assigned_user_id} not assigned to team={team_table.team_id}", | |
| ) | |
| team_member_object = _get_user_in_team( | |
| team_table=team_table, user_id=user_api_key_dict.user_id | |
| ) | |
| is_admin = ( | |
| user_api_key_dict.user_role is not None | |
| and user_api_key_dict.user_role == LitellmUserRoles.PROXY_ADMIN.value | |
| ) | |
| if is_admin: | |
| return True | |
| elif team_member_object is None: | |
| raise HTTPException( | |
| status_code=400, | |
| detail=f"User={user_api_key_dict.user_id} not assigned to team={team_table.team_id}", | |
| ) | |
| elif ( | |
| "allowed_team_member_roles" in team_key_generation | |
| and team_member_object.role | |
| not in team_key_generation["allowed_team_member_roles"] | |
| ): | |
| raise HTTPException( | |
| status_code=400, | |
| detail=f"Team member role {team_member_object.role} not in allowed_team_member_roles={team_key_generation['allowed_team_member_roles']}", | |
| ) | |
| TeamMemberPermissionChecks.does_team_member_have_permissions_for_endpoint( | |
| team_member_object=team_member_object, | |
| team_table=team_table, | |
| route=route, | |
| ) | |
| return True | |
| def _key_generation_required_param_check( | |
| data: GenerateKeyRequest, required_params: Optional[List[str]] | |
| ): | |
| if required_params is None: | |
| return True | |
| data_dict = data.model_dump(exclude_unset=True) | |
| for param in required_params: | |
| if param not in data_dict: | |
| raise HTTPException( | |
| status_code=400, | |
| detail=f"Required param {param} not in data", | |
| ) | |
| return True | |
| def _team_key_generation_check( | |
| team_table: LiteLLM_TeamTableCachedObj, | |
| user_api_key_dict: UserAPIKeyAuth, | |
| data: GenerateKeyRequest, | |
| route: KeyManagementRoutes, | |
| ): | |
| if user_api_key_dict.user_role == LitellmUserRoles.PROXY_ADMIN.value: | |
| return True | |
| if ( | |
| litellm.key_generation_settings is not None | |
| and "team_key_generation" in litellm.key_generation_settings | |
| ): | |
| _team_key_generation = litellm.key_generation_settings["team_key_generation"] | |
| else: | |
| _team_key_generation = TeamUIKeyGenerationConfig( | |
| allowed_team_member_roles=["admin", "user"], | |
| ) | |
| _team_key_operation_team_member_check( | |
| assigned_user_id=data.user_id, | |
| team_table=team_table, | |
| user_api_key_dict=user_api_key_dict, | |
| team_key_generation=_team_key_generation, | |
| route=route, | |
| ) | |
| _key_generation_required_param_check( | |
| data, | |
| _team_key_generation.get("required_params"), | |
| ) | |
| return True | |
| def _personal_key_membership_check( | |
| user_api_key_dict: UserAPIKeyAuth, | |
| personal_key_generation: Optional[PersonalUIKeyGenerationConfig], | |
| ): | |
| if ( | |
| personal_key_generation is None | |
| or "allowed_user_roles" not in personal_key_generation | |
| ): | |
| return True | |
| if user_api_key_dict.user_role not in personal_key_generation["allowed_user_roles"]: | |
| raise HTTPException( | |
| status_code=400, | |
| detail=f"Personal key creation has been restricted by admin. Allowed roles={litellm.key_generation_settings['personal_key_generation']['allowed_user_roles']}. Your role={user_api_key_dict.user_role}", # type: ignore | |
| ) | |
| return True | |
| def _personal_key_generation_check( | |
| user_api_key_dict: UserAPIKeyAuth, data: GenerateKeyRequest | |
| ): | |
| if ( | |
| litellm.key_generation_settings is None | |
| or litellm.key_generation_settings.get("personal_key_generation") is None | |
| ): | |
| return True | |
| _personal_key_generation = litellm.key_generation_settings["personal_key_generation"] # type: ignore | |
| _personal_key_membership_check( | |
| user_api_key_dict, | |
| personal_key_generation=_personal_key_generation, | |
| ) | |
| _key_generation_required_param_check( | |
| data, | |
| _personal_key_generation.get("required_params"), | |
| ) | |
| return True | |
| def key_generation_check( | |
| team_table: Optional[LiteLLM_TeamTableCachedObj], | |
| user_api_key_dict: UserAPIKeyAuth, | |
| data: GenerateKeyRequest, | |
| route: KeyManagementRoutes, | |
| ) -> bool: | |
| """ | |
| Check if admin has restricted key creation to certain roles for teams or individuals | |
| """ | |
| ## check if key is for team or individual | |
| is_team_key = _is_team_key(data=data) | |
| if is_team_key: | |
| if team_table is None and litellm.key_generation_settings is not None: | |
| raise HTTPException( | |
| status_code=400, | |
| detail=f"Unable to find team object in database. Team ID: {data.team_id}", | |
| ) | |
| elif team_table is None: | |
| return True # assume user is assigning team_id without using the team table | |
| return _team_key_generation_check( | |
| team_table=team_table, | |
| user_api_key_dict=user_api_key_dict, | |
| data=data, | |
| route=route, | |
| ) | |
| else: | |
| return _personal_key_generation_check( | |
| user_api_key_dict=user_api_key_dict, data=data | |
| ) | |
| def common_key_access_checks( | |
| user_api_key_dict: UserAPIKeyAuth, | |
| data: Union[GenerateKeyRequest, UpdateKeyRequest], | |
| llm_router: Optional[Router], | |
| premium_user: bool, | |
| ) -> Literal[True]: | |
| """ | |
| Check if user is allowed to make a key request, for this key | |
| """ | |
| try: | |
| _is_allowed_to_make_key_request( | |
| user_api_key_dict=user_api_key_dict, | |
| user_id=data.user_id, | |
| team_id=data.team_id, | |
| ) | |
| except AssertionError as e: | |
| raise HTTPException( | |
| status_code=403, | |
| detail=str(e), | |
| ) | |
| except Exception as e: | |
| raise HTTPException( | |
| status_code=500, | |
| detail=str(e), | |
| ) | |
| _check_model_access_group( | |
| models=data.models, | |
| llm_router=llm_router, | |
| premium_user=premium_user, | |
| ) | |
| return True | |
| router = APIRouter() | |
| async def generate_key_fn( # noqa: PLR0915 | |
| data: GenerateKeyRequest, | |
| user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth), | |
| litellm_changed_by: Optional[str] = Header( | |
| None, | |
| description="The litellm-changed-by header enables tracking of actions performed by authorized users on behalf of other users, providing an audit trail for accountability", | |
| ), | |
| ): | |
| """ | |
| Generate an API key based on the provided data. | |
| Docs: https://docs.litellm.ai/docs/proxy/virtual_keys | |
| Parameters: | |
| - duration: Optional[str] - Specify the length of time the token is valid for. You can set duration as seconds ("30s"), minutes ("30m"), hours ("30h"), days ("30d"). | |
| - key_alias: Optional[str] - User defined key alias | |
| - key: Optional[str] - User defined key value. If not set, a 16-digit unique sk-key is created for you. | |
| - team_id: Optional[str] - The team id of the key | |
| - user_id: Optional[str] - The user id of the key | |
| - budget_id: Optional[str] - The budget id associated with the key. Created by calling `/budget/new`. | |
| - models: Optional[list] - Model_name's a user is allowed to call. (if empty, key is allowed to call all models) | |
| - aliases: Optional[dict] - Any alias mappings, on top of anything in the config.yaml model list. - https://docs.litellm.ai/docs/proxy/virtual_keys#managing-auth---upgradedowngrade-models | |
| - config: Optional[dict] - any key-specific configs, overrides config in config.yaml | |
| - spend: Optional[int] - Amount spent by key. Default is 0. Will be updated by proxy whenever key is used. https://docs.litellm.ai/docs/proxy/virtual_keys#managing-auth---tracking-spend | |
| - send_invite_email: Optional[bool] - Whether to send an invite email to the user_id, with the generate key | |
| - max_budget: Optional[float] - Specify max budget for a given key. | |
| - budget_duration: Optional[str] - Budget is reset at the end of specified duration. If not set, budget is never reset. You can set duration as seconds ("30s"), minutes ("30m"), hours ("30h"), days ("30d"). | |
| - max_parallel_requests: Optional[int] - Rate limit a user based on the number of parallel requests. Raises 429 error, if user's parallel requests > x. | |
| - metadata: Optional[dict] - Metadata for key, store information for key. Example metadata = {"team": "core-infra", "app": "app2", "email": "ishaan@berri.ai" } | |
| - guardrails: Optional[List[str]] - List of active guardrails for the key | |
| - permissions: Optional[dict] - key-specific permissions. Currently just used for turning off pii masking (if connected). Example - {"pii": false} | |
| - model_max_budget: Optional[Dict[str, BudgetConfig]] - Model-specific budgets {"gpt-4": {"budget_limit": 0.0005, "time_period": "30d"}}}. IF null or {} then no model specific budget. | |
| - model_rpm_limit: Optional[dict] - key-specific model rpm limit. Example - {"text-davinci-002": 1000, "gpt-3.5-turbo": 1000}. IF null or {} then no model specific rpm limit. | |
| - model_tpm_limit: Optional[dict] - key-specific model tpm limit. Example - {"text-davinci-002": 1000, "gpt-3.5-turbo": 1000}. IF null or {} then no model specific tpm limit. | |
| - allowed_cache_controls: Optional[list] - List of allowed cache control values. Example - ["no-cache", "no-store"]. See all values - https://docs.litellm.ai/docs/proxy/caching#turn-on--off-caching-per-request | |
| - blocked: Optional[bool] - Whether the key is blocked. | |
| - rpm_limit: Optional[int] - Specify rpm limit for a given key (Requests per minute) | |
| - tpm_limit: Optional[int] - Specify tpm limit for a given key (Tokens per minute) | |
| - soft_budget: Optional[float] - Specify soft budget for a given key. Will trigger a slack alert when this soft budget is reached. | |
| - tags: Optional[List[str]] - Tags for [tracking spend](https://litellm.vercel.app/docs/proxy/enterprise#tracking-spend-for-custom-tags) and/or doing [tag-based routing](https://litellm.vercel.app/docs/proxy/tag_routing). | |
| - enforced_params: Optional[List[str]] - List of enforced params for the key (Enterprise only). [Docs](https://docs.litellm.ai/docs/proxy/enterprise#enforce-required-params-for-llm-requests) | |
| - allowed_routes: Optional[list] - List of allowed routes for the key. Store the actual route or store a wildcard pattern for a set of routes. Example - ["/chat/completions", "/embeddings", "/keys/*"] | |
| Examples: | |
| 1. Allow users to turn on/off pii masking | |
| ```bash | |
| curl --location 'http://0.0.0.0:4000/key/generate' \ | |
| --header 'Authorization: Bearer sk-1234' \ | |
| --header 'Content-Type: application/json' \ | |
| --data '{ | |
| "permissions": {"allow_pii_controls": true} | |
| }' | |
| ``` | |
| Returns: | |
| - key: (str) The generated api key | |
| - expires: (datetime) Datetime object for when key expires. | |
| - user_id: (str) Unique user id - used for tracking spend across multiple keys for same user id. | |
| """ | |
| try: | |
| from litellm.proxy.proxy_server import ( | |
| litellm_proxy_admin_name, | |
| llm_router, | |
| premium_user, | |
| prisma_client, | |
| user_api_key_cache, | |
| user_custom_key_generate, | |
| ) | |
| verbose_proxy_logger.debug("entered /key/generate") | |
| if user_custom_key_generate is not None: | |
| if asyncio.iscoroutinefunction(user_custom_key_generate): | |
| result = await user_custom_key_generate(data) # type: ignore | |
| else: | |
| raise ValueError("user_custom_key_generate must be a coroutine") | |
| decision = result.get("decision", True) | |
| message = result.get("message", "Authentication Failed - Custom Auth Rule") | |
| if not decision: | |
| raise HTTPException( | |
| status_code=status.HTTP_403_FORBIDDEN, detail=message | |
| ) | |
| team_table: Optional[LiteLLM_TeamTableCachedObj] = None | |
| if data.team_id is not None: | |
| try: | |
| team_table = await get_team_object( | |
| team_id=data.team_id, | |
| prisma_client=prisma_client, | |
| user_api_key_cache=user_api_key_cache, | |
| parent_otel_span=user_api_key_dict.parent_otel_span, | |
| check_db_only=True, | |
| ) | |
| except Exception as e: | |
| verbose_proxy_logger.debug( | |
| f"Error getting team object in `/key/generate`: {e}" | |
| ) | |
| team_table = None | |
| key_generation_check( | |
| team_table=team_table, | |
| user_api_key_dict=user_api_key_dict, | |
| data=data, | |
| route=KeyManagementRoutes.KEY_GENERATE, | |
| ) | |
| common_key_access_checks( | |
| user_api_key_dict=user_api_key_dict, | |
| data=data, | |
| llm_router=llm_router, | |
| premium_user=premium_user, | |
| ) | |
| # check if user set default key/generate params on config.yaml | |
| if litellm.default_key_generate_params is not None: | |
| for elem in data: | |
| key, value = elem | |
| if value is None and key in [ | |
| "max_budget", | |
| "user_id", | |
| "team_id", | |
| "max_parallel_requests", | |
| "tpm_limit", | |
| "rpm_limit", | |
| "budget_duration", | |
| ]: | |
| setattr( | |
| data, key, litellm.default_key_generate_params.get(key, None) | |
| ) | |
| elif key == "models" and value == []: | |
| setattr(data, key, litellm.default_key_generate_params.get(key, [])) | |
| elif key == "metadata" and value == {}: | |
| setattr(data, key, litellm.default_key_generate_params.get(key, {})) | |
| # check if user set default key/generate params on config.yaml | |
| if litellm.upperbound_key_generate_params is not None: | |
| for elem in data: | |
| key, value = elem | |
| upperbound_value = getattr( | |
| litellm.upperbound_key_generate_params, key, None | |
| ) | |
| if upperbound_value is not None: | |
| if value is None: | |
| # Use the upperbound value if user didn't provide a value | |
| setattr(data, key, upperbound_value) | |
| else: | |
| # Compare with upperbound for numeric fields | |
| if key in [ | |
| "max_budget", | |
| "max_parallel_requests", | |
| "tpm_limit", | |
| "rpm_limit", | |
| ]: | |
| if value > upperbound_value: | |
| raise HTTPException( | |
| status_code=400, | |
| detail={ | |
| "error": f"{key} is over max limit set in config - user_value={value}; max_value={upperbound_value}" | |
| }, | |
| ) | |
| # Compare durations | |
| elif key in ["budget_duration", "duration"]: | |
| upperbound_duration = duration_in_seconds( | |
| duration=upperbound_value | |
| ) | |
| user_duration = duration_in_seconds(duration=value) | |
| if user_duration > upperbound_duration: | |
| raise HTTPException( | |
| status_code=400, | |
| detail={ | |
| "error": f"{key} is over max limit set in config - user_value={value}; max_value={upperbound_value}" | |
| }, | |
| ) | |
| # TODO: @ishaan-jaff: Migrate all budget tracking to use LiteLLM_BudgetTable | |
| _budget_id = data.budget_id | |
| if prisma_client is not None and data.soft_budget is not None: | |
| # create the Budget Row for the LiteLLM Verification Token | |
| budget_row = LiteLLM_BudgetTable( | |
| soft_budget=data.soft_budget, | |
| model_max_budget=data.model_max_budget or {}, | |
| ) | |
| new_budget = prisma_client.jsonify_object( | |
| budget_row.json(exclude_none=True) | |
| ) | |
| _budget = await prisma_client.db.litellm_budgettable.create( | |
| data={ | |
| **new_budget, # type: ignore | |
| "created_by": user_api_key_dict.user_id or litellm_proxy_admin_name, | |
| "updated_by": user_api_key_dict.user_id or litellm_proxy_admin_name, | |
| } | |
| ) | |
| _budget_id = getattr(_budget, "budget_id", None) | |
| # ADD METADATA FIELDS | |
| # Set Management Endpoint Metadata Fields | |
| for field in LiteLLM_ManagementEndpoint_MetadataFields_Premium: | |
| if getattr(data, field) is not None: | |
| _set_object_metadata_field( | |
| object_data=data, | |
| field_name=field, | |
| value=getattr(data, field), | |
| ) | |
| data_json = data.model_dump(exclude_unset=True, exclude_none=True) # type: ignore | |
| # if we get max_budget passed to /key/generate, then use it as key_max_budget. Since generate_key_helper_fn is used to make new users | |
| if "max_budget" in data_json: | |
| data_json["key_max_budget"] = data_json.pop("max_budget", None) | |
| if _budget_id is not None: | |
| data_json["budget_id"] = _budget_id | |
| if "budget_duration" in data_json: | |
| data_json["key_budget_duration"] = data_json.pop("budget_duration", None) | |
| if user_api_key_dict.user_id is not None: | |
| data_json["created_by"] = user_api_key_dict.user_id | |
| data_json["updated_by"] = user_api_key_dict.user_id | |
| # Set tags on the new key | |
| if "tags" in data_json: | |
| from litellm.proxy.proxy_server import premium_user | |
| if premium_user is not True and data_json["tags"] is not None: | |
| raise ValueError( | |
| f"Only premium users can add tags to keys. {CommonProxyErrors.not_premium_user.value}" | |
| ) | |
| _metadata = data_json.get("metadata") | |
| if not _metadata: | |
| data_json["metadata"] = {"tags": data_json["tags"]} | |
| else: | |
| data_json["metadata"]["tags"] = data_json["tags"] | |
| data_json.pop("tags") | |
| await _enforce_unique_key_alias( | |
| key_alias=data_json.get("key_alias", None), | |
| prisma_client=prisma_client, | |
| ) | |
| response = await generate_key_helper_fn( | |
| request_type="key", **data_json, table_name="key" | |
| ) | |
| response[ | |
| "soft_budget" | |
| ] = data.soft_budget # include the user-input soft budget in the response | |
| response = GenerateKeyResponse(**response) | |
| response.token = ( | |
| response.token_id | |
| ) # remap token to use the hash, and leave the key in the `key` field [TODO]: clean up generate_key_helper_fn to do this | |
| asyncio.create_task( | |
| KeyManagementEventHooks.async_key_generated_hook( | |
| data=data, | |
| response=response, | |
| user_api_key_dict=user_api_key_dict, | |
| litellm_changed_by=litellm_changed_by, | |
| ) | |
| ) | |
| return response | |
| except Exception as e: | |
| verbose_proxy_logger.exception( | |
| "litellm.proxy.proxy_server.generate_key_fn(): Exception occured - {}".format( | |
| str(e) | |
| ) | |
| ) | |
| raise handle_exception_on_proxy(e) | |
| def prepare_metadata_fields( | |
| data: BaseModel, non_default_values: dict, existing_metadata: dict | |
| ) -> dict: | |
| """ | |
| Check LiteLLM_ManagementEndpoint_MetadataFields (proxy/_types.py) for fields that are allowed to be updated | |
| """ | |
| if "metadata" not in non_default_values: # allow user to set metadata to none | |
| non_default_values["metadata"] = existing_metadata.copy() | |
| casted_metadata = cast(dict, non_default_values["metadata"]) | |
| data_json = data.model_dump(exclude_unset=True, exclude_none=True) | |
| try: | |
| for k, v in data_json.items(): | |
| if k in LiteLLM_ManagementEndpoint_MetadataFields: | |
| if isinstance(v, datetime): | |
| casted_metadata[k] = v.isoformat() | |
| else: | |
| casted_metadata[k] = v | |
| except Exception as e: | |
| verbose_proxy_logger.exception( | |
| "litellm.proxy.proxy_server.prepare_metadata_fields(): Exception occured - {}".format( | |
| str(e) | |
| ) | |
| ) | |
| non_default_values["metadata"] = casted_metadata | |
| return non_default_values | |
| def prepare_key_update_data( | |
| data: Union[UpdateKeyRequest, RegenerateKeyRequest], existing_key_row | |
| ): | |
| data_json: dict = data.model_dump(exclude_unset=True) | |
| data_json.pop("key", None) | |
| non_default_values = {} | |
| for k, v in data_json.items(): | |
| if k in LiteLLM_ManagementEndpoint_MetadataFields: | |
| continue | |
| non_default_values[k] = v | |
| if "duration" in non_default_values: | |
| duration = non_default_values.pop("duration") | |
| if duration and (isinstance(duration, str)) and len(duration) > 0: | |
| duration_s = duration_in_seconds(duration=duration) | |
| expires = datetime.now(timezone.utc) + timedelta(seconds=duration_s) | |
| non_default_values["expires"] = expires | |
| if "budget_duration" in non_default_values: | |
| budget_duration = non_default_values.pop("budget_duration") | |
| if ( | |
| budget_duration | |
| and (isinstance(budget_duration, str)) | |
| and len(budget_duration) > 0 | |
| ): | |
| from litellm.proxy.common_utils.timezone_utils import get_budget_reset_time | |
| key_reset_at = get_budget_reset_time(budget_duration=budget_duration) | |
| non_default_values["budget_reset_at"] = key_reset_at | |
| non_default_values["budget_duration"] = budget_duration | |
| _metadata = existing_key_row.metadata or {} | |
| # validate model_max_budget | |
| if "model_max_budget" in non_default_values: | |
| validate_model_max_budget(non_default_values["model_max_budget"]) | |
| non_default_values = prepare_metadata_fields( | |
| data=data, non_default_values=non_default_values, existing_metadata=_metadata | |
| ) | |
| return non_default_values | |
| async def update_key_fn( | |
| request: Request, | |
| data: UpdateKeyRequest, | |
| user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth), | |
| litellm_changed_by: Optional[str] = Header( | |
| None, | |
| description="The litellm-changed-by header enables tracking of actions performed by authorized users on behalf of other users, providing an audit trail for accountability", | |
| ), | |
| ): | |
| """ | |
| Update an existing API key's parameters. | |
| Parameters: | |
| - key: str - The key to update | |
| - key_alias: Optional[str] - User-friendly key alias | |
| - user_id: Optional[str] - User ID associated with key | |
| - team_id: Optional[str] - Team ID associated with key | |
| - budget_id: Optional[str] - The budget id associated with the key. Created by calling `/budget/new`. | |
| - models: Optional[list] - Model_name's a user is allowed to call | |
| - tags: Optional[List[str]] - Tags for organizing keys (Enterprise only) | |
| - enforced_params: Optional[List[str]] - List of enforced params for the key (Enterprise only). [Docs](https://docs.litellm.ai/docs/proxy/enterprise#enforce-required-params-for-llm-requests) | |
| - spend: Optional[float] - Amount spent by key | |
| - max_budget: Optional[float] - Max budget for key | |
| - model_max_budget: Optional[Dict[str, BudgetConfig]] - Model-specific budgets {"gpt-4": {"budget_limit": 0.0005, "time_period": "30d"}} | |
| - budget_duration: Optional[str] - Budget reset period ("30d", "1h", etc.) | |
| - soft_budget: Optional[float] - [TODO] Soft budget limit (warning vs. hard stop). Will trigger a slack alert when this soft budget is reached. | |
| - max_parallel_requests: Optional[int] - Rate limit for parallel requests | |
| - metadata: Optional[dict] - Metadata for key. Example {"team": "core-infra", "app": "app2"} | |
| - tpm_limit: Optional[int] - Tokens per minute limit | |
| - rpm_limit: Optional[int] - Requests per minute limit | |
| - model_rpm_limit: Optional[dict] - Model-specific RPM limits {"gpt-4": 100, "claude-v1": 200} | |
| - model_tpm_limit: Optional[dict] - Model-specific TPM limits {"gpt-4": 100000, "claude-v1": 200000} | |
| - allowed_cache_controls: Optional[list] - List of allowed cache control values | |
| - duration: Optional[str] - Key validity duration ("30d", "1h", etc.) | |
| - permissions: Optional[dict] - Key-specific permissions | |
| - send_invite_email: Optional[bool] - Send invite email to user_id | |
| - guardrails: Optional[List[str]] - List of active guardrails for the key | |
| - blocked: Optional[bool] - Whether the key is blocked | |
| - aliases: Optional[dict] - Model aliases for the key - [Docs](https://litellm.vercel.app/docs/proxy/virtual_keys#model-aliases) | |
| - config: Optional[dict] - [DEPRECATED PARAM] Key-specific config. | |
| - temp_budget_increase: Optional[float] - Temporary budget increase for the key (Enterprise only). | |
| - temp_budget_expiry: Optional[str] - Expiry time for the temporary budget increase (Enterprise only). | |
| - allowed_routes: Optional[list] - List of allowed routes for the key. Store the actual route or store a wildcard pattern for a set of routes. Example - ["/chat/completions", "/embeddings", "/keys/*"] | |
| Example: | |
| ```bash | |
| curl --location 'http://0.0.0.0:4000/key/update' \ | |
| --header 'Authorization: Bearer sk-1234' \ | |
| --header 'Content-Type: application/json' \ | |
| --data '{ | |
| "key": "sk-1234", | |
| "key_alias": "my-key", | |
| "user_id": "user-1234", | |
| "team_id": "team-1234", | |
| "max_budget": 100, | |
| "metadata": {"any_key": "any-val"}, | |
| }' | |
| ``` | |
| """ | |
| from litellm.proxy.proxy_server import ( | |
| llm_router, | |
| premium_user, | |
| prisma_client, | |
| proxy_logging_obj, | |
| user_api_key_cache, | |
| ) | |
| try: | |
| data_json: dict = data.model_dump(exclude_unset=True, exclude_none=True) | |
| key = data_json.pop("key") | |
| # get the row from db | |
| if prisma_client is None: | |
| raise Exception("Not connected to DB!") | |
| common_key_access_checks( | |
| user_api_key_dict=user_api_key_dict, | |
| data=data, | |
| llm_router=llm_router, | |
| premium_user=premium_user, | |
| ) | |
| existing_key_row = await prisma_client.get_data( | |
| token=data.key, table_name="key", query_type="find_unique" | |
| ) | |
| if existing_key_row is None: | |
| raise HTTPException( | |
| status_code=404, | |
| detail={"error": f"Team not found, passed team_id={data.team_id}"}, | |
| ) | |
| # check if user has permission to update key | |
| await TeamMemberPermissionChecks.can_team_member_execute_key_management_endpoint( | |
| user_api_key_dict=user_api_key_dict, | |
| route=KeyManagementRoutes.KEY_UPDATE, | |
| prisma_client=prisma_client, | |
| existing_key_row=existing_key_row, | |
| user_api_key_cache=user_api_key_cache, | |
| ) | |
| non_default_values = prepare_key_update_data( | |
| data=data, existing_key_row=existing_key_row | |
| ) | |
| await _enforce_unique_key_alias( | |
| key_alias=non_default_values.get("key_alias", None), | |
| prisma_client=prisma_client, | |
| existing_key_token=existing_key_row.token, | |
| ) | |
| _data = {**non_default_values, "token": key} | |
| response = await prisma_client.update_data(token=key, data=_data) | |
| # Delete - key from cache, since it's been updated! | |
| # key updated - a new model could have been added to this key. it should not block requests after this is done | |
| await _delete_cache_key_object( | |
| hashed_token=hash_token(key), | |
| user_api_key_cache=user_api_key_cache, | |
| proxy_logging_obj=proxy_logging_obj, | |
| ) | |
| asyncio.create_task( | |
| KeyManagementEventHooks.async_key_updated_hook( | |
| data=data, | |
| existing_key_row=existing_key_row, | |
| response=response, | |
| user_api_key_dict=user_api_key_dict, | |
| litellm_changed_by=litellm_changed_by, | |
| ) | |
| ) | |
| if response is None: | |
| raise ValueError("Failed to update key got response = None") | |
| return {"key": key, **response["data"]} | |
| # update based on remaining passed in values | |
| except Exception as e: | |
| verbose_proxy_logger.exception( | |
| "litellm.proxy.proxy_server.update_key_fn(): Exception occured - {}".format( | |
| str(e) | |
| ) | |
| ) | |
| if isinstance(e, HTTPException): | |
| raise ProxyException( | |
| message=getattr(e, "detail", f"Authentication Error({str(e)})"), | |
| type=ProxyErrorTypes.auth_error, | |
| param=getattr(e, "param", "None"), | |
| code=getattr(e, "status_code", status.HTTP_400_BAD_REQUEST), | |
| ) | |
| elif isinstance(e, ProxyException): | |
| raise e | |
| raise ProxyException( | |
| message="Authentication Error, " + str(e), | |
| type=ProxyErrorTypes.auth_error, | |
| param=getattr(e, "param", "None"), | |
| code=status.HTTP_400_BAD_REQUEST, | |
| ) | |
| async def delete_key_fn( | |
| data: KeyRequest, | |
| user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth), | |
| litellm_changed_by: Optional[str] = Header( | |
| None, | |
| description="The litellm-changed-by header enables tracking of actions performed by authorized users on behalf of other users, providing an audit trail for accountability", | |
| ), | |
| ): | |
| """ | |
| Delete a key from the key management system. | |
| Parameters:: | |
| - keys (List[str]): A list of keys or hashed keys to delete. Example {"keys": ["sk-QWrxEynunsNpV1zT48HIrw", "837e17519f44683334df5291321d97b8bf1098cd490e49e215f6fea935aa28be"]} | |
| - key_aliases (List[str]): A list of key aliases to delete. Can be passed instead of `keys`.Example {"key_aliases": ["alias1", "alias2"]} | |
| Returns: | |
| - deleted_keys (List[str]): A list of deleted keys. Example {"deleted_keys": ["sk-QWrxEynunsNpV1zT48HIrw", "837e17519f44683334df5291321d97b8bf1098cd490e49e215f6fea935aa28be"]} | |
| Example: | |
| ```bash | |
| curl --location 'http://0.0.0.0:4000/key/delete' \ | |
| --header 'Authorization: Bearer sk-1234' \ | |
| --header 'Content-Type: application/json' \ | |
| --data '{ | |
| "keys": ["sk-QWrxEynunsNpV1zT48HIrw"] | |
| }' | |
| ``` | |
| Raises: | |
| HTTPException: If an error occurs during key deletion. | |
| """ | |
| try: | |
| from litellm.proxy.proxy_server import prisma_client, user_api_key_cache | |
| if prisma_client is None: | |
| raise Exception("Not connected to DB!") | |
| ## only allow user to delete keys they own | |
| verbose_proxy_logger.debug( | |
| f"user_api_key_dict.user_role: {user_api_key_dict.user_role}" | |
| ) | |
| num_keys_to_be_deleted = 0 | |
| deleted_keys = [] | |
| if data.keys: | |
| number_deleted_keys, _keys_being_deleted = await delete_verification_tokens( | |
| tokens=data.keys, | |
| user_api_key_cache=user_api_key_cache, | |
| user_api_key_dict=user_api_key_dict, | |
| ) | |
| num_keys_to_be_deleted = len(data.keys) | |
| deleted_keys = data.keys | |
| elif data.key_aliases: | |
| number_deleted_keys, _keys_being_deleted = await delete_key_aliases( | |
| key_aliases=data.key_aliases, | |
| prisma_client=prisma_client, | |
| user_api_key_cache=user_api_key_cache, | |
| user_api_key_dict=user_api_key_dict, | |
| ) | |
| num_keys_to_be_deleted = len(data.key_aliases) | |
| deleted_keys = data.key_aliases | |
| else: | |
| raise ValueError("Invalid request type") | |
| if number_deleted_keys is None: | |
| raise ProxyException( | |
| message="Failed to delete keys got None response from delete_verification_token", | |
| type=ProxyErrorTypes.internal_server_error, | |
| param="keys", | |
| code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
| ) | |
| verbose_proxy_logger.debug(f"/key/delete - deleted_keys={number_deleted_keys}") | |
| try: | |
| assert num_keys_to_be_deleted == len(deleted_keys) | |
| except Exception: | |
| raise HTTPException( | |
| status_code=400, | |
| detail={ | |
| "error": f"Not all keys passed in were deleted. This probably means you don't have access to delete all the keys passed in. Keys passed in={num_keys_to_be_deleted}, Deleted keys ={number_deleted_keys}" | |
| }, | |
| ) | |
| verbose_proxy_logger.debug( | |
| f"/keys/delete - cache after delete: {user_api_key_cache.in_memory_cache.cache_dict}" | |
| ) | |
| asyncio.create_task( | |
| KeyManagementEventHooks.async_key_deleted_hook( | |
| data=data, | |
| keys_being_deleted=_keys_being_deleted, | |
| user_api_key_dict=user_api_key_dict, | |
| litellm_changed_by=litellm_changed_by, | |
| response=number_deleted_keys, | |
| ) | |
| ) | |
| return {"deleted_keys": deleted_keys} | |
| except Exception as e: | |
| verbose_proxy_logger.exception( | |
| "litellm.proxy.proxy_server.delete_key_fn(): Exception occured - {}".format( | |
| str(e) | |
| ) | |
| ) | |
| raise handle_exception_on_proxy(e) | |
| async def info_key_fn_v2( | |
| data: Optional[KeyRequest] = None, | |
| user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth), | |
| ): | |
| """ | |
| Retrieve information about a list of keys. | |
| **New endpoint**. Currently admin only. | |
| Parameters: | |
| keys: Optional[list] = body parameter representing the key(s) in the request | |
| user_api_key_dict: UserAPIKeyAuth = Dependency representing the user's API key | |
| Returns: | |
| Dict containing the key and its associated information | |
| Example Curl: | |
| ``` | |
| curl -X GET "http://0.0.0.0:4000/key/info" \ | |
| -H "Authorization: Bearer sk-1234" \ | |
| -d {"keys": ["sk-1", "sk-2", "sk-3"]} | |
| ``` | |
| """ | |
| from litellm.proxy.proxy_server import prisma_client | |
| try: | |
| if prisma_client is None: | |
| raise Exception( | |
| "Database not connected. Connect a database to your proxy - https://docs.litellm.ai/docs/simple_proxy#managing-auth---virtual-keys" | |
| ) | |
| if data is None: | |
| raise HTTPException( | |
| status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, | |
| detail={"message": "Malformed request. No keys passed in."}, | |
| ) | |
| key_info = await prisma_client.get_data( | |
| token=data.keys, table_name="key", query_type="find_all" | |
| ) | |
| if key_info is None: | |
| raise HTTPException( | |
| status_code=status.HTTP_404_NOT_FOUND, | |
| detail={"message": "No keys found"}, | |
| ) | |
| filtered_key_info = [] | |
| for k in key_info: | |
| try: | |
| k = k.model_dump() # noqa | |
| except Exception: | |
| # if using pydantic v1 | |
| k = k.dict() | |
| filtered_key_info.append(k) | |
| return {"key": data.keys, "info": filtered_key_info} | |
| except Exception as e: | |
| raise handle_exception_on_proxy(e) | |
| async def info_key_fn( | |
| key: Optional[str] = fastapi.Query( | |
| default=None, description="Key in the request parameters" | |
| ), | |
| user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth), | |
| ): | |
| """ | |
| Retrieve information about a key. | |
| Parameters: | |
| key: Optional[str] = Query parameter representing the key in the request | |
| user_api_key_dict: UserAPIKeyAuth = Dependency representing the user's API key | |
| Returns: | |
| Dict containing the key and its associated information | |
| Example Curl: | |
| ``` | |
| curl -X GET "http://0.0.0.0:4000/key/info?key=sk-02Wr4IAlN3NvPXvL5JVvDA" \ | |
| -H "Authorization: Bearer sk-1234" | |
| ``` | |
| Example Curl - if no key is passed, it will use the Key Passed in Authorization Header | |
| ``` | |
| curl -X GET "http://0.0.0.0:4000/key/info" \ | |
| -H "Authorization: Bearer sk-02Wr4IAlN3NvPXvL5JVvDA" | |
| ``` | |
| """ | |
| from litellm.proxy.proxy_server import prisma_client | |
| try: | |
| if prisma_client is None: | |
| raise Exception( | |
| "Database not connected. Connect a database to your proxy - https://docs.litellm.ai/docs/simple_proxy#managing-auth---virtual-keys" | |
| ) | |
| # default to using Auth token if no key is passed in | |
| key = key or user_api_key_dict.api_key | |
| hashed_key: Optional[str] = key | |
| if key is not None: | |
| hashed_key = _hash_token_if_needed(token=key) | |
| key_info = await prisma_client.db.litellm_verificationtoken.find_unique( | |
| where={"token": hashed_key}, # type: ignore | |
| include={"litellm_budget_table": True}, | |
| ) | |
| if key_info is None: | |
| raise ProxyException( | |
| message="Key not found in database", | |
| type=ProxyErrorTypes.not_found_error, | |
| param="key", | |
| code=status.HTTP_404_NOT_FOUND, | |
| ) | |
| if ( | |
| await _can_user_query_key_info( | |
| user_api_key_dict=user_api_key_dict, | |
| key=key, | |
| key_info=key_info, | |
| ) | |
| is not True | |
| ): | |
| raise HTTPException( | |
| status_code=status.HTTP_403_FORBIDDEN, | |
| detail="You are not allowed to access this key's info. Your role={}".format( | |
| user_api_key_dict.user_role | |
| ), | |
| ) | |
| ## REMOVE HASHED TOKEN INFO BEFORE RETURNING ## | |
| try: | |
| key_info = key_info.model_dump() # noqa | |
| except Exception: | |
| # if using pydantic v1 | |
| key_info = key_info.dict() | |
| key_info.pop("token") | |
| return {"key": key, "info": key_info} | |
| except Exception as e: | |
| raise handle_exception_on_proxy(e) | |
| def _check_model_access_group( | |
| models: Optional[List[str]], llm_router: Optional[Router], premium_user: bool | |
| ) -> Literal[True]: | |
| """ | |
| if is_model_access_group is True + is_wildcard_route is True, check if user is a premium user | |
| Return True if user is a premium user, False otherwise | |
| """ | |
| if models is None or llm_router is None: | |
| return True | |
| for model in models: | |
| if llm_router._is_model_access_group_for_wildcard_route( | |
| model_access_group=model | |
| ): | |
| if not premium_user: | |
| raise HTTPException( | |
| status_code=status.HTTP_403_FORBIDDEN, | |
| detail={ | |
| "error": "Setting a model access group on a wildcard model is only available for LiteLLM Enterprise users.{}".format( | |
| CommonProxyErrors.not_premium_user.value | |
| ) | |
| }, | |
| ) | |
| return True | |
| async def generate_key_helper_fn( # noqa: PLR0915 | |
| request_type: Literal[ | |
| "user", "key" | |
| ], # identifies if this request is from /user/new or /key/generate | |
| duration: Optional[str] = None, | |
| models: list = [], | |
| aliases: dict = {}, | |
| config: dict = {}, | |
| spend: float = 0.0, | |
| key_max_budget: Optional[float] = None, # key_max_budget is used to Budget Per key | |
| key_budget_duration: Optional[str] = None, | |
| budget_id: Optional[float] = None, # budget id <-> LiteLLM_BudgetTable | |
| soft_budget: Optional[ | |
| float | |
| ] = None, # soft_budget is used to set soft Budgets Per user | |
| max_budget: Optional[float] = None, # max_budget is used to Budget Per user | |
| blocked: Optional[bool] = None, | |
| budget_duration: Optional[str] = None, # max_budget is used to Budget Per user | |
| token: Optional[str] = None, | |
| key: Optional[ | |
| str | |
| ] = None, # dev-friendly alt param for 'token'. Exposed on `/key/generate` for setting key value yourself. | |
| user_id: Optional[str] = None, | |
| user_alias: Optional[str] = None, | |
| team_id: Optional[str] = None, | |
| user_email: Optional[str] = None, | |
| user_role: Optional[str] = None, | |
| max_parallel_requests: Optional[int] = None, | |
| metadata: Optional[dict] = {}, | |
| tpm_limit: Optional[int] = None, | |
| rpm_limit: Optional[int] = None, | |
| query_type: Literal["insert_data", "update_data"] = "insert_data", | |
| update_key_values: Optional[dict] = None, | |
| key_alias: Optional[str] = None, | |
| allowed_cache_controls: Optional[list] = [], | |
| permissions: Optional[dict] = {}, | |
| model_max_budget: Optional[dict] = {}, | |
| model_rpm_limit: Optional[dict] = None, | |
| model_tpm_limit: Optional[dict] = None, | |
| guardrails: Optional[list] = None, | |
| teams: Optional[list] = None, | |
| organization_id: Optional[str] = None, | |
| table_name: Optional[Literal["key", "user"]] = None, | |
| send_invite_email: Optional[bool] = None, | |
| created_by: Optional[str] = None, | |
| updated_by: Optional[str] = None, | |
| allowed_routes: Optional[list] = None, | |
| sso_user_id: Optional[str] = None, | |
| ): | |
| from litellm.proxy.proxy_server import ( | |
| litellm_proxy_budget_name, | |
| premium_user, | |
| prisma_client, | |
| ) | |
| if prisma_client is None: | |
| raise Exception( | |
| "Connect Proxy to database to generate keys - https://docs.litellm.ai/docs/proxy/virtual_keys " | |
| ) | |
| if token is None: | |
| if key is not None: | |
| token = key | |
| else: | |
| token = f"sk-{secrets.token_urlsafe(LENGTH_OF_LITELLM_GENERATED_KEY)}" | |
| if duration is None: # allow tokens that never expire | |
| expires = None | |
| else: | |
| duration_s = duration_in_seconds(duration=duration) | |
| expires = datetime.now(timezone.utc) + timedelta(seconds=duration_s) | |
| if key_budget_duration is None: # one-time budget | |
| key_reset_at = None | |
| else: | |
| duration_s = duration_in_seconds(duration=key_budget_duration) | |
| key_reset_at = datetime.now(timezone.utc) + timedelta(seconds=duration_s) | |
| if budget_duration is None: # one-time budget | |
| reset_at = None | |
| else: | |
| duration_s = duration_in_seconds(duration=budget_duration) | |
| reset_at = datetime.now(timezone.utc) + timedelta(seconds=duration_s) | |
| aliases_json = json.dumps(aliases) | |
| config_json = json.dumps(config) | |
| permissions_json = json.dumps(permissions) | |
| # Add model_rpm_limit and model_tpm_limit to metadata | |
| if model_rpm_limit is not None: | |
| metadata = metadata or {} | |
| metadata["model_rpm_limit"] = model_rpm_limit | |
| if model_tpm_limit is not None: | |
| metadata = metadata or {} | |
| metadata["model_tpm_limit"] = model_tpm_limit | |
| if guardrails is not None: | |
| metadata = metadata or {} | |
| metadata["guardrails"] = guardrails | |
| metadata_json = json.dumps(metadata) | |
| validate_model_max_budget(model_max_budget) | |
| model_max_budget_json = json.dumps(model_max_budget) | |
| user_role = user_role | |
| tpm_limit = tpm_limit | |
| rpm_limit = rpm_limit | |
| allowed_cache_controls = allowed_cache_controls | |
| try: | |
| # Create a new verification token (you may want to enhance this logic based on your needs) | |
| user_data = { | |
| "max_budget": max_budget, | |
| "user_email": user_email, | |
| "user_id": user_id, | |
| "user_alias": user_alias, | |
| "team_id": team_id, | |
| "organization_id": organization_id, | |
| "user_role": user_role, | |
| "spend": spend, | |
| "models": models, | |
| "metadata": metadata_json, | |
| "max_parallel_requests": max_parallel_requests, | |
| "tpm_limit": tpm_limit, | |
| "rpm_limit": rpm_limit, | |
| "budget_duration": budget_duration, | |
| "budget_reset_at": reset_at, | |
| "allowed_cache_controls": allowed_cache_controls, | |
| "sso_user_id": sso_user_id, | |
| } | |
| if teams is not None: | |
| user_data["teams"] = teams | |
| key_data = { | |
| "token": token, | |
| "key_alias": key_alias, | |
| "expires": expires, | |
| "models": models, | |
| "aliases": aliases_json, | |
| "config": config_json, | |
| "spend": spend, | |
| "max_budget": key_max_budget, | |
| "user_id": user_id, | |
| "team_id": team_id, | |
| "max_parallel_requests": max_parallel_requests, | |
| "metadata": metadata_json, | |
| "tpm_limit": tpm_limit, | |
| "rpm_limit": rpm_limit, | |
| "budget_duration": key_budget_duration, | |
| "budget_reset_at": key_reset_at, | |
| "allowed_cache_controls": allowed_cache_controls, | |
| "permissions": permissions_json, | |
| "model_max_budget": model_max_budget_json, | |
| "budget_id": budget_id, | |
| "blocked": blocked, | |
| "created_by": created_by, | |
| "updated_by": updated_by, | |
| "allowed_routes": allowed_routes or [], | |
| } | |
| if ( | |
| get_secret("DISABLE_KEY_NAME", False) is True | |
| ): # allow user to disable storing abbreviated key name (shown in UI, to help figure out which key spent how much) | |
| pass | |
| else: | |
| key_data["key_name"] = f"sk-...{token[-4:]}" | |
| saved_token = copy.deepcopy(key_data) | |
| if isinstance(saved_token["aliases"], str): | |
| saved_token["aliases"] = json.loads(saved_token["aliases"]) | |
| if isinstance(saved_token["config"], str): | |
| saved_token["config"] = json.loads(saved_token["config"]) | |
| if isinstance(saved_token["metadata"], str): | |
| saved_token["metadata"] = json.loads(saved_token["metadata"]) | |
| if isinstance(saved_token["permissions"], str): | |
| if ( | |
| "get_spend_routes" in saved_token["permissions"] | |
| and premium_user is not True | |
| ): | |
| raise ValueError( | |
| "get_spend_routes permission is only available for LiteLLM Enterprise users" | |
| ) | |
| saved_token["permissions"] = json.loads(saved_token["permissions"]) | |
| if isinstance(saved_token["model_max_budget"], str): | |
| saved_token["model_max_budget"] = json.loads( | |
| saved_token["model_max_budget"] | |
| ) | |
| if saved_token.get("expires", None) is not None and isinstance( | |
| saved_token["expires"], datetime | |
| ): | |
| saved_token["expires"] = saved_token["expires"].isoformat() | |
| if prisma_client is not None: | |
| if ( | |
| table_name is None or table_name == "user" | |
| ): # do not auto-create users for `/key/generate` | |
| ## CREATE USER (If necessary) | |
| if query_type == "insert_data": | |
| user_row = await prisma_client.insert_data( | |
| data=user_data, table_name="user" | |
| ) | |
| if user_row is None: | |
| raise Exception("Failed to create user") | |
| ## use default user model list if no key-specific model list provided | |
| if len(user_row.models) > 0 and len(key_data["models"]) == 0: # type: ignore | |
| key_data["models"] = user_row.models # type: ignore | |
| elif query_type == "update_data": | |
| user_row = await prisma_client.update_data( | |
| data=user_data, | |
| table_name="user", | |
| update_key_values=update_key_values, | |
| ) | |
| if user_id == litellm_proxy_budget_name or ( | |
| table_name is not None and table_name == "user" | |
| ): | |
| # do not create a key for litellm_proxy_budget_name or if table name is set to just 'user' | |
| # we only need to ensure this exists in the user table | |
| # the LiteLLM_VerificationToken table will increase in size if we don't do this check | |
| return user_data | |
| ## CREATE KEY | |
| verbose_proxy_logger.debug("prisma_client: Creating Key= %s", key_data) | |
| create_key_response = await prisma_client.insert_data( | |
| data=key_data, table_name="key" | |
| ) | |
| key_data["token_id"] = getattr(create_key_response, "token", None) | |
| key_data["litellm_budget_table"] = getattr( | |
| create_key_response, "litellm_budget_table", None | |
| ) | |
| key_data["created_at"] = getattr(create_key_response, "created_at", None) | |
| key_data["updated_at"] = getattr(create_key_response, "updated_at", None) | |
| except Exception as e: | |
| verbose_proxy_logger.error( | |
| "litellm.proxy.proxy_server.generate_key_helper_fn(): Exception occured - {}".format( | |
| str(e) | |
| ) | |
| ) | |
| verbose_proxy_logger.debug(traceback.format_exc()) | |
| if isinstance(e, HTTPException): | |
| raise e | |
| raise HTTPException( | |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
| detail={"error": "Internal Server Error."}, | |
| ) | |
| # Add budget related info in key_data - this ensures it's returned | |
| key_data["budget_id"] = budget_id | |
| if request_type == "user": | |
| # if this is a /user/new request update the key_date with user_data fields | |
| key_data.update(user_data) | |
| return key_data | |
| async def _team_key_deletion_check( | |
| user_api_key_dict: UserAPIKeyAuth, | |
| key_info: LiteLLM_VerificationToken, | |
| prisma_client: PrismaClient, | |
| user_api_key_cache: DualCache, | |
| ): | |
| is_team_key = _is_team_key(data=key_info) | |
| if is_team_key and key_info.team_id is not None: | |
| team_table = await get_team_object( | |
| team_id=key_info.team_id, | |
| prisma_client=prisma_client, | |
| user_api_key_cache=user_api_key_cache, | |
| check_db_only=True, | |
| ) | |
| if ( | |
| litellm.key_generation_settings is not None | |
| and "team_key_generation" in litellm.key_generation_settings | |
| ): | |
| _team_key_generation = litellm.key_generation_settings[ | |
| "team_key_generation" | |
| ] | |
| else: | |
| _team_key_generation = TeamUIKeyGenerationConfig( | |
| allowed_team_member_roles=["admin", "user"], | |
| ) | |
| # check if user is team admin | |
| if team_table is not None: | |
| return _team_key_operation_team_member_check( | |
| assigned_user_id=user_api_key_dict.user_id, | |
| team_table=team_table, | |
| user_api_key_dict=user_api_key_dict, | |
| team_key_generation=_team_key_generation, | |
| route=KeyManagementRoutes.KEY_DELETE, | |
| ) | |
| else: | |
| raise HTTPException( | |
| status_code=status.HTTP_404_NOT_FOUND, | |
| detail={ | |
| "error": f"Team not found in db, and user not proxy admin. Team id = {key_info.team_id}" | |
| }, | |
| ) | |
| return False | |
| async def can_delete_verification_token( | |
| key_info: LiteLLM_VerificationToken, | |
| user_api_key_cache: DualCache, | |
| user_api_key_dict: UserAPIKeyAuth, | |
| prisma_client: PrismaClient, | |
| ) -> bool: | |
| """ | |
| - check if user is proxy admin | |
| - check if user is team admin and key is a team key | |
| - check if key is personal key | |
| """ | |
| is_team_key = _is_team_key(data=key_info) | |
| if user_api_key_dict.user_role == LitellmUserRoles.PROXY_ADMIN.value: | |
| return True | |
| elif is_team_key and key_info.team_id is not None: | |
| return await _team_key_deletion_check( | |
| user_api_key_dict=user_api_key_dict, | |
| key_info=key_info, | |
| prisma_client=prisma_client, | |
| user_api_key_cache=user_api_key_cache, | |
| ) | |
| elif key_info.user_id is not None and key_info.user_id == user_api_key_dict.user_id: | |
| return True | |
| else: | |
| return False | |
| async def delete_verification_tokens( | |
| tokens: List, | |
| user_api_key_cache: DualCache, | |
| user_api_key_dict: UserAPIKeyAuth, | |
| ) -> Tuple[Optional[Dict], List[LiteLLM_VerificationToken]]: | |
| """ | |
| Helper that deletes the list of tokens from the database | |
| - check if user is proxy admin | |
| - check if user is team admin and key is a team key | |
| Args: | |
| tokens: List of tokens to delete | |
| user_id: Optional user_id to filter by | |
| Returns: | |
| Tuple[Optional[Dict], List[LiteLLM_VerificationToken]]: | |
| Optional[Dict]: | |
| - Number of deleted tokens | |
| List[LiteLLM_VerificationToken]: | |
| - List of keys being deleted, this contains information about the key_alias, token, and user_id being deleted, | |
| this is passed down to the KeyManagementEventHooks to delete the keys from the secret manager and handle audit logs | |
| """ | |
| from litellm.proxy.proxy_server import prisma_client | |
| try: | |
| if prisma_client: | |
| tokens = [_hash_token_if_needed(token=key) for key in tokens] | |
| _keys_being_deleted: List[ | |
| LiteLLM_VerificationToken | |
| ] = await prisma_client.db.litellm_verificationtoken.find_many( | |
| where={"token": {"in": tokens}} | |
| ) | |
| # Assuming 'db' is your Prisma Client instance | |
| # check if admin making request - don't filter by user-id | |
| if user_api_key_dict.user_role == LitellmUserRoles.PROXY_ADMIN.value: | |
| deleted_tokens = await prisma_client.delete_data(tokens=tokens) | |
| # else | |
| else: | |
| tasks = [] | |
| deleted_tokens = [] | |
| for key in _keys_being_deleted: | |
| async def _delete_key(key: LiteLLM_VerificationToken): | |
| if await can_delete_verification_token( | |
| key_info=key, | |
| user_api_key_cache=user_api_key_cache, | |
| user_api_key_dict=user_api_key_dict, | |
| prisma_client=prisma_client, | |
| ): | |
| await prisma_client.delete_data(tokens=[key.token]) | |
| deleted_tokens.append(key.token) | |
| else: | |
| raise HTTPException( | |
| status_code=status.HTTP_403_FORBIDDEN, | |
| detail={ | |
| "error": "You are not authorized to delete this key" | |
| }, | |
| ) | |
| tasks.append(_delete_key(key)) | |
| await asyncio.gather(*tasks) | |
| _num_deleted_tokens = len(deleted_tokens) | |
| if _num_deleted_tokens != len(tokens): | |
| failed_tokens = [ | |
| token for token in tokens if token not in deleted_tokens | |
| ] | |
| raise Exception( | |
| "Failed to delete all tokens. Failed to delete tokens: " | |
| + str(failed_tokens) | |
| ) | |
| else: | |
| raise Exception("DB not connected. prisma_client is None") | |
| except Exception as e: | |
| verbose_proxy_logger.exception( | |
| "litellm.proxy.proxy_server.delete_verification_tokens(): Exception occured - {}".format( | |
| str(e) | |
| ) | |
| ) | |
| verbose_proxy_logger.debug(traceback.format_exc()) | |
| raise e | |
| for key in tokens: | |
| user_api_key_cache.delete_cache(key) | |
| # remove hash token from cache | |
| hashed_token = hash_token(cast(str, key)) | |
| user_api_key_cache.delete_cache(hashed_token) | |
| return {"deleted_keys": deleted_tokens}, _keys_being_deleted | |
| async def delete_key_aliases( | |
| key_aliases: List[str], | |
| user_api_key_cache: DualCache, | |
| prisma_client: PrismaClient, | |
| user_api_key_dict: UserAPIKeyAuth, | |
| ) -> Tuple[Optional[Dict], List[LiteLLM_VerificationToken]]: | |
| _keys_being_deleted = await prisma_client.db.litellm_verificationtoken.find_many( | |
| where={"key_alias": {"in": key_aliases}} | |
| ) | |
| tokens = [key.token for key in _keys_being_deleted] | |
| return await delete_verification_tokens( | |
| tokens=tokens, | |
| user_api_key_cache=user_api_key_cache, | |
| user_api_key_dict=user_api_key_dict, | |
| ) | |
| async def _rotate_master_key( | |
| prisma_client: PrismaClient, | |
| user_api_key_dict: UserAPIKeyAuth, | |
| current_master_key: str, | |
| new_master_key: str, | |
| ) -> None: | |
| """ | |
| Rotate the master key | |
| 1. Get the values from the DB | |
| - Get models from DB | |
| - Get config from DB | |
| 2. Decrypt the values | |
| - ModelTable | |
| - [{"model_name": "str", "litellm_params": {}}] | |
| - ConfigTable | |
| 3. Encrypt the values with the new master key | |
| 4. Update the values in the DB | |
| """ | |
| from litellm.proxy.proxy_server import proxy_config | |
| try: | |
| models: Optional[ | |
| List | |
| ] = await prisma_client.db.litellm_proxymodeltable.find_many() | |
| except Exception: | |
| models = None | |
| # 2. process model table | |
| if models: | |
| decrypted_models = proxy_config.decrypt_model_list_from_db(new_models=models) | |
| verbose_proxy_logger.info( | |
| "ABLE TO DECRYPT MODELS - len(decrypted_models): %s", len(decrypted_models) | |
| ) | |
| new_models = [] | |
| for model in decrypted_models: | |
| new_model = await _add_model_to_db( | |
| model_params=Deployment(**model), | |
| user_api_key_dict=user_api_key_dict, | |
| prisma_client=prisma_client, | |
| new_encryption_key=new_master_key, | |
| should_create_model_in_db=False, | |
| ) | |
| if new_model: | |
| new_models.append(jsonify_object(new_model.model_dump())) | |
| verbose_proxy_logger.info("Resetting proxy model table") | |
| await prisma_client.db.litellm_proxymodeltable.delete_many() | |
| verbose_proxy_logger.info("Creating %s models", len(new_models)) | |
| await prisma_client.db.litellm_proxymodeltable.create_many( | |
| data=new_models, | |
| ) | |
| # 3. process config table | |
| try: | |
| config = await prisma_client.db.litellm_config.find_many() | |
| except Exception: | |
| config = None | |
| if config: | |
| """If environment_variables is found, decrypt it and encrypt it with the new master key""" | |
| environment_variables_dict = {} | |
| for c in config: | |
| if c.param_name == "environment_variables": | |
| environment_variables_dict = c.param_value | |
| if environment_variables_dict: | |
| decrypted_env_vars = proxy_config._decrypt_and_set_db_env_variables( | |
| environment_variables=environment_variables_dict | |
| ) | |
| encrypted_env_vars = proxy_config._encrypt_env_variables( | |
| environment_variables=decrypted_env_vars, | |
| new_encryption_key=new_master_key, | |
| ) | |
| if encrypted_env_vars: | |
| await prisma_client.db.litellm_config.update( | |
| where={"param_name": "environment_variables"}, | |
| data={"param_value": jsonify_object(encrypted_env_vars)}, | |
| ) | |
| async def regenerate_key_fn( | |
| key: Optional[str] = None, | |
| data: Optional[RegenerateKeyRequest] = None, | |
| user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth), | |
| litellm_changed_by: Optional[str] = Header( | |
| None, | |
| description="The litellm-changed-by header enables tracking of actions performed by authorized users on behalf of other users, providing an audit trail for accountability", | |
| ), | |
| ) -> Optional[GenerateKeyResponse]: | |
| """ | |
| Regenerate an existing API key while optionally updating its parameters. | |
| Parameters: | |
| - key: str (path parameter) - The key to regenerate | |
| - data: Optional[RegenerateKeyRequest] - Request body containing optional parameters to update | |
| - key_alias: Optional[str] - User-friendly key alias | |
| - user_id: Optional[str] - User ID associated with key | |
| - team_id: Optional[str] - Team ID associated with key | |
| - models: Optional[list] - Model_name's a user is allowed to call | |
| - tags: Optional[List[str]] - Tags for organizing keys (Enterprise only) | |
| - spend: Optional[float] - Amount spent by key | |
| - max_budget: Optional[float] - Max budget for key | |
| - model_max_budget: Optional[Dict[str, BudgetConfig]] - Model-specific budgets {"gpt-4": {"budget_limit": 0.0005, "time_period": "30d"}} | |
| - budget_duration: Optional[str] - Budget reset period ("30d", "1h", etc.) | |
| - soft_budget: Optional[float] - Soft budget limit (warning vs. hard stop). Will trigger a slack alert when this soft budget is reached. | |
| - max_parallel_requests: Optional[int] - Rate limit for parallel requests | |
| - metadata: Optional[dict] - Metadata for key. Example {"team": "core-infra", "app": "app2"} | |
| - tpm_limit: Optional[int] - Tokens per minute limit | |
| - rpm_limit: Optional[int] - Requests per minute limit | |
| - model_rpm_limit: Optional[dict] - Model-specific RPM limits {"gpt-4": 100, "claude-v1": 200} | |
| - model_tpm_limit: Optional[dict] - Model-specific TPM limits {"gpt-4": 100000, "claude-v1": 200000} | |
| - allowed_cache_controls: Optional[list] - List of allowed cache control values | |
| - duration: Optional[str] - Key validity duration ("30d", "1h", etc.) | |
| - permissions: Optional[dict] - Key-specific permissions | |
| - guardrails: Optional[List[str]] - List of active guardrails for the key | |
| - blocked: Optional[bool] - Whether the key is blocked | |
| Returns: | |
| - GenerateKeyResponse containing the new key and its updated parameters | |
| Example: | |
| ```bash | |
| curl --location --request POST 'http://localhost:4000/key/sk-1234/regenerate' \ | |
| --header 'Authorization: Bearer sk-1234' \ | |
| --header 'Content-Type: application/json' \ | |
| --data-raw '{ | |
| "max_budget": 100, | |
| "metadata": {"team": "core-infra"}, | |
| "models": ["gpt-4", "gpt-3.5-turbo"] | |
| }' | |
| ``` | |
| Note: This is an Enterprise feature. It requires a premium license to use. | |
| """ | |
| try: | |
| from litellm.proxy.proxy_server import ( | |
| hash_token, | |
| master_key, | |
| premium_user, | |
| prisma_client, | |
| proxy_logging_obj, | |
| user_api_key_cache, | |
| ) | |
| if premium_user is not True: | |
| raise ValueError( | |
| f"Regenerating Virtual Keys is an Enterprise feature, {CommonProxyErrors.not_premium_user.value}" | |
| ) | |
| # Check if key exists, raise exception if key is not in the DB | |
| key = data.key if data and data.key else key | |
| if not key: | |
| raise HTTPException(status_code=400, detail={"error": "No key passed in."}) | |
| ### 1. Create New copy that is duplicate of existing key | |
| ###################################################################### | |
| # create duplicate of existing key | |
| # set token = new token generated | |
| # insert new token in DB | |
| # create hash of token | |
| if prisma_client is None: | |
| raise HTTPException( | |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
| detail={"error": "DB not connected. prisma_client is None"}, | |
| ) | |
| _is_master_key_valid = _is_master_key(api_key=key, _master_key=master_key) | |
| if master_key is not None and data and _is_master_key_valid: | |
| if data.new_master_key is None: | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail={"error": "New master key is required."}, | |
| ) | |
| await _rotate_master_key( | |
| prisma_client=prisma_client, | |
| user_api_key_dict=user_api_key_dict, | |
| current_master_key=master_key, | |
| new_master_key=data.new_master_key, | |
| ) | |
| return GenerateKeyResponse( | |
| key=data.new_master_key, | |
| token=data.new_master_key, | |
| key_name=data.new_master_key, | |
| expires=None, | |
| ) | |
| if "sk" not in key: | |
| hashed_api_key = key | |
| else: | |
| hashed_api_key = hash_token(key) | |
| _key_in_db = await prisma_client.db.litellm_verificationtoken.find_unique( | |
| where={"token": hashed_api_key}, | |
| ) | |
| if _key_in_db is None: | |
| raise HTTPException( | |
| status_code=status.HTTP_404_NOT_FOUND, | |
| detail={"error": f"Key {key} not found."}, | |
| ) | |
| # check if user has permission to regenerate key | |
| await TeamMemberPermissionChecks.can_team_member_execute_key_management_endpoint( | |
| user_api_key_dict=user_api_key_dict, | |
| route=KeyManagementRoutes.KEY_REGENERATE, | |
| prisma_client=prisma_client, | |
| existing_key_row=_key_in_db, | |
| user_api_key_cache=user_api_key_cache, | |
| ) | |
| verbose_proxy_logger.debug("key_in_db: %s", _key_in_db) | |
| new_token = f"sk-{secrets.token_urlsafe(LENGTH_OF_LITELLM_GENERATED_KEY)}" | |
| new_token_hash = hash_token(new_token) | |
| new_token_key_name = f"sk-...{new_token[-4:]}" | |
| # Prepare the update data | |
| update_data = { | |
| "token": new_token_hash, | |
| "key_name": new_token_key_name, | |
| } | |
| non_default_values = {} | |
| if data is not None: | |
| # Update with any provided parameters from GenerateKeyRequest | |
| non_default_values = prepare_key_update_data( | |
| data=data, existing_key_row=_key_in_db | |
| ) | |
| verbose_proxy_logger.debug("non_default_values: %s", non_default_values) | |
| update_data.update(non_default_values) | |
| update_data = prisma_client.jsonify_object(data=update_data) | |
| # Update the token in the database | |
| updated_token = await prisma_client.db.litellm_verificationtoken.update( | |
| where={"token": hashed_api_key}, | |
| data=update_data, # type: ignore | |
| ) | |
| updated_token_dict = {} | |
| if updated_token is not None: | |
| updated_token_dict = dict(updated_token) | |
| updated_token_dict["key"] = new_token | |
| updated_token_dict["token_id"] = updated_token_dict.pop("token") | |
| ### 3. remove existing key entry from cache | |
| ###################################################################### | |
| if key: | |
| await _delete_cache_key_object( | |
| hashed_token=hash_token(key), | |
| user_api_key_cache=user_api_key_cache, | |
| proxy_logging_obj=proxy_logging_obj, | |
| ) | |
| if hashed_api_key: | |
| await _delete_cache_key_object( | |
| hashed_token=hash_token(key), | |
| user_api_key_cache=user_api_key_cache, | |
| proxy_logging_obj=proxy_logging_obj, | |
| ) | |
| response = GenerateKeyResponse( | |
| **updated_token_dict, | |
| ) | |
| asyncio.create_task( | |
| KeyManagementEventHooks.async_key_rotated_hook( | |
| data=data, | |
| existing_key_row=_key_in_db, | |
| response=response, | |
| user_api_key_dict=user_api_key_dict, | |
| litellm_changed_by=litellm_changed_by, | |
| ) | |
| ) | |
| return response | |
| except Exception as e: | |
| verbose_proxy_logger.exception("Error regenerating key: %s", e) | |
| raise handle_exception_on_proxy(e) | |
| async def validate_key_list_check( | |
| user_api_key_dict: UserAPIKeyAuth, | |
| user_id: Optional[str], | |
| team_id: Optional[str], | |
| organization_id: Optional[str], | |
| key_alias: Optional[str], | |
| key_hash: Optional[str], | |
| prisma_client: PrismaClient, | |
| ) -> Optional[LiteLLM_UserTable]: | |
| if user_api_key_dict.user_role == LitellmUserRoles.PROXY_ADMIN.value: | |
| return None | |
| if user_api_key_dict.user_id is None: | |
| raise ProxyException( | |
| message="You are not authorized to access this endpoint. No 'user_id' is associated with your API key.", | |
| type=ProxyErrorTypes.bad_request_error, | |
| param="user_id", | |
| code=status.HTTP_403_FORBIDDEN, | |
| ) | |
| complete_user_info_db_obj: Optional[ | |
| BaseModel | |
| ] = await prisma_client.db.litellm_usertable.find_unique( | |
| where={"user_id": user_api_key_dict.user_id}, | |
| include={"organization_memberships": True}, | |
| ) | |
| if complete_user_info_db_obj is None: | |
| raise ProxyException( | |
| message="You are not authorized to access this endpoint. No 'user_id' is associated with your API key.", | |
| type=ProxyErrorTypes.bad_request_error, | |
| param="user_id", | |
| code=status.HTTP_403_FORBIDDEN, | |
| ) | |
| complete_user_info = LiteLLM_UserTable(**complete_user_info_db_obj.model_dump()) | |
| # internal user can only see their own keys | |
| if user_id: | |
| if complete_user_info.user_id != user_id: | |
| raise ProxyException( | |
| message="You are not authorized to check another user's keys", | |
| type=ProxyErrorTypes.bad_request_error, | |
| param="user_id", | |
| code=status.HTTP_403_FORBIDDEN, | |
| ) | |
| if team_id: | |
| if team_id not in complete_user_info.teams: | |
| raise ProxyException( | |
| message="You are not authorized to check this team's keys", | |
| type=ProxyErrorTypes.bad_request_error, | |
| param="team_id", | |
| code=status.HTTP_403_FORBIDDEN, | |
| ) | |
| if organization_id: | |
| if ( | |
| complete_user_info.organization_memberships is None | |
| or organization_id | |
| not in [ | |
| membership.organization_id | |
| for membership in complete_user_info.organization_memberships | |
| ] | |
| ): | |
| raise ProxyException( | |
| message="You are not authorized to check this organization's keys", | |
| type=ProxyErrorTypes.bad_request_error, | |
| param="organization_id", | |
| code=status.HTTP_403_FORBIDDEN, | |
| ) | |
| if key_hash: | |
| try: | |
| key_info = await prisma_client.db.litellm_verificationtoken.find_unique( | |
| where={"token": key_hash}, | |
| ) | |
| except Exception: | |
| raise ProxyException( | |
| message="Key Hash not found.", | |
| type=ProxyErrorTypes.bad_request_error, | |
| param="key_hash", | |
| code=status.HTTP_403_FORBIDDEN, | |
| ) | |
| can_user_query_key_info = await _can_user_query_key_info( | |
| user_api_key_dict=user_api_key_dict, | |
| key=key_hash, | |
| key_info=key_info, | |
| ) | |
| if not can_user_query_key_info: | |
| raise HTTPException( | |
| status_code=status.HTTP_403_FORBIDDEN, | |
| detail="You are not allowed to access this key's info. Your role={}".format( | |
| user_api_key_dict.user_role | |
| ), | |
| ) | |
| return complete_user_info | |
| async def get_admin_team_ids( | |
| complete_user_info: Optional[LiteLLM_UserTable], | |
| user_api_key_dict: UserAPIKeyAuth, | |
| prisma_client: PrismaClient, | |
| ) -> List[str]: | |
| """ | |
| Get all team IDs where the user is an admin. | |
| """ | |
| if complete_user_info is None: | |
| return [] | |
| # Get all teams that user is an admin of | |
| teams: Optional[ | |
| List[BaseModel] | |
| ] = await prisma_client.db.litellm_teamtable.find_many( | |
| where={"team_id": {"in": complete_user_info.teams}} | |
| ) | |
| if teams is None: | |
| return [] | |
| teams_pydantic_obj = [LiteLLM_TeamTable(**team.model_dump()) for team in teams] | |
| admin_team_ids = [ | |
| team.team_id | |
| for team in teams_pydantic_obj | |
| if _is_user_team_admin(user_api_key_dict=user_api_key_dict, team_obj=team) | |
| ] | |
| return admin_team_ids | |
| async def list_keys( | |
| request: Request, | |
| user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth), | |
| page: int = Query(1, description="Page number", ge=1), | |
| size: int = Query(10, description="Page size", ge=1, le=100), | |
| user_id: Optional[str] = Query(None, description="Filter keys by user ID"), | |
| team_id: Optional[str] = Query(None, description="Filter keys by team ID"), | |
| organization_id: Optional[str] = Query( | |
| None, description="Filter keys by organization ID" | |
| ), | |
| key_hash: Optional[str] = Query(None, description="Filter keys by key hash"), | |
| key_alias: Optional[str] = Query(None, description="Filter keys by key alias"), | |
| return_full_object: bool = Query(False, description="Return full key object"), | |
| include_team_keys: bool = Query( | |
| False, description="Include all keys for teams that user is an admin of." | |
| ), | |
| sort_by: Optional[str] = Query( | |
| default=None, | |
| description="Column to sort by (e.g. 'user_id', 'created_at', 'spend')", | |
| ), | |
| sort_order: str = Query(default="desc", description="Sort order ('asc' or 'desc')"), | |
| ) -> KeyListResponseObject: | |
| """ | |
| List all keys for a given user / team / organization. | |
| Returns: | |
| { | |
| "keys": List[str] or List[UserAPIKeyAuth], | |
| "total_count": int, | |
| "current_page": int, | |
| "total_pages": int, | |
| } | |
| """ | |
| try: | |
| from litellm.proxy.proxy_server import prisma_client | |
| verbose_proxy_logger.debug("Entering list_keys function") | |
| if prisma_client is None: | |
| verbose_proxy_logger.error("Database not connected") | |
| raise Exception("Database not connected") | |
| complete_user_info = await validate_key_list_check( | |
| user_api_key_dict=user_api_key_dict, | |
| user_id=user_id, | |
| team_id=team_id, | |
| organization_id=organization_id, | |
| key_alias=key_alias, | |
| key_hash=key_hash, | |
| prisma_client=prisma_client, | |
| ) | |
| if include_team_keys: | |
| admin_team_ids = await get_admin_team_ids( | |
| complete_user_info=complete_user_info, | |
| user_api_key_dict=user_api_key_dict, | |
| prisma_client=prisma_client, | |
| ) | |
| else: | |
| admin_team_ids = None | |
| if user_id is None and user_api_key_dict.user_role not in [ | |
| LitellmUserRoles.PROXY_ADMIN.value, | |
| LitellmUserRoles.PROXY_ADMIN_VIEW_ONLY.value, | |
| ]: | |
| user_id = user_api_key_dict.user_id | |
| response = await _list_key_helper( | |
| prisma_client=prisma_client, | |
| page=page, | |
| size=size, | |
| user_id=user_id, | |
| team_id=team_id, | |
| key_alias=key_alias, | |
| key_hash=key_hash, | |
| return_full_object=return_full_object, | |
| organization_id=organization_id, | |
| admin_team_ids=admin_team_ids, | |
| sort_by=sort_by, | |
| sort_order=sort_order, | |
| ) | |
| verbose_proxy_logger.debug("Successfully prepared response") | |
| return response | |
| except Exception as e: | |
| verbose_proxy_logger.exception(f"Error in list_keys: {e}") | |
| if isinstance(e, HTTPException): | |
| raise ProxyException( | |
| message=getattr(e, "detail", f"error({str(e)})"), | |
| type=ProxyErrorTypes.internal_server_error, | |
| param=getattr(e, "param", "None"), | |
| code=getattr(e, "status_code", status.HTTP_500_INTERNAL_SERVER_ERROR), | |
| ) | |
| elif isinstance(e, ProxyException): | |
| raise e | |
| raise ProxyException( | |
| message="Authentication Error, " + str(e), | |
| type=ProxyErrorTypes.internal_server_error, | |
| param=getattr(e, "param", "None"), | |
| code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
| ) | |
| def _validate_sort_params( | |
| sort_by: Optional[str], sort_order: str | |
| ) -> Optional[Dict[str, str]]: | |
| order_by: Dict[str, str] = {} | |
| if sort_by is None: | |
| return None | |
| # Validate sort_by is a valid column | |
| valid_columns = [ | |
| "spend", | |
| "max_budget", | |
| "created_at", | |
| "updated_at", | |
| "token", | |
| "key_alias", | |
| ] | |
| if sort_by not in valid_columns: | |
| raise HTTPException( | |
| status_code=400, | |
| detail={ | |
| "error": f"Invalid sort column. Must be one of: {', '.join(valid_columns)}" | |
| }, | |
| ) | |
| # Validate sort_order | |
| if sort_order.lower() not in ["asc", "desc"]: | |
| raise HTTPException( | |
| status_code=400, | |
| detail={"error": "Invalid sort order. Must be 'asc' or 'desc'"}, | |
| ) | |
| order_by[sort_by] = sort_order.lower() | |
| return order_by | |
| async def _list_key_helper( | |
| prisma_client: PrismaClient, | |
| page: int, | |
| size: int, | |
| user_id: Optional[str], | |
| team_id: Optional[str], | |
| organization_id: Optional[str], | |
| key_alias: Optional[str], | |
| key_hash: Optional[str], | |
| exclude_team_id: Optional[str] = None, | |
| return_full_object: bool = False, | |
| admin_team_ids: Optional[ | |
| List[str] | |
| ] = None, # New parameter for teams where user is admin | |
| sort_by: Optional[str] = None, | |
| sort_order: str = "desc", | |
| ) -> KeyListResponseObject: | |
| """ | |
| Helper function to list keys | |
| Args: | |
| page: int | |
| size: int | |
| user_id: Optional[str] | |
| team_id: Optional[str] | |
| key_alias: Optional[str] | |
| exclude_team_id: Optional[str] # exclude a specific team_id | |
| return_full_object: bool # when true, will return UserAPIKeyAuth objects instead of just the token | |
| admin_team_ids: Optional[List[str]] # list of team IDs where the user is an admin | |
| Returns: | |
| KeyListResponseObject | |
| { | |
| "keys": List[str] or List[UserAPIKeyAuth], # Updated to reflect possible return types | |
| "total_count": int, | |
| "current_page": int, | |
| "total_pages": int, | |
| } | |
| """ | |
| # Prepare filter conditions | |
| where: Dict[str, Union[str, Dict[str, Any], List[Dict[str, Any]]]] = {} | |
| where.update(_get_condition_to_filter_out_ui_session_tokens()) | |
| # Build the OR conditions for user's keys and admin team keys | |
| or_conditions: List[Dict[str, Any]] = [] | |
| # Base conditions for user's own keys | |
| user_condition: Dict[str, Any] = {} | |
| if user_id and isinstance(user_id, str): | |
| user_condition["user_id"] = user_id | |
| if team_id and isinstance(team_id, str): | |
| user_condition["team_id"] = team_id | |
| if key_alias and isinstance(key_alias, str): | |
| user_condition["key_alias"] = key_alias | |
| if exclude_team_id and isinstance(exclude_team_id, str): | |
| user_condition["team_id"] = {"not": exclude_team_id} | |
| if organization_id and isinstance(organization_id, str): | |
| user_condition["organization_id"] = organization_id | |
| if key_hash and isinstance(key_hash, str): | |
| user_condition["token"] = key_hash | |
| if user_condition: | |
| or_conditions.append(user_condition) | |
| # Add condition for admin team keys if provided | |
| if admin_team_ids: | |
| or_conditions.append({"team_id": {"in": admin_team_ids}}) | |
| # Combine conditions with OR if we have multiple conditions | |
| if len(or_conditions) > 1: | |
| where = {"AND": [where, {"OR": or_conditions}]} | |
| elif len(or_conditions) == 1: | |
| where.update(or_conditions[0]) | |
| verbose_proxy_logger.debug(f"Filter conditions: {where}") | |
| # Calculate skip for pagination | |
| skip = (page - 1) * size | |
| verbose_proxy_logger.debug(f"Pagination: skip={skip}, take={size}") | |
| order_by: Optional[Dict[str, str]] = ( | |
| _validate_sort_params(sort_by, sort_order) | |
| if sort_by is not None and isinstance(sort_by, str) | |
| else None | |
| ) | |
| # Fetch keys with pagination | |
| keys = await prisma_client.db.litellm_verificationtoken.find_many( | |
| where=where, # type: ignore | |
| skip=skip, # type: ignore | |
| take=size, # type: ignore | |
| order=order_by | |
| if order_by | |
| else [ | |
| {"created_at": "desc"}, | |
| {"token": "desc"}, # fallback sort | |
| ], | |
| ) | |
| verbose_proxy_logger.debug(f"Fetched {len(keys)} keys") | |
| # Get total count of keys | |
| total_count = await prisma_client.db.litellm_verificationtoken.count( | |
| where=where # type: ignore | |
| ) | |
| verbose_proxy_logger.debug(f"Total count of keys: {total_count}") | |
| # Calculate total pages | |
| total_pages = -(-total_count // size) # Ceiling division | |
| # Prepare response | |
| key_list: List[Union[str, UserAPIKeyAuth]] = [] | |
| for key in keys: | |
| if return_full_object is True: | |
| key_list.append(UserAPIKeyAuth(**key.dict())) # Return full key object | |
| else: | |
| _token = key.dict().get("token") | |
| key_list.append(_token) # Return only the token | |
| return KeyListResponseObject( | |
| keys=key_list, | |
| total_count=total_count, | |
| current_page=page, | |
| total_pages=total_pages, | |
| ) | |
| def _get_condition_to_filter_out_ui_session_tokens() -> Dict[str, Any]: | |
| """ | |
| Condition to filter out UI session tokens | |
| """ | |
| return { | |
| "OR": [ | |
| {"team_id": None}, # Include records where team_id is null | |
| { | |
| "team_id": {"not": UI_SESSION_TOKEN_TEAM_ID} | |
| }, # Include records where team_id != UI_SESSION_TOKEN_TEAM_ID | |
| ] | |
| } | |
| async def block_key( | |
| data: BlockKeyRequest, | |
| http_request: Request, | |
| user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth), | |
| litellm_changed_by: Optional[str] = Header( | |
| None, | |
| description="The litellm-changed-by header enables tracking of actions performed by authorized users on behalf of other users, providing an audit trail for accountability", | |
| ), | |
| ) -> Optional[LiteLLM_VerificationToken]: | |
| """ | |
| Block an Virtual key from making any requests. | |
| Parameters: | |
| - key: str - The key to block. Can be either the unhashed key (sk-...) or the hashed key value | |
| Example: | |
| ```bash | |
| curl --location 'http://0.0.0.0:4000/key/block' \ | |
| --header 'Authorization: Bearer sk-1234' \ | |
| --header 'Content-Type: application/json' \ | |
| --data '{ | |
| "key": "sk-Fn8Ej39NxjAXrvpUGKghGw" | |
| }' | |
| ``` | |
| Note: This is an admin-only endpoint. Only proxy admins can block keys. | |
| """ | |
| from litellm.proxy.proxy_server import ( | |
| create_audit_log_for_update, | |
| hash_token, | |
| litellm_proxy_admin_name, | |
| prisma_client, | |
| proxy_logging_obj, | |
| user_api_key_cache, | |
| ) | |
| if prisma_client is None: | |
| raise Exception("{}".format(CommonProxyErrors.db_not_connected_error.value)) | |
| if data.key.startswith("sk-"): | |
| hashed_token = hash_token(token=data.key) | |
| else: | |
| hashed_token = data.key | |
| if litellm.store_audit_logs is True: | |
| # make an audit log for key update | |
| record = await prisma_client.db.litellm_verificationtoken.find_unique( | |
| where={"token": hashed_token} | |
| ) | |
| if record is None: | |
| raise ProxyException( | |
| message=f"Key {data.key} not found", | |
| type=ProxyErrorTypes.bad_request_error, | |
| param="key", | |
| code=status.HTTP_404_NOT_FOUND, | |
| ) | |
| asyncio.create_task( | |
| create_audit_log_for_update( | |
| request_data=LiteLLM_AuditLogs( | |
| id=str(uuid.uuid4()), | |
| updated_at=datetime.now(timezone.utc), | |
| changed_by=litellm_changed_by | |
| or user_api_key_dict.user_id | |
| or litellm_proxy_admin_name, | |
| changed_by_api_key=user_api_key_dict.api_key, | |
| table_name=LitellmTableNames.KEY_TABLE_NAME, | |
| object_id=hashed_token, | |
| action="blocked", | |
| updated_values="{}", | |
| before_value=record.model_dump_json(), | |
| ) | |
| ) | |
| ) | |
| record = await prisma_client.db.litellm_verificationtoken.update( | |
| where={"token": hashed_token}, data={"blocked": True} # type: ignore | |
| ) | |
| ## UPDATE KEY CACHE | |
| ### get cached object ### | |
| key_object = await get_key_object( | |
| hashed_token=hashed_token, | |
| prisma_client=prisma_client, | |
| user_api_key_cache=user_api_key_cache, | |
| parent_otel_span=None, | |
| proxy_logging_obj=proxy_logging_obj, | |
| ) | |
| ### update cached object ### | |
| key_object.blocked = True | |
| ### store cached object ### | |
| await _cache_key_object( | |
| hashed_token=hashed_token, | |
| user_api_key_obj=key_object, | |
| user_api_key_cache=user_api_key_cache, | |
| proxy_logging_obj=proxy_logging_obj, | |
| ) | |
| return record | |
| async def unblock_key( | |
| data: BlockKeyRequest, | |
| http_request: Request, | |
| user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth), | |
| litellm_changed_by: Optional[str] = Header( | |
| None, | |
| description="The litellm-changed-by header enables tracking of actions performed by authorized users on behalf of other users, providing an audit trail for accountability", | |
| ), | |
| ): | |
| """ | |
| Unblock a Virtual key to allow it to make requests again. | |
| Parameters: | |
| - key: str - The key to unblock. Can be either the unhashed key (sk-...) or the hashed key value | |
| Example: | |
| ```bash | |
| curl --location 'http://0.0.0.0:4000/key/unblock' \ | |
| --header 'Authorization: Bearer sk-1234' \ | |
| --header 'Content-Type: application/json' \ | |
| --data '{ | |
| "key": "sk-Fn8Ej39NxjAXrvpUGKghGw" | |
| }' | |
| ``` | |
| Note: This is an admin-only endpoint. Only proxy admins can unblock keys. | |
| """ | |
| from litellm.proxy.proxy_server import ( | |
| create_audit_log_for_update, | |
| hash_token, | |
| litellm_proxy_admin_name, | |
| prisma_client, | |
| proxy_logging_obj, | |
| user_api_key_cache, | |
| ) | |
| if prisma_client is None: | |
| raise Exception("{}".format(CommonProxyErrors.db_not_connected_error.value)) | |
| if data.key.startswith("sk-"): | |
| hashed_token = hash_token(token=data.key) | |
| else: | |
| hashed_token = data.key | |
| if litellm.store_audit_logs is True: | |
| # make an audit log for key update | |
| record = await prisma_client.db.litellm_verificationtoken.find_unique( | |
| where={"token": hashed_token} | |
| ) | |
| if record is None: | |
| raise ProxyException( | |
| message=f"Key {data.key} not found", | |
| type=ProxyErrorTypes.bad_request_error, | |
| param="key", | |
| code=status.HTTP_404_NOT_FOUND, | |
| ) | |
| asyncio.create_task( | |
| create_audit_log_for_update( | |
| request_data=LiteLLM_AuditLogs( | |
| id=str(uuid.uuid4()), | |
| updated_at=datetime.now(timezone.utc), | |
| changed_by=litellm_changed_by | |
| or user_api_key_dict.user_id | |
| or litellm_proxy_admin_name, | |
| changed_by_api_key=user_api_key_dict.api_key, | |
| table_name=LitellmTableNames.KEY_TABLE_NAME, | |
| object_id=hashed_token, | |
| action="blocked", | |
| updated_values="{}", | |
| before_value=record.model_dump_json(), | |
| ) | |
| ) | |
| ) | |
| record = await prisma_client.db.litellm_verificationtoken.update( | |
| where={"token": hashed_token}, data={"blocked": False} # type: ignore | |
| ) | |
| ## UPDATE KEY CACHE | |
| ### get cached object ### | |
| key_object = await get_key_object( | |
| hashed_token=hashed_token, | |
| prisma_client=prisma_client, | |
| user_api_key_cache=user_api_key_cache, | |
| parent_otel_span=None, | |
| proxy_logging_obj=proxy_logging_obj, | |
| ) | |
| ### update cached object ### | |
| key_object.blocked = False | |
| ### store cached object ### | |
| await _cache_key_object( | |
| hashed_token=hashed_token, | |
| user_api_key_obj=key_object, | |
| user_api_key_cache=user_api_key_cache, | |
| proxy_logging_obj=proxy_logging_obj, | |
| ) | |
| return record | |
| async def key_health( | |
| request: Request, | |
| user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth), | |
| ): | |
| """ | |
| Check the health of the key | |
| Checks: | |
| - If key based logging is configured correctly - sends a test log | |
| Usage | |
| Pass the key in the request header | |
| ```bash | |
| curl -X POST "http://localhost:4000/key/health" \ | |
| -H "Authorization: Bearer sk-1234" \ | |
| -H "Content-Type: application/json" | |
| ``` | |
| Response when logging callbacks are setup correctly: | |
| ```json | |
| { | |
| "key": "healthy", | |
| "logging_callbacks": { | |
| "callbacks": [ | |
| "gcs_bucket" | |
| ], | |
| "status": "healthy", | |
| "details": "No logger exceptions triggered, system is healthy. Manually check if logs were sent to ['gcs_bucket']" | |
| } | |
| } | |
| ``` | |
| Response when logging callbacks are not setup correctly: | |
| ```json | |
| { | |
| "key": "unhealthy", | |
| "logging_callbacks": { | |
| "callbacks": [ | |
| "gcs_bucket" | |
| ], | |
| "status": "unhealthy", | |
| "details": "Logger exceptions triggered, system is unhealthy: Failed to load vertex credentials. Check to see if credentials containing partial/invalid information." | |
| } | |
| } | |
| ``` | |
| """ | |
| try: | |
| # Get the key's metadata | |
| key_metadata = user_api_key_dict.metadata | |
| health_status: KeyHealthResponse = KeyHealthResponse( | |
| key="healthy", | |
| logging_callbacks=None, | |
| ) | |
| # Check if logging is configured in metadata | |
| if key_metadata and "logging" in key_metadata: | |
| logging_statuses = await test_key_logging( | |
| user_api_key_dict=user_api_key_dict, | |
| request=request, | |
| key_logging=key_metadata["logging"], | |
| ) | |
| health_status["logging_callbacks"] = logging_statuses | |
| # Check if any logging callback is unhealthy | |
| if logging_statuses.get("status") == "unhealthy": | |
| health_status["key"] = "unhealthy" | |
| return KeyHealthResponse(**health_status) | |
| except Exception as e: | |
| raise ProxyException( | |
| message=f"Key health check failed: {str(e)}", | |
| type=ProxyErrorTypes.internal_server_error, | |
| param=getattr(e, "param", "None"), | |
| code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
| ) | |
| async def _can_user_query_key_info( | |
| user_api_key_dict: UserAPIKeyAuth, | |
| key: Optional[str], | |
| key_info: LiteLLM_VerificationToken, | |
| ) -> bool: | |
| """ | |
| Helper to check if the user has access to the key's info | |
| """ | |
| if ( | |
| user_api_key_dict.user_role == LitellmUserRoles.PROXY_ADMIN.value | |
| or user_api_key_dict.user_role == LitellmUserRoles.PROXY_ADMIN_VIEW_ONLY.value | |
| ): | |
| return True | |
| elif user_api_key_dict.api_key == key: | |
| return True | |
| # user can query their own key info | |
| elif key_info.user_id == user_api_key_dict.user_id: | |
| return True | |
| elif await TeamMemberPermissionChecks.user_belongs_to_keys_team( | |
| user_api_key_dict=user_api_key_dict, | |
| existing_key_row=key_info, | |
| ): | |
| return True | |
| return False | |
| async def test_key_logging( | |
| user_api_key_dict: UserAPIKeyAuth, | |
| request: Request, | |
| key_logging: List[Dict[str, Any]], | |
| ) -> LoggingCallbackStatus: | |
| """ | |
| Test the key-based logging | |
| - Test that key logging is correctly formatted and all args are passed correctly | |
| - Make a mock completion call -> user can check if it's correctly logged | |
| - Check if any logger.exceptions were triggered -> if they were then returns it to the user client side | |
| """ | |
| import logging | |
| from io import StringIO | |
| from litellm.proxy.litellm_pre_call_utils import add_litellm_data_to_request | |
| from litellm.proxy.proxy_server import general_settings, proxy_config | |
| logging_callbacks: List[str] = [] | |
| for callback in key_logging: | |
| if callback.get("callback_name") is not None: | |
| logging_callbacks.append(callback["callback_name"]) | |
| else: | |
| raise ValueError("callback_name is required in key_logging") | |
| log_capture_string = StringIO() | |
| ch = logging.StreamHandler(log_capture_string) | |
| ch.setLevel(logging.ERROR) | |
| logger = logging.getLogger() | |
| logger.addHandler(ch) | |
| try: | |
| data = { | |
| "model": "openai/litellm-key-health-test", | |
| "messages": [ | |
| { | |
| "role": "user", | |
| "content": "Hello, this is a test from litellm /key/health. No LLM API call was made for this", | |
| } | |
| ], | |
| "mock_response": "test response", | |
| } | |
| data = await add_litellm_data_to_request( | |
| data=data, | |
| user_api_key_dict=user_api_key_dict, | |
| proxy_config=proxy_config, | |
| general_settings=general_settings, | |
| request=request, | |
| ) | |
| await litellm.acompletion( | |
| **data | |
| ) # make mock completion call to trigger key based callbacks | |
| except Exception as e: | |
| return LoggingCallbackStatus( | |
| callbacks=logging_callbacks, | |
| status="unhealthy", | |
| details=f"Logging test failed: {str(e)}", | |
| ) | |
| await asyncio.sleep( | |
| 2 | |
| ) # wait for callbacks to run, callbacks use batching so wait for the flush event | |
| # Check if any logger exceptions were triggered | |
| log_contents = log_capture_string.getvalue() | |
| logger.removeHandler(ch) | |
| if log_contents: | |
| return LoggingCallbackStatus( | |
| callbacks=logging_callbacks, | |
| status="unhealthy", | |
| details=f"Logger exceptions triggered, system is unhealthy: {log_contents}", | |
| ) | |
| else: | |
| return LoggingCallbackStatus( | |
| callbacks=logging_callbacks, | |
| status="healthy", | |
| details=f"No logger exceptions triggered, system is healthy. Manually check if logs were sent to {logging_callbacks} ", | |
| ) | |
| async def _enforce_unique_key_alias( | |
| key_alias: Optional[str], | |
| prisma_client: Any, | |
| existing_key_token: Optional[str] = None, | |
| ) -> None: | |
| """ | |
| Helper to enforce unique key aliases across all keys. | |
| Args: | |
| key_alias (Optional[str]): The key alias to check | |
| prisma_client (Any): Prisma client instance | |
| existing_key_token (Optional[str]): ID of existing key being updated, to exclude from uniqueness check | |
| (The Admin UI passes key_alias, in all Edit key requests. So we need to be sure that if we find a key with the same alias, it's not the same key we're updating) | |
| Raises: | |
| ProxyException: If key alias already exists on a different key | |
| """ | |
| if key_alias is not None and prisma_client is not None: | |
| where_clause: dict[str, Any] = {"key_alias": key_alias} | |
| if existing_key_token: | |
| # Exclude the current key from the uniqueness check | |
| where_clause["NOT"] = {"token": existing_key_token} | |
| existing_key = await prisma_client.db.litellm_verificationtoken.find_first( | |
| where=where_clause | |
| ) | |
| if existing_key is not None: | |
| raise ProxyException( | |
| message=f"Key with alias '{key_alias}' already exists. Unique key aliases across all keys are required.", | |
| type=ProxyErrorTypes.bad_request_error, | |
| param="key_alias", | |
| code=status.HTTP_400_BAD_REQUEST, | |
| ) | |
| def validate_model_max_budget(model_max_budget: Optional[Dict]) -> None: | |
| """ | |
| Validate the model_max_budget is GenericBudgetConfigType + enforce user has an enterprise license | |
| Raises: | |
| Exception: If model_max_budget is not a valid GenericBudgetConfigType | |
| """ | |
| try: | |
| if model_max_budget is None: | |
| return | |
| if len(model_max_budget) == 0: | |
| return | |
| if model_max_budget is not None: | |
| from litellm.proxy.proxy_server import CommonProxyErrors, premium_user | |
| if premium_user is not True: | |
| raise ValueError( | |
| f"You must have an enterprise license to set model_max_budget. {CommonProxyErrors.not_premium_user.value}" | |
| ) | |
| for _model, _budget_info in model_max_budget.items(): | |
| assert isinstance(_model, str) | |
| # /CRUD endpoints can pass budget_limit as a string, so we need to convert it to a float | |
| if "budget_limit" in _budget_info: | |
| _budget_info["budget_limit"] = float(_budget_info["budget_limit"]) | |
| BudgetConfig(**_budget_info) | |
| except Exception as e: | |
| raise ValueError( | |
| f"Invalid model_max_budget: {str(e)}. Example of valid model_max_budget: https://docs.litellm.ai/docs/proxy/users" | |
| ) | |