Spaces:
Sleeping
Sleeping
| import os | |
| import cv2 | |
| import gradio as gr | |
| import numpy as np | |
| from datetime import datetime | |
| from scrfd import SCRFD | |
| from arcface_onnx import ArcFaceONNX | |
| from sklearn.metrics.pairwise import cosine_similarity | |
| from sklearn.cluster import DBSCAN | |
| import time | |
| from concurrent.futures import ThreadPoolExecutor | |
| from dataclasses import dataclass | |
| import logging | |
| from typing import List, Tuple, Optional, Dict | |
| import json | |
| from pathlib import Path | |
| import shutil | |
| import requests | |
| import tempfile | |
| from urllib.parse import urlparse | |
| import logging | |
| from core.comparator import VideoComparator, ComparisonResult | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format='%(asctime)s - %(levelname)s - %(message)s' | |
| ) | |
| logger = logging.getLogger(__name__) | |
| IS_HUGGINGFACE = os.getenv("SPACE_ID") is not None | |
| if IS_HUGGINGFACE: | |
| YOUTUBE_SUPPORT = False | |
| logger.info("🌐 Hugging Face Spaces modunda çalışıyor - YouTube desteği kapalı") | |
| else: | |
| try: | |
| import yt_dlp | |
| YOUTUBE_SUPPORT = True | |
| logger.info("✅ YouTube desteği aktif") | |
| except ImportError: | |
| YOUTUBE_SUPPORT = False | |
| logger.warning("⚠️ yt-dlp yüklü değil") | |
| try: | |
| import yt_dlp | |
| YOUTUBE_SUPPORT = True | |
| except ImportError: | |
| YOUTUBE_SUPPORT = False | |
| logger.warning("Youtube desteği yüklü değil.") | |
| class FaceDetectionConfig: | |
| frame_skip: int = 30 | |
| face_size_threshold: int = 1000 | |
| clustering_eps: float = 0.5 | |
| min_samples: int = 2 | |
| resize_factor: float = 0.5 | |
| chunk_size: int = 500 | |
| max_workers: int = 2 | |
| use_gpu: bool = False | |
| class FaceDetector: | |
| def __init__(self, config: FaceDetectionConfig): | |
| self.config = config | |
| self.models = None | |
| self.progress_callback = None | |
| self.temp_files = [] | |
| def set_progress_callback(self, callback): | |
| self.progress_callback = callback | |
| def is_youtube_url(self, url: str) -> bool: | |
| youtube_domains = ['youtube.com', 'youtu.be', 'youtube-nocookie.com'] | |
| parsed = urlparse(url) | |
| return any(domain in parsed.netloc for domain in youtube_domains) | |
| def download_youtube_video(self, url: str) -> str: | |
| if not YOUTUBE_SUPPORT: | |
| raise ValueError("YouTube desteği için paket kurulmalı") | |
| try: | |
| if self.progress_callback: | |
| self.progress_callback(0, "YouTube videosu indiriliyor...") | |
| temp_dir = tempfile.gettempdir() | |
| temp_filename = f"yt_{int(time.time())}_{np.random.randint(1000, 9999)}" | |
| temp_path_without_ext = os.path.join(temp_dir, temp_filename) | |
| ydl_opts = { | |
| 'format': 'best[ext=mp4][height<=720]/best[height<=720]/best', | |
| 'outtmpl': temp_path_without_ext + '.%(ext)s', | |
| 'quiet': True, | |
| 'no_warnings': True, | |
| 'socket_timeout': 60, | |
| 'retries': 3, | |
| 'fragment_retries': 3, | |
| 'keepvideo': True, | |
| 'merge_output_format': 'mp4', | |
| 'postprocessors': [{ | |
| 'key': 'FFmpegVideoConvertor', | |
| 'preferedformat': 'mp4', | |
| }], | |
| 'progress_hooks': [self._youtube_progress_hook], | |
| } | |
| logger.info(f"YouTube videosu indiriliyor: {url}") | |
| logger.info(f"Hedef dosya: {temp_path_without_ext}.mp4") | |
| with yt_dlp.YoutubeDL(ydl_opts) as ydl: | |
| info = ydl.extract_info(url, download=True) | |
| video_title = info.get('title', 'video') | |
| video_ext = info.get('ext', 'mp4') | |
| logger.info(f"YouTube video başlığı: {video_title}") | |
| final_path = temp_path_without_ext + '.mp4' | |
| if not os.path.exists(final_path): | |
| for ext in ['.mp4', '.webm', '.mkv']: | |
| alt_path = temp_path_without_ext + ext | |
| if os.path.exists(alt_path): | |
| final_path = alt_path | |
| logger.info(f"Video bulundu: {final_path}") | |
| break | |
| if not os.path.exists(final_path): | |
| possible_files = [f for f in os.listdir(temp_dir) if f.startswith(temp_filename)] | |
| if possible_files: | |
| final_path = os.path.join(temp_dir, possible_files[0]) | |
| logger.info(f"Alternatif dosya bulundu: {final_path}") | |
| else: | |
| raise ValueError(f"YouTube videosu indirilemedi! Beklenen: {final_path}") | |
| file_size = os.path.getsize(final_path) | |
| if file_size == 0: | |
| raise ValueError("İndirilen YouTube videosu boş!") | |
| self.temp_files.append(final_path) | |
| logger.info(f"YouTube videosu başarıyla indirildi: {final_path} ({file_size / 1024 / 1024:.1f}MB)") | |
| if self.progress_callback: | |
| self.progress_callback(20, f"YouTube videosu indirildi ({file_size / 1024 / 1024:.1f}MB)") | |
| return final_path | |
| except Exception as e: | |
| logger.error(f"YouTube indirme hatası: {e}", exc_info=True) | |
| raise ValueError(f"YouTube videosu indirilemedi: {str(e)}") | |
| def _youtube_progress_hook(self, d): | |
| if d['status'] == 'downloading': | |
| if 'total_bytes' in d: | |
| progress = (d['downloaded_bytes'] / d['total_bytes']) * 20 | |
| if self.progress_callback: | |
| self.progress_callback( | |
| progress, | |
| f"YouTube indiriliyor: {d['downloaded_bytes'] / 1024 / 1024:.1f}MB / {d['total_bytes'] / 1024 / 1024:.1f}MB" | |
| ) | |
| elif d['status'] == 'finished': | |
| if self.progress_callback: | |
| self.progress_callback(18, "YouTube videosu işleniyor...") | |
| def download_video_from_url(self, url: str) -> str: | |
| if self.is_youtube_url(url): | |
| return self.download_youtube_video(url) | |
| temp_path = None | |
| try: | |
| if self.progress_callback: | |
| self.progress_callback(0, "Video indiriliyor...") | |
| parsed = urlparse(url) | |
| if not parsed.scheme in ['http', 'https']: | |
| raise ValueError("Geçersiz URL! HTTP veya HTTPS protokolü kullanın.") | |
| # Dosya uzantısını belirle | |
| ext = os.path.splitext(parsed.path)[1] | |
| if not ext or ext not in ['.mp4', '.avi', '.mov', '.mkv', '.webm']: | |
| ext = '.mp4' | |
| # Geçici dosya oluştur | |
| temp_fd, temp_path = tempfile.mkstemp(suffix=ext, prefix='video_') | |
| os.close(temp_fd) # File descriptor'ı kapat | |
| self.temp_files.append(temp_path) | |
| logger.info(f"Geçici dosya oluşturuldu: {temp_path}") | |
| # URL'den indir | |
| response = requests.get(url, stream=True, timeout=60, | |
| headers={'User-Agent': 'Mozilla/5.0'}) | |
| response.raise_for_status() | |
| total_size = int(response.headers.get('content-length', 0)) | |
| downloaded = 0 | |
| with open(temp_path, 'wb') as f: | |
| for chunk in response.iter_content(chunk_size=65536): # 64KB chunks | |
| if chunk: | |
| f.write(chunk) | |
| downloaded += len(chunk) | |
| if total_size > 0 and self.progress_callback: | |
| progress = (downloaded / total_size) * 20 | |
| if downloaded % (1024 * 1024) < 65536: # Her 1MB'de güncelle | |
| self.progress_callback( | |
| progress, | |
| f"İndiriliyor: {downloaded / 1024 / 1024:.1f}MB / {total_size / 1024 / 1024:.1f}MB" | |
| ) | |
| if not os.path.exists(temp_path): | |
| raise ValueError("Video dosyası oluşturulamadı!") | |
| file_size = os.path.getsize(temp_path) | |
| if file_size == 0: | |
| raise ValueError("İndirilen video dosyası boş!") | |
| logger.info(f"Video başarıyla indirildi: {temp_path} ({file_size / 1024 / 1024:.1f}MB)") | |
| if self.progress_callback: | |
| self.progress_callback(20, f"Video indirildi ({file_size / 1024 / 1024:.1f}MB), işleme başlanıyor...") | |
| return temp_path | |
| except requests.exceptions.Timeout: | |
| if temp_path and os.path.exists(temp_path): | |
| os.unlink(temp_path) | |
| raise ValueError("Video indirme zaman aşımına uğradı. Lütfen tekrar deneyin.") | |
| except requests.exceptions.RequestException as e: | |
| if temp_path and os.path.exists(temp_path): | |
| os.unlink(temp_path) | |
| raise ValueError(f"Video indirme hatası: {str(e)}") | |
| except Exception as e: | |
| if temp_path and os.path.exists(temp_path): | |
| os.unlink(temp_path) | |
| raise ValueError(f"Beklenmeyen hata: {str(e)}") | |
| def cleanup_temp_files(self): | |
| for temp_file in self.temp_files: | |
| try: | |
| if os.path.exists(temp_file): | |
| os.unlink(temp_file) | |
| logger.info(f"Geçici dosya silindi: {temp_file}") | |
| except Exception as e: | |
| logger.warning(f"Geçici dosya silinemedi {temp_file}: {e}") | |
| self.temp_files = [] | |
| def _load_models(self) -> Tuple[SCRFD, ArcFaceONNX]: | |
| try: | |
| logger.info("Modeller yükleniyor (CPU mode)...") | |
| current_dir = os.path.dirname(os.path.abspath(__file__)) | |
| models_dir = os.path.join(current_dir, 'deploy', 'models') | |
| import onnxruntime as ort | |
| sess_options = ort.SessionOptions() | |
| ort.set_default_logger_severity(3) | |
| sess_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL | |
| sess_options.intra_op_num_threads = 2 | |
| # Sadece CPU provider | |
| providers = ['CPUExecutionProvider'] | |
| det_model = os.path.join(models_dir, 'det_10g.onnx') | |
| arc_model = os.path.join(models_dir, 'w600k_r50.onnx') | |
| if not os.path.exists(det_model) or not os.path.exists(arc_model): | |
| raise FileNotFoundError(f"Model dosyaları bulunamadı: {models_dir}") | |
| detector = SCRFD(det_model) | |
| detector.session = ort.InferenceSession(det_model, sess_options, providers=providers) | |
| recognizer = ArcFaceONNX(arc_model) | |
| recognizer.session = ort.InferenceSession(arc_model, sess_options, providers=providers) | |
| logger.info(f"✅ CPU mode aktif: {recognizer.session.get_providers()}") | |
| return detector, recognizer | |
| except Exception as e: | |
| logger.error(f"Model yükleme hatası: {e}") | |
| raise | |
| def create_output_directory(self, video_path: str, is_temp: bool = False) -> str: | |
| logger.info(f"burası {self},{video_path},{is_temp}") | |
| if is_temp: | |
| # URL/YouTube için temp dizini kullan | |
| temp_dir = tempfile.gettempdir() | |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
| output_dir = os.path.join(temp_dir, f"face_detection_{timestamp}") | |
| else: | |
| # Yerel dosya için aynı dizini kullan | |
| base_dir = os.path.dirname(video_path) | |
| video_name = os.path.splitext(os.path.basename(video_path))[0] | |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
| output_dir = os.path.join(base_dir, f"{video_name}_{timestamp}") | |
| os.makedirs(output_dir, exist_ok=True) | |
| logger.info(f"Output dizini oluşturuldu: {output_dir}") | |
| return output_dir | |
| def extract_embeddings(self, face_img: np.ndarray) -> Tuple[Optional[np.ndarray], Optional[np.ndarray]]: | |
| try: | |
| if face_img is None or face_img.size == 0: | |
| logger.warning("Boş veya None face_img") | |
| return None, None | |
| if face_img.shape[0] < 10 or face_img.shape[1] < 10: | |
| logger.warning(f"Yüz çok küçük: {face_img.shape}") | |
| return None, None | |
| detector, recognizer = self.models | |
| bboxes, kpss = detector.autodetect(face_img, max_num=1) | |
| if len(bboxes) == 0: | |
| logger.warning("Yüz tespit edilemedi") | |
| return None, None | |
| kps = kpss[0] | |
| if kps is None or len(kps) < 5: | |
| logger.warning("Geçersiz keypoints") | |
| return None, None | |
| embedding = recognizer.get(face_img, kps) | |
| if embedding is None: | |
| logger.warning("Embedding None döndü") | |
| return None, None | |
| if np.isnan(embedding).any() or np.isinf(embedding).any(): | |
| logger.warning("Embedding NaN veya Inf içeriyor") | |
| return None, None | |
| if np.allclose(embedding, 0): | |
| logger.warning("Embedding sıfır vektör") | |
| return None, None | |
| return embedding, kps | |
| except Exception as e: | |
| logger.error(f"Embedding çıkarma hatası: {e}") | |
| return None, None | |
| def calculate_face_quality(self, face_img: np.ndarray, face_size: float, kps: np.ndarray) -> float: | |
| try: | |
| quality_score = 0 | |
| if face_size <= 0: | |
| logger.warning("Geçersiz face_size: <= 0") | |
| return 0.0 | |
| # Size score | |
| size_score = min(face_size / 5000, 2.0) | |
| quality_score += size_score | |
| # Keypoint kontrolü | |
| if kps is None or len(kps) < 5: | |
| logger.warning("Geçersiz keypoints") | |
| return quality_score | |
| left_eye, right_eye = kps[0], kps[1] | |
| eye_distance = np.linalg.norm(left_eye - right_eye) | |
| face_width = np.sqrt(face_size) | |
| if face_width <= 0: | |
| logger.warning("face_width <= 0") | |
| return quality_score | |
| eye_ratio = eye_distance / face_width | |
| angle_score = min(eye_ratio * 3, 2.0) | |
| quality_score += angle_score | |
| gray = cv2.cvtColor(face_img, cv2.COLOR_BGR2GRAY) | |
| blur_var = cv2.Laplacian(gray, cv2.CV_64F).var() | |
| blur_score = min(blur_var / 500, 2.0) | |
| quality_score += blur_score | |
| left_mouth, right_mouth = kps[3], kps[4] | |
| mouth_distance = np.linalg.norm(left_mouth - right_mouth) | |
| mouth_ratio = mouth_distance / face_width | |
| symmetry_score = min(mouth_ratio * 3, 2.0) | |
| quality_score += symmetry_score | |
| return quality_score | |
| except Exception as e: | |
| logger.error(f"Quality hesaplama hatası: {e}") | |
| return 1.0 | |
| def process_frame(self, frame: np.ndarray) -> List[Dict]: | |
| frame = cv2.resize(frame, (0, 0), fx=self.config.resize_factor, fy=self.config.resize_factor) | |
| detector, _ = self.models | |
| faces_data = [] | |
| try: | |
| bboxes, _ = detector.autodetect(frame) | |
| for x1, y1, x2, y2, _ in bboxes: | |
| face_size = (x2 - x1) * (y2 - y1) | |
| if face_size < self.config.face_size_threshold: | |
| continue | |
| face_img = frame[int(y1):int(y2), int(x1):int(x2)] | |
| embedding, kps = self.extract_embeddings(face_img) | |
| if embedding is not None and kps is not None: | |
| quality_score = self.calculate_face_quality(face_img, face_size, kps) | |
| faces_data.append({ | |
| 'embedding': embedding, | |
| 'face_img': face_img, | |
| 'quality_score': quality_score, | |
| 'bbox': [float(x1), float(y1), float(x2), float(y2)] | |
| }) | |
| except Exception as e: | |
| logger.error(f"Frame işleme hatası: {e}") | |
| return faces_data | |
| def process_video_chunk(self, frames: List[np.ndarray]) -> List[Dict]: | |
| all_faces = [] | |
| for frame in frames: | |
| faces = self.process_frame(frame) | |
| all_faces.extend(faces) | |
| return all_faces | |
| def detect_faces(self, video_path: str, is_url: bool = False): | |
| start_time = time.time() | |
| original_path = video_path | |
| downloaded_path = None | |
| try: | |
| if is_url: | |
| downloaded_path = self.download_video_from_url(video_path) | |
| video_path = downloaded_path | |
| logger.info(f"URL'den indirilen video kullanılıyor: {video_path}") | |
| # Video dosyasının varlığını kontrol et | |
| if not os.path.exists(video_path): | |
| raise ValueError(f"Video dosyası bulunamadı: {video_path}") | |
| file_size = os.path.getsize(video_path) | |
| if file_size == 0: | |
| raise ValueError(f"Video dosyası boş: {video_path}") | |
| logger.info(f"Video dosyası kontrol edildi: {video_path} ({file_size / 1024 / 1024:.1f}MB)") | |
| if self.models is None: | |
| self.models = self._load_models() | |
| output_dir = self.create_output_directory(video_path if not is_url else tempfile.gettempdir(), is_temp=is_url) | |
| metadata = { | |
| 'video_path': original_path, | |
| 'is_url': is_url, | |
| 'processing_start': datetime.now().isoformat(), | |
| 'config': vars(self.config), | |
| 'faces': [] | |
| } | |
| cap = cv2.VideoCapture(video_path) | |
| if not cap.isOpened(): | |
| raise ValueError(f"Video açılamadı: {video_path}. Dosya bozuk veya desteklenmeyen format olabilir.") | |
| total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) | |
| fps = int(cap.get(cv2.CAP_PROP_FPS)) | |
| duration = total_frames / fps if fps > 0 else 0 | |
| logger.info(f"Video: {total_frames} frame, {fps} FPS, {duration:.1f} saniye") | |
| progress_offset = 20 if is_url else 0 | |
| max_progress = 80 if is_url else 100 | |
| if self.progress_callback: | |
| self.progress_callback(progress_offset, f"Video açıldı: {total_frames} frame") | |
| current_frames = [] | |
| all_faces_data = [] | |
| frame_count = 0 | |
| with ThreadPoolExecutor(max_workers=self.config.max_workers) as executor: | |
| while True: | |
| ret, frame = cap.read() | |
| if not ret: | |
| break | |
| frame_count += 1 | |
| if frame_count % self.config.frame_skip == 0: | |
| current_frames.append(frame) | |
| if len(current_frames) >= self.config.chunk_size: | |
| future = executor.submit(self.process_video_chunk, current_frames) | |
| all_faces_data.extend(future.result()) | |
| current_frames = [] | |
| if frame_count % 500 == 0: | |
| progress = (frame_count / total_frames) * 100 | |
| if self.progress_callback: | |
| adjusted_progress = progress_offset + (progress / 2) * ((max_progress - progress_offset) / 100) | |
| self.progress_callback( | |
| adjusted_progress, | |
| f"Frame işleniyor: {frame_count}/{total_frames} ({progress:.1f}%)" | |
| ) | |
| if current_frames: | |
| future = executor.submit(self.process_video_chunk, current_frames) | |
| all_faces_data.extend(future.result()) | |
| cap.release() | |
| if not all_faces_data: | |
| raise ValueError("Hiç yüz bulunamadı!") | |
| clustering_progress = progress_offset + (max_progress - progress_offset) * 0.6 | |
| if self.progress_callback: | |
| self.progress_callback(clustering_progress, f"{len(all_faces_data)} yüz tespit edildi, clustering yapılıyor...") | |
| embeddings_array = np.array([face['embedding'] for face in all_faces_data]) | |
| clustering = DBSCAN( | |
| eps=self.config.clustering_eps, | |
| min_samples=self.config.min_samples, | |
| metric='cosine' | |
| ).fit(embeddings_array) | |
| labels = clustering.labels_ | |
| n_clusters = len(set(labels)) - (1 if -1 in labels else 0) | |
| saving_progress = progress_offset + (max_progress - progress_offset) * 0.8 | |
| if self.progress_callback: | |
| self.progress_callback(saving_progress, f"{n_clusters} benzersiz kişi tespit edildi, yüzler kaydediliyor...") | |
| saved_faces = [] | |
| for cluster_id in range(n_clusters): | |
| cluster_indices = np.where(labels == cluster_id)[0] | |
| cluster_faces = [all_faces_data[i] for i in cluster_indices] | |
| best_face = max(cluster_faces, key=lambda x: x['quality_score']) | |
| face_img_resized = cv2.resize(best_face['face_img'], (112, 112)) | |
| face_file = f"person_{cluster_id}.jpg" | |
| face_path = os.path.join(output_dir, face_file) | |
| cv2.imwrite(face_path, face_img_resized, [cv2.IMWRITE_JPEG_QUALITY, 95]) | |
| saved_faces.append(face_path) | |
| metadata['faces'].append({ | |
| 'cluster_id': cluster_id, | |
| 'face_file': face_file, | |
| 'quality_score': float(best_face['quality_score']), | |
| 'bbox': best_face['bbox'], | |
| 'cluster_size': len(cluster_indices) | |
| }) | |
| elapsed_time = time.time() - start_time | |
| metadata['processing_end'] = datetime.now().isoformat() | |
| metadata['elapsed_time'] = elapsed_time | |
| metadata['total_frames'] = total_frames | |
| metadata['fps'] = fps | |
| metadata['duration'] = duration | |
| metadata['unique_persons'] = n_clusters | |
| metadata_path = os.path.join(output_dir, 'metadata.json') | |
| with open(metadata_path, 'w', encoding='utf-8') as f: | |
| json.dump(metadata, f, indent=2, ensure_ascii=False) | |
| if self.progress_callback: | |
| self.progress_callback(100, f"✅ Tamamlandı! {n_clusters} kişi bulundu ({elapsed_time:.1f}s)") | |
| return output_dir, saved_faces, metadata | |
| except Exception as e: | |
| logger.error(f"İşlem hatası: {e}") | |
| raise | |
| finally: | |
| if is_url: | |
| self.cleanup_temp_files() | |
| detector_instance = None | |
| comparator_instance = None | |
| def initialize_detector(frame_skip, face_threshold, clustering_eps, use_gpu): | |
| global detector_instance | |
| config = FaceDetectionConfig( | |
| frame_skip=frame_skip, | |
| face_size_threshold=face_threshold, | |
| clustering_eps=clustering_eps, | |
| use_gpu=use_gpu | |
| ) | |
| detector_instance = FaceDetector(config) | |
| return "✅ Ayarlar kaydedildi!" | |
| def initialize_comparator(): | |
| """Video karşılaştırıcı instance'ını başlat""" | |
| global comparator_instance, detector_instance | |
| if detector_instance is None: | |
| detector_instance = FaceDetector(FaceDetectionConfig()) | |
| if comparator_instance is None: | |
| comparator_instance = VideoComparator( | |
| face_detector=detector_instance, | |
| similarity_threshold=0.6 # %60 benzerlik eşiği | |
| ) | |
| return comparator_instance | |
| def process_video_gradio(video_file, video_url, progress=gr.Progress()): | |
| global detector_instance | |
| if detector_instance is None: | |
| detector_instance = FaceDetector(FaceDetectionConfig()) | |
| def update_progress(value, message): | |
| progress(value / 100, desc=message) | |
| detector_instance.set_progress_callback(update_progress) | |
| try: | |
| progress(0, desc="İşlem başlatılıyor...") | |
| if IS_HUGGINGFACE and video_url and video_url.strip(): | |
| return [], "❌ Hugging Face Spaces'te URL/YouTube desteği bulunmamaktadır. Lütfen video dosyası yükleyin.", "❌ URL desteği yok" | |
| if video_url and video_url.strip(): | |
| video_source = video_url.strip() | |
| is_url = True | |
| source_name = urlparse(video_url).path.split('/')[-1] or "video" | |
| logger.info(f"URL kullanılıyor: {video_url}") | |
| # YouTube mu kontrol et | |
| if detector_instance.is_youtube_url(video_url): | |
| if not YOUTUBE_SUPPORT: | |
| return [], "❌ YouTube desteği için paket kurulmalı", "❌ paket kurulu değil" | |
| logger.info("YouTube URL tespit edildi") | |
| elif video_file: | |
| video_source = video_file | |
| is_url = False | |
| source_name = os.path.basename(video_file) | |
| logger.info(f"Yerel dosya kullanılıyor: {video_file}") | |
| else: | |
| return [], "❌ Lütfen bir video yükleyin veya URL girin!", "❌ Video bulunamadı" | |
| # URL test (YouTube değilse) | |
| if is_url and not detector_instance.is_youtube_url(video_source): | |
| try: | |
| head_response = requests.head(video_source, timeout=10, allow_redirects=True) | |
| logger.info(f"URL test - Status: {head_response.status_code}, Content-Type: {head_response.headers.get('content-type', 'unknown')}") | |
| if head_response.status_code != 200: | |
| return [], f"❌ URL erişilemez (HTTP {head_response.status_code})", "❌ URL hatası" | |
| except Exception as e: | |
| logger.warning(f"URL test başarısız: {e}, yine de deneniyor...") | |
| # Video süresini kontrol et (detect_faces çağrılmadan önce) | |
| if not is_url: | |
| cap = cv2.VideoCapture(video_source) | |
| if cap.isOpened(): | |
| total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) | |
| fps = int(cap.get(cv2.CAP_PROP_FPS)) | |
| duration = total_frames / fps if fps > 0 else 0 | |
| cap.release() | |
| if duration > 300: # 5 dakika limiti | |
| return [], f"⚠️ Video çok uzun ({duration:.0f} saniye)! CPU modunda maksimum 5 dakika (300 saniye) desteklenir.", "❌ Süre limiti aşıldı" | |
| output_dir, saved_faces, metadata = detector_instance.detect_faces(video_source, is_url=is_url) | |
| # URL'den indirilen videolar için de süre kontrolü | |
| if is_url and metadata['duration'] > 300: | |
| return [], f"⚠️ Video çok uzun ({metadata['duration']:.0f} saniye)! CPU modunda maksimum 5 dakika desteklenir.", "❌ Süre limiti aşıldı" | |
| report = f""" | |
| # 📊 İşlem Raporu | |
| ## Genel Bilgiler | |
| - **Video**: {source_name} | |
| - **Kaynak**: {'🌐 URL' if is_url else '📁 Yerel Dosya'} | |
| - **Süre**: {metadata['duration']:.1f} saniye | |
| - **FPS**: {metadata['fps']} | |
| - **Toplam Frame**: {metadata['total_frames']} | |
| - **İşlem Süresi**: {metadata['elapsed_time']:.1f} saniye | |
| ## Tespit Sonuçları | |
| - **Benzersiz Kişi**: {metadata['unique_persons']} | |
| - **Toplam Yüz Tespiti**: {sum(f['cluster_size'] for f in metadata['faces'])} | |
| ## Kişi Detayları | |
| """ | |
| for face in metadata['faces']: | |
| report += f"\n### Kişi {face['cluster_id']}\n" | |
| report += f"- Kalite Skoru: {face['quality_score']:.2f}\n" | |
| report += f"- Görülme Sayısı: {face['cluster_size']}\n" | |
| return saved_faces, report, f"✅ Başarılı! Çıktı: {output_dir}" | |
| except Exception as e: | |
| error_msg = f"❌ Hata: {str(e)}" | |
| logger.error(error_msg) | |
| return [], error_msg, error_msg | |
| def compare_two_faces(face1, face2): | |
| global detector_instance | |
| if detector_instance is None: | |
| detector_instance = FaceDetector(FaceDetectionConfig()) | |
| detector_instance.models = detector_instance._load_models() | |
| try: | |
| img1 = cv2.imread(face1) if isinstance(face1, str) else cv2.cvtColor(face1, cv2.COLOR_RGB2BGR) | |
| img2 = cv2.imread(face2) if isinstance(face2, str) else cv2.cvtColor(face2, cv2.COLOR_RGB2BGR) | |
| emb1, _ = detector_instance.extract_embeddings(img1) | |
| emb2, _ = detector_instance.extract_embeddings(img2) | |
| if emb1 is None or emb2 is None: | |
| return "❌ Yüz tespit edilemedi!" | |
| similarity = cosine_similarity([emb1], [emb2])[0][0] | |
| percentage = similarity * 100 | |
| if percentage > 70: | |
| result = f"✅ Aynı Kişi ({percentage:.1f}% benzerlik)" | |
| elif percentage > 50: | |
| result = f"⚠️ Muhtemelen Aynı Kişi ({percentage:.1f}% benzerlik)" | |
| else: | |
| result = f"❌ Farklı Kişiler ({percentage:.1f}% benzerlik)" | |
| return result | |
| except Exception as e: | |
| return f"❌ Hata: {str(e)}" | |
| def compare_videos_gradio(video1, video2, url1, url2, similarity_threshold, progress=gr.Progress()): | |
| """ | |
| Gradio callback: İki videoyu karşılaştır | |
| """ | |
| global comparator_instance | |
| # Comparator'ı başlat | |
| comparator = initialize_comparator() | |
| comparator.similarity_threshold = similarity_threshold | |
| def update_progress(value, message): | |
| progress(value / 100, desc=message) | |
| comparator.set_progress_callback(update_progress) | |
| try: | |
| progress(0, desc="Videolar hazırlanıyor...") | |
| # Video 1 kaynağını belirle | |
| if url1 and url1.strip() and not IS_HUGGINGFACE: | |
| video1_source = url1.strip() | |
| is_v1_url = True | |
| elif video1: | |
| video1_source = video1 | |
| is_v1_url = False | |
| else: | |
| return [], "❌ Lütfen Video 1 yükleyin!", "❌ Video 1 eksik" | |
| # Video 2 kaynağını belirle | |
| if url2 and url2.strip() and not IS_HUGGINGFACE: | |
| video2_source = url2.strip() | |
| is_v2_url = True | |
| elif video2: | |
| video2_source = video2 | |
| is_v2_url = False | |
| else: | |
| return [], "❌ Lütfen Video 2 yükleyin!", "❌ Video 2 eksik" | |
| # Hugging Face kontrolü | |
| if IS_HUGGINGFACE and (is_v1_url or is_v2_url): | |
| return [], "❌ Hugging Face Spaces'te URL desteği yok!", "❌ URL desteği yok" | |
| # Karşılaştırmayı yap | |
| result, output_dir, saved_images = comparator.compare_videos( | |
| video1_source, | |
| video2_source, | |
| is_video1_url=is_v1_url, | |
| is_video2_url=is_v2_url | |
| ) | |
| # Rapor oluştur | |
| report = comparator.generate_report(result) | |
| # Özet mesaj | |
| summary = f""" | |
| ✅ **Karşılaştırma Tamamlandı!** | |
| 🎯 **Sonuçlar:** | |
| - Ortak Kişi: **{result.common_count}** | |
| - Sadece Video 1: **{len(result.only_video1)}** | |
| - Sadece Video 2: **{len(result.only_video2)}** | |
| 📁 Çıktı: {output_dir} | |
| """ | |
| return saved_images, report, summary | |
| except Exception as e: | |
| error_msg = f"❌ Hata: {str(e)}" | |
| logger.error(error_msg, exc_info=True) | |
| return [], error_msg, error_msg | |
| with gr.Blocks(title="Yüz Tanıma Sistemi", theme=gr.themes.Soft()) as demo: | |
| gr.Markdown(""" | |
| # 🎭 Video Yüz Tanıma Sistemi | |
| Video dosyalarından otomatik yüz tespiti ve tanıma yapın | |
| ⚠️ **CPU Modunda Çalışıyor**: İşlem süresi uzun olabilir (5 dk video = ~10-15 dk) | |
| """) | |
| with gr.Tabs(): | |
| with gr.Tab("📹 Video İşle"): | |
| gr.Markdown("### Video kaynağını seçin:") | |
| with gr.Row(): | |
| with gr.Column(): | |
| video_input = gr.Video(label="📁 Yerel Video Yükle",height=300) | |
| if not IS_HUGGINGFACE: | |
| gr.Markdown("**VEYA**") | |
| url_input = gr.Textbox( | |
| label="🌐 Video URL'si", | |
| placeholder="https://example.com/video.mp4 veya YouTube linki", | |
| lines=1, | |
| interactive=True | |
| ) | |
| gr.Markdown("*URL girilirse öncelikle o kullanılır*") | |
| else: | |
| url_input = gr.Textbox( | |
| label="🌐 Video URL'si (Devre Dışı)", | |
| placeholder="Hugging Face Spaces'te URL desteği yok", | |
| lines=1, | |
| interactive=False, | |
| visible=True | |
| ) | |
| process_btn = gr.Button("🚀 İşlemi Başlat", variant="primary", size="lg") | |
| status_text = gr.Textbox(label="Durum", interactive=False) | |
| with gr.Column(scale=1): | |
| gallery_output = gr.Gallery(label="Tespit Edilen Yüzler", columns=4, height=500,object_fit="contain") | |
| report_output = gr.Markdown(label="Rapor") | |
| gr.Markdown(""" | |
| #### 💡 URL Örnekleri: | |
| - **YouTube**: `https://www.youtube.com/watch?v=xxxxx` veya `https://youtu.be/xxxxx` veya Shorts | |
| - **Doğrudan video**: `https://example.com/video.mp4` | |
| - Google Drive paylaşım linki çalışmaz (direkt indirme linki gerekir) | |
| - **Desteklenen formatlar**: MP4, AVI, MOV, MKV, WebM | |
| """) | |
| process_btn.click( | |
| fn=process_video_gradio, | |
| inputs=[video_input, url_input], | |
| outputs=[gallery_output, report_output, status_text] | |
| ) | |
| with gr.Tab("🔍 Yüz Karşılaştır"): | |
| gr.Markdown("İki yüz görselini yükleyin ve benzerliklerini kontrol edin") | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| face1_input = gr.Image( | |
| label="Yüz 1", | |
| type="filepath", | |
| height=300, | |
| width=300 | |
| ) | |
| with gr.Column(scale=1): | |
| face2_input = gr.Image( | |
| label="Yüz 2", | |
| type="filepath", | |
| height=300, | |
| width=300 | |
| ) | |
| compare_btn = gr.Button("⚖️ Karşılaştır", variant="primary") | |
| compare_result = gr.Textbox(label="Sonuç", interactive=False, lines=3) | |
| compare_btn.click( | |
| fn=compare_two_faces, | |
| inputs=[face1_input, face2_input], | |
| outputs=compare_result | |
| ) | |
| with gr.Tab("🎬 Video Karşılaştır"): | |
| gr.Markdown(""" | |
| ## İki Videoyu Karşılaştır | |
| İki farklı videodan yüz tespiti yaparak ortak kişileri bulur | |
| """) | |
| # Hugging Face uyarısı | |
| if IS_HUGGINGFACE: | |
| gr.Markdown(""" | |
| ⚠️ **Hugging Face Spaces Modunda**: URL/YouTube desteği kapalı. | |
| Sadece dosya yükleme kullanılabilir. | |
| """) | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| gr.Markdown("### 📹 Video 1") | |
| video1_input = gr.Video( | |
| label="Video 1 Yükle", | |
| height=250, | |
| show_label=True | |
| ) | |
| if not IS_HUGGINGFACE: | |
| gr.Markdown("**VEYA**") | |
| url1_input = gr.Textbox( | |
| label="Video 1 URL", | |
| placeholder="https://example.com/video1.mp4", | |
| lines=1, | |
| interactive=True | |
| ) | |
| else: | |
| url1_input = gr.Textbox( | |
| label="Video 1 URL (Devre Dışı)", | |
| placeholder="HF Spaces'te URL desteği yok", | |
| lines=1, | |
| interactive=False | |
| ) | |
| with gr.Column(scale=1): | |
| gr.Markdown("### 📹 Video 2") | |
| video2_input = gr.Video( | |
| label="Video 2 Yükle", | |
| height=250, | |
| show_label=True | |
| ) | |
| if not IS_HUGGINGFACE: | |
| gr.Markdown("**VEYA**") | |
| url2_input = gr.Textbox( | |
| label="Video 2 URL", | |
| placeholder="https://example.com/video2.mp4", | |
| lines=1, | |
| interactive=True | |
| ) | |
| else: | |
| url2_input = gr.Textbox( | |
| label="Video 2 URL (Devre Dışı)", | |
| placeholder="HF Spaces'te URL desteği yok", | |
| lines=1, | |
| interactive=False | |
| ) | |
| # Ayarlar | |
| with gr.Row(): | |
| similarity_slider = gr.Slider( | |
| minimum=0.4, | |
| maximum=0.9, | |
| value=0.6, | |
| step=0.05, | |
| label="🎯 Benzerlik Eşiği (Düşük = Daha fazla eşleşme)", | |
| info="İki yüzün aynı kişi olarak kabul edilmesi için minimum benzerlik oranı" | |
| ) | |
| compare_videos_btn = gr.Button("🔍 Videoları Karşılaştır", variant="primary", size="lg") | |
| status_compare = gr.Textbox(label="Durum", interactive=False, lines=3) | |
| gr.Markdown("### 📊 Sonuçlar") | |
| with gr.Row(): | |
| with gr.Column(scale=2): | |
| gallery_compare = gr.Gallery( | |
| label="Ortak Yüzler (Sol: Video 1, Sağ: Video 2)", | |
| columns=2, | |
| height=500, | |
| object_fit="contain" | |
| ) | |
| with gr.Column(scale=1): | |
| report_compare = gr.Markdown(label="Detaylı Rapor") | |
| gr.Markdown(""" | |
| ### 💡 Nasıl Çalışır? | |
| 1. **İki video yükleyin** (veya URL girin) | |
| 2. **Benzerlik eşiğini ayarlayın** (varsayılan %60) | |
| 3. **Karşılaştır** butonuna tıklayın | |
| 4. Sistem her iki videodan yüz tespiti yapar ve ortak kişileri bulur | |
| ### 📈 Sonuç Kategorileri | |
| - **Ortak Yüzler**: Her iki videoda da görünen kişiler (yan yana gösterilir) | |
| - **Sadece Video 1**: Yalnızca ilk videoda görünen kişiler | |
| - **Sadece Video 2**: Yalnızca ikinci videoda görünen kişiler | |
| """) | |
| # Event handler | |
| compare_videos_btn.click( | |
| fn=compare_videos_gradio, | |
| inputs=[ | |
| video1_input, | |
| video2_input, | |
| url1_input, | |
| url2_input, | |
| similarity_slider | |
| ], | |
| outputs=[ | |
| gallery_compare, | |
| report_compare, | |
| status_compare | |
| ] | |
| ) | |
| with gr.Tab("⚙️ Ayarlar"): | |
| gr.Markdown("### Gelişmiş Ayarlar") | |
| frame_skip_slider = gr.Slider(20, 60, value=30, step=5, | |
| label="Frame Atlama (yüksek = daha hızlı)") | |
| face_threshold_slider = gr.Slider(600, 2000, value=1000, step=100, | |
| label="Minimum Yüz Boyutu (piksel)") | |
| clustering_slider = gr.Slider(0.3, 0.7, value=0.5, step=0.05, | |
| label="Clustering Hassasiyeti") | |
| save_settings_btn = gr.Button("💾 Ayarları Kaydet") | |
| settings_status = gr.Textbox(label="Durum", interactive=False) | |
| save_settings_btn.click( | |
| fn=initialize_detector, | |
| inputs=[frame_skip_slider, face_threshold_slider, clustering_slider], | |
| outputs=settings_status | |
| ) | |
| gr.Markdown(""" | |
| --- | |
| ### 💡 İpuçları | |
| - **Frame Atlama**: Daha hızlı işlem için artırın, daha fazla tespit için azaltın | |
| - **Clustering**: Daha az kişi tespit ediyorsa artırın, fazla tespit ediyorsa azaltın | |
| """) | |
| if __name__ == "__main__": | |
| print("\n" + "="*60) | |
| print("🎬 Video Yüz Tanıma Sistemi") | |
| print("="*60) | |
| print("="*60 + "\n") | |
| demo.launch(share=False, server_name="0.0.0.0", server_port=7860) |