Spaces:
Running
Running
| import asyncio | |
| import time | |
| from datetime import datetime, timedelta | |
| from graphgen.utils import logger | |
class RPM:
    """Requests-per-minute limiter based on wall-clock minute slots.

    ``record`` holds the minute slot currently being counted (``"rpm_slot"``)
    and the number of requests admitted in that slot (``"counter"``).
    """

    def __init__(self, rpm: int = 1000):
        """Create a limiter admitting at most ``rpm`` requests per minute slot."""
        self.rpm = rpm
        self.record = {"rpm_slot": self.get_minute_slot(), "counter": 0}

    @staticmethod
    def get_minute_slot() -> int:
        """Return the current local time as whole minutes since midnight.

        Declared as a static method because it takes no instance state and is
        invoked as ``self.get_minute_slot()``; without the decorator that call
        raises ``TypeError``.
        """
        now = datetime.fromtimestamp(time.time())
        return now.hour * 60 + now.minute

    @staticmethod
    def _seconds_until_next_minute(current: float) -> float:
        """Return the delay from timestamp ``current`` to the next minute start."""
        next_minute = datetime.fromtimestamp(current).replace(
            second=0, microsecond=0
        ) + timedelta(minutes=1)
        # abs() keeps the delay non-negative, as asyncio.sleep expects.
        return abs(next_minute.timestamp() - current)

    async def wait(self, silent: bool = False) -> None:
        """Register one request, sleeping first if the current slot is full.

        Args:
            silent: When true, suppress the info and debug log messages.
        """
        current = time.time()
        minute_slot = self.get_minute_slot()

        if self.record["rpm_slot"] == minute_slot:
            # Same slot as the last request: enforce the limit.
            if self.record["counter"] >= self.rpm:
                sleep_time = self._seconds_until_next_minute(current)
                if not silent:
                    logger.info("RPM sleep %s", sleep_time)
                await asyncio.sleep(sleep_time)
                # Start counting afresh in whatever slot we woke up in.
                self.record = {"rpm_slot": self.get_minute_slot(), "counter": 0}
        else:
            # A new minute has started since the last request: reset the count.
            self.record = {"rpm_slot": self.get_minute_slot(), "counter": 0}

        self.record["counter"] += 1

        if not silent:
            logger.debug(self.record)
class TPM:
    """Tokens-per-minute limiter based on wall-clock minute slots.

    ``record`` holds the minute slot currently being counted (``"tpm_slot"``)
    and the number of tokens charged to that slot (``"counter"``).
    """

    def __init__(self, tpm: int = 20000):
        """Create a limiter allowing at most ``tpm`` tokens per minute slot."""
        self.tpm = tpm
        self.record = {"tpm_slot": self.get_minute_slot(), "counter": 0}

    @staticmethod
    def get_minute_slot() -> int:
        """Return the current local time as whole minutes since midnight.

        Declared as a static method because it takes no instance state and is
        invoked as ``self.get_minute_slot()``; without the decorator that call
        raises ``TypeError``.
        """
        now = datetime.fromtimestamp(time.time())
        return now.hour * 60 + now.minute

    @staticmethod
    def _seconds_until_next_minute(current: float) -> float:
        """Return the delay from timestamp ``current`` to the next minute start."""
        next_minute = datetime.fromtimestamp(current).replace(
            second=0, microsecond=0
        ) + timedelta(minutes=1)
        # abs() keeps the delay non-negative, as asyncio.sleep expects.
        return abs(next_minute.timestamp() - current)

    async def wait(self, token_count, silent: bool = False) -> None:
        """Charge ``token_count`` tokens, sleeping if the slot budget is exceeded.

        Args:
            token_count: Number of tokens to add to the current slot's counter.
            silent: When true, suppress the final debug log of ``record``.
                The info/warning messages emitted on a limit hit are always
                logged.
        """
        current = time.time()
        minute_slot = self.get_minute_slot()

        # A new minute has started: open a fresh slot charged with this
        # request and return immediately (no debug log on this path).
        if self.record["tpm_slot"] != minute_slot:
            self.record = {"tpm_slot": minute_slot, "counter": token_count}
            return

        # Check whether this request pushes the slot over the TPM limit.
        old_counter = self.record["counter"]
        self.record["counter"] += token_count
        if self.record["counter"] > self.tpm:
            logger.info("Current TPM: %s, limit: %s", old_counter, self.tpm)
            sleep_time = self._seconds_until_next_minute(current)
            logger.warning("TPM limit exceeded, wait %s seconds", sleep_time)
            await asyncio.sleep(sleep_time)
            # Charge this request to whatever slot we woke up in.
            self.record = {"tpm_slot": self.get_minute_slot(), "counter": token_count}

        if not silent:
            logger.debug(self.record)