CodexHF_API_Gateway / api_gateway.py
LordXido's picture
Update api_gateway.py
f32fc57 verified
from fastapi import FastAPI, HTTPException
from fastapi.responses import HTMLResponse
from datetime import datetime
import uuid
from codex_logic import (
list_capabilities,
invoke_capability,
wake_module
)
app = FastAPI(
title="CodexHF API Gateway",
version="1.0.0",
description="Sovereign orchestration gateway for Codex Spaces",
)
# -------------------------
# Human-facing landing page
# -------------------------
@app.get("/", response_class=HTMLResponse)
def root():
return """
<!DOCTYPE html>
<html>
<head>
<title>CodexHF Sovereign API Gateway</title>
<meta name="viewport" content="width=device-width, initial-scale=1.0">
</head>
<body style="
background-color:#000;
color:#00ff00;
font-family:monospace;
display:flex;
align-items:center;
justify-content:center;
height:100vh;
margin:0;
text-align:center;
">
<div>
<h1>🧠 CodexHF Sovereign API Gateway</h1>
<p>✅ Status: Running</p>
<p>Use <code>/invoke</code> via POST</p>
<p>
<a href="/docs" style="color:#00ff00;">View API Documentation</a>
</p>
</div>
</body>
</html>
"""
# -------------------------
# Machine-readable status
# -------------------------
@app.get("/status")
def status():
return {
"service": "CodexHF API Gateway",
"status": "running",
"timestamp": datetime.utcnow().isoformat(),
"endpoints": {
"health": "/health",
"status": "/status",
"capabilities": "/capabilities",
"invoke": "/invoke",
"wake": "/wake",
"docs": "/docs"
}
}
# -------------------------
# Health check
# -------------------------
@app.get("/health")
def health():
return {
"status": "ok",
"timestamp": datetime.utcnow().isoformat()
}
# -------------------------
# Capability registry
# -------------------------
@app.get("/capabilities")
def capabilities():
return {
"capabilities": list_capabilities()
}
# -------------------------
# Invocation endpoint
# -------------------------
@app.post("/invoke")
async def invoke(request: dict):
trace_id = request.get("trace_id", str(uuid.uuid4()))
capability = request.get("capability")
payload = request.get("payload", {})
mode = request.get("execution_mode", "sync")
if not capability:
raise HTTPException(status_code=400, detail="Missing capability")
if mode == "async":
return {
"trace_id": trace_id,
"status": "accepted"
}
result, duration = await invoke_capability(capability, payload)
return {
"trace_id": trace_id,
"result": result,
"duration_ms": duration
}
# -------------------------
# Wake a downstream Space
# -------------------------
@app.post("/wake")
async def wake(request: dict):
module = request.get("module")
if not module:
raise HTTPException(status_code=400, detail="Missing module")
await wake_module(module)
return {
"status": "awakened",
"module": module
}