yeye / media_processing.py
mgbam's picture
Create media_processing.py
9009981 verified
raw
history blame
48 kB
import os
import base64
import cv2
import numpy as np
from PIL import Image
import pytesseract
import requests
from urllib.parse import urlparse, urljoin
from bs4 import BeautifulSoup
import html2text
import json
import time
import webbrowser
import urllib.parse
import copy
import html
import tempfile
import uuid
import datetime
import threading
import atexit
from huggingface_hub import HfApi
import gradio as gr
import subprocess
import re
# ---------------------------------------------------------------------------
# Video temp-file management (per-session tracking and cleanup)
# ---------------------------------------------------------------------------
VIDEO_TEMP_DIR = os.path.join(tempfile.gettempdir(), "anycoder_videos")
VIDEO_FILE_TTL_SECONDS = 6 * 60 * 60 # 6 hours
_SESSION_VIDEO_FILES: Dict[str, List[str]] = {}
_VIDEO_FILES_LOCK = threading.Lock()
def _ensure_video_dir_exists() -> None:
try:
os.makedirs(VIDEO_TEMP_DIR, exist_ok=True)
except Exception:
pass
def _register_video_for_session(session_id: Optional[str], file_path: str) -> None:
if not session_id or not file_path:
return
with _VIDEO_FILES_LOCK:
if session_id not in _SESSION_VIDEO_FILES:
_SESSION_VIDEO_FILES[session_id] = []
_SESSION_VIDEO_FILES[session_id].append(file_path)
def cleanup_session_videos(session_id: Optional[str]) -> None:
if not session_id:
return
with _VIDEO_FILES_LOCK:
file_list = _SESSION_VIDEO_FILES.pop(session_id, [])
for path in file_list:
try:
if path and os.path.exists(path):
os.unlink(path)
except Exception:
# Best-effort cleanup
pass
def reap_old_videos(ttl_seconds: int = VIDEO_FILE_TTL_SECONDS) -> None:
"""Delete old video files in the temp directory based on modification time."""
try:
_ensure_video_dir_exists()
now_ts = time.time()
for name in os.listdir(VIDEO_TEMP_DIR):
path = os.path.join(VIDEO_TEMP_DIR, name)
try:
if not os.path.isfile(path):
continue
mtime = os.path.getmtime(path)
if now_ts - mtime > ttl_seconds:
os.unlink(path)
except Exception:
pass
except Exception:
# Temp dir might not exist or be accessible; ignore
pass
# ---------------------------------------------------------------------------
# Audio temp-file management (per-session tracking and cleanup)
# ---------------------------------------------------------------------------
AUDIO_TEMP_DIR = os.path.join(tempfile.gettempdir(), "anycoder_audio")
AUDIO_FILE_TTL_SECONDS = 6 * 60 * 60 # 6 hours
_SESSION_AUDIO_FILES: Dict[str, List[str]] = {}
_AUDIO_FILES_LOCK = threading.Lock()
def _ensure_audio_dir_exists() -> None:
try:
os.makedirs(AUDIO_TEMP_DIR, exist_ok=True)
except Exception:
pass
def _register_audio_for_session(session_id: Optional[str], file_path: str) -> None:
if not session_id or not file_path:
return
with _AUDIO_FILES_LOCK:
if session_id not in _SESSION_AUDIO_FILES:
_SESSION_AUDIO_FILES[session_id] = []
_SESSION_AUDIO_FILES[session_id].append(file_path)
def cleanup_session_audio(session_id: Optional[str]) -> None:
if not session_id:
return
with _AUDIO_FILES_LOCK:
file_list = _SESSION_AUDIO_FILES.pop(session_id, [])
for path in file_list:
try:
if path and os.path.exists(path):
os.unlink(path)
except Exception:
pass
def reap_old_audio(ttl_seconds: int = AUDIO_FILE_TTL_SECONDS) -> None:
try:
_ensure_audio_dir_exists()
now_ts = time.time()
for name in os.listdir(AUDIO_TEMP_DIR):
path = os.path.join(AUDIO_TEMP_DIR, name)
try:
if not os.path.isfile(path):
continue
mtime = os.path.getmtime(path)
if now_ts - mtime > ttl_seconds:
os.unlink(path)
except Exception:
pass
except Exception:
pass
# ---------------------------------------------------------------------------
# General temp media file management (per-session tracking and cleanup)
# ---------------------------------------------------------------------------
MEDIA_TEMP_DIR = os.path.join(tempfile.gettempdir(), "anycoder_media")
MEDIA_FILE_TTL_SECONDS = 6 * 60 * 60 # 6 hours
_SESSION_MEDIA_FILES: Dict[str, List[str]] = {}
_MEDIA_FILES_LOCK = threading.Lock()
# Global dictionary to store temporary media files for the session
temp_media_files = {}
def _ensure_media_dir_exists() -> None:
"""Ensure the media temp directory exists."""
try:
os.makedirs(MEDIA_TEMP_DIR, exist_ok=True)
except Exception:
pass
def track_session_media_file(session_id: Optional[str], file_path: str) -> None:
"""Track a media file for session-based cleanup."""
if not session_id or not file_path:
return
with _MEDIA_FILES_LOCK:
if session_id not in _SESSION_MEDIA_FILES:
_SESSION_MEDIA_FILES[session_id] = []
_SESSION_MEDIA_FILES[session_id].append(file_path)
def cleanup_session_media(session_id: Optional[str]) -> None:
"""Clean up media files for a specific session."""
if not session_id:
return
with _MEDIA_FILES_LOCK:
files_to_clean = _SESSION_MEDIA_FILES.pop(session_id, [])
for path in files_to_clean:
try:
if path and os.path.exists(path):
os.unlink(path)
except Exception:
# Best-effort cleanup
pass
def reap_old_media(ttl_seconds: int = MEDIA_FILE_TTL_SECONDS) -> None:
"""Delete old media files in the temp directory based on modification time."""
try:
_ensure_media_dir_exists()
now_ts = time.time()
for name in os.listdir(MEDIA_TEMP_DIR):
path = os.path.join(MEDIA_TEMP_DIR, name)
if os.path.isfile(path):
try:
mtime = os.path.getmtime(path)
if (now_ts - mtime) > ttl_seconds:
os.unlink(path)
except Exception:
pass
except Exception:
# Temp dir might not exist or be accessible; ignore
pass
def cleanup_all_temp_media_on_startup() -> None:
"""Clean up all temporary media files on app startup."""
try:
# Clean up temp_media_files registry
temp_media_files.clear()
# Clean up actual files from disk (assume all are orphaned on startup)
_ensure_media_dir_exists()
for name in os.listdir(MEDIA_TEMP_DIR):
path = os.path.join(MEDIA_TEMP_DIR, name)
if os.path.isfile(path):
try:
os.unlink(path)
except Exception:
pass
# Clear session tracking
with _MEDIA_FILES_LOCK:
_SESSION_MEDIA_FILES.clear()
print("[StartupCleanup] Cleaned up orphaned temporary media files")
except Exception as e:
print(f"[StartupCleanup] Error during media cleanup: {str(e)}")
def cleanup_all_temp_media_on_shutdown() -> None:
"""Clean up all temporary media files on app shutdown."""
try:
print("[ShutdownCleanup] Cleaning up temporary media files...")
# Clean up temp_media_files registry and remove files
for file_id, file_info in temp_media_files.items():
try:
if os.path.exists(file_info['path']):
os.unlink(file_info['path'])
except Exception:
pass
temp_media_files.clear()
# Clean up all session files
with _MEDIA_FILES_LOCK:
for session_id, file_paths in _SESSION_MEDIA_FILES.items():
for path in file_paths:
try:
if path and os.path.exists(path):
os.unlink(path)
except Exception:
pass
_SESSION_MEDIA_FILES.clear()
print("[ShutdownCleanup] Temporary media cleanup completed")
except Exception as e:
print(f"[ShutdownCleanup] Error during cleanup: {str(e)}")
# Register shutdown cleanup handler
atexit.register(cleanup_all_temp_media_on_shutdown)
def create_temp_media_url(media_bytes: bytes, filename: str, media_type: str = "image", session_id: Optional[str] = None) -> str:
"""Create a temporary file and return a local URL for preview."""
try:
# Create unique filename with timestamp and UUID
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
unique_id = str(uuid.uuid4())[:8]
base_name, ext = os.path.splitext(filename)
unique_filename = f"{media_type}_{timestamp}_{unique_id}_{base_name}{ext}"
# Create temporary file in the dedicated directory
_ensure_media_dir_exists()
temp_path = os.path.join(MEDIA_TEMP_DIR, unique_filename)
# Write media bytes to temporary file
with open(temp_path, 'wb') as f:
f.write(media_bytes)
# Track file for session-based cleanup
if session_id:
track_session_media_file(session_id, temp_path)
# Store the file info for later upload
file_id = f"{media_type}_{unique_id}"
temp_media_files[file_id] = {
'path': temp_path,
'filename': filename,
'media_type': media_type,
'media_bytes': media_bytes
}
# Return file:// URL for preview
file_url = f"file://{temp_path}"
print(f"[TempMedia] Created temporary {media_type} file: {file_url}")
return file_url
except Exception as e:
print(f"[TempMedia] Failed to create temporary file: {str(e)}")
return f"Error creating temporary {media_type} file: {str(e)}"
def upload_media_to_hf(media_bytes: bytes, filename: str, media_type: str = "image", token: gr.OAuthToken | None = None, use_temp: bool = True) -> str:
"""Upload media file to user's Hugging Face account or create temporary file."""
try:
# If use_temp is True, create temporary file for preview
if use_temp:
return create_temp_media_url(media_bytes, filename, media_type)
# Otherwise, upload to Hugging Face for permanent URL
# Try to get token from OAuth first, then fall back to environment variable
hf_token = None
if token and token.token:
hf_token = token.token
else:
hf_token = os.getenv('HF_TOKEN')
if not hf_token:
return "Error: Please log in with your Hugging Face account to upload media, or set HF_TOKEN environment variable."
# Initialize HF API
api = HfApi(token=hf_token)
# Get current user info to determine username
try:
user_info = api.whoami()
username = user_info.get('name', 'unknown-user')
except Exception as e:
print(f"[HFUpload] Could not get user info: {e}")
username = 'anycoder-user'
# Create repository name for media storage
repo_name = f"{username}/anycoder-media"
# Try to create the repository if it doesn't exist
try:
api.create_repo(
repo_id=repo_name,
repo_type="dataset",
private=False,
exist_ok=True
)
print(f"[HFUpload] Repository {repo_name} ready")
except Exception as e:
print(f"[HFUpload] Repository creation/access issue: {e}")
# Continue anyway, repo might already exist
# Create unique filename with timestamp and UUID
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
unique_id = str(uuid.uuid4())[:8]
base_name, ext = os.path.splitext(filename)
unique_filename = f"{media_type}/{timestamp}_{unique_id}_{base_name}{ext}"
# Create temporary file for upload
with tempfile.NamedTemporaryFile(delete=False, suffix=ext) as temp_file:
temp_file.write(media_bytes)
temp_path = temp_file.name
try:
# Upload file to HF repository
api.upload_file(
path_or_fileobj=temp_path,
path_in_repo=unique_filename,
repo_id=repo_name,
repo_type="dataset",
commit_message=f"Upload {media_type} generated by AnyCoder"
)
# Generate permanent URL
permanent_url = f"https://huggingface.co/datasets/{repo_name}/resolve/main/{unique_filename}"
print(f"[HFUpload] Successfully uploaded {media_type} to {permanent_url}")
return permanent_url
finally:
# Clean up temporary file
try:
os.unlink(temp_path)
except Exception:
pass
except Exception as e:
print(f"[HFUpload] Upload failed: {str(e)}")
return f"Error uploading {media_type} to Hugging Face: {str(e)}"
def upload_temp_files_to_hf_and_replace_urls(html_content: str, token: gr.OAuthToken | None = None) -> str:
"""Upload all temporary media files to HF and replace their URLs in HTML content."""
try:
if not temp_media_files:
print("[DeployUpload] No temporary media files to upload")
return html_content
print(f"[DeployUpload] Uploading {len(temp_media_files)} temporary media files to HF")
updated_content = html_content
for file_id, file_info in temp_media_files.items():
try:
# Upload to HF with permanent URL
permanent_url = upload_media_to_hf(
file_info['media_bytes'],
file_info['filename'],
file_info['media_type'],
token,
use_temp=False # Force permanent upload
)
if not permanent_url.startswith("Error"):
# Replace the temporary file URL with permanent URL
temp_url = f"file://{file_info['path']}"
updated_content = updated_content.replace(temp_url, permanent_url)
print(f"[DeployUpload] Replaced {temp_url} with {permanent_url}")
else:
print(f"[DeployUpload] Failed to upload {file_id}: {permanent_url}")
except Exception as e:
print(f"[DeployUpload] Error uploading {file_id}: {str(e)}")
continue
# Clean up temporary files after upload
cleanup_temp_media_files()
return updated_content
except Exception as e:
print(f"[DeployUpload] Failed to upload temporary files: {str(e)}")
return html_content
def cleanup_temp_media_files():
"""Clean up temporary media files from disk and memory."""
try:
for file_id, file_info in temp_media_files.items():
try:
if os.path.exists(file_info['path']):
os.remove(file_info['path'])
print(f"[TempCleanup] Removed {file_info['path']}")
except Exception as e:
print(f"[TempCleanup] Failed to remove {file_info['path']}: {str(e)}")
# Clear the global dictionary
temp_media_files.clear()
print("[TempCleanup] Cleared temporary media files registry")
except Exception as e:
print(f"[TempCleanup] Error during cleanup: {str(e)}")
def generate_image_with_qwen(prompt: str, image_index: int = 0, token: gr.OAuthToken | None = None) -> str:
"""Generate image using Qwen image model via Hugging Face InferenceClient and upload to HF for permanent URL"""
try:
# Check if HF_TOKEN is available
if not os.getenv('HF_TOKEN'):
return "Error: HF_TOKEN environment variable is not set. Please set it to your Hugging Face API token."
# Create InferenceClient for Qwen image generation
client = InferenceClient(
provider="auto",
api_key=os.getenv('HF_TOKEN'),
bill_to="huggingface",
)
# Generate image using Qwen/Qwen-Image model
image = client.text_to_image(
prompt,
model="Qwen/Qwen-Image",
)
# Resize image to reduce size while maintaining quality
max_size = 1024 # Increased size since we're not using data URIs
if image.width > max_size or image.height > max_size:
image.thumbnail((max_size, max_size), Image.Resampling.LANCZOS)
# Convert PIL Image to bytes for upload
import io
buffer = io.BytesIO()
# Save as JPEG with good quality since we're not embedding
image.convert('RGB').save(buffer, format='JPEG', quality=90, optimize=True)
image_bytes = buffer.getvalue()
# Create temporary URL for preview (will be uploaded to HF during deploy)
filename = f"generated_image_{image_index}.jpg"
temp_url = upload_media_to_hf(image_bytes, filename, "image", token, use_temp=True)
# Check if creation was successful
if temp_url.startswith("Error"):
return temp_url
# Return HTML img tag with temporary URL
return f'<img src="{temp_url}" alt="{prompt}" style="max-width: 100%; height: auto; border-radius: 8px; margin: 10px 0;" loading="lazy" />'
except Exception as e:
print(f"Image generation error: {str(e)}")
return f"Error generating image: {str(e)}"
def generate_image_to_image(input_image_data, prompt: str, token: gr.OAuthToken | None = None) -> str:
"""Generate an image using image-to-image with Qwen-Image-Edit via Hugging Face InferenceClient."""
try:
# Check token
if not os.getenv('HF_TOKEN'):
return "Error: HF_TOKEN environment variable is not set. Please set it to your Hugging Face API token."
# Prepare client
client = InferenceClient(
provider="auto",
api_key=os.getenv('HF_TOKEN'),
bill_to="huggingface",
)
# Normalize input image to bytes
import io
from PIL import Image
try:
import numpy as np
except Exception:
np = None
if hasattr(input_image_data, 'read'):
# File-like object
raw = input_image_data.read()
pil_image = Image.open(io.BytesIO(raw))
elif hasattr(input_image_data, 'mode') and hasattr(input_image_data, 'size'):
# PIL Image
pil_image = input_image_data
elif np is not None and isinstance(input_image_data, np.ndarray):
pil_image = Image.fromarray(input_image_data)
elif isinstance(input_image_data, (bytes, bytearray)):
pil_image = Image.open(io.BytesIO(input_image_data))
else:
# Fallback: try to convert via bytes
pil_image = Image.open(io.BytesIO(bytes(input_image_data)))
# Ensure RGB
if pil_image.mode != 'RGB':
pil_image = pil_image.convert('RGB')
# Resize input image to avoid request body size limits
max_input_size = 1024
if pil_image.width > max_input_size or pil_image.height > max_input_size:
pil_image.thumbnail((max_input_size, max_input_size), Image.Resampling.LANCZOS)
buf = io.BytesIO()
pil_image.save(buf, format='JPEG', quality=85, optimize=True)
input_bytes = buf.getvalue()
# Call image-to-image
image = client.image_to_image(
input_bytes,
prompt=prompt,
model="Qwen/Qwen-Image-Edit",
)
# Resize/optimize (larger since not using data URIs)
max_size = 1024
if image.width > max_size or image.height > max_size:
image.thumbnail((max_size, max_size), Image.Resampling.LANCZOS)
out_buf = io.BytesIO()
image.convert('RGB').save(out_buf, format='JPEG', quality=90, optimize=True)
image_bytes = out_buf.getvalue()
# Create temporary URL for preview (will be uploaded to HF during deploy)
filename = "image_to_image_result.jpg"
temp_url = upload_media_to_hf(image_bytes, filename, "image", token, use_temp=True)
# Check if creation was successful
if temp_url.startswith("Error"):
return temp_url
return f"<img src=\"{temp_url}\" alt=\"{prompt}\" style=\"max-width: 100%; height: auto; border-radius: 8px; margin: 10px 0;\" loading=\"lazy\" />"
except Exception as e:
print(f"Image-to-image generation error: {str(e)}")
return f"Error generating image (image-to-image): {str(e)}"
def generate_video_from_image(input_image_data, prompt: str, session_id: Optional[str] = None, token: gr.OAuthToken | None = None) -> str:
"""Generate a video from an input image and prompt using Hugging Face InferenceClient."""
try:
print("[Image2Video] Starting video generation")
if not os.getenv('HF_TOKEN'):
print("[Image2Video] Missing HF_TOKEN")
return "Error: HF_TOKEN environment variable is not set. Please set it to your Hugging Face API token."
# Prepare client
client = InferenceClient(
provider="auto",
api_key=os.getenv('HF_TOKEN'),
bill_to="huggingface",
)
print(f"[Image2Video] InferenceClient initialized (provider=auto)")
# Normalize input image to bytes, with downscale/compress to cap request size
import io
from PIL import Image
try:
import numpy as np
except Exception:
np = None
def _load_pil(img_like) -> Image.Image:
if hasattr(img_like, 'read'):
return Image.open(io.BytesIO(img_like.read()))
if hasattr(img_like, 'mode') and hasattr(img_like, 'size'):
return img_like
if np is not None and isinstance(img_like, np.ndarray):
return Image.fromarray(img_like)
if isinstance(img_like, (bytes, bytearray)):
return Image.open(io.BytesIO(img_like))
return Image.open(io.BytesIO(bytes(img_like)))
pil_image = _load_pil(input_image_data)
if pil_image.mode != 'RGB':
pil_image = pil_image.convert('RGB')
try:
print(f"[Image2Video] Input PIL image size={pil_image.size} mode={pil_image.mode}")
except Exception:
pass
# Progressive encode to keep payload under ~3.9MB (below 4MB limit)
MAX_BYTES = 3_900_000
max_dim = 1024 # initial cap on longest edge
quality = 90
def encode_current(pil: Image.Image, q: int) -> bytes:
tmp = io.BytesIO()
pil.save(tmp, format='JPEG', quality=q, optimize=True)
return tmp.getvalue()
# Downscale while the longest edge exceeds max_dim
while max(pil_image.size) > max_dim:
ratio = max_dim / float(max(pil_image.size))
new_size = (max(1, int(pil_image.size[0] * ratio)), max(1, int(pil_image.size[1] * ratio)))
pil_image = pil_image.resize(new_size, Image.Resampling.LANCZOS)
encoded = encode_current(pil_image, quality)
# If still too big, iteratively reduce quality, then dimensions
while len(encoded) > MAX_BYTES and (quality > 40 or max(pil_image.size) > 640):
if quality > 40:
quality -= 10
else:
# reduce dims by 15% if already at low quality
new_w = max(1, int(pil_image.size[0] * 0.85))
new_h = max(1, int(pil_image.size[1] * 0.85))
pil_image = pil_image.resize((new_w, new_h), Image.Resampling.LANCZOS)
encoded = encode_current(pil_image, quality)
input_bytes = encoded
# Call image-to-video; require method support
model_id = "Lightricks/LTX-Video-0.9.8-13B-distilled"
image_to_video_method = getattr(client, "image_to_video", None)
if not callable(image_to_video_method):
print("[Image2Video] InferenceClient.image_to_video not available in this huggingface_hub version")
return (
"Error generating video (image-to-video): Your installed huggingface_hub version "
"does not expose InferenceClient.image_to_video. Please upgrade with "
"`pip install -U huggingface_hub` and try again."
)
print(f"[Image2Video] Calling image_to_video with model={model_id}, prompt length={len(prompt or '')}")
video_bytes = image_to_video_method(
input_bytes,
prompt=prompt,
model=model_id,
)
print(f"[Image2Video] Received video bytes: {len(video_bytes) if hasattr(video_bytes, '__len__') else 'unknown length'}")
# Create temporary URL for preview (will be uploaded to HF during deploy)
filename = "image_to_video_result.mp4"
temp_url = upload_media_to_hf(video_bytes, filename, "video", token, use_temp=True)
# Check if creation was successful
if temp_url.startswith("Error"):
return temp_url
video_html = (
f'<video controls autoplay muted loop playsinline '
f'style="max-width: 100%; height: auto; border-radius: 8px; margin: 10px 0; display: block;" '
f'onloadstart="this.style.backgroundColor=\'#f0f0f0\'" '
f'onerror="this.style.display=\'none\'; console.error(\'Video failed to load\')">'
f'<source src="{temp_url}" type="video/mp4" />'
f'<p style="text-align: center; color: #666;">Your browser does not support the video tag.</p>'
f'</video>'
)
print(f"[Image2Video] Successfully generated video HTML tag with temporary URL: {temp_url}")
# Validate the generated video HTML
if not validate_video_html(video_html):
print("[Image2Video] Generated video HTML failed validation")
return "Error: Generated video HTML is malformed"
return video_html
except Exception as e:
import traceback
print("[Image2Video] Exception during generation:")
traceback.print_exc()
print(f"Image-to-video generation error: {str(e)}")
return f"Error generating video (image-to-video): {str(e)}"
def generate_video_from_text(prompt: str, session_id: Optional[str] = None, token: gr.OAuthToken | None = None) -> str:
"""Generate a video from a text prompt using Hugging Face InferenceClient."""
try:
print("[Text2Video] Starting video generation from text")
if not os.getenv('HF_TOKEN'):
print("[Text2Video] Missing HF_TOKEN")
return "Error: HF_TOKEN environment variable is not set. Please set it to your Hugging Face API token."
client = InferenceClient(
provider="auto",
api_key=os.getenv('HF_TOKEN'),
bill_to="huggingface",
)
print("[Text2Video] InferenceClient initialized (provider=auto)")
# Ensure the client has text_to_video (newer huggingface_hub)
text_to_video_method = getattr(client, "text_to_video", None)
if not callable(text_to_video_method):
print("[Text2Video] InferenceClient.text_to_video not available in this huggingface_hub version")
return (
"Error generating video (text-to-video): Your installed huggingface_hub version "
"does not expose InferenceClient.text_to_video. Please upgrade with "
"`pip install -U huggingface_hub` and try again."
)
model_id = "Wan-AI/Wan2.2-T2V-A14B"
prompt_str = (prompt or "").strip()
print(f"[Text2Video] Calling text_to_video with model={model_id}, prompt length={len(prompt_str)}")
video_bytes = text_to_video_method(
prompt_str,
model=model_id,
)
print(f"[Text2Video] Received video bytes: {len(video_bytes) if hasattr(video_bytes, '__len__') else 'unknown length'}")
# Create temporary URL for preview (will be uploaded to HF during deploy)
filename = "text_to_video_result.mp4"
temp_url = upload_media_to_hf(video_bytes, filename, "video", token, use_temp=True)
# Check if creation was successful
if temp_url.startswith("Error"):
return temp_url
video_html = (
f'<video controls autoplay muted loop playsinline '
f'style="max-width: 100%; height: auto; border-radius: 8px; margin: 10px 0; display: block;" '
f'onloadstart="this.style.backgroundColor=\'#f0f0f0\'" '
f'onerror="this.style.display=\'none\'; console.error(\'Video failed to load\')">'
f'<source src="{temp_url}" type="video/mp4" />'
f'<p style="text-align: center; color: #666;">Your browser does not support the video tag.</p>'
f'</video>'
)
print(f"[Text2Video] Successfully generated video HTML tag with temporary URL: {temp_url}")
# Validate the generated video HTML
if not validate_video_html(video_html):
print("[Text2Video] Generated video HTML failed validation")
return "Error: Generated video HTML is malformed"
return video_html
except Exception as e:
import traceback
print("[Text2Video] Exception during generation:")
traceback.print_exc()
print(f"Text-to-video generation error: {str(e)}")
return f"Error generating video (text-to-video): {str(e)}"
def generate_music_from_text(prompt: str, music_length_ms: int = 30000, session_id: Optional[str] = None, token: gr.OAuthToken | None = None) -> str:
"""Generate music from a text prompt using ElevenLabs Music API and return an HTML <audio> tag."""
try:
api_key = os.getenv('ELEVENLABS_API_KEY')
if not api_key:
return "Error: ELEVENLABS_API_KEY environment variable is not set."
headers = {
'Content-Type': 'application/json',
'xi-api-key': api_key,
}
payload = {
'prompt': (prompt or 'Epic orchestral theme with soaring strings and powerful brass'),
'music_length_ms': int(music_length_ms) if music_length_ms else 30000,
}
resp = requests.post('https://api.elevenlabs.io/v1/music/compose', headers=headers, json=payload)
try:
resp.raise_for_status()
except Exception as e:
return f"Error generating music: {getattr(e, 'response', resp).text if hasattr(e, 'response') else resp.text}"
# Create temporary URL for preview (will be uploaded to HF during deploy)
filename = "generated_music.mp3"
temp_url = upload_media_to_hf(resp.content, filename, "audio", token, use_temp=True)
# Check if creation was successful
if temp_url.startswith("Error"):
return temp_url
audio_html = (
"<div class=\"anycoder-music\" style=\"max-width:420px;margin:16px auto;padding:12px 16px;border:1px solid #e5e7eb;border-radius:12px;background:linear-gradient(180deg,#fafafa,#f3f4f6);box-shadow:0 2px 8px rgba(0,0,0,0.06)\">"
" <div style=\"font-size:13px;color:#374151;margin-bottom:8px;display:flex:align-items:center;gap:6px\">"
" <span>🎵 Generated music</span>"
" </div>"
f" <audio controls autoplay loop style=\"width:100%;outline:none;\">"
f" <source src=\"{temp_url}\" type=\"audio/mpeg\" />"
" Your browser does not support the audio element."
" </audio>"
"</div>"
)
print(f"[Music] Successfully generated music HTML tag with temporary URL: {temp_url}")
return audio_html
except Exception as e:
return f"Error generating music: {str(e)}"
def extract_image_prompts_from_text(text: str, num_images_needed: int = 1) -> list:
"""Extract image generation prompts from the full text based on number of images needed"""
# Use the entire text as the base prompt for image generation
# Clean up the text and create variations for the required number of images
# Clean the text
cleaned_text = text.strip()
if not cleaned_text:
return []
# Create variations of the prompt for the required number of images
prompts = []
# Generate exactly the number of images needed
for i in range(num_images_needed):
if i == 0:
# First image: Use the full prompt as-is
prompts.append(cleaned_text)
elif i == 1:
# Second image: Add "visual representation" to make it more image-focused
prompts.append(f"Visual representation of {cleaned_text}")
elif i == 2:
# Third image: Add "illustration" to create a different style
prompts.append(f"Illustration of {cleaned_text}")
else:
# For additional images, use different variations
variations = [
f"Digital art of {cleaned_text}",
f"Modern design of {cleaned_text}",
f"Professional illustration of {cleaned_text}",
f"Clean design of {cleaned_text}",
f"Beautiful visualization of {cleaned_text}",
f"Stylish representation of {cleaned_text}",
f"Contemporary design of {cleaned_text}",
f"Elegant illustration of {cleaned_text}"
]
variation_index = (i - 3) % len(variations)
prompts.append(variations[variation_index])
return prompts
def create_image_replacement_blocks(html_content: str, user_prompt: str) -> str:
"""Create search/replace blocks to replace placeholder images with generated Qwen images"""
if not user_prompt:
return ""
# Find existing image placeholders in the HTML first
import re
# Common patterns for placeholder images
placeholder_patterns = [
r'<img[^>]*src=["\'](?:placeholder|dummy|sample|example)[^"\']*["\'][^>]*>',
r'<img[^>]*src=["\']https?://via\.placeholder\.com[^"\']*["\'][^>]*>',
r'<img[^>]*src=["\']https?://picsum\.photos[^"\']*["\'][^>]*>',
r'<img[^>]*src=["\']https?://dummyimage\.com[^"\']*["\'][^>]*>',
r'<img[^>]*alt=["\'][^"\']*placeholder[^"\']*["\'][^>]*>',
r'<img[^>]*class=["\'][^"\']*placeholder[^"\']*["\'][^>]*>',
r'<img[^>]*id=["\'][^"\']*placeholder[^"\']*["\'][^>]*>',
r'<img[^>]*src=["\']data:image[^"\']*["\'][^>]*>', # Base64 images
r'<img[^>]*src=["\']#["\'][^>]*>', # Empty src
r'<img[^>]*src=["\']about:blank["\'][^>]*>', # About blank
]
# Find all placeholder images
placeholder_images = []
for pattern in placeholder_patterns:
matches = re.findall(pattern, html_content, re.IGNORECASE)
placeholder_images.extend(matches)
# Filter out HF URLs from placeholders (they are real generated content)
placeholder_images = [img for img in placeholder_images if 'huggingface.co/datasets/' not in img]
# If no placeholder images found, look for any img tags
if not placeholder_images:
img_pattern = r'<img[^>]*>'
# Case-insensitive to catch <IMG> or mixed-case tags
placeholder_images = re.findall(img_pattern, html_content, re.IGNORECASE)
# Also look for div elements that might be image placeholders
div_placeholder_patterns = [
r'<div[^>]*class=["\'][^"\']*(?:image|img|photo|picture)[^"\']*["\'][^>]*>.*?</div>',
r'<div[^>]*id=["\'][^"\']*(?:image|img|photo|picture)[^"\']*["\'][^>]*>.*?</div>',
]
for pattern in div_placeholder_patterns:
matches = re.findall(pattern, html_content, re.IGNORECASE | re.DOTALL)
placeholder_images.extend(matches)
# Count how many images we need to generate
num_images_needed = len(placeholder_images)
if num_images_needed == 0:
return ""
# Generate image prompts based on the number of images found
image_prompts = extract_image_prompts_from_text(user_prompt, num_images_needed)
# Generate images for each prompt
generated_images = []
for i, prompt in enumerate(image_prompts):
image_html = generate_image_with_qwen(prompt, i, token=None) # TODO: Pass token from parent context
if not image_html.startswith("Error"):
generated_images.append((i, image_html))
if not generated_images:
return ""
# Create search/replace blocks
replacement_blocks = []
for i, (prompt_index, generated_image) in enumerate(generated_images):
if i < len(placeholder_images):
# Replace existing placeholder
placeholder = placeholder_images[i]
# Clean up the placeholder for better matching
placeholder_clean = re.sub(r'\s+', ' ', placeholder.strip())
# Try multiple variations of the placeholder for better matching
placeholder_variations = [
placeholder_clean,
placeholder_clean.replace('"', "'"),
placeholder_clean.replace("'", '"'),
re.sub(r'\s+', ' ', placeholder_clean),
placeholder_clean.replace(' ', ' '),
]
# Create a replacement block for each variation
for variation in placeholder_variations:
replacement_blocks.append(f"""{SEARCH_START}
{variation}
{DIVIDER}
{generated_image}
{REPLACE_END}""")
else:
# Add new image if we have more generated images than placeholders
# Find a good insertion point (after body tag or main content)
if '<body' in html_content:
body_end = html_content.find('>', html_content.find('<body')) + 1
insertion_point = html_content[:body_end] + '\n '
replacement_blocks.append(f"""{SEARCH_START}
{insertion_point}
{DIVIDER}
{insertion_point}
{generated_image}
{REPLACE_END}""")
return '\n\n'.join(replacement_blocks)
def create_image_replacement_blocks_text_to_image_single(html_content: str, prompt: str) -> str:
"""Create search/replace blocks that generate and insert ONLY ONE text-to-image result."""
if not prompt or not prompt.strip():
return ""
import re
# Detect placeholders similarly to the multi-image version
placeholder_patterns = [
r'<img[^>]*src=["\'](?:placeholder|dummy|sample|example)[^"\']*["\'][^>]*>',
r'<img[^>]*src=["\']https?://via\.placeholder\.com[^"\']*["\'][^>]*>',
r'<img[^>]*src=["\']https?://picsum\.photos[^"\']*["\'][^>]*>',
r'<img[^>]*src=["\']https?://dummyimage\.com[^"\']*["\'][^>]*>',
r'<img[^>]*alt=["\'][^"\']*placeholder[^"\']*["\'][^>]*>',
r'<img[^>]*class=["\'][^"\']*placeholder[^"\']*["\'][^>]*>',
r'<img[^>]*id=["\'][^"\']*placeholder[^"\']*["\'][^>]*>',
r'<img[^>]*src=["\']data:image[^"\']*["\'][^>]*>',
r'<img[^>]*src=["\']#["\'][^>]*>',
r'<img[^>]*src=["\']about:blank["\'][^>]*>',
]
placeholder_images = []
for pattern in placeholder_patterns:
matches = re.findall(pattern, html_content, re.IGNORECASE)
if matches:
placeholder_images.extend(matches)
# Filter out HF URLs from placeholders (they are real generated content)
placeholder_images = [img for img in placeholder_images if 'huggingface.co/datasets/' not in img]
# Filter out HF URLs from placeholders (they are real generated content)
placeholder_images = [img for img in placeholder_images if 'huggingface.co/datasets/' not in img]
# Fallback to any <img> if no placeholders
if not placeholder_images:
img_pattern = r'<img[^>]*>'
placeholder_images = re.findall(img_pattern, html_content)
# Generate a single image
image_html = generate_image_with_qwen(prompt, 0, token=None) # TODO: Pass token from parent context
if image_html.startswith("Error"):
return ""
# Replace first placeholder if present
if placeholder_images:
placeholder = placeholder_images[0]
placeholder_clean = re.sub(r'\s+', ' ', placeholder.strip())
placeholder_variations = [
placeholder_clean,
placeholder_clean.replace('"', "'"),
placeholder_clean.replace("'", '"'),
re.sub(r'\s+', ' ', placeholder_clean),
placeholder_clean.replace(' ', ' '),
]
blocks = []
for variation in placeholder_variations:
blocks.append(f"""{SEARCH_START}
{variation}
{DIVIDER}
{image_html}
{REPLACE_END}""")
return '\n\n'.join(blocks)
# Otherwise insert after <body>
if '<body' in html_content:
body_end = html_content.find('>', html_content.find('<body')) + 1
insertion_point = html_content[:body_end] + '\n '
return f"""{SEARCH_START}
{insertion_point}
{DIVIDER}
{insertion_point}
{image_html}
{REPLACE_END}"""
# If no <body>, just append
return f"{SEARCH_START}\n\n{DIVIDER}\n{image_html}\n{REPLACE_END}"
def create_video_replacement_blocks_text_to_video(html_content: str, prompt: str, session_id: Optional[str] = None) -> str:
"""Create search/replace blocks that generate and insert ONLY ONE text-to-video result."""
if not prompt or not prompt.strip():
return ""
import re
# Detect the same placeholders as image counterparts, to replace the first image slot with a video
placeholder_patterns = [
r'<img[^>]*src=["\'](?:placeholder|dummy|sample|example)[^"\']*["\'][^>]*>',
r'<img[^>]*src=["\']https?://via\.placeholder\.com[^"\']*["\'][^>]*>',
r'<img[^>]*src=["\']https?://picsum\.photos[^"\']*["\'][^>]*>',
r'<img[^>]*src=["\']https?://dummyimage\.com[^"\']*["\'][^>]*>',
r'<img[^>]*alt=["\'][^"\']*placeholder[^"\']*["\'][^>]*>',
r'<img[^>]*class=["\'][^"\']*placeholder[^"\']*["\'][^>]*>',
r'<img[^>]*id=["\'][^"\']*placeholder[^"\']*["\'][^>]*>',
r'<img[^>]*src=["\']data:image[^"\']*["\'][^>]*>',
r'<img[^>]*src=["\']#["\'][^>]*>',
r'<img[^>]*src=["\']about:blank["\'][^>]*>',
]
placeholder_images = []
for pattern in placeholder_patterns:
matches = re.findall(pattern, html_content, re.IGNORECASE)
if matches:
placeholder_images.extend(matches)
# Filter out HF URLs from placeholders (they are real generated content)
placeholder_images = [img for img in placeholder_images if 'huggingface.co/datasets/' not in img]
if not placeholder_images:
img_pattern = r'<img[^>]*>'
placeholder_images = re.findall(img_pattern, html_content)
video_html = generate_video_from_text(prompt, session_id=session_id, token=None) # TODO: Pass token from parent context
if video_html.startswith("Error"):
return ""
# Replace first placeholder if present
if placeholder_images:
placeholder = placeholder_images[0]
placeholder_clean = re.sub(r'\s+', ' ', placeholder.strip())
placeholder_variations = [
placeholder,
placeholder_clean,
placeholder_clean.replace('"', "'"),
placeholder_clean.replace("'", '"'),
re.sub(r'\s+', ' ', placeholder_clean),
placeholder_clean.replace(' ', ' '),
]
blocks = []
for variation in placeholder_variations:
blocks.append(f"""{SEARCH_START}
{variation}
{DIVIDER}
{video_html}
{REPLACE_END}""")
return '\n\n'.join(blocks)
# Otherwise insert after <body> with proper container
if '<body' in html_content:
body_start = html_content.find('<body')
body_end = html_content.find('>', body_start) + 1
opening_body_tag = html_content[body_start:body_end]
# Look for existing container elements to insert into
body_content_start = body_end
# Try to find a good insertion point within existing content structure
patterns_to_try = [
r'<main[^>]*>',
r'<section[^>]*class="[^"]*hero[^"]*"[^>]*>',
r'<div[^>]*class="[^"]*container[^"]*"[^>]*>',
r'<header[^>]*>',
]
insertion_point = None
for pattern in patterns_to_try:
import re
match = re.search(pattern, html_content[body_content_start:], re.IGNORECASE)
if match:
match_end = body_content_start + match.end()
# Find the end of this tag
tag_content = html_content[body_content_start + match.start():match_end]
insertion_point = html_content[:match_end] + '\n '
break
if not insertion_point:
# Fallback to right after body tag with container div
insertion_point = html_content[:body_end] + '\n '
video_with_container = f'<div class="video-container" style="margin: 20px 0; text-align: center;">\n {video_html}\n </div>'
return f"""{SEARCH_START}
{insertion_point}
{DIVIDER}
{insertion_point}
{video_with_container}
{REPLACE_END}"""
else:
return f"""{SEARCH_START}
{insertion_point}
{DIVIDER}
{insertion_point}
{video_html}
{REPLACE_END}"""
# If no <body>, just append
return f"{SEARCH_START}\n\n{DIVIDER}\n{video_html}\n{REPLACE_END}"
def create_music_replacement_blocks_text_to_music(html_content: str, prompt: str, session_id: Optional[str] = None) -> str:
"""Create search/replace blocks that insert ONE generated <audio> near the top of <body>."""
if not prompt or not prompt.strip():
return ""
audio_html = generate_music_from_text(prompt, session_id=session_id, token=None) # TODO: Pass token from parent context
if audio_html.startswith("Error"):
return ""
# Prefer inserting after the first <section>...</section> if present; else after <body>
import re
section_match = re.search(r"<section\b[\s\S]*?</section>", html_content, flags=re.IGNORECASE)
if section_match:
section_html = section_match.group(0)
section_clean = re.sub(r"\s+", " ", section_html.strip())
variations = [
section_html,
section_clean,
section_clean.replace('"', "'"),
section_clean.replace("'", '"'),
re.sub(r"\s+", " ", section_clean),
]
blocks = []
for v in variations:
blocks.append(f"""{SEARCH_START}
{v}
{DIVIDER}
{v}\n {audio_html}
{REPLACE_END}""")
return "\n\n".join(blocks)
if '<body' in html_content:
body_end = html_content.find('>', html_content.find('<body')) + 1
insertion_point = html_content[:body_end] + '\n '
return f"""{SEARCH_START}
{insertion_point}
{DIVIDER}
{insertion_point}
{audio_html}
{REPLACE_END}"""
# If no <body>, just append
return f"{SEARCH_START}\n\n{DIVIDER}\n{audio_html}\n{REPLACE_END}"