# NOTE: "Spaces: Running" banner text removed — it was Hugging Face Spaces
# page-UI residue captured by the scrape, not part of this module.
| """ | |
| K-AI API — Main Application | |
| --------------------------- | |
| Free AI proxy API. No signup, no API keys on the AI side. | |
| Users send a message, get an AI response from the best available provider. | |
| Run with: uvicorn main:app --host 0.0.0.0 --port 8000 | |
| """ | |
| import os | |
| import sys | |
| print("🔥 STARTING K-AI API... 🔥", file=sys.stderr) | |
| print(f"Current Working Directory: {os.getcwd()}", file=sys.stderr) | |
| print(f"Directory Contents: {os.listdir('.')}", file=sys.stderr) | |
| # Fix for Vercel Read-Only File System | |
| # Must be set BEFORE importing g4f or engine | |
| if os.environ.get("VERCEL") or os.environ.get("AWS_LAMBDA_FUNCTION_NAME"): | |
| os.environ["HOME"] = "/tmp" | |
| import logging | |
| from datetime import datetime | |
| from fastapi import FastAPI, HTTPException, Request | |
| from fastapi.responses import FileResponse, HTMLResponse, JSONResponse | |
| from fastapi.staticfiles import StaticFiles | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.openapi.docs import get_swagger_ui_html | |
| from config import ( | |
| API_TITLE, | |
| API_DESCRIPTION, | |
| API_VERSION, | |
| CORS_ORIGINS, | |
| CORS_METHODS, | |
| CORS_HEADERS, | |
| ) | |
| from models import ( | |
| ChatRequest, | |
| ChatResponse, | |
| ErrorResponse, | |
| ModelsResponse, | |
| HealthResponse, | |
| ProviderHealth, | |
| ) | |
# (Removed: this was an exact, token-for-token duplicate of the
# `from models import (...)` block immediately above.)
| from services import engine, search_engine | |
| from v1_router import router as v1_router | |
| from admin_router import router as admin_router | |
# ---------- Logging ----------
# Root-logger configuration; module loggers (including g4f's) inherit
# this level and format.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(name)s | %(levelname)s | %(message)s",
)
logger = logging.getLogger("kai_api")
# ---------- App ----------
app = FastAPI(
    title=API_TITLE,
    description=API_DESCRIPTION,
    version=API_VERSION,
    docs_url=None,  # Disable default docs to serve custom one
    redoc_url=None,
)

# Mount static files (for CSS/JS if needed later)
# NOTE(review): StaticFiles raises at startup if "static/" is missing
# relative to the working directory — confirm it ships with the deploy.
app.mount("/static", StaticFiles(directory="static"), name="static")
| # Error Handling (Global) | |
| from error_handling import openai_error | |
async def http_exception_handler(request: Request, exc: HTTPException):
    """
    Translate an HTTPException into an OpenAI-style JSON error payload.

    NOTE(review): no `@app.exception_handler(HTTPException)` /
    `app.add_exception_handler(...)` registration is visible in this
    source — confirm the handler is actually wired up elsewhere.
    """
    # Map well-known status codes to OpenAI error codes; anything else
    # falls back to the generic request error.
    status_to_code = {
        401: "invalid_api_key",
        429: "insufficient_quota",
        404: "model_not_found",
        500: "internal_server_error",
    }
    return openai_error(
        message=exc.detail,
        code=status_to_code.get(exc.status_code, "invalid_request_error"),
        status_code=exc.status_code,
    )
# CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=CORS_ORIGINS,
    allow_credentials=True,
    allow_methods=CORS_METHODS,
    allow_headers=CORS_HEADERS,
)

# AI Engine (initialized via services.py)
# engine = AIEngine() -> Moved to services.py
# search_engine = SearchEngine() -> Moved to services.py

# Include OpenAI Router
app.include_router(v1_router)
app.include_router(admin_router)
# ---------- Admin Routes ----------
# NOTE(review): no route decorators are visible on these handlers —
# presumably dropped by the page scrape; confirm registration paths.
async def admin_page():
    """Serve the secret admin dashboard HTML."""
    dashboard = FileResponse("static/admin.html")
    return dashboard
async def admin_stats():
    """Expose raw per-model stats for the admin dashboard."""
    stats = engine.get_stats()
    return JSONResponse(stats)
async def admin_test_all():
    """Run a parallel smoke-test of every model and report the results."""
    outcome = await engine.test_all_models()
    return JSONResponse(outcome)
async def admin_clear_stats():
    """Wipe all collected model statistics."""
    engine.clear_stats()
    return JSONResponse({"status": "cleared"})
async def admin_debug_g4f():
    """
    Verbose Debug for G4F Provider on Server.

    Runs a minimal chat completion against a couple of models with g4f
    debug logging enabled, capturing stdout/stderr so the provider
    selection/failure log can be inspected in the browser.

    Returns:
        HTMLResponse: the captured (HTML-escaped) log in a <pre> block.
    """
    import io
    import os
    import html
    from contextlib import redirect_stdout, redirect_stderr
    import g4f
    from g4f.client import AsyncClient
    from useragent import get_random_user_agent

    # Enable g4f's internal logging; it mostly writes to stderr, which we
    # capture below together with stdout.
    g4f.debug.logging = True

    buf = io.StringIO()
    with redirect_stdout(buf), redirect_stderr(buf):
        print("=== G4F DEBUG LOG ===")
        print(f"G4F Version: {g4f.version}")
        ua = get_random_user_agent()
        print(f"Testing with User-Agent: {ua[:30]}...")

        # Simulate the serverless environment: g4f needs a writable HOME.
        os.environ["HOME"] = "/tmp"

        # Identical probe for each model (was duplicated inline twice).
        for model in ("gpt-4o-mini", "gpt-4"):
            print(f"\n--- Testing {model} ---")
            try:
                client = AsyncClient(headers={"User-Agent": ua})
                response = await client.chat.completions.create(
                    model=model,
                    messages=[{"role": "user", "content": "Hi"}],
                )
                print(f"✅ Success! Provider: {response.provider}")
                print(f"Response: {response.choices[0].message.content[:20]}...")
            except Exception as e:
                print(f"❌ Failed: {e}")

    # Escape before embedding: provider error text can contain markup and
    # would otherwise render as live HTML/JS in this admin page.
    output = html.escape(buf.getvalue())
    return HTMLResponse(f"<pre>{output}</pre>")
# ---------- Custom Swagger UI ----------
async def custom_swagger_ui_html():
    """Serve the custom Swagger UI page backed by CDN-hosted assets."""
    ui_options = {
        "openapi_url": app.openapi_url,
        "title": f"{API_TITLE} - Swagger UI",
        "swagger_css_url": "https://cdn.jsdelivr.net/npm/swagger-ui-dist@5/swagger-ui.css",
        "swagger_js_url": "https://cdn.jsdelivr.net/npm/swagger-ui-dist@5/swagger-ui-bundle.js",
        "swagger_favicon_url": "https://fastapi.tiangolo.com/img/favicon.png",
    }
    return get_swagger_ui_html(**ui_options)
# ---------- Search Routes ----------
async def search_endpoint(request: Request):
    """
    Perform a standard web search.
    Body: {"query": "something", "limit": 10}
    """
    try:
        payload = await request.json()
        query = payload.get("query")
        # Guard clause: an absent/empty query is a client error.
        if not query:
            return JSONResponse({"error": "Query required"}, status_code=400)
        max_results = payload.get("limit", 10)
        hits = search_engine.simple_search(query, max_results=max_results)
        return JSONResponse({"results": hits})
    except Exception as e:
        # Top-level boundary: surface anything unexpected as a 500.
        return JSONResponse({"error": str(e)}, status_code=500)
async def deep_research_endpoint(request: Request):
    """
    Perform Deep Research: Search -> Scrape -> Synthesize.
    Body: {"query": "complex topic"}

    Returns a JSON report built from raw scraped content; no AI synthesis
    step, to avoid hallucination.
    """
    try:
        data = await request.json()
        query = data.get("query")
        if not query:
            return JSONResponse({"error": "Query required"}, status_code=400)

        # 1. Gather Content (Offload to thread): deep_research_gather is
        # blocking (search + scrape), so run it in the default executor.
        import asyncio
        # get_event_loop() is deprecated inside a running coroutine;
        # get_running_loop() is the correct, exception-safe replacement.
        loop = asyncio.get_running_loop()
        raw_context = await loop.run_in_executor(
            None, search_engine.deep_research_gather, query
        )
        if not raw_context:
            return JSONResponse({"report": "No relevant information found."})

        # 2. Return Raw Context (No AI Synthesis to avoid Hallucination)
        return JSONResponse({
            "report": "### Deep Research Results\n\n" + raw_context,
            "model": "scraper-v1",
            "sources": "Derived from DuckDuckGo Search (Raw Content)"
        })
    except Exception as e:
        return JSONResponse({"error": str(e)}, status_code=500)
# ---------- Routes ----------
async def root():
    """Serve the HTML dashboard as the landing page."""
    landing = FileResponse("static/docs.html")
    return landing
async def list_models():
    """List all available AI models across all providers."""
    if not engine:
        # Engine never initialized — report an empty catalogue.
        return ModelsResponse(models=[], total=0)
    available = engine.get_all_models()
    return ModelsResponse(models=available, total=len(available))
async def health_check():
    """
    Run health checks on all providers.

    Returns:
        HealthResponse whose overall status is "healthy" when every
        provider passed, "degraded" when only some did, and "unhealthy"
        when none did or the engine never initialized.
    """
    if not engine:
        # NOTE(review): the success path builds `providers` as a list of
        # ProviderHealth, but this branch passes a dict (and extra
        # version/uptime/error fields) — confirm HealthResponse accepts
        # both shapes.
        return HealthResponse(
            status="unhealthy",
            version="2.0.0",
            uptime=0,  # TODO: Track uptime
            providers={},
            error="AI Engine failed to initialize (check logs)"
        )

    # Get provider health
    logger.info("Running health checks...")
    results = await engine.health_check_all()
    providers = [
        ProviderHealth(
            provider=r["provider"],
            status=r["status"],
            response_time_ms=r.get("response_time_ms"),
            error=r.get("error"),
        )
        for r in results
    ]

    # Determine overall status. Guard the empty case explicitly: before,
    # zero providers satisfied `healthy_count == len(providers)` (0 == 0)
    # and was misreported as "healthy".
    healthy_count = sum(1 for p in providers if p.status == "healthy")
    if not providers:
        overall = "unhealthy"
    elif healthy_count == len(providers):
        overall = "healthy"
    elif healthy_count > 0:
        overall = "degraded"
    else:
        overall = "unhealthy"

    return HealthResponse(
        status=overall,
        providers=providers,
        # datetime.utcnow() is deprecated (3.12+); kept here so the
        # emitted "...Z" timestamp string stays byte-identical.
        timestamp=datetime.utcnow().isoformat() + "Z",
    )