import asyncio
import html
import socket
import threading
import time
from queue import Empty, Queue

import uvicorn
from fastapi import FastAPI, Form, Request
from fastapi.responses import HTMLResponse
# --- UI Template (HTML, CSS, JS) ---
# HTML_TEMPLATE is the page shell. It is rendered with str.format(), and
# {content} is its only replacement field. Any literal curly brace added to
# the template (e.g. in CSS or JS) must be doubled ({{ ... }}) so that
# .format() does not try to interpret it as a placeholder.
# NOTE(review): the literals below contain plain text only; if they are
# expected to carry HTML/CSS/JS markup, confirm it was not lost.
HTML_TEMPLATE = """
FastAPI Port Scanner
{content}
"""
# FORM_CONTENT is the fragment substituted for {content} when the landing
# page ("/") is served.
FORM_CONTENT = """
Scanning in progress... This may take a moment.
"""
# --- Scanner Logic ---
def run_scan(target_ip: str, start_port: int, end_port: int, num_threads: int) -> list:
    """Scan a TCP port range on one host and return the open ports.

    Every port in ``start_port..end_port`` (inclusive) is probed with a TCP
    connect attempt that uses a 0.5 second timeout. The probes are spread
    over a pool of daemon worker threads that pull ports from a shared queue.
    The function blocks until every worker has finished.

    Args:
        target_ip: IPv4 address (or resolvable host) to probe.
        start_port: First port of the range, inclusive.
        end_port: Last port of the range, inclusive.
        num_threads: Requested number of worker threads. The effective
            count is clamped to at least 1 and at most the number of ports,
            so a non-positive value can no longer make the call hang.

    Returns:
        A sorted list of the port numbers that accepted a connection. An
        empty range yields an empty list.
    """
    ports = range(start_port, end_port + 1)
    if not ports:
        return []

    pending = Queue()
    for port in ports:
        pending.put(port)

    open_ports = []
    results_lock = threading.Lock()

    def worker():
        while True:
            # get_nowait() instead of an empty()/get() pair: with several
            # consumers the queue can be drained between the two calls,
            # which would leave this thread blocked in get() forever.
            try:
                port = pending.get_nowait()
            except Empty:
                return
            try:
                with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
                    probe.settimeout(0.5)  # Aggressive timeout
                    reachable = probe.connect_ex((target_ip, port)) == 0
            except OSError:
                # Timeouts, refusals, resolution failures and socket
                # exhaustion all mean "not confirmed open"; skip the port
                # rather than killing the worker.
                continue
            if reachable:
                with results_lock:
                    open_ports.append(port)

    # Never start more workers than there are ports, and always at least one.
    worker_count = max(1, min(num_threads, len(ports)))
    workers = [
        threading.Thread(target=worker, daemon=True) for _ in range(worker_count)
    ]
    for thread in workers:
        thread.start()
    # Joining the threads (rather than Queue.join()) cannot dead-lock if a
    # worker exits early, and guarantees no worker outlives this call.
    for thread in workers:
        thread.join()

    open_ports.sort()
    return open_ports
# --- FastAPI Application ---
# Module-level application instance; the route decorators below register
# their handlers on it.
app = FastAPI()
@app.get("/", response_class=HTMLResponse)
async def get_form():
    """Return the landing page: the page template with the form fragment embedded."""
    page = HTML_TEMPLATE.format(content=FORM_CONTENT)
    return page
@app.post("/scan", response_class=HTMLResponse)
async def scan(
    target: str = Form(...),
    start_port: int = Form(...),
    end_port: int = Form(...),
    threads: int = Form(...),
):
    """Validate the submitted form, run the scan, and return the results page.

    Args:
        target: Hostname or IP address submitted by the user.
        start_port: First port to scan, inclusive (1-65535).
        end_port: Last port to scan, inclusive (>= start_port, <= 65535).
        threads: Number of scanner threads to use (1-2500).

    Returns:
        The rendered results page on success, or an ``HTMLResponse`` with
        status 400 when the port range or thread count is invalid or the
        hostname cannot be resolved.
    """
    # NOTE(review): the string literals in this handler were broken across
    # lines (unterminated) and look as if their HTML markup was stripped.
    # Their visible text is kept verbatim, with the line breaks written as
    # "\n"; restore the intended markup if it is available.

    # Input validation
    if not (1 <= start_port <= end_port <= 65535):
        return HTMLResponse(content="Error: Invalid port range.\n", status_code=400)
    if not (1 <= threads <= 2500):
        return HTMLResponse(
            content="Error: Thread count must be between 1 and 2500.\n",
            status_code=400,
        )

    # `target` is untrusted form input that is echoed into an HTML response,
    # so it must be escaped before interpolation.
    safe_target = html.escape(target)

    # DNS resolution and the scan itself are blocking calls; run them in the
    # default executor so the event loop keeps serving other requests.
    loop = asyncio.get_running_loop()

    # Resolve hostname
    try:
        target_ip = await loop.run_in_executor(None, socket.gethostbyname, target)
    except (socket.gaierror, UnicodeError):
        # UnicodeError: the IDNA encoding step rejects malformed names
        # (empty or over-long labels) before any lookup is attempted.
        return HTMLResponse(
            content=f"Error: Could not resolve hostname '{safe_target}'\n",
            status_code=400,
        )

    # perf_counter() is monotonic, so the duration cannot go negative if the
    # wall clock is adjusted mid-scan.
    started = time.perf_counter()
    open_ports = await loop.run_in_executor(
        None, run_scan, target_ip, start_port, end_port, threads
    )
    duration = time.perf_counter() - started

    # Build results content
    results_html = f"""
Scan Results
Target: {safe_target} ({target_ip})
Time Taken: {duration:.2f} seconds
Open Ports Found: {len(open_ports)}
"""
    if open_ports:
        rows = "".join(f"| {port} |\n" for port in open_ports)
        # The rows were previously built but never added to the page.
        results_html += f"\n{rows}"
    else:
        results_html += "No open ports were found in the specified range.\n"
    results_html += "Scan Another Target"
    return HTML_TEMPLATE.format(content=results_html)
# To run this app, you need uvicorn: `pip install uvicorn`
# Then run from your terminal: `uvicorn <module>:app --reload`, where
# <module> is this file's name without the .py extension.
if __name__ == "__main__":
    # Pass the app object rather than the "main:app" import string: the
    # string form only works when this file is named main.py and makes
    # uvicorn import the module a second time. The object form is valid
    # because reload is disabled.
    # NOTE(review): host "0.0.0.0" exposes the scanner on every network
    # interface; confirm that is intended.
    uvicorn.run(app, host="0.0.0.0", port=5000, reload=False)