Spaces:
Running
Running
| """ | |
| HuggingFace Jobs Submission Module | |
| Handles submission of SMOLTRACE evaluation jobs to HuggingFace Jobs platform. | |
| Uses the official HuggingFace Jobs API: `huggingface_hub.run_job()` | |
| """ | |
| import os | |
| import uuid | |
| from typing import Dict, Optional, List | |
| def submit_hf_job( | |
| model: str, | |
| provider: str, | |
| agent_type: str, | |
| hardware: str, | |
| dataset_name: str, | |
| split: str = "train", | |
| difficulty: str = "all", | |
| parallel_workers: int = 1, | |
| hf_token: Optional[str] = None, | |
| hf_inference_provider: Optional[str] = None, | |
| search_provider: str = "duckduckgo", | |
| enable_tools: Optional[List[str]] = None, | |
| output_format: str = "hub", | |
| output_dir: Optional[str] = None, | |
| enable_otel: bool = True, | |
| enable_gpu_metrics: bool = True, | |
| private: bool = False, | |
| debug: bool = False, | |
| quiet: bool = False, | |
| run_id: Optional[str] = None, | |
| timeout: str = "1h" | |
| ) -> Dict: | |
| """ | |
| Submit an evaluation job to HuggingFace Jobs using the run_job API | |
| Args: | |
| model: Model identifier (e.g., "openai/gpt-4") | |
| provider: Provider type ("litellm", "inference", "transformers") | |
| agent_type: Agent type ("tool", "code", "both") | |
| hardware: Hardware type (e.g., "auto", "cpu-basic", "t4-small", "a10g-small") | |
| dataset_name: HuggingFace dataset for evaluation | |
| split: Dataset split to use | |
| difficulty: Difficulty filter | |
| parallel_workers: Number of parallel workers | |
| hf_token: HuggingFace token | |
| hf_inference_provider: HF Inference provider | |
| search_provider: Search provider for agents | |
| enable_tools: List of tools to enable | |
| output_format: Output format ("hub" or "json") | |
| output_dir: Output directory for JSON format | |
| enable_otel: Enable OpenTelemetry tracing | |
| enable_gpu_metrics: Enable GPU metrics collection | |
| private: Make datasets private | |
| debug: Enable debug mode | |
| quiet: Enable quiet mode | |
| run_id: Optional run ID (auto-generated if not provided) | |
| timeout: Job timeout (default: "1h") | |
| Returns: | |
| dict: Job submission result with job_id, status, and details | |
| """ | |
| try: | |
| from huggingface_hub import run_job | |
| except ImportError: | |
| return { | |
| "success": False, | |
| "error": "huggingface_hub package not installed or outdated. Install with: pip install -U huggingface_hub", | |
| "job_id": None | |
| } | |
| # Validate HF token | |
| token = hf_token or os.environ.get("HF_TOKEN") | |
| if not token: | |
| return { | |
| "success": False, | |
| "error": "HuggingFace token not configured. Please set HF_TOKEN in Settings.", | |
| "job_id": None | |
| } | |
| # Generate job ID | |
| job_id = run_id if run_id else f"job_{uuid.uuid4().hex[:8]}" | |
| # Map hardware to HF Jobs flavor | |
| if hardware == "auto": | |
| flavor = _auto_select_hf_hardware(provider, model) | |
| else: | |
| flavor = hardware | |
| # Determine if this is a GPU job | |
| is_gpu_job = flavor not in ["cpu-basic", "cpu-upgrade"] | |
| # Select appropriate Docker image | |
| if is_gpu_job: | |
| # GPU jobs use PyTorch with CUDA | |
| image = "pytorch/pytorch:2.6.0-cuda12.4-cudnn9-devel" | |
| pip_packages = "smoltrace ddgs smoltrace[gpu]" | |
| else: | |
| # CPU jobs use standard Python | |
| image = "python:3.12" | |
| pip_packages = "smoltrace ddgs" | |
| # Build secrets dictionary | |
| secrets = { | |
| "HF_TOKEN": token | |
| } | |
| # Add LLM provider API keys from environment | |
| llm_key_names = [ | |
| "OPENAI_API_KEY", "ANTHROPIC_API_KEY", "GOOGLE_API_KEY", | |
| "GEMINI_API_KEY", "COHERE_API_KEY", "MISTRAL_API_KEY", | |
| "TOGETHER_API_KEY", "GROQ_API_KEY", "REPLICATE_API_TOKEN", | |
| "ANYSCALE_API_KEY", "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", | |
| "AWS_REGION", "AZURE_OPENAI_API_KEY", "AZURE_OPENAI_ENDPOINT", | |
| "LITELLM_API_KEY" | |
| ] | |
| for key_name in llm_key_names: | |
| value = os.environ.get(key_name) | |
| if value: | |
| secrets[key_name] = value | |
| # Build SMOLTRACE command | |
| cmd_parts = ["smoltrace-eval"] | |
| cmd_parts.append(f"--model {model}") | |
| cmd_parts.append(f"--provider {provider}") | |
| if hf_inference_provider: | |
| cmd_parts.append(f"--hf-inference-provider {hf_inference_provider}") | |
| cmd_parts.append(f"--search-provider {search_provider}") | |
| if enable_tools: | |
| cmd_parts.append(f"--enable-tools {','.join(enable_tools)}") | |
| cmd_parts.append(f"--agent-type {agent_type}") | |
| cmd_parts.append(f"--dataset-name {dataset_name}") | |
| cmd_parts.append(f"--split {split}") | |
| if difficulty != "all": | |
| cmd_parts.append(f"--difficulty {difficulty}") | |
| if parallel_workers > 1: | |
| cmd_parts.append(f"--parallel-workers {parallel_workers}") | |
| cmd_parts.append(f"--output-format {output_format}") | |
| if output_dir and output_format == "json": | |
| cmd_parts.append(f"--output-dir {output_dir}") | |
| if enable_otel: | |
| cmd_parts.append("--enable-otel") | |
| if not enable_gpu_metrics: | |
| cmd_parts.append("--disable-gpu-metrics") | |
| if private: | |
| cmd_parts.append("--private") | |
| if debug: | |
| cmd_parts.append("--debug") | |
| if quiet: | |
| cmd_parts.append("--quiet") | |
| cmd_parts.append(f"--run-id {job_id}") | |
| smoltrace_command = " ".join(cmd_parts) | |
| # Build full command with pip upgrade + install | |
| # IMPORTANT: Upgrade pip first to avoid dependency resolution issues | |
| # (older pip in conda struggles with fief-client[cli] backtracking) | |
| # Set PYTHONIOENCODING to UTF-8 to handle unicode output properly | |
| full_command = f"export PYTHONIOENCODING=utf-8 && pip install --upgrade pip && pip install {pip_packages} && {smoltrace_command}" | |
| # Submit job using HuggingFace Jobs API | |
| try: | |
| job = run_job( | |
| image=image, | |
| command=["bash", "-c", full_command], | |
| secrets=secrets, | |
| flavor=flavor, | |
| timeout=timeout | |
| ) | |
| return { | |
| "success": True, | |
| "job_id": job_id, | |
| "hf_job_id": job.job_id if hasattr(job, 'job_id') else str(job), | |
| "platform": "HuggingFace Jobs", | |
| "hardware": flavor, | |
| "image": image, | |
| "command": smoltrace_command, | |
| "status": "submitted", | |
| "message": f"Job successfully submitted to HuggingFace Jobs (flavor: {flavor})", | |
| "instructions": f""" | |
| ✅ Job submitted successfully! | |
| **Job Details:** | |
| - Flavor: {flavor} | |
| - Image: {image} | |
| - Timeout: {timeout} | |
| **Monitor your job:** | |
| - View job status: https://huggingface.co/jobs | |
| - HF Job ID: {job.job_id if hasattr(job, 'job_id') else 'check dashboard'} | |
| **What happens next:** | |
| 1. Job starts running on HuggingFace infrastructure | |
| 2. SMOLTRACE evaluates your model | |
| 3. Results are automatically pushed to HuggingFace datasets | |
| 4. They will appear in TraceMind leaderboard when complete | |
| """.strip() | |
| } | |
| except Exception as e: | |
| return { | |
| "success": False, | |
| "error": f"Failed to submit job to HuggingFace: {str(e)}", | |
| "job_id": job_id, | |
| "command": smoltrace_command, | |
| "debug_info": { | |
| "image": image, | |
| "flavor": flavor, | |
| "timeout": timeout, | |
| "secrets_configured": list(secrets.keys()) | |
| } | |
| } | |
| def _auto_select_hf_hardware(provider: str, model: str) -> str: | |
| """ | |
| Automatically select HuggingFace Jobs hardware based on model and provider. | |
| Memory estimation for agentic workloads: | |
| - Model weights (FP16): ~2GB per 1B params | |
| - KV cache for long contexts: ~1.5-2x model size for agentic tasks | |
| - Inference overhead: ~20-30% additional | |
| - Total: ~4-5GB per 1B params for safe agentic execution | |
| Args: | |
| provider: Provider type | |
| model: Model identifier | |
| Returns: | |
| str: HF Jobs flavor | |
| """ | |
| # API models only need CPU | |
| if provider in ["litellm", "inference"]: | |
| return "cpu-basic" | |
| # Local models need GPU - select based on model size | |
| # Conservative allocation for agentic tasks (model weights + KV cache + inference overhead) | |
| # Memory estimation: ~4-5GB per 1B params for safe agentic execution | |
| model_lower = model.lower() | |
| # Extract model size using regex to capture the number before 'b' | |
| import re | |
| size_match = re.search(r'(\d+\.?\d*)b', model_lower) | |
| if size_match: | |
| model_size = float(size_match.group(1)) | |
| # Complete coverage from 0.5B to 100B+ with no gaps | |
| # HF Jobs has limited GPU options: t4-small, a10g-large, a100-large | |
| if model_size >= 13: | |
| # 13B-100B+: A100 large (e.g., 13B, 14B, 27B, 30B, 48B, 70B) | |
| return "a100-large" | |
| elif model_size >= 6: | |
| # 6B-12B: A10G large (e.g., 6B, 7B, 8B, 9B, 10B, 11B, 12B) | |
| return "a10g-large" | |
| elif model_size >= 1: | |
| # 1B-5B: T4 small (e.g., 1B, 2B, 3B, 4B, 5B) | |
| return "t4-small" | |
| else: | |
| # < 1B: T4 small | |
| return "t4-small" | |
| else: | |
| # No size detected in model name - default to A100 (safe for agentic workloads) | |
| return "a100-large" | |
| def check_job_status(hf_job_id: str, hf_token: Optional[str] = None) -> Dict: | |
| """ | |
| Check the status of a HuggingFace Job using the Jobs API | |
| Args: | |
| hf_job_id: HF Job ID (format: username/job_hash or just job_hash) | |
| hf_token: HuggingFace token (optional, uses env if not provided) | |
| Returns: | |
| dict: Job status information | |
| """ | |
| try: | |
| from huggingface_hub import HfApi | |
| except ImportError: | |
| return { | |
| "success": False, | |
| "error": "huggingface_hub package not installed", | |
| "job_id": hf_job_id | |
| } | |
| token = hf_token or os.environ.get("HF_TOKEN") | |
| if not token: | |
| return { | |
| "success": False, | |
| "error": "HuggingFace token not configured", | |
| "job_id": hf_job_id | |
| } | |
| try: | |
| api = HfApi(token=token) | |
| # Parse job_id and namespace (username) | |
| # Format can be "username/job_hash" or just "job_hash" | |
| if "/" in hf_job_id: | |
| namespace, job_id_only = hf_job_id.split("/", 1) | |
| job_info = api.inspect_job(job_id=job_id_only, namespace=namespace) | |
| else: | |
| job_info = api.inspect_job(job_id=hf_job_id) | |
| # Extract status stage from JobStatus object | |
| if hasattr(job_info, 'status') and hasattr(job_info.status, 'stage'): | |
| status = job_info.status.stage | |
| else: | |
| status = str(job_info.status) if hasattr(job_info, 'status') else "unknown" | |
| return { | |
| "success": True, | |
| "job_id": hf_job_id, | |
| "status": status, | |
| "created_at": str(job_info.created_at) if hasattr(job_info, 'created_at') else None, | |
| "flavor": job_info.flavor if hasattr(job_info, 'flavor') else None, | |
| "url": job_info.url if hasattr(job_info, 'url') else None, | |
| "info": str(job_info) | |
| } | |
| except Exception as e: | |
| return { | |
| "success": False, | |
| "error": f"Failed to fetch job status: {str(e)}", | |
| "job_id": hf_job_id | |
| } | |
| def get_job_logs(hf_job_id: str, hf_token: Optional[str] = None) -> Dict: | |
| """ | |
| Retrieve logs from a HuggingFace Job | |
| Args: | |
| hf_job_id: HF Job ID (format: username/job_hash or just job_hash) | |
| hf_token: HuggingFace token (optional, uses env if not provided) | |
| Returns: | |
| dict: Job logs information | |
| """ | |
| try: | |
| from huggingface_hub import HfApi | |
| except ImportError: | |
| return { | |
| "success": False, | |
| "error": "huggingface_hub package not installed", | |
| "job_id": hf_job_id | |
| } | |
| token = hf_token or os.environ.get("HF_TOKEN") | |
| if not token: | |
| return { | |
| "success": False, | |
| "error": "HuggingFace token not configured", | |
| "job_id": hf_job_id | |
| } | |
| try: | |
| api = HfApi(token=token) | |
| # Parse job_id and namespace (username) | |
| # Format can be "username/job_hash" or just "job_hash" | |
| if "/" in hf_job_id: | |
| namespace, job_id_only = hf_job_id.split("/", 1) | |
| logs_iterable = api.fetch_job_logs(job_id=job_id_only, namespace=namespace) | |
| else: | |
| logs_iterable = api.fetch_job_logs(job_id=hf_job_id) | |
| # Convert iterable to string | |
| logs = "\n".join(logs_iterable) | |
| return { | |
| "success": True, | |
| "job_id": hf_job_id, | |
| "logs": logs | |
| } | |
| except Exception as e: | |
| return { | |
| "success": False, | |
| "error": f"Failed to fetch job logs: {str(e)}", | |
| "job_id": hf_job_id, | |
| "logs": "" | |
| } | |
| def list_user_jobs(hf_token: Optional[str] = None, limit: int = 10) -> Dict: | |
| """ | |
| List recent jobs for the authenticated user | |
| Args: | |
| hf_token: HuggingFace token (optional, uses env if not provided) | |
| limit: Maximum number of jobs to return (applied after fetching) | |
| Returns: | |
| dict: List of user's jobs | |
| """ | |
| try: | |
| from huggingface_hub import HfApi | |
| except ImportError: | |
| return { | |
| "success": False, | |
| "error": "huggingface_hub package not installed" | |
| } | |
| token = hf_token or os.environ.get("HF_TOKEN") | |
| if not token: | |
| return { | |
| "success": False, | |
| "error": "HuggingFace token not configured" | |
| } | |
| try: | |
| api = HfApi(token=token) | |
| # List user's jobs (no limit parameter in API, so we fetch all and slice) | |
| all_jobs = api.list_jobs() | |
| # Limit the results | |
| jobs_to_display = all_jobs[:limit] if limit > 0 else all_jobs | |
| job_list = [] | |
| for job in jobs_to_display: | |
| # Extract owner name from JobOwner object | |
| owner_name = job.owner.name if hasattr(job, 'owner') and hasattr(job.owner, 'name') else None | |
| # Build job_id in the format: owner/id | |
| if owner_name and hasattr(job, 'id'): | |
| job_id = f"{owner_name}/{job.id}" | |
| elif hasattr(job, 'id'): | |
| job_id = job.id | |
| else: | |
| job_id = "unknown" | |
| # Extract status stage from JobStatus object | |
| if hasattr(job, 'status') and hasattr(job.status, 'stage'): | |
| status = job.status.stage | |
| else: | |
| status = str(job.status) if hasattr(job, 'status') else "unknown" | |
| job_list.append({ | |
| "job_id": job_id, | |
| "status": status, | |
| "created_at": str(job.created_at) if hasattr(job, 'created_at') else None | |
| }) | |
| return { | |
| "success": True, | |
| "jobs": job_list, | |
| "count": len(job_list) | |
| } | |
| except Exception as e: | |
| return { | |
| "success": False, | |
| "error": f"Failed to list jobs: {str(e)}", | |
| "jobs": [] | |
| } | |