Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import torch | |
| from huggingface_hub import hf_hub_download | |
| from torch.nn import Linear, Sequential, Tanh | |
| import soundfile as sf | |
| import edge_tts | |
| from transformers import GPT2Tokenizer, GPT2LMHeadModel | |
| from keybert import KeyBERT | |
| from moviepy.editor import ( | |
| VideoFileClip, | |
| AudioFileClip, | |
| concatenate_videoclips, | |
| concatenate_audioclips, | |
| CompositeAudioClip, | |
| AudioClip, | |
| TextClip, | |
| CompositeVideoClip, | |
| VideoClip | |
| ) | |
| import numpy as np | |
| import json | |
| import logging | |
| import os | |
| import requests | |
| import re | |
| import math | |
| import tempfile | |
| import shutil | |
| import uuid | |
| import threading | |
| import time | |
| from datetime import datetime, timedelta | |
# ------------------- TTS ENGINE CODE (inlined) -------------------
# This section holds the functions and classes extracted so that speech synthesis works without external project files.
# --- Adapted from Utility/utils.py ---
def float2pcm(sig, dtype='int16'):
    """Convert a floating-point signal to integer PCM samples.

    The input is scaled by half of the target integer range, shifted by the
    zero level of the target type (0 for signed types, mid-range for
    unsigned types), clipped to the representable range and cast.

    Args:
        sig: Array-like of floating-point samples.
        dtype: Target integer dtype (signed or unsigned).

    Returns:
        A NumPy array of the requested integer dtype.

    Raises:
        TypeError: If ``sig`` is not a float array or ``dtype`` is not an
            integer type.
    """
    samples = np.asarray(sig)
    if samples.dtype.kind != 'f':
        raise TypeError("'sig' must be a float array")

    target = np.dtype(dtype)
    if target.kind not in 'iu':
        raise TypeError("'dtype' must be an integer type")

    limits = np.iinfo(target)
    half_range = 2 ** (limits.bits - 1)
    # Signed types are centred on 0; unsigned types on the middle of the range.
    zero_level = limits.min + half_range
    scaled = samples * half_range + zero_level
    return scaled.clip(limits.min, limits.max).astype(target)
def load_json_from_path(path):
    """Read the file at ``path`` and return its parsed JSON content."""
    with open(path, "r") as handle:
        parsed = json.load(handle)
    return parsed
# --- Originally derived from InferenceInterfaces/ToucanTTS.py (simplified) and ControllableInterface.py ---
# Parts were omitted and simplified to reduce complexity while keeping the essential functionality.
# NOTE: no ToucanTTS model code is included here; the class below synthesizes speech through the edge-tts package instead.
# The interface class keeps the `read(text, language, accent)` entry point of the original adaptation.
class EdgeTTSInterface:
    """Thin text-to-speech wrapper around the ``edge_tts`` package."""

    def __init__(self, voice="es-ES-AlvaroNeural"):
        """Store the voice name to use (e.g. "es-ES-ElviraNeural" also works)."""
        self.voice = voice

    def read(self, text, language="es", accent=None):
        """Synthesize ``text`` and return it as ``(sample_rate, samples)``.

        Args:
            text: Text to be spoken.
            language: Unused; kept for interface compatibility.
            accent: Unused; kept for interface compatibility.

        Returns:
            Tuple ``(sr, wav)`` where ``wav`` is a float32 NumPy array as
            decoded by ``soundfile.read``.
        """
        # Imported locally because the module header does not import asyncio.
        import asyncio

        # mkstemp creates the file atomically; mktemp only returned a name
        # and is documented as unsafe/deprecated.
        fd, tmp_path = tempfile.mkstemp(suffix=".wav")
        os.close(fd)

        async def _synth():
            communicate = edge_tts.Communicate(text, self.voice)
            await communicate.save(tmp_path)

        try:
            asyncio.run(_synth())
            # NOTE(review): edge-tts appears to emit MP3 data by default even
            # though the file is named .wav; confirm the installed libsndfile
            # can decode it.
            wav, sr = sf.read(tmp_path, dtype="float32")
        finally:
            # Best-effort cleanup so every synthesis does not leak a temp file.
            try:
                os.remove(tmp_path)
            except OSError:
                pass
        return sr, wav
def get_tts_interface():
    """Return the shared ``EdgeTTSInterface``, creating it on first call.

    NOTE(review): a later definition with the same name exists further down
    in this file and replaces this one at import time.
    """
    global tts_interface
    if tts_interface is not None:
        return tts_interface
    tts_interface = EdgeTTSInterface()
    return tts_interface
| # ------------------- Configuración & Globals ------------------- | |
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)

# The Pexels key is mandatory: abort at import time when it is not configured.
PEXELS_API_KEY = os.getenv("PEXELS_API_KEY")
if not PEXELS_API_KEY:
    raise RuntimeError("Debes definir PEXELS_API_KEY en 'Settings' -> 'Variables & secrets'")

# Lazily created singletons; each one is filled in by its get_* loader.
tokenizer = None
gpt2_model = None
kw_model = None
tts_interface = None

# Directory for rendered videos, created up front.
RESULTS_DIR = "video_results"
os.makedirs(RESULTS_DIR, exist_ok=True)

# task_id -> dict with the state of each generation task.
TASKS = {}
| # ------------------- Carga Perezosa de Modelos ------------------- | |
def get_tokenizer():
    """Return the shared GPT-2 tokenizer, loading it on first use.

    The tokenizer's pad token falls back to its EOS token when none is set.
    """
    global tokenizer
    if tokenizer is not None:
        return tokenizer

    logger.info("Cargando tokenizer (primera vez)...")
    tokenizer = GPT2Tokenizer.from_pretrained("datificate/gpt2-small-spanish")
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token
    return tokenizer
def get_gpt2_model():
    """Return the shared GPT-2 language model (in eval mode), loading it on first use."""
    global gpt2_model
    if gpt2_model is not None:
        return gpt2_model

    logger.info("Cargando modelo GPT-2 (primera vez)...")
    loaded = GPT2LMHeadModel.from_pretrained("datificate/gpt2-small-spanish")
    gpt2_model = loaded.eval()
    return gpt2_model
def get_kw_model():
    """Return the shared KeyBERT keyword extractor, loading it on first use."""
    global kw_model
    if kw_model is not None:
        return kw_model

    logger.info("Cargando modelo KeyBERT (primera vez)...")
    kw_model = KeyBERT("paraphrase-multilingual-MiniLM-L12-v2")
    return kw_model
def get_tts_interface():
    """Return the shared TTS engine, creating it on first use.

    This definition is the one in effect at runtime (it is the last one with
    this name in the module), so it must return a usable engine: callers
    invoke ``.read(text)`` directly on the returned object.
    """
    global tts_interface
    if tts_interface is None:
        logger.info("Cargando motor TTS (primera vez)...")
        tts_interface = EdgeTTSInterface()
    return tts_interface
| # ------------------- Funciones del Pipeline de Vídeo ------------------- | |
def update_task_progress(task_id, message):
    """Store ``message`` as the latest progress of a known task and log it.

    Unknown task ids are ignored.
    """
    entry = TASKS.get(task_id)
    if entry is None:
        return
    entry['progress_log'] = message
    logger.info(f"[{task_id}] {message}")
def gpt2_script(prompt: str) -> str:
    """Generate a short Spanish script about ``prompt`` with GPT-2.

    The model is fed an instruction sentence ending in "sobre: <prompt>" and
    the decoded output (instruction + continuation) is cut right after the
    instruction's "sobre:" marker.

    Args:
        prompt: Topic the script should be about.

    Returns:
        The text following the first "sobre:" marker, stripped. If the marker
        is missing from the decoded text, the whole text is returned.
    """
    local_tokenizer = get_tokenizer()
    local_gpt2_model = get_gpt2_model()

    instruction = f"Escribe un guion corto y coherente sobre: {prompt}"
    inputs = local_tokenizer(instruction, return_tensors="pt", truncation=True, max_length=512)
    outputs = local_gpt2_model.generate(
        **inputs,
        # Allow up to 160 tokens on top of the prompt length.
        max_length=160 + inputs["input_ids"].shape[1],
        do_sample=True,
        top_p=0.9,
        top_k=40,
        temperature=0.7,
        no_repeat_ngram_size=3,
        pad_token_id=local_tokenizer.pad_token_id,
        eos_token_id=local_tokenizer.eos_token_id,
    )
    text = local_tokenizer.decode(outputs[0], skip_special_tokens=True)

    # Cut at the FIRST marker only: splitting on every occurrence and keeping
    # the last piece would drop most of the script whenever the topic or the
    # generated text contains "sobre:" again.
    _, marker, remainder = text.partition("sobre:")
    return (remainder if marker else text).strip()
def toucan_tts_synth(text: str, path: str):
    """Synthesize ``text`` with the shared TTS engine and write it to ``path``.

    The float samples returned by the engine are converted to 16-bit PCM
    before being written with ``soundfile``.
    """
    engine = get_tts_interface()
    sample_rate, samples = engine.read(text)
    pcm_samples = float2pcm(samples)
    sf.write(path, pcm_samples, sample_rate)
# Small Spanish stop-word list handed to KeyBERT. A list is required because
# scikit-learn's CountVectorizer (which KeyBERT relies on) only recognises the
# string "english" as a built-in stop list and rejects any other string.
_SPANISH_STOP_WORDS = [
    "a", "al", "algo", "algunas", "algunos", "ante", "antes", "como", "con",
    "contra", "cual", "cuando", "de", "del", "desde", "donde", "durante",
    "e", "el", "él", "ella", "ellas", "ellos", "en", "entre", "era", "es",
    "esa", "esas", "ese", "eso", "esos", "esta", "está", "están", "estas",
    "este", "esto", "estos", "fue", "ha", "han", "hasta", "hay", "la", "las",
    "le", "les", "lo", "los", "más", "me", "mi", "mis", "mucho", "muy", "ni",
    "no", "nos", "nosotros", "o", "otra", "otro", "otros", "para", "pero",
    "poco", "por", "porque", "que", "qué", "quien", "se", "ser", "si", "sí",
    "sin", "sobre", "son", "su", "sus", "también", "tan", "te", "tiene",
    "todo", "todos", "tu", "tus", "un", "una", "uno", "unos", "y", "ya", "yo",
]


def keywords(text: str) -> list[str]:
    """Extract up to five search keywords from ``text``.

    The text is lower-cased and stripped of punctuation before being passed
    to KeyBERT. Spaces inside a keyword are replaced by "+".

    Args:
        text: Script to analyse.

    Returns:
        List of keyword strings; ``["naturaleza"]`` when nothing is found.
    """
    local_kw_model = get_kw_model()
    clean_text = re.sub(r"[^\w\sáéíóúñÁÉÍÓÚÑ]", "", text.lower())
    kws = local_kw_model.extract_keywords(
        clean_text,
        stop_words=_SPANISH_STOP_WORDS,
        top_n=5,
    )
    return [k.replace(" ", "+") for k, _ in kws if k] or ["naturaleza"]
def pexels_search(query: str, count: int) -> list[dict]:
    """Query the Pexels video search endpoint.

    Args:
        query: Search terms.
        count: Value sent as ``per_page``.

    Returns:
        The "videos" list of the JSON response (empty list when absent).

    Raises:
        requests.HTTPError: When the response status signals an error.
    """
    auth_headers = {"Authorization": PEXELS_API_KEY}
    search_params = {"query": query, "per_page": count, "orientation": "landscape"}
    response = requests.get(
        "https://api.pexels.com/videos/search",
        headers=auth_headers,
        params=search_params,
        timeout=20,
    )
    response.raise_for_status()
    payload = response.json()
    return payload.get("videos", [])
def download_file(url: str, folder: str) -> str | None:
    """Stream ``url`` into a randomly named ``.mp4`` file inside ``folder``.

    Returns:
        The path of the downloaded file, or ``None`` when the download fails
        or the resulting file is 1000 bytes or smaller.
    """
    try:
        target = os.path.join(folder, uuid.uuid4().hex + ".mp4")
        with requests.get(url, stream=True, timeout=60) as response:
            response.raise_for_status()
            with open(target, "wb") as sink:
                # 1 MiB chunks keep memory use bounded for large videos.
                for block in response.iter_content(1024 * 1024):
                    sink.write(block)
        if os.path.exists(target) and os.path.getsize(target) > 1000:
            return target
        return None
    except Exception as e:
        logger.error(f"Fallo al descargar {url}: {e}")
        return None
def loop_audio(audio_clip: AudioFileClip, duration: float) -> AudioFileClip:
    """Return ``audio_clip`` cut or repeated to last exactly ``duration`` seconds."""
    if audio_clip.duration < duration:
        # Too short: repeat it enough times to cover the target, then trim.
        repeats = math.ceil(duration / audio_clip.duration)
        audio_clip = concatenate_audioclips([audio_clip] * repeats)
    return audio_clip.subclip(0, duration)
def make_subtitle_clips(script: str, video_w: int, video_h: int, duration: float):
    """Build one subtitle ``TextClip`` per sentence of ``script``.

    Screen time is shared among sentences proportionally to their word
    counts. Sentences that would be shown for less than 0.1 s are skipped
    (without advancing the timeline).

    Args:
        script: Full narration text.
        video_w: Video width in pixels (captions use 90% of it).
        video_h: Video height in pixels (font size is 5% of it).
        duration: Total duration to distribute, in seconds.

    Returns:
        List of positioned and timed text clips (possibly empty).
    """
    sentences = []
    for piece in re.split(r"[.!?¿¡]", script):
        piece = piece.strip()
        if piece:
            sentences.append(piece)
    if not sentences:
        return []

    word_total = sum(len(s.split()) for s in sentences) or 1
    seconds_per_word = duration / word_total

    font_size = int(video_h * 0.05)
    caption_box = (int(video_w * 0.9), None)

    subtitle_clips = []
    cursor = 0.0
    for sentence in sentences:
        span = len(sentence.split()) * seconds_per_word
        if span < 0.1:
            continue
        caption = TextClip(
            sentence,
            fontsize=font_size,
            color="white",
            stroke_color="black",
            stroke_width=1.5,
            method="caption",
            size=caption_box,
            font="Arial-Bold",
        )
        caption = caption.set_start(cursor)
        caption = caption.set_duration(span)
        caption = caption.set_position(("center", "bottom"))
        subtitle_clips.append(caption)
        cursor += span
    return subtitle_clips
def make_grain_clip(size: tuple[int, int], duration: float):
    """Create a semi-transparent random-noise overlay clip.

    Args:
        size: ``(width, height)`` of the overlay in pixels.
        duration: Clip length in seconds.

    Returns:
        A ``VideoClip`` at 15% opacity whose frames are grey noise.
    """
    width, height = size

    def _noise_frame(t):
        # One random channel with values in [0, 40), copied to all three
        # colour channels so the noise is grey.
        grey = np.random.randint(0, 40, (height, width, 1), dtype=np.uint8)
        return np.repeat(grey, 3, axis=2)

    grain = VideoClip(_noise_frame, duration=duration)
    return grain.set_opacity(0.15)
def _pick_widest_file(video_data):
    """Return the widest rendition of a Pexels video entry, or None if it has none."""
    renditions = video_data.get("video_files") or []
    # `or 0` guards against entries whose width is missing or null.
    return max(renditions, key=lambda f: f.get("width") or 0, default=None)


def _download_stock_clips(search_terms, folder, task_id, limit=8):
    """Download up to ``limit`` stock clips for the given search terms into ``folder``."""
    paths = []
    total = len(search_terms)
    for index, term in enumerate(search_terms, start=1):
        update_task_progress(task_id, f"Paso 3/7: Buscando... (keyword {index}/{total}: '{term}')")
        if len(paths) >= limit:
            break
        for video_data in pexels_search(term, 2):
            best_file = _pick_widest_file(video_data)
            link = best_file.get("link") if best_file else None
            if link:
                path = download_file(link, folder)
                if path:
                    paths.append(path)
            if len(paths) >= limit:
                break
    return paths


def _close_quietly(clip):
    """Close a clip if it exists, logging (not raising) any failure."""
    if clip is None:
        return
    try:
        clip.close()
    except Exception as e:
        logger.warning(f"No se pudo cerrar un clip: {e}")


def build_video(script_text: str, generate_script_flag: bool, music_path: str | None, task_id: str) -> str:
    """Run the full pipeline and render the final video.

    Steps: script (optional GPT-2 generation), speech synthesis, stock clip
    download, clip assembly, audio mix, subtitles/grain overlay and render.

    Args:
        script_text: Topic (when generating) or the complete script.
        generate_script_flag: When true, ``script_text`` is expanded by GPT-2.
        music_path: Optional background music file.
        task_id: Task whose progress log is updated along the way.

    Returns:
        Path of the rendered MP4. The file lives inside a fresh temporary
        directory; the caller is responsible for moving the file and removing
        that directory.

    Raises:
        ValueError: If the synthesized audio is shorter than one second.
        RuntimeError: If no stock video could be downloaded.
    """
    tmp_dir = tempfile.mkdtemp()
    voice_clip = music_clip = base_video = final_video = None
    segments = []
    succeeded = False
    try:
        update_task_progress(task_id, "Paso 1/7: Generando guion...")
        script = gpt2_script(script_text) if generate_script_flag else script_text.strip()

        update_task_progress(task_id, "Paso 2/7: Creando audio con ToucanTTS...")
        voice_path = os.path.join(tmp_dir, "voice.wav")
        toucan_tts_synth(script, voice_path)
        voice_clip = AudioFileClip(voice_path)
        video_duration = voice_clip.duration
        if video_duration < 1:
            raise ValueError("El audio generado es demasiado corto.")

        update_task_progress(task_id, "Paso 3/7: Buscando clips en Pexels...")
        video_paths = _download_stock_clips(keywords(script), tmp_dir, task_id)
        if not video_paths:
            raise RuntimeError("No se encontraron vídeos en Pexels.")

        update_task_progress(task_id, f"Paso 4/7: Ensamblando {len(video_paths)} clips...")
        for clip_path in video_paths:
            # Open each file once (the duration is read from the same clip)
            # and register it immediately so it is closed even if a later
            # file fails to open.
            source = VideoFileClip(clip_path)
            segments.append(source.subclip(0, min(8, source.duration)))
        base_video = concatenate_videoclips(segments, method="chain")
        if base_video.duration < video_duration:
            # Footage shorter than the narration: repeat it until it covers it.
            repeats = math.ceil(video_duration / base_video.duration)
            base_video = concatenate_videoclips([base_video] * repeats)
        base_video = base_video.subclip(0, video_duration)

        update_task_progress(task_id, "Paso 5/7: Componiendo audio final...")
        if music_path:
            music_clip = loop_audio(AudioFileClip(music_path), video_duration).volumex(0.20)
            final_audio = CompositeAudioClip([music_clip, voice_clip])
        else:
            final_audio = voice_clip

        update_task_progress(task_id, "Paso 6/7: Añadiendo subtítulos y efectos...")
        subtitles = make_subtitle_clips(script, base_video.w, base_video.h, video_duration)
        grain_effect = make_grain_clip(base_video.size, video_duration)

        update_task_progress(task_id, "Paso 7/7: Renderizando vídeo final (esto puede tardar)...")
        final_video = CompositeVideoClip([base_video, grain_effect, *subtitles]).set_audio(final_audio)
        output_path = os.path.join(tmp_dir, "final_video.mp4")
        final_video.write_videofile(
            output_path,
            fps=24,
            codec="libx264",
            audio_codec="aac",
            threads=2,
            logger=None,
        )
        succeeded = True
        return output_path
    finally:
        for clip in (voice_clip, music_clip, base_video, final_video, *segments):
            _close_quietly(clip)
        if not succeeded:
            # On failure nobody will ever read this directory; do not leak
            # the downloaded footage and intermediate audio.
            shutil.rmtree(tmp_dir, ignore_errors=True)
def worker(task_id: str, mode: str, topic: str, user_script: str, music: str | None):
    """Background job: build the video for ``task_id`` and record the outcome.

    On success the rendered file is moved to ``RESULTS_DIR/<task_id>.mp4`` and
    the task is marked "done" with that path as "result". Any failure marks
    the task "error" with a message, so the polling UI always receives a
    terminal status.

    Args:
        task_id: Key of the task entry in ``TASKS``.
        mode: Selected UI mode; "Generar Guion con IA" enables script generation.
        topic: Topic used when the script is generated.
        user_script: Complete script used otherwise.
        music: Optional background music file path.
    """
    use_ai_script = mode == "Generar Guion con IA"

    # Initialize the TTS engine inside the worker thread so app start-up is
    # not blocked by it.
    if tts_interface is None:
        update_task_progress(task_id, "Cargando motor de voz ToucanTTS (primera vez, puede tardar)...")
        try:
            get_tts_interface()
        except Exception as e:
            TASKS[task_id].update({"status": "error", "error": f"Fallo al cargar el motor TTS: {e}"})
            return

    try:
        text = topic if use_ai_script else user_script
        rendered_path = build_video(text, use_ai_script, music, task_id)

        # build_video renders inside a temporary directory: keep only the
        # final file, under a stable location, and drop the rest.
        final_path = os.path.join(RESULTS_DIR, f"{task_id}.mp4")
        render_dir = os.path.dirname(os.path.abspath(rendered_path))
        shutil.move(rendered_path, final_path)
        if render_dir != os.path.abspath(RESULTS_DIR):
            shutil.rmtree(render_dir, ignore_errors=True)

        TASKS[task_id].update({"status": "done", "result": final_path})
    except Exception as e:
        logger.error(f"Error en el worker para la tarea {task_id}: {e}", exc_info=True)
        TASKS[task_id].update({"status": "error", "error": str(e)})
def janitor_thread():
    """Hourly loop that forgets tasks older than 24 hours and deletes their result files."""
    max_age = timedelta(hours=24)
    while True:
        time.sleep(3600)
        now = datetime.utcnow()
        logger.info("[JANITOR] Realizando limpieza de vídeos antiguos...")
        # Iterate over a snapshot because entries are deleted inside the loop.
        for task_id, info in list(TASKS.items()):
            if "timestamp" not in info or now - info["timestamp"] <= max_age:
                continue
            result_path = info.get("result")
            if result_path and os.path.exists(result_path):
                try:
                    os.remove(result_path)
                    logger.info(f"[JANITOR] Eliminado: {result_path}")
                except Exception as e:
                    logger.error(f"[JANITOR] Error al eliminar {result_path}: {e}")
            del TASKS[task_id]


# Start the cleanup loop as a daemon thread at import time.
threading.Thread(target=janitor_thread, daemon=True).start()
def generate_and_monitor(mode, topic, user_script, music):
    """Start a video task in a background thread and stream its progress.

    This is a generator: each yielded tuple is
    ``(progress_text, video_path_or_None, file_path_or_None)``.

    Args:
        mode: Selected UI mode; "Generar Guion con IA" uses ``topic``,
            anything else uses ``user_script``.
        topic: Topic for AI script generation.
        user_script: User-provided script.
        music: Optional background music file path.
    """
    content = topic if mode == "Generar Guion con IA" else user_script
    # `or ""` tolerates a None value from the UI instead of crashing on .strip().
    if not (content or "").strip():
        yield "Por favor, ingresa un tema o guion.", None, None
        return

    task_id = uuid.uuid4().hex[:8]
    task = {
        "status": "processing",
        "progress_log": "Iniciando tarea...",
        "timestamp": datetime.utcnow(),
    }
    TASKS[task_id] = task

    worker_thread = threading.Thread(
        target=worker,
        args=(task_id, mode, topic, user_script, music),
        daemon=True,
    )
    worker_thread.start()

    # Also stop polling when the worker thread has exited: otherwise a worker
    # that ends without setting a terminal status would make this loop run
    # forever.
    while task["status"] == "processing" and worker_thread.is_alive():
        yield task["progress_log"], None, None
        time.sleep(1)

    status = task["status"]
    if status == "error":
        yield f"❌ Error: {task['error']}", None, None
    elif status == "done":
        yield "✅ ¡Vídeo completado!", task["result"], task["result"]
    else:
        yield "❌ Error: la tarea terminó sin producir un resultado.", None, None
| # Interfaz Gradio | |
with gr.Blocks(title="Generador de Vídeos IA", theme=gr.themes.Soft()) as demo:
    gr.Markdown("# 🎬 Generador de Vídeos con IA")
    gr.Markdown("Crea vídeos a partir de texto con voz, música y efectos visuales. El progreso se mostrará en tiempo real.")

    with gr.Row():
        # Left column: inputs.
        with gr.Column(scale=2):
            mode_radio = gr.Radio(
                ["Generar Guion con IA", "Usar Mi Guion"],
                value="Generar Guion con IA",
                label="Elige el método",
            )
            topic_textbox = gr.Textbox(
                label="Tema para la IA",
                placeholder="Ej: La exploración espacial y sus desafíos",
            )
            # Hidden until the "own script" mode is selected.
            script_textbox = gr.Textbox(
                label="Tu Guion Completo",
                lines=5,
                visible=False,
                placeholder="Pega aquí tu guion...",
            )
            music_upload = gr.Audio(type="filepath", label="Música de fondo (opcional)")
            submit_button = gr.Button("✨ Generar Vídeo", variant="primary")

        # Right column: progress log and results.
        with gr.Column(scale=2):
            gr.Markdown("## Progreso y Resultados")
            progress_log = gr.Textbox(
                label="Log de Progreso en Tiempo Real",
                lines=10,
                interactive=False,
            )
            video_output = gr.Video(label="Resultado del Vídeo")
            download_file_output = gr.File(label="Descargar Fichero")

    def toggle_textboxes(mode):
        """Show the topic box in AI mode and the script box otherwise."""
        ai_selected = mode == "Generar Guion con IA"
        topic_update = gr.update(visible=ai_selected)
        script_update = gr.update(visible=not ai_selected)
        return (topic_update, script_update)

    mode_radio.change(
        toggle_textboxes,
        inputs=mode_radio,
        outputs=[topic_textbox, script_textbox],
    )
    submit_button.click(
        fn=generate_and_monitor,
        inputs=[mode_radio, topic_textbox, script_textbox, music_upload],
        outputs=[progress_log, video_output, download_file_output],
    )

if __name__ == "__main__":
    demo.launch()