Spaces:
Sleeping
Sleeping
| from fastapi import FastAPI | |
| app = FastAPI() | |
| from fastapi.middleware.cors import CORSMiddleware | |
| origins = [ | |
| "*" | |
| ] | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=origins, | |
| allow_credentials=True, | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| ) | |
| def greet_json(): | |
| return {"Hello": "World!"} | |
| #-------------------------------------------------------------------------------------------------------------------- | |
| import os | |
| import gdown | |
| file_id = "1zhisRgRi2qBFX73VFhzh-Ho93MORQqVa" | |
| output_dir = "./downloads" | |
| output_file = "file.h5" | |
| if not os.path.exists(output_dir): | |
| os.makedirs(output_dir) | |
| output_path = os.path.join(output_dir, output_file) | |
| url = f"https://drive.google.com/uc?id={file_id}" | |
| try: | |
| gdown.download(url, output_path, quiet=False) | |
| print(f"File downloaded successfully to: {output_path}") | |
| except Exception as e: | |
| print(f"Error downloading file: {e}") | |
| output_file = "file.h5" | |
| file_path = os.path.join(output_dir, output_file) | |
| #-------------------------------------------------------------------------------------------------------------------- | |
| file_id = "1wIaycDFGTF3e0PpAHKk-GLnxk4cMehOU" | |
| output_dir = "./downloads" | |
| output_file = "file2.h5" | |
| if not os.path.exists(output_dir): | |
| os.makedirs(output_dir) | |
| output_path = os.path.join(output_dir, output_file) | |
| url = f"https://drive.google.com/uc?id={file_id}" | |
| try: | |
| gdown.download(url, output_path, quiet=False) | |
| print(f"File downloaded successfully to: {output_path}") | |
| except Exception as e: | |
| print(f"Error downloading file: {e}") | |
| output_file = "file2.h5" | |
| file_path = os.path.join(output_dir, output_file) | |
| if os.path.exists(file_path): | |
| print(f"The file '{output_file}' exists at '{file_path}'.") | |
| else: | |
| print(f"The file '{output_file}' does not exist at '{file_path}'.") | |
| #-------------------------------------------------------------------------------------------------------------------- | |
| import os | |
| import numpy as np | |
| import tensorflow as tf | |
| import tensorflow | |
| import librosa | |
| import matplotlib.pyplot as plt | |
| # import gradio as gr | |
| import os | |
| os.environ["TORCH_HOME"] = "/tmp/torch_cache" | |
| os.environ["TF_ENABLE_ONEDNN_OPTS"] = "0" | |
| os.environ["MPLCONFIGDIR"] = "/tmp/matplotlib_config" | |
| os.environ["FONTCONFIG_PATH"] = "/tmp/fontconfig" | |
| os.environ["HF_HOME"] = "/tmp/huggingface_cache" | |
| from tensorflow.keras.applications.vgg16 import VGG16, preprocess_input | |
| from tensorflow.keras.models import Model | |
| from tensorflow.keras.layers import Dense, GlobalAveragePooling2D, Dropout | |
| from tensorflow.keras.optimizers import Adam | |
| from transformers import pipeline | |
| class UnifiedDeepfakeDetector: | |
| def __init__(self): | |
| self.input_shape = (224, 224, 3) | |
| self.vgg_model = self.build_vgg16_model() | |
| self.dense_model = tf.keras.models.load_model('downloads/file2.h5') | |
| self.cnn_model = tf.keras.models.load_model('downloads/file.h5') | |
| self.melody_machine = pipeline(model="MelodyMachine/Deepfake-audio-detection-V2") | |
| def build_vgg16_model(self): | |
| base_model = VGG16(weights='imagenet', include_top=False, input_shape=self.input_shape) | |
| for layer in base_model.layers: | |
| layer.trainable = False | |
| x = base_model.output | |
| x = GlobalAveragePooling2D()(x) | |
| x = Dense(512, activation='relu')(x) | |
| x = Dropout(0.5)(x) | |
| x = Dense(256, activation='relu')(x) | |
| x = Dropout(0.3)(x) | |
| output = Dense(1, activation='sigmoid')(x) | |
| model = Model(inputs=base_model.input, outputs=output) | |
| model.compile(optimizer=Adam(learning_rate=0.0001), | |
| loss='binary_crossentropy', | |
| metrics=['accuracy']) | |
| return model | |
| def audio_to_spectrogram(self, file_path, plot=False): | |
| try: | |
| audio, sr = librosa.load(file_path, duration=5.0, sr=22050) | |
| spectrogram = librosa.feature.melspectrogram(y=audio, sr=sr, n_mels=224, fmax=8000) | |
| spectrogram_db = librosa.power_to_db(spectrogram, ref=np.max) | |
| if plot: | |
| plt.figure(figsize=(12, 6)) | |
| librosa.display.specshow(spectrogram_db, y_axis='mel', x_axis='time', cmap='viridis') | |
| plt.colorbar(format='%+2.0f dB') | |
| plt.title('Mel Spectrogram Analysis') | |
| plot_path = 'spectrogram_plot.png' | |
| plt.savefig(plot_path, dpi=300, bbox_inches='tight') | |
| plt.close() | |
| return plot_path | |
| spectrogram_norm = (spectrogram_db - spectrogram_db.min()) / (spectrogram_db.max() - spectrogram_db.min()) | |
| spectrogram_rgb = np.stack([spectrogram_norm]*3, axis=-1) | |
| spectrogram_resized = tf.image.resize(spectrogram_rgb, (224, 224)) | |
| return preprocess_input(spectrogram_resized * 255) | |
| except Exception as e: | |
| print(f"Spectrogram error: {e}") | |
| return None | |
| def analyze_audio_rf(self, audio_path, model_choice="all"): | |
| results = {} | |
| plots = {} | |
| r = [] | |
| audio_features = {} | |
| try: | |
| # Load audio and extract basic features | |
| audio, sr = librosa.load(audio_path, res_type="kaiser_fast") | |
| audio_features = { | |
| "sample_rate": sr, | |
| "duration": librosa.get_duration(y=audio, sr=sr), | |
| "rms_energy": float(np.mean(librosa.feature.rms(y=audio))), | |
| "zero_crossing_rate": float(np.mean(librosa.feature.zero_crossing_rate(y=audio))) | |
| } | |
| # VGG16 Analysis | |
| if model_choice in ["VGG16", "all"]: | |
| spec = self.audio_to_spectrogram(audio_path) | |
| if spec is not None: | |
| pred = self.vgg_model.predict(np.expand_dims(spec, axis=0))[0][0] | |
| results["VGG16"] = { | |
| "prediction": "FAKE" if pred > 0.5 else "REAL", | |
| "confidence": float(pred if pred > 0.5 else 1 - pred), | |
| "raw_score": float(pred) | |
| } | |
| plots["spectrogram"] = self.audio_to_spectrogram(audio_path, plot=True) | |
| r.append("FAKE" if pred > 0.5 else "REAL") | |
| # Dense Model Analysis | |
| if model_choice in ["Dense", "all"]: | |
| mfcc = librosa.feature.mfcc(y=audio, sr=sr, n_mfcc=40) | |
| mfcc_scaled = np.mean(mfcc.T, axis=0).reshape(1, -1) | |
| pred = self.dense_model.predict(mfcc_scaled) | |
| results["Dense"] = { | |
| "prediction": "FAKE" if np.argmax(pred[0]) == 0 else "REAL", | |
| "confidence": float(np.max(pred[0])), | |
| "raw_scores": pred[0].tolist() | |
| } | |
| r.append("FAKE" if np.argmax(pred[0]) == 0 else "REAL") | |
| # CNN Model Analysis | |
| if model_choice in ["CNN", "all"]: | |
| mfcc = librosa.feature.mfcc(y=audio, sr=sr, n_mfcc=40) | |
| mfcc_scaled = np.mean(mfcc.T, axis=0).reshape(None, 40, 1, 1) | |
| pred = self.cnn_model.predict(mfcc_scaled) | |
| results["CNN"] = { | |
| "prediction": "FAKE" if np.argmax(pred[0]) == 0 else "REAL", | |
| "confidence": float(np.max(pred[0])), | |
| "raw_scores": pred[0].tolist() | |
| } | |
| r.append("FAKE" if np.argmax(pred[0]) == 0 else "REAL") | |
| # Melody Machine Analysis | |
| if model_choice in ["MelodyMachine", "all"]: | |
| result = self.melody_machine(audio_path) | |
| best_pred = max(result, key=lambda x: x['score']) | |
| results["MelodyMachine"] = { | |
| "prediction": best_pred['label'].upper(), | |
| "confidence": float(best_pred['score']), | |
| "all_predictions": result | |
| } | |
| r.append(best_pred['label'].upper()) | |
| return r | |
| except Exception as e: | |
| print(f"Analysis error: {e}") | |
| return None, None, None | |
| #-------------------------------------------------------------------------------------------------------------------- | |
| import torchaudio | |
| import torch | |
| import numpy as np | |
| from scipy.stats import skew, kurtosis, median_abs_deviation | |
| import os | |
| import torch.nn.functional as F | |
| import os | |
| os.environ["TORCH_HOME"] = "/tmp/torch_cache" | |
| from torchaudio.pipelines import WAV2VEC2_BASE | |
| bundle = WAV2VEC2_BASE | |
| model = bundle.get_model() | |
| print("Model downloaded successfully!") | |
| def extract_features(file_path): | |
| if os.path.exists(file_path): | |
| print(f"File successfully written: {file_path}") | |
| else: | |
| print("File writing failed.") | |
| waveform, sample_rate = torchaudio.load(file_path) | |
| if sample_rate != bundle.sample_rate: | |
| waveform = torchaudio.transforms.Resample(orig_freq=sample_rate, new_freq=bundle.sample_rate)(waveform) | |
| with torch.inference_mode(): | |
| features, _ = model.extract_features(waveform) | |
| pooled_features = [] | |
| for f in features: | |
| if f.dim() == 3: | |
| f = f.permute(0, 2, 1) | |
| pooled_f = F.adaptive_avg_pool1d(f[0].unsqueeze(0), 1).squeeze(0) | |
| pooled_features.append(pooled_f) | |
| final_features = torch.cat(pooled_features, dim=0).numpy() | |
| final_features = (final_features - np.mean(final_features)) / (np.std(final_features) + 1e-10) | |
| return final_features | |
| def additional_features(features): | |
| mad = median_abs_deviation(features) | |
| features_clipped = np.clip(features, 1e-10, None) | |
| entropy = -np.sum(features_clipped * np.log(features_clipped)) | |
| return mad, entropy | |
| def classify_audio(features): | |
| _, entropy = additional_features(features) | |
| print(entropy) | |
| if entropy > 150: | |
| return True, entropy | |
| else: | |
| return False, entropy | |
| #-------------------------------------------------------------------------------------------------------------------- | |
| from fastapi import FastAPI, File, UploadFile, Form | |
| from fastapi.responses import JSONResponse | |
| import torch | |
| from scipy.stats import skew, kurtosis, median_abs_deviation | |
| import shutil | |
| import subprocess | |
| import os | |
| import librosa | |
| os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2" | |
| os.environ["MPLCONFIGDIR"] = "/tmp/matplotlib" | |
| os.environ["FONTCONFIG_PATH"] = "/tmp/fontconfig" | |
| os.environ["TF_ENABLE_ONEDNN_OPTS"]="0" | |
| os.environ["HF_HOME"] = "/tmp/huggingface_cache" | |
| os.makedirs("/tmp/matplotlib", exist_ok=True) | |
| os.makedirs("/tmp/fontconfig", exist_ok=True) | |
| os.makedirs("/tmp/huggingface_cache", exist_ok=True) | |
| SAVE_DIR = './audio' | |
| os.makedirs(SAVE_DIR, exist_ok=True) | |
| os.system('apt-get update && apt-get install -y ffmpeg') | |
| def reencode_audio(input_path, output_path): | |
| command = [ | |
| '/usr/bin/ffmpeg', '-i', input_path, '-acodec', 'pcm_s16le', '-ar', '16000', '-ac', '1', output_path | |
| ] | |
| subprocess.run(command, check=True) | |
| #-------------------------------------------------------------------------------------------------------------------- | |
| from collections import Counter | |
| from datetime import datetime | |
| import base64 | |
| async def upload_file(file: UploadFile = File(...)): | |
| print(f"Received file: {file.filename}") | |
| original_filename = file.filename.rsplit('.', 1)[0] | |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
| wav_filename = os.path.join(SAVE_DIR, f"{timestamp}.wav") | |
| reencoded_filename = os.path.join(SAVE_DIR, f"{timestamp}_reencoded.wav") | |
| # os.makedirs(SAVE_DIR, exist_ok=True) | |
| with open(wav_filename, "wb") as buffer: | |
| shutil.copyfileobj(file.file, buffer) | |
| reencode_audio(wav_filename, reencoded_filename) | |
| os.remove(wav_filename) | |
| print(f"File successfully re-encoded as: {reencoded_filename}") | |
| try: | |
| audio, sr = librosa.load(reencoded_filename, sr=None) | |
| print("Loaded successfully with librosa") | |
| except Exception as e: | |
| print(f"Error loading re-encoded file: {e}") | |
| new_features = extract_features(reencoded_filename) | |
| prediction, entropy = classify_audio(new_features) | |
| with open(reencoded_filename, "rb") as audio_file: | |
| audio_data = audio_file.read() | |
| # audio_base64 = base64.b64encode(audio_data).decode('utf-8') | |
| os.remove(reencoded_filename) | |
| return JSONResponse(content={ | |
| "prediction": bool(prediction), | |
| "entropy": float(entropy), | |
| }) | |
| async def upload_file(file: UploadFile = File(...)): | |
| print(f"Received file: {file.filename}") | |
| original_filename = file.filename.rsplit('.', 1)[0] | |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
| wav_filename = os.path.join(SAVE_DIR, f"{timestamp}.wav") | |
| reencoded_filename = os.path.join(SAVE_DIR, f"{timestamp}_reencoded.wav") | |
| # os.makedirs(SAVE_DIR, exist_ok=True) | |
| with open(wav_filename, "wb") as buffer: | |
| shutil.copyfileobj(file.file, buffer) | |
| reencode_audio(wav_filename, reencoded_filename) | |
| os.remove(wav_filename) | |
| print(f"File successfully re-encoded as: {reencoded_filename}") | |
| try: | |
| audio, sr = librosa.load(reencoded_filename, sr=None) | |
| print("Loaded successfully with librosa") | |
| except Exception as e: | |
| print(f"Error loading re-encoded file: {e}") | |
| new_features = extract_features(reencoded_filename) | |
| detector = UnifiedDeepfakeDetector() | |
| print(reencoded_filename) | |
| result = detector.analyze_audio_rf(reencoded_filename, model_choice="all") | |
| prediction, entropy = classify_audio(new_features) | |
| with open(reencoded_filename, "rb") as audio_file: | |
| audio_data = audio_file.read() | |
| result = list(result) | |
| result.append("FAKE" if float(entropy) < 150 else "REAL") | |
| print(result) | |
| r_normalized = [x.upper() for x in result if x is not None] | |
| counter = Counter(r_normalized) | |
| most_common_element, _ = counter.most_common(1)[0] | |
| print(f"The most frequent element is: {most_common_element}") | |
| audio_base64 = base64.b64encode(audio_data).decode('utf-8') | |
| print(f"Audio Data Length: {len(audio_data)}") | |
| os.remove(reencoded_filename) | |
| return JSONResponse(content={ | |
| "filename": file.filename, | |
| "prediction": most_common_element.upper(), | |
| "entropy": float(entropy), | |
| "audio": audio_base64, | |
| "content_type": "audio/wav" | |
| }) | |