|
|
"""Load balancing module for Flow2API""" |
|
|
import random |
|
|
from typing import Optional |
|
|
from ..core.models import Token |
|
|
from .concurrency_manager import ConcurrencyManager |
|
|
from ..core.logger import debug_logger |
|
|
|
|
|
|
|
|
class LoadBalancer:
    """Pick one usable token uniformly at random from the active tokens."""

    def __init__(self, token_manager, concurrency_manager: Optional[ConcurrencyManager] = None):
        """Store the collaborators used during selection.

        Args:
            token_manager: Object providing the awaitable methods
                ``get_active_tokens()`` and ``is_at_valid(token_id)``.
            concurrency_manager: Optional object providing the awaitable
                methods ``can_use_image(token_id)`` and
                ``can_use_video(token_id)``. When falsy, the concurrency
                checks are skipped.
        """
        self.token_manager = token_manager
        self.concurrency_manager = concurrency_manager

    async def _rejection_reason(self, token, want_image: bool, want_video: bool) -> Optional[str]:
        """Return the reason ``token`` must be skipped, or None if it is usable.

        Checks run in a fixed order and stop at the first failure:
        AT validity, then the image checks (only if ``want_image``),
        then the video checks (only if ``want_video``).
        """
        if not await self.token_manager.is_at_valid(token.id):
            return "AT无效或已过期"

        limiter = self.concurrency_manager

        if want_image:
            if not token.image_enabled:
                return "图片生成已禁用"
            # The concurrency check only applies when a manager is configured.
            if limiter and not await limiter.can_use_image(token.id):
                return "图片并发已满"

        if want_video:
            if not token.video_enabled:
                return "视频生成已禁用"
            if limiter and not await limiter.can_use_video(token.id):
                return "视频并发已满"

        return None

    async def select_token(
        self,
        for_image_generation: bool = False,
        for_video_generation: bool = False,
        model: Optional[str] = None
    ) -> Optional[Token]:
        """Choose a random token among those that pass every requested check.

        Args:
            for_image_generation: When True, a token must have
                ``image_enabled`` set and (if a concurrency manager is
                configured) pass ``can_use_image``.
            for_video_generation: When True, a token must have
                ``video_enabled`` set and (if a concurrency manager is
                configured) pass ``can_use_video``.
            model: Model name. In this method it is only written to the
                log; it does not influence which tokens are eligible.

        Returns:
            The selected token, or None when there are no active tokens or
            every active token was filtered out.
        """
        log = debug_logger.log_info
        log(f"[LOAD_BALANCER] 开始选择Token (图片生成={for_image_generation}, 视频生成={for_video_generation}, 模型={model})")

        candidates = await self.token_manager.get_active_tokens()
        log(f"[LOAD_BALANCER] 获取到 {len(candidates)} 个活跃Token")

        if not candidates:
            log("[LOAD_BALANCER] ❌ 没有活跃的Token")
            return None

        # Split the candidates into usable tokens and {token id: skip reason}.
        usable = []
        rejected = {}
        for candidate in candidates:
            why = await self._rejection_reason(
                candidate, for_image_generation, for_video_generation
            )
            if why is None:
                usable.append(candidate)
            else:
                rejected[candidate.id] = why

        # Report every skipped token before announcing the outcome.
        if rejected:
            log("[LOAD_BALANCER] 已过滤Token:")
            for token_id, reason in rejected.items():
                log(f"[LOAD_BALANCER] - Token {token_id}: {reason}")

        if not usable:
            log(f"[LOAD_BALANCER] ❌ 没有可用的Token (图片生成={for_image_generation}, 视频生成={for_video_generation})")
            return None

        selected = random.choice(usable)
        log(f"[LOAD_BALANCER] ✅ 已选择Token {selected.id} ({selected.email}) - 余额: {selected.credits}")
        return selected
|
|
|