cbrkit.sim.graphs

Graph similarity algorithms for structural matching and comparison.

This module provides various algorithms for computing similarity between graph structures represented as cbrkit.model.graph.Graph instances. Each algorithm takes node and edge similarity functions and returns a global graph similarity score.

Algorithms:

  • astar: A* search for optimal graph edit distance. Guarantees optimal results but may be slow for large graphs.
  • vf2: VF2 algorithm for (sub)graph isomorphism. Available in pure Python (vf2), NetworkX (vf2_networkx), and RustWorkX (vf2_rustworkx) variants.
  • greedy: Fast greedy matching that pairs nodes by highest similarity.
  • lap: Linear Assignment Problem solver using the Hungarian algorithm.
  • brute_force: Exhaustive search over all possible node matchings. Only practical for small graphs.
  • dfs: Depth-first search based matching (requires graphs extra).
  • dtw: Dynamic Time Warping for sequential graph alignment (requires timeseries extra).
  • smith_waterman: Smith-Waterman local alignment for sequential graphs (requires timeseries extra).

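Initialization Functions:

  • init_empty: Initializes the search state with no pre-matched nodes.
  • init_unique_matches: Initializes with uniquely matchable node pairs.

Types:

  • GraphSim: Protocol for graph similarity functions.
  • ElementMatcher: Protocol for node/edge matching predicates.
  • SemanticEdgeSim: Semantic edge similarity function type.
  • BaseGraphSimFunc: Base class for graph similarity functions.
  • SearchGraphSimFunc: Base class for search-based graph similarity.
  • SearchState / SearchStateInit: Search state types.
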
Example:
>>> from cbrkit.sim.generic import equality
>>> graph_sim = astar.build(node_sim_func=equality())
 1"""Graph similarity algorithms for structural matching and comparison.
 2
 3This module provides various algorithms for computing similarity between
 4graph structures represented as `cbrkit.model.graph.Graph` instances.
 5Each algorithm takes node and edge similarity functions and returns a
 6global graph similarity score.
 7
 8Algorithms:
 9- `astar`: A* search for optimal graph edit distance.
10  Guarantees optimal results but may be slow for large graphs.
11- `vf2`: VF2 algorithm for (sub)graph isomorphism.
12  Available in pure Python (`vf2`), NetworkX (`vf2_networkx`),
13  and RustWorkX (`vf2_rustworkx`) variants.
14- `greedy`: Fast greedy matching that pairs nodes by highest similarity.
15- `lap`: Linear Assignment Problem solver using the Hungarian algorithm.
16- `brute_force`: Exhaustive search over all possible node matchings.
17  Only practical for small graphs.
18- `dfs`: Depth-first search based matching (requires `graphs` extra).
19- `dtw`: Dynamic Time Warping for sequential graph alignment
20  (requires `timeseries` extra).
21- `smith_waterman`: Smith-Waterman local alignment for sequential graphs
22  (requires `timeseries` extra).
23
24Initialization Functions:
25- `init_empty`: Initializes the search state with no pre-matched nodes.
26- `init_unique_matches`: Initializes with uniquely matchable node pairs.
27
28Types:
29- `GraphSim`: Protocol for graph similarity functions.
30- `ElementMatcher`: Protocol for node/edge matching predicates.
31- `SemanticEdgeSim`: Semantic edge similarity function type.
32- `BaseGraphSimFunc`: Base class for graph similarity functions.
33- `SearchGraphSimFunc`: Base class for search-based graph similarity.
34- `SearchState` / `SearchStateInit`: Search state types.
35
36Example:
37    >>> from cbrkit.sim.generic import equality
38    >>> graph_sim = astar.build(node_sim_func=equality())
39"""
40
41from ...helpers import optional_dependencies
42from . import astar
43from .brute_force import brute_force
44from .common import (
45    BaseGraphSimFunc,
46    ElementMatcher,
47    GraphSim,
48    SearchGraphSimFunc,
49    SearchState,
50    SearchStateInit,
51    SemanticEdgeSim,
52    init_empty,
53    init_unique_matches,
54)
55from .greedy import greedy
56from .lap import lap
57from .vf2 import vf2, vf2_networkx, vf2_rustworkx
58
59with optional_dependencies():
60    from .alignment import dtw
61
62with optional_dependencies():
63    from .alignment import smith_waterman
64
65with optional_dependencies():
66    from .dfs import dfs
67
68__all__ = [
69    "astar",
70    "brute_force",
71    "dfs",
72    "greedy",
73    "lap",
74    "vf2",
75    "vf2_networkx",
76    "vf2_rustworkx",
77    "dtw",
78    "smith_waterman",
79    "init_empty",
80    "init_unique_matches",
81    "GraphSim",
82    "ElementMatcher",
83    "SemanticEdgeSim",
84    "BaseGraphSimFunc",
85    "SearchGraphSimFunc",
86    "SearchState",
87    "SearchStateInit",
88]
15@dataclass(slots=True)
16class brute_force[K, N, E, G](
17    BaseGraphSimFunc[K, N, E, G], SimFunc[Graph[K, N, E, G], GraphSim[K]]
18):
19    """
20    Computes the similarity between two graphs by exhaustively computing all possible mappings
21    and selecting the one with the highest similarity score.
22
23    Args:
24        node_sim_func: A similarity function for graph nodes that receives two node objects.
25        edge_sim_func: A similarity function for graph edges that receives two edge objects.
26
27    Returns:
28        The similarity between the two graphs and the best mapping.
29    """
30
31    def expand(
32        self,
33        x: Graph[K, N, E, G],
34        y: Graph[K, N, E, G],
35        y_nodes: Sequence[K],
36        x_nodes: Sequence[K],
37    ) -> GraphSim[K] | None:
38        """Evaluates a single node mapping candidate and returns its similarity."""
39        node_mapping = frozendict(zip(y_nodes, x_nodes, strict=True))
40
41        # if one node can't be matched to another, skip this permutation
42        for y_key, x_key in node_mapping.items():
43            if not self.node_matcher(x.nodes[x_key].value, y.nodes[y_key].value):
44                return None
45
46        edge_mapping = self.induced_edge_mapping(x, y, node_mapping)
47
48        node_pair_sims = self.node_pair_similarities(x, y, list(node_mapping.items()))
49        edge_pair_sims = self.edge_pair_similarities(
50            x, y, node_pair_sims, list(edge_mapping.items())
51        )
52
53        return self.similarity(
54            x,
55            y,
56            frozendict(node_mapping),
57            frozendict(edge_mapping),
58            frozendict(node_pair_sims),
59            frozendict(edge_pair_sims),
60        )
61
62    def __call__(self, x: Graph[K, N, E, G], y: Graph[K, N, E, G]) -> GraphSim[K]:
63        y_node_keys = list(y.nodes.keys())
64        x_node_keys = list(x.nodes.keys())
65        best_sim: GraphSim[K] = GraphSim(
66            0.0, frozendict(), frozendict(), frozendict(), frozendict()
67        )
68
69        # iterate all possible subset sizes of query (up to target size)
70        for k in range(1, min(len(y_node_keys), len(x_node_keys)) + 1):
71            # all subsets of query nodes of size k
72            for y_candidates in itertools.combinations(y_node_keys, k):
73                # all injective mappings from this subset to target nodes
74                for x_candidates in itertools.permutations(x_node_keys, k):
75                    next_sim = self.expand(x, y, y_candidates, x_candidates)
76
77                    if next_sim and (
78                        next_sim.value > best_sim.value
79                        or (
80                            next_sim.value >= best_sim.value
81                            and (
82                                len(next_sim.node_mapping) > len(best_sim.node_mapping)
83                                or (
84                                    len(next_sim.edge_mapping)
85                                    > len(best_sim.edge_mapping)
86                                )
87                            )
88                        )
89                    ):
90                        best_sim = next_sim
91
92        return best_sim

Computes the similarity between two graphs by exhaustively computing all possible mappings and selecting the one with the highest similarity score.

Arguments:
  • node_sim_func: A similarity function for graph nodes that receives two node objects.
  • edge_sim_func: A similarity function for graph edges that receives two edge objects.
Returns:

The similarity between the two graphs and the best mapping.

brute_force( node_sim_func: AnySimFunc[N, Float], edge_sim_func: SemanticEdgeSim[K, N, E] = SemanticEdgeSim(source_weight=1.0, target_weight=1.0, edge_sim_func=None), node_matcher: ElementMatcher[N] = <function default_element_matcher>, edge_matcher: ElementMatcher[E] = <function default_element_matcher>, *, node_del_cost: float = 1.0, node_ins_cost: float = 0.0, edge_del_cost: float = 1.0, edge_ins_cost: float = 0.0)
def expand( self, x: cbrkit.model.graph.Graph[K, N, E, G], y: cbrkit.model.graph.Graph[K, N, E, G], y_nodes: Sequence[K], x_nodes: Sequence[K]) -> Optional[GraphSim[K]]:
31    def expand(
32        self,
33        x: Graph[K, N, E, G],
34        y: Graph[K, N, E, G],
35        y_nodes: Sequence[K],
36        x_nodes: Sequence[K],
37    ) -> GraphSim[K] | None:
38        """Evaluates a single node mapping candidate and returns its similarity."""
39        node_mapping = frozendict(zip(y_nodes, x_nodes, strict=True))
40
41        # if one node can't be matched to another, skip this permutation
42        for y_key, x_key in node_mapping.items():
43            if not self.node_matcher(x.nodes[x_key].value, y.nodes[y_key].value):
44                return None
45
46        edge_mapping = self.induced_edge_mapping(x, y, node_mapping)
47
48        node_pair_sims = self.node_pair_similarities(x, y, list(node_mapping.items()))
49        edge_pair_sims = self.edge_pair_similarities(
50            x, y, node_pair_sims, list(edge_mapping.items())
51        )
52
53        return self.similarity(
54            x,
55            y,
56            frozendict(node_mapping),
57            frozendict(edge_mapping),
58            frozendict(node_pair_sims),
59            frozendict(edge_pair_sims),
60        )

Evaluates a single node mapping candidate and returns its similarity.

 35    @dataclass(slots=True)
 36    class dfs[K, N, E, G](
 37        BaseGraphSimFunc[K, N, E, G], SimFunc[Graph[K, N, E, G], GraphSim[K]]
 38    ):
 39        """Graph similarity using depth-first search over node and edge mappings."""
 40
 41        max_iterations: int = 0
 42        upper_bound: float | None = None
 43        strictly_decreasing: bool = True
 44        timeout: float | None = None
 45        roots_func: RootsFunc[K, N, E, G] | None = None
 46
 47        def __call__(
 48            self,
 49            x: Graph[K, N, E, G],
 50            y: Graph[K, N, E, G],
 51        ) -> GraphSim[K]:
 52            x_nx = to_networkx(x)
 53            y_nx = to_networkx(y)
 54
 55            y_edges_lookup = {
 56                (e.source.key, e.target.key): e.key for e in y.edges.values()
 57            }
 58            x_edges_lookup = {
 59                (e.source.key, e.target.key): e.key for e in x.edges.values()
 60            }
 61
 62            node_pair_sims, edge_pair_sims = self.pair_similarities(x, y)
 63
 64            def node_subst_cost(y: NetworkxNode[K, N], x: NetworkxNode[K, N]) -> float:
 65                """Returns the substitution cost for a node pair as one minus similarity."""
 66                if sim := node_pair_sims.get((y["key"], x["key"])):
 67                    return 1.0 - sim
 68
 69                return float("inf")
 70
 71            def edge_subst_cost(
 72                y: NetworkxEdge[K, N, E], x: NetworkxEdge[K, N, E]
 73            ) -> float:
 74                """Returns the substitution cost for an edge pair as one minus similarity."""
 75                if sim := edge_pair_sims.get((y["key"], x["key"])):
 76                    return 1.0 - sim
 77
 78                return float("inf")
 79
 80            ged_iter = nx.optimize_edit_paths(
 81                y_nx,
 82                x_nx,
 83                node_subst_cost=node_subst_cost,
 84                edge_subst_cost=edge_subst_cost,
 85                node_del_cost=lambda y: self.node_del_cost,
 86                node_ins_cost=lambda x: self.node_ins_cost,
 87                edge_del_cost=lambda y: self.edge_del_cost,
 88                edge_ins_cost=lambda x: self.edge_ins_cost,
 89                upper_bound=self.upper_bound,
 90                strictly_decreasing=self.strictly_decreasing,
 91                timeout=self.timeout,
 92                roots=self.roots_func(x, y) if self.roots_func else None,
 93            )
 94
 95            node_edit_path: list[tuple[K, K]] = []
 96            edge_edit_path: list[tuple[tuple[K, K], tuple[K, K]]] = []
 97
 98            for idx in itertools.count():
 99                if self.max_iterations > 0 and idx >= self.max_iterations:
100                    break
101
102                try:
103                    node_edit_path, edge_edit_path, _ = next(ged_iter)
104                except StopIteration:
105                    break
106
107            node_mapping = frozendict(
108                (y_key, x_key)
109                for y_key, x_key in node_edit_path
110                if x_key is not None and y_key is not None
111            )
112            edge_mapping = frozendict(
113                (
114                    y_edges_lookup[y_key],
115                    x_edges_lookup[x_key],
116                )
117                for y_key, x_key in edge_edit_path
118                if x_key is not None
119                and y_key is not None
120                and x_key in x_edges_lookup
121                and y_key in y_edges_lookup
122            )
123
124            return self.similarity(
125                x,
126                y,
127                node_mapping,  # type: ignore[arg-type]
128                edge_mapping,
129                node_pair_sims,
130                edge_pair_sims,
131            )

Graph similarity using depth-first search over node and edge mappings.

dfs( node_sim_func: AnySimFunc[N, Float], edge_sim_func: SemanticEdgeSim[K, N, E] = SemanticEdgeSim(source_weight=1.0, target_weight=1.0, edge_sim_func=None), node_matcher: ElementMatcher[N] = <function default_element_matcher>, edge_matcher: ElementMatcher[E] = <function default_element_matcher>, max_iterations: int = 0, upper_bound: float | None = None, strictly_decreasing: bool = True, timeout: float | None = None, roots_func: Optional[cbrkit.sim.graphs.dfs.RootsFunc[K, N, E, G]] = None, *, node_del_cost: float = 1.0, node_ins_cost: float = 0.0, edge_del_cost: float = 1.0, edge_ins_cost: float = 0.0)
max_iterations: int
upper_bound: float | None
strictly_decreasing: bool
timeout: float | None
roots_func: Optional[cbrkit.sim.graphs.dfs.RootsFunc[K, N, E, G]]
16@dataclass(slots=True)
17class greedy[K, N, E, G](
18    SearchGraphSimFunc[K, N, E, G], SimFunc[Graph[K, N, E, G], GraphSim[K]]
19):
20    """Performs a Greedy search as described by [Dijkman et al. (2009)](https://doi.org/10.1007/978-3-642-03848-8_5)
21
22    Args:
23        node_sim_func: A function to compute the similarity between two nodes.
24        edge_sim_func: A function to compute the similarity between two edges.
25        node_matcher: A function that returns true if two nodes can be mapped legally.
26        edge_matcher: A function that returns true if two edges can be mapped legally.
27
28    Returns:
29        The similarity between a query and a case graph along with the mapping.
30    """
31
32    def __call__(
33        self,
34        x: Graph[K, N, E, G],
35        y: Graph[K, N, E, G],
36    ) -> GraphSim[K]:
37        node_pair_sims, edge_pair_sims = self.pair_similarities(x, y)
38
39        current_state = self.init_search_state(x, y)
40        current_sim = self.similarity(
41            x,
42            y,
43            current_state.node_mapping,
44            current_state.edge_mapping,
45            node_pair_sims,
46            edge_pair_sims,
47        )
48
49        while not self.finished(current_state):
50            # Iterate over all open pairs and find the best pair
51            next_states: list[SearchState[K]] = []
52
53            for y_key in current_state.open_y_nodes:
54                next_states.extend(self.expand_node(x, y, current_state, y_key))
55
56            for y_key in current_state.open_y_edges:
57                next_states.extend(self.expand_edge(x, y, current_state, y_key))
58
59            new_sims = [
60                self.similarity(
61                    x,
62                    y,
63                    state.node_mapping,
64                    state.edge_mapping,
65                    node_pair_sims,
66                    edge_pair_sims,
67                )
68                for state in next_states
69            ]
70
71            best_sim, best_state = max(
72                zip(new_sims, next_states, strict=True),
73                key=lambda item: item[0].value,
74            )
75
76            # Update the current state and similarity
77            current_state = best_state
78            current_sim = best_sim
79
80        return current_sim

Performs a Greedy search as described by Dijkman et al. (2009)

Arguments:
  • node_sim_func: A function to compute the similarity between two nodes.
  • edge_sim_func: A function to compute the similarity between two edges.
  • node_matcher: A function that returns true if two nodes can be mapped legally.
  • edge_matcher: A function that returns true if two edges can be mapped legally.
Returns:

The similarity between a query and a case graph along with the mapping.

greedy( node_sim_func: AnySimFunc[N, Float], edge_sim_func: SemanticEdgeSim[K, N, E] = SemanticEdgeSim(source_weight=1.0, target_weight=1.0, edge_sim_func=None), node_matcher: ElementMatcher[N] = <function default_element_matcher>, edge_matcher: ElementMatcher[E] = <function default_element_matcher>, init_func: Union[SearchStateInit[K, N, E, G], AnySimFunc[cbrkit.model.graph.Graph[K, N, E, G], GraphSim[K]]] = <factory>, *, node_del_cost: float = 1.0, node_ins_cost: float = 0.0, edge_del_cost: float = 1.0, edge_ins_cost: float = 0.0)
223@dataclass(slots=True)
224class lap[K, N, E, G](
225    lap_base[K, N, E, G],
226    BaseGraphSimFunc[K, N, E, G],
227    SimFunc[Graph[K, N, E, G], GraphSim[K]],
228):
229    """Graph similarity using the Linear Assignment Problem (LAP)."""
230
231    def __call__(
232        self,
233        x: Graph[K, N, E, G],
234        y: Graph[K, N, E, G],
235    ) -> GraphSim[K]:
236        node_pair_sims, edge_pair_sims = self.pair_similarities(x, y)
237        cost_matrix = self.build_cost_matrix(x, y, node_pair_sims, edge_pair_sims)
238        row2y = {r: k for r, k in enumerate(y.nodes.keys())}
239        col2x = {c: k for c, k in enumerate(x.nodes.keys())}
240
241        row_idx, col_idx = linear_sum_assignment(cost_matrix)
242
243        node_mapping = frozendict(
244            (row2y[int(r)], col2x[int(c)])
245            for r, c in zip(row_idx, col_idx, strict=True)
246            # only consider substitutions
247            if cost_matrix[r, c] < np.inf and r in row2y and c in col2x
248        )
249
250        edge_mapping = self.induced_edge_mapping(x, y, node_mapping)
251
252        return self.similarity(
253            x,
254            y,
255            node_mapping,
256            edge_mapping,
257            node_pair_sims,
258            edge_pair_sims,
259        )

Graph similarity using the Linear Assignment Problem (LAP).

lap( node_sim_func: AnySimFunc[N, Float], edge_sim_func: SemanticEdgeSim[K, N, E] = SemanticEdgeSim(source_weight=1.0, target_weight=1.0, edge_sim_func=None), node_matcher: ElementMatcher[N] = <function default_element_matcher>, edge_matcher: ElementMatcher[E] = <function default_element_matcher>, edge_handling: LapEdgeHandling = 'greedy', edge_edit_factor: float = 0.5, print_matrix: bool = False, *, node_del_cost: float = 1.0, node_ins_cost: float = 0.0, edge_del_cost: float = 1.0, edge_ins_cost: float = 0.0)
edge_edit_factor: float
print_matrix: bool
vf2 = <class 'vf2_rustworkx'>
@dataclass(slots=True)
class vf2_networkx(cbrkit.sim.graphs.vf2.VF2Base[K, N, E, G], typing.Generic[K, N, E, G]):
185@dataclass(slots=True)
186class vf2_networkx[K, N, E, G](VF2Base[K, N, E, G]):
187    """Graph similarity using the VF2 algorithm via NetworkX."""
188
189    def node_mappings(
190        self,
191        x: Graph[K, N, E, G],
192        y: Graph[K, N, E, G],
193    ) -> list[frozendict[K, K]]:
194        """Finds subgraph isomorphism node mappings using NetworkX."""
195        if len(y.nodes) + len(y.edges) > len(x.nodes) + len(x.edges):
196            larger_graph = to_networkx(y)
197            smaller_graph = to_networkx(x)
198            node_matcher = reverse_positional(self.node_matcher)
199            edge_matcher = reverse_positional(self.edge_matcher)
200        else:
201            larger_graph = to_networkx(x)
202            smaller_graph = to_networkx(y)
203            node_matcher = self.node_matcher
204            edge_matcher = self.edge_matcher
205
206        # `first` must be the larger graph and `second` the smaller one.
207        graph_matcher = DiGraphMatcher(
208            larger_graph,
209            smaller_graph,
210            node_match=lambda x, y: node_matcher(x["value"], y["value"]),
211            edge_match=lambda x, y: edge_matcher(x["value"], y["value"]),
212        )
213
214        mappings_iter = graph_matcher.subgraph_isomorphisms_iter()
215        node_mappings: list[frozendict[K, K]] = []
216
217        for idx in itertools.count():
218            if self.max_iterations > 0 and idx >= self.max_iterations:
219                break
220
221            try:
222                if len(y.nodes) + len(y.edges) > len(x.nodes) + len(x.edges):
223                    # y -> x (as needed)
224                    node_mappings.append(
225                        frozendict(
226                            (larger_idx, smaller_idx)
227                            for larger_idx, smaller_idx in next(mappings_iter).items()
228                        )
229                    )
230                else:
231                    # x ->  y (needs to be inverted)
232                    node_mappings.append(
233                        frozendict(
234                            (smaller_idx, larger_idx)
235                            for larger_idx, smaller_idx in next(mappings_iter).items()
236                        )
237                    )
238            except StopIteration:
239                break
240
241        return node_mappings

Graph similarity using the VF2 algorithm via NetworkX.

vf2_networkx( node_sim_func: AnySimFunc[N, Float], edge_sim_func: SemanticEdgeSim[K, N, E] = SemanticEdgeSim(source_weight=1.0, target_weight=1.0, edge_sim_func=None), node_matcher: ElementMatcher[N] = <function default_element_matcher>, edge_matcher: ElementMatcher[E] = <function default_element_matcher>, max_iterations: int = 0, maximum_common_subgraph: bool = True, *, node_del_cost: float = 1.0, node_ins_cost: float = 0.0, edge_del_cost: float = 1.0, edge_ins_cost: float = 0.0)
def node_mappings( self, x: cbrkit.model.graph.Graph[K, N, E, G], y: cbrkit.model.graph.Graph[K, N, E, G]) -> list[frozendict.frozendict[K, K]]:
189    def node_mappings(
190        self,
191        x: Graph[K, N, E, G],
192        y: Graph[K, N, E, G],
193    ) -> list[frozendict[K, K]]:
194        """Finds subgraph isomorphism node mappings using NetworkX."""
195        if len(y.nodes) + len(y.edges) > len(x.nodes) + len(x.edges):
196            larger_graph = to_networkx(y)
197            smaller_graph = to_networkx(x)
198            node_matcher = reverse_positional(self.node_matcher)
199            edge_matcher = reverse_positional(self.edge_matcher)
200        else:
201            larger_graph = to_networkx(x)
202            smaller_graph = to_networkx(y)
203            node_matcher = self.node_matcher
204            edge_matcher = self.edge_matcher
205
206        # `first` must be the larger graph and `second` the smaller one.
207        graph_matcher = DiGraphMatcher(
208            larger_graph,
209            smaller_graph,
210            node_match=lambda x, y: node_matcher(x["value"], y["value"]),
211            edge_match=lambda x, y: edge_matcher(x["value"], y["value"]),
212        )
213
214        mappings_iter = graph_matcher.subgraph_isomorphisms_iter()
215        node_mappings: list[frozendict[K, K]] = []
216
217        for idx in itertools.count():
218            if self.max_iterations > 0 and idx >= self.max_iterations:
219                break
220
221            try:
222                if len(y.nodes) + len(y.edges) > len(x.nodes) + len(x.edges):
223                    # y -> x (as needed)
224                    node_mappings.append(
225                        frozendict(
226                            (larger_idx, smaller_idx)
227                            for larger_idx, smaller_idx in next(mappings_iter).items()
228                        )
229                    )
230                else:
231                    # x ->  y (needs to be inverted)
232                    node_mappings.append(
233                        frozendict(
234                            (smaller_idx, larger_idx)
235                            for larger_idx, smaller_idx in next(mappings_iter).items()
236                        )
237                    )
238            except StopIteration:
239                break
240
241        return node_mappings

Finds subgraph isomorphism node mappings using NetworkX.

@dataclass(slots=True)
class vf2_rustworkx(cbrkit.sim.graphs.vf2.VF2Base[K, N, E, G], typing.Generic[K, N, E, G]):
110@dataclass(slots=True)
111class vf2_rustworkx[K, N, E, G](VF2Base[K, N, E, G]):
112    """Graph similarity using the VF2 algorithm via rustworkx."""
113
114    id_order: bool = False
115    induced: bool = False
116    call_limit: int | None = None
117
118    def node_mappings(
119        self,
120        x: Graph[K, N, E, G],
121        y: Graph[K, N, E, G],
122    ) -> list[frozendict[K, K]]:
123        """Finds subgraph isomorphism node mappings using rustworkx."""
124        if len(y.nodes) + len(y.edges) > len(x.nodes) + len(x.edges):
125            larger_graph, larger_graph_lookup = to_rustworkx_with_lookup(y)
126            smaller_graph, smaller_graph_lookup = to_rustworkx_with_lookup(x)
127            node_matcher = reverse_positional(self.node_matcher)
128            edge_matcher = reverse_positional(self.edge_matcher)
129        else:
130            larger_graph, larger_graph_lookup = to_rustworkx_with_lookup(x)
131            smaller_graph, smaller_graph_lookup = to_rustworkx_with_lookup(y)
132            node_matcher = self.node_matcher
133            edge_matcher = self.edge_matcher
134
135        # Checks if there is a subgraph of `first` isomorphic to `second`.
136        # Returns an iterator over dictionaries of node indices from `first`
137        # to node indices in `second` representing the mapping found.
138        # As such, `first` must be the larger graph and `second` the smaller one.
139        mappings_iter = rustworkx.vf2_mapping(
140            larger_graph,
141            smaller_graph,
142            node_matcher=node_matcher,
143            edge_matcher=edge_matcher,
144            subgraph=True,
145            id_order=self.id_order,
146            induced=self.induced,
147            call_limit=self.call_limit,
148        )
149
150        node_mappings: list[frozendict[K, K]] = []
151
152        for idx in itertools.count():
153            if self.max_iterations > 0 and idx >= self.max_iterations:
154                break
155
156            try:
157                if len(y.nodes) + len(y.edges) > len(x.nodes) + len(x.edges):
158                    # y -> x (as needed)
159                    node_mappings.append(
160                        frozendict(
161                            (
162                                larger_graph_lookup[larger_idx],
163                                smaller_graph_lookup[smaller_idx],
164                            )
165                            for larger_idx, smaller_idx in next(mappings_iter).items()
166                        )
167                    )
168                else:
169                    # x ->  y (needs to be inverted)
170                    node_mappings.append(
171                        frozendict(
172                            (
173                                smaller_graph_lookup[smaller_idx],
174                                larger_graph_lookup[larger_idx],
175                            )
176                            for larger_idx, smaller_idx in next(mappings_iter).items()
177                        )
178                    )
179            except StopIteration:
180                break
181
182        return node_mappings

Graph similarity using the VF2 algorithm via rustworkx.

vf2_rustworkx( node_sim_func: AnySimFunc[N, Float], edge_sim_func: SemanticEdgeSim[K, N, E] = SemanticEdgeSim(source_weight=1.0, target_weight=1.0, edge_sim_func=None), node_matcher: ElementMatcher[N] = <function default_element_matcher>, edge_matcher: ElementMatcher[E] = <function default_element_matcher>, max_iterations: int = 0, maximum_common_subgraph: bool = True, id_order: bool = False, induced: bool = False, call_limit: int | None = None, *, node_del_cost: float = 1.0, node_ins_cost: float = 0.0, edge_del_cost: float = 1.0, edge_ins_cost: float = 0.0)
id_order: bool
induced: bool
call_limit: int | None
def node_mappings( self, x: cbrkit.model.graph.Graph[K, N, E, G], y: cbrkit.model.graph.Graph[K, N, E, G]) -> list[frozendict.frozendict[K, K]]:
118    def node_mappings(
119        self,
120        x: Graph[K, N, E, G],
121        y: Graph[K, N, E, G],
122    ) -> list[frozendict[K, K]]:
123        """Finds subgraph isomorphism node mappings using rustworkx."""
124        if len(y.nodes) + len(y.edges) > len(x.nodes) + len(x.edges):
125            larger_graph, larger_graph_lookup = to_rustworkx_with_lookup(y)
126            smaller_graph, smaller_graph_lookup = to_rustworkx_with_lookup(x)
127            node_matcher = reverse_positional(self.node_matcher)
128            edge_matcher = reverse_positional(self.edge_matcher)
129        else:
130            larger_graph, larger_graph_lookup = to_rustworkx_with_lookup(x)
131            smaller_graph, smaller_graph_lookup = to_rustworkx_with_lookup(y)
132            node_matcher = self.node_matcher
133            edge_matcher = self.edge_matcher
134
135        # Checks if there is a subgraph of `first` isomorphic to `second`.
136        # Returns an iterator over dictionaries of node indices from `first`
137        # to node indices in `second` representing the mapping found.
138        # As such, `first` must be the larger graph and `second` the smaller one.
139        mappings_iter = rustworkx.vf2_mapping(
140            larger_graph,
141            smaller_graph,
142            node_matcher=node_matcher,
143            edge_matcher=edge_matcher,
144            subgraph=True,
145            id_order=self.id_order,
146            induced=self.induced,
147            call_limit=self.call_limit,
148        )
149
150        node_mappings: list[frozendict[K, K]] = []
151
152        for idx in itertools.count():
153            if self.max_iterations > 0 and idx >= self.max_iterations:
154                break
155
156            try:
157                if len(y.nodes) + len(y.edges) > len(x.nodes) + len(x.edges):
158                    # y -> x (as needed)
159                    node_mappings.append(
160                        frozendict(
161                            (
162                                larger_graph_lookup[larger_idx],
163                                smaller_graph_lookup[smaller_idx],
164                            )
165                            for larger_idx, smaller_idx in next(mappings_iter).items()
166                        )
167                    )
168                else:
169                    # x ->  y (needs to be inverted)
170                    node_mappings.append(
171                        frozendict(
172                            (
173                                smaller_graph_lookup[smaller_idx],
174                                larger_graph_lookup[larger_idx],
175                            )
176                            for larger_idx, smaller_idx in next(mappings_iter).items()
177                        )
178                    )
179            except StopIteration:
180                break
181
182        return node_mappings

Finds subgraph isomorphism node mappings using rustworkx.

@dataclass(slots=True, frozen=True)
class dtw(cbrkit.typing.SimFunc[cbrkit.model.graph.Graph[K, N, E, G], cbrkit.sim.graphs.common.GraphSim[K]], typing.Generic[K, N, E, G]):
 99    @dataclass(slots=True, frozen=True)
100    class dtw[K, N, E, G](SimFunc[Graph[K, N, E, G], GraphSim[K]]):
101        """
102        Graph-based Dynamic Time Warping similarity function leveraging sequence alignment.
103
104        Args:
105            node_sim_func: A similarity function for nodes (SimFunc or BatchSimFunc).
106            edge_sim_func: An optional similarity function for edges (SimFunc or BatchSimFunc).
107            normalize: Whether to normalize the similarity score by the alignment length (default: True).
108
109        Examples:
110            >>> from ...model.graph import Node, Edge, Graph, SerializedGraph, from_dict
111            >>> node_sim_func = lambda n1, n2: 1.0 if n1 == n2 else 0.0
112            >>> edge_sim_func = lambda e1, e2: 1.0 if e1.value == e2.value else 0.0
113
114            >>> data_x = {
115            ...     "nodes": {
116            ...         "1": "A",
117            ...         "2": "B",
118            ...     },
119            ...     "edges": {
120            ...         "e1": {"source": "1", "target": "2", "value": "X"},
121            ...     },
122            ...     "value": None
123            ... }
124            >>> data_y = {
125            ...     "nodes": {
126            ...         "1": "A",
127            ...         "2": "C",
128            ...     },
129            ...     "edges": {
130            ...         "e1": {"source": "1", "target": "2", "value": "Y"},
131            ...     },
132            ...     "value": None
133            ... }
134            >>> graph_x = from_dict(data_x)
135            >>> graph_y = from_dict(data_y)
136
137            >>> g_dtw = dtw(node_sim_func, edge_sim_func)
138            >>> result = g_dtw(graph_x, graph_y)
139            >>> print(result)
140            GraphSim(value=0.05555555555555555,
141                node_mapping=frozendict.frozendict({'1': '2', '2': '2'}), edge_mapping=frozendict.frozendict({'e1': 'e1'}),
142                node_similarities=frozendict.frozendict({'1': 0.0, '2': 0.0}), edge_similarities=frozendict.frozendict({'e1': 0.0}))
143        """
144
145        node_sim_func: AnySimFunc[N, Float]
146        edge_sim_func: AnySimFunc[Edge[K, N, E], Float] | None = None
147        normalize: bool = True
148
149        def __call__(self, x: Graph[K, N, E, G], y: Graph[K, N, E, G]) -> GraphSim[K]:
150            sequence_x, edges_x = to_sequence(x)
151            sequence_y, edges_y = to_sequence(y)
152            node_sim_func: BatchSimFunc[Node[K, N], Float] = batchify_sim(
153                transpose_value(self.node_sim_func)
154            )
155            edge_sim_func = (
156                batchify_sim(self.edge_sim_func) if self.edge_sim_func else None
157            )
158
159            # 3) Use DTW for nodes, with alignment
160            node_result = collections_dtw(node_sim_func)(
161                sequence_x, sequence_y, return_alignment=True
162            )
163            alignment = node_result.mapping  # matched (x_node, y_node)
164
165            # 4) Build node/edge mapping & average similarity using the helper
166            similarity_data = _build_node_edge_mappings_and_similarity(
167                alignment, node_sim_func, edges_x, edges_y, edge_sim_func
168            )
169
170            # 5) Combine node & edge similarity
171            total_similarity = (
172                (similarity_data.node_similarity + similarity_data.edge_similarity)
173                / 2.0
174                if similarity_data.edge_similarity is not None
175                else similarity_data.node_similarity
176            )
177
178            # 6) Normalize by alignment length if applicable
179            if self.normalize and alignment:
180                alen = len([pair for pair in alignment if pair[0] and pair[1]])
181                if alen > 0:
182                    total_similarity /= alen
183
184            return GraphSim(
185                value=total_similarity,
186                node_mapping=frozendict(similarity_data.node_mapping),
187                edge_mapping=frozendict(similarity_data.edge_mapping),
188                node_similarities=frozendict(similarity_data.node_similarities),
189                edge_similarities=frozendict(similarity_data.edge_similarities),
190            )

Graph-based Dynamic Time Warping similarity function leveraging sequence alignment.

Arguments:
  • node_sim_func: A similarity function for nodes (SimFunc or BatchSimFunc).
  • edge_sim_func: An optional similarity function for edges (SimFunc or BatchSimFunc).
  • normalize: Whether to normalize the similarity score by the alignment length (default: True).
Examples:
>>> from cbrkit.model.graph import Node, Edge, Graph, SerializedGraph, from_dict
>>> node_sim_func = lambda n1, n2: 1.0 if n1 == n2 else 0.0
>>> edge_sim_func = lambda e1, e2: 1.0 if e1.value == e2.value else 0.0
>>> data_x = {
...     "nodes": {
...         "1": "A",
...         "2": "B",
...     },
...     "edges": {
...         "e1": {"source": "1", "target": "2", "value": "X"},
...     },
...     "value": None
... }
>>> data_y = {
...     "nodes": {
...         "1": "A",
...         "2": "C",
...     },
...     "edges": {
...         "e1": {"source": "1", "target": "2", "value": "Y"},
...     },
...     "value": None
... }
>>> graph_x = from_dict(data_x)
>>> graph_y = from_dict(data_y)
>>> g_dtw = dtw(node_sim_func, edge_sim_func)
>>> result = g_dtw(graph_x, graph_y)
>>> print(result)
GraphSim(value=0.05555555555555555,
    node_mapping=frozendict.frozendict({'1': '2', '2': '2'}), edge_mapping=frozendict.frozendict({'e1': 'e1'}),
    node_similarities=frozendict.frozendict({'1': 0.0, '2': 0.0}), edge_similarities=frozendict.frozendict({'e1': 0.0}))
dtw( node_sim_func: AnySimFunc[N, Float], edge_sim_func: AnySimFunc[cbrkit.model.graph.Edge[K, N, E], Float] | None = None, normalize: bool = True)
node_sim_func: AnySimFunc[N, Float]
edge_sim_func: AnySimFunc[cbrkit.model.graph.Edge[K, N, E], Float] | None
normalize: bool
@dataclass(slots=True)
class smith_waterman(cbrkit.typing.SimFunc[cbrkit.model.graph.Graph[K, N, E, G], cbrkit.sim.graphs.common.GraphSim[K]], typing.Generic[K, N, E, G]):
196    @dataclass(slots=True)
197    class smith_waterman[K, N, E, G](SimFunc[Graph[K, N, E, G], GraphSim[K]]):
198        """
199        Graph-based Smith-Waterman similarity function leveraging local sequence alignment
200        plus ProCAKE-style weighting for local similarities.
201
202        Args:
203            node_sim_func: Base node similarity function (SimFunc or BatchSimFunc).
204            dataflow_in_sim_func: Similarity for incoming data flow (optional).
205            dataflow_out_sim_func: Similarity for outgoing data flow (optional).
206            l_t: Weight for task/node similarity. Default = 1.0
207            l_i: Weight for incoming dataflow similarity. Default = 0.0 (ignored if 0).
208            l_o: Weight for outgoing dataflow similarity. Default = 0.0 (ignored if 0).
209            edge_sim_func: If provided, an edge similarity function (SimFunc or BatchSimFunc).
210            match_score: The SW match score (default 2).
211            mismatch_penalty: The SW mismatch penalty (default -1).
212            gap_penalty: The SW gap penalty (default -1).
213            normalize: Whether to normalize the final similarity (default True).
214
215        Examples:
216            >>> from ...model.graph import Node, Edge, Graph, SerializedGraph, from_dict
217            >>> node_sim_func = lambda n1, n2: 1.0 if n1 == n2 else 0.0
218            >>> edge_sim_func = lambda e1, e2: 1.0 if e1.value == e2.value else 0.0
219            >>> def dataflow_dummy(a, b): return 0.5  # pretend data flow sim
220
221            >>> data_x = {
222            ...     "nodes": {
223            ...         "1": "A",
224            ...         "2": "B",
225            ...     },
226            ...     "edges": {
227            ...         "e1": {"source": "1", "target": "2", "value": "X"},
228            ...     },
229            ...     "value": None
230            ... }
231
232            >>> data_y = {
233            ...     "nodes": {
234            ...         "1": "A",
235            ...         "2": "C",
236            ...     },
237            ...     "edges": {
238            ...         "e1": {"source": "1", "target": "2", "value": "X"},
239            ...     },
240            ...     "value": None
241            ... }
242            >>> graph_x = from_dict(data_x)
243            >>> graph_y = from_dict(data_y)
244
245            >>> g_swa = smith_waterman(
246            ...     node_sim_func,
247            ...     edge_sim_func,
248            ...     dataflow_in_sim_func=dataflow_dummy,
249            ...     dataflow_out_sim_func=dataflow_dummy,
250            ...     l_t=1.0, l_i=0.5, l_o=0.5,
251            ...     use_procake_formula=True
252            ... )
253            >>> result = g_swa(graph_x, graph_y)
254            >>> print(result)
255            GraphSim(value=0.015,
256                node_mapping=frozendict.frozendict({'1': '1', '2': '2'}), edge_mapping=frozendict.frozendict({'e1': 'e1'}),
257                node_similarities=frozendict.frozendict({'1': 0.75, '2': 0.25}), edge_similarities=frozendict.frozendict({'e1': 1.0}))
258
259            >>> # Another example without the ProCAKE weighting:
260            >>> g_swa_naive = smith_waterman(
261            ...     node_sim_func,
262            ...     match_score=2,
263            ...     mismatch_penalty=-1,
264            ...     gap_penalty=-1,
265            ...     normalize=True,
266            ...     use_procake_formula=False
267            ... )
268            >>> result_naive = g_swa_naive(graph_x, graph_y)
269            >>> print(result_naive)
270            GraphSim(value=0.01,
271                node_mapping=frozendict.frozendict({'1': '1', '2': '2'}), edge_mapping=frozendict.frozendict({}),
272                node_similarities=frozendict.frozendict({'1': 1.0, '2': 0.0}), edge_similarities=frozendict.frozendict({}))
273        """
274
275        node_sim_func: InitVar[AnySimFunc[N, Float]]
276        edge_sim_func: InitVar[AnySimFunc[Edge[K, N, E], Float] | None] = None
277        batched_node_sim_func: BatchSimFunc[Node[K, N], Float] = field(init=False)
278        unbatched_node_sim_func: SimFunc[Node[K, N], Float] = field(init=False)
279        batched_edge_sim_func: BatchSimFunc[Edge[K, N, E], Float] | None = field(
280            init=False
281        )
282        dataflow_in_sim_func: SimFunc[Any, float] | None = None
283        dataflow_out_sim_func: SimFunc[Any, float] | None = None
284        l_t: float = 1.0
285        l_i: float = 0.0
286        l_o: float = 0.0
287
288        match_score: int = 2
289        mismatch_penalty: int = -1
290        gap_penalty: int = -1
291        normalize: bool = True
292
293        use_procake_formula: bool = False
294
295        def __post_init__(
296            self,
297            node_sim_func: AnySimFunc[N, Float],
298            edge_sim_func: AnySimFunc[Edge[K, N, E], Float] | None,
299        ) -> None:
300            self.batched_node_sim_func = batchify_sim(transpose_value(node_sim_func))
301            self.unbatched_node_sim_func = unbatchify_sim(
302                transpose_value(node_sim_func)
303            )
304            self.batched_edge_sim_func = (
305                batchify_sim(edge_sim_func) if edge_sim_func else None
306            )
307
308        def local_node_sim(self, q_node: Node[K, N], c_node: Node[K, N]) -> float:
309            """Computes weighted local node similarity using optional dataflow scores."""
310            base = unpack_float(self.unbatched_node_sim_func(q_node, c_node))
311
312            if self.use_procake_formula:
313                in_flow = 0.0
314                if self.l_i != 0.0 and self.dataflow_in_sim_func:
315                    # In real usage, pass appropriate node/edge data
316                    in_flow = self.dataflow_in_sim_func(None, None)
317
318                out_flow = 0.0
319                if self.l_o != 0.0 and self.dataflow_out_sim_func:
320                    out_flow = self.dataflow_out_sim_func(None, None)
321
322                total_weight = self.l_t + self.l_i + self.l_o
323
324                if total_weight == 0:
325                    return 0.0
326
327                return (
328                    self.l_t * base + self.l_i * in_flow + self.l_o * out_flow
329                ) / total_weight
330
331            return base
332
333        def __call__(self, x: Graph[K, N, E, G], y: Graph[K, N, E, G]) -> GraphSim[K]:
334            # 1) Convert graphs to sequences
335            sequence_x, edges_x = to_sequence(x)
336            sequence_y, edges_y = to_sequence(y)
337
338            # 3) We'll rely on the minineedle-based SW for a raw score.
339            sw_obj = collections_smith_waterman(
340                match_score=self.match_score,
341                mismatch_penalty=self.mismatch_penalty,
342                gap_penalty=self.gap_penalty,
343            )
344
345            # Use the length of each sequence as tokens
346            tokens_x = list(range(len(sequence_x)))
347            tokens_y = list(range(len(sequence_y)))
348            raw_sw_score = sw_obj(tokens_x, tokens_y)
349
350            # 4) We do not have a direct alignment from the raw minineedle call,
351            #    so let's do a naive 1:1 for the shorter length.
352            length = min(len(sequence_x), len(sequence_y))
353            alignment = [(sequence_x[i], sequence_y[i]) for i in range(length)]
354
355            # 5) Build node/edge mappings & average similarities using local_node_sim
356            similarity_data = _build_node_edge_mappings_and_similarity(
357                alignment,
358                batchify_sim(self.local_node_sim),
359                edges_x,
360                edges_y,
361                self.batched_edge_sim_func,
362            )
363
364            # 6) Combine node & edge sim, scale by raw SW score (if > 0)
365            if similarity_data.edge_similarity is not None:
366                total_similarity = (
367                    similarity_data.node_similarity + similarity_data.edge_similarity
368                ) / 2.0
369            else:
370                total_similarity = similarity_data.node_similarity
371
372            if raw_sw_score > 0:
373                total_similarity *= raw_sw_score / 100.0
374
375            # 7) Normalize if requested
376            if self.normalize and length > 0:
377                total_similarity /= length
378
379            return GraphSim(
380                value=total_similarity,
381                node_mapping=frozendict(similarity_data.node_mapping),
382                edge_mapping=frozendict(similarity_data.edge_mapping),
383                node_similarities=frozendict(similarity_data.node_similarities),
384                edge_similarities=frozendict(similarity_data.edge_similarities),
385            )

Graph-based Smith-Waterman similarity function leveraging local sequence alignment plus ProCAKE-style weighting for local similarities.

Arguments:
  • node_sim_func: Base node similarity function (SimFunc or BatchSimFunc).
  • dataflow_in_sim_func: Similarity for incoming data flow (optional).
  • dataflow_out_sim_func: Similarity for outgoing data flow (optional).
  • l_t: Weight for task/node similarity. Default = 1.0
  • l_i: Weight for incoming dataflow similarity. Default = 0.0 (ignored if 0).
  • l_o: Weight for outgoing dataflow similarity. Default = 0.0 (ignored if 0).
  • edge_sim_func: If provided, an edge similarity function (SimFunc or BatchSimFunc).
  • match_score: The SW match score (default 2).
  • mismatch_penalty: The SW mismatch penalty (default -1).
  • gap_penalty: The SW gap penalty (default -1).
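  • normalize: Whether to normalize the final similarity (default True).
  • use_procake_formula: Whether the local node similarity combines the base node similarity with the dataflow similarities, weighted by l_t, l_i and l_o (default False).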
Examples:
>>> from cbrkit.model.graph import Node, Edge, Graph, SerializedGraph, from_dict
>>> node_sim_func = lambda n1, n2: 1.0 if n1 == n2 else 0.0
>>> edge_sim_func = lambda e1, e2: 1.0 if e1.value == e2.value else 0.0
>>> def dataflow_dummy(a, b): return 0.5  # pretend data flow sim
>>> data_x = {
...     "nodes": {
...         "1": "A",
...         "2": "B",
...     },
...     "edges": {
...         "e1": {"source": "1", "target": "2", "value": "X"},
...     },
...     "value": None
... }
>>> data_y = {
...     "nodes": {
...         "1": "A",
...         "2": "C",
...     },
...     "edges": {
...         "e1": {"source": "1", "target": "2", "value": "X"},
...     },
...     "value": None
... }
>>> graph_x = from_dict(data_x)
>>> graph_y = from_dict(data_y)
>>> g_swa = smith_waterman(
...     node_sim_func,
...     edge_sim_func,
...     dataflow_in_sim_func=dataflow_dummy,
...     dataflow_out_sim_func=dataflow_dummy,
...     l_t=1.0, l_i=0.5, l_o=0.5,
...     use_procake_formula=True
... )
>>> result = g_swa(graph_x, graph_y)
>>> print(result)
GraphSim(value=0.015,
    node_mapping=frozendict.frozendict({'1': '1', '2': '2'}), edge_mapping=frozendict.frozendict({'e1': 'e1'}),
    node_similarities=frozendict.frozendict({'1': 0.75, '2': 0.25}), edge_similarities=frozendict.frozendict({'e1': 1.0}))
>>> # Another example without the ProCAKE weighting:
>>> g_swa_naive = smith_waterman(
...     node_sim_func,
...     match_score=2,
...     mismatch_penalty=-1,
...     gap_penalty=-1,
...     normalize=True,
...     use_procake_formula=False
... )
>>> result_naive = g_swa_naive(graph_x, graph_y)
>>> print(result_naive)
GraphSim(value=0.01,
    node_mapping=frozendict.frozendict({'1': '1', '2': '2'}), edge_mapping=frozendict.frozendict({}),
    node_similarities=frozendict.frozendict({'1': 1.0, '2': 0.0}), edge_similarities=frozendict.frozendict({}))
smith_waterman( node_sim_func: dataclasses.InitVar[AnySimFunc[N, Float]], edge_sim_func: dataclasses.InitVar[AnySimFunc[cbrkit.model.graph.Edge[K, N, E], Float] | None] = None, dataflow_in_sim_func: Optional[cbrkit.typing.SimFunc[Any, float]] = None, dataflow_out_sim_func: Optional[cbrkit.typing.SimFunc[Any, float]] = None, l_t: float = 1.0, l_i: float = 0.0, l_o: float = 0.0, match_score: int = 2, mismatch_penalty: int = -1, gap_penalty: int = -1, normalize: bool = True, use_procake_formula: bool = False)
node_sim_func: dataclasses.InitVar[AnySimFunc[N, Float]]
edge_sim_func: dataclasses.InitVar[AnySimFunc[cbrkit.model.graph.Edge[K, N, E], Float] | None] = None
batched_node_sim_func: cbrkit.typing.BatchSimFunc[cbrkit.model.graph.Node[K, N], Float]
unbatched_node_sim_func: cbrkit.typing.SimFunc[cbrkit.model.graph.Node[K, N], Float]
batched_edge_sim_func: Optional[cbrkit.typing.BatchSimFunc[cbrkit.model.graph.Edge[K, N, E], Float]]
dataflow_in_sim_func: Optional[cbrkit.typing.SimFunc[Any, float]]
dataflow_out_sim_func: Optional[cbrkit.typing.SimFunc[Any, float]]
l_t: float
l_i: float
l_o: float
match_score: int
mismatch_penalty: int
gap_penalty: int
normalize: bool
use_procake_formula: bool
def local_node_sim( self, q_node: cbrkit.model.graph.Node[K, N], c_node: cbrkit.model.graph.Node[K, N]) -> float:
308        def local_node_sim(self, q_node: Node[K, N], c_node: Node[K, N]) -> float:
309            """Computes weighted local node similarity using optional dataflow scores."""
310            base = unpack_float(self.unbatched_node_sim_func(q_node, c_node))
311
312            if self.use_procake_formula:
313                in_flow = 0.0
314                if self.l_i != 0.0 and self.dataflow_in_sim_func:
315                    # In real usage, pass appropriate node/edge data
316                    in_flow = self.dataflow_in_sim_func(None, None)
317
318                out_flow = 0.0
319                if self.l_o != 0.0 and self.dataflow_out_sim_func:
320                    out_flow = self.dataflow_out_sim_func(None, None)
321
322                total_weight = self.l_t + self.l_i + self.l_o
323
324                if total_weight == 0:
325                    return 0.0
326
327                return (
328                    self.l_t * base + self.l_i * in_flow + self.l_o * out_flow
329                ) / total_weight
330
331            return base

Computes weighted local node similarity using optional dataflow scores.

@dataclass(slots=True, frozen=True)
class init_empty(cbrkit.sim.graphs.SearchStateInit[K, N, E, G], typing.Generic[K, N, E, G]):
356@dataclass(slots=True, frozen=True)
357class init_empty[K, N, E, G](SearchStateInit[K, N, E, G]):
358    """Initializes search with an empty mapping."""
359
360    def __call__(
361        self,
362        x: Graph[K, N, E, G],
363        y: Graph[K, N, E, G],
364        node_matcher: ElementMatcher[N],
365        edge_matcher: ElementMatcher[E],
366    ) -> SearchState[K]:
367        return SearchState(
368            frozendict(),
369            frozendict(),
370            frozenset(y.nodes.keys()),
371            frozenset(y.edges.keys()),
372            frozenset(x.nodes.keys()),
373            frozenset(x.edges.keys()),
374        )

Initializes search with an empty mapping.

@dataclass(slots=True, init=False)
class init_unique_matches(cbrkit.sim.graphs.SearchStateInit[K, N, E, G], typing.Generic[K, N, E, G]):
377@dataclass(slots=True, init=False)
378class init_unique_matches[K, N, E, G](SearchStateInit[K, N, E, G]):
379    """Initializes search by pre-mapping uniquely matchable nodes."""
380
381    def __call__(
382        self,
383        x: Graph[K, N, E, G],
384        y: Graph[K, N, E, G],
385        node_matcher: ElementMatcher[N],
386        edge_matcher: ElementMatcher[E],
387    ) -> SearchState[K]:
388        # pre-populate the mapping with nodes/edges that only have one possible legal mapping
389        y2x_map: defaultdict[K, set[K]] = defaultdict(set)
390        x2y_map: defaultdict[K, set[K]] = defaultdict(set)
391
392        for y_key, x_key in itertools.product(y.nodes.keys(), x.nodes.keys()):
393            if node_matcher(x.nodes[x_key].value, y.nodes[y_key].value):
394                y2x_map[y_key].add(x_key)
395                x2y_map[x_key].add(y_key)
396
397        node_mapping: frozendict[K, K] = frozendict(
398            (y_key, next(iter(x_keys)))
399            for y_key, x_keys in y2x_map.items()
400            if len(x_keys) == 1 and len(x2y_map[next(iter(x_keys))]) == 1
401        )
402
403        edge_mapping: frozendict[K, K] = _induced_edge_mapping(
404            x, y, node_mapping, edge_matcher
405        )
406
407        return SearchState(
408            node_mapping,
409            edge_mapping,
410            frozenset(y.nodes.keys() - node_mapping.keys()),
411            frozenset(y.edges.keys() - edge_mapping.keys()),
412            frozenset(x.nodes.keys() - node_mapping.values()),
413            frozenset(x.edges.keys() - edge_mapping.values()),
414        )

Initializes search by pre-mapping uniquely matchable nodes.

@dataclass(slots=True, frozen=True)
class GraphSim(cbrkit.typing.StructuredValue[float], typing.Generic[K]):
23@dataclass(slots=True, frozen=True)
24class GraphSim[K](StructuredValue[float]):
25    """Result of a graph similarity computation with node and edge mappings."""
26
27    node_mapping: frozendict[K, K]
28    edge_mapping: frozendict[K, K]
29    node_similarities: frozendict[K, float]
30    edge_similarities: frozendict[K, float]

Result of a graph similarity computation with node and edge mappings.

GraphSim( value: T, node_mapping: frozendict.frozendict[K, K], edge_mapping: frozendict.frozendict[K, K], node_similarities: frozendict.frozendict[K, float], edge_similarities: frozendict.frozendict[K, float])
node_mapping: frozendict.frozendict[K, K]
edge_mapping: frozendict.frozendict[K, K]
node_similarities: frozendict.frozendict[K, float]
edge_similarities: frozendict.frozendict[K, float]
class ElementMatcher(typing.Protocol, typing.Generic[T]):
33class ElementMatcher[T](Protocol):
34    """Determines whether two graph elements can be legally mapped."""
35
36    def __call__(self, x: T, y: T, /) -> bool: ...

Determines whether two graph elements can be legally mapped.

ElementMatcher(*args, **kwargs)
def _no_init_or_replace_init(self, *args, **kwargs):
    """Placeholder `__init__` that rejects protocols or installs the real `__init__`.

    Raises `TypeError` when the instance's class is itself a protocol.
    Otherwise the first `__init__` found in the MRO that is not this
    placeholder is stored on the class and then called with the given
    arguments.
    """
    cls = type(self)

    if cls._is_protocol:
        raise TypeError('Protocols cannot be instantiated')

    # Already using a custom `__init__`. No need to calculate correct
    # `__init__` to call. This can lead to RecursionError. See bpo-45121.
    if cls.__init__ is not _no_init_or_replace_init:
        return

    # Initially, `__init__` of a protocol subclass is set to `_no_init_or_replace_init`.
    # The first instantiation of the subclass will call `_no_init_or_replace_init` which
    # searches for a proper new `__init__` in the MRO. The new `__init__`
    # replaces the subclass' old `__init__` (ie `_no_init_or_replace_init`). Subsequent
    # instantiation of the protocol subclass will thus use the new
    # `__init__` and no longer call `_no_init_or_replace_init`.
    for base in cls.__mro__:
        # Only an `__init__` defined directly on `base` counts; a class without
        # one yields the placeholder itself and is therefore skipped.
        init = base.__dict__.get('__init__', _no_init_or_replace_init)
        if init is not _no_init_or_replace_init:
            cls.__init__ = init
            break
    else:
        # should not happen
        cls.__init__ = object.__init__

    # Run the initializer that has just been installed on the class.
    cls.__init__(self, *args, **kwargs)
@dataclass(slots=True, frozen=True)
class SemanticEdgeSim(typing.Generic[K, N, E]):
44@dataclass(slots=True, frozen=True)
45class SemanticEdgeSim[K, N, E]:
46    """Computes edge similarity using weighted source and target node similarities."""
47
48    source_weight: float = 1.0
49    target_weight: float = 1.0
50    edge_sim_func: AnySimFunc[E, Float] | None = None
51
52    def __call__(
53        self,
54        batches: Sequence[tuple[E, E, float, float]],
55    ) -> list[float]:
56        source_sims = (source_sim for _, _, source_sim, _ in batches)
57        target_sims = (target_sim for _, _, _, target_sim in batches)
58
59        if self.edge_sim_func is not None:
60            edge_sim_func = batchify_sim(self.edge_sim_func)
61            edge_sims = unpack_floats(
62                edge_sim_func(
63                    [(x, y) for x, y, _, _ in batches],
64                )
65            )
66        else:
67            edge_sims = [1.0] * len(batches)
68
69        scaling_factor = self.source_weight + self.target_weight
70
71        if scaling_factor == 0:
72            return edge_sims
73
74        return [
75            (edge * source * self.source_weight / scaling_factor)
76            + (edge * target * self.target_weight / scaling_factor)
77            for source, target, edge in zip(
78                source_sims, target_sims, edge_sims, strict=True
79            )
80        ]

Computes edge similarity using weighted source and target node similarities.

SemanticEdgeSim( source_weight: float = 1.0, target_weight: float = 1.0, edge_sim_func: AnySimFunc[E, Float] | None = None)
source_weight: float
target_weight: float
edge_sim_func: AnySimFunc[E, Float] | None
@dataclass(slots=True)
class BaseGraphSimFunc(cbrkit.sim.graphs.common.BaseGraphEditFunc[K, N, E, G], typing.Generic[K, N, E, G]):
124@dataclass(slots=True)
125class BaseGraphSimFunc[K, N, E, G](BaseGraphEditFunc[K, N, E, G]):
126    """Base class for graph similarity functions with node and edge matching."""
127
128    node_sim_func: AnySimFunc[N, Float]
129    edge_sim_func: SemanticEdgeSim[K, N, E] = default_edge_sim
130    node_matcher: ElementMatcher[N] = default_element_matcher
131    edge_matcher: ElementMatcher[E] = default_element_matcher
132    batch_node_sim_func: BatchSimFunc[Node[K, N], Float] = field(init=False)
133
134    def __post_init__(self) -> None:
135        self.batch_node_sim_func = batchify_sim(transpose_value(self.node_sim_func))
136
137    def induced_edge_mapping(
138        self,
139        x: Graph[K, N, E, G],
140        y: Graph[K, N, E, G],
141        node_mapping: Mapping[K, K],
142    ) -> frozendict[K, K]:
143        """Derives an edge mapping induced by the given node mapping."""
144        return _induced_edge_mapping(x, y, node_mapping, self.edge_matcher)
145
146    def node_pairs(
147        self,
148        x: Graph[K, N, E, G],
149        y: Graph[K, N, E, G],
150    ) -> list[tuple[K, K]]:
151        """Returns all legal node pairs between the two graphs."""
152        return [
153            (y_key, x_key)
154            for x_key, y_key in itertools.product(x.nodes.keys(), y.nodes.keys())
155            if self.node_matcher(x.nodes[x_key].value, y.nodes[y_key].value)
156        ]
157
158    def edge_pairs(
159        self,
160        x: Graph[K, N, E, G],
161        y: Graph[K, N, E, G],
162        node_pairs: Collection[tuple[K, K]],
163    ) -> list[tuple[K, K]]:
164        """Returns all legal edge pairs consistent with the given node pairs."""
165        return [
166            (y_key, x_key)
167            for x_key, y_key in itertools.product(x.edges.keys(), y.edges.keys())
168            if self.edge_matcher(x.edges[x_key].value, y.edges[y_key].value)
169            and (y.edges[y_key].source.key, x.edges[x_key].source.key) in node_pairs
170            and (y.edges[y_key].target.key, x.edges[x_key].target.key) in node_pairs
171        ]
172
173    def node_pair_similarities(
174        self,
175        x: Graph[K, N, E, G],
176        y: Graph[K, N, E, G],
177        pairs: Sequence[tuple[K, K]] | None = None,
178    ) -> PairSim[K]:
179        """Computes pairwise similarities for all legal node pairs."""
180        if pairs is None:
181            pairs = self.node_pairs(x, y)
182
183        node_pair_values = [(x.nodes[x_key], y.nodes[y_key]) for y_key, x_key in pairs]
184        node_pair_sims = self.batch_node_sim_func(node_pair_values)
185
186        return {
187            (y_node.key, x_node.key): unpack_float(sim)
188            for (x_node, y_node), sim in zip(
189                node_pair_values, node_pair_sims, strict=True
190            )
191        }
192
193    def edge_pair_similarities(
194        self,
195        x: Graph[K, N, E, G],
196        y: Graph[K, N, E, G],
197        node_pair_sims: PairSim[K],
198        pairs: Sequence[tuple[K, K]] | None = None,
199    ) -> PairSim[K]:
200        """Computes pairwise similarities for all legal edge pairs."""
201        if pairs is None:
202            pairs = self.edge_pairs(x, y, node_pair_sims.keys())
203
204        edge_pair_values = [(x.edges[x_key], y.edges[y_key]) for y_key, x_key in pairs]
205        edge_pair_sims = self.edge_sim_func(
206            [
207                (
208                    x_edge.value,
209                    y_edge.value,
210                    node_pair_sims[(y_edge.source.key, x_edge.source.key)],
211                    node_pair_sims[(y_edge.target.key, x_edge.target.key)],
212                )
213                for x_edge, y_edge in edge_pair_values
214            ]
215        )
216
217        return {
218            (y_edge.key, x_edge.key): unpack_float(sim)
219            for (x_edge, y_edge), sim in zip(
220                edge_pair_values, edge_pair_sims, strict=True
221            )
222        }
223
224    def pair_similarities(
225        self,
226        x: Graph[K, N, E, G],
227        y: Graph[K, N, E, G],
228        node_pairs: Sequence[tuple[K, K]] | None = None,
229        edge_pairs: Sequence[tuple[K, K]] | None = None,
230    ) -> tuple[PairSim[K], PairSim[K]]:
231        """Computes both node and edge pairwise similarities."""
232        node_pair_sims = self.node_pair_similarities(x, y, node_pairs)
233        edge_pair_sims = self.edge_pair_similarities(x, y, node_pair_sims, edge_pairs)
234        return node_pair_sims, edge_pair_sims
235
236    def similarity(
237        self,
238        x: Graph[K, N, E, G],
239        y: Graph[K, N, E, G],
240        node_mapping: frozendict[K, K],
241        edge_mapping: frozendict[K, K],
242        node_pair_sims: Mapping[tuple[K, K], float],
243        edge_pair_sims: Mapping[tuple[K, K], float],
244    ) -> GraphSim[K]:
245        """Function to compute the similarity of all previous steps"""
246
247        node_sims = [
248            node_pair_sims[(y_key, x_key)] for y_key, x_key in node_mapping.items()
249        ]
250
251        edge_sims = [
252            edge_pair_sims[(y_key, x_key)] for y_key, x_key in edge_mapping.items()
253        ]
254
255        all_sims = itertools.chain(node_sims, edge_sims)
256        upper_bound = self.cost_upper_bound(x, y)
257        total_sim = sum(all_sims) / upper_bound if upper_bound > 0 else 0.0
258
259        return GraphSim(
260            total_sim,
261            node_mapping,
262            edge_mapping,
263            frozendict(zip(node_mapping.keys(), node_sims, strict=True)),
264            frozendict(zip(edge_mapping.keys(), edge_sims, strict=True)),
265        )
266
267    def invert_similarity(
268        self, x: Graph[K, N, E, G], y: Graph[K, N, E, G], sim: GraphSim[K]
269    ) -> GraphSim[K]:
270        """Recomputes similarity with the node and edge mappings inverted."""
271        node_mapping = frozendict((v, k) for k, v in sim.node_mapping.items())
272        edge_mapping = frozendict((v, k) for k, v in sim.edge_mapping.items())
273
274        node_similarities, edge_similarities = self.pair_similarities(
275            x, y, list(node_mapping.items()), list(edge_mapping.items())
276        )
277
278        return self.similarity(
279            x,
280            y,
281            node_mapping,
282            edge_mapping,
283            frozendict(node_similarities),
284            frozendict(edge_similarities),
285        )

Base class for graph similarity functions with node and edge matching.

BaseGraphSimFunc( node_sim_func: AnySimFunc[N, Float], edge_sim_func: SemanticEdgeSim[K, N, E] = SemanticEdgeSim(source_weight=1.0, target_weight=1.0, edge_sim_func=None), node_matcher: ElementMatcher[N] = <function default_element_matcher>, edge_matcher: ElementMatcher[E] = <function default_element_matcher>, *, node_del_cost: float = 1.0, node_ins_cost: float = 0.0, edge_del_cost: float = 1.0, edge_ins_cost: float = 0.0)
node_sim_func: AnySimFunc[N, Float]
edge_sim_func: SemanticEdgeSim[K, N, E]
node_matcher: ElementMatcher[N]
edge_matcher: ElementMatcher[E]
batch_node_sim_func: cbrkit.typing.BatchSimFunc[cbrkit.model.graph.Node[K, N], Float]
def induced_edge_mapping( self, x: cbrkit.model.graph.Graph[K, N, E, G], y: cbrkit.model.graph.Graph[K, N, E, G], node_mapping: Mapping[K, K]) -> frozendict.frozendict[K, K]:
137    def induced_edge_mapping(
138        self,
139        x: Graph[K, N, E, G],
140        y: Graph[K, N, E, G],
141        node_mapping: Mapping[K, K],
142    ) -> frozendict[K, K]:
143        """Derives an edge mapping induced by the given node mapping."""
144        return _induced_edge_mapping(x, y, node_mapping, self.edge_matcher)  # delegates, supplying this instance's edge matcher

Derives an edge mapping induced by the given node mapping.

def node_pairs( self, x: cbrkit.model.graph.Graph[K, N, E, G], y: cbrkit.model.graph.Graph[K, N, E, G]) -> list[tuple[K, K]]:
146    def node_pairs(
147        self,
148        x: Graph[K, N, E, G],
149        y: Graph[K, N, E, G],
150    ) -> list[tuple[K, K]]:
151        """Returns all legal node pairs between the two graphs."""
152        return [
153            (y_key, x_key)  # each pair is emitted y-first: (y_key, x_key)
154            for x_key, y_key in itertools.product(x.nodes.keys(), y.nodes.keys())
155            if self.node_matcher(x.nodes[x_key].value, y.nodes[y_key].value)  # keep only pairs the node matcher accepts
156        ]

Returns all legal node pairs between the two graphs.

def edge_pairs( self, x: cbrkit.model.graph.Graph[K, N, E, G], y: cbrkit.model.graph.Graph[K, N, E, G], node_pairs: Collection[tuple[K, K]]) -> list[tuple[K, K]]:
158    def edge_pairs(
159        self,
160        x: Graph[K, N, E, G],
161        y: Graph[K, N, E, G],
162        node_pairs: Collection[tuple[K, K]],
163    ) -> list[tuple[K, K]]:
164        """Returns all legal edge pairs consistent with the given node pairs."""
165        return [
166            (y_key, x_key)  # each pair is emitted y-first: (y_key, x_key)
167            for x_key, y_key in itertools.product(x.edges.keys(), y.edges.keys())
168            if self.edge_matcher(x.edges[x_key].value, y.edges[y_key].value)
169            and (y.edges[y_key].source.key, x.edges[x_key].source.key) in node_pairs  # source nodes must form a given node pair
170            and (y.edges[y_key].target.key, x.edges[x_key].target.key) in node_pairs  # target nodes must form a given node pair
171        ]

Returns all legal edge pairs consistent with the given node pairs.

def node_pair_similarities( self, x: cbrkit.model.graph.Graph[K, N, E, G], y: cbrkit.model.graph.Graph[K, N, E, G], pairs: Sequence[tuple[K, K]] | None = None) -> PairSim[K]:
173    def node_pair_similarities(
174        self,
175        x: Graph[K, N, E, G],
176        y: Graph[K, N, E, G],
177        pairs: Sequence[tuple[K, K]] | None = None,
178    ) -> PairSim[K]:
179        """Computes pairwise similarities for all legal node pairs."""
180        if pairs is None:  # no explicit pairs given: score every pair returned by node_pairs
181            pairs = self.node_pairs(x, y)
182
183        node_pair_values = [(x.nodes[x_key], y.nodes[y_key]) for y_key, x_key in pairs]  # pairs are (y, x); values are built as (x node, y node)
184        node_pair_sims = self.batch_node_sim_func(node_pair_values)  # one batched call for all pairs
185
186        return {
187            (y_node.key, x_node.key): unpack_float(sim)  # result keys are y-first again: (y_key, x_key)
188            for (x_node, y_node), sim in zip(
189                node_pair_values, node_pair_sims, strict=True
190            )
191        }

Computes pairwise similarities for all legal node pairs.

def edge_pair_similarities( self, x: cbrkit.model.graph.Graph[K, N, E, G], y: cbrkit.model.graph.Graph[K, N, E, G], node_pair_sims: PairSim[K], pairs: Sequence[tuple[K, K]] | None = None) -> PairSim[K]:
193    def edge_pair_similarities(
194        self,
195        x: Graph[K, N, E, G],
196        y: Graph[K, N, E, G],
197        node_pair_sims: PairSim[K],
198        pairs: Sequence[tuple[K, K]] | None = None,
199    ) -> PairSim[K]:
200        """Computes pairwise similarities for all legal edge pairs."""
201        if pairs is None:  # no explicit pairs given: use the edge pairs consistent with the scored node pairs
202            pairs = self.edge_pairs(x, y, node_pair_sims.keys())
203
204        edge_pair_values = [(x.edges[x_key], y.edges[y_key]) for y_key, x_key in pairs]  # pairs are (y, x); values are built as (x edge, y edge)
205        edge_pair_sims = self.edge_sim_func(
206            [
207                (
208                    x_edge.value,
209                    y_edge.value,
210                    node_pair_sims[(y_edge.source.key, x_edge.source.key)],  # similarity of the two source nodes
211                    node_pair_sims[(y_edge.target.key, x_edge.target.key)],  # similarity of the two target nodes
212                )
213                for x_edge, y_edge in edge_pair_values
214            ]
215        )
216
217        return {
218            (y_edge.key, x_edge.key): unpack_float(sim)  # result keys are y-first again: (y_key, x_key)
219            for (x_edge, y_edge), sim in zip(
220                edge_pair_values, edge_pair_sims, strict=True
221            )
222        }

Computes pairwise similarities for all legal edge pairs.

def pair_similarities( self, x: cbrkit.model.graph.Graph[K, N, E, G], y: cbrkit.model.graph.Graph[K, N, E, G], node_pairs: Sequence[tuple[K, K]] | None = None, edge_pairs: Sequence[tuple[K, K]] | None = None) -> tuple[PairSim[K], PairSim[K]]:
224    def pair_similarities(
225        self,
226        x: Graph[K, N, E, G],
227        y: Graph[K, N, E, G],
228        node_pairs: Sequence[tuple[K, K]] | None = None,
229        edge_pairs: Sequence[tuple[K, K]] | None = None,
230    ) -> tuple[PairSim[K], PairSim[K]]:
231        """Computes both node and edge pairwise similarities."""
232        node_pair_sims = self.node_pair_similarities(x, y, node_pairs)
233        edge_pair_sims = self.edge_pair_similarities(x, y, node_pair_sims, edge_pairs)  # edge scoring reuses the node similarities from above
234        return node_pair_sims, edge_pair_sims

Computes both node and edge pairwise similarities.

def similarity( self, x: cbrkit.model.graph.Graph[K, N, E, G], y: cbrkit.model.graph.Graph[K, N, E, G], node_mapping: frozendict.frozendict[K, K], edge_mapping: frozendict.frozendict[K, K], node_pair_sims: Mapping[tuple[K, K], float], edge_pair_sims: Mapping[tuple[K, K], float]) -> GraphSim[K]:
236    def similarity(
237        self,
238        x: Graph[K, N, E, G],
239        y: Graph[K, N, E, G],
240        node_mapping: frozendict[K, K],
241        edge_mapping: frozendict[K, K],
242        node_pair_sims: Mapping[tuple[K, K], float],
243        edge_pair_sims: Mapping[tuple[K, K], float],
244    ) -> GraphSim[K]:
245        """Aggregates the similarities of the mapped nodes and edges into a GraphSim."""
246
247        node_sims = [
248            node_pair_sims[(y_key, x_key)] for y_key, x_key in node_mapping.items()  # mapping entries are read as (y_key, x_key)
249        ]
250
251        edge_sims = [
252            edge_pair_sims[(y_key, x_key)] for y_key, x_key in edge_mapping.items()  # mapping entries are read as (y_key, x_key)
253        ]
254
255        all_sims = itertools.chain(node_sims, edge_sims)
256        upper_bound = self.cost_upper_bound(x, y)
257        total_sim = sum(all_sims) / upper_bound if upper_bound > 0 else 0.0  # sum divided by the upper bound; 0.0 if the bound is not positive
258
259        return GraphSim(
260            total_sim,
261            node_mapping,
262            edge_mapping,
263            frozendict(zip(node_mapping.keys(), node_sims, strict=True)),  # per-node similarity, keyed by the mapping's keys
264            frozendict(zip(edge_mapping.keys(), edge_sims, strict=True)),  # per-edge similarity, keyed by the mapping's keys
265        )

Aggregates the similarities of the mapped nodes and edges into an overall graph similarity, divided by the cost upper bound.

def invert_similarity( self, x: cbrkit.model.graph.Graph[K, N, E, G], y: cbrkit.model.graph.Graph[K, N, E, G], sim: GraphSim[K]) -> GraphSim[K]:
267    def invert_similarity(
268        self, x: Graph[K, N, E, G], y: Graph[K, N, E, G], sim: GraphSim[K]
269    ) -> GraphSim[K]:
270        """Recomputes similarity with the node and edge mappings inverted."""
271        node_mapping = frozendict((v, k) for k, v in sim.node_mapping.items())  # swap keys and values
272        edge_mapping = frozendict((v, k) for k, v in sim.edge_mapping.items())  # swap keys and values
273
274        node_similarities, edge_similarities = self.pair_similarities(
275            x, y, list(node_mapping.items()), list(edge_mapping.items())  # score only the inverted pairs
276        )
277
278        return self.similarity(
279            x,
280            y,
281            node_mapping,
282            edge_mapping,
283            frozendict(node_similarities),
284            frozendict(edge_similarities),
285        )

Recomputes similarity with the node and edge mappings inverted.

@dataclass(slots=True)
class SearchGraphSimFunc(cbrkit.sim.graphs.BaseGraphSimFunc[K, N, E, G], typing.Generic[K, N, E, G]):
417@dataclass(slots=True)
418class SearchGraphSimFunc[K, N, E, G](BaseGraphSimFunc[K, N, E, G]):
419    """Graph similarity function using search-based node and edge mapping."""
420
421    init_func: (
422        SearchStateInit[K, N, E, G] | AnySimFunc[Graph[K, N, E, G], GraphSim[K]]
423    ) = field(default_factory=init_unique_matches)
424
425    def init_search_state(
426        self, x: Graph[K, N, E, G], y: Graph[K, N, E, G]
427    ) -> SearchState[K]:
428        """Initializes the search state using the configured init function."""
429        init_func_params = total_params(self.init_func)  # the parameter count selects the calling convention below
430        sim: GraphSim[K]
431
432        if init_func_params == 4:  # treated as a SearchStateInit that builds the state directly
433            init_func = cast(SearchStateInit[K, N, E, G], self.init_func)
434
435            return init_func(x, y, self.node_matcher, self.edge_matcher)
436
437        elif init_func_params == 2:  # treated as a similarity function called with (x, y)
438            init_func = cast(SimFunc[Graph[K, N, E, G], GraphSim[K]], self.init_func)
439            sim = init_func(x, y)
440
441        elif init_func_params == 1:  # treated as a batch similarity function called with a one-pair list
442            init_func = cast(
443                BatchSimFunc[Graph[K, N, E, G], GraphSim[K]], self.init_func
444            )
445            sim = init_func([(x, y)])[0]
446
447        else:
448            raise ValueError(
449                f"Invalid number of parameters for init_func: {init_func_params}"
450            )
451
452        return SearchState(  # seed the state from the mappings of the computed GraphSim
453            node_mapping=sim.node_mapping,
454            edge_mapping=sim.edge_mapping,
455            open_y_nodes=frozenset(y.nodes.keys() - sim.node_mapping.keys()),  # y nodes that have no mapping yet
456            open_y_edges=frozenset(y.edges.keys() - sim.edge_mapping.keys()),  # y edges that have no mapping yet
457            open_x_nodes=frozenset(x.nodes.keys() - sim.node_mapping.values()),  # x nodes not used as a mapping value
458            open_x_edges=frozenset(x.edges.keys() - sim.edge_mapping.values()),  # x edges not used as a mapping value
459        )
460
461    def finished(self, state: SearchState[K]) -> bool:
462        """Returns True when all query nodes and edges have been processed."""
463        # the following condition could save a few iterations, but needs to be tested
464        # return (not state.open_y_nodes and not state.open_y_edges) or (
465        #     not state.open_x_nodes and not state.open_x_edges
466        # )
467        return not state.open_y_nodes and not state.open_y_edges  # only the y side is checked; open x elements may remain
468
469    def legal_node_mapping(
470        self,
471        x: Graph[K, N, E, G],
472        y: Graph[K, N, E, G],
473        state: SearchState[K],
474        x_key: K,
475        y_key: K,
476    ) -> bool:
477        """Checks whether mapping a query node to a case node is legal."""
478        return (
479            self.node_matcher(x.nodes[x_key].value, y.nodes[y_key].value)
480            and y_key in state.open_y_nodes  # the y node must still be open
481            and x_key in state.open_x_nodes  # the x node must still be open
482        )
483
484    def legal_edge_mapping(
485        self,
486        x: Graph[K, N, E, G],
487        y: Graph[K, N, E, G],
488        state: SearchState[K],
489        x_key: K,
490        y_key: K,
491    ) -> bool:
492        """Checks whether mapping a query edge to a case edge is legal."""
493        x_value = x.edges[x_key]
494        y_value = y.edges[y_key]
495
496        return (
497            self.edge_matcher(x_value.value, y_value.value)
498            and y_key in state.open_y_edges
499            and x_key in state.open_x_edges
500            # both endpoints of the y edge must already be mapped onto the endpoints of the x edge
501            and x_value.source.key == state.node_mapping.get(y_value.source.key)
502            and x_value.target.key == state.node_mapping.get(y_value.target.key)
503        )
504
505    def expand_node(
506        self,
507        x: Graph[K, N, E, G],
508        y: Graph[K, N, E, G],
509        state: SearchState[K],
510        y_key: K,
511    ) -> list[SearchState[K]]:
512        """Generates successor states by mapping a query node to all legal case nodes."""
513        next_states: list[SearchState[K]] = [
514            SearchState(
515                state.node_mapping.set(y_key, x_key),
516                state.edge_mapping,
517                state.open_y_nodes - {y_key},
518                state.open_y_edges,
519                state.open_x_nodes - {x_key},
520                state.open_x_edges,
521            )
522            for x_key in sorted_iter(state.open_x_nodes)
523            if self.legal_node_mapping(x, y, state, x_key, y_key)
524        ]
525
526        if not next_states:  # no legal candidate: close y_key without mapping it
527            next_states.append(
528                SearchState(
529                    state.node_mapping,
530                    state.edge_mapping,
531                    state.open_y_nodes - {y_key},
532                    state.open_y_edges,
533                    state.open_x_nodes,
534                    state.open_x_edges,
535                )
536            )
537
538        return next_states
539
540    def expand_edge(
541        self,
542        x: Graph[K, N, E, G],
543        y: Graph[K, N, E, G],
544        state: SearchState[K],
545        y_key: K,
546    ) -> list[SearchState[K]]:
547        """Generates successor states by mapping a query edge to all legal case edges."""
548        next_states: list[SearchState[K]] = [
549            SearchState(
550                state.node_mapping,
551                state.edge_mapping.set(y_key, x_key),
552                state.open_y_nodes,
553                state.open_y_edges - {y_key},
554                state.open_x_nodes,
555                state.open_x_edges - {x_key},
556            )
557            for x_key in sorted_iter(state.open_x_edges)
558            if self.legal_edge_mapping(x, y, state, x_key, y_key)
559        ]
560
561        if not next_states:  # no legal candidate: close y_key without mapping it
562            next_states.append(
563                SearchState(
564                    state.node_mapping,
565                    state.edge_mapping,
566                    state.open_y_nodes,
567                    state.open_y_edges - {y_key},
568                    state.open_x_nodes,
569                    state.open_x_edges,
570                )
571            )
572
573        return next_states
574
575    def expand_edge_with_nodes(
576        self,
577        x: Graph[K, N, E, G],
578        y: Graph[K, N, E, G],
579        state: SearchState[K],
580        y_key: K,
581    ) -> list[SearchState[K]]:
582        """Expands a query edge, also mapping its source/target nodes if they are still open."""
583        next_states: list[SearchState[K]] = []
584
585        for x_key in sorted_iter(state.open_x_edges):
586            next_state = state  # every candidate starts from the unmodified input state
587            x_source_key = x.edges[x_key].source.key
588            x_target_key = x.edges[x_key].target.key
589            y_source_key = y.edges[y_key].source.key
590            y_target_key = y.edges[y_key].target.key
591
592            if (  # tentatively map the source nodes when both are open and legal
593                y_source_key in next_state.open_y_nodes
594                and x_source_key in next_state.open_x_nodes
595                and self.legal_node_mapping(
596                    x, y, next_state, x_source_key, y_source_key
597                )
598            ):
599                next_state = SearchState(
600                    next_state.node_mapping.set(y_source_key, x_source_key),
601                    next_state.edge_mapping,
602                    next_state.open_y_nodes - {y_source_key},
603                    next_state.open_y_edges,
604                    next_state.open_x_nodes - {x_source_key},
605                    next_state.open_x_edges,
606                )
607
608            if (  # same for the target nodes, on top of any source mapping made above
609                y_target_key in next_state.open_y_nodes
610                and x_target_key in next_state.open_x_nodes
611                and self.legal_node_mapping(
612                    x, y, next_state, x_target_key, y_target_key
613                )
614            ):
615                next_state = SearchState(
616                    next_state.node_mapping.set(y_target_key, x_target_key),
617                    next_state.edge_mapping,
618                    next_state.open_y_nodes - {y_target_key},
619                    next_state.open_y_edges,
620                    next_state.open_x_nodes - {x_target_key},
621                    next_state.open_x_edges,
622                )
623
624            if self.legal_edge_mapping(x, y, next_state, x_key, y_key):  # checked against the tentative node mappings
625                next_states.append(
626                    SearchState(
627                        next_state.node_mapping,
628                        next_state.edge_mapping.set(y_key, x_key),
629                        next_state.open_y_nodes,
630                        next_state.open_y_edges - {y_key},
631                        next_state.open_x_nodes,
632                        next_state.open_x_edges - {x_key},
633                    )
634                )
635
636        if not next_states:  # no legal candidate: close y_key on the input state, dropping tentative node mappings
637            next_states.append(
638                SearchState(
639                    state.node_mapping,
640                    state.edge_mapping,
641                    state.open_y_nodes,
642                    state.open_y_edges - {y_key},
643                    state.open_x_nodes,
644                    state.open_x_edges,
645                )
646            )
647
648        return next_states

Graph similarity function using search-based node and edge mapping.

SearchGraphSimFunc( node_sim_func: AnySimFunc[N, Float], edge_sim_func: SemanticEdgeSim[K, N, E] = SemanticEdgeSim(source_weight=1.0, target_weight=1.0, edge_sim_func=None), node_matcher: ElementMatcher[N] = <function default_element_matcher>, edge_matcher: ElementMatcher[E] = <function default_element_matcher>, init_func: Union[SearchStateInit[K, N, E, G], AnySimFunc[cbrkit.model.graph.Graph[K, N, E, G], GraphSim[K]]] = <factory>, *, node_del_cost: float = 1.0, node_ins_cost: float = 0.0, edge_del_cost: float = 1.0, edge_ins_cost: float = 0.0)
init_func: Union[SearchStateInit[K, N, E, G], AnySimFunc[cbrkit.model.graph.Graph[K, N, E, G], GraphSim[K]]]
def init_search_state( self, x: cbrkit.model.graph.Graph[K, N, E, G], y: cbrkit.model.graph.Graph[K, N, E, G]) -> SearchState[K]:
425    def init_search_state(
426        self, x: Graph[K, N, E, G], y: Graph[K, N, E, G]
427    ) -> SearchState[K]:
428        """Initializes the search state using the configured init function."""
429        init_func_params = total_params(self.init_func)  # the parameter count selects the calling convention below
430        sim: GraphSim[K]
431
432        if init_func_params == 4:  # treated as a SearchStateInit that builds the state directly
433            init_func = cast(SearchStateInit[K, N, E, G], self.init_func)
434
435            return init_func(x, y, self.node_matcher, self.edge_matcher)
436
437        elif init_func_params == 2:  # treated as a similarity function called with (x, y)
438            init_func = cast(SimFunc[Graph[K, N, E, G], GraphSim[K]], self.init_func)
439            sim = init_func(x, y)
440
441        elif init_func_params == 1:  # treated as a batch similarity function called with a one-pair list
442            init_func = cast(
443                BatchSimFunc[Graph[K, N, E, G], GraphSim[K]], self.init_func
444            )
445            sim = init_func([(x, y)])[0]
446
447        else:
448            raise ValueError(
449                f"Invalid number of parameters for init_func: {init_func_params}"
450            )
451
452        return SearchState(  # seed the state from the mappings of the computed GraphSim
453            node_mapping=sim.node_mapping,
454            edge_mapping=sim.edge_mapping,
455            open_y_nodes=frozenset(y.nodes.keys() - sim.node_mapping.keys()),  # y nodes that have no mapping yet
456            open_y_edges=frozenset(y.edges.keys() - sim.edge_mapping.keys()),  # y edges that have no mapping yet
457            open_x_nodes=frozenset(x.nodes.keys() - sim.node_mapping.values()),  # x nodes not used as a mapping value
458            open_x_edges=frozenset(x.edges.keys() - sim.edge_mapping.values()),  # x edges not used as a mapping value
459        )

Initializes the search state using the configured init function.

def finished(self, state: SearchState[K]) -> bool:
461    def finished(self, state: SearchState[K]) -> bool:
462        """Returns True when all query nodes and edges have been processed."""
463        # the following condition could save a few iterations, but needs to be tested
464        # return (not state.open_y_nodes and not state.open_y_edges) or (
465        #     not state.open_x_nodes and not state.open_x_edges
466        # )
467        return not state.open_y_nodes and not state.open_y_edges  # only the y side is checked; open x elements may remain

Returns True when all query nodes and edges have been processed.

def legal_node_mapping( self, x: cbrkit.model.graph.Graph[K, N, E, G], y: cbrkit.model.graph.Graph[K, N, E, G], state: SearchState[K], x_key: K, y_key: K) -> bool:
469    def legal_node_mapping(
470        self,
471        x: Graph[K, N, E, G],
472        y: Graph[K, N, E, G],
473        state: SearchState[K],
474        x_key: K,
475        y_key: K,
476    ) -> bool:
477        """Checks whether mapping a query node to a case node is legal."""
478        return (
479            self.node_matcher(x.nodes[x_key].value, y.nodes[y_key].value)
480            and y_key in state.open_y_nodes  # the y node must still be open
481            and x_key in state.open_x_nodes  # the x node must still be open
482        )

Checks whether mapping a query node to a case node is legal.

def legal_edge_mapping( self, x: cbrkit.model.graph.Graph[K, N, E, G], y: cbrkit.model.graph.Graph[K, N, E, G], state: SearchState[K], x_key: K, y_key: K) -> bool:
484    def legal_edge_mapping(
485        self,
486        x: Graph[K, N, E, G],
487        y: Graph[K, N, E, G],
488        state: SearchState[K],
489        x_key: K,
490        y_key: K,
491    ) -> bool:
492        """Checks whether mapping a query edge to a case edge is legal."""
493        x_value = x.edges[x_key]
494        y_value = y.edges[y_key]
495
496        return (
497            self.edge_matcher(x_value.value, y_value.value)
498            and y_key in state.open_y_edges
499            and x_key in state.open_x_edges
500            # both endpoints of the y edge must already be mapped onto the endpoints of the x edge
501            and x_value.source.key == state.node_mapping.get(y_value.source.key)
502            and x_value.target.key == state.node_mapping.get(y_value.target.key)
503        )

Checks whether mapping a query edge to a case edge is legal.

def expand_node( self, x: cbrkit.model.graph.Graph[K, N, E, G], y: cbrkit.model.graph.Graph[K, N, E, G], state: SearchState[K], y_key: K) -> list[SearchState[K]]:
505    def expand_node(
506        self,
507        x: Graph[K, N, E, G],
508        y: Graph[K, N, E, G],
509        state: SearchState[K],
510        y_key: K,
511    ) -> list[SearchState[K]]:
512        """Generates successor states by mapping a query node to all legal case nodes."""
513        next_states: list[SearchState[K]] = [
514            SearchState(
515                state.node_mapping.set(y_key, x_key),
516                state.edge_mapping,
517                state.open_y_nodes - {y_key},
518                state.open_y_edges,
519                state.open_x_nodes - {x_key},
520                state.open_x_edges,
521            )
522            for x_key in sorted_iter(state.open_x_nodes)
523            if self.legal_node_mapping(x, y, state, x_key, y_key)
524        ]
525
526        if not next_states:  # no legal candidate: close y_key without mapping it
527            next_states.append(
528                SearchState(
529                    state.node_mapping,
530                    state.edge_mapping,
531                    state.open_y_nodes - {y_key},
532                    state.open_y_edges,
533                    state.open_x_nodes,
534                    state.open_x_edges,
535                )
536            )
537
538        return next_states

Generates successor states by mapping a query node to all legal case nodes.

def expand_edge( self, x: cbrkit.model.graph.Graph[K, N, E, G], y: cbrkit.model.graph.Graph[K, N, E, G], state: SearchState[K], y_key: K) -> list[SearchState[K]]:
540    def expand_edge(
541        self,
542        x: Graph[K, N, E, G],
543        y: Graph[K, N, E, G],
544        state: SearchState[K],
545        y_key: K,
546    ) -> list[SearchState[K]]:
547        """Generates successor states by mapping a query edge to all legal case edges."""
548        next_states: list[SearchState[K]] = [
549            SearchState(
550                state.node_mapping,
551                state.edge_mapping.set(y_key, x_key),
552                state.open_y_nodes,
553                state.open_y_edges - {y_key},
554                state.open_x_nodes,
555                state.open_x_edges - {x_key},
556            )
557            for x_key in sorted_iter(state.open_x_edges)
558            if self.legal_edge_mapping(x, y, state, x_key, y_key)
559        ]
560
561        if not next_states:  # no legal candidate: close y_key without mapping it
562            next_states.append(
563                SearchState(
564                    state.node_mapping,
565                    state.edge_mapping,
566                    state.open_y_nodes,
567                    state.open_y_edges - {y_key},
568                    state.open_x_nodes,
569                    state.open_x_edges,
570                )
571            )
572
573        return next_states

Generates successor states by mapping a query edge to all legal case edges.

def expand_edge_with_nodes( self, x: cbrkit.model.graph.Graph[K, N, E, G], y: cbrkit.model.graph.Graph[K, N, E, G], state: SearchState[K], y_key: K) -> list[SearchState[K]]:
575    def expand_edge_with_nodes(
576        self,
577        x: Graph[K, N, E, G],
578        y: Graph[K, N, E, G],
579        state: SearchState[K],
580        y_key: K,
581    ) -> list[SearchState[K]]:
582        """Expands a query edge, also mapping its source/target nodes if they are still open."""
583        next_states: list[SearchState[K]] = []
584
585        for x_key in sorted_iter(state.open_x_edges):
586            next_state = state  # every candidate starts from the unmodified input state
587            x_source_key = x.edges[x_key].source.key
588            x_target_key = x.edges[x_key].target.key
589            y_source_key = y.edges[y_key].source.key
590            y_target_key = y.edges[y_key].target.key
591
592            if (  # tentatively map the source nodes when both are open and legal
593                y_source_key in next_state.open_y_nodes
594                and x_source_key in next_state.open_x_nodes
595                and self.legal_node_mapping(
596                    x, y, next_state, x_source_key, y_source_key
597                )
598            ):
599                next_state = SearchState(
600                    next_state.node_mapping.set(y_source_key, x_source_key),
601                    next_state.edge_mapping,
602                    next_state.open_y_nodes - {y_source_key},
603                    next_state.open_y_edges,
604                    next_state.open_x_nodes - {x_source_key},
605                    next_state.open_x_edges,
606                )
607
608            if (  # same for the target nodes, on top of any source mapping made above
609                y_target_key in next_state.open_y_nodes
610                and x_target_key in next_state.open_x_nodes
611                and self.legal_node_mapping(
612                    x, y, next_state, x_target_key, y_target_key
613                )
614            ):
615                next_state = SearchState(
616                    next_state.node_mapping.set(y_target_key, x_target_key),
617                    next_state.edge_mapping,
618                    next_state.open_y_nodes - {y_target_key},
619                    next_state.open_y_edges,
620                    next_state.open_x_nodes - {x_target_key},
621                    next_state.open_x_edges,
622                )
623
624            if self.legal_edge_mapping(x, y, next_state, x_key, y_key):  # checked against the tentative node mappings
625                next_states.append(
626                    SearchState(
627                        next_state.node_mapping,
628                        next_state.edge_mapping.set(y_key, x_key),
629                        next_state.open_y_nodes,
630                        next_state.open_y_edges - {y_key},
631                        next_state.open_x_nodes,
632                        next_state.open_x_edges - {x_key},
633                    )
634                )
635
636        if not next_states:  # no legal candidate: close y_key on the input state, dropping tentative node mappings
637            next_states.append(
638                SearchState(
639                    state.node_mapping,
640                    state.edge_mapping,
641                    state.open_y_nodes,
642                    state.open_y_edges - {y_key},
643                    state.open_x_nodes,
644                    state.open_x_edges,
645                )
646            )
647
648        return next_states

Expands a query edge, also mapping its source and target nodes if they are not already mapped.

@dataclass(slots=True, frozen=True)
class SearchState(typing.Generic[K]):
288@dataclass(slots=True, frozen=True)
289class SearchState[K]:
290    """Current state of a graph search with node and edge mappings."""
291
292    # mappings are from y/query to x/case
293    node_mapping: frozendict[K, K]
294    edge_mapping: frozendict[K, K]
295    # contains all elements from the query that still have to be processed
296    # can be smaller than the unmapped query elements: an element without a candidate in x/case is closed unmapped
297    open_y_nodes: frozenset[K]
298    open_y_edges: frozenset[K]
299    # contains all elements from the case that are not yet mapped
300    # must be the complement of mapping.values() within x/case; stored separately to optimize lookup
301    open_x_nodes: frozenset[K]
302    open_x_edges: frozenset[K]

Current state of a graph search with node and edge mappings.

SearchState( node_mapping: frozendict.frozendict[K, K], edge_mapping: frozendict.frozendict[K, K], open_y_nodes: frozenset[K], open_y_edges: frozenset[K], open_x_nodes: frozenset[K], open_x_edges: frozenset[K])
node_mapping: frozendict.frozendict[K, K]
edge_mapping: frozendict.frozendict[K, K]
open_y_nodes: frozenset[K]
open_y_edges: frozenset[K]
open_x_nodes: frozenset[K]
open_x_edges: frozenset[K]
class SearchStateInit(typing.Protocol, typing.Generic[K, N, E, G]):
343class SearchStateInit[K, N, E, G](Protocol):
344    """Initializes the search state for graph similarity computation."""
345
346    def __call__(
347        self,
348        x: Graph[K, N, E, G],
349        y: Graph[K, N, E, G],
350        node_matcher: ElementMatcher[N],
351        edge_matcher: ElementMatcher[E],
352        /,  # all four arguments are positional-only
353    ) -> SearchState[K]: ...

Initializes the search state for graph similarity computation.

SearchStateInit(*args, **kwargs)
1957def _no_init_or_replace_init(self, *args, **kwargs):
1958    cls = type(self)
1959
1960    if cls._is_protocol:  # refuse to instantiate a class flagged as a protocol
1961        raise TypeError('Protocols cannot be instantiated')
1962
1963    # Already using a custom `__init__`. No need to calculate correct
1964    # `__init__` to call. This can lead to RecursionError. See bpo-45121.
1965    if cls.__init__ is not _no_init_or_replace_init:
1966        return
1967
1968    # Initially, `__init__` of a protocol subclass is set to `_no_init_or_replace_init`.
1969    # The first instantiation of the subclass will call `_no_init_or_replace_init` which
1970    # searches for a proper new `__init__` in the MRO. The new `__init__`
1971    # replaces the subclass' old `__init__` (ie `_no_init_or_replace_init`). Subsequent
1972    # instantiation of the protocol subclass will thus use the new
1973    # `__init__` and no longer call `_no_init_or_replace_init`.
1974    for base in cls.__mro__:
1975        init = base.__dict__.get('__init__', _no_init_or_replace_init)
1976        if init is not _no_init_or_replace_init:
1977            cls.__init__ = init
1978            break
1979    else:  # for-else: reached only when the loop finished without a break
1980        # should not happen
1981        cls.__init__ = object.__init__
1982
1983    cls.__init__(self, *args, **kwargs)  # run the `__init__` that was just installed on the class