# flash / main.py
# Hugging Face Space file header (scrape residue converted to a comment):
# uploaded by rkihacker — "Update main.py", commit 02594ce (verified), 9.06 kB
import asyncio
import logging
import os
import random
import time
import uuid
from contextlib import asynccontextmanager
from typing import Dict, Set, Tuple

import httpx
from fake_useragent import UserAgent
from faker import Faker
from fastapi import FastAPI, HTTPException, Request
from pydantic import Field, field_validator
from starlette.background import BackgroundTask
from starlette.responses import JSONResponse, StreamingResponse
# --- Pydantic V1/V2 Compatibility ---
# This block makes the code resilient to different Pydantic versions.
# NOTE(review): the V1 fallback below is likely unreachable — the
# `from pydantic import Field, field_validator` import above is V2-only and
# would already have raised ImportError under Pydantic V1; confirm.
try:
    # Recommended for Pydantic V2 (BaseSettings moved to the pydantic-settings package)
    from pydantic_settings import BaseSettings
    print("Using pydantic_settings.BaseSettings")
except ImportError:
    # Fallback for Pydantic V1
    from pydantic import BaseSettings
    print("pydantic_settings not found, falling back to pydantic.BaseSettings")
# --- Structured Configuration using Pydantic ---
class Settings(BaseSettings):
    """Manages application settings and configuration from environment variables.

    RETRY_CODES is accepted from the environment as a comma-separated string
    and parsed into the RETRY_STATUS_CODES set by a field validator.
    """

    LOG_LEVEL: str = Field(default="INFO")
    TARGET_URL: str = Field(default="https://api.gmi-serving.com")
    MAX_RETRIES: int = Field(default=5, gt=0)
    # Raw comma-separated list, read from the RETRY_CODES env var.
    RETRY_CODES_STR: str = Field(default="429,500,502,503,504", alias="RETRY_CODES")
    # FIX: validate_default=True is required here. Pydantic v2 skips validators
    # for fields that take their default value, so without it parse_retry_codes
    # never runs and a custom RETRY_CODES env var is silently ignored.
    RETRY_STATUS_CODES: Set[int] = Field(
        default={429, 500, 502, 503, 504}, validate_default=True
    )
    BACKOFF_FACTOR: float = Field(default=0.5, gt=0)
    JITTER_FACTOR: float = Field(default=0.2, ge=0)

    @field_validator('RETRY_STATUS_CODES', mode='before')
    @classmethod
    def parse_retry_codes(cls, v, values):
        """Parses the comma-separated string of retry codes into a set of integers.

        Reads the already-validated RETRY_CODES_STR from `values.data`
        (ValidationInfo); falls back to the hard-coded defaults on bad input.
        """
        retry_codes_str = values.data.get('RETRY_CODES_STR', "429,500,502,503,504")
        try:
            return {int(code.strip()) for code in retry_codes_str.split(',')}
        except (ValueError, AttributeError) as e:
            logging.error(f"Invalid RETRY_CODES format: '{retry_codes_str}'. Error: {e}. Using default.")
            return {429, 500, 502, 503, 504}

    class Config:
        env_file = '.env'
        env_file_encoding = 'utf-8'
# --- Initialize Settings and Global Services ---
# Instantiated at import time; reads environment variables and .env
# (per Settings.Config).
settings = Settings()
# Custom logging filter to inject request_id
class RequestIdFilter(logging.Filter):
    """Guarantee every log record carries a ``request_id`` attribute.

    Records emitted outside a request context get the placeholder 'main',
    so the '%(request_id)s' token in the log format never fails.
    """

    def filter(self, record):
        # Only fill in the placeholder when no request id was attached.
        if not hasattr(record, 'request_id'):
            record.request_id = 'main'
        return True
logging.basicConfig(
    level=settings.LOG_LEVEL.upper(),
    # %(request_id)s is supplied by RequestIdFilter on this module's logger.
    format='%(asctime)s - %(levelname)s - [%(request_id)s] - %(message)s'
)
logger = logging.getLogger(__name__)
# NOTE(review): the filter is attached to this module's logger only; records
# from other loggers (e.g. uvicorn) reaching the root handler may lack
# request_id while the format string expects it — confirm.
logger.addFilter(RequestIdFilter())
# Initialize data generation tools
faker = Faker()  # used below to fabricate spoofed public IPv4 addresses
try:
    ua = UserAgent()
except Exception:
    # Fallback if fake-useragent server is down
    ua = None
    logger.warning("Could not initialize UserAgent. A fallback will be used.")
# --- HTTPX Client Lifecycle Management ---
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Open one shared HTTPX client for the app's lifetime; close it on shutdown."""
    logger.info(f"Starting application. Proxying to {settings.TARGET_URL}")
    logger.info(f"Will retry on status codes: {settings.RETRY_STATUS_CODES}")
    # timeout=None: the proxy imposes no deadline of its own on upstream calls.
    async with httpx.AsyncClient(base_url=settings.TARGET_URL, timeout=None) as shared_client:
        app.state.http_client = shared_client
        yield
    logger.info("Application shut down.")
# Initialize the FastAPI app.
# Docs endpoints are disabled so the catch-all proxy route owns every path.
app = FastAPI(docs_url=None, redoc_url=None, lifespan=lifespan)
# --- Helper Functions ---
def get_random_user_agent() -> str:
    """Return a random User-Agent string, or a fixed Chrome UA as a fallback."""
    fallback = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36"
    # Guard: the UserAgent() initialisation at import time may have failed.
    if not ua:
        return fallback
    try:
        return ua.random
    except Exception:
        logger.warning("Failed to get random User-Agent, using fallback.")
        return fallback
def prepare_forward_headers(incoming_headers: Dict, client_host: str) -> Tuple[Dict, str]:
    """
    Prepares headers for the downstream request, adding a comprehensive set of
    spoofed IP headers and a random User-Agent.

    Args:
        incoming_headers: Headers from the inbound request (any case).
        client_host: Real client address; currently unused, kept for
            interface compatibility with callers.

    Returns:
        Tuple of (headers to forward, the spoofed IP used in them).

    FIX: the return annotation was ``-> (Dict, str)`` — a bare tuple of types,
    which is not a valid PEP 484 annotation; it is now ``Tuple[Dict, str]``.
    """
    # Start with a clean slate of lower-cased headers
    forward_headers = {k.lower(): v for k, v in incoming_headers.items()}

    # Remove headers that are specific to the incoming request's connection
    # or could leak info about the true origin.
    headers_to_remove = [
        "host", "content-length", "x-forwarded-for", "x-real-ip", "forwarded",
        "via", "x-client-ip", "x-forwarded-proto", "x-forwarded-host",
    ]
    for h in headers_to_remove:
        forward_headers.pop(h, None)

    spoofed_ip = faker.ipv4_public()

    # Add a comprehensive set of headers to mask the origin.
    override_headers = {
        # Standard headers
        "x-forwarded-for": f"{spoofed_ip}, {faker.ipv4_public()}",  # Append a fake proxy chain
        "x-real-ip": spoofed_ip,
        # RFC 7239 standard, more structured
        "forwarded": f"for={spoofed_ip};proto=https",
        # Common non-standard headers
        "x-client-ip": spoofed_ip,
        "x-originating-ip": spoofed_ip,
        "x-remote-ip": spoofed_ip,
        "x-remote-addr": spoofed_ip,
        # Cloudflare-specific headers
        "cf-connecting-ip": spoofed_ip,
        "true-client-ip": spoofed_ip,
        # Other proxy headers
        "via": "1.1 google",  # Fake a passthrough via a common service
        # Dynamic User-Agent
        "user-agent": get_random_user_agent(),
    }
    forward_headers.update(override_headers)
    return forward_headers, spoofed_ip
# --- API Endpoints ---
@app.get("/", include_in_schema=False)
async def health_check():
    """Provides a basic health check endpoint."""
    payload = {
        "status": "ok",
        "target_url": settings.TARGET_URL,
        "max_retries": settings.MAX_RETRIES,
    }
    return JSONResponse(payload)
# Response headers that describe the upstream hop rather than the payload
# (RFC 7230 §6.1). Forwarding them verbatim can corrupt framing — e.g.
# "Transfer-Encoding: chunked" while Starlette re-frames the stream itself.
_HOP_BY_HOP_HEADERS = {
    "connection", "keep-alive", "proxy-authenticate", "proxy-authorization",
    "te", "trailer", "transfer-encoding", "upgrade",
}


@app.api_route("/{full_path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD"], include_in_schema=False)
async def reverse_proxy_handler(request: Request):
    """
    A catch-all reverse proxy that forwards requests with advanced retry logic,
    backoff, and dynamic header generation.

    Retries on the configured status codes and on httpx transport errors,
    using exponential backoff with symmetric jitter. Raises HTTP 502 once
    all attempts are exhausted without a returnable response.
    """
    start_time = time.monotonic()
    request_id = str(uuid.uuid4())
    log_extra = {'request_id': request_id}

    client: httpx.AsyncClient = request.app.state.http_client
    url = httpx.URL(path=request.url.path, query=request.url.query.encode("utf-8"))
    forward_headers, spoofed_ip = prepare_forward_headers(dict(request.headers), request.client.host)

    logger.info(f"Incoming: {request.method} {url.path} from {request.client.host}. Spoofed IP: {spoofed_ip}", extra=log_extra)

    # Buffer the body once so it can be re-sent identically on every retry.
    request_body = await request.body()
    last_exception = None

    for attempt in range(settings.MAX_RETRIES):
        if attempt > 0:
            # Exponential backoff with symmetric jitter before each retry.
            backoff_delay = settings.BACKOFF_FACTOR * (2 ** (attempt - 1))
            jitter = random.uniform(-settings.JITTER_FACTOR, settings.JITTER_FACTOR) * backoff_delay
            sleep_duration = max(0, backoff_delay + jitter)
            logger.warning(f"Attempt {attempt}/{settings.MAX_RETRIES} failed. Retrying in {sleep_duration:.2f}s...", extra=log_extra)
            await asyncio.sleep(sleep_duration)
        try:
            req = client.build_request(
                method=request.method,
                url=url,
                headers=forward_headers,
                content=request_body,
            )
            resp = await client.send(req, stream=True)

            # Return the response unless its status is retryable AND attempts remain.
            if resp.status_code not in settings.RETRY_STATUS_CODES or attempt == settings.MAX_RETRIES - 1:
                duration_ms = (time.monotonic() - start_time) * 1000
                log_func = logger.info if resp.is_success else logger.warning
                log_func(
                    f"Finished: {request.method} {url.path} status={resp.status_code} attempt={attempt+1} latency={duration_ms:.2f}ms",
                    extra=log_extra
                )
                # FIX: strip hop-by-hop headers instead of forwarding
                # resp.headers verbatim; aiter_raw() streams the raw body and
                # Starlette applies its own framing to the response.
                response_headers = {
                    k: v for k, v in resp.headers.items()
                    if k.lower() not in _HOP_BY_HOP_HEADERS
                }
                return StreamingResponse(
                    resp.aiter_raw(),
                    status_code=resp.status_code,
                    headers=response_headers,
                    # Close the upstream response only after the stream is consumed.
                    background=BackgroundTask(resp.aclose),
                )

            # Retryable status: release the connection before the next attempt.
            await resp.aclose()
            last_exception = f"Last failed attempt returned status code {resp.status_code}"
        except httpx.RequestError as e:
            last_exception = e
            logger.warning(f"HTTPX RequestError on attempt {attempt + 1}: {e}", extra=log_extra)
            # No point sleeping again after the final attempt — bail out to the 502.
            if attempt == settings.MAX_RETRIES - 1:
                break

    duration_ms = (time.monotonic() - start_time) * 1000
    logger.critical(
        f"Failed permanently: {request.method} {url.path} after {settings.MAX_RETRIES} attempts. latency={duration_ms:.2f}ms. Last error: {last_exception}",
        extra=log_extra
    )
    raise HTTPException(
        status_code=502,
        detail=f"Bad Gateway: The server was unable to process the request after {settings.MAX_RETRIES} attempts. Last error: {last_exception}"
    )