Spaces:
Sleeping
Sleeping
| import os | |
| import asyncio | |
| import logging | |
| import tempfile | |
| import requests | |
| from datetime import datetime | |
| import edge_tts | |
| import gradio as gr | |
| import torch | |
| from transformers import GPT2Tokenizer, GPT2LMHeadModel | |
| from keybert import KeyBERT | |
| from moviepy.editor import VideoFileClip, concatenate_videoclips, AudioFileClip, CompositeAudioClip | |
| import re | |
| import json | |
| import uuid | |
| import threading | |
| from queue import Queue | |
| import time | |
| # Configuración de logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| # Directorio persistente para archivos | |
| PERSIST_DIR = "persistent_data" | |
| os.makedirs(PERSIST_DIR, exist_ok=True) | |
| # Cola de procesamiento | |
| processing_queue = Queue() | |
| task_status = {} | |
| # Clase para manejar tareas | |
| class VideoTask: | |
| def __init__(self, task_id, prompt_type, input_text, musica_file=None): | |
| self.task_id = task_id | |
| self.prompt_type = prompt_type | |
| self.input_text = input_text | |
| self.musica_file = musica_file | |
| self.status = "pending" | |
| self.progress = 0 | |
| self.result = None | |
| self.error = None | |
| self.steps_completed = [] | |
| def to_dict(self): | |
| return { | |
| "task_id": self.task_id, | |
| "status": self.status, | |
| "progress": self.progress, | |
| "result": self.result, | |
| "error": self.error, | |
| "steps_completed": self.steps_completed | |
| } | |
| # Worker thread para procesar videos | |
| def video_processor_worker(): | |
| while True: | |
| try: | |
| task = processing_queue.get(timeout=1) | |
| if task is None: | |
| break | |
| logger.info(f"Procesando tarea: {task.task_id}") | |
| process_video_task(task) | |
| except: | |
| time.sleep(0.1) | |
| continue | |
| # Iniciar worker thread | |
| worker_thread = threading.Thread(target=video_processor_worker, daemon=True) | |
| worker_thread.start() | |
| def save_task_state(task): | |
| """Guarda el estado de la tarea en un archivo JSON""" | |
| task_file = os.path.join(PERSIST_DIR, f"{task.task_id}.json") | |
| with open(task_file, 'w') as f: | |
| json.dump(task.to_dict(), f) | |
| def load_task_state(task_id): | |
| """Carga el estado de una tarea desde archivo""" | |
| task_file = os.path.join(PERSIST_DIR, f"{task_id}.json") | |
| if os.path.exists(task_file): | |
| with open(task_file, 'r') as f: | |
| return json.load(f) | |
| return None | |
| def process_video_task(task): | |
| """Procesa una tarea de video paso a paso""" | |
| try: | |
| task.status = "processing" | |
| task.progress = 0 | |
| save_task_state(task) | |
| # Paso 1: Generar guión | |
| task.progress = 10 | |
| save_task_state(task) | |
| if task.prompt_type == "Generar Guion con IA": | |
| guion = generate_script_simple(task.input_text) | |
| else: | |
| guion = task.input_text.strip() | |
| task.steps_completed.append("guion_generado") | |
| task.progress = 20 | |
| save_task_state(task) | |
| # Paso 2: Generar audio TTS | |
| audio_path = os.path.join(PERSIST_DIR, f"{task.task_id}_audio.mp3") | |
| success = asyncio.run(text_to_speech_simple(guion, audio_path)) | |
| if not success: | |
| raise Exception("Error generando audio") | |
| task.steps_completed.append("audio_generado") | |
| task.progress = 40 | |
| save_task_state(task) | |
| # Paso 3: Buscar videos (simplificado) | |
| keywords = extract_keywords_simple(guion) | |
| video_urls = search_videos_simple(keywords) | |
| task.steps_completed.append("videos_encontrados") | |
| task.progress = 60 | |
| save_task_state(task) | |
| # Paso 4: Crear video final (simplificado) | |
| output_path = os.path.join(PERSIST_DIR, f"{task.task_id}_final.mp4") | |
| # Simulación de creación de video | |
| # En producción, aquí iría tu lógica de moviepy | |
| create_simple_video(video_urls, audio_path, output_path, task.musica_file) | |
| task.steps_completed.append("video_creado") | |
| task.progress = 100 | |
| task.status = "completed" | |
| task.result = output_path | |
| save_task_state(task) | |
| except Exception as e: | |
| logger.error(f"Error procesando tarea {task.task_id}: {str(e)}") | |
| task.status = "error" | |
| task.error = str(e) | |
| save_task_state(task) | |
| # Funciones simplificadas | |
| def generate_script_simple(prompt): | |
| """Versión simplificada de generación de guión""" | |
| # Aquí puedes usar tu lógica GPT-2 existente | |
| return f"Este es un video sobre {prompt}. Es fascinante y educativo." | |
| async def text_to_speech_simple(text, output_path): | |
| """TTS simplificado""" | |
| try: | |
| communicate = edge_tts.Communicate(text, "es-ES-JuanNeural") | |
| await communicate.save(output_path) | |
| return os.path.exists(output_path) | |
| except: | |
| return False | |
| def extract_keywords_simple(text): | |
| """Extracción simple de keywords""" | |
| words = text.lower().split() | |
| # Filtrar palabras comunes | |
| keywords = [w for w in words if len(w) > 4][:3] | |
| return keywords if keywords else ["nature", "video", "background"] | |
| def search_videos_simple(keywords): | |
| """Búsqueda simplificada de videos""" | |
| # Aquí iría tu lógica de Pexels | |
| # Por ahora retornamos URLs de ejemplo | |
| return ["video1.mp4", "video2.mp4"] | |
| def create_simple_video(video_urls, audio_path, output_path, music_path=None): | |
| """Creación simplificada de video""" | |
| # Aquí iría tu lógica de MoviePy | |
| # Por ahora creamos un archivo dummy | |
| with open(output_path, 'w') as f: | |
| f.write("dummy video content") | |
| time.sleep(2) # Simular procesamiento | |
| # Interfaz Gradio mejorada | |
| def submit_video_request(prompt_type, prompt_ia, prompt_manual, musica_file): | |
| """Envía una solicitud de video a la cola""" | |
| input_text = prompt_ia if prompt_type == "Generar Guion con IA" else prompt_manual | |
| if not input_text or not input_text.strip(): | |
| return None, "Por favor ingresa un texto" | |
| task_id = str(uuid.uuid4()) | |
| task = VideoTask(task_id, prompt_type, input_text, musica_file) | |
| # Guardar estado inicial | |
| task_status[task_id] = task | |
| save_task_state(task) | |
| # Añadir a la cola | |
| processing_queue.put(task) | |
| return task_id, f"Tarea creada: {task_id}" | |
| def check_video_status(task_id): | |
| """Verifica el estado de una tarea""" | |
| if not task_id: | |
| return "No hay ID de tarea", None, None | |
| # Intentar cargar desde archivo | |
| task_data = load_task_state(task_id) | |
| if not task_data: | |
| return "Tarea no encontrada", None, None | |
| status = task_data['status'] | |
| progress = task_data['progress'] | |
| if status == "pending": | |
| return f"⏳ En cola... ({progress}%)", None, None | |
| elif status == "processing": | |
| steps = ", ".join(task_data['steps_completed']) | |
| return f"🔄 Procesando... ({progress}%) - Completado: {steps}", None, None | |
| elif status == "completed": | |
| video_path = task_data['result'] | |
| if os.path.exists(video_path): | |
| return "✅ Video completado!", video_path, video_path | |
| else: | |
| return "❌ Video completado pero archivo no encontrado", None, None | |
| elif status == "error": | |
| return f"❌ Error: {task_data['error']}", None, None | |
| return "Estado desconocido", None, None | |
| # Interfaz Gradio | |
| with gr.Blocks(title="Generador de Videos con IA") as app: | |
| gr.Markdown("# 🎬 Generador de Videos con IA (Sistema de Cola)") | |
| with gr.Tabs(): | |
| with gr.TabItem("Crear Video"): | |
| with gr.Row(): | |
| with gr.Column(): | |
| prompt_type = gr.Radio( | |
| ["Generar Guion con IA", "Usar Mi Guion"], | |
| label="Método de Entrada", | |
| value="Generar Guion con IA" | |
| ) | |
| prompt_ia = gr.Textbox( | |
| label="Tema para IA", | |
| placeholder="Describe el tema del video..." | |
| ) | |
| prompt_manual = gr.Textbox( | |
| label="Tu Guion Completo", | |
| placeholder="Escribe tu guion aquí...", | |
| visible=False | |
| ) | |
| musica_input = gr.Audio( | |
| label="Música de fondo (opcional)", | |
| type="filepath" | |
| ) | |
| submit_btn = gr.Button("📤 Enviar a Cola", variant="primary") | |
| with gr.Column(): | |
| task_id_output = gr.Textbox( | |
| label="ID de Tarea", | |
| interactive=False | |
| ) | |
| submit_status = gr.Textbox( | |
| label="Estado de Envío", | |
| interactive=False | |
| ) | |
| with gr.TabItem("Verificar Estado"): | |
| with gr.Row(): | |
| with gr.Column(): | |
| task_id_input = gr.Textbox( | |
| label="ID de Tarea", | |
| placeholder="Pega aquí el ID de tu tarea..." | |
| ) | |
| check_btn = gr.Button("🔍 Verificar Estado") | |
| auto_check = gr.Checkbox( | |
| label="Verificar automáticamente cada 5 segundos" | |
| ) | |
| with gr.Column(): | |
| status_output = gr.Textbox( | |
| label="Estado Actual", | |
| interactive=False | |
| ) | |
| video_output = gr.Video( | |
| label="Video Generado", | |
| interactive=False | |
| ) | |
| download_output = gr.File( | |
| label="Descargar Video", | |
| interactive=False | |
| ) | |
| # Eventos | |
| prompt_type.change( | |
| lambda x: ( | |
| gr.update(visible=x == "Generar Guion con IA"), | |
| gr.update(visible=x == "Usar Mi Guion") | |
| ), | |
| inputs=prompt_type, | |
| outputs=[prompt_ia, prompt_manual] | |
| ) | |
| submit_btn.click( | |
| submit_video_request, | |
| inputs=[prompt_type, prompt_ia, prompt_manual, musica_input], | |
| outputs=[task_id_output, submit_status] | |
| ) | |
| check_btn.click( | |
| check_video_status, | |
| inputs=[task_id_input], | |
| outputs=[status_output, video_output, download_output] | |
| ) | |
| # Auto-check cada 5 segundos si está activado | |
| def auto_check_status(task_id, should_check): | |
| if should_check and task_id: | |
| return check_video_status(task_id) | |
| return gr.update(), gr.update(), gr.update() | |
| auto_check.change( | |
| lambda x: gr.update(visible=x), | |
| inputs=[auto_check], | |
| outputs=[check_btn] | |
| ) | |
| if __name__ == "__main__": | |
| app.launch(server_name="0.0.0.0", server_port=7860) |