import gradio as gr
import cv2
import dlib
import shutil
import numpy as np
import random
from datetime import datetime
import torch
import torch.nn.functional as F
from facenet_pytorch import MTCNN, InceptionResnetV1
from PIL import Image
from PIL.ExifTags import TAGS
from pytorch_grad_cam import GradCAM
from pytorch_grad_cam.utils.model_targets import ClassifierOutputTarget
from pytorch_grad_cam.utils.image import show_cam_on_image
import os
import re
import warnings
import tempfile
import multiprocessing
from concurrent.futures import ProcessPoolExecutor, as_completed
import torchaudio
from tortoise.models.classifier import AudioMiniEncoderWithClassifierHead

warnings.filterwarnings("ignore")
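
# NOTE: based on the file references further down, the app assumes these
# assets sit next to app.py at build time:
#   classifier.pth                    (Tortoise audio classifier weights)
#   resnetinceptionv1_epoch_32.pth    (fine-tuned InceptionResnetV1 checkpoint)
#   wave.jpg                          (static waveform placeholder image)
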
def inputseparation(video, image, audio):
    # Route the upload to the matching predictor
    if video is not None:
        return save_video(video)
    elif image is not None:
        return predictimage(image)
    else:
        return audiopredict(audio)
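
# All three branches must return the same 4-tuple -- (label, explanation,
# EXIF/metadata text, image) -- so that they map onto the four Gradio output
# components declared in main().
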
def load_audio(uploaded_file, sampling_rate=22000):
    # Copy the upload to a temporary file so torchaudio can open it by path
    with tempfile.NamedTemporaryFile(delete=False) as tmp:
        with open(uploaded_file, 'rb') as audio_file:  # open in binary mode
            tmp.write(audio_file.read())
        tmp_path = tmp.name
    audio, sr = torchaudio.load(tmp_path)
    os.remove(tmp_path)  # clean up the temporary copy
    audio = audio.mean(dim=0)  # downmix to mono
    if sr != sampling_rate:
        audio = torchaudio.transforms.Resample(sr, sampling_rate)(audio)
    audio = audio.clamp_(-1, 1)
    return audio.unsqueeze(0)
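
# load_audio() returns a mono tensor of shape (1, num_samples) at 22 kHz;
# classify_audio_clip() below adds a batch dimension, giving the
# (batch, 1, samples) layout the Tortoise classifier head appears to expect.
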
def classify_audio_clip(clip):
    classifier = AudioMiniEncoderWithClassifierHead(
        2, spec_dim=1, embedding_dim=512, depth=5, downsample_factor=4,
        resnet_blocks=2, attn_blocks=4, num_attn_heads=4, base_channels=32,
        dropout=0, kernel_size=5, distribute_zero_label=False
    )
    state_dict = torch.load('classifier.pth', map_location=torch.device('cpu'))
    classifier.load_state_dict(state_dict)
    classifier.eval()
    clip = clip.cpu().unsqueeze(0)
    with torch.no_grad():
        results = classifier(clip)
        probabilities = F.softmax(results, dim=-1)
        ai_generated_probability = probabilities[0][1].item()
    return ai_generated_probability
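
# A minimal usage sketch (assuming classifier.pth is present; "sample.wav" is
# a hypothetical local file):
#   clip = load_audio("sample.wav")      # -> tensor of shape (1, num_samples)
#   p_fake = classify_audio_clip(clip)   # -> float in [0, 1]; class 1 = AI-generated
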
def audiopredict(audio):
    if audio is None:
        return "Neutral", "No input provided", "", None
    audio_clip = load_audio(audio)
    ai_generated_probability = classify_audio_clip(audio_clip)
    image = Image.open("./wave.jpg")
    if ai_generated_probability < 0.5:
        return "Real", "The audio is likely to be Real", "No EXIF data found in the audio", image
    else:
        return "Deepfake", "The audio is likely to be AI Generated", "No EXIF data found in the audio", image
# Video Input Code
def save_video(video_path):
    # Copy the video into a temporary directory for processing
    with tempfile.TemporaryDirectory() as temp_dir:
        filename = os.path.basename(video_path)
        temp_video_path = os.path.join(temp_dir, filename)
        shutil.copy(video_path, temp_video_path)
        # Extract frames, select faces, and run deepfake identification
        textoutput, exif, face_with_mask = process_video(temp_dir, filename)
    # Pull the real/fake percentages out of the summary string
    percentages = re.findall(r"(\d+\.\d+)%", textoutput)
    if len(percentages) < 2:
        return "Neutral", textoutput, exif, face_with_mask
    real_percentage = float(percentages[0])
    fake_percentage = float(percentages[1])
    val = "Real" if real_percentage > fake_percentage else "Deepfake"
    return val, textoutput, exif, face_with_mask
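
# The regex above relies on process_video() producing a summary of the form
# (the numbers here are illustrative):
#   "Consolidated Score for the uploaded video - Real: 73.21%, Fake: 26.79%"
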
def process_video(video_folder, video_filename):
    # Frame extraction, face selection, and deepfake identification
    frames_base_dir = "./frames"
    faces_base_dir = "./faces"
    selected_faces_base_dir = "./selected_faces"
    video_path = os.path.join(video_folder, video_filename)
    # Create per-run session folders
    session_name = datetime.now().strftime("%Y%m%d_%H%M%S")
    frames_session_dir = create_session_folder(frames_base_dir, session_name)
    faces_session_dir = create_session_folder(faces_base_dir, session_name)
    selected_faces_session_dir = create_session_folder(selected_faces_base_dir, session_name)
    # Extract frames and faces
    video_to_frames_and_extract_faces(video_path, frames_session_dir, faces_session_dir)
    # Select random faces
    select_random_faces(faces_session_dir, selected_faces_session_dir)
    # Perform deepfake identification
    textoutput, exif, face_with_mask = identify_deepfake(selected_faces_session_dir)
    return textoutput, exif, face_with_mask
def create_session_folder(parent_dir, session_name=None):
    if not session_name:
        session_name = datetime.now().strftime("%Y%m%d_%H%M%S")
    session_path = os.path.join(parent_dir, session_name)
    os.makedirs(session_path, exist_ok=True)
    return session_path
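
# Each run therefore gets a timestamped layout such as (timestamp illustrative):
#   ./frames/20240101_120000/frame_0.jpg ...
#   ./faces/20240101_120000/face_20240101_120000_123456.jpg ...
#   ./selected_faces/20240101_120000/<up to 20 sampled faces>
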
def extract_faces(frame_path, faces_dir):
    frame = cv2.imread(frame_path)
    if frame is None:
        return 0
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    # Create the detector inside the worker so nothing dlib-specific has to
    # cross the process boundary
    detector = dlib.get_frontal_face_detector()
    faces = detector(gray, 1)
    faces_extracted = 0
    for face in faces:
        # Clamp the box to the frame so edge detections do not yield empty crops
        x, y = max(face.left(), 0), max(face.top(), 0)
        w, h = face.width(), face.height()
        face_image = frame[y:y + h, x:x + w]
        if face_image.size == 0:
            continue
        face_file_path = os.path.join(faces_dir, f"face_{datetime.now().strftime('%Y%m%d_%H%M%S_%f')}.jpg")
        cv2.imwrite(face_file_path, face_image)
        faces_extracted += 1
    return faces_extracted
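
# HOG face detection is CPU-bound, so the extraction below fans it out across a
# ProcessPoolExecutor; threads would mostly serialize on the GIL for this work.
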
def video_to_frames_and_extract_faces(video_path, frames_dir, faces_dir):
    video_capture = cv2.VideoCapture(video_path)
    success, frame = video_capture.read()
    frame_count = 0
    processed_frame_count = 0
    futures = []
    num_workers = min(multiprocessing.cpu_count(), 8)
    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        while success:
            if frame_count % 2 == 0:
                frame_file = os.path.join(frames_dir, f"frame_{processed_frame_count}.jpg")
                cv2.imwrite(frame_file, frame)
                processed_frame_count += 1
                if processed_frame_count % 4 == 0:
                    future = executor.submit(extract_faces, frame_file, faces_dir)
                    futures.append(future)
            success, frame = video_capture.read()
            frame_count += 1
        total_faces = sum(f.result() for f in as_completed(futures))
    print(f"Saved frames: {processed_frame_count}, Processed for face extraction: {len(futures)}, Extracted faces: {total_faces}")
    video_capture.release()
    return total_faces
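
# Sampling arithmetic: every 2nd source frame is saved, and every 4th saved
# frame is submitted for face extraction, so roughly 1 in 8 frames is searched.
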
def select_random_faces(faces_dir, selected_faces_dir):
    face_files = [os.path.join(faces_dir, f) for f in os.listdir(faces_dir) if f.endswith('.jpg')]
    selected_faces = random.sample(face_files, min(20, len(face_files)))
    for face_file in selected_faces:
        basename = os.path.basename(face_file)
        destination_file = os.path.join(selected_faces_dir, basename)
        shutil.copy(face_file, destination_file)
    print(f"Selected random faces: {len(selected_faces)}")
# Find Deepfake or Not
def identify_deepfake(selected_faces_dir):
    # Setup device
    DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'
    # Initialize MTCNN and InceptionResnetV1 with pre-trained weights
    mtcnn = MTCNN(select_largest=False, post_process=False, device=DEVICE).to(DEVICE).eval()
    model = InceptionResnetV1(pretrained="vggface2", classify=True, num_classes=1, device=DEVICE)
    # Load the fine-tuned checkpoint
    checkpoint_path = "./resnetinceptionv1_epoch_32.pth"  # update this path if needed
    checkpoint = torch.load(checkpoint_path, map_location=DEVICE)
    model.load_state_dict(checkpoint['model_state_dict'])
    model.to(DEVICE)
    model.eval()
    # Per-face prediction with a Grad-CAM overlay
    def predict(input_image: Image.Image):
        try:
            face = mtcnn(input_image)
            if face is None:
                raise Exception('No face detected')
            face = F.interpolate(face.unsqueeze(0), size=(256, 256), mode='bilinear', align_corners=False)
            face = face.to(DEVICE).to(torch.float32) / 255.0
            target_layers = [model.block8.branch1[-1]]
            cam = GradCAM(model=model, target_layers=target_layers)
            targets = [ClassifierOutputTarget(0)]
            grayscale_cam = cam(input_tensor=face, targets=targets, eigen_smooth=True)
            grayscale_cam = grayscale_cam[0, :]
            face_image_np = face.squeeze().permute(1, 2, 0).cpu().detach().numpy()
            # show_cam_on_image already returns a uint8 image in [0, 255]
            visualization = show_cam_on_image(face_image_np, grayscale_cam, use_rgb=True)
            face_with_mask = cv2.addWeighted((face_image_np * 255).astype('uint8'), 1, visualization, 0.5, 0)
            with torch.no_grad():
                output = torch.sigmoid(model(face)).item()
            prediction = "real" if output < 0.5 else "fake"
            confidences = {'real': 1 - output, 'fake': output}
            return confidences, prediction, face_with_mask
        except Exception as e:
            print(f"Prediction failed: {e}")
            return {'real': 0.0, 'fake': 1.0}, "fake", None
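
    # model.block8.branch1[-1] is the last conv layer in the branch of the
    # final Inception-ResNet block of facenet-pytorch's InceptionResnetV1;
    # targeting a layer this late makes the Grad-CAM heatmap reflect the
    # high-level features driving the real/fake logit.
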
    # Run prediction on every selected face image
    image_files = sorted([f for f in os.listdir(selected_faces_dir) if f.endswith(('.jpg', '.jpeg', '.png', '.bmp'))])
    if not image_files:
        return "No faces were extracted from the video.", "No EXIF data or Metadata found in the video", None
    results = {}  # per-image confidences and predictions
    last_face_with_mask = None
    for image_file in image_files:
        image_path = os.path.join(selected_faces_dir, image_file)
        input_image = Image.open(image_path)
        confidences, prediction, face_with_mask = predict(input_image)
        if face_with_mask is None:
            continue
        last_face_with_mask = face_with_mask
        results[image_file] = {
            'Confidence': confidences,
            'Prediction': prediction
        }
        print(f"Image: {image_file}, Confidence: {confidences}, Prediction: {prediction}")
    # Read EXIF from the first selected face (frames re-encoded by OpenCV
    # rarely carry any, so this usually reports no metadata)
    image = Image.open(os.path.join(selected_faces_dir, image_files[0]))
    exif_data = image.getexif()  # returns an Exif mapping (possibly empty)
    if exif_data:
        exif = ""
        for tag_id in exif_data:
            tag = TAGS.get(tag_id, tag_id)  # human-readable tag name
            value = exif_data[tag_id]
            exif += f"{tag}: {value}\n"
    else:
        exif = "No EXIF data or Metadata found in the video"
    # Accumulate 'real' and 'fake' scores
    real_total = 0.0
    fake_total = 0.0
    count = 0
    for value in results.values():
        real_total += value['Confidence']['real']
        fake_total += value['Confidence']['fake']
        count += 1
    # Report the consolidated score if any faces were successfully processed
    if count > 0:
        real_avg = (real_total / count) * 100
        fake_avg = (fake_total / count) * 100
        textoutput = f"Consolidated Score for the uploaded video - Real: {real_avg:.2f}%, Fake: {fake_avg:.2f}%"
        return textoutput, exif, last_face_with_mask
    else:
        return "No images were successfully processed to calculate a consolidated score.", exif, None
# Image Input Code
def predictimage(input_image: Image.Image):
    DEVICE = 'cuda:0' if torch.cuda.is_available() else 'cpu'
    mtcnn = MTCNN(
        select_largest=False,
        post_process=False,
        device=DEVICE
    ).to(DEVICE).eval()
    model = InceptionResnetV1(
        pretrained="vggface2",
        classify=True,
        num_classes=1,
        device=DEVICE
    )
    checkpoint = torch.load("./resnetinceptionv1_epoch_32.pth", map_location=torch.device('cpu'))
    model.load_state_dict(checkpoint['model_state_dict'])
    model.to(DEVICE)
    model.eval()
    face = mtcnn(input_image)
    # Read EXIF directly from the uploaded image
    exif_data = input_image.getexif()  # returns an Exif mapping (possibly empty)
    if exif_data:
        exif = ""
        for tag_id in exif_data:
            tag = TAGS.get(tag_id, tag_id)  # human-readable tag name
            value = exif_data[tag_id]
            exif += f"{tag}: {value}\n"
    else:
        exif = "No EXIF data found in the image"
    if face is None:
        return "Neutral", "No face detected", exif, input_image
    face = face.unsqueeze(0)  # add the batch dimension
    face = F.interpolate(face, size=(256, 256), mode='bilinear', align_corners=False)
    # Keep a uint8 copy of the face for plotting before normalization
    prev_face = face.squeeze(0).permute(1, 2, 0).cpu().detach().int().numpy()
    prev_face = prev_face.astype('uint8')
    face = face.to(DEVICE).to(torch.float32) / 255.0
    # Float copy in [0, 1] for show_cam_on_image (no .int() here -- casting the
    # normalized tensor to int would zero out the base image)
    face_image_to_plot = face.squeeze(0).permute(1, 2, 0).cpu().detach().numpy()
    target_layers = [model.block8.branch1[-1]]
    cam = GradCAM(model=model, target_layers=target_layers)
    targets = [ClassifierOutputTarget(0)]
    grayscale_cam = cam(input_tensor=face, targets=targets, eigen_smooth=True)
    grayscale_cam = grayscale_cam[0, :]
    visualization = show_cam_on_image(face_image_to_plot, grayscale_cam, use_rgb=True)
    face_with_mask = cv2.addWeighted(prev_face, 1, visualization, 0.5, 0)
    with torch.no_grad():
        output = torch.sigmoid(model(face).squeeze(0))
    prediction = "Real" if output.item() < 0.5 else "Deepfake"
    real_avg = (1 - output.item()) * 100
    fake_avg = output.item() * 100
    textoutput = f"Consolidated Score for the uploaded image - Real: {real_avg:.2f}%, Fake: {fake_avg:.2f}%"
    return prediction, textoutput, exif, face_with_mask
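
# Sign convention assumed throughout: the single sigmoid output is treated as
# the "fake" probability (the checkpoint was presumably trained with label
# 1 = fake), so output < 0.5 reads as Real and 1 - output is the Real score.
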
def main():
    # Combined input interface: video, image, or audio
    interface = gr.Interface(
        fn=inputseparation,
        inputs=[
            gr.Video(label="Upload Video"),
            gr.Image(label="Input Image", type="pil"),
            gr.Audio(label="Upload Audio", type="filepath")
        ],
        outputs=[
            gr.Label(label="Output Result"),
            gr.Text(label="Explanation"),
            gr.Text(label="EXIF Data / Metadata"),
            gr.Image(label="Face with Mask")
        ],
        title="Veritrue.ai",
        description="Upload a video, an image, or an audio clip, and the app will tell you whether it is real or a deepfake."
    )
    interface.launch()


if __name__ == "__main__":
    main()