"""Small FastAPI web front-end for a multi-threaded TCP connect port scanner."""

import asyncio
import socket
import threading
import time
from html import escape
from queue import Empty, Queue

import uvicorn
from fastapi import FastAPI, Form, Request
from fastapi.responses import HTMLResponse

# --- UI Template (HTML, CSS, JS) ---
# NOTE: All curly braces for CSS and JS are doubled (e.g., {{ ... }})
# to escape them from Python's .format() method. The only single-brace
# placeholder is {content}.
# NOTE(review): as they appear here, these literals contain no HTML markup
# or form fields — confirm against the original file that nothing was lost.
HTML_TEMPLATE = """ FastAPI Port Scanner
{content}
"""

FORM_CONTENT = """

FastAPI Port Scanner

Warning: Use only on networks you own or have explicit permission to scan. Unauthorized scanning is illegal.

Scanning in progress... This may take a moment.

"""


# --- Scanner Logic ---
def run_scan(target_ip: str, start_port: int, end_port: int, num_threads: int):
    """Scan a TCP port range on ``target_ip`` and return the open ports.

    Every port in ``start_port..end_port`` (inclusive) is placed on a queue
    that is drained by worker threads, each attempting a TCP connect with a
    0.5 second timeout.

    Args:
        target_ip: IPv4 address to probe.
        start_port: First port of the range (inclusive).
        end_port: Last port of the range (inclusive).
        num_threads: Upper bound on the number of worker threads.

    Returns:
        A sorted list of the ports that accepted a connection. An empty
        range yields an empty list.

    Raises:
        ValueError: If there are ports to scan but ``num_threads`` is below 1
            (previously this blocked forever because nothing drained the
            queue).
    """
    port_count = end_port - start_port + 1
    if port_count <= 0:
        return []
    if num_threads < 1:
        raise ValueError("num_threads must be at least 1")

    pending = Queue()
    for port in range(start_port, end_port + 1):
        pending.put(port)

    open_ports = []
    results_lock = threading.Lock()

    def worker():
        while True:
            # get_nowait() avoids the check-then-get race of
            # `while not q.empty(): q.get()`, where a worker could block
            # forever after another thread took the last item.
            try:
                port = pending.get_nowait()
            except Empty:
                return
            try:
                with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                    s.settimeout(0.5)  # Aggressive timeout
                    if s.connect_ex((target_ip, port)) == 0:
                        with results_lock:
                            open_ports.append(port)
            except OSError:
                # Covers timeouts, refusals and socket-creation failures
                # (e.g. file-descriptor exhaustion); the port is simply
                # not reported as open.
                pass
            finally:
                # Always mark the item done so pending.join() cannot hang
                # when a probe fails unexpectedly.
                pending.task_done()

    # No point in starting more threads than there are ports to probe.
    for _ in range(min(num_threads, port_count)):
        threading.Thread(target=worker, daemon=True).start()

    pending.join()  # Block until all ports are scanned
    return sorted(open_ports)


# --- FastAPI Application ---
app = FastAPI()


@app.get("/", response_class=HTMLResponse)
async def get_form():
    """Serve the main page with the input form."""
    return HTML_TEMPLATE.format(content=FORM_CONTENT)


@app.post("/scan", response_class=HTMLResponse)
async def scan(
    target: str = Form(...),
    start_port: int = Form(...),
    end_port: int = Form(...),
    threads: int = Form(...),
):
    """Validate the form input, run the scan, and return the results page.

    Responds with status 400 when the port range is invalid, the thread
    count is outside 1..2500, or the hostname cannot be resolved.
    """
    # Input validation
    if not (1 <= start_port <= end_port <= 65535):
        return HTMLResponse(
            content="\n\nError: Invalid port range.\n\n",
            status_code=400,
        )
    if not (1 <= threads <= 2500):
        return HTMLResponse(
            content="\n\nError: Thread count must be between 1 and 2500.\n\n",
            status_code=400,
        )

    # `target` is untrusted form input that gets echoed into HTML responses.
    safe_target = escape(target)

    # Name resolution and the scan itself are blocking calls; run them in
    # the default executor so the event loop keeps serving other requests.
    loop = asyncio.get_running_loop()

    # Resolve hostname
    try:
        target_ip = await loop.run_in_executor(
            None, socket.gethostbyname, target
        )
    except (socket.gaierror, ValueError):
        # ValueError includes the UnicodeError raised for names that cannot
        # be IDNA-encoded (e.g. an over-long label).
        return HTMLResponse(
            content=f"\n\nError: Could not resolve hostname '{safe_target}'\n\n",
            status_code=400,
        )

    started = time.perf_counter()  # monotonic; immune to wall-clock changes
    open_ports = await loop.run_in_executor(
        None, run_scan, target_ip, start_port, end_port, threads
    )
    duration = time.perf_counter() - started

    # Build results content
    parts = [
        "\n\nScan Results\n\n",
        f"Target: {safe_target} ({target_ip})\n\n",
        f"Time Taken: {duration:.2f} seconds\n\n",
        f"Open Ports Found: {len(open_ports)}\n\n",
    ]
    if open_ports:
        rows = "".join(f"{port}" for port in open_ports)
        parts.append(f" {rows}\nOpen Port\n")
    else:
        parts.append("\n\nNo open ports were found in the specified range.\n\n")
    parts.append("Scan Another Target")

    return HTML_TEMPLATE.format(content="".join(parts))


# To run this app, you need uvicorn: `pip install uvicorn`
# Then run from your terminal: `uvicorn main:app --reload`
if __name__ == "__main__":
    # Pass the app object itself so this works whatever the file is named
    # and the module is not imported a second time.
    # NOTE(review): host 0.0.0.0 exposes the scanner on every interface —
    # confirm this is intended rather than 127.0.0.1.
    uvicorn.run(app, host="0.0.0.0", port=5000, reload=False)