'''
Paper2Video generation pipeline:
1. (LLM) slide generation
2. (VLM) subtitle and cursor prompt generation
3. TTS -> audio; GUI & WhisperX grounding -> cursor
4. Talking-head generation: local [hallo2, fantasy, ...], API [HeyGen]
5. Merge
'''
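# Example invocation (paths are illustrative; all flags are defined in the argparse block below):
#   python pipeline.py --result_dir ./result/demo \
#       --model_name_t gpt-4.1 --model_name_v gpt-4.1 --model_name_talking hallo2 \
#       --paper_latex_root ./assets/demo/latex_proj \
#       --ref_img ./assets/demo/zeyu.png --ref_audio ./assets/demo/zeyu.wav \
#       --gpu_list 0,1 --stage '["0"]'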
import cv2
import pdb
import json
import time
import shutil
import asyncio
import os, sys
import argparse
import subprocess
from os import path
from pdf2image import convert_from_path
print("Initializing...")
from speech_gen import tts_per_slide
from subtitle_render import add_subtitles
from talking_gen import talking_gen_per_slide
from cursor_gen import cursor_gen_per_sentence
from slide_code_gen import latex_code_gen
# from slide_code_gen_select_improvement import latex_code_gen_upgrade
from cursor_render import render_video_with_cursor_from_json
from subtitle_cursor_prompt_gen import subtitle_cursor_gen
from wei_utils import get_agent_config
# os.environ["GEMINI_API_KEY"] = ""
# os.environ["OPENAI_API_KEY"] = ""
def copy_folder(src_dir, dst_dir):
if not os.path.exists(src_dir): raise FileNotFoundError(f"no such dir: {src_dir}")
os.makedirs(os.path.dirname(dst_dir), exist_ok=True)
shutil.copytree(src_dir, dst_dir)
def str2list(s): return [int(x) for x in s.split(',') if x.strip()]  # tolerate the empty-string default (argparse applies the type to string defaults)
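# e.g. --gpu_list "0,1,2" -> [0, 1, 2]; an empty string yields []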
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Paper2Video Generation Pipeline')
parser.add_argument('--result_dir', type=str, default='./result/zeyu')
parser.add_argument('--model_name_t', type=str, default='gpt-4.1')
parser.add_argument('--model_name_v', type=str, default='gpt-4.1')
parser.add_argument('--model_name_talking', type=str, default='hallo2')
parser.add_argument('--paper_latex_root', type=str, default='./assets/demo/latex_proj')
parser.add_argument('--ref_img', type=str, default='./assets/demo/zeyu.png')
parser.add_argument('--ref_audio', type=str, default='./assets/demo/zeyu.wav')
parser.add_argument('--ref_text', type=str, default=None)
parser.add_argument('--gpu_list', type=str2list, default="")
parser.add_argument('--if_tree_search', type=lambda s: str(s).lower() not in ('false', '0', 'no'), default=True)  # plain type=bool would treat any non-empty string as True
parser.add_argument('--beamer_templete_prompt', type=str, default=None)
parser.add_argument('--stage', type=str, default="[\"0\"]")
parser.add_argument('--talking_head_env', type=str, default="")
# stage codes: 1 = slide + subtitle; 2 = tts + cursor; 3 = talking-head; 0 = all
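# e.g. --stage '["1","3"]' runs only stages 1 and 3; the default '["0"]' runs everything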
args = parser.parse_args()
stage = json.loads(args.stage)
print("start", "stage:", stage, args.gpu_list)
cursor_img_path = "./cursor_image/red.png"
os.makedirs(args.result_dir, exist_ok=True) # result dir
agent_config_t = get_agent_config(args.model_name_t) # LLM
agent_config_v = get_agent_config(args.model_name_v) # VLM
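# work on a copy of the LaTeX project so the generated slides.tex and compile artifacts do not touch the original sources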
copy_latex_proj_path = path.join(args.result_dir, path.basename(args.paper_latex_root))
if not path.exists(copy_latex_proj_path):
copy_folder(args.paper_latex_root, copy_latex_proj_path)
args.paper_latex_root = copy_latex_proj_path
if path.exists(path.join(args.result_dir, "sat.json")) is True:
with open(path.join(args.result_dir, "sat.json"), 'r') as f:
time_second = json.load(f)
else: time_second = {}
if path.exists(path.join(args.result_dir, "token.json")) is True:
with open(path.join(args.result_dir, "token.json"), 'r') as f:
token_usage = json.load(f)
else: token_usage = {}
## Step 1: Slide Generation
slide_latex_path = path.join(args.paper_latex_root, "slides.tex")
slide_image_dir = path.join(args.result_dir, 'slide_imgs')
os.makedirs(slide_image_dir, exist_ok=True)
start_time = time.time() # start time
if "1" in stage or "0" in stage:
prompt_path = "./prompts/slide_beamer_prompt.txt"
if args.if_tree_search:
usage_slide, beamer_path = latex_code_gen(prompt_path=prompt_path, tex_dir=args.paper_latex_root, beamer_save_path=slide_latex_path,
model_config_ll=agent_config_t, model_config_vl=agent_config_v, beamer_temp_name=args.beamer_templete_prompt)
else:
paper_latex_path = path.join(args.paper_latex_root, "main.tex")
# assumed to return (token usage, compiled beamer PDF path) like the tree-search branch above; otherwise beamer_path would be undefined below
usage_slide, beamer_path = latex_code_gen(prompt_path=prompt_path, tex_dir=args.paper_latex_root, tex_path=paper_latex_path, beamer_save_path=slide_latex_path, model_config=agent_config_t)
slide_imgs = convert_from_path(beamer_path, dpi=400)
for i, img in enumerate(slide_imgs): img.save(path.join(slide_image_dir, f"{i+1}.png")) # save slides as images
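# slides are written as 1.png ... N.png; the merge step reads '1.png' and counts files in this directory, so keep the naming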
token_usage.setdefault(args.model_name_t, []).append(usage_slide)
step1_time = time.time()
time_second["slide_gen"] = [step1_time-start_time]
print("Slide Generation", step1_time-start_time)
## Step 2: Subtitle and Cursor Prompt Generation
start_time = time.time() # start time
subtitle_cursor_save_path = path.join(args.result_dir, 'subtitle_w_cursor.txt')
cursor_save_path = path.join(args.result_dir, 'cursor.json')
speech_save_dir = path.join(args.result_dir, 'audio')
if "2" in stage or "0" in stage:
prompt_path = "./prompts/slide_subtitle_cursor_prompt.txt"
subtitle, usage_subtitle = subtitle_cursor_gen(slide_image_dir, prompt_path, agent_config_v)
with open(subtitle_cursor_save_path, 'w') as f: f.write(subtitle)
token_usage.setdefault(args.model_name_v, []).append(usage_subtitle)
step2_time = time.time()
time_second["subtitle_cursor_prompt_gen"] = [step2_time-start_time]
print("Subtitle and Cursor Prompt Generation", step2_time-start_time)
## Step 3-1: Speech Generation
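# 'f5' selects the F5-TTS backend: ref_audio provides the voice to clone and ref_text is presumably its transcript (may be left None)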
tts_per_slide(model_type='f5', script_path=subtitle_cursor_save_path,
speech_save_dir=speech_save_dir, ref_audio=args.ref_audio, ref_text=args.ref_text)
step3_1_time = time.time()
time_second["tts"] = [step3_1_time-step2_time]
print("Speech Generation", step3_1_time-step2_time)
## Step 3-2: Cursor Generation
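# note: setting PYTHONHASHSEED at runtime only affects subprocesses spawned afterwards, not hashing in this interpreter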
os.environ["PYTHONHASHSEED"] = "random"
cursor_token = cursor_gen_per_sentence(script_path=subtitle_cursor_save_path, slide_img_dir=slide_image_dir,
slide_audio_dir=speech_save_dir, cursor_save_path=cursor_save_path, gpu_list=args.gpu_list)
token_usage["cursor"] = cursor_token
step3_2_time = time.time()
time_second["cursor_gen"] = [step3_2_time-step3_1_time]
print("Cursor Generation", step3_2_time-step3_1_time)
## Step 4: Talking Video Generation
start_time = time.time() # start time
if "3" in stage or "0" in stage:
talking_save_dir = path.join(args.result_dir, 'talking_{}'.format(args.model_name_talking))
talking_inference_input = []
audio_path_list = [path.join(speech_save_dir, name) for name in os.listdir(speech_save_dir)]
for audio_path in audio_path_list: talking_inference_input.append([args.ref_img, audio_path])
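# one (reference image, per-slide audio) job per slide, spread across gpu_list; env_path presumably points at the talking-head model's own Python environment
# note: os.listdir order is arbitrary, so downstream matching is assumed to go by filename rather than list order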
talking_gen_per_slide(args.model_name_talking, talking_inference_input, talking_save_dir, args.gpu_list, env_path=args.talking_head_env)
step4_time = time.time()
time_second["talking_gen"] = [step4_time-start_time]
print("Cursor Generation", step4_time-start_time)
## Step 5: Merge
# merge the talking-head clips and the slides
tmp_merage_dir = path.join(args.result_dir, "merage")
tmp_merage_1 = path.join(args.result_dir, "1_merage.mp4")
image_size = cv2.imread(path.join(slide_image_dir, '1.png')).shape  # (height, width, channels)
# square talking-head tile, 1/6 of the larger slide dimension (originally computed only for hallo2; defined unconditionally so `size` always exists)
size = max(image_size[0]//6, image_size[1]//6)
width, height = size, size
num_slide = len(os.listdir(slide_image_dir))
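# arguments handed to 1_merage.bash below: slide image dir, talking-head dir, work dir, tile width, tile height, slide count, output video, reference-image stem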
ref_img_stem = path.splitext(path.basename(args.ref_img))[0]
print(ref_img_stem)
merage_cmd = ["./1_merage.bash", slide_image_dir, talking_save_dir, tmp_merage_dir,
str(width), str(height), str(num_slide), tmp_merage_1, ref_img_stem]
out = subprocess.run(merage_cmd, text=True)
# render cursor
cursor_size = size//6
tmp_merage_2 = path.join(args.result_dir, "2_merage.mp4")
render_video_with_cursor_from_json(video_path=tmp_merage_1, out_video_path=tmp_merage_2,
json_path=cursor_save_path, cursor_img_path=cursor_img_path,
transition_duration=0.1, cursor_size=cursor_size)
# render subtitle
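# burn subtitles into the merged video; the font size is set to roughly 1/10 of the talking-head tile edge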
font_size = size//10
tmp_merage_3 = path.join(args.result_dir, "3_merage.mp4")
add_subtitles(tmp_merage_2, tmp_merage_3, font_size)
step5_time = time.time()
time_second["merage"] = [step5_time-step4_time]
print("Merage", step5_time-step4_time)
# stats save (per-stage timings and token usage were accumulated above; rebuilding the dict here would fail whenever a stage is skipped)
with open(path.join(args.result_dir, "sat.json"), 'w') as f: json.dump(time_second, f, indent=4)
with open(path.join(args.result_dir, "token.json"), 'w') as f: json.dump(token_usage, f, indent=4)