# zai2api / proxy_handler.py -- "Update proxy_handler.py" (commit ef7c129, by bluewinliang)
"""
Proxy handler for Z.AI API requests, updated with simplified signature logic.
"""
import json, logging, re, time, uuid, base64, hashlib, hmac
from typing import AsyncGenerator, Dict, Any, Tuple, List
import httpx
from fastapi import HTTPException
from fastapi.responses import StreamingResponse
from config import settings
from cookie_manager import cookie_manager
from models import ChatCompletionRequest, ChatCompletionResponse
logger = logging.getLogger(__name__)
class ProxyHandler:
    """Proxies OpenAI-style chat completion requests to the Z.AI upstream API."""

    def __init__(self):
        # One shared async client for all upstream calls; the long read
        # timeout accommodates slow streaming responses.
        limits = httpx.Limits(max_connections=100, max_keepalive_connections=20)
        timeout = httpx.Timeout(60.0, read=300.0)
        self.client = httpx.AsyncClient(timeout=timeout, limits=limits, http2=True)
        # The primary secret key from the reference code.
        self.primary_secret = b"junjie"
async def aclose(self):
if not self.client.is_closed:
await self.client.aclose()
def _get_timestamp_millis(self) -> int:
return int(time.time() * 1000)
def _parse_jwt_token(self, token: str) -> Dict[str, str]:
"""A simple JWT payload decoder to get user ID ('sub' claim)."""
try:
parts = token.split('.')
if len(parts) != 3: return {"user_id": ""}
payload_b64 = parts[1]
payload_b64 += '=' * (-len(payload_b64) % 4) # Add padding if needed
payload_json = base64.urlsafe_b64decode(payload_b64).decode('utf-8')
payload = json.loads(payload_json)
return {"user_id": payload.get("sub", "")}
except Exception:
# It's okay if this fails; we'll proceed with an empty user_id.
return {"user_id": ""}
def _generate_signature(self, e_payload: str, t_payload: str) -> Dict[str, Any]:
"""
Generates the signature based on the logic from the reference JS code (work.js).
This is a two-level HMAC-SHA256 process with Base64 encoding for the content.
Args:
e_payload (str): The simplified payload string (e.g., "requestId,...,timestamp,...").
t_payload (str): The last message content.
Returns:
A dictionary with 'signature' and 'timestamp'.
"""
timestamp_ms = self._get_timestamp_millis()
# --- MODIFICATION START ---
# As per work.js, the last message content (t_payload) must be Base64 encoded.
content_base64 = base64.b64encode(t_payload.encode('utf-8')).decode('utf-8')
# Concatenate with the Base64 encoded content
message_string = f"{e_payload}|{content_base64}|{timestamp_ms}"
# --- MODIFICATION END ---
# Per the Python snippet and JS reference: n is a 5-minute bucket
n = timestamp_ms // (5 * 60 * 1000)
# Intermediate key derivation
msg1 = str(n).encode("utf-8")
intermediate_key = hmac.new(self.primary_secret, msg1, hashlib.sha256).hexdigest()
# Final signature
msg2 = message_string.encode("utf-8")
final_signature = hmac.new(intermediate_key.encode("utf-8"), msg2, hashlib.sha256).hexdigest()
return {"signature": final_signature, "timestamp": timestamp_ms}
def _clean_thinking_content(self, text: str) -> str:
if not text: return ""
cleaned_text = re.sub(r'<summary>.*?</summary>|<glm_block.*?</glm_block>|<[^>]*duration="[^"]*"[^>]*>', '', text, flags=re.DOTALL)
cleaned_text = cleaned_text.replace("</thinking>", "").replace("<Full>", "").replace("</Full>", "")
cleaned_text = re.sub(r'</?details[^>]*>', '', cleaned_text)
cleaned_text = re.sub(r'^\s*>\s*(?!>)', '', cleaned_text, flags=re.MULTILINE)
cleaned_text = cleaned_text.replace("Thinking…", "")
return cleaned_text.strip()
def _clean_answer_content(self, text: str) -> str:
if not text: return ""
cleaned_text = re.sub(r'<glm_block.*?</glm_block>|<details[^>]*>.*?</details>|<summary>.*?</summary>', '', text, flags=re.DOTALL)
return cleaned_text
def _serialize_msgs(self, msgs) -> list:
out = []
for m in msgs:
# Adapting to Pydantic v1/v2 and dicts
if hasattr(m, "dict"): out.append(m.dict())
elif hasattr(m, "model_dump"): out.append(m.model_dump())
elif isinstance(m, dict): out.append(m)
else: out.append({"role": getattr(m, "role", "user"), "content": getattr(m, "content", str(m))})
return out
async def _prep_upstream(self, req: ChatCompletionRequest) -> Tuple[Dict[str, Any], Dict[str, str], str, str]:
    """Prepares the request body, headers, cookie, and URL for the upstream API.

    Builds the signature payload, signs it, and attaches the signature
    parameters to the URL as query parameters (the upstream expects them
    there, not in the JSON body).

    Returns:
        (body, headers, cookie, url): JSON body to POST, request headers
        (including Authorization and X-Signature), the cookie/token used,
        and the final upstream URL string.

    Raises:
        HTTPException: 503 when the cookie pool has no available cookie.
    """
    ck = await cookie_manager.get_next_cookie()
    if not ck: raise HTTPException(503, "No available cookies")
    # Map the externally-advertised model name to the upstream model id.
    model = settings.UPSTREAM_MODEL if req.model == settings.MODEL_NAME else req.model
    chat_id = str(uuid.uuid4())
    request_id = str(uuid.uuid4())
    # --- NEW Simplified Signature Payload Logic ---
    user_info = self._parse_jwt_token(ck)
    # NOTE(review): parsed but never used below; kept to mirror the reference code.
    user_id = user_info.get("user_id", "")
    # The reference code uses a separate UUID for user_id in payload, let's follow that.
    # This seems strange, but let's replicate the reference code exactly.
    payload_user_id = str(uuid.uuid4())
    payload_request_id = str(uuid.uuid4())
    payload_timestamp = str(self._get_timestamp_millis())
    # e: the simplified payload string that gets signed
    e_payload = f"requestId,{payload_request_id},timestamp,{payload_timestamp},user_id,{payload_user_id}"
    # t: the last message content; only plain-string content is signed,
    # list-style (multimodal) content leaves t_payload empty.
    t_payload = ""
    if req.messages:
        last_message = req.messages[-1]
        if isinstance(last_message.content, str):
            t_payload = last_message.content
    # Generate the two-level HMAC signature over (e_payload, t_payload).
    signature_data = self._generate_signature(e_payload, t_payload)
    signature = signature_data["signature"]
    signature_timestamp = signature_data["timestamp"]
    # The reference code sends these as URL parameters, not in the body.
    url_params = {
        "requestId": payload_request_id,
        "timestamp": payload_timestamp,
        "user_id": payload_user_id,
        "signature_timestamp": str(signature_timestamp)
    }
    # Construct URL with query parameters
    # Note: The reference code has a typo `f"{BASE_URL}/api/chat/completions"`, it should be `z.ai`
    final_url = httpx.URL(settings.UPSTREAM_URL).copy_with(params=url_params)
    body = {
        "stream": True,  # the non-stream path flips this to False afterwards
        "model": model,
        "messages": self._serialize_msgs(req.messages),
        "chat_id": chat_id,
        "id": request_id,
        "features": {
            "image_generation": False,
            "web_search": False,
            "auto_web_search": False,
            "preview_mode": False,
            "flags": [],
            "enable_thinking": True,
        }
    }
    # Browser-like headers; X-Signature carries the HMAC computed above.
    headers = {
        "Accept": "*/*",
        "Accept-Language": "zh-CN",
        "Authorization": f"Bearer {ck}",
        "Cache-Control": "no-cache",
        "Connection": "keep-alive",
        "Content-Type": "application/json",
        "Origin": "https://chat.z.ai",
        "Referer": "https://chat.z.ai/",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36",
        "X-FE-Version": "prod-fe-1.0.103",
        "X-Signature": signature,
    }
    return body, headers, ck, str(final_url)
async def stream_proxy_response(self, req: ChatCompletionRequest) -> AsyncGenerator[str, None]:
    """Stream the upstream chat completion as OpenAI-style SSE chunks.

    Translates Z.AI's phase-based events ('thinking' / 'answer') into
    'chat.completion.chunk' deltas. When settings.SHOW_THINK_TAGS is set,
    thinking content is wrapped in <think>...</think> markers.

    Yields:
        SSE-formatted strings ("data: {...}\\n\\n"), terminated by
        "data: [DONE]\\n\\n".
    """
    ck = None
    try:
        body, headers, ck, url = await self._prep_upstream(req)
        comp_id = f"chatcmpl-{uuid.uuid4().hex[:29]}"
        think_open = False              # True while a <think> block is open client-side
        yielded_think_buffer = ""       # cleaned thinking text already forwarded
        current_raw_thinking = ""       # raw thinking text accumulated from upstream
        is_first_answer_chunk = True

        async def yield_delta(content_type: str, text: str):
            """Emit delta chunk(s) for thinking or answer content."""
            nonlocal think_open, yielded_think_buffer
            if content_type == "thinking" and settings.SHOW_THINK_TAGS:
                if not think_open:
                    yield f"data: {json.dumps({'id': comp_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': req.model, 'choices': [{'index': 0, 'delta': {'content': '<think>'}, 'finish_reason': None}]})}\n\n"
                    think_open = True
                # Cleaning is not chunk-local, so re-clean the full accumulated
                # text and forward only the not-yet-sent suffix.
                cleaned_full_text = self._clean_thinking_content(text)
                delta_to_send = cleaned_full_text[len(yielded_think_buffer):]
                if delta_to_send:
                    yield f"data: {json.dumps({'id': comp_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': req.model, 'choices': [{'index': 0, 'delta': {'content': delta_to_send}, 'finish_reason': None}]})}\n\n"
                    yielded_think_buffer = cleaned_full_text
            elif content_type == "answer":
                if think_open:
                    # Close the open <think> block before the first answer token.
                    yield f"data: {json.dumps({'id': comp_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': req.model, 'choices': [{'index': 0, 'delta': {'content': '</think>'}, 'finish_reason': None}]})}\n\n"
                    think_open = False
                cleaned_text = self._clean_answer_content(text)
                if cleaned_text:
                    yield f"data: {json.dumps({'id': comp_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': req.model, 'choices': [{'index': 0, 'delta': {'content': cleaned_text}, 'finish_reason': None}]})}\n\n"

        async with self.client.stream("POST", url, json=body, headers=headers) as resp:
            if resp.status_code != 200:
                await cookie_manager.mark_cookie_failed(ck)
                err_body = await resp.aread()
                err_msg = f"Error: {resp.status_code} - {err_body.decode(errors='ignore')}"
                logger.error(f"Upstream error: {err_msg}")
                err = {"id": comp_id, "object": "chat.completion.chunk", "created": int(time.time()), "model": req.model, "choices": [{"index": 0, "delta": {"content": err_msg}, "finish_reason": "stop"}],}
                yield f"data: {json.dumps(err)}\n\n"
                yield "data: [DONE]\n\n"
                return
            await cookie_manager.mark_cookie_success(ck)
            # FIX: iterate complete lines instead of raw text chunks.
            # aiter_text() yields arbitrary-sized chunks, so a single
            # 'data: {...}' line could be split across two chunks, fail
            # JSON parsing, and be silently dropped by the 'continue'.
            async for line in resp.aiter_lines():
                line = line.strip()
                if not line.startswith('data: '):
                    continue
                payload_str = line[6:]
                # The reference code has a special 'done' phase, but the original Z.AI uses [DONE]
                if payload_str == '[DONE]':
                    if think_open:
                        yield f"data: {json.dumps({'id': comp_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': req.model, 'choices': [{'index': 0, 'delta': {'content': '</think>'}, 'finish_reason': None}]})}\n\n"
                    yield f"data: {json.dumps({'id': comp_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': req.model, 'choices': [{'index': 0, 'delta': {}, 'finish_reason': 'stop'}]})}\n\n"
                    yield "data: [DONE]\n\n"
                    return
                try:
                    dat = json.loads(payload_str).get("data", {})
                except (json.JSONDecodeError, AttributeError):
                    continue
                phase = dat.get("phase")
                content_chunk = dat.get("delta_content") or dat.get("edit_content")
                if not content_chunk:
                    # Handle case where chunk is just usage info, etc.
                    if phase == 'other' and dat.get('usage'):
                        pass  # usage may arrive with the final chunk; nothing to emit
                    else:
                        continue
                if phase == "thinking":
                    # edit_content replaces the accumulated text; delta_content appends.
                    current_raw_thinking = content_chunk if dat.get("edit_content") is not None else current_raw_thinking + content_chunk
                    async for item in yield_delta("thinking", current_raw_thinking):
                        yield item
                elif phase == "answer":
                    content_to_process = content_chunk
                    if is_first_answer_chunk:
                        # The first answer chunk may still carry the tail of the
                        # thinking <details> block; drop everything before it.
                        if '</details>' in content_to_process:
                            parts = content_to_process.split('</details>', 1)
                            content_to_process = parts[1] if len(parts) > 1 else ""
                        is_first_answer_chunk = False
                    if content_to_process:
                        async for item in yield_delta("answer", content_to_process):
                            yield item
    except Exception:
        logger.exception("Stream error")
        raise
async def non_stream_proxy_response(self, req: ChatCompletionRequest) -> ChatCompletionResponse:
    """Handle a non-streaming completion with a single upstream POST.

    Adapts Z.AI's single-JSON response to the OpenAI response model.

    Raises:
        HTTPException: with the upstream status code on a non-200 response.
    """
    ck = None
    try:
        body, headers, ck, url = await self._prep_upstream(req)
        body["stream"] = False  # non-streaming variant of the shared request body
        # FIX: AsyncClient.post() returns a coroutine, not an async context
        # manager -- 'async with self.client.post(...)' raised TypeError on
        # every call. Await the response directly instead.
        resp = await self.client.post(url, json=body, headers=headers)
        if resp.status_code != 200:
            await cookie_manager.mark_cookie_failed(ck)
            # FIX: httpx Response.text is a synchronous property; it was
            # previously (incorrectly) awaited.
            error_detail = resp.text
            logger.error(f"Upstream error: {resp.status_code} - {error_detail}")
            raise HTTPException(resp.status_code, f"Upstream error: {error_detail}")
        await cookie_manager.mark_cookie_success(ck)
        # Z.AI non-stream response is a single JSON object.
        response_data = resp.json()
        # Adapt Z.AI's response format to OpenAI's format.
        final_content = ""
        finish_reason = "stop"  # default when upstream omits it
        if "choices" in response_data and response_data["choices"]:
            first_choice = response_data["choices"][0]
            if "message" in first_choice and "content" in first_choice["message"]:
                final_content = first_choice["message"]["content"]
            if "finish_reason" in first_choice:
                finish_reason = first_choice["finish_reason"]
        return ChatCompletionResponse(
            id=response_data.get("id", f"chatcmpl-{uuid.uuid4().hex[:29]}"),
            created=int(time.time()),
            model=req.model,
            choices=[{"index": 0, "message": {"role": "assistant", "content": final_content}, "finish_reason": finish_reason}],
        )
    except Exception:
        logger.exception("Non-stream processing failed")
        raise
async def handle_chat_completion(self, req: ChatCompletionRequest):
    """Dispatch the request to the streaming or non-streaming handler."""
    # An explicit req.stream wins; otherwise fall back to the configured default.
    use_stream = settings.DEFAULT_STREAM if req.stream is None else bool(req.stream)
    if not use_stream:
        return await self.non_stream_proxy_response(req)
    sse_headers = {"Cache-Control": "no-cache", "Connection": "keep-alive"}
    return StreamingResponse(self.stream_proxy_response(req),
                             media_type="text/event-stream", headers=sse_headers)