|
|
"""Concurrency manager for token-based rate limiting""" |
|
|
import asyncio |
|
|
from typing import Dict, Optional |
|
|
from ..core.logger import debug_logger |
|
|
|
|
|
|
|
|
class ConcurrencyManager:
    """Manages concurrent request limits for each token.

    Every token can carry two independent limits: one for image generation
    and one for video generation. A token that has no entry for a given kind
    is treated as unlimited: it can always be used, acquiring a slot always
    succeeds, and releasing a slot does nothing.

    For each kind two tables are kept, both keyed by token ID:

    * the *remaining* table (``_image_concurrency`` / ``_video_concurrency``)
      holds the number of slots that are currently free;
    * the *limit* table (``_image_limits`` / ``_video_limits``) holds the
      configured maximum, so that a release can never push the remaining
      count above what was configured (for example when a token is reset
      while some of its requests are still in flight).

    All public methods are coroutines and serialize access to the tables
    through a single ``asyncio.Lock``.
    """

    def __init__(self):
        """Initialize concurrency manager with empty tables (no limits)."""
        # Free slots per token ID.
        self._image_concurrency: Dict[int, int] = {}
        self._video_concurrency: Dict[int, int] = {}
        # Configured maximum per token ID; used as the ceiling on release.
        self._image_limits: Dict[int, int] = {}
        self._video_limits: Dict[int, int] = {}
        self._lock = asyncio.Lock()

    # ------------------------------------------------------------------
    # Private helpers. They do not take the lock themselves: every caller
    # invokes them while already holding ``self._lock``.
    # ------------------------------------------------------------------

    @staticmethod
    def _is_limit(value: Optional[int]) -> bool:
        """Return True when *value* is a real limit (None, 0 and negatives mean unlimited)."""
        return value is not None and value > 0

    @staticmethod
    def _store_limit(remaining: Dict[int, int], limits: Dict[int, int], token_id: int, limit: int) -> None:
        """Record *limit* for the token and make all of its slots available."""
        remaining[token_id] = limit
        limits[token_id] = limit

    @staticmethod
    def _drop_limit(remaining: Dict[int, int], limits: Dict[int, int], token_id: int) -> None:
        """Forget any limit for the token, making it unlimited."""
        remaining.pop(token_id, None)
        limits.pop(token_id, None)

    @staticmethod
    def _has_capacity(remaining: Dict[int, int], token_id: int, kind: str) -> bool:
        """Return whether the token currently has a free slot of the given kind."""
        if token_id not in remaining:
            return True

        left = remaining[token_id]
        if left <= 0:
            debug_logger.log_info(f"Token {token_id} {kind} concurrency exhausted (remaining: {left})")
            return False

        return True

    @staticmethod
    def _acquire(remaining: Dict[int, int], token_id: int, kind: str) -> bool:
        """Take one slot of the given kind; return False when none is free."""
        if token_id not in remaining:
            return True

        if remaining[token_id] <= 0:
            return False

        remaining[token_id] -= 1
        debug_logger.log_info(f"Token {token_id} acquired {kind} slot (remaining: {remaining[token_id]})")
        return True

    @staticmethod
    def _release(remaining: Dict[int, int], limits: Dict[int, int], token_id: int, kind: str) -> None:
        """Give one slot of the given kind back, never exceeding the configured limit."""
        if token_id not in remaining:
            return

        limit = limits.get(token_id)
        if limit is not None and remaining[token_id] >= limit:
            # Every slot is already free. This happens when the counter was
            # refilled (reset/initialize) while requests were in flight, or on
            # an unmatched release. Incrementing here would permanently raise
            # the token's effective limit, so the release is ignored.
            return

        # No recorded limit means the remaining table was populated without
        # going through this class's own setters; fall back to a plain
        # increment in that case.
        remaining[token_id] += 1
        debug_logger.log_info(f"Token {token_id} released {kind} slot (remaining: {remaining[token_id]})")

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    async def initialize(self, tokens: list):
        """
        Initialize concurrency counters from token list

        Tokens whose limit is None, 0 or negative are skipped; an entry that
        already exists for such a token is left untouched.

        Args:
            tokens: List of Token objects with image_concurrency and video_concurrency fields
        """
        async with self._lock:
            for token in tokens:
                if self._is_limit(token.image_concurrency):
                    self._store_limit(
                        self._image_concurrency, self._image_limits, token.id, token.image_concurrency
                    )
                if self._is_limit(token.video_concurrency):
                    self._store_limit(
                        self._video_concurrency, self._video_limits, token.id, token.video_concurrency
                    )

        debug_logger.log_info(f"Concurrency manager initialized with {len(tokens)} tokens")

    async def can_use_image(self, token_id: int) -> bool:
        """
        Check if token can be used for image generation

        This only inspects the counter; it does not reserve a slot.

        Args:
            token_id: Token ID

        Returns:
            True if the token is unlimited or has a free image slot, False if none is left
        """
        async with self._lock:
            return self._has_capacity(self._image_concurrency, token_id, "image")

    async def can_use_video(self, token_id: int) -> bool:
        """
        Check if token can be used for video generation

        This only inspects the counter; it does not reserve a slot.

        Args:
            token_id: Token ID

        Returns:
            True if the token is unlimited or has a free video slot, False if none is left
        """
        async with self._lock:
            return self._has_capacity(self._video_concurrency, token_id, "video")

    async def acquire_image(self, token_id: int) -> bool:
        """
        Acquire image concurrency slot

        Args:
            token_id: Token ID

        Returns:
            True if acquired (or the token is unlimited), False if not available
        """
        async with self._lock:
            return self._acquire(self._image_concurrency, token_id, "image")

    async def acquire_video(self, token_id: int) -> bool:
        """
        Acquire video concurrency slot

        Args:
            token_id: Token ID

        Returns:
            True if acquired (or the token is unlimited), False if not available
        """
        async with self._lock:
            return self._acquire(self._video_concurrency, token_id, "video")

    async def release_image(self, token_id: int):
        """
        Release image concurrency slot

        Does nothing for an unlimited token, and does nothing when all of the
        token's slots are already free, so the remaining count never exceeds
        the configured limit.

        Args:
            token_id: Token ID
        """
        async with self._lock:
            self._release(self._image_concurrency, self._image_limits, token_id, "image")

    async def release_video(self, token_id: int):
        """
        Release video concurrency slot

        Does nothing for an unlimited token, and does nothing when all of the
        token's slots are already free, so the remaining count never exceeds
        the configured limit.

        Args:
            token_id: Token ID
        """
        async with self._lock:
            self._release(self._video_concurrency, self._video_limits, token_id, "video")

    async def get_image_remaining(self, token_id: int) -> Optional[int]:
        """
        Get remaining image concurrency for token

        Args:
            token_id: Token ID

        Returns:
            Remaining count or None if no limit
        """
        async with self._lock:
            return self._image_concurrency.get(token_id)

    async def get_video_remaining(self, token_id: int) -> Optional[int]:
        """
        Get remaining video concurrency for token

        Args:
            token_id: Token ID

        Returns:
            Remaining count or None if no limit
        """
        async with self._lock:
            return self._video_concurrency.get(token_id)

    async def reset_token(self, token_id: int, image_concurrency: int = -1, video_concurrency: int = -1):
        """
        Reset concurrency counters for a token

        Both kinds are always rewritten: a positive value installs that limit
        with all slots free, anything else (None, 0, negative) removes the
        limit. Leaving an argument at its default therefore clears that limit.

        Args:
            token_id: Token ID
            image_concurrency: New image concurrency limit (-1 for no limit)
            video_concurrency: New video concurrency limit (-1 for no limit)
        """
        async with self._lock:
            if self._is_limit(image_concurrency):
                self._store_limit(self._image_concurrency, self._image_limits, token_id, image_concurrency)
            else:
                self._drop_limit(self._image_concurrency, self._image_limits, token_id)

            if self._is_limit(video_concurrency):
                self._store_limit(self._video_concurrency, self._video_limits, token_id, video_concurrency)
            else:
                self._drop_limit(self._video_concurrency, self._video_limits, token_id)

        debug_logger.log_info(f"Token {token_id} concurrency reset (image: {image_concurrency}, video: {video_concurrency})")
|
|
|