import os import cv2 import gradio as gr import numpy as np from datetime import datetime from scrfd import SCRFD from arcface_onnx import ArcFaceONNX from sklearn.metrics.pairwise import cosine_similarity from sklearn.cluster import DBSCAN import time from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass import logging from typing import List, Tuple, Optional, Dict import json from pathlib import Path import shutil import requests import tempfile from urllib.parse import urlparse import logging from core.comparator import VideoComparator, ComparisonResult logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) IS_HUGGINGFACE = os.getenv("SPACE_ID") is not None if IS_HUGGINGFACE: YOUTUBE_SUPPORT = False logger.info("🌐 Hugging Face Spaces modunda çalışıyor - YouTube desteği kapalı") else: try: import yt_dlp YOUTUBE_SUPPORT = True logger.info("✅ YouTube desteği aktif") except ImportError: YOUTUBE_SUPPORT = False logger.warning("⚠️ yt-dlp yüklü değil") try: import yt_dlp YOUTUBE_SUPPORT = True except ImportError: YOUTUBE_SUPPORT = False logger.warning("Youtube desteği yüklü değil.") @dataclass class FaceDetectionConfig: frame_skip: int = 30 face_size_threshold: int = 1000 clustering_eps: float = 0.5 min_samples: int = 2 resize_factor: float = 0.5 chunk_size: int = 500 max_workers: int = 2 use_gpu: bool = False class FaceDetector: def __init__(self, config: FaceDetectionConfig): self.config = config self.models = None self.progress_callback = None self.temp_files = [] def set_progress_callback(self, callback): self.progress_callback = callback def is_youtube_url(self, url: str) -> bool: youtube_domains = ['youtube.com', 'youtu.be', 'youtube-nocookie.com'] parsed = urlparse(url) return any(domain in parsed.netloc for domain in youtube_domains) def download_youtube_video(self, url: str) -> str: if not YOUTUBE_SUPPORT: raise ValueError("YouTube desteği için paket kurulmalı") try: if self.progress_callback: self.progress_callback(0, "YouTube videosu indiriliyor...") temp_dir = tempfile.gettempdir() temp_filename = f"yt_{int(time.time())}_{np.random.randint(1000, 9999)}" temp_path_without_ext = os.path.join(temp_dir, temp_filename) ydl_opts = { 'format': 'best[ext=mp4][height<=720]/best[height<=720]/best', 'outtmpl': temp_path_without_ext + '.%(ext)s', 'quiet': True, 'no_warnings': True, 'socket_timeout': 60, 'retries': 3, 'fragment_retries': 3, 'keepvideo': True, 'merge_output_format': 'mp4', 'postprocessors': [{ 'key': 'FFmpegVideoConvertor', 'preferedformat': 'mp4', }], 'progress_hooks': [self._youtube_progress_hook], } logger.info(f"YouTube videosu indiriliyor: {url}") logger.info(f"Hedef dosya: {temp_path_without_ext}.mp4") with yt_dlp.YoutubeDL(ydl_opts) as ydl: info = ydl.extract_info(url, download=True) video_title = info.get('title', 'video') video_ext = info.get('ext', 'mp4') logger.info(f"YouTube video başlığı: {video_title}") final_path = temp_path_without_ext + '.mp4' if not os.path.exists(final_path): for ext in ['.mp4', '.webm', '.mkv']: alt_path = temp_path_without_ext + ext if os.path.exists(alt_path): final_path = alt_path logger.info(f"Video bulundu: {final_path}") break if not os.path.exists(final_path): possible_files = [f for f in os.listdir(temp_dir) if f.startswith(temp_filename)] if possible_files: final_path = os.path.join(temp_dir, possible_files[0]) logger.info(f"Alternatif dosya bulundu: {final_path}") else: raise ValueError(f"YouTube videosu indirilemedi! Beklenen: {final_path}") file_size = os.path.getsize(final_path) if file_size == 0: raise ValueError("İndirilen YouTube videosu boş!") self.temp_files.append(final_path) logger.info(f"YouTube videosu başarıyla indirildi: {final_path} ({file_size / 1024 / 1024:.1f}MB)") if self.progress_callback: self.progress_callback(20, f"YouTube videosu indirildi ({file_size / 1024 / 1024:.1f}MB)") return final_path except Exception as e: logger.error(f"YouTube indirme hatası: {e}", exc_info=True) raise ValueError(f"YouTube videosu indirilemedi: {str(e)}") def _youtube_progress_hook(self, d): if d['status'] == 'downloading': if 'total_bytes' in d: progress = (d['downloaded_bytes'] / d['total_bytes']) * 20 if self.progress_callback: self.progress_callback( progress, f"YouTube indiriliyor: {d['downloaded_bytes'] / 1024 / 1024:.1f}MB / {d['total_bytes'] / 1024 / 1024:.1f}MB" ) elif d['status'] == 'finished': if self.progress_callback: self.progress_callback(18, "YouTube videosu işleniyor...") def download_video_from_url(self, url: str) -> str: if self.is_youtube_url(url): return self.download_youtube_video(url) temp_path = None try: if self.progress_callback: self.progress_callback(0, "Video indiriliyor...") parsed = urlparse(url) if not parsed.scheme in ['http', 'https']: raise ValueError("Geçersiz URL! HTTP veya HTTPS protokolü kullanın.") # Dosya uzantısını belirle ext = os.path.splitext(parsed.path)[1] if not ext or ext not in ['.mp4', '.avi', '.mov', '.mkv', '.webm']: ext = '.mp4' # Geçici dosya oluştur temp_fd, temp_path = tempfile.mkstemp(suffix=ext, prefix='video_') os.close(temp_fd) # File descriptor'ı kapat self.temp_files.append(temp_path) logger.info(f"Geçici dosya oluşturuldu: {temp_path}") # URL'den indir response = requests.get(url, stream=True, timeout=60, headers={'User-Agent': 'Mozilla/5.0'}) response.raise_for_status() total_size = int(response.headers.get('content-length', 0)) downloaded = 0 with open(temp_path, 'wb') as f: for chunk in response.iter_content(chunk_size=65536): # 64KB chunks if chunk: f.write(chunk) downloaded += len(chunk) if total_size > 0 and self.progress_callback: progress = (downloaded / total_size) * 20 if downloaded % (1024 * 1024) < 65536: # Her 1MB'de güncelle self.progress_callback( progress, f"İndiriliyor: {downloaded / 1024 / 1024:.1f}MB / {total_size / 1024 / 1024:.1f}MB" ) if not os.path.exists(temp_path): raise ValueError("Video dosyası oluşturulamadı!") file_size = os.path.getsize(temp_path) if file_size == 0: raise ValueError("İndirilen video dosyası boş!") logger.info(f"Video başarıyla indirildi: {temp_path} ({file_size / 1024 / 1024:.1f}MB)") if self.progress_callback: self.progress_callback(20, f"Video indirildi ({file_size / 1024 / 1024:.1f}MB), işleme başlanıyor...") return temp_path except requests.exceptions.Timeout: if temp_path and os.path.exists(temp_path): os.unlink(temp_path) raise ValueError("Video indirme zaman aşımına uğradı. Lütfen tekrar deneyin.") except requests.exceptions.RequestException as e: if temp_path and os.path.exists(temp_path): os.unlink(temp_path) raise ValueError(f"Video indirme hatası: {str(e)}") except Exception as e: if temp_path and os.path.exists(temp_path): os.unlink(temp_path) raise ValueError(f"Beklenmeyen hata: {str(e)}") def cleanup_temp_files(self): for temp_file in self.temp_files: try: if os.path.exists(temp_file): os.unlink(temp_file) logger.info(f"Geçici dosya silindi: {temp_file}") except Exception as e: logger.warning(f"Geçici dosya silinemedi {temp_file}: {e}") self.temp_files = [] def _load_models(self) -> Tuple[SCRFD, ArcFaceONNX]: try: logger.info("Modeller yükleniyor (CPU mode)...") current_dir = os.path.dirname(os.path.abspath(__file__)) models_dir = os.path.join(current_dir, 'deploy', 'models') import onnxruntime as ort sess_options = ort.SessionOptions() ort.set_default_logger_severity(3) sess_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL sess_options.intra_op_num_threads = 2 # Sadece CPU provider providers = ['CPUExecutionProvider'] det_model = os.path.join(models_dir, 'det_10g.onnx') arc_model = os.path.join(models_dir, 'w600k_r50.onnx') if not os.path.exists(det_model) or not os.path.exists(arc_model): raise FileNotFoundError(f"Model dosyaları bulunamadı: {models_dir}") detector = SCRFD(det_model) detector.session = ort.InferenceSession(det_model, sess_options, providers=providers) recognizer = ArcFaceONNX(arc_model) recognizer.session = ort.InferenceSession(arc_model, sess_options, providers=providers) logger.info(f"✅ CPU mode aktif: {recognizer.session.get_providers()}") return detector, recognizer except Exception as e: logger.error(f"Model yükleme hatası: {e}") raise def create_output_directory(self, video_path: str, is_temp: bool = False) -> str: logger.info(f"burası {self},{video_path},{is_temp}") if is_temp: # URL/YouTube için temp dizini kullan temp_dir = tempfile.gettempdir() timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") output_dir = os.path.join(temp_dir, f"face_detection_{timestamp}") else: # Yerel dosya için aynı dizini kullan base_dir = os.path.dirname(video_path) video_name = os.path.splitext(os.path.basename(video_path))[0] timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") output_dir = os.path.join(base_dir, f"{video_name}_{timestamp}") os.makedirs(output_dir, exist_ok=True) logger.info(f"Output dizini oluşturuldu: {output_dir}") return output_dir def extract_embeddings(self, face_img: np.ndarray) -> Tuple[Optional[np.ndarray], Optional[np.ndarray]]: try: if face_img is None or face_img.size == 0: logger.warning("Boş veya None face_img") return None, None if face_img.shape[0] < 10 or face_img.shape[1] < 10: logger.warning(f"Yüz çok küçük: {face_img.shape}") return None, None detector, recognizer = self.models bboxes, kpss = detector.autodetect(face_img, max_num=1) if len(bboxes) == 0: logger.warning("Yüz tespit edilemedi") return None, None kps = kpss[0] if kps is None or len(kps) < 5: logger.warning("Geçersiz keypoints") return None, None embedding = recognizer.get(face_img, kps) if embedding is None: logger.warning("Embedding None döndü") return None, None if np.isnan(embedding).any() or np.isinf(embedding).any(): logger.warning("Embedding NaN veya Inf içeriyor") return None, None if np.allclose(embedding, 0): logger.warning("Embedding sıfır vektör") return None, None return embedding, kps except Exception as e: logger.error(f"Embedding çıkarma hatası: {e}") return None, None def calculate_face_quality(self, face_img: np.ndarray, face_size: float, kps: np.ndarray) -> float: try: quality_score = 0 if face_size <= 0: logger.warning("Geçersiz face_size: <= 0") return 0.0 # Size score size_score = min(face_size / 5000, 2.0) quality_score += size_score # Keypoint kontrolü if kps is None or len(kps) < 5: logger.warning("Geçersiz keypoints") return quality_score left_eye, right_eye = kps[0], kps[1] eye_distance = np.linalg.norm(left_eye - right_eye) face_width = np.sqrt(face_size) if face_width <= 0: logger.warning("face_width <= 0") return quality_score eye_ratio = eye_distance / face_width angle_score = min(eye_ratio * 3, 2.0) quality_score += angle_score gray = cv2.cvtColor(face_img, cv2.COLOR_BGR2GRAY) blur_var = cv2.Laplacian(gray, cv2.CV_64F).var() blur_score = min(blur_var / 500, 2.0) quality_score += blur_score left_mouth, right_mouth = kps[3], kps[4] mouth_distance = np.linalg.norm(left_mouth - right_mouth) mouth_ratio = mouth_distance / face_width symmetry_score = min(mouth_ratio * 3, 2.0) quality_score += symmetry_score return quality_score except Exception as e: logger.error(f"Quality hesaplama hatası: {e}") return 1.0 def process_frame(self, frame: np.ndarray) -> List[Dict]: frame = cv2.resize(frame, (0, 0), fx=self.config.resize_factor, fy=self.config.resize_factor) detector, _ = self.models faces_data = [] try: bboxes, _ = detector.autodetect(frame) for x1, y1, x2, y2, _ in bboxes: face_size = (x2 - x1) * (y2 - y1) if face_size < self.config.face_size_threshold: continue face_img = frame[int(y1):int(y2), int(x1):int(x2)] embedding, kps = self.extract_embeddings(face_img) if embedding is not None and kps is not None: quality_score = self.calculate_face_quality(face_img, face_size, kps) faces_data.append({ 'embedding': embedding, 'face_img': face_img, 'quality_score': quality_score, 'bbox': [float(x1), float(y1), float(x2), float(y2)] }) except Exception as e: logger.error(f"Frame işleme hatası: {e}") return faces_data def process_video_chunk(self, frames: List[np.ndarray]) -> List[Dict]: all_faces = [] for frame in frames: faces = self.process_frame(frame) all_faces.extend(faces) return all_faces def detect_faces(self, video_path: str, is_url: bool = False): start_time = time.time() original_path = video_path downloaded_path = None try: if is_url: downloaded_path = self.download_video_from_url(video_path) video_path = downloaded_path logger.info(f"URL'den indirilen video kullanılıyor: {video_path}") # Video dosyasının varlığını kontrol et if not os.path.exists(video_path): raise ValueError(f"Video dosyası bulunamadı: {video_path}") file_size = os.path.getsize(video_path) if file_size == 0: raise ValueError(f"Video dosyası boş: {video_path}") logger.info(f"Video dosyası kontrol edildi: {video_path} ({file_size / 1024 / 1024:.1f}MB)") if self.models is None: self.models = self._load_models() output_dir = self.create_output_directory(video_path if not is_url else tempfile.gettempdir(), is_temp=is_url) metadata = { 'video_path': original_path, 'is_url': is_url, 'processing_start': datetime.now().isoformat(), 'config': vars(self.config), 'faces': [] } cap = cv2.VideoCapture(video_path) if not cap.isOpened(): raise ValueError(f"Video açılamadı: {video_path}. Dosya bozuk veya desteklenmeyen format olabilir.") total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) fps = int(cap.get(cv2.CAP_PROP_FPS)) duration = total_frames / fps if fps > 0 else 0 logger.info(f"Video: {total_frames} frame, {fps} FPS, {duration:.1f} saniye") progress_offset = 20 if is_url else 0 max_progress = 80 if is_url else 100 if self.progress_callback: self.progress_callback(progress_offset, f"Video açıldı: {total_frames} frame") current_frames = [] all_faces_data = [] frame_count = 0 with ThreadPoolExecutor(max_workers=self.config.max_workers) as executor: while True: ret, frame = cap.read() if not ret: break frame_count += 1 if frame_count % self.config.frame_skip == 0: current_frames.append(frame) if len(current_frames) >= self.config.chunk_size: future = executor.submit(self.process_video_chunk, current_frames) all_faces_data.extend(future.result()) current_frames = [] if frame_count % 500 == 0: progress = (frame_count / total_frames) * 100 if self.progress_callback: adjusted_progress = progress_offset + (progress / 2) * ((max_progress - progress_offset) / 100) self.progress_callback( adjusted_progress, f"Frame işleniyor: {frame_count}/{total_frames} ({progress:.1f}%)" ) if current_frames: future = executor.submit(self.process_video_chunk, current_frames) all_faces_data.extend(future.result()) cap.release() if not all_faces_data: raise ValueError("Hiç yüz bulunamadı!") clustering_progress = progress_offset + (max_progress - progress_offset) * 0.6 if self.progress_callback: self.progress_callback(clustering_progress, f"{len(all_faces_data)} yüz tespit edildi, clustering yapılıyor...") embeddings_array = np.array([face['embedding'] for face in all_faces_data]) clustering = DBSCAN( eps=self.config.clustering_eps, min_samples=self.config.min_samples, metric='cosine' ).fit(embeddings_array) labels = clustering.labels_ n_clusters = len(set(labels)) - (1 if -1 in labels else 0) saving_progress = progress_offset + (max_progress - progress_offset) * 0.8 if self.progress_callback: self.progress_callback(saving_progress, f"{n_clusters} benzersiz kişi tespit edildi, yüzler kaydediliyor...") saved_faces = [] for cluster_id in range(n_clusters): cluster_indices = np.where(labels == cluster_id)[0] cluster_faces = [all_faces_data[i] for i in cluster_indices] best_face = max(cluster_faces, key=lambda x: x['quality_score']) face_img_resized = cv2.resize(best_face['face_img'], (112, 112)) face_file = f"person_{cluster_id}.jpg" face_path = os.path.join(output_dir, face_file) cv2.imwrite(face_path, face_img_resized, [cv2.IMWRITE_JPEG_QUALITY, 95]) saved_faces.append(face_path) metadata['faces'].append({ 'cluster_id': cluster_id, 'face_file': face_file, 'quality_score': float(best_face['quality_score']), 'bbox': best_face['bbox'], 'cluster_size': len(cluster_indices) }) elapsed_time = time.time() - start_time metadata['processing_end'] = datetime.now().isoformat() metadata['elapsed_time'] = elapsed_time metadata['total_frames'] = total_frames metadata['fps'] = fps metadata['duration'] = duration metadata['unique_persons'] = n_clusters metadata_path = os.path.join(output_dir, 'metadata.json') with open(metadata_path, 'w', encoding='utf-8') as f: json.dump(metadata, f, indent=2, ensure_ascii=False) if self.progress_callback: self.progress_callback(100, f"✅ Tamamlandı! {n_clusters} kişi bulundu ({elapsed_time:.1f}s)") return output_dir, saved_faces, metadata except Exception as e: logger.error(f"İşlem hatası: {e}") raise finally: if is_url: self.cleanup_temp_files() detector_instance = None comparator_instance = None def initialize_detector(frame_skip, face_threshold, clustering_eps, use_gpu): global detector_instance config = FaceDetectionConfig( frame_skip=frame_skip, face_size_threshold=face_threshold, clustering_eps=clustering_eps, use_gpu=use_gpu ) detector_instance = FaceDetector(config) return "✅ Ayarlar kaydedildi!" def initialize_comparator(): """Video karşılaştırıcı instance'ını başlat""" global comparator_instance, detector_instance if detector_instance is None: detector_instance = FaceDetector(FaceDetectionConfig()) if comparator_instance is None: comparator_instance = VideoComparator( face_detector=detector_instance, similarity_threshold=0.6 # %60 benzerlik eşiği ) return comparator_instance def process_video_gradio(video_file, video_url, progress=gr.Progress()): global detector_instance if detector_instance is None: detector_instance = FaceDetector(FaceDetectionConfig()) def update_progress(value, message): progress(value / 100, desc=message) detector_instance.set_progress_callback(update_progress) try: progress(0, desc="İşlem başlatılıyor...") if IS_HUGGINGFACE and video_url and video_url.strip(): return [], "❌ Hugging Face Spaces'te URL/YouTube desteği bulunmamaktadır. Lütfen video dosyası yükleyin.", "❌ URL desteği yok" if video_url and video_url.strip(): video_source = video_url.strip() is_url = True source_name = urlparse(video_url).path.split('/')[-1] or "video" logger.info(f"URL kullanılıyor: {video_url}") # YouTube mu kontrol et if detector_instance.is_youtube_url(video_url): if not YOUTUBE_SUPPORT: return [], "❌ YouTube desteği için paket kurulmalı", "❌ paket kurulu değil" logger.info("YouTube URL tespit edildi") elif video_file: video_source = video_file is_url = False source_name = os.path.basename(video_file) logger.info(f"Yerel dosya kullanılıyor: {video_file}") else: return [], "❌ Lütfen bir video yükleyin veya URL girin!", "❌ Video bulunamadı" # URL test (YouTube değilse) if is_url and not detector_instance.is_youtube_url(video_source): try: head_response = requests.head(video_source, timeout=10, allow_redirects=True) logger.info(f"URL test - Status: {head_response.status_code}, Content-Type: {head_response.headers.get('content-type', 'unknown')}") if head_response.status_code != 200: return [], f"❌ URL erişilemez (HTTP {head_response.status_code})", "❌ URL hatası" except Exception as e: logger.warning(f"URL test başarısız: {e}, yine de deneniyor...") # Video süresini kontrol et (detect_faces çağrılmadan önce) if not is_url: cap = cv2.VideoCapture(video_source) if cap.isOpened(): total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) fps = int(cap.get(cv2.CAP_PROP_FPS)) duration = total_frames / fps if fps > 0 else 0 cap.release() if duration > 300: # 5 dakika limiti return [], f"⚠️ Video çok uzun ({duration:.0f} saniye)! CPU modunda maksimum 5 dakika (300 saniye) desteklenir.", "❌ Süre limiti aşıldı" output_dir, saved_faces, metadata = detector_instance.detect_faces(video_source, is_url=is_url) # URL'den indirilen videolar için de süre kontrolü if is_url and metadata['duration'] > 300: return [], f"⚠️ Video çok uzun ({metadata['duration']:.0f} saniye)! CPU modunda maksimum 5 dakika desteklenir.", "❌ Süre limiti aşıldı" report = f""" # 📊 İşlem Raporu ## Genel Bilgiler - **Video**: {source_name} - **Kaynak**: {'🌐 URL' if is_url else '📁 Yerel Dosya'} - **Süre**: {metadata['duration']:.1f} saniye - **FPS**: {metadata['fps']} - **Toplam Frame**: {metadata['total_frames']} - **İşlem Süresi**: {metadata['elapsed_time']:.1f} saniye ## Tespit Sonuçları - **Benzersiz Kişi**: {metadata['unique_persons']} - **Toplam Yüz Tespiti**: {sum(f['cluster_size'] for f in metadata['faces'])} ## Kişi Detayları """ for face in metadata['faces']: report += f"\n### Kişi {face['cluster_id']}\n" report += f"- Kalite Skoru: {face['quality_score']:.2f}\n" report += f"- Görülme Sayısı: {face['cluster_size']}\n" return saved_faces, report, f"✅ Başarılı! Çıktı: {output_dir}" except Exception as e: error_msg = f"❌ Hata: {str(e)}" logger.error(error_msg) return [], error_msg, error_msg def compare_two_faces(face1, face2): global detector_instance if detector_instance is None: detector_instance = FaceDetector(FaceDetectionConfig()) detector_instance.models = detector_instance._load_models() try: img1 = cv2.imread(face1) if isinstance(face1, str) else cv2.cvtColor(face1, cv2.COLOR_RGB2BGR) img2 = cv2.imread(face2) if isinstance(face2, str) else cv2.cvtColor(face2, cv2.COLOR_RGB2BGR) emb1, _ = detector_instance.extract_embeddings(img1) emb2, _ = detector_instance.extract_embeddings(img2) if emb1 is None or emb2 is None: return "❌ Yüz tespit edilemedi!" similarity = cosine_similarity([emb1], [emb2])[0][0] percentage = similarity * 100 if percentage > 70: result = f"✅ Aynı Kişi ({percentage:.1f}% benzerlik)" elif percentage > 50: result = f"⚠️ Muhtemelen Aynı Kişi ({percentage:.1f}% benzerlik)" else: result = f"❌ Farklı Kişiler ({percentage:.1f}% benzerlik)" return result except Exception as e: return f"❌ Hata: {str(e)}" def compare_videos_gradio(video1, video2, url1, url2, similarity_threshold, progress=gr.Progress()): """ Gradio callback: İki videoyu karşılaştır """ global comparator_instance # Comparator'ı başlat comparator = initialize_comparator() comparator.similarity_threshold = similarity_threshold def update_progress(value, message): progress(value / 100, desc=message) comparator.set_progress_callback(update_progress) try: progress(0, desc="Videolar hazırlanıyor...") # Video 1 kaynağını belirle if url1 and url1.strip() and not IS_HUGGINGFACE: video1_source = url1.strip() is_v1_url = True elif video1: video1_source = video1 is_v1_url = False else: return [], "❌ Lütfen Video 1 yükleyin!", "❌ Video 1 eksik" # Video 2 kaynağını belirle if url2 and url2.strip() and not IS_HUGGINGFACE: video2_source = url2.strip() is_v2_url = True elif video2: video2_source = video2 is_v2_url = False else: return [], "❌ Lütfen Video 2 yükleyin!", "❌ Video 2 eksik" # Hugging Face kontrolü if IS_HUGGINGFACE and (is_v1_url or is_v2_url): return [], "❌ Hugging Face Spaces'te URL desteği yok!", "❌ URL desteği yok" # Karşılaştırmayı yap result, output_dir, saved_images = comparator.compare_videos( video1_source, video2_source, is_video1_url=is_v1_url, is_video2_url=is_v2_url ) # Rapor oluştur report = comparator.generate_report(result) # Özet mesaj summary = f""" ✅ **Karşılaştırma Tamamlandı!** 🎯 **Sonuçlar:** - Ortak Kişi: **{result.common_count}** - Sadece Video 1: **{len(result.only_video1)}** - Sadece Video 2: **{len(result.only_video2)}** 📁 Çıktı: {output_dir} """ return saved_images, report, summary except Exception as e: error_msg = f"❌ Hata: {str(e)}" logger.error(error_msg, exc_info=True) return [], error_msg, error_msg with gr.Blocks(title="Yüz Tanıma Sistemi", theme=gr.themes.Soft()) as demo: gr.Markdown(""" # 🎭 Video Yüz Tanıma Sistemi Video dosyalarından otomatik yüz tespiti ve tanıma yapın ⚠️ **CPU Modunda Çalışıyor**: İşlem süresi uzun olabilir (5 dk video = ~10-15 dk) """) with gr.Tabs(): with gr.Tab("📹 Video İşle"): gr.Markdown("### Video kaynağını seçin:") with gr.Row(): with gr.Column(): video_input = gr.Video(label="📁 Yerel Video Yükle",height=300) if not IS_HUGGINGFACE: gr.Markdown("**VEYA**") url_input = gr.Textbox( label="🌐 Video URL'si", placeholder="https://example.com/video.mp4 veya YouTube linki", lines=1, interactive=True ) gr.Markdown("*URL girilirse öncelikle o kullanılır*") else: url_input = gr.Textbox( label="🌐 Video URL'si (Devre Dışı)", placeholder="Hugging Face Spaces'te URL desteği yok", lines=1, interactive=False, visible=True ) process_btn = gr.Button("🚀 İşlemi Başlat", variant="primary", size="lg") status_text = gr.Textbox(label="Durum", interactive=False) with gr.Column(scale=1): gallery_output = gr.Gallery(label="Tespit Edilen Yüzler", columns=4, height=500,object_fit="contain") report_output = gr.Markdown(label="Rapor") gr.Markdown(""" #### 💡 URL Örnekleri: - **YouTube**: `https://www.youtube.com/watch?v=xxxxx` veya `https://youtu.be/xxxxx` veya Shorts - **Doğrudan video**: `https://example.com/video.mp4` - Google Drive paylaşım linki çalışmaz (direkt indirme linki gerekir) - **Desteklenen formatlar**: MP4, AVI, MOV, MKV, WebM """) process_btn.click( fn=process_video_gradio, inputs=[video_input, url_input], outputs=[gallery_output, report_output, status_text] ) with gr.Tab("🔍 Yüz Karşılaştır"): gr.Markdown("İki yüz görselini yükleyin ve benzerliklerini kontrol edin") with gr.Row(): with gr.Column(scale=1): face1_input = gr.Image( label="Yüz 1", type="filepath", height=300, width=300 ) with gr.Column(scale=1): face2_input = gr.Image( label="Yüz 2", type="filepath", height=300, width=300 ) compare_btn = gr.Button("⚖️ Karşılaştır", variant="primary") compare_result = gr.Textbox(label="Sonuç", interactive=False, lines=3) compare_btn.click( fn=compare_two_faces, inputs=[face1_input, face2_input], outputs=compare_result ) with gr.Tab("🎬 Video Karşılaştır"): gr.Markdown(""" ## İki Videoyu Karşılaştır İki farklı videodan yüz tespiti yaparak ortak kişileri bulur """) # Hugging Face uyarısı if IS_HUGGINGFACE: gr.Markdown(""" ⚠️ **Hugging Face Spaces Modunda**: URL/YouTube desteği kapalı. Sadece dosya yükleme kullanılabilir. """) with gr.Row(): with gr.Column(scale=1): gr.Markdown("### 📹 Video 1") video1_input = gr.Video( label="Video 1 Yükle", height=250, show_label=True ) if not IS_HUGGINGFACE: gr.Markdown("**VEYA**") url1_input = gr.Textbox( label="Video 1 URL", placeholder="https://example.com/video1.mp4", lines=1, interactive=True ) else: url1_input = gr.Textbox( label="Video 1 URL (Devre Dışı)", placeholder="HF Spaces'te URL desteği yok", lines=1, interactive=False ) with gr.Column(scale=1): gr.Markdown("### 📹 Video 2") video2_input = gr.Video( label="Video 2 Yükle", height=250, show_label=True ) if not IS_HUGGINGFACE: gr.Markdown("**VEYA**") url2_input = gr.Textbox( label="Video 2 URL", placeholder="https://example.com/video2.mp4", lines=1, interactive=True ) else: url2_input = gr.Textbox( label="Video 2 URL (Devre Dışı)", placeholder="HF Spaces'te URL desteği yok", lines=1, interactive=False ) # Ayarlar with gr.Row(): similarity_slider = gr.Slider( minimum=0.4, maximum=0.9, value=0.6, step=0.05, label="🎯 Benzerlik Eşiği (Düşük = Daha fazla eşleşme)", info="İki yüzün aynı kişi olarak kabul edilmesi için minimum benzerlik oranı" ) compare_videos_btn = gr.Button("🔍 Videoları Karşılaştır", variant="primary", size="lg") status_compare = gr.Textbox(label="Durum", interactive=False, lines=3) gr.Markdown("### 📊 Sonuçlar") with gr.Row(): with gr.Column(scale=2): gallery_compare = gr.Gallery( label="Ortak Yüzler (Sol: Video 1, Sağ: Video 2)", columns=2, height=500, object_fit="contain" ) with gr.Column(scale=1): report_compare = gr.Markdown(label="Detaylı Rapor") gr.Markdown(""" ### 💡 Nasıl Çalışır? 1. **İki video yükleyin** (veya URL girin) 2. **Benzerlik eşiğini ayarlayın** (varsayılan %60) 3. **Karşılaştır** butonuna tıklayın 4. Sistem her iki videodan yüz tespiti yapar ve ortak kişileri bulur ### 📈 Sonuç Kategorileri - **Ortak Yüzler**: Her iki videoda da görünen kişiler (yan yana gösterilir) - **Sadece Video 1**: Yalnızca ilk videoda görünen kişiler - **Sadece Video 2**: Yalnızca ikinci videoda görünen kişiler """) # Event handler compare_videos_btn.click( fn=compare_videos_gradio, inputs=[ video1_input, video2_input, url1_input, url2_input, similarity_slider ], outputs=[ gallery_compare, report_compare, status_compare ] ) with gr.Tab("⚙️ Ayarlar"): gr.Markdown("### Gelişmiş Ayarlar") frame_skip_slider = gr.Slider(20, 60, value=30, step=5, label="Frame Atlama (yüksek = daha hızlı)") face_threshold_slider = gr.Slider(600, 2000, value=1000, step=100, label="Minimum Yüz Boyutu (piksel)") clustering_slider = gr.Slider(0.3, 0.7, value=0.5, step=0.05, label="Clustering Hassasiyeti") save_settings_btn = gr.Button("💾 Ayarları Kaydet") settings_status = gr.Textbox(label="Durum", interactive=False) save_settings_btn.click( fn=initialize_detector, inputs=[frame_skip_slider, face_threshold_slider, clustering_slider], outputs=settings_status ) gr.Markdown(""" --- ### 💡 İpuçları - **Frame Atlama**: Daha hızlı işlem için artırın, daha fazla tespit için azaltın - **Clustering**: Daha az kişi tespit ediyorsa artırın, fazla tespit ediyorsa azaltın """) if __name__ == "__main__": print("\n" + "="*60) print("🎬 Video Yüz Tanıma Sistemi") print("="*60) print("="*60 + "\n") demo.launch(share=False, server_name="0.0.0.0", server_port=7860)