|
|
import asyncio
import logging
import os
import random
import time
import uuid
from contextlib import asynccontextmanager
from typing import Dict, Set, Tuple

import httpx
from fake_useragent import UserAgent
from faker import Faker
from fastapi import FastAPI, HTTPException, Request
from pydantic import Field, field_validator
from starlette.background import BackgroundTask
from starlette.responses import JSONResponse, StreamingResponse
|
|
|
|
|
|
|
|
|
|
|
# Pydantic v2 moved BaseSettings into the separate `pydantic_settings`
# package; fall back to the pydantic v1 location when it is not installed.
try:
    from pydantic_settings import BaseSettings
    print("Using pydantic_settings.BaseSettings")
except ImportError:
    # NOTE(review): on a pydantic v2 install without pydantic_settings this
    # fallback import will itself raise ImportError — confirm the deployment
    # pins either pydantic<2 or pydantic-settings.
    from pydantic import BaseSettings
    print("pydantic_settings not found, falling back to pydantic.BaseSettings")
|
|
|
|
|
|
|
|
|
|
|
class Settings(BaseSettings):
    """Manages application settings and configuration from environment variables.

    Values are read from the process environment and an optional `.env` file.
    The retry status codes are configured through the `RETRY_CODES` env var
    (a comma-separated string) and materialized into `RETRY_STATUS_CODES`.
    """

    LOG_LEVEL: str = Field(default="INFO")
    TARGET_URL: str = Field(default="https://api.gmi-serving.com")
    MAX_RETRIES: int = Field(default=5, gt=0)

    # Raw comma-separated list from the RETRY_CODES env var. Declared BEFORE
    # RETRY_STATUS_CODES so it is already validated and available in
    # `values.data` when parse_retry_codes runs.
    RETRY_CODES_STR: str = Field(default="429,500,502,503,504", alias="RETRY_CODES")
    # validate_default=True is required: pydantic v2 skips validators for
    # fields that keep their default value, so without it parse_retry_codes
    # would never run and a custom RETRY_CODES env var would be ignored.
    RETRY_STATUS_CODES: Set[int] = Field(
        default={429, 500, 502, 503, 504}, validate_default=True
    )
    BACKOFF_FACTOR: float = Field(default=0.5, gt=0)
    JITTER_FACTOR: float = Field(default=0.2, ge=0)

    @field_validator('RETRY_STATUS_CODES', mode='before')
    @classmethod
    def parse_retry_codes(cls, v, values):
        """Parses the comma-separated string of retry codes into a set of integers.

        Ignores the raw value `v` and derives the set from RETRY_CODES_STR
        (via `values.data`, the already-validated fields). Falls back to the
        stock set on any parse error instead of failing startup.
        """
        retry_codes_str = values.data.get('RETRY_CODES_STR', "429,500,502,503,504")
        try:
            return {int(code.strip()) for code in retry_codes_str.split(',')}
        except (ValueError, AttributeError) as e:
            logging.error(f"Invalid RETRY_CODES format: '{retry_codes_str}'. Error: {e}. Using default.")
            return {429, 500, 502, 503, 504}

    class Config:
        env_file = '.env'
        env_file_encoding = 'utf-8'
|
|
|
|
|
|
|
|
# Instantiated at import time so configuration errors surface immediately at startup.
settings = Settings()
|
|
|
|
|
|
|
|
class RequestIdFilter(logging.Filter):
    """Logging filter that guarantees every record carries a `request_id`.

    Records logged without an explicit `request_id` extra receive the
    placeholder 'main' so the log format string never fails. All records
    pass through (the filter never drops anything).
    """

    def filter(self, record):
        if not hasattr(record, 'request_id'):
            record.request_id = 'main'
        return True
|
|
|
|
|
# Root logging configuration; the format expects a `request_id` attribute on
# every record, which RequestIdFilter injects (defaulting to 'main').
logging.basicConfig(
    level=settings.LOG_LEVEL.upper(),
    format='%(asctime)s - %(levelname)s - [%(request_id)s] - %(message)s'
)
logger = logging.getLogger(__name__)
# NOTE(review): the filter is attached to this module logger only — records
# from other loggers that reach the root handler will lack `request_id` and
# break this format string; confirm all app logging goes through `logger`.
logger.addFilter(RequestIdFilter())
|
|
|
|
|
|
|
|
# Faker generates the spoofed public IPv4 addresses used in forwarded headers.
faker = Faker()
try:
    # fake_useragent may hit the network or a local cache during init;
    # degrade gracefully to a static User-Agent instead of failing startup.
    ua = UserAgent()
except Exception:
    ua = None
    logger.warning("Could not initialize UserAgent. A fallback will be used.")
|
|
|
|
|
|
|
|
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manages the lifecycle of the HTTPX client for the application.

    Creates one shared AsyncClient bound to TARGET_URL on startup (stored on
    `app.state.http_client`) and closes it on shutdown via the `async with`.
    """
    logger.info(f"Starting application. Proxying to {settings.TARGET_URL}")
    logger.info(f"Will retry on status codes: {settings.RETRY_STATUS_CODES}")
    # timeout=None disables all httpx timeouts — presumably intentional for
    # long-lived streaming responses, but a hung upstream can then hold a
    # connection open indefinitely; TODO confirm this is the desired policy.
    async with httpx.AsyncClient(base_url=settings.TARGET_URL, timeout=None) as client:
        app.state.http_client = client
        yield
    logger.info("Application shut down.")
|
|
|
|
|
|
|
|
# Docs endpoints are disabled so the proxy surface stays opaque to callers.
app = FastAPI(docs_url=None, redoc_url=None, lifespan=lifespan)
|
|
|
|
|
|
|
|
def get_random_user_agent() -> str:
    """Returns a random User-Agent, with a fallback."""
    fallback = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36"
    # `ua` is None when fake_useragent failed to initialize at startup.
    if not ua:
        return fallback
    try:
        return ua.random
    except Exception:
        logger.warning("Failed to get random User-Agent, using fallback.")
        return fallback
|
|
|
|
|
def prepare_forward_headers(incoming_headers: Dict, client_host: str) -> Tuple[Dict, str]:
    """
    Prepares headers for the downstream request, adding a comprehensive set of
    spoofed IP headers and a random User-Agent.

    Args:
        incoming_headers: The original request headers; keys are normalized
            to lowercase.
        client_host: The real client address. Currently unused — kept for
            interface stability with existing callers.

    Returns:
        A tuple of (headers to forward downstream, the spoofed IPv4 used).

    Note: the original annotation `-> (Dict, str)` evaluated to a runtime
    tuple of types, which is not a valid type annotation; `Tuple[Dict, str]`
    is the correct form and does not change behavior.
    """
    forward_headers = {k.lower(): v for k, v in incoming_headers.items()}

    # Strip hop-by-hop and identity-revealing headers before overriding them;
    # httpx recomputes host/content-length itself.
    headers_to_remove = [
        "host", "content-length", "x-forwarded-for", "x-real-ip", "forwarded",
        "via", "x-client-ip", "x-forwarded-proto", "x-forwarded-host"
    ]
    for h in headers_to_remove:
        forward_headers.pop(h, None)

    spoofed_ip = faker.ipv4_public()

    override_headers = {
        # Two-entry chain so the first hop looks like an end client behind a proxy.
        "x-forwarded-for": f"{spoofed_ip}, {faker.ipv4_public()}",
        "x-real-ip": spoofed_ip,
        # RFC 7239 equivalent of X-Forwarded-For.
        "forwarded": f"for={spoofed_ip};proto=https",
        # Less common client-IP headers some upstreams inspect.
        "x-client-ip": spoofed_ip,
        "x-originating-ip": spoofed_ip,
        "x-remote-ip": spoofed_ip,
        "x-remote-addr": spoofed_ip,
        # CDN-specific client-IP headers (Cloudflare, Akamai).
        "cf-connecting-ip": spoofed_ip,
        "true-client-ip": spoofed_ip,
        "via": "1.1 google",
        "user-agent": get_random_user_agent(),
    }
    forward_headers.update(override_headers)

    return forward_headers, spoofed_ip
|
|
|
|
|
|
|
|
@app.get("/", include_in_schema=False)
async def health_check():
    """Provides a basic health check endpoint."""
    # Build the payload first so the response construction stays on one line.
    payload = {
        "status": "ok",
        "target_url": settings.TARGET_URL,
        "max_retries": settings.MAX_RETRIES,
    }
    return JSONResponse(payload)
|
|
|
|
|
@app.api_route("/{full_path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD"], include_in_schema=False)
async def reverse_proxy_handler(request: Request):
    """
    A catch-all reverse proxy that forwards requests with advanced retry logic,
    backoff, and dynamic header generation.

    Retries on the status codes in settings.RETRY_STATUS_CODES and on httpx
    transport errors, up to settings.MAX_RETRIES attempts, with exponential
    backoff plus jitter. The upstream response is streamed back to the caller;
    permanent failure raises HTTPException(502).
    """
    start_time = time.monotonic()
    # Correlation id carried into every log line via the `extra` dict and the
    # RequestIdFilter's `request_id` field.
    request_id = str(uuid.uuid4())
    log_extra = {'request_id': request_id}

    client: httpx.AsyncClient = request.app.state.http_client
    # Relative URL: httpx resolves it against the client's base_url (TARGET_URL).
    url = httpx.URL(path=request.url.path, query=request.url.query.encode("utf-8"))

    forward_headers, spoofed_ip = prepare_forward_headers(dict(request.headers), request.client.host)

    logger.info(f"Incoming: {request.method} {url.path} from {request.client.host}. Spoofed IP: {spoofed_ip}", extra=log_extra)

    # Read the body once up front so it can be replayed on every retry attempt.
    request_body = await request.body()

    last_exception = None
    for attempt in range(settings.MAX_RETRIES):
        if attempt > 0:
            # Exponential backoff: BACKOFF_FACTOR * 2^(attempt-1), with
            # symmetric jitter proportional to the delay, clamped at zero.
            backoff_delay = settings.BACKOFF_FACTOR * (2 ** (attempt - 1))
            jitter = random.uniform(-settings.JITTER_FACTOR, settings.JITTER_FACTOR) * backoff_delay
            sleep_duration = max(0, backoff_delay + jitter)
            logger.warning(f"Attempt {attempt}/{settings.MAX_RETRIES} failed. Retrying in {sleep_duration:.2f}s...", extra=log_extra)
            await asyncio.sleep(sleep_duration)

        try:
            req = client.build_request(
                method=request.method,
                url=url,
                headers=forward_headers,
                content=request_body,
            )
            # stream=True defers body download so it can be piped to the client.
            resp = await client.send(req, stream=True)

            # Return the response if it is non-retryable OR this was the last
            # attempt (a retryable status on the final try is passed through).
            if resp.status_code not in settings.RETRY_STATUS_CODES or attempt == settings.MAX_RETRIES - 1:
                duration_ms = (time.monotonic() - start_time) * 1000
                log_func = logger.info if resp.is_success else logger.warning
                log_func(
                    f"Finished: {request.method} {url.path} status={resp.status_code} attempt={attempt+1} latency={duration_ms:.2f}ms",
                    extra=log_extra
                )
                # aiter_raw() streams undecoded bytes; the BackgroundTask
                # closes the upstream response after the stream is consumed.
                return StreamingResponse(
                    resp.aiter_raw(),
                    status_code=resp.status_code,
                    headers=resp.headers,
                    background=BackgroundTask(resp.aclose),
                )

            # Retryable status: release the connection before looping again.
            await resp.aclose()
            last_exception = f"Last failed attempt returned status code {resp.status_code}"

        except httpx.RequestError as e:
            # Transport-level failure (DNS, connect, read...); retry unless
            # this was the final attempt.
            last_exception = e
            logger.warning(f"HTTPX RequestError on attempt {attempt + 1}: {e}", extra=log_extra)
            if attempt == settings.MAX_RETRIES - 1:
                break

    # Reached only when every attempt failed with a transport error.
    duration_ms = (time.monotonic() - start_time) * 1000
    logger.critical(
        f"Failed permanently: {request.method} {url.path} after {settings.MAX_RETRIES} attempts. latency={duration_ms:.2f}ms. Last error: {last_exception}",
        extra=log_extra
    )
    raise HTTPException(
        status_code=502,
        detail=f"Bad Gateway: The server was unable to process the request after {settings.MAX_RETRIES} attempts. Last error: {last_exception}"
    )