|
|
import asyncio
import os
import uuid
from datetime import datetime, timedelta
from typing import Optional

import markdown
import pdfkit
from fastapi import APIRouter, BackgroundTasks, HTTPException, Request
from fastapi.responses import FileResponse
from pdf2docx import Converter
from pydantic import BaseModel
|
|
|
|
|
# Router that the conversion and download endpoints in this module attach to.
router = APIRouter()

# Directory in which generated PDF/DOCX files are written.
TEMP_DIR = "/.tempfiles"

# Minutes a converted file stays registered before it is treated as expired.
FILE_RETENTION_MINUTES = 30

# Prefix for generated download links; "download/<file_id>" is appended to it.
BASE_URL = "https://pvanand-doc-maker.hf.space/api/v1/"
|
|
|
|
|
class MarkdownRequest(BaseModel):
    """Request body for the Markdown conversion endpoints."""

    # Raw Markdown source to be rendered.
    markdown_content: str
|
|
|
|
|
class ConversionResponse(BaseModel):
    """Response body returned after a successful conversion."""

    # URL from which the converted file can be fetched.
    download_url: str
    # Moment after which the file is no longer served.
    expires_at: datetime
|
|
|
|
|
|
|
|
# In-memory registry of converted files, keyed by file id. Each value is a
# dict with the keys 'file_path', 'mime_type', 'expires_at' and 'extension'.
converted_files: dict[str, dict] = {}
|
|
|
|
|
def ensure_temp_dir():
    """Create ``TEMP_DIR`` (and any missing parents) if it does not exist yet."""
    # exist_ok=True makes repeated calls harmless.
    os.makedirs(TEMP_DIR, exist_ok=True)
|
|
|
|
|
def get_download_url(file_id: str, base_url: Optional[str] = None) -> str:
    """Return the public download URL for *file_id*.

    Args:
        file_id: Identifier of the converted file.
        base_url: URL prefix to build on. Defaults to the module-level
            ``BASE_URL`` when omitted.

    Returns:
        ``<base>/download/<file_id>`` with exactly one slash between the
        prefix and ``download``.
    """
    prefix = BASE_URL if base_url is None else base_url
    # Normalise the trailing slash so the result does not depend on whether
    # the configured prefix ends with "/".
    return f"{prefix.rstrip('/')}/download/{file_id}"
|
|
|
|
|
def generate_temp_filepath(
    extension: str, temp_dir: Optional[str] = None
) -> tuple[str, str]:
    """Build a fresh, unique file path for a file with the given extension.

    Args:
        extension: File extension, with or without a leading dot.
        temp_dir: Directory the path should live in. Defaults to the
            module-level ``TEMP_DIR`` when omitted.

    Returns:
        A ``(file_path, file_id)`` pair, where ``file_id`` is a random UUID4
        string and ``file_path`` is ``<dir>/<file_id>.<extension>``. The file
        itself is not created.
    """
    directory = TEMP_DIR if temp_dir is None else temp_dir
    file_id = str(uuid.uuid4())
    # Accept ".pdf" as well as "pdf" without producing a double dot.
    suffix = extension.lstrip(".")
    file_path = os.path.join(directory, f"{file_id}.{suffix}")
    return file_path, file_id
|
|
|
|
|
def markdown_to_html(markdown_content: str) -> str:
    """Render Markdown text to HTML using ``markdown.markdown``.

    Args:
        markdown_content: Markdown source text.

    Returns:
        The HTML produced by the ``markdown`` library with its default
        settings (no extensions are enabled here).
    """
    return markdown.markdown(markdown_content)
|
|
|
|
|
def html_to_pdf(html_content: str, output_path: str) -> None:
    """Render an HTML string to a PDF file via ``pdfkit.from_string``.

    Args:
        html_content: HTML markup to render.
        output_path: Path of the PDF file to write.
    """
    # NOTE(review): callers in this module pass HTML derived from
    # user-supplied Markdown. Raw HTML embedded in that Markdown presumably
    # reaches the renderer unchanged — confirm whether local-file access and
    # scripts should be restricted for the installed renderer version.
    options = {
        'page-size': 'A4',
        'margin-top': '0.75in',
        'margin-right': '0.75in',
        'margin-bottom': '0.75in',
        'margin-left': '0.75in',
        'encoding': "UTF-8",
    }
    pdfkit.from_string(html_content, output_path, options=options)
|
|
|
|
|
def pdf_to_docx(pdf_path: str, docx_path: str) -> None:
    """Convert the PDF at *pdf_path* into a DOCX file at *docx_path*.

    Args:
        pdf_path: Path of the existing PDF to read.
        docx_path: Path of the DOCX file to write.
    """
    cv = Converter(pdf_path)
    try:
        cv.convert(docx_path)
    finally:
        # Always release the converter, even when the conversion fails.
        cv.close()
|
|
|
|
|
def _unlink_if_present(path: str) -> None:
    """Delete *path*, treating an already-missing file as success."""
    try:
        os.unlink(path)
    except FileNotFoundError:
        # The file vanished between scheduling and execution; nothing to do.
        pass


def cleanup_expired_files(background_tasks: BackgroundTasks):
    """Drop expired entries from ``converted_files`` and schedule file removal.

    Entries whose ``'expires_at'`` lies before the current UTC time are
    removed from the registry immediately; deletion of the file on disk is
    queued on *background_tasks*.

    Args:
        background_tasks: Task queue the file deletions are added to.
    """
    current_time = datetime.utcnow()

    # Collect ids first so the registry is not mutated while iterating it.
    expired_ids = [
        file_id
        for file_id, metadata in converted_files.items()
        if current_time > metadata['expires_at']
    ]

    for file_id in expired_ids:
        metadata = converted_files.pop(file_id, None)
        if metadata is None:
            continue
        # The deletion runs later, so the existence check happens inside the
        # task itself instead of here, where it could already be stale.
        background_tasks.add_task(_unlink_if_present, metadata['file_path'])
|
|
|
|
|
@router.post("/convert/md_to_pdf", response_model=ConversionResponse)
async def convert_md_to_pdf(
    request: Request,
    markdown_req: MarkdownRequest,
    background_tasks: BackgroundTasks
):
    """Convert posted Markdown to a PDF and return a temporary download link.

    Args:
        request: Incoming request object (not used by the handler body).
        markdown_req: Request body carrying the Markdown source.
        background_tasks: Queue used to delete expired files after the response.

    Returns:
        A ``ConversionResponse`` with the download URL and expiry time.

    Raises:
        HTTPException: 500 with the underlying error text if rendering fails.
    """
    ensure_temp_dir()
    cleanup_expired_files(background_tasks)

    pdf_path, file_id = generate_temp_filepath("pdf")

    try:
        html_content = markdown_to_html(markdown_req.markdown_content)
        # PDF rendering is blocking work; run it in a worker thread so the
        # event loop keeps serving other requests meanwhile.
        await asyncio.to_thread(html_to_pdf, html_content, pdf_path)
    except Exception as e:
        # Remove a partially written output before reporting the failure.
        try:
            os.unlink(pdf_path)
        except FileNotFoundError:
            pass
        raise HTTPException(status_code=500, detail=str(e)) from e

    expiration_time = datetime.utcnow() + timedelta(minutes=FILE_RETENTION_MINUTES)
    converted_files[file_id] = {
        'file_path': pdf_path,
        'mime_type': 'application/pdf',
        'expires_at': expiration_time,
        'extension': 'pdf'
    }

    return ConversionResponse(
        download_url=get_download_url(file_id),
        expires_at=expiration_time
    )
|
|
|
|
|
@router.post("/convert/md_to_docx", response_model=ConversionResponse)
async def convert_md_to_docx(
    request: Request,
    markdown_req: MarkdownRequest,
    background_tasks: BackgroundTasks
):
    """Convert posted Markdown to a DOCX file and return a temporary download link.

    The Markdown is rendered to HTML, then to an intermediate PDF, which is
    finally converted to DOCX. The intermediate PDF is deleted afterwards.

    Args:
        request: Incoming request object (not used by the handler body).
        markdown_req: Request body carrying the Markdown source.
        background_tasks: Queue used to delete expired files after the response.

    Returns:
        A ``ConversionResponse`` with the download URL and expiry time.

    Raises:
        HTTPException: 500 with the underlying error text if any step fails.
    """
    ensure_temp_dir()
    cleanup_expired_files(background_tasks)

    # Only the DOCX is registered for download; the PDF id is not needed.
    pdf_path, _ = generate_temp_filepath("pdf")
    docx_path, file_id = generate_temp_filepath("docx")

    try:
        html_content = markdown_to_html(markdown_req.markdown_content)
        # Both conversion steps block; run them in worker threads so the
        # event loop keeps serving other requests meanwhile.
        await asyncio.to_thread(html_to_pdf, html_content, pdf_path)
        await asyncio.to_thread(pdf_to_docx, pdf_path, docx_path)

        # The PDF was only an intermediate artifact.
        os.unlink(pdf_path)
    except Exception as e:
        # Remove whatever outputs exist before reporting the failure.
        for path in (pdf_path, docx_path):
            try:
                os.unlink(path)
            except FileNotFoundError:
                pass
        raise HTTPException(status_code=500, detail=str(e)) from e

    expiration_time = datetime.utcnow() + timedelta(minutes=FILE_RETENTION_MINUTES)
    converted_files[file_id] = {
        'file_path': docx_path,
        'mime_type': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
        'expires_at': expiration_time,
        'extension': 'docx'
    }

    return ConversionResponse(
        download_url=get_download_url(file_id),
        expires_at=expiration_time
    )
|
|
|
|
|
@router.get("/download/{file_id}")
async def download_file(
    file_id: str,
    background_tasks: BackgroundTasks
):
    """Serve a previously converted file by its id.

    Args:
        file_id: Identifier returned in the conversion response's URL.
        background_tasks: Queue used to delete expired files after the response.

    Returns:
        A ``FileResponse`` streaming the file under the name
        ``converted_<file_id>.<extension>``.

    Raises:
        HTTPException: 404 if the id is unknown, the file has expired, or the
            file is no longer present on disk.
    """
    cleanup_expired_files(background_tasks)

    file_info = converted_files.get(file_id)
    if not file_info:
        raise HTTPException(status_code=404, detail="File not found or expired")

    file_path = file_info['file_path']

    if datetime.utcnow() > file_info['expires_at']:
        converted_files.pop(file_id, None)
        try:
            os.unlink(file_path)
        except FileNotFoundError:
            pass
        raise HTTPException(status_code=404, detail="File has expired")

    # A registered file that is gone from disk would otherwise make the
    # response fail while it is being sent; report it as missing instead.
    if not os.path.isfile(file_path):
        converted_files.pop(file_id, None)
        raise HTTPException(status_code=404, detail="File not found or expired")

    return FileResponse(
        file_path,
        media_type=file_info['mime_type'],
        filename=f"converted_{file_id}.{file_info['extension']}"
    )