github-actions[bot] committed on
Commit f29e862 · 1 Parent(s): 9e67c3b

Auto-sync from demo at Wed Nov 26 03:20:24 UTC 2025

graphgen/graphgen.py CHANGED
@@ -67,15 +67,16 @@ class GraphGen:
         self.graph_storage: NetworkXStorage = NetworkXStorage(
             self.working_dir, namespace="graph"
         )
-        self.search_storage: JsonKVStorage = JsonKVStorage(
-            self.working_dir, namespace="search"
-        )
         self.rephrase_storage: JsonKVStorage = JsonKVStorage(
             self.working_dir, namespace="rephrase"
         )
         self.partition_storage: JsonListStorage = JsonListStorage(
             self.working_dir, namespace="partition"
         )
+        self.search_storage: JsonKVStorage = JsonKVStorage(
+            os.path.join(self.working_dir, "data", "graphgen", f"{self.unique_id}"),
+            namespace="search",
+        )
         self.qa_storage: JsonListStorage = JsonListStorage(
             os.path.join(self.working_dir, "data", "graphgen", f"{self.unique_id}"),
             namespace="qa",
@@ -94,23 +95,24 @@ class GraphGen:
         """
         read files from input sources
         """
-        data = read_files(**read_config, cache_dir=self.working_dir)
-        if len(data) == 0:
-            logger.warning("No data to process")
-            return
+        doc_stream = read_files(**read_config, cache_dir=self.working_dir)
 
-        assert isinstance(data, list) and isinstance(data[0], dict)
+        batch = {}
+        for doc in doc_stream:
+            doc_id = compute_mm_hash(doc, prefix="doc-")
 
-        # TODO: configurable whether to use coreference resolution
+            batch[doc_id] = doc
+        if batch:
+            self.full_docs_storage.upsert(batch)
+            self.full_docs_storage.index_done_callback()
 
-        new_docs = {compute_mm_hash(doc, prefix="doc-"): doc for doc in data}
-        _add_doc_keys = self.full_docs_storage.filter_keys(list(new_docs.keys()))
-        new_docs = {k: v for k, v in new_docs.items() if k in _add_doc_keys}
+        # TODO: configurable whether to use coreference resolution
 
+        _add_doc_keys = self.full_docs_storage.filter_keys(list(batch.keys()))
+        new_docs = {k: v for k, v in batch.items() if k in _add_doc_keys}
         if len(new_docs) == 0:
             logger.warning("All documents are already in the storage")
             return
-
         self.full_docs_storage.upsert(new_docs)
         self.full_docs_storage.index_done_callback()
 
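The graphgen.py change above switches ingestion from a materialised list to a lazy stream: read_files now yields documents one at a time, and GraphGen hashes each one into a keyed batch before upserting. The sketch below illustrates that consumption pattern only; doc_stream and doc_key are illustrative stand-ins for read_files and compute_mm_hash, not code from the repository.

# Illustrative sketch (not GraphGen code): consume a lazy document stream and
# build a content-keyed batch instead of materialising the whole list up front.
import hashlib
from typing import Dict, Iterator


def doc_stream() -> Iterator[Dict[str, str]]:
    # Hypothetical stand-in for read_files(...): yields docs one at a time.
    for i in range(3):
        yield {"content": f"document {i}"}


def doc_key(doc: Dict[str, str], prefix: str = "doc-") -> str:
    # Stand-in for compute_mm_hash: deterministic key derived from the content.
    return prefix + hashlib.md5(doc["content"].encode("utf-8")).hexdigest()


batch: Dict[str, Dict[str, str]] = {}
for doc in doc_stream():
    batch[doc_key(doc)] = doc

if batch:
    # In GraphGen this is where full_docs_storage.upsert(batch) happens;
    # here we just show the de-duplicated, content-keyed batch.
    print(len(batch), "unique docs:", sorted(batch))
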
graphgen/operators/read/parallel_file_scanner.py ADDED
@@ -0,0 +1,231 @@
+import os
+import time
+from concurrent.futures import ThreadPoolExecutor, as_completed
+from pathlib import Path
+from typing import Any, Dict, List, Set, Union
+
+from diskcache import Cache
+
+from graphgen.utils import logger
+
+
+class ParallelFileScanner:
+    def __init__(
+        self, cache_dir: str, allowed_suffix, rescan: bool = False, max_workers: int = 4
+    ):
+        self.cache = Cache(cache_dir)
+        self.allowed_suffix = set(allowed_suffix) if allowed_suffix else None
+        self.rescan = rescan
+        self.max_workers = max_workers
+
+    def scan(
+        self, paths: Union[str, List[str]], recursive: bool = True
+    ) -> Dict[str, Any]:
+        if isinstance(paths, str):
+            paths = [paths]
+
+        results = {}
+        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
+            future_to_path = {}
+            for p in paths:
+                if os.path.exists(p):
+                    future = executor.submit(
+                        self._scan_files, Path(p).resolve(), recursive, set()
+                    )
+                    future_to_path[future] = p
+                else:
+                    logger.warning("[READ] Path does not exist: %s", p)
+
+            for future in as_completed(future_to_path):
+                path = future_to_path[future]
+                try:
+                    results[path] = future.result()
+                except Exception as e:
+                    logger.error("[READ] Error scanning path %s: %s", path, e)
+                    results[path] = {
+                        "error": str(e),
+                        "files": [],
+                        "dirs": [],
+                        "stats": {},
+                    }
+        return results
+
+    def _scan_files(
+        self, path: Path, recursive: bool, visited: Set[str]
+    ) -> Dict[str, Any]:
+        path_str = str(path)
+
+        # Avoid cycles due to symlinks
+        if path_str in visited:
+            logger.warning("[READ] Skipping already visited path: %s", path_str)
+            return self._empty_result(path_str)
+
+        # cache check
+        cache_key = f"scan::{path_str}::recursive::{recursive}"
+        cached = self.cache.get(cache_key)
+        if cached and not self.rescan:
+            logger.info("[READ] Using cached scan result for path: %s", path_str)
+            return cached["data"]
+
+        logger.info("[READ] Scanning path: %s", path_str)
+        files, dirs = [], []
+        stats = {"total_size": 0, "file_count": 0, "dir_count": 0, "errors": 0}
+
+        try:
+            path_stat = path.stat()
+            if path.is_file():
+                return self._scan_single_file(path, path_str, path_stat)
+            if path.is_dir():
+                with os.scandir(path_str) as entries:
+                    for entry in entries:
+                        try:
+                            entry_stat = entry.stat(follow_symlinks=False)
+
+                            if entry.is_dir():
+                                dirs.append(
+                                    {
+                                        "path": entry.path,
+                                        "name": entry.name,
+                                        "mtime": entry_stat.st_mtime,
+                                    }
+                                )
+                                stats["dir_count"] += 1
+                            else:
+                                # allowed suffix filter
+                                if not self._is_allowed_file(Path(entry.path)):
+                                    continue
+                                files.append(
+                                    {
+                                        "path": entry.path,
+                                        "name": entry.name,
+                                        "size": entry_stat.st_size,
+                                        "mtime": entry_stat.st_mtime,
+                                    }
+                                )
+                                stats["total_size"] += entry_stat.st_size
+                                stats["file_count"] += 1
+
+                        except OSError:
+                            stats["errors"] += 1
+
+        except (PermissionError, FileNotFoundError, OSError) as e:
+            logger.error("[READ] Failed to scan path %s: %s", path_str, e)
+            return {"error": str(e), "files": [], "dirs": [], "stats": stats}
+
+        if recursive:
+            sub_visited = visited | {path_str}
+            sub_results = self._scan_subdirs(dirs, sub_visited)
+
+            for sub_data in sub_results.values():
+                files.extend(sub_data.get("files", []))
+                stats["total_size"] += sub_data["stats"].get("total_size", 0)
+                stats["file_count"] += sub_data["stats"].get("file_count", 0)
+
+        result = {"path": path_str, "files": files, "dirs": dirs, "stats": stats}
+        self._cache_result(cache_key, result, path)
+        return result
+
+    def _scan_single_file(
+        self, path: Path, path_str: str, stat: os.stat_result
+    ) -> Dict[str, Any]:
+        """Scan a single file and return its metadata"""
+        if not self._is_allowed_file(path):
+            return self._empty_result(path_str)
+
+        return {
+            "path": path_str,
+            "files": [
+                {
+                    "path": path_str,
+                    "name": path.name,
+                    "size": stat.st_size,
+                    "mtime": stat.st_mtime,
+                }
+            ],
+            "dirs": [],
+            "stats": {
+                "total_size": stat.st_size,
+                "file_count": 1,
+                "dir_count": 0,
+                "errors": 0,
+            },
+        }
+
+    def _scan_subdirs(self, dir_list: List[Dict], visited: Set[str]) -> Dict[str, Any]:
+        """
+        Parallel scan subdirectories
+        :param dir_list
+        :param visited
+        :return:
+        """
+        results = {}
+        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
+            futures = {
+                executor.submit(self._scan_files, Path(d["path"]), True, visited): d[
+                    "path"
+                ]
+                for d in dir_list
+            }
+
+            for future in as_completed(futures):
+                path = futures[future]
+                try:
+                    results[path] = future.result()
+                except Exception as e:
+                    logger.error("[READ] Error scanning subdirectory %s: %s", path, e)
+                    results[path] = {
+                        "error": str(e),
+                        "files": [],
+                        "dirs": [],
+                        "stats": {},
+                    }
+
+        return results
+
+    def _cache_result(self, key: str, result: Dict, path: Path):
+        """Cache the scan result"""
+        try:
+            self.cache.set(
+                key,
+                {
+                    "data": result,
+                    "dir_mtime": path.stat().st_mtime,
+                    "cached_at": time.time(),
+                },
+            )
+            logger.info("[READ] Cached scan result for path: %s", path)
+        except OSError as e:
+            logger.error("[READ] Failed to cache scan result for path %s: %s", path, e)
+
+    def _is_allowed_file(self, path: Path) -> bool:
+        """Check if the file has an allowed suffix"""
+        if self.allowed_suffix is None:
+            return True
+        suffix = path.suffix.lower().lstrip(".")
+        return suffix in self.allowed_suffix
+
+    def invalidate(self, path: str):
+        """Invalidate cache for a specific path"""
+        path = Path(path).resolve()
+        keys = [k for k in self.cache if k.startswith(f"scan::{path}")]
+        for k in keys:
+            self.cache.delete(k)
+        logger.info("[READ] Invalidated cache for path: %s", path)
+
+    def close(self):
+        self.cache.close()
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *args):
+        self.close()
+
+    @staticmethod
+    def _empty_result(path: str) -> Dict[str, Any]:
+        return {
+            "path": path,
+            "files": [],
+            "dirs": [],
+            "stats": {"total_size": 0, "file_count": 0, "dir_count": 0, "errors": 0},
+        }
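
For context, a usage sketch of the scanner defined above: it works as a context manager, scans one or more paths with a thread pool, and returns a mapping from each input path to a dict with "files", "dirs" and "stats" keys (or an "error" key on failure). The ./docs and ./cache/scan paths below are placeholders, and the import assumes the module's on-disk location under graphgen/operators/read/.

# Usage sketch; paths are placeholders, import path assumed from the file location.
from graphgen.operators.read.parallel_file_scanner import ParallelFileScanner

with ParallelFileScanner(
    cache_dir="./cache/scan",
    allowed_suffix={"pdf", "txt", "jsonl"},
    rescan=False,
    max_workers=4,
) as scanner:
    results = scanner.scan("./docs", recursive=True)

for root, data in results.items():
    if "error" in data:
        print(f"{root}: scan failed: {data['error']}")
        continue
    stats = data["stats"]
    print(f"{root}: {stats['file_count']} files, {stats['total_size']} bytes")
    for f in data["files"][:5]:
        print("  ", f["path"], f["size"])

Because results are stored in diskcache under a scan::<path>::recursive::<flag> key, a second scan of the same path with rescan=False is served from the cache; invalidate(path) or rescan=True forces a fresh walk.
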
graphgen/operators/read/read_files.py CHANGED
@@ -1,5 +1,5 @@
 from pathlib import Path
-from typing import Any, Dict, List, Optional
+from typing import Any, Dict, Iterator, List, Optional
 
 from graphgen.models import (
     CSVReader,
@@ -13,6 +13,8 @@ from graphgen.models import (
 )
 from graphgen.utils import logger
 
+from .parallel_file_scanner import ParallelFileScanner
+
 _MAPPING = {
     "jsonl": JSONLReader,
     "json": JSONReader,
@@ -39,7 +41,20 @@ def read_files(
     input_file: str,
     allowed_suffix: Optional[List[str]] = None,
     cache_dir: Optional[str] = None,
-) -> list[dict]:
+    max_workers: int = 4,
+    rescan: bool = False,
+) -> Iterator[Dict[str, Any]]:
+    """
+    Read files from a path using parallel scanning and appropriate readers.
+
+    Args:
+        input_file: Path to a file or directory
+        allowed_suffix: List of file suffixes to read. If None, uses all supported types
+        cache_dir: Directory for caching PDF extraction and scan results
+        max_workers: Number of workers for parallel scanning
+        rescan: Whether to force rescan even if cached results exist
+    """
+
     path = Path(input_file).expanduser()
     if not path.exists():
         raise FileNotFoundError(f"input_path not found: {input_file}")
@@ -49,24 +64,22 @@
     else:
         support_suffix = {s.lower().lstrip(".") for s in allowed_suffix}
 
-    # single file
-    if path.is_file():
-        suffix = path.suffix.lstrip(".").lower()
-        if suffix not in support_suffix:
-            logger.warning(
-                "Skip file %s (suffix '%s' not in allowed_suffix %s)",
-                path,
-                suffix,
-                support_suffix,
-            )
-            return []
-        reader = _build_reader(suffix, cache_dir)
-        return reader.read(str(path))
-
-    # folder
-    files_to_read = [
-        p for p in path.rglob("*") if p.suffix.lstrip(".").lower() in support_suffix
-    ]
+    with ParallelFileScanner(
+        cache_dir=cache_dir or "cache",
+        allowed_suffix=support_suffix,
+        rescan=rescan,
+        max_workers=max_workers,
+    ) as scanner:
+        scan_results = scanner.scan(str(path), recursive=True)
+
+    # Extract files from scan results
+    files_to_read = []
+    for path_result in scan_results.values():
+        if "error" in path_result:
+            logger.warning("Error scanning %s: %s", path_result.path, path_result.error)
+            continue
+        files_to_read.extend(path_result.get("files", []))
+
     logger.info(
         "Found %d eligible file(s) under folder %s (allowed_suffix=%s)",
         len(files_to_read),
@@ -74,13 +87,13 @@
         support_suffix,
     )
 
-    all_docs: List[Dict[str, Any]] = []
-    for p in files_to_read:
+    for file_info in files_to_read:
         try:
-            suffix = p.suffix.lstrip(".").lower()
+            file_path = file_info["path"]
+            suffix = Path(file_path).suffix.lstrip(".").lower()
             reader = _build_reader(suffix, cache_dir)
-            all_docs.extend(reader.read(str(p)))
-        except Exception as e:  # pylint: disable=broad-except
-            logger.exception("Error reading %s: %s", p, e)
 
-    return all_docs
+            yield from reader.read(file_path)
+
+        except Exception as e:  # pylint: disable=broad-except
+            logger.exception("Error reading %s: %s", file_info.get("path"), e)
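
The reworked read_files is now a generator, so callers receive documents lazily instead of a fully materialised list, which is why the GraphGen.read change above builds a batch dict from the stream. A minimal calling sketch, with placeholder paths and suffixes and the import path assumed from the file location:

# Placeholder paths; assumes read_files is importable from its module path.
from graphgen.operators.read.read_files import read_files

docs = read_files(
    input_file="./data/corpus",      # file or directory
    allowed_suffix=["jsonl", "txt"],
    cache_dir="./cache",             # also backs the scan cache
    max_workers=4,
    rescan=False,
)

# Nothing is read until iteration begins; documents stream out file by file.
for i, doc in enumerate(docs):
    print(i, type(doc))
    if i >= 4:
        break
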
requirements.txt CHANGED
@@ -20,6 +20,7 @@ requests
 fastapi
 trafilatura
 aiohttp
+diskcache
 
 leidenalg
 igraph
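
diskcache is the new dependency backing ParallelFileScanner's scan cache. A minimal sketch of the handful of Cache operations the scanner relies on (set, get, key iteration, delete, close); the directory and key below are placeholders:

from diskcache import Cache

cache = Cache("./cache/scan")                 # persistent, SQLite-backed key-value store
key = "scan::/data/docs::recursive::True"
cache.set(key, {"data": {"files": []}, "cached_at": 0.0})

entry = cache.get(key)                        # stored dict, or None if missing
stale = [k for k in cache if k.startswith("scan::/data/docs")]  # iterate keys
for k in stale:
    cache.delete(k)
cache.close()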