Spaces:
Running
Running
| from flask import Flask, request, Response, jsonify | |
| import requests | |
| import json | |
| import uuid | |
| import time | |
| import os | |
| import re | |
| import base64 | |
| import mimetypes | |
| import random | |
| from datetime import datetime, timedelta | |
# The single Flask application object; it is started by the ``__main__`` guard
# at the bottom of the file.
app = Flask(__name__)
# ================= CONFIGURATION =================
COGNIX_BASE_URL = "https://www.cognixai.co"
DEFAULT_SESSION_ID = "f351d7e7-a0ba-4888-86a4-76aab9a7a661"

# Development Users Plan - 1 month validity.
PLAN_NAME = "Development Users Plan"
# The plan was issued on 2026-02-13, so it expires on 2026-03-13. The expiry
# is global: it applies to every key below, not to individual keys.
EXPIRATION_DATE = datetime(2026, 3, 13)

# Accepted API keys. Format: {"api_key": {"name": "..."}}.
# Keys are stored lower-case because validate_access() lower-cases the
# presented key before the lookup.
# NOTE(review): these keys are hard-coded and derived from user names, so they
# are guessable -- consider loading them from a secret store instead.
AUTHORIZED_KEYS = {
    "sk-rudraksh": {"name": "Rudraksh"},
    "sk-fardin-aslam": {"name": "Fardin Aslam"},
    "sk-at41rv": {"name": "At41rv"},
    "sk-wivodetok": {"name": "Wivodetok"},
    "sk-fx-uk": {"name": "Fx uk"},
    "sk-g-man": {"name": "G-Man"},
    "sk-aakash-singh": {"name": "Aakash Singh"},
    "sk-redollaz": {"name": "Redollaz"},
}

# Rate limit settings.
RPM_LIMIT = 60  # requests per minute, per API key
# Requests per day, per API key. Each key here is matched as a substring of
# the lower-cased "model" field of the request; models matching no key are
# not subject to a daily limit.
# NOTE(review): the default model used by chat_completions(),
# 'anthropic/Claude Opus 4.6', lower-cases to a string that contains none of
# these keys ("opus 4.6" vs "opus-4.6") -- confirm that is intended.
RPD_LIMITS = {
    "claude opus-4.6": 1000,
    "3-pro": 1000,
    "gpt-5.3 codex": 1000,
}

# Tracking data (in-memory only; lost whenever the process restarts).
rpm_tracking = {}  # api_key -> list of time.time() stamps of recent requests
rpd_tracking = {}  # api_key -> matched model key -> "YYYY-MM-DD" -> count
# Upstream session cookies are credentials and must not live in source
# control. Supply them through the COGNIX_COOKIES environment variable, one
# complete ``Cookie`` header value per line (individual cookies inside one
# value stay separated by "; " as usual).
# NOTE(review): the cookie that used to be hard-coded here was published with
# the source, so that session should be treated as compromised and revoked.
def load_cognix_cookies(raw=None):
    """Return the list of upstream ``Cookie`` header values.

    Args:
        raw: Newline-separated cookie header values. When ``None`` the
            ``COGNIX_COOKIES`` environment variable is read instead.

    Returns:
        A list of non-empty, whitespace-stripped header values; empty when
        nothing is configured.
    """
    if raw is None:
        raw = os.environ.get("COGNIX_COOKIES", "")
    return [entry.strip() for entry in raw.splitlines() if entry.strip()]


COGNIX_COOKIES = load_cognix_cookies()
def get_headers():
    """Build the HTTP headers sent with every upstream Cognix request.

    One entry of ``COGNIX_COOKIES`` is picked at random per call. The origin
    and referer are derived from ``COGNIX_BASE_URL`` so they cannot drift from
    the URL the request is actually sent to.

    Returns:
        A dict of header names to values.

    Raises:
        RuntimeError: If ``COGNIX_COOKIES`` is empty.
    """
    if not COGNIX_COOKIES:
        # random.choice() would raise an opaque IndexError here.
        raise RuntimeError("No upstream cookies configured in COGNIX_COOKIES")
    return {
        "accept": "*/*",
        "accept-language": "en-IN,en;q=0.9",
        "cookie": random.choice(COGNIX_COOKIES),
        "origin": COGNIX_BASE_URL,
        "referer": f"{COGNIX_BASE_URL}/chat/{DEFAULT_SESSION_ID}",
        "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/144.0.0.0 Safari/537.36",
    }
# ================= AUTH & RATE LIMITING =================
# NOTE(review): no hook registration (such as ``@app.before_request``) is
# visible for this function in the file as given -- confirm how it is wired in.
def validate_access():
    """Gate ``/v1/`` requests on plan expiry, API key and rate limits.

    Paths that do not start with ``/v1/`` are always allowed. For the others
    the checks run in this order: plan expiry, bearer API key, per-key
    requests-per-minute, and (only for POSTs to ``.../chat/completions`` or
    ``.../messages``) the per-key daily quota of any model matching a key of
    ``RPD_LIMITS``.

    Returns:
        ``None`` when the request may proceed, otherwise a
        ``(json_response, status)`` tuple: 402 when the plan has expired, 401
        for a missing or unknown API key, 429 when the per-minute limit is
        hit, 403 when the daily quota of a restricted model is used up.
    """
    if not request.path.startswith("/v1/"):
        return None

    # 1. Date check (plan validity).
    if datetime.now() > EXPIRATION_DATE:
        return jsonify({
            "error": "Plan Expired",
            "message": "plan expired contact Admin",
            "code": 402
        }), 402

    # 2. API key check. Only the leading scheme is removed; the previous
    #    str.replace() also deleted any later "Bearer " inside the key.
    scheme = "Bearer "
    auth_header = request.headers.get("Authorization", "")
    if not auth_header.startswith(scheme):
        return jsonify({"error": "Unauthorized", "message": "Missing API key"}), 401
    api_key = auth_header[len(scheme):].strip().lower()
    if api_key not in AUTHORIZED_KEYS:
        return jsonify({"error": "Unauthorized", "message": "Invalid API key for Development Plan"}), 401

    # 3. RPM check: sliding 60-second window of accepted requests per key.
    now = time.time()
    recent = [stamp for stamp in rpm_tracking.get(api_key, []) if now - stamp < 60]
    rpm_tracking[api_key] = recent
    if len(recent) >= RPM_LIMIT:
        return jsonify({"error": "Rate limit exceeded", "message": f"Limit: {RPM_LIMIT} RPM"}), 429
    recent.append(now)

    # 4. RPD check, for restricted models on completion endpoints only.
    if request.method != "POST" or not request.path.endswith(("/chat/completions", "/messages")):
        return None

    # A malformed or non-object body simply names no model. Previously such
    # bodies raised inside a blanket try/except, which skipped the quota.
    body = request.get_json(silent=True)
    model_id = str(body.get("model") or "").lower() if isinstance(body, dict) else ""
    matched_model = next((key for key in RPD_LIMITS if key in model_id), None)
    if matched_model is None:
        return None  # unlimited for all other models

    today = datetime.now().strftime("%Y-%m-%d")
    daily_counts = rpd_tracking.setdefault(api_key, {}).setdefault(matched_model, {})
    limit = RPD_LIMITS[matched_model]
    if daily_counts.get(today, 0) >= limit:
        # Report the configured limit; the message used to hard-code "20".
        return jsonify({
            "error": "Daily quota reached",
            "message": f"Daily limit for '{matched_model}' reached ({limit} RPD)."
        }), 403
    daily_counts[today] = daily_counts.get(today, 0) + 1
    return None
# ================= CORE LOGIC =================
def parse_cognix_stream_chunk(line):
    """Extract the text carried by one line of the upstream response stream.

    Args:
        line: One decoded line, with or without a leading ``"data: "``.

    Returns:
        A ``(content, kind)`` tuple. ``kind`` is ``"stop"`` for the ``[DONE]``
        sentinel and ``"content"`` otherwise. ``content`` is ``None`` for
        blank lines and for the sentinel, the extracted text for JSON objects
        (``""`` when they carry none), and the raw line for anything that is
        not a JSON object.
    """
    if not line.strip():
        return None, "content"
    if line.startswith("data: "):
        line = line[6:]
    stripped = line.strip()
    if stripped == "[DONE]":
        return None, "stop"

    try:
        data = json.loads(line)
    except ValueError:  # json.JSONDecodeError is a subclass of ValueError
        # Something that looks like an object but does not parse is dropped;
        # any other text is passed through verbatim.
        if stripped.startswith('{') and stripped.endswith('}'):
            return "", "content"
        return line, "content"

    if not isinstance(data, dict):
        # Valid JSON that is not an object (number, string, list, ...) is
        # passed through as raw text.
        return line, "content"

    content = data.get('text') or data.get('content')
    if not content:
        delta = data.get('delta')
        if isinstance(delta, str):
            content = delta
        elif isinstance(delta, dict):
            content = delta.get('text') or delta.get('content', '')
    return content or "", "content"
# (connect, read) timeouts in seconds for upstream calls. Without a timeout
# requests waits forever on a stalled upstream and ties up a worker.
UPSTREAM_TIMEOUT = (10, 300)


def _message_text(content):
    """Flatten a chat message ``content`` value into plain text.

    Strings are returned unchanged. For a list, the string items and the
    string ``"text"`` fields of dict items are joined with newlines. ``None``
    becomes ``""`` and any other value is passed through ``str()``.
    """
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        texts = []
        for part in content:
            if isinstance(part, str):
                texts.append(part)
            elif isinstance(part, dict) and isinstance(part.get("text"), str):
                texts.append(part["text"])
        return "\n".join(texts)
    return "" if content is None else str(content)


# NOTE(review): no ``@app.route`` registration is visible for this view in the
# file as given -- confirm how it is exposed.
def chat_completions():
    """Proxy a chat-completion style request to the Cognix chat API.

    Reads ``model``, ``messages`` and ``stream`` from the JSON request body.
    Only the first system message and the last non-system message are
    forwarded upstream; earlier conversation turns are not sent.

    Returns:
        With ``stream`` truthy, a ``text/event-stream`` response of
        ``chat.completion.chunk`` events terminated by ``data: [DONE]``;
        otherwise a single ``chat.completion`` JSON object. A body that is not
        a JSON object yields 400; an upstream failure yields 502 (or an
        ``error`` event when streaming, since the status line is already sent).
    """
    body = request.get_json(silent=True)
    if not isinstance(body, dict):
        return jsonify({"error": "Bad Request", "message": "Request body must be a JSON object"}), 400

    model = str(body.get('model') or 'anthropic/Claude Opus 4.6')
    raw_messages = body.get('messages') or []
    if not isinstance(raw_messages, list):
        return jsonify({"error": "Bad Request", "message": "'messages' must be a list"}), 400
    messages = [m for m in raw_messages if isinstance(m, dict)]

    system_prompt = next(
        (_message_text(m.get('content')) for m in messages if m.get('role') == 'system'),
        "",
    )
    conversation = [m for m in messages if m.get('role') != 'system']
    last_text = _message_text(conversation[-1].get('content')) if conversation else ''

    # "provider/model" selects the provider; a bare name defaults to anthropic.
    if '/' in model:
        provider, version = model.split('/', 1)
    else:
        provider, version = "anthropic", model

    payload = {
        "id": str(uuid.uuid4()),
        "chatModel": {"provider": provider, "model": version},
        "message": {
            "role": "user",
            "parts": [{"type": "text", "text": f"[System]\n{system_prompt}\n\n[Message]\n{last_text}"}],
            "id": str(uuid.uuid4()),
        },
        "allowedAppDefaultToolkit": ["webSearch"],
        "attachments": []
    }
    url = f"{COGNIX_BASE_URL}/api/chat"
    created = int(time.time())

    if body.get('stream'):
        def gen():
            cid = f"chatcmpl-{uuid.uuid4().hex[:24]}"

            def sse(delta, finish_reason=None):
                chunk = {
                    'id': cid,
                    'object': 'chat.completion.chunk',
                    'created': created,
                    'model': model,
                    'choices': [{'index': 0, 'delta': delta, 'finish_reason': finish_reason}],
                }
                return f"data: {json.dumps(chunk)}\n\n"

            yield sse({'role': 'assistant'})
            failure = None
            try:
                with requests.post(url, json=payload, headers=get_headers(),
                                   stream=True, timeout=UPSTREAM_TIMEOUT) as upstream:
                    if upstream.status_code >= 400:
                        failure = f"Upstream returned HTTP {upstream.status_code}"
                    else:
                        # Force UTF-8: otherwise requests uses its own guess,
                        # which can yield bytes or ISO-8859-1 text here.
                        upstream.encoding = "utf-8"
                        for line in upstream.iter_lines(decode_unicode=True):
                            if not line:
                                continue
                            text, kind = parse_cognix_stream_chunk(line)
                            if kind == "stop":
                                break
                            if text:
                                yield sse({'content': text})
            except requests.RequestException as exc:
                print(f"Upstream request failed: {exc}")
                failure = "Upstream request failed"

            if failure:
                yield f"data: {json.dumps({'error': {'message': failure, 'type': 'upstream_error'}})}\n\n"
            else:
                yield sse({}, finish_reason='stop')
            yield "data: [DONE]\n\n"

        return Response(gen(), content_type='text/event-stream')

    try:
        upstream = requests.post(url, json=payload, headers=get_headers(), timeout=UPSTREAM_TIMEOUT)
    except requests.RequestException as exc:
        print(f"Upstream request failed: {exc}")
        return jsonify({"error": "Bad Gateway", "message": "Upstream request failed"}), 502
    if upstream.status_code >= 400:
        # Do not hand an upstream error page to the client as assistant text.
        return jsonify({"error": "Bad Gateway", "message": f"Upstream returned HTTP {upstream.status_code}"}), 502

    upstream.encoding = "utf-8"
    pieces = []
    for line in upstream.text.splitlines():
        text, kind = parse_cognix_stream_chunk(line)
        if kind == "stop":
            break
        if text:
            pieces.append(str(text))

    return jsonify({
        "id": str(uuid.uuid4()),
        "object": "chat.completion",
        "created": created,
        "model": model,
        "choices": [{
            "index": 0,
            "message": {"role": "assistant", "content": "".join(pieces)},
            "finish_reason": "stop",
        }],
    })
# NOTE(review): no ``@app.route`` registration is visible for this view in the
# file as given -- confirm how it is exposed.
def list_models():
    """Return the fixed catalogue of advertised model ids as a JSON list."""
    model_ids = (
        "claude opus-4.6",
        "3-pro",
        "gpt-5.3 codex",
        "gpt-4o",
        "claude-3-5-sonnet",
    )
    entries = [{"id": model_id, "object": "model"} for model_id in model_ids]
    return jsonify({"object": "list", "data": entries})
# NOTE(review): no ``@app.route`` registration is visible for this view in the
# file as given -- confirm how it is exposed.
def index():
    """Return a JSON status summary: plan name, expiry date and rate limits."""
    expires_on = EXPIRATION_DATE.strftime("%Y-%m-%d")
    restricted_models = list(RPD_LIMITS)
    summary = {
        "status": "online",
        "plan": PLAN_NAME,
        "expires_on": expires_on,
        "rpm": RPM_LIMIT,
        "rpd_restricted_models": restricted_models,
        "other_models": "Unlimited",
    }
    return jsonify(summary)
if __name__ == '__main__':
    # The server binds to every interface, so the Werkzeug debugger (which
    # allows arbitrary code execution from the browser) must stay off unless
    # it is requested explicitly through FLASK_DEBUG.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in {"1", "true", "yes", "on"}
    print(f"🚀 {PLAN_NAME} Proxy starting on http://localhost:7860")
    app.run(host='0.0.0.0', port=7860, debug=debug_enabled)