|
|
import os |
|
|
import base64 |
|
|
import cv2 |
|
|
import numpy as np |
|
|
from PIL import Image |
|
|
import pytesseract |
|
|
import requests |
|
|
from urllib.parse import urlparse, urljoin |
|
|
from bs4 import BeautifulSoup |
|
|
import html2text |
|
|
import json |
|
|
import time |
|
|
import webbrowser |
|
|
import urllib.parse |
|
|
import copy |
|
|
import html |
|
|
import tempfile |
|
|
import uuid |
|
|
import datetime |
|
|
import threading |
|
|
import atexit |
|
|
from huggingface_hub import HfApi, InferenceClient
|
|
import gradio as gr |
|
|
import subprocess |
|
|
import re
from typing import Dict, List, Optional
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Temp storage and per-session tracking of generated video files, reaped after a TTL.
VIDEO_TEMP_DIR = os.path.join(tempfile.gettempdir(), "anycoder_videos")
|
|
VIDEO_FILE_TTL_SECONDS = 6 * 60 * 60 |
|
|
_SESSION_VIDEO_FILES: Dict[str, List[str]] = {} |
|
|
_VIDEO_FILES_LOCK = threading.Lock() |
|
|
|
|
|
def _ensure_video_dir_exists() -> None: |
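    """Create VIDEO_TEMP_DIR if it does not already exist, ignoring filesystem errors."""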
|
|
try: |
|
|
os.makedirs(VIDEO_TEMP_DIR, exist_ok=True) |
|
|
except Exception: |
|
|
pass |
|
|
|
|
|
def _register_video_for_session(session_id: Optional[str], file_path: str) -> None: |
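    """Record a generated video file path under a session id so it can be cleaned up later."""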
|
|
if not session_id or not file_path: |
|
|
return |
|
|
with _VIDEO_FILES_LOCK: |
|
|
if session_id not in _SESSION_VIDEO_FILES: |
|
|
_SESSION_VIDEO_FILES[session_id] = [] |
|
|
_SESSION_VIDEO_FILES[session_id].append(file_path) |
|
|
|
|
|
def cleanup_session_videos(session_id: Optional[str]) -> None: |
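    """Delete every video file registered for the given session and drop its registry entry."""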
|
|
if not session_id: |
|
|
return |
|
|
with _VIDEO_FILES_LOCK: |
|
|
file_list = _SESSION_VIDEO_FILES.pop(session_id, []) |
|
|
for path in file_list: |
|
|
try: |
|
|
if path and os.path.exists(path): |
|
|
os.unlink(path) |
|
|
except Exception: |
|
|
|
|
|
pass |
|
|
|
|
|
def reap_old_videos(ttl_seconds: int = VIDEO_FILE_TTL_SECONDS) -> None: |
|
|
"""Delete old video files in the temp directory based on modification time.""" |
|
|
try: |
|
|
_ensure_video_dir_exists() |
|
|
now_ts = time.time() |
|
|
for name in os.listdir(VIDEO_TEMP_DIR): |
|
|
path = os.path.join(VIDEO_TEMP_DIR, name) |
|
|
try: |
|
|
if not os.path.isfile(path): |
|
|
continue |
|
|
mtime = os.path.getmtime(path) |
|
|
if now_ts - mtime > ttl_seconds: |
|
|
os.unlink(path) |
|
|
except Exception: |
|
|
pass |
|
|
except Exception: |
|
|
|
|
|
pass |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Temp storage and per-session tracking of generated audio files, reaped after a TTL.
AUDIO_TEMP_DIR = os.path.join(tempfile.gettempdir(), "anycoder_audio")
|
|
AUDIO_FILE_TTL_SECONDS = 6 * 60 * 60 |
|
|
_SESSION_AUDIO_FILES: Dict[str, List[str]] = {} |
|
|
_AUDIO_FILES_LOCK = threading.Lock() |
|
|
|
|
|
def _ensure_audio_dir_exists() -> None: |
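    """Create AUDIO_TEMP_DIR if it does not already exist, ignoring filesystem errors."""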
|
|
try: |
|
|
os.makedirs(AUDIO_TEMP_DIR, exist_ok=True) |
|
|
except Exception: |
|
|
pass |
|
|
|
|
|
def _register_audio_for_session(session_id: Optional[str], file_path: str) -> None: |
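    """Record a generated audio file path under a session id so it can be cleaned up later."""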
|
|
if not session_id or not file_path: |
|
|
return |
|
|
with _AUDIO_FILES_LOCK: |
|
|
if session_id not in _SESSION_AUDIO_FILES: |
|
|
_SESSION_AUDIO_FILES[session_id] = [] |
|
|
_SESSION_AUDIO_FILES[session_id].append(file_path) |
|
|
|
|
|
def cleanup_session_audio(session_id: Optional[str]) -> None: |
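    """Delete every audio file registered for the given session and drop its registry entry."""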
|
|
if not session_id: |
|
|
return |
|
|
with _AUDIO_FILES_LOCK: |
|
|
file_list = _SESSION_AUDIO_FILES.pop(session_id, []) |
|
|
for path in file_list: |
|
|
try: |
|
|
if path and os.path.exists(path): |
|
|
os.unlink(path) |
|
|
except Exception: |
|
|
pass |
|
|
|
|
|
def reap_old_audio(ttl_seconds: int = AUDIO_FILE_TTL_SECONDS) -> None: |
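    """Delete old audio files in the temp directory based on modification time."""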
|
|
try: |
|
|
_ensure_audio_dir_exists() |
|
|
now_ts = time.time() |
|
|
for name in os.listdir(AUDIO_TEMP_DIR): |
|
|
path = os.path.join(AUDIO_TEMP_DIR, name) |
|
|
try: |
|
|
if not os.path.isfile(path): |
|
|
continue |
|
|
mtime = os.path.getmtime(path) |
|
|
if now_ts - mtime > ttl_seconds: |
|
|
os.unlink(path) |
|
|
except Exception: |
|
|
pass |
|
|
except Exception: |
|
|
pass |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Temp storage and per-session tracking of generated media previews (images, video, audio).
MEDIA_TEMP_DIR = os.path.join(tempfile.gettempdir(), "anycoder_media")
|
|
MEDIA_FILE_TTL_SECONDS = 6 * 60 * 60 |
|
|
_SESSION_MEDIA_FILES: Dict[str, List[str]] = {} |
|
|
_MEDIA_FILES_LOCK = threading.Lock() |
|
|
|
|
|
|
|
|
# In-process registry of temporary media files: file_id -> {'path', 'filename', 'media_type', 'media_bytes'}.
temp_media_files = {}
|
|
|
|
|
def _ensure_media_dir_exists() -> None: |
|
|
"""Ensure the media temp directory exists.""" |
|
|
try: |
|
|
os.makedirs(MEDIA_TEMP_DIR, exist_ok=True) |
|
|
except Exception: |
|
|
pass |
|
|
|
|
|
def track_session_media_file(session_id: Optional[str], file_path: str) -> None: |
|
|
"""Track a media file for session-based cleanup.""" |
|
|
if not session_id or not file_path: |
|
|
return |
|
|
with _MEDIA_FILES_LOCK: |
|
|
if session_id not in _SESSION_MEDIA_FILES: |
|
|
_SESSION_MEDIA_FILES[session_id] = [] |
|
|
_SESSION_MEDIA_FILES[session_id].append(file_path) |
|
|
|
|
|
def cleanup_session_media(session_id: Optional[str]) -> None: |
|
|
"""Clean up media files for a specific session.""" |
|
|
if not session_id: |
|
|
return |
|
|
with _MEDIA_FILES_LOCK: |
|
|
files_to_clean = _SESSION_MEDIA_FILES.pop(session_id, []) |
|
|
|
|
|
for path in files_to_clean: |
|
|
try: |
|
|
if path and os.path.exists(path): |
|
|
os.unlink(path) |
|
|
except Exception: |
|
|
|
|
|
pass |
|
|
|
|
|
def reap_old_media(ttl_seconds: int = MEDIA_FILE_TTL_SECONDS) -> None: |
|
|
"""Delete old media files in the temp directory based on modification time.""" |
|
|
try: |
|
|
_ensure_media_dir_exists() |
|
|
now_ts = time.time() |
|
|
for name in os.listdir(MEDIA_TEMP_DIR): |
|
|
path = os.path.join(MEDIA_TEMP_DIR, name) |
|
|
if os.path.isfile(path): |
|
|
try: |
|
|
mtime = os.path.getmtime(path) |
|
|
if (now_ts - mtime) > ttl_seconds: |
|
|
os.unlink(path) |
|
|
except Exception: |
|
|
pass |
|
|
except Exception: |
|
|
|
|
|
pass |
|
|
|
|
|
def cleanup_all_temp_media_on_startup() -> None: |
|
|
"""Clean up all temporary media files on app startup.""" |
|
|
try: |
|
|
|
|
|
temp_media_files.clear() |
|
|
|
|
|
|
|
|
_ensure_media_dir_exists() |
|
|
for name in os.listdir(MEDIA_TEMP_DIR): |
|
|
path = os.path.join(MEDIA_TEMP_DIR, name) |
|
|
if os.path.isfile(path): |
|
|
try: |
|
|
os.unlink(path) |
|
|
except Exception: |
|
|
pass |
|
|
|
|
|
|
|
|
with _MEDIA_FILES_LOCK: |
|
|
_SESSION_MEDIA_FILES.clear() |
|
|
|
|
|
print("[StartupCleanup] Cleaned up orphaned temporary media files") |
|
|
except Exception as e: |
|
|
print(f"[StartupCleanup] Error during media cleanup: {str(e)}") |
|
|
|
|
|
def cleanup_all_temp_media_on_shutdown() -> None: |
|
|
"""Clean up all temporary media files on app shutdown.""" |
|
|
try: |
|
|
print("[ShutdownCleanup] Cleaning up temporary media files...") |
|
|
|
|
|
|
|
|
for file_id, file_info in temp_media_files.items(): |
|
|
try: |
|
|
if os.path.exists(file_info['path']): |
|
|
os.unlink(file_info['path']) |
|
|
except Exception: |
|
|
pass |
|
|
temp_media_files.clear() |
|
|
|
|
|
|
|
|
with _MEDIA_FILES_LOCK: |
|
|
for session_id, file_paths in _SESSION_MEDIA_FILES.items(): |
|
|
for path in file_paths: |
|
|
try: |
|
|
if path and os.path.exists(path): |
|
|
os.unlink(path) |
|
|
except Exception: |
|
|
pass |
|
|
_SESSION_MEDIA_FILES.clear() |
|
|
|
|
|
print("[ShutdownCleanup] Temporary media cleanup completed") |
|
|
except Exception as e: |
|
|
print(f"[ShutdownCleanup] Error during cleanup: {str(e)}") |
|
|
|
|
|
|
|
|
atexit.register(cleanup_all_temp_media_on_shutdown) |
|
|
|
|
|
def create_temp_media_url(media_bytes: bytes, filename: str, media_type: str = "image", session_id: Optional[str] = None) -> str: |
|
|
"""Create a temporary file and return a local URL for preview.""" |
|
|
try: |
|
|
|
|
|
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") |
|
|
unique_id = str(uuid.uuid4())[:8] |
|
|
base_name, ext = os.path.splitext(filename) |
|
|
unique_filename = f"{media_type}_{timestamp}_{unique_id}_{base_name}{ext}" |
|
|
|
|
|
|
|
|
_ensure_media_dir_exists() |
|
|
temp_path = os.path.join(MEDIA_TEMP_DIR, unique_filename) |
|
|
|
|
|
|
|
|
with open(temp_path, 'wb') as f: |
|
|
f.write(media_bytes) |
|
|
|
|
|
|
|
|
if session_id: |
|
|
track_session_media_file(session_id, temp_path) |
|
|
|
|
|
|
|
|
file_id = f"{media_type}_{unique_id}" |
|
|
temp_media_files[file_id] = { |
|
|
'path': temp_path, |
|
|
'filename': filename, |
|
|
'media_type': media_type, |
|
|
'media_bytes': media_bytes |
|
|
} |
|
|
|
|
|
|
|
|
file_url = f"file://{temp_path}" |
|
|
print(f"[TempMedia] Created temporary {media_type} file: {file_url}") |
|
|
return file_url |
|
|
|
|
|
except Exception as e: |
|
|
print(f"[TempMedia] Failed to create temporary file: {str(e)}") |
|
|
return f"Error creating temporary {media_type} file: {str(e)}" |
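# Illustrative usage sketch (png_bytes and the session id are hypothetical, not defined in this module):
#   url = create_temp_media_url(png_bytes, "logo.png", media_type="image", session_id="sess-1234")
#   # -> "file://<tempdir>/anycoder_media/image_<timestamp>_<id>_logo.png"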
|
|
|
|
|
def upload_media_to_hf(media_bytes: bytes, filename: str, media_type: str = "image", token: gr.OAuthToken | None = None, use_temp: bool = True) -> str: |
|
|
"""Upload media file to user's Hugging Face account or create temporary file.""" |
|
|
try: |
|
|
|
|
|
if use_temp: |
|
|
return create_temp_media_url(media_bytes, filename, media_type) |
|
|
|
|
|
|
|
|
|
|
|
hf_token = None |
|
|
if token and token.token: |
|
|
hf_token = token.token |
|
|
else: |
|
|
hf_token = os.getenv('HF_TOKEN') |
|
|
|
|
|
if not hf_token: |
|
|
return "Error: Please log in with your Hugging Face account to upload media, or set HF_TOKEN environment variable." |
|
|
|
|
|
|
|
|
api = HfApi(token=hf_token) |
|
|
|
|
|
|
|
|
try: |
|
|
user_info = api.whoami() |
|
|
username = user_info.get('name', 'unknown-user') |
|
|
except Exception as e: |
|
|
print(f"[HFUpload] Could not get user info: {e}") |
|
|
username = 'anycoder-user' |
|
|
|
|
|
|
|
|
repo_name = f"{username}/anycoder-media" |
|
|
|
|
|
|
|
|
try: |
|
|
api.create_repo( |
|
|
repo_id=repo_name, |
|
|
repo_type="dataset", |
|
|
private=False, |
|
|
exist_ok=True |
|
|
) |
|
|
print(f"[HFUpload] Repository {repo_name} ready") |
|
|
except Exception as e: |
|
|
print(f"[HFUpload] Repository creation/access issue: {e}") |
|
|
|
|
|
|
|
|
|
|
|
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") |
|
|
unique_id = str(uuid.uuid4())[:8] |
|
|
base_name, ext = os.path.splitext(filename) |
|
|
unique_filename = f"{media_type}/{timestamp}_{unique_id}_{base_name}{ext}" |
|
|
|
|
|
|
|
|
with tempfile.NamedTemporaryFile(delete=False, suffix=ext) as temp_file: |
|
|
temp_file.write(media_bytes) |
|
|
temp_path = temp_file.name |
|
|
|
|
|
try: |
|
|
|
|
|
api.upload_file( |
|
|
path_or_fileobj=temp_path, |
|
|
path_in_repo=unique_filename, |
|
|
repo_id=repo_name, |
|
|
repo_type="dataset", |
|
|
commit_message=f"Upload {media_type} generated by AnyCoder" |
|
|
) |
|
|
|
|
|
|
|
|
permanent_url = f"https://huggingface.co/datasets/{repo_name}/resolve/main/{unique_filename}" |
|
|
print(f"[HFUpload] Successfully uploaded {media_type} to {permanent_url}") |
|
|
return permanent_url |
|
|
|
|
|
finally: |
|
|
|
|
|
try: |
|
|
os.unlink(temp_path) |
|
|
except Exception: |
|
|
pass |
|
|
|
|
|
except Exception as e: |
|
|
print(f"[HFUpload] Upload failed: {str(e)}") |
|
|
return f"Error uploading {media_type} to Hugging Face: {str(e)}" |
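# Illustrative usage sketch (jpeg_bytes and oauth_token are hypothetical): pass use_temp=False to
# upload permanently instead of writing a local preview file.
#   url = upload_media_to_hf(jpeg_bytes, "hero.jpg", media_type="image", token=oauth_token, use_temp=False)
#   # -> "https://huggingface.co/datasets/<username>/anycoder-media/resolve/main/image/<timestamp>_<id>_hero.jpg"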
|
|
|
|
|
def upload_temp_files_to_hf_and_replace_urls(html_content: str, token: gr.OAuthToken | None = None) -> str: |
|
|
"""Upload all temporary media files to HF and replace their URLs in HTML content.""" |
|
|
try: |
|
|
if not temp_media_files: |
|
|
print("[DeployUpload] No temporary media files to upload") |
|
|
return html_content |
|
|
|
|
|
print(f"[DeployUpload] Uploading {len(temp_media_files)} temporary media files to HF") |
|
|
updated_content = html_content |
|
|
|
|
|
for file_id, file_info in temp_media_files.items(): |
|
|
try: |
|
|
|
|
|
permanent_url = upload_media_to_hf( |
|
|
file_info['media_bytes'], |
|
|
file_info['filename'], |
|
|
file_info['media_type'], |
|
|
token, |
|
|
use_temp=False |
|
|
) |
|
|
|
|
|
if not permanent_url.startswith("Error"): |
|
|
|
|
|
temp_url = f"file://{file_info['path']}" |
|
|
updated_content = updated_content.replace(temp_url, permanent_url) |
|
|
print(f"[DeployUpload] Replaced {temp_url} with {permanent_url}") |
|
|
else: |
|
|
print(f"[DeployUpload] Failed to upload {file_id}: {permanent_url}") |
|
|
|
|
|
except Exception as e: |
|
|
print(f"[DeployUpload] Error uploading {file_id}: {str(e)}") |
|
|
continue |
|
|
|
|
|
|
|
|
cleanup_temp_media_files() |
|
|
|
|
|
return updated_content |
|
|
|
|
|
except Exception as e: |
|
|
print(f"[DeployUpload] Failed to upload temporary files: {str(e)}") |
|
|
return html_content |
|
|
|
|
|
def cleanup_temp_media_files(): |
|
|
"""Clean up temporary media files from disk and memory.""" |
|
|
try: |
|
|
for file_id, file_info in temp_media_files.items(): |
|
|
try: |
|
|
if os.path.exists(file_info['path']): |
|
|
os.remove(file_info['path']) |
|
|
print(f"[TempCleanup] Removed {file_info['path']}") |
|
|
except Exception as e: |
|
|
print(f"[TempCleanup] Failed to remove {file_info['path']}: {str(e)}") |
|
|
|
|
|
|
|
|
temp_media_files.clear() |
|
|
print("[TempCleanup] Cleared temporary media files registry") |
|
|
|
|
|
except Exception as e: |
|
|
print(f"[TempCleanup] Error during cleanup: {str(e)}") |
|
|
|
|
|
def generate_image_with_qwen(prompt: str, image_index: int = 0, token: gr.OAuthToken | None = None) -> str:
    """Generate an image with the Qwen image model via Hugging Face InferenceClient and return an HTML <img> tag pointing at a temporary preview URL."""
|
|
try: |
|
|
|
|
|
if not os.getenv('HF_TOKEN'): |
|
|
return "Error: HF_TOKEN environment variable is not set. Please set it to your Hugging Face API token." |
|
|
|
|
|
|
|
|
client = InferenceClient( |
|
|
provider="auto", |
|
|
api_key=os.getenv('HF_TOKEN'), |
|
|
bill_to="huggingface", |
|
|
) |
|
|
|
|
|
|
|
|
image = client.text_to_image( |
|
|
prompt, |
|
|
model="Qwen/Qwen-Image", |
|
|
) |
|
|
|
|
|
|
|
|
max_size = 1024 |
|
|
if image.width > max_size or image.height > max_size: |
|
|
image.thumbnail((max_size, max_size), Image.Resampling.LANCZOS) |
|
|
|
|
|
|
|
|
import io |
|
|
buffer = io.BytesIO() |
|
|
|
|
|
image.convert('RGB').save(buffer, format='JPEG', quality=90, optimize=True) |
|
|
image_bytes = buffer.getvalue() |
|
|
|
|
|
|
|
|
filename = f"generated_image_{image_index}.jpg" |
|
|
temp_url = upload_media_to_hf(image_bytes, filename, "image", token, use_temp=True) |
|
|
|
|
|
|
|
|
if temp_url.startswith("Error"): |
|
|
return temp_url |
|
|
|
|
|
|
|
|
        return f'<img src="{temp_url}" alt="{html.escape(prompt, quote=True)}" style="max-width: 100%; height: auto; border-radius: 8px; margin: 10px 0;" loading="lazy" />'
|
|
|
|
|
except Exception as e: |
|
|
print(f"Image generation error: {str(e)}") |
|
|
return f"Error generating image: {str(e)}" |
|
|
|
|
|
def generate_image_to_image(input_image_data, prompt: str, token: gr.OAuthToken | None = None) -> str: |
|
|
"""Generate an image using image-to-image with Qwen-Image-Edit via Hugging Face InferenceClient.""" |
|
|
try: |
|
|
|
|
|
if not os.getenv('HF_TOKEN'): |
|
|
return "Error: HF_TOKEN environment variable is not set. Please set it to your Hugging Face API token." |
|
|
|
|
|
|
|
|
client = InferenceClient( |
|
|
provider="auto", |
|
|
api_key=os.getenv('HF_TOKEN'), |
|
|
bill_to="huggingface", |
|
|
) |
|
|
|
|
|
|
|
|
import io |
|
|
from PIL import Image |
|
|
try: |
|
|
import numpy as np |
|
|
except Exception: |
|
|
np = None |
|
|
|
|
|
if hasattr(input_image_data, 'read'): |
|
|
|
|
|
raw = input_image_data.read() |
|
|
pil_image = Image.open(io.BytesIO(raw)) |
|
|
elif hasattr(input_image_data, 'mode') and hasattr(input_image_data, 'size'): |
|
|
|
|
|
pil_image = input_image_data |
|
|
elif np is not None and isinstance(input_image_data, np.ndarray): |
|
|
pil_image = Image.fromarray(input_image_data) |
|
|
elif isinstance(input_image_data, (bytes, bytearray)): |
|
|
pil_image = Image.open(io.BytesIO(input_image_data)) |
|
|
else: |
|
|
|
|
|
pil_image = Image.open(io.BytesIO(bytes(input_image_data))) |
|
|
|
|
|
|
|
|
if pil_image.mode != 'RGB': |
|
|
pil_image = pil_image.convert('RGB') |
|
|
|
|
|
|
|
|
max_input_size = 1024 |
|
|
if pil_image.width > max_input_size or pil_image.height > max_input_size: |
|
|
pil_image.thumbnail((max_input_size, max_input_size), Image.Resampling.LANCZOS) |
|
|
|
|
|
buf = io.BytesIO() |
|
|
pil_image.save(buf, format='JPEG', quality=85, optimize=True) |
|
|
input_bytes = buf.getvalue() |
|
|
|
|
|
|
|
|
image = client.image_to_image( |
|
|
input_bytes, |
|
|
prompt=prompt, |
|
|
model="Qwen/Qwen-Image-Edit", |
|
|
) |
|
|
|
|
|
|
|
|
max_size = 1024 |
|
|
if image.width > max_size or image.height > max_size: |
|
|
image.thumbnail((max_size, max_size), Image.Resampling.LANCZOS) |
|
|
|
|
|
out_buf = io.BytesIO() |
|
|
image.convert('RGB').save(out_buf, format='JPEG', quality=90, optimize=True) |
|
|
image_bytes = out_buf.getvalue() |
|
|
|
|
|
|
|
|
filename = "image_to_image_result.jpg" |
|
|
temp_url = upload_media_to_hf(image_bytes, filename, "image", token, use_temp=True) |
|
|
|
|
|
|
|
|
if temp_url.startswith("Error"): |
|
|
return temp_url |
|
|
|
|
|
        return f'<img src="{temp_url}" alt="{html.escape(prompt, quote=True)}" style="max-width: 100%; height: auto; border-radius: 8px; margin: 10px 0;" loading="lazy" />'
|
|
except Exception as e: |
|
|
print(f"Image-to-image generation error: {str(e)}") |
|
|
return f"Error generating image (image-to-image): {str(e)}" |
|
|
|
|
|
def generate_video_from_image(input_image_data, prompt: str, session_id: Optional[str] = None, token: gr.OAuthToken | None = None) -> str: |
|
|
"""Generate a video from an input image and prompt using Hugging Face InferenceClient.""" |
|
|
try: |
|
|
print("[Image2Video] Starting video generation") |
|
|
if not os.getenv('HF_TOKEN'): |
|
|
print("[Image2Video] Missing HF_TOKEN") |
|
|
return "Error: HF_TOKEN environment variable is not set. Please set it to your Hugging Face API token." |
|
|
|
|
|
|
|
|
client = InferenceClient( |
|
|
provider="auto", |
|
|
api_key=os.getenv('HF_TOKEN'), |
|
|
bill_to="huggingface", |
|
|
) |
|
|
print(f"[Image2Video] InferenceClient initialized (provider=auto)") |
|
|
|
|
|
|
|
|
import io |
|
|
from PIL import Image |
|
|
try: |
|
|
import numpy as np |
|
|
except Exception: |
|
|
np = None |
|
|
|
|
|
def _load_pil(img_like) -> Image.Image: |
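            """Coerce a file-like object, PIL image, numpy array, or raw bytes into a PIL Image."""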
|
|
if hasattr(img_like, 'read'): |
|
|
return Image.open(io.BytesIO(img_like.read())) |
|
|
if hasattr(img_like, 'mode') and hasattr(img_like, 'size'): |
|
|
return img_like |
|
|
if np is not None and isinstance(img_like, np.ndarray): |
|
|
return Image.fromarray(img_like) |
|
|
if isinstance(img_like, (bytes, bytearray)): |
|
|
return Image.open(io.BytesIO(img_like)) |
|
|
return Image.open(io.BytesIO(bytes(img_like))) |
|
|
|
|
|
pil_image = _load_pil(input_image_data) |
|
|
if pil_image.mode != 'RGB': |
|
|
pil_image = pil_image.convert('RGB') |
|
|
try: |
|
|
print(f"[Image2Video] Input PIL image size={pil_image.size} mode={pil_image.mode}") |
|
|
except Exception: |
|
|
pass |
|
|
|
|
|
|
|
|
MAX_BYTES = 3_900_000 |
|
|
max_dim = 1024 |
|
|
quality = 90 |
|
|
|
|
|
def encode_current(pil: Image.Image, q: int) -> bytes: |
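            """Encode the working image as an optimized JPEG at quality q and return the bytes."""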
|
|
tmp = io.BytesIO() |
|
|
pil.save(tmp, format='JPEG', quality=q, optimize=True) |
|
|
return tmp.getvalue() |
|
|
|
|
|
|
|
|
while max(pil_image.size) > max_dim: |
|
|
ratio = max_dim / float(max(pil_image.size)) |
|
|
new_size = (max(1, int(pil_image.size[0] * ratio)), max(1, int(pil_image.size[1] * ratio))) |
|
|
pil_image = pil_image.resize(new_size, Image.Resampling.LANCZOS) |
|
|
|
|
|
encoded = encode_current(pil_image, quality) |
|
|
|
|
|
while len(encoded) > MAX_BYTES and (quality > 40 or max(pil_image.size) > 640): |
|
|
if quality > 40: |
|
|
quality -= 10 |
|
|
else: |
|
|
|
|
|
new_w = max(1, int(pil_image.size[0] * 0.85)) |
|
|
new_h = max(1, int(pil_image.size[1] * 0.85)) |
|
|
pil_image = pil_image.resize((new_w, new_h), Image.Resampling.LANCZOS) |
|
|
encoded = encode_current(pil_image, quality) |
|
|
|
|
|
input_bytes = encoded |
|
|
|
|
|
|
|
|
model_id = "Lightricks/LTX-Video-0.9.8-13B-distilled" |
|
|
image_to_video_method = getattr(client, "image_to_video", None) |
|
|
if not callable(image_to_video_method): |
|
|
print("[Image2Video] InferenceClient.image_to_video not available in this huggingface_hub version") |
|
|
return ( |
|
|
"Error generating video (image-to-video): Your installed huggingface_hub version " |
|
|
"does not expose InferenceClient.image_to_video. Please upgrade with " |
|
|
"`pip install -U huggingface_hub` and try again." |
|
|
) |
|
|
print(f"[Image2Video] Calling image_to_video with model={model_id}, prompt length={len(prompt or '')}") |
|
|
video_bytes = image_to_video_method( |
|
|
input_bytes, |
|
|
prompt=prompt, |
|
|
model=model_id, |
|
|
) |
|
|
print(f"[Image2Video] Received video bytes: {len(video_bytes) if hasattr(video_bytes, '__len__') else 'unknown length'}") |
|
|
|
|
|
|
|
|
filename = "image_to_video_result.mp4" |
|
|
temp_url = upload_media_to_hf(video_bytes, filename, "video", token, use_temp=True) |
|
|
|
|
|
|
|
|
if temp_url.startswith("Error"): |
|
|
return temp_url |
|
|
|
|
|
video_html = ( |
|
|
f'<video controls autoplay muted loop playsinline ' |
|
|
f'style="max-width: 100%; height: auto; border-radius: 8px; margin: 10px 0; display: block;" ' |
|
|
f'onloadstart="this.style.backgroundColor=\'#f0f0f0\'" ' |
|
|
f'onerror="this.style.display=\'none\'; console.error(\'Video failed to load\')">' |
|
|
f'<source src="{temp_url}" type="video/mp4" />' |
|
|
f'<p style="text-align: center; color: #666;">Your browser does not support the video tag.</p>' |
|
|
f'</video>' |
|
|
) |
|
|
|
|
|
print(f"[Image2Video] Successfully generated video HTML tag with temporary URL: {temp_url}") |
|
|
|
|
|
|
|
|
if not validate_video_html(video_html): |
|
|
print("[Image2Video] Generated video HTML failed validation") |
|
|
return "Error: Generated video HTML is malformed" |
|
|
|
|
|
return video_html |
|
|
except Exception as e: |
|
|
import traceback |
|
|
print("[Image2Video] Exception during generation:") |
|
|
traceback.print_exc() |
|
|
print(f"Image-to-video generation error: {str(e)}") |
|
|
return f"Error generating video (image-to-video): {str(e)}" |
|
|
|
|
|
def generate_video_from_text(prompt: str, session_id: Optional[str] = None, token: gr.OAuthToken | None = None) -> str: |
|
|
"""Generate a video from a text prompt using Hugging Face InferenceClient.""" |
|
|
try: |
|
|
print("[Text2Video] Starting video generation from text") |
|
|
if not os.getenv('HF_TOKEN'): |
|
|
print("[Text2Video] Missing HF_TOKEN") |
|
|
return "Error: HF_TOKEN environment variable is not set. Please set it to your Hugging Face API token." |
|
|
|
|
|
client = InferenceClient( |
|
|
provider="auto", |
|
|
api_key=os.getenv('HF_TOKEN'), |
|
|
bill_to="huggingface", |
|
|
) |
|
|
print("[Text2Video] InferenceClient initialized (provider=auto)") |
|
|
|
|
|
|
|
|
text_to_video_method = getattr(client, "text_to_video", None) |
|
|
if not callable(text_to_video_method): |
|
|
print("[Text2Video] InferenceClient.text_to_video not available in this huggingface_hub version") |
|
|
return ( |
|
|
"Error generating video (text-to-video): Your installed huggingface_hub version " |
|
|
"does not expose InferenceClient.text_to_video. Please upgrade with " |
|
|
"`pip install -U huggingface_hub` and try again." |
|
|
) |
|
|
|
|
|
model_id = "Wan-AI/Wan2.2-T2V-A14B" |
|
|
prompt_str = (prompt or "").strip() |
|
|
print(f"[Text2Video] Calling text_to_video with model={model_id}, prompt length={len(prompt_str)}") |
|
|
video_bytes = text_to_video_method( |
|
|
prompt_str, |
|
|
model=model_id, |
|
|
) |
|
|
print(f"[Text2Video] Received video bytes: {len(video_bytes) if hasattr(video_bytes, '__len__') else 'unknown length'}") |
|
|
|
|
|
|
|
|
filename = "text_to_video_result.mp4" |
|
|
temp_url = upload_media_to_hf(video_bytes, filename, "video", token, use_temp=True) |
|
|
|
|
|
|
|
|
if temp_url.startswith("Error"): |
|
|
return temp_url |
|
|
|
|
|
video_html = ( |
|
|
f'<video controls autoplay muted loop playsinline ' |
|
|
f'style="max-width: 100%; height: auto; border-radius: 8px; margin: 10px 0; display: block;" ' |
|
|
f'onloadstart="this.style.backgroundColor=\'#f0f0f0\'" ' |
|
|
f'onerror="this.style.display=\'none\'; console.error(\'Video failed to load\')">' |
|
|
f'<source src="{temp_url}" type="video/mp4" />' |
|
|
f'<p style="text-align: center; color: #666;">Your browser does not support the video tag.</p>' |
|
|
f'</video>' |
|
|
) |
|
|
|
|
|
print(f"[Text2Video] Successfully generated video HTML tag with temporary URL: {temp_url}") |
|
|
|
|
|
|
|
|
if not validate_video_html(video_html): |
|
|
print("[Text2Video] Generated video HTML failed validation") |
|
|
return "Error: Generated video HTML is malformed" |
|
|
|
|
|
return video_html |
|
|
except Exception as e: |
|
|
import traceback |
|
|
print("[Text2Video] Exception during generation:") |
|
|
traceback.print_exc() |
|
|
print(f"Text-to-video generation error: {str(e)}") |
|
|
return f"Error generating video (text-to-video): {str(e)}" |
|
|
|
|
|
def generate_music_from_text(prompt: str, music_length_ms: int = 30000, session_id: Optional[str] = None, token: gr.OAuthToken | None = None) -> str: |
|
|
"""Generate music from a text prompt using ElevenLabs Music API and return an HTML <audio> tag.""" |
|
|
try: |
|
|
api_key = os.getenv('ELEVENLABS_API_KEY') |
|
|
if not api_key: |
|
|
return "Error: ELEVENLABS_API_KEY environment variable is not set." |
|
|
|
|
|
headers = { |
|
|
'Content-Type': 'application/json', |
|
|
'xi-api-key': api_key, |
|
|
} |
|
|
payload = { |
|
|
'prompt': (prompt or 'Epic orchestral theme with soaring strings and powerful brass'), |
|
|
'music_length_ms': int(music_length_ms) if music_length_ms else 30000, |
|
|
} |
|
|
|
|
|
resp = requests.post('https://api.elevenlabs.io/v1/music/compose', headers=headers, json=payload) |
|
|
try: |
|
|
resp.raise_for_status() |
|
|
except Exception as e: |
|
|
            return f"Error generating music: {resp.status_code} {resp.text}"
|
|
|
|
|
|
|
|
filename = "generated_music.mp3" |
|
|
temp_url = upload_media_to_hf(resp.content, filename, "audio", token, use_temp=True) |
|
|
|
|
|
|
|
|
if temp_url.startswith("Error"): |
|
|
return temp_url |
|
|
|
|
|
        audio_html = (
            "<div class=\"anycoder-music\" style=\"max-width:420px;margin:16px auto;padding:12px 16px;border:1px solid #e5e7eb;border-radius:12px;background:linear-gradient(180deg,#fafafa,#f3f4f6);box-shadow:0 2px 8px rgba(0,0,0,0.06)\">"
            "    <div style=\"font-size:13px;color:#374151;margin-bottom:8px;display:flex;align-items:center;gap:6px\">"
|
|
" <span>🎵 Generated music</span>" |
|
|
" </div>" |
|
|
f" <audio controls autoplay loop style=\"width:100%;outline:none;\">" |
|
|
f" <source src=\"{temp_url}\" type=\"audio/mpeg\" />" |
|
|
" Your browser does not support the audio element." |
|
|
" </audio>" |
|
|
"</div>" |
|
|
) |
|
|
|
|
|
print(f"[Music] Successfully generated music HTML tag with temporary URL: {temp_url}") |
|
|
return audio_html |
|
|
except Exception as e: |
|
|
return f"Error generating music: {str(e)}" |
|
|
|
|
|
def extract_image_prompts_from_text(text: str, num_images_needed: int = 1) -> list: |
|
|
"""Extract image generation prompts from the full text based on number of images needed""" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
cleaned_text = text.strip() |
|
|
if not cleaned_text: |
|
|
return [] |
|
|
|
|
|
|
|
|
prompts = [] |
|
|
|
|
|
|
|
|
for i in range(num_images_needed): |
|
|
if i == 0: |
|
|
|
|
|
prompts.append(cleaned_text) |
|
|
elif i == 1: |
|
|
|
|
|
prompts.append(f"Visual representation of {cleaned_text}") |
|
|
elif i == 2: |
|
|
|
|
|
prompts.append(f"Illustration of {cleaned_text}") |
|
|
else: |
|
|
|
|
|
variations = [ |
|
|
f"Digital art of {cleaned_text}", |
|
|
f"Modern design of {cleaned_text}", |
|
|
f"Professional illustration of {cleaned_text}", |
|
|
f"Clean design of {cleaned_text}", |
|
|
f"Beautiful visualization of {cleaned_text}", |
|
|
f"Stylish representation of {cleaned_text}", |
|
|
f"Contemporary design of {cleaned_text}", |
|
|
f"Elegant illustration of {cleaned_text}" |
|
|
] |
|
|
variation_index = (i - 3) % len(variations) |
|
|
prompts.append(variations[variation_index]) |
|
|
|
|
|
return prompts |
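# Example of the prompt variation scheme above (a sketch, not captured output):
#   extract_image_prompts_from_text("a cozy coffee shop", 3)
#   # -> ["a cozy coffee shop",
#   #     "Visual representation of a cozy coffee shop",
#   #     "Illustration of a cozy coffee shop"]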
|
|
|
|
|
def create_image_replacement_blocks(html_content: str, user_prompt: str) -> str: |
|
|
"""Create search/replace blocks to replace placeholder images with generated Qwen images""" |
|
|
if not user_prompt: |
|
|
return "" |
|
|
|
|
|
|
|
|
import re |
|
|
|
|
|
|
|
|
placeholder_patterns = [ |
|
|
r'<img[^>]*src=["\'](?:placeholder|dummy|sample|example)[^"\']*["\'][^>]*>', |
|
|
r'<img[^>]*src=["\']https?://via\.placeholder\.com[^"\']*["\'][^>]*>', |
|
|
r'<img[^>]*src=["\']https?://picsum\.photos[^"\']*["\'][^>]*>', |
|
|
r'<img[^>]*src=["\']https?://dummyimage\.com[^"\']*["\'][^>]*>', |
|
|
r'<img[^>]*alt=["\'][^"\']*placeholder[^"\']*["\'][^>]*>', |
|
|
r'<img[^>]*class=["\'][^"\']*placeholder[^"\']*["\'][^>]*>', |
|
|
r'<img[^>]*id=["\'][^"\']*placeholder[^"\']*["\'][^>]*>', |
|
|
r'<img[^>]*src=["\']data:image[^"\']*["\'][^>]*>', |
|
|
r'<img[^>]*src=["\']#["\'][^>]*>', |
|
|
r'<img[^>]*src=["\']about:blank["\'][^>]*>', |
|
|
] |
|
|
|
|
|
|
|
|
placeholder_images = [] |
|
|
for pattern in placeholder_patterns: |
|
|
matches = re.findall(pattern, html_content, re.IGNORECASE) |
|
|
placeholder_images.extend(matches) |
|
|
|
|
|
|
|
|
placeholder_images = [img for img in placeholder_images if 'huggingface.co/datasets/' not in img] |
|
|
|
|
|
|
|
|
if not placeholder_images: |
|
|
img_pattern = r'<img[^>]*>' |
|
|
|
|
|
placeholder_images = re.findall(img_pattern, html_content, re.IGNORECASE) |
|
|
|
|
|
|
|
|
div_placeholder_patterns = [ |
|
|
r'<div[^>]*class=["\'][^"\']*(?:image|img|photo|picture)[^"\']*["\'][^>]*>.*?</div>', |
|
|
r'<div[^>]*id=["\'][^"\']*(?:image|img|photo|picture)[^"\']*["\'][^>]*>.*?</div>', |
|
|
] |
|
|
|
|
|
for pattern in div_placeholder_patterns: |
|
|
matches = re.findall(pattern, html_content, re.IGNORECASE | re.DOTALL) |
|
|
placeholder_images.extend(matches) |
|
|
|
|
|
|
|
|
num_images_needed = len(placeholder_images) |
|
|
|
|
|
if num_images_needed == 0: |
|
|
return "" |
|
|
|
|
|
|
|
|
image_prompts = extract_image_prompts_from_text(user_prompt, num_images_needed) |
|
|
|
|
|
|
|
|
generated_images = [] |
|
|
for i, prompt in enumerate(image_prompts): |
|
|
image_html = generate_image_with_qwen(prompt, i, token=None) |
|
|
if not image_html.startswith("Error"): |
|
|
generated_images.append((i, image_html)) |
|
|
|
|
|
if not generated_images: |
|
|
return "" |
|
|
|
|
|
|
|
|
replacement_blocks = [] |
|
|
|
|
|
for i, (prompt_index, generated_image) in enumerate(generated_images): |
|
|
if i < len(placeholder_images): |
|
|
|
|
|
placeholder = placeholder_images[i] |
|
|
|
|
|
placeholder_clean = re.sub(r'\s+', ' ', placeholder.strip()) |
|
|
|
|
|
|
|
|
placeholder_variations = [ |
|
|
placeholder_clean, |
|
|
placeholder_clean.replace('"', "'"), |
|
|
placeholder_clean.replace("'", '"'), |
|
|
re.sub(r'\s+', ' ', placeholder_clean), |
|
|
placeholder_clean.replace(' ', ' '), |
|
|
] |
|
|
|
|
|
|
|
|
for variation in placeholder_variations: |
|
|
replacement_blocks.append(f"""{SEARCH_START} |
|
|
{variation} |
|
|
{DIVIDER} |
|
|
{generated_image} |
|
|
{REPLACE_END}""") |
|
|
else: |
|
|
|
|
|
|
|
|
if '<body' in html_content: |
|
|
body_end = html_content.find('>', html_content.find('<body')) + 1 |
|
|
insertion_point = html_content[:body_end] + '\n ' |
|
|
replacement_blocks.append(f"""{SEARCH_START} |
|
|
{insertion_point} |
|
|
{DIVIDER} |
|
|
{insertion_point} |
|
|
{generated_image} |
|
|
{REPLACE_END}""") |
|
|
|
|
|
return '\n\n'.join(replacement_blocks) |
|
|
|
|
|
def create_image_replacement_blocks_text_to_image_single(html_content: str, prompt: str) -> str: |
|
|
"""Create search/replace blocks that generate and insert ONLY ONE text-to-image result.""" |
|
|
if not prompt or not prompt.strip(): |
|
|
return "" |
|
|
|
|
|
import re |
|
|
|
|
|
|
|
|
placeholder_patterns = [ |
|
|
r'<img[^>]*src=["\'](?:placeholder|dummy|sample|example)[^"\']*["\'][^>]*>', |
|
|
r'<img[^>]*src=["\']https?://via\.placeholder\.com[^"\']*["\'][^>]*>', |
|
|
r'<img[^>]*src=["\']https?://picsum\.photos[^"\']*["\'][^>]*>', |
|
|
r'<img[^>]*src=["\']https?://dummyimage\.com[^"\']*["\'][^>]*>', |
|
|
r'<img[^>]*alt=["\'][^"\']*placeholder[^"\']*["\'][^>]*>', |
|
|
r'<img[^>]*class=["\'][^"\']*placeholder[^"\']*["\'][^>]*>', |
|
|
r'<img[^>]*id=["\'][^"\']*placeholder[^"\']*["\'][^>]*>', |
|
|
r'<img[^>]*src=["\']data:image[^"\']*["\'][^>]*>', |
|
|
r'<img[^>]*src=["\']#["\'][^>]*>', |
|
|
r'<img[^>]*src=["\']about:blank["\'][^>]*>', |
|
|
] |
|
|
|
|
|
placeholder_images = [] |
|
|
for pattern in placeholder_patterns: |
|
|
matches = re.findall(pattern, html_content, re.IGNORECASE) |
|
|
if matches: |
|
|
placeholder_images.extend(matches) |
|
|
|
|
|
|
|
|
placeholder_images = [img for img in placeholder_images if 'huggingface.co/datasets/' not in img] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if not placeholder_images: |
|
|
img_pattern = r'<img[^>]*>' |
|
|
        placeholder_images = re.findall(img_pattern, html_content, re.IGNORECASE)
|
|
|
|
|
|
|
|
image_html = generate_image_with_qwen(prompt, 0, token=None) |
|
|
if image_html.startswith("Error"): |
|
|
return "" |
|
|
|
|
|
|
|
|
if placeholder_images: |
|
|
placeholder = placeholder_images[0] |
|
|
placeholder_clean = re.sub(r'\s+', ' ', placeholder.strip()) |
|
|
placeholder_variations = [ |
|
|
placeholder_clean, |
|
|
placeholder_clean.replace('"', "'"), |
|
|
placeholder_clean.replace("'", '"'), |
|
|
re.sub(r'\s+', ' ', placeholder_clean), |
|
|
placeholder_clean.replace(' ', ' '), |
|
|
] |
|
|
blocks = [] |
|
|
for variation in placeholder_variations: |
|
|
blocks.append(f"""{SEARCH_START} |
|
|
{variation} |
|
|
{DIVIDER} |
|
|
{image_html} |
|
|
{REPLACE_END}""") |
|
|
return '\n\n'.join(blocks) |
|
|
|
|
|
|
|
|
if '<body' in html_content: |
|
|
body_end = html_content.find('>', html_content.find('<body')) + 1 |
|
|
insertion_point = html_content[:body_end] + '\n ' |
|
|
return f"""{SEARCH_START} |
|
|
{insertion_point} |
|
|
{DIVIDER} |
|
|
{insertion_point} |
|
|
{image_html} |
|
|
{REPLACE_END}""" |
|
|
|
|
|
|
|
|
return f"{SEARCH_START}\n\n{DIVIDER}\n{image_html}\n{REPLACE_END}" |
|
|
|
|
|
def create_video_replacement_blocks_text_to_video(html_content: str, prompt: str, session_id: Optional[str] = None) -> str: |
|
|
"""Create search/replace blocks that generate and insert ONLY ONE text-to-video result.""" |
|
|
if not prompt or not prompt.strip(): |
|
|
return "" |
|
|
|
|
|
import re |
|
|
|
|
|
|
|
|
placeholder_patterns = [ |
|
|
r'<img[^>]*src=["\'](?:placeholder|dummy|sample|example)[^"\']*["\'][^>]*>', |
|
|
r'<img[^>]*src=["\']https?://via\.placeholder\.com[^"\']*["\'][^>]*>', |
|
|
r'<img[^>]*src=["\']https?://picsum\.photos[^"\']*["\'][^>]*>', |
|
|
r'<img[^>]*src=["\']https?://dummyimage\.com[^"\']*["\'][^>]*>', |
|
|
r'<img[^>]*alt=["\'][^"\']*placeholder[^"\']*["\'][^>]*>', |
|
|
r'<img[^>]*class=["\'][^"\']*placeholder[^"\']*["\'][^>]*>', |
|
|
r'<img[^>]*id=["\'][^"\']*placeholder[^"\']*["\'][^>]*>', |
|
|
r'<img[^>]*src=["\']data:image[^"\']*["\'][^>]*>', |
|
|
r'<img[^>]*src=["\']#["\'][^>]*>', |
|
|
r'<img[^>]*src=["\']about:blank["\'][^>]*>', |
|
|
] |
|
|
|
|
|
placeholder_images = [] |
|
|
for pattern in placeholder_patterns: |
|
|
matches = re.findall(pattern, html_content, re.IGNORECASE) |
|
|
if matches: |
|
|
placeholder_images.extend(matches) |
|
|
|
|
|
|
|
|
placeholder_images = [img for img in placeholder_images if 'huggingface.co/datasets/' not in img] |
|
|
|
|
|
if not placeholder_images: |
|
|
img_pattern = r'<img[^>]*>' |
|
|
        placeholder_images = re.findall(img_pattern, html_content, re.IGNORECASE)
|
|
|
|
|
video_html = generate_video_from_text(prompt, session_id=session_id, token=None) |
|
|
if video_html.startswith("Error"): |
|
|
return "" |
|
|
|
|
|
|
|
|
if placeholder_images: |
|
|
placeholder = placeholder_images[0] |
|
|
placeholder_clean = re.sub(r'\s+', ' ', placeholder.strip()) |
|
|
placeholder_variations = [ |
|
|
placeholder, |
|
|
placeholder_clean, |
|
|
placeholder_clean.replace('"', "'"), |
|
|
placeholder_clean.replace("'", '"'), |
|
|
re.sub(r'\s+', ' ', placeholder_clean), |
|
|
placeholder_clean.replace(' ', ' '), |
|
|
] |
|
|
blocks = [] |
|
|
for variation in placeholder_variations: |
|
|
blocks.append(f"""{SEARCH_START} |
|
|
{variation} |
|
|
{DIVIDER} |
|
|
{video_html} |
|
|
{REPLACE_END}""") |
|
|
return '\n\n'.join(blocks) |
|
|
|
|
|
|
|
|
if '<body' in html_content: |
|
|
body_start = html_content.find('<body') |
|
|
body_end = html_content.find('>', body_start) + 1 |
|
|
opening_body_tag = html_content[body_start:body_end] |
|
|
|
|
|
|
|
|
body_content_start = body_end |
|
|
|
|
|
|
|
|
patterns_to_try = [ |
|
|
r'<main[^>]*>', |
|
|
r'<section[^>]*class="[^"]*hero[^"]*"[^>]*>', |
|
|
r'<div[^>]*class="[^"]*container[^"]*"[^>]*>', |
|
|
r'<header[^>]*>', |
|
|
] |
|
|
|
|
|
insertion_point = None |
|
|
for pattern in patterns_to_try: |
|
|
|
|
match = re.search(pattern, html_content[body_content_start:], re.IGNORECASE) |
|
|
if match: |
|
|
match_end = body_content_start + match.end() |
|
|
|
|
|
tag_content = html_content[body_content_start + match.start():match_end] |
|
|
insertion_point = html_content[:match_end] + '\n ' |
|
|
break |
|
|
|
|
|
if not insertion_point: |
|
|
|
|
|
insertion_point = html_content[:body_end] + '\n ' |
|
|
video_with_container = f'<div class="video-container" style="margin: 20px 0; text-align: center;">\n {video_html}\n </div>' |
|
|
return f"""{SEARCH_START} |
|
|
{insertion_point} |
|
|
{DIVIDER} |
|
|
{insertion_point} |
|
|
{video_with_container} |
|
|
{REPLACE_END}""" |
|
|
else: |
|
|
return f"""{SEARCH_START} |
|
|
{insertion_point} |
|
|
{DIVIDER} |
|
|
{insertion_point} |
|
|
{video_html} |
|
|
{REPLACE_END}""" |
|
|
|
|
|
|
|
|
return f"{SEARCH_START}\n\n{DIVIDER}\n{video_html}\n{REPLACE_END}" |
|
|
|
|
|
def create_music_replacement_blocks_text_to_music(html_content: str, prompt: str, session_id: Optional[str] = None) -> str: |
|
|
"""Create search/replace blocks that insert ONE generated <audio> near the top of <body>.""" |
|
|
if not prompt or not prompt.strip(): |
|
|
return "" |
|
|
|
|
|
audio_html = generate_music_from_text(prompt, session_id=session_id, token=None) |
|
|
if audio_html.startswith("Error"): |
|
|
return "" |
|
|
|
|
|
|
|
|
import re |
|
|
section_match = re.search(r"<section\b[\s\S]*?</section>", html_content, flags=re.IGNORECASE) |
|
|
if section_match: |
|
|
section_html = section_match.group(0) |
|
|
section_clean = re.sub(r"\s+", " ", section_html.strip()) |
|
|
variations = [ |
|
|
section_html, |
|
|
section_clean, |
|
|
section_clean.replace('"', "'"), |
|
|
section_clean.replace("'", '"'), |
|
|
re.sub(r"\s+", " ", section_clean), |
|
|
] |
|
|
blocks = [] |
|
|
for v in variations: |
|
|
blocks.append(f"""{SEARCH_START} |
|
|
{v} |
|
|
{DIVIDER} |
|
|
{v}\n {audio_html} |
|
|
{REPLACE_END}""") |
|
|
return "\n\n".join(blocks) |
|
|
if '<body' in html_content: |
|
|
body_end = html_content.find('>', html_content.find('<body')) + 1 |
|
|
insertion_point = html_content[:body_end] + '\n ' |
|
|
return f"""{SEARCH_START} |
|
|
{insertion_point} |
|
|
{DIVIDER} |
|
|
{insertion_point} |
|
|
{audio_html} |
|
|
{REPLACE_END}""" |
|
|
|
|
|
|
|
|
return f"{SEARCH_START}\n\n{DIVIDER}\n{audio_html}\n{REPLACE_END}" |