|
|
""" |
|
|
Standalone deployment utilities for publishing to HuggingFace Spaces. |
|
|
No Gradio dependencies - can be used in backend API. |
|
|
""" |
|
|
import os |
|
|
import re |
|
|
import json |
|
|
import uuid |
|
|
import tempfile |
|
|
import shutil |
|
|
import ast |
|
|
from typing import Dict, List, Optional, Tuple |
|
|
from pathlib import Path |
|
|
|
|
|
from huggingface_hub import HfApi |
|
|
from backend_models import get_inference_client, get_real_model_id |
|
|
from backend_parsers import ( |
|
|
parse_transformers_js_output, |
|
|
parse_html_code, |
|
|
parse_python_requirements, |
|
|
parse_multi_file_python_output, |
|
|
strip_tool_call_markers, |
|
|
remove_code_block, |
|
|
extract_import_statements, |
|
|
generate_requirements_txt_with_llm |
|
|
) |
|
|
|
|
|
|
|
|
def prettify_comfyui_json_for_html(json_content: str) -> str:
    """Render a ComfyUI workflow JSON string as a standalone HTML page.

    The JSON is re-serialized with two-space indentation and embedded in a
    dark-themed page that offers "Download JSON" and "Copy to Clipboard"
    buttons plus client-side syntax highlighting.

    The embedded text is HTML-escaped (``&``, ``<`` and ``>`` only) so that
    workflow values containing markup cannot break the page or inject HTML.
    Double quotes are deliberately left unescaped because the page's
    highlighting script matches on literal ``"`` characters; the browser's
    ``textContent`` still yields the original JSON for copy/download.

    Args:
        json_content: Raw JSON text of the workflow.

    Returns:
        The full HTML document. If ``json_content`` is not valid JSON, a
        minimal error page showing the (escaped) input is returned instead.
        If any other exception occurs, the error is printed and
        ``json_content`` is returned unchanged.
    """
    # Function-scope import keeps this block self-contained.
    from html import escape

    try:
        parsed_json = json.loads(json_content)
        prettified_json = json.dumps(parsed_json, indent=2, ensure_ascii=False)
        # Escape before interpolating into markup; quote=False keeps '"' literal.
        escaped_json = escape(prettified_json, quote=False)

        html_content = f"""<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>ComfyUI Workflow</title>
    <style>
        * {{
            margin: 0;
            padding: 0;
            box-sizing: border-box;
        }}
        body {{
            font-family: -apple-system, BlinkMacSystemFont, 'SF Pro Text', 'Segoe UI', system-ui, sans-serif;
            background-color: #000000;
            color: #f5f5f7;
            line-height: 1.6;
            padding: 20px;
            min-height: 100vh;
        }}
        .container {{
            max-width: 1200px;
            margin: 0 auto;
        }}
        .header {{
            text-align: center;
            margin-bottom: 40px;
            padding: 40px 20px;
        }}
        .header h1 {{
            font-size: 48px;
            font-weight: 600;
            color: #ffffff;
            margin-bottom: 12px;
            letter-spacing: -0.02em;
        }}
        .header p {{
            font-size: 18px;
            color: #86868b;
            font-weight: 400;
        }}
        .controls {{
            display: flex;
            gap: 12px;
            margin-bottom: 24px;
            justify-content: center;
        }}
        .btn {{
            padding: 12px 24px;
            border: none;
            border-radius: 24px;
            font-size: 14px;
            font-weight: 500;
            cursor: pointer;
            transition: all 0.2s;
            font-family: inherit;
        }}
        .btn-primary {{
            background: #ffffff;
            color: #000000;
        }}
        .btn-primary:hover {{
            background: #f5f5f7;
            transform: scale(0.98);
        }}
        .btn-secondary {{
            background: #1d1d1f;
            color: #f5f5f7;
            border: 1px solid #424245;
        }}
        .btn-secondary:hover {{
            background: #2d2d2f;
            transform: scale(0.98);
        }}
        .json-container {{
            background-color: #1d1d1f;
            border-radius: 16px;
            padding: 32px;
            overflow-x: auto;
            border: 1px solid #424245;
            box-shadow: 0 4px 6px rgba(0, 0, 0, 0.3);
        }}
        pre {{
            margin: 0;
            font-family: 'SF Mono', 'Monaco', 'Menlo', 'Consolas', monospace;
            font-size: 13px;
            line-height: 1.6;
            white-space: pre-wrap;
            word-wrap: break-word;
        }}
        .json-key {{
            color: #9cdcfe;
        }}
        .json-string {{
            color: #ce9178;
        }}
        .json-number {{
            color: #b5cea8;
        }}
        .json-boolean {{
            color: #569cd6;
        }}
        .json-null {{
            color: #569cd6;
        }}
        .success {{
            color: #30d158;
        }}
        @media (max-width: 768px) {{
            .header h1 {{
                font-size: 32px;
            }}
            .controls {{
                flex-direction: column;
            }}
            .json-container {{
                padding: 20px;
            }}
        }}
    </style>
</head>
<body>
    <div class="container">
        <div class="header">
            <h1>ComfyUI Workflow</h1>
            <p>View and download your workflow JSON</p>
        </div>

        <div class="controls">
            <button class="btn btn-primary" onclick="downloadJSON()">Download JSON</button>
            <button class="btn btn-secondary" onclick="copyToClipboard()">Copy to Clipboard</button>
        </div>

        <div class="json-container">
            <pre id="json-content">{escaped_json}</pre>
        </div>
    </div>

    <script>
        function copyToClipboard() {{
            const jsonContent = document.getElementById('json-content').textContent;
            navigator.clipboard.writeText(jsonContent).then(() => {{
                const btn = event.target;
                const originalText = btn.textContent;
                btn.textContent = 'Copied!';
                btn.classList.add('success');
                setTimeout(() => {{
                    btn.textContent = originalText;
                    btn.classList.remove('success');
                }}, 2000);
            }}).catch(err => {{
                alert('Failed to copy to clipboard');
            }});
        }}

        function downloadJSON() {{
            const jsonContent = document.getElementById('json-content').textContent;
            const blob = new Blob([jsonContent], {{ type: 'application/json' }});
            const url = URL.createObjectURL(blob);
            const a = document.createElement('a');
            a.href = url;
            a.download = 'comfyui_workflow.json';
            document.body.appendChild(a);
            a.click();
            document.body.removeChild(a);
            URL.revokeObjectURL(url);

            const btn = event.target;
            const originalText = btn.textContent;
            btn.textContent = 'Downloaded!';
            btn.classList.add('success');
            setTimeout(() => {{
                btn.textContent = originalText;
                btn.classList.remove('success');
            }}, 2000);
        }}

        // Add syntax highlighting
        function highlightJSON() {{
            const content = document.getElementById('json-content');
            let html = content.innerHTML;

            // Highlight different JSON elements
            html = html.replace(/"([^"]+)":/g, '<span class="json-key">"$1":</span>');
            html = html.replace(/: "([^"]*)"/g, ': <span class="json-string">"$1"</span>');
            html = html.replace(/: (-?\\d+\\.?\\d*)/g, ': <span class="json-number">$1</span>');
            html = html.replace(/: (true|false)/g, ': <span class="json-boolean">$1</span>');
            html = html.replace(/: null/g, ': <span class="json-null">null</span>');

            content.innerHTML = html;
        }}

        // Apply syntax highlighting after page load
        window.addEventListener('load', highlightJSON);
    </script>
</body>
</html>"""
        return html_content
    except json.JSONDecodeError:
        # Invalid JSON: show the raw input, escaped so it cannot inject markup.
        # str() mirrors how the value would be formatted inside an f-string.
        escaped_raw = escape(str(json_content), quote=False)
        return f"""<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>ComfyUI Workflow</title>
    <style>
        body {{
            font-family: -apple-system, BlinkMacSystemFont, 'SF Pro Text', sans-serif;
            background-color: #000000;
            color: #f5f5f7;
            padding: 40px;
        }}
        pre {{
            background: #1d1d1f;
            padding: 24px;
            border-radius: 12px;
            overflow-x: auto;
        }}
    </style>
</head>
<body>
    <h1>ComfyUI Workflow</h1>
    <p>Error: Invalid JSON format</p>
    <pre>{escaped_raw}</pre>
</body>
</html>"""
    except Exception as e:
        # Best-effort fallback: report the problem and hand back the input as-is.
        print(f"Error prettifying ComfyUI JSON: {e}")
        return json_content
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def is_streamlit_code(code: str) -> bool:
    """Report whether ``code`` contains a Streamlit marker substring.

    The check is a plain substring search for ``import streamlit`` or
    ``streamlit.run``; no parsing is performed.
    """
    streamlit_markers = ('import streamlit', 'streamlit.run')
    return any(marker in code for marker in streamlit_markers)
|
|
|
|
|
|
|
|
def is_gradio_code(code: str) -> bool:
    """Report whether ``code`` contains a Gradio marker substring.

    The check is a plain substring search for ``import gradio`` or ``gr.``;
    no parsing is performed, so any text containing ``gr.`` matches.
    """
    gradio_markers = ('import gradio', 'gr.')
    return any(marker in code for marker in gradio_markers)
|
|
|
|
|
|
|
|
def detect_sdk_from_code(code: str, language: str) -> str:
    """Pick the Space SDK name for a given target language and code.

    Returns:
        ``"static"`` for html, transformers.js and comfyui; ``"docker"`` for
        react, for streamlit, and for any other language whose code looks
        like Streamlit; ``"gradio"`` in every remaining case.
    """
    static_languages = ("html", "transformers.js", "comfyui")
    if language in static_languages:
        return "static"

    # Explicit languages are tested first so the code is only inspected
    # when the language alone does not decide the SDK.
    if language == "react" or language == "streamlit" or is_streamlit_code(code):
        return "docker"

    # Gradio is both the explicit and the default choice.
    return "gradio"
|
|
|
|
|
|
|
|
def add_anycoder_tag_to_readme(api, repo_id: str, app_port: Optional[int] = None, sdk: Optional[str] = None) -> None:
    """
    Download the Space's README, add the ``anycoder`` tag (plus ``app_port``
    and Gradio ``sdk_version`` where applicable), and upload it back.

    Existing README content and frontmatter are preserved. All failures are
    reported with a printed warning and never raised (best effort). The
    temporary file used for the upload is removed even if the upload fails.

    Args:
        api: HuggingFace API client (must provide ``hf_hub_download`` and ``upload_file``)
        repo_id: Repository ID (username/space-name)
        app_port: Optional port number to set for Docker spaces (e.g., 7860)
        sdk: Optional SDK type (e.g., 'gradio', 'streamlit', 'docker', 'static')
    """
    temp_path = None
    try:
        import tempfile
        import re

        readme_path = api.hf_hub_download(
            repo_id=repo_id,
            filename="README.md",
            repo_type="space"
        )

        with open(readme_path, 'r', encoding='utf-8') as f:
            content = f.read()

        if content.startswith('---'):
            # Split into ['', frontmatter, body] on the first two '---' markers.
            parts = content.split('---', 2)
            if len(parts) >= 3:
                frontmatter = parts[1].strip()
                body = parts[2]

                if 'tags:' in frontmatter:
                    if '- anycoder' not in frontmatter:
                        def _append_tag(match):
                            # Reuse the indentation of the last existing list
                            # item so the YAML sequence stays consistent.
                            indent = match.group(2) or ''
                            return f"{match.group(1)}{indent}- anycoder\n"

                        # A trailing newline is added so the last list item
                        # (stripped above) is matched too, then removed again.
                        frontmatter = re.sub(
                            r'(tags:\s*\n(?:([ \t]*)-\s*[^\n]+\n)*)',
                            _append_tag,
                            frontmatter + '\n',
                            count=1
                        ).rstrip('\n')
                else:
                    frontmatter += '\ntags:\n- anycoder'

                if app_port is not None and 'app_port:' not in frontmatter:
                    frontmatter += f'\napp_port: {app_port}'

                # Gradio spaces are pinned to a fixed sdk_version.
                if sdk == 'gradio':
                    if 'sdk_version:' in frontmatter:
                        frontmatter = re.sub(r'sdk_version:\s*[^\n]+', 'sdk_version: 6.0.2', frontmatter)
                        print(f"[README] Updated sdk_version to 6.0.2 for Gradio space")
                    else:
                        frontmatter += '\nsdk_version: 6.0.2'
                        print(f"[README] Added sdk_version: 6.0.2 for Gradio space")

                new_content = f"---\n{frontmatter}\n---{body}"
            else:
                # Opening '---' without a closing marker: inject a tag block
                # right after the first marker.
                new_content = content.replace('---', '---\ntags:\n- anycoder\n---', 1)
        else:
            # No frontmatter at all: prepend a fresh one.
            app_port_line = f'\napp_port: {app_port}' if app_port is not None else ''
            sdk_version_line = '\nsdk_version: 6.0.2' if sdk == 'gradio' else ''
            new_content = f"---\ntags:\n- anycoder{app_port_line}{sdk_version_line}\n---\n\n{content}"

        with tempfile.NamedTemporaryFile("w", suffix=".md", delete=False, encoding='utf-8') as f:
            # Record the path before writing so cleanup happens on any failure.
            temp_path = f.name
            f.write(new_content)

        api.upload_file(
            path_or_fileobj=temp_path,
            path_in_repo="README.md",
            repo_id=repo_id,
            repo_type="space"
        )

    except Exception as e:
        print(f"Warning: Could not modify README.md to add anycoder tag: {e}")
    finally:
        if temp_path and os.path.exists(temp_path):
            try:
                os.unlink(temp_path)
            except OSError as cleanup_error:
                print(f"Warning: Could not remove temporary README file: {cleanup_error}")
|
|
|
|
|
|
|
|
def create_dockerfile_for_streamlit(space_name: str) -> str:
    """Return the Dockerfile text used for Streamlit spaces.

    The image installs ``requirements.txt`` on ``python:3.11-slim`` and
    serves ``app.py`` with Streamlit on port 7860. ``space_name`` is accepted
    for interface compatibility and does not influence the output.
    """
    # An empty string marks a blank separator line; the final one yields the
    # trailing newline.
    dockerfile_lines = (
        "FROM python:3.11-slim",
        "",
        "WORKDIR /app",
        "",
        "COPY requirements.txt .",
        "RUN pip install --no-cache-dir -r requirements.txt",
        "",
        "COPY . .",
        "",
        "EXPOSE 7860",
        "",
        'CMD ["streamlit", "run", "app.py", "--server.port=7860", "--server.address=0.0.0.0"]',
        "",
    )
    return "\n".join(dockerfile_lines)
|
|
|
|
|
|
|
|
def create_dockerfile_for_react(space_name: str) -> str:
    """Return the Dockerfile text used for React spaces.

    The image is based on ``node:18-slim``, runs as the ``node`` user,
    installs dependencies, builds the app and starts it with
    ``npm start -- -p 7860``. ``space_name`` is accepted for interface
    compatibility and does not influence the output.
    """
    # An empty string marks a blank separator line; the final one yields the
    # trailing newline.
    dockerfile_lines = (
        "FROM node:18-slim",
        "",
        "# Use existing node user",
        "USER node",
        "ENV HOME=/home/node",
        "ENV PATH=/home/node/.local/bin:$PATH",
        "",
        "WORKDIR /home/node/app",
        "",
        "COPY --chown=node:node package*.json ./",
        "RUN npm install",
        "",
        "COPY --chown=node:node . .",
        "RUN npm run build",
        "",
        "EXPOSE 7860",
        "",
        'CMD ["npm", "start", "--", "-p", "7860"]',
        "",
    )
    return "\n".join(dockerfile_lines)
|
|
|
|
|
|
|
|
def extract_space_id_from_history(history: Optional[List], username: Optional[str] = None) -> Optional[str]:
    """
    Extract existing space ID from chat history (for updates after followups/imports)

    Messages are scanned in order and the first qualifying one wins:

    * an assistant message that mentions "deployed", "updated" or "✅" and
      contains a ``huggingface.co/spaces/<owner>/<name>`` URL, or
    * a user message that mentions both "import" and "space" and contains
      such a URL whose owner is ``username``.

    Messages whose content is not a string (e.g. ``None``) are skipped
    instead of raising.

    Args:
        history: Chat history (list of ``[role, content]`` lists/tuples or list of dicts)
        username: Current username (to verify ownership of imported spaces)

    Returns:
        Space ID (username/space-name) if found, None otherwise
    """
    if not history:
        return None

    import re

    space_url_pattern = re.compile(r'huggingface\.co/spaces/([^/\s\)]+/[^/\s\)]+)')

    for msg in history:
        # Accept both pair-style and dict-style messages.
        if isinstance(msg, (list, tuple)) and len(msg) >= 2:
            role = msg[0]
            content = msg[1]
        elif isinstance(msg, dict):
            role = msg.get('role', '')
            content = msg.get('content', '')
        else:
            continue

        # Only plain-text content can be searched.
        if not isinstance(content, str):
            continue

        content_lower = content.lower()

        if role == 'assistant':
            has_deployment_indicator = (
                "deployed" in content_lower or
                "updated" in content_lower or
                "✅" in content
            )
            if has_deployment_indicator:
                match = space_url_pattern.search(content)
                if match:
                    existing_space = match.group(1)
                    print(f"[Extract Space] Found existing space: {existing_space}")
                    return existing_space

        elif role == 'user':
            if "import" in content_lower and "space" in content_lower:
                match = space_url_pattern.search(content)
                if match:
                    imported_space = match.group(1)
                    # Only treat an imported space as updatable when the
                    # current user owns it.
                    if username and imported_space.startswith(f"{username}/"):
                        return imported_space

    return None
|
|
|
|
|
|
|
|
def _resolve_hf_username(api) -> str:
    """Return the account name reported by ``api.whoami()``, or "user" if absent."""
    user_info = api.whoami()
    return user_info.get("name") or user_info.get("preferred_username") or "user"


def _history_mentions_redesign(history) -> bool:
    """Return True if any user message in ``history`` contains "redesign".

    Accepts dict messages (``{'role': ..., 'content': ...}``) as well as
    ``[role, content]`` lists/tuples; anything else is ignored.
    """
    for msg in history:
        if isinstance(msg, dict):
            role, content = msg.get('role'), msg.get('content')
        elif isinstance(msg, (list, tuple)) and len(msg) >= 2:
            role, content = msg[0], msg[1]
        else:
            continue
        if role == "user" and isinstance(content, str) and "redesign" in content.lower():
            return True
    return False


def _write_files_to_dir(base_dir: Path, files: Dict[str, str]) -> None:
    """Write each ``{relative_path: content}`` entry under ``base_dir`` as UTF-8.

    Raises:
        ValueError: if a path would resolve outside ``base_dir`` (file names
            come from generated output and are not trusted).
    """
    base_resolved = base_dir.resolve()
    for filename, content in files.items():
        target = (base_dir / filename).resolve()
        try:
            target.relative_to(base_resolved)
        except ValueError:
            raise ValueError(
                f"Refusing to write file outside the deployment directory: {filename}"
            ) from None
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text(content, encoding='utf-8')


def _write_generated_requirements(target_dir: Path, main_app: str, fallback: str, label: str) -> None:
    """Create ``requirements.txt`` in ``target_dir``.

    When ``main_app`` has content, its import statements are extracted and
    passed to the LLM-based requirements generator; otherwise ``fallback`` is
    written verbatim. ``label`` only appears in the log line.
    """
    requirements_path = target_dir / "requirements.txt"
    if main_app:
        print(f"[Deploy] Generating requirements.txt from imports in {label} app")
        import_statements = extract_import_statements(main_app)
        requirements_content = generate_requirements_txt_with_llm(import_statements)
        requirements_path.write_text(requirements_content, encoding='utf-8')
        print(f"[Deploy] Generated requirements.txt with {len(requirements_content.splitlines())} lines")
    else:
        requirements_path.write_text(fallback, encoding='utf-8')


def _upload_file_with_retry(
    api,
    source,
    path_in_repo: str,
    repo_id: str,
    commit_message: Optional[str] = None,
    max_attempts: int = 3,
    retry_delay: float = 2
) -> Optional[str]:
    """Upload one file to a Space, retrying on failure.

    Args:
        api: HuggingFace API client.
        source: Local path or raw bytes, forwarded as ``path_or_fileobj``.
        path_in_repo: Destination path inside the Space repository.
        repo_id: Target Space (username/space-name).
        commit_message: Commit message forwarded to ``upload_file``.
        max_attempts: Total number of attempts.
        retry_delay: Seconds to wait between attempts.

    Returns:
        None on success, otherwise an error message. Errors whose text
        contains "403" or "Forbidden" are reported immediately without retry.
    """
    import time

    last_error = None
    for attempt in range(max_attempts):
        try:
            api.upload_file(
                path_or_fileobj=source,
                path_in_repo=path_in_repo,
                repo_id=repo_id,
                repo_type="space",
                commit_message=commit_message
            )
            print(f"[Deploy] Successfully uploaded {path_in_repo}")
            return None
        except Exception as e:
            last_error = e
            error_str = str(e)
            print(f"[Deploy] Upload error for {path_in_repo}: {error_str}")
            if "403" in error_str or "Forbidden" in error_str:
                return f"Permission denied uploading {path_in_repo}. Check your token has write access to {repo_id}."
            if attempt < max_attempts - 1:
                time.sleep(retry_delay)
                print(f"[Deploy] Retry {attempt + 1}/{max_attempts} for {path_in_repo}")
    return f"Failed to upload {path_in_repo} after {max_attempts} attempts: {last_error}"


def deploy_to_huggingface_space(
    code: str,
    language: str,
    space_name: Optional[str] = None,
    token: Optional[str] = None,
    username: Optional[str] = None,
    description: Optional[str] = None,
    private: bool = False,
    existing_repo_id: Optional[str] = None,
    commit_message: Optional[str] = None,
    history: Optional[List[Dict]] = None
) -> Tuple[bool, str, Optional[str]]:
    """
    Deploy code to HuggingFace Spaces (create new or update existing)

    Flow:
      1. Resolve the token (argument, then ``HF_TOKEN``) and the username.
      2. If no ``existing_repo_id`` was given, look for one in ``history``.
      3. Gradio updates take a fast path that pushes the parsed files one by
         one through ``update_space_file`` (only ``app.py`` for redesigns).
      4. Otherwise the generated files are written to a temporary directory,
         the Space is created if needed, the files are uploaded, the README
         is tagged, and (for transformers.js updates) the Space is restarted.

    Args:
        code: Generated code to deploy
        language: Target language/framework (html, gradio, streamlit, react, transformers.js, comfyui)
        space_name: Name for the space (auto-generated if None, ignored if existing_repo_id provided)
        token: HuggingFace API token
        username: HuggingFace username
        description: Space description (currently not used by this function)
        private: Whether to make the space private (only for new spaces)
        existing_repo_id: If provided (username/space-name), updates this space instead of creating new one
        commit_message: Custom commit message (defaults to "Deploy from anycoder" or "Update from anycoder")
        history: Chat history (dict messages with 'role' and 'content', or [role, content] pairs) - used to detect followups/imports

    Returns:
        Tuple of (success: bool, message: str, space_url: Optional[str]).
        Errors are reported through the tuple; the function does not raise.
    """
    if not token:
        token = os.getenv("HF_TOKEN")
        if not token:
            return False, "No HuggingFace token provided", None

    try:
        api = HfApi(token=token)

        # The username is needed to verify ownership of spaces found in history.
        if not username:
            try:
                username = _resolve_hf_username(api)
            except Exception as e:
                # Non-fatal here: the create path below retries and reports failure.
                print(f"[Deploy] Could not determine username yet: {e}")

        if not existing_repo_id and history:
            existing_repo_id = extract_space_id_from_history(history, username)
            if existing_repo_id:
                print(f"[Deploy] Detected existing space from history: {existing_repo_id}")

        is_update = existing_repo_id is not None

        print(f"[Deploy] ========== DEPLOYMENT DECISION ==========")
        print(f"[Deploy] existing_repo_id provided: {existing_repo_id}")
        print(f"[Deploy] history provided: {history is not None} (length: {len(history) if history else 0})")
        print(f"[Deploy] username: {username}")
        print(f"[Deploy] is_update: {is_update}")
        print(f"[Deploy] language: {language}")
        print(f"[Deploy] ============================================")

        # ---- Fast path: update an existing Gradio space file by file ----
        if is_update and language == "gradio":
            print(f"[Deploy] Gradio space update - updating .py files and uploading any new files")

            files = parse_multi_file_python_output(code) or {}

            if not files:
                print(f"[Deploy] No file markers found, using entire code as app.py")
                cleaned_code = remove_code_block(code)
                files['app.py'] = cleaned_code

            if not files:
                return False, "Error: No files found in generated code", None

            print(f"[Deploy] Generated {len(files)} file(s): {list(files.keys())}")

            # A redesign must only touch app.py, whatever else was generated.
            if commit_message and "redesign" in commit_message.lower():
                is_redesign = True
            elif history:
                is_redesign = _history_mentions_redesign(history)
            else:
                is_redesign = False

            if is_redesign:
                print(f"[Deploy] Redesign operation detected - filtering to ONLY app.py")
                app_py_content = files.get('app.py')
                if not app_py_content:
                    return False, "Error: No app.py found in redesign output", None
                files = {'app.py': app_py_content}
                print(f"[Deploy] Will only update app.py ({len(app_py_content)} chars)")

            updated_files = []
            for file_path, content in files.items():
                print(f"[Deploy] Uploading {file_path} ({len(content)} chars)")
                success, msg = update_space_file(
                    repo_id=existing_repo_id,
                    file_path=file_path,
                    content=content,
                    token=token,
                    commit_message=commit_message or f"Update {file_path} from anycoder"
                )

                if success:
                    updated_files.append(file_path)
                else:
                    print(f"[Deploy] Warning: Failed to update {file_path}: {msg}")

            if updated_files:
                space_url = f"https://huggingface.co/spaces/{existing_repo_id}"
                files_list = ", ".join(updated_files)
                return True, f"✅ Updated {len(updated_files)} file(s): {files_list}! View at: {space_url}", space_url
            else:
                return False, "Failed to update any files", None

        # ---- Resolve repo_id / space_name / username ----
        if is_update:
            repo_id = existing_repo_id
            space_name = existing_repo_id.split('/')[-1]
            if '/' in existing_repo_id:
                username = existing_repo_id.split('/')[0]
            elif not username:
                try:
                    username = _resolve_hf_username(api)
                except Exception as e:
                    return False, f"Failed to get user info: {str(e)}", None
        else:
            if not username:
                try:
                    username = _resolve_hf_username(api)
                except Exception as e:
                    return False, f"Failed to get user info: {str(e)}", None

            if not space_name or space_name.strip() == "":
                space_name = f"anycoder-{uuid.uuid4().hex[:8]}"
                print(f"[Deploy] Auto-generated space name: {space_name}")

            # Normalize to lowercase alphanumerics separated by single hyphens.
            space_name = re.sub(r'[^a-z0-9-]', '-', space_name.lower())
            space_name = re.sub(r'-+', '-', space_name).strip('-')

            if not space_name:
                space_name = f"anycoder-{uuid.uuid4().hex[:8]}"
                print(f"[Deploy] Space name was empty after cleaning, regenerated: {space_name}")

            repo_id = f"{username}/{space_name}"
            print(f"[Deploy] Using repo_id: {repo_id}")

        sdk = detect_sdk_from_code(code, language)

        with tempfile.TemporaryDirectory() as temp_dir:
            temp_path = Path(temp_dir)

            app_port = None
            use_individual_uploads = False

            # ---- Materialize the generated files in the temp directory ----
            if language == "transformers.js":
                try:
                    files = parse_transformers_js_output(code)
                    print(f"[Deploy] Parsed transformers.js files: {list(files.keys())}")

                    for fname, fcontent in files.items():
                        if fcontent:
                            print(f"[Deploy] {fname}: {len(fcontent)} characters")
                        else:
                            print(f"[Deploy] {fname}: EMPTY")

                    required_files = {'index.html', 'index.js', 'style.css'}
                    missing_from_dict = required_files - set(files.keys())

                    if missing_from_dict:
                        error_msg = f"Failed to parse required files: {', '.join(sorted(missing_from_dict))}. "
                        error_msg += f"Parsed files: {', '.join(files.keys()) if files else 'none'}. "
                        error_msg += "Transformers.js apps require all three files (index.html, index.js, style.css). Please regenerate using the correct format."
                        print(f"[Deploy] {error_msg}")
                        return False, error_msg, None

                    empty_files = [name for name in required_files if not (files.get(name) or '').strip()]
                    if empty_files:
                        error_msg = f"Empty file content detected: {', '.join(sorted(empty_files))}. "
                        error_msg += "All three files must contain actual code. Please regenerate with complete content."
                        print(f"[Deploy] {error_msg}")
                        return False, error_msg, None

                    _write_files_to_dir(temp_path, files)

                    use_individual_uploads = True

                except Exception as e:
                    print(f"[Deploy] Error parsing transformers.js: {e}")
                    import traceback
                    traceback.print_exc()
                    return False, f"Error parsing transformers.js output: {str(e)}", None

            elif language == "html":
                html_code = parse_html_code(code)
                (temp_path / "index.html").write_text(html_code, encoding='utf-8')

            elif language == "comfyui":
                html_code = prettify_comfyui_json_for_html(code)
                (temp_path / "index.html").write_text(html_code, encoding='utf-8')

            elif language in ["gradio", "streamlit"]:
                files = parse_multi_file_python_output(code) or {}

                if not files:
                    print(f"[Deploy] No file markers found in {language} code, using entire code as app.py")
                    cleaned_code = remove_code_block(code)
                    app_filename = "streamlit_app.py" if language == "streamlit" else "app.py"
                    files[app_filename] = cleaned_code

                _write_files_to_dir(temp_path, files)

                if "requirements.txt" not in files:
                    main_app = files.get('streamlit_app.py') or files.get('app.py', '')
                    fallback = "streamlit>=1.30.0\n" if language == "streamlit" else "gradio>=4.0.0\n"
                    _write_generated_requirements(temp_path, main_app, fallback, language)

                if sdk == "docker" and language == "streamlit":
                    dockerfile = create_dockerfile_for_streamlit(space_name)
                    # The generated Dockerfile launches app.py; point it at
                    # streamlit_app.py when that is the only entry file present.
                    if 'app.py' not in files and 'streamlit_app.py' in files:
                        dockerfile = dockerfile.replace('"app.py"', '"streamlit_app.py"')
                    (temp_path / "Dockerfile").write_text(dockerfile, encoding='utf-8')
                    app_port = 7860
                    use_individual_uploads = True

            elif language == "react":
                files = parse_multi_file_python_output(code)

                if not files:
                    return False, "Error: Could not parse React output", None

                if 'Dockerfile' not in files:
                    dockerfile = create_dockerfile_for_react(space_name)
                    files['Dockerfile'] = dockerfile

                _write_files_to_dir(temp_path, files)

                app_port = 7860
                use_individual_uploads = True

            else:
                # Unknown languages are treated like a Gradio app.
                files = parse_multi_file_python_output(code) or {}

                if not files:
                    print(f"[Deploy] No file markers found in default (gradio) code, using entire code as app.py")
                    cleaned_code = remove_code_block(code)
                    files['app.py'] = cleaned_code

                _write_files_to_dir(temp_path, files)

                if "requirements.txt" not in files:
                    main_app = files.get('app.py', '')
                    _write_generated_requirements(temp_path, main_app, "gradio>=4.0.0\n", "default")

            # ---- Create the space when this is not an update ----
            if not is_update and sdk != "docker" and language not in ["transformers.js"]:
                print(f"[Deploy] Creating NEW {sdk} space: {repo_id}")
                try:
                    api.create_repo(
                        repo_id=repo_id,
                        repo_type="space",
                        space_sdk=sdk,
                        private=private,
                        exist_ok=True
                    )
                except Exception as e:
                    return False, f"Failed to create space: {str(e)}", None
            elif is_update:
                print(f"[Deploy] UPDATING existing space: {repo_id} (skipping create_repo)")

            if language == "transformers.js":
                if not is_update:
                    print(f"[Deploy] Creating NEW transformers.js space via template duplication")
                    print(f"[Deploy] space_name value: '{space_name}' (type: {type(space_name)})")

                    if not space_name:
                        return False, "Internal error: space_name is None after generation", None

                    try:
                        from huggingface_hub import duplicate_space

                        clean_space_name = space_name.strip()
                        print(f"[Deploy] Attempting to duplicate template space to: {clean_space_name}")

                        duplicated_repo = duplicate_space(
                            from_id="static-templates/transformers.js",
                            to_id=clean_space_name,
                            token=token,
                            private=private,
                            exist_ok=True
                        )
                        print(f"[Deploy] Template duplication result: {duplicated_repo} (type: {type(duplicated_repo)})")
                    except Exception as e:
                        print(f"[Deploy] Exception during duplicate_space: {type(e).__name__}: {str(e)}")

                        # The duplication call can raise even though the space
                        # was created; check before falling back.
                        space_exists = False
                        try:
                            if api.space_info(repo_id):
                                space_exists = True
                        except Exception:
                            pass

                        error_msg = str(e)
                        if ("'url'" in error_msg or "RepoUrl" in error_msg) and space_exists:
                            print(f"[Deploy] Space exists despite RepoUrl error, continuing with deployment")
                        else:
                            print(f"[Deploy] Template duplication failed, attempting fallback to create_repo: {e}")
                            try:
                                api.create_repo(
                                    repo_id=repo_id,
                                    repo_type="space",
                                    space_sdk="static",
                                    private=private,
                                    exist_ok=True
                                )
                                print(f"[Deploy] Fallback create_repo successful")
                            except Exception as e2:
                                return False, f"Failed to create transformers.js space (both duplication and fallback failed): {str(e2)}", None
                else:
                    # Updates require that the space is reachable with this token.
                    try:
                        space_info = api.space_info(repo_id)
                        if not space_info:
                            return False, f"Could not access space {repo_id} for update", None
                    except Exception as e:
                        return False, f"Cannot update space {repo_id}: {str(e)}", None

            elif sdk == "docker" and language in ["streamlit", "react"]:
                if not is_update:
                    print(f"[Deploy] Creating NEW Docker space for {language}: {repo_id}")
                    try:
                        from huggingface_hub import create_repo as hf_create_repo
                        hf_create_repo(
                            repo_id=repo_id,
                            repo_type="space",
                            space_sdk="docker",
                            token=token,
                            private=private,
                            exist_ok=True
                        )
                    except Exception as e:
                        return False, f"Failed to create Docker space: {str(e)}", None

            if not commit_message:
                commit_message = "Update from anycoder" if is_update else "Deploy from anycoder"

            # ---- Upload ----
            try:
                if language == "transformers.js":
                    # Upload exactly the three required files, from memory as
                    # UTF-8 bytes (no intermediate temp files).
                    for file_name in ("index.html", "index.js", "style.css"):
                        file_content = files.get(file_name)
                        if not file_content:
                            return False, f"Missing content for {file_name}", None

                        upload_error = _upload_file_with_retry(
                            api, file_content.encode('utf-8'), file_name, repo_id, commit_message
                        )
                        if upload_error:
                            return False, upload_error, None

                elif use_individual_uploads:
                    files_to_upload = [
                        path.relative_to(temp_path).as_posix()
                        for path in temp_path.rglob('*')
                        if path.is_file()
                    ]

                    if not files_to_upload:
                        return False, "No files to upload", None

                    print(f"[Deploy] Uploading {len(files_to_upload)} files individually: {files_to_upload}")

                    for filename in files_to_upload:
                        upload_error = _upload_file_with_retry(
                            api, str(temp_path / filename), filename, repo_id, commit_message
                        )
                        if upload_error:
                            return False, upload_error, None
                else:
                    print(f"[Deploy] Uploading folder to {repo_id}")
                    api.upload_folder(
                        folder_path=str(temp_path),
                        repo_id=repo_id,
                        repo_type="space",
                        commit_message=commit_message
                    )
            except Exception as e:
                return False, f"Failed to upload files: {str(e)}", None

            # ---- Post-upload housekeeping (best effort) ----
            try:
                import time
                if not is_update:
                    # Brief pause before the README of a new space is fetched.
                    time.sleep(2)
                add_anycoder_tag_to_readme(api, repo_id, app_port, sdk)
            except Exception as e:
                print(f"Warning: Could not add anycoder tag to README: {e}")

            if is_update and language == "transformers.js":
                try:
                    api.restart_space(repo_id=repo_id)
                    print(f"[Deploy] Restarted space after update: {repo_id}")
                except Exception as restart_error:
                    print(f"Note: Could not restart space after update: {restart_error}")

            space_url = f"https://huggingface.co/spaces/{repo_id}"
            action = "Updated" if is_update else "Deployed"

            success_msg = f"✅ {action}! View your space at: {space_url}"

            return True, success_msg, space_url

    except Exception as e:
        print(f"[Deploy] Top-level exception caught: {type(e).__name__}: {str(e)}")
        import traceback
        traceback.print_exc()
        return False, f"Deployment error: {str(e)}", None
|
|
|
|
|
|
|
|
def update_space_file(
    repo_id: str,
    file_path: str,
    content: str,
    token: Optional[str] = None,
    commit_message: Optional[str] = None
) -> Tuple[bool, str]:
    """
    Update a single file in an existing HuggingFace Space

    Args:
        repo_id: Full repo ID (username/space-name)
        file_path: Path of file to update (e.g., "app.py")
        content: New file content
        token: HuggingFace API token (falls back to the HF_TOKEN environment variable)
        commit_message: Commit message (default: "Update {file_path}")

    Returns:
        Tuple of (success: bool, message: str)
    """
    if not token:
        token = os.getenv("HF_TOKEN")
        if not token:
            return False, "No HuggingFace token provided"

    try:
        api = HfApi(token=token)

        if not commit_message:
            commit_message = f"Update {file_path}"

        # Upload straight from memory as UTF-8 bytes instead of going through a
        # temporary file: no platform-dependent text encoding or newline
        # translation, nothing to clean up, and no temp-file suffix derived from
        # file_path (which broke for extensionless nested paths such as
        # "src/Makefile", where the suffix contained a path separator).
        api.upload_file(
            path_or_fileobj=content.encode("utf-8"),
            path_in_repo=file_path,
            repo_id=repo_id,
            repo_type="space",
            commit_message=commit_message
        )
        return True, f"✅ Successfully updated {file_path}"

    except Exception as e:
        # Boundary of the utility: report the failure to the caller as a message
        return False, f"Failed to update file: {str(e)}"
|
|
|
|
|
|
|
|
def delete_space(
    repo_id: str,
    token: Optional[str] = None
) -> Tuple[bool, str]:
    """
    Delete a HuggingFace Space

    Args:
        repo_id: Full repo ID (username/space-name)
        token: HuggingFace API token (falls back to the HF_TOKEN environment variable)

    Returns:
        Tuple of (success: bool, message: str)
    """
    if not token:
        token = os.getenv("HF_TOKEN")
        if not token:
            return False, "No HuggingFace token provided"

    try:
        api = HfApi(token=token)
        api.delete_repo(repo_id=repo_id, repo_type="space")
        return True, f"✅ Successfully deleted {repo_id}"
    except Exception as e:
        # Any API failure is reported to the caller as a message
        return False, f"Failed to delete space: {str(e)}"
|
|
|
|
|
|
|
|
def list_user_spaces(
    username: Optional[str] = None,
    token: Optional[str] = None
) -> Tuple[bool, str, Optional[List[Dict]]]:
    """
    List all spaces for a user

    Args:
        username: HuggingFace username (gets from token if None)
        token: HuggingFace API token (falls back to the HF_TOKEN environment variable)

    Returns:
        Tuple of (success: bool, message: str, spaces: Optional[List[Dict]])
    """
    if not token:
        token = os.getenv("HF_TOKEN")
        if not token:
            return False, "No HuggingFace token provided", None

    try:
        api = HfApi(token=token)

        # Get username if not provided
        if not username:
            user_info = api.whoami()
            username = user_info.get("name") or user_info.get("preferred_username")

        # Without an author the listing call would not be restricted to this
        # user, so fail explicitly instead of querying with author=None.
        if not username:
            return False, "Failed to list spaces: could not determine username from token", None

        space_list = [
            {
                "id": space.id,
                "author": space.author,
                "name": getattr(space, 'name', space.id.split('/')[-1]),
                "sdk": getattr(space, 'sdk', 'unknown'),
                "private": getattr(space, 'private', False),
                "url": f"https://huggingface.co/spaces/{space.id}"
            }
            for space in api.list_spaces(author=username)
        ]

        return True, f"Found {len(space_list)} spaces", space_list

    except Exception as e:
        return False, f"Failed to list spaces: {str(e)}", None
|
|
|
|
|
|
|
|
def duplicate_space_to_user(
    from_space_id: str,
    to_space_name: Optional[str] = None,
    token: Optional[str] = None,
    private: bool = False
) -> Tuple[bool, str, Optional[str]]:
    """
    Duplicate a HuggingFace Space to the user's account

    The source space's hardware (and requested storage, when reported) is
    carried over. If duplication fails with a hardware-related error, it is
    retried once with "cpu-basic" hardware.

    Args:
        from_space_id: Source space ID (username/space-name)
        to_space_name: Destination space name (just the name, not full ID).
            Defaults to the source space's name. The name is lowercased and
            every character outside [a-z0-9-] is replaced with "-".
        token: HuggingFace API token (falls back to the HF_TOKEN environment variable)
        private: Whether the duplicated space should be private

    Returns:
        Tuple of (success: bool, message: str, space_url: Optional[str])
    """
    if not token:
        token = os.getenv("HF_TOKEN")
        if not token:
            return False, "No HuggingFace token provided", None

    try:
        from huggingface_hub import duplicate_space

        # Get username from token
        api = HfApi(token=token)
        user_info = api.whoami()
        username = user_info.get("name") or user_info.get("preferred_username") or "user"

        # Get original space info to detect hardware, storage and SDK (best effort)
        print(f"[Duplicate] Fetching info for {from_space_id}")
        original_hardware = None
        original_storage = None
        original_sdk = None
        try:
            original_space_info = api.space_info(from_space_id)
            original_sdk = getattr(original_space_info, 'sdk', None)
            runtime = getattr(original_space_info, 'runtime', None)
            if runtime:
                original_hardware = getattr(runtime, 'hardware', None)
                original_storage = getattr(runtime, 'storage', None)
            print(f"[Duplicate] Original space SDK: {original_sdk}, hardware: {original_hardware}, storage: {original_storage}")
        except Exception as e:
            print(f"[Duplicate] Could not fetch space info: {e}")

        # If no destination name provided, use the original space name
        if not to_space_name:
            to_space_name = from_space_id.split('/')[-1]

        # Clean space name: lowercase, collapse invalid characters into single dashes
        to_space_name = re.sub(r'[^a-z0-9-]', '-', to_space_name.lower())
        to_space_name = re.sub(r'-+', '-', to_space_name).strip('-')
        if not to_space_name:
            # e.g. a name made only of punctuation sanitizes to an empty string
            return False, "Failed to duplicate space: destination space name is empty after sanitization", None

        to_space_id = f"{username}/{to_space_name}"

        print(f"[Duplicate] Duplicating {from_space_id} to {to_space_id}")

        # Only the bare name is passed as to_id; the full ID above is used for
        # logging and for the returned URL.
        duplicate_params = {
            "from_id": from_space_id,
            "to_id": to_space_name,
            "token": token,
            "exist_ok": True
        }

        # Always set hardware explicitly, falling back to cpu-basic when unknown
        hardware_to_use = original_hardware if original_hardware else "cpu-basic"
        duplicate_params["hardware"] = hardware_to_use
        print(f"[Duplicate] Hardware: {hardware_to_use} (SDK: {original_sdk}, original: {original_hardware})")

        # Carry over requested storage only when the runtime reports it as a
        # mapping with a 'requested' entry. Any other shape is skipped rather
        # than calling .get() on it, which would abort the whole duplication.
        # NOTE(review): confirm the exact shape of runtime.storage against the
        # installed huggingface_hub version.
        requested_storage = original_storage.get('requested') if isinstance(original_storage, dict) else None
        if requested_storage:
            duplicate_params["storage"] = requested_storage
            print(f"[Duplicate] Storage: {requested_storage}")

        # Only set private if explicitly requested
        if private:
            duplicate_params["private"] = private

        print(f"[Duplicate] Parameters: {list(duplicate_params.keys())}")

        try:
            duplicate_space(**duplicate_params)
        except Exception as dup_error:
            error_str = str(dup_error).lower()
            if 'zero' not in error_str and 'hardware' not in error_str:
                # Not a hardware issue: let the outer handler report it
                raise

            print(f"[Duplicate] Hardware error detected (likely zero-gpu issue): {dup_error}")
            print(f"[Duplicate] Retrying with cpu-basic hardware...")

            duplicate_params["hardware"] = "cpu-basic"
            try:
                duplicate_space(**duplicate_params)
                print(f"[Duplicate] ✅ Successfully duplicated with cpu-basic hardware")
            except Exception as retry_error:
                print(f"[Duplicate] Retry with cpu-basic also failed: {retry_error}")
                raise

        space_url = f"https://huggingface.co/spaces/{to_space_id}"

        success_msg = f"✅ Space duplicated! View at: {space_url}"
        print(f"[Duplicate] {success_msg}")

        return True, success_msg, space_url

    except Exception as e:
        print(f"[Duplicate] Error: {type(e).__name__}: {str(e)}")
        import traceback
        traceback.print_exc()
        return False, f"Failed to duplicate space: {str(e)}", None
|
|
|
|
|
|
|
|
def _write_pr_files(temp_path: Path, files: Dict[str, str]) -> None:
    """
    Write generated files beneath temp_path, creating parent directories

    Args:
        temp_path: Staging directory for the pull request contents
        files: Mapping of relative file name to file content

    Raises:
        ValueError: If a file name does not resolve to a path inside temp_path
    """
    root = temp_path.resolve()
    for filename, content in files.items():
        target = (root / filename).resolve()
        # File names come from parsed generated output; refuse absolute paths,
        # ".." escapes and empty names instead of writing outside the staging dir.
        if root not in target.parents:
            raise ValueError(f"Unsafe file path in generated output: {filename!r}")
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text(content, encoding='utf-8')


def create_pull_request_on_space(
    repo_id: str,
    code: str,
    language: str,
    token: Optional[str] = None,
    pr_title: Optional[str] = None,
    pr_description: Optional[str] = None
) -> Tuple[bool, str, Optional[str]]:
    """
    Create a Pull Request on an existing HuggingFace Space with redesigned code

    The generated code is parsed into files according to `language`, staged in
    a temporary directory, and committed to the space with create_pr=True.

    Args:
        repo_id: Full repo ID (username/space-name)
        code: New code to propose
        language: Language/framework type
        token: HuggingFace API token (falls back to the HF_TOKEN environment variable)
        pr_title: Title for the PR (default: "🎨 Redesign from AnyCoder")
        pr_description: Description for the PR

    Returns:
        Tuple of (success: bool, message: str, pr_url: Optional[str])
    """
    if not token:
        token = os.getenv("HF_TOKEN")
        if not token:
            return False, "No HuggingFace token provided", None

    try:
        api = HfApi(token=token)

        # Check if we can access the space first (best effort)
        try:
            space_info = api.space_info(repo_id=repo_id, token=token)
            print(f"[PR] Space info: private={space_info.private if hasattr(space_info, 'private') else 'unknown'}")

            # Check if space is private
            if hasattr(space_info, 'private') and space_info.private:
                return False, "❌ Cannot create PR on private space. The space must be public to accept PRs from others.", None
        except Exception as info_error:
            # Not fatal: the commit attempt below will surface real access problems
            print(f"[PR] Could not fetch space info: {info_error}")

        # Default PR title and description
        if not pr_title:
            pr_title = "🎨 Redesign from AnyCoder"

        if not pr_description:
            pr_description = """This Pull Request contains a redesigned version of the app with:

- ✨ Modern, mobile-friendly design
- 🎯 Minimal, clean components
- 📱 Responsive layout
- 🚀 Improved user experience

Generated by [AnyCoder](https://huggingface.co/spaces/akhaliq/anycoder)"""

        # Stage the files in a temporary directory; the commit must happen
        # inside this block because operations reference files on disk.
        with tempfile.TemporaryDirectory() as temp_dir:
            temp_path = Path(temp_dir)

            if language == "transformers.js":
                try:
                    files = parse_transformers_js_output(code)
                    print(f"[PR] Parsed transformers.js files: {list(files.keys())}")
                    _write_pr_files(temp_path, files)
                except Exception as e:
                    print(f"[PR] Error parsing transformers.js: {e}")
                    return False, f"Error parsing transformers.js output: {str(e)}", None

            elif language == "html":
                html_code = parse_html_code(code)
                (temp_path / "index.html").write_text(html_code, encoding='utf-8')

            elif language == "comfyui":
                html_code = prettify_comfyui_json_for_html(code)
                (temp_path / "index.html").write_text(html_code, encoding='utf-8')

            elif language in ["gradio", "streamlit", "react"]:
                # `or {}` keeps the fallback below working even if the parser
                # returns None rather than an empty dict.
                files = parse_multi_file_python_output(code) or {}

                # Fallback if no files parsed
                if not files:
                    print(f"[PR] No file markers found, using entire code as main file")
                    cleaned_code = remove_code_block(code)
                    if language == "streamlit":
                        files["streamlit_app.py"] = cleaned_code
                    elif language == "react":
                        files["app.tsx"] = cleaned_code
                    else:
                        files["app.py"] = cleaned_code

                # For Gradio PRs, only include .py files
                if language == "gradio":
                    print(f"[PR] Gradio app - filtering to only .py files")
                    py_files = {fname: content for fname, content in files.items() if fname.endswith('.py')}
                    if not py_files:
                        print(f"[PR] Warning: No .py files found in parsed output")
                        return False, "No Python files found in generated code for Gradio PR", None

                    # pr_title is always set by this point; note the default
                    # title itself contains "redesign".
                    is_redesign = "redesign" in pr_title.lower()

                    if is_redesign:
                        print(f"[PR] Redesign PR detected - filtering to ONLY app.py")
                        if 'app.py' not in py_files:
                            print(f"[PR] Warning: No app.py found in redesign output")
                            return False, "No app.py found in redesign output for Gradio PR", None
                        files = {'app.py': py_files['app.py']}
                        print(f"[PR] Will only update app.py ({len(py_files['app.py'])} chars)")
                    else:
                        files = py_files
                        print(f"[PR] Will update {len(files)} Python file(s): {list(files.keys())}")

                _write_pr_files(temp_path, files)

                # Generate requirements.txt for Streamlit when none was provided
                if language == "streamlit" and "requirements.txt" not in files:
                    main_app = files.get('streamlit_app.py') or files.get('app.py', '')
                    if main_app:
                        print(f"[PR] Generating requirements.txt from imports")
                        import_statements = extract_import_statements(main_app)
                        requirements_content = generate_requirements_txt_with_llm(import_statements)
                        (temp_path / "requirements.txt").write_text(requirements_content, encoding='utf-8')

            else:
                # Default: treat as a (possibly multi-file) Python app
                files = parse_multi_file_python_output(code) or {}
                if not files:
                    files['app.py'] = remove_code_block(code)

                _write_pr_files(temp_path, files)

            # Create PR with the staged files
            try:
                print(f"[PR] Creating pull request with files on {repo_id}")

                from huggingface_hub import CommitOperationAdd

                # as_posix() keeps repo paths '/'-separated on every platform
                operations = [
                    CommitOperationAdd(
                        path_in_repo=staged_file.relative_to(temp_path).as_posix(),
                        path_or_fileobj=str(staged_file)
                    )
                    for staged_file in temp_path.rglob('*')
                    if staged_file.is_file()
                ]

                print(f"[PR] Prepared {len(operations)} file operations")
                # Deliberately no token logging here: the token is a secret and
                # even a prefix of it must not end up in logs.

                if not operations:
                    return False, "Failed to create pull request: no files were generated", None

                commit_info = api.create_commit(
                    repo_id=repo_id,
                    repo_type="space",
                    operations=operations,
                    commit_message=pr_title,
                    commit_description=pr_description,
                    create_pr=True,
                    token=token,
                )

                # Extract PR URL, falling back to the discussions pages
                pr_url = getattr(commit_info, 'pr_url', None)
                pr_num = getattr(commit_info, 'pr_num', None)

                if not pr_url and pr_num:
                    pr_url = f"https://huggingface.co/spaces/{repo_id}/discussions/{pr_num}"
                elif not pr_url:
                    pr_url = f"https://huggingface.co/spaces/{repo_id}/discussions"

                print(f"[PR] Created PR: {pr_url}")
                success_msg = f"✅ Pull Request created! View at: {pr_url}"

                return True, success_msg, pr_url

            except Exception as e:
                error_msg = str(e)
                print(f"[PR] Error creating pull request: {error_msg}")
                import traceback
                traceback.print_exc()

                # Provide a helpful message for permission-style failures
                if "403" in error_msg or "Forbidden" in error_msg or "Authorization" in error_msg:
                    user_msg = (
                        "❌ Cannot create Pull Request: Permission denied.\n\n"
                        "**Possible reasons:**\n"
                        "- The space owner hasn't enabled Pull Requests\n"
                        "- You don't have write access to this space\n"
                        "- Spaces have stricter PR permissions than models/datasets\n\n"
                        "**What you can do:**\n"
                        "✅ Use the 'Redesign' button WITHOUT checking 'Create PR' - this will:\n"
                        "   1. Duplicate the space to your account\n"
                        "   2. Apply the redesign to your copy\n"
                        "   3. You'll own the new space!\n\n"
                        "Or contact the space owner to enable Pull Requests."
                    )
                else:
                    user_msg = f"Failed to create pull request: {error_msg}"

                return False, user_msg, None

    except Exception as e:
        print(f"[PR] Top-level exception: {type(e).__name__}: {str(e)}")
        import traceback
        traceback.print_exc()
        return False, f"Pull request error: {str(e)}", None
|
|
|
|
|
|