from pathlib import Path
from typing import Any, Dict, Iterator, List, Optional

from graphgen.models import (
    CSVReader,
    JSONLReader,
    JSONReader,
    ParquetReader,
    PDFReader,
    PickleReader,
    RDFReader,
    TXTReader,
)
from graphgen.utils import logger

from .parallel_file_scanner import ParallelFileScanner

# Maps a lowercase file suffix to the reader class that can parse it.
_MAPPING = {
    "jsonl": JSONLReader,
    "json": JSONReader,
    "txt": TXTReader,
    "csv": CSVReader,
    "md": TXTReader,
    "pdf": PDFReader,
    "parquet": ParquetReader,
    "pickle": PickleReader,
    "rdf": RDFReader,
    "owl": RDFReader,
    "ttl": RDFReader,
}


def _build_reader(suffix: str, cache_dir: Optional[str]):
    suffix = suffix.lower()
    # Only the PDF reader takes a cache directory for its extraction output.
    if suffix == "pdf" and cache_dir is not None:
        return _MAPPING[suffix](output_dir=cache_dir)
    return _MAPPING[suffix]()


def read_files(
    input_file: str,
    allowed_suffix: Optional[List[str]] = None,
    cache_dir: Optional[str] = None,
    max_workers: int = 4,
    rescan: bool = False,
) -> Iterator[Dict[str, Any]]:
    """
    Read files from a path using parallel scanning and the appropriate readers.

    Args:
        input_file: Path to a file or directory.
        allowed_suffix: File suffixes to read. If None, all supported types are used.
        cache_dir: Directory for caching PDF extraction output and scan results.
        max_workers: Number of workers for parallel scanning.
        rescan: Whether to force a rescan even if cached results exist.
    """
    path = Path(input_file).expanduser()
    if not path.exists():
        raise FileNotFoundError(f"input_path not found: {input_file}")

    if allowed_suffix is None:
        support_suffix = set(_MAPPING.keys())
    else:
        support_suffix = {s.lower().lstrip(".") for s in allowed_suffix}

    with ParallelFileScanner(
        cache_dir=cache_dir or "cache",
        allowed_suffix=support_suffix,
        rescan=rescan,
        max_workers=max_workers,
    ) as scanner:
        scan_results = scanner.scan(str(path), recursive=True)

    # Collect files from the scan results, skipping paths that failed to scan.
    files_to_read = []
    for scan_path, path_result in scan_results.items():
        if "error" in path_result:
            logger.warning("Error scanning %s: %s", scan_path, path_result["error"])
            continue
        files_to_read.extend(path_result.get("files", []))

    logger.info(
        "Found %d eligible file(s) under %s (allowed_suffix=%s)",
        len(files_to_read),
        input_file,
        support_suffix,
    )

    # Dispatch each file to its reader and stream records back to the caller.
    for file_info in files_to_read:
        try:
            file_path = file_info["path"]
            suffix = Path(file_path).suffix.lstrip(".").lower()
            reader = _build_reader(suffix, cache_dir)
            yield from reader.read(file_path)
        except Exception as e:  # pylint: disable=broad-except
            logger.exception("Error reading %s: %s", file_info.get("path"), e)
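

if __name__ == "__main__":
    # Usage sketch (illustrative only; "./data" and "cache" are placeholder
    # paths, not values mandated by the module). Streams records from a folder,
    # restricted here to JSONL and PDF inputs. Because read_files is a
    # generator, each file is parsed lazily as the loop consumes it.
    for record in read_files(
        "./data",
        allowed_suffix=["jsonl", "pdf"],
        cache_dir="cache",
        max_workers=4,
    ):
        print(record)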