File size: 9,058 Bytes
2446f5f 500ef17 5bad7a1 063d7d5 a42e3f7 02594ce a42e3f7 063d7d5 eab2c9c c90334d 7ee09b9 a42e3f7 02594ce a42e3f7 02594ce a42e3f7 56d0fcf a42e3f7 02594ce a42e3f7 7ee09b9 02594ce a42e3f7 2ab9654 a42e3f7 02594ce 2ab9654 a42e3f7 063d7d5 a42e3f7 063d7d5 a42e3f7 02594ce a42e3f7 02594ce a42e3f7 02594ce a42e3f7 02594ce a42e3f7 02594ce a42e3f7 02594ce a42e3f7 e50ca24 a42e3f7 5bad7a1 a42e3f7 5bad7a1 2446f5f a42e3f7 2446f5f 7ee09b9 a42e3f7 063d7d5 a42e3f7 063d7d5 eab2c9c a42e3f7 2ab9654 a42e3f7 eab2c9c a42e3f7 2ab9654 a42e3f7 b3b4e9a a42e3f7 b3b4e9a a42e3f7 b3b4e9a a42e3f7 b3b4e9a a42e3f7 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 |
# Standard library
import asyncio
import logging
import os
import random
import time
import uuid
from contextlib import asynccontextmanager
from typing import Dict, Set, Tuple

# Third-party
import httpx
from fake_useragent import UserAgent
from faker import Faker
from fastapi import FastAPI, HTTPException, Request
from pydantic import Field, field_validator
from starlette.background import BackgroundTask
from starlette.responses import JSONResponse, StreamingResponse
# --- Pydantic V1/V2 Compatibility ---
# This block makes the code resilient to different Pydantic versions.
try:
    # Recommended for Pydantic V2: BaseSettings moved out of pydantic core
    # into the separate pydantic-settings package.
    from pydantic_settings import BaseSettings
    print("Using pydantic_settings.BaseSettings")
except ImportError:
    # Fallback for Pydantic V1, where BaseSettings still lives in pydantic.
    # NOTE(review): this fallback may be moot — `field_validator` (a V2-only
    # name) is imported unconditionally at the top of the file, so a pure
    # V1 environment would fail earlier. Confirm the supported versions.
    from pydantic import BaseSettings
    print("pydantic_settings not found, falling back to pydantic.BaseSettings")
# --- Structured Configuration using Pydantic ---
class Settings(BaseSettings):
    """Manages application settings and configuration from environment variables.

    Values may come from the process environment or a local `.env` file.
    `RETRY_CODES` (env) is read into RETRY_CODES_STR via the field alias and
    parsed into the RETRY_STATUS_CODES set by the validator below.
    """
    LOG_LEVEL: str = Field(default="INFO")
    TARGET_URL: str = Field(default="https://api.gmi-serving.com")
    MAX_RETRIES: int = Field(default=5, gt=0)
    # Raw comma-separated string taken from the RETRY_CODES env var.
    # Declared before RETRY_STATUS_CODES so it is already present in
    # values.data when the validator below runs.
    RETRY_CODES_STR: str = Field(default="429,500,502,503,504", alias="RETRY_CODES")
    # BUGFIX: validate_default=True is required. In Pydantic V2, validators
    # are skipped for fields that keep their default value, so without it the
    # 'before' validator never ran and a custom RETRY_CODES env var was
    # silently ignored.
    RETRY_STATUS_CODES: Set[int] = Field(
        default={429, 500, 502, 503, 504}, validate_default=True
    )
    BACKOFF_FACTOR: float = Field(default=0.5, gt=0)
    JITTER_FACTOR: float = Field(default=0.2, ge=0)

    @field_validator('RETRY_STATUS_CODES', mode='before')
    @classmethod
    def parse_retry_codes(cls, v, values):
        """Parses the comma-separated string of retry codes into a set of integers.

        Falls back to the standard retryable codes if the string is malformed.
        """
        retry_codes_str = values.data.get('RETRY_CODES_STR', "429,500,502,503,504")
        try:
            return {int(code.strip()) for code in retry_codes_str.split(',')}
        except (ValueError, AttributeError) as e:
            logging.error(f"Invalid RETRY_CODES format: '{retry_codes_str}'. Error: {e}. Using default.")
            return {429, 500, 502, 503, 504}

    class Config:
        env_file = '.env'
        env_file_encoding = 'utf-8'
# --- Initialize Settings and Global Services ---
# Module-level singleton: environment variables (and .env) are read once,
# at import time.
settings = Settings()
# Custom logging filter to inject request_id
class RequestIdFilter(logging.Filter):
    """Guarantees every log record carries a ``request_id`` attribute.

    Records emitted outside a request context (startup, shutdown, module
    init) are tagged 'main' so the log format string never fails.
    """

    def filter(self, record):
        # Preserve an existing request_id; only fill in the default.
        if not hasattr(record, 'request_id'):
            record.request_id = 'main'
        return True
# Configure root logging. The format string requires a 'request_id' field,
# which RequestIdFilter injects on records passing through 'logger'.
logging.basicConfig(
    level=settings.LOG_LEVEL.upper(),
    format='%(asctime)s - %(levelname)s - [%(request_id)s] - %(message)s'
)
logger = logging.getLogger(__name__)
logger.addFilter(RequestIdFilter())
# NOTE(review): records sent directly to the root logger (e.g. logging.error
# in Settings.parse_retry_codes) bypass this logger-level filter and may hit
# a formatting error for the missing request_id field — confirm.
# Initialize data generation tools
faker = Faker()
try:
    ua = UserAgent()
except Exception:
    # Fallback if fake-useragent server is down
    # (UserAgent() may fetch its browser data remotely — TODO confirm).
    ua = None
    logger.warning("Could not initialize UserAgent. A fallback will be used.")
# --- HTTPX Client Lifecycle Management ---
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manages the lifecycle of the HTTPX client for the application.

    Opens a single shared AsyncClient on startup (exposed to handlers via
    app.state.http_client) and closes it cleanly on shutdown.
    """
    logger.info(f"Starting application. Proxying to {settings.TARGET_URL}")
    logger.info(f"Will retry on status codes: {settings.RETRY_STATUS_CODES}")
    # timeout=None disables all client-side timeouts, so long-lived streaming
    # responses from the upstream are never cut off by this proxy.
    async with httpx.AsyncClient(base_url=settings.TARGET_URL, timeout=None) as client:
        app.state.http_client = client
        yield
    logger.info("Application shut down.")
# Initialize the FastAPI app with the interactive docs endpoints disabled;
# the lifespan handler above owns the shared HTTP client.
app = FastAPI(docs_url=None, redoc_url=None, lifespan=lifespan)
# --- Helper Functions ---
def get_random_user_agent() -> str:
    """Return a randomized User-Agent string, falling back to a static one.

    The fallback is used when fake-useragent failed to initialize at import
    time or raises while producing a random value.
    """
    # Static fallback User-Agent (a current desktop Chrome string).
    fallback = (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36"
    )
    if not ua:
        return fallback
    try:
        return ua.random
    except Exception:
        logger.warning("Failed to get random User-Agent, using fallback.")
        return fallback
def prepare_forward_headers(incoming_headers: Dict, client_host: str) -> Tuple[Dict, str]:
    """
    Prepares headers for the downstream request, adding a comprehensive set of
    spoofed IP headers and a random User-Agent.

    Args:
        incoming_headers: Headers of the incoming request (any casing).
        client_host: The real client address. Currently unused; kept for
            interface compatibility with existing callers.

    Returns:
        A tuple of (headers dict to forward, the spoofed IP that was injected).

    Note:
        The original return annotation ``(Dict, str)`` was a tuple literal,
        not a type; it has been corrected to ``Tuple[Dict, str]``.
    """
    # Start with a clean slate of lower-cased headers.
    forward_headers = {k.lower(): v for k, v in incoming_headers.items()}
    # Remove headers tied to the incoming connection or that could leak the
    # true origin. content-length is dropped because the client recomputes
    # it from the forwarded body.
    headers_to_remove = (
        "host", "content-length", "x-forwarded-for", "x-real-ip", "forwarded",
        "via", "x-client-ip", "x-forwarded-proto", "x-forwarded-host",
    )
    for h in headers_to_remove:
        forward_headers.pop(h, None)
    spoofed_ip = faker.ipv4_public()
    # Add a comprehensive set of headers to mask the origin.
    override_headers = {
        # Standard headers
        "x-forwarded-for": f"{spoofed_ip}, {faker.ipv4_public()}",  # Append a fake proxy chain
        "x-real-ip": spoofed_ip,
        # RFC 7239 standard, more structured
        "forwarded": f"for={spoofed_ip};proto=https",
        # Common non-standard headers
        "x-client-ip": spoofed_ip,
        "x-originating-ip": spoofed_ip,
        "x-remote-ip": spoofed_ip,
        "x-remote-addr": spoofed_ip,
        # Cloudflare-specific headers
        "cf-connecting-ip": spoofed_ip,
        "true-client-ip": spoofed_ip,
        # Other proxy headers
        "via": "1.1 google",  # Fake a passthrough via a common service
        # Dynamic User-Agent
        "user-agent": get_random_user_agent(),
    }
    forward_headers.update(override_headers)
    return forward_headers, spoofed_ip
# --- API Endpoints ---
@app.get("/", include_in_schema=False)
async def health_check():
    """Provides a basic health check endpoint.

    Reports liveness plus the key proxy configuration values.
    """
    payload = {
        "status": "ok",
        "target_url": settings.TARGET_URL,
        "max_retries": settings.MAX_RETRIES,
    }
    return JSONResponse(payload)
@app.api_route("/{full_path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD"], include_in_schema=False)
async def reverse_proxy_handler(request: Request):
    """
    A catch-all reverse proxy that forwards requests with advanced retry logic,
    backoff, and dynamic header generation.

    Behavior:
        - Buffers the full request body once so it can be replayed on retries.
        - Retries on httpx transport errors and on any status code in
          settings.RETRY_STATUS_CODES, with exponential backoff plus jitter.
        - Streams the upstream response back to the caller. The final
          attempt's response is returned even when its status is retryable.

    Raises:
        HTTPException: 502, reached only when the final attempt failed with a
            transport error (status-code failures on the last attempt are
            returned as-is, not raised).
    """
    start_time = time.monotonic()
    # Per-request correlation id, attached to every log line via `extra`.
    request_id = str(uuid.uuid4())
    log_extra = {'request_id': request_id}
    # Shared client created in lifespan(); its base_url is TARGET_URL, so the
    # URL built here is relative (path + original query string).
    client: httpx.AsyncClient = request.app.state.http_client
    url = httpx.URL(path=request.url.path, query=request.url.query.encode("utf-8"))
    forward_headers, spoofed_ip = prepare_forward_headers(dict(request.headers), request.client.host)
    logger.info(f"Incoming: {request.method} {url.path} from {request.client.host}. Spoofed IP: {spoofed_ip}", extra=log_extra)
    # NOTE(review): buffering the whole body enables retries but holds it all
    # in memory — confirm expected payload sizes for this proxy.
    request_body = await request.body()
    last_exception = None
    for attempt in range(settings.MAX_RETRIES):
        if attempt > 0:
            # Exponential backoff: BACKOFF_FACTOR * 2^(attempt-1), with
            # symmetric jitter of +/- JITTER_FACTOR, clamped to >= 0.
            backoff_delay = settings.BACKOFF_FACTOR * (2 ** (attempt - 1))
            jitter = random.uniform(-settings.JITTER_FACTOR, settings.JITTER_FACTOR) * backoff_delay
            sleep_duration = max(0, backoff_delay + jitter)
            logger.warning(f"Attempt {attempt}/{settings.MAX_RETRIES} failed. Retrying in {sleep_duration:.2f}s...", extra=log_extra)
            await asyncio.sleep(sleep_duration)
        try:
            req = client.build_request(
                method=request.method,
                url=url,
                headers=forward_headers,
                content=request_body,
            )
            # stream=True defers reading the response body so it can be piped
            # straight through to the caller.
            resp = await client.send(req, stream=True)
            # Return the response unless its status is retryable AND attempts remain.
            if resp.status_code not in settings.RETRY_STATUS_CODES or attempt == settings.MAX_RETRIES - 1:
                duration_ms = (time.monotonic() - start_time) * 1000
                log_func = logger.info if resp.is_success else logger.warning
                log_func(
                    f"Finished: {request.method} {url.path} status={resp.status_code} attempt={attempt+1} latency={duration_ms:.2f}ms",
                    extra=log_extra
                )
                # aiter_raw() yields the raw (un-decoded) upstream bytes; the
                # response is closed in the background after streaming ends.
                # NOTE(review): resp.headers is forwarded verbatim, including
                # any hop-by-hop headers from the upstream — confirm intended.
                return StreamingResponse(
                    resp.aiter_raw(),
                    status_code=resp.status_code,
                    headers=resp.headers,
                    background=BackgroundTask(resp.aclose),
                )
            # Retryable status: release the connection before the next attempt.
            await resp.aclose()
            last_exception = f"Last failed attempt returned status code {resp.status_code}"
        except httpx.RequestError as e:
            # Transport-level failure (DNS, connect, read/write, etc.).
            last_exception = e
            logger.warning(f"HTTPX RequestError on attempt {attempt + 1}: {e}", extra=log_extra)
            if attempt == settings.MAX_RETRIES - 1:
                break
    # Only reachable when the final attempt raised a transport error.
    duration_ms = (time.monotonic() - start_time) * 1000
    logger.critical(
        f"Failed permanently: {request.method} {url.path} after {settings.MAX_RETRIES} attempts. latency={duration_ms:.2f}ms. Last error: {last_exception}",
        extra=log_extra
    )
    raise HTTPException(
        status_code=502,
        detail=f"Bad Gateway: The server was unable to process the request after {settings.MAX_RETRIES} attempts. Last error: {last_exception}"
    )