cbrkit.sim.collections
1import heapq 2from collections.abc import Collection, Sequence, Set 3from dataclasses import asdict, dataclass, field 4from itertools import product 5from typing import Any, cast, override 6 7import numpy as np 8 9from ..helpers import ( 10 dist2sim, 11 get_metadata, 12 optional_dependencies, 13 unbatchify_sim, 14 unpack_float, 15) 16from ..typing import ( 17 AnySimFunc, 18 Float, 19 HasMetadata, 20 JsonDict, 21 SimFunc, 22 StructuredValue, 23) 24 25Number = float | int 26 27__all__ = [ 28 "dtw", 29 "smith_waterman", 30 "jaccard", 31 "mapping", 32 "isolated_mapping", 33 "sequence_mapping", 34 "sequence_correctness", 35 "SequenceSim", 36 "Weight", 37] 38 39with optional_dependencies(): 40 from nltk.metrics import jaccard_distance 41 42 @dataclass(slots=True, frozen=True) 43 class jaccard[V](SimFunc[Collection[V], float]): 44 """Jaccard similarity function. 45 46 Examples: 47 >>> sim = jaccard() 48 >>> sim(["a", "b", "c", "d"], ["a", "b", "c"]) 49 0.8 50 """ 51 52 @override 53 def __call__(self, x: Collection[V], y: Collection[V]) -> float: 54 if not isinstance(x, Set): 55 x = set(x) 56 if not isinstance(y, Set): 57 y = set(y) 58 59 return dist2sim(jaccard_distance(x, y)) 60 61 62with optional_dependencies(): 63 from minineedle import core, smith 64 65 @dataclass(slots=True, frozen=True) 66 class smith_waterman[V](SimFunc[Sequence[V], float]): 67 """ 68 Performs the Smith-Waterman alignment with configurable scoring parameters. If no element matches it returns 0.0. 69 70 Args: 71 match_score: Score for matching characters. Defaults to 2. 72 mismatch_penalty: Penalty for mismatching characters. Defaults to -1. 73 gap_penalty: Penalty for gaps. Defaults to -1. 
74 75 Example: 76 >>> sim = smith_waterman() 77 >>> sim("abcde", "fghe") 78 2 79 """ 80 81 match_score: int = 2 82 mismatch_penalty: int = -1 83 gap_penalty: int = -1 84 85 @override 86 def __call__(self, x: Sequence[V], y: Sequence[V]) -> float: 87 try: 88 alignment = smith.SmithWaterman( 89 cast(Sequence[Any], x), cast(Sequence[Any], y) 90 ) 91 alignment.change_matrix( 92 core.ScoreMatrix( 93 match=self.match_score, 94 miss=self.mismatch_penalty, 95 gap=self.gap_penalty, 96 ) 97 ) 98 alignment.align() 99 100 return alignment.get_score() 101 except ZeroDivisionError: 102 return 0.0 103 104 105@dataclass(slots=True, frozen=True) 106class SequenceSim[V, S: Float](StructuredValue[float]): 107 """ 108 A class representing sequence similarity with optional mapping and similarity scores. 109 110 Attributes: 111 value: The overall similarity score as a float. 112 similarities: Optional local similarity scores as a sequence of floats. 113 mapping: Optional alignment information as a sequence of tuples. 114 """ 115 116 similarities: Sequence[S] | None = field(default=None) 117 mapping: Sequence[tuple[V, V]] | None = field(default=None) 118 119 120@dataclass(slots=True, init=False) 121class dtw[V](SimFunc[Collection[V] | np.ndarray, SequenceSim[V, float]]): 122 """ 123 Dynamic Time Warping similarity function with optional backtracking for alignment. 
124 125 Examples: 126 >>> sim = dtw() 127 >>> sim([1, 2, 3], [1, 2, 3, 4]) 128 SequenceSim(value=0.5, similarities=None, mapping=None) 129 >>> sim = dtw(distance_func=lambda a, b: abs(a - b)) 130 >>> sim([1, 2, 3], [3, 4, 5]) 131 SequenceSim(value=0.14285714285714285, similarities=None, mapping=None) 132 >>> sim = dtw(distance_func=lambda a, b: abs(len(str(a)) - len(str(b)))) 133 >>> sim(["a", "bb", "ccc"], ["aa", "bbb", "c"], return_alignment=True) 134 SequenceSim(value=0.25, similarities=[0.5, 1.0, 1.0, 0.3333333333333333], mapping=[('aa', 'a'), ('aa', 'bb'), ('bbb', 'ccc'), ('c', 'ccc')]) 135 >>> sim = dtw(distance_func=lambda a, b: abs(a - b)) 136 >>> sim([1, 2, 3], [1, 2, 3, 4], return_alignment=True) 137 SequenceSim(value=0.5, similarities=[1.0, 1.0, 1.0, 0.5], mapping=[(1, 1), (2, 2), (3, 3), (4, 3)]) 138 """ 139 140 distance_func: SimFunc[V, Float] | None 141 142 def __init__(self, distance_func: AnySimFunc[V, Float] | None = None): 143 self.distance_func = unbatchify_sim(distance_func) if distance_func else None 144 145 def __call__( 146 self, 147 x: Collection[V] | np.ndarray, 148 y: Collection[V] | np.ndarray, 149 return_alignment: bool = False, 150 ) -> SequenceSim[V, float]: 151 """ 152 Perform DTW and optionally return alignment information. 153 154 Args: 155 x: The first sequence as a collection or numpy array. 156 y: The second sequence as a collection or numpy array. 157 return_alignment: Whether to compute and return the alignment (default: False). 158 159 Returns: 160 A SequenceSim object containing the similarity value, local similarities, and optional alignment. 
161 """ 162 if not isinstance(x, np.ndarray): 163 x = np.array(x, dtype=object) # Allow non-numeric types 164 if not isinstance(y, np.ndarray): 165 y = np.array(y, dtype=object) 166 167 # Compute the DTW distance 168 dtw_distance, alignment, local_similarities = self.compute_dtw( 169 x, y, return_alignment 170 ) 171 172 # Convert DTW distance to similarity 173 similarity = dist2sim(dtw_distance) 174 175 # Return SequenceSim with updated attributes 176 return SequenceSim( 177 value=float(similarity), 178 similarities=local_similarities, 179 mapping=alignment if return_alignment else None, 180 ) 181 182 def compute_dtw( 183 self, x: np.ndarray, y: np.ndarray, return_alignment: bool 184 ) -> tuple[float, list[tuple[V, V]] | None, list[float] | None]: 185 """ 186 Compute DTW distance and optionally compute the best alignment and local similarities. 187 188 Args: 189 x: The first sequence as a numpy array. 190 y: The second sequence as a numpy array. 191 return_alignment: Whether to compute the alignment. 192 193 Returns: 194 A tuple of (DTW distance, best alignment or None, local similarities or None). 
195 """ 196 n, m = len(x), len(y) 197 dtw_matrix = np.full((n + 1, m + 1), np.inf) 198 dtw_matrix[0, 0] = 0 199 200 for i in range(1, n + 1): 201 for j in range(1, m + 1): 202 cost = unpack_float( 203 self.distance_func(y[j - 1], x[i - 1]) 204 if self.distance_func 205 else abs(y[j - 1] - x[i - 1]) 206 ) 207 # Take last min from a square box 208 last_min: float = min( 209 dtw_matrix[i - 1, j], # Insertion 210 dtw_matrix[i, j - 1], # Deletion 211 dtw_matrix[i - 1, j - 1], # Match 212 ) 213 dtw_matrix[i, j] = cost + last_min 214 215 # If alignment is not requested, skip backtracking 216 if not return_alignment: 217 return dtw_matrix[n, m], None, None 218 219 # Backtracking to find the best alignment and local similarities 220 mapping, local_similarities = self.backtrack(dtw_matrix, x, y, n, m) 221 222 return dtw_matrix[n, m], mapping, local_similarities 223 224 def backtrack( 225 self, dtw_matrix: np.ndarray, x: np.ndarray, y: np.ndarray, n: int, m: int 226 ) -> tuple[list[tuple[V, V]], list[float]]: 227 """ 228 Backtrack through the DTW matrix to find the best alignment and local similarities. 229 230 Args: 231 dtw_matrix: The DTW matrix. 232 x: The first sequence as a numpy array. 233 y: The second sequence as a numpy array. 234 n: The length of the first sequence. 235 m: The length of the second sequence. 236 237 Returns: 238 A tuple of (alignment, local similarities). 
239 """ 240 i, j = n, m 241 alignment = [] 242 local_similarities = [] 243 244 while i > 0 and j > 0: 245 alignment.append((y[j - 1], x[i - 1])) # Align elements as (query, case) 246 cost = unpack_float( 247 self.distance_func(y[j - 1], x[i - 1]) 248 if self.distance_func 249 else abs(y[j - 1] - x[i - 1]) 250 ) 251 local_similarities.append(dist2sim(cost)) # Convert cost to similarity 252 # Move in the direction of the minimum cost 253 if dtw_matrix[i - 1, j] == min( 254 dtw_matrix[i - 1, j], dtw_matrix[i, j - 1], dtw_matrix[i - 1, j - 1] 255 ): 256 i -= 1 # Move up 257 elif dtw_matrix[i, j - 1] == min( 258 dtw_matrix[i - 1, j], dtw_matrix[i, j - 1], dtw_matrix[i - 1, j - 1] 259 ): 260 j -= 1 # Move left 261 else: 262 i -= 1 # Move diagonally 263 j -= 1 264 265 # Handle remaining elements in i or j 266 while i > 0: 267 alignment.append((None, x[i - 1])) # Unmatched element from x (case) 268 local_similarities.append(0.0) # No similarity for unmatched 269 i -= 1 270 while j > 0: 271 alignment.append((y[j - 1], None)) # Unmatched element from y (query) 272 local_similarities.append(0.0) # No similarity for unmatched 273 j -= 1 274 275 return alignment[::-1], local_similarities[ 276 ::-1 277 ] # Reverse to start from the beginning 278 279 280@dataclass(slots=True, frozen=True) 281class isolated_mapping[V](SimFunc[Sequence[V], float]): 282 """ 283 Isolated Mapping similarity function that compares each element in 'y' (query) 284 with all elements in 'x' (case) 285 and takes the maximum similarity for each element in 'y', then averages 286 these maximums. Assumes y -> x (query operated onto case). 287 288 Args: 289 element_similarity: A function that takes two elements (query_item, case_item) 290 and returns a similarity score between them. 
291 292 Examples: 293 >>> from cbrkit.sim.strings import levenshtein 294 >>> sim = isolated_mapping(levenshtein()) 295 >>> sim(["kitten", "sitting"], ["sitting", "fitted"]) 296 0.8333333333333334 297 """ 298 299 element_similarity: SimFunc[V, float] 300 301 @override 302 def __call__(self, x: Sequence[V], y: Sequence[V]) -> float: 303 # x = case sequence, y = query sequence 304 # Logic: For each element in query y, find its best match in case x. Average these best scores. 305 306 if not y: # If the query is empty, similarity is undefined or trivially 0 or 1? Let's return 0. 307 return 0.0 308 # Avoid division by zero if x is empty but y is not. max default handles this. 309 310 total_similarity = 0.0 311 for yi in y: # Iterate through QUERY (y) 312 # Find the best match for the current query element yi within the CASE sequence x 313 # Pass (query_item, case_item) to element_similarity, assuming sim(query, case) convention 314 # Use default=0.0 for max() in case the case sequence x is empty 315 max_similarity = max( 316 (self.element_similarity(yi, xi) for xi in x), default=0.0 317 ) 318 total_similarity += max_similarity 319 320 # Normalize by the length of the QUERY sequence (y) 321 average_similarity = total_similarity / len(y) 322 323 return average_similarity 324 325 326@dataclass(slots=True, frozen=True) 327class mapping[V](SimFunc[Sequence[V], float]): 328 """ 329 Implements an A* algorithm to find the best matching between query items and case items 330 based on the provided similarity function, maximizing the overall similarity score. 331 332 Args: 333 similarity_function: A function that calculates the similarity between two elements. 334 max_queue_size: Maximum size of the priority queue. Defaults to 1000. 335 336 Returns: 337 A similarity function for sequences. 338 339 Examples: 340 >>> def example_similarity_function(x, y) -> float: 341 ... 
return 1.0 if x == y else 0.0 342 >>> sim_func = mapping(example_similarity_function) 343 >>> result = sim_func(["Monday", "Tuesday", "Wednesday"], ["Monday", "Tuesday", "Sunday"]) 344 >>> print(f"Normalized Similarity Score: {result}") 345 Normalized Similarity Score: 0.6666666666666666 346 """ 347 348 element_similarity: SimFunc[V, float] 349 max_queue_size: int = 1000 350 351 @override 352 def __call__(self, x: Sequence[V], y: Sequence[V]) -> float: 353 # Priority queue to store potential solutions with their scores 354 pq = [] 355 initial_solution = (0.0, set(), frozenset(y), frozenset(x)) 356 heapq.heappush(pq, initial_solution) 357 358 best_score = 0.0 359 360 while pq: 361 current_score, current_mapping, remaining_query, remaining_case = ( 362 heapq.heappop(pq) 363 ) 364 365 if not remaining_query: # All query items are mapped 366 best_score = max(best_score, current_score / len(y)) 367 continue # Continue to process remaining potential mappings 368 369 for query_item in remaining_query: 370 for case_item in remaining_case: 371 sim_score = self.element_similarity(query_item, case_item) 372 new_mapping = current_mapping | {(query_item, case_item)} 373 new_score = current_score + sim_score # Accumulate score correctly 374 new_remaining_query = remaining_query - {query_item} 375 new_remaining_case = remaining_case - {case_item} 376 377 heapq.heappush( 378 pq, 379 ( 380 new_score, 381 new_mapping, 382 new_remaining_query, 383 new_remaining_case, 384 ), 385 ) 386 387 if len(pq) > self.max_queue_size: 388 heapq.heappop(pq) 389 390 return best_score 391 392 393@dataclass 394class Weight: 395 """A weighted interval with bounds for sequence mapping similarity.""" 396 397 weight: float 398 lower_bound: float 399 upper_bound: float 400 inclusive_lower: bool 401 inclusive_upper: bool 402 normalized_weight: float | None = None 403 404 405@dataclass(slots=True, frozen=True) 406class sequence_mapping[V, S: Float]( 407 SimFunc[Sequence[V], SequenceSim[V, S]], HasMetadata 
408): 409 """ 410 List Mapping similarity function. 411 412 Parameters: 413 element_similarity: The similarity function to use for comparing elements. 414 exact: Whether to use exact or inexact comparison. Default is False (inexact). 415 weights: Optional list of weights for weighted similarity calculation. 416 417 Examples: 418 >>> sim = sequence_mapping(lambda x, y: 1.0 if x == y else 0.0, True) 419 >>> result = sim(["a", "b", "c"], ["a", "b", "c"]) 420 >>> result.value 421 1.0 422 >>> result.similarities 423 [1.0, 1.0, 1.0] 424 """ 425 426 element_similarity: SimFunc[V, S] 427 exact: bool = False 428 weights: list[Weight] | None = None 429 430 @property 431 def metadata(self) -> JsonDict: 432 """Return metadata describing the sequence mapping configuration.""" 433 return { 434 "element_similarity": get_metadata(self.element_similarity), 435 "exact": self.exact, 436 "weights": [asdict(weight) for weight in self.weights] 437 if self.weights 438 else None, 439 } 440 441 def compute_contains_exact( 442 self, query: Sequence[V], case: Sequence[V] 443 ) -> SequenceSim[V, S]: 444 """Compute element-wise similarity for sequences of equal length.""" 445 if len(query) != len(case): 446 return SequenceSim(value=0.0, similarities=None, mapping=None) 447 448 sim_sum = 0.0 449 local_similarities: list[S] = [] 450 451 for elem_q, elem_c in zip(query, case, strict=False): 452 sim = self.element_similarity(elem_q, elem_c) 453 sim_sum += unpack_float(sim) 454 local_similarities.append(sim) 455 456 return SequenceSim( 457 value=sim_sum / len(query), 458 similarities=local_similarities, 459 mapping=None, 460 ) 461 462 def compute_contains_inexact( 463 self, case_list: Sequence[V], query_list: Sequence[V] 464 ) -> SequenceSim[V, S]: 465 """ 466 Slides the *shorter* sequence across the *longer* one and always 467 evaluates query → case (i.e. query elements are compared against 468 the current window cut from the case list). 
469 """ 470 case_is_longer = len(case_list) >= len(query_list) 471 larger, smaller = ( 472 (case_list, query_list) if case_is_longer else (query_list, case_list) 473 ) 474 475 best_value = -1.0 476 best_sims: Sequence[S] = [] 477 478 for start in range(len(larger) - len(smaller) + 1): 479 window = larger[start : start + len(smaller)] 480 481 if case_is_longer: 482 sim_res = self.compute_contains_exact(smaller, window) 483 else: 484 sim_res = self.compute_contains_exact(window, smaller) 485 486 if sim_res.value > best_value: 487 best_value = sim_res.value 488 best_sims = sim_res.similarities or [] 489 490 return SequenceSim(value=best_value, similarities=best_sims, mapping=None) 491 492 def __call__(self, x: Sequence[V], y: Sequence[V]) -> SequenceSim[V, S]: 493 # x is the “case”, y is the “query” 494 if self.exact: 495 result = self.compute_contains_exact(y, x) 496 else: 497 result = self.compute_contains_inexact(x, y) 498 499 if self.weights and result.similarities: 500 total_weighted_sim = 0.0 501 total_weight = 0.0 502 503 # Normalize weights 504 for weight in self.weights: 505 weight_range = weight.upper_bound - weight.lower_bound 506 weight.normalized_weight = weight.weight / weight_range 507 508 # Apply weights 509 for sim in result.similarities: 510 sim_val = unpack_float(sim) 511 512 for weight in self.weights: 513 lower_bound = weight.lower_bound 514 upper_bound = weight.upper_bound 515 inclusive_lower = weight.inclusive_lower 516 inclusive_upper = weight.inclusive_upper 517 518 if ( 519 (inclusive_lower and lower_bound <= sim_val <= upper_bound) 520 or ( 521 not inclusive_lower and lower_bound < sim_val <= upper_bound 522 ) 523 ) and (inclusive_upper or sim_val < upper_bound): 524 assert weight.normalized_weight is not None 525 weighted_sim = weight.normalized_weight * sim_val 526 total_weighted_sim += weighted_sim 527 total_weight += weight.normalized_weight 528 529 if total_weight > 0: 530 final_similarity = total_weighted_sim / total_weight 531 else: 
532 final_similarity = result.value 533 534 return SequenceSim( 535 value=final_similarity, 536 similarities=result.similarities, 537 mapping=result.mapping, 538 ) 539 540 return result 541 542 543@dataclass(slots=True, frozen=True) 544class sequence_correctness[V](SimFunc[Sequence[V], float]): 545 """List Correctness similarity function. 546 547 Parameters: 548 worst_case_sim (float): The similarity value to use when all pairs are discordant. Default is 0.0. 549 550 Examples: 551 >>> sim = sequence_correctness(0.5) 552 >>> sim(["Monday", "Tuesday", "Wednesday"], ["Monday", "Wednesday", "Tuesday"]) 553 0.3333333333333333 554 """ 555 556 worst_case_sim: float = 0.0 557 558 @override 559 def __call__(self, x: Sequence[V], y: Sequence[V]) -> float: 560 if len(x) != len(y): 561 return 0.0 562 563 count_concordant = 0 564 count_discordant = 0 565 566 for i, j in product(range(len(x)), repeat=2): 567 if i >= j: 568 continue 569 570 index_x1 = x.index(x[i]) 571 index_x2 = x.index(x[j]) 572 index_y1 = y.index(x[i]) 573 index_y2 = y.index(x[j]) 574 575 if index_y1 == -1 or index_y2 == -1: 576 continue 577 elif (index_x1 < index_x2 and index_y1 < index_y2) or ( 578 index_x1 > index_x2 and index_y1 > index_y2 579 ): 580 count_concordant += 1 581 else: 582 count_discordant += 1 583 584 if count_concordant + count_discordant == 0: 585 return 0.0 586 587 correctness = (count_concordant - count_discordant) / ( 588 count_concordant + count_discordant 589 ) 590 591 if correctness >= 0: 592 return correctness 593 else: 594 return abs(correctness) * self.worst_case_sim
121@dataclass(slots=True, init=False) 122class dtw[V](SimFunc[Collection[V] | np.ndarray, SequenceSim[V, float]]): 123 """ 124 Dynamic Time Warping similarity function with optional backtracking for alignment. 125 126 Examples: 127 >>> sim = dtw() 128 >>> sim([1, 2, 3], [1, 2, 3, 4]) 129 SequenceSim(value=0.5, similarities=None, mapping=None) 130 >>> sim = dtw(distance_func=lambda a, b: abs(a - b)) 131 >>> sim([1, 2, 3], [3, 4, 5]) 132 SequenceSim(value=0.14285714285714285, similarities=None, mapping=None) 133 >>> sim = dtw(distance_func=lambda a, b: abs(len(str(a)) - len(str(b)))) 134 >>> sim(["a", "bb", "ccc"], ["aa", "bbb", "c"], return_alignment=True) 135 SequenceSim(value=0.25, similarities=[0.5, 1.0, 1.0, 0.3333333333333333], mapping=[('aa', 'a'), ('aa', 'bb'), ('bbb', 'ccc'), ('c', 'ccc')]) 136 >>> sim = dtw(distance_func=lambda a, b: abs(a - b)) 137 >>> sim([1, 2, 3], [1, 2, 3, 4], return_alignment=True) 138 SequenceSim(value=0.5, similarities=[1.0, 1.0, 1.0, 0.5], mapping=[(1, 1), (2, 2), (3, 3), (4, 3)]) 139 """ 140 141 distance_func: SimFunc[V, Float] | None 142 143 def __init__(self, distance_func: AnySimFunc[V, Float] | None = None): 144 self.distance_func = unbatchify_sim(distance_func) if distance_func else None 145 146 def __call__( 147 self, 148 x: Collection[V] | np.ndarray, 149 y: Collection[V] | np.ndarray, 150 return_alignment: bool = False, 151 ) -> SequenceSim[V, float]: 152 """ 153 Perform DTW and optionally return alignment information. 154 155 Args: 156 x: The first sequence as a collection or numpy array. 157 y: The second sequence as a collection or numpy array. 158 return_alignment: Whether to compute and return the alignment (default: False). 159 160 Returns: 161 A SequenceSim object containing the similarity value, local similarities, and optional alignment. 
162 """ 163 if not isinstance(x, np.ndarray): 164 x = np.array(x, dtype=object) # Allow non-numeric types 165 if not isinstance(y, np.ndarray): 166 y = np.array(y, dtype=object) 167 168 # Compute the DTW distance 169 dtw_distance, alignment, local_similarities = self.compute_dtw( 170 x, y, return_alignment 171 ) 172 173 # Convert DTW distance to similarity 174 similarity = dist2sim(dtw_distance) 175 176 # Return SequenceSim with updated attributes 177 return SequenceSim( 178 value=float(similarity), 179 similarities=local_similarities, 180 mapping=alignment if return_alignment else None, 181 ) 182 183 def compute_dtw( 184 self, x: np.ndarray, y: np.ndarray, return_alignment: bool 185 ) -> tuple[float, list[tuple[V, V]] | None, list[float] | None]: 186 """ 187 Compute DTW distance and optionally compute the best alignment and local similarities. 188 189 Args: 190 x: The first sequence as a numpy array. 191 y: The second sequence as a numpy array. 192 return_alignment: Whether to compute the alignment. 193 194 Returns: 195 A tuple of (DTW distance, best alignment or None, local similarities or None). 
196 """ 197 n, m = len(x), len(y) 198 dtw_matrix = np.full((n + 1, m + 1), np.inf) 199 dtw_matrix[0, 0] = 0 200 201 for i in range(1, n + 1): 202 for j in range(1, m + 1): 203 cost = unpack_float( 204 self.distance_func(y[j - 1], x[i - 1]) 205 if self.distance_func 206 else abs(y[j - 1] - x[i - 1]) 207 ) 208 # Take last min from a square box 209 last_min: float = min( 210 dtw_matrix[i - 1, j], # Insertion 211 dtw_matrix[i, j - 1], # Deletion 212 dtw_matrix[i - 1, j - 1], # Match 213 ) 214 dtw_matrix[i, j] = cost + last_min 215 216 # If alignment is not requested, skip backtracking 217 if not return_alignment: 218 return dtw_matrix[n, m], None, None 219 220 # Backtracking to find the best alignment and local similarities 221 mapping, local_similarities = self.backtrack(dtw_matrix, x, y, n, m) 222 223 return dtw_matrix[n, m], mapping, local_similarities 224 225 def backtrack( 226 self, dtw_matrix: np.ndarray, x: np.ndarray, y: np.ndarray, n: int, m: int 227 ) -> tuple[list[tuple[V, V]], list[float]]: 228 """ 229 Backtrack through the DTW matrix to find the best alignment and local similarities. 230 231 Args: 232 dtw_matrix: The DTW matrix. 233 x: The first sequence as a numpy array. 234 y: The second sequence as a numpy array. 235 n: The length of the first sequence. 236 m: The length of the second sequence. 237 238 Returns: 239 A tuple of (alignment, local similarities). 
240 """ 241 i, j = n, m 242 alignment = [] 243 local_similarities = [] 244 245 while i > 0 and j > 0: 246 alignment.append((y[j - 1], x[i - 1])) # Align elements as (query, case) 247 cost = unpack_float( 248 self.distance_func(y[j - 1], x[i - 1]) 249 if self.distance_func 250 else abs(y[j - 1] - x[i - 1]) 251 ) 252 local_similarities.append(dist2sim(cost)) # Convert cost to similarity 253 # Move in the direction of the minimum cost 254 if dtw_matrix[i - 1, j] == min( 255 dtw_matrix[i - 1, j], dtw_matrix[i, j - 1], dtw_matrix[i - 1, j - 1] 256 ): 257 i -= 1 # Move up 258 elif dtw_matrix[i, j - 1] == min( 259 dtw_matrix[i - 1, j], dtw_matrix[i, j - 1], dtw_matrix[i - 1, j - 1] 260 ): 261 j -= 1 # Move left 262 else: 263 i -= 1 # Move diagonally 264 j -= 1 265 266 # Handle remaining elements in i or j 267 while i > 0: 268 alignment.append((None, x[i - 1])) # Unmatched element from x (case) 269 local_similarities.append(0.0) # No similarity for unmatched 270 i -= 1 271 while j > 0: 272 alignment.append((y[j - 1], None)) # Unmatched element from y (query) 273 local_similarities.append(0.0) # No similarity for unmatched 274 j -= 1 275 276 return alignment[::-1], local_similarities[ 277 ::-1 278 ] # Reverse to start from the beginning
Dynamic Time Warping similarity function with optional backtracking for alignment.
Examples:
>>> sim = dtw() >>> sim([1, 2, 3], [1, 2, 3, 4]) SequenceSim(value=0.5, similarities=None, mapping=None) >>> sim = dtw(distance_func=lambda a, b: abs(a - b)) >>> sim([1, 2, 3], [3, 4, 5]) SequenceSim(value=0.14285714285714285, similarities=None, mapping=None) >>> sim = dtw(distance_func=lambda a, b: abs(len(str(a)) - len(str(b)))) >>> sim(["a", "bb", "ccc"], ["aa", "bbb", "c"], return_alignment=True) SequenceSim(value=0.25, similarities=[0.5, 1.0, 1.0, 0.3333333333333333], mapping=[('aa', 'a'), ('aa', 'bb'), ('bbb', 'ccc'), ('c', 'ccc')]) >>> sim = dtw(distance_func=lambda a, b: abs(a - b)) >>> sim([1, 2, 3], [1, 2, 3, 4], return_alignment=True) SequenceSim(value=0.5, similarities=[1.0, 1.0, 1.0, 0.5], mapping=[(1, 1), (2, 2), (3, 3), (4, 3)])
183 def compute_dtw( 184 self, x: np.ndarray, y: np.ndarray, return_alignment: bool 185 ) -> tuple[float, list[tuple[V, V]] | None, list[float] | None]: 186 """ 187 Compute DTW distance and optionally compute the best alignment and local similarities. 188 189 Args: 190 x: The first sequence as a numpy array. 191 y: The second sequence as a numpy array. 192 return_alignment: Whether to compute the alignment. 193 194 Returns: 195 A tuple of (DTW distance, best alignment or None, local similarities or None). 196 """ 197 n, m = len(x), len(y) 198 dtw_matrix = np.full((n + 1, m + 1), np.inf) 199 dtw_matrix[0, 0] = 0 200 201 for i in range(1, n + 1): 202 for j in range(1, m + 1): 203 cost = unpack_float( 204 self.distance_func(y[j - 1], x[i - 1]) 205 if self.distance_func 206 else abs(y[j - 1] - x[i - 1]) 207 ) 208 # Take last min from a square box 209 last_min: float = min( 210 dtw_matrix[i - 1, j], # Insertion 211 dtw_matrix[i, j - 1], # Deletion 212 dtw_matrix[i - 1, j - 1], # Match 213 ) 214 dtw_matrix[i, j] = cost + last_min 215 216 # If alignment is not requested, skip backtracking 217 if not return_alignment: 218 return dtw_matrix[n, m], None, None 219 220 # Backtracking to find the best alignment and local similarities 221 mapping, local_similarities = self.backtrack(dtw_matrix, x, y, n, m) 222 223 return dtw_matrix[n, m], mapping, local_similarities
Compute DTW distance and optionally compute the best alignment and local similarities.
Arguments:
- x: The first sequence as a numpy array.
- y: The second sequence as a numpy array.
- return_alignment: Whether to compute the alignment.
Returns:
A tuple of (DTW distance, best alignment or None, local similarities or None).
225 def backtrack( 226 self, dtw_matrix: np.ndarray, x: np.ndarray, y: np.ndarray, n: int, m: int 227 ) -> tuple[list[tuple[V, V]], list[float]]: 228 """ 229 Backtrack through the DTW matrix to find the best alignment and local similarities. 230 231 Args: 232 dtw_matrix: The DTW matrix. 233 x: The first sequence as a numpy array. 234 y: The second sequence as a numpy array. 235 n: The length of the first sequence. 236 m: The length of the second sequence. 237 238 Returns: 239 A tuple of (alignment, local similarities). 240 """ 241 i, j = n, m 242 alignment = [] 243 local_similarities = [] 244 245 while i > 0 and j > 0: 246 alignment.append((y[j - 1], x[i - 1])) # Align elements as (query, case) 247 cost = unpack_float( 248 self.distance_func(y[j - 1], x[i - 1]) 249 if self.distance_func 250 else abs(y[j - 1] - x[i - 1]) 251 ) 252 local_similarities.append(dist2sim(cost)) # Convert cost to similarity 253 # Move in the direction of the minimum cost 254 if dtw_matrix[i - 1, j] == min( 255 dtw_matrix[i - 1, j], dtw_matrix[i, j - 1], dtw_matrix[i - 1, j - 1] 256 ): 257 i -= 1 # Move up 258 elif dtw_matrix[i, j - 1] == min( 259 dtw_matrix[i - 1, j], dtw_matrix[i, j - 1], dtw_matrix[i - 1, j - 1] 260 ): 261 j -= 1 # Move left 262 else: 263 i -= 1 # Move diagonally 264 j -= 1 265 266 # Handle remaining elements in i or j 267 while i > 0: 268 alignment.append((None, x[i - 1])) # Unmatched element from x (case) 269 local_similarities.append(0.0) # No similarity for unmatched 270 i -= 1 271 while j > 0: 272 alignment.append((y[j - 1], None)) # Unmatched element from y (query) 273 local_similarities.append(0.0) # No similarity for unmatched 274 j -= 1 275 276 return alignment[::-1], local_similarities[ 277 ::-1 278 ] # Reverse to start from the beginning
Backtrack through the DTW matrix to find the best alignment and local similarities.
Arguments:
- dtw_matrix: The DTW matrix.
- x: The first sequence as a numpy array.
- y: The second sequence as a numpy array.
- n: The length of the first sequence.
- m: The length of the second sequence.
Returns:
A tuple of (alignment, local similarities).
66 @dataclass(slots=True, frozen=True) 67 class smith_waterman[V](SimFunc[Sequence[V], float]): 68 """ 69 Performs the Smith-Waterman alignment with configurable scoring parameters. If no element matches it returns 0.0. 70 71 Args: 72 match_score: Score for matching characters. Defaults to 2. 73 mismatch_penalty: Penalty for mismatching characters. Defaults to -1. 74 gap_penalty: Penalty for gaps. Defaults to -1. 75 76 Example: 77 >>> sim = smith_waterman() 78 >>> sim("abcde", "fghe") 79 2 80 """ 81 82 match_score: int = 2 83 mismatch_penalty: int = -1 84 gap_penalty: int = -1 85 86 @override 87 def __call__(self, x: Sequence[V], y: Sequence[V]) -> float: 88 try: 89 alignment = smith.SmithWaterman( 90 cast(Sequence[Any], x), cast(Sequence[Any], y) 91 ) 92 alignment.change_matrix( 93 core.ScoreMatrix( 94 match=self.match_score, 95 miss=self.mismatch_penalty, 96 gap=self.gap_penalty, 97 ) 98 ) 99 alignment.align() 100 101 return alignment.get_score() 102 except ZeroDivisionError: 103 return 0.0
Performs the Smith-Waterman alignment with configurable scoring parameters. If no element matches it returns 0.0.
Arguments:
- match_score: Score for matching characters. Defaults to 2.
- mismatch_penalty: Penalty for mismatching characters. Defaults to -1.
- gap_penalty: Penalty for gaps. Defaults to -1.
Example:
>>> sim = smith_waterman() >>> sim("abcde", "fghe") 2
43 @dataclass(slots=True, frozen=True) 44 class jaccard[V](SimFunc[Collection[V], float]): 45 """Jaccard similarity function. 46 47 Examples: 48 >>> sim = jaccard() 49 >>> sim(["a", "b", "c", "d"], ["a", "b", "c"]) 50 0.8 51 """ 52 53 @override 54 def __call__(self, x: Collection[V], y: Collection[V]) -> float: 55 if not isinstance(x, Set): 56 x = set(x) 57 if not isinstance(y, Set): 58 y = set(y) 59 60 return dist2sim(jaccard_distance(x, y))
Jaccard similarity function.
Examples:
>>> sim = jaccard() >>> sim(["a", "b", "c", "d"], ["a", "b", "c"]) 0.8
327@dataclass(slots=True, frozen=True) 328class mapping[V](SimFunc[Sequence[V], float]): 329 """ 330 Implements an A* algorithm to find the best matching between query items and case items 331 based on the provided similarity function, maximizing the overall similarity score. 332 333 Args: 334 similarity_function: A function that calculates the similarity between two elements. 335 max_queue_size: Maximum size of the priority queue. Defaults to 1000. 336 337 Returns: 338 A similarity function for sequences. 339 340 Examples: 341 >>> def example_similarity_function(x, y) -> float: 342 ... return 1.0 if x == y else 0.0 343 >>> sim_func = mapping(example_similarity_function) 344 >>> result = sim_func(["Monday", "Tuesday", "Wednesday"], ["Monday", "Tuesday", "Sunday"]) 345 >>> print(f"Normalized Similarity Score: {result}") 346 Normalized Similarity Score: 0.6666666666666666 347 """ 348 349 element_similarity: SimFunc[V, float] 350 max_queue_size: int = 1000 351 352 @override 353 def __call__(self, x: Sequence[V], y: Sequence[V]) -> float: 354 # Priority queue to store potential solutions with their scores 355 pq = [] 356 initial_solution = (0.0, set(), frozenset(y), frozenset(x)) 357 heapq.heappush(pq, initial_solution) 358 359 best_score = 0.0 360 361 while pq: 362 current_score, current_mapping, remaining_query, remaining_case = ( 363 heapq.heappop(pq) 364 ) 365 366 if not remaining_query: # All query items are mapped 367 best_score = max(best_score, current_score / len(y)) 368 continue # Continue to process remaining potential mappings 369 370 for query_item in remaining_query: 371 for case_item in remaining_case: 372 sim_score = self.element_similarity(query_item, case_item) 373 new_mapping = current_mapping | {(query_item, case_item)} 374 new_score = current_score + sim_score # Accumulate score correctly 375 new_remaining_query = remaining_query - {query_item} 376 new_remaining_case = remaining_case - {case_item} 377 378 heapq.heappush( 379 pq, 380 ( 381 
new_score, 382 new_mapping, 383 new_remaining_query, 384 new_remaining_case, 385 ), 386 ) 387 388 if len(pq) > self.max_queue_size: 389 heapq.heappop(pq) 390 391 return best_score
Implements an A* algorithm to find the best matching between query items and case items based on the provided similarity function, maximizing the overall similarity score.
Arguments:
- element_similarity: A function that calculates the similarity between two elements.
- max_queue_size: Maximum size of the priority queue. Defaults to 1000.
Returns:
A similarity function for sequences.
Examples:
>>> def example_similarity_function(x, y) -> float:
...     return 1.0 if x == y else 0.0
>>> sim_func = mapping(example_similarity_function)
>>> result = sim_func(["Monday", "Tuesday", "Wednesday"], ["Monday", "Tuesday", "Sunday"])
>>> print(f"Normalized Similarity Score: {result}")
Normalized Similarity Score: 0.6666666666666666
281@dataclass(slots=True, frozen=True) 282class isolated_mapping[V](SimFunc[Sequence[V], float]): 283 """ 284 Isolated Mapping similarity function that compares each element in 'y' (query) 285 with all elements in 'x' (case) 286 and takes the maximum similarity for each element in 'y', then averages 287 these maximums. Assumes y -> x (query operated onto case). 288 289 Args: 290 element_similarity: A function that takes two elements (query_item, case_item) 291 and returns a similarity score between them. 292 293 Examples: 294 >>> from cbrkit.sim.strings import levenshtein 295 >>> sim = isolated_mapping(levenshtein()) 296 >>> sim(["kitten", "sitting"], ["sitting", "fitted"]) 297 0.8333333333333334 298 """ 299 300 element_similarity: SimFunc[V, float] 301 302 @override 303 def __call__(self, x: Sequence[V], y: Sequence[V]) -> float: 304 # x = case sequence, y = query sequence 305 # Logic: For each element in query y, find its best match in case x. Average these best scores. 306 307 if not y: # If the query is empty, similarity is undefined or trivially 0 or 1? Let's return 0. 308 return 0.0 309 # Avoid division by zero if x is empty but y is not. max default handles this. 310 311 total_similarity = 0.0 312 for yi in y: # Iterate through QUERY (y) 313 # Find the best match for the current query element yi within the CASE sequence x 314 # Pass (query_item, case_item) to element_similarity, assuming sim(query, case) convention 315 # Use default=0.0 for max() in case the case sequence x is empty 316 max_similarity = max( 317 (self.element_similarity(yi, xi) for xi in x), default=0.0 318 ) 319 total_similarity += max_similarity 320 321 # Normalize by the length of the QUERY sequence (y) 322 average_similarity = total_similarity / len(y) 323 324 return average_similarity
Isolated Mapping similarity function that compares each element in 'y' (query) with all elements in 'x' (case) and takes the maximum similarity for each element in 'y', then averages these maximums. Assumes y -> x (query operated onto case).
Arguments:
- element_similarity: A function that takes two elements (query_item, case_item)
  and returns a similarity score between them.
Examples:
>>> from cbrkit.sim.strings import levenshtein
>>> sim = isolated_mapping(levenshtein())
>>> sim(["kitten", "sitting"], ["sitting", "fitted"])
0.8333333333333334
@dataclass(slots=True, frozen=True)
class sequence_mapping[V, S: Float](
    SimFunc[Sequence[V], SequenceSim[V, S]], HasMetadata
):
    """
    List Mapping similarity function.

    Compares a case sequence against a query sequence either position by
    position (exact) or by sliding the shorter sequence over the longer one
    (inexact), optionally re-weighting the per-element similarities by
    interval-based weights.

    Parameters:
        element_similarity: The similarity function to use for comparing elements.
        exact: Whether to use exact or inexact comparison. Default is False (inexact).
        weights: Optional list of weights for weighted similarity calculation.

    Examples:
        >>> sim = sequence_mapping(lambda x, y: 1.0 if x == y else 0.0, True)
        >>> result = sim(["a", "b", "c"], ["a", "b", "c"])
        >>> result.value
        1.0
        >>> result.similarities
        [1.0, 1.0, 1.0]
    """

    element_similarity: SimFunc[V, S]
    exact: bool = False
    weights: list[Weight] | None = None

    @property
    def metadata(self) -> JsonDict:
        """Return metadata describing the sequence mapping configuration."""
        return {
            "element_similarity": get_metadata(self.element_similarity),
            "exact": self.exact,
            "weights": [asdict(weight) for weight in self.weights]
            if self.weights
            else None,
        }

    def compute_contains_exact(
        self, query: Sequence[V], case: Sequence[V]
    ) -> SequenceSim[V, S]:
        """Compute element-wise similarity for sequences of equal length.

        Returns a zero-valued result (without local similarities) when the
        lengths differ. NOTE(review): two empty sequences pass the length
        check and divide by zero below — confirm callers never pass both
        empty.
        """
        if len(query) != len(case):
            return SequenceSim(value=0.0, similarities=None, mapping=None)

        sim_sum = 0.0
        local_similarities: list[S] = []

        # Accumulate both the raw (possibly structured) similarities and
        # their float values for averaging.
        for elem_q, elem_c in zip(query, case, strict=False):
            sim = self.element_similarity(elem_q, elem_c)
            sim_sum += unpack_float(sim)
            local_similarities.append(sim)

        return SequenceSim(
            value=sim_sum / len(query),
            similarities=local_similarities,
            mapping=None,
        )

    def compute_contains_inexact(
        self, case_list: Sequence[V], query_list: Sequence[V]
    ) -> SequenceSim[V, S]:
        """
        Slides the *shorter* sequence across the *longer* one and always
        evaluates query -> case (i.e. query elements are compared against
        the current window cut from the case list), returning the best
        window's result.
        """
        case_is_longer = len(case_list) >= len(query_list)
        larger, smaller = (
            (case_list, query_list) if case_is_longer else (query_list, case_list)
        )

        # Start below any reachable value so the first window always wins.
        best_value = -1.0
        best_sims: Sequence[S] = []

        for start in range(len(larger) - len(smaller) + 1):
            window = larger[start : start + len(smaller)]

            # Keep the (query, case) argument order of compute_contains_exact
            # regardless of which sequence is the longer one.
            if case_is_longer:
                sim_res = self.compute_contains_exact(smaller, window)
            else:
                sim_res = self.compute_contains_exact(window, smaller)

            if sim_res.value > best_value:
                best_value = sim_res.value
                best_sims = sim_res.similarities or []

        return SequenceSim(value=best_value, similarities=best_sims, mapping=None)

    def __call__(self, x: Sequence[V], y: Sequence[V]) -> SequenceSim[V, S]:
        # x is the "case", y is the "query"
        if self.exact:
            result = self.compute_contains_exact(y, x)
        else:
            result = self.compute_contains_inexact(x, y)

        if self.weights and result.similarities:
            total_weighted_sim = 0.0
            total_weight = 0.0

            # Normalize weights by their interval width.
            # NOTE(review): this mutates the shared Weight objects and
            # raises ZeroDivisionError for a zero-width interval
            # (upper_bound == lower_bound) — confirm bounds are validated
            # upstream.
            for weight in self.weights:
                weight_range = weight.upper_bound - weight.lower_bound
                weight.normalized_weight = weight.weight / weight_range

            # Apply every weight whose interval contains the similarity,
            # honoring the inclusive/exclusive bound flags.
            for sim in result.similarities:
                sim_val = unpack_float(sim)

                for weight in self.weights:
                    lower_bound = weight.lower_bound
                    upper_bound = weight.upper_bound
                    inclusive_lower = weight.inclusive_lower
                    inclusive_upper = weight.inclusive_upper

                    if (
                        (inclusive_lower and lower_bound <= sim_val <= upper_bound)
                        or (
                            not inclusive_lower and lower_bound < sim_val <= upper_bound
                        )
                    ) and (inclusive_upper or sim_val < upper_bound):
                        assert weight.normalized_weight is not None
                        weighted_sim = weight.normalized_weight * sim_val
                        total_weighted_sim += weighted_sim
                        total_weight += weight.normalized_weight

            # Fall back to the unweighted value when no weight matched.
            if total_weight > 0:
                final_similarity = total_weighted_sim / total_weight
            else:
                final_similarity = result.value

            return SequenceSim(
                value=final_similarity,
                similarities=result.similarities,
                mapping=result.mapping,
            )

        return result
List Mapping similarity function.
Arguments:
- element_similarity: The similarity function to use for comparing elements.
- exact: Whether to use exact or inexact comparison. Default is False (inexact).
- weights: Optional list of weights for weighted similarity calculation.
Examples:
>>> sim = sequence_mapping(lambda x, y: 1.0 if x == y else 0.0, True)
>>> result = sim(["a", "b", "c"], ["a", "b", "c"])
>>> result.value
1.0
>>> result.similarities
[1.0, 1.0, 1.0]
442 def compute_contains_exact( 443 self, query: Sequence[V], case: Sequence[V] 444 ) -> SequenceSim[V, S]: 445 """Compute element-wise similarity for sequences of equal length.""" 446 if len(query) != len(case): 447 return SequenceSim(value=0.0, similarities=None, mapping=None) 448 449 sim_sum = 0.0 450 local_similarities: list[S] = [] 451 452 for elem_q, elem_c in zip(query, case, strict=False): 453 sim = self.element_similarity(elem_q, elem_c) 454 sim_sum += unpack_float(sim) 455 local_similarities.append(sim) 456 457 return SequenceSim( 458 value=sim_sum / len(query), 459 similarities=local_similarities, 460 mapping=None, 461 )
Compute element-wise similarity for sequences of equal length.
463 def compute_contains_inexact( 464 self, case_list: Sequence[V], query_list: Sequence[V] 465 ) -> SequenceSim[V, S]: 466 """ 467 Slides the *shorter* sequence across the *longer* one and always 468 evaluates query → case (i.e. query elements are compared against 469 the current window cut from the case list). 470 """ 471 case_is_longer = len(case_list) >= len(query_list) 472 larger, smaller = ( 473 (case_list, query_list) if case_is_longer else (query_list, case_list) 474 ) 475 476 best_value = -1.0 477 best_sims: Sequence[S] = [] 478 479 for start in range(len(larger) - len(smaller) + 1): 480 window = larger[start : start + len(smaller)] 481 482 if case_is_longer: 483 sim_res = self.compute_contains_exact(smaller, window) 484 else: 485 sim_res = self.compute_contains_exact(window, smaller) 486 487 if sim_res.value > best_value: 488 best_value = sim_res.value 489 best_sims = sim_res.similarities or [] 490 491 return SequenceSim(value=best_value, similarities=best_sims, mapping=None)
Slides the shorter sequence across the longer one and always evaluates query → case (i.e. query elements are compared against the current window cut from the case list).
Inherited Members
544@dataclass(slots=True, frozen=True) 545class sequence_correctness[V](SimFunc[Sequence[V], float]): 546 """List Correctness similarity function. 547 548 Parameters: 549 worst_case_sim (float): The similarity value to use when all pairs are discordant. Default is 0.0. 550 551 Examples: 552 >>> sim = sequence_correctness(0.5) 553 >>> sim(["Monday", "Tuesday", "Wednesday"], ["Monday", "Wednesday", "Tuesday"]) 554 0.3333333333333333 555 """ 556 557 worst_case_sim: float = 0.0 558 559 @override 560 def __call__(self, x: Sequence[V], y: Sequence[V]) -> float: 561 if len(x) != len(y): 562 return 0.0 563 564 count_concordant = 0 565 count_discordant = 0 566 567 for i, j in product(range(len(x)), repeat=2): 568 if i >= j: 569 continue 570 571 index_x1 = x.index(x[i]) 572 index_x2 = x.index(x[j]) 573 index_y1 = y.index(x[i]) 574 index_y2 = y.index(x[j]) 575 576 if index_y1 == -1 or index_y2 == -1: 577 continue 578 elif (index_x1 < index_x2 and index_y1 < index_y2) or ( 579 index_x1 > index_x2 and index_y1 > index_y2 580 ): 581 count_concordant += 1 582 else: 583 count_discordant += 1 584 585 if count_concordant + count_discordant == 0: 586 return 0.0 587 588 correctness = (count_concordant - count_discordant) / ( 589 count_concordant + count_discordant 590 ) 591 592 if correctness >= 0: 593 return correctness 594 else: 595 return abs(correctness) * self.worst_case_sim
List Correctness similarity function.
Parameters: worst_case_sim (float): The similarity value to use when all pairs are discordant. Default is 0.0.
Examples:
>>> sim = sequence_correctness(0.5)
>>> sim(["Monday", "Tuesday", "Wednesday"], ["Monday", "Wednesday", "Tuesday"])
0.3333333333333333
106@dataclass(slots=True, frozen=True) 107class SequenceSim[V, S: Float](StructuredValue[float]): 108 """ 109 A class representing sequence similarity with optional mapping and similarity scores. 110 111 Attributes: 112 value: The overall similarity score as a float. 113 similarities: Optional local similarity scores as a sequence of floats. 114 mapping: Optional alignment information as a sequence of tuples. 115 """ 116 117 similarities: Sequence[S] | None = field(default=None) 118 mapping: Sequence[tuple[V, V]] | None = field(default=None)
A class representing sequence similarity with optional mapping and similarity scores.
Attributes:
- value: The overall similarity score as a float.
- similarities: Optional local similarity scores as a sequence of floats.
- mapping: Optional alignment information as a sequence of tuples.
Inherited Members
394@dataclass 395class Weight: 396 """A weighted interval with bounds for sequence mapping similarity.""" 397 398 weight: float 399 lower_bound: float 400 upper_bound: float 401 inclusive_lower: bool 402 inclusive_upper: bool 403 normalized_weight: float | None = None
A weighted interval with bounds for sequence mapping similarity.