| """ | |
| Model Evaluator Module | |
| Provides model evaluation and inference capabilities. | |
| """ | |
| from dataclasses import dataclass, field | |
| from typing import List, Dict, Optional, Any | |
| import json | |
| import time | |
| from pathlib import Path | |
| import torch | |
@dataclass
class EvaluationConfig:
    """
    Configuration for model evaluation.
    """
    model_name: str = "gpt2"
    model_path: Optional[str] = None
    max_length: int = 512
    temperature: float = 0.7
    top_p: float = 0.9
    top_k: int = 50
    num_beams: int = 1
    do_sample: bool = True
    batch_size: int = 1
    device: str = "auto"
    max_samples: Optional[int] = None
    save_results: bool = True
    output_dir: str = "evaluation_results"
    include_metrics: bool = True
    include_timings: bool = True
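
# Example (illustrative, not part of the original module): a config pointing at a
# hypothetical local fine-tuned checkpoint, using greedy decoding instead of sampling.
# config = EvaluationConfig(model_path="./outputs/checkpoint-500", do_sample=False)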


class ModelEvaluator:
    """
    Model evaluator for testing and benchmarking LLMs.

    Features:
    - Inference on test datasets
    - Batch processing
    - Performance metrics
    - Result saving and analysis
    """

    def __init__(
        self,
        config: Optional[EvaluationConfig] = None,
        model: Optional[Any] = None,
        tokenizer: Optional[Any] = None
    ):
        """
        Initialize evaluator.

        Args:
            config: Evaluation configuration
            model: Pre-loaded model (optional)
            tokenizer: Pre-loaded tokenizer (optional)
        """
        self.config = config or EvaluationConfig()
        self.model = model
        self.tokenizer = tokenizer
        self.results = []
        self.metrics = {}

    def load_model(self):
        """Load model and tokenizer."""
        if self.model is not None and self.tokenizer is not None:
            print("Using pre-loaded model and tokenizer")
            return

        try:
            from transformers import AutoModelForCausalLM, AutoTokenizer

            print(f"Loading model: {self.config.model_name}")

            # Load tokenizer
            self.tokenizer = AutoTokenizer.from_pretrained(
                self.config.model_path or self.config.model_name,
                trust_remote_code=True
            )

            # Ensure pad token exists
            if self.tokenizer.pad_token is None:
                self.tokenizer.pad_token = self.tokenizer.eos_token

            # Load model
            device_map = "auto" if self.config.device == "auto" else None
            self.model = AutoModelForCausalLM.from_pretrained(
                self.config.model_path or self.config.model_name,
                device_map=device_map,
                trust_remote_code=True,
                torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32
            )
            self.model.eval()

            print("Model loaded successfully")
        except Exception as e:
            print(f"Error loading model: {e}")
            raise

    def generate_response(
        self,
        prompt: str,
        max_length: Optional[int] = None,
        temperature: Optional[float] = None
    ) -> str:
        """
        Generate response for a single prompt.

        Args:
            prompt: Input prompt
            max_length: Max generation length (new tokens)
            temperature: Sampling temperature

        Returns:
            Generated text
        """
        if self.model is None or self.tokenizer is None:
            self.load_model()

        max_length = max_length or self.config.max_length
        temperature = temperature or self.config.temperature

        try:
            # Tokenize
            inputs = self.tokenizer(
                prompt,
                return_tensors="pt",
                truncation=True,
                max_length=2048
            )

            # Move inputs to the model's device
            if hasattr(self.model, 'device'):
                inputs = {k: v.to(self.model.device) for k, v in inputs.items()}

            # Generate; max_new_tokens counts only generated tokens, so a long
            # prompt cannot exhaust the generation budget.
            with torch.no_grad():
                outputs = self.model.generate(
                    **inputs,
                    max_new_tokens=max_length,
                    temperature=temperature,
                    top_p=self.config.top_p,
                    top_k=self.config.top_k,
                    num_beams=self.config.num_beams,
                    do_sample=self.config.do_sample,
                    pad_token_id=self.tokenizer.pad_token_id,
                    eos_token_id=self.tokenizer.eos_token_id
                )

            # Decode
            generated_text = self.tokenizer.decode(outputs[0], skip_special_tokens=True)

            # Remove prompt from output
            if generated_text.startswith(prompt):
                generated_text = generated_text[len(prompt):].strip()

            return generated_text
        except Exception as e:
            print(f"Generation error: {e}")
            return f"[Error: {str(e)}]"

    def evaluate_dataset(
        self,
        dataset: List[Dict[str, str]],
        max_samples: Optional[int] = None
    ) -> Dict[str, Any]:
        """
        Evaluate model on a dataset.

        Args:
            dataset: List of examples with 'instruction', 'input', 'output'
            max_samples: Maximum samples to evaluate

        Returns:
            Evaluation results
        """
        if self.model is None:
            self.load_model()

        max_samples = max_samples or self.config.max_samples or len(dataset)
        dataset = dataset[:max_samples]

        print(f"\nEvaluating on {len(dataset)} examples...")

        results = []
        predictions = []
        references = []
        start_time = time.time()

        for i, example in enumerate(dataset):
            # Build prompt
            instruction = example.get('instruction', '')
            input_text = example.get('input', '')
            reference = example.get('output', '')

            if input_text:
                prompt = f"{instruction}\n\nInput: {input_text}\n\nResponse:"
            else:
                prompt = f"{instruction}\n\nResponse:"

            # Generate
            example_start = time.time()
            prediction = self.generate_response(prompt)
            example_time = time.time() - example_start

            # Store results
            result = {
                'index': i,
                'instruction': instruction,
                'input': input_text,
                'reference': reference,
                'prediction': prediction,
                'generation_time': example_time
            }
            results.append(result)
            predictions.append(prediction)
            references.append(reference)

            if (i + 1) % 10 == 0:
                print(f"  Processed {i + 1}/{len(dataset)} examples...")

        total_time = time.time() - start_time

        # Calculate metrics
        metrics = {}
        if self.config.include_metrics:
            try:
                from .metrics import Metrics
                metrics_calc = Metrics()
                metrics = metrics_calc.calculate_all_metrics(predictions, references)
            except Exception as e:
                print(f"Metrics calculation error: {e}")
                metrics = {'error': str(e)}

        # Compile results
        evaluation_results = {
            'config': {
                'model_name': self.config.model_name,
                'model_path': self.config.model_path,
                'max_length': self.config.max_length,
                'temperature': self.config.temperature,
                'num_samples': len(dataset)
            },
            'metrics': metrics,
            'timing': {
                'total_time': total_time,
                'avg_time_per_example': total_time / len(dataset),
                'throughput': len(dataset) / total_time
            },
            'examples': results
        }

        # Save results
        if self.config.save_results:
            self.save_results(evaluation_results)

        print(f"\n✅ Evaluation complete!")
        print(f"Total time: {total_time:.2f}s")
        print(f"Avg time per example: {total_time/len(dataset):.2f}s")

        if metrics:
            print(f"\nMetrics:")
            for key, value in metrics.items():
                if isinstance(value, (int, float)):
                    print(f"  {key}: {value:.2f}")

        return evaluation_results

    def save_results(self, results: Dict[str, Any], filename: Optional[str] = None):
        """
        Save evaluation results to JSON.

        Args:
            results: Evaluation results
            filename: Output filename
        """
        output_dir = Path(self.config.output_dir)
        output_dir.mkdir(parents=True, exist_ok=True)

        if filename is None:
            timestamp = time.strftime('%Y%m%d_%H%M%S')
            filename = f"evaluation_{timestamp}.json"

        filepath = output_dir / filename
        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(results, f, indent=2, ensure_ascii=False)

        print(f"Results saved to: {filepath}")

    def load_results(self, filepath: str) -> Dict[str, Any]:
        """
        Load evaluation results from JSON.

        Args:
            filepath: Path to results file

        Returns:
            Loaded results
        """
        with open(filepath, 'r', encoding='utf-8') as f:
            results = json.load(f)
        return results

    def compare_results(self, results_list: List[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Compare multiple evaluation results.

        Args:
            results_list: List of evaluation results

        Returns:
            Comparison summary
        """
        comparison = {
            'num_evaluations': len(results_list),
            'models': [r['config']['model_name'] for r in results_list],
            'metrics_comparison': {}
        }

        # Extract metrics per model
        all_metrics = {}
        for results in results_list:
            model_name = results['config']['model_name']
            all_metrics[model_name] = results.get('metrics', {})

        # Collect every metric name, then compare each across models
        metric_names = set()
        for metrics in all_metrics.values():
            metric_names.update(metrics.keys())

        for metric in metric_names:
            values = {}
            for model, metrics in all_metrics.items():
                if metric in metrics:
                    values[model] = metrics[metric]
            if values:
                comparison['metrics_comparison'][metric] = values

        return comparison
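

if __name__ == "__main__":
    # Minimal usage sketch, assuming `torch` and `transformers` are installed and the
    # default "gpt2" checkpoint is acceptable. The two demo examples below are
    # illustrative data, not part of the original module; metrics are disabled so the
    # optional `.metrics` helper is not needed.
    demo_dataset = [
        {
            "instruction": "Summarize the following text in one sentence.",
            "input": "The quick brown fox jumps over the lazy dog.",
            "output": "A fox jumps over a dog.",
        },
        {
            "instruction": "Name the capital of France.",
            "input": "",
            "output": "Paris.",
        },
    ]

    demo_config = EvaluationConfig(
        max_length=64,          # generate at most 64 new tokens per example
        include_metrics=False,  # skip the optional metrics module
        save_results=False,     # keep the demo side-effect free
    )
    evaluator = ModelEvaluator(config=demo_config)
    demo_results = evaluator.evaluate_dataset(demo_dataset)
    print(demo_results["timing"])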