AIQuoteClipGenerator / modal_video_processing.py
ladybug11's picture
update
2f051ee
raw
history blame
8.22 kB
# modal_video_processing.py
# Deploy with: modal deploy modal_video_processing.py
import modal
import os
# Create Modal app
app = modal.App("aiquoteclipgenerator")

# Container image shared by the worker function and both web endpoints.
#
# - fonts-dejavu-core: process_quote_video loads its fonts from
#   /usr/share/fonts/truetype/dejavu/ (DejaVuSans, DejaVuSansMono, DejaVuSerif).
#   The slim Debian base is not expected to ship any fonts, so without this
#   package every style would silently fall back to Pillow's default font.
# - moviepy is pinned; process_quote_video imports from `moviepy.editor`
#   (NOTE(review): presumably pinned to keep that import path available).
# - imageio-ffmpeg presumably supplies the ffmpeg binary used for encoding.
# - fastapi is imported by the web endpoints to build HTTP responses.
image = (
    modal.Image.debian_slim(python_version="3.11")
    .apt_install("fonts-dejavu-core")
    .pip_install(
        "moviepy==1.0.3",
        "pillow",
        "numpy",
        "imageio==2.31.1",
        "imageio-ffmpeg",
        "requests",
        "fastapi",
    )
)
@app.function(
    image=image,
    cpu=2,
    memory=2048,
    timeout=180,
    concurrency_limit=10,
    allow_concurrent_inputs=10,
    container_idle_timeout=120,
)
def process_quote_video(
    video_url: str,
    quote_text: str,
    audio_b64: str = None,
    text_style: str = "classic_center",
) -> bytes:
    """
    Overlay a quote on a downloaded video and return the encoded MP4 bytes.

    The source video is downloaded to a temporary file, trimmed to its first
    10 seconds, composited with a static text overlay and re-encoded with
    fast, low-bitrate settings (10 fps, libx264 ultrafast).

    Args:
        video_url: URL of the source video; fetched with a 30 s timeout.
        quote_text: Text to render. It is word-wrapped to a fraction of the
            video width.
        audio_b64: Accepted for interface compatibility; not used by this
            function.
        text_style: One of:
            - "classic_center"    -> centered, sans serif (default)
            - "lower_third_serif" -> lower part of the frame, serif
            - "typewriter_top"    -> near the top, monospace
            Any other value is treated as "classic_center".

    Returns:
        The bytes of the rendered MP4 file.

    Raises:
        requests.RequestException: If the download fails or returns an HTTP
            error status.
        Exception: Any error raised by moviepy/ffmpeg while decoding or
            encoding. Temporary files are removed in either case.
    """
    import tempfile
    import time

    import numpy as np
    import requests
    from moviepy.editor import CompositeVideoClip, ImageClip, VideoFileClip
    from PIL import Image, ImageDraw, ImageFont

    start_time = time.time()

    def render_text_overlay(width, height):
        """Return an RGBA array (height x width) with the wrapped quote drawn on it."""
        # Resolve the style once so fonts, position and wrap width always
        # agree (unknown styles use the classic layout, including its width).
        dejavu_dir = "/usr/share/fonts/truetype/dejavu/"
        if text_style == "lower_third_serif":
            font_files = ("DejaVuSerif-Bold.ttf", "DejaVuSerif.ttf")
            y_mode, size_ratio, width_ratio = "lower_third", 0.032, 0.7
        elif text_style == "typewriter_top":
            font_files = ("DejaVuSansMono.ttf", "DejaVuSansMono-Bold.ttf")
            y_mode, size_ratio, width_ratio = "top", 0.028, 0.7
        else:  # classic_center
            font_files = ("DejaVuSans-Bold.ttf", "DejaVuSans.ttf")
            y_mode, size_ratio, width_ratio = "center", 0.03, 0.6
        font_size = int(height * size_ratio)

        overlay = Image.new("RGBA", (width, height), (0, 0, 0, 0))
        draw = ImageDraw.Draw(overlay)

        # Try the preferred fonts in order; loading is deliberately
        # best-effort so a missing font never fails the whole render.
        font = None
        for file_name in font_files:
            try:
                font = ImageFont.truetype(dejavu_dir + file_name, font_size)
                break
            except Exception:
                continue
        if font is None:
            # Ask for a default font at the size the layout math assumes;
            # older Pillow releases do not accept `size`, so fall back to the
            # fixed-size default there.
            try:
                font = ImageFont.load_default(size=max(font_size, 1))
            except (TypeError, ImportError, OSError):
                font = ImageFont.load_default()

        # Greedy word wrap against the measured pixel width.
        max_width = int(width * width_ratio)
        lines = []
        current_line = []
        for word in quote_text.split():
            candidate = " ".join(current_line + [word])
            left, _, right, _ = draw.textbbox((0, 0), candidate, font=font)
            if right - left <= max_width:
                current_line.append(word)
            elif current_line:
                lines.append(" ".join(current_line))
                current_line = [word]
            else:
                # A single word wider than the limit gets its own line.
                lines.append(word)
        if current_line:
            lines.append(" ".join(current_line))

        line_spacing = int(font_size * 0.4)
        line_height = font_size + line_spacing

        # Vertical placement of the first line.
        if y_mode == "top":
            y = int(height * 0.10)
        elif y_mode == "lower_third":
            y = int(height * 0.65)
        else:  # center
            y = (height - len(lines) * line_height) // 2

        outline_width = max(2, int(font_size * 0.08))
        offsets = range(-outline_width, outline_width + 1)
        for line in lines:
            left, _, right, _ = draw.textbbox((0, 0), line, font=font)
            x = (width - (right - left)) // 2
            # Outline: stamp the text in black at every offset in a square
            # around the final position, then draw the white text on top.
            for dx in offsets:
                for dy in offsets:
                    draw.text((x + dx, y + dy), line, font=font, fill="black")
            draw.text((x, y), line, font=font, fill="white")
            y += line_height

        return np.array(overlay)

    temp_paths = []
    video = None
    final_video = None
    try:
        # Download video (streamed to disk in 1 MiB chunks).
        with requests.get(video_url, stream=True, timeout=30) as response:
            response.raise_for_status()
            with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as temp_video:
                temp_paths.append(temp_video.name)
                for chunk in response.iter_content(chunk_size=1024 * 1024):
                    temp_video.write(chunk)
        video_path = temp_paths[0]

        # Load video and trim to the first 10 seconds.
        video = VideoFileClip(video_path)
        if video.duration > 10:
            video = video.subclip(0, 10)
        w, h = video.size

        # The overlay is static, so it is rendered once and shown for the
        # whole clip.
        text_clip = ImageClip(render_text_overlay(w, h), duration=video.duration)

        # Composite
        final_video = CompositeVideoClip([video, text_clip])

        # Reserve an output path; the handle is closed immediately so ffmpeg
        # is the only writer.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as output_file:
            output_path = output_file.name
        temp_paths.append(output_path)

        # Export - FAST settings
        final_video.write_videofile(
            output_path,
            codec="libx264",
            audio_codec="aac",
            fps=10,
            preset="ultrafast",
            threads=2,
            verbose=False,
            logger=None,
            bitrate="400k",
            ffmpeg_params=["-crf", "30", "-g", "30"],
        )

        with open(output_path, "rb") as f:
            video_bytes = f.read()
    finally:
        # Always release clips and delete temp files, even when the download
        # or encode failed: the container is reused across calls.
        try:
            for clip in (video, final_video):
                if clip is not None:
                    clip.close()
        finally:
            for path in temp_paths:
                try:
                    os.unlink(path)
                except OSError:
                    # Best-effort cleanup; never mask the original error.
                    pass

    total_time = time.time() - start_time
    print(
        f"🎉 Total: {total_time:.1f}s, Size: {len(video_bytes)/1024/1024:.2f}MB, Style: {text_style}"
    )
    return video_bytes
@app.function(image=image)
@modal.web_endpoint(method="POST")
def process_video_endpoint(data: dict):
    """
    Single video web endpoint.

    Expects a JSON object with:
        video_url (required): URL of the source video.
        quote_text (required): Text to overlay.
        audio_b64 (optional): Forwarded to the worker, which currently
            ignores it.
        text_style (optional): Defaults to "classic_center".

    Returns:
        On success, a JSON object with "success", the base64-encoded MP4 in
        "video", and "size_mb".
        On failure, a JSON object with an "error" message and HTTP status
        400 (missing fields) or 500 (processing failed).
    """
    import base64

    # The endpoint is served through FastAPI, which does not interpret a
    # Flask-style `(body, status)` tuple: it would be sent as a JSON array
    # with HTTP 200. An explicit response object is needed to set the status.
    from fastapi.responses import JSONResponse

    video_url = data.get("video_url")
    quote_text = data.get("quote_text")
    audio_b64 = data.get("audio_b64")  # ignored for now
    text_style = data.get("text_style", "classic_center")

    if not video_url or not quote_text:
        return JSONResponse(
            status_code=400,
            content={"error": "Missing video_url or quote_text"},
        )

    try:
        video_bytes = process_quote_video.remote(
            video_url, quote_text, audio_b64, text_style
        )
    except Exception as e:
        # Top-level boundary: report any worker failure to the client.
        return JSONResponse(status_code=500, content={"error": str(e)})

    return {
        "success": True,
        "video": base64.b64encode(video_bytes).decode(),
        "size_mb": len(video_bytes) / 1024 / 1024,
    }
@app.function(image=image)
@modal.web_endpoint(method="POST")
def process_batch_endpoint(data: dict):
    """
    Batch endpoint - process multiple videos in PARALLEL.

    Expects a JSON object with a non-empty "videos" array. Each entry is an
    object with "video_url" and "quote_text" (both required) and optional
    "audio_b64" and "text_style" (defaults to "classic_center").

    Returns:
        On success, a JSON object with "success", "videos" (one entry per
        input, in input order, each holding the base64-encoded MP4 and its
        "size_mb") and "count".
        On failure, a JSON object with an "error" message and HTTP status
        400 (invalid request) or 500 (processing failed). A failure of any
        single video fails the whole batch.
    """
    import base64

    # The endpoint is served through FastAPI, which does not interpret a
    # Flask-style `(body, status)` tuple: it would be sent as a JSON array
    # with HTTP 200. An explicit response object is needed to set the status.
    from fastapi.responses import JSONResponse

    videos_data = data.get("videos", [])
    if not videos_data:
        return JSONResponse(status_code=400, content={"error": "Missing videos array"})
    if not isinstance(videos_data, list):
        return JSONResponse(status_code=400, content={"error": "videos must be an array"})

    # Validate up front so a malformed entry is reported as a client error
    # instead of surfacing later as an opaque 500.
    for index, entry in enumerate(videos_data):
        if (
            not isinstance(entry, dict)
            or not entry.get("video_url")
            or not entry.get("quote_text")
        ):
            return JSONResponse(
                status_code=400,
                content={"error": f"Missing video_url or quote_text in videos[{index}]"},
            )

    # Extract per-video parameters as parallel argument lists for .map().
    video_urls = [v["video_url"] for v in videos_data]
    quote_texts = [v["quote_text"] for v in videos_data]
    audio_b64s = [v.get("audio_b64") for v in videos_data]
    text_styles = [v.get("text_style", "classic_center") for v in videos_data]

    try:
        results = list(
            process_quote_video.map(
                video_urls,
                quote_texts,
                audio_b64s,
                text_styles,
            )
        )
    except Exception as e:
        # Top-level boundary: report any worker failure to the client.
        return JSONResponse(status_code=500, content={"error": str(e)})

    encoded_results = [
        {
            "success": True,
            "video": base64.b64encode(video_bytes).decode(),
            "size_mb": len(video_bytes) / 1024 / 1024,
        }
        for video_bytes in results
    ]
    return {
        "success": True,
        "videos": encoded_results,
        "count": len(encoded_results),
    }