File size: 914 Bytes
31086ae
7735526
31086ae
 
7735526
 
 
 
 
 
 
52419fe
 
 
7735526
 
31086ae
 
 
7735526
31086ae
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
from typing import List, Union

import ray
from ray.data import Dataset

from graphgen.bases.base_reader import BaseReader


class ParquetReader(BaseReader):
    """
    Reader that loads Parquet files through Ray Data.

    The stored schema is expected to map back to List[Dict[str, Any]] records.
    Expected columns:
    - type: kind of document held by the row (e.g., "text", "image", etc.)
    - content: required whenever type is "text".
    """

    def read(self, input_path: Union[str, List[str]]) -> Dataset:
        """
        Load one or more Parquet files into a Ray Dataset.

        Each batch is passed through ``self._validate_batch`` (as a pandas
        batch) and the resulting rows are filtered with
        ``self._should_keep_item``. Neither helper is defined in this class;
        presumably both are inherited from ``BaseReader``.

        :param input_path: A single Parquet path or a list of Parquet paths.
        :return: Ray Dataset containing validated documents.
        """
        # Bring up Ray only when the caller has not already initialized it.
        if not ray.is_initialized():
            ray.init()

        # Read -> validate per pandas batch -> drop rows rejected by the filter.
        return (
            ray.data.read_parquet(input_path)
            .map_batches(self._validate_batch, batch_format="pandas")
            .filter(self._should_keep_item)
        )