import json
import math
import random

import torch
import torch.nn.functional as F
import torch.utils.data
from torch import LongTensor
from pypinyin import Style, lazy_pinyin
from tqdm import tqdm

from ttts.gpt.voice_tokenizer import VoiceBpeTokenizer


def padding_to_8(x):
    """Zero-pad the last dimension of x up to the next multiple of 8.

    Note: the target length is computed as floor(l/8)+1 blocks, so an input
    whose length is already a multiple of 8 still gains a full extra block.
    """
    l = x.shape[-1]
    l = (math.floor(l / 8) + 1) * 8
    return torch.nn.functional.pad(x, (0, l - x.shape[-1]))
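
# For example (following the definition above): an input whose last dimension
# is 13 comes back padded to 16, while one whose last dimension is 16 comes
# back padded to 24, since at least one frame of padding is always added.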


def read_jsonl(path):
    """Read a JSON-lines file into a list of dicts (one JSON object per line)."""
    data_list = []
    with open(path, 'r', encoding='utf-8') as f:
        for line in f:
            if line.strip():
                data_list.append(json.loads(line))
    return data_list


def write_jsonl(path, all_paths):
    """Write a list of dicts to a JSON-lines file, one JSON object per line."""
    with open(path, 'w', encoding='utf-8') as file:
        for item in all_paths:
            json.dump(item, file, ensure_ascii=False)
            file.write('\n')
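
# Each line of the dataset jsonl is expected to carry at least a 'path' and a
# 'text' field, since those are the only keys DiffusionDataset reads below.
# A hypothetical record (the path is illustrative, not from the repo):
# {"path": "data/wavs/000001.wav", "text": "..."}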


class DiffusionDataset(torch.utils.data.Dataset):
    def __init__(self, opt):
        self.jsonl_path = opt['dataset']['path']
        self.audiopaths_and_text = read_jsonl(self.jsonl_path)
        self.tok = VoiceBpeTokenizer('ttts/gpt/gpt_tts_tokenizer.json')

    def __getitem__(self, index):
        # Fetch the text, convert it to tone3 pinyin, and tokenize it.
        audiopath_and_text = self.audiopaths_and_text[index]
        audiopath, text = audiopath_and_text['path'], audiopath_and_text['text']
        text = ' '.join(lazy_pinyin(text, style=Style.TONE3, neutral_tone_with_five=True))
        text_tokens = LongTensor(self.tok.encode(text))
        try:
            # Load the precomputed mel spectrogram and its VQ codes, which
            # are cached next to the audio file.
            mel_raw = torch.load(audiopath + '.mel.pth')[0]
            mel_codes = LongTensor(torch.load(audiopath + '.melvq.pth')[0])
        except Exception:
            # Skip samples whose cached features are missing or unreadable;
            # the collater filters out the resulting None entries.
            return None
        # Randomly crop between 1/4 and 3/4 of the mel frames to serve as the
        # reference segment.
        crop_frames = random.randint(mel_raw.shape[1] // 4, mel_raw.shape[1] // 4 * 3)
        # Ensure the crop does not run past the end of the spectrogram.
        max_start_frame = mel_raw.shape[1] - crop_frames
        start_frame = random.randint(0, max_start_frame)
        mel_refer = padding_to_8(mel_raw[:, start_frame:start_frame + crop_frames])
        # Cap long samples at 400 mel frames / 100 VQ codes (each code spans
        # four mel frames).
        if mel_raw.shape[1] > 400:
            mel_raw = mel_raw[:, :400]
            mel_codes = mel_codes[:100]
        # Keep an even number of codes, trimming the matching four mel frames
        # per dropped code.
        if mel_codes.shape[-1] % 2 == 1:
            mel_codes = mel_codes[:-1]
            mel_raw = mel_raw[:, :-4]
        return text_tokens, mel_codes, mel_raw, mel_refer

    def __len__(self):
        return len(self.audiopaths_and_text)
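
# Each dataset item is a (text_tokens, mel_codes, mel_raw, mel_refer) tuple,
# or None when the cached features are missing; DiffusionCollater below pads
# every field to the longest example in the batch and drops the Nones.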


class DiffusionCollater:
    def __call__(self, batch):
        # Drop samples that failed to load in __getitem__.
        batch = [x for x in batch if x is not None]
        if len(batch) == 0:
            return None
        text_lens = [len(x[0]) for x in batch]
        max_text_len = max(text_lens)
        mel_code_lens = [len(x[1]) for x in batch]
        max_mel_code_len = max(mel_code_lens)
        mel_lens = [x[2].shape[1] for x in batch]
        max_mel_len = max(mel_lens)
        mel_refer_lens = [x[3].shape[1] for x in batch]
        max_mel_refer_len = max(mel_refer_lens)
        texts = []
        mel_codes = []
        mels = []
        mel_refers = []
        # Right-pad every field with zeros to the batch maximum.
        for text_token, mel_code, mel, mel_refer in batch:
            texts.append(F.pad(text_token, (0, max_text_len - len(text_token)), value=0))
            mel_codes.append(F.pad(mel_code, (0, max_mel_code_len - len(mel_code)), value=0))
            mels.append(F.pad(mel, (0, max_mel_len - mel.shape[1]), value=0))
            mel_refers.append(F.pad(mel_refer, (0, max_mel_refer_len - mel_refer.shape[1]), value=0))
        return {
            'padded_text': torch.stack(texts),
            'padded_mel_code': torch.stack(mel_codes),
            'padded_mel': torch.stack(mels),
            'mel_lengths': LongTensor(mel_lens),
            'padded_mel_refer': torch.stack(mel_refers),
            'mel_refer_lengths': LongTensor(mel_refer_lens),
        }
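
# Shapes in the returned dict, for a batch of B surviving samples (a sketch;
# n_mels is whatever mel-bin count the cached .mel.pth features use):
#   padded_text:       (B, max_text_len)            LongTensor
#   padded_mel_code:   (B, max_mel_code_len)        LongTensor
#   padded_mel:        (B, n_mels, max_mel_len)
#   padded_mel_refer:  (B, n_mels, max_mel_refer_len)
#   mel_lengths / mel_refer_lengths: (B,)           LongTensor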


if __name__ == '__main__':
    cfg = json.load(open('ttts/diffusion/config.json'))
    ds = DiffusionDataset(cfg)
    dl = torch.utils.data.DataLoader(ds, **cfg['dataloader'], collate_fn=DiffusionCollater())
    # Smoke test: pull a single batch to verify the pipeline end to end.
    for batch in tqdm(dl):
        break