import gradio as gr import os import cv2 import numpy as np import shutil import subprocess import time from SinglePhoto import FaceSwapper wellcomingMessage = """

Face Swapping Suite

All-in-one face swapping: single photo, video, multi-source, and multi-destination!

""" swapper = FaceSwapper() def swap_single_photo(src_img, src_idx, dst_img, dst_idx, progress=gr.Progress(track_tqdm=True)): log = "" start_time = time.time() try: progress(0, desc="Preparing files") src_path = "SinglePhoto/data_src.jpg" dst_path = "SinglePhoto/data_dst.jpg" output_path = "SinglePhoto/output_swapped.jpg" os.makedirs(os.path.dirname(src_path), exist_ok=True) os.makedirs(os.path.dirname(dst_path), exist_ok=True) os.makedirs(os.path.dirname(output_path), exist_ok=True) src_img_bgr = cv2.cvtColor(src_img, cv2.COLOR_RGB2BGR) dst_img_bgr = cv2.cvtColor(dst_img, cv2.COLOR_RGB2BGR) cv2.imwrite(src_path, src_img_bgr) cv2.imwrite(dst_path, dst_img_bgr) log += f"Saved source to {src_path}, destination to {dst_path}\n" progress(0.5, desc="Swapping faces") result = swapper.swap_faces(src_path, int(src_idx), dst_path, int(dst_idx)) cv2.imwrite(output_path, result) log += f"Swapped and saved result to {output_path}\n" progress(0.8, desc="Cleaning up") try: if os.path.exists(src_path): os.remove(src_path) if os.path.exists(dst_path): os.remove(dst_path) log += "Cleaned up temp files.\n" except Exception as cleanup_error: log += f"Cleanup error: {cleanup_error}\n" progress(1, desc="Done") elapsed = time.time() - start_time log += f"Elapsed time: {elapsed:.2f} seconds\n" return output_path, log except Exception as e: log += f"Error: {e}\n" progress(1, desc="Error") elapsed = time.time() - start_time log += f"Elapsed time: {elapsed:.2f} seconds\n" return None, log def swap_video(src_img, src_idx, video, dst_idx, progress=gr.Progress()): log = "" start_time = time.time() src_path = "VideoSwapping/data_src.jpg" dst_video_path = "VideoSwapping/data_dst.mp4" frames_dir = "VideoSwapping/video_frames" swapped_dir = "VideoSwapping/swapped_frames" output_video_path = "VideoSwapping/output_tmp_output_video.mp4" final_output_path = "VideoSwapping/output_with_audio.mp4" os.makedirs(os.path.dirname(src_path), exist_ok=True) os.makedirs(os.path.dirname(dst_video_path), exist_ok=True) os.makedirs(frames_dir, exist_ok=True) os.makedirs(swapped_dir, exist_ok=True) src_img_bgr = cv2.cvtColor(src_img, cv2.COLOR_RGB2BGR) cv2.imwrite(src_path, src_img_bgr) log += f"Saved source image to {src_path}\n" progress(0.05, desc="Saved source image") if isinstance(video, str) and os.path.exists(video): shutil.copy(video, dst_video_path) log += f"Copied video to {dst_video_path}\n" else: dst_video_path = video from VideoSwapping import extract_frames, frames_to_video # Extract frames only if not already present frame_paths = extract_frames(dst_video_path, frames_dir) log += f"Extracted {len(frame_paths)} frames to {frames_dir}\n" progress(0.15, desc="Extracted frames") # Prepare swapped frames list and resume if possible swapped_files = set(os.listdir(swapped_dir)) total_frames = len(frame_paths) start_loop_time = time.time() for idx, frame_path in enumerate(frame_paths): swapped_name = f"swapped_{idx:05d}.jpg" out_path = os.path.join(swapped_dir, swapped_name) if swapped_name in swapped_files and os.path.exists(out_path): log += f"Frame {idx}: already swapped, skipping.\n" elapsed = time.time() - start_loop_time avg_time = elapsed / (idx + 1) if idx + 1 > 0 else 0 remaining = avg_time * (total_frames - (idx + 1)) mins, secs = divmod(int(remaining), 60) progress(0.15 + 0.6 * (idx + 1) / total_frames, desc=f"Swapping {idx+1}/{total_frames} | {mins:02d}:{secs:02d} left") continue try: try: swapped = swapper.swap_faces(src_path, int(src_idx), frame_path, int(dst_idx)) except ValueError as ve: if int(dst_idx) != 1 and "Target image contains" in str(ve): swapped = swapper.swap_faces(src_path, int(src_idx), frame_path, 1) log += f"Frame {idx}: dst_idx {dst_idx} not found, used 1 instead.\n" else: raise ve cv2.imwrite(out_path, swapped) log += f"Swapped frame {idx} and saved to {out_path}\n" except Exception as e: cv2.imwrite(out_path, cv2.imread(frame_path)) log += f"Failed to swap frame {idx}: {e}\n" elapsed = time.time() - start_loop_time avg_time = elapsed / (idx + 1) if idx + 1 > 0 else 0 remaining = avg_time * (total_frames - (idx + 1)) mins, secs = divmod(int(remaining), 60) progress(0.15 + 0.6 * (idx + 1) / total_frames, desc=f"Swapping {idx+1}/{total_frames} | {mins:02d}:{secs:02d} left") cap = cv2.VideoCapture(dst_video_path) fps = cap.get(cv2.CAP_PROP_FPS) cap.release() frames_to_video(swapped_dir, output_video_path, fps) log += f"Combined swapped frames into video {output_video_path}\n" progress(0.8, desc="Muxing audio") # Add audio from original video ok, audio_log = add_audio_to_video(dst_video_path, output_video_path, final_output_path) if ok: log += f"Added audio to {final_output_path}\n" else: log += f"Audio muxing failed: {audio_log}\n" final_output_path = output_video_path # fallback to video without audio try: if os.path.exists(src_path): os.remove(src_path) if os.path.exists(dst_video_path): os.remove(dst_video_path) if os.path.exists(frames_dir): shutil.rmtree(frames_dir) if os.path.exists(swapped_dir): shutil.rmtree(swapped_dir) log += "Cleaned up temp files and folders.\n" except Exception as cleanup_error: log += f"Cleanup error: {cleanup_error}\n" progress(1, desc="Done") elapsed = time.time() - start_time log += f"Elapsed time: {elapsed:.2f} seconds\n" return final_output_path, log def add_audio_to_video(original_video_path, video_no_audio_path, output_path): """ Uses ffmpeg to mux audio from original_video_path into video_no_audio_path. """ cmd = [ "ffmpeg", "-y", "-i", video_no_audio_path, "-i", original_video_path, "-c:v", "copy", "-c:a", "aac", "-map", "0:v:0", "-map", "1:a:0?", "-shortest", output_path ] try: subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) return True, "" except subprocess.CalledProcessError as e: return False, e.stderr.decode() def swap_multi_src_single_dst(src_imgs, dst_img, dst_idx, progress=gr.Progress(track_tqdm=True)): log = "" results = [] src_dir = "MultiSrcSingleDst/src" dst_dir = "MultiSrcSingleDst/dst" output_dir = "MultiSrcSingleDst/output" os.makedirs(src_dir, exist_ok=True) os.makedirs(dst_dir, exist_ok=True) os.makedirs(output_dir, exist_ok=True) if isinstance(dst_img, tuple): dst_img = dst_img[0] dst_img_bgr = cv2.cvtColor(dst_img, cv2.COLOR_RGB2BGR) dst_path = os.path.join(dst_dir, "data_dst.jpg") cv2.imwrite(dst_path, dst_img_bgr) log += f"Saved destination image to {dst_path}\n" progress(0.05, desc="Saved destination image") for i, src_img in enumerate(src_imgs): if isinstance(src_img, tuple): src_img = src_img[0] src_path = os.path.join(src_dir, f"data_src_{i}.jpg") output_path = os.path.join(output_dir, f"output_swapped_{i}.jpg") src_img_bgr = cv2.cvtColor(src_img, cv2.COLOR_RGB2BGR) cv2.imwrite(src_path, src_img_bgr) log += f"Saved source image {i} to {src_path}\n" try: result = swapper.swap_faces(src_path, 1, dst_path, int(dst_idx)) cv2.imwrite(output_path, result) results.append(output_path) log += f"Swapped and saved result to {output_path}\n" except Exception as e: results.append(f"Error: {e}") log += f"Error swapping source {i}: {e}\n" progress((i + 1) / len(src_imgs), desc=f"Swapping source {i+1}/{len(src_imgs)}") progress(1, desc="Done") return results, log def swap_multi_src_multi_dst(src_imgs, dst_imgs, dst_indices, progress=gr.Progress(track_tqdm=True)): log = "" results = [] src_dir = "MultiSrcMultiDst/src" dst_dir = "MultiSrcMultiDst/dst" output_dir = "MultiSrcMultiDst/output" os.makedirs(src_dir, exist_ok=True) os.makedirs(dst_dir, exist_ok=True) os.makedirs(output_dir, exist_ok=True) if isinstance(dst_indices, str): dst_indices_list = [int(idx.strip()) for idx in dst_indices.split(",") if idx.strip().isdigit()] else: dst_indices_list = [int(idx) for idx in dst_indices] total = max(1, len(src_imgs) * len(dst_imgs)) count = 0 for i, src_img in enumerate(src_imgs): if isinstance(src_img, tuple): src_img = src_img[0] if src_img is None: results.append(f"Error: Source image at index {i} is None") log += f"Source image at index {i} is None\n" continue src_path = os.path.join(src_dir, f"data_src_{i}.jpg") if isinstance(src_img, np.ndarray): src_img_bgr = cv2.cvtColor(src_img, cv2.COLOR_RGB2BGR) cv2.imwrite(src_path, src_img_bgr) log += f"Saved source image {i} to {src_path}\n" elif isinstance(src_img, str) and os.path.exists(src_img): shutil.copy(src_img, src_path) log += f"Copied source image {i} from {src_img} to {src_path}\n" else: results.append(f"Error: Invalid source image at index {i}") log += f"Invalid source image at index {i}\n" continue for j, dst_img in enumerate(dst_imgs): if isinstance(dst_img, tuple): dst_img = dst_img[0] if dst_img is None: results.append(f"Error: Destination image at index {j} is None") log += f"Destination image at index {j} is None\n" continue dst_path = os.path.join(dst_dir, f"data_dst_{j}.jpg") output_path = os.path.join(output_dir, f"output_swapped_{i}_{j}.jpg") if isinstance(dst_img, np.ndarray): dst_img_bgr = cv2.cvtColor(dst_img, cv2.COLOR_RGB2BGR) cv2.imwrite(dst_path, dst_img_bgr) log += f"Saved destination image {j} to {dst_path}\n" elif isinstance(dst_img, str) and os.path.exists(dst_img): shutil.copy(dst_img, dst_path) log += f"Copied destination image {j} from {dst_img} to {dst_path}\n" else: results.append(f"Error: Invalid destination image at index {j}") log += f"Invalid destination image at index {j}\n" continue try: dst_idx = dst_indices_list[j] if j < len(dst_indices_list) else 1 result = swapper.swap_faces(src_path, 1, dst_path, int(dst_idx)) cv2.imwrite(output_path, result) results.append(output_path) log += f"Swapped src {i} with dst {j} and saved to {output_path}\n" except Exception as e: results.append(f"Error: {e}") log += f"Error swapping src {i} with dst {j}: {e}\n" count += 1 progress(count / total, desc=f"Swapping ({count}/{total})") progress(1, desc="Done") return results, log def swap_single_src_multi_dst(src_img, dst_imgs, dst_indices, progress=gr.Progress(track_tqdm=True)): log = "" results = [] src_dir = "SingleSrcMultiDst/src" dst_dir = "SingleSrcMultiDst/dst" output_dir = "SingleSrcMultiDst/output" os.makedirs(src_dir, exist_ok=True) os.makedirs(dst_dir, exist_ok=True) os.makedirs(output_dir, exist_ok=True) if isinstance(src_img, tuple): src_img = src_img[0] src_path = os.path.join(src_dir, "data_src.jpg") src_img_bgr = cv2.cvtColor(src_img, cv2.COLOR_RGB2BGR) cv2.imwrite(src_path, src_img_bgr) log += f"Saved source image to {src_path}\n" progress(0.05, desc="Saved source image") if isinstance(dst_indices, str): dst_indices_list = [int(idx.strip()) for idx in dst_indices.split(",") if idx.strip().isdigit()] else: dst_indices_list = [int(idx) for idx in dst_indices] for j, dst_img in enumerate(dst_imgs): if isinstance(dst_img, tuple): dst_img = dst_img[0] dst_path = os.path.join(dst_dir, f"data_dst_{j}.jpg") output_path = os.path.join(output_dir, f"output_swapped_{j}.jpg") dst_img_bgr = cv2.cvtColor(dst_img, cv2.COLOR_RGB2BGR) cv2.imwrite(dst_path, dst_img_bgr) log += f"Saved destination image {j} to {dst_path}\n" try: dst_idx = dst_indices_list[j] if j < len(dst_indices_list) else 1 result = swapper.swap_faces(src_path, 1, dst_path, int(dst_idx)) cv2.imwrite(output_path, result) results.append(output_path) log += f"Swapped and saved result to {output_path}\n" except Exception as e: results.append(f"Error: {e}") log += f"Error swapping with destination {j}: {e}\n" progress((j + 1) / len(dst_imgs), desc=f"Swapping destination {j+1}/{len(dst_imgs)}") progress(1, desc="Done") return results, log def swap_video_all_faces(src_img, video, num_faces_to_swap, progress=gr.Progress()): """ Swaps the single source image to all faces in each frame of the destination video. For each frame, swaps one by one, always using the latest swapped image for the next face. """ log = "" start_time = time.time() src_path = "VideoSwappingAllFaces/data_src.jpg" dst_video_path = "VideoSwappingAllFaces/data_dst.mp4" frames_dir = "VideoSwappingAllFaces/video_frames" swapped_dir = "VideoSwappingAllFaces/swapped_frames" output_video_path = "VideoSwappingAllFaces/output_tmp_output_video.mp4" final_output_path = "VideoSwappingAllFaces/output_with_audio.mp4" os.makedirs(os.path.dirname(src_path), exist_ok=True) os.makedirs(os.path.dirname(dst_video_path), exist_ok=True) os.makedirs(frames_dir, exist_ok=True) os.makedirs(swapped_dir, exist_ok=True) src_img_bgr = cv2.cvtColor(src_img, cv2.COLOR_RGB2BGR) cv2.imwrite(src_path, src_img_bgr) log += f"Saved source image to {src_path}\n" progress(0.05, desc="Saved source image") if isinstance(video, str) and os.path.exists(video): shutil.copy(video, dst_video_path) log += f"Copied video to {dst_video_path}\n" else: dst_video_path = video from VideoSwapping import extract_frames, frames_to_video frame_paths = extract_frames(dst_video_path, frames_dir) log += f"Extracted {len(frame_paths)} frames to {frames_dir}\n" progress(0.15, desc="Extracted frames") swapped_files = set(os.listdir(swapped_dir)) temp_dir = os.path.join(swapped_dir, "temp_swap") os.makedirs(temp_dir, exist_ok=True) total_frames = len(frame_paths) start_loop_time = time.time() for idx, frame_path in enumerate(frame_paths): swapped_name = f"swapped_{idx:05d}.jpg" out_path = os.path.join(swapped_dir, swapped_name) temp_frame_path = os.path.join(temp_dir, "temp.jpg") if swapped_name in swapped_files and os.path.exists(out_path): log += f"Frame {idx}: already swapped, skipping.\n" elapsed = time.time() - start_loop_time avg_time = elapsed / (idx + 1) if idx + 1 > 0 else 0 remaining = avg_time * (total_frames - (idx + 1)) mins, secs = divmod(int(remaining), 60) progress(0.15 + 0.6 * (idx + 1) / total_frames, desc=f"Swapping {idx+1}/{total_frames} | {mins:02d}:{secs:02d} left") continue try: shutil.copy(frame_path, temp_frame_path) for face_idx in range(1, int(num_faces_to_swap) + 1): try: swapped_img = swapper.swap_faces(src_path, 1, temp_frame_path, face_idx) cv2.imwrite(temp_frame_path, swapped_img) except Exception as e: log += f"Failed to swap face {face_idx} in frame {idx}: {e}\n" shutil.copy(temp_frame_path, out_path) log += f"Swapped all faces in frame {idx} and saved to {out_path}\n" if os.path.exists(temp_frame_path): os.remove(temp_frame_path) except Exception as e: cv2.imwrite(out_path, cv2.imread(frame_path)) log += f"Failed to swap frame {idx}: {e}\n" elapsed = time.time() - start_loop_time avg_time = elapsed / (idx + 1) if idx + 1 > 0 else 0 remaining = avg_time * (total_frames - (idx + 1)) mins, secs = divmod(int(remaining), 60) progress(0.15 + 0.6 * (idx + 1) / total_frames, desc=f"Swapping {idx+1}/{total_frames} | {mins:02d}:{secs:02d} left") if os.path.exists(temp_dir): shutil.rmtree(temp_dir) cap = cv2.VideoCapture(dst_video_path) fps = cap.get(cv2.CAP_PROP_FPS) cap.release() frames_to_video(swapped_dir, output_video_path, fps) log += f"Combined swapped frames into video {output_video_path}\n" progress(0.8, desc="Muxing audio") ok, audio_log = add_audio_to_video(dst_video_path, output_video_path, final_output_path) if ok: log += f"Added audio to {final_output_path}\n" else: log += f"Audio muxing failed: {audio_log}\n" final_output_path = output_video_path try: if os.path.exists(src_path): os.remove(src_path) if os.path.exists(dst_video_path): os.remove(dst_video_path) if os.path.exists(frames_dir): shutil.rmtree(frames_dir) if os.path.exists(swapped_dir): shutil.rmtree(swapped_dir) log += "Cleaned up temp files and folders.\n" except Exception as cleanup_error: log += f"Cleanup error: {cleanup_error}\n" progress(1, desc="Done") elapsed = time.time() - start_time log += f"Elapsed time: {elapsed:.2f} seconds\n" return final_output_path, log def swap_faces_custom(src_imgs, dst_img, mapping_str, progress=gr.Progress(track_tqdm=True)): """ src_imgs: list of source images (numpy arrays) dst_img: destination image (numpy array) mapping_str: comma-separated string, e.g. "2,1,3" """ log = "" start_time = time.time() dst_path = "CustomSwap/data_dst.jpg" output_path = "CustomSwap/output_swapped.jpg" src_dir = "CustomSwap/src" temp_dir = "CustomSwap/temp" os.makedirs(src_dir, exist_ok=True) os.makedirs(temp_dir, exist_ok=True) os.makedirs(os.path.dirname(dst_path), exist_ok=True) os.makedirs(os.path.dirname(output_path), exist_ok=True) # Save destination image dst_img_bgr = cv2.cvtColor(dst_img, cv2.COLOR_RGB2BGR) cv2.imwrite(dst_path, dst_img_bgr) log += f"Saved destination image to {dst_path}\n" # Save all source images src_paths = [] for i, src_img in enumerate(src_imgs): src_path = os.path.join(src_dir, f"data_src_{i+1}.jpg") if isinstance(src_img, tuple): src_img = src_img[0] if src_img is None: log += f"Source image {i+1} is None, skipping.\n" continue if isinstance(src_img, np.ndarray): src_img_bgr = cv2.cvtColor(src_img, cv2.COLOR_RGB2BGR) cv2.imwrite(src_path, src_img_bgr) src_paths.append(src_path) log += f"Saved source image {i+1} to {src_path}\n" elif isinstance(src_img, str) and os.path.exists(src_img): shutil.copy(src_img, src_path) src_paths.append(src_path) log += f"Copied source image {i+1} from {src_img} to {src_path}\n" else: log += f"Source image {i+1} is not a valid image, skipping.\n" # Parse mapping try: mapping = [int(x.strip()) for x in mapping_str.split(",") if x.strip().isdigit()] except Exception as e: log += f"Error parsing mapping: {e}\n" elapsed = time.time() - start_time log += f"Elapsed time: {elapsed:.2f} seconds\n" return None, log # Use a temp file for iterative swapping temp_dst_path = os.path.join(temp_dir, "temp_dst.jpg") shutil.copy(dst_path, temp_dst_path) for face_idx, src_idx in enumerate(mapping, start=1): if src_idx < 1 or src_idx > len(src_paths): log += f"Invalid source index {src_idx} for face {face_idx}, skipping.\n" continue try: swapped_img = swapper.swap_faces(src_paths[src_idx-1], 1, temp_dst_path, face_idx) cv2.imwrite(temp_dst_path, swapped_img) log += f"Swapped face {face_idx} in destination with source {src_idx}\n" except Exception as e: log += f"Failed to swap face {face_idx} with source {src_idx}: {e}\n" shutil.copy(temp_dst_path, output_path) log += f"Saved swapped image to {output_path}\n" if os.path.exists(temp_dst_path): os.remove(temp_dst_path) elapsed = time.time() - start_time log += f"Elapsed time: {elapsed:.2f} seconds\n" return output_path, log def swap_video_custom_mapping(src_imgs, video, mapping_str, progress=gr.Progress()): """ Swaps faces in each frame of the video according to the user mapping. src_imgs: list of source images (numpy arrays) video: path or numpy array of the video mapping_str: comma-separated string, e.g. "2,1,3" """ log = "" start_time = time.time() src_dir = "CustomVideoSwap/src" temp_dir = "CustomVideoSwap/temp" frames_dir = "CustomVideoSwap/frames" swapped_dir = "CustomVideoSwap/swapped_frames" output_video_path = "CustomVideoSwap/output_tmp_output_video.mp4" final_output_path = "CustomVideoSwap/output_with_audio.mp4" dst_video_path = "CustomVideoSwap/data_dst.mp4" os.makedirs(src_dir, exist_ok=True) os.makedirs(temp_dir, exist_ok=True) os.makedirs(frames_dir, exist_ok=True) os.makedirs(swapped_dir, exist_ok=True) # Save all source images src_paths = [] for i, src_img in enumerate(src_imgs): src_path = os.path.join(src_dir, f"data_src_{i+1}.jpg") if isinstance(src_img, tuple): src_img = src_img[0] if src_img is None: log += f"Source image {i+1} is None, skipping.\n" continue if isinstance(src_img, np.ndarray): src_img_bgr = cv2.cvtColor(src_img, cv2.COLOR_RGB2BGR) cv2.imwrite(src_path, src_img_bgr) src_paths.append(src_path) log += f"Saved source image {i+1} to {src_path}\n" elif isinstance(src_img, str) and os.path.exists(src_img): shutil.copy(src_img, src_path) src_paths.append(src_path) log += f"Copied source image {i+1} from {src_img} to {src_path}\n" else: log += f"Source image {i+1} is not a valid image, skipping.\n" # Parse mapping try: mapping = [int(x.strip()) for x in mapping_str.split(",") if x.strip().isdigit()] except Exception as e: log += f"Error parsing mapping: {e}\n" elapsed = time.time() - start_time log += f"Elapsed time: {elapsed:.2f} seconds\n" return None, log # Prepare video if isinstance(video, str) and os.path.exists(video): shutil.copy(video, dst_video_path) log += f"Copied video to {dst_video_path}\n" else: dst_video_path = video from VideoSwapping import extract_frames, frames_to_video frame_paths = extract_frames(dst_video_path, frames_dir) log += f"Extracted {len(frame_paths)} frames to {frames_dir}\n" progress(0.1, desc="Extracted frames") swapped_files = set(os.listdir(swapped_dir)) temp_frame_path = os.path.join(temp_dir, "temp.jpg") total_frames = len(frame_paths) start_loop_time = time.time() for idx, frame_path in enumerate(frame_paths): swapped_name = f"swapped_{idx:05d}.jpg" out_path = os.path.join(swapped_dir, swapped_name) if swapped_name in swapped_files and os.path.exists(out_path): log += f"Frame {idx}: already swapped, skipping.\n" elapsed = time.time() - start_loop_time avg_time = elapsed / (idx + 1) if idx + 1 > 0 else 0 remaining = avg_time * (total_frames - (idx + 1)) mins, secs = divmod(int(remaining), 60) progress(0.1 + 0.7 * (idx + 1) / total_frames, desc=f"Swapping {idx+1}/{total_frames} | {mins:02d}:{secs:02d} left") continue try: shutil.copy(frame_path, temp_frame_path) for face_idx, src_idx in enumerate(mapping, start=1): if src_idx < 1 or src_idx > len(src_paths): log += f"Invalid source index {src_idx} for face {face_idx} in frame {idx}, skipping.\n" continue try: swapped_img = swapper.swap_faces(src_paths[src_idx-1], 1, temp_frame_path, face_idx) cv2.imwrite(temp_frame_path, swapped_img) log += f"Frame {idx}: Swapped face {face_idx} with source {src_idx}\n" except Exception as e: log += f"Frame {idx}: Failed to swap face {face_idx} with source {src_idx}: {e}\n" shutil.copy(temp_frame_path, out_path) log += f"Swapped all faces in frame {idx} and saved to {out_path}\n" if os.path.exists(temp_frame_path): os.remove(temp_frame_path) except Exception as e: cv2.imwrite(out_path, cv2.imread(frame_path)) log += f"Failed to swap frame {idx}: {e}\n" elapsed = time.time() - start_loop_time avg_time = elapsed / (idx + 1) if idx + 1 > 0 else 0 remaining = avg_time * (total_frames - (idx + 1)) mins, secs = divmod(int(remaining), 60) progress(0.1 + 0.7 * (idx + 1) / total_frames, desc=f"Swapping {idx+1}/{total_frames} | {mins:02d}:{secs:02d} left") if os.path.exists(temp_dir): shutil.rmtree(temp_dir) cap = cv2.VideoCapture(dst_video_path) fps = cap.get(cv2.CAP_PROP_FPS) cap.release() frames_to_video(swapped_dir, output_video_path, fps) log += f"Combined swapped frames into video {output_video_path}\n" progress(0.9, desc="Muxing audio") ok, audio_log = add_audio_to_video(dst_video_path, output_video_path, final_output_path) if ok: log += f"Added audio to {final_output_path}\n" else: log += f"Audio muxing failed: {audio_log}\n" final_output_path = output_video_path try: if os.path.exists(dst_video_path): os.remove(dst_video_path) if os.path.exists(frames_dir): shutil.rmtree(frames_dir) if os.path.exists(swapped_dir): shutil.rmtree(swapped_dir) log += "Cleaned up temp files and folders.\n" except Exception as cleanup_error: log += f"Cleanup error: {cleanup_error}\n" progress(1, desc="Done") elapsed = time.time() - start_time log += f"Elapsed time: {elapsed:.2f} seconds\n" return final_output_path, log # Add this to your Gradio UI: with gr.Blocks() as demo: gr.Markdown(wellcomingMessage) with gr.Tab("Single Photo Swapping"): gr.Interface( fn=swap_single_photo, inputs=[ gr.Image(label="Source Image"), gr.Number(value=1, label="Source Face Index"), gr.Image(label="Destination Image"), gr.Number(value=1, label="Destination Face Index"), ], outputs=[ gr.Image(label="Swapped Image"), gr.Textbox(label="Log Output", lines=8, interactive=False) ], ) with gr.Tab("SingleSrc MultiDst"): gr.Interface( fn=swap_single_src_multi_dst, inputs=[ gr.Image(label="Source Image"), gr.Gallery(label="Destination Images", type="numpy", columns=3), gr.Textbox(label="Destination Face Indices (comma-separated, e.g. 1,1,2)"), ], outputs=[ gr.Gallery(label="Swapped Images"), gr.Textbox(label="Log Output", lines=8, interactive=False) ], ) with gr.Tab("MultiSrc SingleDst"): gr.Interface( fn=swap_multi_src_single_dst, inputs=[ gr.Gallery(label="Source Images", type="numpy", columns=3), gr.Image(label="Destination Image"), gr.Number(value=1, label="Destination Face Index"), ], outputs=[ gr.Gallery(label="Swapped Images"), gr.Textbox(label="Log Output", lines=8, interactive=False) ], ) with gr.Tab("MultiSrc MultiDst"): gr.Interface( fn=swap_multi_src_multi_dst, inputs=[ gr.Gallery(label="Source Images", type="numpy", columns=3), gr.Gallery(label="Destination Images", type="numpy", columns=3), gr.Textbox(label="Destination Face Indices (comma-separated, e.g. 1,1,2)"), ], outputs=[ gr.Gallery(label="Swapped Images"), gr.Textbox(label="Log Output", lines=8, interactive=False) ], ) with gr.Tab("Custom Face Mapping"): gr.Interface( fn=swap_faces_custom, inputs=[ gr.Gallery(label="Source Images", type="numpy", columns=3), gr.Image(label="Destination Image"), gr.Textbox(label="Mapping (comma-separated, e.g. 2,1,3)"), ], outputs=[ gr.Image(label="Swapped Image"), gr.Textbox(label="Log Output", lines=8, interactive=False) ], ) with gr.Tab("Video Swapping"): gr.Interface( fn=swap_video, inputs=[ gr.Image(label="Source Image"), gr.Number(value=1, label="Source Face Index"), gr.Video(label="Target Video"), gr.Number(value=1, label="Destination Face Index"), ], outputs=[ gr.Video(label="Swapped Video"), gr.Textbox(label="Log Output", lines=8, interactive=False) ], ) with gr.Tab("Video All Faces"): gr.Interface( fn=swap_video_all_faces, inputs=[ gr.Image(label="Source Image"), gr.Video(label="Target Video"), gr.Number(value=1, label="Number of Faces to Swap"), ], outputs=[ gr.Video(label="Swapped Video"), gr.Textbox(label="Log Output", lines=8, interactive=False) ], ) with gr.Tab("Custom Video Face Mapping"): gr.Interface( fn=swap_video_custom_mapping, inputs=[ gr.Gallery(label="Source Images", type="numpy", columns=3), gr.Video(label="Target Video"), gr.Textbox(label="Mapping (comma-separated, e.g. 2,1,3)"), ], outputs=[ gr.Video(label="Swapped Video"), gr.Textbox(label="Log Output", lines=8, interactive=False) ], ) if __name__ == "__main__": demo.launch(share=True)