""" Embedding Inference API Supports JobBERT v2/v3, Jina AI, and Voyage AI embeddings Compatible with Elasticsearch inference endpoint format """ from fastapi import FastAPI, HTTPException, Query, Security, Depends from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel, Field from typing import List, Optional, Union from sentence_transformers import SentenceTransformer import os import logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) app = FastAPI( title="Embedding Inference API", description="Generate embeddings using JobBERT v2/v3, Jina AI, or Voyage AI", version="1.0.0" ) app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) MODELS = {} VOYAGE_API_KEY = os.environ.get('VOYAGE_API_KEY', '') FIREWORKS_API_KEY = os.environ.get('FIREWORKS_API_KEY', '') OPENROUTER_API_KEY = os.environ.get('OPENROUTER_API_KEY', '') API_KEY = os.environ.get('API_KEY', '') REQUIRE_API_KEY = os.environ.get('REQUIRE_API_KEY', 'false').lower() == 'true' # Set cache directories to writable location (important for Docker/HF Spaces) os.environ['TRANSFORMERS_CACHE'] = os.environ.get('TRANSFORMERS_CACHE', '/tmp/transformers_cache') os.environ['HF_HOME'] = os.environ.get('HF_HOME', '/tmp/huggingface') os.environ['SENTENCE_TRANSFORMERS_HOME'] = os.environ.get('SENTENCE_TRANSFORMERS_HOME', '/tmp/sentence_transformers') # Create cache directories if they don't exist for cache_dir in [os.environ['TRANSFORMERS_CACHE'], os.environ['HF_HOME'], os.environ['SENTENCE_TRANSFORMERS_HOME']]: os.makedirs(cache_dir, exist_ok=True) security = HTTPBearer(auto_error=False) voyage_client = None fireworks_available = False openrouter_available = False logger.info(f"API Key authentication: {'ENABLED' if REQUIRE_API_KEY else 'DISABLED'}") if API_KEY: logger.info(f"✓ API Key configured (length: {len(API_KEY)})") else: logger.info("ℹ️ No API Key set") if VOYAGE_API_KEY: try: import voyageai voyage_client = voyageai.Client(api_key=VOYAGE_API_KEY) logger.info("✓ Voyage AI client initialized") except ImportError: logger.warning("⚠️ voyageai package not installed") except Exception as e: logger.warning(f"⚠️ Voyage AI initialization failed: {e}") if FIREWORKS_API_KEY: try: import requests # Test Fireworks AI connection test_response = requests.get( "https://api.fireworks.ai/inference/v1/models", headers={"Authorization": f"Bearer {FIREWORKS_API_KEY}"}, timeout=5 ) if test_response.status_code in [200, 401, 403]: # 401/403 means auth works, just list might be restricted fireworks_available = True logger.info("✓ Fireworks AI API key configured (Qwen3 available)") else: logger.warning(f"⚠️ Fireworks AI API key validation unclear (status: {test_response.status_code})") # Still mark as available - the embeddings endpoint might work fireworks_available = True except ImportError: logger.warning("⚠️ requests package not installed (needed for Fireworks AI)") except Exception as e: logger.warning(f"⚠️ Fireworks AI validation failed: {e}") # Still mark as available if key is set fireworks_available = True if FIREWORKS_API_KEY else False if OPENROUTER_API_KEY: try: import requests openrouter_available = True logger.info("✓ OpenRouter API key configured (Qwen3, text-embedding-3-small, and more available)") except ImportError: logger.warning("⚠️ requests package not installed (needed for OpenRouter)") except Exception as e: logger.warning(f"⚠️ OpenRouter validation failed: {e}") openrouter_available = True if OPENROUTER_API_KEY else False def load_models(): """Load embedding models on startup (gracefully handles failures)""" # JobBERT-v2 try: logger.info("Loading JobBERT-v2...") # MODELS['jobbertv2'] = SentenceTransformer('TechWolf/JobBERT-v2') logger.info("✓ JobBERT-v2 loaded") except Exception as e: logger.warning(f"⚠️ JobBERT-v2 not loaded: {e}") # JobBERT-v3 try: logger.info("Loading JobBERT-v3...") MODELS['jobbertv3'] = SentenceTransformer('TechWolf/JobBERT-v3') logger.info("✓ JobBERT-v3 loaded") except Exception as e: logger.warning(f"⚠️ JobBERT-v3 not loaded: {e}") # Jina AI try: logger.info("Loading Jina AI embeddings-v3...") MODELS['jina'] = SentenceTransformer('jinaai/jina-embeddings-v3', trust_remote_code=True) logger.info("✓ Jina AI v3 loaded") except Exception as e: logger.warning(f"⚠️ Jina AI v3 not loaded: {e}") # Qwen3-Embedding-8B via Fireworks AI or OpenRouter (API-based, no download needed!) if fireworks_available: MODELS['qwen3'] = 'fireworks' # Mark as available via Fireworks AI logger.info("✓ Qwen3-Embedding-8B available via Fireworks AI API (MTEB #1, no local model needed)") elif openrouter_available: MODELS['qwen3'] = 'openrouter' # Mark as available via OpenRouter logger.info("✓ Qwen3-Embedding-8B available via OpenRouter API (MTEB #1, no local model needed)") else: logger.warning("⚠️ Qwen3-Embedding-8B not available") logger.warning(" To enable: Set FIREWORKS_API_KEY or OPENROUTER_API_KEY environment variable") logger.warning(" Fireworks: https://fireworks.ai | OpenRouter: https://openrouter.ai") logger.warning(" This avoids 15GB local download!") # Check if at least one model loaded if not MODELS: error_msg = "No embedding models could be loaded! Check logs above for details." logger.error(error_msg) raise RuntimeError(error_msg) logger.info(f"Loaded models: {list(MODELS.keys())}") logger.info("API ready!") async def verify_api_key(credentials: Optional[HTTPAuthorizationCredentials] = Security(security)): """Verify API key from Authorization header""" if not REQUIRE_API_KEY: return True if not API_KEY: raise HTTPException( status_code=500, detail="API key authentication is enabled but no API key is configured on the server" ) if credentials is None: raise HTTPException( status_code=401, detail="Missing authentication credentials. Use: Authorization: Bearer YOUR_API_KEY" ) if credentials.credentials != API_KEY: raise HTTPException( status_code=403, detail="Invalid API key" ) return True def estimate_token_count(texts: List[str]) -> int: """Estimate token count for input texts (rough approximation)""" # Simple estimation: ~1 token per 4 characters total_chars = sum(len(text) for text in texts) return max(1, total_chars // 4) def get_fireworks_embeddings(texts: List[str], task: Optional[str] = None) -> List[List[float]]: """ Get embeddings from Fireworks AI Qwen3-Embedding-8B Args: texts: List of texts to embed task: Optional task type ('query' for instruction-aware) Returns: List of embedding vectors (4096-dim each) """ import requests import json if not FIREWORKS_API_KEY: raise Exception("FIREWORKS_API_KEY not configured") # Fireworks AI embeddings endpoint response = requests.post( "https://api.fireworks.ai/inference/v1/embeddings", headers={ "Accept": "application/json", "Content-Type": "application/json", "Authorization": f"Bearer {FIREWORKS_API_KEY}" }, data=json.dumps({ "model": "accounts/fireworks/models/qwen3-embedding-8b", "input": texts }), timeout=30 ) if response.status_code != 200: raise Exception(f"Fireworks AI API error: {response.status_code} - {response.text}") result = response.json() embeddings = [item["embedding"] for item in result["data"]] return embeddings def get_openrouter_embeddings(texts: List[str], model: str = "qwen/qwen3-embedding-8b") -> List[List[float]]: """ Get embeddings from OpenRouter API Args: texts: List of texts to embed model: Model to use (default: qwen/qwen3-embedding-8b) Also supports: openai/text-embedding-3-small, openai/text-embedding-3-large Returns: List of embedding vectors """ import requests if not OPENROUTER_API_KEY: raise Exception("OPENROUTER_API_KEY not configured") response = requests.post( "https://openrouter.ai/api/v1/embeddings", headers={ "Authorization": f"Bearer {OPENROUTER_API_KEY}", "Content-Type": "application/json" }, json={ "model": model, "input": texts }, timeout=30 ) if response.status_code != 200: raise Exception(f"OpenRouter API error: {response.status_code} - {response.text}") result = response.json() embeddings = [item["embedding"] for item in result["data"]] return embeddings @app.on_event("startup") async def startup_event(): load_models() class ElasticsearchInferenceRequest(BaseModel): input: Union[str, List[str]] = Field(..., description="Text or list of texts to embed") class Config: schema_extra = { "example": { "input": "Software Engineer" } } class EmbeddingObject(BaseModel): object: str = Field("embedding", description="Object type") index: int = Field(..., description="Index of the embedding") embedding: List[float] = Field(..., description="Embedding vector") class UsageInfo(BaseModel): total_tokens: int = Field(..., description="Total tokens processed") prompt_tokens: int = Field(..., description="Prompt tokens") class OpenAIEmbeddingResponse(BaseModel): model: str = Field(..., description="Model used for embeddings") object: str = Field("list", description="Object type") usage: UsageInfo = Field(..., description="Token usage information") data: List[EmbeddingObject] = Field(..., description="List of embeddings") # Legacy response models (kept for backward compatibility if needed) class ElasticsearchInferenceResponse(BaseModel): embedding: List[float] = Field(..., description="Embedding vector for single input") class ElasticsearchInferenceBatchResponse(BaseModel): embeddings: List[List[float]] = Field(..., description="List of embedding vectors for batch input") class BatchEmbeddingRequest(BaseModel): texts: List[str] = Field(..., description="List of texts to embed", min_items=1) model: str = Field(..., description="Model to use: 'jobbertv2', 'jobbertv3', 'jina', or 'voyage'") task: Optional[str] = Field(None, description="Task type for Jina AI: 'retrieval.query', 'retrieval.passage', 'text-matching', etc.") input_type: Optional[str] = Field(None, description="Input type for Voyage AI: 'document' or 'query'") class Config: schema_extra = { "example": { "texts": ["Software Engineer", "Data Scientist"], "model": "jobbertv3", "task": "text-matching" } } class BatchEmbeddingResponse(BaseModel): embeddings: List[List[float]] = Field(..., description="List of embedding vectors") model: str = Field(..., description="Model used") dimension: int = Field(..., description="Embedding dimension") num_texts: int = Field(..., description="Number of texts processed") class HealthResponse(BaseModel): status: str models_loaded: List[str] voyage_available: bool fireworks_available: bool openrouter_available: bool api_key_required: bool @app.get("/", response_model=dict) async def root(): """Root endpoint with API information""" return { "message": "Embedding Inference API", "version": "1.0.0", "endpoints": { "/health": "Health check and available models", "/embed": "Generate embeddings - Elasticsearch compatible (POST)", "/embed/batch": "Generate batch embeddings (POST)", "/models": "List available models", "/docs": "API documentation" } } @app.get("/health", response_model=HealthResponse) async def health(): """Health check endpoint (no authentication required)""" models_loaded = list(MODELS.keys()) return { "status": "healthy", "models_loaded": models_loaded, "voyage_available": voyage_client is not None, "fireworks_available": fireworks_available, "openrouter_available": openrouter_available, "api_key_required": REQUIRE_API_KEY } @app.post("/embed", response_model=OpenAIEmbeddingResponse) async def create_embeddings_elasticsearch( request: ElasticsearchInferenceRequest, model: str = Query("jobbertv3", description="Model: jobbertv2, jobbertv3, jina, or voyage"), task: Optional[str] = Query(None, description="Task for Jina AI: retrieval.query, retrieval.passage, text-matching, etc."), input_type: Optional[str] = Query(None, description="Input type for Voyage AI: document or query"), authenticated: bool = Depends(verify_api_key) ): """ Generate embeddings - Elasticsearch inference endpoint compatible format **Usage:** - Single text: `POST /embed?model=jobbertv3` with body `{"input": "Software Engineer"}` - Multiple texts: `POST /embed?model=jina` with body `{"input": ["text1", "text2"]}` **Models (via query parameter):** - `jobbertv2`: JobBERT-v2 (768-dim, job-specific) - `jobbertv3`: JobBERT-v3 (768-dim, job-specific, improved performance) - default - `jina`: Jina AI embeddings-v3 (1024-dim, general purpose) - `qwen3`: Qwen3-Embedding-8B (4096-dim, MTEB #1, multilingual, 32k context, via Fireworks or OpenRouter) - `openrouter`: OpenRouter embeddings (supports multiple models, requires API key) - `voyage`: Voyage AI (1024-dim, requires API key) **Jina AI Tasks (via query parameter):** - `retrieval.query`: For search queries - `retrieval.passage`: For documents/passages - `text-matching`: For similarity matching (default) **Qwen3 Task (via query parameter):** - `query`: For search queries (uses instruction-aware prompt) - Default: Documents/passages (no instruction) **Voyage AI Input Types (via query parameter):** - `document`: For documents/passages - `query`: For search queries """ model_name = model.lower() # Handle single string or list of strings is_single = isinstance(request.input, str) texts = [request.input] if is_single else request.input if model_name == "voyage": if not voyage_client: raise HTTPException( status_code=503, detail="Voyage AI not available. Set VOYAGE_API_KEY environment variable." ) try: voyage_input_type = input_type or "document" result = voyage_client.embed( texts=texts, model="voyage-3", input_type=voyage_input_type ) embeddings = result.embeddings # Calculate token usage token_count = estimate_token_count(texts) # Create OpenAI-compatible response data = [ EmbeddingObject(index=i, embedding=emb) for i, emb in enumerate(embeddings) ] return OpenAIEmbeddingResponse( model="voyage-3", object="list", usage=UsageInfo(total_tokens=token_count, prompt_tokens=token_count), data=data ) except Exception as e: raise HTTPException(status_code=500, detail=f"Voyage AI error: {str(e)}") elif model_name == "openrouter": if not openrouter_available: raise HTTPException( status_code=503, detail="OpenRouter not available. Set OPENROUTER_API_KEY environment variable." ) try: # Use OpenRouter with specified model or default openrouter_model = task or "qwen/qwen3-embedding-8b" # Use task param as model selector embeddings_list = get_openrouter_embeddings(texts, model=openrouter_model) # Calculate token usage token_count = estimate_token_count(texts) # Create OpenAI-compatible response data = [ EmbeddingObject(index=i, embedding=emb) for i, emb in enumerate(embeddings_list) ] return OpenAIEmbeddingResponse( model=f"openrouter/{openrouter_model}", object="list", usage=UsageInfo(total_tokens=token_count, prompt_tokens=token_count), data=data ) except Exception as e: raise HTTPException(status_code=500, detail=f"OpenRouter error: {str(e)}") elif model_name in MODELS: try: selected_model = MODELS[model_name] # Qwen3 via Fireworks AI API (no local model) if model_name == "qwen3" and selected_model == 'fireworks': embeddings_list = get_fireworks_embeddings(texts, task=task) # Qwen3 via OpenRouter API elif model_name == "qwen3" and selected_model == 'openrouter': embeddings_list = get_openrouter_embeddings(texts, model="qwen/qwen3-embedding-8b") # Jina AI with task type elif model_name == "jina" and task: embeddings = selected_model.encode( texts, task=task, convert_to_numpy=True ) embeddings_list = embeddings.tolist() else: embeddings = selected_model.encode( texts, convert_to_numpy=True ) embeddings_list = embeddings.tolist() # Calculate token usage token_count = estimate_token_count(texts) # Create OpenAI-compatible response data = [ EmbeddingObject(index=i, embedding=emb) for i, emb in enumerate(embeddings_list) ] # Determine the full model name for response model_display_name = { "jobbertv2": "TechWolf/JobBERT-v2", "jobbertv3": "TechWolf/JobBERT-v3", "jina": "jina-embeddings-v3", "qwen3": "Qwen/Qwen3-Embedding-8B" }.get(model_name, model_name) return OpenAIEmbeddingResponse( model=model_display_name, object="list", usage=UsageInfo(total_tokens=token_count, prompt_tokens=token_count), data=data ) except Exception as e: raise HTTPException(status_code=500, detail=f"Model error: {str(e)}") else: raise HTTPException( status_code=400, detail=f"Invalid model '{model_name}'. Choose from: jobbertv2, jobbertv3, jina, qwen3, voyage" ) @app.post("/embed/batch", response_model=BatchEmbeddingResponse) async def create_embeddings_batch( request: BatchEmbeddingRequest, authenticated: bool = Depends(verify_api_key) ): """ Generate embeddings for multiple texts - Original batch format **Models:** - `jobbertv2`: JobBERT-v2 (768-dim, job-specific) - `jobbertv3`: JobBERT-v3 (768-dim, job-specific, improved performance) - `jina`: Jina AI embeddings-v3 (1024-dim, general purpose, supports task types) - `qwen3`: Qwen3-Embedding-8B (4096-dim, MTEB #1, multilingual, 32k context) - `voyage`: Voyage AI (1024-dim, requires API key) **Jina AI Tasks:** - `retrieval.query`: For search queries - `retrieval.passage`: For documents/passages - `text-matching`: For similarity matching (default) - `classification`: For classification tasks - `separation`: For clustering **Voyage AI Input Types:** - `document`: For documents/passages - `query`: For search queries """ model_name = request.model.lower() if model_name == "voyage": if not voyage_client: raise HTTPException( status_code=503, detail="Voyage AI not available. Set VOYAGE_API_KEY environment variable." ) try: voyage_input_type = request.input_type or "document" result = voyage_client.embed( texts=request.texts, model="voyage-3", input_type=voyage_input_type ) embeddings = result.embeddings dimension = len(embeddings[0]) if embeddings else 0 return BatchEmbeddingResponse( embeddings=embeddings, model="voyage-3", dimension=dimension, num_texts=len(request.texts) ) except Exception as e: raise HTTPException(status_code=500, detail=f"Voyage AI error: {str(e)}") elif model_name in MODELS: try: selected_model = MODELS[model_name] # Qwen3 via Fireworks AI API (no local model) if model_name == "qwen3" and selected_model == 'fireworks': embeddings_list = get_fireworks_embeddings(request.texts, task=request.task) # Qwen3 via OpenRouter API elif model_name == "qwen3" and selected_model == 'openrouter': embeddings_list = get_openrouter_embeddings(request.texts, model="qwen/qwen3-embedding-8b") # Jina AI with task type elif model_name == "jina" and request.task: embeddings = selected_model.encode( request.texts, task=request.task, convert_to_numpy=True ) embeddings_list = embeddings.tolist() else: embeddings = selected_model.encode( request.texts, convert_to_numpy=True ) embeddings_list = embeddings.tolist() dimension = len(embeddings_list[0]) if embeddings_list else 0 return BatchEmbeddingResponse( embeddings=embeddings_list, model=model_name, dimension=dimension, num_texts=len(request.texts) ) except Exception as e: raise HTTPException(status_code=500, detail=f"Model error: {str(e)}") else: raise HTTPException( status_code=400, detail=f"Invalid model '{model_name}'. Choose from: jobbertv2, jobbertv3, jina, qwen3, voyage" ) @app.get("/models") async def list_models(authenticated: bool = Depends(verify_api_key)): """List available models and their specifications""" models_info = { "jobbertv2": { "name": "TechWolf/JobBERT-v2", "dimension": 768, "description": "Job-specific BERT model fine-tuned on job titles", "max_tokens": 512, "available": "jobbertv2" in MODELS }, "jobbertv3": { "name": "TechWolf/JobBERT-v3", "dimension": 768, "description": "Latest JobBERT model with improved performance", "max_tokens": 512, "available": "jobbertv3" in MODELS }, "jina": { "name": "jinaai/jina-embeddings-v3", "dimension": 1024, "description": "General-purpose embeddings with long context support", "max_tokens": 8192, "available": "jina" in MODELS, "tasks": ["retrieval.query", "retrieval.passage", "text-matching", "classification", "separation"] }, "qwen3": { "name": "Qwen/Qwen3-Embedding-8B", "dimension": 4096, "description": "🏆 MTEB #1 multilingual model (100+ languages, 32k context, instruction-aware)", "max_tokens": 32768, "available": "qwen3" in MODELS, "tasks": ["query", "document"], "features": ["multilingual", "instruction-aware", "long-context"] }, "voyage": { "name": "voyage-3", "dimension": 1024, "description": "State-of-the-art embeddings (requires API key)", "max_tokens": 32000, "available": voyage_client is not None, "input_types": ["document", "query"] } } return models_info if __name__ == "__main__": import uvicorn port = int(os.environ.get("PORT", 7860)) uvicorn.run(app, host="0.0.0.0", port=port)