Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import torch | |
| import soundfile as sf | |
| import edge_tts | |
| import asyncio | |
| from transformers import GPT2Tokenizer, GPT2LMHeadModel | |
| from keybert import KeyBERT | |
| from moviepy.editor import ( | |
| VideoFileClip, | |
| AudioFileClip, | |
| concatenate_videoclips, | |
| concatenate_audioclips, | |
| CompositeAudioClip, | |
| AudioClip, | |
| TextClip, | |
| CompositeVideoClip, | |
| VideoClip | |
| ) | |
| import numpy as np | |
| import json | |
| import logging | |
| import os | |
| import requests | |
| import re | |
| import math | |
| import tempfile | |
| import shutil | |
| import uuid | |
| import threading | |
| import time | |
| from datetime import datetime, timedelta | |
| # ------------------- Configuración de Timeout ------------------- | |
| os.environ["GRADIO_SERVER_TIMEOUT"] = "3800" # 30 minutos en segundos | |
| # ------------------- Configuración & Globals ------------------- | |
| logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") | |
| logger = logging.getLogger(__name__) | |
| PEXELS_API_KEY = os.getenv("PEXELS_API_KEY") | |
| if not PEXELS_API_KEY: | |
| logger.warning("PEXELS_API_KEY no definido. Los videos no funcionarán.") | |
| tokenizer, gpt2_model, kw_model = None, None, None | |
| RESULTS_DIR = "video_results" | |
| os.makedirs(RESULTS_DIR, exist_ok=True) | |
| TASKS = {} | |
| # ------------------- Motor Edge TTS ------------------- | |
| class EdgeTTSEngine: | |
| def __init__(self, voice="es-ES-AlvaroNeural"): | |
| self.voice = voice | |
| logger.info(f"Inicializando Edge TTS con voz: {voice}") | |
| async def _synthesize_async(self, text, output_path): | |
| """Sintetiza texto a voz usando Edge TTS de forma asíncrona""" | |
| try: | |
| communicate = edge_tts.Communicate(text, self.voice) | |
| await communicate.save(output_path) | |
| return True | |
| except Exception as e: | |
| logger.error(f"Error en Edge TTS: {e}") | |
| return False | |
| def synthesize(self, text, output_path): | |
| """Sintetiza texto a voz (wrapper síncrono)""" | |
| try: | |
| # Ejecutar la función async en un nuevo loop | |
| return asyncio.run(self._synthesize_async(text, output_path)) | |
| except Exception as e: | |
| logger.error(f"Error al sintetizar con Edge TTS: {e}") | |
| return False | |
| # Instancia global del motor TTS | |
| tts_engine = EdgeTTSEngine() | |
| # ------------------- Carga Perezosa de Modelos ------------------- | |
| def get_tokenizer(): | |
| global tokenizer | |
| if tokenizer is None: | |
| logger.info("Cargando tokenizer GPT2 español...") | |
| tokenizer = GPT2Tokenizer.from_pretrained("datificate/gpt2-small-spanish") | |
| if tokenizer.pad_token is None: | |
| tokenizer.pad_token = tokenizer.eos_token | |
| return tokenizer | |
| def get_gpt2_model(): | |
| global gpt2_model | |
| if gpt2_model is None: | |
| logger.info("Cargando modelo GPT-2 español...") | |
| gpt2_model = GPT2LMHeadModel.from_pretrained("datificate/gpt2-small-spanish").eval() | |
| return gpt2_model | |
| def get_kw_model(): | |
| global kw_model | |
| if kw_model is None: | |
| logger.info("Cargando modelo KeyBERT multilingüe...") | |
| kw_model = KeyBERT("paraphrase-multilingual-MiniLM-L12-v2") | |
| return kw_model | |
| # ------------------- Funciones del Pipeline ------------------- | |
| def update_task_progress(task_id, message): | |
| if task_id in TASKS: | |
| TASKS[task_id]['progress_log'] = message | |
| logger.info(f"[{task_id}] {message}") | |
| def gpt2_script(prompt: str) -> str: | |
| """Genera un guión usando GPT-2""" | |
| try: | |
| local_tokenizer = get_tokenizer() | |
| local_gpt2_model = get_gpt2_model() | |
| instruction = f"Escribe un guion corto y coherente sobre: {prompt}" | |
| inputs = local_tokenizer(instruction, return_tensors="pt", truncation=True, max_length=512) | |
| outputs = local_gpt2_model.generate( | |
| **inputs, | |
| max_length=160 + inputs["input_ids"].shape[1], | |
| do_sample=True, | |
| top_p=0.9, | |
| top_k=40, | |
| temperature=0.7, | |
| no_repeat_ngram_size=3, | |
| pad_token_id=local_tokenizer.pad_token_id, | |
| eos_token_id=local_tokenizer.eos_token_id, | |
| ) | |
| text = local_tokenizer.decode(outputs[0], skip_special_tokens=True) | |
| generated = text.split("sobre:")[-1].strip() | |
| return generated if generated else prompt | |
| except Exception as e: | |
| logger.error(f"Error generando guión: {e}") | |
| return f"Hoy hablaremos sobre {prompt}. Este es un tema fascinante que merece nuestra atención." | |
| def generate_tts_audio(text: str, output_path: str) -> bool: | |
| """Genera audio usando Edge TTS""" | |
| try: | |
| logger.info("Generando audio con Edge TTS...") | |
| success = tts_engine.synthesize(text, output_path) | |
| if success and os.path.exists(output_path) and os.path.getsize(output_path) > 0: | |
| logger.info(f"Audio generado exitosamente: {output_path}") | |
| return True | |
| else: | |
| logger.error("El archivo de audio no se generó correctamente") | |
| return False | |
| except Exception as e: | |
| logger.error(f"Error generando TTS: {e}") | |
| return False | |
| def extract_keywords(text: str) -> list[str]: | |
| """Extrae palabras clave del texto para búsqueda de videos""" | |
| try: | |
| local_kw_model = get_kw_model() | |
| clean_text = re.sub(r"[^\w\sáéíóúñÁÉÍÓÚÑ]", "", text.lower()) | |
| kws = local_kw_model.extract_keywords(clean_text, stop_words="spanish", top_n=5) | |
| keywords = [k.replace(" ", "+") for k, _ in kws if k] | |
| return keywords if keywords else ["naturaleza", "paisaje"] | |
| except Exception as e: | |
| logger.error(f"Error extrayendo keywords: {e}") | |
| return ["mystery", "conspiracy", "alien", "UFO", "secret", "cover-up", "illusion", "paranoia", | |
| "secret society", "lie", "simulation", "matrix", "terror", "darkness", "shadow", "enigma", | |
| "urban legend", "unknown", "hidden", "mistrust", "experiment", "government", "control", | |
| "surveillance", "propaganda", "deception", "whistleblower", "anomaly", "extraterrestrial", | |
| "shadow government", "cabal", "deep state", "new world order", "mind control", "brainwashing", | |
| "disinformation", "false flag", "assassin", "black ops", "anomaly", "men in black", "abduction", | |
| "hybrid", "ancient aliens", "hollow earth", "simulation theory", "alternate reality", "predictive programming", | |
| "symbolism", "occult", "eerie", "haunting", "unexplained", "forbidden knowledge", "redacted", "conspiracy theorist"] | |
| def search_pexels_videos(query: str, count: int = 3) -> list[dict]: | |
| """Busca videos en Pexels""" | |
| if not PEXELS_API_KEY: | |
| return [] | |
| try: | |
| response = requests.get( | |
| "https://api.pexels.com/videos/search", | |
| headers={"Authorization": PEXELS_API_KEY}, | |
| params={"query": query, "per_page": count, "orientation": "landscape"}, | |
| timeout=20 | |
| ) | |
| response.raise_for_status() | |
| return response.json().get("videos", []) | |
| except Exception as e: | |
| logger.error(f"Error buscando videos en Pexels: {e}") | |
| return [] | |
| def download_video(url: str, folder: str) -> str | None: | |
| """Descarga un video desde URL""" | |
| try: | |
| filename = f"{uuid.uuid4().hex}.mp4" | |
| filepath = os.path.join(folder, filename) | |
| with requests.get(url, stream=True, timeout=60) as response: | |
| response.raise_for_status() | |
| with open(filepath, "wb") as f: | |
| for chunk in response.iter_content(chunk_size=1024*1024): | |
| f.write(chunk) | |
| if os.path.exists(filepath) and os.path.getsize(filepath) > 1000: | |
| return filepath | |
| else: | |
| logger.error(f"Archivo descargado inválido: {filepath}") | |
| return None | |
| except Exception as e: | |
| logger.error(f"Error descargando video {url}: {e}") | |
| return None | |
| def create_subtitle_clips(script: str, video_width: int, video_height: int, duration: float): | |
| """Crea clips de subtítulos""" | |
| try: | |
| sentences = [s.strip() for s in re.split(r"[.!?¿¡]", script) if s.strip()] | |
| if not sentences: | |
| return [] | |
| total_words = sum(len(s.split()) for s in sentences) or 1 | |
| time_per_word = duration / total_words | |
| clips = [] | |
| current_time = 0.0 | |
| for sentence in sentences: | |
| num_words = len(sentence.split()) | |
| sentence_duration = num_words * time_per_word | |
| if sentence_duration < 0.5: | |
| continue | |
| txt_clip = ( | |
| TextClip( | |
| sentence, | |
| fontsize=max(20, int(video_height * 0.05)), | |
| color="white", | |
| stroke_color="black", | |
| stroke_width=2, | |
| method="caption", | |
| size=(int(video_width * 0.9), None), | |
| font="Arial-Bold" | |
| ) | |
| .set_start(current_time) | |
| .set_duration(sentence_duration) | |
| .set_position(("center", "bottom")) | |
| ) | |
| clips.append(txt_clip) | |
| current_time += sentence_duration | |
| return clips | |
| except Exception as e: | |
| logger.error(f"Error creando subtítulos: {e}") | |
| return [] | |
| def loop_audio_to_duration(audio_clip: AudioFileClip, target_duration: float) -> AudioFileClip: | |
| """Hace loop del audio hasta alcanzar la duración objetivo""" | |
| try: | |
| if audio_clip.duration >= target_duration: | |
| return audio_clip.subclip(0, target_duration) | |
| loops_needed = math.ceil(target_duration / audio_clip.duration) | |
| looped_audio = concatenate_audioclips([audio_clip] * loops_needed) | |
| return looped_audio.subclip(0, target_duration) | |
| except Exception as e: | |
| logger.error(f"Error haciendo loop del audio: {e}") | |
| return audio_clip | |
| def create_video(script_text: str, generate_script: bool, music_path: str | None, task_id: str) -> str: | |
| """Función principal para crear el video""" | |
| temp_dir = tempfile.mkdtemp() | |
| try: | |
| # Paso 1: Generar o usar guión | |
| update_task_progress(task_id, "Paso 1/7: Preparando guión...") | |
| if generate_script: | |
| script = gpt2_script(script_text) | |
| else: | |
| script = script_text.strip() | |
| if not script: | |
| raise ValueError("El guión está vacío") | |
| # Paso 2: Generar audio TTS | |
| update_task_progress(task_id, "Paso 2/7: Generando audio con Edge TTS...") | |
| audio_path = os.path.join(temp_dir, "voice.wav") | |
| if not generate_tts_audio(script, audio_path): | |
| raise RuntimeError("Error generando el audio TTS") | |
| voice_clip = AudioFileClip(audio_path) | |
| video_duration = voice_clip.duration | |
| if video_duration < 1: | |
| raise ValueError("El audio generado es demasiado corto") | |
| # Paso 3: Buscar y descargar videos | |
| update_task_progress(task_id, "Paso 3/7: Buscando videos en Pexels...") | |
| video_paths = [] | |
| keywords = extract_keywords(script) | |
| for i, keyword in enumerate(keywords[:3]): # Límite de 3 keywords | |
| update_task_progress(task_id, f"Paso 3/7: Buscando videos para '{keyword}' ({i+1}/{len(keywords[:3])})") | |
| videos = search_pexels_videos(keyword, 2) | |
| for video_data in videos: | |
| if len(video_paths) >= 6: # Límite de 6 videos | |
| break | |
| video_files = video_data.get("video_files", []) | |
| if video_files: | |
| # Tomar el video de mejor calidad | |
| best_file = max(video_files, key=lambda f: f.get("width", 0)) | |
| video_url = best_file.get("link") | |
| if video_url: | |
| downloaded_path = download_video(video_url, temp_dir) | |
| if downloaded_path: | |
| video_paths.append(downloaded_path) | |
| if not video_paths: | |
| raise RuntimeError("No se pudieron descargar videos de Pexels") | |
| # Paso 4: Procesar videos | |
| update_task_progress(task_id, f"Paso 4/7: Procesando {len(video_paths)} videos...") | |
| video_clips = [] | |
| for path in video_paths: | |
| try: | |
| clip = VideoFileClip(path) | |
| # Tomar máximo 8 segundos de cada clip | |
| duration = min(8, clip.duration) | |
| video_clips.append(clip.subclip(0, duration)) | |
| except Exception as e: | |
| logger.error(f"Error procesando video {path}: {e}") | |
| continue | |
| if not video_clips: | |
| raise RuntimeError("No se pudieron procesar los videos") | |
| # Concatenar videos | |
| base_video = concatenate_videoclips(video_clips, method="chain") | |
| # Extender video si es más corto que el audio | |
| if base_video.duration < video_duration: | |
| loops_needed = math.ceil(video_duration / base_video.duration) | |
| base_video = concatenate_videoclips([base_video] * loops_needed) | |
| # Cortar al tiempo exacto del audio | |
| base_video = base_video.subclip(0, video_duration) | |
| # Paso 5: Componer audio final | |
| update_task_progress(task_id, "Paso 5/7: Componiendo audio...") | |
| if music_path and os.path.exists(music_path): | |
| try: | |
| music_clip = AudioFileClip(music_path) | |
| music_clip = loop_audio_to_duration(music_clip, video_duration).volumex(0.2) | |
| final_audio = CompositeAudioClip([music_clip, voice_clip]) | |
| except Exception as e: | |
| logger.error(f"Error con música: {e}") | |
| final_audio = voice_clip | |
| else: | |
| final_audio = voice_clip | |
| # Paso 7: Renderizar video final | |
| update_task_progress(task_id, "Paso 7/7: Renderizando video final...") | |
| final_video = base_video.set_audio(final_audio) | |
| output_path = os.path.join(RESULTS_DIR, f"video_{task_id}.mp4") | |
| final_video.write_videofile( | |
| output_path, | |
| fps=15, | |
| codec="mpeg4", | |
| audio_codec="mp3", | |
| threads=1, | |
| preset="ultrafast", | |
| logger=None, | |
| verbose=False | |
| ) | |
| # Limpiar clips | |
| voice_clip.close() | |
| if 'music_clip' in locals(): | |
| music_clip.close() | |
| base_video.close() | |
| final_video.close() | |
| for clip in video_clips: | |
| clip.close() | |
| return output_path | |
| except Exception as e: | |
| logger.error(f"Error creando video: {e}") | |
| raise | |
| finally: | |
| # Limpiar directorio temporal | |
| try: | |
| shutil.rmtree(temp_dir) | |
| except: | |
| pass | |
| def worker_thread(task_id: str, mode: str, topic: str, user_script: str, music_path: str | None): | |
| """Hilo worker para procesamiento de video""" | |
| try: | |
| generate_script = (mode == "Generar Guion con IA") | |
| content = topic if generate_script else user_script | |
| output_path = create_video(content, generate_script, music_path, task_id) | |
| TASKS[task_id].update({ | |
| "status": "done", | |
| "result": output_path, | |
| "progress_log": "✅ ¡Video completado exitosamente!" | |
| }) | |
| except Exception as e: | |
| logger.error(f"Error en worker {task_id}: {e}") | |
| TASKS[task_id].update({ | |
| "status": "error", | |
| "error": str(e), | |
| "progress_log": f"❌ Error: {str(e)}" | |
| }) | |
| def generate_video_with_progress(mode, topic, user_script, music): | |
| """Función principal que maneja la generación con progreso en tiempo real""" | |
| # Validar entrada | |
| content = topic if mode == "Generar Guion con IA" else user_script | |
| if not content or not content.strip(): | |
| yield "❌ Error: Por favor, ingresa un tema o guion.", None, None | |
| return | |
| # Crear tarea | |
| task_id = uuid.uuid4().hex[:8] | |
| TASKS[task_id] = { | |
| "status": "processing", | |
| "progress_log": "🚀 Iniciando generación de video...", | |
| "timestamp": datetime.utcnow() | |
| } | |
| # Iniciar worker | |
| worker = threading.Thread( | |
| target=worker_thread, | |
| args=(task_id, mode, topic, user_script, music), | |
| daemon=True | |
| ) | |
| worker.start() | |
| # Monitorear progreso | |
| while TASKS[task_id]["status"] == "processing": | |
| yield TASKS[task_id]['progress_log'], None, None | |
| time.sleep(1) | |
| # Retornar resultado final | |
| if TASKS[task_id]["status"] == "error": | |
| yield TASKS[task_id]['progress_log'], None, None | |
| elif TASKS[task_id]["status"] == "done": | |
| result_path = TASKS[task_id]['result'] | |
| yield TASKS[task_id]['progress_log'], result_path, result_path | |
| # ------------------- Limpieza automática ------------------- | |
| def cleanup_old_files(): | |
| """Limpia archivos antiguos cada hora""" | |
| while True: | |
| try: | |
| time.sleep(6600) # 1 hora | |
| now = datetime.utcnow() | |
| logger.info("Ejecutando limpieza de archivos antiguos...") | |
| for task_id, info in list(TASKS.items()): | |
| if "timestamp" in info and now - info["timestamp"] > timedelta(hours=24): | |
| if info.get("result") and os.path.exists(info.get("result")): | |
| try: | |
| os.remove(info["result"]) | |
| logger.info(f"Archivo eliminado: {info['result']}") | |
| except Exception as e: | |
| logger.error(f"Error eliminando archivo: {e}") | |
| del TASKS[task_id] | |
| except Exception as e: | |
| logger.error(f"Error en cleanup: {e}") | |
| # Iniciar hilo de limpieza | |
| threading.Thread(target=cleanup_old_files, daemon=True).start() | |
| # ------------------- Interfaz Gradio ------------------- | |
| def toggle_input_fields(mode): | |
| """Alterna los campos de entrada según el modo seleccionado""" | |
| return ( | |
| gr.update(visible=mode == "Generar Guion con IA"), | |
| gr.update(visible=mode != "Generar Guion con IA") | |
| ) | |
| # Crear interfaz | |
| with gr.Blocks(title="🎬 Generador de Videos IA", theme=gr.themes.Soft()) as demo: | |
| gr.Markdown(""" | |
| # 🎬 Generador de Videos con IA | |
| Crea videos profesionales a partir de texto usando: | |
| - **Edge TTS** para voz en español | |
| - **GPT-2** para generación de guiones | |
| - **Pexels API** para videos de stock | |
| - **Subtítulos automáticos** y efectos visuales | |
| El progreso se mostrará en tiempo real. | |
| """) | |
| with gr.Row(): | |
| with gr.Column(scale=2): | |
| gr.Markdown("### ⚙️ Configuración") | |
| mode_radio = gr.Radio( | |
| choices=["Generar Guion con IA", "Usar Mi Guion"], | |
| value="Generar Guion con IA", | |
| label="Método de creación" | |
| ) | |
| topic_input = gr.Textbox( | |
| label="💡 Tema para la IA", | |
| placeholder="Ej: Los misterios del océano profundo", | |
| lines=2 | |
| ) | |
| script_input = gr.Textbox( | |
| label="📝 Tu Guion Completo", | |
| placeholder="Escribe aquí tu guion personalizado...", | |
| lines=8, | |
| visible=False | |
| ) | |
| music_input = gr.Audio( | |
| type="filepath", | |
| label="🎵 Música de fondo (opcional)" | |
| ) | |
| generate_btn = gr.Button( | |
| "🎬 Generar Video", | |
| variant="primary", | |
| size="lg" | |
| ) | |
| with gr.Column(scale=2): | |
| gr.Markdown("### 📊 Progreso y Resultados") | |
| progress_output = gr.Textbox( | |
| label="📋 Log de progreso en tiempo real", | |
| lines=12, | |
| interactive=False, | |
| show_copy_button=True | |
| ) | |
| video_output = gr.Video( | |
| label="🎥 Video generado", | |
| height=400 | |
| ) | |
| download_output = gr.File( | |
| label="📥 Descargar archivo" | |
| ) | |
| # Event handlers | |
| mode_radio.change( | |
| fn=toggle_input_fields, | |
| inputs=[mode_radio], | |
| outputs=[topic_input, script_input] | |
| ) | |
| generate_btn.click( | |
| fn=generate_video_with_progress, | |
| inputs=[mode_radio, topic_input, script_input, music_input], | |
| outputs=[progress_output, video_output, download_output] | |
| ) | |
| gr.Markdown(""" | |
| ### 📋 Instrucciones: | |
| 1. **Elige el método**: Genera un guion con IA o usa el tuyo propio | |
| 2. **Configura el contenido**: Ingresa un tema interesante o tu guion | |
| 3. **Música opcional**: Sube un archivo de audio para fondo musical | |
| 4. **Genera**: Presiona el botón y observa el progreso en tiempo real | |
| ⏱️ **Tiempo estimado**: 2-5 minutos dependiendo de la duración del contenido. | |
| """) | |
| # Ejecutar aplicación | |
| if __name__ == "__main__": | |
| logger.info("🚀 Iniciando aplicación Generador de Videos IA...") | |
| # Configurar la cola (versión compatible) | |
| demo.queue(max_size=10) | |
| # Lanzar aplicación (parámetros básicos compatibles) | |
| demo.launch( | |
| server_name="0.0.0.0", | |
| server_port=7860, | |
| show_api=False, | |
| share=True | |
| ) |