# modal_video_processing.py
# Deploy with: modal deploy modal_video_processing.py

import modal
import os

# Create Modal app
app = modal.App("aiquoteclipgenerator")

# Define image with all dependencies (heavy media libs live only in the
# container image, which is why the functions import them lazily below).
image = modal.Image.debian_slim(python_version="3.11").pip_install(
    "moviepy==1.0.3",
    "pillow",
    "numpy",
    "imageio==2.31.1",
    "imageio-ffmpeg",
    "requests",
    "fastapi",
)


@app.function(
    image=image,
    cpu=2,
    memory=2048,
    timeout=180,
    concurrency_limit=10,
    allow_concurrent_inputs=10,
    container_idle_timeout=120,
)
def process_quote_video(
    video_url: str,
    quote_text: str,
    audio_b64: str | None = None,
    text_style: str = "classic_center",
) -> bytes:
    """
    Process a quote video on Modal.

    - Downloads a portrait/background video from ``video_url``.
    - Overlays ``quote_text`` using a chosen ``text_style``
      ("classic_center" | "lower_third_serif" | "typewriter_top").
    - If ``audio_b64`` is provided, decodes it and:
        * sets it as the audio track
        * makes video duration roughly match audio (with min/max bounds).

    Duration rules:
        - With audio:    target = audio_duration + 0.5s, clamped to [7s, 20s]
        - Without audio: target = min(original_video_duration, 15s)

    Returns:
        Raw bytes of the final MP4 video.

    Raises:
        requests.HTTPError: if the video download fails.
    """
    import base64
    import tempfile
    import time

    import numpy as np
    import requests
    from moviepy.editor import (
        AudioFileClip,
        CompositeVideoClip,
        ImageClip,
        VideoFileClip,
    )
    from moviepy.video.fx.all import loop as vfx_loop
    from PIL import Image, ImageDraw, ImageFont

    start_time = time.time()

    # Track every resource so the finally-block can clean up even when a
    # step fails (the original code leaked temp files/clips on error, which
    # matters in a warm, reused container).
    temp_video_path: str | None = None
    temp_audio_path: str | None = None
    output_path: str | None = None
    video = None
    final_video = None
    audio_clip = None

    try:
        # ---------------------------
        # 1. Download video
        # ---------------------------
        resp = requests.get(video_url, stream=True, timeout=30)
        resp.raise_for_status()

        # NamedTemporaryFile is used only to reserve a unique path; close the
        # handle immediately and write through a regular open() below.
        tmp_video = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4")
        temp_video_path = tmp_video.name
        tmp_video.close()
        with open(temp_video_path, "wb") as f:
            for chunk in resp.iter_content(chunk_size=1024 * 1024):
                f.write(chunk)

        # ---------------------------
        # 2. Load video
        # ---------------------------
        video = VideoFileClip(temp_video_path)
        orig_duration = video.duration

        # ---------------------------
        # 3. Duration logic + optional audio
        # ---------------------------
        # Default target when no audio.
        target_duration = orig_duration

        if audio_b64:
            try:
                tmp_audio = tempfile.NamedTemporaryFile(delete=False, suffix=".mp3")
                temp_audio_path = tmp_audio.name
                tmp_audio.close()
                with open(temp_audio_path, "wb") as f:
                    f.write(base64.b64decode(audio_b64))

                audio_clip = AudioFileClip(temp_audio_path)

                # Proportional rules with audio: audio length plus a small
                # buffer, clamped to [MIN_DUR, MAX_DUR].
                MIN_DUR = 7.0
                MAX_DUR = 20.0
                target_duration = min(max(audio_clip.duration + 0.5, MIN_DUR), MAX_DUR)

                # Loop short clips up to the target, trim long ones down.
                if target_duration > video.duration:
                    video = vfx_loop(video, duration=target_duration)
                elif target_duration < video.duration:
                    video = video.subclip(0, target_duration)
            except Exception as e:
                # Best-effort: bad audio falls back to the silent path below.
                print(f"⚠️ Audio handling error: {e}")
                audio_clip = None

        if audio_clip is None:
            # No-audio path: clamp to a reasonable length.
            MAX_NO_AUDIO = 15.0
            if orig_duration > MAX_NO_AUDIO:
                target_duration = MAX_NO_AUDIO
                video = video.subclip(0, target_duration)
            else:
                target_duration = orig_duration

        # At this point, video.duration ≈ target_duration.
        w, h = video.size

        # ---------------------------
        # 4. Create styled text overlay
        # ---------------------------
        def make_text_frame() -> np.ndarray:
            """Render the wrapped, outlined quote onto a transparent RGBA
            frame sized to the video. The overlay is static, so it is built
            once (no time parameter)."""
            img = Image.new("RGBA", (w, h), (0, 0, 0, 0))
            draw = ImageDraw.Draw(img)

            font_size = int(h * 0.025)
            try:
                font = ImageFont.truetype(
                    "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", font_size
                )
            except Exception:
                # Container font missing — fall back to PIL's built-in font.
                font = ImageFont.load_default()

            max_width = int(w * 0.6)

            # Greedy word-wrap of the quote to max_width pixels.
            lines: list[str] = []
            current_line: list[str] = []
            for word in quote_text.split():
                test_line = " ".join(current_line + [word])
                bbox = draw.textbbox((0, 0), test_line, font=font)
                if bbox[2] - bbox[0] <= max_width:
                    current_line.append(word)
                elif current_line:
                    lines.append(" ".join(current_line))
                    current_line = [word]
                else:
                    # Single word wider than max_width: emit it on its own line.
                    lines.append(word)
            if current_line:
                lines.append(" ".join(current_line))

            line_spacing = int(font_size * 0.4)
            text_block_height = len(lines) * (font_size + line_spacing)

            # Vertical placement based on text_style.
            style = (text_style or "classic_center").lower().strip()
            if style == "lower_third_serif":
                # Lower third of the frame.
                y = int(h * 0.60) - text_block_height // 2
            elif style == "typewriter_top":
                # Closer to the top.
                y = int(h * 0.20)
            else:
                # classic_center
                y = (h - text_block_height) // 2

            for line in lines:
                bbox = draw.textbbox((0, 0), line, font=font)
                x = (w - (bbox[2] - bbox[0])) // 2
                # Fake an outline by stamping the text in black around the
                # target position, then drawing the white text on top.
                outline_width = max(2, int(font_size * 0.08))
                for adj_x in range(-outline_width, outline_width + 1):
                    for adj_y in range(-outline_width, outline_width + 1):
                        draw.text((x + adj_x, y + adj_y), line, font=font, fill="black")
                draw.text((x, y), line, font=font, fill="white")
                y += font_size + line_spacing

            return np.array(img)

        text_clip = ImageClip(make_text_frame(), duration=video.duration)

        # ---------------------------
        # 5. Composite video + text
        # ---------------------------
        final_video = CompositeVideoClip([video, text_clip])

        # Attach audio if available (no extra duration forcing).
        if audio_clip is not None:
            try:
                final_video = final_video.set_audio(audio_clip)
            except Exception as e:
                print(f"⚠️ Could not attach audio: {e}")

        # ---------------------------
        # 6. Export final video
        # ---------------------------
        out_file = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4")
        output_path = out_file.name
        out_file.close()

        # Aggressive compression settings: low fps/bitrate keep the payload
        # small enough to return inline as bytes.
        final_video.write_videofile(
            output_path,
            codec="libx264",
            audio_codec="aac",
            fps=10,
            preset="ultrafast",
            threads=2,
            verbose=False,
            logger=None,
            bitrate="400k",
            ffmpeg_params=["-crf", "30", "-g", "30"],
        )

        with open(output_path, "rb") as f:
            video_bytes = f.read()

        total_time = time.time() - start_time
        print(
            f"🎉 Total: {total_time:.1f}s, Size: {len(video_bytes) / 1024 / 1024:.2f}MB, "
            f"text_style={text_style}, target_duration≈{target_duration:.1f}s"
        )
        return video_bytes
    finally:
        # ---------------------------
        # 7. Cleanup — runs on success AND failure.
        # ---------------------------
        for clip in (final_video, video, audio_clip):
            if clip is not None:
                try:
                    clip.close()
                except Exception:
                    pass
        for path in (temp_video_path, temp_audio_path, output_path):
            if path and os.path.exists(path):
                try:
                    os.unlink(path)
                except Exception:
                    pass


@app.function(image=image)
@modal.web_endpoint(method="POST")
def process_video_endpoint(data: dict):
    """
    Single-video HTTP endpoint.

    Expected JSON:
        {
            "video_url": "...",
            "quote_text": "...",
            "audio_b64": "....",          # optional
            "text_style": "classic_center" | "lower_third_serif"
                          | "typewriter_top"   # optional
        }

    Returns a JSON object with the base64-encoded MP4 on success, or an
    error object with HTTP 400/500 on failure.
    """
    import base64

    # NOTE(fix): FastAPI does not interpret a `(dict, status)` tuple as
    # body + status code — the original `return {...}, 400` produced HTTP
    # 200 with a two-element JSON array. Use JSONResponse instead.
    from fastapi.responses import JSONResponse

    video_url = data.get("video_url")
    quote_text = data.get("quote_text")
    audio_b64 = data.get("audio_b64")
    text_style = data.get("text_style", "classic_center")

    if not video_url or not quote_text:
        return JSONResponse(
            status_code=400,
            content={"error": "Missing video_url or quote_text"},
        )

    try:
        video_bytes = process_quote_video.remote(
            video_url=video_url,
            quote_text=quote_text,
            audio_b64=audio_b64,
            text_style=text_style,
        )
        return {
            "success": True,
            "video": base64.b64encode(video_bytes).decode(),
            "size_mb": len(video_bytes) / 1024 / 1024,
        }
    except Exception as e:
        return JSONResponse(status_code=500, content={"error": str(e)})


@app.function(image=image)
@modal.web_endpoint(method="POST")
def process_batch_endpoint(data: dict):
    """
    Batch endpoint - process multiple videos in PARALLEL.

    Expected JSON:
        {
            "videos": [
                {
                    "video_url": "...",
                    "quote_text": "...",
                    "audio_b64": "...",   # optional
                    "text_style": "..."   # optional
                },
                ...
            ]
        }

    Returns a JSON object with one base64-encoded MP4 per input on success,
    or an error object with HTTP 400/500 on failure.
    """
    import base64

    # NOTE(fix): same as the single-video endpoint — `(dict, status)` tuples
    # are not honored by FastAPI; use JSONResponse for real status codes.
    from fastapi.responses import JSONResponse

    videos_data = data.get("videos", [])
    if not videos_data:
        return JSONResponse(status_code=400, content={"error": "Missing videos array"})

    try:
        # Prepare per-video argument lists for the parallel map.
        video_urls = [v.get("video_url") for v in videos_data]
        quote_texts = [v.get("quote_text") for v in videos_data]
        audio_list = [v.get("audio_b64") for v in videos_data]
        styles = [v.get("text_style", "classic_center") for v in videos_data]

        # Basic validation before spending any compute.
        for i, (vu, qt) in enumerate(zip(video_urls, quote_texts)):
            if not vu or not qt:
                return JSONResponse(
                    status_code=400,
                    content={"error": f"Missing video_url or quote_text at index {i}"},
                )

        # Process all videos in parallel using Modal's .map fan-out.
        results = list(
            process_quote_video.map(
                video_urls,
                quote_texts,
                audio_list,
                styles,
            )
        )

        encoded_results = [
            {
                "success": True,
                "video": base64.b64encode(video_bytes).decode(),
                "size_mb": len(video_bytes) / 1024 / 1024,
            }
            for video_bytes in results
        ]

        return {
            "success": True,
            "videos": encoded_results,
            "count": len(encoded_results),
        }
    except Exception as e:
        return JSONResponse(status_code=500, content={"error": str(e)})