import os, re, math, uuid, time, shutil, logging, tempfile, threading, requests, numpy as np
from datetime import datetime, timedelta
from collections import Counter
import gradio as gr
import torch
from transformers import GPT2Tokenizer, GPT2LMHeadModel
from keybert import KeyBERT
from TTS.api import TTS
from moviepy.editor import (
    VideoFileClip, AudioFileClip, concatenate_videoclips, concatenate_audioclips,
    CompositeAudioClip, AudioClip, TextClip, CompositeVideoClip, VideoClip, vfx
)
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger(__name__)
PEXELS_API_KEY = os.getenv("PEXELS_API_KEY")
if not PEXELS_API_KEY:
    raise RuntimeError("Debes definir PEXELS_API_KEY en Variables & secrets")
tokenizer = GPT2Tokenizer.from_pretrained("datificate/gpt2-small-spanish")
gpt2 = GPT2LMHeadModel.from_pretrained("datificate/gpt2-small-spanish").eval()
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token
kw_model = KeyBERT("distilbert-base-multilingual-cased")
tts_engine = TTS(model_name="tts_models/es/css10/vits", progress_bar=False, gpu=False)
RESULTS_DIR = "video_results"
os.makedirs(RESULTS_DIR, exist_ok=True)
TASKS = {}
# ───────── helpers ────────────────────────────────────────────────────────────────
def gpt2_script(prompt: str, mx: int = 160) -> str:
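    """Generate a short Spanish script about `prompt` with GPT-2, returning at most `mx` characters."""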
| ins = f"Escribe un guion corto, interesante y coherente sobre: {prompt}" | |
| inp = tokenizer(ins, return_tensors="pt", truncation=True, max_length=512) | |
| out = gpt2.generate( | |
| **inp, max_length=mx + inp["input_ids"].shape[1], do_sample=True, | |
| top_p=0.9, top_k=40, temperature=0.7, no_repeat_ngram_size=3, | |
| pad_token_id=tokenizer.pad_token_id, eos_token_id=tokenizer.eos_token_id, | |
| ) | |
| txt = tokenizer.decode(out[0], skip_special_tokens=True) | |
| return txt.split("sobre:")[-1].strip()[:mx] | |
| def coqui_tts(text: str, path: str): | |
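    """Synthesize `text` (sanitized and capped at 500 characters) to an audio file at `path` with Coqui TTS."""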
    text = re.sub(r"[^\w\s.,!?áéíóúñÁÉÍÓÚÑ]", "", text)[:500]
    tts_engine.tts_to_file(text=text, file_path=path)
def keywords(text: str) -> list[str]:
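    """Extract up to five search keywords with KeyBERT, falling back to simple word frequency on failure."""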
    clean = re.sub(r"[^\w\sáéíóúñÁÉÍÓÚÑ]", "", text.lower())
    try:
        # sklearn only bundles English stop words; passing "spanish" raises a ValueError
        # and would always force the frequency fallback below, so no stop-word list is used here.
        kws = kw_model.extract_keywords(clean, stop_words=None, top_n=5)
        return [k.replace(" ", "+") for k, _ in kws if k]
    except Exception:
        words = [w for w in clean.split() if len(w) > 4]
        return [w for w, _ in Counter(words).most_common(5)] or ["nature"]
def pexels_search(q: str, n: int) -> list[dict]:
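    """Query the Pexels video search API and return up to `n` landscape results for `q`."""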
    r = requests.get(
        "https://api.pexels.com/videos/search",
        headers={"Authorization": PEXELS_API_KEY},
        params={"query": q, "per_page": n, "orientation": "landscape"},
        timeout=20,
    )
    r.raise_for_status()
    return r.json().get("videos", [])
def download(url: str, folder: str) -> str | None:
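    """Stream `url` into `folder`; return the saved path, or None if the file is suspiciously small."""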
    name = uuid.uuid4().hex + ".mp4"
    path = os.path.join(folder, name)
    with requests.get(url, stream=True, timeout=60) as r:
        r.raise_for_status()
        with open(path, "wb") as f:
            for chunk in r.iter_content(1024 * 1024):
                f.write(chunk)
    return path if os.path.getsize(path) > 1000 else None
def loop_audio(aclip: AudioFileClip, dur: float) -> AudioFileClip:
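    """Trim or loop `aclip` so the returned clip lasts exactly `dur` seconds."""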
    if aclip.duration >= dur:
        return aclip.subclip(0, dur)
    loops = math.ceil(dur / aclip.duration)
    return concatenate_audioclips([aclip] * loops).subclip(0, dur)
def make_subs_clips(script: str, video_w: int, video_h: int, duration: float):
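    """Create one subtitle TextClip per sentence, with durations proportional to each sentence's word count."""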
    sentences = [s.strip() for s in re.split(r"[.!?¿¡]", script) if s.strip()]
    total_words = sum(len(s.split()) for s in sentences) or 1
    word_time = duration / total_words
    clips, cursor = [], 0.0
    for sent in sentences:
        n_words = len(sent.split())
        dur = n_words * word_time
        txt_clip = (
            TextClip(sent, fontsize=int(video_h * 0.05), color="white",
                     stroke_color="black", stroke_width=2, method="caption",
                     size=(int(video_w * 0.9), None))
            .set_start(cursor)
            .set_duration(dur)
            .set_position(("center", video_h * 0.85))
        )
        clips.append(txt_clip)
        cursor += dur
    return clips
def make_grain_clip(size: tuple[int, int], duration: float):
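    """Return a semi-transparent random-noise VideoClip used as a film-grain overlay."""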
    w, h = size
    def frame(_t):
        noise = np.random.randint(0, 256, (h, w, 1), dtype=np.uint8)
        return np.repeat(noise, 3, axis=2)
    return VideoClip(frame, duration=duration).set_opacity(0.15)
# ───────── video builder ──────────────────────────────────────────────────────────
def build_video(text: str, gen_script: bool, music_fp: str | None) -> str:
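    """Full pipeline: generate or reuse the script, synthesize the voice-over, fetch Pexels footage,
    then render the final MP4 with subtitles, grain overlay and optional background music."""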
    tmp = tempfile.mkdtemp()
    script = gpt2_script(text) if gen_script else text.strip()
    voice_path = os.path.join(tmp, "voice.wav")  # Coqui writes WAV data, so use a matching extension
    coqui_tts(script, voice_path)
    voice_clip = AudioFileClip(voice_path)
    adur = voice_clip.duration
    vids = []
    for kw in keywords(script):
        if len(vids) >= 8:
            break
        for v in pexels_search(kw, 2):
| best = max(v["video_files"], key=lambda x: x["width"] * x["height"]) | |
            p = download(best["link"], tmp)
            if p:
                vids.append(p)
            if len(vids) >= 8:
                break
    if not vids:
        raise RuntimeError("Sin vídeos disponibles")
    segs, acc = [], 0
    for path in vids:
        if acc >= adur + 2:
            break
        clip = VideoFileClip(path)
        seg = clip.subclip(0, min(8, clip.duration))
        segs.append(seg)
        acc += seg.duration
    base = concatenate_videoclips(segs, method="chain")
    if base.duration < adur:
        loops = math.ceil(adur / base.duration)
        base = concatenate_videoclips([base] * loops, method="chain")
    base = base.subclip(0, adur)
    if music_fp:
        mclip = loop_audio(AudioFileClip(music_fp), adur).volumex(0.2)
        audio = CompositeAudioClip([mclip, voice_clip])
    else:
        audio = voice_clip
    subs = make_subs_clips(script, base.w, base.h, adur)
    grain = make_grain_clip((base.w, base.h), adur)
    final_vid = CompositeVideoClip([base, grain, *subs]).set_audio(audio)
    out_path = os.path.join(tmp, "final.mp4")
    final_vid.write_videofile(out_path, fps=24, codec="libx264", audio_codec="aac", logger=None)
    return out_path
# ───────── async tasks ────────────────────────────────────────────────────────────
def worker(tid: str, mode: str, topic: str, user_script: str, music: str | None):
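    """Background job: build the video, copy it into RESULTS_DIR and record success or failure in TASKS."""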
    try:
        txt = topic if mode == "Generar Guion con IA" else user_script
        res_tmp = build_video(txt, mode == "Generar Guion con IA", music)
        final_path = os.path.join(RESULTS_DIR, f"{tid}.mp4")
        shutil.copy2(res_tmp, final_path)
        TASKS[tid] = {"status": "done", "result": final_path, "ts": datetime.utcnow()}
    except Exception as e:
        TASKS[tid] = {"status": "error", "error": str(e), "ts": datetime.utcnow()}
def submit(mode, topic, user_script, music):
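    """Validate the input, register a new task and start the worker thread; returns (task id, status message)."""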
    content = topic if mode == "Generar Guion con IA" else user_script
    if not content.strip():
        return "", "Ingresa texto"
    tid = uuid.uuid4().hex[:8]
    TASKS[tid] = {"status": "processing", "ts": datetime.utcnow()}
    threading.Thread(target=worker, args=(tid, mode, topic, user_script, music), daemon=True).start()
    return tid, f"Tarea {tid} creada"
def check(tid):
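    """Look up a task id and return (video, downloadable file, status message) for the Gradio outputs."""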
    if tid not in TASKS:
        return None, None, "ID inválido"
    info = TASKS[tid]
    stat = info["status"]
    if stat == "processing":
        return None, None, "Procesando..."
    if stat == "error":
        return None, None, f"Error: {info['error']}"
    return info["result"], info["result"], "Vídeo listo"
# ───────── janitor thread ─────────────────────────────────────────────────────────
def janitor():
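    """Hourly cleanup: delete results older than 24 hours and drop the matching TASKS entries."""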
    while True:
        now = datetime.utcnow()
        for fname in os.listdir(RESULTS_DIR):
            fpath = os.path.join(RESULTS_DIR, fname)
            try:
                mtime = datetime.utcfromtimestamp(os.path.getmtime(fpath))
                if now - mtime > timedelta(hours=24):
                    os.remove(fpath)
                    for k, v in list(TASKS.items()):
                        if v.get("result") == fpath:
                            del TASKS[k]
            except Exception:
                pass
        time.sleep(3600)
threading.Thread(target=janitor, daemon=True).start()
# ───────── gradio ui ──────────────────────────────────────────────────────────────
with gr.Blocks(title="Generador de Vídeos IA") as demo:
    with gr.Tabs():
        with gr.TabItem("Crear Vídeo"):
            mode = gr.Radio(["Generar Guion con IA", "Usar Mi Guion"], value="Generar Guion con IA")
            topic = gr.Textbox(label="Tema")
            user_script = gr.Textbox(label="Guion Completo", visible=False)
            music = gr.Audio(type="filepath", label="Música (opcional)")
            btn = gr.Button("Generar")
            tid_out = gr.Textbox(label="ID de tarea")
            msg = gr.Textbox(label="Estado")
        with gr.TabItem("Revisar Estado"):
            tid_in = gr.Textbox(label="ID de tarea")
            chk = gr.Button("Verificar")
            vid = gr.Video()
            dlf = gr.File()
    mode.change(
        lambda m: (gr.update(visible=m == "Generar Guion con IA"), gr.update(visible=m != "Generar Guion con IA")),
        mode, [topic, user_script]
    )
    btn.click(submit, [mode, topic, user_script, music], [tid_out, msg])
    chk.click(check, tid_in, [vid, dlf, msg])
if __name__ == "__main__":
    demo.launch()