Spaces:
Paused
Paused
| """ | |
| Endpoints to control callbacks per team | |
| Use this when each team should control its own callbacks | |
| """ | |
| import json | |
| import traceback | |
| from typing import Optional | |
| from fastapi import APIRouter, Depends, Header, HTTPException, Request, status | |
| from litellm._logging import verbose_proxy_logger | |
| from litellm.proxy._types import ( | |
| AddTeamCallback, | |
| ProxyErrorTypes, | |
| ProxyException, | |
| TeamCallbackMetadata, | |
| UserAPIKeyAuth, | |
| ) | |
| from litellm.proxy.auth.user_api_key_auth import user_api_key_auth | |
| from litellm.proxy.management_helpers.utils import management_endpoint_wrapper | |
| router = APIRouter() | |
| async def add_team_callbacks( | |
| data: AddTeamCallback, | |
| http_request: Request, | |
| team_id: str, | |
| user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth), | |
| litellm_changed_by: Optional[str] = Header( | |
| None, | |
| description="The litellm-changed-by header enables tracking of actions performed by authorized users on behalf of other users, providing an audit trail for accountability", | |
| ), | |
| ): | |
| """ | |
| Add a success/failure callback to a team | |
| Use this if if you want different teams to have different success/failure callbacks | |
| Parameters: | |
| - callback_name (Literal["langfuse", "langsmith", "gcs"], required): The name of the callback to add | |
| - callback_type (Literal["success", "failure", "success_and_failure"], required): The type of callback to add. One of: | |
| - "success": Callback for successful LLM calls | |
| - "failure": Callback for failed LLM calls | |
| - "success_and_failure": Callback for both successful and failed LLM calls | |
| - callback_vars (StandardCallbackDynamicParams, required): A dictionary of variables to pass to the callback | |
| - langfuse_public_key: The public key for the Langfuse callback | |
| - langfuse_secret_key: The secret key for the Langfuse callback | |
| - langfuse_secret: The secret for the Langfuse callback | |
| - langfuse_host: The host for the Langfuse callback | |
| - gcs_bucket_name: The name of the GCS bucket | |
| - gcs_path_service_account: The path to the GCS service account | |
| - langsmith_api_key: The API key for the Langsmith callback | |
| - langsmith_project: The project for the Langsmith callback | |
| - langsmith_base_url: The base URL for the Langsmith callback | |
| Example curl: | |
| ``` | |
| curl -X POST 'http:/localhost:4000/team/dbe2f686-a686-4896-864a-4c3924458709/callback' \ | |
| -H 'Content-Type: application/json' \ | |
| -H 'Authorization: Bearer sk-1234' \ | |
| -d '{ | |
| "callback_name": "langfuse", | |
| "callback_type": "success", | |
| "callback_vars": {"langfuse_public_key": "pk-lf-xxxx1", "langfuse_secret_key": "sk-xxxxx"} | |
| }' | |
| ``` | |
| This means for the team where team_id = dbe2f686-a686-4896-864a-4c3924458709, all LLM calls will be logged to langfuse using the public key pk-lf-xxxx1 and the secret key sk-xxxxx | |
| """ | |
| try: | |
| from litellm.proxy.proxy_server import prisma_client | |
| if prisma_client is None: | |
| raise HTTPException(status_code=500, detail={"error": "No db connected"}) | |
| # Check if team_id exists already | |
| _existing_team = await prisma_client.get_data( | |
| team_id=team_id, table_name="team", query_type="find_unique" | |
| ) | |
| if _existing_team is None: | |
| raise HTTPException( | |
| status_code=400, | |
| detail={ | |
| "error": f"Team id = {team_id} does not exist. Please use a different team id." | |
| }, | |
| ) | |
| # store team callback settings in metadata | |
| team_metadata = _existing_team.metadata | |
| team_callback_settings = team_metadata.get("callback_settings", {}) | |
| # expect callback settings to be | |
| team_callback_settings_obj = TeamCallbackMetadata(**team_callback_settings) | |
| if data.callback_type == "success": | |
| if team_callback_settings_obj.success_callback is None: | |
| team_callback_settings_obj.success_callback = [] | |
| if data.callback_name in team_callback_settings_obj.success_callback: | |
| raise ProxyException( | |
| message=f"callback_name = {data.callback_name} already exists in failure_callback, for team_id = {team_id}. \n Existing failure_callback = {team_callback_settings_obj.success_callback}", | |
| code=status.HTTP_400_BAD_REQUEST, | |
| type=ProxyErrorTypes.bad_request_error, | |
| param="callback_name", | |
| ) | |
| team_callback_settings_obj.success_callback.append(data.callback_name) | |
| elif data.callback_type == "failure": | |
| if team_callback_settings_obj.failure_callback is None: | |
| team_callback_settings_obj.failure_callback = [] | |
| if data.callback_name in team_callback_settings_obj.failure_callback: | |
| raise ProxyException( | |
| message=f"callback_name = {data.callback_name} already exists in failure_callback, for team_id = {team_id}. \n Existing failure_callback = {team_callback_settings_obj.failure_callback}", | |
| code=status.HTTP_400_BAD_REQUEST, | |
| type=ProxyErrorTypes.bad_request_error, | |
| param="callback_name", | |
| ) | |
| team_callback_settings_obj.failure_callback.append(data.callback_name) | |
| elif data.callback_type == "success_and_failure": | |
| if team_callback_settings_obj.success_callback is None: | |
| team_callback_settings_obj.success_callback = [] | |
| if team_callback_settings_obj.failure_callback is None: | |
| team_callback_settings_obj.failure_callback = [] | |
| if data.callback_name in team_callback_settings_obj.success_callback: | |
| raise ProxyException( | |
| message=f"callback_name = {data.callback_name} already exists in success_callback, for team_id = {team_id}. \n Existing success_callback = {team_callback_settings_obj.success_callback}", | |
| code=status.HTTP_400_BAD_REQUEST, | |
| type=ProxyErrorTypes.bad_request_error, | |
| param="callback_name", | |
| ) | |
| if data.callback_name in team_callback_settings_obj.failure_callback: | |
| raise ProxyException( | |
| message=f"callback_name = {data.callback_name} already exists in failure_callback, for team_id = {team_id}. \n Existing failure_callback = {team_callback_settings_obj.failure_callback}", | |
| code=status.HTTP_400_BAD_REQUEST, | |
| type=ProxyErrorTypes.bad_request_error, | |
| param="callback_name", | |
| ) | |
| team_callback_settings_obj.success_callback.append(data.callback_name) | |
| team_callback_settings_obj.failure_callback.append(data.callback_name) | |
| for var, value in data.callback_vars.items(): | |
| if team_callback_settings_obj.callback_vars is None: | |
| team_callback_settings_obj.callback_vars = {} | |
| team_callback_settings_obj.callback_vars[var] = value | |
| team_callback_settings_obj_dict = team_callback_settings_obj.model_dump() | |
| team_metadata["callback_settings"] = team_callback_settings_obj_dict | |
| team_metadata_json = json.dumps(team_metadata) # update team_metadata | |
| new_team_row = await prisma_client.db.litellm_teamtable.update( | |
| where={"team_id": team_id}, data={"metadata": team_metadata_json} # type: ignore | |
| ) | |
| return { | |
| "status": "success", | |
| "data": new_team_row, | |
| } | |
| except Exception as e: | |
| verbose_proxy_logger.error( | |
| "litellm.proxy.proxy_server.add_team_callbacks(): Exception occured - {}".format( | |
| str(e) | |
| ) | |
| ) | |
| verbose_proxy_logger.debug(traceback.format_exc()) | |
| if isinstance(e, HTTPException): | |
| raise ProxyException( | |
| message=getattr(e, "detail", f"Internal Server Error({str(e)})"), | |
| type=ProxyErrorTypes.internal_server_error.value, | |
| param=getattr(e, "param", "None"), | |
| code=getattr(e, "status_code", status.HTTP_500_INTERNAL_SERVER_ERROR), | |
| ) | |
| elif isinstance(e, ProxyException): | |
| raise e | |
| raise ProxyException( | |
| message="Internal Server Error, " + str(e), | |
| type=ProxyErrorTypes.internal_server_error.value, | |
| param=getattr(e, "param", "None"), | |
| code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
| ) | |
| async def disable_team_logging( | |
| http_request: Request, | |
| team_id: str, | |
| user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth), | |
| ): | |
| """ | |
| Disable all logging callbacks for a team | |
| Parameters: | |
| - team_id (str, required): The unique identifier for the team | |
| Example curl: | |
| ``` | |
| curl -X POST 'http://localhost:4000/team/dbe2f686-a686-4896-864a-4c3924458709/disable_logging' \ | |
| -H 'Authorization: Bearer sk-1234' | |
| ``` | |
| """ | |
| try: | |
| from litellm.proxy.proxy_server import prisma_client | |
| if prisma_client is None: | |
| raise HTTPException(status_code=500, detail={"error": "No db connected"}) | |
| # Check if team exists | |
| _existing_team = await prisma_client.get_data( | |
| team_id=team_id, table_name="team", query_type="find_unique" | |
| ) | |
| if _existing_team is None: | |
| raise HTTPException( | |
| status_code=404, | |
| detail={"error": f"Team id = {team_id} does not exist."}, | |
| ) | |
| # Update team metadata to disable logging | |
| team_metadata = _existing_team.metadata | |
| team_callback_settings = team_metadata.get("callback_settings", {}) | |
| team_callback_settings_obj = TeamCallbackMetadata(**team_callback_settings) | |
| # Reset callbacks | |
| team_callback_settings_obj.success_callback = [] | |
| team_callback_settings_obj.failure_callback = [] | |
| # Update metadata | |
| team_metadata["callback_settings"] = team_callback_settings_obj.model_dump() | |
| team_metadata_json = json.dumps(team_metadata) | |
| # Update team in database | |
| updated_team = await prisma_client.db.litellm_teamtable.update( | |
| where={"team_id": team_id}, data={"metadata": team_metadata_json} # type: ignore | |
| ) | |
| if updated_team is None: | |
| raise HTTPException( | |
| status_code=404, | |
| detail={ | |
| "error": f"Team id = {team_id} does not exist. Error updating team logging" | |
| }, | |
| ) | |
| return { | |
| "status": "success", | |
| "message": f"Logging disabled for team {team_id}", | |
| "data": { | |
| "team_id": updated_team.team_id, | |
| "success_callbacks": [], | |
| "failure_callbacks": [], | |
| }, | |
| } | |
| except Exception as e: | |
| verbose_proxy_logger.error( | |
| f"litellm.proxy.proxy_server.disable_team_logging(): Exception occurred - {str(e)}" | |
| ) | |
| verbose_proxy_logger.debug(traceback.format_exc()) | |
| if isinstance(e, HTTPException): | |
| raise ProxyException( | |
| message=getattr(e, "detail", f"Internal Server Error({str(e)})"), | |
| type=ProxyErrorTypes.internal_server_error.value, | |
| param=getattr(e, "param", "None"), | |
| code=getattr(e, "status_code", status.HTTP_500_INTERNAL_SERVER_ERROR), | |
| ) | |
| elif isinstance(e, ProxyException): | |
| raise e | |
| raise ProxyException( | |
| message="Internal Server Error, " + str(e), | |
| type=ProxyErrorTypes.internal_server_error.value, | |
| param=getattr(e, "param", "None"), | |
| code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
| ) | |
| async def get_team_callbacks( | |
| http_request: Request, | |
| team_id: str, | |
| user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth), | |
| ): | |
| """ | |
| Get the success/failure callbacks and variables for a team | |
| Parameters: | |
| - team_id (str, required): The unique identifier for the team | |
| Example curl: | |
| ``` | |
| curl -X GET 'http://localhost:4000/team/dbe2f686-a686-4896-864a-4c3924458709/callback' \ | |
| -H 'Authorization: Bearer sk-1234' | |
| ``` | |
| This will return the callback settings for the team with id dbe2f686-a686-4896-864a-4c3924458709 | |
| Returns { | |
| "status": "success", | |
| "data": { | |
| "team_id": team_id, | |
| "success_callbacks": team_callback_settings_obj.success_callback, | |
| "failure_callbacks": team_callback_settings_obj.failure_callback, | |
| "callback_vars": team_callback_settings_obj.callback_vars, | |
| }, | |
| } | |
| """ | |
| try: | |
| from litellm.proxy.proxy_server import prisma_client | |
| if prisma_client is None: | |
| raise HTTPException(status_code=500, detail={"error": "No db connected"}) | |
| # Check if team_id exists | |
| _existing_team = await prisma_client.get_data( | |
| team_id=team_id, table_name="team", query_type="find_unique" | |
| ) | |
| if _existing_team is None: | |
| raise HTTPException( | |
| status_code=404, | |
| detail={"error": f"Team id = {team_id} does not exist."}, | |
| ) | |
| # Retrieve team callback settings from metadata | |
| team_metadata = _existing_team.metadata | |
| team_callback_settings = team_metadata.get("callback_settings", {}) | |
| # Convert to TeamCallbackMetadata object for consistent structure | |
| team_callback_settings_obj = TeamCallbackMetadata(**team_callback_settings) | |
| return { | |
| "status": "success", | |
| "data": { | |
| "team_id": team_id, | |
| "success_callbacks": team_callback_settings_obj.success_callback, | |
| "failure_callbacks": team_callback_settings_obj.failure_callback, | |
| "callback_vars": team_callback_settings_obj.callback_vars, | |
| }, | |
| } | |
| except Exception as e: | |
| verbose_proxy_logger.error( | |
| "litellm.proxy.proxy_server.get_team_callbacks(): Exception occurred - {}".format( | |
| str(e) | |
| ) | |
| ) | |
| verbose_proxy_logger.debug(traceback.format_exc()) | |
| if isinstance(e, HTTPException): | |
| raise ProxyException( | |
| message=getattr(e, "detail", f"Internal Server Error({str(e)})"), | |
| type=ProxyErrorTypes.internal_server_error.value, | |
| param=getattr(e, "param", "None"), | |
| code=getattr(e, "status_code", status.HTTP_500_INTERNAL_SERVER_ERROR), | |
| ) | |
| elif isinstance(e, ProxyException): | |
| raise e | |
| raise ProxyException( | |
| message="Internal Server Error, " + str(e), | |
| type=ProxyErrorTypes.internal_server_error.value, | |
| param=getattr(e, "param", "None"), | |
| code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
| ) | |