github-actions[bot]
Auto-sync from demo at Tue Dec 16 08:21:05 UTC 2025
31086ae
import os
from pathlib import Path
from typing import Any, Dict, List, Union

import ray
import rdflib
from ray.data import Dataset
from rdflib import Literal
from rdflib.util import guess_format

from graphgen.bases.base_reader import BaseReader
from graphgen.utils import logger
class RDFReader(BaseReader):
    """
    Reader for RDF files that extracts triples and represents them as dictionaries.

    Every distinct subject of a parsed RDF graph becomes one document with keys:

    * ``"id"`` -- the subject rendered with ``str()``;
    * ``<text_column>`` -- all literal objects of that subject, joined by spaces;
    * ``"properties"`` -- predicate string -> list of object strings
      (literal and non-literal objects alike);
    * ``"source_file"`` -- the path the triples were read from.

    Uses Ray Data for distributed processing of multiple RDF files.
    """

    def __init__(self, *, text_column: str = "content", **kwargs):
        """
        Initialize RDFReader.

        :param text_column: The column name for text content (default: "content").
        :param kwargs: Forwarded unchanged to ``BaseReader.__init__``.
        """
        super().__init__(**kwargs)
        self.text_column = text_column

    def read(
        self,
        input_path: Union[str, os.PathLike, List[Union[str, os.PathLike]]],
    ) -> Dataset:
        """
        Read RDF file(s) using Ray Data.

        Files that fail to load or parse are logged and skipped rather than
        aborting the whole read.

        :param input_path: Path to an RDF file, or a list of such paths. Both
            plain strings and ``os.PathLike`` objects (e.g. ``pathlib.Path``)
            are accepted.
        :return: Ray Dataset containing extracted documents.
        """
        if not ray.is_initialized():
            ray.init()

        # A bare string or path object must be wrapped in a list; otherwise Ray
        # would try to treat it as a sequence (a string splits into characters).
        if isinstance(input_path, (str, os.PathLike)):
            input_path = [input_path]
        # Convert path objects to plain strings so every row has the same type.
        paths = [
            os.fspath(p) if isinstance(p, os.PathLike) else p for p in input_path
        ]

        # One row per file; from_items stores each element under the "item" key.
        paths_ds = ray.data.from_items(paths)

        def process_rdf(row: Dict[str, Any]) -> List[Dict[str, Any]]:
            """Process a single RDF file and return list of documents."""
            try:
                file_path = row["item"]
                return self._parse_rdf_file(Path(file_path))
            except Exception as e:
                # Deliberate best-effort: one bad file must not fail the dataset.
                logger.error(
                    "Failed to process RDF file %s: %s", row.get("item", "unknown"), e
                )
                return []

        # A file may yield any number of documents, hence flat_map.
        docs_ds = paths_ds.flat_map(process_rdf)
        # Keep only documents accepted by BaseReader._should_keep_item
        # (defined outside this file).
        docs_ds = docs_ds.filter(self._should_keep_item)
        return docs_ds

    def _parse_rdf_file(self, file_path: Path) -> List[Dict[str, Any]]:
        """
        Parse a single RDF file and build one document per distinct subject.

        :param file_path: Path to RDF file.
        :return: List of document dictionaries, in the order each subject is first
            yielded by the graph (not in hash-randomised set order).
        :raises FileNotFoundError: If ``file_path`` is not an existing regular file.
        :raises ValueError: If rdflib fails to parse the file.
        """
        if not file_path.is_file():
            raise FileNotFoundError(f"RDF file not found: {file_path}")

        g = rdflib.Graph()
        # The serialization is inferred from the file suffix. For an unrecognised
        # suffix guess_format presumably yields None, leaving the choice to rdflib.
        fmt = guess_format(str(file_path))
        try:
            g.parse(str(file_path), format=fmt)
        except Exception as e:
            raise ValueError(f"Cannot parse RDF file {file_path}: {e}") from e

        docs: List[Dict[str, Any]] = []
        # dict.fromkeys de-duplicates subjects while keeping first-seen order.
        # Iterating a set of str-based terms would give an order that changes
        # from process to process under Python's string-hash randomisation.
        for subj in dict.fromkeys(g.subjects()):
            literals: List[str] = []
            props: Dict[str, List[str]] = {}

            for pred, obj in g.predicate_objects(subj):
                obj_str = str(obj)
                # Only literal objects contribute to the text content.
                if isinstance(obj, Literal):
                    literals.append(obj_str)
                # Every object (literal, URI, blank node) is kept as a property.
                props.setdefault(str(pred), []).append(obj_str)

            text = " ".join(literals).strip()
            if not text:
                logger.warning(
                    "Subject %s in %s has no literal values; document will have empty '%s' field.",
                    subj,
                    file_path,
                    self.text_column,
                )

            # NOTE(review): "properties" keys are predicate URIs and differ per
            # subject; confirm Ray's schema inference copes with such
            # heterogeneous nested dicts across blocks.
            docs.append(
                {
                    "id": str(subj),
                    self.text_column: text,
                    "properties": props,
                    "source_file": str(file_path),
                }
            )

        if not docs:
            logger.warning("RDF file %s contains no valid documents.", file_path)
        return docs