inference / api.py
nurulajt's picture
Update api.py
c240d75 verified
"""
Embedding Inference API
Supports JobBERT v2/v3, Jina AI, and Voyage AI embeddings
Compatible with Elasticsearch inference endpoint format
"""
from fastapi import FastAPI, HTTPException, Query, Security, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
from typing import List, Optional, Union
from sentence_transformers import SentenceTransformer
import os
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
app = FastAPI(
title="Embedding Inference API",
description="Generate embeddings using JobBERT v2/v3, Jina AI, or Voyage AI",
version="1.0.0"
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
MODELS = {}
VOYAGE_API_KEY = os.environ.get('VOYAGE_API_KEY', '')
FIREWORKS_API_KEY = os.environ.get('FIREWORKS_API_KEY', '')
OPENROUTER_API_KEY = os.environ.get('OPENROUTER_API_KEY', '')
API_KEY = os.environ.get('API_KEY', '')
REQUIRE_API_KEY = os.environ.get('REQUIRE_API_KEY', 'false').lower() == 'true'
# Set cache directories to writable location (important for Docker/HF Spaces)
os.environ['TRANSFORMERS_CACHE'] = os.environ.get('TRANSFORMERS_CACHE', '/tmp/transformers_cache')
os.environ['HF_HOME'] = os.environ.get('HF_HOME', '/tmp/huggingface')
os.environ['SENTENCE_TRANSFORMERS_HOME'] = os.environ.get('SENTENCE_TRANSFORMERS_HOME', '/tmp/sentence_transformers')
# Create cache directories if they don't exist
for cache_dir in [os.environ['TRANSFORMERS_CACHE'], os.environ['HF_HOME'], os.environ['SENTENCE_TRANSFORMERS_HOME']]:
os.makedirs(cache_dir, exist_ok=True)
security = HTTPBearer(auto_error=False)
voyage_client = None
fireworks_available = False
openrouter_available = False
logger.info(f"API Key authentication: {'ENABLED' if REQUIRE_API_KEY else 'DISABLED'}")
if API_KEY:
logger.info(f"✓ API Key configured (length: {len(API_KEY)})")
else:
logger.info("ℹ️ No API Key set")
if VOYAGE_API_KEY:
try:
import voyageai
voyage_client = voyageai.Client(api_key=VOYAGE_API_KEY)
logger.info("✓ Voyage AI client initialized")
except ImportError:
logger.warning("⚠️ voyageai package not installed")
except Exception as e:
logger.warning(f"⚠️ Voyage AI initialization failed: {e}")
if FIREWORKS_API_KEY:
try:
import requests
# Test Fireworks AI connection
test_response = requests.get(
"https://api.fireworks.ai/inference/v1/models",
headers={"Authorization": f"Bearer {FIREWORKS_API_KEY}"},
timeout=5
)
if test_response.status_code in [200, 401, 403]: # 401/403 means auth works, just list might be restricted
fireworks_available = True
logger.info("✓ Fireworks AI API key configured (Qwen3 available)")
else:
logger.warning(f"⚠️ Fireworks AI API key validation unclear (status: {test_response.status_code})")
# Still mark as available - the embeddings endpoint might work
fireworks_available = True
except ImportError:
logger.warning("⚠️ requests package not installed (needed for Fireworks AI)")
except Exception as e:
logger.warning(f"⚠️ Fireworks AI validation failed: {e}")
# Still mark as available if key is set
fireworks_available = True if FIREWORKS_API_KEY else False
if OPENROUTER_API_KEY:
try:
import requests
openrouter_available = True
logger.info("✓ OpenRouter API key configured (Qwen3, text-embedding-3-small, and more available)")
except ImportError:
logger.warning("⚠️ requests package not installed (needed for OpenRouter)")
except Exception as e:
logger.warning(f"⚠️ OpenRouter validation failed: {e}")
openrouter_available = True if OPENROUTER_API_KEY else False
def load_models():
"""Load embedding models on startup (gracefully handles failures)"""
# JobBERT-v2
try:
logger.info("Loading JobBERT-v2...")
# MODELS['jobbertv2'] = SentenceTransformer('TechWolf/JobBERT-v2')
logger.info("✓ JobBERT-v2 loaded")
except Exception as e:
logger.warning(f"⚠️ JobBERT-v2 not loaded: {e}")
# JobBERT-v3
try:
logger.info("Loading JobBERT-v3...")
MODELS['jobbertv3'] = SentenceTransformer('TechWolf/JobBERT-v3')
logger.info("✓ JobBERT-v3 loaded")
except Exception as e:
logger.warning(f"⚠️ JobBERT-v3 not loaded: {e}")
# Jina AI
try:
logger.info("Loading Jina AI embeddings-v3...")
MODELS['jina'] = SentenceTransformer('jinaai/jina-embeddings-v3', trust_remote_code=True)
logger.info("✓ Jina AI v3 loaded")
except Exception as e:
logger.warning(f"⚠️ Jina AI v3 not loaded: {e}")
# Qwen3-Embedding-8B via Fireworks AI or OpenRouter (API-based, no download needed!)
if fireworks_available:
MODELS['qwen3'] = 'fireworks' # Mark as available via Fireworks AI
logger.info("✓ Qwen3-Embedding-8B available via Fireworks AI API (MTEB #1, no local model needed)")
elif openrouter_available:
MODELS['qwen3'] = 'openrouter' # Mark as available via OpenRouter
logger.info("✓ Qwen3-Embedding-8B available via OpenRouter API (MTEB #1, no local model needed)")
else:
logger.warning("⚠️ Qwen3-Embedding-8B not available")
logger.warning(" To enable: Set FIREWORKS_API_KEY or OPENROUTER_API_KEY environment variable")
logger.warning(" Fireworks: https://fireworks.ai | OpenRouter: https://openrouter.ai")
logger.warning(" This avoids 15GB local download!")
# Check if at least one model loaded
if not MODELS:
error_msg = "No embedding models could be loaded! Check logs above for details."
logger.error(error_msg)
raise RuntimeError(error_msg)
logger.info(f"Loaded models: {list(MODELS.keys())}")
logger.info("API ready!")
async def verify_api_key(credentials: Optional[HTTPAuthorizationCredentials] = Security(security)):
"""Verify API key from Authorization header"""
if not REQUIRE_API_KEY:
return True
if not API_KEY:
raise HTTPException(
status_code=500,
detail="API key authentication is enabled but no API key is configured on the server"
)
if credentials is None:
raise HTTPException(
status_code=401,
detail="Missing authentication credentials. Use: Authorization: Bearer YOUR_API_KEY"
)
if credentials.credentials != API_KEY:
raise HTTPException(
status_code=403,
detail="Invalid API key"
)
return True
def estimate_token_count(texts: List[str]) -> int:
"""Estimate token count for input texts (rough approximation)"""
# Simple estimation: ~1 token per 4 characters
total_chars = sum(len(text) for text in texts)
return max(1, total_chars // 4)
def get_fireworks_embeddings(texts: List[str], task: Optional[str] = None) -> List[List[float]]:
"""
Get embeddings from Fireworks AI Qwen3-Embedding-8B
Args:
texts: List of texts to embed
task: Optional task type ('query' for instruction-aware)
Returns:
List of embedding vectors (4096-dim each)
"""
import requests
import json
if not FIREWORKS_API_KEY:
raise Exception("FIREWORKS_API_KEY not configured")
# Fireworks AI embeddings endpoint
response = requests.post(
"https://api.fireworks.ai/inference/v1/embeddings",
headers={
"Accept": "application/json",
"Content-Type": "application/json",
"Authorization": f"Bearer {FIREWORKS_API_KEY}"
},
data=json.dumps({
"model": "accounts/fireworks/models/qwen3-embedding-8b",
"input": texts
}),
timeout=30
)
if response.status_code != 200:
raise Exception(f"Fireworks AI API error: {response.status_code} - {response.text}")
result = response.json()
embeddings = [item["embedding"] for item in result["data"]]
return embeddings
def get_openrouter_embeddings(texts: List[str], model: str = "qwen/qwen3-embedding-8b") -> List[List[float]]:
"""
Get embeddings from OpenRouter API
Args:
texts: List of texts to embed
model: Model to use (default: qwen/qwen3-embedding-8b)
Also supports: openai/text-embedding-3-small, openai/text-embedding-3-large
Returns:
List of embedding vectors
"""
import requests
if not OPENROUTER_API_KEY:
raise Exception("OPENROUTER_API_KEY not configured")
response = requests.post(
"https://openrouter.ai/api/v1/embeddings",
headers={
"Authorization": f"Bearer {OPENROUTER_API_KEY}",
"Content-Type": "application/json"
},
json={
"model": model,
"input": texts
},
timeout=30
)
if response.status_code != 200:
raise Exception(f"OpenRouter API error: {response.status_code} - {response.text}")
result = response.json()
embeddings = [item["embedding"] for item in result["data"]]
return embeddings
@app.on_event("startup")
async def startup_event():
load_models()
class ElasticsearchInferenceRequest(BaseModel):
input: Union[str, List[str]] = Field(..., description="Text or list of texts to embed")
class Config:
schema_extra = {
"example": {
"input": "Software Engineer"
}
}
class EmbeddingObject(BaseModel):
object: str = Field("embedding", description="Object type")
index: int = Field(..., description="Index of the embedding")
embedding: List[float] = Field(..., description="Embedding vector")
class UsageInfo(BaseModel):
total_tokens: int = Field(..., description="Total tokens processed")
prompt_tokens: int = Field(..., description="Prompt tokens")
class OpenAIEmbeddingResponse(BaseModel):
model: str = Field(..., description="Model used for embeddings")
object: str = Field("list", description="Object type")
usage: UsageInfo = Field(..., description="Token usage information")
data: List[EmbeddingObject] = Field(..., description="List of embeddings")
# Legacy response models (kept for backward compatibility if needed)
class ElasticsearchInferenceResponse(BaseModel):
embedding: List[float] = Field(..., description="Embedding vector for single input")
class ElasticsearchInferenceBatchResponse(BaseModel):
embeddings: List[List[float]] = Field(..., description="List of embedding vectors for batch input")
class BatchEmbeddingRequest(BaseModel):
texts: List[str] = Field(..., description="List of texts to embed", min_items=1)
model: str = Field(..., description="Model to use: 'jobbertv2', 'jobbertv3', 'jina', or 'voyage'")
task: Optional[str] = Field(None, description="Task type for Jina AI: 'retrieval.query', 'retrieval.passage', 'text-matching', etc.")
input_type: Optional[str] = Field(None, description="Input type for Voyage AI: 'document' or 'query'")
class Config:
schema_extra = {
"example": {
"texts": ["Software Engineer", "Data Scientist"],
"model": "jobbertv3",
"task": "text-matching"
}
}
class BatchEmbeddingResponse(BaseModel):
embeddings: List[List[float]] = Field(..., description="List of embedding vectors")
model: str = Field(..., description="Model used")
dimension: int = Field(..., description="Embedding dimension")
num_texts: int = Field(..., description="Number of texts processed")
class HealthResponse(BaseModel):
status: str
models_loaded: List[str]
voyage_available: bool
fireworks_available: bool
openrouter_available: bool
api_key_required: bool
@app.get("/", response_model=dict)
async def root():
"""Root endpoint with API information"""
return {
"message": "Embedding Inference API",
"version": "1.0.0",
"endpoints": {
"/health": "Health check and available models",
"/embed": "Generate embeddings - Elasticsearch compatible (POST)",
"/embed/batch": "Generate batch embeddings (POST)",
"/models": "List available models",
"/docs": "API documentation"
}
}
@app.get("/health", response_model=HealthResponse)
async def health():
"""Health check endpoint (no authentication required)"""
models_loaded = list(MODELS.keys())
return {
"status": "healthy",
"models_loaded": models_loaded,
"voyage_available": voyage_client is not None,
"fireworks_available": fireworks_available,
"openrouter_available": openrouter_available,
"api_key_required": REQUIRE_API_KEY
}
@app.post("/embed", response_model=OpenAIEmbeddingResponse)
async def create_embeddings_elasticsearch(
request: ElasticsearchInferenceRequest,
model: str = Query("jobbertv3", description="Model: jobbertv2, jobbertv3, jina, or voyage"),
task: Optional[str] = Query(None, description="Task for Jina AI: retrieval.query, retrieval.passage, text-matching, etc."),
input_type: Optional[str] = Query(None, description="Input type for Voyage AI: document or query"),
authenticated: bool = Depends(verify_api_key)
):
"""
Generate embeddings - Elasticsearch inference endpoint compatible format
**Usage:**
- Single text: `POST /embed?model=jobbertv3` with body `{"input": "Software Engineer"}`
- Multiple texts: `POST /embed?model=jina` with body `{"input": ["text1", "text2"]}`
**Models (via query parameter):**
- `jobbertv2`: JobBERT-v2 (768-dim, job-specific)
- `jobbertv3`: JobBERT-v3 (768-dim, job-specific, improved performance) - default
- `jina`: Jina AI embeddings-v3 (1024-dim, general purpose)
- `qwen3`: Qwen3-Embedding-8B (4096-dim, MTEB #1, multilingual, 32k context, via Fireworks or OpenRouter)
- `openrouter`: OpenRouter embeddings (supports multiple models, requires API key)
- `voyage`: Voyage AI (1024-dim, requires API key)
**Jina AI Tasks (via query parameter):**
- `retrieval.query`: For search queries
- `retrieval.passage`: For documents/passages
- `text-matching`: For similarity matching (default)
**Qwen3 Task (via query parameter):**
- `query`: For search queries (uses instruction-aware prompt)
- Default: Documents/passages (no instruction)
**Voyage AI Input Types (via query parameter):**
- `document`: For documents/passages
- `query`: For search queries
"""
model_name = model.lower()
# Handle single string or list of strings
is_single = isinstance(request.input, str)
texts = [request.input] if is_single else request.input
if model_name == "voyage":
if not voyage_client:
raise HTTPException(
status_code=503,
detail="Voyage AI not available. Set VOYAGE_API_KEY environment variable."
)
try:
voyage_input_type = input_type or "document"
result = voyage_client.embed(
texts=texts,
model="voyage-3",
input_type=voyage_input_type
)
embeddings = result.embeddings
# Calculate token usage
token_count = estimate_token_count(texts)
# Create OpenAI-compatible response
data = [
EmbeddingObject(index=i, embedding=emb)
for i, emb in enumerate(embeddings)
]
return OpenAIEmbeddingResponse(
model="voyage-3",
object="list",
usage=UsageInfo(total_tokens=token_count, prompt_tokens=token_count),
data=data
)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Voyage AI error: {str(e)}")
elif model_name == "openrouter":
if not openrouter_available:
raise HTTPException(
status_code=503,
detail="OpenRouter not available. Set OPENROUTER_API_KEY environment variable."
)
try:
# Use OpenRouter with specified model or default
openrouter_model = task or "qwen/qwen3-embedding-8b" # Use task param as model selector
embeddings_list = get_openrouter_embeddings(texts, model=openrouter_model)
# Calculate token usage
token_count = estimate_token_count(texts)
# Create OpenAI-compatible response
data = [
EmbeddingObject(index=i, embedding=emb)
for i, emb in enumerate(embeddings_list)
]
return OpenAIEmbeddingResponse(
model=f"openrouter/{openrouter_model}",
object="list",
usage=UsageInfo(total_tokens=token_count, prompt_tokens=token_count),
data=data
)
except Exception as e:
raise HTTPException(status_code=500, detail=f"OpenRouter error: {str(e)}")
elif model_name in MODELS:
try:
selected_model = MODELS[model_name]
# Qwen3 via Fireworks AI API (no local model)
if model_name == "qwen3" and selected_model == 'fireworks':
embeddings_list = get_fireworks_embeddings(texts, task=task)
# Qwen3 via OpenRouter API
elif model_name == "qwen3" and selected_model == 'openrouter':
embeddings_list = get_openrouter_embeddings(texts, model="qwen/qwen3-embedding-8b")
# Jina AI with task type
elif model_name == "jina" and task:
embeddings = selected_model.encode(
texts,
task=task,
convert_to_numpy=True
)
embeddings_list = embeddings.tolist()
else:
embeddings = selected_model.encode(
texts,
convert_to_numpy=True
)
embeddings_list = embeddings.tolist()
# Calculate token usage
token_count = estimate_token_count(texts)
# Create OpenAI-compatible response
data = [
EmbeddingObject(index=i, embedding=emb)
for i, emb in enumerate(embeddings_list)
]
# Determine the full model name for response
model_display_name = {
"jobbertv2": "TechWolf/JobBERT-v2",
"jobbertv3": "TechWolf/JobBERT-v3",
"jina": "jina-embeddings-v3",
"qwen3": "Qwen/Qwen3-Embedding-8B"
}.get(model_name, model_name)
return OpenAIEmbeddingResponse(
model=model_display_name,
object="list",
usage=UsageInfo(total_tokens=token_count, prompt_tokens=token_count),
data=data
)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Model error: {str(e)}")
else:
raise HTTPException(
status_code=400,
detail=f"Invalid model '{model_name}'. Choose from: jobbertv2, jobbertv3, jina, qwen3, voyage"
)
@app.post("/embed/batch", response_model=BatchEmbeddingResponse)
async def create_embeddings_batch(
request: BatchEmbeddingRequest,
authenticated: bool = Depends(verify_api_key)
):
"""
Generate embeddings for multiple texts - Original batch format
**Models:**
- `jobbertv2`: JobBERT-v2 (768-dim, job-specific)
- `jobbertv3`: JobBERT-v3 (768-dim, job-specific, improved performance)
- `jina`: Jina AI embeddings-v3 (1024-dim, general purpose, supports task types)
- `qwen3`: Qwen3-Embedding-8B (4096-dim, MTEB #1, multilingual, 32k context)
- `voyage`: Voyage AI (1024-dim, requires API key)
**Jina AI Tasks:**
- `retrieval.query`: For search queries
- `retrieval.passage`: For documents/passages
- `text-matching`: For similarity matching (default)
- `classification`: For classification tasks
- `separation`: For clustering
**Voyage AI Input Types:**
- `document`: For documents/passages
- `query`: For search queries
"""
model_name = request.model.lower()
if model_name == "voyage":
if not voyage_client:
raise HTTPException(
status_code=503,
detail="Voyage AI not available. Set VOYAGE_API_KEY environment variable."
)
try:
voyage_input_type = request.input_type or "document"
result = voyage_client.embed(
texts=request.texts,
model="voyage-3",
input_type=voyage_input_type
)
embeddings = result.embeddings
dimension = len(embeddings[0]) if embeddings else 0
return BatchEmbeddingResponse(
embeddings=embeddings,
model="voyage-3",
dimension=dimension,
num_texts=len(request.texts)
)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Voyage AI error: {str(e)}")
elif model_name in MODELS:
try:
selected_model = MODELS[model_name]
# Qwen3 via Fireworks AI API (no local model)
if model_name == "qwen3" and selected_model == 'fireworks':
embeddings_list = get_fireworks_embeddings(request.texts, task=request.task)
# Qwen3 via OpenRouter API
elif model_name == "qwen3" and selected_model == 'openrouter':
embeddings_list = get_openrouter_embeddings(request.texts, model="qwen/qwen3-embedding-8b")
# Jina AI with task type
elif model_name == "jina" and request.task:
embeddings = selected_model.encode(
request.texts,
task=request.task,
convert_to_numpy=True
)
embeddings_list = embeddings.tolist()
else:
embeddings = selected_model.encode(
request.texts,
convert_to_numpy=True
)
embeddings_list = embeddings.tolist()
dimension = len(embeddings_list[0]) if embeddings_list else 0
return BatchEmbeddingResponse(
embeddings=embeddings_list,
model=model_name,
dimension=dimension,
num_texts=len(request.texts)
)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Model error: {str(e)}")
else:
raise HTTPException(
status_code=400,
detail=f"Invalid model '{model_name}'. Choose from: jobbertv2, jobbertv3, jina, qwen3, voyage"
)
@app.get("/models")
async def list_models(authenticated: bool = Depends(verify_api_key)):
"""List available models and their specifications"""
models_info = {
"jobbertv2": {
"name": "TechWolf/JobBERT-v2",
"dimension": 768,
"description": "Job-specific BERT model fine-tuned on job titles",
"max_tokens": 512,
"available": "jobbertv2" in MODELS
},
"jobbertv3": {
"name": "TechWolf/JobBERT-v3",
"dimension": 768,
"description": "Latest JobBERT model with improved performance",
"max_tokens": 512,
"available": "jobbertv3" in MODELS
},
"jina": {
"name": "jinaai/jina-embeddings-v3",
"dimension": 1024,
"description": "General-purpose embeddings with long context support",
"max_tokens": 8192,
"available": "jina" in MODELS,
"tasks": ["retrieval.query", "retrieval.passage", "text-matching", "classification", "separation"]
},
"qwen3": {
"name": "Qwen/Qwen3-Embedding-8B",
"dimension": 4096,
"description": "🏆 MTEB #1 multilingual model (100+ languages, 32k context, instruction-aware)",
"max_tokens": 32768,
"available": "qwen3" in MODELS,
"tasks": ["query", "document"],
"features": ["multilingual", "instruction-aware", "long-context"]
},
"voyage": {
"name": "voyage-3",
"dimension": 1024,
"description": "State-of-the-art embeddings (requires API key)",
"max_tokens": 32000,
"available": voyage_client is not None,
"input_types": ["document", "query"]
}
}
return models_info
if __name__ == "__main__":
import uvicorn
port = int(os.environ.get("PORT", 7860))
uvicorn.run(app, host="0.0.0.0", port=port)