File size: 9,058 Bytes
2446f5f
500ef17
5bad7a1
063d7d5
a42e3f7
02594ce
a42e3f7
063d7d5
eab2c9c
c90334d
7ee09b9
a42e3f7
 
 
 
 
02594ce
 
 
 
 
 
 
 
 
 
 
 
a42e3f7
 
 
 
 
 
 
 
 
02594ce
 
a42e3f7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
56d0fcf
a42e3f7
 
02594ce
 
a42e3f7
 
 
 
7ee09b9
02594ce
 
 
 
a42e3f7
 
2ab9654
a42e3f7
 
02594ce
 
 
 
 
 
2ab9654
a42e3f7
063d7d5
 
a42e3f7
 
 
 
063d7d5
 
a42e3f7
 
 
 
 
 
02594ce
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a42e3f7
 
02594ce
 
 
 
 
 
 
a42e3f7
 
 
02594ce
a42e3f7
02594ce
 
a42e3f7
02594ce
 
 
 
 
 
a42e3f7
 
02594ce
 
 
 
 
 
 
 
 
 
 
a42e3f7
 
 
 
e50ca24
a42e3f7
 
5bad7a1
a42e3f7
 
 
 
 
 
 
 
5bad7a1
2446f5f
a42e3f7
 
2446f5f
7ee09b9
a42e3f7
 
 
063d7d5
 
a42e3f7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
063d7d5
eab2c9c
a42e3f7
2ab9654
 
a42e3f7
 
eab2c9c
a42e3f7
2ab9654
a42e3f7
b3b4e9a
a42e3f7
 
 
 
 
b3b4e9a
a42e3f7
 
 
 
b3b4e9a
a42e3f7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b3b4e9a
 
a42e3f7
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
import asyncio
import logging
import os
import random
import time
import uuid
from contextlib import asynccontextmanager
from typing import Dict, Set, Tuple

import httpx
from fake_useragent import UserAgent
from faker import Faker
from fastapi import FastAPI, HTTPException, Request
from pydantic import Field, field_validator
from starlette.background import BackgroundTask
from starlette.responses import JSONResponse, StreamingResponse


# --- Pydantic V1/V2 Compatibility ---
# This block makes the code resilient to different Pydantic versions.
try:
    # Recommended for Pydantic V2
    from pydantic_settings import BaseSettings
    print("Using pydantic_settings.BaseSettings")
except ImportError:
    # Fallback for Pydantic V1
    # NOTE(review): the top-of-file `from pydantic import field_validator`
    # only exists in Pydantic V2, so under V1 the module would fail to import
    # before reaching this fallback — confirm V1 support is actually needed.
    from pydantic import BaseSettings
    print("pydantic_settings not found, falling back to pydantic.BaseSettings")


# --- Structured Configuration using Pydantic ---
class Settings(BaseSettings):
    """Manages application settings and configuration from environment variables.

    Values come from the process environment and an optional `.env` file
    (see `Config` below); the defaults apply when a variable is absent.
    """
    # Logging level name; upper-cased before being passed to basicConfig.
    LOG_LEVEL: str = Field(default="INFO")
    # Upstream base URL all proxied requests are forwarded to.
    TARGET_URL: str = Field(default="https://api.gmi-serving.com")
    # Total attempts per proxied request (initial try included).
    MAX_RETRIES: int = Field(default=5, gt=0)
    
    # Raw comma-separated code list, read from the RETRY_CODES env var.
    RETRY_CODES_STR: str = Field(default="429,500,502,503,504", alias="RETRY_CODES")
    # Parsed set of retryable status codes; populated by the validator below.
    RETRY_STATUS_CODES: Set[int] = {429, 500, 502, 503, 504}
    # Base delay in seconds for exponential retry backoff.
    BACKOFF_FACTOR: float = Field(default=0.5, gt=0)
    # Fractional jitter applied to each backoff delay (0 disables jitter).
    JITTER_FACTOR: float = Field(default=0.2, ge=0)

    @field_validator('RETRY_STATUS_CODES', mode='before')
    @classmethod
    def parse_retry_codes(cls, v, values):
        """Parses the comma-separated string of retry codes into a set of integers.

        NOTE(review): `v` is ignored — the set is always derived from
        RETRY_CODES_STR, so setting RETRY_STATUS_CODES directly has no effect;
        confirm that is intended. `values` is a Pydantic-v2 ValidationInfo
        (hence `values.data`); field order matters — RETRY_CODES_STR must be
        declared before this field for it to appear in `values.data`.
        """
        retry_codes_str = values.data.get('RETRY_CODES_STR', "429,500,502,503,504")
        try:
            return {int(code.strip()) for code in retry_codes_str.split(',')}
        except (ValueError, AttributeError) as e:
            # Best-effort: fall back to defaults instead of failing settings load.
            logging.error(f"Invalid RETRY_CODES format: '{retry_codes_str}'. Error: {e}. Using default.")
            return {429, 500, 502, 503, 504}

    class Config:
        # Load overrides from a local .env file when present.
        env_file = '.env'
        env_file_encoding = 'utf-8'

# --- Initialize Settings and Global Services ---
settings = Settings()  # Reads environment variables / .env once at import time.

# Custom logging filter to inject request_id
class RequestIdFilter(logging.Filter):
    """Ensure every log record carries a ``request_id`` attribute."""

    def filter(self, record):
        # Records logged outside a request context get the 'main' marker so
        # the log format's %(request_id)s field never raises.
        if not hasattr(record, 'request_id'):
            record.request_id = 'main'
        return True

# Configure root logging once at import time. The format requires a
# request_id attribute on each record, which RequestIdFilter supplies.
# NOTE(review): the filter is attached only to this module's logger; records
# from other loggers reaching the root handler would lack request_id and
# break this format string — confirm no other loggers emit through root.
logging.basicConfig(
    level=settings.LOG_LEVEL.upper(),
    format='%(asctime)s - %(levelname)s - [%(request_id)s] - %(message)s'
)
logger = logging.getLogger(__name__)
logger.addFilter(RequestIdFilter())

# Initialize data generation tools
faker = Faker()  # Used to generate spoofed public IPv4 addresses.
try:
    ua = UserAgent()
except Exception:
    # Fallback if fake-useragent server is down
    # (get_random_user_agent checks for None and serves a static UA instead).
    ua = None
    logger.warning("Could not initialize UserAgent. A fallback will be used.")

# --- HTTPX Client Lifecycle Management ---
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manages the lifecycle of the HTTPX client for the application.

    Opens a single shared AsyncClient on startup (timeout=None, so long
    upstream streams are never cut off), exposes it as
    ``app.state.http_client``, and closes it on shutdown.
    """
    logger.info(f"Starting application. Proxying to {settings.TARGET_URL}")
    logger.info(f"Will retry on status codes: {settings.RETRY_STATUS_CODES}")
    async with httpx.AsyncClient(base_url=settings.TARGET_URL, timeout=None) as client:
        app.state.http_client = client
        yield
    logger.info("Application shut down.")

# Initialize the FastAPI app
# Docs endpoints are disabled so the catch-all proxy route owns every path.
app = FastAPI(docs_url=None, redoc_url=None, lifespan=lifespan)

# --- Helper Functions ---
def get_random_user_agent() -> str:
    """Return a random User-Agent string, falling back to a fixed Chrome UA."""
    fallback = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36"
    if not ua:
        # UserAgent never initialized — serve the static fallback silently.
        return fallback
    try:
        return ua.random
    except Exception:
        logger.warning("Failed to get random User-Agent, using fallback.")
        return fallback

def prepare_forward_headers(incoming_headers: Dict, client_host: str) -> Tuple[Dict[str, str], str]:
    """
    Prepares headers for the downstream request, adding a comprehensive set of
    spoofed IP and a random User-Agent.

    Args:
        incoming_headers: Headers from the inbound request (any casing).
        client_host: The real client address. Currently unused; kept for
            interface compatibility with the caller.

    Returns:
        A tuple of (headers to forward, the spoofed IPv4 address used).
    """
    # FIX: the return annotation was the tuple literal `(Dict, str)`, which is
    # not a valid type annotation; it is now a proper Tuple[...] type.
    # Start with a clean slate of lower-cased headers so removals and
    # overrides below match regardless of the inbound casing.
    forward_headers = {k.lower(): v for k, v in incoming_headers.items()}

    # Remove headers that are specific to the incoming request's connection or could leak info
    headers_to_remove = (
        "host", "content-length", "x-forwarded-for", "x-real-ip", "forwarded",
        "via", "x-client-ip", "x-forwarded-proto", "x-forwarded-host",
    )
    for header in headers_to_remove:
        forward_headers.pop(header, None)

    spoofed_ip = faker.ipv4_public()

    # Add a comprehensive set of headers to mask the origin
    override_headers = {
        # Standard headers
        "x-forwarded-for": f"{spoofed_ip}, {faker.ipv4_public()}", # Append a fake proxy chain
        "x-real-ip": spoofed_ip,

        # RFC 7239 standard, more structured
        "forwarded": f"for={spoofed_ip};proto=https",

        # Common non-standard headers
        "x-client-ip": spoofed_ip,
        "x-originating-ip": spoofed_ip,
        "x-remote-ip": spoofed_ip,
        "x-remote-addr": spoofed_ip,

        # Cloudflare-specific headers
        "cf-connecting-ip": spoofed_ip,
        "true-client-ip": spoofed_ip,

        # Other proxy headers
        "via": "1.1 google", # Fake a passthrough via a common service

        # Dynamic User-Agent
        "user-agent": get_random_user_agent(),
    }
    forward_headers.update(override_headers)

    return forward_headers, spoofed_ip

# --- API Endpoints ---
@app.get("/", include_in_schema=False)
async def health_check():
    """Report service liveness plus the key proxy configuration values."""
    payload = {
        "status": "ok",
        "target_url": settings.TARGET_URL,
        "max_retries": settings.MAX_RETRIES,
    }
    return JSONResponse(payload)

@app.api_route("/{full_path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD"], include_in_schema=False)
async def reverse_proxy_handler(request: Request):
    """
    A catch-all reverse proxy that forwards requests with advanced retry logic,
    backoff, and dynamic header generation.

    Statuses in settings.RETRY_STATUS_CODES are retried with exponential
    backoff plus jitter; the final attempt's response is streamed back even
    if its status is retryable. httpx.RequestError (transport failures) is
    retried the same way and surfaces as a 502 once attempts are exhausted.
    """
    start_time = time.monotonic()
    # Correlation id injected into every log line for this request.
    request_id = str(uuid.uuid4())
    log_extra = {'request_id': request_id}
    
    client: httpx.AsyncClient = request.app.state.http_client
    # Rebuild only path + query; the client's base_url supplies scheme/host.
    url = httpx.URL(path=request.url.path, query=request.url.query.encode("utf-8"))
    
    forward_headers, spoofed_ip = prepare_forward_headers(dict(request.headers), request.client.host)
    
    logger.info(f"Incoming: {request.method} {url.path} from {request.client.host}. Spoofed IP: {spoofed_ip}", extra=log_extra)
    
    # Read the body once up front so it can be re-sent on every retry attempt.
    request_body = await request.body()
    
    last_exception = None
    for attempt in range(settings.MAX_RETRIES):
        if attempt > 0:
            # Exponential backoff: BACKOFF_FACTOR * 2^(attempt-1), with
            # symmetric jitter proportional to the delay; clamped at 0.
            backoff_delay = settings.BACKOFF_FACTOR * (2 ** (attempt - 1))
            jitter = random.uniform(-settings.JITTER_FACTOR, settings.JITTER_FACTOR) * backoff_delay
            sleep_duration = max(0, backoff_delay + jitter)
            logger.warning(f"Attempt {attempt}/{settings.MAX_RETRIES} failed. Retrying in {sleep_duration:.2f}s...", extra=log_extra)
            await asyncio.sleep(sleep_duration)

        try:
            req = client.build_request(
                method=request.method,
                url=url,
                headers=forward_headers,
                content=request_body,
            )
            # stream=True defers the body download so it can be relayed
            # chunk-by-chunk to the caller instead of buffered in memory.
            resp = await client.send(req, stream=True)

            # Return the response unless its status is retryable — but always
            # return whatever the final attempt produced.
            if resp.status_code not in settings.RETRY_STATUS_CODES or attempt == settings.MAX_RETRIES - 1:
                duration_ms = (time.monotonic() - start_time) * 1000
                log_func = logger.info if resp.is_success else logger.warning
                log_func(
                    f"Finished: {request.method} {url.path} status={resp.status_code} attempt={attempt+1} latency={duration_ms:.2f}ms",
                    extra=log_extra
                )
                # aclose runs as a background task after the stream finishes.
                return StreamingResponse(
                    resp.aiter_raw(),
                    status_code=resp.status_code,
                    headers=resp.headers,
                    background=BackgroundTask(resp.aclose),
                )
            
            # Retryable status: release the connection before the next attempt.
            await resp.aclose()
            last_exception = f"Last failed attempt returned status code {resp.status_code}"

        except httpx.RequestError as e:
            # Transport-level failure (DNS, connect, read...) — also retryable.
            last_exception = e
            logger.warning(f"HTTPX RequestError on attempt {attempt + 1}: {e}", extra=log_extra)
            if attempt == settings.MAX_RETRIES - 1:
                break
    
    # All attempts failed with transport errors (status failures return above
    # on the last attempt): report upstream failure as 502 Bad Gateway.
    duration_ms = (time.monotonic() - start_time) * 1000
    logger.critical(
        f"Failed permanently: {request.method} {url.path} after {settings.MAX_RETRIES} attempts. latency={duration_ms:.2f}ms. Last error: {last_exception}",
        extra=log_extra
    )
    raise HTTPException(
        status_code=502,
        detail=f"Bad Gateway: The server was unable to process the request after {settings.MAX_RETRIES} attempts. Last error: {last_exception}"
    )