# modal_video_processing.py
# Deploy with: modal deploy modal_video_processing.py
import modal
import os

# Create Modal app
app = modal.App("aiquoteclipgenerator")

# Define image with all dependencies (moviepy pinned to the 1.x API).
image = modal.Image.debian_slim(python_version="3.11").pip_install(
    "moviepy==1.0.3",
    "pillow",
    "numpy",
    "imageio==2.31.1",
    "imageio-ffmpeg",
    "requests",
    "fastapi",
)


@app.function(
    image=image,
    cpu=4,        # 4 CPUs for faster encoding
    memory=4096,  # 4GB RAM
    timeout=300,  # 5 minute timeout
)
def process_quote_video(video_url: str, quote_text: str, audio_url: str = None) -> bytes:
    """
    Process a quote video on Modal's fast infrastructure.

    Downloads the background video, renders the quote as a centered,
    outlined text overlay, optionally adds a narration track, and returns
    the encoded MP4 as raw bytes.

    Args:
        video_url: URL of the background video.
        quote_text: Quote to overlay on the video.
        audio_url: Optional URL of an audio (narration) file.

    Returns:
        bytes: The processed MP4 file.

    Raises:
        requests.HTTPError: If the background-video download fails.
    """
    import tempfile

    import numpy as np
    import requests
    from moviepy.editor import (
        AudioFileClip,
        CompositeVideoClip,
        ImageClip,
        VideoFileClip,
    )
    from PIL import Image, ImageDraw, ImageFont

    print(f"🎬 Starting video processing on Modal...")
    print(f"   Video: {video_url[:50]}...")
    print(f"   Quote length: {len(quote_text)} chars")

    temp_paths = []      # every temp file created; removed in the finally block
    video = None         # clips tracked so finally can close their readers
    final_video = None
    try:
        # --- Download background video ------------------------------------
        print("📥 Downloading video...")
        response = requests.get(video_url, stream=True, timeout=30)
        response.raise_for_status()
        # Write through the NamedTemporaryFile handle itself (the original
        # left the handle open while reopening the same path by name).
        with tempfile.NamedTemporaryFile(delete=False, suffix='.mp4') as f:
            temp_paths.append(f.name)
            video_path = f.name
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
        print("✅ Video downloaded")

        # --- Load video ---------------------------------------------------
        print("🎥 Loading video...")
        video = VideoFileClip(video_path)
        w, h = video.size
        print(f"   Dimensions: {w}x{h}")

        # --- Render the text overlay with PIL -----------------------------
        print("✍️ Creating text overlay...")

        def _load_font(size):
            """Return the best available bold TTF, else PIL's built-in font."""
            candidates = (
                "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf",
                "/usr/share/fonts/truetype/liberation/LiberationSans-Bold.ttf",
            )
            for path in candidates:
                try:
                    return ImageFont.truetype(path, size)
                except OSError:  # font not installed in this image
                    continue
            return ImageFont.load_default()

        def _make_text_frame():
            """Render the wrapped, outlined quote on a transparent RGBA frame."""
            img = Image.new('RGBA', (w, h), (0, 0, 0, 0))
            draw = ImageDraw.Draw(img)
            font_size = int(h * 0.025)
            font = _load_font(font_size)
            max_width = int(w * 0.6)

            # Greedy word-wrap: keep appending words while the line fits.
            lines = []
            current_line = []
            for word in quote_text.split():
                test_line = ' '.join(current_line + [word])
                bbox = draw.textbbox((0, 0), test_line, font=font)
                if bbox[2] - bbox[0] <= max_width:
                    current_line.append(word)
                elif current_line:
                    lines.append(' '.join(current_line))
                    current_line = [word]
                else:
                    # Single word wider than max_width: put it on its own line.
                    lines.append(word)
            if current_line:
                lines.append(' '.join(current_line))

            line_spacing = int(font_size * 0.4)
            text_block_height = len(lines) * (font_size + line_spacing)
            y = (h - text_block_height) // 2
            outline_width = max(2, int(font_size * 0.08))
            for line in lines:
                bbox = draw.textbbox((0, 0), line, font=font)
                x = (w - (bbox[2] - bbox[0])) // 2
                # Black halo around the text keeps it readable over any video.
                for adj_x in range(-outline_width, outline_width + 1):
                    for adj_y in range(-outline_width, outline_width + 1):
                        draw.text((x + adj_x, y + adj_y), line, font=font, fill='black')
                draw.text((x, y), line, font=font, fill='white')
                y += font_size + line_spacing
            return np.array(img)

        text_clip = ImageClip(_make_text_frame(), duration=video.duration)
        print("✅ Text overlay created")

        # --- Composite ----------------------------------------------------
        print("🎨 Compositing video...")
        final_video = CompositeVideoClip([video, text_clip])

        # --- Optional narration -------------------------------------------
        if audio_url:
            print("🎤 Adding voice narration...")
            try:
                audio_response = requests.get(audio_url, timeout=30)
                audio_response.raise_for_status()
                with tempfile.NamedTemporaryFile(delete=False, suffix='.mp3') as f:
                    temp_paths.append(f.name)
                    audio_path = f.name
                    f.write(audio_response.content)
                audio_clip = AudioFileClip(audio_path)
                # Never let the narration run past the end of the video.
                audio_clip = audio_clip.subclip(
                    0, min(audio_clip.duration, final_video.duration)
                )
                final_video = final_video.set_audio(audio_clip)
                print("✅ Audio added")
                # NOTE: the temp audio file must survive until export —
                # moviepy's ffmpeg reader re-opens it by path while encoding,
                # so it is deleted in the finally block, NOT here (the
                # original unlinked it before write_videofile).
            except Exception as e:
                # Narration is best-effort; failure degrades to a silent clip.
                print(f"⚠️ Audio failed: {e}")

        # --- Export -------------------------------------------------------
        print("📦 Exporting video...")
        with tempfile.NamedTemporaryFile(delete=False, suffix='.mp4') as f:
            output_path = f.name
        temp_paths.append(output_path)
        final_video.write_videofile(
            output_path,
            codec='libx264',
            audio_codec='aac',
            fps=24,
            preset='ultrafast',
            threads=4,
            verbose=False,
            logger=None,
        )
        print("✅ Video exported")

        # Read back the encoded result.
        with open(output_path, 'rb') as f:
            video_bytes = f.read()

        print(f"🎉 Processing complete! Video size: {len(video_bytes) / 1024 / 1024:.2f}MB")
        return video_bytes
    finally:
        # Always release clip readers and remove temp files, even on error
        # (the original leaked all of them on any exception).
        if video is not None:
            video.close()
        if final_video is not None:
            final_video.close()
        for path in temp_paths:
            try:
                os.unlink(path)
            except OSError:
                pass


# Expose as web endpoint for easy calling from Gradio
@app.function(image=image)
@modal.web_endpoint(method="POST")
def process_video_endpoint(data: dict):
    """
    Web endpoint to process videos.

    Accepts JSON with video_url, quote_text, and optional audio_url.
    Returns the processed video base64-encoded on success, or a JSON error
    payload with an appropriate HTTP status code.
    """
    import base64

    from fastapi.responses import JSONResponse

    video_url = data.get("video_url")
    quote_text = data.get("quote_text")
    audio_url = data.get("audio_url")

    if not video_url or not quote_text:
        # FastAPI (backing modal.web_endpoint) does not honor Flask-style
        # (body, status) tuples — the original returned a 200 with a
        # serialized 2-element array. Build an explicit 400 response.
        return JSONResponse({"error": "Missing video_url or quote_text"}, status_code=400)

    try:
        video_bytes = process_quote_video.remote(video_url, quote_text, audio_url)
        return {
            "success": True,
            "video": base64.b64encode(video_bytes).decode(),
            "size_mb": len(video_bytes) / 1024 / 1024,
        }
    except Exception as e:
        return JSONResponse({"error": str(e)}, status_code=500)


if __name__ == "__main__":
    # Test locally with: python modal_video_processing.py
    with app.run():
        result = process_quote_video.remote(
            video_url="https://videos.pexels.com/video-files/3843433/3843433-uhd_2732_1440_25fps.mp4",
            quote_text="Test quote for local testing",
            audio_url=None,
        )
        print(f"Got video: {len(result)} bytes")