| | from __future__ import annotations |
| | import numpy as np |
| | from PIL import Image |
| | from typing import Dict, List, Tuple, Optional |
| | import time |
| | from .config import Config, Implementation |
| | from .mosaic import MosaicGenerator |
| | from .metrics import calculate_comprehensive_metrics, interpret_metrics |
| | from .utils import pil_to_np, np_to_pil |
| |
|
| |
|
| | class MosaicPipeline: |
| | """Complete pipeline for mosaic generation with performance analysis.""" |
| | |
| | def __init__(self, config: Config): |
| | self.config = config |
| | self.mosaic_generator = MosaicGenerator(config) |
| | self.results = {} |
| | |
| | def run_full_pipeline(self, image: Image.Image) -> Dict: |
| | """ |
| | Run the complete mosaic generation pipeline. |
| | |
| | Args: |
| | image: Input PIL Image |
| | |
| | Returns: |
| | Dictionary with all results and metrics |
| | """ |
| | self.config.validate() |
| |
|
| | results = { |
| | 'input_image': image, |
| | 'config': self.config.__dict__.copy(), |
| | 'timing': {}, |
| | 'metrics': {}, |
| | 'outputs': {} |
| | } |
| | |
| | |
| | start_time = time.time() |
| | mosaic_img, stats = self.mosaic_generator.generate_mosaic(image) |
| | results['timing'] = stats['processing_time'] |
| | results['outputs']['mosaic'] = mosaic_img |
| | |
| | |
| | metrics_start = time.time() |
| | metrics = calculate_comprehensive_metrics(image, mosaic_img) |
| | results['metrics'] = metrics |
| | results['metrics_interpretation'] = interpret_metrics(metrics) |
| | results['timing']['metrics_calculation'] = time.time() - metrics_start |
| | |
| | |
| | results['outputs']['processed_image'] = self.mosaic_generator.preprocess_image(image) |
| | results['grid_info'] = { |
| | 'grid_size': self.config.grid, |
| | 'tile_size': self.config.tile_size, |
| | 'total_tiles': self.config.grid ** 2 |
| | } |
| | |
| | self.results = results |
| | return results |
| | |
| | def benchmark_implementations(self, image: Image.Image) -> Dict: |
| | """ |
| | Compare vectorized vs loop-based implementations. |
| | |
| | Args: |
| | image: Input PIL Image |
| | |
| | Returns: |
| | Dictionary with performance comparison |
| | """ |
| | original_impl = self.config.impl |
| | |
| | results = { |
| | 'vectorized': {}, |
| | 'loop_based': {}, |
| | 'comparison': {} |
| | } |
| | |
| | |
| | self.config.impl = Implementation.VECT |
| | start_time = time.time() |
| | vec_results = self.run_full_pipeline(image) |
| | vec_time = time.time() - start_time |
| | |
| | results['vectorized'] = { |
| | 'processing_time': vec_time, |
| | 'metrics': vec_results['metrics'], |
| | 'mosaic': vec_results['outputs']['mosaic'] |
| | } |
| | |
| | |
| | self.config.impl = Implementation.LOOPS |
| | start_time = time.time() |
| | loop_results = self.run_full_pipeline(image) |
| | loop_time = time.time() - start_time |
| | |
| | results['loop_based'] = { |
| | 'processing_time': loop_time, |
| | 'metrics': loop_results['metrics'], |
| | 'mosaic': loop_results['outputs']['mosaic'] |
| | } |
| | |
| | |
| | speedup = loop_time / vec_time if vec_time > 0 else 0 |
| | results['comparison'] = { |
| | 'speedup_factor': speedup, |
| | 'time_difference': loop_time - vec_time, |
| | 'vectorized_faster': vec_time < loop_time |
| | } |
| | |
| | |
| | self.config.impl = original_impl |
| | |
| | return results |
| | |
| | def benchmark_grid_sizes(self, image: Image.Image, grid_sizes: List[int]) -> Dict: |
| | """ |
| | Benchmark performance for different grid sizes. |
| | |
| | Args: |
| | image: Input PIL Image |
| | grid_sizes: List of grid sizes to test |
| | |
| | Returns: |
| | Dictionary with grid size performance results |
| | """ |
| | results = {} |
| | original_grid = self.config.grid |
| | original_out_w = self.config.out_w |
| | original_out_h = self.config.out_h |
| | |
| | for grid_size in grid_sizes: |
| | self.config.grid = grid_size |
| | |
| | |
| | aspect_ratio = image.width / image.height |
| | if aspect_ratio > 1: |
| | |
| | self.config.out_w = (image.width // grid_size) * grid_size |
| | self.config.out_h = int(self.config.out_w / aspect_ratio // grid_size) * grid_size |
| | else: |
| | |
| | self.config.out_h = (image.height // grid_size) * grid_size |
| | self.config.out_w = int(self.config.out_h * aspect_ratio // grid_size) * grid_size |
| | |
| | |
| | start_time = time.time() |
| | pipeline_results = self.run_full_pipeline(image) |
| | total_time = time.time() - start_time |
| | |
| | results[grid_size] = { |
| | 'processing_time': total_time, |
| | 'output_resolution': f"{pipeline_results['outputs']['mosaic'].width}x{pipeline_results['outputs']['mosaic'].height}", |
| | 'total_tiles': grid_size * grid_size, |
| | 'tiles_per_second': (grid_size * grid_size) / total_time if total_time > 0 else 0, |
| | 'metrics': pipeline_results['metrics'] |
| | } |
| | |
| | |
| | self.config.grid = original_grid |
| | self.config.out_w = original_out_w |
| | self.config.out_h = original_out_h |
| | |
| | return results |
| | |
| | def analyze_performance_scaling(self, benchmark_results: Dict) -> Dict: |
| | """ |
| | Analyze how performance scales with grid size. |
| | |
| | Args: |
| | benchmark_results: Results from benchmark_grid_sizes |
| | |
| | Returns: |
| | Dictionary with scaling analysis |
| | """ |
| | grid_sizes = sorted(benchmark_results.keys()) |
| | processing_times = [benchmark_results[gs]['processing_time'] for gs in grid_sizes] |
| | total_tiles = [benchmark_results[gs]['total_tiles'] for gs in grid_sizes] |
| | tiles_per_second = [benchmark_results[gs]['tiles_per_second'] for gs in grid_sizes] |
| | |
| | |
| | scaling_analysis = { |
| | 'grid_sizes': grid_sizes, |
| | 'processing_times': processing_times, |
| | 'total_tiles': total_tiles, |
| | 'tiles_per_second': tiles_per_second, |
| | 'scaling_factors': {} |
| | } |
| | |
| | if len(grid_sizes) >= 2: |
| | |
| | tile_ratio = total_tiles[-1] / total_tiles[0] |
| | time_ratio = processing_times[-1] / processing_times[0] |
| | |
| | scaling_analysis['scaling_factors'] = { |
| | 'tile_increase_ratio': tile_ratio, |
| | 'time_increase_ratio': time_ratio, |
| | 'scaling_efficiency': tile_ratio / time_ratio if time_ratio > 0 else 0, |
| | 'is_linear_scaling': abs(time_ratio - tile_ratio) / tile_ratio < 0.1 |
| | } |
| | |
| | return scaling_analysis |
| | |
| | def generate_report(self, image: Image.Image, benchmark_results: Optional[Dict] = None) -> str: |
| | """ |
| | Generate a comprehensive report of the mosaic generation process. |
| | |
| | Args: |
| | image: Input PIL Image |
| | benchmark_results: Optional benchmark results |
| | |
| | Returns: |
| | Formatted report string |
| | """ |
| | |
| | if not self.results: |
| | self.run_full_pipeline(image) |
| | |
| | report = [] |
| | report.append("=" * 60) |
| | report.append("MOSAIC GENERATION REPORT") |
| | report.append("=" * 60) |
| | |
| | |
| | report.append("\nCONFIGURATION:") |
| | report.append(f"Grid Size: {self.config.grid}x{self.config.grid}") |
| | report.append(f"Tile Size: {self.config.tile_size}x{self.config.tile_size}") |
| | report.append(f"Output Resolution: {self.config.out_w}x{self.config.out_h}") |
| | report.append(f"Implementation: {self.config.impl.value}") |
| | report.append(f"Color Matching: {self.config.match_space.value}") |
| | report.append(f"Total Tiles: {self.config.grid ** 2}") |
| | |
| | |
| | report.append("\nPROCESSING TIME:") |
| | for stage, time_val in self.results['timing'].items(): |
| | report.append(f"{stage.replace('_', ' ').title()}: {time_val:.3f} seconds") |
| | |
| | |
| | report.append("\nQUALITY METRICS:") |
| | metrics = self.results['metrics'] |
| | interpretations = self.results['metrics_interpretation'] |
| | |
| | report.append(f"MSE: {metrics['mse']:.6f} ({interpretations['mse']})") |
| | report.append(f"PSNR: {metrics['psnr']:.2f} dB ({interpretations['psnr']})") |
| | report.append(f"SSIM: {metrics['ssim']:.4f} ({interpretations['ssim']})") |
| | report.append(f"RMSE: {metrics['rmse']:.6f}") |
| | report.append(f"MAE: {metrics['mae']:.6f}") |
| | |
| | |
| | if benchmark_results: |
| | report.append("\nBENCHMARK RESULTS:") |
| | for grid_size, result in benchmark_results.items(): |
| | report.append(f"Grid {grid_size}x{grid_size}:") |
| | report.append(f" Processing Time: {result['processing_time']:.3f}s") |
| | report.append(f" Tiles per Second: {result['tiles_per_second']:.1f}") |
| | report.append(f" Output Resolution: {result['output_resolution']}") |
| | |
| | report.append("\n" + "=" * 60) |
| | |
| | return "\n".join(report) |
| |
|