Spaces:
Running
on
T4
Running
on
T4
| # --------------------- Imports --------------------- | |
| import os | |
| os.environ["OMP_NUM_THREADS"] = "1" | |
| import shutil | |
| import uuid | |
| import cv2 | |
| import numpy as np | |
| import threading | |
| import subprocess | |
| import logging | |
| import tempfile | |
| import sys | |
| from datetime import datetime,timedelta | |
| import tempfile | |
| import insightface | |
| from insightface.app import FaceAnalysis | |
| from huggingface_hub import hf_hub_download | |
| from fastapi import FastAPI, UploadFile, File, HTTPException, Response, Depends, Security, Form | |
| from fastapi.responses import RedirectResponse | |
| from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials | |
| from motor.motor_asyncio import AsyncIOMotorClient | |
| from bson import ObjectId | |
| from bson.errors import InvalidId | |
| import httpx | |
| import uvicorn | |
| import gradio as gr | |
| from gradio import mount_gradio_app | |
| from PIL import Image | |
| import io | |
| # from scipy import ndimage | |
| # DigitalOcean Spaces | |
| import boto3 | |
| from botocore.client import Config | |
| from typing import Optional | |
| # --------------------- Logging --------------------- | |
| # Configure the root logger at INFO and create the module-level logger used by the functions below. | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| # --------------------- Secrets & Paths --------------------- | |
| # Hugging Face repo that holds the model weights, and the local directory they are downloaded into. | |
| REPO_ID = "HariLogicgo/face_swap_models" | |
| MODELS_DIR = "./models" | |
| # Side effect at import time: the models directory is created if it does not exist. | |
| os.makedirs(MODELS_DIR, exist_ok=True) | |
| # All secrets are read from environment variables; each is None when the variable is unset. | |
| HF_TOKEN = os.getenv("HF_TOKEN") | |
| API_SECRET_TOKEN = os.getenv("API_SECRET_TOKEN") | |
| # DigitalOcean Spaces settings; the region falls back to "blr1" and the endpoint URL is derived from it. | |
| DO_SPACES_REGION = os.getenv("DO_SPACES_REGION", "blr1") | |
| DO_SPACES_ENDPOINT = f"https://{DO_SPACES_REGION}.digitaloceanspaces.com" | |
| DO_SPACES_KEY = os.getenv("DO_SPACES_KEY") | |
| DO_SPACES_SECRET = os.getenv("DO_SPACES_SECRET") | |
| DO_SPACES_BUCKET = os.getenv("DO_SPACES_BUCKET") | |
| # NEW admin DB | |
| # The admin client is constructed at import time, unlike the logs client below. | |
| # NOTE(review): if ADMIN_MONGO_URL is unset, None is passed to the client constructor - confirm the intended fallback. | |
| ADMIN_MONGO_URL = os.getenv("ADMIN_MONGO_URL") | |
| admin_client = AsyncIOMotorClient(ADMIN_MONGO_URL) | |
| admin_db = admin_client.adminPanel | |
| subcategories_col = admin_db.subcategories | |
| media_clicks_col = admin_db.media_clicks | |
| # OLD logs DB | |
| # `client` and `database` start as None and are assigned by startup_db(). | |
| MONGODB_URL = os.getenv("MONGODB_URL") | |
| client = None | |
| database = None | |
| # --------------------- Download Models --------------------- | |
def download_models():
    """Download the face-swap model weights from the Hugging Face Hub.

    Fetches ``models/inswapper_128.onnx`` plus the five ``buffalo_l`` ONNX
    files from ``REPO_ID`` into ``MODELS_DIR``, authenticating with
    ``HF_TOKEN``.

    Returns:
        The local path of ``inswapper_128.onnx`` as returned by
        ``hf_hub_download``.
    """

    def fetch(remote_name):
        # Every download shares the same repo, destination and credentials.
        return hf_hub_download(
            repo_id=REPO_ID,
            filename=remote_name,
            repo_type="model",
            local_dir=MODELS_DIR,
            token=HF_TOKEN,
        )

    logger.info("Downloading models...")
    swapper_weights = fetch("models/inswapper_128.onnx")
    for onnx_name in (
        "1k3d68.onnx",
        "2d106det.onnx",
        "genderage.onnx",
        "det_10g.onnx",
        "w600k_r50.onnx",
    ):
        fetch(f"models/buffalo_l/{onnx_name}")
    logger.info("Models downloaded.")
    return swapper_weights
| # Side effect at import time: the model files are downloaded before anything else is set up. | |
| inswapper_path = download_models() | |
| # --------------------- Face Analysis + Swapper --------------------- | |
| # Execution providers are listed CUDA first, then CPU; the same list is passed to the detector and the swapper. | |
| providers = ['CUDAExecutionProvider', 'CPUExecutionProvider'] | |
| # Face detector/analyser using the "buffalo_l" model pack rooted at MODELS_DIR. | |
| face_analysis_app = FaceAnalysis(name="buffalo_l", root=MODELS_DIR, providers=providers) | |
| # NOTE(review): ctx_id=0 presumably selects the first GPU device - confirm against the InsightFace docs. | |
| face_analysis_app.prepare(ctx_id=0, det_size=(640, 640)) | |
| # Face swapper model loaded from the inswapper file downloaded above. | |
| swapper = insightface.model_zoo.get_model(inswapper_path, providers=providers) | |
| # --------------------- CodeFormer --------------------- | |
| # Relative path of the CodeFormer inference script that is run as a subprocess by the enhancement functions. | |
| CODEFORMER_PATH = "CodeFormer/inference_codeformer.py" | |
| # Sets up a local CodeFormer checkout: clone, install requirements, run the basicsr setup and download the pretrained weights. | |
| # Every command runs through the shell with check=True, so a failing step raises CalledProcessError. | |
| # NOTE(review): indentation is not visible in this view - confirm which of the commands below are guarded by the existence check. | |
| # NOTE(review): these commands use bare `pip`/`python` from PATH, whereas enhance_image_with_codeformer uses sys.executable - confirm both resolve to the same interpreter. | |
| def ensure_codeformer(): | |
| if not os.path.exists("CodeFormer"): | |
| subprocess.run("git clone https://github.com/sczhou/CodeFormer.git", shell=True, check=True) | |
| subprocess.run("pip install -r CodeFormer/requirements.txt", shell=True, check=True) | |
| subprocess.run("python CodeFormer/basicsr/setup.py develop", shell=True, check=True) | |
| subprocess.run("python CodeFormer/scripts/download_pretrained_models.py facelib", shell=True, check=True) | |
| subprocess.run("python CodeFormer/scripts/download_pretrained_models.py CodeFormer", shell=True, check=True) | |
| # Side effect at import time: the CodeFormer setup runs when this module is loaded. | |
| ensure_codeformer() | |
| # class NaturalFaceSwapper: | |
| # """Enhanced face swapping with natural blending techniques""" | |
| # def __init__(self, swapper, face_app): | |
| # self.swapper = swapper | |
| # self.face_app = face_app | |
| # def match_color_histogram(self, source, target, mask=None): | |
| # """Match color histogram of source to target for better blending""" | |
| # if mask is None: | |
| # mask = np.ones(source.shape[:2], dtype=np.uint8) * 255 | |
| # result = source.copy() | |
| # for i in range(3): # Process each channel | |
| # source_channel = source[:, :, i] | |
| # target_channel = target[:, :, i] | |
| # # Only use masked regions | |
| # source_masked = source_channel[mask > 0] | |
| # target_masked = target_channel[mask > 0] | |
| # if len(source_masked) > 0 and len(target_masked) > 0: | |
| # # Match histograms | |
| # matched = self._match_histogram_channel( | |
| # source_channel, source_masked, target_masked | |
| # ) | |
| # result[:, :, i] = matched | |
| # return result | |
| # def subtle_skin_smooth(img, strength=0.3, preserve_details=True): | |
| # """ | |
| # Subtle bilateral filter for natural skin smoothing | |
| # Args: | |
| # img: Input image (BGR format) | |
| # strength: Smoothing strength (0.1-0.5 recommended, default 0.3) | |
| # preserve_details: If True, uses edge-preserving filter | |
| # Returns: | |
| # Smoothed image | |
| # """ | |
| # if preserve_details: | |
| # # Bilateral filter preserves edges while smoothing | |
| # smoothed = cv2.bilateralFilter(img, d=9, sigmaColor=75, sigmaSpace=75) | |
| # else: | |
| # # Gaussian blur (faster but less detail preservation) | |
| # smoothed = cv2.GaussianBlur(img, (9, 9), 0) | |
| # # Blend with original | |
| # result = cv2.addWeighted(img, 1-strength, smoothed, strength, 0) | |
| # return result | |
| # def advanced_skin_smooth(img, strength=0.3): | |
| # """ | |
| # Advanced skin smoothing with frequency separation | |
| # Smooths skin while preserving pores and texture | |
| # Args: | |
| # img: Input image (BGR format) | |
| # strength: Smoothing strength (0.2-0.5 recommended) | |
| # Returns: | |
| # Smoothed image with preserved texture | |
| # """ | |
| # # Convert to float for better precision | |
| # img_float = img.astype(np.float32) / 255.0 | |
| # # Low frequency (color and tone) | |
| # low_freq = cv2.GaussianBlur(img_float, (0, 0), sigmaX=3, sigmaY=3) | |
| # # High frequency (details and texture) | |
| # high_freq = img_float - low_freq | |
| # # Smooth only the low frequency | |
| # low_freq_smoothed = cv2.bilateralFilter( | |
| # (low_freq * 255).astype(np.uint8), | |
| # d=9, | |
| # sigmaColor=75, | |
| # sigmaSpace=75 | |
| # ).astype(np.float32) / 255.0 | |
| # # Blend smoothed low frequency with original | |
| # low_freq_final = cv2.addWeighted(low_freq, 1-strength, low_freq_smoothed, strength, 0) | |
| # # Recombine with high frequency to preserve texture | |
| # result = low_freq_final + high_freq | |
| # result = np.clip(result * 255, 0, 255).astype(np.uint8) | |
| # return result | |
| # def skin_tone_aware_smooth(img, face_analysis_app, strength=0.3): | |
| # """ | |
| # Smooth only skin regions (more advanced) | |
| # Detects face and creates skin mask | |
| # Args: | |
| # img: Input image (BGR format) | |
| # face_analysis_app: InsightFace app for face detection | |
| # strength: Smoothing strength | |
| # Returns: | |
| # Image with skin-only smoothing | |
| # """ | |
| # # Detect faces to create skin mask | |
| # faces = face_analysis_app.get(img) | |
| # if not faces: | |
| # # No face detected, smooth entire image | |
| # return subtle_skin_smooth(img, strength) | |
| # # Create skin mask based on face regions | |
| # mask = np.zeros(img.shape[:2], dtype=np.uint8) | |
| # for face in faces: | |
| # x1, y1, x2, y2 = [int(v) for v in face.bbox] | |
| # # Expand bbox to include more skin area | |
| # padding_x = int((x2 - x1) * 0.2) | |
| # padding_y = int((y2 - y1) * 0.3) | |
| # x1 = max(0, x1 - padding_x) | |
| # y1 = max(0, y1 - padding_y) | |
| # x2 = min(img.shape[1], x2 + padding_x) | |
| # y2 = min(img.shape[0], y2 + padding_y) | |
| # # Create elliptical mask for natural look | |
| # center = ((x1 + x2) // 2, (y1 + y2) // 2) | |
| # axes = ((x2 - x1) // 2, (y2 - y1) // 2) | |
| # cv2.ellipse(mask, center, axes, 0, 0, 360, 255, -1) | |
| # # Blur mask for smooth transition | |
| # mask = cv2.GaussianBlur(mask, (31, 31), 0) | |
| # mask_float = mask.astype(float) / 255.0 | |
| # mask_3ch = np.stack([mask_float] * 3, axis=2) | |
| # # Apply smoothing | |
| # smoothed = cv2.bilateralFilter(img, 9, 75, 75) | |
| # # Blend only where mask is present | |
| # result = (smoothed * mask_3ch * strength + | |
| # img * (1 - mask_3ch * strength)).astype(np.uint8) | |
| # return result | |
| # def _match_histogram_channel(self, channel, source_vals, target_vals): | |
| # """Match histogram for single channel""" | |
| # # Compute CDFs | |
| # source_hist, _ = np.histogram(source_vals, 256, [0, 256]) | |
| # target_hist, _ = np.histogram(target_vals, 256, [0, 256]) | |
| # source_cdf = source_hist.cumsum() | |
| # target_cdf = target_hist.cumsum() | |
| # # Normalize | |
| # source_cdf = source_cdf / source_cdf[-1] | |
| # target_cdf = target_cdf / target_cdf[-1] | |
| # # Create mapping | |
| # mapping = np.zeros(256, dtype=np.uint8) | |
| # for i in range(256): | |
| # # Find closest value in target CDF | |
| # idx = np.argmin(np.abs(target_cdf - source_cdf[i])) | |
| # mapping[i] = idx | |
| # return mapping[channel] | |
| # def seamless_clone_blend(self, source, target, mask, center): | |
| # """Use Poisson blending for seamless integration""" | |
| # try: | |
| # # OpenCV's seamlessClone for natural blending | |
| # result = cv2.seamlessClone( | |
| # source, target, mask, center, | |
| # cv2.NORMAL_CLONE # Try MIXED_CLONE for different effect | |
| # ) | |
| # return result | |
| # except: | |
| # # Fallback to alpha blending if seamlessClone fails | |
| # return self.alpha_blend_with_feather(source, target, mask) | |
| # def alpha_blend_with_feather(self, source, target, mask, feather_amount=15): | |
| # """Alpha blend with feathered edges for smooth transition""" | |
| # # Create feathered mask | |
| # mask_float = mask.astype(float) / 255.0 | |
| # # Apply Gaussian blur for feathering | |
| # feathered_mask = cv2.GaussianBlur(mask_float, (feather_amount*2+1, feather_amount*2+1), 0) | |
| # feathered_mask = np.clip(feathered_mask, 0, 1) | |
| # # Expand mask to 3 channels | |
| # feathered_mask_3ch = np.stack([feathered_mask] * 3, axis=2) | |
| # # Blend | |
| # blended = (source * feathered_mask_3ch + | |
| # target * (1 - feathered_mask_3ch)).astype(np.uint8) | |
| # return blended | |
| # def laplacian_pyramid_blend(self, source, target, mask, levels=6): | |
| # """Multi-resolution blending using Laplacian pyramids""" | |
| # # Generate Gaussian pyramid for mask | |
| # mask_float = mask.astype(float) / 255.0 | |
| # gaussian_mask = [mask_float] | |
| # for i in range(levels): | |
| # mask_float = cv2.pyrDown(mask_float) | |
| # gaussian_mask.append(mask_float) | |
| # # Generate Laplacian pyramids | |
| # def build_laplacian_pyramid(img, levels): | |
| # gaussian = [img.astype(float)] | |
| # for i in range(levels): | |
| # img = cv2.pyrDown(img) | |
| # gaussian.append(img) | |
| # laplacian = [] | |
| # for i in range(levels): | |
| # size = (gaussian[i].shape[1], gaussian[i].shape[0]) | |
| # upsampled = cv2.pyrUp(gaussian[i + 1], dstsize=size) | |
| # laplacian.append(gaussian[i] - upsampled) | |
| # laplacian.append(gaussian[levels]) | |
| # return laplacian | |
| # lp_source = build_laplacian_pyramid(source, levels) | |
| # lp_target = build_laplacian_pyramid(target, levels) | |
| # # Blend each level | |
| # blended_pyramid = [] | |
| # for ls, lt, gm in zip(lp_source, lp_target, gaussian_mask): | |
| # # Resize mask if needed | |
| # if gm.shape[:2] != ls.shape[:2]: | |
| # gm = cv2.resize(gm, (ls.shape[1], ls.shape[0])) | |
| # gm_3ch = np.stack([gm] * 3, axis=2) | |
| # blended = ls * gm_3ch + lt * (1 - gm_3ch) | |
| # blended_pyramid.append(blended) | |
| # # Reconstruct | |
| # result = blended_pyramid[-1] | |
| # for i in range(levels - 1, -1, -1): | |
| # size = (blended_pyramid[i].shape[1], blended_pyramid[i].shape[0]) | |
| # result = cv2.pyrUp(result, dstsize=size) | |
| # result += blended_pyramid[i] | |
| # return np.clip(result, 0, 255).astype(np.uint8) | |
| # def match_lighting(self, swapped_face, target_img, face_bbox): | |
| # """Match lighting conditions between swapped face and target""" | |
| # x1, y1, x2, y2 = [int(v) for v in face_bbox] | |
| # # Extract face region from target | |
| # target_face = target_img[y1:y2, x1:x2] | |
| # if target_face.size == 0 or swapped_face.size == 0: | |
| # return swapped_face | |
| # # Resize if needed | |
| # if swapped_face.shape[:2] != target_face.shape[:2]: | |
| # target_face = cv2.resize(target_face, | |
| # (swapped_face.shape[1], swapped_face.shape[0])) | |
| # # Convert to LAB color space | |
| # swapped_lab = cv2.cvtColor(swapped_face, cv2.COLOR_BGR2LAB).astype(float) | |
| # target_lab = cv2.cvtColor(target_face, cv2.COLOR_BGR2LAB).astype(float) | |
| # # Match mean and std of L channel (luminance) | |
| # swapped_l = swapped_lab[:, :, 0] | |
| # target_l = target_lab[:, :, 0] | |
| # swapped_l_mean, swapped_l_std = swapped_l.mean(), swapped_l.std() | |
| # target_l_mean, target_l_std = target_l.mean(), target_l.std() | |
| # if swapped_l_std > 0: | |
| # swapped_lab[:, :, 0] = ((swapped_l - swapped_l_mean) / swapped_l_std * | |
| # target_l_std + target_l_mean) | |
| # # Convert back | |
| # result = cv2.cvtColor(swapped_lab.astype(np.uint8), cv2.COLOR_LAB2BGR) | |
| # return result | |
| # def adjust_face_mask(self, mask, erosion=3, dilation=5): | |
| # """Adjust mask to avoid harsh edges""" | |
| # # Slightly erode to avoid edge artifacts | |
| # kernel_erode = np.ones((erosion, erosion), np.uint8) | |
| # mask = cv2.erode(mask, kernel_erode, iterations=1) | |
| # # Then dilate to smooth | |
| # kernel_dilate = np.ones((dilation, dilation), np.uint8) | |
| # mask = cv2.dilate(mask, kernel_dilate, iterations=1) | |
| # # Gaussian blur for soft edges | |
| # mask = cv2.GaussianBlur(mask, (15, 15), 0) | |
| # return mask | |
| # def natural_face_swap(self, src_img, tgt_img, use_laplacian=True): | |
| # """ | |
| # Complete natural face swap pipeline | |
| # Args: | |
| # src_img: Source image (RGB) | |
| # tgt_img: Target image (RGB) | |
| # use_laplacian: Use Laplacian pyramid blending (slower but better) | |
| # Returns: | |
| # Naturally blended face-swapped image | |
| # """ | |
| # src_bgr = cv2.cvtColor(src_img, cv2.COLOR_RGB2BGR) | |
| # tgt_bgr = cv2.cvtColor(tgt_img, cv2.COLOR_RGB2BGR) | |
| # # Detect faces | |
| # src_faces = self.face_app.get(src_bgr) | |
| # tgt_faces = self.face_app.get(tgt_bgr) | |
| # if not src_faces or not tgt_faces: | |
| # raise ValueError("No faces detected") | |
| # # Get largest faces | |
| # src_face = max(src_faces, key=lambda f: (f.bbox[2]-f.bbox[0])*(f.bbox[3]-f.bbox[1])) | |
| # tgt_face = max(tgt_faces, key=lambda f: (f.bbox[2]-f.bbox[0])*(f.bbox[3]-f.bbox[1])) | |
| # # Perform basic swap | |
| # swapped_bgr = self.swapper.get(tgt_bgr, tgt_face, src_face, paste_back=True) | |
| # # Create face mask | |
| # x1, y1, x2, y2 = [int(v) for v in tgt_face.bbox] | |
| # mask = np.zeros(tgt_bgr.shape[:2], dtype=np.uint8) | |
| # # Use landmarks for better mask if available | |
| # if hasattr(tgt_face, 'kps') and tgt_face.kps is not None: | |
| # kps = tgt_face.kps.astype(np.int32) | |
| # hull = cv2.convexHull(kps) | |
| # cv2.fillConvexPoly(mask, hull, 255) | |
| # else: | |
| # # Fallback to bbox with some padding | |
| # padding = int((x2 - x1) * 0.1) | |
| # cv2.ellipse(mask, | |
| # ((x1 + x2) // 2, (y1 + y2) // 2), | |
| # ((x2 - x1) // 2 + padding, (y2 - y1) // 2 + padding), | |
| # 0, 0, 360, 255, -1) | |
| # # Adjust mask for softer edges | |
| # mask = self.adjust_face_mask(mask) | |
| # # Color histogram matching | |
| # swapped_bgr = self.match_color_histogram(swapped_bgr, tgt_bgr, mask) | |
| # # Lighting adjustment | |
| # swapped_face_region = swapped_bgr[y1:y2, x1:x2] | |
| # adjusted_face = self.match_lighting(swapped_face_region, tgt_bgr, tgt_face.bbox) | |
| # swapped_bgr[y1:y2, x1:x2] = adjusted_face | |
| # # Final blending | |
| # if use_laplacian: | |
| # # Best quality but slower | |
| # result = self.laplacian_pyramid_blend(swapped_bgr, tgt_bgr, mask) | |
| # else: | |
| # # Faster alternative: Seamless cloning | |
| # center = ((x1 + x2) // 2, (y1 + y2) // 2) | |
| # result = self.seamless_clone_blend(swapped_bgr, tgt_bgr, mask, center) | |
| # return cv2.cvtColor(result, cv2.COLOR_BGR2RGB) | |
| # # ============================================ | |
| # # Integration into your existing code | |
| # # ============================================ | |
| # def enhanced_face_swap_and_enhance(src_img, tgt_img, swapper, face_app, temp_dir=None): | |
| # """ | |
| # Enhanced version of your face_swap_and_enhance function | |
| # """ | |
| # try: | |
| # # Initialize natural swapper | |
| # natural_swapper = NaturalFaceSwapper(swapper, face_app) | |
| # # Perform natural swap | |
| # swapped_rgb = natural_swapper.natural_face_swap( | |
| # src_img, tgt_img, | |
| # use_laplacian=True # Set False for faster processing | |
| # ) | |
| # # Apply CodeFormer enhancement | |
| # enhanced_rgb = enhance_image_with_codeformer(swapped_rgb, temp_dir) | |
| # # Post-enhancement sharpening (subtle) | |
| # kernel_sharpen = np.array([[-0.5, -0.5, -0.5], | |
| # [-0.5, 5.0, -0.5], | |
| # [-0.5, -0.5, -0.5]]) * 0.3 | |
| # enhanced_bgr = cv2.cvtColor(enhanced_rgb, cv2.COLOR_RGB2BGR) | |
| # sharpened = cv2.filter2D(enhanced_bgr, -1, kernel_sharpen) | |
| # # Blend sharpened with original (60% sharp, 40% original) | |
| # final_bgr = cv2.addWeighted(sharpened, 0.6, enhanced_bgr, 0.4, 0) | |
| # final_rgb = cv2.cvtColor(final_bgr, cv2.COLOR_BGR2RGB) | |
| # # Save result | |
| # if temp_dir is None: | |
| # temp_dir = os.path.join(tempfile.gettempdir(), f"faceswap_{uuid.uuid4().hex[:8]}") | |
| # os.makedirs(temp_dir, exist_ok=True) | |
| # final_path = os.path.join(temp_dir, "enhanced.png") | |
| # cv2.imwrite(final_path, final_bgr) | |
| # return final_rgb, final_path, "" | |
| # except Exception as e: | |
| # return None, None, f"❌ Error: {str(e)}" | |
| # --------------------- FastAPI --------------------- | |
| # The FastAPI application object for this service. | |
| fastapi_app = FastAPI() | |
async def startup_db():
    """Create the MongoDB client used for API logs.

    Assigns the module-level ``client`` (connected to ``MONGODB_URL``) and
    ``database`` (its ``FaceSwap`` database).

    NOTE(review): no registration of this coroutine as an application
    startup hook is visible in this view - confirm it is wired to the app.
    """
    global client, database

    logger.info("Initializing MongoDB for API logs...")
    connection = AsyncIOMotorClient(MONGODB_URL)
    client = connection
    database = connection.FaceSwap
    logger.info("MongoDB initialized for API logs")
| # Closes the logs MongoDB client created by startup_db(), if one was created. | |
| # NOTE(review): indentation is not visible in this view - confirm whether the log line below is emitted only when a client was actually closed. | |
| async def shutdown_db(): | |
| global client | |
| if client: | |
| client.close() | |
| logger.info("MongoDB connection closed") | |
| # --------------------- Auth --------------------- | |
security = HTTPBearer()


def verify_token(credentials: HTTPAuthorizationCredentials = Security(security)):
    """Validate the bearer token of an incoming request.

    Args:
        credentials: Bearer credentials extracted by the ``security`` scheme.

    Returns:
        The presented token string when it matches ``API_SECRET_TOKEN``.

    Raises:
        HTTPException: 401 when the token does not match, or when no secret
            is configured on the server.
    """
    import hmac  # function-scope import keeps this block self-contained

    presented = credentials.credentials or ""
    expected = API_SECRET_TOKEN

    # Fail closed when the secret is unset or empty: otherwise an empty
    # configured secret would accept an empty bearer token.
    # compare_digest runs in constant time; comparing bytes avoids the
    # TypeError it raises for non-ASCII str arguments.
    if not expected or not hmac.compare_digest(
        presented.encode("utf-8"), expected.encode("utf-8")
    ):
        raise HTTPException(status_code=401, detail="Invalid or missing token")
    return credentials.credentials
| # --------------------- Logging API Hits --------------------- | |
async def log_faceswap_hit(token: str, status: str = "success"):
    """Record one ``/faceswap`` call in the ``api_logs`` collection.

    Does nothing when the module-level ``database`` has not been initialised.

    Args:
        token: Token value stored with the log entry.
        status: Outcome label stored with the log entry.
    """
    if database is None:
        return

    record = {
        "token": token,
        "endpoint": "/faceswap",
        "status": status,
        "timestamp": datetime.utcnow(),
    }
    await database.api_logs.insert_one(record)
| # --------------------- Face Swap Pipeline --------------------- | |
| # Module-level lock; face_swap_and_enhance() holds it for the whole swap + enhancement run. | |
| swap_lock = threading.Lock() | |
def enhance_image_with_codeformer(rgb_img, temp_dir=None):
    """Run the CodeFormer inference script on an image and return the result.

    The image is written to ``<temp_dir>/input.jpg``, the script at
    ``CODEFORMER_PATH`` is run as a subprocess, and the first PNG found in
    ``<temp_dir>/final_results`` is read back.

    Args:
        rgb_img: Input image array in RGB channel order.
        temp_dir: Working directory for intermediate files. When ``None`` a
            unique directory is created under the system temp dir and removed
            again before returning; a caller-supplied directory is left in
            place.

    Returns:
        The enhanced image as an RGB array.

    Raises:
        RuntimeError: If the input cannot be written, the subprocess exits
            non-zero (message is its stderr), no output PNG is produced, or
            the output cannot be read.
    """
    owns_temp_dir = temp_dir is None
    if owns_temp_dir:
        temp_dir = os.path.join(tempfile.gettempdir(), f"enhance_{uuid.uuid4().hex[:8]}")
    os.makedirs(temp_dir, exist_ok=True)

    try:
        input_path = os.path.join(temp_dir, "input.jpg")
        # cv2.imwrite signals failure through its return value, not an exception.
        if not cv2.imwrite(input_path, cv2.cvtColor(rgb_img, cv2.COLOR_RGB2BGR)):
            raise RuntimeError("Failed to write CodeFormer input image")

        # Argument list instead of a shell string: paths containing spaces or
        # shell metacharacters are passed through verbatim.
        cmd = [
            sys.executable or "python3",
            CODEFORMER_PATH,
            "-w", "0.7",
            "--input_path", input_path,
            "--output_path", temp_dir,
            "--bg_upsampler", "realesrgan",
            "--face_upsample",
        ]
        result = subprocess.run(cmd, capture_output=True, text=True)
        if result.returncode != 0:
            raise RuntimeError(result.stderr)

        final_dir = os.path.join(temp_dir, "final_results")
        files = []
        if os.path.isdir(final_dir):
            # Sorted so the chosen file does not depend on directory order.
            files = sorted(f for f in os.listdir(final_dir) if f.endswith(".png"))
        if not files:
            raise RuntimeError("No enhanced output")

        enhanced = cv2.imread(os.path.join(final_dir, files[0]))
        if enhanced is None:
            raise RuntimeError("Failed to read enhanced image file")
        return cv2.cvtColor(enhanced, cv2.COLOR_BGR2RGB)
    finally:
        # Only remove directories created here; the result is already in memory.
        if owns_temp_dir:
            shutil.rmtree(temp_dir, ignore_errors=True)
def multi_face_swap(src_img, tgt_img):
    """Swap several faces from a source image onto a target image.

    Faces are detected in both images, split by the detector's ``gender``
    attribute (1 and 0) and, inside each group, ordered by descending
    bounding-box area and then by horizontal centre. Source and target faces
    are paired positionally within each group. If no same-gender pair exists,
    all faces are paired in that same order regardless of gender.

    Each source face is swapped onto its own paired target face. Previously
    the paired target was discarded and the first re-detected candidate was
    used every time, so two same-gender sources ended up on the same target.

    Args:
        src_img: Source image array in RGB channel order.
        tgt_img: Target image array in RGB channel order.

    Returns:
        The target image with the swapped faces, as an RGB array.

    Raises:
        ValueError: If no face is detected in either image.
    """
    src_bgr = cv2.cvtColor(src_img, cv2.COLOR_RGB2BGR)
    tgt_bgr = cv2.cvtColor(tgt_img, cv2.COLOR_RGB2BGR)
    src_faces = face_analysis_app.get(src_bgr)
    tgt_faces = face_analysis_app.get(tgt_bgr)
    if not src_faces or not tgt_faces:
        raise ValueError("No faces detected")

    def face_sort_key(face):
        x1, y1, x2, y2 = face.bbox
        area = (x2 - x1) * (y2 - y1)
        cx = (x1 + x2) / 2
        return (-area, cx)

    def face_center(face):
        x1, y1, x2, y2 = face.bbox
        return (x1 + x2) / 2, (y1 + y2) / 2

    def faces_of_gender(faces, gender):
        return sorted((f for f in faces if f.gender == gender), key=face_sort_key)

    def locate_current(current_faces, ref_face):
        """Return the re-detected face at ref_face's position, else ref_face."""
        x1, y1, x2, y2 = ref_face.bbox
        ref_cx, ref_cy = face_center(ref_face)
        best, best_dist = ref_face, None
        for candidate in current_faces:
            cx, cy = face_center(candidate)
            # Only accept detections whose centre lies inside the paired
            # target's box, so another person's face is never substituted.
            if not (x1 <= cx <= x2 and y1 <= cy <= y2):
                continue
            dist = (cx - ref_cx) ** 2 + (cy - ref_cy) ** 2
            if best_dist is None or dist < best_dist:
                best, best_dist = candidate, dist
        return best

    # Build swap pairs per gender group (same group order as before: 1, then 0).
    pairs = []
    for gender in (1, 0):
        pairs.extend(
            zip(faces_of_gender(src_faces, gender), faces_of_gender(tgt_faces, gender))
        )

    # Fallback if no same-gender pair exists.
    if not pairs:
        pairs = list(
            zip(
                sorted(src_faces, key=face_sort_key),
                sorted(tgt_faces, key=face_sort_key),
            )
        )

    result_img = tgt_bgr.copy()
    for src_face, tgt_face in pairs:
        # Re-detect on the current result, then pick the detection at the
        # paired target's location. An empty or non-matching detection falls
        # back to the originally detected target face instead of raising.
        current_faces = face_analysis_app.get(result_img)
        target_face = locate_current(current_faces, tgt_face)
        result_img = swapper.get(
            result_img,
            target_face,
            src_face,
            paste_back=True,
        )
    return cv2.cvtColor(result_img, cv2.COLOR_BGR2RGB)
def face_swap_and_enhance(src_img, tgt_img, temp_dir=None):
    """Swap the first detected face and enhance the result with CodeFormer.

    The whole pipeline runs while holding ``swap_lock``. The first detected
    source face is swapped onto the first detected target face, the result is
    written into the working directory, and the CodeFormer script at
    ``CODEFORMER_PATH`` is run on it as a subprocess.

    Args:
        src_img: Source image array in RGB channel order.
        tgt_img: Target image array in RGB channel order.
        temp_dir: Working directory. When ``None`` a unique directory under
            the system temp dir is used. An existing directory at that path
            is deleted and recreated.

    Returns:
        A ``(image, path, error)`` tuple. On success ``image`` is the enhanced
        RGB array, ``path`` is the enhanced PNG on disk and ``error`` is
        ``""``. On failure the first two are ``None`` and ``error`` holds a
        message. Exceptions are caught and reported through ``error``.
    """
    try:
        with swap_lock:
            # Use a temp dir for intermediate files
            if temp_dir is None:
                temp_dir = os.path.join(
                    tempfile.gettempdir(), f"faceswap_work_{uuid.uuid4().hex[:8]}"
                )
            if os.path.exists(temp_dir):
                shutil.rmtree(temp_dir)
            os.makedirs(temp_dir, exist_ok=True)

            src_bgr = cv2.cvtColor(src_img, cv2.COLOR_RGB2BGR)
            tgt_bgr = cv2.cvtColor(tgt_img, cv2.COLOR_RGB2BGR)
            src_faces = face_analysis_app.get(src_bgr)
            tgt_faces = face_analysis_app.get(tgt_bgr)
            if not src_faces or not tgt_faces:
                return None, None, "❌ Face not detected in one of the images"

            swapped_path = os.path.join(temp_dir, f"swapped_{uuid.uuid4().hex[:8]}.jpg")
            swapped_bgr = swapper.get(tgt_bgr, tgt_faces[0], src_faces[0])
            if swapped_bgr is None:
                return None, None, "❌ Face swap failed"
            cv2.imwrite(swapped_path, swapped_bgr)

            # Argument list instead of a shell string: a caller-supplied
            # temp_dir containing spaces or shell metacharacters is passed
            # through verbatim.
            cmd = [
                sys.executable or "python3",
                CODEFORMER_PATH,
                "-w", "0.7",
                "--input_path", swapped_path,
                "--output_path", temp_dir,
                "--bg_upsampler", "realesrgan",
                "--face_upsample",
            ]
            result = subprocess.run(cmd, capture_output=True, text=True)
            if result.returncode != 0:
                return None, None, f"❌ CodeFormer failed:\n{result.stderr}"

            final_results_dir = os.path.join(temp_dir, "final_results")
            # Sorted so the chosen file does not depend on directory order.
            final_files = sorted(
                f for f in os.listdir(final_results_dir) if f.endswith(".png")
            )
            if not final_files:
                return None, None, "❌ No enhanced image found"

            final_path = os.path.join(final_results_dir, final_files[0])
            final_img_bgr = cv2.imread(final_path)
            if final_img_bgr is None:
                return None, None, "❌ Failed to read enhanced image file"
            final_img = cv2.cvtColor(final_img_bgr, cv2.COLOR_BGR2RGB)
            return final_img, final_path, ""
    except Exception as e:
        return None, None, f"❌ Error: {str(e)}"
def compress_image(
    image_bytes: bytes,
    max_size=(1280, 1280),
    quality=75,
) -> bytes:
    """Shrink and re-encode an image as a progressive JPEG.

    Args:
        image_bytes: Encoded input image.
        max_size: Maximum (width, height); the image is scaled down to fit
            while keeping its aspect ratio.
        quality: JPEG quality setting passed to the encoder.

    Returns:
        The JPEG-encoded bytes of the converted image.
    """
    source = io.BytesIO(image_bytes)
    picture = Image.open(source).convert("RGB")

    # thumbnail() resizes in place and keeps the aspect ratio.
    picture.thumbnail(max_size, Image.LANCZOS)

    with io.BytesIO() as sink:
        picture.save(
            sink,
            format="JPEG",
            quality=quality,
            optimize=True,
            progressive=True,
        )
        return sink.getvalue()
| # --------------------- DigitalOcean Spaces Helper --------------------- | |
def get_spaces_client():
    """Build a new S3 client pointed at the configured DigitalOcean Spaces endpoint.

    Returns:
        A boto3 ``s3`` client created from a fresh session, using the
        ``DO_SPACES_*`` settings and SigV4 signing.
    """
    return boto3.session.Session().client(
        's3',
        region_name=DO_SPACES_REGION,
        endpoint_url=DO_SPACES_ENDPOINT,
        aws_access_key_id=DO_SPACES_KEY,
        aws_secret_access_key=DO_SPACES_SECRET,
        config=Config(signature_version='s3v4'),
    )
def upload_to_spaces(file_bytes, key, content_type="image/png"):
    """Upload bytes to the configured bucket with a public-read ACL.

    Args:
        file_bytes: Object body to store.
        key: Object key inside ``DO_SPACES_BUCKET``.
        content_type: Value stored as the object's content type.

    Returns:
        The URL built from the endpoint, bucket name and key.
    """
    s3 = get_spaces_client()
    s3.put_object(
        Bucket=DO_SPACES_BUCKET,
        Key=key,
        Body=file_bytes,
        ContentType=content_type,
        ACL='public-read',
    )
    public_url = f"{DO_SPACES_ENDPOINT}/{DO_SPACES_BUCKET}/{key}"
    return public_url
def download_from_spaces(key):
    """Fetch an object from the configured bucket and return its full content.

    Args:
        key: Object key inside ``DO_SPACES_BUCKET``.

    Returns:
        The object's body, read completely into memory.
    """
    response = get_spaces_client().get_object(Bucket=DO_SPACES_BUCKET, Key=key)
    body_stream = response['Body']
    return body_stream.read()
| # Builds and returns the Gradio Blocks UI for the multi-face swap: two image inputs, one image output, a read-only log box and a button. | |
| # NOTE(review): indentation is not visible in this view - confirm which widgets sit inside the Row and which directly under Blocks. | |
| def build_multi_faceswap_gradio(): | |
| with gr.Blocks() as demo: | |
| gr.Markdown("## 👩❤️👨 Multi Face Swap (Couple → Couple)") | |
| with gr.Row(): | |
| src = gr.Image(type="numpy", label="Source Image (2 Faces)") | |
| tgt = gr.Image(type="numpy", label="Target Image (2 Faces)") | |
| out = gr.Image(type="numpy", label="Swapped Result") | |
| error = gr.Textbox(label="Logs", interactive=False) | |
| # Button callback: swap, then enhance; any exception is returned as text for the log box instead of being raised. | |
| def process(src_img, tgt_img): | |
| try: | |
| swapped = multi_face_swap(src_img, tgt_img) | |
| enhanced = enhance_image_with_codeformer(swapped) | |
| return enhanced, "" | |
| except Exception as e: | |
| return None, str(e) | |
| btn = gr.Button("Swap Faces") | |
| # Inputs are the two images; outputs are the result image and the log text. | |
| btn.click(process, [src, tgt], [out, error]) | |
| return demo | |
def mandatory_enhancement(rgb_img):
    """Enhance an image with CodeFormer, falling back to the input on failure.

    Args:
        rgb_img: Image to enhance.

    Returns:
        The enhanced image, or ``rgb_img`` unchanged when enhancement raises
        (the failure is logged).
    """
    try:
        enhanced = enhance_image_with_codeformer(rgb_img)
    except Exception as exc:
        logger.error(f"CodeFormer failed, returning original: {exc}")
        return rgb_img
    return enhanced
| # --------------------- API Endpoints --------------------- | |
async def root():
    """Return the static service banner (name, version and publisher)."""
    release_info = {
        "version": "1.0.0",
        "Product Name": "Beauty Camera - GlowCam AI Studio",
        "Released By": "LogicGo Infotech",
    }
    banner = {
        "success": True,
        "message": "FaceSwap API",
        "data": release_info,
    }
    return banner
async def health():
    """Return a fixed payload reporting the service as healthy."""
    payload = dict(status="healthy")
    return payload
| from fastapi import Form | |
| import requests | |
async def test_admin_db():
    """Check connectivity to the admin database.

    Returns:
        ``{"ok": True, "collections": [...]}`` with the admin database's
        collection names on success. On failure,
        ``{"ok": False, "error": <message>, "url": <redacted URL>}``. The
        ``url`` key is kept for existing clients, but any user/password part
        of the connection string is masked so credentials are never returned.
    """

    def redact(url):
        # Mask the userinfo part ("user:password@") of a connection string.
        if not url:
            return url
        scheme, sep, rest = url.partition("://")
        if not sep:
            # Not a recognisable URL: reveal nothing.
            return "***"
        authority, slash, tail = rest.partition("/")
        _userinfo, at, hosts = authority.rpartition("@")
        if not at:
            return url
        return f"{scheme}://***@{hosts}{slash}{tail}"

    try:
        names = await admin_db.list_collection_names()
    except Exception as e:
        logger.exception("Admin DB connectivity check failed")
        return {"ok": False, "error": str(e), "url": redact(ADMIN_MONGO_URL)}
    return {"ok": True, "collections": names}
| async def face_swap_api( | |
| source: UploadFile = File(...), | |
| target_category_id: str = Form(None), | |
| new_category_id: str = Form(None), | |
| user_id: Optional[str] = Form(None), | |
| credentials: HTTPAuthorizationCredentials = Security(security) | |
| ): | |
| start_time = datetime.utcnow() | |
| try: | |
| # ------------------------------------------------------------------ | |
| # VALIDATION | |
| # ------------------------------------------------------------------ | |
| # -------------------------------------------------------------- | |
| # BACKWARD COMPATIBILITY FOR OLD ANDROID VERSIONS | |
| # -------------------------------------------------------------- | |
| if target_category_id == "": | |
| target_category_id = None | |
| if new_category_id == "": | |
| new_category_id = None | |
| if user_id == "": | |
| user_id = None | |
| logger.info(f"[FaceSwap] Incoming request → target_category_id={target_category_id}, new_category_id={new_category_id}, user_id={user_id}") | |
| if target_category_id and new_category_id: | |
| raise HTTPException(400, "Provide only one of new_category_id or target_category_id.") | |
| if not target_category_id and not new_category_id: | |
| raise HTTPException(400, "Either new_category_id or target_category_id is required.") | |
| # ------------------------------------------------------------------ | |
| # READ SOURCE IMAGE | |
| # ------------------------------------------------------------------ | |
| src_bytes = await source.read() | |
| src_key = f"faceswap/source/{uuid.uuid4().hex}_{source.filename}" | |
| upload_to_spaces(src_bytes, src_key, content_type=source.content_type) | |
| # ------------------------------------------------------------------ | |
| # CASE 1 : new_category_id → MongoDB lookup | |
| # ------------------------------------------------------------------ | |
| if new_category_id: | |
| doc = await subcategories_col.find_one({ | |
| "asset_images._id": ObjectId(new_category_id) | |
| }) | |
| if not doc: | |
| raise HTTPException(404, "Asset image not found in database") | |
| # extract correct asset | |
| asset = next( | |
| (img for img in doc["asset_images"] if str(img["_id"]) == new_category_id), | |
| None | |
| ) | |
| if not asset: | |
| raise HTTPException(404, "Asset image URL not found") | |
| # correct URL | |
| target_url = asset["url"] | |
| # correct categoryId (ObjectId) | |
| #category_oid = doc["categoryId"] # <-- DO NOT CONVERT TO STRING | |
| subcategory_oid = doc["_id"] | |
| # ------------------------------------------------------------------# | |
| # # MEDIA_CLICKS (ONLY IF user_id PRESENT) | |
| # ------------------------------------------------------------------# | |
| if user_id: | |
| try: | |
| user_id_clean = user_id.strip() | |
| if not user_id_clean: | |
| raise ValueError("user_id cannot be empty") | |
| try: | |
| user_oid = ObjectId(user_id_clean) | |
| except (InvalidId, ValueError) as e: | |
| logger.error(f"Invalid user_id format: {user_id_clean}") | |
| raise ValueError(f"Invalid user_id format: {user_id_clean}") | |
| now = datetime.utcnow() | |
| # Normalize dates (UTC midnight) | |
| today_date = datetime(now.year, now.month, now.day) | |
| # ------------------------------------------------- | |
| # STEP 1: Ensure root document exists | |
| # ------------------------------------------------- | |
| await media_clicks_col.update_one( | |
| {"userId": user_oid}, | |
| { | |
| "$setOnInsert": { | |
| "userId": user_oid, | |
| "createdAt": now, | |
| "ai_edit_complete": 0, | |
| "ai_edit_daily_count": [] | |
| } | |
| }, | |
| upsert=True | |
| ) | |
| # ------------------------------------------------- | |
| # STEP 2: Handle DAILY USAGE (BINARY, NO DUPLICATES) | |
| # ------------------------------------------------- | |
| doc = await media_clicks_col.find_one( | |
| {"userId": user_oid}, | |
| {"ai_edit_daily_count": 1} | |
| ) | |
| daily_entries = doc.get("ai_edit_daily_count", []) if doc else [] | |
| # Normalize today to UTC midnight | |
| today_date = datetime(now.year, now.month, now.day) | |
| # Build normalized date → count map (THIS ENFORCES UNIQUENESS) | |
| daily_map = {} | |
| for entry in daily_entries: | |
| d = entry["date"] | |
| if isinstance(d, datetime): | |
| d = datetime(d.year, d.month, d.day) | |
| daily_map[d] = entry["count"] # overwrite = no duplicates | |
| # Determine last recorded date | |
| last_date = max(daily_map.keys()) if daily_map else today_date | |
| # Fill ALL missing days with count = 0 | |
| next_day = last_date + timedelta(days=1) | |
| while next_day < today_date: | |
| daily_map.setdefault(next_day, 0) | |
| next_day += timedelta(days=1) | |
| # Mark today as used (binary) | |
| daily_map[today_date] = 1 | |
| # Rebuild list: OLDEST → NEWEST | |
| final_daily_entries = [ | |
| {"date": d, "count": daily_map[d]} | |
| for d in sorted(daily_map.keys()) | |
| ] | |
| # Keep only last 32 days | |
| final_daily_entries = final_daily_entries[-32:] | |
| # Atomic replace | |
| await media_clicks_col.update_one( | |
| {"userId": user_oid}, | |
| { | |
| "$set": { | |
| "ai_edit_daily_count": final_daily_entries, | |
| "updatedAt": now | |
| } | |
| } | |
| ) | |
| # ------------------------------------------------- | |
| # STEP 3: Try updating existing subCategory | |
| # ------------------------------------------------- | |
| update_result = await media_clicks_col.update_one( | |
| { | |
| "userId": user_oid, | |
| "subCategories.subCategoryId": subcategory_oid | |
| }, | |
| { | |
| "$inc": { | |
| "subCategories.$.click_count": 1, | |
| "ai_edit_complete": 1 | |
| }, | |
| "$set": { | |
| "subCategories.$.lastClickedAt": now, | |
| "ai_edit_last_date": now, | |
| "updatedAt": now | |
| } | |
| } | |
| ) | |
| # ------------------------------------------------- | |
| # STEP 4: Push subCategory if missing | |
| # ------------------------------------------------- | |
| if update_result.matched_count == 0: | |
| await media_clicks_col.update_one( | |
| {"userId": user_oid}, | |
| { | |
| "$inc": { | |
| "ai_edit_complete": 1 | |
| }, | |
| "$set": { | |
| "ai_edit_last_date": now, | |
| "updatedAt": now | |
| }, | |
| "$push": { | |
| "subCategories": { | |
| "subCategoryId": subcategory_oid, | |
| "click_count": 1, | |
| "lastClickedAt": now | |
| } | |
| } | |
| } | |
| ) | |
| # ------------------------------------------------- | |
| # STEP 5: Sort subCategories by lastClickedAt (ascending - oldest first) | |
| # ------------------------------------------------- | |
| user_doc = await media_clicks_col.find_one({"userId": user_oid}) | |
| if user_doc and "subCategories" in user_doc: | |
| subcategories = user_doc["subCategories"] | |
| # Sort by lastClickedAt in ascending order (oldest first) | |
| # Handle missing or None dates by using datetime.min | |
| subcategories_sorted = sorted( | |
| subcategories, | |
| key=lambda x: x.get("lastClickedAt") if x.get("lastClickedAt") is not None else datetime.min | |
| ) | |
| # Update with sorted array | |
| await media_clicks_col.update_one( | |
| {"userId": user_oid}, | |
| { | |
| "$set": { | |
| "subCategories": subcategories_sorted, | |
| "updatedAt": now | |
| } | |
| } | |
| ) | |
| logger.info( | |
| "[MEDIA_CLICK] user=%s subCategory=%s ai_edit_complete++ daily_tracked", | |
| user_id, | |
| str(subcategory_oid) | |
| ) | |
| except Exception as media_err: | |
| logger.error(f"MEDIA_CLICK ERROR: {media_err}") | |
| # # ------------------------------------------------------------------ | |
| # # CASE 2 : target_category_id → DigitalOcean path (unchanged logic) | |
| # # ------------------------------------------------------------------ | |
| if target_category_id: | |
| client = get_spaces_client() | |
| base_prefix = "faceswap/target/" | |
| resp = client.list_objects_v2( | |
| Bucket=DO_SPACES_BUCKET, Prefix=base_prefix, Delimiter="/" | |
| ) | |
| # Extract categories from the CommonPrefixes | |
| categories = [p["Prefix"].split("/")[2] for p in resp.get("CommonPrefixes", [])] | |
| target_url = None | |
| # --- FIX STARTS HERE --- | |
| for category in categories: | |
| original_prefix = f"faceswap/target/{category}/original/" | |
| thumb_prefix = f"faceswap/target/{category}/thumb/" # Keep for file list check (optional but safe) | |
| # List objects in original/ | |
| original_objects = client.list_objects_v2( | |
| Bucket=DO_SPACES_BUCKET, Prefix=original_prefix | |
| ).get("Contents", []) | |
| # List objects in thumb/ (optional: for the old code's extra check) | |
| thumb_objects = client.list_objects_v2( | |
| Bucket=DO_SPACES_BUCKET, Prefix=thumb_prefix | |
| ).get("Contents", []) | |
| # Extract only the filenames and filter for .png | |
| original_filenames = sorted([ | |
| obj["Key"].split("/")[-1] for obj in original_objects | |
| if obj["Key"].split("/")[-1].endswith(".png") | |
| ]) | |
| thumb_filenames = [ | |
| obj["Key"].split("/")[-1] for obj in thumb_objects | |
| ] | |
| # Replicate the old indexing logic based on sorted filenames | |
| for idx, filename in enumerate(original_filenames, start=1): | |
| cid = f"{category.lower()}image_{idx}" | |
| # Optional: Replicate the thumb file check for 100% parity | |
| # if filename in thumb_filenames and cid == target_category_id: | |
| # Simpler check just on the ID, assuming thumb files are present | |
| if cid == target_category_id: | |
| # Construct the final target URL using the full prefix and the filename | |
| target_url = f"{DO_SPACES_ENDPOINT}/{DO_SPACES_BUCKET}/{original_prefix}{filename}" | |
| break | |
| if target_url: | |
| break | |
| # --- FIX ENDS HERE --- | |
| if not target_url: | |
| raise HTTPException(404, "Target categoryId not found") | |
| # # ------------------------------------------------------------------ | |
| # # DOWNLOAD TARGET IMAGE | |
| # # ------------------------------------------------------------------ | |
| async with httpx.AsyncClient(timeout=30.0) as client: | |
| response = await client.get(target_url) | |
| response.raise_for_status() | |
| tgt_bytes = response.content | |
| src_bgr = cv2.imdecode(np.frombuffer(src_bytes, np.uint8), cv2.IMREAD_COLOR) | |
| tgt_bgr = cv2.imdecode(np.frombuffer(tgt_bytes, np.uint8), cv2.IMREAD_COLOR) | |
| if src_bgr is None or tgt_bgr is None: | |
| raise HTTPException(400, "Invalid image data") | |
| src_rgb = cv2.cvtColor(src_bgr, cv2.COLOR_BGR2RGB) | |
| tgt_rgb = cv2.cvtColor(tgt_bgr, cv2.COLOR_BGR2RGB) | |
| # ------------------------------------------------------------------ | |
| # FACE SWAP EXECUTION | |
| # ------------------------------------------------------------------ | |
| final_img, final_path, err = face_swap_and_enhance(src_rgb, tgt_rgb) | |
| # #--------------------Version 2.0 ----------------------------------------# | |
| # final_img, final_path, err = enhanced_face_swap_and_enhance(src_rgb, tgt_rgb) | |
| # #--------------------Version 2.0 ----------------------------------------# | |
| if err: | |
| raise HTTPException(500, err) | |
| with open(final_path, "rb") as f: | |
| result_bytes = f.read() | |
| result_key = f"faceswap/result/{uuid.uuid4().hex}_enhanced.png" | |
| result_url = upload_to_spaces(result_bytes, result_key) | |
| # ------------------------------------------------- | |
| # COMPRESS IMAGE (2–3 MB target) | |
| # ------------------------------------------------- | |
| compressed_bytes = compress_image( | |
| image_bytes=result_bytes, | |
| max_size=(1280, 1280), | |
| quality=72 | |
| ) | |
| compressed_key = f"faceswap/result/{uuid.uuid4().hex}_enhanced_compressed.jpg" | |
| compressed_url = upload_to_spaces( | |
| compressed_bytes, | |
| compressed_key, | |
| content_type="image/jpeg" | |
| ) | |
| end_time = datetime.utcnow() | |
| response_time_ms = (end_time - start_time).total_seconds() * 1000 | |
| if database is not None: | |
| await database.api_logs.insert_one({ | |
| "endpoint": "/face-swap", | |
| "status": "success", | |
| "response_time_ms": response_time_ms, | |
| "timestamp": end_time | |
| }) | |
| return { | |
| "result_key": result_key, | |
| "result_url": result_url, | |
| "Compressed_Image_URL": compressed_url | |
| } | |
| except Exception as e: | |
| end_time = datetime.utcnow() | |
| response_time_ms = (end_time - start_time).total_seconds() * 1000 | |
| if database is not None: | |
| await database.api_logs.insert_one({ | |
| "endpoint": "/face-swap", | |
| "status": "fail", | |
| "response_time_ms": response_time_ms, | |
| "timestamp": end_time, | |
| "error": str(e) | |
| }) | |
| raise HTTPException(500, f"Face swap failed: {str(e)}") | |
async def preview_result(result_key: str):
    """Return a stored result image so it can be displayed inline.

    Args:
        result_key: Object key handed to ``download_from_spaces``.

    Returns:
        A ``Response`` carrying the raw image bytes with an inline
        ``Content-Disposition`` header.

    Raises:
        HTTPException: 404 when ``download_from_spaces`` raises for the key.
    """
    import mimetypes

    try:
        img_bytes = download_from_spaces(result_key)
    except Exception as exc:
        # Any storage failure is reported to the client as "not found";
        # keep the real cause in the logs and in the exception chain.
        logger.warning("Preview download failed for key %s: %s", result_key, exc)
        raise HTTPException(status_code=404, detail="Result not found") from exc

    # Compressed results are uploaded under ".jpg" keys with an image/jpeg
    # content type, so derive the response type from the key instead of
    # always claiming PNG. Unknown or non-image extensions keep the PNG default.
    media_type, filename = "image/png", "result.png"
    guessed_type, _ = mimetypes.guess_type(result_key)
    extension = os.path.splitext(result_key)[1].lower()
    if guessed_type and guessed_type.startswith("image/") and extension:
        media_type, filename = guessed_type, f"result{extension}"

    return Response(
        content=img_bytes,
        media_type=media_type,
        headers={"Content-Disposition": f"inline; filename={filename}"}
    )
async def multi_face_swap_api(
    source_image: UploadFile = File(...),
    target_image: UploadFile = File(...)
):
    """Swap faces between two uploaded images, enhance, and upload the result.

    Args:
        source_image: Uploaded image passed to ``multi_face_swap`` as the source.
        target_image: Uploaded image passed to ``multi_face_swap`` as the target.

    Returns:
        A dict with ``result_key`` (the storage key) and ``result_url``
        (the value returned by ``upload_to_spaces``).

    Raises:
        HTTPException: 400 when either upload is empty or cannot be decoded
            as an image; 500 for any other failure.
    """
    try:
        # -----------------------------
        # Read images
        # -----------------------------
        src_bytes = await source_image.read()
        tgt_bytes = await target_image.read()

        # An empty buffer can make cv2.imdecode raise instead of returning
        # None, so reject it up front as a client error.
        if not src_bytes or not tgt_bytes:
            raise HTTPException(400, "Invalid image data")

        src_bgr = cv2.imdecode(np.frombuffer(src_bytes, np.uint8), cv2.IMREAD_COLOR)
        tgt_bgr = cv2.imdecode(np.frombuffer(tgt_bytes, np.uint8), cv2.IMREAD_COLOR)
        if src_bgr is None or tgt_bgr is None:
            raise HTTPException(400, "Invalid image data")

        src_rgb = cv2.cvtColor(src_bgr, cv2.COLOR_BGR2RGB)
        tgt_rgb = cv2.cvtColor(tgt_bgr, cv2.COLOR_BGR2RGB)

        # -----------------------------
        # Multi-face swap + mandatory enhancement
        # -----------------------------
        swapped_rgb = multi_face_swap(src_rgb, tgt_rgb)
        final_rgb = mandatory_enhancement(swapped_rgb)
        final_bgr = cv2.cvtColor(final_rgb, cv2.COLOR_RGB2BGR)

        # -----------------------------
        # Encode to PNG through a scratch directory that is removed as soon
        # as the bytes are in memory (mkdtemp would leave it behind).
        # -----------------------------
        with tempfile.TemporaryDirectory(prefix="multi_faceswap_") as temp_dir:
            result_path = os.path.join(temp_dir, "result.png")
            if not cv2.imwrite(result_path, final_bgr):
                raise RuntimeError("Failed to write result image")
            with open(result_path, "rb") as f:
                result_bytes = f.read()

        # -----------------------------
        # Upload
        # -----------------------------
        result_key = f"faceswap/multi/{uuid.uuid4().hex}.png"
        result_url = upload_to_spaces(
            result_bytes,
            result_key,
            content_type="image/png"
        )
        return {
            "result_key": result_key,
            "result_url": result_url
        }
    except HTTPException:
        # Preserve deliberate status codes (e.g. the 400 above) instead of
        # rewrapping them as a 500.
        raise
    except Exception as e:
        logger.exception("Multi face swap failed")
        raise HTTPException(status_code=500, detail=str(e)) from e
async def face_swap_api(
    image1: UploadFile = File(...),
    image2: Optional[UploadFile] = File(None),
    target_category_id: str = Form(None),
    new_category_id: str = Form(None),
    user_id: Optional[str] = Form(None),
    credentials: HTTPAuthorizationCredentials = Security(security)
):
    """Face swap endpoint for one or two source images onto a catalog target.

    Workflow:
      1. Resolve the target image URL, either from MongoDB
         (``new_category_id``) or from the Spaces bucket listing
         (``target_category_id``). Exactly one of the two must be given.
      2. Detect faces in all source images and in the target.
      3. Pair source and target faces by gender (largest face first), falling
         back to size-ordered pairing when no gender matches.
      4. Swap every pair, run ``mandatory_enhancement`` (described by the
         original author as CodeFormer enhancement), and upload both the PNG
         and a compressed JPEG.

    Args:
        image1: Required source image.
        image2: Optional second source image; skipped if empty or undecodable.
        target_category_id: Identifier of the form ``<category>image_<n>``
            matched against the Spaces bucket listing.
        new_category_id: String form of an ``asset_images._id`` ObjectId.
        user_id: Optional user ObjectId string; when present together with
            ``new_category_id``, usage counters are updated (best effort).
        credentials: Bearer credentials resolved by the ``security``
            dependency; not inspected inside this function.

    Returns:
        A dict with ``result_key``, ``result_url`` and ``compressed_url``.

    Raises:
        HTTPException: 400 for invalid input (bad ids, undecodable images,
            no faces found), 404 when the target cannot be resolved, and
            500 for any unexpected failure.
    """
    start_time = datetime.utcnow()
    try:
        # -----------------------------
        # Validate input (empty form strings count as "not provided")
        # -----------------------------
        target_category_id = target_category_id or None
        new_category_id = new_category_id or None
        user_id = user_id or None

        if target_category_id and new_category_id:
            raise HTTPException(400, "Provide only one of new_category_id or target_category_id.")
        if not target_category_id and not new_category_id:
            raise HTTPException(400, "Either new_category_id or target_category_id is required.")

        logger.info(f"[FaceSwap] Incoming request → target_category_id={target_category_id}, new_category_id={new_category_id}, user_id={user_id}")

        # -----------------------------
        # Read source images (kept in BGR, which is what face detection
        # consumes; the former BGR->RGB->BGR round trip was a no-op)
        # -----------------------------
        src_images = []
        src1 = _couple_decode_bgr(await image1.read())
        if src1 is None:
            raise HTTPException(400, "Invalid image1 data")
        src_images.append(src1)

        if image2:
            # The second image is optional: silently skip it when unusable.
            src2 = _couple_decode_bgr(await image2.read())
            if src2 is not None:
                src_images.append(src2)
            else:
                logger.warning("[FaceSwap] image2 could not be decoded; ignoring it")

        # -----------------------------
        # Resolve target image
        # -----------------------------
        target_url = None
        if new_category_id:
            try:
                asset_oid = ObjectId(new_category_id)
            except (InvalidId, TypeError):
                # Malformed ids are a client error, not a server fault.
                raise HTTPException(400, "Invalid new_category_id") from None

            doc = await subcategories_col.find_one({"asset_images._id": asset_oid})
            if not doc:
                raise HTTPException(404, "Asset image not found in database")

            # Compare against the normalised ObjectId string so an upper-case
            # hex id that matched the query also matches here.
            wanted_id = str(asset_oid)
            asset = next(
                (img for img in doc["asset_images"] if str(img["_id"]) == wanted_id),
                None
            )
            if not asset or not asset.get("url"):
                raise HTTPException(404, "Asset image URL not found")
            target_url = asset["url"]

            if user_id:
                await _couple_track_media_click(user_id, doc["_id"])
        elif target_category_id:
            target_url = _couple_find_spaces_target_url(target_category_id)

        if not target_url:
            raise HTTPException(404, "Target categoryId not found")

        async with httpx.AsyncClient(timeout=30.0) as http_client:
            response = await http_client.get(target_url)
            response.raise_for_status()
            tgt_bytes = response.content

        tgt_bgr = _couple_decode_bgr(tgt_bytes)
        if tgt_bgr is None:
            raise HTTPException(400, "Invalid target image data")

        # -----------------------------
        # Merge all source faces
        # -----------------------------
        all_src_faces = []
        for src_bgr in src_images:
            all_src_faces.extend(face_analysis_app.get(src_bgr))
        if not all_src_faces:
            raise HTTPException(400, "No faces detected in source images")

        tgt_faces = face_analysis_app.get(tgt_bgr)
        if not tgt_faces:
            raise HTTPException(400, "No faces detected in target image")

        # -----------------------------
        # Gender-based pairing
        # -----------------------------
        pairs = _couple_pair_faces(all_src_faces, tgt_faces)

        # -----------------------------
        # Perform face swap
        # -----------------------------
        with swap_lock:
            result_img = tgt_bgr.copy()
            for src_face, tgt_face in pairs:
                # Swap onto the paired target face. Re-detecting and always
                # taking the largest same-gender face would put every source
                # of that gender onto the same target and leave the other
                # target faces untouched.
                result_img = swapper.get(result_img, tgt_face, src_face, paste_back=True)

        result_rgb = cv2.cvtColor(result_img, cv2.COLOR_BGR2RGB)

        # -----------------------------
        # Mandatory enhancement
        # -----------------------------
        enhanced_rgb = mandatory_enhancement(result_rgb)
        enhanced_bgr = cv2.cvtColor(enhanced_rgb, cv2.COLOR_RGB2BGR)

        # -----------------------------
        # Save, upload, compress
        # -----------------------------
        # The scratch directory is removed once the PNG bytes are in memory.
        with tempfile.TemporaryDirectory(prefix="faceswap_") as temp_dir:
            final_path = os.path.join(temp_dir, "result.png")
            if not cv2.imwrite(final_path, enhanced_bgr):
                raise RuntimeError("Failed to write result image")
            with open(final_path, "rb") as f:
                result_bytes = f.read()

        result_key = f"faceswap/result/{uuid.uuid4().hex}_enhanced.png"
        result_url = upload_to_spaces(result_bytes, result_key)

        compressed_bytes = compress_image(result_bytes, max_size=(1280, 1280), quality=72)
        compressed_key = f"faceswap/result/{uuid.uuid4().hex}_enhanced_compressed.jpg"
        compressed_url = upload_to_spaces(compressed_bytes, compressed_key, content_type="image/jpeg")

        # -----------------------------
        # Log API usage
        # -----------------------------
        await _couple_log_api_call("success", start_time)
        return {
            "result_key": result_key,
            "result_url": result_url,
            "compressed_url": compressed_url
        }
    except HTTPException as http_err:
        # Record the failure but keep the intended status code (400/404)
        # instead of rewrapping it as a 500.
        await _couple_log_api_call("fail", start_time, error=str(http_err))
        raise
    except Exception as e:
        await _couple_log_api_call("fail", start_time, error=str(e))
        raise HTTPException(500, f"Face swap failed: {str(e)}") from e


def _couple_decode_bgr(image_bytes):
    """Decode raw bytes into a BGR image, or return None when unusable."""
    if not image_bytes:
        # cv2.imdecode can raise on an empty buffer rather than return None.
        return None
    return cv2.imdecode(np.frombuffer(image_bytes, np.uint8), cv2.IMREAD_COLOR)


def _couple_face_sort_key(face):
    """Order faces by descending bounding-box area, then by x-centre."""
    x1, y1, x2, y2 = face.bbox
    area = (x2 - x1) * (y2 - y1)
    cx = (x1 + x2) / 2
    return (-area, cx)


def _couple_pair_faces(src_faces, tgt_faces):
    """Pair source and target faces by gender, largest first.

    Faces with ``gender == 1`` are paired with each other, likewise
    ``gender == 0``. When that yields nothing, faces are paired purely by
    size order regardless of gender.
    """
    pairs = []
    for gender in (1, 0):
        same_gender_src = sorted(
            (f for f in src_faces if f.gender == gender), key=_couple_face_sort_key
        )
        same_gender_tgt = sorted(
            (f for f in tgt_faces if f.gender == gender), key=_couple_face_sort_key
        )
        pairs.extend(zip(same_gender_src, same_gender_tgt))

    # fallback if gender mismatch
    if not pairs:
        pairs = list(zip(
            sorted(src_faces, key=_couple_face_sort_key),
            sorted(tgt_faces, key=_couple_face_sort_key),
        ))
    return pairs


def _couple_find_spaces_target_url(target_category_id):
    """Look up the Spaces URL whose generated id equals target_category_id.

    Ids are generated as ``<category lower-cased>image_<n>`` where ``n`` is
    the 1-based position of the ``.png`` file among the sorted filenames
    under ``faceswap/target/<category>/original/``. Returns None when
    nothing matches.
    """
    client = get_spaces_client()
    resp = client.list_objects_v2(
        Bucket=DO_SPACES_BUCKET, Prefix="faceswap/target/", Delimiter="/"
    )
    categories = [p["Prefix"].split("/")[2] for p in resp.get("CommonPrefixes", [])]

    for category in categories:
        original_prefix = f"faceswap/target/{category}/original/"
        original_objects = client.list_objects_v2(
            Bucket=DO_SPACES_BUCKET, Prefix=original_prefix
        ).get("Contents", [])
        # The thumb/ listing that used to be fetched here was never used.
        original_filenames = sorted(
            name
            for name in (obj["Key"].split("/")[-1] for obj in original_objects)
            if name.endswith(".png")
        )
        for idx, filename in enumerate(original_filenames, start=1):
            if f"{category.lower()}image_{idx}" == target_category_id:
                return f"{DO_SPACES_ENDPOINT}/{DO_SPACES_BUCKET}/{original_prefix}{filename}"
    return None


async def _couple_track_media_click(user_id, subcategory_oid):
    """Update per-user usage counters in media_clicks_col (best effort).

    Failures are logged and swallowed so that tracking problems never fail
    the face swap request itself.
    """
    try:
        user_id_clean = user_id.strip()
        if not user_id_clean:
            raise ValueError("user_id cannot be empty")
        try:
            user_oid = ObjectId(user_id_clean)
        except (InvalidId, ValueError):
            logger.error(f"Invalid user_id format: {user_id_clean}")
            raise ValueError(f"Invalid user_id format: {user_id_clean}") from None

        now = datetime.utcnow()

        # Step 1: ensure root document exists
        await media_clicks_col.update_one(
            {"userId": user_oid},
            {
                "$setOnInsert": {
                    "userId": user_oid,
                    "createdAt": now,
                    "ai_edit_complete": 0,
                    "ai_edit_daily_count": []
                }
            },
            upsert=True
        )

        # Step 2: handle daily usage (binary, no duplicates)
        usage_doc = await media_clicks_col.find_one(
            {"userId": user_oid},
            {"ai_edit_daily_count": 1}
        )
        daily_entries = usage_doc.get("ai_edit_daily_count", []) if usage_doc else []
        today_date = datetime(now.year, now.month, now.day)

        # Keying by the date truncated to midnight collapses duplicate days.
        daily_map = {}
        for entry in daily_entries:
            day = entry["date"]
            if isinstance(day, datetime):
                day = datetime(day.year, day.month, day.day)
            daily_map[day] = entry["count"]

        last_date = max(daily_map.keys()) if daily_map else None
        if last_date != today_date:
            daily_map[today_date] = 1

        final_daily_entries = [
            {"date": day, "count": daily_map[day]}
            for day in sorted(daily_map.keys())
        ]
        # Keep only the 32 most recent entries.
        final_daily_entries = final_daily_entries[-32:]

        await media_clicks_col.update_one(
            {"userId": user_oid},
            {
                "$set": {
                    "ai_edit_daily_count": final_daily_entries,
                    "updatedAt": now
                }
            }
        )

        # Step 3: try updating existing subCategory
        update_result = await media_clicks_col.update_one(
            {
                "userId": user_oid,
                "subCategories.subCategoryId": subcategory_oid
            },
            {
                "$inc": {
                    "subCategories.$.click_count": 1,
                    "ai_edit_complete": 1
                },
                "$set": {
                    "subCategories.$.lastClickedAt": now,
                    "ai_edit_last_date": now,
                    "updatedAt": now
                }
            }
        )

        # Step 4: push subCategory if missing
        if update_result.matched_count == 0:
            await media_clicks_col.update_one(
                {"userId": user_oid},
                {
                    "$inc": {
                        "ai_edit_complete": 1
                    },
                    "$set": {
                        "ai_edit_last_date": now,
                        "updatedAt": now
                    },
                    "$push": {
                        "subCategories": {
                            "subCategoryId": subcategory_oid,
                            "click_count": 1,
                            "lastClickedAt": now
                        }
                    }
                }
            )

        # Step 5: sort subCategories by lastClickedAt (ascending)
        user_doc = await media_clicks_col.find_one({"userId": user_oid})
        if user_doc and "subCategories" in user_doc:
            def _last_clicked(sub):
                # Entries without a timestamp sort first.
                clicked_at = sub.get("lastClickedAt")
                return clicked_at if clicked_at is not None else datetime.min

            subcategories_sorted = sorted(user_doc["subCategories"], key=_last_clicked)
            await media_clicks_col.update_one(
                {"userId": user_oid},
                {
                    "$set": {
                        "subCategories": subcategories_sorted,
                        "updatedAt": now
                    }
                }
            )

        logger.info(
            "[MEDIA_CLICK] user=%s subCategory=%s ai_edit_complete++ daily_tracked",
            user_id,
            str(subcategory_oid)
        )
    except Exception as media_err:
        logger.error(f"MEDIA_CLICK ERROR: {media_err}")


async def _couple_log_api_call(status, start_time, error=None):
    """Insert one /face-swap-couple entry into api_logs when a database is configured."""
    end_time = datetime.utcnow()
    if database is None:
        return
    entry = {
        "endpoint": "/face-swap-couple",
        "status": status,
        "response_time_ms": (end_time - start_time).total_seconds() * 1000,
        "timestamp": end_time
    }
    if error is not None:
        entry["error"] = error
    await database.api_logs.insert_one(entry)
# --------------------- Mount Gradio ---------------------
# Build the Gradio UI and attach it to the FastAPI app under its own path.
multi_faceswap_app = build_multi_faceswap_gradio()
fastapi_app = mount_gradio_app(fastapi_app, multi_faceswap_app, path="/gradio-couple-faceswap")


if __name__ == "__main__":
    # Running the file directly serves the combined app on all interfaces, port 7860.
    uvicorn.run(fastapi_app, host="0.0.0.0", port=7860)