# /// script
# dependencies = [
#     "torch",
# ]
# ///

"""Simple utilities for running the models."""
import torch

def to_dtype(dtype_str: str):
    """Convert string to torch dtype."""
    if dtype_str == "float16":
        return torch.float16
    if dtype_str == "bfloat16":
        return torch.bfloat16
    return torch.float32

def tensor_stats(t: torch.Tensor) -> str:
    """Generate stats string for a tensor."""
    return (f"shape={tuple(t.shape)}, "
            f"dtype={t.dtype}, "
            f"device={t.device}, "
            f"mean={t.mean().item():.6f}, "
            f"std={t.std().item():.6f}")

def set_seed(seed: int):
    """Set seeds for reproducibility."""
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
        torch.backends.cudnn.deterministic = True
        torch.backends.cudnn.benchmark = False
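
# A minimal usage sketch for the helpers above (the shape and seed value are
# illustrative, not taken from elsewhere in this file):
#
#     set_seed(0)
#     x = torch.randn(2, 3, dtype=to_dtype("bfloat16"))
#     print(tensor_stats(x))
#     # -> shape=(2, 3), dtype=torch.bfloat16, device=cpu, mean=..., std=...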

"""Reusable benchmarking utilities for performance testing."""
import time
import numpy as np
from contextlib import contextmanager
from typing import Callable, Dict, Tuple, Any, Optional
import torch
import json

def precise_timing(func: Callable[[], Any], warmup: int = 5, iters: int = 20, 
                   input_generator: Optional[Callable[[int], Any]] = None) -> Tuple[Any, float]:
    """High precision timing function with warmup and optional input generation per iteration."""
    # Warmup
    for i in range(warmup):
        if input_generator:
            inputs = input_generator(i)
            func(inputs)
        else:
            func()
    
    # CUDA launches kernels asynchronously; synchronize so the timer starts
    # only after all warmup work has actually finished on the device.
    if torch.cuda.is_available():
        torch.cuda.synchronize()
    
    start = time.perf_counter()
    result = None
    for i in range(iters):
        if input_generator:
            inputs = input_generator(i + warmup)  # Continue seed sequence after warmup
            result = func(inputs)
        else:
            result = func()
    
    # Wait for any in-flight kernels to finish before stopping the clock.
    if torch.cuda.is_available():
        torch.cuda.synchronize()
    
    end = time.perf_counter()
    avg_time = (end - start) / iters
    return result, avg_time
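
# Sketch of precise_timing with a per-iteration input generator (the matmul
# workload and shapes are illustrative assumptions, not taken from this file):
#
#     w = torch.randn(64, 64)
#     gen = lambda i: torch.randn(8, 64)  # fresh input each iteration
#     _, t = precise_timing(lambda x: x @ w, warmup=3, iters=10,
#                           input_generator=gen)
#     print(f"avg {t * 1e3:.3f} ms/iter")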

def memory_usage() -> Dict[str, float]:
    """Get current memory usage in GB."""
    if not torch.cuda.is_available():
        return {"allocated": 0.0, "cached": 0.0, "max_allocated": 0.0}
    
    return {
        "allocated": torch.cuda.memory_allocated() / 1024**3,
        "cached": torch.cuda.memory_reserved() / 1024**3,
        "max_allocated": torch.cuda.max_memory_allocated() / 1024**3
    }
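
# On a CUDA-less machine this deliberately reports zeros, e.g.
# {"allocated": 0.0, "cached": 0.0, "max_allocated": 0.0}, so callers can
# subtract before/after readings without branching on device type.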

@contextmanager
def bench_context(warmup: int = 10, iters: int = 50, device=None, dtype=None,
                  tokens: Optional[int] = None, save_json: Optional[str] = None,
                  input_shape: Optional[Tuple] = None, input_seed_base: int = 42):
    """Context manager for benchmarking with comprehensive metrics and optional input generation."""
    
    def run_benchmark(model_func, *args, **kwargs):
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
        
        mem_before = memory_usage()
        
        # Create input generator if input_shape is provided
        input_generator = None
        if input_shape is not None:
            def create_input(iteration: int):
                # Use deterministic but different seed for each iteration
                iteration_seed = input_seed_base + iteration * 123  # Spread out seeds
                torch.manual_seed(iteration_seed)
                if torch.cuda.is_available():
                    torch.cuda.manual_seed(iteration_seed)
                return torch.randn(*input_shape, device=device, dtype=dtype) * 0.1
            input_generator = create_input
        
        if input_generator:
            result, avg_time = precise_timing(model_func, warmup, iters, input_generator)
        else:
            result, avg_time = precise_timing(lambda: model_func(*args, **kwargs), warmup, iters)
        
        mem_after = memory_usage()
        
        # Calculate metrics
        metrics = {
            "avg_time_ms": avg_time * 1000,
            "throughput_tokens_per_sec": tokens / avg_time if tokens else None,
            "memory_allocated_gb": mem_after["allocated"],
            "memory_cached_gb": mem_after["cached"],
            "memory_increase_gb": mem_after["allocated"] - mem_before["allocated"],
            "device": str(device) if device else "cpu",
            "dtype": str(dtype) if dtype else "float32",
            "tokens": tokens,
            "warmup_iters": warmup,
            "timing_iters": iters
        }
        
        # Print results
        print(f"Average time: {metrics['avg_time_ms']:.3f} ms")
        if tokens:
            print(f"Throughput: {metrics['throughput_tokens_per_sec']:.0f} tokens/sec")
        print(f"Memory allocated: {metrics['memory_allocated_gb']:.3f} GB")
        print(f"Memory increase: {metrics['memory_increase_gb']:.3f} GB")
        
        # Save to JSON if requested
        if save_json:
            with open(save_json, 'w') as f:
                json.dump(metrics, f, indent=2)
        
        return result
    
    yield run_benchmark
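
if __name__ == "__main__":
    # Minimal smoke test (illustrative, not part of the original utilities):
    # benchmark a tiny linear layer with fresh inputs generated per iteration.
    dev = "cuda" if torch.cuda.is_available() else "cpu"
    model = torch.nn.Linear(8, 8).to(dev)
    with bench_context(warmup=2, iters=5, device=dev, input_shape=(4, 8)) as bench:
        out = bench(model)
        print(tensor_stats(out))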