"""Gradio front end for face swapping on single photos, videos and image batches."""

import os
import shutil
import subprocess
import time
from dataclasses import dataclass

import cv2
import gradio as gr
import numpy as np

from SinglePhoto import FaceSwapper

wellcomingMessage = """
All-in-one face swapping: single photo, video, multi-source, and multi-destination!
"""

swapper = FaceSwapper()

# Face index used whenever the user supplies no usable index for an image.
DEFAULT_FACE_INDEX = 1


@dataclass
class _VideoJob:
    """Paths and extracted frames shared by the video swapping pipelines."""

    src_path: str
    video_path: str
    frames_dir: str
    swapped_dir: str
    silent_video_path: str
    final_video_path: str
    frame_paths: list


def _unwrap_gallery_item(item):
    """Return the first element when a gallery entry arrives as a tuple."""
    return item[0] if isinstance(item, tuple) else item


def _save_rgb_image(image, path):
    """Convert an RGB array to BGR and write it to ``path`` with OpenCV."""
    cv2.imwrite(path, cv2.cvtColor(image, cv2.COLOR_RGB2BGR))


def _stage_image(image, path, label, log):
    """Write an array (or copy an existing file) to ``path``.

    Returns True on success and False when ``image`` is neither a numpy
    array nor the path of an existing file. ``label`` is only used in the
    log message.
    """
    if isinstance(image, np.ndarray):
        _save_rgb_image(image, path)
        log.append(f"Saved {label} to {path}\n")
        return True
    if isinstance(image, str) and os.path.exists(image):
        shutil.copy(image, path)
        log.append(f"Copied {label} from {image} to {path}\n")
        return True
    return False


def _parse_face_indices(dst_indices):
    """Turn the user's face-index input into a list of ints.

    A comma-separated string is parsed position by position; blank or
    non-numeric entries become ``DEFAULT_FACE_INDEX`` instead of being
    dropped, so every index stays aligned with its destination image.
    ``None`` yields an empty list; any other iterable is converted with
    ``int``.
    """
    if dst_indices is None:
        return []
    if isinstance(dst_indices, str):
        tokens = (token.strip() for token in dst_indices.split(","))
        return [
            int(token) if token.isdecimal() else DEFAULT_FACE_INDEX
            for token in tokens
        ]
    return [int(idx) for idx in dst_indices]


def _face_index_for(indices, position):
    """Return the face index for ``position``, or the default when missing."""
    if position < len(indices):
        return indices[position]
    return DEFAULT_FACE_INDEX


def _remove_paths(paths, log, success_message):
    """Best-effort removal of files and directories; the outcome is logged."""
    try:
        for path in paths:
            if os.path.isdir(path):
                shutil.rmtree(path)
            elif os.path.exists(path):
                os.remove(path)
        log.append(success_message)
    except Exception as cleanup_error:
        # Cleanup is deliberately best effort: a failure here must not
        # discard a result that was already produced.
        log.append(f"Cleanup error: {cleanup_error}\n")


def _report_frame_progress(progress, done, total, started_at):
    """Report per-frame progress together with an estimated time remaining."""
    elapsed = time.time() - started_at
    remaining = elapsed / done * (total - done)
    mins, secs = divmod(int(remaining), 60)
    progress(
        0.15 + 0.6 * done / total,
        desc=f"Swapping {done}/{total} | {mins:02d}:{secs:02d} left",
    )


def _stage_video_job(work_dir, src_img, video, progress, log):
    """Create the working folders, stage the inputs and extract the frames."""
    src_path = f"{work_dir}/data_src.jpg"
    dst_video_path = f"{work_dir}/data_dst.mp4"
    frames_dir = f"{work_dir}/video_frames"
    swapped_dir = f"{work_dir}/swapped_frames"
    for folder in (work_dir, frames_dir, swapped_dir):
        os.makedirs(folder, exist_ok=True)

    _save_rgb_image(src_img, src_path)
    log.append(f"Saved source image to {src_path}\n")
    progress(0.05, desc="Saved source image")

    if isinstance(video, str) and os.path.exists(video):
        shutil.copy(video, dst_video_path)
        log.append(f"Copied video to {dst_video_path}\n")
    else:
        dst_video_path = video

    from VideoSwapping import extract_frames

    frame_paths = extract_frames(dst_video_path, frames_dir)
    log.append(f"Extracted {len(frame_paths)} frames to {frames_dir}\n")
    progress(0.15, desc="Extracted frames")

    return _VideoJob(
        src_path=src_path,
        video_path=dst_video_path,
        frames_dir=frames_dir,
        swapped_dir=swapped_dir,
        silent_video_path=f"{work_dir}/output_tmp_output_video.mp4",
        final_video_path=f"{work_dir}/output_with_audio.mp4",
        frame_paths=frame_paths,
    )


def _finish_video_job(job, progress, log):
    """Assemble the swapped frames, mux the audio, clean up; return the video path."""
    from VideoSwapping import frames_to_video

    cap = cv2.VideoCapture(job.video_path)
    fps = cap.get(cv2.CAP_PROP_FPS)
    cap.release()

    frames_to_video(job.swapped_dir, job.silent_video_path, fps)
    log.append(f"Combined swapped frames into video {job.silent_video_path}\n")
    progress(0.8, desc="Muxing audio")

    # Add audio from the original video; fall back to the silent video.
    ok, audio_log = add_audio_to_video(
        job.video_path, job.silent_video_path, job.final_video_path
    )
    if ok:
        log.append(f"Added audio to {job.final_video_path}\n")
        result_path = job.final_video_path
    else:
        log.append(f"Audio muxing failed: {audio_log}\n")
        result_path = job.silent_video_path

    _remove_paths(
        (job.src_path, job.video_path, job.frames_dir, job.swapped_dir),
        log,
        "Cleaned up temp files and folders.\n",
    )
    progress(1, desc="Done")
    return result_path


def swap_single_photo(src_img, src_idx, dst_img, dst_idx, progress=gr.Progress(track_tqdm=True)):
    """Swap one face from ``src_img`` onto one face of ``dst_img``.

    Returns ``(output_path, log)`` on success and ``(None, log)`` when any
    step fails. The staged input files are removed in both cases.
    """
    log = []
    src_path = "SinglePhoto/data_src.jpg"
    dst_path = "SinglePhoto/data_dst.jpg"
    output_path = "SinglePhoto/output_swapped.jpg"
    staged_inputs = (src_path, dst_path)
    try:
        progress(0, desc="Preparing files")
        for path in (src_path, dst_path, output_path):
            os.makedirs(os.path.dirname(path), exist_ok=True)
        _save_rgb_image(src_img, src_path)
        _save_rgb_image(dst_img, dst_path)
        log.append(f"Saved source to {src_path}, destination to {dst_path}\n")

        progress(0.5, desc="Swapping faces")
        result = swapper.swap_faces(src_path, int(src_idx), dst_path, int(dst_idx))
        cv2.imwrite(output_path, result)
        log.append(f"Swapped and saved result to {output_path}\n")
    except Exception as e:
        log.append(f"Error: {e}\n")
        # Do not leave staged inputs behind when the swap fails.
        _remove_paths(staged_inputs, log, "Cleaned up temp files.\n")
        progress(1, desc="Error")
        return None, "".join(log)

    progress(0.8, desc="Cleaning up")
    _remove_paths(staged_inputs, log, "Cleaned up temp files.\n")
    progress(1, desc="Done")
    return output_path, "".join(log)


def swap_video(src_img, src_idx, video, dst_idx, progress=gr.Progress()):
    """Swap one source face onto face ``dst_idx`` of every frame of ``video``.

    Frames already present in the swapped-frames folder are skipped. A frame
    that cannot be swapped is written out unchanged. Returns
    ``(video_path, log)``; the path points at the video without audio when
    muxing fails.
    """
    log = []
    job = _stage_video_job("VideoSwapping", src_img, video, progress, log)

    # Frames left in the folder by an earlier run are reused, not redone.
    already_swapped = set(os.listdir(job.swapped_dir))
    start_time = time.time()
    total_frames = len(job.frame_paths)
    for idx, frame_path in enumerate(job.frame_paths):
        swapped_name = f"swapped_{idx:05d}.jpg"
        out_path = os.path.join(job.swapped_dir, swapped_name)
        if swapped_name in already_swapped and os.path.exists(out_path):
            log.append(f"Frame {idx}: already swapped, skipping.\n")
        else:
            try:
                try:
                    swapped = swapper.swap_faces(
                        job.src_path, int(src_idx), frame_path, int(dst_idx)
                    )
                except ValueError as ve:
                    # Retry with face 1 when the swapper's error message says
                    # the frame lacks the requested target face.
                    if int(dst_idx) != 1 and "Target image contains" in str(ve):
                        swapped = swapper.swap_faces(
                            job.src_path, int(src_idx), frame_path, 1
                        )
                        log.append(
                            f"Frame {idx}: dst_idx {dst_idx} not found, used 1 instead.\n"
                        )
                    else:
                        raise
                cv2.imwrite(out_path, swapped)
                log.append(f"Swapped frame {idx} and saved to {out_path}\n")
            except Exception as e:
                # Keep the frame sequence complete by passing the frame through.
                cv2.imwrite(out_path, cv2.imread(frame_path))
                log.append(f"Failed to swap frame {idx}: {e}\n")
        _report_frame_progress(progress, idx + 1, total_frames, start_time)

    result_path = _finish_video_job(job, progress, log)
    return result_path, "".join(log)


def add_audio_to_video(original_video_path, video_no_audio_path, output_path):
    """Use ffmpeg to mux audio from ``original_video_path`` into ``video_no_audio_path``.

    Returns ``(True, "")`` on success, otherwise ``(False, message)`` where
    ``message`` is ffmpeg's stderr or, when ffmpeg could not be started at
    all, the operating-system error text.
    """
    cmd = [
        "ffmpeg", "-y",
        "-i", video_no_audio_path,
        "-i", original_video_path,
        "-c:v", "copy",
        "-c:a", "aac",
        "-map", "0:v:0",
        "-map", "1:a:0?",
        "-shortest",
        output_path,
    ]
    try:
        subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    except subprocess.CalledProcessError as e:
        return False, (e.stderr or b"").decode(errors="replace")
    except OSError as e:
        # Raised when the ffmpeg executable is missing or cannot be run.
        return False, str(e)
    return True, ""


def swap_multi_src_single_dst(src_imgs, dst_img, dst_idx, progress=gr.Progress(track_tqdm=True)):
    """Swap face 1 of every source image onto face ``dst_idx`` of one destination.

    Returns ``(result_paths, log)``. Failures are reported in the log only,
    so ``result_paths`` contains nothing but written image files.
    """
    log = []
    results = []
    src_dir = "MultiSrcSingleDst/src"
    dst_dir = "MultiSrcSingleDst/dst"
    output_dir = "MultiSrcSingleDst/output"
    for folder in (src_dir, dst_dir, output_dir):
        os.makedirs(folder, exist_ok=True)

    dst_path = os.path.join(dst_dir, "data_dst.jpg")
    _save_rgb_image(_unwrap_gallery_item(dst_img), dst_path)
    log.append(f"Saved destination image to {dst_path}\n")
    progress(0.05, desc="Saved destination image")

    sources = [] if src_imgs is None else src_imgs
    for i, src_img in enumerate(sources):
        src_path = os.path.join(src_dir, f"data_src_{i}.jpg")
        output_path = os.path.join(output_dir, f"output_swapped_{i}.jpg")
        try:
            # Staging sits inside the try so one bad image cannot abort the batch.
            _save_rgb_image(_unwrap_gallery_item(src_img), src_path)
            log.append(f"Saved source image {i} to {src_path}\n")
            result = swapper.swap_faces(src_path, 1, dst_path, int(dst_idx))
            cv2.imwrite(output_path, result)
            results.append(output_path)
            log.append(f"Swapped and saved result to {output_path}\n")
        except Exception as e:
            log.append(f"Error swapping source {i}: {e}\n")
        progress((i + 1) / len(sources), desc=f"Swapping source {i+1}/{len(sources)}")

    progress(1, desc="Done")
    return results, "".join(log)


def swap_multi_src_multi_dst(src_imgs, dst_imgs, dst_indices, progress=gr.Progress(track_tqdm=True)):
    """Swap face 1 of every source image onto every destination image.

    ``dst_indices`` gives the target face per destination image, either as a
    comma-separated string or as an iterable of numbers; missing entries use
    face 1. Returns ``(result_paths, log)``; failures are reported in the
    log only.
    """
    log = []
    results = []
    src_dir = "MultiSrcMultiDst/src"
    dst_dir = "MultiSrcMultiDst/dst"
    output_dir = "MultiSrcMultiDst/output"
    for folder in (src_dir, dst_dir, output_dir):
        os.makedirs(folder, exist_ok=True)

    dst_indices_list = _parse_face_indices(dst_indices)
    sources = [] if src_imgs is None else src_imgs
    destinations = [] if dst_imgs is None else dst_imgs
    total = max(1, len(sources) * len(destinations))
    count = 0

    for i, src_img in enumerate(sources):
        src_img = _unwrap_gallery_item(src_img)
        if src_img is None:
            log.append(f"Source image at index {i} is None\n")
            continue
        src_path = os.path.join(src_dir, f"data_src_{i}.jpg")
        if not _stage_image(src_img, src_path, f"source image {i}", log):
            log.append(f"Invalid source image at index {i}\n")
            continue

        for j, dst_img in enumerate(destinations):
            dst_img = _unwrap_gallery_item(dst_img)
            if dst_img is None:
                log.append(f"Destination image at index {j} is None\n")
                continue
            dst_path = os.path.join(dst_dir, f"data_dst_{j}.jpg")
            output_path = os.path.join(output_dir, f"output_swapped_{i}_{j}.jpg")
            if not _stage_image(dst_img, dst_path, f"destination image {j}", log):
                log.append(f"Invalid destination image at index {j}\n")
                continue
            try:
                dst_idx = _face_index_for(dst_indices_list, j)
                result = swapper.swap_faces(src_path, 1, dst_path, int(dst_idx))
                cv2.imwrite(output_path, result)
                results.append(output_path)
                log.append(f"Swapped src {i} with dst {j} and saved to {output_path}\n")
            except Exception as e:
                log.append(f"Error swapping src {i} with dst {j}: {e}\n")
            count += 1
            progress(count / total, desc=f"Swapping ({count}/{total})")

    progress(1, desc="Done")
    return results, "".join(log)


def swap_single_src_multi_dst(src_img, dst_imgs, dst_indices, progress=gr.Progress(track_tqdm=True)):
    """Swap face 1 of one source image onto each destination image.

    ``dst_indices`` gives the target face per destination image, either as a
    comma-separated string or as an iterable of numbers; missing entries use
    face 1. Returns ``(result_paths, log)``; failures are reported in the
    log only.
    """
    log = []
    results = []
    src_dir = "SingleSrcMultiDst/src"
    dst_dir = "SingleSrcMultiDst/dst"
    output_dir = "SingleSrcMultiDst/output"
    for folder in (src_dir, dst_dir, output_dir):
        os.makedirs(folder, exist_ok=True)

    src_path = os.path.join(src_dir, "data_src.jpg")
    _save_rgb_image(_unwrap_gallery_item(src_img), src_path)
    log.append(f"Saved source image to {src_path}\n")
    progress(0.05, desc="Saved source image")

    dst_indices_list = _parse_face_indices(dst_indices)
    destinations = [] if dst_imgs is None else dst_imgs
    for j, dst_img in enumerate(destinations):
        dst_path = os.path.join(dst_dir, f"data_dst_{j}.jpg")
        output_path = os.path.join(output_dir, f"output_swapped_{j}.jpg")
        try:
            # Staging sits inside the try so one bad image cannot abort the batch.
            _save_rgb_image(_unwrap_gallery_item(dst_img), dst_path)
            log.append(f"Saved destination image {j} to {dst_path}\n")
            dst_idx = _face_index_for(dst_indices_list, j)
            result = swapper.swap_faces(src_path, 1, dst_path, int(dst_idx))
            cv2.imwrite(output_path, result)
            results.append(output_path)
            log.append(f"Swapped and saved result to {output_path}\n")
        except Exception as e:
            log.append(f"Error swapping with destination {j}: {e}\n")
        progress(
            (j + 1) / len(destinations),
            desc=f"Swapping destination {j+1}/{len(destinations)}",
        )

    progress(1, desc="Done")
    return results, "".join(log)


def swap_video_all_faces(src_img, video, num_faces_to_swap, progress=gr.Progress()):
    """
    Swaps the single source image to all faces in each frame of the destination video.

    For each frame, faces 1..``num_faces_to_swap`` are swapped one by one,
    always using the latest swapped image for the next face. Frames already
    present in the swapped-frames folder are skipped, and a frame that cannot
    be processed is written out unchanged. Returns ``(video_path, log)``; the
    path points at the video without audio when muxing fails.
    """
    log = []
    job = _stage_video_job("VideoSwappingAllFaces", src_img, video, progress, log)

    # List existing results before the scratch folder is created inside it.
    already_swapped = set(os.listdir(job.swapped_dir))
    temp_dir = os.path.join(job.swapped_dir, "temp_swap")
    os.makedirs(temp_dir, exist_ok=True)
    temp_frame_path = os.path.join(temp_dir, "temp.jpg")

    start_time = time.time()
    total_frames = len(job.frame_paths)
    for idx, frame_path in enumerate(job.frame_paths):
        swapped_name = f"swapped_{idx:05d}.jpg"
        out_path = os.path.join(job.swapped_dir, swapped_name)
        if swapped_name in already_swapped and os.path.exists(out_path):
            log.append(f"Frame {idx}: already swapped, skipping.\n")
        else:
            try:
                shutil.copy(frame_path, temp_frame_path)
                for face_idx in range(1, int(num_faces_to_swap) + 1):
                    try:
                        swapped_img = swapper.swap_faces(
                            job.src_path, 1, temp_frame_path, face_idx
                        )
                        cv2.imwrite(temp_frame_path, swapped_img)
                    except Exception as e:
                        # A missing face must not undo the faces already swapped.
                        log.append(
                            f"Failed to swap face {face_idx} in frame {idx}: {e}\n"
                        )
                shutil.copy(temp_frame_path, out_path)
                log.append(f"Swapped all faces in frame {idx} and saved to {out_path}\n")
                if os.path.exists(temp_frame_path):
                    os.remove(temp_frame_path)
            except Exception as e:
                # Keep the frame sequence complete by passing the frame through.
                cv2.imwrite(out_path, cv2.imread(frame_path))
                log.append(f"Failed to swap frame {idx}: {e}\n")
        _report_frame_progress(progress, idx + 1, total_frames, start_time)

    # Remove the scratch folder before the swapped frames are assembled.
    if os.path.exists(temp_dir):
        shutil.rmtree(temp_dir)

    result_path = _finish_video_job(job, progress, log)
    return result_path, "".join(log)


def _log_output():
    """Create the read-only textbox every tab uses to show the run log."""
    return gr.Textbox(label="Log Output", lines=8, interactive=False)


with gr.Blocks() as demo:
    gr.Markdown(wellcomingMessage)

    with gr.Tab("Single Photo Swapping"):
        gr.Interface(
            fn=swap_single_photo,
            inputs=[
                gr.Image(label="Source Image"),
                gr.Number(value=1, label="Source Face Index"),
                gr.Image(label="Destination Image"),
                gr.Number(value=1, label="Destination Face Index"),
            ],
            outputs=[
                gr.Image(label="Swapped Image"),
                _log_output(),
            ],
        )

    with gr.Tab("Video Swapping"):
        gr.Interface(
            fn=swap_video,
            inputs=[
                gr.Image(label="Source Image"),
                gr.Number(value=1, label="Source Face Index"),
                gr.Video(label="Target Video"),
                gr.Number(value=1, label="Destination Face Index"),
            ],
            outputs=[
                gr.Video(label="Swapped Video"),
                _log_output(),
            ],
        )

    with gr.Tab("Video All Faces"):
        gr.Interface(
            fn=swap_video_all_faces,
            inputs=[
                gr.Image(label="Source Image"),
                gr.Video(label="Target Video"),
                gr.Number(value=1, label="Number of Faces to Swap"),
            ],
            outputs=[
                gr.Video(label="Swapped Video"),
                _log_output(),
            ],
        )

    with gr.Tab("SingleSrc MultiDst"):
        gr.Interface(
            fn=swap_single_src_multi_dst,
            inputs=[
                gr.Image(label="Source Image"),
                gr.Gallery(label="Destination Images", type="numpy", columns=3),
                gr.Textbox(label="Destination Face Indices (comma-separated, e.g. 1,1,2)"),
            ],
            outputs=[
                gr.Gallery(label="Swapped Images"),
                _log_output(),
            ],
        )

    with gr.Tab("MultiSrc SingleDst"):
        gr.Interface(
            fn=swap_multi_src_single_dst,
            inputs=[
                gr.Gallery(label="Source Images", type="numpy", columns=3),
                gr.Image(label="Destination Image"),
                gr.Number(value=1, label="Destination Face Index"),
            ],
            outputs=[
                gr.Gallery(label="Swapped Images"),
                _log_output(),
            ],
        )

    with gr.Tab("MultiSrc MultiDst"):
        gr.Interface(
            fn=swap_multi_src_multi_dst,
            inputs=[
                gr.Gallery(label="Source Images", type="numpy", columns=3),
                gr.Gallery(label="Destination Images", type="numpy", columns=3),
                gr.Textbox(label="Destination Face Indices (comma-separated, e.g. 1,1,2)"),
            ],
            outputs=[
                gr.Gallery(label="Swapped Images"),
                _log_output(),
            ],
        )

if __name__ == "__main__":
    demo.launch(share=False)