from pathlib import Path
from typing import Any, Dict, Iterator, List, Optional

from graphgen.models import (
    CSVReader,
    JSONLReader,
    JSONReader,
    ParquetReader,
    PDFReader,
    PickleReader,
    RDFReader,
    TXTReader,
)
from graphgen.utils import logger

from .parallel_file_scanner import ParallelFileScanner

# Map file suffixes to their reader classes.
_MAPPING = {
    "jsonl": JSONLReader,
    "json": JSONReader,
    "txt": TXTReader,
    "csv": CSVReader,
    "md": TXTReader,
    "pdf": PDFReader,
    "parquet": ParquetReader,
    "pickle": PickleReader,
    "rdf": RDFReader,
    "owl": RDFReader,
    "ttl": RDFReader,
}
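# For example, both "md" and "txt" map to TXTReader, and every RDF
# serialization ("rdf", "owl", "ttl") shares the same RDFReader.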


def _build_reader(suffix: str, cache_dir: Optional[str]):
    """Instantiate the reader for a suffix; only PDFReader uses the cache directory."""
    suffix = suffix.lower()
    if suffix == "pdf" and cache_dir is not None:
        return _MAPPING[suffix](output_dir=cache_dir)
    return _MAPPING[suffix]()
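
# Dispatch sketch (hypothetical calls, shown for illustration only):
#   _build_reader("pdf", "cache")  -> PDFReader(output_dir="cache")
#   _build_reader("PDF", None)     -> PDFReader()  # suffix is lower-cased first
#   _build_reader("csv", "cache")  -> CSVReader()  # cache_dir only affects PDFs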


def read_files(
    input_file: str,
    allowed_suffix: Optional[List[str]] = None,
    cache_dir: Optional[str] = None,
    max_workers: int = 4,
    rescan: bool = False,
) -> Iterator[Dict[str, Any]]:
    """
    Read files from a path using parallel scanning and appropriate readers.

    Args:
        input_file: Path to a file or directory
        allowed_suffix: List of file suffixes to read. If None, uses all supported types
        cache_dir: Directory for caching PDF extraction and scan results
        max_workers: Number of workers for parallel scanning
        rescan: Whether to force rescan even if cached results exist
    """

    path = Path(input_file).expanduser()
    if not path.exists():
        raise FileNotFoundError(f"input_path not found: {input_file}")

    if allowed_suffix is None:
        support_suffix = set(_MAPPING.keys())
    else:
        support_suffix = {s.lower().lstrip(".") for s in allowed_suffix}

    with ParallelFileScanner(
        cache_dir=cache_dir or "cache",
        allowed_suffix=support_suffix,
        rescan=rescan,
        max_workers=max_workers,
    ) as scanner:
        scan_results = scanner.scan(str(path), recursive=True)

    # Extract files from scan results
    files_to_read = []
    for path_result in scan_results.values():
        if "error" in path_result:
            logger.warning("Error scanning %s: %s", path_result.path, path_result.error)
            continue
        files_to_read.extend(path_result.get("files", []))

    logger.info(
        "Found %d eligible file(s) under folder %s (allowed_suffix=%s)",
        len(files_to_read),
        input_file,
        support_suffix,
    )

    for file_info in files_to_read:
        try:
            file_path = file_info["path"]
            suffix = Path(file_path).suffix.lstrip(".").lower()
            reader = _build_reader(suffix, cache_dir)

            yield from reader.read(file_path)

        except Exception as e:  # pylint: disable=broad-except
            logger.exception("Error reading %s: %s", file_info.get("path"), e)