from pathlib import Path
from typing import Any, Dict, List, Union

import ray
import rdflib
from ray.data import Dataset
from rdflib import Literal
from rdflib.util import guess_format

from graphgen.bases.base_reader import BaseReader
from graphgen.utils import logger


class RDFReader(BaseReader):
    """
    Reader for RDF files that extracts triples and represents them as dictionaries.

    Uses Ray Data for distributed processing of multiple RDF files.
    """

    def __init__(self, *, text_column: str = "content", **kwargs):
        """
        Initialize RDFReader.

        :param text_column: The column name for text content (default: "content").
        """
        super().__init__(**kwargs)
        self.text_column = text_column

    def read(
        self,
        input_path: Union[str, List[str]],
    ) -> Dataset:
        """
        Read RDF file(s) using Ray Data.

        :param input_path: Path to RDF file or list of RDF files.
        :return: Ray Dataset containing extracted documents.
        """
        if not ray.is_initialized():
            ray.init()

        # Ensure input_path is a list to prevent Ray from splitting string into characters
        if isinstance(input_path, str):
            input_path = [input_path]

        # Create dataset from file paths
        paths_ds = ray.data.from_items(input_path)

        def process_rdf(row: Dict[str, Any]) -> List[Dict[str, Any]]:
            """Process a single RDF file and return list of documents."""
            try:
                file_path = row["item"]
                return self._parse_rdf_file(Path(file_path))
            except Exception as e:
                logger.error(
                    "Failed to process RDF file %s: %s", row.get("item", "unknown"), e
                )
                return []

        # Process files in parallel and flatten results
        docs_ds = paths_ds.flat_map(process_rdf)

        # Filter valid documents
        docs_ds = docs_ds.filter(self._should_keep_item)

        return docs_ds

    def _parse_rdf_file(self, file_path: Path) -> List[Dict[str, Any]]:
        """
        Parse a single RDF file and extract documents.

        :param file_path: Path to RDF file.
        :return: List of document dictionaries.
        """
        if not file_path.is_file():
            raise FileNotFoundError(f"RDF file not found: {file_path}")

        g = rdflib.Graph()
        fmt = guess_format(str(file_path))

        try:
            g.parse(str(file_path), format=fmt)
        except Exception as e:
            raise ValueError(f"Cannot parse RDF file {file_path}: {e}") from e

        docs: List[Dict[str, Any]] = []

        # Process each unique subject in the RDF graph
        for subj in set(g.subjects()):
            literals = []
            props = {}

            # Extract all triples for this subject
            for _, pred, obj in g.triples((subj, None, None)):
                pred_str = str(pred)
                obj_str = str(obj)

                # Collect literal values as text content
                if isinstance(obj, Literal):
                    literals.append(obj_str)

                # Store all properties (including non-literals)
                props.setdefault(pred_str, []).append(obj_str)

            # Join all literal values as the text content
            text = " ".join(literals).strip()
            if not text:
                logger.warning(
                    "Subject %s in %s has no literal values; document will have empty '%s' field.",
                    subj,
                    file_path,
                    self.text_column,
                )

            # Create document dictionary
            doc = {
                "id": str(subj),
                self.text_column: text,
                "properties": props,
                "source_file": str(file_path),
            }
            docs.append(doc)

        if not docs:
            logger.warning("RDF file %s contains no valid documents.", file_path)

        return docs
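

# Minimal usage sketch (not part of the library API): assumes a local Turtle
# file at "data/example.ttl" and that BaseReader needs no constructor
# arguments beyond the keyword arguments forwarded via **kwargs.
if __name__ == "__main__":
    reader = RDFReader(text_column="content")
    ds = reader.read("data/example.ttl")

    # Materialize a few records to inspect the extracted documents.
    for doc in ds.take(3):
        print(doc["id"], doc["content"][:80])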