cbrkit.sim.graphs
Graph similarity algorithms for structural matching and comparison.
This module provides various algorithms for computing similarity between
graph structures represented as cbrkit.model.graph.Graph instances.
Each algorithm takes node and edge similarity functions and returns a
global graph similarity score.
Algorithms:
- astar: A* search for optimal graph edit distance. Guarantees optimal results but may be slow for large graphs.
- vf2: VF2 algorithm for (sub)graph isomorphism. Available in pure Python (vf2), NetworkX (vf2_networkx), and RustWorkX (vf2_rustworkx) variants.
- greedy: Fast greedy matching that pairs nodes by highest similarity.
- lap: Linear Assignment Problem solver using the Hungarian algorithm.
- brute_force: Exhaustive search over all possible node matchings. Only practical for small graphs.
- dfs: Depth-first search based matching (requires the graphs extra).
- dtw: Dynamic Time Warping for sequential graph alignment (requires the timeseries extra).
- smith_waterman: Smith-Waterman local alignment for sequential graphs (requires the timeseries extra).
Initialization Functions:
- init_empty: Initializes the search state with no pre-matched nodes.
- init_unique_matches: Initializes with uniquely matchable node pairs.
Types:
- GraphSim: Protocol for graph similarity functions.
- ElementMatcher: Protocol for node/edge matching predicates.
- SemanticEdgeSim: Semantic edge similarity function type.
- BaseGraphSimFunc: Base class for graph similarity functions.
- SearchGraphSimFunc: Base class for search-based graph similarity.
- SearchState / SearchStateInit: Search state types.
Example:
>>> from cbrkit.sim.generic import equality
>>> graph_sim = astar.build(node_sim_func=equality())
"""Graph similarity algorithms for structural matching and comparison.

This module provides various algorithms for computing similarity between
graph structures represented as `cbrkit.model.graph.Graph` instances.
Each algorithm takes node and edge similarity functions and returns a
global graph similarity score.

Algorithms:
- `astar`: A* search for optimal graph edit distance.
  Guarantees optimal results but may be slow for large graphs.
- `vf2`: VF2 algorithm for (sub)graph isomorphism.
  Available in pure Python (`vf2`), NetworkX (`vf2_networkx`),
  and RustWorkX (`vf2_rustworkx`) variants.
- `greedy`: Fast greedy matching that pairs nodes by highest similarity.
- `lap`: Linear Assignment Problem solver using the Hungarian algorithm.
- `brute_force`: Exhaustive search over all possible node matchings.
  Only practical for small graphs.
- `dfs`: Depth-first search based matching (requires `graphs` extra).
- `dtw`: Dynamic Time Warping for sequential graph alignment
  (requires `timeseries` extra).
- `smith_waterman`: Smith-Waterman local alignment for sequential graphs
  (requires `timeseries` extra).

Initialization Functions:
- `init_empty`: Initializes the search state with no pre-matched nodes.
- `init_unique_matches`: Initializes with uniquely matchable node pairs.

Types:
- `GraphSim`: Protocol for graph similarity functions.
- `ElementMatcher`: Protocol for node/edge matching predicates.
- `SemanticEdgeSim`: Semantic edge similarity function type.
- `BaseGraphSimFunc`: Base class for graph similarity functions.
- `SearchGraphSimFunc`: Base class for search-based graph similarity.
- `SearchState` / `SearchStateInit`: Search state types.

Example:
    >>> from cbrkit.sim.generic import equality
    >>> graph_sim = astar.build(node_sim_func=equality())
"""

from ...helpers import optional_dependencies
from . import astar
from .brute_force import brute_force
from .common import (
    BaseGraphSimFunc,
    ElementMatcher,
    GraphSim,
    SearchGraphSimFunc,
    SearchState,
    SearchStateInit,
    SemanticEdgeSim,
    init_empty,
    init_unique_matches,
)
from .greedy import greedy
from .lap import lap
from .vf2 import vf2, vf2_networkx, vf2_rustworkx

with optional_dependencies():
    from .alignment import dtw

with optional_dependencies():
    from .alignment import smith_waterman

with optional_dependencies():
    from .dfs import dfs

__all__ = [
    "astar",
    "brute_force",
    "dfs",
    "greedy",
    "lap",
    "vf2",
    "vf2_networkx",
    "vf2_rustworkx",
    "dtw",
    "smith_waterman",
    "init_empty",
    "init_unique_matches",
    "GraphSim",
    "ElementMatcher",
    "SemanticEdgeSim",
    "BaseGraphSimFunc",
    "SearchGraphSimFunc",
    "SearchState",
    "SearchStateInit",
]
@dataclass(slots=True)
class brute_force[K, N, E, G](
    BaseGraphSimFunc[K, N, E, G], SimFunc[Graph[K, N, E, G], GraphSim[K]]
):
    """
    Computes the similarity between two graphs by exhaustively computing all possible mappings
    and selecting the one with the highest similarity score.

    Args:
        node_sim_func: A similarity function for graph nodes that receives two node objects.
        edge_sim_func: A similarity function for graph edges that receives two edge objects.

    Returns:
        The similarity between the two graphs and the best mapping.
    """

    def expand(
        self,
        x: Graph[K, N, E, G],
        y: Graph[K, N, E, G],
        y_nodes: Sequence[K],
        x_nodes: Sequence[K],
    ) -> GraphSim[K] | None:
        """Evaluates a single node mapping candidate and returns its similarity."""
        node_mapping = frozendict(zip(y_nodes, x_nodes, strict=True))

        # if one node can't be matched to another, skip this permutation
        for y_key, x_key in node_mapping.items():
            if not self.node_matcher(x.nodes[x_key].value, y.nodes[y_key].value):
                return None

        edge_mapping = self.induced_edge_mapping(x, y, node_mapping)

        node_pair_sims = self.node_pair_similarities(x, y, list(node_mapping.items()))
        edge_pair_sims = self.edge_pair_similarities(
            x, y, node_pair_sims, list(edge_mapping.items())
        )

        return self.similarity(
            x,
            y,
            frozendict(node_mapping),
            frozendict(edge_mapping),
            frozendict(node_pair_sims),
            frozendict(edge_pair_sims),
        )

    def __call__(self, x: Graph[K, N, E, G], y: Graph[K, N, E, G]) -> GraphSim[K]:
        y_node_keys = list(y.nodes.keys())
        x_node_keys = list(x.nodes.keys())
        best_sim: GraphSim[K] = GraphSim(
            0.0, frozendict(), frozendict(), frozendict(), frozendict()
        )

        # iterate all possible subset sizes of query (up to target size)
        for k in range(1, min(len(y_node_keys), len(x_node_keys)) + 1):
            # all subsets of query nodes of size k
            for y_candidates in itertools.combinations(y_node_keys, k):
                # all injective mappings from this subset to target nodes
                for x_candidates in itertools.permutations(x_node_keys, k):
                    next_sim = self.expand(x, y, y_candidates, x_candidates)

                    if next_sim and (
                        next_sim.value > best_sim.value
                        or (
                            next_sim.value >= best_sim.value
                            and (
                                len(next_sim.node_mapping) > len(best_sim.node_mapping)
                                or (
                                    len(next_sim.edge_mapping)
                                    > len(best_sim.edge_mapping)
                                )
                            )
                        )
                    ):
                        best_sim = next_sim

        return best_sim
Computes the similarity between two graphs by exhaustively computing all possible mappings and selecting the one with the highest similarity score.
Arguments:
- node_sim_func: A similarity function for graph nodes that receives two node objects.
- edge_sim_func: A similarity function for graph edges that receives two edge objects.
Returns:
The similarity between the two graphs and the best mapping.
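The enumeration performed by brute_force can be illustrated without cbrkit. The following is a minimal sketch, not the library's implementation: graphs are reduced to plain dicts of node labels, `node_sim` and `brute_force_nodes` are hypothetical helpers, edges are ignored, and dividing by the number of y nodes is an assumed scoring convention.

```python
import itertools


def node_sim(a: str, b: str) -> float:
    """Hypothetical node similarity: 1.0 for equal labels, else 0.0."""
    return 1.0 if a == b else 0.0


def brute_force_nodes(x: dict[str, str], y: dict[str, str]) -> tuple[float, dict[str, str]]:
    """Try every injective mapping from subsets of y's nodes into x's nodes."""
    best_score: float = 0.0
    best_mapping: dict[str, str] = {}
    for k in range(1, min(len(x), len(y)) + 1):
        # All subsets of y nodes of size k
        for y_keys in itertools.combinations(y, k):
            # All injective assignments of that subset to x nodes
            for x_keys in itertools.permutations(x, k):
                mapping = dict(zip(y_keys, x_keys))
                # Assumption: average over all y nodes, so unmapped nodes lower the score
                score = sum(node_sim(x[xk], y[yk]) for yk, xk in mapping.items()) / len(y)
                # Prefer higher scores; on ties prefer the larger mapping
                if score > best_score or (
                    score == best_score and len(mapping) > len(best_mapping)
                ):
                    best_score, best_mapping = score, mapping
    return best_score, best_mapping


x_nodes = {"a": "load", "b": "clean", "c": "train"}
y_nodes = {"1": "train", "2": "load"}
score, mapping = brute_force_nodes(x_nodes, y_nodes)
print(score, mapping)
```

The number of candidates grows factorially with the node count, which is why the module overview recommends this approach only for small graphs.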
def expand(self, x: Graph[K, N, E, G], y: Graph[K, N, E, G], y_nodes: Sequence[K], x_nodes: Sequence[K]) -> GraphSim[K] | None:
Evaluates a single node mapping candidate and returns its similarity.
@dataclass(slots=True)
class dfs[K, N, E, G](
    BaseGraphSimFunc[K, N, E, G], SimFunc[Graph[K, N, E, G], GraphSim[K]]
):
    """Graph similarity using depth-first search over node and edge mappings."""

    max_iterations: int = 0
    upper_bound: float | None = None
    strictly_decreasing: bool = True
    timeout: float | None = None
    roots_func: RootsFunc[K, N, E, G] | None = None

    def __call__(
        self,
        x: Graph[K, N, E, G],
        y: Graph[K, N, E, G],
    ) -> GraphSim[K]:
        x_nx = to_networkx(x)
        y_nx = to_networkx(y)

        y_edges_lookup = {
            (e.source.key, e.target.key): e.key for e in y.edges.values()
        }
        x_edges_lookup = {
            (e.source.key, e.target.key): e.key for e in x.edges.values()
        }

        node_pair_sims, edge_pair_sims = self.pair_similarities(x, y)

        def node_subst_cost(y: NetworkxNode[K, N], x: NetworkxNode[K, N]) -> float:
            """Returns the substitution cost for a node pair as one minus similarity."""
            if sim := node_pair_sims.get((y["key"], x["key"])):
                return 1.0 - sim

            return float("inf")

        def edge_subst_cost(
            y: NetworkxEdge[K, N, E], x: NetworkxEdge[K, N, E]
        ) -> float:
            """Returns the substitution cost for an edge pair as one minus similarity."""
            if sim := edge_pair_sims.get((y["key"], x["key"])):
                return 1.0 - sim

            return float("inf")

        ged_iter = nx.optimize_edit_paths(
            y_nx,
            x_nx,
            node_subst_cost=node_subst_cost,
            edge_subst_cost=edge_subst_cost,
            node_del_cost=lambda y: self.node_del_cost,
            node_ins_cost=lambda x: self.node_ins_cost,
            edge_del_cost=lambda y: self.edge_del_cost,
            edge_ins_cost=lambda x: self.edge_ins_cost,
            upper_bound=self.upper_bound,
            strictly_decreasing=self.strictly_decreasing,
            timeout=self.timeout,
            roots=self.roots_func(x, y) if self.roots_func else None,
        )

        node_edit_path: list[tuple[K, K]] = []
        edge_edit_path: list[tuple[tuple[K, K], tuple[K, K]]] = []

        for idx in itertools.count():
            if self.max_iterations > 0 and idx >= self.max_iterations:
                break

            try:
                node_edit_path, edge_edit_path, _ = next(ged_iter)
            except StopIteration:
                break

        node_mapping = frozendict(
            (y_key, x_key)
            for y_key, x_key in node_edit_path
            if x_key is not None and y_key is not None
        )
        edge_mapping = frozendict(
            (
                y_edges_lookup[y_key],
                x_edges_lookup[x_key],
            )
            for y_key, x_key in edge_edit_path
            if x_key is not None
            and y_key is not None
            and x_key in x_edges_lookup
            and y_key in y_edges_lookup
        )

        return self.similarity(
            x,
            y,
            node_mapping,  # type: ignore[arg-type]
            edge_mapping,
            node_pair_sims,
            edge_pair_sims,
        )
Graph similarity using depth-first search over node and edge mappings.
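The substitution-cost closures in the source turn a similarity into a cost as one minus the similarity. Because they test the looked-up value for truthiness, a pair whose similarity is exactly 0.0 appears to receive the same infinite cost as a pair with no entry at all. A minimal standalone sketch of that behaviour, where `subst_cost` and the values in `pair_sims` are hypothetical:

```python
def subst_cost(pair_sims: dict[tuple[str, str], float], y_key: str, x_key: str) -> float:
    """Mirror the truthiness check: a missing or zero similarity forbids substitution."""
    if sim := pair_sims.get((y_key, x_key)):
        return 1.0 - sim
    return float("inf")


# Hypothetical pairwise similarities keyed by (y_key, x_key)
pair_sims = {("y1", "x1"): 0.75, ("y1", "x2"): 0.0}

cost_match = subst_cost(pair_sims, "y1", "x1")    # finite cost: 1.0 - 0.75
cost_zero = subst_cost(pair_sims, "y1", "x2")     # infinite, because 0.0 is falsy
cost_missing = subst_cost(pair_sims, "y2", "x1")  # infinite, no entry for this pair
print(cost_match, cost_zero, cost_missing)
```

An infinite substitution cost means the edit-path search can only handle such a pair through deletion and insertion.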
@dataclass(slots=True)
class greedy[K, N, E, G](
    SearchGraphSimFunc[K, N, E, G], SimFunc[Graph[K, N, E, G], GraphSim[K]]
):
    """Performs a Greedy search as described by [Dijkman et al. (2009)](https://doi.org/10.1007/978-3-642-03848-8_5)

    Args:
        node_sim_func: A function to compute the similarity between two nodes.
        edge_sim_func: A function to compute the similarity between two edges.
        node_matcher: A function that returns true if two nodes can be mapped legally.
        edge_matcher: A function that returns true if two edges can be mapped legally.

    Returns:
        The similarity between a query and a case graph along with the mapping.
    """

    def __call__(
        self,
        x: Graph[K, N, E, G],
        y: Graph[K, N, E, G],
    ) -> GraphSim[K]:
        node_pair_sims, edge_pair_sims = self.pair_similarities(x, y)

        current_state = self.init_search_state(x, y)
        current_sim = self.similarity(
            x,
            y,
            current_state.node_mapping,
            current_state.edge_mapping,
            node_pair_sims,
            edge_pair_sims,
        )

        while not self.finished(current_state):
            # Iterate over all open pairs and find the best pair
            next_states: list[SearchState[K]] = []

            for y_key in current_state.open_y_nodes:
                next_states.extend(self.expand_node(x, y, current_state, y_key))

            for y_key in current_state.open_y_edges:
                next_states.extend(self.expand_edge(x, y, current_state, y_key))

            new_sims = [
                self.similarity(
                    x,
                    y,
                    state.node_mapping,
                    state.edge_mapping,
                    node_pair_sims,
                    edge_pair_sims,
                )
                for state in next_states
            ]

            best_sim, best_state = max(
                zip(new_sims, next_states, strict=True),
                key=lambda item: item[0].value,
            )

            # Update the current state and similarity
            current_state = best_state
            current_sim = best_sim

        return current_sim
Performs a greedy search as described by Dijkman et al. (2009), https://doi.org/10.1007/978-3-642-03848-8_5.
Arguments:
- node_sim_func: A function to compute the similarity between two nodes.
- edge_sim_func: A function to compute the similarity between two edges.
- node_matcher: A function that returns true if two nodes can be mapped legally.
- edge_matcher: A function that returns true if two edges can be mapped legally.
Returns:
The similarity between a query and a case graph along with the mapping.
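The greedy strategy commits to the best available extension at every step and never revisits it. The sketch below shows the idea on nodes only, under stated assumptions: `greedy_match` is a hypothetical helper, similarities come from a hand-written table rather than from node_sim_func, and edges and matchers are left out.

```python
def greedy_match(sims: dict[tuple[str, str], float]) -> dict[str, str]:
    """Repeatedly commit the best remaining (y, x) pair until no open pair is left."""
    mapping: dict[str, str] = {}
    open_pairs = dict(sims)
    while open_pairs:
        # Pick the single best extension of the current mapping
        (y_key, x_key), _ = max(open_pairs.items(), key=lambda item: item[1])
        mapping[y_key] = x_key
        # Drop every pair that would reuse a node already matched on either side
        open_pairs = {
            (y, x): s for (y, x), s in open_pairs.items() if y != y_key and x != x_key
        }
    return mapping


# Hypothetical similarities between y nodes (query) and x nodes (case)
sims = {
    ("y1", "x1"): 0.9,
    ("y1", "x2"): 0.8,
    ("y2", "x1"): 0.85,
    ("y2", "x2"): 0.1,
}
greedy_mapping = greedy_match(sims)
greedy_total = sum(sims[(y, x)] for y, x in greedy_mapping.items())
print(greedy_mapping, greedy_total)
```

In this table the greedy choice of ("y1", "x1") forces the weak pair ("y2", "x2"), whereas the crossed assignment would total 1.65. This is the usual trade-off of greedy matching: speed in exchange for possibly suboptimal mappings.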
@dataclass(slots=True)
class lap[K, N, E, G](
    lap_base[K, N, E, G],
    BaseGraphSimFunc[K, N, E, G],
    SimFunc[Graph[K, N, E, G], GraphSim[K]],
):
    """Graph similarity using the Linear Assignment Problem (LAP)."""

    def __call__(
        self,
        x: Graph[K, N, E, G],
        y: Graph[K, N, E, G],
    ) -> GraphSim[K]:
        node_pair_sims, edge_pair_sims = self.pair_similarities(x, y)
        cost_matrix = self.build_cost_matrix(x, y, node_pair_sims, edge_pair_sims)
        row2y = {r: k for r, k in enumerate(y.nodes.keys())}
        col2x = {c: k for c, k in enumerate(x.nodes.keys())}

        row_idx, col_idx = linear_sum_assignment(cost_matrix)

        node_mapping = frozendict(
            (row2y[int(r)], col2x[int(c)])
            for r, c in zip(row_idx, col_idx, strict=True)
            # only consider substitutions
            if cost_matrix[r, c] < np.inf and r in row2y and c in col2x
        )

        edge_mapping = self.induced_edge_mapping(x, y, node_mapping)

        return self.similarity(
            x,
            y,
            node_mapping,
            edge_mapping,
            node_pair_sims,
            edge_pair_sims,
        )
Graph similarity using the Linear Assignment Problem (LAP).
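To show what solving the assignment problem means here, the sketch below minimises total cost over a small square matrix whose rows stand for y nodes and whose columns stand for x nodes, as in the source. It is an illustration under assumptions: the cost is taken as one minus similarity, the matrix values are made up, and the hypothetical `assign` helper uses exhaustive search as a stand-in for the Hungarian-style solver the class calls.

```python
import itertools


def assign(cost: list[list[float]]) -> list[tuple[int, int]]:
    """Return the row-to-column assignment with minimal total cost (square matrix).

    Exhaustive search over permutations; a stand-in for a real LAP solver.
    """
    n = len(cost)
    best_cols = min(
        itertools.permutations(range(n)),
        key=lambda cols: sum(cost[r][c] for r, c in enumerate(cols)),
    )
    return list(enumerate(best_cols))


# Hypothetical similarities: rows are y nodes, columns are x nodes
sim_matrix = [[0.9, 0.8], [0.85, 0.1]]
# Assumed convention: cost = 1 - similarity
cost_matrix = [[1.0 - s for s in row] for row in sim_matrix]

assignment = assign(cost_matrix)
lap_total = sum(sim_matrix[r][c] for r, c in assignment)
print(assignment, lap_total)
```

Unlike a step-by-step greedy choice, the assignment is optimised globally, so the strongest single pair (0.9) is given up in favour of a better overall total.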
@dataclass(slots=True)
class vf2_networkx[K, N, E, G](VF2Base[K, N, E, G]):
    """Graph similarity using the VF2 algorithm via NetworkX."""

    def node_mappings(
        self,
        x: Graph[K, N, E, G],
        y: Graph[K, N, E, G],
    ) -> list[frozendict[K, K]]:
        """Finds subgraph isomorphism node mappings using NetworkX."""
        if len(y.nodes) + len(y.edges) > len(x.nodes) + len(x.edges):
            larger_graph = to_networkx(y)
            smaller_graph = to_networkx(x)
            node_matcher = reverse_positional(self.node_matcher)
            edge_matcher = reverse_positional(self.edge_matcher)
        else:
            larger_graph = to_networkx(x)
            smaller_graph = to_networkx(y)
            node_matcher = self.node_matcher
            edge_matcher = self.edge_matcher

        # `first` must be the larger graph and `second` the smaller one.
        graph_matcher = DiGraphMatcher(
            larger_graph,
            smaller_graph,
            node_match=lambda x, y: node_matcher(x["value"], y["value"]),
            edge_match=lambda x, y: edge_matcher(x["value"], y["value"]),
        )

        mappings_iter = graph_matcher.subgraph_isomorphisms_iter()
        node_mappings: list[frozendict[K, K]] = []

        for idx in itertools.count():
            if self.max_iterations > 0 and idx >= self.max_iterations:
                break

            try:
                if len(y.nodes) + len(y.edges) > len(x.nodes) + len(x.edges):
                    # y -> x (as needed)
                    node_mappings.append(
                        frozendict(
                            (larger_idx, smaller_idx)
                            for larger_idx, smaller_idx in next(mappings_iter).items()
                        )
                    )
                else:
                    # x -> y (needs to be inverted)
                    node_mappings.append(
                        frozendict(
                            (smaller_idx, larger_idx)
                            for larger_idx, smaller_idx in next(mappings_iter).items()
                        )
                    )
            except StopIteration:
                break

        return node_mappings
Graph similarity using the VF2 algorithm via NetworkX.
def node_mappings(self, x: Graph[K, N, E, G], y: Graph[K, N, E, G]) -> list[frozendict[K, K]]:
Finds subgraph isomorphism node mappings using NetworkX.
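The orientation handling in node_mappings can be read as follows: the matcher always receives the larger graph first and yields mappings keyed by nodes of the larger graph, while the method returns mappings keyed by y nodes. A minimal sketch of that bookkeeping, where `orient` and the node names are hypothetical and sizes stand for node count plus edge count:

```python
def orient(x_size: int, y_size: int, raw_mapping: dict[str, str]) -> dict[str, str]:
    """Turn a matcher result keyed larger -> smaller into a y -> x mapping."""
    if y_size > x_size:
        # y was passed first (larger), so keys are already y nodes
        return dict(raw_mapping)
    # x was passed first (larger), so invert to obtain y -> x
    return {smaller: larger for larger, smaller in raw_mapping.items()}


# Hypothetical matcher output: keys from the larger graph, values from the smaller
raw = {"big1": "small1", "big2": "small2"}

y_larger = orient(x_size=3, y_size=5, raw_mapping=raw)  # kept as is
x_larger = orient(x_size=5, y_size=3, raw_mapping=raw)  # inverted
print(y_larger, x_larger)
```

When y is the larger graph, the source also wraps the matchers with reverse_positional so that their arguments keep the order they expect.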
@dataclass(slots=True)
class vf2_rustworkx[K, N, E, G](VF2Base[K, N, E, G]):
    """Graph similarity using the VF2 algorithm via rustworkx."""

    id_order: bool = False
    induced: bool = False
    call_limit: int | None = None

    def node_mappings(
        self,
        x: Graph[K, N, E, G],
        y: Graph[K, N, E, G],
    ) -> list[frozendict[K, K]]:
        """Finds subgraph isomorphism node mappings using rustworkx."""
        if len(y.nodes) + len(y.edges) > len(x.nodes) + len(x.edges):
            larger_graph, larger_graph_lookup = to_rustworkx_with_lookup(y)
            smaller_graph, smaller_graph_lookup = to_rustworkx_with_lookup(x)
            node_matcher = reverse_positional(self.node_matcher)
            edge_matcher = reverse_positional(self.edge_matcher)
        else:
            larger_graph, larger_graph_lookup = to_rustworkx_with_lookup(x)
            smaller_graph, smaller_graph_lookup = to_rustworkx_with_lookup(y)
            node_matcher = self.node_matcher
            edge_matcher = self.edge_matcher

        # Checks if there is a subgraph of `first` isomorphic to `second`.
        # Returns an iterator over dictionaries of node indices from `first`
        # to node indices in `second` representing the mapping found.
        # As such, `first` must be the larger graph and `second` the smaller one.
        mappings_iter = rustworkx.vf2_mapping(
            larger_graph,
            smaller_graph,
            node_matcher=node_matcher,
            edge_matcher=edge_matcher,
            subgraph=True,
            id_order=self.id_order,
            induced=self.induced,
            call_limit=self.call_limit,
        )

        node_mappings: list[frozendict[K, K]] = []

        for idx in itertools.count():
            if self.max_iterations > 0 and idx >= self.max_iterations:
                break

            try:
                if len(y.nodes) + len(y.edges) > len(x.nodes) + len(x.edges):
                    # y -> x (as needed)
                    node_mappings.append(
                        frozendict(
                            (
                                larger_graph_lookup[larger_idx],
                                smaller_graph_lookup[smaller_idx],
                            )
                            for larger_idx, smaller_idx in next(mappings_iter).items()
                        )
                    )
                else:
                    # x -> y (needs to be inverted)
                    node_mappings.append(
                        frozendict(
                            (
                                smaller_graph_lookup[smaller_idx],
                                larger_graph_lookup[larger_idx],
                            )
                            for larger_idx, smaller_idx in next(mappings_iter).items()
                        )
                    )
            except StopIteration:
                break

        return node_mappings
Graph similarity using the VF2 algorithm via rustworkx.
def node_mappings(self, x: Graph[K, N, E, G], y: Graph[K, N, E, G]) -> list[frozendict[K, K]]:
Finds subgraph isomorphism node mappings using rustworkx.
@dataclass(slots=True, frozen=True)
class dtw[K, N, E, G](SimFunc[Graph[K, N, E, G], GraphSim[K]]):
    """
    Graph-based Dynamic Time Warping similarity function leveraging sequence alignment.

    Args:
        node_sim_func: A similarity function for nodes (SimFunc or BatchSimFunc).
        edge_sim_func: An optional similarity function for edges (SimFunc or BatchSimFunc).
        normalize: Whether to normalize the similarity score by the alignment length (default: True).

    Examples:
        >>> from ...model.graph import Node, Edge, Graph, SerializedGraph, from_dict
        >>> node_sim_func = lambda n1, n2: 1.0 if n1 == n2 else 0.0
        >>> edge_sim_func = lambda e1, e2: 1.0 if e1.value == e2.value else 0.0

        >>> data_x = {
        ...     "nodes": {
        ...         "1": "A",
        ...         "2": "B",
        ...     },
        ...     "edges": {
        ...         "e1": {"source": "1", "target": "2", "value": "X"},
        ...     },
        ...     "value": None
        ... }
        >>> data_y = {
        ...     "nodes": {
        ...         "1": "A",
        ...         "2": "C",
        ...     },
        ...     "edges": {
        ...         "e1": {"source": "1", "target": "2", "value": "Y"},
        ...     },
        ...     "value": None
        ... }
        >>> graph_x = from_dict(data_x)
        >>> graph_y = from_dict(data_y)

        >>> g_dtw = dtw(node_sim_func, edge_sim_func)
        >>> result = g_dtw(graph_x, graph_y)
        >>> print(result)
        GraphSim(value=0.05555555555555555,
            node_mapping=frozendict.frozendict({'1': '2', '2': '2'}), edge_mapping=frozendict.frozendict({'e1': 'e1'}),
            node_similarities=frozendict.frozendict({'1': 0.0, '2': 0.0}), edge_similarities=frozendict.frozendict({'e1': 0.0}))
    """

    node_sim_func: AnySimFunc[N, Float]
    edge_sim_func: AnySimFunc[Edge[K, N, E], Float] | None = None
    normalize: bool = True

    def __call__(self, x: Graph[K, N, E, G], y: Graph[K, N, E, G]) -> GraphSim[K]:
        sequence_x, edges_x = to_sequence(x)
        sequence_y, edges_y = to_sequence(y)
        node_sim_func: BatchSimFunc[Node[K, N], Float] = batchify_sim(
            transpose_value(self.node_sim_func)
        )
        edge_sim_func = (
            batchify_sim(self.edge_sim_func) if self.edge_sim_func else None
        )

        # 3) Use DTW for nodes, with alignment
        node_result = collections_dtw(node_sim_func)(
            sequence_x, sequence_y, return_alignment=True
        )
        alignment = node_result.mapping  # matched (x_node, y_node)

        # 4) Build node/edge mapping & average similarity using the helper
        similarity_data = _build_node_edge_mappings_and_similarity(
            alignment, node_sim_func, edges_x, edges_y, edge_sim_func
        )

        # 5) Combine node & edge similarity
        total_similarity = (
            (similarity_data.node_similarity + similarity_data.edge_similarity)
            / 2.0
            if similarity_data.edge_similarity is not None
            else similarity_data.node_similarity
        )

        # 6) Normalize by alignment length if applicable
        if self.normalize and alignment:
            alen = len([pair for pair in alignment if pair[0] and pair[1]])
            if alen > 0:
                total_similarity /= alen

        return GraphSim(
            value=total_similarity,
            node_mapping=frozendict(similarity_data.node_mapping),
            edge_mapping=frozendict(similarity_data.edge_mapping),
            node_similarities=frozendict(similarity_data.node_similarities),
            edge_similarities=frozendict(similarity_data.edge_similarities),
        )
Graph-based Dynamic Time Warping similarity function leveraging sequence alignment.
Arguments:
- node_sim_func: A similarity function for nodes (SimFunc or BatchSimFunc).
- edge_sim_func: An optional similarity function for edges (SimFunc or BatchSimFunc).
- normalize: Whether to normalize the similarity score by the alignment length (default: True).
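The effect of edge_sim_func and normalize on the final value follows from steps 5 and 6 of the source: node and edge similarity are averaged when an edge similarity exists, and the result is then divided by the number of fully aligned pairs. A minimal sketch of that arithmetic, where `combine_scores` and the input numbers are hypothetical:

```python
from typing import Optional, Sequence, Tuple


def combine_scores(
    node_similarity: float,
    edge_similarity: Optional[float],
    alignment: Sequence[Tuple[object, object]],
    normalize: bool = True,
) -> float:
    """Average node and edge similarity, then optionally divide by aligned pairs."""
    if edge_similarity is not None:
        total = (node_similarity + edge_similarity) / 2.0
    else:
        total = node_similarity
    if normalize and alignment:
        # Only pairs with both sides present count towards the alignment length
        aligned = len([pair for pair in alignment if pair[0] and pair[1]])
        if aligned > 0:
            total /= aligned
    return total


pairs = [("a", "a"), ("b", "c")]
with_edges = combine_scores(0.5, 1.0, pairs)                      # (0.5 + 1.0) / 2 / 2
nodes_only = combine_scores(0.5, None, [("a", "a"), ("b", None)])  # one aligned pair
unnormalized = combine_scores(0.5, 1.0, pairs, normalize=False)
print(with_edges, nodes_only, unnormalized)
```

With normalization enabled, longer alignments yield smaller values even for identical average similarities, which is worth keeping in mind when comparing scores across graphs of different length.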
Examples:
>>> from cbrkit.model.graph import Node, Edge, Graph, SerializedGraph, from_dict
>>> node_sim_func = lambda n1, n2: 1.0 if n1 == n2 else 0.0
>>> edge_sim_func = lambda e1, e2: 1.0 if e1.value == e2.value else 0.0
>>> data_x = {
...     "nodes": {
...         "1": "A",
...         "2": "B",
...     },
...     "edges": {
...         "e1": {"source": "1", "target": "2", "value": "X"},
...     },
...     "value": None
... }
>>> data_y = {
...     "nodes": {
...         "1": "A",
...         "2": "C",
...     },
...     "edges": {
...         "e1": {"source": "1", "target": "2", "value": "Y"},
...     },
...     "value": None
... }
>>> graph_x = from_dict(data_x)
>>> graph_y = from_dict(data_y)
>>> g_dtw = dtw(node_sim_func, edge_sim_func)
>>> result = g_dtw(graph_x, graph_y)
>>> print(result)
GraphSim(value=0.05555555555555555,
    node_mapping=frozendict.frozendict({'1': '2', '2': '2'}), edge_mapping=frozendict.frozendict({'e1': 'e1'}),
    node_similarities=frozendict.frozendict({'1': 0.0, '2': 0.0}), edge_similarities=frozendict.frozendict({'e1': 0.0}))
@dataclass(slots=True)
class smith_waterman[K, N, E, G](SimFunc[Graph[K, N, E, G], GraphSim[K]]):
    """
    Graph-based Smith-Waterman similarity function leveraging local sequence alignment
    plus ProCAKE-style weighting for local similarities.

    Args:
        node_sim_func: Base node similarity function (SimFunc or BatchSimFunc).
        dataflow_in_sim_func: Similarity for incoming data flow (optional).
        dataflow_out_sim_func: Similarity for outgoing data flow (optional).
        l_t: Weight for task/node similarity. Default = 1.0
        l_i: Weight for incoming dataflow similarity. Default = 0.0 (ignored if 0).
        l_o: Weight for outgoing dataflow similarity. Default = 0.0 (ignored if 0).
        edge_sim_func: If provided, an edge similarity function (SimFunc or BatchSimFunc).
        match_score: The SW match score (default 2).
        mismatch_penalty: The SW mismatch penalty (default -1).
        gap_penalty: The SW gap penalty (default -1).
        normalize: Whether to normalize the final similarity (default True).

    Examples:
        >>> from ...model.graph import Node, Edge, Graph, SerializedGraph, from_dict
        >>> node_sim_func = lambda n1, n2: 1.0 if n1 == n2 else 0.0
        >>> edge_sim_func = lambda e1, e2: 1.0 if e1.value == e2.value else 0.0
        >>> def dataflow_dummy(a, b): return 0.5  # pretend data flow sim

        >>> data_x = {
        ...     "nodes": {
        ...         "1": "A",
        ...         "2": "B",
        ...     },
        ...     "edges": {
        ...         "e1": {"source": "1", "target": "2", "value": "X"},
        ...     },
        ...     "value": None
        ... }

        >>> data_y = {
        ...     "nodes": {
        ...         "1": "A",
        ...         "2": "C",
        ...     },
        ...     "edges": {
        ...         "e1": {"source": "1", "target": "2", "value": "X"},
        ...     },
        ...     "value": None
        ... }
        >>> graph_x = from_dict(data_x)
        >>> graph_y = from_dict(data_y)

        >>> g_swa = smith_waterman(
        ...     node_sim_func,
        ...     edge_sim_func,
        ...     dataflow_in_sim_func=dataflow_dummy,
        ...     dataflow_out_sim_func=dataflow_dummy,
        ...     l_t=1.0, l_i=0.5, l_o=0.5,
        ...     use_procake_formula=True
        ... )
        >>> result = g_swa(graph_x, graph_y)
        >>> print(result)
        GraphSim(value=0.015,
            node_mapping=frozendict.frozendict({'1': '1', '2': '2'}), edge_mapping=frozendict.frozendict({'e1': 'e1'}),
            node_similarities=frozendict.frozendict({'1': 0.75, '2': 0.25}), edge_similarities=frozendict.frozendict({'e1': 1.0}))

        >>> # Another example without the ProCAKE weighting:
        >>> g_swa_naive = smith_waterman(
        ...     node_sim_func,
        ...     match_score=2,
        ...     mismatch_penalty=-1,
        ...     gap_penalty=-1,
        ...     normalize=True,
        ...     use_procake_formula=False
        ... )
        >>> result_naive = g_swa_naive(graph_x, graph_y)
        >>> print(result_naive)
        GraphSim(value=0.01,
            node_mapping=frozendict.frozendict({'1': '1', '2': '2'}), edge_mapping=frozendict.frozendict({}),
            node_similarities=frozendict.frozendict({'1': 1.0, '2': 0.0}), edge_similarities=frozendict.frozendict({}))
    """

    node_sim_func: InitVar[AnySimFunc[N, Float]]
    edge_sim_func: InitVar[AnySimFunc[Edge[K, N, E], Float] | None] = None
    batched_node_sim_func: BatchSimFunc[Node[K, N], Float] = field(init=False)
    unbatched_node_sim_func: SimFunc[Node[K, N], Float] = field(init=False)
    batched_edge_sim_func: BatchSimFunc[Edge[K, N, E], Float] | None = field(
        init=False
    )
    dataflow_in_sim_func: SimFunc[Any, float] | None = None
    dataflow_out_sim_func: SimFunc[Any, float] | None = None
    l_t: float = 1.0
    l_i: float = 0.0
    l_o: float = 0.0

    match_score: int = 2
    mismatch_penalty: int = -1
    gap_penalty: int = -1
    normalize: bool = True

    use_procake_formula: bool = False

    def __post_init__(
        self,
        node_sim_func: AnySimFunc[N, Float],
        edge_sim_func: AnySimFunc[Edge[K, N, E], Float] | None,
    ) -> None:
        self.batched_node_sim_func = batchify_sim(transpose_value(node_sim_func))
        self.unbatched_node_sim_func = unbatchify_sim(
            transpose_value(node_sim_func)
        )
        self.batched_edge_sim_func = (
            batchify_sim(edge_sim_func) if edge_sim_func else None
        )

    def local_node_sim(self, q_node: Node[K, N], c_node: Node[K, N]) -> float:
        """Computes weighted local node similarity using optional dataflow scores."""
        base = unpack_float(self.unbatched_node_sim_func(q_node, c_node))

        if self.use_procake_formula:
            in_flow = 0.0
            if self.l_i != 0.0 and self.dataflow_in_sim_func:
                # In real usage, pass appropriate node/edge data
                in_flow = self.dataflow_in_sim_func(None, None)

            out_flow = 0.0
            if self.l_o != 0.0 and self.dataflow_out_sim_func:
                out_flow = self.dataflow_out_sim_func(None, None)

            total_weight = self.l_t + self.l_i + self.l_o

            if total_weight == 0:
                return 0.0

            return (
                self.l_t * base + self.l_i * in_flow + self.l_o * out_flow
            ) / total_weight

        return base

    def __call__(self, x: Graph[K, N, E, G], y: Graph[K, N, E, G]) -> GraphSim[K]:
        # 1) Convert graphs to sequences
        sequence_x, edges_x = to_sequence(x)
        sequence_y, edges_y = to_sequence(y)

        # 3) We'll rely on the minineedle-based SW for a raw score.
        sw_obj = collections_smith_waterman(
            match_score=self.match_score,
            mismatch_penalty=self.mismatch_penalty,
            gap_penalty=self.gap_penalty,
        )

        # Use the length of each sequence as tokens
        tokens_x = list(range(len(sequence_x)))
        tokens_y = list(range(len(sequence_y)))
        raw_sw_score = sw_obj(tokens_x, tokens_y)

        # 4) We do not have a direct alignment from the raw minineedle call,
        #    so let's do a naive 1:1 for the shorter length.
        length = min(len(sequence_x), len(sequence_y))
        alignment = [(sequence_x[i], sequence_y[i]) for i in range(length)]

        # 5) Build node/edge mappings & average similarities using local_node_sim
        similarity_data = _build_node_edge_mappings_and_similarity(
            alignment,
            batchify_sim(self.local_node_sim),
            edges_x,
            edges_y,
            self.batched_edge_sim_func,
        )

        # 6) Combine node & edge sim, scale by raw SW score (if > 0)
        if similarity_data.edge_similarity is not None:
            total_similarity = (
                similarity_data.node_similarity + similarity_data.edge_similarity
            ) / 2.0
        else:
            total_similarity = similarity_data.node_similarity

        if raw_sw_score > 0:
            total_similarity *= raw_sw_score / 100.0

        # 7) Normalize if requested
        if self.normalize and length > 0:
            total_similarity /= length

        return GraphSim(
            value=total_similarity,
            node_mapping=frozendict(similarity_data.node_mapping),
            edge_mapping=frozendict(similarity_data.edge_mapping),
            node_similarities=frozendict(similarity_data.node_similarities),
            edge_similarities=frozendict(similarity_data.edge_similarities),
        )
Graph-based Smith-Waterman similarity function leveraging local sequence alignment plus ProCAKE-style weighting for local similarities.
Arguments:
- node_sim_func: Base node similarity function (SimFunc or BatchSimFunc).
- dataflow_in_sim_func: Similarity for incoming data flow (optional).
- dataflow_out_sim_func: Similarity for outgoing data flow (optional).
- l_t: Weight for task/node similarity. Default = 1.0
- l_i: Weight for incoming dataflow similarity. Default = 0.0 (ignored if 0).
- l_o: Weight for outgoing dataflow similarity. Default = 0.0 (ignored if 0).
- edge_sim_func: If provided, an edge similarity function (SimFunc or BatchSimFunc).
- match_score: The SW match score (default 2).
- mismatch_penalty: The SW mismatch penalty (default -1).
- gap_penalty: The SW gap penalty (default -1).
- normalize: Whether to normalize the final similarity (default True).
- use_procake_formula: Whether to combine the node similarity with the dataflow similarities using the weights l_t, l_i, and l_o (default False).
Examples:
>>> from cbrkit.model.graph import Node, Edge, Graph, SerializedGraph, from_dict
>>> node_sim_func = lambda n1, n2: 1.0 if n1 == n2 else 0.0
>>> edge_sim_func = lambda e1, e2: 1.0 if e1.value == e2.value else 0.0
>>> def dataflow_dummy(a, b): return 0.5  # pretend data flow sim

>>> data_x = {
...     "nodes": {
...         "1": "A",
...         "2": "B",
...     },
...     "edges": {
...         "e1": {"source": "1", "target": "2", "value": "X"},
...     },
...     "value": None
... }

>>> data_y = {
...     "nodes": {
...         "1": "A",
...         "2": "C",
...     },
...     "edges": {
...         "e1": {"source": "1", "target": "2", "value": "X"},
...     },
...     "value": None
... }
>>> graph_x = from_dict(data_x)
>>> graph_y = from_dict(data_y)

>>> g_swa = smith_waterman(
...     node_sim_func,
...     edge_sim_func,
...     dataflow_in_sim_func=dataflow_dummy,
...     dataflow_out_sim_func=dataflow_dummy,
...     l_t=1.0, l_i=0.5, l_o=0.5,
...     use_procake_formula=True
... )
>>> result = g_swa(graph_x, graph_y)
>>> print(result)
GraphSim(value=0.015,
    node_mapping=frozendict.frozendict({'1': '1', '2': '2'}), edge_mapping=frozendict.frozendict({'e1': 'e1'}),
    node_similarities=frozendict.frozendict({'1': 0.75, '2': 0.25}), edge_similarities=frozendict.frozendict({'e1': 1.0}))

>>> # Another example without the ProCAKE weighting:
>>> g_swa_naive = smith_waterman(
...     node_sim_func,
...     match_score=2,
...     mismatch_penalty=-1,
...     gap_penalty=-1,
...     normalize=True,
...     use_procake_formula=False
... )
>>> result_naive = g_swa_naive(graph_x, graph_y)
>>> print(result_naive)
GraphSim(value=0.01,
    node_mapping=frozendict.frozendict({'1': '1', '2': '2'}), edge_mapping=frozendict.frozendict({}),
    node_similarities=frozendict.frozendict({'1': 1.0, '2': 0.0}), edge_similarities=frozendict.frozendict({}))
def local_node_sim(self, q_node: Node[K, N], c_node: Node[K, N]) -> float:
Computes weighted local node similarity using optional dataflow scores.
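The weighting applied when `use_procake_formula` is enabled can be traced with plain numbers. The following is a minimal standalone sketch, not the library code: `weighted_local_sim` is a hypothetical helper that mirrors the weighted average in the source above, and the inputs reuse the values from the docstring example (dataflow similarity fixed at 0.5).

```python
def weighted_local_sim(
    base: float,
    in_flow: float,
    out_flow: float,
    l_t: float = 1.0,
    l_i: float = 0.0,
    l_o: float = 0.0,
) -> float:
    """Weighted average of node and dataflow similarities (illustrative helper)."""
    total_weight = l_t + l_i + l_o
    if total_weight == 0:
        # No weight at all: fall back to zero, as the listing above does
        return 0.0
    return (l_t * base + l_i * in_flow + l_o * out_flow) / total_weight


# Nodes "A" vs "A" (base 1.0) and "B" vs "C" (base 0.0) from the docstring example
matching = weighted_local_sim(1.0, 0.5, 0.5, l_t=1.0, l_i=0.5, l_o=0.5)
differing = weighted_local_sim(0.0, 0.5, 0.5, l_t=1.0, l_i=0.5, l_o=0.5)
print(matching, differing)
```

With these weights the two values agree with the `node_similarities` reported in the ProCAKE example of the class docstring.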
@dataclass(slots=True, frozen=True)
class init_empty[K, N, E, G](SearchStateInit[K, N, E, G]):
    """Initializes search with an empty mapping."""

    def __call__(
        self,
        x: Graph[K, N, E, G],
        y: Graph[K, N, E, G],
        node_matcher: ElementMatcher[N],
        edge_matcher: ElementMatcher[E],
    ) -> SearchState[K]:
        return SearchState(
            frozendict(),
            frozendict(),
            frozenset(y.nodes.keys()),
            frozenset(y.edges.keys()),
            frozenset(x.nodes.keys()),
            frozenset(x.edges.keys()),
        )
Initializes search with an empty mapping.
@dataclass(slots=True, init=False)
class init_unique_matches[K, N, E, G](SearchStateInit[K, N, E, G]):
    """Initializes search by pre-mapping uniquely matchable nodes."""

    def __call__(
        self,
        x: Graph[K, N, E, G],
        y: Graph[K, N, E, G],
        node_matcher: ElementMatcher[N],
        edge_matcher: ElementMatcher[E],
    ) -> SearchState[K]:
        # pre-populate the mapping with nodes/edges that only have one possible legal mapping
        y2x_map: defaultdict[K, set[K]] = defaultdict(set)
        x2y_map: defaultdict[K, set[K]] = defaultdict(set)

        for y_key, x_key in itertools.product(y.nodes.keys(), x.nodes.keys()):
            if node_matcher(x.nodes[x_key].value, y.nodes[y_key].value):
                y2x_map[y_key].add(x_key)
                x2y_map[x_key].add(y_key)

        node_mapping: frozendict[K, K] = frozendict(
            (y_key, next(iter(x_keys)))
            for y_key, x_keys in y2x_map.items()
            if len(x_keys) == 1 and len(x2y_map[next(iter(x_keys))]) == 1
        )

        edge_mapping: frozendict[K, K] = _induced_edge_mapping(
            x, y, node_mapping, edge_matcher
        )

        return SearchState(
            node_mapping,
            edge_mapping,
            frozenset(y.nodes.keys() - node_mapping.keys()),
            frozenset(y.edges.keys() - edge_mapping.keys()),
            frozenset(x.nodes.keys() - node_mapping.values()),
            frozenset(x.edges.keys() - edge_mapping.values()),
        )
Initializes search by pre-mapping uniquely matchable nodes.
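To illustrate what "uniquely matchable" means here, the sketch below reproduces the node pre-mapping step on plain dictionaries. It is a simplified stand-in under stated assumptions: `unique_matches` is a hypothetical helper, graphs are reduced to `{key: value}` dictionaries, and the induced edge mapping is left out.

```python
from collections import defaultdict
from itertools import product


def unique_matches(x_nodes: dict, y_nodes: dict, matcher) -> dict:
    """Maps y keys to x keys only when the match is unique in both directions."""
    y2x = defaultdict(set)
    x2y = defaultdict(set)

    for y_key, x_key in product(y_nodes, x_nodes):
        if matcher(x_nodes[x_key], y_nodes[y_key]):
            y2x[y_key].add(x_key)
            x2y[x_key].add(y_key)

    return {
        y_key: next(iter(x_keys))
        for y_key, x_keys in y2x.items()
        # exactly one candidate for y, and that candidate has no other partner
        if len(x_keys) == 1 and len(x2y[next(iter(x_keys))]) == 1
    }


x_nodes = {"x1": "A", "x2": "B", "x3": "B"}
y_nodes = {"y1": "A", "y2": "B"}
pre_mapped = unique_matches(x_nodes, y_nodes, lambda a, b: a == b)
print(pre_mapped)
```

Only `y1` is pre-mapped in this example, because `y2` has two candidates (`x2` and `x3`) and is therefore left open for the search.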
@dataclass(slots=True, frozen=True)
class GraphSim[K](StructuredValue[float]):
    """Result of a graph similarity computation with node and edge mappings."""

    node_mapping: frozendict[K, K]
    edge_mapping: frozendict[K, K]
    node_similarities: frozendict[K, float]
    edge_similarities: frozendict[K, float]
Result of a graph similarity computation with node and edge mappings.
class ElementMatcher[T](Protocol):
    """Determines whether two graph elements can be legally mapped."""

    def __call__(self, x: T, y: T, /) -> bool: ...
Determines whether two graph elements can be legally mapped.
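Any callable that takes two positional values and returns a `bool` satisfies this protocol structurally. As a small illustration, the hypothetical matcher `same_type` below only allows elements of the same Python type to be mapped; it is an example written for this page, not a matcher shipped with the library.

```python
def same_type(x: object, y: object, /) -> bool:
    """Allows a mapping only between values of exactly the same type."""
    return type(x) is type(y)


print(same_type("task", "other task"))
print(same_type("task", 42))
```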
@dataclass(slots=True, frozen=True)
class SemanticEdgeSim[K, N, E]:
    """Computes edge similarity using weighted source and target node similarities."""

    source_weight: float = 1.0
    target_weight: float = 1.0
    edge_sim_func: AnySimFunc[E, Float] | None = None

    def __call__(
        self,
        batches: Sequence[tuple[E, E, float, float]],
    ) -> list[float]:
        source_sims = (source_sim for _, _, source_sim, _ in batches)
        target_sims = (target_sim for _, _, _, target_sim in batches)

        if self.edge_sim_func is not None:
            edge_sim_func = batchify_sim(self.edge_sim_func)
            edge_sims = unpack_floats(
                edge_sim_func(
                    [(x, y) for x, y, _, _ in batches],
                )
            )
        else:
            edge_sims = [1.0] * len(batches)

        scaling_factor = self.source_weight + self.target_weight

        if scaling_factor == 0:
            return edge_sims

        return [
            (edge * source * self.source_weight / scaling_factor)
            + (edge * target * self.target_weight / scaling_factor)
            for source, target, edge in zip(
                source_sims, target_sims, edge_sims, strict=True
            )
        ]
Computes edge similarity using weighted source and target node similarities.
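For a single edge pair, the computation reduces to the edge similarity scaled by a weighted average of the source and target node similarities. The sketch below works through one pair with plain floats; `semantic_edge_sim` is a hypothetical single-pair helper, whereas the class itself operates on batches.

```python
def semantic_edge_sim(
    edge: float,
    source: float,
    target: float,
    source_weight: float = 1.0,
    target_weight: float = 1.0,
) -> float:
    """Edge similarity scaled by weighted source/target node similarities."""
    scaling_factor = source_weight + target_weight
    if scaling_factor == 0:
        # Both weights are zero: the raw edge similarity is returned unchanged
        return edge
    return (edge * source * source_weight / scaling_factor) + (
        edge * target * target_weight / scaling_factor
    )


# Edge values match perfectly (1.0); the endpoints are 0.75 and 0.25 similar
score = semantic_edge_sim(1.0, 0.75, 0.25)
print(score)
```

With equal weights the result is the edge similarity multiplied by the mean of the two endpoint similarities.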
@dataclass(slots=True)
class BaseGraphSimFunc[K, N, E, G](BaseGraphEditFunc[K, N, E, G]):
    """Base class for graph similarity functions with node and edge matching."""

    node_sim_func: AnySimFunc[N, Float]
    edge_sim_func: SemanticEdgeSim[K, N, E] = default_edge_sim
    node_matcher: ElementMatcher[N] = default_element_matcher
    edge_matcher: ElementMatcher[E] = default_element_matcher
    batch_node_sim_func: BatchSimFunc[Node[K, N], Float] = field(init=False)

    def __post_init__(self) -> None:
        self.batch_node_sim_func = batchify_sim(transpose_value(self.node_sim_func))

    def induced_edge_mapping(
        self,
        x: Graph[K, N, E, G],
        y: Graph[K, N, E, G],
        node_mapping: Mapping[K, K],
    ) -> frozendict[K, K]:
        """Derives an edge mapping induced by the given node mapping."""
        return _induced_edge_mapping(x, y, node_mapping, self.edge_matcher)

    def node_pairs(
        self,
        x: Graph[K, N, E, G],
        y: Graph[K, N, E, G],
    ) -> list[tuple[K, K]]:
        """Returns all legal node pairs between the two graphs."""
        return [
            (y_key, x_key)
            for x_key, y_key in itertools.product(x.nodes.keys(), y.nodes.keys())
            if self.node_matcher(x.nodes[x_key].value, y.nodes[y_key].value)
        ]

    def edge_pairs(
        self,
        x: Graph[K, N, E, G],
        y: Graph[K, N, E, G],
        node_pairs: Collection[tuple[K, K]],
    ) -> list[tuple[K, K]]:
        """Returns all legal edge pairs consistent with the given node pairs."""
        return [
            (y_key, x_key)
            for x_key, y_key in itertools.product(x.edges.keys(), y.edges.keys())
            if self.edge_matcher(x.edges[x_key].value, y.edges[y_key].value)
            and (y.edges[y_key].source.key, x.edges[x_key].source.key) in node_pairs
            and (y.edges[y_key].target.key, x.edges[x_key].target.key) in node_pairs
        ]

    def node_pair_similarities(
        self,
        x: Graph[K, N, E, G],
        y: Graph[K, N, E, G],
        pairs: Sequence[tuple[K, K]] | None = None,
    ) -> PairSim[K]:
        """Computes pairwise similarities for all legal node pairs."""
        if pairs is None:
            pairs = self.node_pairs(x, y)

        node_pair_values = [(x.nodes[x_key], y.nodes[y_key]) for y_key, x_key in pairs]
        node_pair_sims = self.batch_node_sim_func(node_pair_values)

        return {
            (y_node.key, x_node.key): unpack_float(sim)
            for (x_node, y_node), sim in zip(
                node_pair_values, node_pair_sims, strict=True
            )
        }

    def edge_pair_similarities(
        self,
        x: Graph[K, N, E, G],
        y: Graph[K, N, E, G],
        node_pair_sims: PairSim[K],
        pairs: Sequence[tuple[K, K]] | None = None,
    ) -> PairSim[K]:
        """Computes pairwise similarities for all legal edge pairs."""
        if pairs is None:
            pairs = self.edge_pairs(x, y, node_pair_sims.keys())

        edge_pair_values = [(x.edges[x_key], y.edges[y_key]) for y_key, x_key in pairs]
        edge_pair_sims = self.edge_sim_func(
            [
                (
                    x_edge.value,
                    y_edge.value,
                    node_pair_sims[(y_edge.source.key, x_edge.source.key)],
                    node_pair_sims[(y_edge.target.key, x_edge.target.key)],
                )
                for x_edge, y_edge in edge_pair_values
            ]
        )

        return {
            (y_edge.key, x_edge.key): unpack_float(sim)
            for (x_edge, y_edge), sim in zip(
                edge_pair_values, edge_pair_sims, strict=True
            )
        }

    def pair_similarities(
        self,
        x: Graph[K, N, E, G],
        y: Graph[K, N, E, G],
        node_pairs: Sequence[tuple[K, K]] | None = None,
        edge_pairs: Sequence[tuple[K, K]] | None = None,
    ) -> tuple[PairSim[K], PairSim[K]]:
        """Computes both node and edge pairwise similarities."""
        node_pair_sims = self.node_pair_similarities(x, y, node_pairs)
        edge_pair_sims = self.edge_pair_similarities(x, y, node_pair_sims, edge_pairs)
        return node_pair_sims, edge_pair_sims

    def similarity(
        self,
        x: Graph[K, N, E, G],
        y: Graph[K, N, E, G],
        node_mapping: frozendict[K, K],
        edge_mapping: frozendict[K, K],
        node_pair_sims: Mapping[tuple[K, K], float],
        edge_pair_sims: Mapping[tuple[K, K], float],
    ) -> GraphSim[K]:
        """Function to compute the similarity of all previous steps"""

        node_sims = [
            node_pair_sims[(y_key, x_key)] for y_key, x_key in node_mapping.items()
        ]

        edge_sims = [
            edge_pair_sims[(y_key, x_key)] for y_key, x_key in edge_mapping.items()
        ]

        all_sims = itertools.chain(node_sims, edge_sims)
        upper_bound = self.cost_upper_bound(x, y)
        total_sim = sum(all_sims) / upper_bound if upper_bound > 0 else 0.0

        return GraphSim(
            total_sim,
            node_mapping,
            edge_mapping,
            frozendict(zip(node_mapping.keys(), node_sims, strict=True)),
            frozendict(zip(edge_mapping.keys(), edge_sims, strict=True)),
        )

    def invert_similarity(
        self, x: Graph[K, N, E, G], y: Graph[K, N, E, G], sim: GraphSim[K]
    ) -> GraphSim[K]:
        """Recomputes similarity with the node and edge mappings inverted."""
        node_mapping = frozendict((v, k) for k, v in sim.node_mapping.items())
        edge_mapping = frozendict((v, k) for k, v in sim.edge_mapping.items())

        node_similarities, edge_similarities = self.pair_similarities(
            x, y, list(node_mapping.items()), list(edge_mapping.items())
        )

        return self.similarity(
            x,
            y,
            node_mapping,
            edge_mapping,
            frozendict(node_similarities),
            frozendict(edge_similarities),
        )
Base class for graph similarity functions with node and edge matching.
def induced_edge_mapping(
    self,
    x: Graph[K, N, E, G],
    y: Graph[K, N, E, G],
    node_mapping: Mapping[K, K],
) -> frozendict[K, K]:
Derives an edge mapping induced by the given node mapping.
def node_pairs(
    self,
    x: Graph[K, N, E, G],
    y: Graph[K, N, E, G],
) -> list[tuple[K, K]]:
Returns all legal node pairs between the two graphs.
def edge_pairs(
    self,
    x: Graph[K, N, E, G],
    y: Graph[K, N, E, G],
    node_pairs: Collection[tuple[K, K]],
) -> list[tuple[K, K]]:
Returns all legal edge pairs consistent with the given node pairs.
def node_pair_similarities(
    self,
    x: Graph[K, N, E, G],
    y: Graph[K, N, E, G],
    pairs: Sequence[tuple[K, K]] | None = None,
) -> PairSim[K]:
Computes pairwise similarities for all legal node pairs.
def edge_pair_similarities(
    self,
    x: Graph[K, N, E, G],
    y: Graph[K, N, E, G],
    node_pair_sims: PairSim[K],
    pairs: Sequence[tuple[K, K]] | None = None,
) -> PairSim[K]:
Computes pairwise similarities for all legal edge pairs.
def pair_similarities(
    self,
    x: Graph[K, N, E, G],
    y: Graph[K, N, E, G],
    node_pairs: Sequence[tuple[K, K]] | None = None,
    edge_pairs: Sequence[tuple[K, K]] | None = None,
) -> tuple[PairSim[K], PairSim[K]]:
Computes both node and edge pairwise similarities.
def similarity(
    self,
    x: Graph[K, N, E, G],
    y: Graph[K, N, E, G],
    node_mapping: frozendict[K, K],
    edge_mapping: frozendict[K, K],
    node_pair_sims: Mapping[tuple[K, K], float],
    edge_pair_sims: Mapping[tuple[K, K], float],
) -> GraphSim[K]:
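Computes the overall graph similarity from the given node and edge mappings: the similarities of all mapped node and edge pairs are summed and divided by the cost upper bound of the two graphs (0.0 if that bound is not positive).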
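The normalization step can be sketched in isolation. In the snippet below, `normalized_similarity` is a hypothetical helper and `upper_bound` is passed in directly as an assumption, since the definition of `cost_upper_bound` is not shown in this section.

```python
def normalized_similarity(
    node_sims: list[float],
    edge_sims: list[float],
    upper_bound: float,
) -> float:
    """Sums all mapped similarities and divides by a given upper bound."""
    total = sum(node_sims) + sum(edge_sims)
    return total / upper_bound if upper_bound > 0 else 0.0


# Two mapped nodes and one mapped edge; the bound of 4 is an assumed value
value = normalized_similarity([1.0, 0.5], [0.5], upper_bound=4)
empty = normalized_similarity([], [], upper_bound=0)
print(value, empty)
```

Because unmapped elements contribute nothing to the sum but still count toward the bound, a partial mapping lowers the global score.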
def invert_similarity(
    self, x: Graph[K, N, E, G], y: Graph[K, N, E, G], sim: GraphSim[K]
) -> GraphSim[K]:
Recomputes similarity with the node and edge mappings inverted.
@dataclass(slots=True)
class SearchGraphSimFunc[K, N, E, G](BaseGraphSimFunc[K, N, E, G]):
    """Graph similarity function using search-based node and edge mapping."""

    init_func: (
        SearchStateInit[K, N, E, G] | AnySimFunc[Graph[K, N, E, G], GraphSim[K]]
    ) = field(default_factory=init_unique_matches)

    def init_search_state(
        self, x: Graph[K, N, E, G], y: Graph[K, N, E, G]
    ) -> SearchState[K]:
        """Initializes the search state using the configured init function."""
        init_func_params = total_params(self.init_func)
        sim: GraphSim[K]

        if init_func_params == 4:
            init_func = cast(SearchStateInit[K, N, E, G], self.init_func)

            return init_func(x, y, self.node_matcher, self.edge_matcher)

        elif init_func_params == 2:
            init_func = cast(SimFunc[Graph[K, N, E, G], GraphSim[K]], self.init_func)
            sim = init_func(x, y)

        elif init_func_params == 1:
            init_func = cast(
                BatchSimFunc[Graph[K, N, E, G], GraphSim[K]], self.init_func
            )
            sim = init_func([(x, y)])[0]

        else:
            raise ValueError(
                f"Invalid number of parameters for init_func: {init_func_params}"
            )

        return SearchState(
            node_mapping=sim.node_mapping,
            edge_mapping=sim.edge_mapping,
            open_y_nodes=frozenset(y.nodes.keys() - sim.node_mapping.keys()),
            open_y_edges=frozenset(y.edges.keys() - sim.edge_mapping.keys()),
            open_x_nodes=frozenset(x.nodes.keys() - sim.node_mapping.values()),
            open_x_edges=frozenset(x.edges.keys() - sim.edge_mapping.values()),
        )

    def finished(self, state: SearchState[K]) -> bool:
        """Returns True when all query nodes and edges have been processed."""
        # the following condition could save a few iterations, but needs to be tested
        # return (not state.open_y_nodes and not state.open_y_edges) or (
        #     not state.open_x_nodes and not state.open_x_edges
        # )
        return not state.open_y_nodes and not state.open_y_edges

    def legal_node_mapping(
        self,
        x: Graph[K, N, E, G],
        y: Graph[K, N, E, G],
        state: SearchState[K],
        x_key: K,
        y_key: K,
    ) -> bool:
        """Checks whether mapping a query node to a case node is legal."""
        return (
            self.node_matcher(x.nodes[x_key].value, y.nodes[y_key].value)
            and y_key in state.open_y_nodes
            and x_key in state.open_x_nodes
        )

    def legal_edge_mapping(
        self,
        x: Graph[K, N, E, G],
        y: Graph[K, N, E, G],
        state: SearchState[K],
        x_key: K,
        y_key: K,
    ) -> bool:
        """Checks whether mapping a query edge to a case edge is legal."""
        x_value = x.edges[x_key]
        y_value = y.edges[y_key]

        return (
            self.edge_matcher(x_value.value, y_value.value)
            and y_key in state.open_y_edges
            and x_key in state.open_x_edges
            # source and target of the edge must be mapped to the same nodes
            and x_value.source.key == state.node_mapping.get(y_value.source.key)
            and x_value.target.key == state.node_mapping.get(y_value.target.key)
        )

    def expand_node(
        self,
        x: Graph[K, N, E, G],
        y: Graph[K, N, E, G],
        state: SearchState[K],
        y_key: K,
    ) -> list[SearchState[K]]:
        """Generates successor states by mapping a query node to all legal case nodes."""
        next_states: list[SearchState[K]] = [
            SearchState(
                state.node_mapping.set(y_key, x_key),
                state.edge_mapping,
                state.open_y_nodes - {y_key},
                state.open_y_edges,
                state.open_x_nodes - {x_key},
                state.open_x_edges,
            )
            for x_key in sorted_iter(state.open_x_nodes)
            if self.legal_node_mapping(x, y, state, x_key, y_key)
        ]

        if not next_states:
            next_states.append(
                SearchState(
                    state.node_mapping,
                    state.edge_mapping,
                    state.open_y_nodes - {y_key},
                    state.open_y_edges,
                    state.open_x_nodes,
                    state.open_x_edges,
                )
            )

        return next_states

    def expand_edge(
        self,
        x: Graph[K, N, E, G],
        y: Graph[K, N, E, G],
        state: SearchState[K],
        y_key: K,
    ) -> list[SearchState[K]]:
        """Generates successor states by mapping a query edge to all legal case edges."""
        next_states: list[SearchState[K]] = [
            SearchState(
                state.node_mapping,
                state.edge_mapping.set(y_key, x_key),
                state.open_y_nodes,
                state.open_y_edges - {y_key},
                state.open_x_nodes,
                state.open_x_edges - {x_key},
            )
            for x_key in sorted_iter(state.open_x_edges)
            if self.legal_edge_mapping(x, y, state, x_key, y_key)
        ]

        if not next_states:
            next_states.append(
                SearchState(
                    state.node_mapping,
                    state.edge_mapping,
                    state.open_y_nodes,
                    state.open_y_edges - {y_key},
                    state.open_x_nodes,
                    state.open_x_edges,
                )
            )

        return next_states

    def expand_edge_with_nodes(
        self,
        x: Graph[K, N, E, G],
        y: Graph[K, N, E, G],
        state: SearchState[K],
        y_key: K,
    ) -> list[SearchState[K]]:
        """Expand a given edge and map its source/target node if not already mapped"""
        next_states: list[SearchState[K]] = []

        for x_key in sorted_iter(state.open_x_edges):
            next_state = state
            x_source_key = x.edges[x_key].source.key
            x_target_key = x.edges[x_key].target.key
            y_source_key = y.edges[y_key].source.key
            y_target_key = y.edges[y_key].target.key

            if (
                y_source_key in next_state.open_y_nodes
                and x_source_key in next_state.open_x_nodes
                and self.legal_node_mapping(
                    x, y, next_state, x_source_key, y_source_key
                )
            ):
                next_state = SearchState(
                    next_state.node_mapping.set(y_source_key, x_source_key),
                    next_state.edge_mapping,
                    next_state.open_y_nodes - {y_source_key},
                    next_state.open_y_edges,
                    next_state.open_x_nodes - {x_source_key},
                    next_state.open_x_edges,
                )

            if (
                y_target_key in next_state.open_y_nodes
                and x_target_key in next_state.open_x_nodes
                and self.legal_node_mapping(
                    x, y, next_state, x_target_key, y_target_key
                )
            ):
                next_state = SearchState(
                    next_state.node_mapping.set(y_target_key, x_target_key),
                    next_state.edge_mapping,
                    next_state.open_y_nodes - {y_target_key},
                    next_state.open_y_edges,
                    next_state.open_x_nodes - {x_target_key},
                    next_state.open_x_edges,
                )

            if self.legal_edge_mapping(x, y, next_state, x_key, y_key):
                next_states.append(
                    SearchState(
                        next_state.node_mapping,
                        next_state.edge_mapping.set(y_key, x_key),
                        next_state.open_y_nodes,
                        next_state.open_y_edges - {y_key},
                        next_state.open_x_nodes,
                        next_state.open_x_edges - {x_key},
                    )
                )

        if not next_states:
            next_states.append(
                SearchState(
                    state.node_mapping,
                    state.edge_mapping,
                    state.open_y_nodes,
                    state.open_y_edges - {y_key},
                    state.open_x_nodes,
                    state.open_x_edges,
                )
            )

        return next_states
Graph similarity function using search-based node and edge mapping.
def init_search_state(
    self, x: Graph[K, N, E, G], y: Graph[K, N, E, G]
) -> SearchState[K]:
Initializes the search state using the configured init function.
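The init function is dispatched on the number of parameters it accepts: four for a `SearchStateInit`, two for a pairwise similarity function, one for a batch similarity function. The sketch below imitates that dispatch with the standard library only. `count_params` is a hypothetical stand-in for the library's `total_params` helper, whose exact behavior is assumed here, and the conversion of a `GraphSim` result into a `SearchState` is omitted.

```python
import inspect


def count_params(func) -> int:
    """Counts declared parameters (assumed equivalent of the library helper)."""
    return len(inspect.signature(func).parameters)


def run_init(init_func, x, y, node_matcher, edge_matcher):
    """Calls init_func with the argument shape implied by its arity."""
    n_params = count_params(init_func)

    if n_params == 4:
        # SearchStateInit style: returns a search state directly
        return init_func(x, y, node_matcher, edge_matcher)
    if n_params == 2:
        # Pairwise similarity function: returns one result for (x, y)
        return init_func(x, y)
    if n_params == 1:
        # Batch similarity function: takes a list of pairs, returns a list
        return init_func([(x, y)])[0]

    raise ValueError(f"Invalid number of parameters for init_func: {n_params}")


state_style = run_init(lambda x, y, nm, em: "state", "gx", "gy", None, None)
pair_style = run_init(lambda x, y: "sim", "gx", "gy", None, None)
batch_style = run_init(lambda pairs: ["sim" for _ in pairs], "gx", "gy", None, None)
print(state_style, pair_style, batch_style)
```

This design lets the result of a cheaper algorithm seed a more expensive search, because any graph similarity function can serve as `init_func`.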
def finished(self, state: SearchState[K]) -> bool:
Returns True when all query nodes and edges have been processed.
def legal_node_mapping(
    self,
    x: Graph[K, N, E, G],
    y: Graph[K, N, E, G],
    state: SearchState[K],
    x_key: K,
    y_key: K,
) -> bool:
Checks whether mapping a query node to a case node is legal.
def legal_edge_mapping(
    self,
    x: Graph[K, N, E, G],
    y: Graph[K, N, E, G],
    state: SearchState[K],
    x_key: K,
    y_key: K,
) -> bool:
Checks whether mapping a query edge to a case edge is legal.
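Besides the edge matcher and the open-set checks, an edge mapping is only legal when the endpoints of the two edges are already mapped onto each other. The sketch below isolates that structural condition under simplifying assumptions: `endpoints_consistent` is a hypothetical helper, edges are `(source, target)` key tuples, and `node_mapping` maps y keys to x keys.

```python
def endpoints_consistent(
    x_edge: tuple[str, str],
    y_edge: tuple[str, str],
    node_mapping: dict[str, str],
) -> bool:
    """True if the y edge's endpoints are mapped to the x edge's endpoints."""
    x_source, x_target = x_edge
    y_source, y_target = y_edge
    # .get returns None for unmapped nodes, so the comparison fails safely
    return x_source == node_mapping.get(y_source) and x_target == node_mapping.get(
        y_target
    )


node_mapping = {"y1": "x1", "y2": "x2"}
ok = endpoints_consistent(("x1", "x2"), ("y1", "y2"), node_mapping)
reversed_edge = endpoints_consistent(("x2", "x1"), ("y1", "y2"), node_mapping)
unmapped = endpoints_consistent(("x1", "x3"), ("y1", "y3"), node_mapping)
print(ok, reversed_edge, unmapped)
```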
def expand_node(
    self,
    x: Graph[K, N, E, G],
    y: Graph[K, N, E, G],
    state: SearchState[K],
    y_key: K,
) -> list[SearchState[K]]:
Generates successor states by mapping a query node to all legal case nodes.
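Node expansion yields one successor per legal partner, or a single "skip" successor when no partner exists, so the node is still removed from the open set and the search can terminate. The following is a reduced sketch of that behavior, assuming a hypothetical `State` with node fields only, plain `dict` mappings instead of `frozendict`, and a caller-supplied `is_legal` predicate.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class State:
    """Reduced search state: node mapping (y -> x) plus open node sets."""

    node_mapping: dict
    open_y_nodes: frozenset
    open_x_nodes: frozenset


def expand_node(state: State, y_key: str, is_legal) -> list[State]:
    successors = [
        State(
            {**state.node_mapping, y_key: x_key},
            state.open_y_nodes - {y_key},
            state.open_x_nodes - {x_key},
        )
        for x_key in sorted(state.open_x_nodes)
        if is_legal(x_key, y_key)
    ]

    if not successors:
        # No legal partner: close y_key without mapping it
        successors.append(
            State(state.node_mapping, state.open_y_nodes - {y_key}, state.open_x_nodes)
        )

    return successors


start = State({}, frozenset({"y1"}), frozenset({"x1", "x2"}))
branches = expand_node(start, "y1", lambda x_key, y_key: True)
dead_end = expand_node(start, "y1", lambda x_key, y_key: False)
print([s.node_mapping for s in branches], dead_end[0].node_mapping)
```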
def expand_edge(
    self,
    x: Graph[K, N, E, G],
    y: Graph[K, N, E, G],
    state: SearchState[K],
    y_key: K,
) -> list[SearchState[K]]:
Generates successor states by mapping a query edge to all legal case edges.
def expand_edge_with_nodes(
    self,
    x: Graph[K, N, E, G],
    y: Graph[K, N, E, G],
    state: SearchState[K],
    y_key: K,
) -> list[SearchState[K]]:
Expand a given edge and map its source/target node if not already mapped
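The expansion step can be tried out on toy data. The following is a minimal, self-contained sketch, not cbrkit's implementation: `ToyState`, `map_node`, and `expand_edge` are hypothetical stand-ins, edges are plain `(source, target)` tuples, every node pairing is assumed to be legal, and an edge pairing is assumed to be legal only when both endpoints are mapped consistently.

```python
from dataclasses import dataclass, replace

Edge = tuple[str, str]  # (source key, target key)


@dataclass(frozen=True)
class ToyState:
    """Hypothetical stand-in for SearchState using plain dicts."""

    node_mapping: dict[str, str]  # y/query node -> x/case node
    edge_mapping: dict[str, str]  # y/query edge -> x/case edge
    open_y_nodes: frozenset[str]
    open_y_edges: frozenset[str]
    open_x_nodes: frozenset[str]
    open_x_edges: frozenset[str]


def map_node(state: ToyState, y_key: str, x_key: str) -> ToyState:
    """Map y_key to x_key if both are still open; otherwise return state as-is."""
    if y_key in state.open_y_nodes and x_key in state.open_x_nodes:
        return replace(
            state,
            node_mapping={**state.node_mapping, y_key: x_key},
            open_y_nodes=state.open_y_nodes - {y_key},
            open_x_nodes=state.open_x_nodes - {x_key},
        )
    return state


def expand_edge(
    x_edges: dict[str, Edge],
    y_edges: dict[str, Edge],
    state: ToyState,
    y_key: str,
) -> list[ToyState]:
    """Return one successor per open case edge that can take the query edge."""
    y_src, y_tgt = y_edges[y_key]
    next_states: list[ToyState] = []

    for x_key in sorted(state.open_x_edges):
        x_src, x_tgt = x_edges[x_key]
        # Map the endpoints first (no-op if they are already mapped).
        s = map_node(map_node(state, y_src, x_src), y_tgt, x_tgt)
        # Assumed legality rule: both endpoints must map onto this case edge.
        if s.node_mapping.get(y_src) == x_src and s.node_mapping.get(y_tgt) == x_tgt:
            next_states.append(
                replace(
                    s,
                    edge_mapping={**s.edge_mapping, y_key: x_key},
                    open_y_edges=s.open_y_edges - {y_key},
                    open_x_edges=s.open_x_edges - {x_key},
                )
            )

    if not next_states:
        # No candidate: close the query edge without mapping it.
        next_states.append(replace(state, open_y_edges=state.open_y_edges - {y_key}))

    return next_states


x_edges = {"e1": ("a", "b"), "e2": ("b", "c")}
y_edges = {"f1": ("p", "q")}
start = ToyState(
    {}, {},
    frozenset({"p", "q"}), frozenset(y_edges),
    frozenset({"a", "b", "c"}), frozenset(x_edges),
)
states = expand_edge(x_edges, y_edges, start, "f1")

# With no open case edges left, the query edge is dropped unmapped.
exhausted = replace(start, open_x_edges=frozenset())
fallback = expand_edge(x_edges, y_edges, exhausted, "f1")
```

Each successor is built from the unchanged input state, so the candidates for different case edges do not interfere with one another.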
@dataclass(slots=True, frozen=True)
class SearchState[K]:
    """Current state of a graph search with node and edge mappings."""

    # mappings are from y/query to x/case
    node_mapping: frozendict[K, K]
    edge_mapping: frozendict[K, K]
    # contains all elements from the query that are not yet mapped
    # can differ from the complement of mapping.keys(): a query element
    # without a candidate in x/case is closed without being mapped
    open_y_nodes: frozenset[K]
    open_y_edges: frozenset[K]
    # contains all elements from the case that are not yet mapped
    # must be the complement of mapping.values() but is stored to optimize lookup
    open_x_nodes: frozenset[K]
    open_x_edges: frozenset[K]
Current state of a graph search with node and edge mappings.
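The bookkeeping between the mapping and the open sets can be illustrated with a reduced, node-only sketch. It is a simplified stand-in under stated assumptions, not the library class: `ToyState`, `map_node`, and `skip_node` are hypothetical names, and a read-only `MappingProxyType` stands in for `frozendict`.

```python
from dataclasses import dataclass
from types import MappingProxyType
from typing import Mapping


@dataclass(frozen=True)
class ToyState:
    """Node-only stand-in for SearchState (hypothetical, not cbrkit's class)."""

    node_mapping: Mapping[str, str]  # y/query node key -> x/case node key
    open_y_nodes: frozenset[str]  # query nodes still to be processed
    open_x_nodes: frozenset[str]  # case nodes not yet used as a target


def map_node(state: ToyState, y_key: str, x_key: str) -> ToyState:
    """Return a new state with y_key mapped to x_key; the input is untouched."""
    return ToyState(
        MappingProxyType({**state.node_mapping, y_key: x_key}),
        state.open_y_nodes - {y_key},
        state.open_x_nodes - {x_key},
    )


def skip_node(state: ToyState, y_key: str) -> ToyState:
    """Close a query node for which no case candidate exists."""
    return ToyState(
        state.node_mapping,
        state.open_y_nodes - {y_key},
        state.open_x_nodes,
    )


start = ToyState(
    MappingProxyType({}),
    frozenset({"q1", "q2"}),
    frozenset({"c1", "c2"}),
)
after_map = map_node(start, "q1", "c1")
after_skip = skip_node(after_map, "q2")
```

After `skip_node`, no query node is open any more, yet `q2` does not appear among the mapping's keys. This is the situation in which the open query set and the mapping keys diverge.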
class SearchStateInit[K, N, E, G](Protocol):
    """Initializes the search state for graph similarity computation."""

    def __call__(
        self,
        x: Graph[K, N, E, G],
        y: Graph[K, N, E, G],
        node_matcher: ElementMatcher[N],
        edge_matcher: ElementMatcher[E],
        /,
    ) -> SearchState[K]: ...
Initializes the search state for graph similarity computation.
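Any callable with this positional-only signature satisfies the protocol. As a rough illustration, the sketch below mimics an initializer that starts with nothing pre-matched, in the spirit of `init_empty`. It is an assumption-laden toy rather than cbrkit's code: `ToyGraph`, `ToyState`, and `toy_init_empty` are hypothetical names, and the matchers are modelled as plain two-argument predicates.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass(frozen=True)
class ToyGraph:
    """Hypothetical minimal graph: node key -> label, edge key -> (source, target)."""

    nodes: dict[str, str]
    edges: dict[str, tuple[str, str]]


@dataclass(frozen=True)
class ToyState:
    """Hypothetical stand-in for SearchState using plain dicts."""

    node_mapping: dict[str, str]
    edge_mapping: dict[str, str]
    open_y_nodes: frozenset[str]
    open_y_edges: frozenset[str]
    open_x_nodes: frozenset[str]
    open_x_edges: frozenset[str]


Matcher = Callable[[str, str], bool]  # assumed shape of a matching predicate


def toy_init_empty(
    x: ToyGraph,
    y: ToyGraph,
    node_matcher: Matcher,
    edge_matcher: Matcher,
    /,
) -> ToyState:
    """Start with empty mappings and every element of both graphs open.

    The matchers are accepted to fit the call signature but are unused here.
    """
    return ToyState(
        node_mapping={},
        edge_mapping={},
        open_y_nodes=frozenset(y.nodes),
        open_y_edges=frozenset(y.edges),
        open_x_nodes=frozenset(x.nodes),
        open_x_edges=frozenset(x.edges),
    )


case = ToyGraph({"c1": "A", "c2": "B"}, {"e1": ("c1", "c2")})
query = ToyGraph({"q1": "A"}, {})
same_label: Matcher = lambda a, b: a == b

state = toy_init_empty(case, query, same_label, same_label)
```

Because the parameters are positional-only, the initializer is called as `init(x, y, node_matcher, edge_matcher)` with the case graph first and the query graph second.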
def _no_init_or_replace_init(self, *args, **kwargs):
    cls = type(self)

    if cls._is_protocol:
        raise TypeError('Protocols cannot be instantiated')

    # Already using a custom `__init__`. No need to calculate correct
    # `__init__` to call. This can lead to RecursionError. See bpo-45121.
    if cls.__init__ is not _no_init_or_replace_init:
        return

    # Initially, `__init__` of a protocol subclass is set to `_no_init_or_replace_init`.
    # The first instantiation of the subclass will call `_no_init_or_replace_init` which
    # searches for a proper new `__init__` in the MRO. The new `__init__`
    # replaces the subclass' old `__init__` (ie `_no_init_or_replace_init`). Subsequent
    # instantiation of the protocol subclass will thus use the new
    # `__init__` and no longer call `_no_init_or_replace_init`.
    for base in cls.__mro__:
        init = base.__dict__.get('__init__', _no_init_or_replace_init)
        if init is not _no_init_or_replace_init:
            cls.__init__ = init
            break
    else:
        # should not happen
        cls.__init__ = object.__init__

    cls.__init__(self, *args, **kwargs)