| | |
| | """ |
| | OCULUS Benchmark Evaluation Suite |
| | |
| | Evaluates Oculus on multiple vision-language benchmarks: |
| | 1. COCO Detection (mAP) |
| | 2. Car Part Damage Detection |
| | 3. Counting (Pixmo-style) |
| | 4. VQA Accuracy |
| | 5. RefCOCO Grounding (IoU) |
| | |
| | Inspired by Isaac model benchmarks. |
| | """ |
| |
|
| | import os |
| | import sys |
| | import json |
| | import time |
| | import random |
| | from pathlib import Path |
| | from dataclasses import dataclass, field |
| | from typing import List, Dict, Tuple, Optional |
| | from collections import defaultdict |
| |
|
| | import numpy as np |
| | import torch |
| | from PIL import Image |
| |
|
| | OCULUS_ROOT = Path(__file__).parent |
| | sys.path.insert(0, str(OCULUS_ROOT)) |
| |
|
| | from oculus_unified_model import OculusForConditionalGeneration |
| |
|
| |
|
| | |
| | |
| | |
| |
|
def compute_iou(box1: List[float], box2: List[float]) -> float:
    """Return the intersection-over-union of two [x1, y1, x2, y2] boxes.

    A small epsilon in the denominator keeps the division safe when both
    boxes are degenerate (zero area).
    """
    # Corners of the intersection rectangle.
    left = max(box1[0], box2[0])
    top = max(box1[1], box2[1])
    right = min(box1[2], box2[2])
    bottom = min(box1[3], box2[3])

    # Clamp at zero so disjoint boxes contribute no overlap.
    intersection = max(0, right - left) * max(0, bottom - top)

    union = (
        (box1[2] - box1[0]) * (box1[3] - box1[1])
        + (box2[2] - box2[0]) * (box2[3] - box2[1])
        - intersection
        + 1e-8
    )
    return intersection / union
| |
|
| |
|
def compute_ap(recalls: List[float], precisions: List[float]) -> float:
    """Return Average Precision from paired recall/precision values.

    Pads the curve at both ends, enforces a monotonically non-increasing
    precision envelope (sweeping right to left), then integrates precision
    over the recall increments.
    """
    r = [0, *recalls, 1]
    p = [0, *precisions, 0]

    # Precision envelope: each point takes the best precision to its right.
    for idx in reversed(range(len(p) - 1)):
        p[idx] = max(p[idx], p[idx + 1])

    # Riemann sum of precision over recall.
    return sum((r[i] - r[i - 1]) * p[i] for i in range(1, len(r)))
| |
|
| |
|
| | |
| | |
| | |
| |
|
class COCODetectionBenchmark:
    """COCO Detection benchmark - scores box detection against COCO GT.

    Reports the mean best-match IoU over all ground-truth boxes and the
    fraction matched at IoU >= 0.5 with the correct class index.
    """

    def __init__(self, data_dir: str = "data/coco", max_samples: int = 500):
        """Load up to ``max_samples`` annotated images from a COCO layout.

        Args:
            data_dir: Root directory containing ``annotations/`` and ``images/``.
            max_samples: Stop collecting once this many samples are loaded.
        """
        self.data_dir = Path(data_dir)
        self.max_samples = max_samples

        # NOTE(review): this evaluates against the *train2017* annotations —
        # confirm that is intentional (val2017 is the usual eval split).
        ann_file = self.data_dir / "annotations" / "instances_train2017.json"

        with open(ann_file) as f:
            coco = json.load(f)

        # Map COCO category ids to names and to contiguous indices; the
        # model's predicted labels are compared against the contiguous ones.
        self.cat_id_to_name = {c['id']: c['name'] for c in coco['categories']}
        self.cat_id_to_idx = {c['id']: i for i, c in enumerate(coco['categories'])}

        # Group annotations per image, skipping crowd regions.
        img_to_anns = defaultdict(list)
        for ann in coco['annotations']:
            if ann.get('iscrowd', 0):
                continue
            img_to_anns[ann['image_id']].append(ann)

        self.samples = []
        for img in coco['images']:
            if img['id'] not in img_to_anns:
                continue

            img_path = self.data_dir / "images" / img['file_name']
            if not img_path.exists():
                continue

            boxes = []
            labels = []
            for ann in img_to_anns[img['id']]:
                if 'bbox' not in ann:
                    continue
                # COCO bbox is [x, y, w, h] in pixels; convert to
                # normalized [x1, y1, x2, y2].
                x, y, w, h = ann['bbox']
                boxes.append([
                    x / img['width'],
                    y / img['height'],
                    (x + w) / img['width'],
                    (y + h) / img['height'],
                ])
                labels.append(self.cat_id_to_idx[ann['category_id']])

            if boxes:
                self.samples.append({
                    'path': str(img_path),
                    'boxes': boxes,
                    'labels': labels,
                })

            if len(self.samples) >= max_samples:
                break

        print(f" Loaded {len(self.samples)} COCO samples")

    def evaluate(self, model: "OculusForConditionalGeneration") -> Dict:
        """Run detection on every sample and score against ground truth.

        Args:
            model: Model exposing ``generate(image, mode="box", prompt=...)``
                returning an object with ``boxes`` and ``labels``.

        Returns:
            Dict with ``mean_iou`` (best-match IoU averaged over all GT
            boxes), ``accuracy`` (GT boxes matched at IoU >= 0.5 with the
            correct class) and ``num_samples``.
        """
        print("\n📦 COCO Detection Benchmark")
        print("-" * 40)

        all_ious = []
        all_correct = []
        failures = 0  # images skipped due to load/inference errors

        for i, sample in enumerate(self.samples):
            if i % 50 == 0:
                print(f" Progress: {i}/{len(self.samples)}")

            try:
                image = Image.open(sample['path']).convert('RGB')
                output = model.generate(image, mode="box", prompt="Detect objects")

                pred_boxes = output.boxes
                pred_labels = [int(lbl) for lbl in output.labels]

                # Score each ground-truth box by its best-IoU prediction;
                # correctness also requires the matched class to agree.
                for gt_box, gt_label in zip(sample['boxes'], sample['labels']):
                    best_iou = 0
                    is_correct = False
                    for pred_box, pred_label in zip(pred_boxes, pred_labels):
                        iou = compute_iou(gt_box, list(pred_box))
                        if iou > best_iou:
                            best_iou = iou
                            is_correct = (iou >= 0.5) and (pred_label == gt_label)

                    all_ious.append(best_iou)
                    all_correct.append(is_correct)

            except Exception as e:
                # Best-effort evaluation: one bad image must not abort the
                # whole run, but do not fail silently either.
                failures += 1
                print(f" ⚠️ Sample {i} failed: {e}")

        if failures:
            print(f" ⚠️ {failures}/{len(self.samples)} samples failed")

        mean_iou = float(np.mean(all_ious)) if all_ious else 0.0
        accuracy = float(np.mean(all_correct)) if all_correct else 0.0

        results = {
            'mean_iou': mean_iou,
            'accuracy': accuracy,
            'num_samples': len(self.samples),
        }

        print(f" Mean IoU: {mean_iou:.4f}")
        print(f" Accuracy (IoU>0.5 + correct class): {accuracy:.4f}")

        return results
| |
|
| |
|
| | |
| | |
| | |
| |
|
class CarDamageBenchmark:
    """Car Part Damage detection benchmark from HuggingFace.

    Downloads the ``moondream/car_part_damage`` test split (when the
    ``datasets`` package and network access are available) and scores
    class-agnostic box recall against the annotated parts.
    """

    # Canonical part label set for the dataset (kept for reference; the
    # visible scoring below is class-agnostic and does not consult it).
    CAR_PART_LABELS = [
        'Back-bumper', 'Back-door', 'Back-wheel', 'Back-window', 'Back-windshield',
        'Fender', 'Front-bumper', 'Front-door', 'Front-wheel', 'Front-window',
        'Grille', 'Headlight', 'Hood', 'License-plate', 'Mirror', 'Quarter-panel',
        'Rocker-panel', 'Roof', 'Tail-light', 'Trunk', 'Windshield'
    ]

    def __init__(self, max_samples: int = 50):
        """Load up to ``max_samples`` items from the test split.

        Dataset loading is best-effort: on any failure ``self.samples``
        stays empty and ``evaluate`` reports the absence.
        """
        self.max_samples = max_samples
        self.samples = []

        try:
            from datasets import load_dataset
            print(" Loading car_part_damage dataset...")
            ds = load_dataset("moondream/car_part_damage", split="test")

            for i, item in enumerate(ds):
                if i >= max_samples:
                    break

                boxes = []
                labels = []
                for ann in item['annotations']:
                    # NOTE(review): bbox is treated as pixel [x1, y1, x2, y2]
                    # here (corners divided by width/height directly) —
                    # confirm the dataset does not use COCO [x, y, w, h].
                    bbox = ann['bbox']
                    boxes.append([
                        bbox[0] / item['width'],
                        bbox[1] / item['height'],
                        bbox[2] / item['width'],
                        bbox[3] / item['height'],
                    ])
                    labels.append(ann['category'])

                self.samples.append({
                    'image': item['image'],
                    'boxes': boxes,
                    'labels': labels,
                    'width': item['width'],
                    'height': item['height'],
                })

            print(f" Loaded {len(self.samples)} car damage samples")

        except Exception as e:
            # The dataset is optional; keep the suite runnable without it.
            print(f" ⚠️ Could not load dataset: {e}")

    def evaluate(self, model: "OculusForConditionalGeneration") -> Dict:
        """Score class-agnostic detection recall on car-part boxes.

        Returns:
            Dict with mean best-match IoU over all ground-truth parts,
            ``recall@0.5`` and the matched/total part counts — or an
            ``error`` entry when the dataset could not be loaded.
        """
        print("\n🚗 Car Part Damage Benchmark")
        print("-" * 40)

        if not self.samples:
            return {'error': 'Dataset not loaded'}

        all_ious = []
        correct_parts = 0
        total_parts = 0
        failures = 0  # samples skipped due to inference errors

        for i, sample in enumerate(self.samples):
            if i % 10 == 0:
                print(f" Progress: {i}/{len(self.samples)}")

            try:
                output = model.generate(sample['image'], mode="box",
                                        prompt="Detect car parts and damage")
                pred_boxes = output.boxes

                # Each ground-truth part is scored by its best-IoU
                # prediction; class labels are not checked.
                for gt_box in sample['boxes']:
                    total_parts += 1
                    best_iou = 0
                    for pred_box in pred_boxes:
                        best_iou = max(best_iou, compute_iou(gt_box, list(pred_box)))

                    all_ious.append(best_iou)
                    if best_iou >= 0.5:
                        correct_parts += 1

            except Exception as e:
                # Best-effort: keep going, but surface the error.
                failures += 1
                print(f" ⚠️ Sample {i} failed: {e}")

        if failures:
            print(f" ⚠️ {failures}/{len(self.samples)} samples failed")

        mean_iou = float(np.mean(all_ious)) if all_ious else 0.0
        recall = correct_parts / total_parts if total_parts > 0 else 0

        results = {
            'mean_iou': mean_iou,
            'recall@0.5': float(recall),
            'correct_parts': correct_parts,
            'total_parts': total_parts,
        }

        print(f" Mean IoU: {mean_iou:.4f}")
        print(f" Recall@0.5: {recall:.4f} ({correct_parts}/{total_parts})")

        return results
| |
|
| |
|
| | |
| | |
| | |
| |
|
class CountingBenchmark:
    """Object counting benchmark (Pixmo-style) built from COCO annotations.

    Asks "How many <category>s are in this image?" for images containing
    2-10 instances of their most frequent category, then parses the
    model's free-text answer into an integer and scores it.
    """

    # Spelled-out numbers a model may answer with instead of digits.
    _WORD_TO_NUM = {
        'zero': 0, 'one': 1, 'two': 2, 'three': 3, 'four': 4,
        'five': 5, 'six': 6, 'seven': 7, 'eight': 8, 'nine': 9, 'ten': 10,
    }

    def __init__(self, data_dir: str = "data/coco", max_samples: int = 200):
        """Build counting samples from a COCO-format dataset.

        Args:
            data_dir: COCO root with ``annotations/`` and ``images/``.
            max_samples: Maximum number of (image, category, count) samples.
        """
        self.data_dir = Path(data_dir)
        self.samples = []

        # Prefer the val split; fall back to train if it is missing.
        ann_file = self.data_dir / "annotations" / "instances_val2017.json"
        if not ann_file.exists():
            ann_file = self.data_dir / "annotations" / "instances_train2017.json"

        with open(ann_file) as f:
            coco = json.load(f)

        self.cat_id_to_name = {c['id']: c['name'] for c in coco['categories']}

        # Per-image instance counts per category (crowd regions excluded).
        img_counts = defaultdict(lambda: defaultdict(int))
        for ann in coco['annotations']:
            if not ann.get('iscrowd', 0):
                img_counts[ann['image_id']][ann['category_id']] += 1

        for img in coco['images']:
            if img['id'] not in img_counts:
                continue

            img_path = self.data_dir / "images" / img['file_name']
            if not img_path.exists():
                continue

            counts = img_counts[img['id']]
            # Ask about the image's most frequent category.
            most_common_cat = max(counts.keys(), key=lambda k: counts[k])
            count = counts[most_common_cat]

            # Keep counts in [2, 10]: non-trivial, yet expressible as a
            # single small number word.
            if 2 <= count <= 10:
                self.samples.append({
                    'path': str(img_path),
                    'category': self.cat_id_to_name[most_common_cat],
                    'count': count,
                })

            if len(self.samples) >= max_samples:
                break

        print(f" Loaded {len(self.samples)} counting samples")

    @staticmethod
    def _parse_count(response: str) -> Optional[int]:
        """Extract the first count mentioned in a lowercased response.

        Digit tokens are tried first, then spelled-out number words.
        Matching is done on whole, punctuation-stripped tokens so that
        e.g. "none" or "someone" no longer matches "one" (the previous
        substring check did), and "7." parses as 7.

        Returns:
            The parsed count, or ``None`` if no number was found.
        """
        tokens = [tok.strip('.,;:!?()[]"\'') for tok in response.split()]
        for tok in tokens:
            if tok.isdigit():
                return int(tok)
        for tok in tokens:
            if tok in CountingBenchmark._WORD_TO_NUM:
                return CountingBenchmark._WORD_TO_NUM[tok]
        return None

    def evaluate(self, model: "OculusForConditionalGeneration") -> Dict:
        """Ask each counting question and score the parsed answers.

        Returns:
            Dict with exact accuracy, within-one accuracy, mean absolute
            error, and the number of samples whose response contained a
            parseable count (``total``).
        """
        print("\n🔢 Counting Benchmark")
        print("-" * 40)

        exact_matches = 0
        within_one = 0
        total = 0
        errors = []
        failures = 0  # samples skipped due to load/inference errors

        for i, sample in enumerate(self.samples):
            if i % 25 == 0:
                print(f" Progress: {i}/{len(self.samples)}")

            try:
                image = Image.open(sample['path']).convert('RGB')
                question = f"How many {sample['category']}s are in this image?"

                output = model.generate(image, mode="text", prompt=question)

                pred_count = self._parse_count(output.text.lower())
                gt_count = sample['count']

                # Only score samples where an answer could be parsed.
                if pred_count is not None:
                    total += 1
                    if pred_count == gt_count:
                        exact_matches += 1
                    if abs(pred_count - gt_count) <= 1:
                        within_one += 1
                    errors.append(abs(pred_count - gt_count))

            except Exception as e:
                # Best-effort: keep going, but surface the error.
                failures += 1
                print(f" ⚠️ Sample {i} failed: {e}")

        if failures:
            print(f" ⚠️ {failures}/{len(self.samples)} samples failed")

        accuracy = exact_matches / total if total > 0 else 0
        within1_acc = within_one / total if total > 0 else 0
        mae = float(np.mean(errors)) if errors else 0.0

        results = {
            'exact_accuracy': float(accuracy),
            'within_one_accuracy': float(within1_acc),
            'mae': mae,
            'total': total,
        }

        print(f" Exact Accuracy: {accuracy:.2%}")
        print(f" Within-1 Accuracy: {within1_acc:.2%}")
        print(f" Mean Absolute Error: {mae:.2f}")

        return results
| |
|
| |
|
| | |
| | |
| | |
| |
|
class VQABenchmark:
    """Visual Question Answering benchmark.

    Builds simple existence questions ("Is there a <category> in this
    image?") from COCO annotations and scores answers by substring match.
    """

    def __init__(self, data_dir: str = "data/coco", max_samples: int = 200):
        """Generate up to ``max_samples`` question/answer pairs from COCO.

        Args:
            data_dir: COCO root with ``annotations/`` and ``images/``.
            max_samples: Maximum number of QA samples to build.
        """
        self.data_dir = Path(data_dir)
        self.samples = []

        # Prefer the val split; fall back to train if it is missing.
        ann_file = self.data_dir / "annotations" / "instances_val2017.json"
        if not ann_file.exists():
            ann_file = self.data_dir / "annotations" / "instances_train2017.json"

        with open(ann_file) as f:
            coco = json.load(f)

        self.cat_id_to_name = {c['id']: c['name'] for c in coco['categories']}

        # Category ids present in each image.
        img_cats = defaultdict(set)
        for ann in coco['annotations']:
            img_cats[ann['image_id']].add(ann['category_id'])

        for img in coco['images']:
            if img['id'] not in img_cats:
                continue

            img_path = self.data_dir / "images" / img['file_name']
            if not img_path.exists():
                continue

            cats = list(img_cats[img['id']])
            if cats:
                # NOTE(review): random.choice is unseeded, so the sampled
                # question set differs between runs — seed upstream if
                # reproducible numbers are required.
                cat = random.choice(cats)
                cat_name = self.cat_id_to_name[cat]

                # Only the first (yes/no) template is currently used; the
                # open-ended one is kept for future expansion.
                questions = [
                    (f"Is there a {cat_name} in this image?", "yes"),
                    (f"What objects are visible in this image?", cat_name),
                ]

                for q, a in questions[:1]:
                    self.samples.append({
                        'path': str(img_path),
                        'question': q,
                        'answer': a,
                    })

            if len(self.samples) >= max_samples:
                break

        print(f" Loaded {len(self.samples)} VQA samples")

    def evaluate(self, model: "OculusForConditionalGeneration") -> Dict:
        """Ask each question and score by substring containment.

        A response counts as correct when the lowercased ground-truth
        answer appears anywhere in the lowercased response text.

        Returns:
            Dict with ``accuracy``, ``correct`` and ``total`` (samples
            that produced a response without error).
        """
        print("\n❓ VQA Benchmark")
        print("-" * 40)

        correct = 0
        total = 0
        failures = 0  # samples skipped due to load/inference errors

        for i, sample in enumerate(self.samples):
            if i % 25 == 0:
                print(f" Progress: {i}/{len(self.samples)}")

            try:
                image = Image.open(sample['path']).convert('RGB')
                output = model.generate(image, mode="text", prompt=sample['question'])

                # Lenient scoring: answer substring anywhere in response.
                if sample['answer'].lower() in output.text.lower():
                    correct += 1
                total += 1

            except Exception as e:
                # Best-effort: keep going, but surface the error.
                failures += 1
                print(f" ⚠️ Sample {i} failed: {e}")

        if failures:
            print(f" ⚠️ {failures}/{len(self.samples)} samples failed")

        accuracy = correct / total if total > 0 else 0

        results = {
            'accuracy': float(accuracy),
            'correct': correct,
            'total': total,
        }

        print(f" Accuracy: {accuracy:.2%} ({correct}/{total})")

        return results
| |
|
| |
|
| | |
| | |
| | |
| |
|
def run_benchmarks(model_path: str, benchmarks: Optional[List[str]] = None):
    """Load an Oculus checkpoint and run the selected benchmark suites.

    Args:
        model_path: Directory containing the pretrained model, and
            optionally a ``heads.pth`` with trained detection/point heads.
        benchmarks: Subset of {'coco', 'car_damage', 'counting', 'vqa'} to
            run; ``None`` runs all of them.

    Returns:
        Dict mapping benchmark name to its results dict; also written to
        ``benchmark_results.json`` inside ``model_path``.
    """
    print("=" * 70)
    print("🔮 OCULUS BENCHMARK EVALUATION SUITE")
    print("=" * 70)
    print(f"Model: {model_path}")

    print("\n[Loading Model]")
    model = OculusForConditionalGeneration.from_pretrained(model_path)

    # Restore trained detection/point heads when a checkpoint provides them.
    heads_path = Path(model_path) / "heads.pth"
    if heads_path.exists():
        # NOTE(review): torch.load without weights_only=True unpickles
        # arbitrary objects — fine for our own checkpoints, unsafe for
        # untrusted files.
        heads = torch.load(heads_path)
        model.detection_head.load_state_dict(heads['detection'])
        model.point_head.load_state_dict(heads['point'])
        print(" ✓ Loaded trained detection heads")

    model.vision_encoder.load_encoders()
    model.load_language_model()

    all_results = {}

    if benchmarks is None:
        benchmarks = ['coco', 'car_damage', 'counting', 'vqa']

    if 'coco' in benchmarks:
        bench = COCODetectionBenchmark(max_samples=100)
        all_results['coco_detection'] = bench.evaluate(model)

    if 'car_damage' in benchmarks:
        bench = CarDamageBenchmark(max_samples=50)
        all_results['car_damage'] = bench.evaluate(model)

    if 'counting' in benchmarks:
        bench = CountingBenchmark(max_samples=100)
        all_results['counting'] = bench.evaluate(model)

    if 'vqa' in benchmarks:
        bench = VQABenchmark(max_samples=100)
        all_results['vqa'] = bench.evaluate(model)

    # Summary table.
    print("\n" + "=" * 70)
    print("📊 BENCHMARK SUMMARY")
    print("=" * 70)

    for name, results in all_results.items():
        print(f"\n{name}:")
        for k, v in results.items():
            if isinstance(v, float):
                print(f" {k}: {v:.4f}")
            else:
                print(f" {k}: {v}")

    # Persist results next to the checkpoint.
    results_path = Path(model_path) / "benchmark_results.json"
    with open(results_path, "w") as f:
        json.dump(all_results, f, indent=2)
    print(f"\n💾 Results saved to: {results_path}")

    return all_results
| |
|
| |
|
if __name__ == "__main__":
    import argparse

    # CLI: pick a checkpoint directory and (optionally) a benchmark subset.
    cli_parser = argparse.ArgumentParser()
    cli_parser.add_argument("--model", default="checkpoints/oculus_detection/final")
    cli_parser.add_argument("--benchmarks", nargs="+", default=None)
    cli_args = cli_parser.parse_args()

    run_benchmarks(cli_args.model, cli_args.benchmarks)
| |
|