Spaces:
Running
on
Zero
Running
on
Zero
| import os | |
| import queue | |
| import shutil | |
| import subprocess | |
| import tempfile | |
| import threading | |
| import time | |
| import traceback | |
| from abc import ABC, abstractmethod | |
| from contextlib import contextmanager | |
| from pathlib import Path | |
| from queue import Queue | |
| import ffmpeg | |
| import numpy as np | |
| import pydub | |
| from pydub import AudioSegment | |
| from stf_tools.writers._async import AudioAsyncWriter, VideoAsyncWriter | |
| from stf_tools.writers._thread import AudioThreadWriter, VideoThreadWriter | |
| video_pipe_name = "video" | |
| audio_pipe_name = "audio" | |
| class BaseFFMPEGWriter(ABC): | |
| def __init__( | |
| self, | |
| path, | |
| width, | |
| height, | |
| fps, | |
| crf=17, | |
| audio_sample_rate=16000, | |
| quiet=True, | |
| ): | |
| self.path = Path(path) | |
| self.width = width | |
| self.height = height | |
| self.fps = fps | |
| self.crf = crf | |
| self.path.parent.mkdir(exist_ok=True, parents=True) | |
| pipe_root = tempfile.mkdtemp() | |
| self.pipe_dir = Path(pipe_root) | |
| self.video_pipe_path = self.pipe_dir / video_pipe_name | |
| self.audio_pipe_path = self.pipe_dir / audio_pipe_name | |
| os.mkfifo(self.video_pipe_path) | |
| os.mkfifo(self.audio_pipe_path) | |
| self.audio_sample_rate = audio_sample_rate | |
| self.write_process = self._run_ffmpeg( | |
| quiet=quiet, | |
| ) | |
| def _run_ffmpeg(self, quiet): | |
| """ffmpeg writer using named pipe""" | |
| class ThreadFFMPEGWriter(BaseFFMPEGWriter): | |
| def __init__(self, *args, **kwargs): | |
| super().__init__(*args, **kwargs) | |
| self.video_writer = VideoThreadWriter(self.video_pipe_path, self.fps) | |
| self.audio_writer = AudioThreadWriter( | |
| self.audio_pipe_path, self.audio_sample_rate | |
| ) | |
| def finish(self, forced=False): | |
| self.video_writer.finish(forced=forced) | |
| self.audio_writer.finish(forced=forced) | |
| if forced: | |
| self.write_process.kill() | |
| else: | |
| self.write_process.wait() | |
| shutil.rmtree(self.pipe_dir, ignore_errors=True) | |
| class AsyncFFMPEGWriter(BaseFFMPEGWriter): | |
| def __init__(self, *args, **kwargs): | |
| super().__init__(*args, **kwargs) | |
| self.video_writer = VideoAsyncWriter(self.video_pipe_path, self.fps) | |
| self.audio_writer = AudioAsyncWriter( | |
| self.audio_pipe_path, self.audio_sample_rate | |
| ) | |
| def finish(self, forced=False): | |
| if forced: | |
| self.write_process.kill() | |
| else: | |
| self.write_process.wait() | |
| shutil.rmtree(self.pipe_dir, ignore_errors=True) | |