Spaces:
Running
Running
| import cv2 | |
| import os | |
| import shutil | |
| import time | |
| from SinglePhoto import FaceSwapper | |
| def extract_frames(video_path, frames_dir): | |
| if not os.path.exists(frames_dir): | |
| os.makedirs(frames_dir) | |
| last_idx = -1 | |
| else: | |
| # Find the last extracted frame index | |
| existing = [f for f in os.listdir(frames_dir) if f.startswith("frame_") and f.endswith(".jpg")] | |
| if existing: | |
| last_idx = max([int(f.split("_")[1].split(".")[0]) for f in existing]) | |
| else: | |
| last_idx = -1 | |
| cap = cv2.VideoCapture(video_path) | |
| frame_paths = [] | |
| idx = 0 | |
| while True: | |
| ret, frame = cap.read() | |
| if not ret: | |
| break | |
| frame_path = os.path.join(frames_dir, f"frame_{idx:05d}.jpg") | |
| if idx > last_idx: | |
| cv2.imwrite(frame_path, frame) | |
| frame_paths.append(frame_path) | |
| idx += 1 | |
| cap.release() | |
| return frame_paths | |
| def frames_to_video(frames_dir, output_video_path, fps): | |
| frames = sorted([os.path.join(frames_dir, f) for f in os.listdir(frames_dir) if f.endswith('.jpg')]) | |
| if not frames: | |
| print("No frames found in directory.") | |
| return | |
| first_frame = cv2.imread(frames[0]) | |
| height, width, layers = first_frame.shape | |
| fourcc = cv2.VideoWriter_fourcc(*'mp4v') | |
| out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height)) | |
| for frame_path in frames: | |
| frame = cv2.imread(frame_path) | |
| out.write(frame) | |
| out.release() | |
| def main(): | |
| # Use files from VideoSwapping folder | |
| video_path = os.path.join("VideoSwapping", "data_dst.mp4") | |
| source_image_path = os.path.join("VideoSwapping", "data_src.jpg") | |
| frames_dir = os.path.join("VideoSwapping", "video_frames") | |
| swapped_dir = os.path.join("VideoSwapping", "swapped_frames") | |
| output_video_path = os.path.join("VideoSwapping", "output_swapped_video.mp4") | |
| source_face_idx = 1 | |
| # Ask user for target_face_idx | |
| while True: | |
| try: | |
| user_input = input("Enter target_face_idx (default is 1): ").strip() | |
| if user_input == "": | |
| dest_face_idx = 1 | |
| break | |
| dest_face_idx = int(user_input) | |
| break | |
| except ValueError: | |
| print("Invalid input. Please enter an integer value.") | |
| print("Choose an option:") | |
| print("1. Extract frames only") | |
| print("2. Face swap only (requires extracted frames)") | |
| print("3. Both extract frames and face swap") | |
| choice = input("Enter 1, 2, or 3: ").strip() | |
| frame_paths = [] | |
| if choice == "1" or choice == "3": | |
| print("Extracting frames from video (resuming if needed)...") | |
| frame_paths = extract_frames(video_path, frames_dir) | |
| print(f"Extracted {len(frame_paths)} frames to {frames_dir}.") | |
| if choice == "1": | |
| return | |
| if choice == "2": | |
| # If only face swap, ensure frames are present | |
| if not os.path.exists(frames_dir): | |
| print("Frames directory does not exist. Please extract frames first.") | |
| return | |
| frame_paths = sorted([os.path.join(frames_dir, f) for f in os.listdir(frames_dir) if f.endswith('.jpg')]) | |
| if choice == "2" or choice == "3": | |
| # Prepare output directory | |
| if not os.path.exists(swapped_dir): | |
| os.makedirs(swapped_dir) | |
| # Initialize face swapper | |
| swapper = FaceSwapper() | |
| # Swap faces on each frame | |
| print("Swapping faces on frames...") | |
| start_time = time.time() | |
| for idx, frame_path in enumerate(frame_paths): | |
| frame_name = os.path.basename(frame_path) | |
| # Replace 'frame_' with 'swapped_' in the filename | |
| if frame_name.startswith("frame_"): | |
| swapped_name = "swapped_" + frame_name[len("frame_"):] | |
| else: | |
| swapped_name = "swapped_" + frame_name | |
| out_path = os.path.join(swapped_dir, swapped_name) | |
| if os.path.exists(out_path): | |
| # Skip already swapped frames | |
| print(f"Frame {idx+1}/{len(frame_paths)} already swapped, skipping...", end='\r') | |
| continue | |
| try: | |
| try: | |
| swapped = swapper.swap_faces( | |
| source_path=source_image_path, | |
| source_face_idx=source_face_idx, | |
| target_path=frame_path, | |
| target_face_idx=dest_face_idx | |
| ) | |
| except ValueError as ve: | |
| if "Target image contains" in str(ve): | |
| print(f"\nFrame {idx}: Target face idx {dest_face_idx} not found, trying with idx 1.",end='\r') | |
| swapped = swapper.swap_faces( | |
| source_path=source_image_path, | |
| source_face_idx=source_face_idx, | |
| target_path=frame_path, | |
| target_face_idx=1 | |
| ) | |
| else: | |
| raise ve | |
| cv2.imwrite(out_path, swapped) | |
| except Exception as e: | |
| print(f"\nFrame {idx}: {e}") | |
| # Optionally, copy the original frame if swap fails | |
| cv2.imwrite(out_path, cv2.imread(frame_path)) | |
| # Estimate time left | |
| elapsed = time.time() - start_time | |
| avg_time = elapsed / (idx + 1) | |
| remaining = avg_time * (len(frame_paths) - (idx + 1)) | |
| mins, secs = divmod(int(remaining), 60) | |
| print(f"Swapping frame {idx+1}/{len(frame_paths)} | Est. time left: {mins:02d}:{secs:02d}", end='\r') | |
| print() # Move to the next line after the loop | |
| # Get FPS from original video | |
| cap = cv2.VideoCapture(video_path) | |
| fps = cap.get(cv2.CAP_PROP_FPS) | |
| cap.release() | |
| # Combine swapped frames into video | |
| print("Combining swapped frames into video...") | |
| frames_to_video(swapped_dir, output_video_path, fps) | |
| print(f"Done! Output video saved as {output_video_path}") | |
| # Ask user if they want to keep the extracted frames and swapped images | |
| answer = input("Do you want to keep the extracted frames and swapped images? (y/n): ").strip().lower() | |
| if answer == 'n': | |
| try: | |
| shutil.rmtree(frames_dir) | |
| shutil.rmtree(swapped_dir) | |
| print("Temporary folders deleted.") | |
| except Exception as e: | |
| print(f"Error deleting folders: {e}") | |
| else: | |
| print("Temporary folders kept.") | |
| if __name__ == "__main__": | |
| main() |