""" Proxy handler for Z.AI API requests, updated with simplified signature logic. """ import json, logging, re, time, uuid, base64, hashlib, hmac from typing import AsyncGenerator, Dict, Any, Tuple, List import httpx from fastapi import HTTPException from fastapi.responses import StreamingResponse from config import settings from cookie_manager import cookie_manager from models import ChatCompletionRequest, ChatCompletionResponse logger = logging.getLogger(__name__) class ProxyHandler: def __init__(self): self.client = httpx.AsyncClient( timeout=httpx.Timeout(60.0, read=300.0), limits=httpx.Limits(max_connections=100, max_keepalive_connections=20), http2=True, ) # The primary secret key from the reference code. self.primary_secret = "junjie".encode('utf-8') async def aclose(self): if not self.client.is_closed: await self.client.aclose() def _get_timestamp_millis(self) -> int: return int(time.time() * 1000) def _parse_jwt_token(self, token: str) -> Dict[str, str]: """A simple JWT payload decoder to get user ID ('sub' claim).""" try: parts = token.split('.') if len(parts) != 3: return {"user_id": ""} payload_b64 = parts[1] payload_b64 += '=' * (-len(payload_b64) % 4) # Add padding if needed payload_json = base64.urlsafe_b64decode(payload_b64).decode('utf-8') payload = json.loads(payload_json) return {"user_id": payload.get("sub", "")} except Exception: # It's okay if this fails; we'll proceed with an empty user_id. return {"user_id": ""} def _generate_signature(self, e_payload: str, t_payload: str) -> Dict[str, Any]: """ Generates the signature based on the logic from the reference JS code (work.js). This is a two-level HMAC-SHA256 process with Base64 encoding for the content. Args: e_payload (str): The simplified payload string (e.g., "requestId,...,timestamp,..."). t_payload (str): The last message content. Returns: A dictionary with 'signature' and 'timestamp'. """ timestamp_ms = self._get_timestamp_millis() # --- MODIFICATION START --- # As per work.js, the last message content (t_payload) must be Base64 encoded. content_base64 = base64.b64encode(t_payload.encode('utf-8')).decode('utf-8') # Concatenate with the Base64 encoded content message_string = f"{e_payload}|{content_base64}|{timestamp_ms}" # --- MODIFICATION END --- # Per the Python snippet and JS reference: n is a 5-minute bucket n = timestamp_ms // (5 * 60 * 1000) # Intermediate key derivation msg1 = str(n).encode("utf-8") intermediate_key = hmac.new(self.primary_secret, msg1, hashlib.sha256).hexdigest() # Final signature msg2 = message_string.encode("utf-8") final_signature = hmac.new(intermediate_key.encode("utf-8"), msg2, hashlib.sha256).hexdigest() return {"signature": final_signature, "timestamp": timestamp_ms} def _clean_thinking_content(self, text: str) -> str: if not text: return "" cleaned_text = re.sub(r'.*?||<[^>]*duration="[^"]*"[^>]*>', '', text, flags=re.DOTALL) cleaned_text = cleaned_text.replace("", "").replace("", "").replace("", "") cleaned_text = re.sub(r']*>', '', cleaned_text) cleaned_text = re.sub(r'^\s*>\s*(?!>)', '', cleaned_text, flags=re.MULTILINE) cleaned_text = cleaned_text.replace("Thinking…", "") return cleaned_text.strip() def _clean_answer_content(self, text: str) -> str: if not text: return "" cleaned_text = re.sub(r'|]*>.*?|.*?', '', text, flags=re.DOTALL) return cleaned_text def _serialize_msgs(self, msgs) -> list: out = [] for m in msgs: # Adapting to Pydantic v1/v2 and dicts if hasattr(m, "dict"): out.append(m.dict()) elif hasattr(m, "model_dump"): out.append(m.model_dump()) elif isinstance(m, dict): out.append(m) else: out.append({"role": getattr(m, "role", "user"), "content": getattr(m, "content", str(m))}) return out async def _prep_upstream(self, req: ChatCompletionRequest) -> Tuple[Dict[str, Any], Dict[str, str], str, str]: """Prepares the request body, headers, cookie, and URL for the upstream API.""" ck = await cookie_manager.get_next_cookie() if not ck: raise HTTPException(503, "No available cookies") model = settings.UPSTREAM_MODEL if req.model == settings.MODEL_NAME else req.model chat_id = str(uuid.uuid4()) request_id = str(uuid.uuid4()) # --- NEW Simplified Signature Payload Logic --- user_info = self._parse_jwt_token(ck) user_id = user_info.get("user_id", "") # The reference code uses a separate UUID for user_id in payload, let's follow that. # This seems strange, but let's replicate the reference code exactly. payload_user_id = str(uuid.uuid4()) payload_request_id = str(uuid.uuid4()) payload_timestamp = str(self._get_timestamp_millis()) # e: The simplified payload for the signature e_payload = f"requestId,{payload_request_id},timestamp,{payload_timestamp},user_id,{payload_user_id}" # t: The last message content t_payload = "" if req.messages: last_message = req.messages[-1] if isinstance(last_message.content, str): t_payload = last_message.content # Generate the signature signature_data = self._generate_signature(e_payload, t_payload) signature = signature_data["signature"] signature_timestamp = signature_data["timestamp"] # The reference code sends these as URL parameters, not in the body. url_params = { "requestId": payload_request_id, "timestamp": payload_timestamp, "user_id": payload_user_id, "signature_timestamp": str(signature_timestamp) } # Construct URL with query parameters # Note: The reference code has a typo `f"{BASE_URL}/api/chat/completions"`, it should be `z.ai` final_url = httpx.URL(settings.UPSTREAM_URL).copy_with(params=url_params) body = { "stream": True, "model": model, "messages": self._serialize_msgs(req.messages), "chat_id": chat_id, "id": request_id, "features": { "image_generation": False, "web_search": False, "auto_web_search": False, "preview_mode": False, "flags": [], "enable_thinking": True, } } headers = { "Accept": "*/*", "Accept-Language": "zh-CN", "Authorization": f"Bearer {ck}", "Cache-Control": "no-cache", "Connection": "keep-alive", "Content-Type": "application/json", "Origin": "https://chat.z.ai", "Referer": "https://chat.z.ai/", "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36", "X-FE-Version": "prod-fe-1.0.103", "X-Signature": signature, } return body, headers, ck, str(final_url) async def stream_proxy_response(self, req: ChatCompletionRequest) -> AsyncGenerator[str, None]: ck = None try: body, headers, ck, url = await self._prep_upstream(req) comp_id = f"chatcmpl-{uuid.uuid4().hex[:29]}" think_open = False yielded_think_buffer = "" current_raw_thinking = "" is_first_answer_chunk = True async def yield_delta(content_type: str, text: str): nonlocal think_open, yielded_think_buffer if content_type == "thinking" and settings.SHOW_THINK_TAGS: if not think_open: yield f"data: {json.dumps({'id': comp_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': req.model, 'choices': [{'index': 0, 'delta': {'content': ''}, 'finish_reason': None}]})}\n\n" think_open = True cleaned_full_text = self._clean_thinking_content(text) delta_to_send = cleaned_full_text[len(yielded_think_buffer):] if delta_to_send: yield f"data: {json.dumps({'id': comp_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': req.model, 'choices': [{'index': 0, 'delta': {'content': delta_to_send}, 'finish_reason': None}]})}\n\n" yielded_think_buffer = cleaned_full_text elif content_type == "answer": if think_open: yield f"data: {json.dumps({'id': comp_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': req.model, 'choices': [{'index': 0, 'delta': {'content': ''}, 'finish_reason': None}]})}\n\n" think_open = False cleaned_text = self._clean_answer_content(text) if cleaned_text: yield f"data: {json.dumps({'id': comp_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': req.model, 'choices': [{'index': 0, 'delta': {'content': cleaned_text}, 'finish_reason': None}]})}\n\n" async with self.client.stream("POST", url, json=body, headers=headers) as resp: if resp.status_code != 200: await cookie_manager.mark_cookie_failed(ck); err_body = await resp.aread() err_msg = f"Error: {resp.status_code} - {err_body.decode(errors='ignore')}" logger.error(f"Upstream error: {err_msg}") err = {"id": comp_id, "object": "chat.completion.chunk", "created": int(time.time()), "model": req.model, "choices": [{"index": 0, "delta": {"content": err_msg}, "finish_reason": "stop"}],} yield f"data: {json.dumps(err)}\n\n"; yield "data: [DONE]\n\n"; return await cookie_manager.mark_cookie_success(ck) async for raw in resp.aiter_text(): for line in raw.strip().split('\n'): line = line.strip() if not line.startswith('data: '): continue payload_str = line[6:] # The reference code has a special 'done' phase, but the original Z.AI uses [DONE] if payload_str == '[DONE]': if think_open: yield f"data: {json.dumps({'id': comp_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': req.model, 'choices': [{'index': 0, 'delta': {'content': ''}, 'finish_reason': None}]})}\n\n" yield f"data: {json.dumps({'id': comp_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': req.model, 'choices': [{'index': 0, 'delta': {}, 'finish_reason': 'stop'}]})}\n\n"; yield "data: [DONE]\n\n"; return try: dat = json.loads(payload_str).get("data", {}) except (json.JSONDecodeError, AttributeError): continue phase = dat.get("phase") content_chunk = dat.get("delta_content") or dat.get("edit_content") if not content_chunk: # Handle case where chunk is just usage info, etc. if phase == 'other' and dat.get('usage'): pass # In streaming, usage might come with the final chunk else: continue if phase == "thinking": current_raw_thinking = content_chunk if dat.get("edit_content") is not None else current_raw_thinking + content_chunk async for item in yield_delta("thinking", current_raw_thinking): yield item elif phase == "answer": content_to_process = content_chunk if is_first_answer_chunk: if '' in content_to_process: parts = content_to_process.split('', 1) content_to_process = parts[1] if len(parts) > 1 else "" is_first_answer_chunk = False if content_to_process: async for item in yield_delta("answer", content_to_process): yield item except Exception: logger.exception("Stream error"); raise async def non_stream_proxy_response(self, req: ChatCompletionRequest) -> ChatCompletionResponse: # This part of the code can be simplified as well, but let's focus on fixing the streaming first. # The logic will be almost identical to the streaming one. ck = None try: body, headers, ck, url = await self._prep_upstream(req) # For non-stream, set stream to False in the body body["stream"] = False async with self.client.post(url, json=body, headers=headers) as resp: if resp.status_code != 200: await cookie_manager.mark_cookie_failed(ck); error_detail = await resp.text() logger.error(f"Upstream error: {resp.status_code} - {error_detail}") raise HTTPException(resp.status_code, f"Upstream error: {error_detail}") await cookie_manager.mark_cookie_success(ck) # Z.AI non-stream response is a single JSON object response_data = resp.json() # We need to adapt Z.AI's response format to OpenAI's format final_content = "" finish_reason = "stop" # Default if "choices" in response_data and response_data["choices"]: first_choice = response_data["choices"][0] if "message" in first_choice and "content" in first_choice["message"]: final_content = first_choice["message"]["content"] if "finish_reason" in first_choice: finish_reason = first_choice["finish_reason"] return ChatCompletionResponse( id=response_data.get("id", f"chatcmpl-{uuid.uuid4().hex[:29]}"), created=int(time.time()), model=req.model, choices=[{"index": 0, "message": {"role": "assistant", "content": final_content}, "finish_reason": finish_reason}], ) except Exception: logger.exception("Non-stream processing failed"); raise async def handle_chat_completion(self, req: ChatCompletionRequest): """Determines whether to stream or not and handles the request.""" stream = bool(req.stream) if req.stream is not None else settings.DEFAULT_STREAM if stream: return StreamingResponse(self.stream_proxy_response(req), media_type="text/event-stream", headers={"Cache-Control": "no-cache", "Connection": "keep-alive"}) return await self.non_stream_proxy_response(req)