| """ | |
| Unified /v1/messages endpoint - (Anthropic Spec) | |
| """ | |
| import asyncio | |
| import json | |
| import time | |
| import traceback | |
| from fastapi import APIRouter, Depends, HTTPException, Request, Response, status | |
| import litellm | |
| from litellm._logging import verbose_proxy_logger | |
| from litellm.constants import STREAM_SSE_DATA_PREFIX | |
| from litellm.litellm_core_utils.safe_json_dumps import safe_dumps | |
| from litellm.proxy._types import * | |
| from litellm.proxy.auth.user_api_key_auth import user_api_key_auth | |
| from litellm.proxy.common_request_processing import ( | |
| ProxyBaseLLMRequestProcessing, | |
| create_streaming_response, | |
| ) | |
| from litellm.proxy.common_utils.http_parsing_utils import _read_request_body | |
| from litellm.proxy.litellm_pre_call_utils import add_litellm_data_to_request | |
| from litellm.proxy.utils import ProxyLogging | |
| router = APIRouter() | |


def return_anthropic_chunk(chunk: Any) -> str:
    """
    Helper function to format streaming chunks for the Anthropic API format.

    Args:
        chunk: A string or dictionary to be returned in SSE format

    Returns:
        str: A properly formatted SSE chunk string
    """
    if isinstance(chunk, dict):
        # Use safe_dumps for proper JSON serialization with circular reference detection
        chunk_str = safe_dumps(chunk)
        return f"{STREAM_SSE_DATA_PREFIX}{chunk_str}\n\n"
    else:
        return chunk
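
# For reference: assuming STREAM_SSE_DATA_PREFIX is the conventional "data: "
# prefix, a dict chunk such as {"type": "message_stop"} is serialized into a
# single SSE frame of the form:
#
#   data: {"type": "message_stop"}\n\n
#
# String chunks are assumed to already be SSE-formatted upstream and are passed
# through unchanged.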


async def async_data_generator_anthropic(
    response,
    user_api_key_dict: UserAPIKeyAuth,
    request_data: dict,
    proxy_logging_obj: ProxyLogging,
):
    verbose_proxy_logger.debug("inside generator")
    try:
        time.time()
        async for chunk in response:
            verbose_proxy_logger.debug(
                "async_data_generator: received streaming chunk - {}".format(chunk)
            )
            ### CALL HOOKS ### - modify outgoing data
            chunk = await proxy_logging_obj.async_post_call_streaming_hook(
                user_api_key_dict=user_api_key_dict, response=chunk
            )

            # Format chunk using helper function
            yield return_anthropic_chunk(chunk)

    except Exception as e:
        verbose_proxy_logger.exception(
            "litellm.proxy.proxy_server.async_data_generator(): Exception occurred - {}".format(
                str(e)
            )
        )
        await proxy_logging_obj.post_call_failure_hook(
            user_api_key_dict=user_api_key_dict,
            original_exception=e,
            request_data=request_data,
        )
        verbose_proxy_logger.debug(
            f"\033[1;31mAn error occurred: {e}\n\n Debug this by setting `--debug`, e.g. `litellm --model gpt-3.5-turbo --debug`"
        )

        if isinstance(e, HTTPException):
            raise e
        else:
            error_traceback = traceback.format_exc()
            error_msg = f"{str(e)}\n\n{error_traceback}"
            proxy_exception = ProxyException(
                message=getattr(e, "message", error_msg),
                type=getattr(e, "type", "None"),
                param=getattr(e, "param", "None"),
                code=getattr(e, "status_code", 500),
            )
            error_returned = json.dumps({"error": proxy_exception.to_dict()})
            yield f"{STREAM_SSE_DATA_PREFIX}{error_returned}\n\n"


@router.post(
    "/v1/messages",
    dependencies=[Depends(user_api_key_auth)],
)
async def anthropic_response(  # noqa: PLR0915
    fastapi_response: Response,
    request: Request,
    user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
):
    """
    Use `{PROXY_BASE_URL}/anthropic/v1/messages` instead - [Docs](https://docs.litellm.ai/docs/anthropic_completion).

    This was a BETA endpoint that calls 100+ LLMs in the Anthropic format.
    """
    from litellm.proxy.proxy_server import (
        general_settings,
        llm_router,
        proxy_config,
        proxy_logging_obj,
        user_api_base,
        user_max_tokens,
        user_model,
        user_request_timeout,
        user_temperature,
        version,
    )

    request_data = await _read_request_body(request=request)
    data: dict = {**request_data}
    try:
        data["model"] = (
            general_settings.get("completion_model", None)  # server default
            or user_model  # model name passed via cli args
            or data.get("model", None)  # default passed in http request
        )
        if user_model:
            data["model"] = user_model
        data = await add_litellm_data_to_request(
            data=data,  # type: ignore
            request=request,
            general_settings=general_settings,
            user_api_key_dict=user_api_key_dict,
            version=version,
            proxy_config=proxy_config,
        )

        # override with user settings, these are params passed via cli
        if user_temperature:
            data["temperature"] = user_temperature
        if user_request_timeout:
            data["request_timeout"] = user_request_timeout
        if user_max_tokens:
            data["max_tokens"] = user_max_tokens
        if user_api_base:
            data["api_base"] = user_api_base

        ### MODEL ALIAS MAPPING ###
        # check if model name in model alias map
        # get the actual model name
        if data["model"] in litellm.model_alias_map:
            data["model"] = litellm.model_alias_map[data["model"]]

        ### CALL HOOKS ### - modify incoming data before calling the model
        data = await proxy_logging_obj.pre_call_hook(  # type: ignore
            user_api_key_dict=user_api_key_dict, data=data, call_type="text_completion"
        )

        ### ROUTE THE REQUEST ###
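        # Routing precedence below (first match wins): model in the router's model
        # list, model_group_alias entry, specific deployment name, router model id,
        # wildcard/default deployment, then the CLI `--model` fallback via
        # litellm.anthropic_messages. Anything else is rejected with a 400.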
        router_model_names = llm_router.model_names if llm_router is not None else []

        # skip router if user passed their key
        if (
            llm_router is not None and data["model"] in router_model_names
        ):  # model in router model list
            llm_response = asyncio.create_task(llm_router.aanthropic_messages(**data))
        elif (
            llm_router is not None
            and llm_router.model_group_alias is not None
            and data["model"] in llm_router.model_group_alias
        ):  # model set in model_group_alias
            llm_response = asyncio.create_task(llm_router.aanthropic_messages(**data))
        elif (
            llm_router is not None and data["model"] in llm_router.deployment_names
        ):  # model in router deployments, calling a specific deployment on the router
            llm_response = asyncio.create_task(
                llm_router.aanthropic_messages(**data, specific_deployment=True)
            )
        elif (
            llm_router is not None and data["model"] in llm_router.get_model_ids()
        ):  # model in router model list
            llm_response = asyncio.create_task(llm_router.aanthropic_messages(**data))
        elif (
            llm_router is not None
            and data["model"] not in router_model_names
            and (
                llm_router.default_deployment is not None
                or len(llm_router.pattern_router.patterns) > 0
            )
        ):  # model in router deployments, calling a specific deployment on the router
            llm_response = asyncio.create_task(llm_router.aanthropic_messages(**data))
        elif user_model is not None:  # `litellm --model <your-model-name>`
            llm_response = asyncio.create_task(litellm.anthropic_messages(**data))
        else:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail={
                    "error": "completion: Invalid model name passed in model="
                    + data.get("model", "")
                },
            )

        # Await the llm_response task
        response = await llm_response

        hidden_params = getattr(response, "_hidden_params", {}) or {}
        model_id = hidden_params.get("model_id", None) or ""
        cache_key = hidden_params.get("cache_key", None) or ""
        api_base = hidden_params.get("api_base", None) or ""
        response_cost = hidden_params.get("response_cost", None) or ""

        ### ALERTING ###
        asyncio.create_task(
            proxy_logging_obj.update_request_status(
                litellm_call_id=data.get("litellm_call_id", ""), status="success"
            )
        )

        verbose_proxy_logger.debug("final response: %s", response)

        fastapi_response.headers.update(
            ProxyBaseLLMRequestProcessing.get_custom_headers(
                user_api_key_dict=user_api_key_dict,
                model_id=model_id,
                cache_key=cache_key,
                api_base=api_base,
                version=version,
                response_cost=response_cost,
                request_data=data,
                hidden_params=hidden_params,
            )
        )

        if (
            "stream" in data and data["stream"] is True
        ):  # use generate_responses to stream responses
            selected_data_generator = async_data_generator_anthropic(
                response=response,
                user_api_key_dict=user_api_key_dict,
                request_data=data,
                proxy_logging_obj=proxy_logging_obj,
            )
            return await create_streaming_response(
                generator=selected_data_generator,
                media_type="text/event-stream",
                headers=dict(fastapi_response.headers),
            )

        verbose_proxy_logger.info("\nResponse from Litellm:\n{}".format(response))
        return response
    except Exception as e:
        await proxy_logging_obj.post_call_failure_hook(
            user_api_key_dict=user_api_key_dict, original_exception=e, request_data=data
        )
        verbose_proxy_logger.exception(
            "litellm.proxy.proxy_server.anthropic_response(): Exception occurred - {}".format(
                str(e)
            )
        )
        error_msg = f"{str(e)}"
        raise ProxyException(
            message=getattr(e, "message", error_msg),
            type=getattr(e, "type", "None"),
            param=getattr(e, "param", "None"),
            code=getattr(e, "status_code", 500),
        )
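

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not executed by the proxy): once the proxy
# is running, a client can call this endpoint with an Anthropic-format payload.
# The base URL, virtual key, and model name below are placeholder assumptions,
# not values defined in this module.
#
#   import httpx
#
#   resp = httpx.post(
#       "http://localhost:4000/v1/messages",  # assumed proxy address
#       headers={
#           "x-api-key": "sk-1234",  # assumed LiteLLM virtual key
#           "content-type": "application/json",
#       },
#       json={
#           "model": "claude-3-5-sonnet-20240620",  # any model the proxy can route
#           "max_tokens": 256,
#           "messages": [{"role": "user", "content": "Hello"}],
#       },
#   )
#   print(resp.json())
#
# With "stream": true in the request body, the response is served by
# async_data_generator_anthropic as `text/event-stream` SSE frames.
# ---------------------------------------------------------------------------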