Spaces:
Paused
Paused
| """ | |
| Proxy handler for Z.AI API requests, updated with simplified signature logic. | |
| """ | |
import base64
import hashlib
import hmac
import json
import logging
import re
import time
import uuid
from typing import Any, AsyncGenerator, Dict, List, Tuple

import httpx
from fastapi import HTTPException
from fastapi.responses import StreamingResponse

from config import settings
from cookie_manager import cookie_manager
from models import ChatCompletionRequest, ChatCompletionResponse
| logger = logging.getLogger(__name__) | |
class ProxyHandler:
    """Proxies OpenAI-style chat completion requests to the Z.AI upstream API."""

    def __init__(self):
        """Create the shared async HTTP client and the HMAC signing secret."""
        # Generous read timeout: upstream streaming responses can stay open
        # for several minutes.
        self.client = httpx.AsyncClient(
            timeout=httpx.Timeout(60.0, read=300.0),
            limits=httpx.Limits(max_connections=100, max_keepalive_connections=20),
            http2=True,
        )
        # Primary HMAC secret taken from the reference implementation.
        self.primary_secret = b"junjie"
| async def aclose(self): | |
| if not self.client.is_closed: | |
| await self.client.aclose() | |
| def _get_timestamp_millis(self) -> int: | |
| return int(time.time() * 1000) | |
| def _parse_jwt_token(self, token: str) -> Dict[str, str]: | |
| """A simple JWT payload decoder to get user ID ('sub' claim).""" | |
| try: | |
| parts = token.split('.') | |
| if len(parts) != 3: return {"user_id": ""} | |
| payload_b64 = parts[1] | |
| payload_b64 += '=' * (-len(payload_b64) % 4) # Add padding if needed | |
| payload_json = base64.urlsafe_b64decode(payload_b64).decode('utf-8') | |
| payload = json.loads(payload_json) | |
| return {"user_id": payload.get("sub", "")} | |
| except Exception: | |
| # It's okay if this fails; we'll proceed with an empty user_id. | |
| return {"user_id": ""} | |
| def _generate_signature(self, e_payload: str, t_payload: str) -> Dict[str, Any]: | |
| """ | |
| Generates the signature based on the logic from the reference JS code (work.js). | |
| This is a two-level HMAC-SHA256 process with Base64 encoding for the content. | |
| Args: | |
| e_payload (str): The simplified payload string (e.g., "requestId,...,timestamp,..."). | |
| t_payload (str): The last message content. | |
| Returns: | |
| A dictionary with 'signature' and 'timestamp'. | |
| """ | |
| timestamp_ms = self._get_timestamp_millis() | |
| # --- MODIFICATION START --- | |
| # As per work.js, the last message content (t_payload) must be Base64 encoded. | |
| content_base64 = base64.b64encode(t_payload.encode('utf-8')).decode('utf-8') | |
| # Concatenate with the Base64 encoded content | |
| message_string = f"{e_payload}|{content_base64}|{timestamp_ms}" | |
| # --- MODIFICATION END --- | |
| # Per the Python snippet and JS reference: n is a 5-minute bucket | |
| n = timestamp_ms // (5 * 60 * 1000) | |
| # Intermediate key derivation | |
| msg1 = str(n).encode("utf-8") | |
| intermediate_key = hmac.new(self.primary_secret, msg1, hashlib.sha256).hexdigest() | |
| # Final signature | |
| msg2 = message_string.encode("utf-8") | |
| final_signature = hmac.new(intermediate_key.encode("utf-8"), msg2, hashlib.sha256).hexdigest() | |
| return {"signature": final_signature, "timestamp": timestamp_ms} | |
| def _clean_thinking_content(self, text: str) -> str: | |
| if not text: return "" | |
| cleaned_text = re.sub(r'<summary>.*?</summary>|<glm_block.*?</glm_block>|<[^>]*duration="[^"]*"[^>]*>', '', text, flags=re.DOTALL) | |
| cleaned_text = cleaned_text.replace("</thinking>", "").replace("<Full>", "").replace("</Full>", "") | |
| cleaned_text = re.sub(r'</?details[^>]*>', '', cleaned_text) | |
| cleaned_text = re.sub(r'^\s*>\s*(?!>)', '', cleaned_text, flags=re.MULTILINE) | |
| cleaned_text = cleaned_text.replace("Thinking…", "") | |
| return cleaned_text.strip() | |
| def _clean_answer_content(self, text: str) -> str: | |
| if not text: return "" | |
| cleaned_text = re.sub(r'<glm_block.*?</glm_block>|<details[^>]*>.*?</details>|<summary>.*?</summary>', '', text, flags=re.DOTALL) | |
| return cleaned_text | |
| def _serialize_msgs(self, msgs) -> list: | |
| out = [] | |
| for m in msgs: | |
| # Adapting to Pydantic v1/v2 and dicts | |
| if hasattr(m, "dict"): out.append(m.dict()) | |
| elif hasattr(m, "model_dump"): out.append(m.model_dump()) | |
| elif isinstance(m, dict): out.append(m) | |
| else: out.append({"role": getattr(m, "role", "user"), "content": getattr(m, "content", str(m))}) | |
| return out | |
    async def _prep_upstream(self, req: ChatCompletionRequest) -> Tuple[Dict[str, Any], Dict[str, str], str, str]:
        """Prepares the request body, headers, cookie, and URL for the upstream API.

        Returns:
            (body, headers, cookie, url): the JSON request body, the HTTP
            headers (including the X-Signature), the auth cookie/token used,
            and the final signed URL as a string.

        Raises:
            HTTPException: 503 when no cookie is available.
        """
        ck = await cookie_manager.get_next_cookie()
        if not ck: raise HTTPException(503, "No available cookies")
        # Map our public model name onto the upstream model identifier.
        model = settings.UPSTREAM_MODEL if req.model == settings.MODEL_NAME else req.model
        chat_id = str(uuid.uuid4())
        request_id = str(uuid.uuid4())
        # --- Simplified signature payload logic ---
        # NOTE(review): user_id is parsed from the JWT here but never used
        # below; the signed payload uses a fresh random UUID instead. Kept to
        # match the reference implementation -- confirm whether it is needed.
        user_info = self._parse_jwt_token(ck)
        user_id = user_info.get("user_id", "")
        # The reference code uses a separate random UUID for user_id in the
        # signed payload; replicated here exactly, odd as it looks.
        payload_user_id = str(uuid.uuid4())
        payload_request_id = str(uuid.uuid4())
        payload_timestamp = str(self._get_timestamp_millis())
        # e: the simplified payload string that gets signed
        e_payload = f"requestId,{payload_request_id},timestamp,{payload_timestamp},user_id,{payload_user_id}"
        # t: the content of the last message (empty when missing or non-string,
        # e.g. multimodal content parts)
        t_payload = ""
        if req.messages:
            last_message = req.messages[-1]
            if isinstance(last_message.content, str):
                t_payload = last_message.content
        # Two-level HMAC signature over e_payload | base64(t_payload) | timestamp.
        signature_data = self._generate_signature(e_payload, t_payload)
        signature = signature_data["signature"]
        signature_timestamp = signature_data["timestamp"]
        # The reference code sends these as URL query parameters, not in the body.
        url_params = {
            "requestId": payload_request_id,
            "timestamp": payload_timestamp,
            "user_id": payload_user_id,
            "signature_timestamp": str(signature_timestamp)
        }
        # Attach the query parameters to the configured upstream URL.
        # (The reference code hard-codes f"{BASE_URL}/api/chat/completions".)
        final_url = httpx.URL(settings.UPSTREAM_URL).copy_with(params=url_params)
        body = {
            "stream": True,
            "model": model,
            "messages": self._serialize_msgs(req.messages),
            "chat_id": chat_id,
            "id": request_id,
            "features": {
                "image_generation": False,
                "web_search": False,
                "auto_web_search": False,
                "preview_mode": False,
                "flags": [],
                "enable_thinking": True,
            }
        }
        # Browser-like headers expected by the upstream; X-Signature carries
        # the HMAC computed above.
        headers = {
            "Accept": "*/*",
            "Accept-Language": "zh-CN",
            "Authorization": f"Bearer {ck}",
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "Content-Type": "application/json",
            "Origin": "https://chat.z.ai",
            "Referer": "https://chat.z.ai/",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36",
            "X-FE-Version": "prod-fe-1.0.103",
            "X-Signature": signature,
        }
        return body, headers, ck, str(final_url)
| async def stream_proxy_response(self, req: ChatCompletionRequest) -> AsyncGenerator[str, None]: | |
| ck = None | |
| try: | |
| body, headers, ck, url = await self._prep_upstream(req) | |
| comp_id = f"chatcmpl-{uuid.uuid4().hex[:29]}" | |
| think_open = False | |
| yielded_think_buffer = "" | |
| current_raw_thinking = "" | |
| is_first_answer_chunk = True | |
| async def yield_delta(content_type: str, text: str): | |
| nonlocal think_open, yielded_think_buffer | |
| if content_type == "thinking" and settings.SHOW_THINK_TAGS: | |
| if not think_open: | |
| yield f"data: {json.dumps({'id': comp_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': req.model, 'choices': [{'index': 0, 'delta': {'content': '<think>'}, 'finish_reason': None}]})}\n\n" | |
| think_open = True | |
| cleaned_full_text = self._clean_thinking_content(text) | |
| delta_to_send = cleaned_full_text[len(yielded_think_buffer):] | |
| if delta_to_send: | |
| yield f"data: {json.dumps({'id': comp_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': req.model, 'choices': [{'index': 0, 'delta': {'content': delta_to_send}, 'finish_reason': None}]})}\n\n" | |
| yielded_think_buffer = cleaned_full_text | |
| elif content_type == "answer": | |
| if think_open: | |
| yield f"data: {json.dumps({'id': comp_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': req.model, 'choices': [{'index': 0, 'delta': {'content': '</think>'}, 'finish_reason': None}]})}\n\n" | |
| think_open = False | |
| cleaned_text = self._clean_answer_content(text) | |
| if cleaned_text: | |
| yield f"data: {json.dumps({'id': comp_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': req.model, 'choices': [{'index': 0, 'delta': {'content': cleaned_text}, 'finish_reason': None}]})}\n\n" | |
| async with self.client.stream("POST", url, json=body, headers=headers) as resp: | |
| if resp.status_code != 200: | |
| await cookie_manager.mark_cookie_failed(ck); err_body = await resp.aread() | |
| err_msg = f"Error: {resp.status_code} - {err_body.decode(errors='ignore')}" | |
| logger.error(f"Upstream error: {err_msg}") | |
| err = {"id": comp_id, "object": "chat.completion.chunk", "created": int(time.time()), "model": req.model, "choices": [{"index": 0, "delta": {"content": err_msg}, "finish_reason": "stop"}],} | |
| yield f"data: {json.dumps(err)}\n\n"; yield "data: [DONE]\n\n"; return | |
| await cookie_manager.mark_cookie_success(ck) | |
| async for raw in resp.aiter_text(): | |
| for line in raw.strip().split('\n'): | |
| line = line.strip() | |
| if not line.startswith('data: '): continue | |
| payload_str = line[6:] | |
| # The reference code has a special 'done' phase, but the original Z.AI uses [DONE] | |
| if payload_str == '[DONE]': | |
| if think_open: | |
| yield f"data: {json.dumps({'id': comp_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': req.model, 'choices': [{'index': 0, 'delta': {'content': '</think>'}, 'finish_reason': None}]})}\n\n" | |
| yield f"data: {json.dumps({'id': comp_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': req.model, 'choices': [{'index': 0, 'delta': {}, 'finish_reason': 'stop'}]})}\n\n"; | |
| yield "data: [DONE]\n\n"; | |
| return | |
| try: | |
| dat = json.loads(payload_str).get("data", {}) | |
| except (json.JSONDecodeError, AttributeError): continue | |
| phase = dat.get("phase") | |
| content_chunk = dat.get("delta_content") or dat.get("edit_content") | |
| if not content_chunk: | |
| # Handle case where chunk is just usage info, etc. | |
| if phase == 'other' and dat.get('usage'): | |
| pass # In streaming, usage might come with the final chunk | |
| else: | |
| continue | |
| if phase == "thinking": | |
| current_raw_thinking = content_chunk if dat.get("edit_content") is not None else current_raw_thinking + content_chunk | |
| async for item in yield_delta("thinking", current_raw_thinking): | |
| yield item | |
| elif phase == "answer": | |
| content_to_process = content_chunk | |
| if is_first_answer_chunk: | |
| if '</details>' in content_to_process: | |
| parts = content_to_process.split('</details>', 1) | |
| content_to_process = parts[1] if len(parts) > 1 else "" | |
| is_first_answer_chunk = False | |
| if content_to_process: | |
| async for item in yield_delta("answer", content_to_process): | |
| yield item | |
| except Exception: | |
| logger.exception("Stream error"); raise | |
| async def non_stream_proxy_response(self, req: ChatCompletionRequest) -> ChatCompletionResponse: | |
| # This part of the code can be simplified as well, but let's focus on fixing the streaming first. | |
| # The logic will be almost identical to the streaming one. | |
| ck = None | |
| try: | |
| body, headers, ck, url = await self._prep_upstream(req) | |
| # For non-stream, set stream to False in the body | |
| body["stream"] = False | |
| async with self.client.post(url, json=body, headers=headers) as resp: | |
| if resp.status_code != 200: | |
| await cookie_manager.mark_cookie_failed(ck); error_detail = await resp.text() | |
| logger.error(f"Upstream error: {resp.status_code} - {error_detail}") | |
| raise HTTPException(resp.status_code, f"Upstream error: {error_detail}") | |
| await cookie_manager.mark_cookie_success(ck) | |
| # Z.AI non-stream response is a single JSON object | |
| response_data = resp.json() | |
| # We need to adapt Z.AI's response format to OpenAI's format | |
| final_content = "" | |
| finish_reason = "stop" # Default | |
| if "choices" in response_data and response_data["choices"]: | |
| first_choice = response_data["choices"][0] | |
| if "message" in first_choice and "content" in first_choice["message"]: | |
| final_content = first_choice["message"]["content"] | |
| if "finish_reason" in first_choice: | |
| finish_reason = first_choice["finish_reason"] | |
| return ChatCompletionResponse( | |
| id=response_data.get("id", f"chatcmpl-{uuid.uuid4().hex[:29]}"), | |
| created=int(time.time()), | |
| model=req.model, | |
| choices=[{"index": 0, "message": {"role": "assistant", "content": final_content}, "finish_reason": finish_reason}], | |
| ) | |
| except Exception: | |
| logger.exception("Non-stream processing failed"); raise | |
| async def handle_chat_completion(self, req: ChatCompletionRequest): | |
| """Determines whether to stream or not and handles the request.""" | |
| stream = bool(req.stream) if req.stream is not None else settings.DEFAULT_STREAM | |
| if stream: | |
| return StreamingResponse(self.stream_proxy_response(req), media_type="text/event-stream", | |
| headers={"Cache-Control": "no-cache", "Connection": "keep-alive"}) | |
| return await self.non_stream_proxy_response(req) |