CapsulesVideoPro_v2 / src /capsule_builder.py
omarbajouk's picture
Update src/capsule_builder.py
75070d3 verified
import os, json, uuid, gc, traceback, shutil, time
from moviepy.editor import ImageClip, AudioFileClip, CompositeVideoClip, TextClip
from moviepy.video.VideoClip import ColorClip
from .config import OUT_DIR, TMP_DIR, MANIFEST_PATH, W, H
from .tts_utils import tts_edge, tts_gtts, normalize_audio_to_wav
from .graphics import make_background
from .video_utils import prepare_video_presentateur, write_srt, write_video_with_fallback, safe_name
# In-memory registry of generated capsules; _save_manifest() mirrors it to MANIFEST_PATH.
capsules = []

# Load the existing manifest (if any) so previously built capsules are still listed.
if os.path.exists(MANIFEST_PATH):
    try:
        with open(MANIFEST_PATH, "r", encoding="utf-8") as _manifest_file:
            data = json.load(_manifest_file)
    except (OSError, ValueError) as e:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
        # Loading stays best-effort (start with an empty list) but is no longer silent.
        print(f"[Manifest] ⚠️ Manifeste illisible ({e}), démarrage avec une liste vide.")
    else:
        # Only accept the expected {"capsules": [...]} layout; anything else is ignored.
        if isinstance(data, dict) and isinstance(data.get("capsules"), list):
            capsules.extend(data["capsules"])
def _save_manifest():
    """Persist the in-memory ``capsules`` list to MANIFEST_PATH as JSON.

    The data is first written to a sibling ``.tmp`` file and then moved over
    the manifest with ``os.replace``, so a failure while serializing never
    leaves a truncated manifest behind.

    Raises:
        OSError: if the temporary file cannot be written or moved.
        TypeError / ValueError: if ``capsules`` is not JSON-serializable.
    """
    tmp_path = f"{MANIFEST_PATH}.tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump({"capsules": capsules}, f, ensure_ascii=False, indent=2)
        os.replace(tmp_path, MANIFEST_PATH)
    except BaseException:
        # Drop the partial temp file, then let the original error propagate.
        try:
            os.remove(tmp_path)
        except OSError:
            pass  # nothing to clean up (file never created or already moved)
        raise
def _close_quietly(clip):
    """Close a clip; log (instead of raising) if closing fails."""
    if clip is None:
        return
    try:
        clip.close()
    except Exception as e:
        print(f"[Clean] Erreur nettoyage: {e}")


def _remove_quietly(path):
    """Delete a temporary file if it exists; log (instead of raising) on failure."""
    try:
        if path and os.path.exists(path):
            os.remove(path)
    except OSError as e:
        print(f"[Clean] Erreur nettoyage: {e}")


def build_capsule(titre, sous_titre, texte_voix, texte_ecran, theme,
                  image_fond=None, logo_path=None, logo_pos="haut-gauche",
                  fond_mode="plein écran", video_presentateur=None,
                  voix_type="Féminine", position_presentateur="bottom-right",
                  plein=False, moteur_voix="Edge-TTS (recommandé)",
                  langue="fr", speaker=None):
    """Build one video capsule: TTS audio + background + presenter + text overlay.

    Steps: synthesize ``texte_voix`` (Edge TTS, gTTS as fallback), normalize the
    audio to WAV when possible, generate the background, stack the optional
    presenter and the optional on-screen text, render the video, write the SRT,
    then record the capsule in ``capsules`` and save the manifest.

    Args:
        titre, sous_titre, texte_ecran, theme, logo_path, logo_pos, image_fond,
            fond_mode: forwarded to ``make_background``. ``titre`` also names the
            output file; ``texte_ecran`` is also drawn as a bottom-centered overlay.
        texte_voix: text that is spoken and written to the SRT file.
        video_presentateur: path to a presenter video (.mp4/.mov/.avi/.mkv) or
            image (.jpg/.jpeg/.png/.bmp/.webp); other extensions are ignored.
        voix_type: only used as a label (manifest entry and status message) when
            ``speaker`` is not given. NOTE(review): it does not select the voice.
        position_presentateur: presenter placement; for images one of
            "bottom-right", "bottom-left", "top-right", "top-left", "center"
            (unknown values fall back to bottom-right).
        plein: presenter in full screen; forced to False when ``texte_ecran``
            is non-empty.
        moteur_voix: NOTE(review): not read by this function.
        langue: "fr" selects the French default Edge voice, anything else the
            Dutch one; also passed to gTTS and appended to the output name.
        speaker: explicit Edge voice name, overrides the default voice.

    Returns:
        tuple ``(output video path, status message, srt path)``.

    Raises:
        Whatever the TTS fallback, the background generator or the rendering
        helpers raise. Clips and temporary audio files are released even then.
    """
    # 1️⃣ TTS: Edge first, gTTS when Edge fails.
    default_voice = "fr-FR-DeniseNeural" if langue == "fr" else "nl-NL-MaaikeNeural"
    try:
        audio_mp = tts_edge(texte_voix, voice=speaker or default_voice)
    except Exception as e:
        print(f"[Capsule] Erreur TTS Edge ({e}), fallback gTTS.")
        audio_mp = tts_gtts(texte_voix, lang=langue)

    # Normalization is best-effort: on failure the raw TTS file is used as-is.
    audio_wav = audio_mp
    if not audio_mp.lower().endswith(".wav"):
        try:
            audio_wav = normalize_audio_to_wav(audio_mp)
        except Exception as e:
            print(f"[Audio] Normalisation échouée ({e}), on garde {audio_mp}")

    audio = None
    final = None
    clips = []
    emergency_path = None
    try:
        # 2️⃣ Background
        print("[Capsule] Génération du fond...")
        fond_path = make_background(titre, sous_titre, texte_ecran, theme,
                                    logo_path, logo_pos, image_fond, fond_mode)
        if fond_path and not os.path.exists(fond_path):
            # The file may actually live in ./_tmp_capsules: retry there.
            alt_path = os.path.join(os.getcwd(), "_tmp_capsules", os.path.basename(fond_path))
            if os.path.exists(alt_path):
                fond_path = alt_path
                print(f"[Capsule] ⚙️ Correction du chemin fond: {fond_path}")

        if fond_path is None:
            print("[Capsule] ❌ fond_path est None, création d'urgence")
            from PIL import Image
            try:
                emergency_path = os.path.join(TMP_DIR, f"emergency_{uuid.uuid4().hex[:6]}.png")
                Image.new("RGB", (W, H), (0, 82, 147)).save(emergency_path)
                fond_path = emergency_path
                print(f"[Capsule] ✅ Fond d'urgence créé: {fond_path}")
            except Exception as e:
                print(f"[Capsule] ❌ Impossible de créer le fond d'urgence: {e}")
                fond_path = None

        # 3️⃣ Video composition
        audio = AudioFileClip(audio_wav)
        dur = float(audio.duration or 5.0)
        target_fps = 25

        if fond_path and os.path.exists(fond_path):
            try:
                bg = ImageClip(fond_path).set_duration(dur)
                print(f"[Capsule] ✅ Fond chargé: {fond_path}")
            except Exception as e:
                print(f"[Capsule] ❌ Erreur ImageClip, fallback ColorClip: {e}")
                bg = ColorClip(size=(W, H), color=(0, 82, 147)).set_duration(dur)
        else:
            print("[Capsule] ❌ Aucun fond valide, utilisation ColorClip")
            bg = ColorClip(size=(W, H), color=(0, 82, 147)).set_duration(dur)
        clips.append(bg)

        # 4️⃣ Presenter
        v_presentateur = None
        has_text = bool(texte_ecran and texte_ecran.strip())
        # Full-screen presenter is disabled whenever on-screen text is present.
        plein_auto = plein if not has_text else False
        if video_presentateur and os.path.exists(video_presentateur):
            ext = os.path.splitext(video_presentateur)[1].lower()
            if ext in (".mp4", ".mov", ".avi", ".mkv"):
                print(f"[Capsule] Vidéo présentateur trouvée: {video_presentateur}")
                v_presentateur = prepare_video_presentateur(
                    video_presentateur, dur, position_presentateur, plein_auto
                )
            elif ext in (".jpg", ".jpeg", ".png", ".bmp", ".webp"):
                print(f"[Capsule] Image présentateur trouvée: {video_presentateur}")
                try:
                    img_clip = ImageClip(video_presentateur).set_duration(dur)
                    if plein_auto:
                        img_clip = img_clip.resize((W, H)).set_position(("center", "center"))
                    else:
                        img_clip = img_clip.resize(width=520)
                        pos_map = {
                            "bottom-right": ("right", "bottom"),
                            "bottom-left": ("left", "bottom"),
                            "top-right": ("right", "top"),
                            "top-left": ("left", "top"),
                            "center": ("center", "center"),
                        }
                        img_clip = img_clip.set_position(
                            pos_map.get(position_presentateur, ("right", "bottom"))
                        )
                    v_presentateur = img_clip
                except Exception as e:
                    print(f"[Capsule] ❌ Erreur image présentateur : {e}")

        if v_presentateur:
            clips.append(v_presentateur)
            print(f"[Capsule] ✅ Présentateur ajouté")
        else:
            print(f"[Capsule] Aucun présentateur: {video_presentateur}")

        # 5️⃣ Text overlay (drawn on top of everything else)
        if has_text:
            try:
                txt_clip = TextClip(
                    texte_ecran,
                    fontsize=50,
                    color='white',
                    font='DejaVu-Sans-Bold',
                    method='caption',
                    size=(W - 200, None)
                ).set_position(('center', 'bottom')).set_duration(dur)
                clips.append(txt_clip)
                print("[Capsule] ✅ Overlay texte ajouté (bas centré)")
            except Exception as e:
                print(f"[Capsule] ⚠️ Overlay texte échoué : {e}")

        # 6️⃣ Final assembly
        final = CompositeVideoClip(clips).set_audio(audio.set_fps(44100))
        name = safe_name(f"{titre}_{langue}")
        out_base = os.path.join(OUT_DIR, name)
        out = write_video_with_fallback(final, out_base, fps=target_fps)
        srt_path = write_srt(texte_voix, dur)

        capsules.append({
            "file": out,
            "title": titre,
            "langue": langue,
            "voice": speaker or voix_type,
            "theme": theme,
            "duration": round(dur, 1),
            "inputs": {"image_fond": image_fond, "logo": logo_path, "video_presentateur": video_presentateur},
            "texte_voix": texte_voix,
            "texte_ecran": texte_ecran
        })
        _save_manifest()
    finally:
        # 7️⃣ Cleanup: runs on success and on failure; each step is isolated so
        # one failing close() cannot prevent the temp files from being removed.
        _close_quietly(audio)
        _close_quietly(final)
        for clip in clips:
            _close_quietly(clip)
        _remove_quietly(audio_mp)
        if audio_wav != audio_mp:
            _remove_quietly(audio_wav)
        _remove_quietly(emergency_path)
        gc.collect()

    return out, f"✅ Capsule {langue.upper()} créée ({dur:.1f}s, voix {speaker or voix_type})", srt_path
# 🧾 Additional functions, unchanged from the original version: capsule table, assembly, deletion, reordering and export.
def table_capsules():
    """Return one display row per capsule.

    Each row is ``[1-based position, title, language in upper case,
    "<duration>s", theme, voice, output file name]``. The language defaults
    to "fr" when the entry has no "langue" key.
    """
    rows = []
    for position, capsule in enumerate(capsules, start=1):
        rows.append([
            position,
            capsule["title"],
            capsule.get("langue", "fr").upper(),
            f"{capsule['duration']}s",
            capsule["theme"],
            capsule["voice"],
            os.path.basename(capsule["file"]),
        ])
    return rows
def assemble_final():
    """Concatenate every capsule (re-encoding) into ``OUT_DIR/VIDEO_COMPLETE``.

    Returns:
        tuple ``(output path, status message)``; ``(None, message)`` when there
        is no capsule.

    Raises:
        Whatever opening a capsule file or rendering raises. All clips opened
        so far, and the concatenated clip, are closed before the error propagates.
    """
    if not capsules:
        return None, "❌ Aucune capsule."
    from moviepy.editor import VideoFileClip
    from moviepy.video.compositing.concatenate import concatenate_videoclips

    clips = []
    merged = None
    try:
        # Opened inside the try so a missing/corrupt file does not leak the
        # readers of the capsules opened before it.
        for c in capsules:
            clips.append(VideoFileClip(c["file"]))
        merged = concatenate_videoclips(clips, method="compose")
        out = write_video_with_fallback(
            merged,
            os.path.join(OUT_DIR, "VIDEO_COMPLETE"),
            fps=25,
        )
        return out, f"🎉 Vidéo finale prête ({len(capsules)} capsules)."
    finally:
        for clip in [merged, *clips]:
            if clip is None:
                continue
            try:
                clip.close()
            except Exception as e:
                # Cleanup stays best-effort, but failures are now visible.
                print(f"[Clean] Erreur nettoyage: {e}")
def supprimer_capsule(index):
    """Delete the capsule at 1-based ``index``: its video file and its manifest entry.

    Returns:
        tuple ``(status message, refreshed capsule table)``. Any error
        (non-numeric index, file removal failure, ...) is reported in the
        message instead of being raised.
    """
    try:
        position = int(index) - 1
        if position < 0 or position >= len(capsules):
            return "⚠️ Index invalide.", table_capsules()

        video_path = capsules[position]["file"]
        if os.path.exists(video_path):
            os.remove(video_path)
        del capsules[position]
        _save_manifest()
        return f"🗑 Capsule supprimée : {video_path}", table_capsules()
    except Exception as err:
        return f"❌ Erreur lors de la suppression : {err}", table_capsules()
def deplacer_capsule(index, direction):
    """Move the capsule at 1-based ``index`` one position "up" or "down".

    The index is validated first, so index 0 or a negative index can no longer
    wrap around and swap with the end of the list. The manifest is saved and a
    success message returned only when a swap actually happened.

    Returns:
        tuple ``(status message, refreshed capsule table)``. Errors are
        reported in the message instead of being raised.
    """
    try:
        idx = int(index) - 1
        if not (0 <= idx < len(capsules)):
            return "⚠️ Index invalide.", table_capsules()

        if direction == "up":
            target = idx - 1
        elif direction == "down":
            target = idx + 1
        else:
            return f"⚠️ Direction inconnue : {direction}", table_capsules()

        if not (0 <= target < len(capsules)):
            # Already first (for "up") or last (for "down"): nothing to swap.
            return "⚠️ Déplacement impossible : la capsule est déjà en bout de liste.", table_capsules()

        capsules[idx], capsules[target] = capsules[target], capsules[idx]
        _save_manifest()
        return f"🔁 Capsule déplacée {direction}.", table_capsules()
    except Exception as e:
        return f"❌ Erreur de déplacement : {e}", table_capsules()
def export_project_zip():
    """Create a ZIP of the whole project (videos, SRT files, inputs, params, manifest).

    Layout: ``manifest.json`` at the root, then one ``capsule_<n>`` folder per
    capsule holding its video, every ``.srt`` found in OUT_DIR, an ``inputs``
    sub-folder with the background/logo/presenter files that still exist, and
    ``params.json`` with the capsule's manifest entry.

    Returns:
        Path of the ZIP written in TMP_DIR, or ``None`` when the export fails
        (the error is printed). The staging directory is removed either way.
    """
    zip_root = None
    try:
        stamp = int(time.time())
        zip_root = os.path.join(TMP_DIR, f"zip_export_{stamp}")
        os.makedirs(zip_root, exist_ok=True)

        with open(os.path.join(zip_root, "manifest.json"), "w", encoding="utf-8") as mf:
            json.dump({"capsules": capsules}, mf, ensure_ascii=False, indent=2)

        # The manifest does not record which SRT belongs to which capsule, so
        # every SRT is copied into every capsule folder; list them only once.
        srt_names = []
        if os.path.isdir(OUT_DIR):
            srt_names = [n for n in os.listdir(OUT_DIR) if n.lower().endswith(".srt")]

        for i, cap in enumerate(capsules, 1):
            cap_dir = os.path.join(zip_root, f"capsule_{i}")
            inputs_dir = os.path.join(cap_dir, "inputs")
            os.makedirs(inputs_dir, exist_ok=True)

            video = cap.get("file")
            if video and os.path.isfile(video):
                shutil.copy2(video, os.path.join(cap_dir, os.path.basename(video)))

            for srt_name in srt_names:
                shutil.copy2(os.path.join(OUT_DIR, srt_name), os.path.join(cap_dir, srt_name))

            inputs = cap.get("inputs") or {}
            for k in ("image_fond", "logo", "video_presentateur"):
                p = inputs.get(k)
                # isfile (not exists): copy2 cannot copy a directory.
                if p and os.path.isfile(p):
                    shutil.copy2(p, os.path.join(inputs_dir, os.path.basename(p)))

            with open(os.path.join(cap_dir, "params.json"), "w", encoding="utf-8") as pf:
                json.dump(cap, pf, ensure_ascii=False, indent=2)

        return shutil.make_archive(
            os.path.join(TMP_DIR, f"capsules_export_{stamp}"), "zip", zip_root
        )
    except Exception as e:
        print(f"[Export] ❌ Erreur ZIP : {e}")
        return None
    finally:
        # The staging tree is only needed to build the archive.
        if zip_root:
            shutil.rmtree(zip_root, ignore_errors=True)
def assemble_final_fast():
    """Concatenate all capsules without re-encoding (ffmpeg concat demuxer, stream copy).

    Returns:
        tuple ``(output path, status message)``; ``(None, message)`` when there
        is no capsule.

    Raises:
        subprocess.CalledProcessError: if ffmpeg exits with a non-zero status.
        FileNotFoundError: if the ``ffmpeg`` executable is not found.
    """
    import subprocess
    if not capsules:
        return None, "❌ Aucune capsule."

    list_path = os.path.join(OUT_DIR, "concat_list.txt")
    with open(list_path, "w", encoding="utf-8") as f:
        for c in capsules:
            # ffmpeg resolves relative entries against the list file's folder,
            # not the current directory, so write absolute paths. A single
            # quote inside a quoted entry must be written as '\''.
            entry = os.path.abspath(c["file"]).replace("'", "'\\''")
            f.write(f"file '{entry}'\n")

    out_path = os.path.join(OUT_DIR, "VIDEO_COMPLETE_FAST.mp4")
    # -y: overwrite a previous result instead of prompting (which would fail
    # or block when there is no interactive stdin).
    cmd = ["ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", list_path, "-c", "copy", out_path]
    subprocess.run(cmd, check=True)
    return out_path, "🎉 Vidéo finale assemblée instantanément (sans réencodage)"