GraphGen / graphgen /models /partitioner /leiden_partitioner.py
github-actions[bot]
Auto-sync from demo at Tue Nov 25 11:19:13 UTC 2025
9e67c3b
from collections import defaultdict
from typing import Any, Dict, List, Set, Tuple
import igraph as ig
from leidenalg import ModularityVertexPartition, find_partition
from graphgen.bases import BaseGraphStorage, BasePartitioner
from graphgen.bases.datatypes import Community
class LeidenPartitioner(BasePartitioner):
"""
Leiden partitioner that partitions the graph into communities using the Leiden algorithm.
"""
async def partition(
self,
g: BaseGraphStorage,
max_size: int = 20,
use_lcc: bool = False,
random_seed: int = 42,
**kwargs: Any,
) -> List[Community]:
"""
Leiden Partition follows these steps:
1. export the graph from graph storage
2. use the leiden algorithm to detect communities, get {node: community_id}
3. split large communities if max_size is given
4. convert {node: community_id} to List[Community]
:param g
:param max_size: maximum size of each community, if None or <=0, no limit
:param use_lcc: whether to use the largest connected component only
:param random_seed
:param kwargs: other parameters for the leiden algorithm
:return:
"""
nodes = g.get_all_nodes() # List[Tuple[str, dict]]
edges = g.get_all_edges() # List[Tuple[str, str, dict]]
node2cid: Dict[str, int] = await self._run_leiden(
nodes, edges, use_lcc, random_seed
)
if max_size is not None and max_size > 0:
node2cid = await self._split_communities(node2cid, max_size)
cid2nodes: Dict[int, List[str]] = defaultdict(list)
for n, cid in node2cid.items():
cid2nodes[cid].append(n)
communities: List[Community] = []
for cid, nodes in cid2nodes.items():
node_set: Set[str] = set(nodes)
comm_edges: List[Tuple[str, str]] = [
(u, v) for u, v, _ in edges if u in node_set and v in node_set
]
communities.append(Community(id=cid, nodes=nodes, edges=comm_edges))
return communities
@staticmethod
async def _run_leiden(
nodes: List[Tuple[str, dict]],
edges: List[Tuple[str, str, dict]],
use_lcc: bool = False,
random_seed: int = 42,
) -> Dict[str, int]:
# build igraph
ig_graph = ig.Graph.TupleList(((u, v) for u, v, _ in edges), directed=False)
# remove isolated nodes
ig_graph.delete_vertices(ig_graph.vs.select(_degree_eq=0))
node2cid: Dict[str, int] = {}
if use_lcc:
lcc = ig_graph.components().giant()
partition = find_partition(lcc, ModularityVertexPartition, seed=random_seed)
for part_id, cluster in enumerate(partition):
for v in cluster:
node2cid[lcc.vs[v]["name"]] = part_id
else:
offset = 0
for component in ig_graph.components():
subgraph = ig_graph.induced_subgraph(component)
partition = find_partition(
subgraph, ModularityVertexPartition, seed=random_seed
)
for part_id, cluster in enumerate(partition):
for v in cluster:
original_node = subgraph.vs[v]["name"]
node2cid[original_node] = part_id + offset
offset += len(partition)
return node2cid
@staticmethod
async def _split_communities(
node2cid: Dict[str, int], max_size: int
) -> Dict[str, int]:
"""
Split communities larger than max_size into smaller sub-communities.
"""
cid2nodes: Dict[int, List[str]] = defaultdict(list)
for n, cid in node2cid.items():
cid2nodes[cid].append(n)
new_mapping: Dict[str, int] = {}
new_cid = 0
for nodes in cid2nodes.values():
if len(nodes) <= max_size:
for n in nodes:
new_mapping[n] = new_cid
new_cid += 1
else:
for start in range(0, len(nodes), max_size):
chunk = nodes[start : start + max_size]
for n in chunk:
new_mapping[n] = new_cid
new_cid += 1
return new_mapping