# FLOW2API/src/services/concurrency_manager.py
# xiaoh2018's picture
# Upload 1108 files
# 33cfa2a verified
"""Concurrency manager for token-based rate limiting"""
import asyncio
from typing import Dict, Optional
from ..core.logger import debug_logger
class ConcurrencyManager:
    """Manages concurrent request limits for each token.

    Two independent kinds of slots are tracked, "image" and "video". For each
    kind there is a ``token_id -> remaining slots`` table and a matching
    ``token_id -> configured limit`` table. A token that is absent from a
    kind's tables has no limit for that kind: checks and acquisitions always
    succeed and releases are no-ops.

    All public methods take ``self._lock`` before touching the tables.
    """

    def __init__(self):
        """Initialize concurrency manager with empty (unlimited) tables."""
        self._image_concurrency: Dict[int, int] = {}  # token_id -> remaining image concurrency
        self._video_concurrency: Dict[int, int] = {}  # token_id -> remaining video concurrency
        self._image_limit: Dict[int, int] = {}  # token_id -> configured image limit
        self._video_limit: Dict[int, int] = {}  # token_id -> configured video limit
        self._lock = asyncio.Lock()  # Protect concurrent access

    # ------------------------------------------------------------------
    # Private helpers. Callers must already hold ``self._lock``.
    # ------------------------------------------------------------------

    @staticmethod
    def _apply_limit(remaining: Dict[int, int], limits: Dict[int, int],
                     token_id: int, limit: Optional[int]) -> None:
        """Install a positive limit for a token, or drop its entries otherwise."""
        if limit is not None and limit > 0:
            remaining[token_id] = limit
            limits[token_id] = limit
        else:
            # None, 0 and negative values (e.g. -1) all mean "no limit".
            remaining.pop(token_id, None)
            limits.pop(token_id, None)

    @staticmethod
    def _has_capacity(remaining: Dict[int, int], token_id: int, kind: str) -> bool:
        """Return True when the token is unlimited or has at least one free slot."""
        left = remaining.get(token_id)
        if left is None:
            # Not tracked -> no limit.
            return True
        if left <= 0:
            debug_logger.log_info(f"Token {token_id} {kind} concurrency exhausted (remaining: {left})")
            return False
        return True

    @staticmethod
    def _try_acquire(remaining: Dict[int, int], token_id: int, kind: str) -> bool:
        """Take one slot if available; unlimited tokens always succeed."""
        left = remaining.get(token_id)
        if left is None:
            # No limit
            return True
        if left <= 0:
            return False
        remaining[token_id] = left - 1
        debug_logger.log_info(f"Token {token_id} acquired {kind} slot (remaining: {remaining[token_id]})")
        return True

    @staticmethod
    def _release_slot(remaining: Dict[int, int], limits: Dict[int, int],
                      token_id: int, kind: str) -> None:
        """Give one slot back, never exceeding the configured limit."""
        if token_id not in remaining:
            return
        cap = limits.get(token_id)
        if cap is not None and remaining[token_id] >= cap:
            # Already full. This legitimately happens when reset_token()
            # replaced the counter while requests were still in flight, or
            # when a slot was acquired while the token was unlimited.
            # Incrementing here would let the token run above its limit.
            return
        remaining[token_id] += 1
        debug_logger.log_info(f"Token {token_id} released {kind} slot (remaining: {remaining[token_id]})")

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    async def initialize(self, tokens: list):
        """
        Initialize concurrency counters from token list

        Tokens whose concurrency value is missing, zero or negative are
        treated as unlimited; any limit previously recorded for them is
        removed so that re-initialization does not leave stale limits behind.

        Args:
            tokens: List of Token objects with image_concurrency and video_concurrency fields
        """
        async with self._lock:
            for token in tokens:
                self._apply_limit(self._image_concurrency, self._image_limit,
                                  token.id, token.image_concurrency)
                self._apply_limit(self._video_concurrency, self._video_limit,
                                  token.id, token.video_concurrency)
        debug_logger.log_info(f"Concurrency manager initialized with {len(tokens)} tokens")

    async def can_use_image(self, token_id: int) -> bool:
        """
        Check if token can be used for image generation

        This is a check only; it does not reserve a slot.

        Args:
            token_id: Token ID

        Returns:
            True if token has available image concurrency, False if concurrency is 0
        """
        async with self._lock:
            return self._has_capacity(self._image_concurrency, token_id, "image")

    async def can_use_video(self, token_id: int) -> bool:
        """
        Check if token can be used for video generation

        This is a check only; it does not reserve a slot.

        Args:
            token_id: Token ID

        Returns:
            True if token has available video concurrency, False if concurrency is 0
        """
        async with self._lock:
            return self._has_capacity(self._video_concurrency, token_id, "video")

    async def acquire_image(self, token_id: int) -> bool:
        """
        Acquire image concurrency slot

        Args:
            token_id: Token ID

        Returns:
            True if acquired (or the token is unlimited), False if not available
        """
        async with self._lock:
            return self._try_acquire(self._image_concurrency, token_id, "image")

    async def acquire_video(self, token_id: int) -> bool:
        """
        Acquire video concurrency slot

        Args:
            token_id: Token ID

        Returns:
            True if acquired (or the token is unlimited), False if not available
        """
        async with self._lock:
            return self._try_acquire(self._video_concurrency, token_id, "video")

    async def release_image(self, token_id: int):
        """
        Release image concurrency slot

        No-op for unlimited tokens and for tokens whose counter is already
        at the configured limit.

        Args:
            token_id: Token ID
        """
        async with self._lock:
            self._release_slot(self._image_concurrency, self._image_limit, token_id, "image")

    async def release_video(self, token_id: int):
        """
        Release video concurrency slot

        No-op for unlimited tokens and for tokens whose counter is already
        at the configured limit.

        Args:
            token_id: Token ID
        """
        async with self._lock:
            self._release_slot(self._video_concurrency, self._video_limit, token_id, "video")

    async def get_image_remaining(self, token_id: int) -> Optional[int]:
        """
        Get remaining image concurrency for token

        Args:
            token_id: Token ID

        Returns:
            Remaining count or None if no limit
        """
        async with self._lock:
            return self._image_concurrency.get(token_id)

    async def get_video_remaining(self, token_id: int) -> Optional[int]:
        """
        Get remaining video concurrency for token

        Args:
            token_id: Token ID

        Returns:
            Remaining count or None if no limit
        """
        async with self._lock:
            return self._video_concurrency.get(token_id)

    async def reset_token(self, token_id: int, image_concurrency: int = -1, video_concurrency: int = -1):
        """
        Reset concurrency counters for a token

        The remaining count is set to the new limit regardless of requests
        currently in flight; later releases are capped at the new limit.

        Args:
            token_id: Token ID
            image_concurrency: New image concurrency limit (-1 for no limit)
            video_concurrency: New video concurrency limit (-1 for no limit)
        """
        async with self._lock:
            self._apply_limit(self._image_concurrency, self._image_limit,
                              token_id, image_concurrency)
            self._apply_limit(self._video_concurrency, self._video_limit,
                              token_id, video_concurrency)
        debug_logger.log_info(f"Token {token_id} concurrency reset (image: {image_concurrency}, video: {video_concurrency})")