import os

import numpy as np
import pandas as pd
import torch
from datasets import Dataset
import evaluate
from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    TrainingArguments,
    Trainer,
    EarlyStoppingCallback
)
from sklearn.model_selection import train_test_split


class RobertaRetrainer:
    """
    A class to retrain the RoBERTa-large model using labeled financial news data.
    This follows the Hugging Face fine-tuning approach outlined in the RETRAIN.MD guide.
    """

    def __init__(self,
                 model_name="Farshid/roberta-large-financial-phrasebank-allagree1",
                 output_dir="./nimou-RoBERTa",
                 csv_path=None):
        """
        Initialize the retrainer with model configuration.

        Args:
            model_name (str): HuggingFace model identifier
            output_dir (str): Directory where the fine-tuned model will be saved
            csv_path (str): Path to the labeled dataset CSV
        """
        self.model_name = model_name
        self.output_dir = output_dir
        self.csv_path = csv_path
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.tokenizer = None
        self.model = None
        self.train_dataset = None
        self.val_dataset = None
        self.accuracy_metric = evaluate.load("accuracy")

        print(f"Using device: {self.device}")

        # Create the output directory if it doesn't exist
        if not os.path.exists(output_dir):
            os.makedirs(output_dir)

    def load_data(self, csv_path=None):
        """
        Load and prepare the dataset from CSV.

        Args:
            csv_path (str, optional): Override the CSV path provided in the constructor

        Returns:
            tuple: Processed train and validation datasets
        """
        if csv_path:
            self.csv_path = csv_path
        if not self.csv_path:
            raise ValueError("CSV path must be provided")

        print(f"Loading data from {self.csv_path}")
        df = pd.read_csv(self.csv_path)

        # Basic data validation
        if 'text' not in df.columns or 'label' not in df.columns:
            raise ValueError("CSV must contain 'text' and 'label' columns")

        # Split into train and validation sets
        train_df, val_df = train_test_split(df, test_size=0.2, random_state=42, stratify=df['label'])

        # Convert to HuggingFace datasets
        train_dataset = Dataset.from_pandas(train_df)
        val_dataset = Dataset.from_pandas(val_df)

        print(f"Training samples: {len(train_dataset)}")
        print(f"Validation samples: {len(val_dataset)}")

        # Display label distribution
        print("Label distribution in training set:")
        for label, count in train_df['label'].value_counts().items():
            print(f"  Label {label}: {count} samples ({count / len(train_df) * 100:.2f}%)")

        self.train_dataset = train_dataset
        self.val_dataset = val_dataset
        return train_dataset, val_dataset
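
    # Sketch of the CSV layout load_data expects. The example rows below are
    # hypothetical; only the 'text'/'label' column names (validated above) and
    # the 0/1/2 label ids (see load_model) come from this script.
    #
    #   text,label
    #   "Quarterly revenue rose 12% year over year",2
    #   "The board left its full-year guidance unchanged",1
    #   "The company issued a profit warning",0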

    def load_model(self):
        """
        Load the pretrained model and tokenizer.

        Returns:
            tuple: Loaded tokenizer and model
        """
        print(f"Loading model {self.model_name}")
        self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)
        self.model = AutoModelForSequenceClassification.from_pretrained(
            self.model_name,
            num_labels=3  # NEGATIVE=0, NEUTRAL=1, POSITIVE=2
        )
        return self.tokenizer, self.model

    def tokenize_data(self, max_length=128):
        """
        Tokenize the datasets.

        Args:
            max_length (int): Maximum sequence length for tokenization

        Returns:
            tuple: Tokenized training and validation datasets
        """
        if not self.tokenizer:
            self.load_model()
        if not self.train_dataset or not self.val_dataset:
            self.load_data()

        def preprocess(examples):
            return self.tokenizer(
                examples["text"],
                padding="max_length",
                truncation=True,
                max_length=max_length
            )

        tokenized_train = self.train_dataset.map(preprocess, batched=True)
        tokenized_val = self.val_dataset.map(preprocess, batched=True)
        print("Datasets tokenized")

        self.train_dataset = tokenized_train
        self.val_dataset = tokenized_val
        return tokenized_train, tokenized_val

    def compute_metrics(self, eval_pred):
        """
        Compute evaluation metrics during training.

        Args:
            eval_pred (tuple): Tuple of predictions and labels from the trainer

        Returns:
            dict: Dictionary containing evaluation metrics
        """
        logits, labels = eval_pred
        predictions = np.argmax(logits, axis=1)
        acc = self.accuracy_metric.compute(predictions=predictions, references=labels)
        results = {"accuracy": acc["accuracy"]}
        return results

    def train(self,
              num_train_epochs=3,
              learning_rate=2e-5,
              per_device_train_batch_size=8,
              per_device_eval_batch_size=8,
              weight_decay=0.01,
              warmup_ratio=0.1,
              logging_steps=100,
              eval_steps=500,
              save_steps=1000,
              load_best_model_at_end=True):
        """
        Train the model on the prepared dataset.

        Args:
            num_train_epochs (int): Number of training epochs
            learning_rate (float): Learning rate for the optimizer
            per_device_train_batch_size (int): Batch size for training
            per_device_eval_batch_size (int): Batch size for evaluation
            weight_decay (float): Weight decay for regularization
            warmup_ratio (float): Ratio of warmup steps
            logging_steps (int): Number of steps between logging
            eval_steps (int): Number of steps between evaluations
            save_steps (int): Number of steps between checkpoints
            load_best_model_at_end (bool): Whether to load the best model at the end

        Returns:
            Trainer: Trained model trainer
        """
        if not self.model:
            self.load_model()
        if not self.train_dataset or not self.val_dataset:
            self.tokenize_data()

        # Set training arguments
        training_args = TrainingArguments(
            output_dir=self.output_dir,
            num_train_epochs=num_train_epochs,
            per_device_train_batch_size=per_device_train_batch_size,
            per_device_eval_batch_size=per_device_eval_batch_size,
            learning_rate=learning_rate,
            weight_decay=weight_decay,
            warmup_ratio=warmup_ratio,
            evaluation_strategy="steps",
            eval_steps=eval_steps,
            logging_steps=logging_steps,
            save_steps=save_steps,
            load_best_model_at_end=load_best_model_at_end,
            metric_for_best_model="accuracy",
            save_total_limit=2,  # Keep at most 2 checkpoints on disk (the best one is retained)
        )

        # Move model to the correct device
        self.model.to(self.device)

        # Initialize the Trainer
        trainer = Trainer(
            model=self.model,
            args=training_args,
            train_dataset=self.train_dataset,
            eval_dataset=self.val_dataset,
            tokenizer=self.tokenizer,
            compute_metrics=self.compute_metrics,
            callbacks=[EarlyStoppingCallback(early_stopping_patience=3)]
        )

        # Train the model
        print("Starting training...")
        trainer.train()

        # Save the best model
        trainer.save_model(self.output_dir)
        print(f"Model saved to {self.output_dir}")

        # Evaluate the model
        print("Evaluating model...")
        eval_results = trainer.evaluate()
        print(f"Evaluation results: {eval_results}")

        return trainer


def main():
    """
    Main function to demonstrate the retraining process.
    """
    # Define paths
    csv_path = "c:/Users/M/Desktop/repos/gotti/LLaMAVestor/src/logs/prepared_training_data.csv"
    output_dir = "c:/Users/M/Desktop/repos/gotti/LLaMAVestor/src/models/finetuned-roberta"

    # Initialize the retrainer
    retrainer = RobertaRetrainer(
        model_name="Farshid/roberta-large-financial-phrasebank-allagree1",
        output_dir=output_dir,
        csv_path=csv_path
    )

    # Start the training process
    retrainer.load_data()
    retrainer.tokenize_data(max_length=128)
    trainer = retrainer.train(
        num_train_epochs=5,
        learning_rate=1e-5,
        per_device_train_batch_size=8,
        per_device_eval_batch_size=8
    )

    print("Training completed successfully!")


if __name__ == "__main__":
    main()