from pathlib import Path
from typing import Any, Dict, Iterator, List, Optional
from graphgen.models import (
CSVReader,
JSONLReader,
JSONReader,
ParquetReader,
PDFReader,
PickleReader,
RDFReader,
TXTReader,
)
from graphgen.utils import logger
from .parallel_file_scanner import ParallelFileScanner
# Registry mapping a lowercase file suffix (without the leading dot) to the
# reader class that can parse it. Note that Markdown ("md") reuses the
# plain-text reader, and a single RDFReader handles all three RDF
# serializations ("rdf", "owl", "ttl").
_MAPPING = {
    "jsonl": JSONLReader,
    "json": JSONReader,
    "txt": TXTReader,
    "csv": CSVReader,
    "md": TXTReader,
    "pdf": PDFReader,
    "parquet": ParquetReader,
    "pickle": PickleReader,
    "rdf": RDFReader,
    "owl": RDFReader,
    "ttl": RDFReader,
}
def _build_reader(suffix: str, cache_dir: Optional[str] = None):
    """Instantiate the reader registered for *suffix*.

    Args:
        suffix: File extension without the leading dot; matched
            case-insensitively against the ``_MAPPING`` registry.
        cache_dir: Optional cache directory. Only the PDF reader accepts it
            (as ``output_dir``); it is ignored for every other reader type.

    Returns:
        A new reader instance for the given suffix.

    Raises:
        ValueError: If no reader is registered for *suffix*.
    """
    suffix = suffix.lower()
    reader_cls = _MAPPING.get(suffix)
    if reader_cls is None:
        # Fail with an actionable message instead of a bare KeyError.
        raise ValueError(
            f"Unsupported file suffix: {suffix!r}. "
            f"Supported suffixes: {sorted(_MAPPING)}"
        )
    # Only the PDF reader supports caching its extraction output.
    if suffix == "pdf" and cache_dir is not None:
        return reader_cls(output_dir=cache_dir)
    return reader_cls()
def read_files(
    input_file: str,
    allowed_suffix: Optional[List[str]] = None,
    cache_dir: Optional[str] = None,
    max_workers: int = 4,
    rescan: bool = False,
) -> Iterator[Dict[str, Any]]:
    """
    Read files from a path using parallel scanning and appropriate readers.

    Args:
        input_file: Path to a file or directory
        allowed_suffix: List of file suffixes to read. If None, uses all supported types
        cache_dir: Directory for caching PDF extraction and scan results
        max_workers: Number of workers for parallel scanning
        rescan: Whether to force rescan even if cached results exist

    Yields:
        Record dicts produced by each file's reader.

    Raises:
        FileNotFoundError: If input_file does not exist.
    """
    path = Path(input_file).expanduser()
    if not path.exists():
        raise FileNotFoundError(f"input_path not found: {input_file}")

    if allowed_suffix is None:
        support_suffix = set(_MAPPING.keys())
    else:
        # Normalize user-supplied suffixes (".PDF" -> "pdf") to match the registry.
        support_suffix = {s.lower().lstrip(".") for s in allowed_suffix}

    with ParallelFileScanner(
        cache_dir=cache_dir or "cache",
        allowed_suffix=support_suffix,
        rescan=rescan,
        max_workers=max_workers,
    ) as scanner:
        scan_results = scanner.scan(str(path), recursive=True)

    # Extract files from scan results.
    files_to_read = []
    # NOTE(review): each value is a dict ("error" membership test, .get("files")
    # below), so the previous attribute accesses path_result.path /
    # path_result.error raised AttributeError — and outside the per-file
    # try/except, killing the whole generator. Use dict access, and log the
    # scan key (presumably the scanned path — confirm against the scanner).
    for scanned_path, path_result in scan_results.items():
        if "error" in path_result:
            logger.warning(
                "Error scanning %s: %s", scanned_path, path_result["error"]
            )
            continue
        files_to_read.extend(path_result.get("files", []))

    logger.info(
        "Found %d eligible file(s) under folder %s (allowed_suffix=%s)",
        len(files_to_read),
        input_file,
        support_suffix,
    )

    for file_info in files_to_read:
        try:
            file_path = file_info["path"]
            suffix = Path(file_path).suffix.lstrip(".").lower()
            reader = _build_reader(suffix, cache_dir)
            # Stream records lazily so callers can start consuming before
            # every file has been read.
            yield from reader.read(file_path)
        except Exception as e:  # pylint: disable=broad-except
            # Best-effort: a single unreadable file must not abort the run.
            logger.exception("Error reading %s: %s", file_info.get("path"), e)