import os
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Union

from graphgen.models import RocksDBCache


class ParallelFileScanner:
    """Scan files and directories in parallel, caching results in RocksDB."""

    def __init__(
        self,
        cache_dir: str,
        allowed_suffix: Optional[List[str]],
        rescan: bool = False,
        max_workers: int = 4,
    ):
        self.cache = RocksDBCache(os.path.join(cache_dir, "input_paths.db"))
        # None (or an empty list) means every suffix is accepted
        self.allowed_suffix = set(allowed_suffix) if allowed_suffix else None
        self.rescan = rescan
        self.max_workers = max_workers

    def scan(
        self, paths: Union[str, List[str]], recursive: bool = True
    ) -> Dict[str, Any]:
        """Scan one or more paths in parallel.

        Nonexistent paths are silently skipped; every other input path maps
        to its scan result, or to an error record if the worker failed.
        """
        if isinstance(paths, str):
            paths = [paths]
        results = {}
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            future_to_path = {}
            for p in paths:
                if os.path.exists(p):
                    future = executor.submit(
                        self._scan_files, Path(p).resolve(), recursive, set()
                    )
                    future_to_path[future] = p
            for future in as_completed(future_to_path):
                path = future_to_path[future]
                try:
                    results[path] = future.result()
                except Exception as e:
                    results[path] = {
                        "error": str(e),
                        "files": [],
                        "dirs": [],
                        "stats": {},
                    }
        return results

    def _scan_files(
        self, path: Path, recursive: bool, visited: Set[str]
    ) -> Dict[str, Any]:
        """Scan a file or directory, optionally recursing into subdirectories."""
        path_str = str(path)

        # Avoid cycles due to symlinks: paths are resolved before being
        # queued, so seeing a path again means we have looped back into it
        if path_str in visited:
            return self._empty_result(path_str)

        # Cache check: reuse a previous result unless a rescan is forced or
        # the directory's mtime no longer matches the value stored alongside
        # the cached data by _cache_result
        cache_key = f"scan::{path_str}::recursive::{recursive}"
        cached = self.cache.get(cache_key)
        if cached and not self.rescan:
            try:
                if path.stat().st_mtime == cached.get("dir_mtime"):
                    return cached["data"]
            except OSError:
                pass  # stat failed; fall through and rescan

        files, dirs = [], []
        stats = {"total_size": 0, "file_count": 0, "dir_count": 0, "errors": 0}
        try:
            path_stat = path.stat()
            if path.is_file():
                return self._scan_single_file(path, path_str, path_stat)
            if path.is_dir():
                with os.scandir(path_str) as entries:
                    for entry in entries:
                        try:
                            entry_stat = entry.stat(follow_symlinks=False)
                            if entry.is_dir():
                                dirs.append(
                                    {
                                        "path": entry.path,
                                        "name": entry.name,
                                        "mtime": entry_stat.st_mtime,
                                    }
                                )
                                stats["dir_count"] += 1
                            else:
                                # allowed suffix filter
                                if not self._is_allowed_file(Path(entry.path)):
                                    continue
                                files.append(
                                    {
                                        "path": entry.path,
                                        "name": entry.name,
                                        "size": entry_stat.st_size,
                                        "mtime": entry_stat.st_mtime,
                                    }
                                )
                                stats["total_size"] += entry_stat.st_size
                                stats["file_count"] += 1
                        except OSError:
                            # unreadable entry; count it and keep going
                            stats["errors"] += 1
        except OSError as e:
            # PermissionError and FileNotFoundError are OSError subclasses,
            # so a single except clause covers all of them
            return {"error": str(e), "files": [], "dirs": [], "stats": stats}

        if recursive:
            sub_visited = visited | {path_str}
            sub_results = self._scan_subdirs(dirs, sub_visited)
            for sub_data in sub_results.values():
                files.extend(sub_data.get("files", []))
                stats["total_size"] += sub_data["stats"].get("total_size", 0)
                stats["file_count"] += sub_data["stats"].get("file_count", 0)
                stats["errors"] += sub_data["stats"].get("errors", 0)

        result = {"path": path_str, "files": files, "dirs": dirs, "stats": stats}
        self._cache_result(cache_key, result, path)
        return result

    def _scan_single_file(
        self, path: Path, path_str: str, stat: os.stat_result
    ) -> Dict[str, Any]:
        """Scan a single file and return its metadata"""
        if not self._is_allowed_file(path):
            return self._empty_result(path_str)
        return {
            "path": path_str,
            "files": [
                {
                    "path": path_str,
                    "name": path.name,
                    "size": stat.st_size,
                    "mtime": stat.st_mtime,
                }
            ],
            "dirs": [],
            "stats": {
                "total_size": stat.st_size,
                "file_count": 1,
                "dir_count": 0,
                "errors": 0,
            },
        }

    def _scan_subdirs(self, dir_list: List[Dict], visited: Set[str]) -> Dict[str, Any]:
        """
        Scan subdirectories in parallel.

        :param dir_list: directory entries produced by ``_scan_files``
        :param visited: resolved ancestor paths, used to break symlink cycles
        :return: mapping of subdirectory path to its scan result
        """
        results = {}
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = {
                # Resolve symlinks so revisits are caught by the visited set
                executor.submit(
                    self._scan_files, Path(d["path"]).resolve(), True, visited
                ): d["path"]
                for d in dir_list
            }
            for future in as_completed(futures):
                path = futures[future]
                try:
                    results[path] = future.result()
                except Exception as e:
                    results[path] = {
                        "error": str(e),
                        "files": [],
                        "dirs": [],
                        "stats": {},
                    }
        return results

    def _cache_result(self, key: str, result: Dict, path: Path):
        """Cache the scan result"""
        self.cache.set(
            key,
            {
                "data": result,
                "dir_mtime": path.stat().st_mtime,
                "cached_at": time.time(),
            },
        )

    def _is_allowed_file(self, path: Path) -> bool:
        """Check if the file has an allowed suffix"""
        if self.allowed_suffix is None:
            return True
        suffix = path.suffix.lower().lstrip(".")
        return suffix in self.allowed_suffix

    def invalidate(self, path: str):
        """Invalidate cache for a specific path"""
        path = Path(path).resolve()
        keys = [k for k in self.cache if k.startswith(f"scan::{path}")]
        for k in keys:
            self.cache.delete(k)

    def close(self):
        self.cache.close()

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()

    @staticmethod
    def _empty_result(path: str) -> Dict[str, Any]:
        """Return an empty result record for *path*"""
        return {
            "path": path,
            "files": [],
            "dirs": [],
            "stats": {"total_size": 0, "file_count": 0, "dir_count": 0, "errors": 0},
        }
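

# A minimal usage sketch, not part of the module proper. The directory and
# cache paths below are hypothetical, and it assumes that
# graphgen.models.RocksDBCache is importable in the current environment.
if __name__ == "__main__":
    with ParallelFileScanner(
        cache_dir="/tmp/scan_cache",  # hypothetical cache location
        allowed_suffix=["txt", "md", "json"],  # suffixes to keep, no leading dot
        max_workers=8,
    ) as scanner:
        results = scanner.scan("/data/input", recursive=True)  # hypothetical path
        for root, data in results.items():
            stats = data.get("stats", {})
            print(
                f"{root}: {stats.get('file_count', 0)} files, "
                f"{stats.get('total_size', 0)} bytes, "
                f"{stats.get('errors', 0)} errors"
            )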