terminaltes / entrypoint.py
devstok's picture
Create entrypoint.py
a58f797 verified
import asyncio
import os
import threading
import time
import collections
CMD = os.getenv("CMD", "ttyd -p 3000 bash")
LIMIT, LOG, DEST = 30, collections.defaultdict(list), ('127.0.0.1', 3000)
def run():
print(f"▶ Menjalankan: {CMD}")
os.system(CMD)
threading.Thread(target=run, daemon=True).start()
def blocked(ip):
now = time.time()
LOG[ip] = [t for t in LOG[ip] if now - t < 60] + [now]
return len(LOG[ip]) > LIMIT
async def proxy(r1, w1):
ip = w1.get_extra_info("peername")[0]
if blocked(ip):
w1.write(b"HTTP/1.1 429 Too Many Requests\r\nConnection: close\r\n\r\n")
await w1.drain(); w1.close(); return
try:
r2, w2 = await asyncio.open_connection(*DEST)
async def pipe(a, b):
try:
while d := await a.read(65536): b.write(d); await b.drain()
except: pass
finally: b.close()
await asyncio.gather(pipe(r1, w2), pipe(r2, w1))
except: pass
async def main():
server = await asyncio.start_server(proxy, '0.0.0.0', 7860)
async with server:
await server.serve_forever()
asyncio.run(main())