File size: 4,310 Bytes
799ac7c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
31086ae
799ac7c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9e67c3b
 
799ac7c
31086ae
799ac7c
 
31086ae
799ac7c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
31086ae
799ac7c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
31086ae
799ac7c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
from collections import defaultdict
from typing import Any, Dict, List, Set, Tuple

import igraph as ig
from leidenalg import ModularityVertexPartition, find_partition

from graphgen.bases import BaseGraphStorage, BasePartitioner
from graphgen.bases.datatypes import Community


class LeidenPartitioner(BasePartitioner):
    """
    Partition a graph into communities using the Leiden algorithm.

    Detection runs per connected component (or on the largest connected
    component only when ``use_lcc`` is set); oversized communities are then
    optionally split so that none exceeds ``max_size`` nodes.
    """

    def partition(
        self,
        g: BaseGraphStorage,
        max_size: int = 20,
        use_lcc: bool = False,
        random_seed: int = 42,
        **kwargs: Any,
    ) -> List[Community]:
        """
        Leiden Partition follows these steps:
        1. export the graph from graph storage
        2. use the leiden algorithm to detect communities, get {node: community_id}
        3. split large communities if max_size is given
        4. convert {node: community_id} to List[Community]

        :param g: graph storage to export nodes and edges from
        :param max_size: maximum size of each community, if None or <=0, no limit
        :param use_lcc: whether to use the largest connected component only
        :param random_seed: seed forwarded to the Leiden algorithm for
            reproducible partitions
        :param kwargs: other parameters for the leiden algorithm
        :return: detected communities with their member nodes and internal edges
        """
        nodes = g.get_all_nodes()  # List[Tuple[str, dict]]
        edges = g.get_all_edges()  # List[Tuple[str, str, dict]]

        node2cid: Dict[str, int] = self._run_leiden(nodes, edges, use_lcc, random_seed)

        if max_size is not None and max_size > 0:
            node2cid = self._split_communities(node2cid, max_size)

        # Group member nodes by community id. ("members", not "nodes", to
        # avoid shadowing the storage export above.)
        cid2nodes: Dict[int, List[str]] = defaultdict(list)
        for node, cid in node2cid.items():
            cid2nodes[cid].append(node)

        # Group intra-community edges in one O(E) pass instead of re-scanning
        # the full edge list once per community (O(C*E)). Communities
        # partition the mapped nodes, so an edge is internal to a community
        # iff both endpoints map to the same community id. ``.get`` is used
        # because with use_lcc=True some endpoints are absent from node2cid.
        cid2edges: Dict[int, List[Tuple[str, str]]] = defaultdict(list)
        for u, v, _ in edges:
            cid_u = node2cid.get(u)
            # Compare against None explicitly: community id 0 is valid.
            if cid_u is not None and cid_u == node2cid.get(v):
                cid2edges[cid_u].append((u, v))

        return [
            Community(id=cid, nodes=members, edges=cid2edges.get(cid, []))
            for cid, members in cid2nodes.items()
        ]

    @staticmethod
    def _run_leiden(
        nodes: List[Tuple[str, dict]],
        edges: List[Tuple[str, str, dict]],
        use_lcc: bool = False,
        random_seed: int = 42,
    ) -> Dict[str, int]:
        """
        Run Leiden community detection and return ``{node_name: community_id}``.

        NOTE(review): ``nodes`` is currently unused — the igraph graph is
        built from the edge list only, so isolated nodes (present in
        ``nodes`` but incident to no edge) never appear in any community.
        Confirm this exclusion is intended.

        :param nodes: (name, attrs) pairs from storage (see NOTE above)
        :param edges: (u, v, attrs) triples; defines the graph topology
        :param use_lcc: partition only the largest connected component
        :param random_seed: seed for deterministic partitioning
        :return: mapping from node name to a globally unique community id
        """
        # Build an undirected igraph; vertex "name" attributes carry the
        # original node identifiers.
        ig_graph = ig.Graph.TupleList(((u, v) for u, v, _ in edges), directed=False)

        # Defensive: drop degree-0 vertices. TupleList only creates vertices
        # that appear in an edge, so this is normally a no-op.
        ig_graph.delete_vertices(ig_graph.vs.select(_degree_eq=0))

        node2cid: Dict[str, int] = {}
        if use_lcc:
            # Partition only the giant (largest) connected component.
            lcc = ig_graph.components().giant()
            partition = find_partition(lcc, ModularityVertexPartition, seed=random_seed)
            for part_id, cluster in enumerate(partition):
                for v in cluster:
                    node2cid[lcc.vs[v]["name"]] = part_id
        else:
            # Partition each connected component independently, offsetting
            # community ids so they stay unique across components.
            offset = 0
            for component in ig_graph.components():
                subgraph = ig_graph.induced_subgraph(component)
                partition = find_partition(
                    subgraph, ModularityVertexPartition, seed=random_seed
                )
                for part_id, cluster in enumerate(partition):
                    for v in cluster:
                        original_node = subgraph.vs[v]["name"]
                        node2cid[original_node] = part_id + offset
                offset += len(partition)
        return node2cid

    @staticmethod
    def _split_communities(node2cid: Dict[str, int], max_size: int) -> Dict[str, int]:
        """
        Split communities larger than ``max_size`` into smaller sub-communities.

        Oversized communities are chunked greedily in member-insertion order,
        and all community ids are renumbered densely from 0.

        :param node2cid: mapping from node name to community id
        :param max_size: maximum number of nodes per community (> 0)
        :return: new mapping in which every community has at most max_size nodes
        """
        cid2nodes: Dict[int, List[str]] = defaultdict(list)
        for node, cid in node2cid.items():
            cid2nodes[cid].append(node)

        new_mapping: Dict[str, int] = {}
        new_cid = 0
        for members in cid2nodes.values():
            # A community of size <= max_size yields exactly one chunk, so a
            # single chunking loop covers both the small and oversized cases.
            for start in range(0, len(members), max_size):
                for node in members[start : start + max_size]:
                    new_mapping[node] = new_cid
                new_cid += 1
        return new_mapping