Spaces:
Running
Running
| import json | |
| import os | |
| import subprocess | |
| import tempfile | |
| from pathlib import Path | |
| from typing import Any, Dict, List, Optional, Union | |
| import ray | |
| from ray.data import Dataset | |
| from graphgen.bases.base_reader import BaseReader | |
| from graphgen.models.reader.txt_reader import TXTReader | |
| from graphgen.utils import logger, pick_device | |
| class PDFReader(BaseReader): | |
| """ | |
| PDF files are converted using MinerU, see [MinerU](https://github.com/opendatalab/MinerU). | |
| After conversion, the resulting markdown file is parsed into text, images, tables, and formulas which can be used | |
| for multi-modal graph generation. | |
| """ | |
| def __init__( | |
| self, | |
| *, | |
| output_dir: Optional[Union[str, Path]] = None, | |
| method: str = "auto", # auto | txt | ocr | |
| lang: Optional[str] = None, # ch / en / ja / ... | |
| backend: Optional[ | |
| str | |
| ] = None, # pipeline | vlm-transformers | vlm-sglang-engine | vlm-sglang-client | |
| device: Optional[str] = "auto", # cpu | cuda | cuda:0 | npu | mps | auto | |
| source: Optional[str] = None, # huggingface | modelscope | local | |
| vlm_url: Optional[str] = None, # 当 backend=vlm-sglang-client 时必填 | |
| start_page: Optional[int] = None, # 0-based | |
| end_page: Optional[int] = None, # 0-based, inclusive | |
| formula: bool = True, | |
| table: bool = True, | |
| return_assets: bool = True, | |
| **other_mineru_kwargs: Any, | |
| ): | |
| super().__init__() | |
| self.output_dir = os.path.join(output_dir, "mineru") if output_dir else None | |
| if device == "auto": | |
| device = pick_device() | |
| self._default_kwargs: Dict[str, Any] = { | |
| "method": method, | |
| "lang": lang, | |
| "backend": backend, | |
| "device": device, | |
| "source": source, | |
| "vlm_url": vlm_url, | |
| "start_page": start_page, | |
| "end_page": end_page, | |
| "formula": formula, | |
| "table": table, | |
| **other_mineru_kwargs, | |
| } | |
| self._default_kwargs = { | |
| k: v for k, v in self._default_kwargs.items() if v is not None | |
| } | |
| self.return_assets = return_assets | |
| self.parser = MinerUParser() | |
| self.txt_reader = TXTReader() | |
| def read( | |
| self, | |
| input_path: Union[str, List[str]], | |
| **override, | |
| ) -> Dataset: | |
| # Ensure input_path is a list | |
| if isinstance(input_path, str): | |
| input_path = [input_path] | |
| paths_ds = ray.data.from_items(input_path) | |
| def process_pdf(row: Dict[str, Any]) -> List[Dict[str, Any]]: | |
| try: | |
| pdf_path = row["item"] | |
| kwargs = {**self._default_kwargs, **override} | |
| return self._call_mineru(Path(pdf_path), kwargs) | |
| except Exception as e: | |
| logger.error("Failed to process %s: %s", row, e) | |
| return [] | |
| docs_ds = paths_ds.flat_map(process_pdf) | |
| docs_ds = docs_ds.filter(self._should_keep_item) | |
| return docs_ds | |
| def _call_mineru( | |
| self, pdf_path: Path, kwargs: Dict[str, Any] | |
| ) -> List[Dict[str, Any]]: | |
| output_dir: Optional[str] = None | |
| if self.output_dir: | |
| output_dir = str(self.output_dir) | |
| return self.parser.parse_pdf(pdf_path, output_dir=output_dir, **kwargs) | |
| def _locate_md(self, pdf_path: Path, kwargs: Dict[str, Any]) -> Optional[Path]: | |
| out_dir = ( | |
| Path(self.output_dir) if self.output_dir else Path(tempfile.gettempdir()) | |
| ) | |
| method = kwargs.get("method", "auto") | |
| backend = kwargs.get("backend", "") | |
| if backend.startswith("vlm-"): | |
| method = "vlm" | |
| candidate = Path( | |
| os.path.join(out_dir, pdf_path.stem, method, f"{pdf_path.stem}.md") | |
| ) | |
| if candidate.exists(): | |
| return candidate | |
| candidate = Path(os.path.join(out_dir, f"{pdf_path.stem}.md")) | |
| if candidate.exists(): | |
| return candidate | |
| return None | |
| class MinerUParser: | |
| def __init__(self) -> None: | |
| self._check_bin() | |
| def parse_pdf( | |
| pdf_path: Union[str, Path], | |
| output_dir: Optional[Union[str, Path]] = None, | |
| method: str = "auto", | |
| device: str = "cpu", | |
| **kw: Any, | |
| ) -> List[Dict[str, Any]]: | |
| pdf = Path(pdf_path).expanduser().resolve() | |
| if not pdf.is_file(): | |
| raise FileNotFoundError(pdf) | |
| out = ( | |
| Path(output_dir) if output_dir else Path(tempfile.mkdtemp(prefix="mineru_")) | |
| ) | |
| out.mkdir(parents=True, exist_ok=True) | |
| cached = MinerUParser._try_load_cached_result(str(out), pdf.stem, method) | |
| if cached is not None: | |
| return cached | |
| MinerUParser._run_mineru(pdf, out, method, device, **kw) | |
| cached = MinerUParser._try_load_cached_result(str(out), pdf.stem, method) | |
| return cached if cached is not None else [] | |
| def _try_load_cached_result( | |
| out_dir: str, pdf_stem: str, method: str | |
| ) -> Optional[List[Dict[str, Any]]]: | |
| """ | |
| try to load cached json result from MinerU output. | |
| :param out_dir: | |
| :param pdf_stem: | |
| :param method: | |
| :return: | |
| """ | |
| json_file = os.path.join( | |
| out_dir, pdf_stem, method, f"{pdf_stem}_content_list.json" | |
| ) | |
| if not os.path.exists(json_file): | |
| return None | |
| try: | |
| with open(json_file, encoding="utf-8") as f: | |
| data = json.load(f) | |
| except Exception as exc: # pylint: disable=broad-except | |
| logger.warning("Failed to load cached MinerU result: %s", exc) | |
| return None | |
| base = os.path.dirname(json_file) | |
| results = [] | |
| for it in data: | |
| for key in ("img_path", "table_img_path", "equation_img_path"): | |
| rel_path = it.get(key) | |
| if rel_path: | |
| it[key] = str(Path(base).joinpath(rel_path).resolve()) | |
| if it["type"] == "text": | |
| it["content"] = it["text"] | |
| del it["text"] | |
| for key in ("page_idx", "bbox", "text_level"): | |
| if it.get(key) is not None: | |
| del it[key] | |
| results.append(it) | |
| return results | |
| def _run_mineru( | |
| pdf: Path, | |
| out: Path, | |
| method: str, | |
| device: str, | |
| **kw: Any, | |
| ) -> None: | |
| cmd = [ | |
| "mineru", | |
| "-p", | |
| str(pdf), | |
| "-o", | |
| str(out), | |
| "-m", | |
| method, | |
| "-d", | |
| device, | |
| ] | |
| for k, v in kw.items(): | |
| if v is None: | |
| continue | |
| if isinstance(v, bool): | |
| cmd += [f"--{k}", str(v).lower()] | |
| else: | |
| cmd += [f"--{k}", str(v)] | |
| logger.info("Parsing PDF with MinerU: %s", pdf) | |
| logger.debug("Running MinerU command: %s", " ".join(cmd)) | |
| proc = subprocess.run( | |
| cmd, | |
| stdout=subprocess.PIPE, | |
| stderr=subprocess.PIPE, | |
| text=True, | |
| encoding="utf-8", | |
| errors="ignore", | |
| check=False, | |
| ) | |
| if proc.returncode != 0: | |
| raise RuntimeError(f"MinerU failed: {proc.stderr or proc.stdout}") | |
| def _check_bin() -> None: | |
| try: | |
| subprocess.run( | |
| ["mineru", "--version"], | |
| stdout=subprocess.DEVNULL, | |
| stderr=subprocess.DEVNULL, | |
| check=True, | |
| ) | |
| except (subprocess.CalledProcessError, FileNotFoundError) as exc: | |
| raise RuntimeError( | |
| "MinerU is not installed or not found in PATH. Please install it from pip: \n" | |
| "pip install -U 'mineru[core]'" | |
| ) from exc | |