|
|
|
|
|
|
|
|
|
|
|
import modal |
|
|
import os |
|
|
|
|
|
|
|
|
# Modal application object; the functions below register themselves on it
# through the @app.function decorator.
app = modal.App("aiquoteclipgenerator")

# Container image used by the functions in this file.
image = (
    modal.Image.debian_slim(python_version="3.11")
    # process_quote_video loads
    # /usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf for the overlay.
    # Install the package that provides it so the renderer does not have to
    # fall back to Pillow's default font.
    # NOTE(review): assumes the slim base image ships no TrueType fonts;
    # installing the package is harmless if it already does.
    .apt_install("fonts-dejavu-core")
    .pip_install(
        "moviepy==1.0.3",
        "pillow",
        "numpy",
        "imageio==2.31.1",
        "imageio-ffmpeg",
        "requests",
        "fastapi",
    )
)
|
|
|
|
|
# Bold TrueType fonts tried, in order, before falling back to Pillow's
# built-in default font.
_QUOTE_FONT_PATHS = (
    "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf",
    "/usr/share/fonts/truetype/liberation/LiberationSans-Bold.ttf",
)


def _remove_quietly(path):
    """Delete *path*, ignoring the error if it is already gone or locked."""
    try:
        os.unlink(path)
    except OSError:
        # Best-effort cleanup: a leftover temp file must not mask the real
        # result or the real exception.
        pass


def _download_to_tempfile(url, suffix, timeout=30):
    """Stream *url* into a new temporary file and return the file's path.

    The caller owns the returned file and is responsible for deleting it.
    If the download fails, the partially written file is removed before the
    error propagates.

    Args:
        url: Address to fetch with an HTTP GET.
        suffix: File-name suffix (e.g. ".mp4") for the temporary file.
        timeout: Timeout in seconds passed to ``requests.get``.

    Raises:
        requests.RequestException: on connection failure, timeout, or a
            non-success status (via ``raise_for_status``).
    """
    import tempfile

    import requests

    tmp = tempfile.NamedTemporaryFile(delete=False, suffix=suffix)
    try:
        # Both the file handle and the HTTP response are closed on exit,
        # whether or not the transfer succeeds.
        with tmp, requests.get(url, stream=True, timeout=timeout) as response:
            response.raise_for_status()
            for chunk in response.iter_content(chunk_size=8192):
                tmp.write(chunk)
    except BaseException:
        _remove_quietly(tmp.name)
        raise
    return tmp.name


def _load_quote_font(font_size):
    """Return the first loadable font from _QUOTE_FONT_PATHS at *font_size*.

    Falls back to Pillow's default font when none of the files can be read.
    """
    from PIL import ImageFont

    for font_path in _QUOTE_FONT_PATHS:
        try:
            return ImageFont.truetype(font_path, font_size)
        except OSError:
            # Font file missing or unreadable; try the next candidate.
            continue
    return ImageFont.load_default()


def _wrap_words(words, max_width, measure):
    """Greedily pack *words* into lines no wider than *max_width*.

    Args:
        words: Sequence of words (no embedded whitespace handling is done).
        max_width: Maximum allowed value of ``measure(line)``.
        measure: Callable returning the rendered width of a string.

    Returns:
        List of lines. A single word wider than *max_width* is kept whole
        on its own line rather than being split.
    """
    lines = []
    current = []
    for word in words:
        if measure(" ".join(current + [word])) <= max_width:
            current.append(word)
        elif current:
            lines.append(" ".join(current))
            current = [word]
        else:
            # Over-long word with nothing pending: emit it as its own line.
            lines.append(word)
    if current:
        lines.append(" ".join(current))
    return lines


def _render_quote_overlay(quote_text, width, height):
    """Render *quote_text* centered on a transparent RGBA canvas.

    The text is white with a black outline, wrapped to 60% of the canvas
    width, with a font size of 2.5% of the canvas height.

    Returns:
        numpy array produced by ``np.array`` on the ``width`` x ``height``
        RGBA image.
    """
    import numpy as np
    from PIL import Image, ImageDraw

    canvas = Image.new("RGBA", (width, height), (0, 0, 0, 0))
    draw = ImageDraw.Draw(canvas)

    # Clamp to 1 so very small frames still get a valid font size.
    font_size = max(1, int(height * 0.025))
    font = _load_quote_font(font_size)

    def text_width(text):
        left, _, right, _ = draw.textbbox((0, 0), text, font=font)
        return right - left

    lines = _wrap_words(quote_text.split(), int(width * 0.6), text_width)

    line_spacing = int(font_size * 0.4)
    line_height = font_size + line_spacing
    y = (height - len(lines) * line_height) // 2

    # The outline is drawn by stamping the text in black at every offset in
    # a square around the final position, then drawing white on top.
    outline_width = max(2, int(font_size * 0.08))
    offsets = range(-outline_width, outline_width + 1)

    for line in lines:
        x = (width - text_width(line)) // 2
        for dx in offsets:
            for dy in offsets:
                draw.text((x + dx, y + dy), line, font=font, fill="black")
        draw.text((x, y), line, font=font, fill="white")
        y += line_height

    return np.array(canvas)


@app.function(
    image=image,
    cpu=4,
    memory=4096,
    timeout=300,
)
def process_quote_video(video_url: str, quote_text: str, audio_url: str = None) -> bytes:
    """Overlay a quote on a background video and return the encoded result.

    Downloads the video, composites a static text overlay over its full
    duration, optionally attaches narration audio (trimmed to the video
    length), and encodes the result as H.264/AAC at 24 fps.

    Args:
        video_url: URL of the background video.
        quote_text: Quote to overlay.
        audio_url: Optional URL of a narration audio file. A failure while
            fetching or attaching the audio is logged and the video is
            exported without it.

    Returns:
        bytes: The processed video file.

    Raises:
        requests.RequestException: if the background video cannot be
            downloaded.
    """
    import tempfile

    from moviepy.editor import AudioFileClip, CompositeVideoClip, ImageClip, VideoFileClip

    print("🎬 Starting video processing on Modal...")
    print(f" Video: {video_url[:50]}...")
    print(f" Quote length: {len(quote_text)} chars")

    # Everything registered here is released in the finally block, so a
    # failure at any stage does not leak temp files or open readers.
    temp_paths = []
    video = None
    audio_clip = None
    final_video = None

    try:
        print("📥 Downloading video...")
        video_path = _download_to_tempfile(video_url, ".mp4")
        temp_paths.append(video_path)
        print("✅ Video downloaded")

        print("🎥 Loading video...")
        video = VideoFileClip(video_path)
        w, h = video.size
        print(f" Dimensions: {w}x{h}")

        print("✍️ Creating text overlay...")
        # The overlay does not change over time, so it is rendered once and
        # shown for the whole clip.
        text_clip = ImageClip(_render_quote_overlay(quote_text, w, h), duration=video.duration)
        print("✅ Text overlay created")

        print("🎨 Compositing video...")
        final_video = CompositeVideoClip([video, text_clip])

        if audio_url:
            print("🎤 Adding voice narration...")
            try:
                audio_path = _download_to_tempfile(audio_url, ".mp3")
                # The audio file must outlive the export below, because the
                # clip reads from it while encoding; it is deleted in the
                # finally block instead of here.
                temp_paths.append(audio_path)

                audio_clip = AudioFileClip(audio_path)
                audio_duration = min(audio_clip.duration, final_video.duration)
                final_video = final_video.set_audio(audio_clip.subclip(0, audio_duration))
                print("✅ Audio added")
            except Exception as e:
                # Narration is optional: log and continue without it.
                print(f"⚠️ Audio failed: {e}")

        print("📦 Exporting video...")
        # Only the path is needed; close the handle immediately so the
        # encoder is the sole writer.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as output_file:
            output_path = output_file.name
        temp_paths.append(output_path)

        final_video.write_videofile(
            output_path,
            codec="libx264",
            audio_codec="aac",
            fps=24,
            preset="ultrafast",
            threads=4,
            verbose=False,
            logger=None,
        )
        print("✅ Video exported")

        with open(output_path, "rb") as f:
            video_bytes = f.read()
    finally:
        try:
            for clip in (final_video, audio_clip, video):
                if clip is not None:
                    clip.close()
        finally:
            for path in temp_paths:
                _remove_quietly(path)

    print(f"🎉 Processing complete! Video size: {len(video_bytes) / 1024 / 1024:.2f}MB")

    return video_bytes
|
|
|
|
|
|
|
|
|
|
|
# NOTE(review): newer Modal releases appear to rename `web_endpoint` to
# `fastapi_endpoint`; confirm against the installed Modal version before
# changing the decorator.
@app.function(image=image)
@modal.web_endpoint(method="POST")
def process_video_endpoint(data: dict):
    """Web endpoint that renders a quote video and returns it base64-encoded.

    Args:
        data: JSON body with ``video_url`` and ``quote_text`` (both
            required) and an optional ``audio_url``.

    Returns:
        On success, a dict with ``success`` (True), ``video`` (base64 string
        of the encoded file) and ``size_mb``.
        On failure, a JSON response whose body is ``{"error": <message>}``
        with HTTP status 400 (missing field) or 500 (processing error).
    """
    import base64

    from fastapi.responses import JSONResponse

    video_url = data.get("video_url")
    quote_text = data.get("quote_text")
    audio_url = data.get("audio_url")

    if not video_url or not quote_text:
        # An explicit response object is required to set the status code;
        # returning a `(body, status)` tuple would be serialized as a JSON
        # array with status 200.
        return JSONResponse(
            status_code=400,
            content={"error": "Missing video_url or quote_text"},
        )

    try:
        video_bytes = process_quote_video.remote(video_url, quote_text, audio_url)
    except Exception as e:
        # Endpoint boundary: report any processing failure to the client.
        return JSONResponse(status_code=500, content={"error": str(e)})

    # Binary video is base64-encoded so it can travel inside the JSON body.
    return {
        "success": True,
        "video": base64.b64encode(video_bytes).decode(),
        "size_mb": len(video_bytes) / 1024 / 1024,
    }
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Manual smoke test: run the app ephemerally and render one sample clip
    # without narration.
    sample_request = {
        "video_url": "https://videos.pexels.com/video-files/3843433/3843433-uhd_2732_1440_25fps.mp4",
        "quote_text": "Test quote for local testing",
        "audio_url": None,
    }

    with app.run():
        rendered = process_quote_video.remote(**sample_request)
        print(f"Got video: {len(rendered)} bytes")