"""FastAPI backend for AnyCoder - provides REST API endpoints."""
import asyncio
import json
import os
import sys
from datetime import datetime
from typing import Optional, List, Dict, AsyncGenerator

from fastapi import FastAPI, HTTPException, Header, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
from huggingface_hub import InferenceClient
from pydantic import BaseModel

AVAILABLE_MODELS = [
    {"name": "Sherlock Dash Alpha", "id": "openrouter/sherlock-dash-alpha", "description": "Sherlock Dash Alpha model via OpenRouter"},
    {"name": "MiniMax M2", "id": "MiniMaxAI/MiniMax-M2", "description": "MiniMax M2 model via HuggingFace InferenceClient with Novita provider"},
    {"name": "DeepSeek V3.2-Exp", "id": "deepseek-ai/DeepSeek-V3.2-Exp", "description": "DeepSeek V3.2 Experimental via HuggingFace"},
    {"name": "DeepSeek R1", "id": "deepseek-ai/DeepSeek-R1-0528", "description": "DeepSeek R1 model for code generation"},
    {"name": "GPT-5", "id": "gpt-5", "description": "OpenAI GPT-5 via OpenRouter"},
    {"name": "Gemini Flash Latest", "id": "gemini-flash-latest", "description": "Google Gemini Flash via OpenRouter"},
    {"name": "Qwen3 Max Preview", "id": "qwen3-max-preview", "description": "Qwen3 Max Preview via DashScope API"},
]

LANGUAGE_CHOICES = ["html", "gradio", "transformers.js", "streamlit", "comfyui", "react"]

app = FastAPI(title="AnyCoder API", version="1.0.0")

# CORS is limited to the local frontend dev servers.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:3000", "http://localhost:3001"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
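# If the frontend is served from another origin, add it to allow_origins,
# e.g. "https://my-frontend.example" (hypothetical origin).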


class CodeGenerationRequest(BaseModel):
    query: str
    language: str = "html"
    model_id: str = "openrouter/sherlock-dash-alpha"
    provider: str = "auto"
    history: List[List[str]] = []
    agent_mode: bool = False


class DeploymentRequest(BaseModel):
    code: str
    space_name: str
    language: str
    requirements: Optional[str] = None


class AuthStatus(BaseModel):
    authenticated: bool
    username: Optional[str] = None
    message: str


class ModelInfo(BaseModel):
    name: str
    id: str
    description: str


class CodeGenerationResponse(BaseModel):
    code: str
    history: List[List[str]]
    status: str


class MockAuth:
    """Lightweight auth shim: any non-empty token counts as authenticated.

    Dev tokens of the form "dev_token_<username>" carry a username.
    """

    def __init__(self, token: Optional[str] = None):
        self.token = token
        if token and token.startswith("dev_token_"):
            # e.g. "dev_token_alice" -> username "alice"
            parts = token.split("_")
            self.username = parts[2] if len(parts) > 2 else "user"
        else:
            self.username = "user" if token else None

    def is_authenticated(self) -> bool:
        return bool(self.token)


def get_auth_from_header(authorization: Optional[str] = None) -> MockAuth:
    """Extract authentication from an Authorization header value."""
    if not authorization:
        return MockAuth(None)
    # Accept both "Bearer <token>" and a bare token.
    token = authorization.removeprefix("Bearer ")
    return MockAuth(token)
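
# Quick sanity check of the dev-token parsing (hypothetical values):
#   get_auth_from_header("Bearer dev_token_alice").username   -> "alice"
#   get_auth_from_header(None).is_authenticated()             -> False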


@app.get("/")
async def root():
    """Health check endpoint."""
    return {"status": "ok", "message": "AnyCoder API is running"}
@app.get("/api/models", response_model=List[ModelInfo]) |
|
|
async def get_models(): |
|
|
"""Get available AI models""" |
|
|
return [ |
|
|
ModelInfo( |
|
|
name=model["name"], |
|
|
id=model["id"], |
|
|
description=model["description"] |
|
|
) |
|
|
for model in AVAILABLE_MODELS |
|
|
] |


@app.get("/api/languages")
async def get_languages():
    """Get the available programming languages/frameworks."""
    return {"languages": LANGUAGE_CHOICES}


@app.get("/api/auth/status")
async def auth_status(authorization: Optional[str] = Header(None)):
    """Check authentication status."""
    auth = get_auth_from_header(authorization)
    if auth.is_authenticated():
        return AuthStatus(
            authenticated=True,
            username=auth.username,
            message=f"Authenticated as {auth.username}",
        )
    return AuthStatus(
        authenticated=False,
        username=None,
        message="Not authenticated",
    )
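
# Example check (the dev token is hypothetical):
#   curl -H "Authorization: Bearer dev_token_alice" \
#       http://localhost:8000/api/auth/status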


@app.get("/api/generate")
async def generate_code(
    query: str,
    language: str = "html",
    model_id: str = "openrouter/sherlock-dash-alpha",
    provider: str = "auto",
    authorization: Optional[str] = Header(None),
):
    """Generate code for a user query, streamed as server-sent events."""

    async def event_stream() -> AsyncGenerator[str, None]:
        """Stream generated code chunks as SSE frames."""
        try:
            # Fall back to the first model if the requested id is unknown.
            selected_model = next(
                (m for m in AVAILABLE_MODELS if m["id"] == model_id),
                AVAILABLE_MODELS[0],
            )

            generated_code = ""
            system_prompt = (
                "You are a helpful AI assistant that generates code based on "
                "user requirements. Generate clean, well-commented code."
            )
            actual_model_id = selected_model["id"]

            # Route the model id to the right inference client.
            if actual_model_id.startswith("openrouter/"):
                api_key = os.getenv("OPENROUTER_API_KEY") or os.getenv("HF_TOKEN")
                client = InferenceClient(
                    api_key=api_key,
                    provider="openai",
                    base_url="https://openrouter.ai/api/v1",
                )
            elif actual_model_id == "MiniMaxAI/MiniMax-M2":
                # MiniMax M2 goes through the HF router with the Novita provider.
                hf_token = os.getenv("HF_TOKEN")
                if not hf_token:
                    error_data = json.dumps({
                        "type": "error",
                        "message": "HF_TOKEN environment variable not set. Please set it in your terminal.",
                        "timestamp": datetime.now().isoformat(),
                    })
                    yield f"data: {error_data}\n\n"
                    return

                from openai import OpenAI

                client = OpenAI(
                    base_url="https://router.huggingface.co/v1",
                    api_key=hf_token,
                    default_headers={"X-HF-Bill-To": "huggingface"},
                )
                actual_model_id = "MiniMaxAI/MiniMax-M2:novita"
            elif actual_model_id.startswith("deepseek-ai/"):
                client = InferenceClient(token=os.getenv("HF_TOKEN"))
            elif actual_model_id == "qwen3-max-preview":
                # NOTE: despite the "DashScope API" description, this model is
                # currently served through the default HF InferenceClient path.
                client = InferenceClient(token=os.getenv("HF_TOKEN"))
            else:
                client = InferenceClient(token=os.getenv("HF_TOKEN"))

            messages = [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": f"Generate a {language} application: {query}"},
            ]

            try:
                stream = client.chat.completions.create(
                    model=actual_model_id,
                    messages=messages,
                    temperature=0.7,
                    max_tokens=10000,
                    stream=True,
                )

                for chunk in stream:
                    # Defensively pull the token text out of the chunk; the
                    # shape varies slightly between providers.
                    content = None
                    if getattr(chunk, "choices", None):
                        delta = getattr(chunk.choices[0], "delta", None)
                        content = getattr(delta, "content", None) if delta else None
                    if content:
                        generated_code += content
                        event_data = json.dumps({
                            "type": "chunk",
                            "content": content,
                            "timestamp": datetime.now().isoformat(),
                        })
                        yield f"data: {event_data}\n\n"
                        # The provider stream is synchronous; yield control so
                        # the event loop can flush frames to the client.
                        await asyncio.sleep(0)

                completion_data = json.dumps({
                    "type": "complete",
                    "code": generated_code,
                    "timestamp": datetime.now().isoformat(),
                })
                yield f"data: {completion_data}\n\n"

            except Exception as e:
                error_data = json.dumps({
                    "type": "error",
                    "message": str(e),
                    "timestamp": datetime.now().isoformat(),
                })
                yield f"data: {error_data}\n\n"

        except Exception as e:
            error_data = json.dumps({
                "type": "error",
                "message": f"Generation error: {str(e)}",
                "timestamp": datetime.now().isoformat(),
            })
            yield f"data: {error_data}\n\n"

    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # disable proxy buffering for SSE
        },
    )
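
# Each SSE frame is one "data: <json>\n\n" message; a client sees, for example:
#   data: {"type": "chunk", "content": "<h1>", "timestamp": "2024-01-01T00:00:00"}
#   data: {"type": "complete", "code": "...", "timestamp": "..."}
#   data: {"type": "error", "message": "...", "timestamp": "..."}
# (field values here are illustrative).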


@app.post("/api/deploy")
async def deploy(
    request: DeploymentRequest,
    authorization: Optional[str] = Header(None),
):
    """Deploy generated code to HuggingFace Spaces."""
    auth = get_auth_from_header(authorization)

    if not auth.is_authenticated():
        raise HTTPException(status_code=401, detail="Authentication required")

    # Dev tokens cannot create Spaces; build a prefilled "new space" URL so
    # the user can create it manually instead.
    if auth.token and auth.token.startswith("dev_token_"):
        import urllib.parse

        base_url = "https://huggingface.co/new-space"

        language_to_sdk = {
            "gradio": "gradio",
            "streamlit": "docker",
            "react": "docker",
            "html": "static",
            "transformers.js": "static",
            "comfyui": "static",
        }
        sdk = language_to_sdk.get(request.language, "gradio")

        params = urllib.parse.urlencode({
            "name": request.space_name or "my-anycoder-app",
            "sdk": sdk,
        })

        if request.language in ["html", "transformers.js", "comfyui"]:
            file_path = "index.html"
        else:
            file_path = "app.py"

        files_params = urllib.parse.urlencode({
            "files[0][path]": file_path,
            "files[0][content]": request.code,
        })

        space_url = f"{base_url}?{params}&{files_params}"

        return {
            "success": True,
            "space_url": space_url,
            "message": "Dev mode: Please create the space manually",
            "dev_mode": True,
        }
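    # The resulting URL looks roughly like this (hypothetical space name;
    # urlencode percent-escapes the bracketed keys):
    #   https://huggingface.co/new-space?name=my-app&sdk=static&files[0][path]=index.html&...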

    # Real deployment path: create (or reuse) the Space and upload files.
    try:
        from huggingface_hub import HfApi
        import tempfile
        import uuid

        user_token = auth.token if auth.token else os.getenv("HF_TOKEN")
        if not user_token:
            raise HTTPException(status_code=401, detail="No HuggingFace token available")

        api = HfApi(token=user_token)

        space_name = request.space_name or f"anycoder-{uuid.uuid4().hex[:8]}"
        repo_id = f"{auth.username}/{space_name}"

        language_to_sdk = {
            "gradio": "gradio",
            "streamlit": "docker",
            "react": "docker",
            "html": "static",
            "transformers.js": "static",
            "comfyui": "static",
        }
        sdk = language_to_sdk.get(request.language, "gradio")

        try:
            api.create_repo(
                repo_id=repo_id,
                repo_type="space",
                space_sdk=sdk,
                exist_ok=False,
            )
        except Exception as e:
            if "already exists" in str(e).lower():
                # Reuse the existing Space.
                pass
            else:
                raise

        if request.language in ["html", "transformers.js", "comfyui"]:
            file_name = "index.html"
        else:
            file_name = "app.py"

        with tempfile.NamedTemporaryFile("w", suffix=f".{file_name.split('.')[-1]}", delete=False) as f:
            f.write(request.code)
            temp_path = f.name

        try:
            api.upload_file(
                path_or_fileobj=temp_path,
                path_in_repo=file_name,
                repo_id=repo_id,
                repo_type="space",
            )

            # Gradio Spaces also need a requirements.txt to build.
            if request.language == "gradio":
                requirements = "gradio>=4.0.0\n"
                with tempfile.NamedTemporaryFile("w", suffix=".txt", delete=False) as req_f:
                    req_f.write(requirements)
                    req_temp_path = req_f.name

                try:
                    api.upload_file(
                        path_or_fileobj=req_temp_path,
                        path_in_repo="requirements.txt",
                        repo_id=repo_id,
                        repo_type="space",
                    )
                finally:
                    os.unlink(req_temp_path)

            space_url = f"https://huggingface.co/spaces/{repo_id}"

            return {
                "success": True,
                "space_url": space_url,
                "message": f"✅ Deployed successfully to {repo_id}!",
            }

        finally:
            os.unlink(temp_path)

    except HTTPException:
        # Don't let the generic handler below turn a 401 into a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Deployment failed: {str(e)}")
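
# Example request body for POST /api/deploy (hypothetical values):
#   {"code": "<h1>hi</h1>", "space_name": "my-demo", "language": "html"}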


@app.websocket("/ws/generate")
async def websocket_generate(websocket: WebSocket):
    """WebSocket endpoint for real-time code generation."""
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_json()

            query = data.get("query")
            language = data.get("language", "html")
            model_id = data.get("model_id", "openrouter/sherlock-dash-alpha")

            await websocket.send_json({
                "type": "status",
                "message": "Generating code...",
            })

            # Placeholder generation: stream a canned snippet character by
            # character to exercise the protocol (query/model_id are not
            # used here yet).
            await asyncio.sleep(0.5)
            sample_code = f"<!-- Generated {language} code -->\n<h1>Hello from AnyCoder!</h1>"

            for i, char in enumerate(sample_code):
                await websocket.send_json({
                    "type": "chunk",
                    "content": char,
                    "progress": (i + 1) / len(sample_code) * 100,
                })
                await asyncio.sleep(0.01)

            await websocket.send_json({
                "type": "complete",
                "code": sample_code,
            })

    except WebSocketDisconnect:
        print("Client disconnected")
    except Exception as e:
        await websocket.send_json({
            "type": "error",
            "message": str(e),
        })
        await websocket.close()
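
# Example client exchange over /ws/generate (hypothetical payloads):
#   -> {"query": "a landing page", "language": "html"}
#   <- {"type": "status", "message": "Generating code..."}
#   <- {"type": "chunk", "content": "<", "progress": 1.2}   (repeated per char)
#   <- {"type": "complete", "code": "..."}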


if __name__ == "__main__":
    import uvicorn

    uvicorn.run("backend_api:app", host="0.0.0.0", port=8000, reload=True)