# GraphGen/graphgen/models/reader/parquet_reader.py
# Last committed by github-actions[bot]:
#   "Auto-sync from demo at Tue Dec 16 08:21:05 UTC 2025" (commit 31086ae)
from typing import List, Union
import ray
from ray.data import Dataset
from graphgen.bases.base_reader import BaseReader
class ParquetReader(BaseReader):
    """
    Reader for Parquet files, built on Ray Data.

    The Parquet schema is required to be restorable to List[Dict[str, Any]].

    Expected columns:
    - type: The type of the document (e.g., "text", "image", etc.)
    - content: must be present whenever type is "text".
    """

    def read(self, input_path: Union[str, List[str]]) -> Dataset:
        """
        Load Parquet data into a Ray Dataset, then validate and filter it.

        :param input_path: Path to Parquet file or list of Parquet files.
        :return: Ray Dataset produced by applying the batch validator and
            then the per-item keep filter to the loaded rows.
        """
        # Bring up a Ray runtime with default settings only when the current
        # process has not initialised one yet.
        if not ray.is_initialized():
            ray.init()

        loaded = ray.data.read_parquet(input_path)

        # NOTE(review): _validate_batch and _should_keep_item are not defined
        # in this class; presumably they are inherited from BaseReader.
        # Batches are handed to the validator as pandas DataFrames.
        validated = loaded.map_batches(self._validate_batch, batch_format="pandas")
        return validated.filter(self._should_keep_item)