Spaces:
Running
Running
| import os | |
| import json | |
| import hashlib | |
| import random | |
| import threading | |
| import time | |
| from dataclasses import dataclass | |
| from typing import List, Dict, Any, Optional, Tuple | |
| import gradio as gr | |
| from PIL import Image | |
| from huggingface_hub import HfApi, CommitOperationAdd | |
| from huggingface_hub import snapshot_download | |
| # ---------------------- | |
| # Configuration | |
| # ---------------------- | |
| # Core evaluation parameters | |
| NUM_MODEL_FOLDERS = 5 # Number of model output folders to compare | |
| NUM_IMAGES_TO_RANK = NUM_MODEL_FOLDERS + 1 # Models + LR reference (7 total by default) | |
| TARGET_PER_PERSON = 10 # Number of rounds each person should complete | |
| IMAGES_PER_ROW = 3 # Number of images to display per row in the UI | |
| REFERENCE_IMAGE_HEIGHT = 180 # Height for reference image display | |
| CANDIDATE_IMAGE_HEIGHT_STEP_A = 200 # Height for candidate images in step A | |
| CANDIDATE_IMAGE_HEIGHT_STEP_B = 180 # Height for candidate images in step B | |
| # Contact and messaging | |
| CONTACT_EMAIL = "ffallah[at]asu.edu" | |
| STUDY_TITLE = "Image Evaluation Study" | |
| STEP_A_CRITERION = "quality" | |
| STEP_B_CRITERION = "sim_refA" | |
| STEP_A_INSTRUCTION = "Rank by quality (1 = best)" | |
| STEP_B_INSTRUCTION = "Rank by similarity to Reference A (1 = most similar)" | |
| # HuggingFace configuration | |
| HF_RESULTS_REPO = os.getenv("HF_RESULTS_REPO") | |
| HF_RESULTS_REPO_TYPE = "dataset" | |
| HF_TOKEN = os.getenv("HF_TOKEN") | |
| _hf_api = HfApi(token=HF_TOKEN) if HF_TOKEN else None | |
| # ---------------------- | |
| # FOLDER CONFIGURATION - MODIFY THIS SECTION | |
| # ---------------------- | |
| # Directly specify your folder paths here | |
| MODEL_FOLDERS = [ | |
| "data/rareflow", # Model 1 | |
| "data/misr", # Model 2 | |
| "data/seesr", # Model 3 | |
| "data/samsr", # Model 4 | |
| "data/adc", # Model 5 | |
| ] | |
| # Reference folders | |
| HIGH_RES_FOLDER = "data/hr" # High resolution reference images | |
| LOW_RES_FOLDER = "data/lr" # Low resolution reference images | |
| # Override with environment variables if they exist | |
| for i in range(NUM_MODEL_FOLDERS): | |
| env_var = f"FOLDER_{i+1}" | |
| env_value = os.environ.get(env_var) | |
| if env_value: | |
| if i < len(MODEL_FOLDERS): | |
| MODEL_FOLDERS[i] = env_value | |
| else: | |
| MODEL_FOLDERS.append(env_value) | |
| # Ensure we have exactly NUM_MODEL_FOLDERS folders | |
| if len(MODEL_FOLDERS) < NUM_MODEL_FOLDERS: | |
| raise ValueError( | |
| f"Not enough model folders specified. Expected {NUM_MODEL_FOLDERS}, got {len(MODEL_FOLDERS)}. " | |
| f"Please specify all folder paths in MODEL_FOLDERS list." | |
| ) | |
| elif len(MODEL_FOLDERS) > NUM_MODEL_FOLDERS: | |
| MODEL_FOLDERS = MODEL_FOLDERS[:NUM_MODEL_FOLDERS] | |
| # Override reference folders with environment variables if they exist | |
| HIGH_RES_FOLDER = os.environ.get("HIGH_RES_FOLDER", HIGH_RES_FOLDER) | |
| LOW_RES_FOLDER = os.environ.get("LOW_RES_FOLDER", LOW_RES_FOLDER) | |
| INPUT_DATASET_REPO = os.getenv("INPUT_DATASET_REPO", "").strip() | |
| if INPUT_DATASET_REPO: | |
| _ds_local = snapshot_download( | |
| repo_id=INPUT_DATASET_REPO, | |
| repo_type="dataset", | |
| token=HF_TOKEN, # uses your secret | |
| allow_patterns=["data/**"], | |
| local_dir_use_symlinks=False, | |
| ) | |
| _root = os.path.join(_ds_local, "data") | |
| # If your subfolder names are different, change these 7 lines only: | |
| MODEL_FOLDERS = [ | |
| os.path.join(_root, "rareflow"), | |
| os.path.join(_root, "misr"), | |
| os.path.join(_root, "seesr"), | |
| os.path.join(_root, "samsr"), | |
| os.path.join(_root, "adc"), | |
| ] | |
| HIGH_RES_FOLDER = os.path.join(_root, "hr") | |
| LOW_RES_FOLDER = os.path.join(_root, "lr") | |
| # Use MODEL_FOLDERS as MAIN_FOLDERS for compatibility | |
| MAIN_FOLDERS = MODEL_FOLDERS | |
| # Results configuration | |
| RESULTS_DIR = os.environ.get("RESULTS_DIR", "new_results") | |
| PROGRESS_PATH = os.path.join(RESULTS_DIR, "progress.json") | |
| GLOBAL_PROGRESS_PATH = os.path.join(RESULTS_DIR, "global_progress.json") # NEW: Track global completion | |
| ALL_RESULTS_JSONL = os.path.join(RESULTS_DIR, "all_results.jsonl") | |
| SAVE_PII = True # PII hidden from UI; set False to omit in logs | |
| # Compact results configuration | |
| COMPACT_DIR = os.environ.get("COMPACT_DIR", os.path.join(RESULTS_DIR, "compact")) | |
| WRITE_VERBOSE_EVENTS = False # Set to True if you want detailed JSONL event logs | |
| # File handling | |
| VALID_IMAGE_EXTENSIONS = {".jpg", ".jpeg", ".png", ".bmp", ".tiff", ".tif"} | |
| MAX_ATOMIC_SAVE_ATTEMPTS = 10 | |
| ATOMIC_SAVE_INITIAL_DELAY = 0.05 | |
| ATOMIC_SAVE_MAX_DELAY = 0.5 | |
| # Generate letters dynamically based on number of images | |
| LETTERS = [chr(ord('A') + i) for i in range(NUM_IMAGES_TO_RANK)] | |
| # Threading | |
| WRITE_LOCK = threading.Lock() | |
| # ---------------------- | |
| # Global Progress Management (NEW) | |
| # ---------------------- | |
| def load_global_progress() -> Dict[str, Any]: | |
| """Load global progress tracking total images completed across all users.""" | |
| if not os.path.exists(GLOBAL_PROGRESS_PATH): | |
| return {"total_images_completed": 0, "user_assignments": {}} | |
| try: | |
| with open(GLOBAL_PROGRESS_PATH, "r", encoding="utf-8") as f: | |
| return json.load(f) | |
| except Exception: | |
| return {"total_images_completed": 0, "user_assignments": {}} | |
| def save_global_progress(global_progress: Dict[str, Any]): | |
| """Save global progress atomically.""" | |
| with WRITE_LOCK: | |
| with open(GLOBAL_PROGRESS_PATH, "w", encoding="utf-8") as f: | |
| json.dump(global_progress, f, ensure_ascii=False, indent=2) | |
| def get_user_image_assignment(uid: str, available_images: List[str]) -> List[str]: | |
| """ | |
| Get or create the image assignment for a user. | |
| This ensures each new user gets the next sequential set of images. | |
| """ | |
| global_progress = load_global_progress() | |
| user_assignments = global_progress.get("user_assignments", {}) | |
| # If user already has an assignment, return it | |
| if uid in user_assignments: | |
| return user_assignments[uid]["assigned_images"] | |
| # Calculate starting point for new user | |
| total_completed = global_progress.get("total_images_completed", 0) | |
| num_available = len(available_images) | |
| # Build the assignment list with wrapping | |
| assigned_images = [] | |
| for i in range(TARGET_PER_PERSON): | |
| image_idx = (total_completed + i) % num_available | |
| assigned_images.append(available_images[image_idx]) | |
| # Save the assignment | |
| user_assignments[uid] = { | |
| "assigned_images": assigned_images, | |
| "start_index": total_completed, | |
| "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) | |
| } | |
| # Update total for next user | |
| global_progress["total_images_completed"] = total_completed + TARGET_PER_PERSON | |
| global_progress["user_assignments"] = user_assignments | |
| save_global_progress(global_progress) | |
| return assigned_images | |
| def get_global_stats() -> Dict[str, int]: | |
| """Get statistics about global progress.""" | |
| global_progress = load_global_progress() | |
| available_images = get_available_images() | |
| total_images = len(available_images) | |
| total_completed = global_progress.get("total_images_completed", 0) | |
| num_users = len(global_progress.get("user_assignments", {})) | |
| # Calculate how many complete cycles through all images | |
| complete_cycles = total_completed // total_images if total_images > 0 else 0 | |
| images_in_current_cycle = total_completed % total_images if total_images > 0 else 0 | |
| return { | |
| "total_images": total_images, | |
| "total_completed": total_completed, | |
| "num_users": num_users, | |
| "complete_cycles": complete_cycles, | |
| "images_in_current_cycle": images_in_current_cycle | |
| } | |
| # ---------------------- | |
| # Helpers | |
| # ---------------------- | |
| def _ensure_private_repo(repo_id: str): | |
| if not _hf_api: | |
| return | |
| try: | |
| _hf_api.repo_info(repo_id, repo_type=HF_RESULTS_REPO_TYPE) | |
| except Exception: | |
| _hf_api.create_repo(repo_id=repo_id, repo_type=HF_RESULTS_REPO_TYPE, private=True) | |
| def push_results_to_private_repo(uid: str): | |
| if not HF_TOKEN or not HF_RESULTS_REPO or not _hf_api: | |
| return | |
| try: | |
| _ensure_private_repo(HF_RESULTS_REPO) | |
| os.makedirs(RESULTS_DIR, exist_ok=True) | |
| open(ALL_RESULTS_JSONL, "a").close() | |
| user_file = os.path.join(RESULTS_DIR, f"{uid}.jsonl") | |
| open(user_file, "a").close() | |
| compact_user_file = _compact_user_path(uid) | |
| ops = [ | |
| CommitOperationAdd(path_in_repo="new_results/all_results.jsonl", path_or_fileobj=ALL_RESULTS_JSONL), | |
| CommitOperationAdd(path_in_repo=f"new_results/users/{uid}.jsonl", path_or_fileobj=user_file), | |
| CommitOperationAdd(path_in_repo=f"new_results/compact/{uid}.json", path_or_fileobj=compact_user_file), | |
| ] | |
| # Also upload global progress | |
| if os.path.exists(GLOBAL_PROGRESS_PATH): | |
| ops.append(CommitOperationAdd( | |
| path_in_repo="new_results/global_progress.json", | |
| path_or_fileobj=GLOBAL_PROGRESS_PATH | |
| )) | |
| _hf_api.create_commit( | |
| repo_id=HF_RESULTS_REPO, | |
| repo_type=HF_RESULTS_REPO_TYPE, | |
| operations=ops, | |
| commit_message="Update evaluation results" | |
| ) | |
| except Exception as e: | |
| print(f"[WARN] push_results_to_private_repo failed: {e}") | |
| def ensure_paths(): | |
| os.makedirs(RESULTS_DIR, exist_ok=True) | |
| os.makedirs(COMPACT_DIR, exist_ok=True) | |
| # Check model folders | |
| for i, folder in enumerate(MODEL_FOLDERS): | |
| if not os.path.isdir(folder): | |
| raise FileNotFoundError( | |
| f"Model folder {i+1} not found: '{folder}'. " | |
| f"Please create it and add images with matching filenames." | |
| ) | |
| # Check reference folders | |
| if not os.path.isdir(HIGH_RES_FOLDER): | |
| raise FileNotFoundError( | |
| f"High resolution folder not found: '{HIGH_RES_FOLDER}'. " | |
| f"Please create it and add reference images." | |
| ) | |
| if not os.path.isdir(LOW_RES_FOLDER): | |
| raise FileNotFoundError( | |
| f"Low resolution folder not found: '{LOW_RES_FOLDER}'. " | |
| f"Please create it and add reference images." | |
| ) | |
| def load_image(path: str) -> Optional[Image.Image]: | |
| try: | |
| return Image.open(path).convert("RGB") | |
| except Exception as e: | |
| print(f"Error loading image {path}: {e}") | |
| return None | |
| def get_available_images() -> List[str]: | |
| ref_folder = MODEL_FOLDERS[0] if MODEL_FOLDERS else None | |
| if not ref_folder or not os.path.exists(ref_folder): | |
| return [] | |
| out = [] | |
| for fn in os.listdir(ref_folder): | |
| if not any(fn.lower().endswith(ext) for ext in VALID_IMAGE_EXTENSIONS): | |
| continue | |
| # Check if file exists in all folders | |
| if all(os.path.exists(os.path.join(f, fn)) for f in MODEL_FOLDERS + [HIGH_RES_FOLDER, LOW_RES_FOLDER]): | |
| out.append(fn) | |
| return sorted(out) | |
| def append_jsonl(path: str, record: Dict[str, Any]): | |
| line = json.dumps(record, ensure_ascii=False) | |
| with WRITE_LOCK: | |
| with open(path, "a", encoding="utf-8") as f: | |
| f.write(line + "\n") | |
| def load_progress() -> Dict[str, Dict[str, Any]]: | |
| if not os.path.exists(PROGRESS_PATH): | |
| return {} | |
| try: | |
| with open(PROGRESS_PATH, "r", encoding="utf-8") as f: | |
| return json.load(f) | |
| except Exception: | |
| return {} | |
| def save_progress(progress: Dict[str, Dict[str, Any]]): | |
| with WRITE_LOCK: | |
| with open(PROGRESS_PATH, "w", encoding="utf-8") as f: | |
| json.dump(progress, f, ensure_ascii=False, indent=2) | |
| def hash_user_id(name: str, email: str) -> str: | |
| norm = (name or "").strip().lower() + "|" + (email or "").strip().lower() | |
| return hashlib.sha256(norm.encode("utf-8")).hexdigest()[:16] | |
| # ---------------------- | |
| # Compact results helpers | |
| # ---------------------- | |
| def _compact_user_path(uid: str) -> str: | |
| return os.path.join(COMPACT_DIR, f"{uid}.json") | |
| def _atomic_save_json(path: str, obj: Dict[str, Any], max_tries: int = MAX_ATOMIC_SAVE_ATTEMPTS, | |
| delay: float = ATOMIC_SAVE_INITIAL_DELAY): | |
| os.makedirs(os.path.dirname(path), exist_ok=True) | |
| # Unique temp name (prevents interleaving when multiple threads/processes write) | |
| tmp = f"{path}.tmp.{os.getpid()}.{threading.get_ident()}" | |
| data = json.dumps(obj, ensure_ascii=False, indent=2) | |
| for attempt in range(max_tries): | |
| try: | |
| with WRITE_LOCK: # intra-process guard | |
| with open(tmp, "w", encoding="utf-8") as f: | |
| f.write(data) | |
| # Atomic replace; may raise PermissionError on Windows if target is open | |
| os.replace(tmp, path) | |
| return | |
| except PermissionError: | |
| # Windows: another process (AV/indexer/uploader) has the target open. | |
| # Backoff a bit and try again. | |
| time.sleep(delay) | |
| delay = min(ATOMIC_SAVE_MAX_DELAY, delay * 2) | |
| except Exception: | |
| # Clean up temp on unexpected errors | |
| try: | |
| if os.path.exists(tmp): | |
| os.remove(tmp) | |
| except Exception: | |
| pass | |
| raise | |
| # Last-gasp fallback: try direct write (not atomic, but better than nothing) | |
| with WRITE_LOCK: | |
| with open(path, "w", encoding="utf-8") as f: | |
| f.write(data) | |
| try: | |
| if os.path.exists(tmp): | |
| os.remove(tmp) | |
| except Exception: | |
| pass | |
| def _load_compact_user(uid: str) -> Dict[str, Any]: | |
| path = _compact_user_path(uid) | |
| if not os.path.exists(path): | |
| return {} | |
| try: | |
| with open(path, "r", encoding="utf-8") as f: | |
| return json.load(f) | |
| except Exception: | |
| return {} | |
| def _ensure_compact_user(uid: str, name: Optional[str] = None, email: Optional[str] = None) -> Dict[str, Any]: | |
| data = _load_compact_user(uid) | |
| if not data: | |
| data = {"uid": uid, "name": name or "", "email": email or "", "rounds": []} | |
| _atomic_save_json(_compact_user_path(uid), data) | |
| return data | |
| changed = False | |
| # Only overwrite if a non-empty value is provided | |
| if name: | |
| if data.get("name") != name: | |
| data["name"] = name | |
| changed = True | |
| if email: | |
| if data.get("email") != email: | |
| data["email"] = email | |
| changed = True | |
| if changed: | |
| _atomic_save_json(_compact_user_path(uid), data) | |
| return data | |
| def _compact_upsert_round(uid: str, name: str, email: str, filename: str): | |
| """ | |
| Ensure a round record exists for this filename. If the newest entry for this | |
| filename already has step2_order filled, we insert a NEW round for the same image | |
| (edge case: repeat). Otherwise we reuse the most recent incomplete one. | |
| """ | |
| data = _ensure_compact_user(uid, name, email) | |
| rounds = data.setdefault("rounds", []) | |
| # find the most recent round with this filename that still needs step2 | |
| for r in reversed(rounds): | |
| if r.get("image") == filename and r.get("step2_order") is None: | |
| return # already have an in-progress entry for this image | |
| # otherwise append a new round entry | |
| rounds.append({"image": filename, "step1_order": None, "step2_order": None}) | |
| _atomic_save_json(_compact_user_path(uid), data) | |
| def _compact_write_step(uid: str, filename: str, *, step: str, order_letters: List[str]): | |
| """ | |
| step ∈ {"step1_order","step2_order"}; update most-recent matching round. | |
| """ | |
| assert step in {"step1_order", "step2_order"} | |
| data = _load_compact_user(uid) | |
| rounds = data.get("rounds", []) | |
| for r in reversed(rounds): | |
| if r.get("image") == filename: | |
| r[step] = order_letters | |
| _atomic_save_json(_compact_user_path(uid), data) | |
| return | |
| # If we got here, no round exists yet (shouldn't happen if we upsert at start) | |
| # Create it defensively: | |
| new_round = {"image": filename, "step1_order": None, "step2_order": None} | |
| new_round[step] = order_letters | |
| data.setdefault("rounds", []).append(new_round) | |
| _atomic_save_json(_compact_user_path(uid), data) | |
| # ---------------------- | |
| # Round building | |
| # ---------------------- | |
| def _folder_keys() -> List[str]: | |
| """Generate identifiers for each folder based on folder path basename""" | |
| keys = [] | |
| # Use basename of each model folder as key | |
| for folder in MODEL_FOLDERS: | |
| basename = os.path.basename(folder.rstrip("/")) | |
| if not basename: # Handle edge case of root paths | |
| basename = folder.replace("/", "_").replace("\\", "_") | |
| keys.append(basename) | |
| # Add LR folder key (since LR is now in the options) | |
| lr_basename = os.path.basename(LOW_RES_FOLDER.rstrip("/")) | |
| if not lr_basename: | |
| lr_basename = "lr" | |
| keys.append(lr_basename) | |
| return keys # length = NUM_IMAGES_TO_RANK | |
| def _build_candidate_paths(filename: str) -> List[str]: | |
| """Build paths to candidate images from all model folders plus HR""" | |
| paths = [os.path.join(f, filename) for f in MODEL_FOLDERS] | |
| # LR is now one of the options | |
| paths.append(os.path.join(LOW_RES_FOLDER, filename)) | |
| assert len(paths) == NUM_IMAGES_TO_RANK | |
| return paths | |
| def _start_round_state(uid: str, user_assigned_images: List[str], progress: Dict[str, Any]) -> Tuple[str, List[int]]: | |
| """ | |
| Decide filename and order for the current round using user's pre-assigned images. | |
| Returns (filename, order_idx). | |
| """ | |
| entry = progress.setdefault(uid, {}) | |
| completed = entry.get("completed_rounds", 0) | |
| if "round_filename" in entry and "order_idx" in entry: | |
| return entry["round_filename"], entry["order_idx"] | |
| # Use the user's assigned images (not global pool) | |
| if completed >= len(user_assigned_images): | |
| # Should not happen if TARGET_PER_PERSON is set correctly | |
| filename = user_assigned_images[-1] | |
| else: | |
| filename = user_assigned_images[completed] | |
| seed = random.getrandbits(32) | |
| rng = random.Random(seed) | |
| order_idx = list(range(NUM_IMAGES_TO_RANK)) | |
| rng.shuffle(order_idx) | |
| entry["current_step"] = "A" | |
| entry["round_filename"] = filename | |
| entry["order_idx"] = order_idx | |
| entry["seed"] = seed | |
| save_progress(progress) | |
| # Write round_start record once | |
| _write_round_start(uid, filename, order_idx, seed) | |
| return filename, order_idx | |
| def _write_round_start(uid: str, filename: str, order_idx: List[int], seed: int): | |
| if not WRITE_VERBOSE_EVENTS: | |
| return # suppress verbose event logs unless enabled | |
| source_keys = _folder_keys() | |
| candidate_map = {LETTERS[pos]: source_keys[idx] for pos, idx in enumerate(order_idx)} | |
| record = { | |
| "event": "round_start", | |
| "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), | |
| "user_id": uid, | |
| "round_filename": filename, | |
| "seed": seed, | |
| "candidate_map": candidate_map, | |
| "referenceA_type": "hr", | |
| "referenceA_filename": filename | |
| } | |
| os.makedirs(RESULTS_DIR, exist_ok=True) | |
| append_jsonl(ALL_RESULTS_JSONL, record) | |
| append_jsonl(os.path.join(RESULTS_DIR, f"{uid}.jsonl"), record) | |
| def _ranks_to_order_letters(ranks: List[int]) -> List[str]: | |
| # ranks[i] = rank of position i (0-based). We return letters by rank ascending. | |
| n = len(ranks) | |
| order = [] | |
| for r in range(1, n + 1): | |
| idx = ranks.index(r) | |
| order.append(LETTERS[idx]) | |
| return order | |
| # ---------------------- | |
| # Selection & labeling | |
| # ---------------------- | |
| def _toggle_selection(selection: Optional[List[int]], idx: int, n: int = NUM_IMAGES_TO_RANK) -> List[int]: | |
| sel = list(selection or []) | |
| if idx in sel: | |
| sel.remove(idx) | |
| else: | |
| if len(sel) < n: | |
| sel.append(idx) | |
| return sel | |
| def _compute_rank_labels(selection: List[int], names: List[str]) -> Tuple[List[str], str]: | |
| n = len(names) | |
| ranks = [0] * n | |
| for rank, image_pos in enumerate(selection, start=1): | |
| if 1 <= image_pos <= n: | |
| ranks[image_pos - 1] = rank | |
| labels = [ | |
| (f"{names[i-1]} — Rank {ranks[i-1]}" if ranks[i-1] else names[i-1]) | |
| for i in range(1, n + 1) | |
| ] | |
| ranking_str = ",".join(str(r) for r in ranks) if 0 not in ranks else "" | |
| return labels, ranking_str | |
| def _make_click_handler_with_names(n: int, ranking_box: gr.Textbox, state_sel: gr.State, names: List[str]): | |
| def _handler_for_index(idx: int): | |
| def _handler(current_selection: List[int]): | |
| sel = _toggle_selection(current_selection, idx, n=n) | |
| labels, ranking_str = _compute_rank_labels(sel, names=names) | |
| img_updates = [gr.update(label=labels[i]) for i in range(n)] | |
| return (*img_updates, gr.update(value=ranking_str), sel) | |
| return _handler | |
| return _handler_for_index | |
| # ---------------------- | |
| # Validation helpers | |
| # ---------------------- | |
| def _is_complete_ranking(ranks_str: str, n: int = NUM_IMAGES_TO_RANK) -> bool: | |
| """Return True iff ranks_str is like '1,2,...,n' in some permuted order per image.""" | |
| try: | |
| parts = [int(x.strip()) for x in ranks_str.split(",")] | |
| except Exception: | |
| return False | |
| if len(parts) != n: | |
| return False | |
| return set(parts) == set(range(1, n + 1)) | |
| # ---------------------- | |
| # Save answers | |
| # ---------------------- | |
| def _save_answer(user_meta: Dict[str, Any], criterion: str, ranks_str: str) -> List[str]: | |
| ranks = [int(x.strip()) for x in ranks_str.split(",")] | |
| letters_by_rank = _ranks_to_order_letters(ranks) | |
| if WRITE_VERBOSE_EVENTS: | |
| record = { | |
| "event": "answer", | |
| "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), | |
| "user_id": user_meta["uid"], | |
| "name": user_meta["name"] if SAVE_PII else None, | |
| "email": user_meta["email"] if SAVE_PII else None, | |
| "round_index": user_meta["completed_rounds"] + 1, | |
| "round_filename": user_meta["round_filename"], | |
| "criterion": criterion, | |
| "ranking_letters": letters_by_rank | |
| } | |
| os.makedirs(RESULTS_DIR, exist_ok=True) | |
| append_jsonl(os.path.join(RESULTS_DIR, f"{user_meta['uid']}.jsonl"), record) | |
| append_jsonl(ALL_RESULTS_JSONL, record) | |
| return letters_by_rank | |
| # ---------------------- | |
| # UI logic | |
| # ---------------------- | |
| def _prep_images_for_round(filename: str, order_idx: List[int]): | |
| # Build candidate images (models + LR) and reference (HR) | |
| paths = _build_candidate_paths(filename) | |
| ordered_paths = [paths[i] for i in order_idx] | |
| imgs = [load_image(p) for p in ordered_paths] | |
| # HR is the reference now | |
| ref_hr = load_image(os.path.join(HIGH_RES_FOLDER, filename)) | |
| # Labels "Image A..G" | |
| names = [f"Image {LETTERS[i]}" for i in range(NUM_IMAGES_TO_RANK)] | |
| return imgs, ref_hr, names | |
| def start_or_resume(name: str, email: str): | |
| if not name or not email: | |
| # Non-destructive validation | |
| gr.Warning("Please enter your name and email to begin.") | |
| # Generate the correct number of updates | |
| img_updates = [gr.update()] * NUM_IMAGES_TO_RANK * 2 # for both A and B images | |
| return ( | |
| "", [], 0, "A", [], "", [], | |
| *img_updates, | |
| gr.update(), | |
| "Please enter your details to begin.", | |
| gr.update(visible=False), # eval panel | |
| gr.update(visible=False), # thanks | |
| gr.update(visible=False), # A | |
| gr.update(visible=False), # B | |
| gr.update(), # name | |
| gr.update(), # email | |
| [], [], "", "", "", # states & inputs reset | |
| gr.update(visible=True) # start_group stays visible | |
| ) | |
| ensure_paths() | |
| available = get_available_images() | |
| if not available: | |
| gr.Warning("No matching images found across all folders.") | |
| img_updates = [gr.update()] * NUM_IMAGES_TO_RANK * 2 | |
| return ( | |
| "", [], 0, "A", [], "", [], | |
| *img_updates, | |
| gr.update(), | |
| "No matching images found across all folders.", | |
| gr.update(visible=False), | |
| gr.update(visible=False), | |
| gr.update(visible=False), gr.update(visible=False), | |
| gr.update(), gr.update(), | |
| [], [], "", "", "", | |
| gr.update(visible=True) | |
| ) | |
| uid = hash_user_id(name, email) | |
| # Get user's assigned images (this handles the sequential assignment) | |
| user_assigned_images = get_user_image_assignment(uid, available) | |
| progress = load_progress() | |
| entry = progress.setdefault(uid, {"completed_rounds": 0}) | |
| completed = entry.get("completed_rounds", 0) | |
| # Get global stats for display | |
| stats = get_global_stats() | |
| if completed >= TARGET_PER_PERSON: | |
| status = f"Welcome back, {name}! You've completed all {TARGET_PER_PERSON} rounds. 🎉" | |
| # Clear images and show thanks | |
| img_updates = [gr.update(value=None)] * NUM_IMAGES_TO_RANK * 2 | |
| return ( | |
| uid, available, completed, "A", [], "", user_assigned_images, | |
| *img_updates, | |
| gr.update(value=None), | |
| status, | |
| gr.update(visible=False), # eval panel | |
| gr.update(visible=True), # thanks | |
| gr.update(visible=False), # A group | |
| gr.update(visible=False), # B group | |
| gr.update(visible=False), # hide name | |
| gr.update(visible=False), # hide email | |
| [], [], "", "", "", | |
| gr.update(visible=False) # hide start_group after start | |
| ) | |
| # Ensure current round state exists | |
| filename, order_idx = _start_round_state(uid, user_assigned_images, progress) | |
| _compact_upsert_round(uid, name, email, filename) | |
| imgs, ref_hr, names = _prep_images_for_round(filename, order_idx) | |
| # Show which images this user is assigned | |
| global_info = f" (Your images: {entry.get('completed_rounds', 0) + 1}-{min((entry.get('completed_rounds', 0) + 1) + (TARGET_PER_PERSON - completed - 1), len(available))})" | |
| status = f"Round {completed + 1} / {TARGET_PER_PERSON} • Step 1 / 2 — {STEP_A_INSTRUCTION}{global_info}" | |
| # Prepare updates for both steps (same candidates) | |
| a_updates = [gr.update(value=img, label=names[i]) for i, img in enumerate(imgs)] | |
| b_updates = [gr.update(value=img, label=names[i]) for i, img in enumerate(imgs)] | |
| return ( | |
| uid, available, completed, entry.get("current_step", "A"), order_idx, filename, user_assigned_images, | |
| *a_updates, | |
| *b_updates, | |
| gr.update(value=ref_hr, label="Reference A (HR, not clickable)"), status, | |
| gr.update(visible=True), # eval panel | |
| gr.update(visible=False), # thanks | |
| gr.update(visible=True), # show A | |
| gr.update(visible=False), # hide B | |
| gr.update(visible=False), # hide name | |
| gr.update(visible=False), # hide email | |
| [], [], "", "", "", # selections & inputs | |
| gr.update(visible=False) # hide start_group after start | |
| ) | |
| def continue_after_A(name: str, email: str, uid: str, available: List[str], completed_rounds: int, | |
| current_step: str, order_idx: List[int], round_filename: str, | |
| user_assigned_images: List[str], a_ranking: str): | |
| if not a_ranking or not _is_complete_ranking(a_ranking, n=NUM_IMAGES_TO_RANK): | |
| gr.Warning(f"Please rank all {NUM_IMAGES_TO_RANK} images (1–{NUM_IMAGES_TO_RANK}). Your selections are preserved.") | |
| # Keep Step A visible, do not advance | |
| return ( | |
| f"⚠️ Step A: please rank all {NUM_IMAGES_TO_RANK} images before continuing.", | |
| gr.update(visible=True), # A group stays visible | |
| gr.update(visible=False) # B group stays hidden | |
| ) | |
| # Save Step A | |
| letters_by_rank = _save_answer( | |
| { | |
| "uid": uid, "name": name, "email": email, | |
| "completed_rounds": completed_rounds, | |
| "round_filename": round_filename | |
| }, | |
| criterion=STEP_A_CRITERION, | |
| ranks_str=a_ranking | |
| ) | |
| # Write compact step1_order and push | |
| _compact_write_step(uid, round_filename, step="step1_order", order_letters=letters_by_rank) | |
| # Move to step B | |
| progress = load_progress() | |
| progress.setdefault(uid, {}) | |
| progress[uid]["current_step"] = "B" | |
| save_progress(progress) | |
| status = f"✅ Saved. Round {completed_rounds + 1} / {TARGET_PER_PERSON} • Step 2 / 2 — {STEP_B_INSTRUCTION}" | |
| return ( | |
| status, | |
| gr.update(visible=False), # A group | |
| gr.update(visible=True) # B group | |
| ) | |
| def submit_after_B(name: str, email: str, uid: str, available: List[str], completed_rounds: int, | |
| current_step: str, order_idx: List[int], round_filename: str, | |
| user_assigned_images: List[str], b_ranking: str, notes: str): | |
| if not b_ranking or not _is_complete_ranking(b_ranking, n=NUM_IMAGES_TO_RANK): | |
| gr.Warning(f"Please rank all {NUM_IMAGES_TO_RANK} images (1–{NUM_IMAGES_TO_RANK}) for similarity. Your selections are preserved.") | |
| # Keep Step B visible | |
| img_updates = [gr.update()] * NUM_IMAGES_TO_RANK * 2 | |
| return ( | |
| completed_rounds, current_step, order_idx, round_filename, | |
| *img_updates, | |
| gr.update(), # no change to ref | |
| f"⚠️ Step B: please rank all {NUM_IMAGES_TO_RANK} images before submitting.", | |
| gr.update(visible=True), # eval panel | |
| gr.update(visible=False), # thanks | |
| gr.update(visible=False), # A | |
| gr.update(visible=True), # B | |
| [], [], "", "", notes # keep B selections in state (leave notes as-is) | |
| ) | |
| # Save Step B | |
| letters_by_rank = _save_answer( | |
| { | |
| "uid": uid, "name": name, "email": email, | |
| "completed_rounds": completed_rounds, | |
| "round_filename": round_filename | |
| }, | |
| criterion=STEP_B_CRITERION, | |
| ranks_str=b_ranking | |
| ) | |
| # Write compact step2_order | |
| _compact_write_step(uid, round_filename, step="step2_order", order_letters=letters_by_rank) | |
| # Mark round complete | |
| progress = load_progress() | |
| entry = progress.setdefault(uid, {"completed_rounds": 0}) | |
| entry["completed_rounds"] = completed_rounds + 1 | |
| entry.pop("round_filename", None) | |
| entry.pop("order_idx", None) | |
| entry.pop("seed", None) | |
| entry["current_step"] = "A" | |
| save_progress(progress) | |
| # Optional push | |
| push_results_to_private_repo(uid) | |
| # Finished all rounds? | |
| if entry["completed_rounds"] >= TARGET_PER_PERSON: | |
| status = f"✅ All {TARGET_PER_PERSON} rounds completed! Thank you!" | |
| img_updates_clear = [gr.update(value=None)] * NUM_IMAGES_TO_RANK * 2 | |
| return ( | |
| entry["completed_rounds"], "A", [], "", | |
| *img_updates_clear, | |
| gr.update(value=None), | |
| status, | |
| gr.update(visible=False), | |
| gr.update(visible=True), | |
| gr.update(visible=False), gr.update(visible=False), | |
| [], [], "", "", "" | |
| ) | |
| # Prepare NEXT round (A) | |
| filename, new_order_idx = _start_round_state(uid, user_assigned_images, progress) | |
| _compact_upsert_round(uid, name, email, filename) | |
| imgs, ref_hr, names = _prep_images_for_round(filename, new_order_idx) | |
| a_updates = [gr.update(value=img, label=names[i]) for i, img in enumerate(imgs)] | |
| b_updates = [gr.update(value=img, label=names[i]) for i, img in enumerate(imgs)] | |
| status = f"✅ Round {entry['completed_rounds']} completed! Now Round {entry['completed_rounds'] + 1} / {TARGET_PER_PERSON} • Step 1 / 2 — {STEP_A_INSTRUCTION}" | |
| return ( | |
| entry["completed_rounds"], "A", new_order_idx, filename, | |
| *a_updates, | |
| *b_updates, | |
| gr.update(value=ref_hr, label="Reference A (HR, not clickable)"), status, | |
| gr.update(visible=True), | |
| gr.update(visible=False), | |
| gr.update(visible=True), gr.update(visible=False), | |
| [], [], "", "", "" | |
| ) | |
| # ---------------------- | |
| # UI | |
| # ---------------------- | |
| if __name__ == "__main__": | |
| print("=" * 60) | |
| print("FOLDER CONFIGURATION:") | |
| print("=" * 60) | |
| print(f"Number of model folders: {NUM_MODEL_FOLDERS}") | |
| print("\nModel folders:") | |
| for i, folder in enumerate(MODEL_FOLDERS, 1): | |
| exists = "✓" if os.path.isdir(folder) else "✗" | |
| print(f" {i}. {folder} [{exists}]") | |
| print(f"\nHigh resolution folder: {HIGH_RES_FOLDER} [{'✓' if os.path.isdir(HIGH_RES_FOLDER) else '✗'}]") | |
| print(f"Low resolution folder: {LOW_RES_FOLDER} [{'✓' if os.path.isdir(LOW_RES_FOLDER) else '✗'}]") | |
| print("=" * 60) | |
| custom_css = f""" | |
| .cand-a img {{ | |
| height: {CANDIDATE_IMAGE_HEIGHT_STEP_A}px !important; | |
| width: auto !important; | |
| object-fit: contain; | |
| }} | |
| .cand-b img {{ | |
| height: {CANDIDATE_IMAGE_HEIGHT_STEP_B}px !important; | |
| width: auto !important; | |
| object-fit: contain; | |
| }} | |
| .ref-a img {{ | |
| height: {REFERENCE_IMAGE_HEIGHT}px !important; | |
| width: auto !important; | |
| object-fit: contain; | |
| }} | |
| """ | |
| with gr.Blocks(title=STUDY_TITLE, theme=gr.themes.Soft(), css=custom_css) as demo: | |
| # Hidden state - Added user_assigned_images | |
| state_uid = gr.State("") | |
| state_available = gr.State([]) | |
| state_completed = gr.State(0) | |
| state_current_step = gr.State("A") | |
| state_order_idx = gr.State([]) | |
| state_round_filename = gr.State("") | |
| state_user_assigned_images = gr.State([]) # NEW: Track user's assigned images | |
| # Start screen (intro + name/email only on page 1) | |
| with gr.Group(visible=True) as start_group: | |
| gr.Markdown( | |
| f""" | |
| # {STUDY_TITLE} | |
| In this study you will compare different versions of the **same image**. | |
| Each **round** has **2 steps**: | |
| --- | |
| ## Step 1 – Rate image quality | |
| - You will see **{NUM_IMAGES_TO_RANK} images** of the same scene. | |
| - Click the image you think has the **best overall quality** first. | |
| - This image gets **rank 1**. | |
| - Then click the image with the next best quality (rank 2), and so on, | |
| until **every image has a rank**. | |
| --- | |
| ## Step 2 – Match the high-resolution reference | |
| - You will see **one reference image at the top**. | |
| This is the **high-resolution (HR) reference**. | |
| - At the bottom, you will see the **same {NUM_IMAGES_TO_RANK} images** again. | |
| - Click the image that looks **most similar to the HR reference** first (rank 1), | |
| then the next most similar, and so on, until **all images are ranked**. | |
| --- | |
| ## How the clicking works | |
| - Every time you click an image, it gets the **next rank number**. | |
| - Click the same image again to **remove** its rank and fix mistakes. | |
| - You must give each image **one unique rank from 1 to {NUM_IMAGES_TO_RANK}** | |
| before you can go to the next step. | |
| --- | |
| ## Rounds and saving | |
| - There are **{TARGET_PER_PERSON} rounds** for you to complete. | |
| - Your answers are **saved after each step**. | |
| - If you close the page, you can **continue later**: | |
| - Use the **same name and email** and click **“Start / Resume”**. | |
| --- | |
| If you agree to take part, please enter your **full name** and **email** below, | |
| then click **“Start / Resume”**. | |
| For any questions: **{CONTACT_EMAIL}** | |
| """ | |
| ) | |
| with gr.Row(): | |
| name = gr.Textbox(label="Full name", placeholder="Jane Doe") | |
| email = gr.Textbox(label="Email address", placeholder="jane@example.com") | |
| start_btn = gr.Button("Start / Resume", variant="primary") | |
| status = gr.Markdown("") | |
| # Evaluation panel (hidden until Start) | |
| eval_panel = gr.Group(visible=False) | |
| with eval_panel: | |
| # Step A — Quality | |
| with gr.Group(visible=False) as group_A: | |
| gr.Markdown(f"## Step A — {STEP_A_INSTRUCTION}") | |
| # Create image components dynamically in rows | |
| a_imgs = [] | |
| for i in range(0, NUM_IMAGES_TO_RANK, IMAGES_PER_ROW): | |
| with gr.Row(): | |
| for j in range(i, min(i + IMAGES_PER_ROW, NUM_IMAGES_TO_RANK)): | |
| img = gr.Image( | |
| label=f"Image {LETTERS[j]}", | |
| value=None, # will be filled via .update(...) | |
| interactive=True, # keep tiles clickable | |
| sources=[], # <- hides Upload/Webcam/Clipboard | |
| show_download_button=False, | |
| type="pil", | |
| image_mode="RGB", | |
| height= None, #CANDIDATE_IMAGE_HEIGHT_STEP_A, | |
| elem_classes=["cand-a"], | |
| ) | |
| a_imgs.append(img) | |
| a_ranking = gr.Textbox(visible=False, interactive=False) | |
| a_next = gr.Button("Continue →", variant="primary") | |
| # Step B — Similarity to Reference A (LR) | |
| with gr.Group(visible=False) as group_B: | |
| gr.Markdown(f"## Step B — {STEP_B_INSTRUCTION}") | |
| with gr.Row(): | |
| b_ref = gr.Image( | |
| label="Reference A (HR)", | |
| interactive=False, # not clickable | |
| sources=[], # <- hides Upload/Webcam/Clipboard | |
| show_download_button=False, | |
| type="pil", | |
| image_mode="RGB", | |
| height= None, #REFERENCE_IMAGE_HEIGHT, | |
| elem_classes=["ref-a"] | |
| ) | |
| # Create image components dynamically in rows | |
| b_imgs = [] | |
| for i in range(0, NUM_IMAGES_TO_RANK, IMAGES_PER_ROW): | |
| with gr.Row(): | |
| for j in range(i, min(i + IMAGES_PER_ROW, NUM_IMAGES_TO_RANK)): | |
| img = gr.Image( | |
| label=f"Image {LETTERS[j]}", | |
| value=None, | |
| interactive=True, # keep tiles clickable | |
| sources=[], # <- hides Upload/Webcam/Clipboard | |
| show_download_button=False, | |
| type="pil", | |
| image_mode="RGB", | |
| height= None, #CANDIDATE_IMAGE_HEIGHT_STEP_B | |
| elem_classes=["cand-b"] | |
| ) | |
| b_imgs.append(img) | |
| b_ranking = gr.Textbox(visible=False, interactive=False) | |
| notes = gr.Textbox(label="Optional notes", lines=3, placeholder="Any observations...") | |
| submit_btn = gr.Button("Submit (finish round)", variant="primary") | |
| thanks_group = gr.Group(visible=False) | |
| with thanks_group: | |
| gr.Markdown( | |
| f""" | |
| ## 🎉 Thanks for participating! | |
| You’ve completed **all {TARGET_PER_PERSON} rounds**. | |
| Your responses have been **saved** and will be included in our analysis. | |
| **What’s next?** | |
| - You can safely **close this tab**. | |
| - If you have more time later, you’re welcome to revisit—your progress is already complete. | |
| - Questions or feedback? **{CONTACT_EMAIL}** | |
| _We appreciate your help!_ | |
| """ | |
| ) | |
| # Click-to-rank wiring (A) | |
| names_A = [f"Image {ch}" for ch in LETTERS] | |
| _a_handler = _make_click_handler_with_names( | |
| n=NUM_IMAGES_TO_RANK, ranking_box=a_ranking, state_sel=gr.State([]), names=names_A | |
| ) | |
| # Need persistent state for selections: | |
| a_sel = gr.State([]) | |
| # Wire up click handlers for all images in step A | |
| for i in range(NUM_IMAGES_TO_RANK): | |
| a_imgs[i].select( | |
| _a_handler(i + 1), | |
| inputs=[a_sel], | |
| outputs=[*a_imgs, a_ranking, a_sel] | |
| ) | |
| # Click-to-rank wiring (B) | |
| names_B = [f"Image {ch}" for ch in LETTERS] | |
| b_sel = gr.State([]) | |
| _b_handler = _make_click_handler_with_names( | |
| n=NUM_IMAGES_TO_RANK, ranking_box=b_ranking, state_sel=b_sel, names=names_B | |
| ) | |
| # Wire up click handlers for all images in step B | |
| for i in range(NUM_IMAGES_TO_RANK): | |
| b_imgs[i].select( | |
| _b_handler(i + 1), | |
| inputs=[b_sel], | |
| outputs=[*b_imgs, b_ranking, b_sel] | |
| ) | |
| # Events - Updated to include user_assigned_images | |
| start_btn.click( | |
| start_or_resume, | |
| inputs=[name, email], | |
| outputs=[ | |
| state_uid, state_available, state_completed, state_current_step, state_order_idx, | |
| state_round_filename, state_user_assigned_images, # Added state_user_assigned_images | |
| # A imgs | |
| *a_imgs, | |
| # B imgs | |
| *b_imgs, | |
| # Reference | |
| b_ref, | |
| # status & panels | |
| status, eval_panel, thanks_group, group_A, group_B, | |
| # hide name/email | |
| name, email, | |
| # reset selections/inputs | |
| a_sel, b_sel, a_ranking, b_ranking, notes, | |
| # control start page visibility | |
| start_group | |
| ] | |
| ) | |
| a_next.click( | |
| continue_after_A, | |
| inputs=[name, email, state_uid, state_available, state_completed, state_current_step, | |
| state_order_idx, state_round_filename, state_user_assigned_images, a_ranking], | |
| outputs=[status, group_A, group_B] | |
| ) | |
| submit_btn.click( | |
| submit_after_B, | |
| inputs=[name, email, state_uid, state_available, state_completed, state_current_step, | |
| state_order_idx, state_round_filename, state_user_assigned_images, b_ranking, notes], | |
| outputs=[ | |
| state_completed, state_current_step, state_order_idx, state_round_filename, | |
| # reload next round images | |
| *a_imgs, | |
| *b_imgs, | |
| b_ref, | |
| status, eval_panel, thanks_group, group_A, group_B, | |
| # reset selections/inputs | |
| a_sel, b_sel, a_ranking, b_ranking, notes | |
| ] | |
| ) | |
| # Check and display configuration | |
| try: | |
| ensure_paths() | |
| matching_images = get_available_images() | |
| stats = get_global_stats() | |
| # print(f"\n✅ Found {len(matching_images)} matching images across all folders.") | |
| # if matching_images and len(matching_images) <= 10: | |
| # print("Matching images:", matching_images) | |
| # print(f"\n📊 Global Progress:") | |
| # print(f" Total users registered: {stats['num_users']}") | |
| # print(f" Total images assigned: {stats['total_completed']}") | |
| # print(f" Complete cycles: {stats['complete_cycles']}") | |
| # print(f" Current cycle progress: {stats['images_in_current_cycle']}/{stats['total_images']}") | |
| # print(f"\n📊 Configuration: {NUM_IMAGES_TO_RANK} images to rank ({NUM_MODEL_FOLDERS} models + 1 HR reference)") | |
| # print(f"🎯 Target: {TARGET_PER_PERSON} rounds per person") | |
| # print("\nLaunching app...") | |
| demo.queue() | |
| demo.launch() | |
| except FileNotFoundError as e: | |
| print(f"\n❌ ERROR: {e}") | |
| print("\nPlease check your folder configuration and ensure all folders exist with matching images.") |