# FLOW2API / src/services/flow_client.py
# Uploaded by xiaoh2018 ("Upload flow_client.py", commit 0ab5a0d, verified)
"""Flow API Client for VideoFX (Veo)"""
import asyncio
import base64
import random
import time
import uuid
from typing import Dict, Any, Optional, List

from curl_cffi.requests import AsyncSession

from ..core.config import config
from ..core.logger import debug_logger
class FlowClient:
    """Async client for the VideoFX (Veo) "Flow" API.

    Two authentication schemes are used, depending on the endpoint:

    * ST (session token) -- sent as a ``__Secure-next-auth.session-token``
      cookie to the ``labs_base_url`` endpoints (auth, projects, media delete).
    * AT (access token) -- sent as a ``Bearer`` token to the ``api_base_url``
      endpoints (credits, upload, image/video generation, status polling).
    """

    # Polling budget for the YesCaptcha task result: attempts and seconds
    # to wait between two attempts.
    _RECAPTCHA_MAX_POLLS = 40
    _RECAPTCHA_POLL_INTERVAL = 3

    def __init__(self, proxy_manager):
        """Store the proxy manager and read endpoints/timeout from ``config``.

        Args:
            proxy_manager: Object exposing an awaitable ``get_proxy_url()``;
                it is also handed to the browser captcha services.
        """
        self.proxy_manager = proxy_manager
        self.labs_base_url = config.flow_labs_base_url  # https://labs.google/fx/api
        self.api_base_url = config.flow_api_base_url  # https://aisandbox-pa.googleapis.com/v1
        self.timeout = config.flow_timeout

    async def _make_request(
        self,
        method: str,
        url: str,
        headers: Optional[Dict] = None,
        json_data: Optional[Dict] = None,
        use_st: bool = False,
        st_token: Optional[str] = None,
        use_at: bool = False,
        at_token: Optional[str] = None
    ) -> Dict[str, Any]:
        """Send one HTTP request and return the decoded JSON response.

        Args:
            method: HTTP method. "GET" (case-insensitive) issues a GET; any
                other value is sent as a POST with ``json_data`` as body.
            url: Full request URL.
            headers: Extra request headers (not modified by this call).
            json_data: JSON body for POST requests.
            use_st: Authenticate with the session token (cookie).
            st_token: Session token.
            use_at: Authenticate with the access token (bearer).
            at_token: Access token.

        Returns:
            The parsed JSON body of the response.

        Raises:
            Exception: On any failure (transport error, non-success status,
                invalid JSON). The message is prefixed with
                "Flow API request failed: " and the original error is
                attached as ``__cause__``.
        """
        proxy_url = await self.proxy_manager.get_proxy_url()

        # Work on a copy so auth headers never leak into the caller's dict.
        headers = dict(headers) if headers else {}

        # ST authentication -- cookie based
        if use_st and st_token:
            headers["Cookie"] = f"__Secure-next-auth.session-token={st_token}"

        # AT authentication -- bearer based
        if use_at and at_token:
            headers["Authorization"] = f"Bearer {at_token}"

        # Headers common to every request
        headers.update({
            "Content-Type": "application/json",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
        })

        if config.debug_enabled:
            debug_logger.log_request(
                method=method,
                url=url,
                headers=headers,
                body=json_data,
                proxy=proxy_url
            )

        request_kwargs = {
            "headers": headers,
            "proxy": proxy_url,
            "timeout": self.timeout,
            "impersonate": "chrome110",
        }

        start_time = time.time()
        try:
            async with AsyncSession() as session:
                if method.upper() == "GET":
                    response = await session.get(url, **request_kwargs)
                else:  # everything else is treated as POST
                    response = await session.post(
                        url, json=json_data, **request_kwargs
                    )

                duration_ms = (time.time() - start_time) * 1000

                if config.debug_enabled:
                    debug_logger.log_response(
                        status_code=response.status_code,
                        headers=dict(response.headers),
                        body=response.text,
                        duration_ms=duration_ms
                    )

                response.raise_for_status()
                return response.json()
        except Exception as e:
            error_msg = str(e)
            if config.debug_enabled:
                debug_logger.log_error(
                    error_message=error_msg,
                    status_code=getattr(e, 'status_code', None),
                    response_text=getattr(e, 'response_text', None)
                )
            # Same exception type as before; chaining keeps the root cause.
            raise Exception(f"Flow API request failed: {error_msg}") from e

    # ========== Authentication (ST) ==========

    async def st_to_at(self, st: str) -> dict:
        """Exchange a session token for an access token.

        Args:
            st: Session token.

        Returns:
            The session payload, e.g.::

                {
                    "access_token": "AT",
                    "expires": "2025-11-15T04:46:04.000Z",
                    "user": {...}
                }
        """
        return await self._make_request(
            method="GET",
            url=f"{self.labs_base_url}/auth/session",
            use_st=True,
            st_token=st
        )

    # ========== Project management (ST) ==========

    async def create_project(self, st: str, title: str) -> str:
        """Create a project and return its id.

        Args:
            st: Session token.
            title: Project title.

        Returns:
            The ``projectId`` extracted from the response.

        Raises:
            KeyError: If the response does not have the expected nesting.
        """
        json_data = {
            "json": {
                "projectTitle": title,
                "toolName": "PINHOLE"
            }
        }
        result = await self._make_request(
            method="POST",
            url=f"{self.labs_base_url}/trpc/project.createProject",
            json_data=json_data,
            use_st=True,
            st_token=st
        )
        return result["result"]["data"]["json"]["result"]["projectId"]

    async def delete_project(self, st: str, project_id: str):
        """Delete a project.

        Args:
            st: Session token.
            project_id: Id of the project to delete.
        """
        json_data = {
            "json": {
                "projectToDeleteId": project_id
            }
        }
        await self._make_request(
            method="POST",
            url=f"{self.labs_base_url}/trpc/project.deleteProject",
            json_data=json_data,
            use_st=True,
            st_token=st
        )

    # ========== Credits (AT) ==========

    async def get_credits(self, at: str) -> dict:
        """Query the remaining credits.

        Args:
            at: Access token.

        Returns:
            The credits payload, e.g.::

                {
                    "credits": 920,
                    "userPaygateTier": "PAYGATE_TIER_ONE"
                }
        """
        return await self._make_request(
            method="GET",
            url=f"{self.api_base_url}/credits",
            use_at=True,
            at_token=at
        )

    # ========== Image upload (AT) ==========

    async def upload_image(
        self,
        at: str,
        image_bytes: bytes,
        aspect_ratio: str = "IMAGE_ASPECT_RATIO_LANDSCAPE"
    ) -> str:
        """Upload an image and return its mediaGenerationId.

        Args:
            at: Access token.
            image_bytes: Raw image bytes.
            aspect_ratio: Image aspect ratio. A video aspect ratio
                (``VIDEO_...``) is accepted and converted to the matching
                ``IMAGE_...`` value.

        Returns:
            The ``mediaGenerationId`` of the uploaded image.
        """
        # VIDEO_ASPECT_RATIO_LANDSCAPE -> IMAGE_ASPECT_RATIO_LANDSCAPE
        # VIDEO_ASPECT_RATIO_PORTRAIT  -> IMAGE_ASPECT_RATIO_PORTRAIT
        # Only the leading prefix is rewritten.
        if aspect_ratio.startswith("VIDEO_"):
            aspect_ratio = aspect_ratio.replace("VIDEO_", "IMAGE_", 1)

        # Plain base64, without any "data:" prefix
        image_base64 = base64.b64encode(image_bytes).decode('utf-8')

        json_data = {
            "imageInput": {
                "rawImageBytes": image_base64,
                # NOTE(review): always sent as JPEG regardless of the actual
                # bytes -- confirm the API tolerates other formats.
                "mimeType": "image/jpeg",
                "isUserUploaded": True,
                "aspectRatio": aspect_ratio
            },
            "clientContext": {
                "sessionId": self._generate_session_id(),
                "tool": "ASSET_MANAGER"
            }
        }
        result = await self._make_request(
            method="POST",
            url=f"{self.api_base_url}:uploadUserImage",
            json_data=json_data,
            use_at=True,
            at_token=at
        )
        return result["mediaGenerationId"]["mediaGenerationId"]

    # ========== Image generation (AT) -- result returned directly ==========

    async def generate_image(
        self,
        at: str,
        project_id: str,
        prompt: str,
        model_name: str,
        aspect_ratio: str,
        image_inputs: Optional[List[Dict]] = None
    ) -> dict:
        """Generate an image; the response already contains the result.

        Args:
            at: Access token.
            project_id: Project id.
            prompt: Text prompt.
            model_name: GEM_PIX, GEM_PIX_2 or IMAGEN_3_5.
            aspect_ratio: Image aspect ratio.
            image_inputs: Reference images (image-to-image); defaults to
                an empty list.

        Returns:
            The API response, e.g.::

                {
                    "media": [{
                        "image": {
                            "generatedImage": {
                                "fifeUrl": "<image URL>",
                                ...
                            }
                        }
                    }]
                }
        """
        url = f"{self.api_base_url}/projects/{project_id}/flowMedia:batchGenerateImages"

        # A missing reCAPTCHA token is sent as an empty string.
        recaptcha_token = await self._get_recaptcha_token(project_id) or ""
        session_id = self._generate_session_id()

        request_data = {
            "clientContext": {
                "recaptchaToken": recaptcha_token,
                "projectId": project_id,
                "sessionId": session_id,
                "tool": "PINHOLE"
            },
            "seed": random.randint(1, 99999),
            "imageModelName": model_name,
            "imageAspectRatio": aspect_ratio,
            "prompt": prompt,
            "imageInputs": image_inputs or []
        }
        json_data = {
            "clientContext": {
                "recaptchaToken": recaptcha_token,
                "sessionId": session_id
            },
            "requests": [request_data]
        }
        return await self._make_request(
            method="POST",
            url=url,
            json_data=json_data,
            use_at=True,
            at_token=at
        )

    # ========== Video generation (AT) -- asynchronous, returns operations ==========

    async def _submit_video_request(
        self,
        at: str,
        endpoint: str,
        project_id: str,
        prompt: str,
        model_key: str,
        aspect_ratio: str,
        user_paygate_tier: str,
        extra_fields: Optional[Dict[str, Any]] = None
    ) -> dict:
        """Build and POST the payload shared by all video generation calls.

        Args:
            at: Access token.
            endpoint: Action name placed after ``/video:`` in the URL.
            project_id: Project id.
            prompt: Text prompt.
            model_key: Video model key.
            aspect_ratio: Video aspect ratio.
            user_paygate_tier: User tier.
            extra_fields: Variant-specific request fields, inserted between
                ``videoModelKey`` and ``metadata``.

        Returns:
            The API response of the generation call.
        """
        url = f"{self.api_base_url}/video:{endpoint}"

        # A missing reCAPTCHA token is sent as an empty string.
        recaptcha_token = await self._get_recaptcha_token(project_id) or ""
        session_id = self._generate_session_id()
        scene_id = self._generate_scene_id()

        request_data: Dict[str, Any] = {
            "aspectRatio": aspect_ratio,
            "seed": random.randint(1, 99999),
            "textInput": {
                "prompt": prompt
            },
            "videoModelKey": model_key,
        }
        if extra_fields:
            request_data.update(extra_fields)
        request_data["metadata"] = {
            "sceneId": scene_id
        }

        json_data = {
            "clientContext": {
                "recaptchaToken": recaptcha_token,
                "sessionId": session_id,
                "projectId": project_id,
                "tool": "PINHOLE",
                "userPaygateTier": user_paygate_tier
            },
            "requests": [request_data]
        }
        return await self._make_request(
            method="POST",
            url=url,
            json_data=json_data,
            use_at=True,
            at_token=at
        )

    async def generate_video_text(
        self,
        at: str,
        project_id: str,
        prompt: str,
        model_key: str,
        aspect_ratio: str,
        user_paygate_tier: str = "PAYGATE_TIER_ONE"
    ) -> dict:
        """Start a text-to-video generation.

        Args:
            at: Access token.
            project_id: Project id.
            prompt: Text prompt.
            model_key: e.g. veo_3_1_t2v_fast.
            aspect_ratio: Video aspect ratio.
            user_paygate_tier: User tier.

        Returns:
            The API response, e.g.::

                {
                    "operations": [{
                        "operation": {"name": "task_id"},
                        "sceneId": "uuid",
                        "status": "MEDIA_GENERATION_STATUS_PENDING"
                    }],
                    "remainingCredits": 900
                }
        """
        return await self._submit_video_request(
            at,
            "batchAsyncGenerateVideoText",
            project_id,
            prompt,
            model_key,
            aspect_ratio,
            user_paygate_tier
        )

    async def generate_video_reference_images(
        self,
        at: str,
        project_id: str,
        prompt: str,
        model_key: str,
        aspect_ratio: str,
        reference_images: List[Dict],
        user_paygate_tier: str = "PAYGATE_TIER_ONE"
    ) -> dict:
        """Start a video generation guided by reference images.

        Args:
            at: Access token.
            project_id: Project id.
            prompt: Text prompt.
            model_key: e.g. veo_3_0_r2v_fast.
            aspect_ratio: Video aspect ratio.
            reference_images: Reference image list, e.g.
                [{"imageUsageType": "IMAGE_USAGE_TYPE_ASSET", "mediaId": "..."}].
            user_paygate_tier: User tier.

        Returns:
            Same shape as ``generate_video_text``.
        """
        return await self._submit_video_request(
            at,
            "batchAsyncGenerateVideoReferenceImages",
            project_id,
            prompt,
            model_key,
            aspect_ratio,
            user_paygate_tier,
            {"referenceImages": reference_images}
        )

    async def generate_video_start_end(
        self,
        at: str,
        project_id: str,
        prompt: str,
        model_key: str,
        aspect_ratio: str,
        start_media_id: str,
        end_media_id: str,
        user_paygate_tier: str = "PAYGATE_TIER_ONE"
    ) -> dict:
        """Start a video generation from a first and a last frame.

        Args:
            at: Access token.
            project_id: Project id.
            prompt: Text prompt.
            model_key: e.g. veo_3_1_i2v_s_fast_fl.
            aspect_ratio: Video aspect ratio.
            start_media_id: mediaId of the first frame.
            end_media_id: mediaId of the last frame.
            user_paygate_tier: User tier.

        Returns:
            Same shape as ``generate_video_text``.
        """
        return await self._submit_video_request(
            at,
            "batchAsyncGenerateVideoStartAndEndImage",
            project_id,
            prompt,
            model_key,
            aspect_ratio,
            user_paygate_tier,
            {
                "startImage": {"mediaId": start_media_id},
                "endImage": {"mediaId": end_media_id},
            }
        )

    async def generate_video_start_image(
        self,
        at: str,
        project_id: str,
        prompt: str,
        model_key: str,
        aspect_ratio: str,
        start_media_id: str,
        user_paygate_tier: str = "PAYGATE_TIER_ONE"
    ) -> dict:
        """Start a video generation from a first frame only.

        Args:
            at: Access token.
            project_id: Project id.
            prompt: Text prompt.
            model_key: e.g. veo_3_1_i2v_s_fast_fl.
            aspect_ratio: Video aspect ratio.
            start_media_id: mediaId of the first frame.
            user_paygate_tier: User tier.

        Returns:
            Same shape as ``generate_video_text``.
        """
        # Same endpoint as the start+end variant; the request simply carries
        # no "endImage" field.
        return await self._submit_video_request(
            at,
            "batchAsyncGenerateVideoStartAndEndImage",
            project_id,
            prompt,
            model_key,
            aspect_ratio,
            user_paygate_tier,
            {"startImage": {"mediaId": start_media_id}}
        )

    # ========== Task polling (AT) ==========

    async def check_video_status(self, at: str, operations: List[Dict]) -> dict:
        """Query the status of video generation operations.

        Args:
            at: Access token.
            operations: Operation list, e.g.
                [{"operation": {"name": "task_id"}, "sceneId": "...", "status": "..."}].

        Returns:
            The API response, e.g.::

                {
                    "operations": [{
                        "operation": {
                            "name": "task_id",
                            "metadata": {...}  # video info once finished
                        },
                        "status": "MEDIA_GENERATION_STATUS_SUCCESSFUL"
                    }]
                }
        """
        return await self._make_request(
            method="POST",
            url=f"{self.api_base_url}/video:batchCheckAsyncVideoGenerationStatus",
            json_data={"operations": operations},
            use_at=True,
            at_token=at
        )

    # ========== Media deletion (ST) ==========

    async def delete_media(self, st: str, media_names: List[str]):
        """Delete media items.

        Args:
            st: Session token.
            media_names: List of media ids to delete.
        """
        json_data = {
            "json": {
                "names": media_names
            }
        }
        await self._make_request(
            method="POST",
            url=f"{self.labs_base_url}/trpc/media.deleteMedia",
            json_data=json_data,
            use_st=True,
            st_token=st
        )

    # ========== Helpers ==========

    def _generate_session_id(self) -> str:
        """Return a sessionId of the form ``;<current time in ms>``."""
        return f";{int(time.time() * 1000)}"

    def _generate_scene_id(self) -> str:
        """Return a fresh sceneId (random UUID4 string)."""
        return str(uuid.uuid4())

    async def _get_recaptcha_token(self, project_id: str) -> Optional[str]:
        """Obtain a reCAPTCHA token using the configured captcha method.

        ``config.captcha_method`` selects the provider: ``"personal"`` and
        ``"browser"`` delegate to the respective ``BrowserCaptchaService``;
        any other value uses the YesCaptcha HTTP API.

        Args:
            project_id: Project id the token is requested for.

        Returns:
            The token, or ``None`` when no token could be obtained (errors
            are logged, never raised).
        """
        captcha_method = config.captcha_method

        # Persistent ("personal") browser solver
        if captcha_method == "personal":
            try:
                from .browser_captcha_personal import BrowserCaptchaService
                service = await BrowserCaptchaService.get_instance(self.proxy_manager)
                return await service.get_token(project_id)
            except Exception as e:
                debug_logger.log_error(f"[reCAPTCHA Browser] error: {str(e)}")
                return None

        # Headless browser solver
        if captcha_method == "browser":
            try:
                from .browser_captcha import BrowserCaptchaService
                service = await BrowserCaptchaService.get_instance(self.proxy_manager)
                return await service.get_token(project_id)
            except Exception as e:
                debug_logger.log_error(f"[reCAPTCHA Browser] error: {str(e)}")
                return None

        # YesCaptcha solver
        client_key = config.yescaptcha_api_key
        if not client_key:
            debug_logger.log_info("[reCAPTCHA] API key not configured, skipping")
            return None

        website_key = "6LdsFiUsAAAAAIjVDZcuLhaHiDn5nnHVXVRQGeMV"
        website_url = f"https://labs.google/fx/tools/flow/project/{project_id}"
        base_url = config.yescaptcha_base_url
        page_action = "FLOW_GENERATION"

        try:
            async with AsyncSession() as session:
                create_url = f"{base_url}/createTask"
                create_data = {
                    "clientKey": client_key,
                    "task": {
                        "websiteURL": website_url,
                        "websiteKey": website_key,
                        "type": "RecaptchaV3TaskProxylessM1",
                        "pageAction": page_action
                    }
                }
                result = await session.post(create_url, json=create_data, impersonate="chrome110")
                result_json = result.json()
                task_id = result_json.get('taskId')
                debug_logger.log_info(f"[reCAPTCHA] created task_id: {task_id}")
                if not task_id:
                    return None

                get_url = f"{base_url}/getTaskResult"
                get_data = {
                    "clientKey": client_key,
                    "taskId": task_id
                }
                for i in range(self._RECAPTCHA_MAX_POLLS):
                    result = await session.post(get_url, json=get_data, impersonate="chrome110")
                    result_json = result.json()
                    debug_logger.log_info(f"[reCAPTCHA] polling #{i+1}: {result_json}")

                    # "solution" may be absent or null while the task runs.
                    solution = result_json.get('solution') or {}
                    response = solution.get('gRecaptchaResponse')
                    if response:
                        return response

                    # Yield to the event loop while waiting; a blocking
                    # sleep here would stall every other coroutine.
                    await asyncio.sleep(self._RECAPTCHA_POLL_INTERVAL)
                return None
        except Exception as e:
            debug_logger.log_error(f"[reCAPTCHA] error: {str(e)}")
            return None