# TraceMind/utils/hf_jobs_submission.py
"""
HuggingFace Jobs Submission Module
Handles submission of SMOLTRACE evaluation jobs to HuggingFace Jobs platform.
Uses the official HuggingFace Jobs API: `huggingface_hub.run_job()`
"""
import os
import re
import uuid
from typing import Dict, Optional, List
def submit_hf_job(
model: str,
provider: str,
agent_type: str,
hardware: str,
dataset_name: str,
split: str = "train",
difficulty: str = "all",
parallel_workers: int = 1,
hf_token: Optional[str] = None,
hf_inference_provider: Optional[str] = None,
search_provider: str = "duckduckgo",
enable_tools: Optional[List[str]] = None,
output_format: str = "hub",
output_dir: Optional[str] = None,
enable_otel: bool = True,
enable_gpu_metrics: bool = True,
private: bool = False,
debug: bool = False,
quiet: bool = False,
run_id: Optional[str] = None,
timeout: str = "1h"
) -> Dict:
"""
Submit an evaluation job to HuggingFace Jobs using the run_job API
Args:
model: Model identifier (e.g., "openai/gpt-4")
provider: Provider type ("litellm", "inference", "transformers")
agent_type: Agent type ("tool", "code", "both")
hardware: Hardware type (e.g., "auto", "cpu-basic", "t4-small", "a10g-small")
dataset_name: HuggingFace dataset for evaluation
split: Dataset split to use
difficulty: Difficulty filter
parallel_workers: Number of parallel workers
hf_token: HuggingFace token
hf_inference_provider: HF Inference provider
search_provider: Search provider for agents
enable_tools: List of tools to enable
output_format: Output format ("hub" or "json")
output_dir: Output directory for JSON format
enable_otel: Enable OpenTelemetry tracing
enable_gpu_metrics: Enable GPU metrics collection
private: Make datasets private
debug: Enable debug mode
quiet: Enable quiet mode
run_id: Optional run ID (auto-generated if not provided)
timeout: Job timeout (default: "1h")
Returns:
dict: Job submission result with job_id, status, and details
"""
try:
from huggingface_hub import run_job
except ImportError:
return {
"success": False,
"error": "huggingface_hub package not installed or outdated. Install with: pip install -U huggingface_hub",
"job_id": None
}
# Validate HF token
token = hf_token or os.environ.get("HF_TOKEN")
if not token:
return {
"success": False,
"error": "HuggingFace token not configured. Please set HF_TOKEN in Settings.",
"job_id": None
}
# Generate job ID
job_id = run_id if run_id else f"job_{uuid.uuid4().hex[:8]}"
# Map hardware to HF Jobs flavor
if hardware == "auto":
flavor = _auto_select_hf_hardware(provider, model)
else:
flavor = hardware
# Determine if this is a GPU job
is_gpu_job = flavor not in ["cpu-basic", "cpu-upgrade"]
# Select appropriate Docker image
if is_gpu_job:
# GPU jobs use PyTorch with CUDA
image = "pytorch/pytorch:2.6.0-cuda12.4-cudnn9-devel"
pip_packages = "smoltrace ddgs smoltrace[gpu]"
else:
# CPU jobs use standard Python
image = "python:3.12"
pip_packages = "smoltrace ddgs"
# Build secrets dictionary
secrets = {
"HF_TOKEN": token
}
# Add LLM provider API keys from environment
llm_key_names = [
"OPENAI_API_KEY", "ANTHROPIC_API_KEY", "GOOGLE_API_KEY",
"GEMINI_API_KEY", "COHERE_API_KEY", "MISTRAL_API_KEY",
"TOGETHER_API_KEY", "GROQ_API_KEY", "REPLICATE_API_TOKEN",
"ANYSCALE_API_KEY", "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY",
"AWS_REGION", "AZURE_OPENAI_API_KEY", "AZURE_OPENAI_ENDPOINT",
"LITELLM_API_KEY"
]
for key_name in llm_key_names:
value = os.environ.get(key_name)
if value:
secrets[key_name] = value
# Build SMOLTRACE command
cmd_parts = ["smoltrace-eval"]
cmd_parts.append(f"--model {model}")
cmd_parts.append(f"--provider {provider}")
if hf_inference_provider:
cmd_parts.append(f"--hf-inference-provider {hf_inference_provider}")
cmd_parts.append(f"--search-provider {search_provider}")
if enable_tools:
cmd_parts.append(f"--enable-tools {','.join(enable_tools)}")
cmd_parts.append(f"--agent-type {agent_type}")
cmd_parts.append(f"--dataset-name {dataset_name}")
cmd_parts.append(f"--split {split}")
if difficulty != "all":
cmd_parts.append(f"--difficulty {difficulty}")
if parallel_workers > 1:
cmd_parts.append(f"--parallel-workers {parallel_workers}")
cmd_parts.append(f"--output-format {output_format}")
if output_dir and output_format == "json":
cmd_parts.append(f"--output-dir {output_dir}")
if enable_otel:
cmd_parts.append("--enable-otel")
if not enable_gpu_metrics:
cmd_parts.append("--disable-gpu-metrics")
if private:
cmd_parts.append("--private")
if debug:
cmd_parts.append("--debug")
if quiet:
cmd_parts.append("--quiet")
cmd_parts.append(f"--run-id {job_id}")
smoltrace_command = " ".join(cmd_parts)
# Build full command with pip upgrade + install
# IMPORTANT: Upgrade pip first to avoid dependency resolution issues
# (older pip in conda struggles with fief-client[cli] backtracking)
# Set PYTHONIOENCODING to UTF-8 to handle unicode output properly
full_command = f"export PYTHONIOENCODING=utf-8 && pip install --upgrade pip && pip install {pip_packages} && {smoltrace_command}"
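    # For illustration, a litellm job on cpu-basic with the default evaluation options would
    # assemble roughly as follows (the model, dataset name, and run id are placeholders):
    #   export PYTHONIOENCODING=utf-8 && pip install --upgrade pip && pip install smoltrace ddgs && \
    #   smoltrace-eval --model openai/gpt-4 --provider litellm --search-provider duckduckgo \
    #     --agent-type tool --dataset-name <dataset> --split train --output-format hub \
    #     --enable-otel --run-id job_1a2b3c4d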
# Submit job using HuggingFace Jobs API
try:
job = run_job(
image=image,
command=["bash", "-c", full_command],
secrets=secrets,
flavor=flavor,
timeout=timeout
)
return {
"success": True,
"job_id": job_id,
"hf_job_id": job.job_id if hasattr(job, 'job_id') else str(job),
"platform": "HuggingFace Jobs",
"hardware": flavor,
"image": image,
"command": smoltrace_command,
"status": "submitted",
"message": f"Job successfully submitted to HuggingFace Jobs (flavor: {flavor})",
"instructions": f"""
✅ Job submitted successfully!
**Job Details:**
- Flavor: {flavor}
- Image: {image}
- Timeout: {timeout}
**Monitor your job:**
- View job status: https://huggingface.co/jobs
- HF Job ID: {job.job_id if hasattr(job, 'job_id') else 'check dashboard'}
**What happens next:**
1. Job starts running on HuggingFace infrastructure
2. SMOLTRACE evaluates your model
3. Results are automatically pushed to HuggingFace datasets
4. They will appear in the TraceMind leaderboard when complete
""".strip()
}
except Exception as e:
return {
"success": False,
"error": f"Failed to submit job to HuggingFace: {str(e)}",
"job_id": job_id,
"command": smoltrace_command,
"debug_info": {
"image": image,
"flavor": flavor,
"timeout": timeout,
"secrets_configured": list(secrets.keys())
}
}
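# Example usage (a minimal sketch; the model and dataset names below are placeholders,
# and HF_TOKEN is read from the environment when hf_token is not passed):
#
#   result = submit_hf_job(
#       model="Qwen/Qwen2.5-7B-Instruct",
#       provider="transformers",
#       agent_type="tool",
#       hardware="auto",
#       dataset_name="my-org/my-eval-dataset",
#   )
#   if result["success"]:
#       print(result["hf_job_id"], result["hardware"])
#   else:
#       print(result["error"])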
def _auto_select_hf_hardware(provider: str, model: str) -> str:
"""
Automatically select HuggingFace Jobs hardware based on model and provider.
Memory estimation for agentic workloads:
- Model weights (FP16): ~2GB per 1B params
- KV cache for long contexts: ~1.5-2x model size for agentic tasks
- Inference overhead: ~20-30% additional
- Total: ~4-5GB per 1B params for safe agentic execution
Args:
provider: Provider type
model: Model identifier
Returns:
str: HF Jobs flavor
"""
# API models only need CPU
if provider in ["litellm", "inference"]:
return "cpu-basic"
# Local models need GPU - select based on model size
# Conservative allocation for agentic tasks (model weights + KV cache + inference overhead)
# Memory estimation: ~4-5GB per 1B params for safe agentic execution
model_lower = model.lower()
    # Extract model size using regex to capture the number before 'b'
    size_match = re.search(r'(\d+\.?\d*)b', model_lower)
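    # Illustrative matches (hypothetical names): "llama-3.1-8b-instruct" -> "8",
    # "qwen2.5-0.5b" -> "0.5"; names without a "<digits>b" token leave size_match as None.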
if size_match:
model_size = float(size_match.group(1))
# Complete coverage from 0.5B to 100B+ with no gaps
# HF Jobs has limited GPU options: t4-small, a10g-large, a100-large
if model_size >= 13:
# 13B-100B+: A100 large (e.g., 13B, 14B, 27B, 30B, 48B, 70B)
return "a100-large"
elif model_size >= 6:
# 6B-12B: A10G large (e.g., 6B, 7B, 8B, 9B, 10B, 11B, 12B)
return "a10g-large"
elif model_size >= 1:
# 1B-5B: T4 small (e.g., 1B, 2B, 3B, 4B, 5B)
return "t4-small"
else:
# < 1B: T4 small
return "t4-small"
else:
# No size detected in model name - default to A100 (safe for agentic workloads)
return "a100-large"
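# Illustrative outcomes of the auto-selection above (model names are hypothetical examples):
#   provider="litellm",      model="openai/gpt-4"                     -> "cpu-basic"
#   provider="transformers", model="Qwen/Qwen2.5-1.5B-Instruct"       -> "t4-small"   (1.5B)
#   provider="transformers", model="meta-llama/Llama-3.1-8B-Instruct" -> "a10g-large" (8B)
#   provider="transformers", model="Qwen/Qwen2.5-72B-Instruct"        -> "a100-large" (72B)
#   provider="transformers", model="my-org/custom-model"              -> "a100-large" (no size in name)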
def check_job_status(hf_job_id: str, hf_token: Optional[str] = None) -> Dict:
"""
Check the status of a HuggingFace Job using the Jobs API
Args:
hf_job_id: HF Job ID (format: username/job_hash or just job_hash)
hf_token: HuggingFace token (optional, uses env if not provided)
Returns:
dict: Job status information
"""
try:
from huggingface_hub import HfApi
except ImportError:
return {
"success": False,
"error": "huggingface_hub package not installed",
"job_id": hf_job_id
}
token = hf_token or os.environ.get("HF_TOKEN")
if not token:
return {
"success": False,
"error": "HuggingFace token not configured",
"job_id": hf_job_id
}
try:
api = HfApi(token=token)
# Parse job_id and namespace (username)
# Format can be "username/job_hash" or just "job_hash"
if "/" in hf_job_id:
namespace, job_id_only = hf_job_id.split("/", 1)
job_info = api.inspect_job(job_id=job_id_only, namespace=namespace)
else:
job_info = api.inspect_job(job_id=hf_job_id)
# Extract status stage from JobStatus object
if hasattr(job_info, 'status') and hasattr(job_info.status, 'stage'):
status = job_info.status.stage
else:
status = str(job_info.status) if hasattr(job_info, 'status') else "unknown"
return {
"success": True,
"job_id": hf_job_id,
"status": status,
"created_at": str(job_info.created_at) if hasattr(job_info, 'created_at') else None,
"flavor": job_info.flavor if hasattr(job_info, 'flavor') else None,
"url": job_info.url if hasattr(job_info, 'url') else None,
"info": str(job_info)
}
except Exception as e:
return {
"success": False,
"error": f"Failed to fetch job status: {str(e)}",
"job_id": hf_job_id
}
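# Example usage (sketch; the job id is a placeholder in "username/job_hash" format):
#   status = check_job_status("username/abc123def456")
#   if status["success"]:
#       print(status["status"], status["url"])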
def get_job_logs(hf_job_id: str, hf_token: Optional[str] = None) -> Dict:
"""
Retrieve logs from a HuggingFace Job
Args:
hf_job_id: HF Job ID (format: username/job_hash or just job_hash)
hf_token: HuggingFace token (optional, uses env if not provided)
Returns:
dict: Job logs information
"""
try:
from huggingface_hub import HfApi
except ImportError:
return {
"success": False,
"error": "huggingface_hub package not installed",
"job_id": hf_job_id
}
token = hf_token or os.environ.get("HF_TOKEN")
if not token:
return {
"success": False,
"error": "HuggingFace token not configured",
"job_id": hf_job_id
}
try:
api = HfApi(token=token)
# Parse job_id and namespace (username)
# Format can be "username/job_hash" or just "job_hash"
if "/" in hf_job_id:
namespace, job_id_only = hf_job_id.split("/", 1)
logs_iterable = api.fetch_job_logs(job_id=job_id_only, namespace=namespace)
else:
logs_iterable = api.fetch_job_logs(job_id=hf_job_id)
# Convert iterable to string
logs = "\n".join(logs_iterable)
return {
"success": True,
"job_id": hf_job_id,
"logs": logs
}
except Exception as e:
return {
"success": False,
"error": f"Failed to fetch job logs: {str(e)}",
"job_id": hf_job_id,
"logs": ""
}
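# Example usage (sketch; the job id is a placeholder):
#   result = get_job_logs("username/abc123def456")
#   if result["success"]:
#       print(result["logs"])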
def list_user_jobs(hf_token: Optional[str] = None, limit: int = 10) -> Dict:
"""
List recent jobs for the authenticated user
Args:
hf_token: HuggingFace token (optional, uses env if not provided)
limit: Maximum number of jobs to return (applied after fetching)
Returns:
dict: List of user's jobs
"""
try:
from huggingface_hub import HfApi
except ImportError:
return {
"success": False,
"error": "huggingface_hub package not installed"
}
token = hf_token or os.environ.get("HF_TOKEN")
if not token:
return {
"success": False,
"error": "HuggingFace token not configured"
}
try:
api = HfApi(token=token)
# List user's jobs (no limit parameter in API, so we fetch all and slice)
all_jobs = api.list_jobs()
# Limit the results
jobs_to_display = all_jobs[:limit] if limit > 0 else all_jobs
job_list = []
for job in jobs_to_display:
# Extract owner name from JobOwner object
owner_name = job.owner.name if hasattr(job, 'owner') and hasattr(job.owner, 'name') else None
# Build job_id in the format: owner/id
if owner_name and hasattr(job, 'id'):
job_id = f"{owner_name}/{job.id}"
elif hasattr(job, 'id'):
job_id = job.id
else:
job_id = "unknown"
# Extract status stage from JobStatus object
if hasattr(job, 'status') and hasattr(job.status, 'stage'):
status = job.status.stage
else:
status = str(job.status) if hasattr(job, 'status') else "unknown"
job_list.append({
"job_id": job_id,
"status": status,
"created_at": str(job.created_at) if hasattr(job, 'created_at') else None
})
return {
"success": True,
"jobs": job_list,
"count": len(job_list)
}
except Exception as e:
return {
"success": False,
"error": f"Failed to list jobs: {str(e)}",
"jobs": []
}
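# Example usage (sketch): list up to five jobs for the authenticated user.
#   recent = list_user_jobs(limit=5)
#   if recent["success"]:
#       for job in recent["jobs"]:
#           print(job["job_id"], job["status"], job["created_at"])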