|
|
|
|
|
|
|
|
|
|
|
import time |
|
|
import psutil |
|
|
import datetime |
|
|
|
|
|
class SimpleDebugger: |
|
|
def __init__(self): |
|
|
self.start_time = time.time() |
|
|
print("=" * 60) |
|
|
print("🔍 ZeroGPU SPACES DEBUGGING SYSTEM GESTARTET") |
|
|
print(f"🕐 Start Zeit: {datetime.datetime.now().strftime('%H:%M:%S')}") |
|
|
print("=" * 60) |
|
|
|
|
|
|
|
|
try: |
|
|
memory = psutil.virtual_memory() |
|
|
print(f"💾 RAM Total: {memory.total / 1024**3:.1f}GB") |
|
|
print(f"💾 RAM Free: {memory.available / 1024**3:.1f}GB") |
|
|
except: |
|
|
print("💾 RAM Info nicht verfügbar") |
|
|
print("=" * 60) |
|
|
|
|
|
def log(self, message, details=None): |
|
|
"""Checkpoint mit Timing und Memory Info""" |
|
|
elapsed = time.time() - self.start_time |
|
|
timestamp = datetime.datetime.now().strftime('%H:%M:%S') |
|
|
|
|
|
try: |
|
|
memory = psutil.virtual_memory() |
|
|
memory_pct = memory.percent |
|
|
memory_free = memory.available / 1024**3 |
|
|
except: |
|
|
memory_pct = 0 |
|
|
memory_free = 0 |
|
|
|
|
|
print(f"\n🕐 [{timestamp}] {message}") |
|
|
print(f" ⏱️ Nach {elapsed:.1f}s | 💾 RAM: {memory_pct:.1f}% ({memory_free:.1f}GB frei)") |
|
|
|
|
|
if details: |
|
|
print(f" 📋 {details}") |
|
|
|
|
|
|
|
|
if elapsed > 60: |
|
|
print(f" ⚠️ WARNUNG: Schon {elapsed:.1f}s vergangen!") |
|
|
elif elapsed > 300: |
|
|
print(f" 🚨 SEHR LANGSAM: {elapsed:.1f}s - Das ist ungewöhnlich lang!") |
|
|
|
|
|
|
|
|
debug = SimpleDebugger() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
debug.log("Starte ZeroGPU Import...") |
|
|
|
|
|
import spaces |
|
|
debug.log("✅ ZeroGPU spaces Modul importiert") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
debug.log("Starte Python Imports...") |
|
|
|
|
|
import os |
|
|
import sys |
|
|
import gc |
|
|
debug.log("Basic Python imports fertig") |
|
|
|
|
|
import cv2 |
|
|
import torch |
|
|
import numpy as np |
|
|
debug.log("OpenCV, PyTorch, NumPy imports fertig") |
|
|
|
|
|
import gradio as gr |
|
|
debug.log("Gradio importiert") |
|
|
|
|
|
import subprocess |
|
|
import requests |
|
|
from urllib.parse import urlparse |
|
|
debug.log("Network-Module importiert") |
|
|
|
|
|
debug.log("Starte HuggingFace Hub Import...") |
|
|
from huggingface_hub import hf_hub_download |
|
|
debug.log("HuggingFace Hub importiert") |
|
|
|
|
|
debug.log("Starte Video Depth Anything Import (kann hängen wenn Module fehlen)...") |
|
|
try: |
|
|
from video_depth_anything.video_depth import VideoDepthAnything |
|
|
from utils.dc_utils import read_video_frames, save_video |
|
|
debug.log("✅ Video Depth Anything Module erfolgreich importiert") |
|
|
except Exception as e: |
|
|
debug.log("❌ Video Depth Anything Import FEHLER", str(e)) |
|
|
|
|
|
debug.log("Starte Transformers Import (erstes kritisches Modul)...") |
|
|
from transformers import BlipProcessor, BlipForConditionalGeneration |
|
|
debug.log("✅ Transformers erfolgreich importiert") |
|
|
|
|
|
from PIL import Image |
|
|
debug.log("Alle Imports abgeschlossen") |
|
|
|
|
|
|
|
|
debug.log("Environment Variablen werden gesetzt...") |
|
|
os.environ["HF_HOME"] = "/tmp/huggingface" |
|
|
os.environ["TRANSFORMERS_CACHE"] = "/tmp/huggingface/transformers" |
|
|
os.environ["MPLCONFIGDIR"] = "/tmp/matplotlib" |
|
|
debug.log("Environment setup fertig") |
|
|
|
|
|
|
|
|
debug.log("Gradio Utils werden gepatcht...") |
|
|
def patch_gradio_utils(): |
|
|
"""Fix Gradio schema type checking bug""" |
|
|
try: |
|
|
from gradio_client import utils |
|
|
original_get_type = utils.get_type |
|
|
|
|
|
def patched_get_type(schema): |
|
|
if isinstance(schema, bool): |
|
|
return "boolean" |
|
|
if not isinstance(schema, dict): |
|
|
return "any" |
|
|
return original_get_type(schema) |
|
|
|
|
|
utils.get_type = patched_get_type |
|
|
debug.log("✅ Gradio utils erfolgreich gepatcht") |
|
|
except Exception as e: |
|
|
debug.log("❌ Gradio utils patching FEHLER", str(e)) |
|
|
|
|
|
patch_gradio_utils() |
|
|
|
|
|
|
|
|
debug.log("🔥 KRITISCH: BLIP Model Loading startet - das ist oft der langsamste Teil!") |
|
|
|
|
|
debug.log("BLIP Processor Download/Load startet...") |
|
|
print("Loading BLIP model...") |
|
|
blip_processor = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-base") |
|
|
debug.log("✅ BLIP Processor geladen") |
|
|
|
|
|
debug.log("BLIP Model Download/Load startet - das dauert oft sehr lange...") |
|
|
blip_model = BlipForConditionalGeneration.from_pretrained("Salesforce/blip-image-captioning-base").to("cpu") |
|
|
debug.log("✅ BLIP Model geladen und auf CPU verschoben") |
|
|
|
|
|
def get_first_frame_for_blip(video_path, target_size=480): |
|
|
"""Effizient: Lädt nur das erste Frame für BLIP (nicht alle Frames!)""" |
|
|
try: |
|
|
cap = cv2.VideoCapture(video_path) |
|
|
|
|
|
|
|
|
if not cap.isOpened(): |
|
|
print(f"DEBUG: Could not open video: {video_path}") |
|
|
cap.release() |
|
|
return None |
|
|
|
|
|
|
|
|
frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) |
|
|
if frame_count <= 0: |
|
|
print(f"DEBUG: Invalid frame count: {frame_count}") |
|
|
cap.release() |
|
|
return None |
|
|
|
|
|
print(f"DEBUG: Video has {frame_count} frames, reading first frame (index 0)") |
|
|
|
|
|
|
|
|
cap.set(cv2.CAP_PROP_POS_FRAMES, 0) |
|
|
ret, frame = cap.read() |
|
|
cap.release() |
|
|
|
|
|
if not ret or frame is None: |
|
|
print("DEBUG: Could not read first frame") |
|
|
return None |
|
|
|
|
|
|
|
|
h, w = frame.shape[:2] |
|
|
if max(h, w) > target_size: |
|
|
scale = target_size / max(h, w) |
|
|
new_h, new_w = int(h * scale), int(w * scale) |
|
|
frame = cv2.resize(frame, (new_w, new_h)) |
|
|
print(f"DEBUG: Resized frame from {w}x{h} to {new_w}x{new_h}") |
|
|
else: |
|
|
print(f"DEBUG: Frame size {w}x{h} already within target {target_size}") |
|
|
|
|
|
|
|
|
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) |
|
|
return frame_rgb |
|
|
|
|
|
except Exception as e: |
|
|
print(f"DEBUG: get_first_frame_for_blip error: {e}") |
|
|
return None |
|
|
|
|
|
def generate_blip_name(frame: np.ndarray) -> str: |
|
|
"""Generate filename from frame using BLIP image captioning + Duplikat-Entfernung""" |
|
|
try: |
|
|
|
|
|
if frame is None or frame.size == 0: |
|
|
return "video" |
|
|
|
|
|
image = Image.fromarray(frame) |
|
|
inputs = blip_processor(images=image, return_tensors="pt").to("cpu") |
|
|
out = blip_model.generate(**inputs) |
|
|
caption = blip_processor.decode(out[0], skip_special_tokens=True).lower() |
|
|
|
|
|
print(f"DEBUG: BLIP caption: '{caption}'") |
|
|
|
|
|
|
|
|
stopwords = {"a", "an", "the", "in", "on", "at", "with", "by", "of", "for", "under", "through", "and", "is"} |
|
|
words = [w for w in caption.split() if w not in stopwords and w.isalpha()] |
|
|
|
|
|
|
|
|
words = list(dict.fromkeys(words)) |
|
|
|
|
|
print(f"DEBUG: Words after stopword removal and deduplication: {words}") |
|
|
|
|
|
trimmed = "_".join(words[:3]) |
|
|
result = trimmed[:30] if trimmed else "video" |
|
|
|
|
|
print(f"DEBUG: Final BLIP name: '{result}'") |
|
|
return result |
|
|
|
|
|
except Exception as e: |
|
|
print(f"BLIP error: {e}") |
|
|
return "video" |
|
|
|
|
|
|
|
|
|
|
|
def create_overlay_thumbnail(rgb_frame, depth_frame): |
|
|
""" |
|
|
Erstellt Overlay-Thumbnail mit vollständigem RGB und Depth-Miniatur unten rechts |
|
|
|
|
|
Args: |
|
|
rgb_frame: Original RGB Frame (volle Auflösung) |
|
|
depth_frame: Depth Frame (bereits auf RGB-Größe angepasst und verarbeitet) |
|
|
|
|
|
Returns: |
|
|
np.array: Thumbnail mit RGB-Vollbild und Depth-Overlay unten rechts |
|
|
""" |
|
|
print(f"DEBUG: Creating overlay thumbnail - RGB: {rgb_frame.shape}, Depth: {depth_frame.shape}") |
|
|
|
|
|
|
|
|
target_size = 1024 |
|
|
h, w = rgb_frame.shape[:2] |
|
|
|
|
|
if max(h, w) > target_size: |
|
|
scale = target_size / max(h, w) |
|
|
new_h, new_w = int(h * scale), int(w * scale) |
|
|
rgb_thumb = cv2.resize(rgb_frame, (new_w, new_h)) |
|
|
else: |
|
|
rgb_thumb = rgb_frame.copy() |
|
|
|
|
|
print(f"DEBUG: RGB thumbnail size: {rgb_thumb.shape}") |
|
|
|
|
|
|
|
|
thumb_h, thumb_w = rgb_thumb.shape[:2] |
|
|
depth_mini_w = int(thumb_w * 0.30) |
|
|
depth_mini_h = int(depth_mini_w * (thumb_h / thumb_w)) |
|
|
|
|
|
|
|
|
depth_mini = cv2.resize(depth_frame, (depth_mini_w, depth_mini_h)) |
|
|
|
|
|
print(f"DEBUG: Depth miniature size: {depth_mini.shape} (30% of RGB width)") |
|
|
|
|
|
|
|
|
result = rgb_thumb.copy() |
|
|
|
|
|
|
|
|
x_start = thumb_w - depth_mini_w |
|
|
y_start = thumb_h - depth_mini_h |
|
|
|
|
|
|
|
|
x_start = max(0, x_start) |
|
|
y_start = max(0, y_start) |
|
|
x_end = min(thumb_w, x_start + depth_mini_w) |
|
|
y_end = min(thumb_h, y_start + depth_mini_h) |
|
|
|
|
|
|
|
|
actual_w = x_end - x_start |
|
|
actual_h = y_end - y_start |
|
|
|
|
|
if actual_w != depth_mini_w or actual_h != depth_mini_h: |
|
|
depth_mini = cv2.resize(depth_mini, (actual_w, actual_h)) |
|
|
|
|
|
|
|
|
mask = create_rounded_corner_mask(actual_w, actual_h) |
|
|
|
|
|
|
|
|
apply_rounded_overlay(result, depth_mini, x_start, y_start, mask) |
|
|
|
|
|
print(f"DEBUG: Overlay thumbnail completed: {result.shape}") |
|
|
print(f"DEBUG: Depth overlay at position ({x_start}, {y_start}) with size {actual_w}x{actual_h}") |
|
|
return result |
|
|
|
|
|
def create_rounded_corner_mask(width, height): |
|
|
"""Erstellt Anti-Aliased Maske mit abgerundeter oberer linker Ecke""" |
|
|
|
|
|
radius = int(min(width, height) * 0.40) |
|
|
radius = max(radius, 5) |
|
|
|
|
|
|
|
|
mask = np.ones((height, width), dtype=np.float32) |
|
|
|
|
|
|
|
|
for y in range(radius): |
|
|
for x in range(radius): |
|
|
|
|
|
dist = np.sqrt((x - radius) ** 2 + (y - radius) ** 2) |
|
|
|
|
|
if dist > radius: |
|
|
|
|
|
alpha = max(0, 1 - (dist - radius)) |
|
|
mask[y, x] = alpha |
|
|
|
|
|
print(f"DEBUG: Created rounded mask with radius {radius}px for {width}x{height} overlay") |
|
|
return mask |
|
|
|
|
|
def apply_rounded_overlay(result, depth_mini, x_start, y_start, mask): |
|
|
"""Wendet Depth-Overlay mit abgerundeter Maske an""" |
|
|
actual_h, actual_w = depth_mini.shape[:2] |
|
|
|
|
|
|
|
|
rgb_section = result[y_start:y_start + actual_h, x_start:x_start + actual_w].copy() |
|
|
|
|
|
|
|
|
for c in range(3): |
|
|
|
|
|
blended = rgb_section[:, :, c].astype(np.float32) * (1 - mask) + \ |
|
|
depth_mini[:, :, c].astype(np.float32) * mask |
|
|
result[y_start:y_start + actual_h, x_start:x_start + actual_w, c] = blended.astype(np.uint8) |
|
|
|
|
|
print(f"DEBUG: Applied anti-aliased rounded overlay at ({x_start}, {y_start})") |
|
|
|
|
|
def add_depth_logo_to_overlay(thumbnail, overlay_x, overlay_y, overlay_w, overlay_h): |
|
|
"""Adds small 'D' logo specifically to the depth overlay area""" |
|
|
try: |
|
|
|
|
|
logo_size = max(20, int(overlay_w * 0.15)) |
|
|
|
|
|
|
|
|
margin = 5 |
|
|
x_pos = overlay_x + overlay_w - logo_size - margin |
|
|
y_pos = overlay_y + overlay_h - margin |
|
|
|
|
|
|
|
|
x_pos = max(overlay_x + margin, min(x_pos, overlay_x + overlay_w - logo_size)) |
|
|
y_pos = max(overlay_y + logo_size, min(y_pos, overlay_y + overlay_h - margin)) |
|
|
|
|
|
|
|
|
font = cv2.FONT_HERSHEY_SIMPLEX |
|
|
font_scale = max(1.0, logo_size / 20) |
|
|
font_thickness = max(2, int(logo_size / 10)) |
|
|
|
|
|
|
|
|
(text_w, text_h), baseline = cv2.getTextSize("D", font, font_scale, font_thickness) |
|
|
|
|
|
|
|
|
circle_radius = logo_size // 2 |
|
|
circle_center = (x_pos + circle_radius, y_pos - circle_radius) |
|
|
|
|
|
|
|
|
overlay = thumbnail.copy() |
|
|
|
|
|
|
|
|
cv2.circle(overlay, circle_center, circle_radius, (0, 0, 0), -1, cv2.LINE_AA) |
|
|
|
|
|
|
|
|
text_x = circle_center[0] - text_w // 2 |
|
|
text_y = circle_center[1] + text_h // 2 |
|
|
|
|
|
cv2.putText(overlay, "D", |
|
|
(text_x, text_y), |
|
|
font, font_scale, (255, 255, 255), font_thickness, cv2.LINE_AA) |
|
|
|
|
|
|
|
|
alpha = 0.8 |
|
|
result = cv2.addWeighted(thumbnail, 1-alpha, overlay, alpha, 0) |
|
|
|
|
|
print(f"DEBUG: Added small 'D' logo to overlay at ({circle_center[0]}, {circle_center[1]}), size: {logo_size}px") |
|
|
return result |
|
|
|
|
|
except Exception as e: |
|
|
print(f"DEBUG: Overlay logo addition failed: {e}") |
|
|
return thumbnail |
|
|
|
|
|
def embed_thumbnail_in_video(video_path, thumbnail_array, base_name): |
|
|
"""Bettet Thumbnail als Cover-Art in MP4-Video ein (JPEG für iOS-Kompatibilität)""" |
|
|
try: |
|
|
|
|
|
if len(thumbnail_array.shape) == 3 and thumbnail_array.shape[2] == 3: |
|
|
|
|
|
thumbnail_bgr = cv2.cvtColor(thumbnail_array, cv2.COLOR_RGB2BGR) |
|
|
else: |
|
|
thumbnail_bgr = thumbnail_array |
|
|
|
|
|
|
|
|
temp_thumb_path = f"temp_{base_name}_thumb.jpg" |
|
|
|
|
|
|
|
|
success = cv2.imwrite(temp_thumb_path, thumbnail_bgr, [ |
|
|
cv2.IMWRITE_JPEG_QUALITY, 90, |
|
|
cv2.IMWRITE_JPEG_OPTIMIZE, 1 |
|
|
]) |
|
|
|
|
|
if not success: |
|
|
raise RuntimeError("Failed to save thumbnail as JPEG") |
|
|
|
|
|
|
|
|
if not os.path.exists(temp_thumb_path): |
|
|
raise RuntimeError("Thumbnail JPEG file not created") |
|
|
|
|
|
print(f"DEBUG: Saved thumbnail as JPEG: {temp_thumb_path}") |
|
|
|
|
|
|
|
|
temp_output = video_path.replace('.mp4', '_with_thumb.mp4') |
|
|
|
|
|
|
|
|
cmd = [ |
|
|
"ffmpeg", "-y", |
|
|
"-i", video_path, |
|
|
"-i", temp_thumb_path, |
|
|
"-map", "0", |
|
|
"-map", "1", |
|
|
"-c", "copy", |
|
|
"-c:v:1", "mjpeg", |
|
|
"-disposition:v:1", "attached_pic", |
|
|
"-metadata:s:v:1", "title=Cover", |
|
|
"-metadata:s:v:1", "comment=JPEG Video Thumbnail", |
|
|
temp_output |
|
|
] |
|
|
|
|
|
print(f"DEBUG: Embedding JPEG thumbnail in video: {video_path}") |
|
|
result = subprocess.run(cmd, capture_output=True, text=True) |
|
|
|
|
|
if result.returncode == 0: |
|
|
|
|
|
os.replace(temp_output, video_path) |
|
|
print(f"✅ JPEG thumbnail successfully embedded in {video_path}") |
|
|
else: |
|
|
print(f"❌ FFmpeg failed: {result.stderr}") |
|
|
|
|
|
|
|
|
if os.path.exists(temp_thumb_path): |
|
|
os.remove(temp_thumb_path) |
|
|
if os.path.exists(temp_output): |
|
|
os.remove(temp_output) |
|
|
|
|
|
return result.returncode == 0 |
|
|
|
|
|
except Exception as e: |
|
|
print(f"❌ Thumbnail embedding failed: {e}") |
|
|
return False |
|
|
|
|
|
|
|
|
debug.log("🔥 KRITISCH: Video Depth Anything Model Loading startet!") |
|
|
debug.log("Device wird ermittelt...") |
|
|
|
|
|
print("Loading Video Depth Anything model...") |
|
|
|
|
|
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu' |
|
|
debug.log(f"Device ausgewählt: {DEVICE}") |
|
|
|
|
|
encoder = 'vitl' |
|
|
model_name = 'Large' |
|
|
model_configs = { |
|
|
'vitl': {'encoder': 'vitl', 'features': 256, 'out_channels': [256, 512, 1024, 1024]}, |
|
|
} |
|
|
|
|
|
debug.log("VideoDepthAnything Instanz wird erstellt...") |
|
|
video_depth_anything = VideoDepthAnything(**model_configs[encoder]) |
|
|
debug.log("✅ VideoDepthAnything Instanz erstellt") |
|
|
|
|
|
debug.log("🔥 KRITISCH: Model Checkpoint Download startet - das kann sehr lange dauern!") |
|
|
ckpt_path = hf_hub_download(repo_id=f"depth-anything/Video-Depth-Anything-{model_name}", |
|
|
filename=f"video_depth_anything_{encoder}.pth", |
|
|
cache_dir="/tmp/huggingface") |
|
|
debug.log("✅ Model Checkpoint heruntergeladen", f"Pfad: {ckpt_path}") |
|
|
|
|
|
debug.log("Model Weights werden geladen...") |
|
|
video_depth_anything.load_state_dict(torch.load(ckpt_path, map_location='cpu')) |
|
|
debug.log("✅ Model Weights geladen") |
|
|
|
|
|
debug.log("Model wird auf Device verschoben und in Eval-Modus gesetzt...") |
|
|
video_depth_anything = video_depth_anything.to(DEVICE).eval() |
|
|
debug.log("✅ Video Depth Anything Model komplett bereit!") |
|
|
|
|
|
|
|
|
def validate_url(url): |
|
|
"""Validate if URL is properly formatted""" |
|
|
try: |
|
|
parsed = urlparse(url) |
|
|
return bool(parsed.scheme and parsed.netloc) |
|
|
except: |
|
|
return False |
|
|
|
|
|
def download_video_with_ytdlp(url): |
|
|
"""Universal video download using yt-dlp Python module""" |
|
|
try: |
|
|
import yt_dlp |
|
|
import time |
|
|
import tempfile |
|
|
|
|
|
|
|
|
temp_dir = tempfile.mkdtemp() |
|
|
temp_filename = f"ytdlp_{int(time.time())}" |
|
|
temp_path = os.path.join(temp_dir, f"{temp_filename}.%(ext)s") |
|
|
|
|
|
|
|
|
ydl_opts = { |
|
|
'format': 'best[ext=mp4]/best', |
|
|
'outtmpl': temp_path, |
|
|
'noplaylist': True, |
|
|
'no_warnings': False, |
|
|
} |
|
|
|
|
|
print(f"DEBUG: Downloading with yt-dlp module: {url}") |
|
|
|
|
|
with yt_dlp.YoutubeDL(ydl_opts) as ydl: |
|
|
|
|
|
info = ydl.extract_info(url, download=False) |
|
|
|
|
|
|
|
|
ydl.download([url]) |
|
|
|
|
|
|
|
|
import glob |
|
|
temp_base = temp_path.replace(".%(ext)s", "") |
|
|
downloaded_files = glob.glob(f"{temp_base}.*") |
|
|
|
|
|
if not downloaded_files: |
|
|
raise RuntimeError("yt-dlp completed but no file found") |
|
|
|
|
|
actual_path = downloaded_files[0] |
|
|
print(f"DEBUG: yt-dlp downloaded: {actual_path}") |
|
|
return actual_path |
|
|
|
|
|
except ImportError: |
|
|
raise RuntimeError("yt-dlp Python module not installed. Install with: pip install yt-dlp") |
|
|
except Exception as e: |
|
|
raise RuntimeError(f"Failed to download with yt-dlp: {e}") |
|
|
|
|
|
def detect_video_source(url): |
|
|
"""Detect video source and determine download method""" |
|
|
|
|
|
if "cdn.midjourney.com" in url or "midjourney" in url.lower(): |
|
|
return "midjourney" |
|
|
elif "image.civitai.com" in url: |
|
|
return "civitai" |
|
|
elif "v21-kling.klingai.com" in url or "kling.ai" in url: |
|
|
return "kling" |
|
|
|
|
|
|
|
|
elif any(ext in url.lower() for ext in ['.mp4', '.webm', '.mov', '.avi', '.mkv']): |
|
|
return "direct_video" |
|
|
|
|
|
|
|
|
elif any(platform in url.lower() for platform in [ |
|
|
'youtube.com', 'youtu.be', 'vimeo.com', 'dailymotion.com', |
|
|
'tiktok.com', 'instagram.com', 'twitter.com', 'x.com', |
|
|
'facebook.com', 'reddit.com', 'twitch.tv' |
|
|
]): |
|
|
return "ytdlp_platform" |
|
|
|
|
|
|
|
|
else: |
|
|
return "ytdlp_fallback" |
|
|
|
|
|
def optimize_civitai_url(url): |
|
|
"""Convert gallery Civitai URLs to original quality to avoid dimension issues""" |
|
|
if "image.civitai.com" in url and "width=450" in url: |
|
|
|
|
|
optimized_url = url.replace("transcode=true,width=450", "transcode=true,original=true,quality=90") |
|
|
print(f"🔧 Optimized Civitai URL: gallery → original quality") |
|
|
print(f" From: {url}") |
|
|
print(f" To: {optimized_url}") |
|
|
return optimized_url |
|
|
return url |
|
|
|
|
|
def download_civitai_video(civitai_url): |
|
|
"""Direct download for Civitai videos (no proxy needed)""" |
|
|
try: |
|
|
|
|
|
civitai_url = optimize_civitai_url(civitai_url) |
|
|
|
|
|
|
|
|
headers = { |
|
|
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36', |
|
|
'Referer': 'https://civitai.com/', |
|
|
'Accept': 'video/webm,video/mp4,video/*;q=0.9,*/*;q=0.8', |
|
|
} |
|
|
|
|
|
|
|
|
print(f"DEBUG: Downloading optimized Civitai video: {civitai_url}") |
|
|
|
|
|
response = requests.get(civitai_url, headers=headers, stream=True, timeout=30) |
|
|
response.raise_for_status() |
|
|
|
|
|
|
|
|
try: |
|
|
parsed_url = urlparse(civitai_url) |
|
|
|
|
|
path_parts = parsed_url.path.split('/') |
|
|
if len(path_parts) > 1: |
|
|
|
|
|
filename_part = path_parts[-1] |
|
|
if '.' in filename_part: |
|
|
temp_path = f"temp_civitai_{filename_part}" |
|
|
else: |
|
|
import time |
|
|
temp_path = f"temp_civitai_{int(time.time())}.webm" |
|
|
else: |
|
|
import time |
|
|
temp_path = f"temp_civitai_{int(time.time())}.webm" |
|
|
except: |
|
|
import time |
|
|
temp_path = f"temp_civitai_{int(time.time())}.webm" |
|
|
|
|
|
|
|
|
with open(temp_path, "wb") as f: |
|
|
for chunk in response.iter_content(chunk_size=8192): |
|
|
if chunk: |
|
|
f.write(chunk) |
|
|
|
|
|
print(f"DEBUG: Civitai video downloaded to: {temp_path}") |
|
|
return temp_path |
|
|
|
|
|
except Exception as e: |
|
|
raise RuntimeError(f"Failed to download Civitai video: {e}") |
|
|
|
|
|
def download_video_from_url(original_url): |
|
|
"""Universal video downloader with yt-dlp integration""" |
|
|
try: |
|
|
if not validate_url(original_url): |
|
|
raise ValueError("Invalid URL format") |
|
|
|
|
|
|
|
|
source = detect_video_source(original_url) |
|
|
print(f"DEBUG: Detected video source: {source}") |
|
|
|
|
|
if source == "direct_video": |
|
|
return download_generic_video(original_url) |
|
|
elif source == "civitai": |
|
|
return download_civitai_video(original_url) |
|
|
elif source == "midjourney": |
|
|
return download_midjourney_video(original_url) |
|
|
elif source == "kling": |
|
|
return download_generic_video(original_url) |
|
|
elif source == "ytdlp_platform": |
|
|
return download_video_with_ytdlp(original_url) |
|
|
elif source == "ytdlp_fallback": |
|
|
|
|
|
try: |
|
|
return download_video_with_ytdlp(original_url) |
|
|
except Exception as ytdlp_error: |
|
|
print(f"DEBUG: yt-dlp failed, trying direct download: {ytdlp_error}") |
|
|
return download_generic_video(original_url) |
|
|
else: |
|
|
return download_generic_video(original_url) |
|
|
|
|
|
except Exception as e: |
|
|
raise RuntimeError(f"Failed to download video: {e}") |
|
|
|
|
|
def download_midjourney_video(mj_url): |
|
|
"""Download MidJourney videos via proxy""" |
|
|
try: |
|
|
proxy_base = "https://9cee417c-5874-4e53-939a-52ad3f6f2f30-00-16i6nbwyeqga.picard.replit.dev/" |
|
|
proxy_url = f"{proxy_base}?url={mj_url}" |
|
|
|
|
|
|
|
|
try: |
|
|
parsed_url = urlparse(mj_url) |
|
|
url_filename = os.path.basename(parsed_url.path) |
|
|
if url_filename and '.' in url_filename: |
|
|
temp_path = f"temp_mj_{url_filename}" |
|
|
else: |
|
|
import time |
|
|
temp_path = f"temp_mj_{int(time.time())}.mp4" |
|
|
except: |
|
|
import time |
|
|
temp_path = f"temp_mj_{int(time.time())}.mp4" |
|
|
|
|
|
print(f"DEBUG: Downloading MJ video via proxy: {proxy_url}") |
|
|
|
|
|
with requests.get(proxy_url, stream=True, timeout=30) as response: |
|
|
response.raise_for_status() |
|
|
with open(temp_path, "wb") as f: |
|
|
for chunk in response.iter_content(chunk_size=8192): |
|
|
if chunk: |
|
|
f.write(chunk) |
|
|
return temp_path |
|
|
|
|
|
except Exception as e: |
|
|
raise RuntimeError(f"Failed to download MJ video: {e}") |
|
|
|
|
|
def download_generic_video(url): |
|
|
"""Fallback for unknown video sources""" |
|
|
try: |
|
|
headers = { |
|
|
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36' |
|
|
} |
|
|
|
|
|
response = requests.get(url, headers=headers, stream=True, timeout=30) |
|
|
response.raise_for_status() |
|
|
|
|
|
import time |
|
|
temp_path = f"temp_generic_{int(time.time())}.mp4" |
|
|
|
|
|
with open(temp_path, "wb") as f: |
|
|
for chunk in response.iter_content(chunk_size=8192): |
|
|
if chunk: |
|
|
f.write(chunk) |
|
|
return temp_path |
|
|
|
|
|
except Exception as e: |
|
|
raise RuntimeError(f"Failed to download generic video: {e}") |
|
|
|
|
|
|
|
|
current_video_file = None |
|
|
current_video_url = None |
|
|
blip_generated_name = "" |
|
|
original_filename = "" |
|
|
|
|
|
|
|
|
@spaces.GPU(duration=300) |
|
|
def infer_video_depth_from_source(upload_video, video_url, filename, use_blip, create_thumbnail, *args): |
|
|
"""Process video to generate depth maps and RGBD output with ZeroGPU acceleration""" |
|
|
try: |
|
|
max_len, target_fps, max_res, stitch, grayscale, convert_from_color, blur = args |
|
|
|
|
|
|
|
|
input_path = upload_video or video_url |
|
|
if not input_path: |
|
|
return None, None, "Error: No video source provided", None |
|
|
|
|
|
|
|
|
base_name = filename.strip().replace(" ", "_")[:30] if filename.strip() else "output" |
|
|
print(f"DEBUG: Final filename locked in: '{base_name}'") |
|
|
|
|
|
|
|
|
output_dir = "./outputs" |
|
|
os.makedirs(output_dir, exist_ok=True) |
|
|
|
|
|
|
|
|
vis_video_path = os.path.join(output_dir, base_name + "_vis.mp4") |
|
|
rgbd_video_path = os.path.join(output_dir, base_name + "_RGBD.mp4") |
|
|
|
|
|
print(f"DEBUG: Output files - Vis: '{vis_video_path}', RGBD: '{rgbd_video_path}'") |
|
|
|
|
|
|
|
|
print("Reading video frames...") |
|
|
frames, target_fps = read_video_frames(input_path, max_len, target_fps, max_res) |
|
|
if len(frames) == 0: |
|
|
return None, None, "Error: No frames could be extracted from video", None |
|
|
|
|
|
|
|
|
print("Generating depth maps with ZeroGPU acceleration...") |
|
|
depths, fps = video_depth_anything.infer_video_depth(frames, target_fps, input_size=518, device=DEVICE) |
|
|
print("✅ Depth maps generated successfully") |
|
|
|
|
|
|
|
|
save_video(depths, vis_video_path, fps=fps, is_depths=True) |
|
|
|
|
|
rgbd_path = None |
|
|
thumbnail = None |
|
|
|
|
|
if stitch: |
|
|
print("Creating RGBD stitched video...") |
|
|
|
|
|
full_frames, _ = read_video_frames(input_path, max_len, target_fps, max_res=-1) |
|
|
d_min, d_max = depths.min(), depths.max() |
|
|
stitched_frames = [] |
|
|
|
|
|
for i in range(min(len(full_frames), len(depths))): |
|
|
rgb = full_frames[i] |
|
|
depth = ((depths[i] - d_min) / (d_max - d_min) * 255).astype(np.uint8) |
|
|
|
|
|
|
|
|
if grayscale: |
|
|
if convert_from_color: |
|
|
import matplotlib |
|
|
cmap = matplotlib.colormaps.get_cmap("inferno") |
|
|
depth_color = (cmap(depth / 255.0)[..., :3] * 255).astype(np.uint8) |
|
|
gray = cv2.cvtColor(depth_color, cv2.COLOR_RGB2GRAY) |
|
|
depth_vis = np.stack([gray]*3, axis=-1) |
|
|
else: |
|
|
depth_vis = np.stack([depth]*3, axis=-1) |
|
|
else: |
|
|
import matplotlib |
|
|
cmap = matplotlib.colormaps.get_cmap("inferno") |
|
|
depth_vis = (cmap(depth / 255.0)[..., :3] * 255).astype(np.uint8) |
|
|
|
|
|
|
|
|
if blur > 0: |
|
|
kernel = int(blur * 20) * 2 + 1 |
|
|
depth_vis = cv2.GaussianBlur(depth_vis, (kernel, kernel), 0) |
|
|
|
|
|
|
|
|
depth_resized = cv2.resize(depth_vis, (rgb.shape[1], rgb.shape[0])) |
|
|
stitched = cv2.hconcat([rgb, depth_resized]) |
|
|
stitched_frames.append(stitched) |
|
|
|
|
|
|
|
|
if i == 0 and create_thumbnail: |
|
|
print("Creating thumbnail from first perfectly matched RGB+Depth pair...") |
|
|
try: |
|
|
print(f"DEBUG: Using RGB: {rgb.shape}, Depth: {depth_resized.shape}") |
|
|
print(f"DEBUG: Depth range: {depth_resized.min()} - {depth_resized.max()}") |
|
|
|
|
|
|
|
|
thumbnail = create_overlay_thumbnail(rgb, depth_resized) |
|
|
|
|
|
print("✅ Thumbnail created from first RGBD pair (not embedded yet)") |
|
|
|
|
|
except Exception as e: |
|
|
print(f"❌ Thumbnail creation failed: {e}") |
|
|
import traceback |
|
|
traceback.print_exc() |
|
|
thumbnail = None |
|
|
|
|
|
|
|
|
save_video(np.array(stitched_frames), rgbd_video_path, fps=fps) |
|
|
print("✅ RGBD video created successfully") |
|
|
|
|
|
|
|
|
try: |
|
|
temp_audio_path = rgbd_video_path.replace('.mp4', '_audio.mp4') |
|
|
cmd = [ |
|
|
"ffmpeg", "-y", "-i", rgbd_video_path, "-i", input_path, |
|
|
"-c:v", "copy", "-c:a", "aac", "-map", "0:v:0", "-map", "1:a:0?", |
|
|
"-shortest", temp_audio_path |
|
|
] |
|
|
result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) |
|
|
if result.returncode == 0: |
|
|
os.replace(temp_audio_path, rgbd_video_path) |
|
|
print("✅ Audio added successfully") |
|
|
except Exception as e: |
|
|
print(f"Audio processing failed: {e}") |
|
|
|
|
|
rgbd_path = rgbd_video_path |
|
|
|
|
|
|
|
|
if create_thumbnail and thumbnail is not None: |
|
|
print("Embedding thumbnail in RGBD video only (after all processing)...") |
|
|
embed_thumbnail_in_video(rgbd_video_path, thumbnail, base_name) |
|
|
print("✅ Thumbnail embedded in RGBD video only") |
|
|
elif create_thumbnail: |
|
|
print("❌ No thumbnail to embed") |
|
|
|
|
|
|
|
|
gc.collect() |
|
|
if torch.cuda.is_available(): |
|
|
torch.cuda.empty_cache() |
|
|
|
|
|
success_msg = f"✅ Videos saved as '{base_name}_vis.mp4'" |
|
|
if stitch and rgbd_path: |
|
|
success_msg += f" and '{base_name}_RGBD.mp4'" |
|
|
if create_thumbnail and thumbnail is not None: |
|
|
success_msg += " with embedded thumbnail" |
|
|
|
|
|
print(f"DEBUG: Processing completed - Vis: '{vis_video_path}', RGBD: '{rgbd_path}'") |
|
|
return vis_video_path, rgbd_path, success_msg, thumbnail |
|
|
|
|
|
except Exception as e: |
|
|
error_msg = f"Processing failed: {str(e)}" |
|
|
print(error_msg) |
|
|
return None, None, error_msg, None |
|
|
|
|
|
|
|
|
def on_video_upload_change(video_file, use_blip): |
|
|
"""Handle video upload and store video info for toggling""" |
|
|
global current_video_file, blip_generated_name, original_filename, current_video_url |
|
|
|
|
|
print(f"DEBUG: Upload handler called with video_file: {video_file}") |
|
|
|
|
|
if not video_file: |
|
|
print("DEBUG: No video file - clearing state") |
|
|
current_video_file = None |
|
|
blip_generated_name = "" |
|
|
original_filename = "" |
|
|
return "", gr.update(), "Upload a video file" |
|
|
|
|
|
try: |
|
|
|
|
|
current_video_file = video_file |
|
|
current_video_url = None |
|
|
|
|
|
print(f"DEBUG: Processing upload - video_file type: {type(video_file)}") |
|
|
|
|
|
|
|
|
original_filename = "uploaded_video" |
|
|
|
|
|
|
|
|
if hasattr(video_file, 'name') and video_file.name: |
|
|
print(f"DEBUG: video_file.name = '{video_file.name}'") |
|
|
original_name = os.path.splitext(os.path.basename(video_file.name))[0] |
|
|
cleaned = "".join(c for c in original_name if c.isalnum() or c in "_-")[:30] |
|
|
if cleaned: |
|
|
original_filename = cleaned |
|
|
print(f"DEBUG: Method 1 success: '{original_filename}'") |
|
|
|
|
|
|
|
|
elif hasattr(video_file, 'orig_name') and video_file.orig_name: |
|
|
print(f"DEBUG: video_file.orig_name = '{video_file.orig_name}'") |
|
|
original_name = os.path.splitext(os.path.basename(video_file.orig_name))[0] |
|
|
cleaned = "".join(c for c in original_name if c.isalnum() or c in "_-")[:30] |
|
|
if cleaned: |
|
|
original_filename = cleaned |
|
|
print(f"DEBUG: Method 2 success: '{original_filename}'") |
|
|
|
|
|
|
|
|
elif isinstance(video_file, str): |
|
|
print(f"DEBUG: video_file is string: '{video_file}'") |
|
|
original_name = os.path.splitext(os.path.basename(video_file))[0] |
|
|
cleaned = "".join(c for c in original_name if c.isalnum() or c in "_-")[:30] |
|
|
if cleaned: |
|
|
original_filename = cleaned |
|
|
print(f"DEBUG: Method 3 success: '{original_filename}'") |
|
|
|
|
|
print(f"DEBUG: Final original filename set to: '{original_filename}'") |
|
|
|
|
|
|
|
|
blip_generated_name = "" |
|
|
if use_blip: |
|
|
print("DEBUG: Starting optimized BLIP processing...") |
|
|
frame = get_first_frame_for_blip(video_file, target_size=480) |
|
|
blip_generated_name = generate_blip_name(frame) |
|
|
print(f"DEBUG: BLIP name generated: '{blip_generated_name}'") |
|
|
|
|
|
|
|
|
final_name = blip_generated_name if (use_blip and blip_generated_name) else original_filename |
|
|
print(f"DEBUG: Final name returned: '{final_name}' (BLIP: {use_blip})") |
|
|
return final_name, "", "Video uploaded successfully!" |
|
|
|
|
|
except Exception as e: |
|
|
error_msg = f"Upload processing failed: {str(e)}" |
|
|
print(f"DEBUG ERROR: {error_msg}") |
|
|
return "uploaded_video", gr.update(), error_msg |
|
|
|
|
|
def on_video_url_change(url, use_blip): |
|
|
"""Handle URL input change with support for MJ and Civitai""" |
|
|
global current_video_file, current_video_url, blip_generated_name, original_filename |
|
|
|
|
|
if not url or url.strip() == "": |
|
|
|
|
|
if current_video_file is None: |
|
|
current_video_url = None |
|
|
blip_generated_name = "" |
|
|
original_filename = "" |
|
|
return None, "", "Enter a video URL (YouTube, TikTok, Instagram, MidJourney, Civitai, etc.)" |
|
|
else: |
|
|
|
|
|
return gr.update(), gr.update(), gr.update() |
|
|
|
|
|
try: |
|
|
source = detect_video_source(url) |
|
|
print(f"Downloading {source} video from URL: {url}") |
|
|
|
|
|
video_path = download_video_from_url(url) |
|
|
|
|
|
|
|
|
current_video_file = None |
|
|
current_video_url = video_path |
|
|
|
|
|
|
|
|
try: |
|
|
if source == "civitai": |
|
|
|
|
|
parsed_url = urlparse(url) |
|
|
path_parts = parsed_url.path.split('/') |
|
|
|
|
|
for part in reversed(path_parts): |
|
|
if part and '.' not in part and len(part) > 3: |
|
|
cleaned = "".join(c for c in part if c.isalnum() or c in "_-")[:20] |
|
|
if cleaned: |
|
|
original_filename = f"civitai_{cleaned}" |
|
|
break |
|
|
else: |
|
|
original_filename = "civitai_video" |
|
|
|
|
|
elif source == "midjourney": |
|
|
original_filename = "midjourney_video" |
|
|
elif source == "kling": |
|
|
original_filename = "kling_video" |
|
|
elif source == "direct_video": |
|
|
|
|
|
parsed_url = urlparse(url) |
|
|
url_filename = os.path.splitext(os.path.basename(parsed_url.path))[0] |
|
|
cleaned = "".join(c for c in url_filename if c.isalnum() or c in "_-")[:20] |
|
|
original_filename = cleaned if cleaned else "direct_video" |
|
|
elif source in ["ytdlp_platform", "ytdlp_fallback"]: |
|
|
|
|
|
parsed_url = urlparse(url) |
|
|
domain = parsed_url.netloc.lower() |
|
|
|
|
|
domain = domain.replace('www.', '').replace('m.', '') |
|
|
domain_name = domain.split('.')[0] |
|
|
original_filename = f"{domain_name}_video" |
|
|
else: |
|
|
original_filename = "downloaded_video" |
|
|
|
|
|
except: |
|
|
original_filename = f"{source}_video" if source != "unknown" else "downloaded_video" |
|
|
|
|
|
print(f"DEBUG: {source.title()} original filename set to: '{original_filename}'") |
|
|
|
|
|
blip_generated_name = "" |
|
|
|
|
|
|
|
|
if use_blip and video_path: |
|
|
try: |
|
|
print("DEBUG: Starting optimized BLIP processing for URL video...") |
|
|
frame = get_first_frame_for_blip(video_path, target_size=480) |
|
|
blip_generated_name = generate_blip_name(frame) |
|
|
print(f"DEBUG: {source.title()} BLIP name generated: '{blip_generated_name}'") |
|
|
except Exception as e: |
|
|
print(f"BLIP naming failed: {e}") |
|
|
blip_generated_name = "" |
|
|
|
|
|
|
|
|
final_name = blip_generated_name if (use_blip and blip_generated_name) else original_filename |
|
|
success_msg = f"✅ {source.title()} video downloaded successfully!" |
|
|
print(f"DEBUG: {source.title()} final name returned: '{final_name}' (BLIP: {use_blip})") |
|
|
return video_path, final_name, success_msg |
|
|
|
|
|
except Exception as e: |
|
|
error_msg = f"Download failed: {str(e)}" |
|
|
print(error_msg) |
|
|
return None, "", error_msg |
|
|
|
|
|
def on_blip_toggle(use_blip): |
|
|
"""Handle BLIP checkbox toggle - switch between BLIP and original name""" |
|
|
global current_video_file, current_video_url, blip_generated_name, original_filename |
|
|
|
|
|
|
|
|
if current_video_file is None and current_video_url is None: |
|
|
return "", "No video loaded" |
|
|
|
|
|
print(f"DEBUG: Toggle called - BLIP: {use_blip}, Original: '{original_filename}', BLIP name: '{blip_generated_name}'") |
|
|
|
|
|
try: |
|
|
|
|
|
if use_blip and not blip_generated_name: |
|
|
if current_video_file: |
|
|
frame = get_first_frame_for_blip(current_video_file, target_size=480) |
|
|
blip_generated_name = generate_blip_name(frame) |
|
|
print(f"DEBUG: Generated new BLIP name from file: '{blip_generated_name}'") |
|
|
elif current_video_url: |
|
|
|
|
|
frame = get_first_frame_for_blip(current_video_url, target_size=480) |
|
|
blip_generated_name = generate_blip_name(frame) |
|
|
print(f"DEBUG: Generated new BLIP name from URL: '{blip_generated_name}'") |
|
|
|
|
|
|
|
|
if use_blip and blip_generated_name: |
|
|
final_name = blip_generated_name |
|
|
status = "Using BLIP generated name" |
|
|
else: |
|
|
final_name = original_filename if original_filename else "video" |
|
|
status = "Using original filename" |
|
|
|
|
|
print(f"DEBUG: Toggle returning: '{final_name}' - {status}") |
|
|
return final_name, status |
|
|
|
|
|
except Exception as e: |
|
|
error_msg = f"Name generation failed: {str(e)}" |
|
|
print(error_msg) |
|
|
fallback = original_filename if original_filename else "video" |
|
|
return fallback, error_msg |
|
|
|
|
|
|
|
|
with gr.Blocks(analytics_enabled=False, title="Video Depth Anything - ZeroGPU") as demo: |
|
|
gr.Markdown(""" |
|
|
# 🎥 Video Depth Anything + RGBD Output (ZeroGPU Accelerated) |
|
|
|
|
|
Generate depth maps from videos and watch RGBD videos on holographic displays like Looking Glass Go. |
|
|
Upload a video or paste a video URL from **YouTube, TikTok, Instagram, MidJourney, Civitai**, or any platform. |
|
|
|
|
|
**⚡ GPU acceleration powered by ZeroGPU** |
|
|
|
|
|
[🔗 Project Page](https://videodepthanything.github.io/) | [📖 Paper](https://arxiv.org/abs/2401.01884) |
|
|
""") |
|
|
|
|
|
|
|
|
status_display = gr.HTML("") |
|
|
|
|
|
with gr.Row(equal_height=True): |
|
|
with gr.Column(scale=1): |
|
|
upload_video = gr.Video( |
|
|
label="Upload Video", |
|
|
height=500, |
|
|
show_label=True |
|
|
) |
|
|
with gr.Column(scale=1): |
|
|
depth_out = gr.Video( |
|
|
label="Depth Visualization", |
|
|
interactive=False, |
|
|
autoplay=True, |
|
|
height=500, |
|
|
show_label=True |
|
|
) |
|
|
with gr.Column(scale=2): |
|
|
rgbd_out = gr.Video( |
|
|
label="RGBD Side-by-Side", |
|
|
interactive=False, |
|
|
autoplay=True, |
|
|
height=500, |
|
|
show_label=True |
|
|
) |
|
|
|
|
|
|
|
|
with gr.Row(): |
|
|
video_url = gr.Textbox( |
|
|
label="Video URL (YouTube, TikTok, Instagram, Civitai, MidJourney, etc.)", |
|
|
placeholder="Paste video URL from YouTube, TikTok, Instagram, MidJourney, Civitai, or any platform...", |
|
|
scale=3 |
|
|
) |
|
|
use_blip = gr.Checkbox( |
|
|
label="Auto-name with BLIP", |
|
|
value=True, |
|
|
scale=1, |
|
|
info="Generate filename from video content" |
|
|
) |
|
|
filename = gr.Textbox( |
|
|
label="Output Filename (_RGBD.mp4 will be added)", |
|
|
placeholder="Enter filename or let BLIP generate it", |
|
|
scale=3 |
|
|
) |
|
|
create_thumbnail = gr.Checkbox( |
|
|
label="Embed Video Thumbnail", |
|
|
value=True, |
|
|
scale=1, |
|
|
info="Generate and embed thumbnail in MP4" |
|
|
) |
|
|
thumbnail_preview = gr.Image( |
|
|
label="Thumbnail Preview", |
|
|
height=140, |
|
|
width=180, |
|
|
interactive=False, |
|
|
show_label=True, |
|
|
scale=1 |
|
|
) |
|
|
|
|
|
|
|
|
video_url.change( |
|
|
fn=on_video_url_change, |
|
|
inputs=[video_url, use_blip], |
|
|
outputs=[upload_video, filename, status_display], |
|
|
queue=False |
|
|
) |
|
|
|
|
|
upload_video.upload( |
|
|
fn=on_video_upload_change, |
|
|
inputs=[upload_video, use_blip], |
|
|
outputs=[filename, video_url, status_display], |
|
|
queue=False |
|
|
) |
|
|
|
|
|
|
|
|
use_blip.change( |
|
|
fn=on_blip_toggle, |
|
|
inputs=[use_blip], |
|
|
outputs=[filename, status_display] |
|
|
) |
|
|
|
|
|
with gr.Accordion("⚙️ Advanced Settings", open=False): |
|
|
with gr.Row(): |
|
|
max_len = gr.Slider( |
|
|
label="Max Frames", |
|
|
minimum=-1, |
|
|
maximum=1000, |
|
|
value=-1, |
|
|
step=1, |
|
|
info="Maximum frames to process (-1 for all)" |
|
|
) |
|
|
target_fps = gr.Slider( |
|
|
label="Target FPS", |
|
|
minimum=-1, |
|
|
maximum=30, |
|
|
value=-1, |
|
|
step=1, |
|
|
info="Output FPS (-1 for original)" |
|
|
) |
|
|
max_res = gr.Slider( |
|
|
label="Max Resolution", |
|
|
minimum=480, |
|
|
maximum=1920, |
|
|
value=1280, |
|
|
step=1, |
|
|
info="Maximum resolution for processing" |
|
|
) |
|
|
|
|
|
with gr.Row(): |
|
|
stitch = gr.Checkbox( |
|
|
label="Create RGBD Output", |
|
|
value=True, |
|
|
info="Generate side-by-side RGB + Depth video" |
|
|
) |
|
|
grayscale = gr.Checkbox( |
|
|
label="Grayscale Depth", |
|
|
value=True, |
|
|
info="Convert depth to grayscale" |
|
|
) |
|
|
convert_from_color = gr.Checkbox( |
|
|
label="From Colormap", |
|
|
value=True, |
|
|
info="Convert from color before grayscale" |
|
|
) |
|
|
blur = gr.Slider( |
|
|
label="Depth Blur", |
|
|
minimum=0, |
|
|
maximum=1, |
|
|
value=0.3, |
|
|
step=0.01, |
|
|
info="Blur amount for depth visualization" |
|
|
) |
|
|
|
|
|
run_btn = gr.Button("🚀 Generate Depth Video with ZeroGPU", variant="primary", size="lg") |
|
|
|
|
|
|
|
|
run_btn.click( |
|
|
fn=infer_video_depth_from_source, |
|
|
inputs=[ |
|
|
upload_video, video_url, filename, use_blip, create_thumbnail, |
|
|
max_len, target_fps, max_res, stitch, |
|
|
grayscale, convert_from_color, blur |
|
|
], |
|
|
outputs=[depth_out, rgbd_out, status_display, thumbnail_preview] |
|
|
) |
|
|
|
|
|
gr.Markdown(""" |
|
|
### 🚀 ZeroGPU Features: |
|
|
- **GPU Acceleration**: Automatic GPU allocation for depth processing |
|
|
- **Memory Management**: Optimized VRAM usage with automatic cleanup |
|
|
- **Queue System**: Fair resource sharing with other users |
|
|
|
|
|
### Tips: |
|
|
- **Upload formats**: MP4, AVI, MOV, etc. |
|
|
- **BLIP naming**: Automatically generates descriptive filenames |
|
|
- **RGBD output**: Side-by-side comparison of original and depth |
|
|
- **Thumbnail Preview**: Shows final RGB→Depth gradient after processing |
|
|
- **Embedded Thumbnails**: Videos will show previews in Windows Explorer |
|
|
- **Processing time**: GPU acceleration makes processing much faster |
|
|
- **Filename**: Set your preferred name before clicking Generate! |
|
|
""") |
|
|
|
|
|
demo.queue(max_size=10) |
|
|
|
|
|
if __name__ == "__main__": |
|
|
print("Starting Video Depth Anything interface with ZeroGPU acceleration...") |
|
|
demo.launch( |
|
|
server_name="0.0.0.0", |
|
|
server_port=7860, |
|
|
show_error=True |
|
|
) |