"""
Fine-tune emotion2vec+ on Brazilian Portuguese emotion datasets (VERBO + emoUERJ).

This script implements Option A from the research plan:
- Fine-tune emotion2vec+ (a state-of-the-art base model)
- Train on VERBO (1,167 samples) + emoUERJ (377 samples)
- Use data augmentation to improve generalization
- Expected improvement: roughly +5-10% accuracy on PT-BR data
"""

import argparse
import logging
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional

import librosa
import numpy as np
import torch
from datasets import Audio, concatenate_datasets, load_dataset
from transformers import (
    Trainer,
    TrainingArguments,
    Wav2Vec2ForSequenceClassification,
    Wav2Vec2Processor,
)

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Canonical 7-class emotion taxonomy that both datasets are mapped onto.
EMOTION_LABELS = {
    "neutral": 0,
    "happy": 1,
    "sad": 2,
    "angry": 3,
    "fearful": 4,
    "disgusted": 5,
    "surprised": 6,
}

LABEL_TO_ID = EMOTION_LABELS
ID_TO_LABEL = {v: k for k, v in EMOTION_LABELS.items()}

class AudioAugmenter:
    """Data augmentation for audio to improve model robustness."""

    @staticmethod
    def time_stretch(audio: np.ndarray, rate: float = 1.0) -> np.ndarray:
        """Time-stretch the signal (rate < 1 slows it down, rate > 1 speeds it up)."""
        return librosa.effects.time_stretch(audio, rate=rate)

    @staticmethod
    def pitch_shift(audio: np.ndarray, sr: int, n_steps: float = 0.0) -> np.ndarray:
        """Shift the pitch by n_steps semitones without changing duration."""
        return librosa.effects.pitch_shift(audio, sr=sr, n_steps=n_steps)

    @staticmethod
    def add_noise(audio: np.ndarray, noise_factor: float = 0.005) -> np.ndarray:
        """Add white Gaussian noise scaled by noise_factor."""
        noise = np.random.randn(len(audio))
        return audio + noise_factor * noise

    @staticmethod
    def augment(audio: np.ndarray, sr: int, augment_type: Optional[str] = None) -> np.ndarray:
        """Apply the requested augmentation with randomized parameters."""
        if augment_type == "time_stretch":
            rate = np.random.uniform(0.9, 1.1)
            return AudioAugmenter.time_stretch(audio, rate)
        elif augment_type == "pitch_shift":
            n_steps = np.random.uniform(-2, 2)
            return AudioAugmenter.pitch_shift(audio, sr, n_steps)
        elif augment_type == "noise":
            return AudioAugmenter.add_noise(audio)
        else:
            return audio
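
# Illustrative usage of AudioAugmenter (a minimal sketch, not part of the
# training pipeline; the 440 Hz tone is a stand-in for real speech):
#
#   sr = 16000
#   tone = np.sin(2 * np.pi * 440.0 * np.arange(sr) / sr).astype(np.float32)
#   stretched = AudioAugmenter.augment(tone, sr, augment_type="time_stretch")
#   # Time stretching changes the number of samples; padding later in the
#   # pipeline absorbs the resulting length differences.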

def load_verbo_dataset():
    """
    Load the VERBO dataset (1,167 samples, 7 emotions).

    VERBO is a Brazilian Portuguese emotional speech corpus.
    Paper: "VERBO: A Corpus for Emotion Recognition in Brazilian Portuguese"

    Note: this dataset may need to be downloaded and prepared manually.
    """
    logger.info("Loading VERBO dataset...")
    try:
        # Placeholder Hub path; VERBO is not guaranteed to be hosted on Hugging Face.
        dataset = load_dataset("VERBO/emotion", split="train")
        logger.info(f"VERBO loaded: {len(dataset)} samples")
        return dataset
    except Exception:
        logger.warning("VERBO not available on Hugging Face")
        logger.info("Please download VERBO manually from: http://www02.smt.ufrj.br/~verbo/")
        logger.info("Or contact the dataset authors for access")
        return None

def load_emouerj_dataset():
    """
    Load the emoUERJ dataset (377 samples, 4 emotions).

    emoUERJ is a Brazilian Portuguese emotional speech dataset.
    Paper: "emoUERJ: A Deep Learning-Based Emotion Classifier for Brazilian Portuguese"

    Note: this dataset may need to be downloaded and prepared manually.
    """
    logger.info("Loading emoUERJ dataset...")
    try:
        # Placeholder Hub path; emoUERJ is not guaranteed to be hosted on Hugging Face.
        dataset = load_dataset("emoUERJ/emotion", split="train")
        logger.info(f"emoUERJ loaded: {len(dataset)} samples")
        return dataset
    except Exception:
        logger.warning("emoUERJ not available on Hugging Face")
        logger.info("Please download emoUERJ manually or contact the dataset authors")
        return None

def normalize_emotion_labels(dataset, emotion_field: str = "emotion"):
    """
    Normalize emotion labels to the standard 7-class format.

    Maps dataset-specific labels to: neutral, happy, sad, angry,
    fearful, disgusted, surprised.
    """
    def map_label(example):
        emotion = example[emotion_field].lower()

        # Portuguese and English label variants seen across VERBO and emoUERJ.
        emotion_map = {
            "neutro": "neutral",
            "neutral": "neutral",
            "alegria": "happy",
            "feliz": "happy",
            "happy": "happy",
            "tristeza": "sad",
            "triste": "sad",
            "sad": "sad",
            "raiva": "angry",
            "angry": "angry",
            "medo": "fearful",
            "fearful": "fearful",
            "nojo": "disgusted",
            "disgusted": "disgusted",
            "surpresa": "surprised",
            "surprised": "surprised",
        }

        # Unknown labels fall back to "neutral" rather than raising.
        normalized = emotion_map.get(emotion, "neutral")
        example["label"] = LABEL_TO_ID[normalized]
        example["emotion_text"] = normalized

        return example

    return dataset.map(map_label)
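
# Example (illustrative only; assumes a dataset with an "emotion" column):
#
#   ds = normalize_emotion_labels(verbo_dataset)
#   ds[0]["emotion_text"]  # "angry" for a VERBO row labeled "raiva"
#   ds[0]["label"]         # 3, per EMOTION_LABELS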

def prepare_dataset(examples, processor, augment: bool = False):
    """Prepare a batch of examples for training."""
    audio_arrays = examples["audio"]

    processed = []
    for audio in audio_arrays:
        array = audio["array"]
        sr = audio["sampling_rate"]

        # The model expects 16 kHz input; resample anything else.
        if sr != 16000:
            array = librosa.resample(array, orig_sr=sr, target_sr=16000)

        # With --augment, apply one random augmentation to roughly half the samples.
        if augment and np.random.random() < 0.5:
            aug_type = np.random.choice(["time_stretch", "pitch_shift", "noise"])
            array = AudioAugmenter.augment(array, 16000, aug_type)

        processed.append(array)

    # Clip utterances to at most 10 seconds (16000 * 10 samples).
    inputs = processor(
        processed,
        sampling_rate=16000,
        return_tensors="pt",
        padding=True,
        max_length=16000 * 10,
        truncation=True,
    )

    inputs["labels"] = examples["label"]
    return inputs

@dataclass
class DataCollatorWithPadding:
    """Custom data collator that pads variable-length audio features."""

    processor: Wav2Vec2Processor

    def __call__(self, features: List[Dict[str, Any]]) -> Dict[str, torch.Tensor]:
        input_values = [{"input_values": feature["input_values"]} for feature in features]
        labels = [feature["labels"] for feature in features]

        # Pad every sequence in the batch to the longest one.
        batch = self.processor.pad(
            input_values,
            padding=True,
            return_tensors="pt",
        )

        batch["labels"] = torch.tensor(labels)
        return batch
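
# Sketch of the collator's contract (illustrative values; `processor` is a
# loaded Wav2Vec2Processor):
#
#   features = [{"input_values": [0.1, 0.2], "labels": 2},
#               {"input_values": [0.3], "labels": 0}]
#   batch = DataCollatorWithPadding(processor=processor)(features)
#   batch["input_values"]  # shape (2, 2), zero-padded to the longest sequence
#   batch["labels"]        # tensor([2, 0])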

def compute_metrics(eval_pred):
    """Compute overall and per-class accuracy."""
    predictions, labels = eval_pred
    predictions = np.argmax(predictions, axis=1)

    accuracy = (predictions == labels).mean()

    # Per-class accuracy, skipping classes absent from the validation split.
    per_class_acc = {}
    for label_id, label_name in ID_TO_LABEL.items():
        mask = labels == label_id
        if mask.sum() > 0:
            per_class_acc[label_name] = (predictions[mask] == labels[mask]).mean()

    return {
        "accuracy": accuracy,
        **{f"accuracy_{k}": v for k, v in per_class_acc.items()},
    }
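
# Toy check of compute_metrics (illustrative only):
#
#   logits = np.array([[0.9, 0.1, 0.0, 0.0, 0.0, 0.0, 0.0],
#                      [0.2, 0.8, 0.0, 0.0, 0.0, 0.0, 0.0]])
#   labels = np.array([0, 2])
#   compute_metrics((logits, labels))
#   # -> {"accuracy": 0.5, "accuracy_neutral": 1.0, "accuracy_sad": 0.0}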

def main():
    parser = argparse.ArgumentParser(description="Fine-tune emotion2vec on PT-BR datasets")
    parser.add_argument("--base-model", type=str, default="emotion2vec/emotion2vec_plus_large",
                        help="Base model to fine-tune")
    parser.add_argument("--output-dir", type=str, default="models/emotion/emotion2vec_finetuned_ptbr",
                        help="Output directory for the fine-tuned model")
    parser.add_argument("--epochs", type=int, default=20,
                        help="Number of training epochs")
    parser.add_argument("--batch-size", type=int, default=8,
                        help="Training batch size")
    parser.add_argument("--learning-rate", type=float, default=3e-5,
                        help="Learning rate")
    parser.add_argument("--augment", action="store_true",
                        help="Use data augmentation")
    parser.add_argument("--device", type=str, default="cuda" if torch.cuda.is_available() else "cpu",
                        help="Device to use (cuda/cpu)")

    args = parser.parse_args()

    logger.info("=" * 60)
    logger.info("Fine-tuning emotion2vec on Brazilian Portuguese datasets")
    logger.info("=" * 60)
    logger.info(f"Base model: {args.base_model}")
    logger.info(f"Device: {args.device}")
    logger.info(f"Epochs: {args.epochs}")
    logger.info(f"Batch size: {args.batch_size}")
    logger.info(f"Data augmentation: {args.augment}")

    verbo = load_verbo_dataset()
    emouerj = load_emouerj_dataset()

    if verbo is None and emouerj is None:
        logger.error("No datasets available. Please download VERBO and/or emoUERJ manually.")
        logger.info("\nDataset sources:")
        logger.info("- VERBO: http://www02.smt.ufrj.br/~verbo/")
        logger.info("- emoUERJ: Contact the authors or check the university repository")
        return

    # Normalize labels and merge whichever datasets loaded successfully.
    datasets = []
    if verbo is not None:
        verbo = normalize_emotion_labels(verbo)
        datasets.append(verbo)
    if emouerj is not None:
        emouerj = normalize_emotion_labels(emouerj)
        datasets.append(emouerj)

    combined_dataset = concatenate_datasets(datasets) if len(datasets) > 1 else datasets[0]

    # Decode audio at 16 kHz so downstream processing sees a uniform rate.
    combined_dataset = combined_dataset.cast_column("audio", Audio(sampling_rate=16000))

    # Hold out 15% of the data for validation.
    split_dataset = combined_dataset.train_test_split(test_size=0.15, seed=42)
    train_dataset = split_dataset["train"]
    val_dataset = split_dataset["test"]

    logger.info("\nDataset statistics:")
    logger.info(f"  Training samples: {len(train_dataset)}")
    logger.info(f"  Validation samples: {len(val_dataset)}")

    # Note: this assumes the base checkpoint is compatible with the Wav2Vec2
    # classes; emotion2vec checkpoints may require conversion first.
    logger.info(f"\nLoading base model: {args.base_model}...")
    processor = Wav2Vec2Processor.from_pretrained(args.base_model)
    model = Wav2Vec2ForSequenceClassification.from_pretrained(
        args.base_model,
        num_labels=len(EMOTION_LABELS),
        id2label=ID_TO_LABEL,
        label2id=LABEL_TO_ID,
    )

    logger.info("\nPreprocessing datasets...")
    train_dataset = train_dataset.map(
        lambda x: prepare_dataset(x, processor, augment=args.augment),
        batched=True,
        remove_columns=train_dataset.column_names,
    )
    val_dataset = val_dataset.map(
        lambda x: prepare_dataset(x, processor, augment=False),
        batched=True,
        remove_columns=val_dataset.column_names,
    )

    output_dir = Path(args.output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    training_args = TrainingArguments(
        output_dir=str(output_dir),
        evaluation_strategy="epoch",  # renamed to `eval_strategy` in newer transformers releases
        save_strategy="epoch",
        learning_rate=args.learning_rate,
        per_device_train_batch_size=args.batch_size,
        per_device_eval_batch_size=args.batch_size,
        num_train_epochs=args.epochs,
        warmup_ratio=0.1,
        logging_steps=10,
        load_best_model_at_end=True,
        metric_for_best_model="accuracy",
        push_to_hub=False,
        save_total_limit=2,
        fp16=args.device == "cuda",
    )

    data_collator = DataCollatorWithPadding(processor=processor)

    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,
        eval_dataset=val_dataset,
        data_collator=data_collator,
        compute_metrics=compute_metrics,
    )

    logger.info("\nStarting fine-tuning...")
    trainer.train()

    logger.info("\nRunning final evaluation...")
    metrics = trainer.evaluate()
    logger.info(f"Validation accuracy: {metrics['eval_accuracy']:.4f}")

    logger.info(f"\nSaving fine-tuned model to {output_dir}...")
    trainer.save_model(str(output_dir))
    processor.save_pretrained(str(output_dir))

    logger.info("\nFine-tuning complete!")
    logger.info(f"Model saved to: {output_dir}")
    logger.info("\nTo use this model in the ensemble:")
    logger.info(f"  Emotion2VecModel(model_name='{args.output_dir}', ...)")


if __name__ == "__main__":
    main()
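
# Example invocation (hyperparameters are illustrative; the script name is
# assumed, so adjust it to this file's actual name):
#
#   python finetune_emotion2vec_ptbr.py --epochs 20 --batch-size 8 --augment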