Spaces:
Runtime error
Runtime error
| import json | |
| import math | |
| import os | |
| import numpy as np | |
| from torch.utils.data import Dataset | |
| from text import text_to_sequence | |
| from utils.tools import pad_1D, pad_2D | |
class Dataset(Dataset):
    """Dataset of preprocessed utterances with speaker and emotion labels.

    The class keeps the name ``Dataset`` (re-binding the base class name
    imported from ``torch.utils.data``) because callers refer to it by that
    name.

    Each item is a dict holding the phoneme sequence and the mel, pitch,
    energy and duration arrays loaded from ``.npy`` files under
    ``preprocess_config["path"]["preprocessed_path"]``.

    Args:
        filename: Metadata file name, relative to the preprocessed path.
        preprocess_config: Preprocessing config (nested dict).
        train_config: Training config; ``["optimizer"]["batch_size"]`` sets
            the size of the batches produced by :meth:`collate_fn`.
        sort: If true, ``collate_fn`` orders samples by descending text
            length before splitting them into batches.
        drop_last: If true, ``collate_fn`` discards a trailing batch that is
            smaller than ``batch_size``.
    """

    def __init__(
        self, filename, preprocess_config, train_config, sort=False, drop_last=False
    ):
        self.dataset_name = preprocess_config["dataset"]
        self.preprocessed_path = preprocess_config["path"]["preprocessed_path"]
        self.cleaners = preprocess_config["preprocessing"]["text"]["text_cleaners"]
        self.batch_size = train_config["optimizer"]["batch_size"]
        (
            self.basename,
            self.speaker,
            self.text,
            self.raw_text,
            self.emotion,
        ) = self.process_meta(filename)
        # Label -> id lookup tables stored next to the preprocessed features.
        with open(os.path.join(self.preprocessed_path, "speakers.json")) as f:
            self.speaker_map = json.load(f)
        with open(os.path.join(self.preprocessed_path, "emotions.json")) as f:
            self.emotion_map = json.load(f)
        self.sort = sort
        self.drop_last = drop_last

    def __len__(self):
        """Return the number of utterances listed in the metadata file."""
        return len(self.text)

    def _load_feature(self, kind, speaker, basename):
        """Load ``<preprocessed_path>/<kind>/<speaker>-<kind>-<basename>.npy``."""
        feature_path = os.path.join(
            self.preprocessed_path,
            kind,
            "{}-{}-{}.npy".format(speaker, kind, basename),
        )
        return np.load(feature_path)

    def __getitem__(self, idx):
        """Return the sample at ``idx`` as a dict of ids, text and features.

        Raises:
            KeyError: If the speaker or emotion label is missing from the
                corresponding map.
        """
        basename = self.basename[idx]
        speaker = self.speaker[idx]
        emotion = self.emotion[idx]
        speaker_id = self.speaker_map[speaker]
        emotion_id = self.emotion_map[emotion]
        raw_text = self.raw_text[idx]
        phone = np.array(text_to_sequence(self.text[idx], self.cleaners))

        return {
            "id": basename,
            "speaker": speaker_id,
            "emotion": emotion_id,
            "text": phone,
            "raw_text": raw_text,
            "mel": self._load_feature("mel", speaker, basename),
            "pitch": self._load_feature("pitch", speaker, basename),
            "energy": self._load_feature("energy", speaker, basename),
            "duration": self._load_feature("duration", speaker, basename),
        }

    def process_meta(self, filename):
        """Parse the ``|``-separated metadata file into parallel lists.

        Each non-blank line must hold exactly five fields:
        ``name|speaker|text|raw_text|emotion``. Blank lines are skipped.

        Returns:
            Tuple ``(name, speaker, text, raw_text, emotion)`` of lists.

        Raises:
            ValueError: If a non-blank line does not have five fields.
        """
        meta_path = os.path.join(self.preprocessed_path, filename)
        name, speaker, text, raw_text, emotion = [], [], [], [], []
        with open(meta_path, "r", encoding="utf-8") as f:
            for lineno, line in enumerate(f, start=1):
                record = line.strip("\n")
                if not record.strip():
                    # A stray blank line would otherwise fail the unpack below.
                    continue
                fields = record.split("|")
                if len(fields) != 5:
                    raise ValueError(
                        "{}:{}: expected 5 '|'-separated fields, got {}".format(
                            meta_path, lineno, len(fields)
                        )
                    )
                n, s, t, r, e = fields
                name.append(n)
                speaker.append(s)
                text.append(t)
                raw_text.append(r)
                emotion.append(e)
        return name, speaker, text, raw_text, emotion

    def reprocess(self, data, idxs):
        """Assemble the samples ``data[i] for i in idxs`` into one padded batch.

        Returns:
            Tuple ``(ids, raw_texts, speakers, texts, text_lens,
            max_text_len, emotions, mels, mel_lens, max_mel_len, pitches,
            energies, durations)``. Lengths are taken from axis 0 of each
            text / mel array; padding is delegated to ``pad_1D`` / ``pad_2D``.
        """
        samples = [data[idx] for idx in idxs]

        ids = [s["id"] for s in samples]
        raw_texts = [s["raw_text"] for s in samples]
        texts = [s["text"] for s in samples]
        mels = [s["mel"] for s in samples]

        # Record true lengths before padding hides them.
        text_lens = np.array([text.shape[0] for text in texts])
        mel_lens = np.array([mel.shape[0] for mel in mels])

        speakers = np.array([s["speaker"] for s in samples])
        emotions = np.array([s["emotion"] for s in samples])

        texts = pad_1D(texts)
        mels = pad_2D(mels)
        pitches = pad_1D([s["pitch"] for s in samples])
        energies = pad_1D([s["energy"] for s in samples])
        durations = pad_1D([s["duration"] for s in samples])

        return (
            ids,
            raw_texts,
            speakers,
            texts,
            text_lens,
            max(text_lens),
            emotions,
            mels,
            mel_lens,
            max(mel_lens),
            pitches,
            energies,
            durations,
        )

    def collate_fn(self, data):
        """Split a list of samples into batches of ``self.batch_size``.

        The samples are optionally ordered by descending text length
        (``self.sort``), cut into full batches, and the remainder is kept as
        a final smaller batch unless ``self.drop_last`` is set.

        Returns:
            List of batch tuples as produced by :meth:`reprocess`.
        """
        data_size = len(data)
        if self.sort:
            len_arr = np.array([d["text"].shape[0] for d in data])
            idx_arr = np.argsort(-len_arr)
        else:
            idx_arr = np.arange(data_size)

        # Number of samples that fit into whole batches.
        n_full = data_size - (data_size % self.batch_size)
        tail = idx_arr[n_full:]
        batches = idx_arr[:n_full].reshape((-1, self.batch_size)).tolist()
        if not self.drop_last and len(tail) > 0:
            batches.append(tail.tolist())

        return [self.reprocess(data, idxs) for idxs in batches]
class TextDataset(Dataset):
    """Text-only dataset: yields phoneme sequences with speaker and emotion.

    Unlike the feature dataset defined earlier in this module, no acoustic
    features are loaded; only the metadata file and the speaker/emotion maps
    are read.

    Args:
        filepath: Path to the ``|``-separated metadata file (opened as given,
            not joined with the preprocessed path).
        preprocess_config: Preprocessing config (nested dict); its
            ``["path"]["preprocessed_path"]`` directory must contain
            ``speakers.json`` and ``emotions.json``.
    """

    def __init__(self, filepath, preprocess_config):
        self.cleaners = preprocess_config["preprocessing"]["text"]["text_cleaners"]
        (
            self.basename,
            self.speaker,
            self.text,
            self.raw_text,
            self.emotion,
        ) = self.process_meta(filepath)

        # Both maps live in the same directory. The emotion map previously
        # used the key "preprocessed", which is inconsistent with the speaker
        # map lookup and raises KeyError on configs that only define
        # "preprocessed_path".
        preprocessed_path = preprocess_config["path"]["preprocessed_path"]
        with open(os.path.join(preprocessed_path, "speakers.json")) as f:
            self.speaker_map = json.load(f)
        with open(os.path.join(preprocessed_path, "emotions.json")) as f:
            self.emotion_map = json.load(f)

    def __len__(self):
        """Return the number of utterances listed in the metadata file."""
        return len(self.text)

    def __getitem__(self, idx):
        """Return ``(basename, speaker_id, phone, raw_text, emotion)``.

        NOTE(review): ``emotion`` is returned as the raw label from the
        metadata file; ``self.emotion_map`` is loaded but not applied here.
        Confirm with the consumers whether an id is expected instead.
        """
        basename = self.basename[idx]
        speaker = self.speaker[idx]
        speaker_id = self.speaker_map[speaker]
        raw_text = self.raw_text[idx]
        emotion = self.emotion[idx]
        phone = np.array(text_to_sequence(self.text[idx], self.cleaners))
        return (basename, speaker_id, phone, raw_text, emotion)

    def process_meta(self, filename):
        """Parse the ``|``-separated metadata file into parallel lists.

        Each non-blank line must hold exactly five fields:
        ``name|speaker|text|raw_text|emotion``. Blank lines are skipped.

        Returns:
            Tuple ``(name, speaker, text, raw_text, emotion)`` of lists.

        Raises:
            ValueError: If a non-blank line does not have five fields.
        """
        name, speaker, text, raw_text, emotion = [], [], [], [], []
        with open(filename, "r", encoding="utf-8") as f:
            for lineno, line in enumerate(f, start=1):
                record = line.strip("\n")
                if not record.strip():
                    # A stray blank line would otherwise fail the unpack below.
                    continue
                fields = record.split("|")
                if len(fields) != 5:
                    raise ValueError(
                        "{}:{}: expected 5 '|'-separated fields, got {}".format(
                            filename, lineno, len(fields)
                        )
                    )
                n, s, t, r, e = fields
                name.append(n)
                speaker.append(s)
                text.append(t)
                raw_text.append(r)
                emotion.append(e)
        return name, speaker, text, raw_text, emotion

    def collate_fn(self, data):
        """Batch a list of ``__getitem__`` tuples.

        Returns:
            Tuple ``(ids, raw_texts, speakers, texts, emotions, text_lens,
            max_text_len)`` where ``texts`` has been padded by ``pad_1D`` and
            ``text_lens`` holds the pre-padding lengths.
        """
        ids = [d[0] for d in data]
        speakers = np.array([d[1] for d in data])
        texts = [d[2] for d in data]
        raw_texts = [d[3] for d in data]
        emotions = [d[4] for d in data]
        # Record true lengths before padding hides them.
        text_lens = np.array([text.shape[0] for text in texts])
        texts = pad_1D(texts)
        return ids, raw_texts, speakers, texts, emotions, text_lens, max(text_lens)
if __name__ == "__main__":
    # Smoke test: build the train/val datasets from the LJSpeech configs,
    # iterate both loaders once and report how many batches each produced.
    import torch
    import yaml
    from torch.utils.data import DataLoader
    from utils.utils import to_device

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Open the configs with context managers so the file handles are closed
    # as soon as parsing finishes.
    with open("./config/LJSpeech/preprocess.yaml", "r") as f:
        preprocess_config = yaml.load(f, Loader=yaml.FullLoader)
    with open("./config/LJSpeech/train.yaml", "r") as f:
        train_config = yaml.load(f, Loader=yaml.FullLoader)

    train_dataset = Dataset(
        "train.txt", preprocess_config, train_config, sort=True, drop_last=True
    )
    val_dataset = Dataset(
        "val.txt", preprocess_config, train_config, sort=False, drop_last=False
    )

    # The training loader hands collate_fn four batches' worth of samples at
    # a time; collate_fn then splits them into batches of batch_size.
    train_loader = DataLoader(
        train_dataset,
        batch_size=train_config["optimizer"]["batch_size"] * 4,
        shuffle=True,
        collate_fn=train_dataset.collate_fn,
    )
    val_loader = DataLoader(
        val_dataset,
        batch_size=train_config["optimizer"]["batch_size"],
        shuffle=False,
        collate_fn=val_dataset.collate_fn,
    )

    reports = (
        ("Training set with size {} is composed of {} batches.", train_dataset, train_loader),
        ("Validation set with size {} is composed of {} batches.", val_dataset, val_loader),
    )
    for message, dataset, loader in reports:
        n_batch = 0
        # Each loader item is a list of batches (see collate_fn).
        for batchs in loader:
            for batch in batchs:
                to_device(batch, device)
                n_batch += 1
        print(message.format(len(dataset), n_batch))