File size: 7,182 Bytes
f89e77a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 |
import math
import subprocess
import os
import concurrent.futures
import shutil
import tempfile
import argparse
import cv2
from dotenv import load_dotenv
import gradio as gr
import ffmpeg
import time
# Working directories, relative to the current working directory:
# FRAMES_FOLDER receives the frames extracted from the uploaded video and
# OUTPUT_FRAMES_FOLDER receives the frames written by rife-ncnn-vulkan.
FRAMES_FOLDER = "temp_frames"
OUTPUT_FRAMES_FOLDER = "temp_output_frames"
# load_dotenv() runs before the environment lookup below so that a value
# supplied through a .env file is visible to os.getenv.
load_dotenv()
# Path of the rife-ncnn-vulkan executable; None when the variable is unset.
rife_ncnn_vulkan_path = os.getenv("RIFE_NCNN_VULKAN_PATH")
# Command-line options are parsed at import time; --share is forwarded to
# the Gradio launch call at the bottom of the file.
parser = argparse.ArgumentParser(description="Launch the smooth-frames-web-ui on Gradio Interface")
parser.add_argument("--share", action="store_true", help="Enable sharing the app")
args = parser.parse_args()
# Maps the codec labels offered in the UI dropdown to the internal codec
# keys understood by generate_video_from_images.
codec_mapping = {
"OpenH264": "openh264",
"VP9": "vp09"
}
def get_fps(video_path):
    """Return the frame rate OpenCV reports for the video at ``video_path``.

    The value is whatever ``cv2.CAP_PROP_FPS`` yields for a capture opened
    on the path; the capture is released before returning.
    """
    capture = cv2.VideoCapture(video_path)
    reported_rate = capture.get(cv2.CAP_PROP_FPS)
    capture.release()
    return reported_rate
def smart_roundup(number, base=10):
    """Round ``number`` up to the nearest multiple of ``base``.

    A value that is already an exact multiple of ``base`` is returned
    unchanged (keeping its original type); anything else is rounded up with
    ``math.ceil`` and therefore comes back as an ``int`` multiple of
    ``base`` (for example 24 -> 30 and 29.97 -> 30 with the default base).
    """
    if number % base == 0:
        return number
    multiples_needed = math.ceil(number / base)
    return multiples_needed * base
def interpolate_frames(prev_frame, next_frame, interp_factor):
    """Linearly blend two frames into ``interp_factor - 1`` in-between frames.

    For each step ``i`` with ``1 <= i < interp_factor`` the result weights
    ``next_frame`` by ``i / interp_factor`` and ``prev_frame`` by the
    remainder, using ``cv2.addWeighted``.  The blended frames are returned
    in step order; the list is empty when ``interp_factor`` is 1 or less.
    """
    blend_weights = (step / interp_factor for step in range(1, interp_factor))
    return [
        cv2.addWeighted(prev_frame, 1.0 - alpha, next_frame, alpha, 0)
        for alpha in blend_weights
    ]
def generate_intermediate_frame(img_path1, img_path2, output_path):
    """Ask rife-ncnn-vulkan for the frame between two image files.

    Runs the executable configured in ``rife_ncnn_vulkan_path`` with
    ``img_path1`` as ``-0``, ``img_path2`` as ``-1`` and ``output_path`` as
    ``-o``.  The process exit status is not inspected.

    Returns:
        ``output_path``, unchanged.
    """
    subprocess.run(
        [
            rife_ncnn_vulkan_path,
            "-0", img_path1,
            "-1", img_path2,
            "-o", output_path,
        ]
    )
    return output_path
def generate_intermediate_frames_with_dir(input_dir, output_path):
    """Run rife-ncnn-vulkan over a whole directory of frames.

    Invokes the executable configured in ``rife_ncnn_vulkan_path`` with the
    ``-v`` flag, ``input_dir`` as ``-i`` and ``output_path`` as ``-o``.  The
    process exit status is not inspected.

    Returns:
        ``output_path``, unchanged.
    """
    rife_command = [rife_ncnn_vulkan_path, "-v"]
    rife_command += ["-i", input_dir]
    rife_command += ["-o", output_path]
    subprocess.run(rife_command)
    return output_path
def extract_frames_from_video(video_path, output_folder, target_fps=60/2):
    """Write a video's frames to PNG files, adding RIFE in-betweens if needed.

    The source frame rate is read with ``get_fps`` and rounded up with
    ``smart_roundup``.  When that rate is positive but below ``target_fps``,
    every decoded frame is written together with
    ``ceil(target_fps / rate) - 1`` intermediate frames produced by
    ``generate_intermediate_frame`` on a thread pool.  Otherwise the frames
    are simply written one after another.

    Args:
        video_path: Path of the video to decode.
        output_folder: Existing directory that receives ``frame_NNNNNNN.png``
            files.
        target_fps: Frame rate the extracted sequence should reach.

    Returns:
        The list of frame paths in the order they were scheduled, or an
        empty list when no frame could be read from the video.
    """
    orig_fps = smart_roundup(get_fps(video_path))
    # A reported rate of 0 previously raised ZeroDivisionError here, before
    # the "no frame could be read" check below had a chance to run.  Such a
    # source is now handled by the plain extraction path.
    needs_interpolation = 0 < orig_fps < target_fps
    # Rounding up to ensure >= target_fps
    interp_factor = math.ceil(target_fps / orig_fps) if needs_interpolation else 1

    cap = cv2.VideoCapture(video_path)
    frames = []
    try:
        ret, first_frame = cap.read()
        if not ret:
            return frames

        count = 0
        prev_frame_path = os.path.join(output_folder, f"frame_{count:07}.png")
        cv2.imwrite(prev_frame_path, first_frame)
        frames.append(prev_frame_path)
        count += 1

        futures = []
        with concurrent.futures.ThreadPoolExecutor() as executor:
            while True:
                ret, next_frame = cap.read()
                if not ret:
                    break

                if needs_interpolation:
                    for _ in range(1, interp_factor):
                        # Each step reserves two indices: ``count`` for the
                        # generated in-between frame and ``count + 1`` for
                        # the decoded frame that follows it.
                        count += 2
                        next_frame_path = os.path.join(output_folder, f"frame_{count+1:07}.png")
                        cv2.imwrite(next_frame_path, next_frame)
                        frames.append(next_frame_path)

                        output_frame_path = os.path.join(output_folder, f"frame_{count:07}.png")
                        futures.append(
                            executor.submit(
                                generate_intermediate_frame,
                                prev_frame_path,
                                next_frame_path,
                                output_frame_path,
                            )
                        )
                        frames.append(output_frame_path)

                        # NOTE(review): assumed to belong inside this loop;
                        # only matters when interp_factor > 2 - confirm.
                        # Set the path of the current frame for the next iteration
                        prev_frame_path = next_frame_path
                else:
                    count += 1
                    next_frame_path = os.path.join(output_folder, f"frame_{count:07}.png")
                    cv2.imwrite(next_frame_path, next_frame)
                    frames.append(next_frame_path)

        # Leaving the ``with`` block already waits for the workers; the
        # explicit wait is kept so the intent stays visible.
        concurrent.futures.wait(futures)
    finally:
        # Release the capture even when decoding or writing fails.
        cap.release()
    return frames
def generate_video_from_images(image_dir, codec):
    """Encode the numbered PNG frames in ``image_dir`` into a 60 fps video.

    Frames are read with the pattern ``%08d.png``; the first one
    (``00000001.png``) is probed to obtain the output dimensions.  The
    result is written to ``output.mp4`` (codec ``"openh264"``) or
    ``output.webm`` (codec ``"vp09"``) in the current working directory,
    overwriting any existing file.

    Args:
        image_dir: Directory holding the frame images.
        codec: Internal codec key, ``"openh264"`` or ``"vp09"``.

    Returns:
        The name of the video file that was written.

    Raises:
        ValueError: If ``codec`` is not recognised or ``image_dir`` does not
            exist.
        ffmpeg.Error: If ffmpeg fails while probing or encoding.
    """
    # codec key -> (ffmpeg encoder, output file name, pixel format)
    encoder_settings = {
        "openh264": ('libopenh264', "output.mp4", 'yuv420p'),
        "vp09": ('libvpx-vp9', "output.webm", 'yuva420p'),
    }
    try:
        vcodec, output_video_name, pix_fmt = encoder_settings[codec]
    except (KeyError, TypeError):
        raise ValueError("Invalid codec specified") from None

    # Ensure the directory is valid
    if not os.path.exists(image_dir):
        raise ValueError("The image directory does not exist")

    image_pattern = os.path.join(image_dir, '%08d.png')

    # Obtain the dimensions of the first image
    probe = ffmpeg.probe(image_pattern % 1)
    video_info = next(stream for stream in probe['streams'] if stream['codec_type'] == 'video')
    width = int(video_info['width'])
    height = int(video_info['height'])

    try:
        (
            ffmpeg
            .input(image_pattern, framerate=60)
            .output(output_video_name, vcodec=vcodec, pix_fmt=pix_fmt, s=f'{width}x{height}', video_bitrate='5000k')
            .global_args("-y")
            .run()
        )
    except ffmpeg.Error as e:
        # run() is called without capture_stderr, so e.stderr is normally
        # None; decoding it unconditionally raised AttributeError and hid
        # the real ffmpeg failure.
        if e.stderr:
            stderr_text = e.stderr.decode(errors="replace")
        else:
            stderr_text = "<not captured; see the ffmpeg output above>"
        print(f'ffmpeg stderr:\n{stderr_text}')
        raise
    return output_video_name
def is_valid_frame_filename(filename):
    """Tell whether ``filename`` ends with the ``.png`` suffix.

    The comparison is case-sensitive, so ``"a.PNG"`` is rejected.
    """
    return filename.endswith('.png')
def interpolate_and_create_video(video_path, codec):
    """Run the full pipeline: extract frames, interpolate, encode.

    Frames of ``video_path`` are written to ``FRAMES_FOLDER``,
    rife-ncnn-vulkan processes that directory into ``OUTPUT_FRAMES_FOLDER``,
    and the resulting images are encoded with ``codec``.

    Returns:
        The name of the encoded video file, as returned by
        ``generate_video_from_images``.
    """
    extract_frames_from_video(video_path, FRAMES_FOLDER)
    generate_intermediate_frames_with_dir(FRAMES_FOLDER, OUTPUT_FRAMES_FOLDER)
    encoded_video_name = generate_video_from_images(OUTPUT_FRAMES_FOLDER, codec)
    return encoded_video_name
def delete_all_files_in_dir(dir_path):
    """Remove every regular file directly inside ``dir_path``.

    Sub-directories and their contents are left alone.  A missing
    ``dir_path`` and any file that cannot be removed are reported with
    ``print`` rather than raised.
    """
    if not os.path.exists(dir_path):
        print(f"The path {dir_path} does not exist.")
        return

    candidates = (os.path.join(dir_path, name) for name in os.listdir(dir_path))
    for candidate in candidates:
        if not os.path.isfile(candidate):
            continue
        try:
            os.remove(candidate)
        except OSError as err:
            print(f"Error: {candidate} : {err.strerror}")
def process_video(input_video, codec_choice, output_video=None):
    """Gradio callback: interpolate the uploaded video and return the result.

    The upload is copied into a private temporary directory, run through
    ``interpolate_and_create_video``, and all intermediate frames plus the
    temporary copy are removed afterwards, whether or not processing
    succeeded.

    Args:
        input_video: Uploaded file object; its ``name`` attribute is used as
            the path of the file on disk.
        codec_choice: Codec label selected in the UI (a key of
            ``codec_mapping``).
        output_video: Unused.  It defaults to ``None`` because the Interface
            supplies only the two inputs above; without a default every call
            failed with a missing-argument ``TypeError``.

    Returns:
        The name of the encoded video file.

    Raises:
        FileNotFoundError: If the rife-ncnn-vulkan path is not configured or
            does not exist.
    """
    start_time = time.time()

    # os.path.exists(None) raises TypeError, so an unset path is checked
    # explicitly and reported with the same configuration error.
    if not rife_ncnn_vulkan_path or not os.path.exists(rife_ncnn_vulkan_path):
        raise FileNotFoundError(f"RIFE ncnn Vulkan at path {rife_ncnn_vulkan_path} does not exist. Check your configuration.")

    os.makedirs(FRAMES_FOLDER, exist_ok=True)
    os.makedirs(OUTPUT_FRAMES_FOLDER, exist_ok=True)

    # The fallback has to be a key the encoder accepts; the former "vp9"
    # was rejected with "Invalid codec specified".
    codec = codec_mapping.get(codec_choice, "vp09")

    temp_dir = tempfile.mkdtemp()
    try:
        _, file_extension = os.path.splitext(input_video.name)
        temp_file_path = os.path.join(temp_dir, "input_video" + file_extension)
        shutil.copy2(input_video.name, temp_file_path)

        result_video = interpolate_and_create_video(temp_file_path, codec)
    finally:
        # clean - also after a failure, so stale frames from an aborted run
        # cannot leak into the next one.
        delete_all_files_in_dir(FRAMES_FOLDER)
        delete_all_files_in_dir(OUTPUT_FRAMES_FOLDER)
        shutil.rmtree(temp_dir, ignore_errors=True)

    end_time = time.time()
    print(f"Execution time: {end_time - start_time} second")
    return result_video
# gradio
# UI wiring: a file upload and a codec dropdown feed process_video, whose
# return value is shown in the video component.  The interface is launched
# as soon as this module is executed; --share controls public sharing.
input_video = gr.File(label="Upload a video")
output_video = gr.Video(label="Processed video")
codec_choice = gr.Dropdown(choices=["OpenH264", "VP9"], value="OpenH264", label="Select a Codec")
gr.Interface(
    fn=process_video,
    inputs=[input_video, codec_choice],
    outputs=output_video,
).launch(share=args.share)