FaceDetection / app.py
hakandinger
fix: handle ZeroDivisionError in embedding extraction
e04e6b0
import os
import cv2
import gradio as gr
import numpy as np
from datetime import datetime
from scrfd import SCRFD
from arcface_onnx import ArcFaceONNX
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.cluster import DBSCAN
import time
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
import logging
from typing import List, Tuple, Optional, Dict
import json
from pathlib import Path
import shutil
import requests
import tempfile
from urllib.parse import urlparse
import logging
from core.comparator import VideoComparator, ComparisonResult
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
IS_HUGGINGFACE = os.getenv("SPACE_ID") is not None
if IS_HUGGINGFACE:
YOUTUBE_SUPPORT = False
logger.info("🌐 Hugging Face Spaces modunda çalışıyor - YouTube desteği kapalı")
else:
try:
import yt_dlp
YOUTUBE_SUPPORT = True
logger.info("✅ YouTube desteği aktif")
except ImportError:
YOUTUBE_SUPPORT = False
logger.warning("⚠️ yt-dlp yüklü değil")
try:
import yt_dlp
YOUTUBE_SUPPORT = True
except ImportError:
YOUTUBE_SUPPORT = False
logger.warning("Youtube desteği yüklü değil.")
@dataclass
class FaceDetectionConfig:
frame_skip: int = 30
face_size_threshold: int = 1000
clustering_eps: float = 0.5
min_samples: int = 2
resize_factor: float = 0.5
chunk_size: int = 500
max_workers: int = 2
use_gpu: bool = False
class FaceDetector:
def __init__(self, config: FaceDetectionConfig):
self.config = config
self.models = None
self.progress_callback = None
self.temp_files = []
def set_progress_callback(self, callback):
self.progress_callback = callback
def is_youtube_url(self, url: str) -> bool:
youtube_domains = ['youtube.com', 'youtu.be', 'youtube-nocookie.com']
parsed = urlparse(url)
return any(domain in parsed.netloc for domain in youtube_domains)
def download_youtube_video(self, url: str) -> str:
if not YOUTUBE_SUPPORT:
raise ValueError("YouTube desteği için paket kurulmalı")
try:
if self.progress_callback:
self.progress_callback(0, "YouTube videosu indiriliyor...")
temp_dir = tempfile.gettempdir()
temp_filename = f"yt_{int(time.time())}_{np.random.randint(1000, 9999)}"
temp_path_without_ext = os.path.join(temp_dir, temp_filename)
ydl_opts = {
'format': 'best[ext=mp4][height<=720]/best[height<=720]/best',
'outtmpl': temp_path_without_ext + '.%(ext)s',
'quiet': True,
'no_warnings': True,
'socket_timeout': 60,
'retries': 3,
'fragment_retries': 3,
'keepvideo': True,
'merge_output_format': 'mp4',
'postprocessors': [{
'key': 'FFmpegVideoConvertor',
'preferedformat': 'mp4',
}],
'progress_hooks': [self._youtube_progress_hook],
}
logger.info(f"YouTube videosu indiriliyor: {url}")
logger.info(f"Hedef dosya: {temp_path_without_ext}.mp4")
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
info = ydl.extract_info(url, download=True)
video_title = info.get('title', 'video')
video_ext = info.get('ext', 'mp4')
logger.info(f"YouTube video başlığı: {video_title}")
final_path = temp_path_without_ext + '.mp4'
if not os.path.exists(final_path):
for ext in ['.mp4', '.webm', '.mkv']:
alt_path = temp_path_without_ext + ext
if os.path.exists(alt_path):
final_path = alt_path
logger.info(f"Video bulundu: {final_path}")
break
if not os.path.exists(final_path):
possible_files = [f for f in os.listdir(temp_dir) if f.startswith(temp_filename)]
if possible_files:
final_path = os.path.join(temp_dir, possible_files[0])
logger.info(f"Alternatif dosya bulundu: {final_path}")
else:
raise ValueError(f"YouTube videosu indirilemedi! Beklenen: {final_path}")
file_size = os.path.getsize(final_path)
if file_size == 0:
raise ValueError("İndirilen YouTube videosu boş!")
self.temp_files.append(final_path)
logger.info(f"YouTube videosu başarıyla indirildi: {final_path} ({file_size / 1024 / 1024:.1f}MB)")
if self.progress_callback:
self.progress_callback(20, f"YouTube videosu indirildi ({file_size / 1024 / 1024:.1f}MB)")
return final_path
except Exception as e:
logger.error(f"YouTube indirme hatası: {e}", exc_info=True)
raise ValueError(f"YouTube videosu indirilemedi: {str(e)}")
def _youtube_progress_hook(self, d):
if d['status'] == 'downloading':
if 'total_bytes' in d:
progress = (d['downloaded_bytes'] / d['total_bytes']) * 20
if self.progress_callback:
self.progress_callback(
progress,
f"YouTube indiriliyor: {d['downloaded_bytes'] / 1024 / 1024:.1f}MB / {d['total_bytes'] / 1024 / 1024:.1f}MB"
)
elif d['status'] == 'finished':
if self.progress_callback:
self.progress_callback(18, "YouTube videosu işleniyor...")
def download_video_from_url(self, url: str) -> str:
if self.is_youtube_url(url):
return self.download_youtube_video(url)
temp_path = None
try:
if self.progress_callback:
self.progress_callback(0, "Video indiriliyor...")
parsed = urlparse(url)
if not parsed.scheme in ['http', 'https']:
raise ValueError("Geçersiz URL! HTTP veya HTTPS protokolü kullanın.")
# Dosya uzantısını belirle
ext = os.path.splitext(parsed.path)[1]
if not ext or ext not in ['.mp4', '.avi', '.mov', '.mkv', '.webm']:
ext = '.mp4'
# Geçici dosya oluştur
temp_fd, temp_path = tempfile.mkstemp(suffix=ext, prefix='video_')
os.close(temp_fd) # File descriptor'ı kapat
self.temp_files.append(temp_path)
logger.info(f"Geçici dosya oluşturuldu: {temp_path}")
# URL'den indir
response = requests.get(url, stream=True, timeout=60,
headers={'User-Agent': 'Mozilla/5.0'})
response.raise_for_status()
total_size = int(response.headers.get('content-length', 0))
downloaded = 0
with open(temp_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=65536): # 64KB chunks
if chunk:
f.write(chunk)
downloaded += len(chunk)
if total_size > 0 and self.progress_callback:
progress = (downloaded / total_size) * 20
if downloaded % (1024 * 1024) < 65536: # Her 1MB'de güncelle
self.progress_callback(
progress,
f"İndiriliyor: {downloaded / 1024 / 1024:.1f}MB / {total_size / 1024 / 1024:.1f}MB"
)
if not os.path.exists(temp_path):
raise ValueError("Video dosyası oluşturulamadı!")
file_size = os.path.getsize(temp_path)
if file_size == 0:
raise ValueError("İndirilen video dosyası boş!")
logger.info(f"Video başarıyla indirildi: {temp_path} ({file_size / 1024 / 1024:.1f}MB)")
if self.progress_callback:
self.progress_callback(20, f"Video indirildi ({file_size / 1024 / 1024:.1f}MB), işleme başlanıyor...")
return temp_path
except requests.exceptions.Timeout:
if temp_path and os.path.exists(temp_path):
os.unlink(temp_path)
raise ValueError("Video indirme zaman aşımına uğradı. Lütfen tekrar deneyin.")
except requests.exceptions.RequestException as e:
if temp_path and os.path.exists(temp_path):
os.unlink(temp_path)
raise ValueError(f"Video indirme hatası: {str(e)}")
except Exception as e:
if temp_path and os.path.exists(temp_path):
os.unlink(temp_path)
raise ValueError(f"Beklenmeyen hata: {str(e)}")
def cleanup_temp_files(self):
for temp_file in self.temp_files:
try:
if os.path.exists(temp_file):
os.unlink(temp_file)
logger.info(f"Geçici dosya silindi: {temp_file}")
except Exception as e:
logger.warning(f"Geçici dosya silinemedi {temp_file}: {e}")
self.temp_files = []
def _load_models(self) -> Tuple[SCRFD, ArcFaceONNX]:
try:
logger.info("Modeller yükleniyor (CPU mode)...")
current_dir = os.path.dirname(os.path.abspath(__file__))
models_dir = os.path.join(current_dir, 'deploy', 'models')
import onnxruntime as ort
sess_options = ort.SessionOptions()
ort.set_default_logger_severity(3)
sess_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
sess_options.intra_op_num_threads = 2
# Sadece CPU provider
providers = ['CPUExecutionProvider']
det_model = os.path.join(models_dir, 'det_10g.onnx')
arc_model = os.path.join(models_dir, 'w600k_r50.onnx')
if not os.path.exists(det_model) or not os.path.exists(arc_model):
raise FileNotFoundError(f"Model dosyaları bulunamadı: {models_dir}")
detector = SCRFD(det_model)
detector.session = ort.InferenceSession(det_model, sess_options, providers=providers)
recognizer = ArcFaceONNX(arc_model)
recognizer.session = ort.InferenceSession(arc_model, sess_options, providers=providers)
logger.info(f"✅ CPU mode aktif: {recognizer.session.get_providers()}")
return detector, recognizer
except Exception as e:
logger.error(f"Model yükleme hatası: {e}")
raise
def create_output_directory(self, video_path: str, is_temp: bool = False) -> str:
logger.info(f"burası {self},{video_path},{is_temp}")
if is_temp:
# URL/YouTube için temp dizini kullan
temp_dir = tempfile.gettempdir()
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_dir = os.path.join(temp_dir, f"face_detection_{timestamp}")
else:
# Yerel dosya için aynı dizini kullan
base_dir = os.path.dirname(video_path)
video_name = os.path.splitext(os.path.basename(video_path))[0]
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_dir = os.path.join(base_dir, f"{video_name}_{timestamp}")
os.makedirs(output_dir, exist_ok=True)
logger.info(f"Output dizini oluşturuldu: {output_dir}")
return output_dir
def extract_embeddings(self, face_img: np.ndarray) -> Tuple[Optional[np.ndarray], Optional[np.ndarray]]:
try:
if face_img is None or face_img.size == 0:
logger.warning("Boş veya None face_img")
return None, None
if face_img.shape[0] < 10 or face_img.shape[1] < 10:
logger.warning(f"Yüz çok küçük: {face_img.shape}")
return None, None
detector, recognizer = self.models
bboxes, kpss = detector.autodetect(face_img, max_num=1)
if len(bboxes) == 0:
logger.warning("Yüz tespit edilemedi")
return None, None
kps = kpss[0]
if kps is None or len(kps) < 5:
logger.warning("Geçersiz keypoints")
return None, None
embedding = recognizer.get(face_img, kps)
if embedding is None:
logger.warning("Embedding None döndü")
return None, None
if np.isnan(embedding).any() or np.isinf(embedding).any():
logger.warning("Embedding NaN veya Inf içeriyor")
return None, None
if np.allclose(embedding, 0):
logger.warning("Embedding sıfır vektör")
return None, None
return embedding, kps
except Exception as e:
logger.error(f"Embedding çıkarma hatası: {e}")
return None, None
def calculate_face_quality(self, face_img: np.ndarray, face_size: float, kps: np.ndarray) -> float:
try:
quality_score = 0
if face_size <= 0:
logger.warning("Geçersiz face_size: <= 0")
return 0.0
# Size score
size_score = min(face_size / 5000, 2.0)
quality_score += size_score
# Keypoint kontrolü
if kps is None or len(kps) < 5:
logger.warning("Geçersiz keypoints")
return quality_score
left_eye, right_eye = kps[0], kps[1]
eye_distance = np.linalg.norm(left_eye - right_eye)
face_width = np.sqrt(face_size)
if face_width <= 0:
logger.warning("face_width <= 0")
return quality_score
eye_ratio = eye_distance / face_width
angle_score = min(eye_ratio * 3, 2.0)
quality_score += angle_score
gray = cv2.cvtColor(face_img, cv2.COLOR_BGR2GRAY)
blur_var = cv2.Laplacian(gray, cv2.CV_64F).var()
blur_score = min(blur_var / 500, 2.0)
quality_score += blur_score
left_mouth, right_mouth = kps[3], kps[4]
mouth_distance = np.linalg.norm(left_mouth - right_mouth)
mouth_ratio = mouth_distance / face_width
symmetry_score = min(mouth_ratio * 3, 2.0)
quality_score += symmetry_score
return quality_score
except Exception as e:
logger.error(f"Quality hesaplama hatası: {e}")
return 1.0
def process_frame(self, frame: np.ndarray) -> List[Dict]:
frame = cv2.resize(frame, (0, 0), fx=self.config.resize_factor, fy=self.config.resize_factor)
detector, _ = self.models
faces_data = []
try:
bboxes, _ = detector.autodetect(frame)
for x1, y1, x2, y2, _ in bboxes:
face_size = (x2 - x1) * (y2 - y1)
if face_size < self.config.face_size_threshold:
continue
face_img = frame[int(y1):int(y2), int(x1):int(x2)]
embedding, kps = self.extract_embeddings(face_img)
if embedding is not None and kps is not None:
quality_score = self.calculate_face_quality(face_img, face_size, kps)
faces_data.append({
'embedding': embedding,
'face_img': face_img,
'quality_score': quality_score,
'bbox': [float(x1), float(y1), float(x2), float(y2)]
})
except Exception as e:
logger.error(f"Frame işleme hatası: {e}")
return faces_data
def process_video_chunk(self, frames: List[np.ndarray]) -> List[Dict]:
all_faces = []
for frame in frames:
faces = self.process_frame(frame)
all_faces.extend(faces)
return all_faces
def detect_faces(self, video_path: str, is_url: bool = False):
start_time = time.time()
original_path = video_path
downloaded_path = None
try:
if is_url:
downloaded_path = self.download_video_from_url(video_path)
video_path = downloaded_path
logger.info(f"URL'den indirilen video kullanılıyor: {video_path}")
# Video dosyasının varlığını kontrol et
if not os.path.exists(video_path):
raise ValueError(f"Video dosyası bulunamadı: {video_path}")
file_size = os.path.getsize(video_path)
if file_size == 0:
raise ValueError(f"Video dosyası boş: {video_path}")
logger.info(f"Video dosyası kontrol edildi: {video_path} ({file_size / 1024 / 1024:.1f}MB)")
if self.models is None:
self.models = self._load_models()
output_dir = self.create_output_directory(video_path if not is_url else tempfile.gettempdir(), is_temp=is_url)
metadata = {
'video_path': original_path,
'is_url': is_url,
'processing_start': datetime.now().isoformat(),
'config': vars(self.config),
'faces': []
}
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
raise ValueError(f"Video açılamadı: {video_path}. Dosya bozuk veya desteklenmeyen format olabilir.")
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
fps = int(cap.get(cv2.CAP_PROP_FPS))
duration = total_frames / fps if fps > 0 else 0
logger.info(f"Video: {total_frames} frame, {fps} FPS, {duration:.1f} saniye")
progress_offset = 20 if is_url else 0
max_progress = 80 if is_url else 100
if self.progress_callback:
self.progress_callback(progress_offset, f"Video açıldı: {total_frames} frame")
current_frames = []
all_faces_data = []
frame_count = 0
with ThreadPoolExecutor(max_workers=self.config.max_workers) as executor:
while True:
ret, frame = cap.read()
if not ret:
break
frame_count += 1
if frame_count % self.config.frame_skip == 0:
current_frames.append(frame)
if len(current_frames) >= self.config.chunk_size:
future = executor.submit(self.process_video_chunk, current_frames)
all_faces_data.extend(future.result())
current_frames = []
if frame_count % 500 == 0:
progress = (frame_count / total_frames) * 100
if self.progress_callback:
adjusted_progress = progress_offset + (progress / 2) * ((max_progress - progress_offset) / 100)
self.progress_callback(
adjusted_progress,
f"Frame işleniyor: {frame_count}/{total_frames} ({progress:.1f}%)"
)
if current_frames:
future = executor.submit(self.process_video_chunk, current_frames)
all_faces_data.extend(future.result())
cap.release()
if not all_faces_data:
raise ValueError("Hiç yüz bulunamadı!")
clustering_progress = progress_offset + (max_progress - progress_offset) * 0.6
if self.progress_callback:
self.progress_callback(clustering_progress, f"{len(all_faces_data)} yüz tespit edildi, clustering yapılıyor...")
embeddings_array = np.array([face['embedding'] for face in all_faces_data])
clustering = DBSCAN(
eps=self.config.clustering_eps,
min_samples=self.config.min_samples,
metric='cosine'
).fit(embeddings_array)
labels = clustering.labels_
n_clusters = len(set(labels)) - (1 if -1 in labels else 0)
saving_progress = progress_offset + (max_progress - progress_offset) * 0.8
if self.progress_callback:
self.progress_callback(saving_progress, f"{n_clusters} benzersiz kişi tespit edildi, yüzler kaydediliyor...")
saved_faces = []
for cluster_id in range(n_clusters):
cluster_indices = np.where(labels == cluster_id)[0]
cluster_faces = [all_faces_data[i] for i in cluster_indices]
best_face = max(cluster_faces, key=lambda x: x['quality_score'])
face_img_resized = cv2.resize(best_face['face_img'], (112, 112))
face_file = f"person_{cluster_id}.jpg"
face_path = os.path.join(output_dir, face_file)
cv2.imwrite(face_path, face_img_resized, [cv2.IMWRITE_JPEG_QUALITY, 95])
saved_faces.append(face_path)
metadata['faces'].append({
'cluster_id': cluster_id,
'face_file': face_file,
'quality_score': float(best_face['quality_score']),
'bbox': best_face['bbox'],
'cluster_size': len(cluster_indices)
})
elapsed_time = time.time() - start_time
metadata['processing_end'] = datetime.now().isoformat()
metadata['elapsed_time'] = elapsed_time
metadata['total_frames'] = total_frames
metadata['fps'] = fps
metadata['duration'] = duration
metadata['unique_persons'] = n_clusters
metadata_path = os.path.join(output_dir, 'metadata.json')
with open(metadata_path, 'w', encoding='utf-8') as f:
json.dump(metadata, f, indent=2, ensure_ascii=False)
if self.progress_callback:
self.progress_callback(100, f"✅ Tamamlandı! {n_clusters} kişi bulundu ({elapsed_time:.1f}s)")
return output_dir, saved_faces, metadata
except Exception as e:
logger.error(f"İşlem hatası: {e}")
raise
finally:
if is_url:
self.cleanup_temp_files()
detector_instance = None
comparator_instance = None
def initialize_detector(frame_skip, face_threshold, clustering_eps, use_gpu):
global detector_instance
config = FaceDetectionConfig(
frame_skip=frame_skip,
face_size_threshold=face_threshold,
clustering_eps=clustering_eps,
use_gpu=use_gpu
)
detector_instance = FaceDetector(config)
return "✅ Ayarlar kaydedildi!"
def initialize_comparator():
"""Video karşılaştırıcı instance'ını başlat"""
global comparator_instance, detector_instance
if detector_instance is None:
detector_instance = FaceDetector(FaceDetectionConfig())
if comparator_instance is None:
comparator_instance = VideoComparator(
face_detector=detector_instance,
similarity_threshold=0.6 # %60 benzerlik eşiği
)
return comparator_instance
def process_video_gradio(video_file, video_url, progress=gr.Progress()):
global detector_instance
if detector_instance is None:
detector_instance = FaceDetector(FaceDetectionConfig())
def update_progress(value, message):
progress(value / 100, desc=message)
detector_instance.set_progress_callback(update_progress)
try:
progress(0, desc="İşlem başlatılıyor...")
if IS_HUGGINGFACE and video_url and video_url.strip():
return [], "❌ Hugging Face Spaces'te URL/YouTube desteği bulunmamaktadır. Lütfen video dosyası yükleyin.", "❌ URL desteği yok"
if video_url and video_url.strip():
video_source = video_url.strip()
is_url = True
source_name = urlparse(video_url).path.split('/')[-1] or "video"
logger.info(f"URL kullanılıyor: {video_url}")
# YouTube mu kontrol et
if detector_instance.is_youtube_url(video_url):
if not YOUTUBE_SUPPORT:
return [], "❌ YouTube desteği için paket kurulmalı", "❌ paket kurulu değil"
logger.info("YouTube URL tespit edildi")
elif video_file:
video_source = video_file
is_url = False
source_name = os.path.basename(video_file)
logger.info(f"Yerel dosya kullanılıyor: {video_file}")
else:
return [], "❌ Lütfen bir video yükleyin veya URL girin!", "❌ Video bulunamadı"
# URL test (YouTube değilse)
if is_url and not detector_instance.is_youtube_url(video_source):
try:
head_response = requests.head(video_source, timeout=10, allow_redirects=True)
logger.info(f"URL test - Status: {head_response.status_code}, Content-Type: {head_response.headers.get('content-type', 'unknown')}")
if head_response.status_code != 200:
return [], f"❌ URL erişilemez (HTTP {head_response.status_code})", "❌ URL hatası"
except Exception as e:
logger.warning(f"URL test başarısız: {e}, yine de deneniyor...")
# Video süresini kontrol et (detect_faces çağrılmadan önce)
if not is_url:
cap = cv2.VideoCapture(video_source)
if cap.isOpened():
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
fps = int(cap.get(cv2.CAP_PROP_FPS))
duration = total_frames / fps if fps > 0 else 0
cap.release()
if duration > 300: # 5 dakika limiti
return [], f"⚠️ Video çok uzun ({duration:.0f} saniye)! CPU modunda maksimum 5 dakika (300 saniye) desteklenir.", "❌ Süre limiti aşıldı"
output_dir, saved_faces, metadata = detector_instance.detect_faces(video_source, is_url=is_url)
# URL'den indirilen videolar için de süre kontrolü
if is_url and metadata['duration'] > 300:
return [], f"⚠️ Video çok uzun ({metadata['duration']:.0f} saniye)! CPU modunda maksimum 5 dakika desteklenir.", "❌ Süre limiti aşıldı"
report = f"""
# 📊 İşlem Raporu
## Genel Bilgiler
- **Video**: {source_name}
- **Kaynak**: {'🌐 URL' if is_url else '📁 Yerel Dosya'}
- **Süre**: {metadata['duration']:.1f} saniye
- **FPS**: {metadata['fps']}
- **Toplam Frame**: {metadata['total_frames']}
- **İşlem Süresi**: {metadata['elapsed_time']:.1f} saniye
## Tespit Sonuçları
- **Benzersiz Kişi**: {metadata['unique_persons']}
- **Toplam Yüz Tespiti**: {sum(f['cluster_size'] for f in metadata['faces'])}
## Kişi Detayları
"""
for face in metadata['faces']:
report += f"\n### Kişi {face['cluster_id']}\n"
report += f"- Kalite Skoru: {face['quality_score']:.2f}\n"
report += f"- Görülme Sayısı: {face['cluster_size']}\n"
return saved_faces, report, f"✅ Başarılı! Çıktı: {output_dir}"
except Exception as e:
error_msg = f"❌ Hata: {str(e)}"
logger.error(error_msg)
return [], error_msg, error_msg
def compare_two_faces(face1, face2):
global detector_instance
if detector_instance is None:
detector_instance = FaceDetector(FaceDetectionConfig())
detector_instance.models = detector_instance._load_models()
try:
img1 = cv2.imread(face1) if isinstance(face1, str) else cv2.cvtColor(face1, cv2.COLOR_RGB2BGR)
img2 = cv2.imread(face2) if isinstance(face2, str) else cv2.cvtColor(face2, cv2.COLOR_RGB2BGR)
emb1, _ = detector_instance.extract_embeddings(img1)
emb2, _ = detector_instance.extract_embeddings(img2)
if emb1 is None or emb2 is None:
return "❌ Yüz tespit edilemedi!"
similarity = cosine_similarity([emb1], [emb2])[0][0]
percentage = similarity * 100
if percentage > 70:
result = f"✅ Aynı Kişi ({percentage:.1f}% benzerlik)"
elif percentage > 50:
result = f"⚠️ Muhtemelen Aynı Kişi ({percentage:.1f}% benzerlik)"
else:
result = f"❌ Farklı Kişiler ({percentage:.1f}% benzerlik)"
return result
except Exception as e:
return f"❌ Hata: {str(e)}"
def compare_videos_gradio(video1, video2, url1, url2, similarity_threshold, progress=gr.Progress()):
"""
Gradio callback: İki videoyu karşılaştır
"""
global comparator_instance
# Comparator'ı başlat
comparator = initialize_comparator()
comparator.similarity_threshold = similarity_threshold
def update_progress(value, message):
progress(value / 100, desc=message)
comparator.set_progress_callback(update_progress)
try:
progress(0, desc="Videolar hazırlanıyor...")
# Video 1 kaynağını belirle
if url1 and url1.strip() and not IS_HUGGINGFACE:
video1_source = url1.strip()
is_v1_url = True
elif video1:
video1_source = video1
is_v1_url = False
else:
return [], "❌ Lütfen Video 1 yükleyin!", "❌ Video 1 eksik"
# Video 2 kaynağını belirle
if url2 and url2.strip() and not IS_HUGGINGFACE:
video2_source = url2.strip()
is_v2_url = True
elif video2:
video2_source = video2
is_v2_url = False
else:
return [], "❌ Lütfen Video 2 yükleyin!", "❌ Video 2 eksik"
# Hugging Face kontrolü
if IS_HUGGINGFACE and (is_v1_url or is_v2_url):
return [], "❌ Hugging Face Spaces'te URL desteği yok!", "❌ URL desteği yok"
# Karşılaştırmayı yap
result, output_dir, saved_images = comparator.compare_videos(
video1_source,
video2_source,
is_video1_url=is_v1_url,
is_video2_url=is_v2_url
)
# Rapor oluştur
report = comparator.generate_report(result)
# Özet mesaj
summary = f"""
✅ **Karşılaştırma Tamamlandı!**
🎯 **Sonuçlar:**
- Ortak Kişi: **{result.common_count}**
- Sadece Video 1: **{len(result.only_video1)}**
- Sadece Video 2: **{len(result.only_video2)}**
📁 Çıktı: {output_dir}
"""
return saved_images, report, summary
except Exception as e:
error_msg = f"❌ Hata: {str(e)}"
logger.error(error_msg, exc_info=True)
return [], error_msg, error_msg
with gr.Blocks(title="Yüz Tanıma Sistemi", theme=gr.themes.Soft()) as demo:
gr.Markdown("""
# 🎭 Video Yüz Tanıma Sistemi
Video dosyalarından otomatik yüz tespiti ve tanıma yapın
⚠️ **CPU Modunda Çalışıyor**: İşlem süresi uzun olabilir (5 dk video = ~10-15 dk)
""")
with gr.Tabs():
with gr.Tab("📹 Video İşle"):
gr.Markdown("### Video kaynağını seçin:")
with gr.Row():
with gr.Column():
video_input = gr.Video(label="📁 Yerel Video Yükle",height=300)
if not IS_HUGGINGFACE:
gr.Markdown("**VEYA**")
url_input = gr.Textbox(
label="🌐 Video URL'si",
placeholder="https://example.com/video.mp4 veya YouTube linki",
lines=1,
interactive=True
)
gr.Markdown("*URL girilirse öncelikle o kullanılır*")
else:
url_input = gr.Textbox(
label="🌐 Video URL'si (Devre Dışı)",
placeholder="Hugging Face Spaces'te URL desteği yok",
lines=1,
interactive=False,
visible=True
)
process_btn = gr.Button("🚀 İşlemi Başlat", variant="primary", size="lg")
status_text = gr.Textbox(label="Durum", interactive=False)
with gr.Column(scale=1):
gallery_output = gr.Gallery(label="Tespit Edilen Yüzler", columns=4, height=500,object_fit="contain")
report_output = gr.Markdown(label="Rapor")
gr.Markdown("""
#### 💡 URL Örnekleri:
- **YouTube**: `https://www.youtube.com/watch?v=xxxxx` veya `https://youtu.be/xxxxx` veya Shorts
- **Doğrudan video**: `https://example.com/video.mp4`
- Google Drive paylaşım linki çalışmaz (direkt indirme linki gerekir)
- **Desteklenen formatlar**: MP4, AVI, MOV, MKV, WebM
""")
process_btn.click(
fn=process_video_gradio,
inputs=[video_input, url_input],
outputs=[gallery_output, report_output, status_text]
)
with gr.Tab("🔍 Yüz Karşılaştır"):
gr.Markdown("İki yüz görselini yükleyin ve benzerliklerini kontrol edin")
with gr.Row():
with gr.Column(scale=1):
face1_input = gr.Image(
label="Yüz 1",
type="filepath",
height=300,
width=300
)
with gr.Column(scale=1):
face2_input = gr.Image(
label="Yüz 2",
type="filepath",
height=300,
width=300
)
compare_btn = gr.Button("⚖️ Karşılaştır", variant="primary")
compare_result = gr.Textbox(label="Sonuç", interactive=False, lines=3)
compare_btn.click(
fn=compare_two_faces,
inputs=[face1_input, face2_input],
outputs=compare_result
)
with gr.Tab("🎬 Video Karşılaştır"):
gr.Markdown("""
## İki Videoyu Karşılaştır
İki farklı videodan yüz tespiti yaparak ortak kişileri bulur
""")
# Hugging Face uyarısı
if IS_HUGGINGFACE:
gr.Markdown("""
⚠️ **Hugging Face Spaces Modunda**: URL/YouTube desteği kapalı.
Sadece dosya yükleme kullanılabilir.
""")
with gr.Row():
with gr.Column(scale=1):
gr.Markdown("### 📹 Video 1")
video1_input = gr.Video(
label="Video 1 Yükle",
height=250,
show_label=True
)
if not IS_HUGGINGFACE:
gr.Markdown("**VEYA**")
url1_input = gr.Textbox(
label="Video 1 URL",
placeholder="https://example.com/video1.mp4",
lines=1,
interactive=True
)
else:
url1_input = gr.Textbox(
label="Video 1 URL (Devre Dışı)",
placeholder="HF Spaces'te URL desteği yok",
lines=1,
interactive=False
)
with gr.Column(scale=1):
gr.Markdown("### 📹 Video 2")
video2_input = gr.Video(
label="Video 2 Yükle",
height=250,
show_label=True
)
if not IS_HUGGINGFACE:
gr.Markdown("**VEYA**")
url2_input = gr.Textbox(
label="Video 2 URL",
placeholder="https://example.com/video2.mp4",
lines=1,
interactive=True
)
else:
url2_input = gr.Textbox(
label="Video 2 URL (Devre Dışı)",
placeholder="HF Spaces'te URL desteği yok",
lines=1,
interactive=False
)
# Ayarlar
with gr.Row():
similarity_slider = gr.Slider(
minimum=0.4,
maximum=0.9,
value=0.6,
step=0.05,
label="🎯 Benzerlik Eşiği (Düşük = Daha fazla eşleşme)",
info="İki yüzün aynı kişi olarak kabul edilmesi için minimum benzerlik oranı"
)
compare_videos_btn = gr.Button("🔍 Videoları Karşılaştır", variant="primary", size="lg")
status_compare = gr.Textbox(label="Durum", interactive=False, lines=3)
gr.Markdown("### 📊 Sonuçlar")
with gr.Row():
with gr.Column(scale=2):
gallery_compare = gr.Gallery(
label="Ortak Yüzler (Sol: Video 1, Sağ: Video 2)",
columns=2,
height=500,
object_fit="contain"
)
with gr.Column(scale=1):
report_compare = gr.Markdown(label="Detaylı Rapor")
gr.Markdown("""
### 💡 Nasıl Çalışır?
1. **İki video yükleyin** (veya URL girin)
2. **Benzerlik eşiğini ayarlayın** (varsayılan %60)
3. **Karşılaştır** butonuna tıklayın
4. Sistem her iki videodan yüz tespiti yapar ve ortak kişileri bulur
### 📈 Sonuç Kategorileri
- **Ortak Yüzler**: Her iki videoda da görünen kişiler (yan yana gösterilir)
- **Sadece Video 1**: Yalnızca ilk videoda görünen kişiler
- **Sadece Video 2**: Yalnızca ikinci videoda görünen kişiler
""")
# Event handler
compare_videos_btn.click(
fn=compare_videos_gradio,
inputs=[
video1_input,
video2_input,
url1_input,
url2_input,
similarity_slider
],
outputs=[
gallery_compare,
report_compare,
status_compare
]
)
with gr.Tab("⚙️ Ayarlar"):
gr.Markdown("### Gelişmiş Ayarlar")
frame_skip_slider = gr.Slider(20, 60, value=30, step=5,
label="Frame Atlama (yüksek = daha hızlı)")
face_threshold_slider = gr.Slider(600, 2000, value=1000, step=100,
label="Minimum Yüz Boyutu (piksel)")
clustering_slider = gr.Slider(0.3, 0.7, value=0.5, step=0.05,
label="Clustering Hassasiyeti")
save_settings_btn = gr.Button("💾 Ayarları Kaydet")
settings_status = gr.Textbox(label="Durum", interactive=False)
save_settings_btn.click(
fn=initialize_detector,
inputs=[frame_skip_slider, face_threshold_slider, clustering_slider],
outputs=settings_status
)
gr.Markdown("""
---
### 💡 İpuçları
- **Frame Atlama**: Daha hızlı işlem için artırın, daha fazla tespit için azaltın
- **Clustering**: Daha az kişi tespit ediyorsa artırın, fazla tespit ediyorsa azaltın
""")
if __name__ == "__main__":
print("\n" + "="*60)
print("🎬 Video Yüz Tanıma Sistemi")
print("="*60)
print("="*60 + "\n")
demo.launch(share=False, server_name="0.0.0.0", server_port=7860)