Spaces:
Running
Running
File size: 3,030 Bytes
799ac7c acd7cf4 799ac7c acd7cf4 799ac7c acd7cf4 799ac7c acd7cf4 799ac7c acd7cf4 799ac7c acd7cf4 799ac7c acd7cf4 799ac7c acd7cf4 799ac7c acd7cf4 799ac7c acd7cf4 799ac7c acd7cf4 799ac7c acd7cf4 799ac7c acd7cf4 799ac7c acd7cf4 799ac7c acd7cf4 799ac7c acd7cf4 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 |
import asyncio
import time
from datetime import datetime, timedelta
from graphgen.utils import logger
class RPM:
    """Asynchronous requests-per-minute limiter keyed on wall-clock minutes.

    ``record`` holds the minute slot currently being counted (``"rpm_slot"``)
    and the number of requests admitted in that slot (``"counter"``).

    NOTE: slots are minutes since local midnight, so a limiter left idle for
    an exact multiple of 24 hours would see its old slot number again.
    """

    def __init__(self, rpm: int = 1000):
        """Create a limiter that admits at most ``rpm`` requests per minute."""
        self.rpm = rpm
        self.record = {"rpm_slot": self.get_minute_slot(), "counter": 0}

    @staticmethod
    def get_minute_slot():
        """Return the current local time as minutes elapsed since midnight."""
        now = datetime.fromtimestamp(time.time())
        return now.hour * 60 + now.minute

    @staticmethod
    def _seconds_until_next_minute() -> float:
        """Return the number of seconds left until the next minute boundary.

        Computed from the epoch clock rather than from naive local datetime
        arithmetic, so it is unaffected by DST transitions. This assumes the
        local UTC offset is a whole number of minutes.
        """
        return 60.0 - (time.time() % 60.0)

    async def wait(self, silent=False):
        """Block until one more request fits in the current minute, then count it.

        Args:
            silent: When true, suppress the info/debug log output.
        """
        # A non-positive limit could never be satisfied; treat it as
        # "one request per minute" instead of waiting forever.
        limit = max(self.rpm, 1)
        while True:
            minute_slot = self.get_minute_slot()
            if self.record["rpm_slot"] != minute_slot:
                # First caller to observe the new minute starts a fresh count.
                self.record = {"rpm_slot": minute_slot, "counter": 0}
            if self.record["counter"] < limit:
                break
            # Limit reached: sleep until the minute rolls over, then re-check.
            # Re-checking (instead of resetting the record unconditionally)
            # keeps the admissions made by other coroutines that woke up
            # first, and copes with a wake-up that lands slightly early.
            sleep_time = self._seconds_until_next_minute()
            if not silent:
                logger.info("RPM sleep %s", sleep_time)
            await asyncio.sleep(sleep_time)
        # No await between the check above and this increment, so the
        # admission is atomic with respect to other coroutines.
        self.record["counter"] += 1
        if not silent:
            logger.debug(self.record)
class TPM:
    """Asynchronous tokens-per-minute limiter keyed on wall-clock minutes.

    ``record`` holds the minute slot currently being counted (``"tpm_slot"``)
    and the number of tokens admitted in that slot (``"counter"``).

    NOTE: slots are minutes since local midnight, so a limiter left idle for
    an exact multiple of 24 hours would see its old slot number again.
    """

    def __init__(self, tpm: int = 20000):
        """Create a limiter that admits at most ``tpm`` tokens per minute."""
        self.tpm = tpm
        self.record = {"tpm_slot": self.get_minute_slot(), "counter": 0}

    @staticmethod
    def get_minute_slot():
        """Return the current local time as minutes elapsed since midnight."""
        now = datetime.fromtimestamp(time.time())
        return now.hour * 60 + now.minute

    @staticmethod
    def _seconds_until_next_minute() -> float:
        """Return the number of seconds left until the next minute boundary.

        Computed from the epoch clock rather than from naive local datetime
        arithmetic, so it is unaffected by DST transitions. This assumes the
        local UTC offset is a whole number of minutes.
        """
        return 60.0 - (time.time() % 60.0)

    async def wait(self, token_count, silent=False):
        """Block until ``token_count`` tokens fit in the current minute.

        The first request seen in a new minute is always admitted, even when
        it alone exceeds ``tpm``. A request that does not fit sleeps until the
        minute rolls over and is then re-evaluated.

        Args:
            token_count: Number of tokens the pending request will consume.
            silent: When true, suppress the debug log of the record. The
                info/warning messages emitted when the limit is hit are
                logged regardless.
        """
        has_waited = False
        while True:
            minute_slot = self.get_minute_slot()
            if self.record["tpm_slot"] != minute_slot:
                # New minute: start a fresh count with this request.
                self.record = {"tpm_slot": minute_slot, "counter": token_count}
                break
            used = self.record["counter"]
            # A request larger than the whole budget can never "fit"; once it
            # has waited for a rollover it is let through so it cannot starve.
            oversized = token_count > self.tpm
            if used + token_count <= self.tpm or (has_waited and oversized):
                self.record["counter"] = used + token_count
                break
            # TPM limit would be exceeded: sleep until the next minute and
            # re-check. The counter is left untouched while waiting so the
            # current minute is not charged for a request that was not sent,
            # and tokens admitted by other coroutines after the rollover are
            # kept instead of being overwritten.
            logger.info("Current TPM: %s, limit: %s", used, self.tpm)
            sleep_time = self._seconds_until_next_minute()
            logger.warning("TPM limit exceeded, wait %s seconds", sleep_time)
            await asyncio.sleep(sleep_time)
            has_waited = True
        if not silent:
            logger.debug(self.record)
|