from __future__ import annotations

import logging
import os
import shutil
import subprocess
import tempfile
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, Optional

import numpy as np
from PIL import Image

LOGGER = logging.getLogger(__name__)

class TruForUnavailableError(RuntimeError):
    """Raised when the TruFor assets are missing or inference fails."""

@dataclass
class TruForResult:
    score: Optional[float]
    map_overlay: Optional[Image.Image]
    confidence_overlay: Optional[Image.Image]
    raw_scores: Dict[str, float]

class TruForEngine:
    """Wrapper that executes TruFor inference through docker or python backends."""

    def __init__(
        self,
        repo_root: Optional[Path] = None,
        weights_path: Optional[Path] = None,
        device: str = "cpu",
    ) -> None:
        self.base_dir = Path(__file__).resolve().parent
        self.device = device
        self.backend: Optional[str] = None
        self.status_message = "TruFor backend not initialized."

        backend_pref = os.environ.get("TRUFOR_BACKEND", "auto").lower()
        if backend_pref not in {"auto", "native", "docker"}:
            backend_pref = "auto"

        errors: list[str] = []
        if backend_pref in {"auto", "native"}:
            try:
                self._configure_native_backend(repo_root, weights_path)
                self.backend = "native"
                self.status_message = "TruFor ready (bundled python backend)."
            except TruForUnavailableError as exc:
                errors.append(f"Native backend unavailable: {exc}")
                if backend_pref == "native":
                    raise

        if self.backend is None and backend_pref in {"auto", "docker"}:
            try:
                self._configure_docker_backend()
                self.backend = "docker"
                self.status_message = f'TruFor ready (docker image "{self.docker_image}").'
            except TruForUnavailableError as exc:
                errors.append(f"Docker backend unavailable: {exc}")
                if backend_pref == "docker":
                    raise

        if self.backend is None:
            raise TruForUnavailableError(" | ".join(errors) if errors else "TruFor backend unavailable.")

    # ------------------------------------------------------------------
    # Backend configuration helpers
    # ------------------------------------------------------------------
    def _configure_docker_backend(self) -> None:
        if shutil.which("docker") is None:
            raise TruForUnavailableError("docker CLI not found on PATH.")
        test_docker_dir = self.base_dir / "test_docker"
        if not test_docker_dir.exists():
            raise TruForUnavailableError("test_docker directory not found in workspace.")
        image_name = os.environ.get("TRUFOR_DOCKER_IMAGE", "trufor")
        inspect = subprocess.run(
            ["docker", "image", "inspect", image_name],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            check=False,
        )
        if inspect.returncode != 0:
            raise TruForUnavailableError(
                f'Docker image "{image_name}" not found. Build it with "bash test_docker/docker_build.sh".'
            )
        weights_candidate = Path(os.environ.get("TRUFOR_DOCKER_WEIGHTS", self.base_dir / "weights")).expanduser()
        weight_file = weights_candidate / "trufor.pth.tar"
        self.docker_weights_dir: Optional[Path]
        self.docker_weights_dir = weight_file.parent if weight_file.exists() else None
        self.docker_runtime = os.environ.get("TRUFOR_DOCKER_RUNTIME")
        gpu_pref = os.environ.get("TRUFOR_DOCKER_GPU")
        if gpu_pref is None:
            gpu_pref = "-1" if self.device == "cpu" else "0"
        self.docker_gpu = gpu_pref
        gpus_arg = os.environ.get("TRUFOR_DOCKER_GPUS_ARG")
        if not gpus_arg and gpu_pref not in {"-1", "cpu", "none"}:
            gpus_arg = "all"
        self.docker_gpus_arg = gpus_arg
        self.docker_image = image_name

    def _configure_native_backend(self, _repo_root: Optional[Path], weights_path: Optional[Path]) -> None:
        try:
            from trufor_native import TruForBundledModel
        except ImportError as exc:  # pragma: no cover - packaging guard
            raise TruForUnavailableError("Bundled TruFor modules are not available.") from exc

        default_weights = self.base_dir / "weights" / "trufor.pth.tar"
        weight_candidate = weights_path or os.environ.get("TRUFOR_WEIGHTS") or default_weights
        weight_path = Path(weight_candidate).expanduser()
        if not weight_path.exists():
            raise TruForUnavailableError(
                f"TruFor weights missing at {weight_path}. Place trufor.pth.tar under weights/ or set TRUFOR_WEIGHTS."
            )
        try:
            self.native_model = TruForBundledModel(weight_path, device=self.device)
        except Exception as exc:  # pragma: no cover - propagate detailed failure
            raise TruForUnavailableError(f"Failed to initialise bundled TruFor model: {exc}") from exc

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def infer(self, image: Image.Image) -> TruForResult:
        if image is None:
            raise TruForUnavailableError("No image supplied to TruFor inference.")
        if self.backend == "docker":
            return self._infer_docker(image)
        if self.backend == "native":
            return self._infer_native(image)
        raise TruForUnavailableError("TruFor backend not configured.")

    # ------------------------------------------------------------------
    # Inference helpers
    # ------------------------------------------------------------------
    def _infer_native(self, image: Image.Image) -> TruForResult:
        outputs = self.native_model.predict(image)
        overlays: Dict[str, Optional[Image.Image]] = {"map": None, "conf": None}
        try:
            overlays["map"] = self._apply_heatmap(image, outputs.tamper_map)
        except Exception as exc:  # pragma: no cover - visualisation fallback
            LOGGER.debug("Failed to build tamper heatmap: %s", exc)
        if outputs.confidence_map is not None:
            try:
                overlays["conf"] = self._apply_heatmap(image, outputs.confidence_map)
            except Exception as exc:  # pragma: no cover
                LOGGER.debug("Failed to build confidence heatmap: %s", exc)
        raw_scores: Dict[str, float] = {
            "tamper_mean": float(np.mean(outputs.tamper_map)),
            "tamper_max": float(np.max(outputs.tamper_map)),
        }
        if outputs.confidence_map is not None:
            raw_scores["confidence_mean"] = float(np.mean(outputs.confidence_map))
            raw_scores["confidence_max"] = float(np.max(outputs.confidence_map))
        if outputs.detection_score is not None:
            raw_scores["tamper_score"] = float(outputs.detection_score)
        return TruForResult(
            score=outputs.detection_score,
            map_overlay=overlays["map"],
            confidence_overlay=overlays["conf"],
            raw_scores=raw_scores,
        )

    def _infer_docker(self, image: Image.Image) -> TruForResult:
        with tempfile.TemporaryDirectory(prefix="trufor_docker_") as workdir:
            workdir_path = Path(workdir)
            input_dir = workdir_path / "data"
            output_dir = workdir_path / "data_out"
            input_dir.mkdir(parents=True, exist_ok=True)
            output_dir.mkdir(parents=True, exist_ok=True)
            input_path = input_dir / "input.png"
            image.convert("RGB").save(input_path)

            cmd = ["docker", "run", "--rm"]
            if self.docker_runtime:
                cmd.extend(["--runtime", self.docker_runtime])
            gpu_flag = str(self.docker_gpu)
            if gpu_flag.lower() in {"cpu", "none"}:
                gpu_flag = "-1"
            if gpu_flag != "-1" and self.docker_gpus_arg:
                cmd.extend(["--gpus", self.docker_gpus_arg])
            cmd.extend([
                "-v",
                f"{input_dir.resolve()}:/data:ro",
                "-v",
                f"{output_dir.resolve()}:/data_out:rw",
            ])
            if self.docker_weights_dir is not None:
                cmd.extend([
                    "-v",
                    f"{self.docker_weights_dir.resolve()}:/weights:ro",
                ])
            cmd.append(self.docker_image)
            cmd.extend(
                [
                    "-gpu",
                    gpu_flag,
                    "-in",
                    "data/input.png",
                    "-out",
                    "data_out",
                ]
            )

            LOGGER.debug("Running TruFor docker command: %s", " ".join(cmd))
            result = subprocess.run(
                cmd,
                text=True,
                capture_output=True,
                check=False,
            )
            return self._process_results(result, output_dir, image)

    # ------------------------------------------------------------------
    # Result parsing
    # ------------------------------------------------------------------
    def _process_results(
        self,
        run_result: subprocess.CompletedProcess[str],
        output_dir: Path,
        image: Image.Image,
    ) -> TruForResult:
        if run_result.returncode != 0:
            stderr_tail = "\n".join(run_result.stderr.strip().splitlines()[-8:]) if run_result.stderr else ""
            LOGGER.error("TruFor stderr: %s", stderr_tail)
            raise TruForUnavailableError(
                "TruFor inference failed. Inspect dependencies and stderr:\n" + stderr_tail
            )
        npz_files = list(output_dir.rglob("*.npz"))
        if not npz_files:
            stdout_tail = "\n".join(run_result.stdout.strip().splitlines()[-8:]) if run_result.stdout else ""
            raise TruForUnavailableError(
                "TruFor inference produced no output files. Stdout tail:\n" + stdout_tail
            )
        data = np.load(npz_files[0], allow_pickle=False)
        tamper_map = data.get("map")
        conf_map = data.get("conf")
        score = float(data["score"]) if "score" in data.files else None
        overlays: Dict[str, Optional[Image.Image]] = {"map": None, "conf": None}
        try:
            overlays["map"] = self._apply_heatmap(image, tamper_map) if tamper_map is not None else None
        except Exception as exc:  # pragma: no cover
            LOGGER.debug("Failed to build tamper heatmap: %s", exc)
        try:
            overlays["conf"] = self._apply_heatmap(image, conf_map) if conf_map is not None else None
        except Exception as exc:  # pragma: no cover
            LOGGER.debug("Failed to build confidence heatmap: %s", exc)
        raw_scores: Dict[str, float] = {}
        if score is not None:
            raw_scores["tamper_score"] = score
        if tamper_map is not None:
            raw_scores["tamper_mean"] = float(np.mean(tamper_map))
            raw_scores["tamper_max"] = float(np.max(tamper_map))
        if conf_map is not None:
            raw_scores["confidence_mean"] = float(np.mean(conf_map))
            raw_scores["confidence_max"] = float(np.max(conf_map))
        return TruForResult(
            score=score,
            map_overlay=overlays["map"],
            confidence_overlay=overlays["conf"],
            raw_scores=raw_scores,
        )

    @staticmethod
    def _apply_heatmap(base: Image.Image, data: np.ndarray, alpha: float = 0.55) -> Image.Image:
        base_rgb = base.convert("RGB")
        if data is None or data.ndim != 2:
            raise ValueError("Expected a 2D map from TruFor")
        data = np.asarray(data, dtype=np.float32)
        if np.allclose(data.max(), data.min()):
            norm = np.zeros_like(data, dtype=np.float32)
        else:
            norm = (data - data.min()) / (data.max() - data.min())
        heat = np.zeros((*norm.shape, 3), dtype=np.uint8)
        heat[..., 0] = np.clip(norm * 255, 0, 255).astype(np.uint8)
        heat[..., 1] = np.clip(np.sqrt(norm) * 255, 0, 255).astype(np.uint8)
        heat[..., 2] = np.clip((1.0 - norm) * 255, 0, 255).astype(np.uint8)
        heat_img = Image.fromarray(heat, mode="RGB").resize(base_rgb.size, Image.BILINEAR)
        return Image.blend(base_rgb, heat_img, alpha)
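

if __name__ == "__main__":
    # Minimal usage sketch, not part of the engine itself: it assumes an image
    # file named "example.png" sits in the working directory (a hypothetical
    # path; substitute your own).  Construction raises TruForUnavailableError
    # when neither the native nor the docker backend can be configured, so the
    # demo reports that instead of crashing.
    logging.basicConfig(level=logging.INFO)
    try:
        engine = TruForEngine(device="cpu")
        print(engine.status_message)
        result = engine.infer(Image.open("example.png"))
        print("detection score:", result.score)
        for name, value in result.raw_scores.items():
            print(f"{name}: {value:.4f}")
    except TruForUnavailableError as err:
        print(f"TruFor unavailable: {err}")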