| | import os |
| | import json |
| | import re |
| | import base64 |
| | import aiohttp |
| | import asyncio |
| |
|
| |
|
| | |
# Backend location and credentials; both can be overridden via environment
# variables.
BASE_URL = os.getenv('GEMINI_FLOW2API_URL', 'http://127.0.0.1:8000')
# Strip a trailing slash so a BASE_URL such as "http://host:8000/" does not
# yield a double slash in the endpoint path.
BACKEND_URL = BASE_URL.rstrip('/') + "/v1/chat/completions"
# NOTE(review): the fallback value is a hard-coded credential; consider
# requiring the environment variable instead of shipping a default.
API_KEY = os.getenv('GEMINI_FLOW2API_APIKEY', 'Bearer han1234')
# os.getenv() never returns None when a default is supplied, so the guard has
# to reject empty / whitespace-only values to be effective.
if not API_KEY or not API_KEY.strip():
    raise ValueError('[gemini flow2api] api key not set')
# Model identifiers sent in the "model" field of the request payload.
MODEL_LANDSCAPE = "gemini-3.0-pro-image-landscape"
MODEL_PORTRAIT = "gemini-3.0-pro-image-portrait"
| |
|
| | |
# Markdown image link, e.g. "![alt](https://host/img.jpg)"; group 1 is the URL.
_IMAGE_LINK_RE = re.compile(r'!\[.*?\]\((.*?)\)')


def _build_content_payload(prompt: str, images: list[bytes]) -> str | list[dict]:
    """Return the message content: the bare prompt, or text plus inline images."""
    if not images:
        return prompt
    print(f"[Backend] 正在处理 {len(images)} 张图片输入...")
    parts: list[dict] = [{"type": "text", "text": prompt}]
    for img_bytes in images:
        b64_str = base64.b64encode(img_bytes).decode('utf-8')
        parts.append({
            "type": "image_url",
            "image_url": {"url": f"data:image/jpeg;base64,{b64_str}"},
        })
    return parts


def _first_delta(chunk) -> dict:
    """Return the first choice's ``delta`` dict of a stream chunk, or ``{}``."""
    choices = chunk.get("choices") if isinstance(chunk, dict) else None
    if not isinstance(choices, list) or not choices:
        return {}
    first = choices[0]
    delta = first.get("delta") if isinstance(first, dict) else None
    return delta if isinstance(delta, dict) else {}


def _extract_error_message(line_str: str) -> str:
    """Turn a bare JSON error line from the stream into a readable message."""
    try:
        chunk = json.loads(line_str)
    except json.JSONDecodeError:
        chunk = None

    # Prefer the delta's reasoning_content (what the original code read), then
    # an "error" object/message, and finally the raw line itself.
    msg = _first_delta(chunk).get("reasoning_content")
    if not msg and isinstance(chunk, dict):
        error = chunk.get("error")
        msg = error.get("message") if isinstance(error, dict) else error
    msg = str(msg) if msg else line_str

    if '401' in msg:
        msg += '\nAccess Token 已失效,需重新配置。'
    elif '400' in msg:
        msg += '\n返回内容被拦截。'
    return msg


def _extract_image_url(delta: dict) -> str | None:
    """Return the URL of a markdown image link in ``delta['content']``, if any."""
    content_text = delta.get("content")
    if not isinstance(content_text, str):
        return None
    img_match = _IMAGE_LINK_RE.search(content_text)
    return img_match.group(1) if img_match else None


async def request_backend_generation(
        prompt: str,
        images: list[bytes] | None = None,
        model: str | None = None) -> bytes | None:
    """
    Ask the backend to generate an image and return the downloaded bytes.

    The request is sent as a streaming chat completion. The stream is scanned
    for a markdown image link; the last link seen is then downloaded.

    :param prompt: prompt text
    :param images: optional list of raw image bytes, sent inline as base64
        ``data:image/jpeg`` URLs
    :param model: model name to use (optional, defaults to MODEL_LANDSCAPE)
    :return: image bytes on success; None when the stream contained no image
        link or the image download did not return HTTP 200
    :raises Exception: when the backend answers with a non-200 status or emits
        a JSON error line; transport errors are logged and re-raised as-is
    """
    use_model = model or MODEL_LANDSCAPE

    payload = {
        "model": use_model,
        "messages": [{
            "role": "user",
            "content": _build_content_payload(prompt, images or []),
        }],
        "stream": True,
    }
    headers = {
        "Authorization": API_KEY,
        "Content-Type": "application/json",
    }

    image_url = None
    print(f"[Backend] Model: {use_model} | 发起请求: {prompt[:20]}...")

    try:
        async with aiohttp.ClientSession() as session:
            # A bare number for ``timeout`` is deprecated in aiohttp; use an
            # explicit ClientTimeout with the same 120 s total budget.
            async with session.post(
                    BACKEND_URL,
                    json=payload,
                    headers=headers,
                    timeout=aiohttp.ClientTimeout(total=120)) as response:
                if response.status != 200:
                    err_text = await response.text()
                    print(f"[Backend Error] Status {response.status}: {err_text}")
                    raise Exception(f"API Error: {response.status}: {err_text}")

                async for line in response.content:
                    line_str = line.decode('utf-8').strip()

                    # Errors arrive as a bare JSON line (no "data: " prefix),
                    # so the line itself is what has to be parsed.
                    if line_str.startswith('{"error'):
                        raise Exception(_extract_error_message(line_str))

                    if not line_str.startswith('data: '):
                        continue

                    data_str = line_str[6:]
                    if data_str == '[DONE]':
                        break

                    try:
                        chunk = json.loads(data_str)
                    except json.JSONDecodeError:
                        # Skip malformed chunks instead of aborting the stream.
                        continue

                    delta = _first_delta(chunk)

                    # Echo the backend's progress text as it streams in.
                    reasoning = delta.get("reasoning_content")
                    if reasoning:
                        print(reasoning, end="", flush=True)

                    found_url = _extract_image_url(delta)
                    if found_url is not None:
                        image_url = found_url
                        print(f"\n[Backend] 捕获图片链接: {image_url}")

            # Download with the same session once the stream has finished.
            if image_url:
                async with session.get(image_url) as img_resp:
                    if img_resp.status == 200:
                        return await img_resp.read()
                    print(f"[Backend Error] 图片下载失败: {img_resp.status}")
    except Exception as e:
        print(f"[Backend Exception] {e}")
        raise

    return None
| |
|
if __name__ == '__main__':
    async def main():
        """Interactive smoke test: prompt for text, generate, save the image."""
        print("=== AI 绘图接口测试 ===")
        typed = input("请输入提示词 (例如 '一只猫'): ").strip()
        # Fall back to a sample prompt when the user just presses Enter.
        user_prompt = typed or "A cute cat in the garden"

        print(f"正在请求: {user_prompt}")

        result = await request_backend_generation(user_prompt)

        # Guard clause: nothing to save when generation produced no bytes.
        if not result:
            print("\n[Failed] 生成失败")
            return

        filename = "output_test.jpg"
        with open(filename, "wb") as out_file:
            out_file.write(result)
        print(f"\n[Success] 图片已保存为 {filename},大小: {len(result)} bytes")

    # On Windows the selector event loop policy is installed before running.
    # NOTE(review): presumably to avoid issues with the default loop there — confirm.
    if os.name == 'nt':
        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
    asyncio.run(main())