Spaces:
Running
Running
| import os | |
| import subprocess | |
| from fastapi import FastAPI, UploadFile, File, Form, HTTPException | |
| from fastapi.responses import FileResponse | |
| from tempfile import NamedTemporaryFile | |
| import uvicorn | |
| app = FastAPI(title="Universal File Converter API") | |
| # --------------------------------------------------------- | |
| # The 300+ Format Routing Dictionary | |
| # Expand this dictionary to include all 300 extensions you need. | |
| # --------------------------------------------------------- | |
| CONVERSION_ENGINES = { | |
| # Documents -> LibreOffice | |
| 'pdf': 'libreoffice', 'docx': 'libreoffice', 'odt': 'libreoffice', 'xlsx': 'libreoffice', 'pptx': 'libreoffice', | |
| # Images -> ImageMagick | |
| 'jpg': 'imagemagick', 'png': 'imagemagick', 'webp': 'imagemagick', 'gif': 'imagemagick', 'bmp': 'imagemagick', | |
| # Media -> FFmpeg | |
| 'mp4': 'ffmpeg', 'mp3': 'ffmpeg', 'wav': 'ffmpeg', 'avi': 'ffmpeg', 'mkv': 'ffmpeg', 'flac': 'ffmpeg', | |
| # Text & Markup -> Pandoc | |
| 'html': 'pandoc', 'md': 'pandoc', 'rst': 'pandoc', 'epub': 'pandoc', 'tex': 'pandoc' | |
| } | |
| def execute_conversion(input_path: str, output_path: str, target_ext: str): | |
| """Routes the file to the correct system engine based on the target extension.""" | |
| engine = CONVERSION_ENGINES.get(target_ext) | |
| if not engine: | |
| raise ValueError(f"Target format '{target_ext}' is not mapped to an engine.") | |
| # Construct the specific CLI command | |
| if engine == 'libreoffice': | |
| out_dir = os.path.dirname(output_path) | |
| cmd = ['soffice', '--headless', '--convert-to', target_ext, '--outdir', out_dir, input_path] | |
| elif engine == 'imagemagick': | |
| cmd = ['magick', input_path, output_path] | |
| elif engine == 'ffmpeg': | |
| # -y overwrites output files without asking | |
| cmd = ['ffmpeg', '-y', '-i', input_path, output_path] | |
| elif engine == 'pandoc': | |
| cmd = ['pandoc', input_path, '-o', output_path] | |
| try: | |
| # Execute the command and capture any errors | |
| subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) | |
| except subprocess.CalledProcessError as e: | |
| raise RuntimeError(f"Engine Error: {e.stderr.decode('utf-8')}") | |
| async def convert_file( | |
| file: UploadFile = File(...), | |
| target_format: str = Form(...) | |
| ): | |
| target_format = target_format.lower().strip('.') | |
| # Extract original extension safely | |
| filename_parts = file.filename.rsplit('.', 1) | |
| input_ext = filename_parts[-1] if len(filename_parts) > 1 else "tmp" | |
| base_name = filename_parts[0] if len(filename_parts) > 1 else "file" | |
| # Write the uploaded file to a temporary location | |
| with NamedTemporaryFile(delete=False, suffix=f".{input_ext}") as tmp_in: | |
| tmp_in.write(await file.read()) | |
| input_path = tmp_in.name | |
| # Define the output path in the same temporary directory | |
| output_path = input_path.rsplit('.', 1)[0] + f".{target_format}" | |
| try: | |
| execute_conversion(input_path, output_path, target_format) | |
| # Return the file to the frontend space | |
| return FileResponse( | |
| path=output_path, | |
| filename=f"{base_name}_converted.{target_format}", | |
| media_type='application/octet-stream' | |
| ) | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| if __name__ == "__main__": | |
| # Hugging Face binds the Space to 0.0.0.0:7860 | |
| uvicorn.run(app, host="0.0.0.0", port=7860) |