from pathlib import Path
from typing import Any, Dict, List, Union
import ray
import rdflib
from ray.data import Dataset
from rdflib import Literal
from rdflib.util import guess_format
from graphgen.bases.base_reader import BaseReader
from graphgen.utils import logger
class RDFReader(BaseReader):
    """
    Reader for RDF files that extracts triples and represents them as dictionaries.

    Each RDF subject becomes one document whose text is the concatenation of
    its literal object values. Multiple files are processed in parallel using
    Ray Data.
    """

    def __init__(self, *, text_column: str = "content", **kwargs):
        """
        Initialize RDFReader.

        :param text_column: The column name for text content (default: "content").
        """
        super().__init__(**kwargs)
        self.text_column = text_column

    def read(
        self,
        input_path: Union[str, List[str]],
    ) -> Dataset:
        """
        Read RDF file(s) using Ray Data.

        :param input_path: Path to RDF file or list of RDF files.
        :return: Ray Dataset containing extracted documents.
        """
        if not ray.is_initialized():
            ray.init()

        # A bare string would be iterated character-by-character by Ray,
        # so normalize to a list of paths first.
        paths = [input_path] if isinstance(input_path, str) else input_path

        def process_rdf(row: Dict[str, Any]) -> List[Dict[str, Any]]:
            """Parse one RDF file; swallow failures so one bad file cannot kill the job."""
            try:
                return self._parse_rdf_file(Path(row["item"]))
            except Exception as e:  # best-effort: log and skip the bad file
                logger.error(
                    "Failed to process RDF file %s: %s", row.get("item", "unknown"), e
                )
                return []

        # Fan out over the file paths, flatten the per-file document lists,
        # then drop any documents the base reader rejects.
        dataset = ray.data.from_items(paths)
        dataset = dataset.flat_map(process_rdf)
        return dataset.filter(self._should_keep_item)

    def _parse_rdf_file(self, file_path: Path) -> List[Dict[str, Any]]:
        """
        Parse a single RDF file and extract documents.

        :param file_path: Path to RDF file.
        :return: List of document dictionaries, one per unique RDF subject.
        :raises FileNotFoundError: if the path does not point to an existing file.
        :raises ValueError: if rdflib cannot parse the file.
        """
        if not file_path.is_file():
            raise FileNotFoundError(f"RDF file not found: {file_path}")

        graph = rdflib.Graph()
        try:
            # guess_format may return None; rdflib then falls back to its
            # own format detection, matching the original behavior.
            graph.parse(str(file_path), format=guess_format(str(file_path)))
        except Exception as e:
            raise ValueError(f"Cannot parse RDF file {file_path}: {e}") from e

        documents: List[Dict[str, Any]] = []
        for subject in set(graph.subjects()):
            text_parts: List[str] = []
            properties: Dict[str, List[str]] = {}

            for _, predicate, obj in graph.triples((subject, None, None)):
                value = str(obj)
                # Literal objects contribute to the document text; every
                # object (literal or not) is recorded in the property map.
                if isinstance(obj, Literal):
                    text_parts.append(value)
                properties.setdefault(str(predicate), []).append(value)

            text = " ".join(text_parts).strip()
            if not text:
                logger.warning(
                    "Subject %s in %s has no literal values; document will have empty '%s' field.",
                    subject,
                    file_path,
                    self.text_column,
                )

            documents.append(
                {
                    "id": str(subject),
                    self.text_column: text,
                    "properties": properties,
                    "source_file": str(file_path),
                }
            )

        if not documents:
            logger.warning("RDF file %s contains no valid documents.", file_path)
        return documents