File size: 2,364 Bytes
4993eb8
eb783b4
 
 
 
2ab6e0a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
eb783b4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2ab6e0a
 
 
eb783b4
 
 
 
 
 
2ab6e0a
 
 
 
 
eb783b4
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
import logging
import os
import shutil
import subprocess
import tempfile
from typing import List, Tuple

import cv2
import numpy as np


def extract_frames(video_path: str) -> Tuple[List[np.ndarray], float, int, int]:
    """Decode every frame of a video file into memory.

    All frames are held in a list at once, so memory use grows with the
    length of the video.

    Args:
        video_path: Path of the video to open with ``cv2.VideoCapture``.

    Returns:
        A tuple ``(frames, fps, width, height)``. ``fps`` is ``0.0`` when the
        capture reports no frame rate. ``width`` and ``height`` come from the
        capture properties, or from the first decoded frame when those
        properties are not positive.

    Raises:
        ValueError: If the video cannot be opened or yields no frames.
    """
    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            raise ValueError("Unable to open video.")

        fps = cap.get(cv2.CAP_PROP_FPS) or 0.0
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        frames: List[np.ndarray] = []
        success, frame = cap.read()
        while success:
            frames.append(frame)
            success, frame = cap.read()
    finally:
        # Release the capture on every path, including open/decode failures.
        cap.release()

    if not frames:
        raise ValueError("Video decode produced zero frames.")

    if width <= 0 or height <= 0:
        # The reported properties are unusable; trust the decoded pixel data
        # instead (array shape is rows x columns, i.e. height x width).
        frame_height, frame_width = frames[0].shape[:2]
        width, height = int(frame_width), int(frame_height)

    return frames, fps, width, height


def _transcode_with_ffmpeg(src_path: str, dst_path: str) -> None:
    cmd = [
        "ffmpeg",
        "-y",
        "-i",
        src_path,
        "-c:v",
        "libx264",
        "-preset",
        "veryfast",
        "-pix_fmt",
        "yuv420p",
        "-movflags",
        "+faststart",
        dst_path,
    ]
    process = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=False)
    if process.returncode != 0:
        raise RuntimeError(process.stderr.decode("utf-8", errors="ignore"))


def write_video(frames: List[np.ndarray], output_path: str, fps: float, width: int, height: int) -> None:
    """Write ``frames`` to ``output_path``, preferring an H.264 encode.

    Frames are first written with OpenCV's ``mp4v`` codec to a temporary
    file, which is then transcoded with ffmpeg. If ffmpeg is missing or the
    transcode fails, the temporary ``mp4v`` file is moved to ``output_path``
    instead. The temporary file is removed on every exit path.

    Args:
        frames: Frames to encode, in order.
            NOTE(review): presumably each frame must already be
            ``width`` x ``height`` -- confirm against callers.
        output_path: Destination path of the final video.
        fps: Frame rate for the output; a falsy value falls back to ``1.0``.
        width: Frame width passed to ``cv2.VideoWriter``.
        height: Frame height passed to ``cv2.VideoWriter``.

    Raises:
        ValueError: If ``frames`` is empty or the ``VideoWriter`` cannot be
            opened.
    """
    if not frames:
        raise ValueError("No frames available for writing.")

    temp_fd, temp_path = tempfile.mkstemp(prefix="raw_", suffix=".mp4")
    # Only the path is needed; OpenCV opens the file itself.
    os.close(temp_fd)
    try:
        writer = cv2.VideoWriter(temp_path, cv2.VideoWriter_fourcc(*"mp4v"), fps or 1.0, (width, height))
        try:
            if not writer.isOpened():
                raise ValueError("Failed to open VideoWriter.")
            for frame in frames:
                writer.write(frame)
        finally:
            # Release on every path so the file is finalized before ffmpeg
            # reads it and the handle is not leaked if a write fails.
            writer.release()

        # Keep the try body to the transcode alone so that only its failures
        # trigger the fallback.
        try:
            _transcode_with_ffmpeg(temp_path, output_path)
        except FileNotFoundError:
            logging.warning("ffmpeg not found; serving fallback MP4V output.")
            shutil.move(temp_path, output_path)
        except RuntimeError as exc:
            logging.warning("ffmpeg transcode failed (%s); serving fallback MP4V output.", exc)
            shutil.move(temp_path, output_path)
        else:
            logging.debug("Transcoded video to H.264 for browser compatibility.")
    finally:
        # After a fallback move the temp file is already gone; otherwise it
        # must be removed, including when an unexpected error propagates.
        try:
            os.remove(temp_path)
        except FileNotFoundError:
            pass