rkihacker commited on
Commit
6a181af
·
verified ·
1 Parent(s): 02594ce

Update main.py

Browse files
Files changed (1) hide show
  1. main.py +76 -183
main.py CHANGED
@@ -2,230 +2,123 @@ import httpx
2
  from fastapi import FastAPI, Request, HTTPException
3
  from starlette.responses import StreamingResponse, JSONResponse
4
  from starlette.background import BackgroundTask
5
- from pydantic import Field, field_validator
6
- from typing import Set, Dict
7
- from contextlib import asynccontextmanager
8
  import os
9
  import random
10
  import logging
11
  import time
12
- import asyncio
13
- import uuid
14
- from faker import Faker
15
- from fake_useragent import UserAgent
16
-
17
- # --- Pydantic V1/V2 Compatibility ---
18
- # This block makes the code resilient to different Pydantic versions.
19
- try:
20
- # Recommended for Pydantic V2
21
- from pydantic_settings import BaseSettings
22
- print("Using pydantic_settings.BaseSettings")
23
- except ImportError:
24
- # Fallback for Pydantic V1
25
- from pydantic import BaseSettings
26
- print("pydantic_settings not found, falling back to pydantic.BaseSettings")
27
-
28
-
29
- # --- Structured Configuration using Pydantic ---
30
- class Settings(BaseSettings):
31
- """Manages application settings and configuration from environment variables."""
32
- LOG_LEVEL: str = Field(default="INFO")
33
- TARGET_URL: str = Field(default="https://api.gmi-serving.com")
34
- MAX_RETRIES: int = Field(default=5, gt=0)
35
-
36
- RETRY_CODES_STR: str = Field(default="429,500,502,503,504", alias="RETRY_CODES")
37
- RETRY_STATUS_CODES: Set[int] = {429, 500, 502, 503, 504}
38
- BACKOFF_FACTOR: float = Field(default=0.5, gt=0)
39
- JITTER_FACTOR: float = Field(default=0.2, ge=0)
40
-
41
- @field_validator('RETRY_STATUS_CODES', mode='before')
42
- @classmethod
43
- def parse_retry_codes(cls, v, values):
44
- """Parses the comma-separated string of retry codes into a set of integers."""
45
- retry_codes_str = values.data.get('RETRY_CODES_STR', "429,500,502,503,504")
46
- try:
47
- return {int(code.strip()) for code in retry_codes_str.split(',')}
48
- except (ValueError, AttributeError) as e:
49
- logging.error(f"Invalid RETRY_CODES format: '{retry_codes_str}'. Error: {e}. Using default.")
50
- return {429, 500, 502, 503, 504}
51
-
52
- class Config:
53
- env_file = '.env'
54
- env_file_encoding = 'utf-8'
55
-
56
- # --- Initialize Settings and Global Services ---
57
- settings = Settings()
58
-
59
- # Custom logging filter to inject request_id
60
- class RequestIdFilter(logging.Filter):
61
- def filter(self, record):
62
- record.request_id = getattr(record, 'request_id', 'main')
63
- return True
64
 
 
 
65
  logging.basicConfig(
66
- level=settings.LOG_LEVEL.upper(),
67
- format='%(asctime)s - %(levelname)s - [%(request_id)s] - %(message)s'
68
  )
69
- logger = logging.getLogger(__name__)
70
- logger.addFilter(RequestIdFilter())
71
-
72
- # Initialize data generation tools
73
- faker = Faker()
74
  try:
75
- ua = UserAgent()
76
- except Exception:
77
- # Fallback if fake-useragent server is down
78
- ua = None
79
- logger.warning("Could not initialize UserAgent. A fallback will be used.")
 
 
 
 
 
80
 
81
  # --- HTTPX Client Lifecycle Management ---
82
  @asynccontextmanager
83
  async def lifespan(app: FastAPI):
84
- """Manages the lifecycle of the HTTPX client for the application."""
85
- logger.info(f"Starting application. Proxying to {settings.TARGET_URL}")
86
- logger.info(f"Will retry on status codes: {settings.RETRY_STATUS_CODES}")
87
- async with httpx.AsyncClient(base_url=settings.TARGET_URL, timeout=None) as client:
88
  app.state.http_client = client
89
  yield
90
- logger.info("Application shut down.")
91
 
92
- # Initialize the FastAPI app
93
  app = FastAPI(docs_url=None, redoc_url=None, lifespan=lifespan)
94
 
95
- # --- Helper Functions ---
96
- def get_random_user_agent() -> str:
97
- """Returns a random User-Agent, with a fallback."""
98
- if ua:
99
- try:
100
- return ua.random
101
- except Exception:
102
- logger.warning("Failed to get random User-Agent, using fallback.")
103
- # Fallback User-Agent
104
- return "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36"
105
-
106
- def prepare_forward_headers(incoming_headers: Dict, client_host: str) -> (Dict, str):
107
- """
108
- Prepares headers for the downstream request, adding a comprehensive set of
109
- spoofed IP and a random User-Agent.
110
- """
111
- # Start with a clean slate of lower-cased headers
112
- forward_headers = {k.lower(): v for k, v in incoming_headers.items()}
113
-
114
- # Remove headers that are specific to the incoming request's connection or could leak info
115
- headers_to_remove = [
116
- "host", "content-length", "x-forwarded-for", "x-real-ip", "forwarded",
117
- "via", "x-client-ip", "x-forwarded-proto", "x-forwarded-host"
118
- ]
119
- for h in headers_to_remove:
120
- forward_headers.pop(h, None)
121
-
122
- spoofed_ip = faker.ipv4_public()
123
-
124
- # Add a comprehensive set of headers to mask the origin
125
- override_headers = {
126
- # Standard headers
127
- "x-forwarded-for": f"{spoofed_ip}, {faker.ipv4_public()}", # Append a fake proxy chain
128
- "x-real-ip": spoofed_ip,
129
-
130
- # RFC 7239 standard, more structured
131
- "forwarded": f"for={spoofed_ip};proto=https",
132
-
133
- # Common non-standard headers
134
- "x-client-ip": spoofed_ip,
135
- "x-originating-ip": spoofed_ip,
136
- "x-remote-ip": spoofed_ip,
137
- "x-remote-addr": spoofed_ip,
138
-
139
- # Cloudflare-specific headers
140
- "cf-connecting-ip": spoofed_ip,
141
- "true-client-ip": spoofed_ip,
142
-
143
- # Other proxy headers
144
- "via": "1.1 google", # Fake a passthrough via a common service
145
-
146
- # Dynamic User-Agent
147
- "user-agent": get_random_user_agent(),
148
- }
149
- forward_headers.update(override_headers)
150
-
151
- return forward_headers, spoofed_ip
152
-
153
  # --- API Endpoints ---
154
- @app.get("/", include_in_schema=False)
155
  async def health_check():
156
- """Provides a basic health check endpoint."""
157
- return JSONResponse({
158
- "status": "ok",
159
- "target_url": settings.TARGET_URL,
160
- "max_retries": settings.MAX_RETRIES
161
- })
162
 
163
- @app.api_route("/{full_path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD"], include_in_schema=False)
164
  async def reverse_proxy_handler(request: Request):
165
  """
166
- A catch-all reverse proxy that forwards requests with advanced retry logic,
167
- backoff, and dynamic header generation.
168
  """
169
  start_time = time.monotonic()
170
- request_id = str(uuid.uuid4())
171
- log_extra = {'request_id': request_id}
172
-
173
  client: httpx.AsyncClient = request.app.state.http_client
174
  url = httpx.URL(path=request.url.path, query=request.url.query.encode("utf-8"))
175
 
176
- forward_headers, spoofed_ip = prepare_forward_headers(dict(request.headers), request.client.host)
 
 
 
 
177
 
178
- logger.info(f"Incoming: {request.method} {url.path} from {request.client.host}. Spoofed IP: {spoofed_ip}", extra=log_extra)
 
 
 
 
 
 
 
 
 
 
179
 
180
- request_body = await request.body()
 
 
181
 
 
182
  last_exception = None
183
- for attempt in range(settings.MAX_RETRIES):
184
- if attempt > 0:
185
- backoff_delay = settings.BACKOFF_FACTOR * (2 ** (attempt - 1))
186
- jitter = random.uniform(-settings.JITTER_FACTOR, settings.JITTER_FACTOR) * backoff_delay
187
- sleep_duration = max(0, backoff_delay + jitter)
188
- logger.warning(f"Attempt {attempt}/{settings.MAX_RETRIES} failed. Retrying in {sleep_duration:.2f}s...", extra=log_extra)
189
- await asyncio.sleep(sleep_duration)
190
-
191
  try:
192
- req = client.build_request(
193
- method=request.method,
194
- url=url,
195
- headers=forward_headers,
196
- content=request_body,
197
  )
198
- resp = await client.send(req, stream=True)
199
-
200
- if resp.status_code not in settings.RETRY_STATUS_CODES or attempt == settings.MAX_RETRIES - 1:
201
  duration_ms = (time.monotonic() - start_time) * 1000
202
- log_func = logger.info if resp.is_success else logger.warning
203
- log_func(
204
- f"Finished: {request.method} {url.path} status={resp.status_code} attempt={attempt+1} latency={duration_ms:.2f}ms",
205
- extra=log_extra
206
- )
207
  return StreamingResponse(
208
- resp.aiter_raw(),
209
- status_code=resp.status_code,
210
- headers=resp.headers,
211
- background=BackgroundTask(resp.aclose),
212
  )
213
 
214
- await resp.aclose()
215
- last_exception = f"Last failed attempt returned status code {resp.status_code}"
216
-
217
- except httpx.RequestError as e:
218
  last_exception = e
219
- logger.warning(f"HTTPX RequestError on attempt {attempt + 1}: {e}", extra=log_extra)
220
- if attempt == settings.MAX_RETRIES - 1:
221
  break
 
 
 
 
 
 
222
 
223
  duration_ms = (time.monotonic() - start_time) * 1000
224
- logger.critical(
225
- f"Failed permanently: {request.method} {url.path} after {settings.MAX_RETRIES} attempts. latency={duration_ms:.2f}ms. Last error: {last_exception}",
226
- extra=log_extra
227
- )
228
  raise HTTPException(
229
  status_code=502,
230
- detail=f"Bad Gateway: The server was unable to process the request after {settings.MAX_RETRIES} attempts. Last error: {last_exception}"
231
  )
 
2
import asyncio
import logging
import os
import random
import time
from contextlib import asynccontextmanager

import httpx
from fastapi import FastAPI, Request, HTTPException
from starlette.responses import StreamingResponse, JSONResponse
from starlette.background import BackgroundTask
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
10
 
11
# --- Configuration ---
# All settings come from environment variables with safe defaults; a bad
# value logs an error and falls back instead of crashing at import time.
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
logging.basicConfig(
    level=LOG_LEVEL,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
TARGET_URL = os.getenv("TARGET_URL", "https://api.gmi-serving.com")

# Guard the int() parse: a malformed MAX_RETRIES previously raised ValueError
# on startup, unlike RETRY_CODES which already had a fallback. Clamp to >= 1
# so the retry loop always runs at least once.
try:
    MAX_RETRIES = max(1, int(os.getenv("MAX_RETRIES", "15")))
except ValueError:
    logging.error(f"Invalid MAX_RETRIES: '{os.getenv('MAX_RETRIES')}'. Using default: 15")
    MAX_RETRIES = 15

DEFAULT_RETRY_CODES = "429,500,502,503,504"
RETRY_CODES_STR = os.getenv("RETRY_CODES", DEFAULT_RETRY_CODES)
try:
    RETRY_STATUS_CODES = {int(code.strip()) for code in RETRY_CODES_STR.split(',')}
    logging.info(f"Retry on status codes: {RETRY_STATUS_CODES}")
except ValueError:
    logging.error(f"Invalid RETRY_CODES: '{RETRY_CODES_STR}'. Using default: {DEFAULT_RETRY_CODES}")
    RETRY_STATUS_CODES = {int(code.strip()) for code in DEFAULT_RETRY_CODES.split(',')}
+
28
# --- Helper Function ---
def generate_random_ip():
    """Generates a random, plausible public-looking IPv4 address.

    The first octet is drawn from 1-223 and never 10 or 127, so the result
    avoids addresses that are trivially identifiable as non-routable
    (loopback, class-A private, multicast/reserved 224-255) — the previous
    per-octet randint(1, 255) could emit e.g. 127.x.x.x or 239.x.x.x, which
    defeats the point of a spoofed client IP.
    """
    first = random.randint(1, 223)
    while first in (10, 127):  # re-draw the few excluded first octets
        first = random.randint(1, 223)
    octets = [first] + [random.randint(0, 255) for _ in range(3)]
    return ".".join(str(o) for o in octets)
32
 
33
# --- HTTPX Client Lifecycle Management ---
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Open one shared HTTPX client at startup and close it on shutdown.

    The client is published on ``app.state.http_client`` so every request
    handler reuses the same connection pool against TARGET_URL.
    """
    client = httpx.AsyncClient(base_url=TARGET_URL, timeout=None)
    async with client:
        app.state.http_client = client
        yield
 
40
 
41
# Initialize the FastAPI app; API docs are disabled (internal proxy) and the
# lifespan context above owns the shared HTTPX client.
app = FastAPI(docs_url=None, redoc_url=None, lifespan=lifespan)
43
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
44
# --- API Endpoints ---
@app.get("/")
async def health_check():
    """Basic health check endpoint: reports liveness and the proxy target."""
    payload = {"status": "ok", "target": TARGET_URL}
    return JSONResponse(payload)
 
 
 
 
49
 
50
+ @app.api_route("/{full_path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD"])
51
  async def reverse_proxy_handler(request: Request):
52
  """
53
+ Catch-all reverse proxy that forwards requests to the target URL with retry logic and latency logging.
 
54
  """
55
  start_time = time.monotonic()
 
 
 
56
  client: httpx.AsyncClient = request.app.state.http_client
57
  url = httpx.URL(path=request.url.path, query=request.url.query.encode("utf-8"))
58
 
59
+ # Prepare headers
60
+ request_headers = dict(request.headers)
61
+ request_headers.pop("host", None)
62
+ random_ip = generate_random_ip()
63
+ logging.info(f"Client '{request.client.host}' proxied with spoofed IP: {random_ip} for path: {url.path}")
64
 
65
+ specific_headers = {
66
+ "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36",
67
+ "x-forwarded-for": random_ip,
68
+ "x-real-ip": random_ip,
69
+ "x-originating-ip": random_ip,
70
+ "x-remote-ip": random_ip,
71
+ "x-remote-addr": random_ip,
72
+ "x-host": random_ip,
73
+ "x-forwarded-host": random_ip,
74
+ }
75
+ request_headers.update(specific_headers)
76
 
77
+ # Preserve authorization header if present
78
+ if "authorization" in request.headers:
79
+ request_headers["authorization"] = request.headers["authorization"]
80
 
81
+ body = await request.body()
82
  last_exception = None
83
+
84
+ for attempt in range(MAX_RETRIES):
 
 
 
 
 
 
85
  try:
86
+ rp_req = client.build_request(
87
+ method=request.method, url=url, headers=request_headers, content=body
 
 
 
88
  )
89
+ rp_resp = await client.send(rp_req, stream=True)
90
+
91
+ if rp_resp.status_code not in RETRY_STATUS_CODES or attempt == MAX_RETRIES - 1:
92
  duration_ms = (time.monotonic() - start_time) * 1000
93
+ log_func = logging.info if rp_resp.is_success else logging.warning
94
+ log_func(f"Request finished: {request.method} {request.url.path} status_code={rp_resp.status_code} latency={duration_ms:.2f}ms")
95
+
 
 
96
  return StreamingResponse(
97
+ rp_resp.aiter_raw(),
98
+ status_code=rp_resp.status_code,
99
+ headers=rp_resp.headers,
100
+ background=BackgroundTask(rp_resp.aclose),
101
  )
102
 
103
+ logging.warning(f"Attempt {attempt + 1}/{MAX_RETRIES} for {url.path} failed with status {rp_resp.status_code}. Retrying...")
104
+ await rp_resp.aclose()
105
+
106
+ except httpx.ConnectError as e:
107
  last_exception = e
108
+ logging.warning(f"Attempt {attempt + 1}/{MAX_RETRIES} for {url.path} failed with connection error: {e}")
109
+ if attempt == MAX_RETRIES - 1:
110
  break
111
+ await asyncio.sleep(2 ** attempt) # Exponential backoff
112
+
113
+ except Exception as e:
114
+ last_exception = e
115
+ logging.error(f"Unexpected error on attempt {attempt + 1}/{MAX_RETRIES} for {url.path}: {e}")
116
+ break
117
 
118
  duration_ms = (time.monotonic() - start_time) * 1000
119
+ logging.critical(f"Request failed: {request.method} {request.url.path} status_code=502 latency={duration_ms:.2f}ms")
120
+
 
 
121
  raise HTTPException(
122
  status_code=502,
123
+ detail=f"Bad Gateway: Cannot connect to target service after {MAX_RETRIES} attempts. {last_exception}"
124
  )