alex committed
Commit 00e7318 · 1 Parent(s): 7451ae5
Files changed (4):
  1. app.py +4 -2
  2. requirements.txt +1 -1
  3. supertonic.py +113 -35
  4. time_util.py +9 -0
app.py CHANGED
@@ -54,15 +54,17 @@ import torchvision.transforms as transforms
 import torch.nn.functional as F
 from OmniAvatar.utils.audio_preprocess import add_silence_to_audio_ffmpeg
 
-from supertonic import generate_speech
+from supertonic import generate_speech, load_text_to_speech
 
 os.environ["PROCESSED_RESULTS"] = f"{os.getcwd()}/proprocess_results"
 
 def tts_from_text(text, tts_dir, voice_choice):
 
-    output = generate_speech([text], tts_dir, voice_choice)[0]
+    text_to_speech = load_text_to_speech(True)
+    output = generate_speech(text_to_speech, [text], tts_dir, voice_choice)[0]
     return output
 
+@spaces.GPU()
 def speak_to_me(session_id, evt: gr.EventData):
     detail = getattr(evt, "data", None) or getattr(evt, "_data", {}) or {}
 
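For reference, a minimal sketch of how the updated entry points fit together; the output directory below is illustrative, not from the commit, and "M1" is just the default voice style used elsewhere in the repo. load_text_to_speech(True) now builds the ONNX sessions explicitly, and generate_speech takes the resulting TextToSpeech instance as its first argument, so nothing heavyweight runs at module import time:

    # Usage sketch (illustrative text and paths).
    from supertonic import generate_speech, load_text_to_speech

    tts = load_text_to_speech(True)  # True -> request CUDAExecutionProvider
    wav_paths = generate_speech(tts, ["Hello there!"], "/tmp/tts_out", "M1")
    print(wav_paths[0])              # path of the first saved wav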
requirements.txt CHANGED
@@ -17,4 +17,4 @@ gradio_extendedimage @ https://github.com/OutofAi/gradio-extendedimage/releases/
 gradio_extendedaudio @ https://github.com/OutofAi/gradio-extendedaudio/releases/download/0.0.5/gradio_extendedaudio-0.0.5-py3-none-any.whl
 
 flash-attn-3 @ https://huggingface.co/alexnasa/flash-attn-3/resolve/main/128/flash_attn_3-3.0.0b1-cp39-abi3-linux_x86_64.whl
-onnxruntime
+onnxruntime-gpu
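Swapping onnxruntime for onnxruntime-gpu only pays off if the CUDA execution provider actually registers at runtime; a quick sanity check (a sketch, not part of this commit):

    import onnxruntime as ort

    # "CUDAExecutionProvider" should appear in this list when the GPU wheel and
    # the CUDA libraries are present; otherwise sessions silently fall back to
    # "CPUExecutionProvider".
    print(ort.get_available_providers())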
supertonic.py CHANGED
@@ -1,14 +1,15 @@
 import json
 import os
 import time
-from contextlib import contextmanager
+from time_util import timer
 from typing import Optional
 from unicodedata import normalize
-
+import uuid
 import numpy as np
 import onnxruntime as ort
 import soundfile as sf
 from huggingface_hub import snapshot_download
+from typing import Optional, Union
 
 
 class UnicodeProcessor:
@@ -87,24 +88,65 @@ class TextToSpeech:
         noisy_latent = noisy_latent * latent_mask
         return noisy_latent, latent_mask
 
+
     def _infer(
-        self, text_list: list[str], style: Style, total_step: int, speed: float = 1.05
+        self,
+        text_list: list[str],
+        style: Style,
+        total_step: int,
+        speed: float = 1.05,
+        suggested_duration: Optional[Union[float, list[float], np.ndarray]] = None,
+        speed_min_factor: float = 0.75,
+        speed_max_factor: float = 1.2,
     ) -> tuple[np.ndarray, np.ndarray]:
         assert (
             len(text_list) == style.ttl.shape[0]
         ), "Number of texts must match number of style vectors"
         bsz = len(text_list)
+
         text_ids, text_mask = self.text_processor(text_list)
-        dur_onnx, *_ = self.dp_ort.run(
+
+        # 1) Predict base duration
+        dur_pred, *_ = self.dp_ort.run(
             None, {"text_ids": text_ids, "style_dp": style.dp, "text_mask": text_mask}
         )
-        dur_onnx = dur_onnx / speed
+        dur_pred = np.array(dur_pred, dtype=np.float32).reshape(bsz)  # (bsz,)
+
+        # 2) Adjust duration based on suggested_duration (if given)
+        if suggested_duration is not None:
+            sugg = np.array(suggested_duration, dtype=np.float32)
+            if sugg.ndim == 0:
+                # same suggestion for all
+                sugg = np.full((bsz,), float(sugg), dtype=np.float32)
+            else:
+                sugg = sugg.reshape(bsz)
+
+            eps = 1e-3
+            sugg = np.clip(sugg, eps, None)
+
+            # we want dur_used ≈ sugg
+            # dur_used = dur_pred / speed_used  =>  speed_target = dur_pred / sugg
+            speed_target = dur_pred / sugg
+
+            speed_min = speed * speed_min_factor
+            speed_max = speed * speed_max_factor
+            speed_used = np.clip(speed_target, speed_min, speed_max)
+
+            dur_used = dur_pred / speed_used
+        else:
+            # default behaviour
+            speed_used = np.full((bsz,), speed, dtype=np.float32)
+            dur_used = dur_pred / speed_used
+
+        # 3) Continue as before, using dur_used
         text_emb_onnx, *_ = self.text_enc_ort.run(
             None,
             {"text_ids": text_ids, "style_ttl": style.ttl, "text_mask": text_mask},
-        )  # dur_onnx: [bsz]
-        xt, latent_mask = self.sample_noisy_latent(dur_onnx)
+        )
+
+        xt, latent_mask = self.sample_noisy_latent(dur_used)
         total_step_np = np.array([total_step] * bsz, dtype=np.float32)
+
         for step in range(total_step):
             current_step = np.array([step] * bsz, dtype=np.float32)
             xt, *_ = self.vector_est_ort.run(
@@ -119,8 +161,30 @@
                 "total_step": total_step_np,
             },
         )
+
         wav, *_ = self.vocoder_ort.run(None, {"latent": xt})
-        return wav, dur_onnx
+        return wav, dur_used
+
+    def batch(
+        self,
+        text_list: list[str],
+        style: Style,
+        total_step: int,
+        speed: float = 1.05,
+        suggested_duration: Optional[Union[float, list[float], np.ndarray]] = None,
+        speed_min_factor: float = 0.75,
+        speed_max_factor: float = 1.2,
+    ) -> tuple[np.ndarray, np.ndarray]:
+        return self._infer(
+            text_list,
+            style,
+            total_step,
+            speed=speed,
+            suggested_duration=suggested_duration,
+            speed_min_factor=speed_min_factor,
+            speed_max_factor=speed_max_factor,
+        )
+
 
     def __call__(
         self,
@@ -149,10 +213,6 @@
             dur_cat += dur_onnx + silence_duration
         return wav_cat, dur_cat
 
-    def batch(
-        self, text_list: list[str], style: Style, total_step: int, speed: float = 1.05
-    ) -> tuple[np.ndarray, np.ndarray]:
-        return self._infer(text_list, style, total_step, speed)
 
 
 def length_to_mask(lengths: np.ndarray, max_len: Optional[int] = None) -> np.ndarray:
@@ -219,11 +279,17 @@ def load_text_processor(onnx_dir: str) -> UnicodeProcessor:
     text_processor = UnicodeProcessor(unicode_indexer_path)
     return text_processor
 
-
-def load_text_to_speech(onnx_dir: str, use_gpu: bool = False) -> TextToSpeech:
+# text_to_speech = load_text_to_speech(False)
+
+
+model_dir = snapshot_download("Supertone/supertonic")
+onnx_dir = f"{model_dir}/onnx"
+
+def load_text_to_speech(use_gpu: bool = False) -> TextToSpeech:
+
     opts = ort.SessionOptions()
     if use_gpu:
-        raise NotImplementedError("GPU mode is not fully tested")
+        providers = ["CUDAExecutionProvider", "CPUExecutionProvider"]
     else:
         providers = ["CPUExecutionProvider"]
         print("Using CPU for inference")
@@ -268,12 +334,6 @@ def load_voice_style(voice_style_paths: list[str], verbose: bool = False) -> Sty
     return Style(ttl_style, dp_style)
 
 
-@contextmanager
-def timer(name: str):
-    start = time.time()
-    print(f"{name}...")
-    yield
-    print(f" -> {name} completed in {time.time() - start:.2f} sec")
 
 
 def sanitize_filename(text: str, max_len: int) -> str:
@@ -327,11 +387,19 @@ def chunk_text(text: str, max_len: int = 300) -> list[str]:
 
     return chunks
 
-model_dir = snapshot_download("Supertone/supertonic")
-onnx_dir = f"{model_dir}/onnx"
-text_to_speech = load_text_to_speech(onnx_dir, False)
-
-def generate_speech(text_list, save_dir, voice_style="M1", total_step=5, speed=1.05, n_test=1, batch=None):
+def generate_speech(
+    text_to_speech,
+    text_list,
+    save_dir,
+    voice_style="M1",
+    total_step=5,
+    speed=1.05,
+    n_test=1,
+    batch=None,
+    suggested_durations=None,  # NEW: list/np.ndarray of seconds, len == len(text_list)
+    speed_min_factor=0.75,
+    speed_max_factor=1.2,
+):
 
     saved_files_list = []
 
@@ -345,20 +413,30 @@ def generate_speech(text_list, save_dir, voice_style="M1", total_step=5, speed=1
     style = load_voice_style(voice_style_paths, verbose=True)
 
     for n in range(n_test):
-        print(f"\n[{n+1}/{n_test}] Starting synthesis...")
-        with timer("Generating speech from text"):
-            if batch:
-                wav, duration = text_to_speech.batch(text_list, style, total_step, speed)
-            else:
-                wav, duration = text_to_speech(text_list[0], style, total_step, speed)
+        if batch:
+            wav, duration = text_to_speech.batch(
+                text_list,
+                style,
+                total_step,
+                speed=speed,
+                suggested_duration=suggested_durations,
+                speed_min_factor=speed_min_factor,
+                speed_max_factor=speed_max_factor,
+            )
+        else:
+            # optional: could support suggested_durations[0] here too
+            wav, duration = text_to_speech(
                text_list[0], style, total_step, speed
+            )
+
         if not os.path.exists(save_dir):
             os.makedirs(save_dir)
+
         for b in range(bsz):
-            fname = f"{sanitize_filename(text_list[b], 20)}_{n+1}.wav"
-            w = wav[b, : int(text_to_speech.sample_rate * duration[b].item())]  # [T_trim]
+            unique = uuid.uuid4().hex[:8]
+            fname = f"{sanitize_filename(text_list[b], 20)}_{unique}_{n+1}.wav"
+            w = wav[b, : int(text_to_speech.sample_rate * duration[b].item())]
             sf.write(os.path.join(save_dir, fname), w, text_to_speech.sample_rate)
             saved_files_list.append(f"{save_dir}/{fname}")
-            # print(f"Saved: {save_dir}/{fname}")
-        print("\n=== Synthesis completed successfully! ===")
 
     return saved_files_list
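The suggested_duration branch in _infer inverts dur_used = dur_pred / speed_used to get speed_target = dur_pred / sugg, then clamps that to [speed * speed_min_factor, speed * speed_max_factor] so a length request can never push the voice outside a natural speed band. A self-contained numeric sketch of just that step (values are illustrative):

    import numpy as np

    dur_pred = np.array([4.0], dtype=np.float32)  # duration predictor output, seconds
    sugg = np.array([2.0], dtype=np.float32)      # caller asks for ~2 s of audio
    speed, lo_f, hi_f = 1.05, 0.75, 1.2           # defaults from _infer

    speed_target = dur_pred / sugg                                  # 2.0x would hit 2 s exactly
    speed_used = np.clip(speed_target, speed * lo_f, speed * hi_f)  # clamped to 1.26x
    dur_used = dur_pred / speed_used                                # ~3.17 s synthesized instead

    print(speed_used, dur_used)  # [1.26] [3.1746032]

An out-of-band suggestion is therefore honoured only partially: the clamp trades exact duration matching for speech that still sounds natural.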
time_util.py ADDED
@@ -0,0 +1,9 @@
+import time
+from contextlib import contextmanager
+
+@contextmanager
+def timer(name: str):
+    start = time.time()
+    print(f"{name}...")
+    yield
+    print(f" -> {name} completed in {time.time() - start:.2f} sec")
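The helper keeps the exact behaviour it had inside supertonic.py; the commit drops its call site in generate_speech but leaves it importable for ad-hoc profiling. Typical usage (the timed call below is a hypothetical stand-in):

    from time_util import timer

    with timer("Generating speech from text"):
        synthesize()  # any block worth timing
    # prints "Generating speech from text..." on entry and
    # " -> Generating speech from text completed in N.NN sec" on exit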