FLOW2API / src /api /routes.py
xiaoh2018's picture
Upload 1108 files
33cfa2a verified
"""API routes - OpenAI compatible endpoints"""
from fastapi import APIRouter, Depends, HTTPException
from fastapi.responses import StreamingResponse, JSONResponse
from typing import List, Optional
import base64
import re
import json
import time
from urllib.parse import urlparse
from curl_cffi.requests import AsyncSession
from ..core.auth import verify_api_key_header
from ..core.models import ChatCompletionRequest
from ..services.generation_handler import GenerationHandler, MODEL_CONFIG
from ..core.logger import debug_logger
router = APIRouter()
# Dependency injection will be set up in main.py
generation_handler: GenerationHandler = None
def set_generation_handler(handler: GenerationHandler):
"""Set generation handler instance"""
global generation_handler
generation_handler = handler
async def retrieve_image_data(url: str) -> Optional[bytes]:
"""
智能获取图片数据:
1. 优先检查是否为本地 /tmp/ 缓存文件,如果是则直接读取磁盘
2. 如果本地不存在或是外部链接,则进行网络下载
"""
# 优先尝试本地读取
try:
if "/tmp/" in url and generation_handler and generation_handler.file_cache:
path = urlparse(url).path
filename = path.split("/tmp/")[-1]
local_file_path = generation_handler.file_cache.cache_dir / filename
if local_file_path.exists() and local_file_path.is_file():
data = local_file_path.read_bytes()
if data:
return data
except Exception as e:
debug_logger.log_warning(f"[CONTEXT] 本地缓存读取失败: {str(e)}")
# 回退逻辑:网络下载
try:
async with AsyncSession() as session:
response = await session.get(url, timeout=30, impersonate="chrome110", verify=False)
if response.status_code == 200:
return response.content
else:
debug_logger.log_warning(f"[CONTEXT] 图片下载失败,状态码: {response.status_code}")
except Exception as e:
debug_logger.log_error(f"[CONTEXT] 图片下载异常: {str(e)}")
return None
@router.get("/v1/models")
async def list_models(api_key: str = Depends(verify_api_key_header)):
"""List available models"""
models = []
for model_id, config in MODEL_CONFIG.items():
description = f"{config['type'].capitalize()} generation"
if config['type'] == 'image':
description += f" - {config['model_name']}"
else:
description += f" - {config['model_key']}"
models.append({
"id": model_id,
"object": "model",
"owned_by": "flow2api",
"description": description
})
return {
"object": "list",
"data": models
}
@router.post("/v1/chat/completions")
async def create_chat_completion(
request: ChatCompletionRequest,
api_key: str = Depends(verify_api_key_header)
):
"""Create chat completion (unified endpoint for image and video generation)"""
try:
# Extract prompt from messages
if not request.messages:
raise HTTPException(status_code=400, detail="Messages cannot be empty")
last_message = request.messages[-1]
content = last_message.content
# Handle both string and array format (OpenAI multimodal)
prompt = ""
images: List[bytes] = []
if isinstance(content, str):
# Simple text format
prompt = content
elif isinstance(content, list):
# Multimodal format
for item in content:
if item.get("type") == "text":
prompt = item.get("text", "")
elif item.get("type") == "image_url":
# Extract base64 image
image_url = item.get("image_url", {}).get("url", "")
if image_url.startswith("data:image"):
# Parse base64
match = re.search(r"base64,(.+)", image_url)
if match:
image_base64 = match.group(1)
image_bytes = base64.b64decode(image_base64)
images.append(image_bytes)
# Fallback to deprecated image parameter
if request.image and not images:
if request.image.startswith("data:image"):
match = re.search(r"base64,(.+)", request.image)
if match:
image_base64 = match.group(1)
image_bytes = base64.b64decode(image_base64)
images.append(image_bytes)
# 自动参考图:仅对图片模型生效
model_config = MODEL_CONFIG.get(request.model)
if model_config and model_config["type"] == "image" and len(request.messages) > 1:
debug_logger.log_info(f"[CONTEXT] 开始查找历史参考图,消息数量: {len(request.messages)}")
# 查找上一次 assistant 回复的图片
for msg in reversed(request.messages[:-1]):
if msg.role == "assistant" and isinstance(msg.content, str):
# 匹配 Markdown 图片格式: ![...](http...)
matches = re.findall(r"!\[.*?\]\((.*?)\)", msg.content)
if matches:
last_image_url = matches[-1]
if last_image_url.startswith("http"):
try:
downloaded_bytes = await retrieve_image_data(last_image_url)
if downloaded_bytes and len(downloaded_bytes) > 0:
# 将历史图片插入到最前面
images.insert(0, downloaded_bytes)
debug_logger.log_info(f"[CONTEXT] ✅ 添加历史参考图: {last_image_url}")
break
else:
debug_logger.log_warning(f"[CONTEXT] 图片下载失败或为空,尝试下一个: {last_image_url}")
except Exception as e:
debug_logger.log_error(f"[CONTEXT] 处理参考图时出错: {str(e)}")
# 继续尝试下一个图片
if not prompt:
raise HTTPException(status_code=400, detail="Prompt cannot be empty")
# Call generation handler
if request.stream:
# Streaming response
async def generate():
async for chunk in generation_handler.handle_generation(
model=request.model,
prompt=prompt,
images=images if images else None,
stream=True
):
yield chunk
# Send [DONE] signal
yield "data: [DONE]\n\n"
return StreamingResponse(
generate(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no"
}
)
else:
# Non-streaming response
result = None
async for chunk in generation_handler.handle_generation(
model=request.model,
prompt=prompt,
images=images if images else None,
stream=False
):
result = chunk
if result:
# Parse the result JSON string
try:
result_json = json.loads(result)
return JSONResponse(content=result_json)
except json.JSONDecodeError:
# If not JSON, return as-is
return JSONResponse(content={"result": result})
else:
raise HTTPException(status_code=500, detail="Generation failed: No response from handler")
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))