|
|
"""
|
|
|
BabelDOC with Agentic AI - MCP Server
|
|
|
PDF Translation with Layout Preservation + Google Drive Integration
|
|
|
"""
|
|
|
|
|
|
import base64
import io
import json
import logging
import mimetypes
import os
import re
from datetime import datetime
from pathlib import Path
from typing import Optional, Tuple, List

import httpx
from fastmcp import FastMCP

try:
    from google.oauth2.credentials import Credentials
    from google_auth_oauthlib.flow import InstalledAppFlow
    from google.auth.transport.requests import Request
    from googleapiclient.discovery import build
    from googleapiclient.http import MediaFileUpload, MediaIoBaseDownload

    GOOGLE_AVAILABLE = True
except ImportError:
    GOOGLE_AVAILABLE = False
|
|
|
|
|
|
|
|
|
# PDFs with more pages than this are rejected by the MCP tools and the caller
# is pointed at the Gradio UI instead.
MAX_PAGES = 20
GRADIO_URL = "http://127.0.0.1:7860"

# Directory used for downloads, translated PDFs and the cached Drive token.
# NOTE: it is created as an import-time side effect.
OUTPUT_DIR = Path.home() / "Downloads" / "BabelDocs"
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

GDRIVE_SCOPES = ['https://www.googleapis.com/auth/drive']
# OAuth client secrets file. An unset *or empty* GDRIVE_OAUTH_CREDENTIALS falls
# back to the default location, and "~" is expanded so that values such as
# "~/keys/gcp-oauth.keys.json" resolve to the user's home directory.
GDRIVE_OAUTH_PATH = Path(
    os.getenv("GDRIVE_OAUTH_CREDENTIALS")
    or Path.home() / "Downloads" / "gcp-oauth.keys.json"
).expanduser()
GDRIVE_TOKEN_PATH = OUTPUT_DIR / "gdrive_token.json"

# The Modal endpoints are derived from a mandatory base URL; importing this
# module without it fails immediately.
MODAL_BASE_URL = os.getenv("BABELDOCS_MODAL_URL")
if not MODAL_BASE_URL:
    raise ValueError("BABELDOCS_MODAL_URL environment variable is required")
MODAL_TRANSLATE_URL = f"{MODAL_BASE_URL}-babeldocstranslator-api.modal.run"
MODAL_HEALTH_URL = f"{MODAL_BASE_URL}-babeldocstranslator-health.modal.run"

# Language code -> display name accepted as translation targets.
SUPPORTED_LANGUAGES = {
    "en": "English", "fr": "French", "es": "Spanish", "de": "German",
    "it": "Italian", "pt": "Portuguese", "zh": "Chinese", "ja": "Japanese",
    "ko": "Korean", "ru": "Russian", "ar": "Arabic",
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def _warmup_modal() -> None:
    """Best-effort GET against the Modal health endpoint to wake the container.

    The request uses a 10 second timeout and its response is ignored.
    Failures are logged at debug level and never raised, so callers can
    always continue with the real request.
    """
    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            await client.get(MODAL_HEALTH_URL)
    except Exception as exc:
        # `Exception` rather than a bare `except`, so task cancellation and
        # KeyboardInterrupt (both BaseException) still propagate.
        logging.getLogger(__name__).debug("Modal warm-up request failed: %s", exc)
|
|
|
|
|
|
|
|
|
def _count_pdf_pages(pdf_bytes: bytes) -> int:
    """Return the number of pages in a PDF given as raw bytes.

    Uses PyMuPDF (``fitz``) when it is importable. Otherwise falls back to a
    heuristic that counts ``/Type /Page`` dictionary entries in the raw bytes.
    The heuristic tolerates missing or extra whitespace between the two
    tokens and ignores ``/Type /Pages`` (the page-tree node).

    Args:
        pdf_bytes: Complete contents of the PDF file.

    Returns:
        The page count. The fallback returns 0 when no page objects are
        visible in the raw bytes.

    Raises:
        Exception: whatever ``fitz.open`` raises for data it cannot parse.
    """
    try:
        import fitz
    except ImportError:
        # The negative lookahead stops "/Page" from matching the "/Pages" tree node.
        return len(re.findall(rb"/Type\s*/Page(?![A-Za-z])", pdf_bytes))

    doc = fitz.open(stream=pdf_bytes, filetype="pdf")
    try:
        return len(doc)
    finally:
        # Always release the document, even if counting fails.
        doc.close()
|
|
|
|
|
|
|
|
|
def _extract_gdrive_file_id(url: str) -> Optional[str]:
    """Extract a Google Drive file ID from a share URL or a bare ID.

    Recognised forms, tried in order:
      * ``.../file/d/<id>/...``
      * an ``id=<id>`` query parameter (e.g. ``open?id=`` or ``uc?id=``)
      * a bare ID of at least 25 characters

    Args:
        url: URL or ID string; surrounding whitespace is ignored.

    Returns:
        The file ID, or ``None`` when the input is empty or matches no form.
    """
    if not url:
        return None
    candidate = url.strip()

    patterns = (
        r'/file/d/([a-zA-Z0-9_-]+)',
        # The lookbehind keeps "id=" from matching inside other parameter
        # names such as "ouid=" or "fileid=".
        r'(?<![A-Za-z0-9_])id=([a-zA-Z0-9_-]+)',
        r'^([a-zA-Z0-9_-]{25,})$',
    )
    for pattern in patterns:
        match = re.search(pattern, candidate)
        if match:
            return match.group(1)
    return None
|
|
|
|
|
|
|
|
|
def _get_gdrive_credentials() -> Tuple[Optional["Credentials"], Optional[str]]:
    """Load, refresh or interactively obtain Google Drive credentials.

    Resolution order:
      1. A still-valid token cached at ``GDRIVE_TOKEN_PATH`` is returned as is.
      2. An expired token with a refresh token is refreshed.
      3. Otherwise the installed-app OAuth flow is run on a local port
         (8101-8103, then any free port).

    Refreshed or newly obtained credentials are written back to
    ``GDRIVE_TOKEN_PATH``.

    Returns:
        ``(credentials, None)`` on success, ``(None, error_message)`` when the
        Google libraries or the OAuth client file are missing or the OAuth
        flow fails.
    """
    if not GOOGLE_AVAILABLE:
        return None, "Google libraries not installed"
    if not GDRIVE_OAUTH_PATH.exists():
        return None, f"OAuth credentials not found at {GDRIVE_OAUTH_PATH}"

    log = logging.getLogger(__name__)

    creds = None
    if GDRIVE_TOKEN_PATH.exists():
        try:
            creds = Credentials.from_authorized_user_file(str(GDRIVE_TOKEN_PATH), GDRIVE_SCOPES)
        except Exception as exc:
            # An unreadable token is not fatal: fall through to a fresh login.
            log.warning("Ignoring unreadable Drive token %s: %s", GDRIVE_TOKEN_PATH, exc)
            creds = None

    if creds and creds.valid:
        # Nothing changed, so there is nothing to persist.
        return creds, None

    if creds and creds.expired and creds.refresh_token:
        try:
            creds.refresh(Request())
        except Exception as exc:
            log.warning("Drive token refresh failed, re-authenticating: %s", exc)
            creds = None

    # Credentials that are still invalid here (e.g. no refresh token) must not
    # be handed to callers; run the interactive flow instead.
    if not creds or not creds.valid:
        try:
            flow = InstalledAppFlow.from_client_secrets_file(str(GDRIVE_OAUTH_PATH), GDRIVE_SCOPES)
            for port in [8101, 8102, 8103, 0]:
                try:
                    creds = flow.run_local_server(port=port, open_browser=True, bind_addr="127.0.0.1")
                    break
                except OSError:
                    # Port busy: try the next one; port 0 is the last resort.
                    if port == 0:
                        raise
        except Exception as e:
            return None, f"OAuth failed: {str(e)}"

    try:
        GDRIVE_TOKEN_PATH.write_text(creds.to_json(), encoding="utf-8")
        # The token grants Drive access; keep it readable by the owner only.
        GDRIVE_TOKEN_PATH.chmod(0o600)
    except OSError as exc:
        # The credentials are usable even if caching them failed.
        log.warning("Could not persist Drive token to %s: %s", GDRIVE_TOKEN_PATH, exc)

    return creds, None
|
|
|
|
|
|
|
|
|
def _upload_to_gdrive(file_path: str, folder_id: Optional[str] = None) -> Tuple[Optional[str], Optional[str]]:
    """Upload a local file to Google Drive.

    The Drive file is named after the local file. The MIME type is guessed
    from the file extension, falling back to ``application/octet-stream``,
    so non-PDF files are no longer labelled as PDFs.

    Args:
        file_path: Path of the local file to upload.
        folder_id: Optional Drive folder ID to place the file in.

    Returns:
        ``(file_id, None)`` on success, ``(None, error_message)`` otherwise.
    """
    creds, error = _get_gdrive_credentials()
    if error:
        return None, error

    try:
        service = build('drive', 'v3', credentials=creds)

        upload_name = Path(file_path).name
        file_metadata = {'name': upload_name}
        if folder_id:
            file_metadata['parents'] = [folder_id]

        guessed_type, _ = mimetypes.guess_type(upload_name)
        media = MediaFileUpload(
            file_path,
            mimetype=guessed_type or 'application/octet-stream',
            resumable=True,
        )
        created = service.files().create(
            body=file_metadata, media_body=media, fields='id, webViewLink'
        ).execute()
        return created.get('id'), None
    except Exception as e:
        return None, f"Upload failed: {str(e)}"
|
|
|
|
|
|
|
|
|
def _list_gdrive_folders() -> Tuple[Optional[List[dict]], Optional[str]]:
    """Return non-trashed Drive folders as ``(folders, error)``.

    Each folder is a dict with ``id`` and ``name``. A single request with a
    page size of 50 is made. On failure the first element is ``None`` and
    the second carries the error message.
    """
    creds, error = _get_gdrive_credentials()
    if error:
        return None, error

    folder_query = "mimeType='application/vnd.google-apps.folder' and trashed=false"
    try:
        drive = build('drive', 'v3', credentials=creds)
        listing = drive.files().list(
            q=folder_query,
            fields='files(id, name)',
            pageSize=50,
        )
        response = listing.execute()
        return response.get('files', []), None
    except Exception as exc:
        return None, f"Failed to list folders: {str(exc)}"
|
|
|
|
|
|
|
|
|
def _list_gdrive_files(folder_id: Optional[str] = None, file_type: Optional[str] = None) -> Tuple[Optional[List[dict]], Optional[str]]:
    """List non-trashed Drive files, most recently modified first.

    Args:
        folder_id: When given, only direct children of this folder are listed.
        file_type: ``"pdf"`` or ``"folder"`` restricts the MIME type; any
            other value applies no type filter.

    Returns:
        ``(files, None)`` where each file dict may contain ``id``, ``name``,
        ``mimeType``, ``size`` and ``webViewLink`` (one request, up to 100
        results), or ``(None, error_message)`` on failure.
    """
    creds, error = _get_gdrive_credentials()
    if error:
        return None, error

    try:
        service = build('drive', 'v3', credentials=creds)

        query_parts = ["trashed=false"]
        if folder_id:
            # Escape backslashes and single quotes so the value cannot break
            # out of the quoted literal in the Drive query.
            safe_folder_id = folder_id.replace("\\", "\\\\").replace("'", "\\'")
            query_parts.append(f"'{safe_folder_id}' in parents")
        if file_type == "pdf":
            query_parts.append("mimeType='application/pdf'")
        elif file_type == "folder":
            query_parts.append("mimeType='application/vnd.google-apps.folder'")

        results = service.files().list(
            q=" and ".join(query_parts),
            fields='files(id, name, mimeType, size, webViewLink)',
            pageSize=100, orderBy='modifiedTime desc'
        ).execute()
        return results.get('files', []), None
    except Exception as e:
        return None, f"Failed to list files: {str(e)}"
|
|
|
|
|
|
|
|
|
def _search_gdrive_files(query: str, file_type: Optional[str] = None) -> Tuple[Optional[List[dict]], Optional[str]]:
    """Search non-trashed Drive files whose name contains ``query``.

    Args:
        query: Text to look for in file names. Backslashes and single quotes
            are escaped, so names such as ``John's report`` can be searched.
        file_type: ``"pdf"`` restricts results to PDFs; any other value
            applies no type filter.

    Returns:
        ``(files, None)`` ordered by most recently modified (one request, up
        to 50 results), or ``(None, error_message)`` on failure.
    """
    creds, error = _get_gdrive_credentials()
    if error:
        return None, error

    try:
        service = build('drive', 'v3', credentials=creds)

        # Drive query strings are single-quoted; escape the quote character
        # and the escape character itself.
        escaped_query = query.replace("\\", "\\\\").replace("'", "\\'")
        query_parts = [f"name contains '{escaped_query}'", "trashed=false"]
        if file_type == "pdf":
            query_parts.append("mimeType='application/pdf'")

        results = service.files().list(
            q=" and ".join(query_parts),
            fields='files(id, name, mimeType, size, webViewLink)',
            pageSize=50, orderBy='modifiedTime desc'
        ).execute()
        return results.get('files', []), None
    except Exception as e:
        return None, f"Search failed: {str(e)}"
|
|
|
|
|
|
|
|
|
def _download_gdrive_file(file_id: str, destination: Optional[str] = None) -> Tuple[Optional[str], Optional[str]]:
    """Download a Drive file to local disk.

    The file is saved under its Drive name inside ``destination`` (or
    ``OUTPUT_DIR`` when no destination is given). Path separators in the
    Drive name are replaced with ``_`` so the name can never point outside
    the target directory.

    Args:
        file_id: Drive file ID.
        destination: Optional target directory; created if missing.

    Returns:
        ``(local_path, None)`` on success, ``(None, error_message)`` otherwise.
    """
    creds, error = _get_gdrive_credentials()
    if error:
        return None, error

    try:
        service = build('drive', 'v3', credentials=creds)
        file_metadata = service.files().get(fileId=file_id, fields='name').execute()

        # The Drive name is remote, untrusted data: flatten it to a single
        # path component before joining it onto a local directory.
        raw_name = file_metadata.get('name') or f'download_{file_id}'
        filename = re.sub(r'[\\/]', '_', raw_name).strip()
        if filename in ("", ".", ".."):
            filename = "download_" + re.sub(r'[^a-zA-Z0-9_-]', '_', str(file_id))

        target_dir = Path(destination) if destination else OUTPUT_DIR
        dest_path = target_dir / filename

        request = service.files().get_media(fileId=file_id)
        file_handle = io.BytesIO()
        downloader = MediaIoBaseDownload(file_handle, request)
        done = False
        while not done:
            _, done = downloader.next_chunk()

        # Write only after the whole download succeeded, so a failed transfer
        # never leaves a partial file behind.
        dest_path.parent.mkdir(parents=True, exist_ok=True)
        dest_path.write_bytes(file_handle.getvalue())
        return str(dest_path), None
    except Exception as e:
        return None, f"Download failed: {str(e)}"
|
|
|
|
|
|
|
|
|
async def _get_pdf_bytes(source: str) -> Tuple[bytes, str, Optional[str]]:
    """Load PDF bytes from a Google Drive URL, a Drive file ID or a local path.

    Args:
        source: One of
            * a URL containing ``drive.google.com`` or ``docs.google.com``,
            * a bare Drive file ID (25 or more ID characters),
            * a local path to a ``.pdf`` file (``~`` is expanded).

    Returns:
        ``(pdf_bytes, source_name, None)`` on success. ``source_name`` is
        ``"Google Drive: <filename>"`` for Drive sources and the file name
        for local files. On failure ``(b"", "", error_message)``.
    """
    source = source.strip()

    # Resolve both Drive spellings (URL or bare ID) to a single file ID so
    # the download logic exists only once.
    drive_file_id = None
    if "drive.google.com" in source or "docs.google.com" in source:
        drive_file_id = _extract_gdrive_file_id(source)
        if not drive_file_id:
            return b"", "", "Invalid Google Drive URL"
    elif re.match(r'^[a-zA-Z0-9_-]{25,}$', source):
        drive_file_id = source

    if drive_file_id:
        local_path, error = _download_gdrive_file(drive_file_id)
        if error:
            return b"", "", error
        downloaded = Path(local_path)
        return downloaded.read_bytes(), f"Google Drive: {downloaded.name}", None

    pdf_path = Path(source).expanduser()
    # is_file() also rejects directories, which would otherwise fail later
    # with an exception instead of a clean error tuple.
    if not pdf_path.is_file():
        return b"", "", f"File not found: {source}"
    if pdf_path.suffix.lower() != ".pdf":
        return b"", "", "File must be a PDF"

    return pdf_path.read_bytes(), pdf_path.name, None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Usage notes handed to the MCP client; kept separate from the constructor
# call so the text is easy to read and edit.
_SERVER_INSTRUCTIONS = f"""PDF translation with layout preservation + Google Drive integration.

Max {MAX_PAGES} pages. For larger PDFs use Gradio at {GRADIO_URL}

WORKFLOW:
1. search_gdrive("filename") - Find PDF
2. download_from_gdrive(file_id) - Download
3. translate_pdf(path, "fr") - Translate
4. upload_to_gdrive(path, folder_id) - Upload

Or all-in-one: translate_and_upload(source, "fr", folder_id)

Output: {OUTPUT_DIR}
"""

mcp = FastMCP(name="babeldocs", instructions=_SERVER_INSTRUCTIONS)
|
|
|
|
|
|
|
|
|
@mcp.tool()
async def translate_pdf(source: str, target_lang: str = "fr") -> dict:
    """Translate PDF with layout preservation. Returns single translated file.

    Args:
        source: Local PDF path, Google Drive URL or Drive file ID.
        target_lang: Key of ``SUPPORTED_LANGUAGES``; case and surrounding
            whitespace are ignored.

    Returns:
        A dict with ``success`` and ``message``. On success it also contains
        ``source``, ``page_count``, ``output_file``, ``filename`` and
        ``stats``. The translated PDF is written to ``OUTPUT_DIR``.
    """
    # Validate the cheap argument first, so an invalid request neither wakes
    # the Modal container nor downloads anything.
    lang = (target_lang or "").strip().lower()
    if lang not in SUPPORTED_LANGUAGES:
        return {"success": False, "message": f"Unsupported language: {target_lang}"}

    await _warmup_modal()

    try:
        pdf_bytes, source_name, error = await _get_pdf_bytes(source)
        if error:
            return {"success": False, "message": error}

        page_count = _count_pdf_pages(pdf_bytes)
        if page_count > MAX_PAGES:
            return {
                "success": False,
                "message": f"PDF has {page_count} pages (max {MAX_PAGES}). Use Gradio: {GRADIO_URL}"
            }

        payload = {
            "pdf_base64": base64.b64encode(pdf_bytes).decode("utf-8"),
            "target_lang": lang,
            "no_dual": True,
            "no_mono": False,
        }

        # 900 s matches the "max 15 min" timeout message below.
        async with httpx.AsyncClient(timeout=900.0, follow_redirects=True) as client:
            response = await client.post(MODAL_TRANSLATE_URL, json=payload)
            response.raise_for_status()
            result = response.json()

        if not result.get("success"):
            return {"success": False, "message": result.get("message", "Translation failed")}

        # Use "mono_img_pdf_base64" when present, otherwise "mono_pdf_base64".
        pdf_data = result.get("mono_img_pdf_base64") or result.get("mono_pdf_base64")
        if not pdf_data:
            return {"success": False, "message": "No output PDF generated"}

        if source_name.startswith("Google Drive:"):
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            output_filename = f"translated_{timestamp}.{lang}.pdf"
        else:
            original_name = Path(source_name).stem
            output_filename = f"{original_name}_translated.{lang}.pdf"

        output_path = OUTPUT_DIR / output_filename
        output_path.write_bytes(base64.b64decode(pdf_data))

        return {
            "success": True,
            "message": f"Translated to {SUPPORTED_LANGUAGES[lang]}",
            "source": source_name,
            "page_count": page_count,
            "output_file": str(output_path),
            "filename": output_filename,
            "stats": result.get("stats", {}),
        }

    except httpx.TimeoutException:
        return {"success": False, "message": "Translation timed out (max 15 min)"}
    except Exception as e:
        # Tool boundary: report any failure to the MCP client as a result dict.
        return {"success": False, "message": f"Error: {str(e)}"}
|
|
|
|
|
|
|
|
|
@mcp.tool()
async def translate_and_upload(source: str, target_lang: str = "fr", folder_id: Optional[str] = None) -> dict:
    """Translate PDF and upload to Google Drive.

    Runs ``translate_pdf`` and, when it succeeds, uploads the output file.
    A failed translation result is returned unchanged. A failed upload still
    reports the path of the local translated file.
    """
    translation = await translate_pdf(source, target_lang)
    if not translation.get("success"):
        return translation

    local_file = translation["output_file"]
    file_id, error = _upload_to_gdrive(local_file, folder_id)
    if error:
        return {"success": False, "message": error, "local_file": local_file}

    outcome = {
        "success": True,
        "message": "Translated and uploaded to Google Drive",
        "source": translation.get("source"),
        "page_count": translation.get("page_count"),
        "gdrive_id": file_id,
        "gdrive_link": f"https://drive.google.com/file/d/{file_id}/view",
        "local_file": local_file,
    }
    return outcome
|
|
|
|
|
|
|
|
|
@mcp.tool()
async def check_pdf(source: str) -> dict:
    """Check if PDF can be translated (page count).

    Reports the page count and size of the PDF and whether the page count is
    within ``MAX_PAGES``. The Modal warm-up request is sent first.
    """
    await _warmup_modal()

    try:
        pdf_bytes, source_name, error = await _get_pdf_bytes(source)
        if error:
            return {"success": False, "message": error}

        page_count = _count_pdf_pages(pdf_bytes)
        within_limit = page_count <= MAX_PAGES
        if within_limit:
            status = f"Ready ({page_count} pages)"
        else:
            status = f"Too large ({page_count} > {MAX_PAGES})"
        size_mb = round(len(pdf_bytes) / (1024 * 1024), 2)

        return {
            "success": True,
            "source": source_name,
            "pages": page_count,
            "size_mb": size_mb,
            "can_translate": within_limit,
            "message": status,
        }
    except Exception as exc:
        return {"success": False, "message": f"Error: {str(exc)}"}
|
|
|
|
|
|
|
|
|
@mcp.tool()
async def get_supported_languages() -> dict:
    """Get supported languages.

    Returns the ``SUPPORTED_LANGUAGES`` mapping together with the default
    target language code.
    """
    response = {"languages": SUPPORTED_LANGUAGES, "default": "fr"}
    return response
|
|
|
|
|
|
|
|
|
@mcp.tool()
async def upload_to_gdrive(file_path: str, folder_id: Optional[str] = None) -> dict:
    """Upload file to Google Drive.

    Fails early when the Google libraries are unavailable or the local file
    does not exist. On success returns the Drive file ID and a view link.
    """
    if not GOOGLE_AVAILABLE:
        return {"success": False, "message": "Google libraries not installed"}

    local_file = Path(file_path)
    if not local_file.exists():
        return {"success": False, "message": f"File not found: {file_path}"}

    file_id, error = _upload_to_gdrive(file_path, folder_id)
    if error:
        return {"success": False, "message": error}

    view_link = f"https://drive.google.com/file/d/{file_id}/view"
    return {
        "success": True,
        "message": f"Uploaded {local_file.name}",
        "file_id": file_id,
        "web_link": view_link,
    }
|
|
|
|
|
|
|
|
|
@mcp.tool()
async def list_gdrive_folders() -> dict:
    """List Google Drive folders.

    Returns the folders found by ``_list_gdrive_folders`` and their count,
    or an error message.
    """
    if not GOOGLE_AVAILABLE:
        return {"success": False, "message": "Google libraries not installed"}

    folders, error = _list_gdrive_folders()
    if error:
        return {"success": False, "message": error}

    folder_count = len(folders)
    return {"success": True, "folders": folders, "count": folder_count}
|
|
|
|
|
|
|
|
|
@mcp.tool()
async def list_gdrive_files(folder_id: Optional[str] = None, file_type: Optional[str] = None) -> dict:
    """List files in Google Drive.

    Each returned file dict that has a non-empty ``size`` gets an extra
    ``size_mb`` entry (size in MiB, rounded to two decimals).
    """
    if not GOOGLE_AVAILABLE:
        return {"success": False, "message": "Google libraries not installed"}

    files, error = _list_gdrive_files(folder_id, file_type)
    if error:
        return {"success": False, "message": error}

    bytes_per_mb = 1024 * 1024
    for entry in files:
        raw_size = entry.get('size')
        if not raw_size:
            continue
        entry['size_mb'] = round(int(entry['size']) / bytes_per_mb, 2)

    return {"success": True, "files": files, "count": len(files)}
|
|
|
|
|
|
|
|
|
@mcp.tool()
async def search_gdrive(query: str, file_type: Optional[str] = None) -> dict:
    """Search Google Drive by filename.

    Echoes the query back and adds a ``size_mb`` entry (size in MiB, rounded
    to two decimals) to every result that has a non-empty ``size``.
    """
    if not GOOGLE_AVAILABLE:
        return {"success": False, "message": "Google libraries not installed"}

    files, error = _search_gdrive_files(query, file_type)
    if error:
        return {"success": False, "message": error}

    bytes_per_mb = 1024 * 1024
    for entry in files:
        raw_size = entry.get('size')
        if not raw_size:
            continue
        entry['size_mb'] = round(int(entry['size']) / bytes_per_mb, 2)

    return {"success": True, "query": query, "files": files, "count": len(files)}
|
|
|
|
|
|
|
|
|
@mcp.tool()
async def download_from_gdrive(file_id: str) -> dict:
    """Download file from Google Drive.

    Args:
        file_id: Drive file ID. A Drive share URL is accepted as well; the
            ID is extracted from it.

    Returns:
        A dict with ``success`` and ``message``; on success also
        ``local_path`` and ``filename`` of the downloaded file.
    """
    if not GOOGLE_AVAILABLE:
        return {"success": False, "message": "Google libraries not installed"}
    if not file_id:
        return {"success": False, "message": "file_id is required"}

    # Callers frequently paste a share link instead of the bare ID; fall back
    # to the raw value when no ID can be extracted (e.g. short IDs).
    resolved_id = _extract_gdrive_file_id(file_id) or file_id

    local_path, error = _download_gdrive_file(resolved_id)
    if error:
        return {"success": False, "message": error}

    return {
        "success": True,
        "message": f"Downloaded to {local_path}",
        "local_path": local_path,
        "filename": Path(local_path).name,
    }
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
mcp.run()
|
|
|
|