File size: 5,678 Bytes
2446f5f 41b75fe 5bad7a1 063d7d5 eab2c9c c90334d 7ee09b9 7ce9a6b 6a181af 7ee09b9 83583ba 6a181af 02594ce 6a181af 7ce9a6b 02594ce 7ce9a6b 364231f a9a44ef 364231f 7ce9a6b d4952d3 41b75fe 7ce9a6b 41b75fe 7ce9a6b 41b75fe 7ce9a6b 41b75fe 7ce9a6b 41b75fe 7ce9a6b 41b75fe 063d7d5 364231f 3bc0a07 7ce9a6b 41b75fe 7ce9a6b a42e3f7 41b75fe a42e3f7 7ce9a6b a42e3f7 83583ba 7ce9a6b 5bad7a1 41b75fe 7ce9a6b 41b75fe 364231f 61655b8 364231f 2446f5f 7ee09b9 063d7d5 41b75fe 364231f 7ce9a6b 41b75fe 7ce9a6b 41b75fe 7ce9a6b f5fc07f 7ce9a6b 6a181af 41b75fe 6a181af eab2c9c 7ce9a6b 41b75fe 7ce9a6b 364231f 7ce9a6b b3b4e9a 7ce9a6b 41b75fe 7ce9a6b 41b75fe 7ce9a6b 41b75fe 7ce9a6b 3bc0a07 41b75fe 7ce9a6b 41b75fe 7ce9a6b 3bc0a07 7ce9a6b 41b75fe 7ce9a6b 6a181af 41b75fe 7ce9a6b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 |
import asyncio
import logging
import os
import random
import time
from contextlib import asynccontextmanager

import httpx
import uvicorn
from fastapi import FastAPI, Request, HTTPException
from starlette.background import BackgroundTask
from starlette.responses import StreamingResponse, JSONResponse
# --- Production-Ready Configuration ---
# Logging is configured at import time so that later module-level statements
# can already log through `logger`.
_DEFAULT_LOG_LEVEL = "INFO"
_requested_log_level = os.getenv("LOG_LEVEL", _DEFAULT_LOG_LEVEL).strip().upper()
# logging.getLevelName() returns the numeric value for a registered level name
# and a "Level <name>" string for anything unknown, so an int result means
# basicConfig() will accept the name instead of raising ValueError.
_log_level_is_valid = isinstance(logging.getLevelName(_requested_log_level), int)
LOG_LEVEL = _requested_log_level if _log_level_is_valid else _DEFAULT_LOG_LEVEL
logging.basicConfig(
    level=LOG_LEVEL,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
if not _log_level_is_valid:
    # Reported only after basicConfig() so the message uses the configured format.
    logger.warning(
        "Invalid LOG_LEVEL %r. Falling back to default: %s",
        _requested_log_level,
        _DEFAULT_LOG_LEVEL,
    )
# --- Simplified Target Configuration ---
# Request paths are forwarded to the upstream unchanged, so the upstream host
# is the only routing setting. It is read from the TARGET_HOST environment
# variable, with a built-in default.
TARGET_HOST = os.environ.get("TARGET_HOST", "https://api.gmi-serving.com")
logger.info(f"Proxying all request paths to host: {TARGET_HOST}")
# --- Retry Logic Configuration ---
_DEFAULT_MAX_RETRIES = 15
DEFAULT_RETRY_CODES = "429,500,502,503,504"


def _parse_max_retries(raw):
    """Return the attempt budget parsed from *raw*, or the default if unusable.

    The proxy handler loops ``range(MAX_RETRIES)`` times, so a value below 1
    would mean the upstream is never contacted. Such values, and values that
    are not integers at all, are rejected with an error log and replaced by
    the default.
    """
    try:
        attempts = int(raw)
    except ValueError:
        attempts = 0
    if attempts < 1:
        logger.error(
            "Invalid MAX_RETRIES value: %r (expected an integer >= 1). "
            "Falling back to default: %d",
            raw,
            _DEFAULT_MAX_RETRIES,
        )
        return _DEFAULT_MAX_RETRIES
    return attempts


def _parse_retry_codes(raw):
    """Return the set of integer status codes in the comma-separated *raw*.

    Raises:
        ValueError: if any comma-separated item is not an integer.
    """
    return {int(code.strip()) for code in raw.split(',')}


MAX_RETRIES = _parse_max_retries(os.getenv("MAX_RETRIES", str(_DEFAULT_MAX_RETRIES)))
RETRY_CODES_STR = os.getenv("RETRY_CODES", DEFAULT_RETRY_CODES)
try:
    RETRY_STATUS_CODES = _parse_retry_codes(RETRY_CODES_STR)
except ValueError:
    logger.error(f"Invalid RETRY_CODES format: '{RETRY_CODES_STR}'. Falling back to default: {DEFAULT_RETRY_CODES}")
    RETRY_STATUS_CODES = _parse_retry_codes(DEFAULT_RETRY_CODES)
else:
    logger.info(f"Will retry on the following status codes: {RETRY_STATUS_CODES}")
# --- Helper Function ---
def generate_random_ip():
    """Return a random dotted-quad IPv4-style string.

    Each of the four octets is drawn independently from 1..254 inclusive.
    The proxy handler uses the result for the X-Forwarded-For header.
    """
    octets = [random.randint(1, 254) for _ in range(4)]
    return "{}.{}.{}.{}".format(*octets)
# --- HTTPX Client Lifecycle Management ---
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Create one shared HTTPX client for the app's lifetime and close it on shutdown.

    The client is bound to TARGET_HOST as its base URL and stored on
    ``app.state.http_client``, where the proxy handler reads it.

    Args:
        app: The FastAPI application whose state receives the client.
    """
    # Only the connect phase is bounded. Read, write and pool waits stay
    # unlimited, as before, so long-running streamed responses are not cut
    # off. Bounding connect lets an unreachable upstream surface as
    # httpx.ConnectTimeout, which the proxy handler already retries on.
    timeout = httpx.Timeout(None, connect=10.0)
    async with httpx.AsyncClient(base_url=TARGET_HOST, timeout=timeout) as client:
        logger.info(f"HTTPX client created for target: {TARGET_HOST}")
        app.state.http_client = client
        yield
    logger.info("HTTPX client closed gracefully.")
# Initialize the FastAPI app with the lifespan manager and every built-in
# documentation route disabled. openapi_url=None is required in addition to
# docs_url/redoc_url: otherwise FastAPI still serves its own /openapi.json,
# and that path would be answered locally instead of reaching the catch-all
# proxy route.
app = FastAPI(docs_url=None, redoc_url=None, openapi_url=None, lifespan=lifespan)
# --- API Endpoints ---
@app.get("/", include_in_schema=False)
async def health_check():
    """Report that the proxy is up, along with the upstream host it targets."""
    payload = {"status": "ok", "target_host": TARGET_HOST}
    return JSONResponse(payload)
@app.api_route("/{full_path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD"])
async def reverse_proxy_handler(request: Request):
    """
    A catch-all reverse proxy that forwards requests to the target host.
    It forwards the path and query parameters exactly as received.

    The incoming body is read fully once and re-sent on every attempt. Up to
    MAX_RETRIES attempts are made, with a one-second pause between them. An
    attempt is retried when the upstream status is in RETRY_STATUS_CODES or
    when a connect error / connect timeout / read timeout occurs.

    Args:
        request: The incoming client request.

    Returns:
        A StreamingResponse relaying the upstream status, headers and raw
        body. On the final attempt the upstream response is returned even if
        its status is in RETRY_STATUS_CODES.

    Raises:
        HTTPException: 502 when no attempt produced an upstream response.
    """
    start_time = time.monotonic()
    client: httpx.AsyncClient = request.app.state.http_client
    retry_delay_seconds = 1

    # Forward the path as-is, without adding any prefix; the client's
    # base_url supplies scheme and host.
    url = httpx.URL(path=request.url.path, query=request.url.query.encode("utf-8"))

    # Drop the inbound Host header, then override the user agent and the
    # forwarded-client headers with a fixed browser string and a randomly
    # generated address.
    request_headers = dict(request.headers)
    request_headers.pop("host", None)
    random_ip = generate_random_ip()
    request_headers.update({
        "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36",
        "x-forwarded-for": random_ip,
        "x-real-ip": random_ip,
    })
    request_body = await request.body()

    last_exception = None
    for attempt in range(MAX_RETRIES):
        is_last_attempt = attempt == MAX_RETRIES - 1
        req = client.build_request(
            method=request.method,
            url=url,
            headers=request_headers,
            content=request_body,
        )
        # Use debug level for this log as it can be very noisy
        logger.debug(f"Attempt {attempt + 1}/{MAX_RETRIES} -> {req.method} {req.url}")

        try:
            resp = await client.send(req, stream=True)
        except (httpx.ConnectError, httpx.ReadTimeout, httpx.ConnectTimeout) as e:
            last_exception = e
            logger.warning(f"Attempt {attempt + 1}/{MAX_RETRIES} for {url.path} failed with connection error: {e}")
            if not is_last_attempt:
                # Must be a non-blocking sleep: time.sleep() here would
                # freeze the event loop and stall every other request.
                await asyncio.sleep(retry_delay_seconds)
            continue

        if resp.status_code not in RETRY_STATUS_CODES or is_last_attempt:
            duration_ms = (time.monotonic() - start_time) * 1000
            log_func = logger.info if resp.is_success else logger.warning
            log_func(
                f"Request finished: {request.method} {request.url.path} -> {resp.status_code} "
                f"[{resp.reason_phrase}] latency={duration_ms:.2f}ms"
            )
            # Relay the undecoded bytes together with the upstream headers;
            # the upstream response is closed after the reply has been sent.
            return StreamingResponse(
                resp.aiter_raw(),
                status_code=resp.status_code,
                headers=resp.headers,
                background=BackgroundTask(resp.aclose),
            )

        logger.warning(
            f"Attempt {attempt + 1}/{MAX_RETRIES} for {url.path} failed with status {resp.status_code}. Retrying..."
        )
        # Release the discarded response before the next attempt.
        await resp.aclose()
        await asyncio.sleep(retry_delay_seconds)

    duration_ms = (time.monotonic() - start_time) * 1000
    logger.critical(
        f"Request failed after {MAX_RETRIES} attempts. Cannot connect to target. "
        f"path={request.url.path} latency={duration_ms:.2f}ms"
    )
    raise HTTPException(
        status_code=502,
        detail=f"Bad Gateway: Cannot connect to target service at {TARGET_HOST} after {MAX_RETRIES} attempts. Last error: {last_exception}"
    ) from last_exception
if __name__ == "__main__":
    # Direct-execution entry point: serve the app with uvicorn on all
    # interfaces, port 8000.
    uvicorn.run(app, host="0.0.0.0", port=8000)