import json
import os
from typing import Iterable
import pandas as pd
from graphgen.bases import BaseGraphStorage, BaseKVStorage, BaseOperator, BaseTokenizer
from graphgen.common import init_storage
from graphgen.models import (
AnchorBFSPartitioner,
BFSPartitioner,
DFSPartitioner,
ECEPartitioner,
LeidenPartitioner,
Tokenizer,
)
from graphgen.utils import logger


class PartitionService(BaseOperator):
    """Partition the stored knowledge graph into communities and emit them as DataFrame batches."""

def __init__(
self,
working_dir: str = "cache",
graph_backend: str = "kuzu",
kv_backend: str = "rocksdb",
**partition_kwargs,
):
super().__init__(working_dir=working_dir, op_name="partition_service")
self.kg_instance: BaseGraphStorage = init_storage(
backend=graph_backend,
working_dir=working_dir,
namespace="graph",
)
self.chunk_storage: BaseKVStorage = init_storage(
backend=kv_backend,
working_dir=working_dir,
namespace="chunk",
)
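        # Tokenizer used by _pre_tokenize to measure description token lengths.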
tokenizer_model = os.getenv("TOKENIZER_MODEL", "cl100k_base")
self.tokenizer_instance: BaseTokenizer = Tokenizer(model_name=tokenizer_model)
self.partition_kwargs = partition_kwargs

    def process(self, batch: pd.DataFrame) -> Iterable[pd.DataFrame]:
        # This operator does not consume the incoming batch data;
        # the argument is accepted only to keep the BaseOperator interface.
        _ = batch.to_dict(orient="records")
self.kg_instance.reload()
self.chunk_storage.reload()
yield from self.partition()

    def partition(self) -> Iterable[pd.DataFrame]:
        """Select the configured partitioner and yield one single-row DataFrame per community."""
method = self.partition_kwargs["method"]
method_params = self.partition_kwargs["method_params"]
if method == "bfs":
logger.info("Partitioning knowledge graph using BFS method.")
partitioner = BFSPartitioner()
elif method == "dfs":
logger.info("Partitioning knowledge graph using DFS method.")
partitioner = DFSPartitioner()
elif method == "ece":
logger.info("Partitioning knowledge graph using ECE method.")
            # TODO: before ECE partitioning, run 'quiz' and 'judge' to obtain the
            # comprehension loss when unit_sampling is not random.
            # Pre-tokenization of nodes and edges (token lengths) is handled below.
            self._pre_tokenize()
partitioner = ECEPartitioner()
elif method == "leiden":
logger.info("Partitioning knowledge graph using Leiden method.")
partitioner = LeidenPartitioner()
elif method == "anchor_bfs":
logger.info("Partitioning knowledge graph using Anchor BFS method.")
partitioner = AnchorBFSPartitioner(
anchor_type=method_params.get("anchor_type"),
anchor_ids=set(method_params.get("anchor_ids", []))
if method_params.get("anchor_ids")
else None,
)
else:
raise ValueError(f"Unsupported partition method: {method}")
communities = partitioner.partition(g=self.kg_instance, **method_params)
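        # Each community becomes one single-row DataFrame holding its nodes and edges.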
for community in communities:
batch = partitioner.community2batch(community, g=self.kg_instance)
batch = self._attach_additional_data_to_node(batch)
yield pd.DataFrame(
{
"nodes": [batch[0]],
"edges": [batch[1]],
}
)

    def _pre_tokenize(self) -> None:
"""Pre-tokenize all nodes and edges to add token length information."""
logger.info("Starting pre-tokenization of nodes and edges...")
nodes = self.kg_instance.get_all_nodes()
edges = self.kg_instance.get_all_edges()
# Process nodes
for node_id, node_data in nodes:
if "length" not in node_data:
try:
description = node_data.get("description", "")
tokens = self.tokenizer_instance.encode(description)
node_data["length"] = len(tokens)
self.kg_instance.update_node(node_id, node_data)
except Exception as e:
logger.warning("Failed to tokenize node %s: %s", node_id, e)
node_data["length"] = 0
# Process edges
for u, v, edge_data in edges:
if "length" not in edge_data:
try:
description = edge_data.get("description", "")
tokens = self.tokenizer_instance.encode(description)
edge_data["length"] = len(tokens)
self.kg_instance.update_edge(u, v, edge_data)
except Exception as e:
logger.warning("Failed to tokenize edge %s-%s: %s", u, v, e)
edge_data["length"] = 0
# Persist changes
self.kg_instance.index_done_callback()
logger.info("Pre-tokenization completed.")

    def _attach_additional_data_to_node(self, batch: tuple) -> tuple:
"""
Attach additional data from chunk_storage to nodes in the batch.
:param batch: tuple of (nodes_data, edges_data)
:return: updated batch with additional data attached to nodes
"""
nodes_data, edges_data = batch
for node_id, node_data in nodes_data:
entity_type = (node_data.get("entity_type") or "").lower()
if not entity_type:
continue
source_ids = [
sid.strip()
for sid in node_data.get("source_id", "").split("<SEP>")
if sid.strip()
]
# Handle images
if "image" in entity_type:
image_chunks = [
data
for sid in source_ids
if "image" in sid.lower()
and (data := self.chunk_storage.get_by_id(sid))
]
if image_chunks:
# The generator expects a dictionary with an 'img_path' key, not a list of captions.
# We'll use the first image chunk found for this node.
node_data["image_data"] = json.loads(image_chunks[0]["content"])
logger.debug("Attached image data to node %s", node_id)
return nodes_data, edges_data
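

# Minimal usage sketch: drive the operator directly. It assumes the "cache"
# working dir already holds the graph and chunk storages built by earlier
# operators; the "leiden" method and empty method_params are illustrative
# choices, not prescribed defaults.
if __name__ == "__main__":
    service = PartitionService(
        working_dir="cache",
        method="leiden",
        method_params={},
    )
    # process() ignores the incoming batch, so an empty DataFrame is enough.
    for community_df in service.process(pd.DataFrame()):
        print(community_df[["nodes", "edges"]])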