"""
Benchmark script for mosaic generation performance analysis.
"""
| |
|
| | import time |
| | import numpy as np |
| | from PIL import Image |
| | import matplotlib.pyplot as plt |
| | from typing import Dict, List |
| | import argparse |
| | import os |
| |
|
| | from src.config import Config, Implementation |
| | from src.pipeline import MosaicPipeline |
| | from src.utils import pil_to_np, np_to_pil |
| |
|
| |
|
def create_test_image(width: int = 512, height: int = 512) -> Image.Image:
    """Create a synthetic RGB test image with gradients and a circle.

    The image combines a horizontal red gradient, a vertical green
    gradient, a diagonal blue gradient, and a filled orange circle at the
    center, giving the benchmark a mix of smooth and sharp features.

    Args:
        width: Output image width in pixels.
        height: Output image height in pixels.

    Returns:
        A PIL image converted from a float32 array in [0, 1] via np_to_pil.
    """
    # Row/column coordinate vectors; broadcasting computes all three
    # gradients at once instead of the original O(width*height) pure-Python
    # per-pixel loops — a setup cost that distorted small benchmark runs.
    ys = np.arange(height, dtype=np.float32)[:, None]
    xs = np.arange(width, dtype=np.float32)[None, :]

    img_array = np.empty((height, width, 3), dtype=np.float32)
    img_array[:, :, 0] = xs / width                    # red: horizontal ramp
    img_array[:, :, 1] = ys / height                   # green: vertical ramp
    img_array[:, :, 2] = (xs + ys) / (width + height)  # blue: diagonal ramp

    # Filled circle: boolean mask of pixels strictly within `radius`
    # of the image center (same strict `<` comparison as before).
    center_x, center_y = width // 2, height // 2
    radius = min(width, height) // 4
    dist = np.sqrt((xs - center_x) ** 2 + (ys - center_y) ** 2)
    img_array[dist < radius] = [1.0, 0.5, 0.2]

    return np_to_pil(img_array)
| |
|
| |
|
def benchmark_grid_sizes(pipeline: MosaicPipeline, test_image: Image.Image,
                         grid_sizes: List[int]) -> Dict:
    """Benchmark pipeline performance across different grid sizes.

    For each grid size the pipeline config is mutated in place (``grid``,
    ``out_w``, ``out_h``) so the output dimensions are exact multiples of
    the grid, then one full pipeline run is timed.

    Args:
        pipeline: Pipeline whose ``config`` is mutated for each run.
        test_image: Source image fed to every run.
        grid_sizes: Grid dimensions to test (each yields an NxN tile grid).

    Returns:
        Mapping of grid size -> dict with timing, throughput, quality
        metrics and the output resolution string.
    """
    print("Benchmarking grid sizes...")
    results = {}

    for grid_size in grid_sizes:
        print(f"Testing grid size {grid_size}x{grid_size}...")

        # Snap output dimensions down to a multiple of the grid so every
        # tile has identical pixel dimensions.
        pipeline.config.grid = grid_size
        pipeline.config.out_w = (test_image.width // grid_size) * grid_size
        pipeline.config.out_h = (test_image.height // grid_size) * grid_size

        # perf_counter is monotonic and high-resolution; time.time() can
        # jump (NTP adjustments) and is too coarse for benchmarking.
        start_time = time.perf_counter()
        pipeline_results = pipeline.run_full_pipeline(test_image)
        total_time = time.perf_counter() - start_time

        total_tiles = grid_size * grid_size
        mosaic = pipeline_results['outputs']['mosaic']
        results[grid_size] = {
            'processing_time': total_time,
            'total_tiles': total_tiles,
            # Guard against ZeroDivisionError on an immeasurably fast run.
            'tiles_per_second': total_tiles / total_time if total_time > 0 else float('inf'),
            'mse': pipeline_results['metrics']['mse'],
            'ssim': pipeline_results['metrics']['ssim'],
            'output_resolution': f"{mosaic.width}x{mosaic.height}"
        }

        print(f" Processing time: {total_time:.3f}s")
        print(f" Tiles per second: {results[grid_size]['tiles_per_second']:.1f}")

    return results
| |
|
| |
|
def benchmark_implementations(pipeline: MosaicPipeline, test_image: Image.Image) -> Dict:
    """Compare vectorized vs loop-based implementations.

    Runs the full pipeline once with each implementation, reports timing
    and quality metrics, and computes the speedup factor. The pipeline's
    ``config.impl`` is restored afterwards so the caller's configuration
    is not silently left on the loop-based implementation.

    Args:
        pipeline: Pipeline whose ``config.impl`` is switched per run.
        test_image: Source image fed to both runs.

    Returns:
        Dict with 'vectorized', 'loop_based' and 'comparison' entries.
    """
    print("Benchmarking implementations...")

    results = {}
    # Remember the caller's implementation choice; the original code left
    # the config stuck on LOOPS after returning.
    original_impl = getattr(pipeline.config, 'impl', None)

    # perf_counter is monotonic and high-resolution, unlike time.time().
    print("Testing vectorized implementation...")
    pipeline.config.impl = Implementation.VECT
    start_time = time.perf_counter()
    vec_results = pipeline.run_full_pipeline(test_image)
    vec_time = time.perf_counter() - start_time

    results['vectorized'] = {
        'processing_time': vec_time,
        'mse': vec_results['metrics']['mse'],
        'ssim': vec_results['metrics']['ssim']
    }

    print("Testing loop-based implementation...")
    pipeline.config.impl = Implementation.LOOPS
    start_time = time.perf_counter()
    loop_results = pipeline.run_full_pipeline(test_image)
    loop_time = time.perf_counter() - start_time

    results['loop_based'] = {
        'processing_time': loop_time,
        'mse': loop_results['metrics']['mse'],
        'ssim': loop_results['metrics']['ssim']
    }

    if original_impl is not None:
        pipeline.config.impl = original_impl  # leave config as we found it

    speedup = loop_time / vec_time if vec_time > 0 else 0
    results['comparison'] = {
        'speedup_factor': speedup,
        'vectorized_faster': vec_time < loop_time
    }

    print(f"Vectorized: {vec_time:.3f}s")
    print(f"Loop-based: {loop_time:.3f}s")
    print(f"Speedup factor: {speedup:.2f}x")

    return results
| |
|
| |
|
def plot_benchmark_results(grid_results: Dict, impl_results: Dict, output_dir: str = "images"):
    """Render benchmark results as PNG charts under *output_dir*.

    Writes three files: processing_time_analysis.png,
    quality_metrics_analysis.png and implementation_comparison.png.
    """
    os.makedirs(output_dir, exist_ok=True)

    sizes = sorted(grid_results.keys())
    times = [grid_results[s]['processing_time'] for s in sizes]
    tile_counts = [grid_results[s]['total_tiles'] for s in sizes]

    # --- Figure 1: processing time vs grid size and vs tile count ---
    plt.figure(figsize=(10, 6))

    plt.subplot(1, 2, 1)
    plt.plot(sizes, times, 'bo-', linewidth=2, markersize=8)
    plt.xlabel('Grid Size')
    plt.ylabel('Processing Time (seconds)')
    plt.title('Processing Time vs Grid Size')
    plt.grid(True, alpha=0.3)

    plt.subplot(1, 2, 2)
    plt.plot(tile_counts, times, 'ro-', linewidth=2, markersize=8)
    plt.xlabel('Total Number of Tiles')
    plt.ylabel('Processing Time (seconds)')
    plt.title('Processing Time vs Number of Tiles')
    plt.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.savefig(f"{output_dir}/processing_time_analysis.png", dpi=300, bbox_inches='tight')
    plt.close()

    # --- Figure 2: quality metrics (MSE log-scaled, SSIM) vs grid size ---
    plt.figure(figsize=(12, 5))

    plt.subplot(1, 2, 1)
    plt.plot(sizes, [grid_results[s]['mse'] for s in sizes],
             'go-', linewidth=2, markersize=8)
    plt.xlabel('Grid Size')
    plt.ylabel('MSE')
    plt.title('Mean Squared Error vs Grid Size')
    plt.grid(True, alpha=0.3)
    plt.yscale('log')

    plt.subplot(1, 2, 2)
    plt.plot(sizes, [grid_results[s]['ssim'] for s in sizes],
             'mo-', linewidth=2, markersize=8)
    plt.xlabel('Grid Size')
    plt.ylabel('SSIM')
    plt.title('Structural Similarity vs Grid Size')
    plt.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.savefig(f"{output_dir}/quality_metrics_analysis.png", dpi=300, bbox_inches='tight')
    plt.close()

    # --- Figure 3: vectorized vs loop-based bar chart ---
    plt.figure(figsize=(8, 6))
    labels = ['Vectorized', 'Loop-based']
    durations = [
        impl_results['vectorized']['processing_time'],
        impl_results['loop_based']['processing_time']
    ]

    rects = plt.bar(labels, durations, color=['skyblue', 'lightcoral'])
    plt.ylabel('Processing Time (seconds)')
    plt.title('Implementation Performance Comparison')
    plt.grid(True, alpha=0.3, axis='y')

    # Annotate each bar with its timing, just above the bar top.
    for rect, seconds in zip(rects, durations):
        plt.text(rect.get_x() + rect.get_width() / 2, rect.get_height() + 0.01,
                 f'{seconds:.3f}s', ha='center', va='bottom')

    plt.tight_layout()
    plt.savefig(f"{output_dir}/implementation_comparison.png", dpi=300, bbox_inches='tight')
    plt.close()
| |
|
| |
|
def generate_benchmark_report(grid_results: Dict, impl_results: Dict, output_file: str = "benchmark_report.txt"):
    """Write a plain-text benchmark report to *output_file*.

    Sections: per-grid-size performance, scaling analysis (only when at
    least two grid sizes were measured), implementation comparison, and
    quality comparison.
    """
    lines = [
        "MOSAIC GENERATION BENCHMARK REPORT\n",
        "=" * 50 + "\n\n",
        "GRID SIZE PERFORMANCE ANALYSIS\n",
        "-" * 30 + "\n",
    ]

    ordered_sizes = sorted(grid_results.keys())
    for size in ordered_sizes:
        entry = grid_results[size]
        lines.append(f"Grid {size}x{size}:\n")
        lines.append(f" Processing Time: {entry['processing_time']:.3f}s\n")
        lines.append(f" Total Tiles: {entry['total_tiles']}\n")
        lines.append(f" Tiles per Second: {entry['tiles_per_second']:.1f}\n")
        lines.append(f" MSE: {entry['mse']:.6f}\n")
        lines.append(f" SSIM: {entry['ssim']:.4f}\n")
        lines.append(f" Output Resolution: {entry['output_resolution']}\n\n")

    # Scaling section needs at least two data points to form ratios.
    if len(ordered_sizes) >= 2:
        smallest = grid_results[ordered_sizes[0]]
        largest = grid_results[ordered_sizes[-1]]
        tile_ratio = largest['total_tiles'] / smallest['total_tiles']
        time_ratio = largest['processing_time'] / smallest['processing_time']

        lines.append("SCALING ANALYSIS\n")
        lines.append("-" * 20 + "\n")
        lines.append(f"Tile increase ratio: {tile_ratio:.2f}x\n")
        lines.append(f"Time increase ratio: {time_ratio:.2f}x\n")
        lines.append(f"Scaling efficiency: {tile_ratio/time_ratio:.2f}\n")
        lines.append(f"Linear scaling: {'Yes' if abs(time_ratio - tile_ratio) / tile_ratio < 0.1 else 'No'}\n\n")

    vec = impl_results['vectorized']
    loop = impl_results['loop_based']
    comparison = impl_results['comparison']

    lines.append("IMPLEMENTATION COMPARISON\n")
    lines.append("-" * 25 + "\n")
    lines.append(f"Vectorized processing time: {vec['processing_time']:.3f}s\n")
    lines.append(f"Loop-based processing time: {loop['processing_time']:.3f}s\n")
    lines.append(f"Speedup factor: {comparison['speedup_factor']:.2f}x\n")
    lines.append(f"Vectorized is faster: {'Yes' if comparison['vectorized_faster'] else 'No'}\n\n")

    lines.append("QUALITY COMPARISON\n")
    lines.append("-" * 18 + "\n")
    lines.append(f"Vectorized MSE: {vec['mse']:.6f}\n")
    lines.append(f"Loop-based MSE: {loop['mse']:.6f}\n")
    lines.append(f"Vectorized SSIM: {vec['ssim']:.4f}\n")
    lines.append(f"Loop-based SSIM: {loop['ssim']:.4f}\n")

    # Single buffered write instead of many small f.write calls.
    with open(output_file, 'w') as f:
        f.writelines(lines)
| |
|
| |
|
def main():
    """Entry point: parse CLI args, run both benchmarks, emit plots and a report."""
    parser = argparse.ArgumentParser(description='Benchmark mosaic generation performance')
    parser.add_argument('--grid-sizes', nargs='+', type=int, default=[16, 32, 48, 64],
                        help='Grid sizes to test (default: 16 32 48 64)')
    parser.add_argument('--output-dir', default='images', help='Output directory for plots')
    parser.add_argument('--test-image', help='Path to test image (optional)')
    args = parser.parse_args()

    print("Starting mosaic generation benchmark...")

    # Prefer a user-supplied image when the path exists; otherwise fall
    # back to the synthetic gradient-and-circle test image.
    if args.test_image and os.path.exists(args.test_image):
        test_image = Image.open(args.test_image)
        print(f"Using test image: {args.test_image}")
    else:
        test_image = create_test_image()
        print("Using generated test image")

    pipeline = MosaicPipeline(Config(grid=32))

    separator = "\n" + "=" * 50
    print(separator)
    grid_results = benchmark_grid_sizes(pipeline, test_image, args.grid_sizes)

    print(separator)
    impl_results = benchmark_implementations(pipeline, test_image)

    print("\nGenerating plots and report...")
    plot_benchmark_results(grid_results, impl_results, args.output_dir)
    generate_benchmark_report(grid_results, impl_results)

    print("\nBenchmark complete!")
    print(f"Plots saved to: {args.output_dir}/")
    print("Report saved to: benchmark_report.txt")
| |
|
| |
|
# Run the benchmark suite only when executed directly as a script.
if __name__ == "__main__":
    main()
| |
|