# modal_video_processing.py
# Deploy with: modal deploy modal_video_processing.py
"""Modal app that overlays a quote on a short video clip.

Exposes one worker function (`process_quote_video`) and two POST web
endpoints: one for a single video and one that fans a batch out to the
worker in parallel.
"""

import base64
import os
from typing import Callable, Optional

import modal

# Create Modal app
app = modal.App("aiquoteclipgenerator")

# Define image with all dependencies.
# The overlay code loads DejaVu fonts from /usr/share/fonts/truetype/dejavu/.
# The slim base image is not guaranteed to ship them, so install the Debian
# package explicitly; otherwise rendering silently falls back to Pillow's
# built-in default font.
image = (
    modal.Image.debian_slim(python_version="3.11")
    .apt_install("fonts-dejavu-core")
    .pip_install(
        "moviepy==1.0.3",
        "pillow",
        "numpy",
        "imageio==2.31.1",
        "imageio-ffmpeg",
        "requests",
        "fastapi",
    )
)

_FONT_DIR = "/usr/share/fonts/truetype/dejavu"
_DEFAULT_TEXT_STYLE = "classic_center"

# Per-style layout presets:
#   fonts       - candidate font files, tried in order
#   y_mode      - vertical anchor: "center", "lower_third" or "top"
#   size_ratio  - font size as a fraction of the frame height
#   width_ratio - maximum text width as a fraction of the frame width
_TEXT_STYLES = {
    "classic_center": {
        "fonts": ("DejaVuSans-Bold.ttf", "DejaVuSans.ttf"),
        "y_mode": "center",
        "size_ratio": 0.03,
        "width_ratio": 0.6,
    },
    "lower_third_serif": {
        "fonts": ("DejaVuSerif-Bold.ttf", "DejaVuSerif.ttf"),
        "y_mode": "lower_third",
        "size_ratio": 0.032,
        "width_ratio": 0.7,
    },
    "typewriter_top": {
        "fonts": ("DejaVuSansMono.ttf", "DejaVuSansMono-Bold.ttf"),
        "y_mode": "top",
        "size_ratio": 0.028,
        "width_ratio": 0.7,
    },
}


def _wrap_words(
    words: list,
    measure: Callable[[str], int],
    max_width: int,
) -> list:
    """Greedily pack *words* into lines no wider than *max_width*.

    *measure* returns the rendered width of a string. A single word that is
    wider than *max_width* is kept on a line of its own rather than split.
    """
    lines = []
    current = []
    for word in words:
        candidate = " ".join(current + [word])
        if measure(candidate) <= max_width:
            current.append(word)
        elif current:
            lines.append(" ".join(current))
            current = [word]
        else:
            # Nothing pending and the word alone is too wide: emit it as-is.
            lines.append(word)
    if current:
        lines.append(" ".join(current))
    return lines


def _render_quote_overlay(quote_text: str, text_style: str, w: int, h: int):
    """Render *quote_text* onto a transparent w x h RGBA frame.

    Returns the frame as a numpy array. Unknown *text_style* values use the
    "classic_center" preset for every setting (fonts, placement and width).
    """
    # Imported lazily: these packages are installed in the Modal image and
    # may not exist in the environment that runs `modal deploy`.
    import numpy as np
    from PIL import Image, ImageDraw, ImageFont

    style = _TEXT_STYLES.get(text_style, _TEXT_STYLES[_DEFAULT_TEXT_STYLE])
    font_size = int(h * style["size_ratio"])
    max_width = int(w * style["width_ratio"])

    img = Image.new("RGBA", (w, h), (0, 0, 0, 0))
    draw = ImageDraw.Draw(img)

    # Try fonts in order, fall back to Pillow's default
    font = None
    for font_file in style["fonts"]:
        try:
            font = ImageFont.truetype(os.path.join(_FONT_DIR, font_file), font_size)
            break
        except Exception:
            # Best effort: a missing/unreadable font just moves to the next.
            continue
    if font is None:
        font = ImageFont.load_default()

    def text_width(text: str) -> int:
        left, _top, right, _bottom = draw.textbbox((0, 0), text, font=font)
        return right - left

    lines = _wrap_words(quote_text.split(), text_width, max_width)

    line_spacing = int(font_size * 0.4)
    line_height = font_size + line_spacing
    text_block_height = len(lines) * line_height

    # Vertical placement
    y_mode = style["y_mode"]
    if y_mode == "top":
        y = int(h * 0.10)
    elif y_mode == "lower_third":
        y = int(h * 0.65)
    else:  # center
        y = (h - text_block_height) // 2

    # Outline is emulated by stamping the text in black at every offset in a
    # square around the final position, then drawing the white text on top.
    outline_width = max(2, int(font_size * 0.08))
    offsets = range(-outline_width, outline_width + 1)

    for line in lines:
        x = (w - text_width(line)) // 2
        for adj_x in offsets:
            for adj_y in offsets:
                draw.text((x + adj_x, y + adj_y), line, font=font, fill="black")
        draw.text((x, y), line, font=font, fill="white")
        y += line_height

    return np.array(img)


def _encode_video_result(video_bytes: bytes) -> dict:
    """Build the JSON-serializable success payload for one rendered video."""
    return {
        "success": True,
        "video": base64.b64encode(video_bytes).decode(),
        "size_mb": len(video_bytes) / 1024 / 1024,
    }


# NOTE(review): newer Modal releases may have renamed some of these options
# (and `modal.web_endpoint` below) - confirm against the installed version.
@app.function(
    image=image,
    cpu=2,
    memory=2048,
    timeout=180,
    concurrency_limit=10,
    allow_concurrent_inputs=10,
    container_idle_timeout=120,
)
def process_quote_video(
    video_url: str,
    quote_text: str,
    audio_b64: Optional[str] = None,
    text_style: str = "classic_center",
) -> bytes:
    """Download a video, overlay a quote, and return the encoded MP4 bytes.

    Args:
        video_url: URL of the source video; fetched with a 30 s timeout.
        quote_text: Text to draw; it is word-wrapped to fit the frame.
        audio_b64: Accepted for interface compatibility; currently unused.
        text_style: One of "classic_center" (default), "lower_third_serif"
            or "typewriter_top". Any other value is rendered as
            "classic_center".

    Returns:
        The rendered clip (at most the first 10 seconds of the source) as
        H.264/AAC MP4 bytes, encoded with low-quality "fast" settings.

    Raises:
        requests.HTTPError: If the download returns an error status.
        Other exceptions from requests / moviepy propagate unchanged.
    """
    import tempfile
    import time

    import requests
    from moviepy.editor import CompositeVideoClip, ImageClip, VideoFileClip

    start_time = time.time()

    # Reserve both temp paths and close the descriptors immediately; the
    # files are reopened by name below and removed in the `finally` block.
    source_fd, source_path = tempfile.mkstemp(suffix=".mp4")
    os.close(source_fd)
    output_fd, output_path = tempfile.mkstemp(suffix=".mp4")
    os.close(output_fd)

    video = None
    final_video = None
    try:
        # Download video.
        # NOTE(review): video_url is caller-supplied and fetched server-side;
        # consider restricting allowed hosts if this endpoint is public.
        with requests.get(video_url, stream=True, timeout=30) as response:
            response.raise_for_status()
            with open(source_path, "wb") as f:
                for chunk in response.iter_content(chunk_size=1024 * 1024):
                    f.write(chunk)

        # Load video and trim to the first 10 seconds
        video = VideoFileClip(source_path)
        if video.duration > 10:
            video = video.subclip(0, 10)

        w, h = video.size

        # The overlay is static, so render it once and hold it for the
        # whole clip duration.
        overlay = _render_quote_overlay(quote_text, text_style, w, h)
        text_clip = ImageClip(overlay, duration=video.duration)

        # Composite
        final_video = CompositeVideoClip([video, text_clip])

        # Export - FAST settings
        final_video.write_videofile(
            output_path,
            codec="libx264",
            audio_codec="aac",
            fps=10,
            preset="ultrafast",
            threads=2,
            verbose=False,
            logger=None,
            bitrate="400k",
            ffmpeg_params=["-crf", "30", "-g", "30"],
        )

        # Read bytes
        with open(output_path, "rb") as f:
            video_bytes = f.read()
    finally:
        # Cleanup runs on success and failure alike so a bad URL or encode
        # error does not leave readers or temp files behind in the container.
        for clip in (video, final_video):
            if clip is not None:
                clip.close()
        for path in (source_path, output_path):
            try:
                os.unlink(path)
            except FileNotFoundError:
                pass

    total_time = time.time() - start_time
    print(
        f"🎉 Total: {total_time:.1f}s, Size: {len(video_bytes)/1024/1024:.2f}MB, Style: {text_style}"
    )
    return video_bytes


@app.function(image=image)
@modal.web_endpoint(method="POST")
def process_video_endpoint(data: dict):
    """Single video web endpoint.

    Expects a JSON object with "video_url" and "quote_text", plus optional
    "audio_b64" (ignored for now) and "text_style". Responds with the
    base64-encoded video on success, or an {"error": ...} object with HTTP
    400 (missing fields) or 500 (processing failure).
    """
    # FastAPI does not interpret a `(body, status)` tuple; an explicit
    # response object is required to send a non-200 status code.
    from fastapi.responses import JSONResponse

    video_url = data.get("video_url")
    quote_text = data.get("quote_text")
    audio_b64 = data.get("audio_b64")  # ignored for now
    text_style = data.get("text_style", "classic_center")

    if not video_url or not quote_text:
        return JSONResponse(
            status_code=400,
            content={"error": "Missing video_url or quote_text"},
        )

    try:
        video_bytes = process_quote_video.remote(
            video_url, quote_text, audio_b64, text_style
        )
    except Exception as e:
        # Top-level request boundary: report the failure to the caller.
        return JSONResponse(status_code=500, content={"error": str(e)})

    return _encode_video_result(video_bytes)


@app.function(image=image)
@modal.web_endpoint(method="POST")
def process_batch_endpoint(data: dict):
    """
    Batch endpoint - process multiple videos in PARALLEL.

    Expects {"videos": [...]} where each entry is an object with
    "video_url" and "quote_text", plus optional "audio_b64" and
    "text_style". Responds with one encoded result per entry, in input
    order, or an {"error": ...} object with HTTP 400 (bad request shape) or
    500 (processing failure).
    """
    # FastAPI does not interpret a `(body, status)` tuple; an explicit
    # response object is required to send a non-200 status code.
    from fastapi.responses import JSONResponse

    videos_data = data.get("videos", [])

    if not videos_data:
        return JSONResponse(
            status_code=400, content={"error": "Missing videos array"}
        )

    # Validate up front so a malformed entry is a client error (400) rather
    # than a KeyError surfacing as a 500.
    for entry in videos_data:
        if (
            not isinstance(entry, dict)
            or not entry.get("video_url")
            or not entry.get("quote_text")
        ):
            return JSONResponse(
                status_code=400,
                content={"error": "Missing video_url or quote_text"},
            )

    # Extract per-video parameters
    video_urls = [v["video_url"] for v in videos_data]
    quote_texts = [v["quote_text"] for v in videos_data]
    audio_b64s = [v.get("audio_b64") for v in videos_data]
    text_styles = [v.get("text_style", "classic_center") for v in videos_data]

    try:
        results = list(
            process_quote_video.map(
                video_urls,
                quote_texts,
                audio_b64s,
                text_styles,
            )
        )
    except Exception as e:
        # Top-level request boundary: report the failure to the caller.
        return JSONResponse(status_code=500, content={"error": str(e)})

    encoded_results = [_encode_video_result(video_bytes) for video_bytes in results]

    return {
        "success": True,
        "videos": encoded_results,
        "count": len(encoded_results),
    }