'''
Paper2Video generation pipeline:
1. (LLM) slide generation
2. (VLM) subtitle and cursor prompt generation
3. TTS -> audio; GUI grounding + WhisperX alignment -> cursor
4. Talking-head generation: local [hallo2, fantasy, ...], API [HeyGen]
5. Merge
'''
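# Example invocation (illustrative paths; assumes this file is saved as pipeline.py,
# flags are defined under __main__ below):
#   python pipeline.py --result_dir ./result/zeyu \
#       --paper_latex_root ./assets/demo/latex_proj \
#       --ref_img ./assets/demo/zeyu.png --ref_audio ./assets/demo/zeyu.wav \
#       --gpu_list 0,1 --stage '["0"]'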
import cv2
import json
import time
import shutil
import os
import argparse
import subprocess
from os import path
from pdf2image import convert_from_path
print("Initializing...")
from speech_gen import tts_per_slide
from subtitle_render import add_subtitles
from talking_gen import talking_gen_per_slide
from cursor_gen import cursor_gen_per_sentence
from slide_code_gen import latex_code_gen
# from slide_code_gen_select_improvement import latex_code_gen_upgrade
from cursor_render import render_video_with_cursor_from_json
from subtitle_cursor_prompt_gen import subtitle_cursor_gen
from wei_utils import get_agent_config
# os.environ["GEMINI_API_KEY"] = ""
# os.environ["OPENAI_API_KEY"] = ""
def copy_folder(src_dir, dst_dir):
    if not os.path.exists(src_dir):
        raise FileNotFoundError(f"no such dir: {src_dir}")
    os.makedirs(os.path.dirname(dst_dir), exist_ok=True)
    shutil.copytree(src_dir, dst_dir)
def str2list(s):
    # argparse applies the type converter to a string default, so tolerate ""
    return [int(x) for x in s.split(',') if x]
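# argparse's type=bool would treat any non-empty string (including "False") as True,
# so --if_tree_search below uses this explicit parser instead (a minimal sketch):
def str2bool(s):
    return str(s).lower() in ('1', 'true', 'yes', 'y')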
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Paper2Video Generation Pipeline')
    parser.add_argument('--result_dir', type=str, default='./result/zeyu')
    parser.add_argument('--model_name_t', type=str, default='gpt-4.1')
    parser.add_argument('--model_name_v', type=str, default='gpt-4.1')
    parser.add_argument('--model_name_talking', type=str, default='hallo2')
    parser.add_argument('--paper_latex_root', type=str, default='./assets/demo/latex_proj')
    parser.add_argument('--ref_img', type=str, default='./assets/demo/zeyu.png')
    parser.add_argument('--ref_audio', type=str, default='./assets/demo/zeyu.wav')
    parser.add_argument('--ref_text', type=str, default=None)
    parser.add_argument('--gpu_list', type=str2list, default="")
    parser.add_argument('--if_tree_search', type=str2bool, default=True)
    parser.add_argument('--beamer_templete_prompt', type=str, default=None)
    # stage codes: slides: "1"; subtitles + TTS + cursor: "2"; talking-head + merge: "3"; all: "0"
    parser.add_argument('--stage', type=str, default="[\"0\"]")
    parser.add_argument('--talking_head_env', type=str, default="")
    args = parser.parse_args()
    stage = json.loads(args.stage)
    print("start", "stage:", stage, args.gpu_list)
    cursor_img_path = "./cursor_image/red.png"
    os.makedirs(args.result_dir, exist_ok=True)  # result dir
    agent_config_t = get_agent_config(args.model_name_t)  # LLM
    agent_config_v = get_agent_config(args.model_name_v)  # VLM
    copy_latex_proj_path = path.join(args.result_dir, path.basename(args.paper_latex_root))
    if not path.exists(copy_latex_proj_path):
        copy_folder(args.paper_latex_root, copy_latex_proj_path)
    args.paper_latex_root = copy_latex_proj_path
    if path.exists(path.join(args.result_dir, "sat.json")):
        with open(path.join(args.result_dir, "sat.json"), 'r') as f:
            time_second = json.load(f)
    else:
        time_second = {}
    if path.exists(path.join(args.result_dir, "token.json")):
        with open(path.join(args.result_dir, "token.json"), 'r') as f:
            token_usage = json.load(f)
    else:
        token_usage = {}
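    # sat.json / token.json act as resumable stat caches across runs; illustrative shapes:
    #   sat.json:   {"slide_gen": [123.4], "tts": [56.7], ...}       (seconds per step)
    #   token.json: {"gpt-4.1": [<usage>, ...], "cursor": <usage>}   (usage per model)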
    ## Step 1: Slide Generation
    slide_latex_path = path.join(args.paper_latex_root, "slides.tex")
    slide_image_dir = path.join(args.result_dir, 'slide_imgs')
    os.makedirs(slide_image_dir, exist_ok=True)
    start_time = time.time()  # start time
    if "1" in stage or "0" in stage:
        prompt_path = "./prompts/slide_beamer_prompt.txt"
        if args.if_tree_search:
            usage_slide, beamer_path = latex_code_gen(
                prompt_path=prompt_path, tex_dir=args.paper_latex_root,
                beamer_save_path=slide_latex_path, model_config_ll=agent_config_t,
                model_config_vl=agent_config_v, beamer_temp_name=args.beamer_templete_prompt)
        else:
            paper_latex_path = path.join(args.paper_latex_root, "main.tex")
            usage_slide = latex_code_gen(
                prompt_path=prompt_path, tex_dir=args.paper_latex_root, tex_path=paper_latex_path,
                beamer_save_path=slide_latex_path, model_config=agent_config_t)
            # assumption: this branch compiles the beamer PDF next to slides.tex,
            # since only the tree-search branch returns the PDF path directly
            beamer_path = slide_latex_path.replace(".tex", ".pdf")
        slide_imgs = convert_from_path(beamer_path, dpi=400)  # needs poppler installed
        for i, img in enumerate(slide_imgs):
            img.save(path.join(slide_image_dir, f"{i+1}.png"))  # save slides as images
        if args.model_name_t not in token_usage:
            token_usage[args.model_name_t] = [usage_slide]
        else:
            token_usage[args.model_name_t].append(usage_slide)
        step1_time = time.time()
        time_second["slide_gen"] = [step1_time - start_time]
        print("Slide Generation", step1_time - start_time)
    ## Step 2: Subtitle and Cursor Prompt Generation
    start_time = time.time()  # start time
    subtitle_cursor_save_path = path.join(args.result_dir, 'subtitle_w_cursor.txt')
    cursor_save_path = path.join(args.result_dir, 'cursor.json')
    speech_save_dir = path.join(args.result_dir, 'audio')
    if "2" in stage or "0" in stage:
        prompt_path = "./prompts/slide_subtitle_cursor_prompt.txt"
        subtitle, usage_subtitle = subtitle_cursor_gen(slide_image_dir, prompt_path, agent_config_v)
        with open(subtitle_cursor_save_path, 'w') as f:
            f.write(subtitle)
        if args.model_name_v not in token_usage:
            token_usage[args.model_name_v] = [usage_subtitle]
        else:
            token_usage[args.model_name_v].append(usage_subtitle)
        step2_time = time.time()
        time_second["subtitle_cursor_prompt_gen"] = [step2_time - start_time]
        print("Subtitle and Cursor Prompt Generation", step2_time - start_time)
        ## Step 3-1: Speech Generation
        tts_per_slide(model_type='f5', script_path=subtitle_cursor_save_path,
                      speech_save_dir=speech_save_dir, ref_audio=args.ref_audio, ref_text=args.ref_text)
        step3_1_time = time.time()
        time_second["tts"] = [step3_1_time - step2_time]
        print("Speech Generation", step3_1_time - step2_time)
        ## Step 3-2: Cursor Generation
        # note: this only affects child interpreters; the current process's hash seed is already fixed
        os.environ["PYTHONHASHSEED"] = "random"
        cursor_token = cursor_gen_per_sentence(script_path=subtitle_cursor_save_path, slide_img_dir=slide_image_dir,
                                               slide_audio_dir=speech_save_dir, cursor_save_path=cursor_save_path,
                                               gpu_list=args.gpu_list)
        token_usage["cursor"] = cursor_token
        step3_2_time = time.time()
        time_second["cursor_gen"] = [step3_2_time - step3_1_time]
        print("Cursor Generation", step3_2_time - step3_1_time)
    ## Step 4: Talking Video Generation
    start_time = time.time()  # start time
    if "3" in stage or "0" in stage:
        talking_save_dir = path.join(args.result_dir, 'talking_{}'.format(args.model_name_talking))
        talking_inference_input = []
        audio_path_list = [path.join(speech_save_dir, name) for name in os.listdir(speech_save_dir)]
        for audio_path in audio_path_list:
            talking_inference_input.append([args.ref_img, audio_path])
        talking_gen_per_slide(args.model_name_talking, talking_inference_input, talking_save_dir,
                              args.gpu_list, env_path=args.talking_head_env)
        step4_time = time.time()
        time_second["talking_gen"] = [step4_time - start_time]
        print("Talking Video Generation", step4_time - start_time)
        ## Step 5: Merge
        # merge talking-head clips onto the slides
        # (file/dir names keep the repo's original "merage" spelling to match 1_merage.bash)
        tmp_merage_dir = path.join(args.result_dir, "merage")
        tmp_merage_1 = path.join(args.result_dir, "1_merage.mp4")
        image_size = cv2.imread(path.join(slide_image_dir, '1.png')).shape  # (h, w, c)
        # hallo2 outputs square talking-head videos; the same square sizing is used as a
        # fallback for other backends (assumption: all backends accept a square crop)
        size = max(image_size[0] // 6, image_size[1] // 6)
        width, height = size, size
        num_slide = len(os.listdir(slide_image_dir))
        ref_name = args.ref_img.split("/")[-1].replace(".png", "")
        print(ref_name)
        # 1_merage.bash <slide_dir> <talking_dir> <tmp_dir> <width> <height> <num_slides> <out_mp4> <ref_name>
        merage_cmd = ["./1_merage.bash", slide_image_dir, talking_save_dir, tmp_merage_dir,
                      str(width), str(height), str(num_slide), tmp_merage_1, ref_name]
        subprocess.run(merage_cmd, text=True, check=True)  # fail fast if the merge script errors
        # render cursor
        cursor_size = size // 6
        tmp_merage_2 = path.join(args.result_dir, "2_merage.mp4")
        render_video_with_cursor_from_json(video_path=tmp_merage_1, out_video_path=tmp_merage_2,
                                           json_path=cursor_save_path, cursor_img_path=cursor_img_path,
                                           transition_duration=0.1, cursor_size=cursor_size)
        # render subtitle
        font_size = size // 10
        tmp_merage_3 = path.join(args.result_dir, "3_merage.mp4")
        add_subtitles(tmp_merage_2, tmp_merage_3, font_size)
        step5_time = time.time()
        time_second["merage"] = [step5_time - step4_time]
        print("Merge", step5_time - step4_time)
    # stats save (per-step timings were already accumulated in time_second above)
    with open(path.join(args.result_dir, "sat.json"), 'w') as f:
        json.dump(time_second, f, indent=4)
    with open(path.join(args.result_dir, "token.json"), 'w') as f:
        json.dump(token_usage, f, indent=4)