Spaces:
Sleeping
Sleeping
| import os | |
| import random | |
| import requests | |
| import gradio as gr | |
| from moviepy.editor import VideoFileClip, concatenate_videoclips, AudioFileClip, CompositeVideoClip | |
| from moviepy.audio.fx.all import audio_loop | |
| import edge_tts | |
| import asyncio | |
| from datetime import datetime | |
| from pathlib import Path | |
| from transformers import pipeline | |
| import logging | |
# Configure logging once for the whole application.
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)

# Pexels API key is read from the environment. Report success only when the
# key is actually present so the log does not contradict itself.
PEXELS_API_KEY = os.getenv("PEXELS_API_KEY")
if PEXELS_API_KEY:
    logger.info("Loaded PEXELS_API_KEY from environment")
else:
    logger.error("PEXELS_API_KEY no encontrada en variables de entorno")
# Ensure asyncio works with Gradio
def run_async(coro):
    """Run a coroutine to completion from synchronous code.

    A fresh event loop is created for each call, installed as the current
    loop for the calling thread while the coroutine runs, and always closed
    afterwards (also when the coroutine raises).

    Args:
        coro: The coroutine object to execute.

    Returns:
        The coroutine's result, or ``None`` if any exception was raised
        (the exception is logged, not propagated).
    """
    logger.info("Running async coroutine")
    loop = None
    try:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        result = loop.run_until_complete(coro)
        logger.info("Async coroutine completed")
        return result
    except Exception as e:
        logger.error(f"Error en run_async: {e}")
        return None
    finally:
        if loop is not None:
            # Do not leave a closed loop registered as this thread's current
            # loop; later asyncio calls in the same thread would pick it up.
            asyncio.set_event_loop(None)
            loop.close()
# Load the local text generation model used to write scripts in Spanish.
# `generator` stays None when loading fails, which callers treat as
# "no model available".
# NOTE(review): facebook/mbart-large-50 is published as a sequence-to-sequence
# checkpoint; confirm that the "text-generation" task is the intended pipeline.
generator = None
logger.info("Loading text generation model: facebook/mbart-large-50")
try:
    generator = pipeline("text-generation", model="facebook/mbart-large-50", device="cpu")
except Exception as e:
    logger.error(f"Error loading model: {e}")
else:
    logger.info("Model loaded successfully")
# Mexican-Spanish neural voice identifiers handed to Edge TTS.
# generate_voice picks from this list and falls back to the other entries.
# NOTE(review): presumably not every name here is offered by the Edge TTS
# service — verify against the voice list returned by edge_tts.
_MX_VOICE_NAMES = (
    "Dalia",
    "Jorge",
    "Cecilio",
    "Beatriz",
    "Candela",
    "Carlos",
    "Larissa",
    "Manuel",
    "Marina",
    "Nuria",
)
SPANISH_VOICES = [f"es-MX-{name}Neural" for name in _MX_VOICE_NAMES]
# Fetch videos from Pexels
def fetch_pexels_videos(query, num_videos=5):
    """Search Pexels for videos and return one download link per result.

    Args:
        query: Search terms. Sent as a request parameter so that spaces,
            accents and ``&`` are URL-encoded correctly.
        num_videos: Number of results requested (``per_page``).

    Returns:
        A list of URLs, taken from the first entry of each video's
        ``video_files``. Results without any file are skipped. The list is
        empty when the API key is missing, the request fails, the response
        is not HTTP 200, or the payload cannot be parsed.
    """
    logger.info(f"Fetching {num_videos} videos from Pexels with query: {query}")
    if not PEXELS_API_KEY:
        logger.error("PEXELS_API_KEY is not set; cannot query Pexels")
        return []

    try:
        response = requests.get(
            "https://api.pexels.com/videos/search",
            headers={"Authorization": PEXELS_API_KEY},
            params={"query": query, "per_page": num_videos},
            # Without a timeout a stalled connection would block the request forever.
            timeout=30,
        )
    except requests.RequestException as e:
        logger.error(f"Error fetching videos from Pexels: {e}")
        return []

    if response.status_code != 200:
        logger.error(f"Failed to fetch videos from Pexels. Status code: {response.status_code}")
        return []

    videos = []
    try:
        for video in response.json().get("videos", []):
            files = video.get("video_files") or []
            link = files[0].get("link") if files else None
            # One malformed entry must not discard the remaining results.
            if link:
                videos.append(link)
    except (ValueError, AttributeError, TypeError) as e:
        logger.error(f"Error fetching videos from Pexels: {e}")
        return []

    logger.info(f"Fetched {len(videos)} videos from Pexels")
    return videos
# Generate script using local model or custom text
def generate_script(prompt, custom_text=None):
    """Return the narration script for a video.

    Resolution order:
      1. ``custom_text`` when it contains non-whitespace text (returned stripped).
      2. Text produced by the module-level ``generator`` pipeline, if loaded.
      3. A canned fallback (a recipe top 10 when the prompt mentions
         "recetas", otherwise a generic one-line message).

    Args:
        prompt: Topic of the video; used to build the model instruction.
        custom_text: Optional user-written script that bypasses generation.

    Returns:
        The script text. When neither a prompt nor custom text is given, a
        message starting with ``"Error:"`` is returned instead.
    """
    logger.info("Generating script")
    if custom_text and custom_text.strip():
        logger.info("Using custom text provided by user")
        return custom_text.strip()

    if not prompt or not prompt.strip():
        logger.error("No prompt or custom text provided")
        return "Error: Debes proporcionar un prompt o un guion personalizado."

    # Generate script with local model
    if generator:
        input_text = f"Genera un guion para un video sobre '{prompt}'. Crea una lista numerada con descripciones breves (máximo 20 palabras por ítem) para un top 10 relacionado con el tema en español."
        logger.info(f"Generating script with prompt: {prompt}")
        try:
            outputs = generator(
                input_text,
                max_length=300,
                num_return_sequences=1,
                do_sample=True,
                truncation=True,
            )
            result = outputs[0]["generated_text"]
            # Text-generation pipelines return the prompt followed by the
            # continuation; drop the echoed instruction so it is not narrated.
            if result.startswith(input_text):
                result = result[len(input_text):]
            result = result.strip()
            if result:
                logger.info("Script generated successfully")
                return result
            # An empty script cannot be narrated; use the fallback instead.
            logger.error("Error generating script: model returned empty text")
        except Exception as e:
            logger.error(f"Error generating script: {e}")

    # Fallback mock response
    logger.info("Using fallback mock response")
    if "recetas" in prompt.lower():
        return "\n".join(
            (
                "1. Tacos al pastor: Jugosa carne marinada con piña.",
                "2. Lasagna: Capas de pasta, carne y queso fundido.",
                "3. Sushi: Arroz y pescado fresco en rollos delicados.",
                "4. Pizza casera: Masa crujiente con tus ingredientes favoritos.",
                "5. Paella: Arroz con mariscos y azafrán.",
                "6. Ceviche: Pescado fresco marinado en limón.",
                "7. Ramen: Caldo rico con fideos y cerdo.",
                "8. Tiramisú: Postre cremoso con café y mascarpone.",
                "9. Enchiladas: Tortillas rellenas con salsa picante.",
                "10. Curry: Especias intensas con carne o vegetales.",
            )
        )
    return f"Top 10 sobre {prompt}: No se pudo generar un guion específico."
# Generate voice using Edge TTS
async def generate_voice(text, output_file="output.mp3"):
    """Synthesize ``text`` to an audio file with Edge TTS.

    The voices in ``SPANISH_VOICES`` are tried once each in random order, so
    the first attempt is a random pick and the rest act as fallbacks.

    Args:
        text: Text to narrate; must contain at least 10 non-padding characters.
        output_file: Path where the audio is written.

    Returns:
        ``output_file`` on success, or ``None`` when the text is too short or
        every voice failed.
    """
    if not text or len(text.strip()) < 10:
        logger.error("Texto demasiado corto para generar voz")
        return None

    logger.info("Generating voice with Edge TTS")
    candidates = random.sample(SPANISH_VOICES, k=len(SPANISH_VOICES))
    for voice in candidates:
        logger.info(f"Using voice: {voice}")
        try:
            communicate = edge_tts.Communicate(text, voice=voice)
            await communicate.save(output_file)
        # `Exception` (not a bare except) so task cancellation and
        # KeyboardInterrupt still propagate.
        except Exception as e:
            logger.error(f"Error generating voice with {voice}: {e}")
            # A failed attempt may leave a truncated file that callers would
            # mistake for valid audio.
            try:
                if os.path.exists(output_file):
                    os.remove(output_file)
            except OSError as cleanup_error:
                logger.error(f"Could not remove partial audio file: {cleanup_error}")
            continue
        logger.info(f"Voice generated and saved to {output_file}")
        return output_file

    logger.error("All voice attempts failed")
    return None
# Download and trim video
def download_and_trim_video(url, duration, output_path):
    """Download a video and write its first ``duration`` seconds to ``output_path``.

    The download goes to a uniquely named temporary file, so concurrent calls
    do not overwrite each other. That file and the source clip are released
    on both success and failure.

    Args:
        url: Direct link to the video file.
        duration: Desired length in seconds; capped at the source's length.
        output_path: Destination of the re-encoded (H.264/AAC) clip.

    Returns:
        ``output_path`` on success, ``None`` if downloading or trimming failed
        (the error is logged).
    """
    import tempfile

    logger.info(f"Downloading video from {url}")
    fd, temp_path = tempfile.mkstemp(suffix=".mp4")
    source = None
    try:
        with os.fdopen(fd, "wb") as f, requests.get(url, stream=True, timeout=60) as response:
            response.raise_for_status()
            for chunk in response.iter_content(chunk_size=1024 * 1024):
                f.write(chunk)

        logger.info("Trimming video")
        source = VideoFileClip(temp_path)
        clip_duration = min(duration, source.duration)
        trimmed = source.subclip(0, clip_duration)
        trimmed.write_videofile(output_path, codec="libx264", audio_codec="aac", threads=4)
        logger.info(f"Video trimmed and saved to {output_path}")
        return output_path
    except Exception as e:
        logger.error(f"Error downloading/trimming video: {e}")
        return None
    finally:
        # Close the reader before deleting: an open handle blocks removal on
        # some platforms.
        if source is not None:
            try:
                source.close()
            except Exception as e:
                logger.error(f"Error closing source clip: {e}")
        try:
            os.remove(temp_path)
        except OSError as e:
            logger.error(f"Could not remove temporary download {temp_path}: {e}")
# Main video creation function
def create_video(prompt, custom_text, music_file):
    """Build a narrated stock-footage video and return the path of the MP4.

    Steps: obtain a script (custom text, or generated from ``prompt``),
    synthesize the narration, search Pexels using the first word of the
    prompt, trim each downloaded clip to an equal share of the narration
    length, concatenate the clips, optionally mix looped background music
    under the narration at 30% volume, and encode the result.

    All intermediate files live in a per-call temporary directory, which is
    removed (after closing every opened clip) on success and on failure.

    Args:
        prompt: Topic of the video; may be empty when ``custom_text`` is given.
        custom_text: Optional ready-made script; takes precedence over ``prompt``.
        music_file: Optional background music, either a file path string or
            an object exposing the path as ``.name``.

    Returns:
        Path of the generated video inside ``output_videos/``, or a message
        starting with ``"Error:"`` describing what failed.
    """
    import shutil
    import tempfile

    # Needed for mixing narration and music; not part of the file-level imports.
    from moviepy.editor import CompositeAudioClip

    logger.info("Starting video creation process")

    # Validate inputs (whitespace-only values count as missing).
    has_prompt = bool(prompt and prompt.strip())
    has_custom_text = bool(custom_text and custom_text.strip())
    if not (has_prompt or has_custom_text):
        logger.error("No prompt or custom text provided")
        return "Error: Debes proporcionar un prompt o un guion personalizado."

    output_dir = "output_videos"
    os.makedirs(output_dir, exist_ok=True)
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    output_video = f"{output_dir}/video_{timestamp}.mp4"
    logger.info(f"Output video will be saved to {output_video}")

    # Generate or use provided script
    script = generate_script(prompt, custom_text)
    # Only a generated result can be the error message; a user script is
    # allowed to contain the word "Error".
    if not has_custom_text and script.startswith("Error:"):
        logger.error(script)
        return script

    work_dir = tempfile.mkdtemp(prefix="video_job_")
    opened = []  # every clip that must be closed before work_dir is deleted
    try:
        # Generate voice
        voice_file = os.path.join(work_dir, "voice.mp3")
        voice_result = run_async(generate_voice(script, voice_file))
        if not voice_result or not os.path.exists(voice_file):
            logger.error("Voice generation failed")
            return "Error: No se pudo generar la voz. Intenta con un texto diferente."

        try:
            audio = AudioFileClip(voice_file)
            opened.append(audio)
            video_duration = audio.duration
            logger.info(f"Audio duration: {video_duration} seconds")
        except Exception as e:
            logger.error(f"Error loading audio file: {e}")
            return "Error: El archivo de audio generado es inválido."

        # Fetch Pexels videos
        words = prompt.split() if prompt else []
        query = words[0] if words else "generic"
        video_urls = fetch_pexels_videos(query, num_videos=5)
        if not video_urls:
            logger.error("No videos found on Pexels")
            return "Error: No se encontraron videos en Pexels."

        # Download and trim videos
        clips = []
        segment_duration = video_duration / len(video_urls)
        for i, url in enumerate(video_urls):
            clip_path = os.path.join(work_dir, f"clip_{i}.mp4")
            if not download_and_trim_video(url, segment_duration, clip_path):
                continue
            try:
                clip = VideoFileClip(clip_path)
            except Exception as e:
                logger.error(f"Error loading clip {i+1}: {e}")
                continue
            opened.append(clip)
            clips.append(clip)
            logger.info(f"Processed video clip {i+1}/{len(video_urls)}")

        if not clips:
            logger.error("No valid video clips available")
            return "Error: No se pudieron procesar los videos descargados."

        # Concatenate video clips
        logger.info("Concatenating video clips")
        try:
            final_clip = concatenate_videoclips(clips, method="compose")
            opened.append(final_clip)
            final_clip = final_clip.set_duration(video_duration)
        except Exception as e:
            logger.error(f"Error concatenating clips: {e}")
            return "Error: No se pudieron unir los segmentos de video."

        # Add looped music or voice
        try:
            if music_file:
                logger.info("Adding user-uploaded music")
                # Gradio may hand over a path string or a file wrapper with `.name`.
                music_path = getattr(music_file, "name", music_file)
                music = AudioFileClip(music_path)
                opened.append(music)
                music = audio_loop(music, duration=video_duration)
                final_audio = CompositeAudioClip([audio, music.volumex(0.3)])
            else:
                logger.info("Using generated voice as audio")
                final_audio = audio
            final_clip = final_clip.set_audio(final_audio)
        except Exception as e:
            logger.error(f"Error processing audio: {e}")
            return "Error: Problema al procesar el audio."

        # Write final video
        logger.info(f"Writing final video to {output_video}")
        try:
            final_clip.write_videofile(
                output_video,
                codec="libx264",
                audio_codec="aac",
                fps=24,
                threads=4,
                preset='ultrafast',
                bitrate="3000k"
            )
        except Exception as e:
            logger.error(f"Error writing video file: {e}")
            return "Error: No se pudo generar el video final."

        logger.info("Video creation completed successfully")
        return output_video
    finally:
        # Clean up on every exit path, not only after a successful render.
        logger.info("Cleaning up temporary files")
        for resource in reversed(opened):
            try:
                resource.close()
            except Exception as e:
                logger.error(f"Error during cleanup: {e}")
        shutil.rmtree(work_dir, ignore_errors=True)
# Gradio interface: a two-column form (inputs on the left, resulting video on
# the right) wired to create_video.
# NOTE(review): the nesting below was reconstructed from a source whose
# indentation was lost — confirm that the examples belong below the row.
with gr.Blocks() as demo:
    gr.Markdown("# Generador de Videos Automáticos")
    gr.Markdown("""
    Crea videos automáticos con voz en español.
    Ingresa un tema (ej. 'Top 10 recetas mexicanas') o escribe tu propio guion.
    """)

    with gr.Row():
        with gr.Column():
            prompt = gr.Textbox(
                label="Tema del Video",
                placeholder="Ejemplo: Top 10 lugares para visitar en México",
                max_lines=2,
            )
            custom_text = gr.Textbox(
                label="Guion Personalizado (opcional)",
                placeholder="Pega aquí tu propio guion si prefieres...",
                max_lines=10,
            )
            music_file = gr.File(
                label="Música de Fondo (opcional, MP3)",
                file_types=[".mp3"],
            )
            submit = gr.Button("Generar Video", variant="primary")
        with gr.Column():
            output = gr.Video(label="Video Generado", format="mp4")

    # The same three components feed both the examples and the click handler.
    form_inputs = [prompt, custom_text, music_file]

    # Each row supplies values in the order: prompt, custom script, music file.
    example_rows = [
        ["Top 10 ciudades de México", "", None],
        ["", "1. Ciudad de México - La capital vibrante\n2. Guadalajara - Cuna del mariachi\n3. Monterrey - Modernidad industrial", None],
    ]
    gr.Examples(
        examples=example_rows,
        inputs=form_inputs,
        label="Ejemplos para probar",
    )

    submit.click(
        fn=create_video,
        inputs=form_inputs,
        outputs=output,
        api_name="generate_video",
    )

# Listen on all interfaces at port 7860 and request a public share link.
demo.launch(server_name="0.0.0.0", server_port=7860, share=True)