# modal_video_processing.py
# Deploy with: modal deploy modal_video_processing.py
import modal
import os
# Create Modal app
app = modal.App("aiquoteclipgenerator")
# Define image with all dependencies
image = modal.Image.debian_slim(python_version="3.11").pip_install(
    "moviepy==1.0.3",
    "pillow",
    "numpy",
    "imageio==2.31.1",
    "imageio-ffmpeg",
    "requests",
    "fastapi",
)
@app.function(
    image=image,
    cpu=2,                       # vCPUs requested per container
    memory=2048,                 # MiB of RAM
    timeout=180,                 # hard per-call limit, in seconds
    concurrency_limit=10,        # max containers running at once
    allow_concurrent_inputs=10,  # concurrent inputs per container
    container_idle_timeout=120,  # idle seconds before a warm container scales down
)
def process_quote_video(
    video_url: str,
    quote_text: str,
    audio_b64: str | None = None,
    text_style: str = "classic_center",
) -> bytes:
"""
Process a quote video on Modal.
- Downloads a portrait/background video from `video_url`.
- Overlays `quote_text` using a chosen `text_style`.
- If `audio_b64` is provided, decodes it and:
* sets it as the audio track
* makes video duration roughly match audio (with min/max bounds).
Duration rules:
- With audio:
target = audio_duration + 0.5s
MIN = 7s, MAX = 20s
- Without audio:
target = min(original_video_duration, 15s)
Returns:
Raw bytes of the final MP4 video.
"""
    import base64
    import tempfile
    import time

    import numpy as np
    import requests
    from moviepy.editor import (
        VideoFileClip,
        ImageClip,
        CompositeVideoClip,
        AudioFileClip,
    )
    from moviepy.video.fx.all import loop as vfx_loop
    from PIL import Image, ImageDraw, ImageFont

    start_time = time.time()
    # ---------------------------
    # 1. Download video
    # ---------------------------
    resp = requests.get(video_url, stream=True, timeout=30)
    resp.raise_for_status()

    temp_video = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4")
    temp_video.close()  # close the handle; we reopen by path to avoid a leaked fd
    with open(temp_video.name, "wb") as f:
        for chunk in resp.iter_content(chunk_size=1024 * 1024):
            f.write(chunk)
    # ---------------------------
    # 2. Load video
    # ---------------------------
    video = VideoFileClip(temp_video.name)
    orig_duration = video.duration
    # ---------------------------
    # 3. Duration logic + optional audio
    # ---------------------------
    audio_clip = None
    temp_audio_path = None

    # Default target when no audio
    target_duration = orig_duration

    if audio_b64:
        try:
            temp_audio = tempfile.NamedTemporaryFile(delete=False, suffix=".mp3")
            temp_audio_path = temp_audio.name
            temp_audio.close()

            audio_bytes = base64.b64decode(audio_b64)
            with open(temp_audio_path, "wb") as f:
                f.write(audio_bytes)

            audio_clip = AudioFileClip(temp_audio_path)
            audio_duration = audio_clip.duration

            # Proportional rules with audio
            MIN_DUR = 7.0
            MAX_DUR = 20.0
            target_duration = audio_duration + 0.5  # small buffer
            if target_duration < MIN_DUR:
                target_duration = MIN_DUR
            if target_duration > MAX_DUR:
                target_duration = MAX_DUR

            # Adjust video to target_duration
            if target_duration > video.duration:
                video = vfx_loop(video, duration=target_duration)
            elif target_duration < video.duration:
                video = video.subclip(0, target_duration)
        except Exception as e:
            print(f"⚠️ Audio handling error: {e}")
            audio_clip = None
            # Fall back to no-audio behavior below

    if audio_clip is None:
        # No audio path: clamp to a reasonable length
        MAX_NO_AUDIO = 15.0
        if orig_duration > MAX_NO_AUDIO:
            target_duration = MAX_NO_AUDIO
            video = video.subclip(0, target_duration)
        else:
            target_duration = orig_duration

    # At this point, video.duration ≈ target_duration
    w, h = video.size
    # ---------------------------
    # 4. Create styled text overlay
    # ---------------------------
    def make_text_frame(t):
        img = Image.new("RGBA", (w, h), (0, 0, 0, 0))
        draw = ImageDraw.Draw(img)

        font_size = int(h * 0.025)
        try:
            font = ImageFont.truetype(
                "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", font_size
            )
        except Exception:
            font = ImageFont.load_default()

        max_width = int(w * 0.6)

        # Wrap quote text so no line exceeds max_width
        words = quote_text.split()
        lines = []
        current_line = []
        for word in words:
            test_line = " ".join(current_line + [word])
            bbox = draw.textbbox((0, 0), test_line, font=font)
            text_width = bbox[2] - bbox[0]
            if text_width <= max_width:
                current_line.append(word)
            else:
                if current_line:
                    lines.append(" ".join(current_line))
                    current_line = [word]
                else:
                    # Single word wider than max_width: put it on its own line
                    lines.append(word)
        if current_line:
            lines.append(" ".join(current_line))

        line_spacing = int(font_size * 0.4)
        text_block_height = len(lines) * (font_size + line_spacing)

        # Positioning based on text_style
        style = (text_style or "classic_center").lower().strip()
        if style == "lower_third_serif":
            # Lower third of the frame
            y_start = int(h * 0.60) - text_block_height // 2
        elif style == "typewriter_top":
            # Closer to the top
            y_start = int(h * 0.20)
        else:
            # classic_center
            y_start = (h - text_block_height) // 2

        y = y_start
        for line in lines:
            bbox = draw.textbbox((0, 0), line, font=font)
            text_width = bbox[2] - bbox[0]
            x = (w - text_width) // 2

            # Draw a black outline by stamping the text at small offsets
            outline_width = max(2, int(font_size * 0.08))
            for adj_x in range(-outline_width, outline_width + 1):
                for adj_y in range(-outline_width, outline_width + 1):
                    draw.text((x + adj_x, y + adj_y), line, font=font, fill="black")
            draw.text((x, y), line, font=font, fill="white")
            y += font_size + line_spacing

        return np.array(img)

    text_clip = ImageClip(make_text_frame(0), duration=video.duration)
    # ---------------------------
    # 5. Composite video + text
    # ---------------------------
    final_video = CompositeVideoClip([video, text_clip])

    # Attach audio if available (no extra duration forcing)
    if audio_clip is not None:
        try:
            final_video = final_video.set_audio(audio_clip)
        except Exception as e:
            print(f"⚠️ Could not attach audio: {e}")
    # ---------------------------
    # 6. Export final video
    # ---------------------------
    output_file = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4")
    output_path = output_file.name
    output_file.close()

    final_video.write_videofile(
        output_path,
        codec="libx264",
        audio_codec="aac",
        fps=10,
        preset="ultrafast",
        threads=2,
        verbose=False,
        logger=None,
        bitrate="400k",
        ffmpeg_params=["-crf", "30", "-g", "30"],
    )
    # Read final bytes
    with open(output_path, "rb") as f:
        video_bytes = f.read()

    # ---------------------------
    # 7. Cleanup
    # ---------------------------
    video.close()
    final_video.close()
    try:
        os.unlink(temp_video.name)
    except Exception:
        pass
    if audio_clip is not None:
        try:
            audio_clip.close()
        except Exception:
            pass
    if temp_audio_path and os.path.exists(temp_audio_path):
        try:
            os.unlink(temp_audio_path)
        except Exception:
            pass
    try:
        os.unlink(output_path)
    except Exception:
        pass

    total_time = time.time() - start_time
    print(
        f"🎉 Total: {total_time:.1f}s, Size: {len(video_bytes) / 1024 / 1024:.2f}MB, "
        f"text_style={text_style}, target_duration≈{target_duration:.1f}s"
    )
    return video_bytes
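

# Optional local test entrypoint, runnable with `modal run modal_video_processing.py`.
# A minimal sketch for smoke-testing the function above; the video URL below is a
# hypothetical placeholder, not an asset that ships with this project.
@app.local_entrypoint()
def main():
    video_bytes = process_quote_video.remote(
        video_url="https://example.com/background.mp4",  # hypothetical test asset
        quote_text="The obstacle is the way.",
        text_style="classic_center",
    )
    with open("test_output.mp4", "wb") as f:
        f.write(video_bytes)
    print(f"Wrote test_output.mp4 ({len(video_bytes) / 1024 / 1024:.2f} MB)")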
@app.function(image=image)
@modal.web_endpoint(method="POST")
def process_video_endpoint(data: dict):
"""
Single-video HTTP endpoint.
Expected JSON:
{
"video_url": "...",
"quote_text": "...",
"audio_b64": "....", # optional
"text_style": "classic_center" | "lower_third_serif" | "typewriter_top" # optional
}
"""
video_url = data.get("video_url")
quote_text = data.get("quote_text")
audio_b64 = data.get("audio_b64")
text_style = data.get("text_style", "classic_center")
if not video_url or not quote_text:
return {"error": "Missing video_url or quote_text"}, 400
    try:
        video_bytes = process_quote_video.remote(
            video_url=video_url,
            quote_text=quote_text,
            audio_b64=audio_b64,
            text_style=text_style,
        )

        import base64

        video_b64 = base64.b64encode(video_bytes).decode()
        return {
            "success": True,
            "video": video_b64,
            "size_mb": len(video_bytes) / 1024 / 1024,
        }
    except Exception as e:
        return JSONResponse({"error": str(e)}, status_code=500)
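

# Example client call for the endpoint above (a sketch: the real URL is printed by
# `modal deploy`; the placeholder below is hypothetical):
#
#   import base64, requests
#
#   resp = requests.post(
#       "https://<workspace>--aiquoteclipgenerator-process-video-endpoint.modal.run",
#       json={
#           "video_url": "https://example.com/background.mp4",
#           "quote_text": "Stay hungry, stay foolish.",
#           "text_style": "lower_third_serif",
#       },
#       timeout=300,
#   )
#   payload = resp.json()
#   if payload.get("success"):
#       with open("clip.mp4", "wb") as f:
#           f.write(base64.b64decode(payload["video"]))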
@app.function(image=image)
@modal.web_endpoint(method="POST")
def process_batch_endpoint(data: dict):
"""
Batch endpoint - process multiple videos in PARALLEL.
Expected JSON:
{
"videos": [
{
"video_url": "...",
"quote_text": "...",
"audio_b64": "...", # optional
"text_style": "..." # optional
},
...
]
}
"""
videos_data = data.get("videos", [])
if not videos_data:
return {"error": "Missing videos array"}, 400
    try:
        # Prepare per-video argument lists
        video_urls = [v.get("video_url") for v in videos_data]
        quote_texts = [v.get("quote_text") for v in videos_data]
        audio_list = [v.get("audio_b64") for v in videos_data]
        styles = [v.get("text_style", "classic_center") for v in videos_data]

        # Basic validation
        for i, (vu, qt) in enumerate(zip(video_urls, quote_texts)):
            if not vu or not qt:
                return JSONResponse(
                    {"error": f"Missing video_url or quote_text at index {i}"},
                    status_code=400,
                )

        # Process all videos in parallel using map; results come back in input order
        results = list(
            process_quote_video.map(
                video_urls,
                quote_texts,
                audio_list,
                styles,
            )
        )

        import base64

        encoded_results = []
        for video_bytes in results:
            video_b64 = base64.b64encode(video_bytes).decode()
            encoded_results.append(
                {
                    "success": True,
                    "video": video_b64,
                    "size_mb": len(video_bytes) / 1024 / 1024,
                }
            )

        return {
            "success": True,
            "videos": encoded_results,
            "count": len(encoded_results),
        }
    except Exception as e:
        return JSONResponse({"error": str(e)}, status_code=500)
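

# Example batch payload (a sketch with hypothetical URLs). Because the work fans out
# through `process_quote_video.map`, a batch of N clips takes roughly as long as the
# slowest single clip rather than the sum of all of them:
#
#   {
#       "videos": [
#           {"video_url": "https://example.com/a.mp4", "quote_text": "Quote one"},
#           {"video_url": "https://example.com/b.mp4", "quote_text": "Quote two",
#            "text_style": "typewriter_top"}
#       ]
#   }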