cbrkit.sim.collections

  1import heapq
  2from collections.abc import Collection, Sequence, Set
  3from dataclasses import asdict, dataclass, field
  4from itertools import product
  5from typing import Any, cast, override
  6
  7import numpy as np
  8
  9from ..helpers import (
 10    dist2sim,
 11    get_metadata,
 12    optional_dependencies,
 13    unbatchify_sim,
 14    unpack_float,
 15)
 16from ..typing import (
 17    AnySimFunc,
 18    Float,
 19    HasMetadata,
 20    JsonDict,
 21    SimFunc,
 22    StructuredValue,
 23)
 24
 25Number = float | int
 26
 27__all__ = [
 28    "dtw",
 29    "smith_waterman",
 30    "jaccard",
 31    "mapping",
 32    "isolated_mapping",
 33    "sequence_mapping",
 34    "sequence_correctness",
 35    "SequenceSim",
 36    "Weight",
 37]
 38
 39with optional_dependencies():
 40    from nltk.metrics import jaccard_distance
 41
 42    @dataclass(slots=True, frozen=True)
 43    class jaccard[V](SimFunc[Collection[V], float]):
 44        """Jaccard similarity function.
 45
 46        Examples:
 47            >>> sim = jaccard()
 48            >>> sim(["a", "b", "c", "d"], ["a", "b", "c"])
 49            0.8
 50        """
 51
 52        @override
 53        def __call__(self, x: Collection[V], y: Collection[V]) -> float:
 54            if not isinstance(x, Set):
 55                x = set(x)
 56            if not isinstance(y, Set):
 57                y = set(y)
 58
 59            return dist2sim(jaccard_distance(x, y))
 60
 61
 62with optional_dependencies():
 63    from minineedle import core, smith
 64
 65    @dataclass(slots=True, frozen=True)
 66    class smith_waterman[V](SimFunc[Sequence[V], float]):
 67        """
 68        Performs the Smith-Waterman alignment with configurable scoring parameters. If no element matches it returns 0.0.
 69
 70        Args:
 71            match_score: Score for matching characters. Defaults to 2.
 72            mismatch_penalty: Penalty for mismatching characters. Defaults to -1.
 73            gap_penalty: Penalty for gaps. Defaults to -1.
 74
 75        Example:
 76            >>> sim = smith_waterman()
 77            >>> sim("abcde", "fghe")
 78            2
 79        """
 80
 81        match_score: int = 2
 82        mismatch_penalty: int = -1
 83        gap_penalty: int = -1
 84
 85        @override
 86        def __call__(self, x: Sequence[V], y: Sequence[V]) -> float:
 87            try:
 88                alignment = smith.SmithWaterman(
 89                    cast(Sequence[Any], x), cast(Sequence[Any], y)
 90                )
 91                alignment.change_matrix(
 92                    core.ScoreMatrix(
 93                        match=self.match_score,
 94                        miss=self.mismatch_penalty,
 95                        gap=self.gap_penalty,
 96                    )
 97                )
 98                alignment.align()
 99
100                return alignment.get_score()
101            except ZeroDivisionError:
102                return 0.0
103
104
105@dataclass(slots=True, frozen=True)
106class SequenceSim[V, S: Float](StructuredValue[float]):
107    """
108    A class representing sequence similarity with optional mapping and similarity scores.
109
110    Attributes:
111        value: The overall similarity score as a float.
112        similarities: Optional local similarity scores as a sequence of floats.
113        mapping: Optional alignment information as a sequence of tuples.
114    """
115
116    similarities: Sequence[S] | None = field(default=None)
117    mapping: Sequence[tuple[V, V]] | None = field(default=None)
118
119
@dataclass(slots=True, init=False)
class dtw[V](SimFunc[Collection[V] | np.ndarray, SequenceSim[V, float]]):
    """
    Dynamic Time Warping similarity function with optional backtracking for alignment.

    Examples:
        >>> sim = dtw()
        >>> sim([1, 2, 3], [1, 2, 3, 4])
        SequenceSim(value=0.5, similarities=None, mapping=None)
        >>> sim = dtw(distance_func=lambda a, b: abs(a - b))
        >>> sim([1, 2, 3], [3, 4, 5])
        SequenceSim(value=0.14285714285714285, similarities=None, mapping=None)
        >>> sim = dtw(distance_func=lambda a, b: abs(len(str(a)) - len(str(b))))
        >>> sim(["a", "bb", "ccc"], ["aa", "bbb", "c"], return_alignment=True)
        SequenceSim(value=0.25, similarities=[0.5, 1.0, 1.0, 0.3333333333333333], mapping=[('aa', 'a'), ('aa', 'bb'), ('bbb', 'ccc'), ('c', 'ccc')])
        >>> sim = dtw(distance_func=lambda a, b: abs(a - b))
        >>> sim([1, 2, 3], [1, 2, 3, 4], return_alignment=True)
        SequenceSim(value=0.5, similarities=[1.0, 1.0, 1.0, 0.5], mapping=[(1, 1), (2, 2), (3, 3), (4, 3)])
    """

    # Pairwise element distance; when None, the fallback is abs(a - b),
    # which assumes numeric elements.
    distance_func: SimFunc[V, Float] | None

    def __init__(self, distance_func: AnySimFunc[V, Float] | None = None):
        # unbatchify_sim adapts a possibly batched similarity function
        # to a pairwise one; None disables the custom distance.
        self.distance_func = unbatchify_sim(distance_func) if distance_func else None

    def __call__(
        self,
        x: Collection[V] | np.ndarray,
        y: Collection[V] | np.ndarray,
        return_alignment: bool = False,
    ) -> SequenceSim[V, float]:
        """
        Perform DTW and optionally return alignment information.

        Args:
            x: The first sequence as a collection or numpy array.
            y: The second sequence as a collection or numpy array.
            return_alignment: Whether to compute and return the alignment (default: False).

        Returns:
            A SequenceSim object containing the similarity value, local similarities, and optional alignment.
        """
        if not isinstance(x, np.ndarray):
            x = np.array(x, dtype=object)  # Allow non-numeric types
        if not isinstance(y, np.ndarray):
            y = np.array(y, dtype=object)

        # Compute the DTW distance
        dtw_distance, alignment, local_similarities = self.compute_dtw(
            x, y, return_alignment
        )

        # Convert DTW distance to similarity
        similarity = dist2sim(dtw_distance)

        # Return SequenceSim with updated attributes
        return SequenceSim(
            value=float(similarity),
            similarities=local_similarities,
            mapping=alignment if return_alignment else None,
        )

    def compute_dtw(
        self, x: np.ndarray, y: np.ndarray, return_alignment: bool
    ) -> tuple[float, list[tuple[V, V]] | None, list[float] | None]:
        """
        Compute DTW distance and optionally compute the best alignment and local similarities.

        Args:
            x: The first sequence as a numpy array.
            y: The second sequence as a numpy array.
            return_alignment: Whether to compute the alignment.

        Returns:
            A tuple of (DTW distance, best alignment or None, local similarities or None).
        """
        n, m = len(x), len(y)
        # Standard DTW cost matrix with an extra row/column of inf as the
        # boundary; only cell (0, 0) is reachable at the start.
        dtw_matrix = np.full((n + 1, m + 1), np.inf)
        dtw_matrix[0, 0] = 0

        for i in range(1, n + 1):
            for j in range(1, m + 1):
                # Cost of pairing y[j-1] with x[i-1]; the fallback
                # abs(...) assumes numeric elements.
                cost = unpack_float(
                    self.distance_func(y[j - 1], x[i - 1])
                    if self.distance_func
                    else abs(y[j - 1] - x[i - 1])
                )
                # Take last min from a square box
                last_min: float = min(
                    dtw_matrix[i - 1, j],  # Insertion
                    dtw_matrix[i, j - 1],  # Deletion
                    dtw_matrix[i - 1, j - 1],  # Match
                )
                dtw_matrix[i, j] = cost + last_min

        # If alignment is not requested, skip backtracking
        if not return_alignment:
            return dtw_matrix[n, m], None, None

        # Backtracking to find the best alignment and local similarities
        mapping, local_similarities = self.backtrack(dtw_matrix, x, y, n, m)

        return dtw_matrix[n, m], mapping, local_similarities

    def backtrack(
        self, dtw_matrix: np.ndarray, x: np.ndarray, y: np.ndarray, n: int, m: int
    ) -> tuple[list[tuple[V, V]], list[float]]:
        """
        Backtrack through the DTW matrix to find the best alignment and local similarities.

        Ties are resolved in a fixed priority: up (i-1, j), then left
        (i, j-1), then diagonal. Unmatched leading elements are paired
        with None and scored 0.0.

        Args:
            dtw_matrix: The DTW matrix.
            x: The first sequence as a numpy array.
            y: The second sequence as a numpy array.
            n: The length of the first sequence.
            m: The length of the second sequence.

        Returns:
            A tuple of (alignment, local similarities).
        """
        i, j = n, m
        alignment = []
        local_similarities = []

        while i > 0 and j > 0:
            alignment.append((y[j - 1], x[i - 1]))  # Align elements as (query, case)
            # Recompute the pair cost so the local similarity reflects the
            # raw distance, not the accumulated matrix value.
            cost = unpack_float(
                self.distance_func(y[j - 1], x[i - 1])
                if self.distance_func
                else abs(y[j - 1] - x[i - 1])
            )
            local_similarities.append(dist2sim(cost))  # Convert cost to similarity
            # Move in the direction of the minimum cost
            if dtw_matrix[i - 1, j] == min(
                dtw_matrix[i - 1, j], dtw_matrix[i, j - 1], dtw_matrix[i - 1, j - 1]
            ):
                i -= 1  # Move up
            elif dtw_matrix[i, j - 1] == min(
                dtw_matrix[i - 1, j], dtw_matrix[i, j - 1], dtw_matrix[i - 1, j - 1]
            ):
                j -= 1  # Move left
            else:
                i -= 1  # Move diagonally
                j -= 1

        # Handle remaining elements in i or j
        while i > 0:
            alignment.append((None, x[i - 1]))  # Unmatched element from x (case)
            local_similarities.append(0.0)  # No similarity for unmatched
            i -= 1
        while j > 0:
            alignment.append((y[j - 1], None))  # Unmatched element from y (query)
            local_similarities.append(0.0)  # No similarity for unmatched
            j -= 1

        return alignment[::-1], local_similarities[
            ::-1
        ]  # Reverse to start from the beginning
278
279
280@dataclass(slots=True, frozen=True)
281class isolated_mapping[V](SimFunc[Sequence[V], float]):
282    """
283    Isolated Mapping similarity function that compares each element in 'y' (query)
284    with all elements in 'x' (case)
285    and takes the maximum similarity for each element in 'y', then averages
286    these maximums. Assumes y -> x (query operated onto case).
287
288    Args:
289        element_similarity: A function that takes two elements (query_item, case_item)
290        and returns a similarity score between them.
291
292    Examples:
293        >>> from cbrkit.sim.strings import levenshtein
294        >>> sim = isolated_mapping(levenshtein())
295        >>> sim(["kitten", "sitting"], ["sitting", "fitted"])
296        0.8333333333333334
297    """
298
299    element_similarity: SimFunc[V, float]
300
301    @override
302    def __call__(self, x: Sequence[V], y: Sequence[V]) -> float:
303        # x = case sequence, y = query sequence
304        # Logic: For each element in query y, find its best match in case x. Average these best scores.
305
306        if not y:  # If the query is empty, similarity is undefined or trivially 0 or 1? Let's return 0.
307            return 0.0
308        # Avoid division by zero if x is empty but y is not. max default handles this.
309
310        total_similarity = 0.0
311        for yi in y:  # Iterate through QUERY (y)
312            # Find the best match for the current query element yi within the CASE sequence x
313            # Pass (query_item, case_item) to element_similarity, assuming sim(query, case) convention
314            # Use default=0.0 for max() in case the case sequence x is empty
315            max_similarity = max(
316                (self.element_similarity(yi, xi) for xi in x), default=0.0
317            )
318            total_similarity += max_similarity
319
320        # Normalize by the length of the QUERY sequence (y)
321        average_similarity = total_similarity / len(y)
322
323        return average_similarity
324
325
326@dataclass(slots=True, frozen=True)
327class mapping[V](SimFunc[Sequence[V], float]):
328    """
329    Implements an A* algorithm to find the best matching between query items and case items
330    based on the provided similarity function, maximizing the overall similarity score.
331
332    Args:
333        similarity_function: A function that calculates the similarity between two elements.
334        max_queue_size: Maximum size of the priority queue. Defaults to 1000.
335
336    Returns:
337        A similarity function for sequences.
338
339    Examples:
340        >>> def example_similarity_function(x, y) -> float:
341        ...     return 1.0 if x == y else 0.0
342        >>> sim_func = mapping(example_similarity_function)
343        >>> result = sim_func(["Monday", "Tuesday", "Wednesday"], ["Monday", "Tuesday", "Sunday"])
344        >>> print(f"Normalized Similarity Score: {result}")
345        Normalized Similarity Score: 0.6666666666666666
346    """
347
348    element_similarity: SimFunc[V, float]
349    max_queue_size: int = 1000
350
351    @override
352    def __call__(self, x: Sequence[V], y: Sequence[V]) -> float:
353        # Priority queue to store potential solutions with their scores
354        pq = []
355        initial_solution = (0.0, set(), frozenset(y), frozenset(x))
356        heapq.heappush(pq, initial_solution)
357
358        best_score = 0.0
359
360        while pq:
361            current_score, current_mapping, remaining_query, remaining_case = (
362                heapq.heappop(pq)
363            )
364
365            if not remaining_query:  # All query items are mapped
366                best_score = max(best_score, current_score / len(y))
367                continue  # Continue to process remaining potential mappings
368
369            for query_item in remaining_query:
370                for case_item in remaining_case:
371                    sim_score = self.element_similarity(query_item, case_item)
372                    new_mapping = current_mapping | {(query_item, case_item)}
373                    new_score = current_score + sim_score  # Accumulate score correctly
374                    new_remaining_query = remaining_query - {query_item}
375                    new_remaining_case = remaining_case - {case_item}
376
377                    heapq.heappush(
378                        pq,
379                        (
380                            new_score,
381                            new_mapping,
382                            new_remaining_query,
383                            new_remaining_case,
384                        ),
385                    )
386
387                    if len(pq) > self.max_queue_size:
388                        heapq.heappop(pq)
389
390        return best_score
391
392
@dataclass
class Weight:
    """A weighted interval with bounds for sequence mapping similarity."""

    # Raw weight applied to similarity values falling inside the interval.
    weight: float
    # Interval bounds tested against the local similarity value.
    lower_bound: float
    upper_bound: float
    # Whether each bound counts as inside the interval.
    inclusive_lower: bool
    inclusive_upper: bool
    # Filled in by sequence_mapping at call time as
    # weight / (upper_bound - lower_bound); None until then.
    normalized_weight: float | None = None
403
404
@dataclass(slots=True, frozen=True)
class sequence_mapping[V, S: Float](
    SimFunc[Sequence[V], SequenceSim[V, S]], HasMetadata
):
    """
    List Mapping similarity function.

    Parameters:
        element_similarity: The similarity function to use for comparing elements.
        exact: Whether to use exact or inexact comparison. Default is False (inexact).
        weights: Optional list of weights for weighted similarity calculation.

    Examples:
        >>> sim = sequence_mapping(lambda x, y: 1.0 if x == y else 0.0, True)
        >>> result = sim(["a", "b", "c"], ["a", "b", "c"])
        >>> result.value
        1.0
        >>> result.similarities
        [1.0, 1.0, 1.0]
    """

    # Pairwise similarity applied to (query element, case element).
    element_similarity: SimFunc[V, S]
    # True: element-wise comparison of equal-length sequences only.
    # False: slide the shorter sequence over the longer one, keep the best window.
    exact: bool = False
    # Optional interval weights applied to the local similarity scores.
    weights: list[Weight] | None = None

    @property
    def metadata(self) -> JsonDict:
        """Return metadata describing the sequence mapping configuration."""
        return {
            "element_similarity": get_metadata(self.element_similarity),
            "exact": self.exact,
            "weights": [asdict(weight) for weight in self.weights]
            if self.weights
            else None,
        }

    def compute_contains_exact(
        self, query: Sequence[V], case: Sequence[V]
    ) -> SequenceSim[V, S]:
        """Compute element-wise similarity for sequences of equal length.

        Returns a zero-valued SequenceSim when the lengths differ.
        NOTE(review): two empty sequences have equal length and reach the
        division below, raising ZeroDivisionError — confirm callers never
        pass two empty sequences.
        """
        if len(query) != len(case):
            return SequenceSim(value=0.0, similarities=None, mapping=None)

        sim_sum = 0.0
        local_similarities: list[S] = []

        # strict=False is a no-op here since lengths are already equal.
        for elem_q, elem_c in zip(query, case, strict=False):
            sim = self.element_similarity(elem_q, elem_c)
            sim_sum += unpack_float(sim)
            local_similarities.append(sim)

        return SequenceSim(
            value=sim_sum / len(query),
            similarities=local_similarities,
            mapping=None,
        )

    def compute_contains_inexact(
        self, case_list: Sequence[V], query_list: Sequence[V]
    ) -> SequenceSim[V, S]:
        """
        Slides the *shorter* sequence across the *longer* one and always
        evaluates   query → case   (i.e. query elements are compared against
        the current window cut from the case list).
        """
        case_is_longer = len(case_list) >= len(query_list)
        larger, smaller = (
            (case_list, query_list) if case_is_longer else (query_list, case_list)
        )

        # Best window found so far; -1.0 ensures the first window wins.
        best_value = -1.0
        best_sims: Sequence[S] = []

        for start in range(len(larger) - len(smaller) + 1):
            window = larger[start : start + len(smaller)]

            # Keep the (query, case) argument order regardless of which
            # input sequence is the longer one.
            if case_is_longer:
                sim_res = self.compute_contains_exact(smaller, window)
            else:
                sim_res = self.compute_contains_exact(window, smaller)

            if sim_res.value > best_value:
                best_value = sim_res.value
                best_sims = sim_res.similarities or []

        return SequenceSim(value=best_value, similarities=best_sims, mapping=None)

    def __call__(self, x: Sequence[V], y: Sequence[V]) -> SequenceSim[V, S]:
        # x is the “case”, y is the “query”
        if self.exact:
            result = self.compute_contains_exact(y, x)
        else:
            result = self.compute_contains_inexact(x, y)

        if self.weights and result.similarities:
            total_weighted_sim = 0.0
            total_weight = 0.0

            # Normalize weights
            # NOTE(review): this mutates the shared Weight objects on every
            # call, and raises ZeroDivisionError when a weight has
            # upper_bound == lower_bound — confirm both are intended.
            for weight in self.weights:
                weight_range = weight.upper_bound - weight.lower_bound
                weight.normalized_weight = weight.weight / weight_range

            # Apply weights
            for sim in result.similarities:
                sim_val = unpack_float(sim)

                for weight in self.weights:
                    lower_bound = weight.lower_bound
                    upper_bound = weight.upper_bound
                    inclusive_lower = weight.inclusive_lower
                    inclusive_upper = weight.inclusive_upper

                    # Interval membership test: the first clause checks the
                    # lower bound (inclusive or strict), the trailing clause
                    # enforces a strict upper bound when inclusive_upper is
                    # False.
                    if (
                        (inclusive_lower and lower_bound <= sim_val <= upper_bound)
                        or (
                            not inclusive_lower and lower_bound < sim_val <= upper_bound
                        )
                    ) and (inclusive_upper or sim_val < upper_bound):
                        assert weight.normalized_weight is not None
                        weighted_sim = weight.normalized_weight * sim_val
                        total_weighted_sim += weighted_sim
                        total_weight += weight.normalized_weight

            # Fall back to the unweighted value when no similarity fell
            # into any weight interval.
            if total_weight > 0:
                final_similarity = total_weighted_sim / total_weight
            else:
                final_similarity = result.value

            return SequenceSim(
                value=final_similarity,
                similarities=result.similarities,
                mapping=result.mapping,
            )

        return result
541
542
543@dataclass(slots=True, frozen=True)
544class sequence_correctness[V](SimFunc[Sequence[V], float]):
545    """List Correctness similarity function.
546
547    Parameters:
548    worst_case_sim (float): The similarity value to use when all pairs are discordant. Default is 0.0.
549
550    Examples:
551        >>> sim = sequence_correctness(0.5)
552        >>> sim(["Monday", "Tuesday", "Wednesday"], ["Monday", "Wednesday", "Tuesday"])
553        0.3333333333333333
554    """
555
556    worst_case_sim: float = 0.0
557
558    @override
559    def __call__(self, x: Sequence[V], y: Sequence[V]) -> float:
560        if len(x) != len(y):
561            return 0.0
562
563        count_concordant = 0
564        count_discordant = 0
565
566        for i, j in product(range(len(x)), repeat=2):
567            if i >= j:
568                continue
569
570            index_x1 = x.index(x[i])
571            index_x2 = x.index(x[j])
572            index_y1 = y.index(x[i])
573            index_y2 = y.index(x[j])
574
575            if index_y1 == -1 or index_y2 == -1:
576                continue
577            elif (index_x1 < index_x2 and index_y1 < index_y2) or (
578                index_x1 > index_x2 and index_y1 > index_y2
579            ):
580                count_concordant += 1
581            else:
582                count_discordant += 1
583
584        if count_concordant + count_discordant == 0:
585            return 0.0
586
587        correctness = (count_concordant - count_discordant) / (
588            count_concordant + count_discordant
589        )
590
591        if correctness >= 0:
592            return correctness
593        else:
594            return abs(correctness) * self.worst_case_sim
@dataclass(slots=True, init=False)
class dtw(cbrkit.typing.SimFunc[collections.abc.Collection[V] | numpy.ndarray, cbrkit.sim.collections.SequenceSim[V, float]], typing.Generic[V]):
121@dataclass(slots=True, init=False)
122class dtw[V](SimFunc[Collection[V] | np.ndarray, SequenceSim[V, float]]):
123    """
124    Dynamic Time Warping similarity function with optional backtracking for alignment.
125
126    Examples:
127        >>> sim = dtw()
128        >>> sim([1, 2, 3], [1, 2, 3, 4])
129        SequenceSim(value=0.5, similarities=None, mapping=None)
130        >>> sim = dtw(distance_func=lambda a, b: abs(a - b))
131        >>> sim([1, 2, 3], [3, 4, 5])
132        SequenceSim(value=0.14285714285714285, similarities=None, mapping=None)
133        >>> sim = dtw(distance_func=lambda a, b: abs(len(str(a)) - len(str(b))))
134        >>> sim(["a", "bb", "ccc"], ["aa", "bbb", "c"], return_alignment=True)
135        SequenceSim(value=0.25, similarities=[0.5, 1.0, 1.0, 0.3333333333333333], mapping=[('aa', 'a'), ('aa', 'bb'), ('bbb', 'ccc'), ('c', 'ccc')])
136        >>> sim = dtw(distance_func=lambda a, b: abs(a - b))
137        >>> sim([1, 2, 3], [1, 2, 3, 4], return_alignment=True)
138        SequenceSim(value=0.5, similarities=[1.0, 1.0, 1.0, 0.5], mapping=[(1, 1), (2, 2), (3, 3), (4, 3)])
139    """
140
141    distance_func: SimFunc[V, Float] | None
142
143    def __init__(self, distance_func: AnySimFunc[V, Float] | None = None):
144        self.distance_func = unbatchify_sim(distance_func) if distance_func else None
145
146    def __call__(
147        self,
148        x: Collection[V] | np.ndarray,
149        y: Collection[V] | np.ndarray,
150        return_alignment: bool = False,
151    ) -> SequenceSim[V, float]:
152        """
153        Perform DTW and optionally return alignment information.
154
155        Args:
156            x: The first sequence as a collection or numpy array.
157            y: The second sequence as a collection or numpy array.
158            return_alignment: Whether to compute and return the alignment (default: False).
159
160        Returns:
161            A SequenceSim object containing the similarity value, local similarities, and optional alignment.
162        """
163        if not isinstance(x, np.ndarray):
164            x = np.array(x, dtype=object)  # Allow non-numeric types
165        if not isinstance(y, np.ndarray):
166            y = np.array(y, dtype=object)
167
168        # Compute the DTW distance
169        dtw_distance, alignment, local_similarities = self.compute_dtw(
170            x, y, return_alignment
171        )
172
173        # Convert DTW distance to similarity
174        similarity = dist2sim(dtw_distance)
175
176        # Return SequenceSim with updated attributes
177        return SequenceSim(
178            value=float(similarity),
179            similarities=local_similarities,
180            mapping=alignment if return_alignment else None,
181        )
182
183    def compute_dtw(
184        self, x: np.ndarray, y: np.ndarray, return_alignment: bool
185    ) -> tuple[float, list[tuple[V, V]] | None, list[float] | None]:
186        """
187        Compute DTW distance and optionally compute the best alignment and local similarities.
188
189        Args:
190            x: The first sequence as a numpy array.
191            y: The second sequence as a numpy array.
192            return_alignment: Whether to compute the alignment.
193
194        Returns:
195            A tuple of (DTW distance, best alignment or None, local similarities or None).
196        """
197        n, m = len(x), len(y)
198        dtw_matrix = np.full((n + 1, m + 1), np.inf)
199        dtw_matrix[0, 0] = 0
200
201        for i in range(1, n + 1):
202            for j in range(1, m + 1):
203                cost = unpack_float(
204                    self.distance_func(y[j - 1], x[i - 1])
205                    if self.distance_func
206                    else abs(y[j - 1] - x[i - 1])
207                )
208                # Take last min from a square box
209                last_min: float = min(
210                    dtw_matrix[i - 1, j],  # Insertion
211                    dtw_matrix[i, j - 1],  # Deletion
212                    dtw_matrix[i - 1, j - 1],  # Match
213                )
214                dtw_matrix[i, j] = cost + last_min
215
216        # If alignment is not requested, skip backtracking
217        if not return_alignment:
218            return dtw_matrix[n, m], None, None
219
220        # Backtracking to find the best alignment and local similarities
221        mapping, local_similarities = self.backtrack(dtw_matrix, x, y, n, m)
222
223        return dtw_matrix[n, m], mapping, local_similarities
224
225    def backtrack(
226        self, dtw_matrix: np.ndarray, x: np.ndarray, y: np.ndarray, n: int, m: int
227    ) -> tuple[list[tuple[V, V]], list[float]]:
228        """
229        Backtrack through the DTW matrix to find the best alignment and local similarities.
230
231        Args:
232            dtw_matrix: The DTW matrix.
233            x: The first sequence as a numpy array.
234            y: The second sequence as a numpy array.
235            n: The length of the first sequence.
236            m: The length of the second sequence.
237
238        Returns:
239            A tuple of (alignment, local similarities).
240        """
241        i, j = n, m
242        alignment = []
243        local_similarities = []
244
245        while i > 0 and j > 0:
246            alignment.append((y[j - 1], x[i - 1]))  # Align elements as (query, case)
247            cost = unpack_float(
248                self.distance_func(y[j - 1], x[i - 1])
249                if self.distance_func
250                else abs(y[j - 1] - x[i - 1])
251            )
252            local_similarities.append(dist2sim(cost))  # Convert cost to similarity
253            # Move in the direction of the minimum cost
254            if dtw_matrix[i - 1, j] == min(
255                dtw_matrix[i - 1, j], dtw_matrix[i, j - 1], dtw_matrix[i - 1, j - 1]
256            ):
257                i -= 1  # Move up
258            elif dtw_matrix[i, j - 1] == min(
259                dtw_matrix[i - 1, j], dtw_matrix[i, j - 1], dtw_matrix[i - 1, j - 1]
260            ):
261                j -= 1  # Move left
262            else:
263                i -= 1  # Move diagonally
264                j -= 1
265
266        # Handle remaining elements in i or j
267        while i > 0:
268            alignment.append((None, x[i - 1]))  # Unmatched element from x (case)
269            local_similarities.append(0.0)  # No similarity for unmatched
270            i -= 1
271        while j > 0:
272            alignment.append((y[j - 1], None))  # Unmatched element from y (query)
273            local_similarities.append(0.0)  # No similarity for unmatched
274            j -= 1
275
276        return alignment[::-1], local_similarities[
277            ::-1
278        ]  # Reverse to start from the beginning

Dynamic Time Warping similarity function with optional backtracking for alignment.

Examples:
>>> sim = dtw()
>>> sim([1, 2, 3], [1, 2, 3, 4])
SequenceSim(value=0.5, similarities=None, mapping=None)
>>> sim = dtw(distance_func=lambda a, b: abs(a - b))
>>> sim([1, 2, 3], [3, 4, 5])
SequenceSim(value=0.14285714285714285, similarities=None, mapping=None)
>>> sim = dtw(distance_func=lambda a, b: abs(len(str(a)) - len(str(b))))
>>> sim(["a", "bb", "ccc"], ["aa", "bbb", "c"], return_alignment=True)
SequenceSim(value=0.25, similarities=[0.5, 1.0, 1.0, 0.3333333333333333], mapping=[('aa', 'a'), ('aa', 'bb'), ('bbb', 'ccc'), ('c', 'ccc')])
>>> sim = dtw(distance_func=lambda a, b: abs(a - b))
>>> sim([1, 2, 3], [1, 2, 3, 4], return_alignment=True)
SequenceSim(value=0.5, similarities=[1.0, 1.0, 1.0, 0.5], mapping=[(1, 1), (2, 2), (3, 3), (4, 3)])
dtw(distance_func: AnySimFunc[V, Float] | None = None)
143    def __init__(self, distance_func: AnySimFunc[V, Float] | None = None):
144        self.distance_func = unbatchify_sim(distance_func) if distance_func else None
distance_func: Optional[cbrkit.typing.SimFunc[V, Float]]
def compute_dtw( self, x: numpy.ndarray, y: numpy.ndarray, return_alignment: bool) -> tuple[float, list[tuple[V, V]] | None, list[float] | None]:
def compute_dtw(
    self, x: np.ndarray, y: np.ndarray, return_alignment: bool
) -> tuple[float, list[tuple[V, V]] | None, list[float] | None]:
    """
    Compute the DTW distance and optionally the best alignment and local similarities.

    Args:
        x: The first sequence (case) as a numpy array.
        y: The second sequence (query) as a numpy array.
        return_alignment: Whether to backtrack and compute the alignment.

    Returns:
        A tuple of (DTW distance, best alignment or None, local similarities or None).
    """
    n, m = len(x), len(y)

    # (n + 1) x (m + 1) accumulated-cost matrix; row/column 0 act as
    # infinite-cost sentinels so the recurrence needs no boundary checks.
    matrix = np.full((n + 1, m + 1), np.inf)
    matrix[0, 0] = 0

    for i in range(1, n + 1):
        case_elem = x[i - 1]

        for j in range(1, m + 1):
            query_elem = y[j - 1]

            # Local cost: user-supplied distance function, or plain
            # absolute difference when none was configured.
            if self.distance_func:
                cost = unpack_float(self.distance_func(query_elem, case_elem))
            else:
                cost = unpack_float(abs(query_elem - case_elem))

            # Extend the cheapest of the three predecessor paths.
            matrix[i, j] = cost + min(
                matrix[i - 1, j],  # Insertion
                matrix[i, j - 1],  # Deletion
                matrix[i - 1, j - 1],  # Match
            )

    if return_alignment:
        # Backtracking yields the alignment and per-step similarities.
        alignment, similarities = self.backtrack(matrix, x, y, n, m)
        return matrix[n, m], alignment, similarities

    return matrix[n, m], None, None

Compute DTW distance and optionally compute the best alignment and local similarities.

Arguments:
  • x: The first sequence as a numpy array.
  • y: The second sequence as a numpy array.
  • return_alignment: Whether to compute the alignment.
Returns:

A tuple of (DTW distance, best alignment or None, local similarities or None).

def backtrack( self, dtw_matrix: numpy.ndarray, x: numpy.ndarray, y: numpy.ndarray, n: int, m: int) -> tuple[list[tuple[V, V]], list[float]]:
def backtrack(
    self, dtw_matrix: np.ndarray, x: np.ndarray, y: np.ndarray, n: int, m: int
) -> tuple[list[tuple[V, V]], list[float]]:
    """
    Backtrack through the DTW matrix to find the best alignment and local similarities.

    Args:
        dtw_matrix: The accumulated-cost DTW matrix of shape (n + 1, m + 1).
        x: The first sequence (case) as a numpy array.
        y: The second sequence (query) as a numpy array.
        n: The length of the first sequence.
        m: The length of the second sequence.

    Returns:
        A tuple of (alignment, local similarities), both ordered from the
        beginning of the sequences.
    """
    i, j = n, m
    alignment: list[tuple[V, V]] = []
    local_similarities: list[float] = []

    while i > 0 and j > 0:
        alignment.append((y[j - 1], x[i - 1]))  # Align elements as (query, case)

        # Recompute the local cost (the matrix only stores accumulated
        # costs) and convert it to a similarity.
        cost = unpack_float(
            self.distance_func(y[j - 1], x[i - 1])
            if self.distance_func
            else abs(y[j - 1] - x[i - 1])
        )
        local_similarities.append(dist2sim(cost))

        # Compute the predecessor minimum once instead of once per branch
        # (the original evaluated the same min() up to twice per step).
        predecessor_min = min(
            dtw_matrix[i - 1, j], dtw_matrix[i, j - 1], dtw_matrix[i - 1, j - 1]
        )

        # Move towards the minimum-cost predecessor; ties prefer up, then
        # left, then diagonal (same order as before).
        if dtw_matrix[i - 1, j] == predecessor_min:
            i -= 1  # Move up
        elif dtw_matrix[i, j - 1] == predecessor_min:
            j -= 1  # Move left
        else:
            i -= 1  # Move diagonally
            j -= 1

    # Emit any leftover prefix elements as unmatched (similarity 0.0).
    while i > 0:
        alignment.append((None, x[i - 1]))  # Unmatched element from x (case)
        local_similarities.append(0.0)
        i -= 1
    while j > 0:
        alignment.append((y[j - 1], None))  # Unmatched element from y (query)
        local_similarities.append(0.0)
        j -= 1

    # Reverse so the alignment starts at the beginning of the sequences.
    return alignment[::-1], local_similarities[::-1]

Backtrack through the DTW matrix to find the best alignment and local similarities.

Arguments:
  • dtw_matrix: The DTW matrix.
  • x: The first sequence as a numpy array.
  • y: The second sequence as a numpy array.
  • n: The length of the first sequence.
  • m: The length of the second sequence.
Returns:

A tuple of (alignment, local similarities).

@dataclass(slots=True, frozen=True)
class smith_waterman(cbrkit.typing.SimFunc[collections.abc.Sequence[V], float], typing.Generic[V]):
 66    @dataclass(slots=True, frozen=True)
 67    class smith_waterman[V](SimFunc[Sequence[V], float]):
 68        """
 69        Performs the Smith-Waterman alignment with configurable scoring parameters. If no element matches it returns 0.0.
 70
 71        Args:
 72            match_score: Score for matching characters. Defaults to 2.
 73            mismatch_penalty: Penalty for mismatching characters. Defaults to -1.
 74            gap_penalty: Penalty for gaps. Defaults to -1.
 75
 76        Example:
 77            >>> sim = smith_waterman()
 78            >>> sim("abcde", "fghe")
 79            2
 80        """
 81
 82        match_score: int = 2
 83        mismatch_penalty: int = -1
 84        gap_penalty: int = -1
 85
 86        @override
 87        def __call__(self, x: Sequence[V], y: Sequence[V]) -> float:
 88            try:
 89                alignment = smith.SmithWaterman(
 90                    cast(Sequence[Any], x), cast(Sequence[Any], y)
 91                )
 92                alignment.change_matrix(
 93                    core.ScoreMatrix(
 94                        match=self.match_score,
 95                        miss=self.mismatch_penalty,
 96                        gap=self.gap_penalty,
 97                    )
 98                )
 99                alignment.align()
100
101                return alignment.get_score()
102            except ZeroDivisionError:
103                return 0.0

Performs the Smith-Waterman alignment with configurable scoring parameters. If no element matches it returns 0.0.

Arguments:
  • match_score: Score for matching characters. Defaults to 2.
  • mismatch_penalty: Penalty for mismatching characters. Defaults to -1.
  • gap_penalty: Penalty for gaps. Defaults to -1.
Example:
>>> sim = smith_waterman()
>>> sim("abcde", "fghe")
2
smith_waterman( match_score: int = 2, mismatch_penalty: int = -1, gap_penalty: int = -1)
match_score: int
mismatch_penalty: int
gap_penalty: int
@dataclass(slots=True, frozen=True)
class jaccard(cbrkit.typing.SimFunc[collections.abc.Collection[V], float], typing.Generic[V]):
43    @dataclass(slots=True, frozen=True)
44    class jaccard[V](SimFunc[Collection[V], float]):
45        """Jaccard similarity function.
46
47        Examples:
48            >>> sim = jaccard()
49            >>> sim(["a", "b", "c", "d"], ["a", "b", "c"])
50            0.8
51        """
52
53        @override
54        def __call__(self, x: Collection[V], y: Collection[V]) -> float:
55            if not isinstance(x, Set):
56                x = set(x)
57            if not isinstance(y, Set):
58                y = set(y)
59
60            return dist2sim(jaccard_distance(x, y))

Jaccard similarity function.

Examples:
>>> sim = jaccard()
>>> sim(["a", "b", "c", "d"], ["a", "b", "c"])
0.8
@dataclass(slots=True, frozen=True)
class mapping(cbrkit.typing.SimFunc[collections.abc.Sequence[V], float], typing.Generic[V]):
327@dataclass(slots=True, frozen=True)
328class mapping[V](SimFunc[Sequence[V], float]):
329    """
330    Implements an A* algorithm to find the best matching between query items and case items
331    based on the provided similarity function, maximizing the overall similarity score.
332
333    Args:
334        element_similarity: A function that calculates the similarity between two elements.
335        max_queue_size: Maximum size of the priority queue. Defaults to 1000.
336
337    Returns:
338        A similarity function for sequences.
339
340    Examples:
341        >>> def example_similarity_function(x, y) -> float:
342        ...     return 1.0 if x == y else 0.0
343        >>> sim_func = mapping(example_similarity_function)
344        >>> result = sim_func(["Monday", "Tuesday", "Wednesday"], ["Monday", "Tuesday", "Sunday"])
345        >>> print(f"Normalized Similarity Score: {result}")
346        Normalized Similarity Score: 0.6666666666666666
347    """
348
349    element_similarity: SimFunc[V, float]
350    max_queue_size: int = 1000
351
352    @override
353    def __call__(self, x: Sequence[V], y: Sequence[V]) -> float:
354        # Priority queue to store potential solutions with their scores
355        pq = []
        # State tuple: (accumulated score, mapping so far, unmapped query
        # items, unmapped case items); heapq orders states by the score
        # (a min-heap). x is the case sequence, y the query sequence.
356        initial_solution = (0.0, set(), frozenset(y), frozenset(x))
357        heapq.heappush(pq, initial_solution)
358
359        best_score = 0.0
360
361        while pq:
            # NOTE(review): heappop yields the LOWEST-scoring state, so the
            # search explores cheapest-first; combined with the pruning
            # below this makes the search heuristic rather than exact —
            # confirm intended.
362            current_score, current_mapping, remaining_query, remaining_case = (
363                heapq.heappop(pq)
364            )
365
366            if not remaining_query:  # All query items are mapped
                # Normalize the accumulated score by the query length.
367                best_score = max(best_score, current_score / len(y))
368                continue  # Continue to process remaining potential mappings
369
            # Branch on every (query item, case item) pairing still possible.
370            for query_item in remaining_query:
371                for case_item in remaining_case:
372                    sim_score = self.element_similarity(query_item, case_item)
373                    new_mapping = current_mapping | {(query_item, case_item)}
374                    new_score = current_score + sim_score  # Accumulate score correctly
375                    new_remaining_query = remaining_query - {query_item}
376                    new_remaining_case = remaining_case - {case_item}
377
378                    heapq.heappush(
379                        pq,
380                        (
381                            new_score,
382                            new_mapping,
383                            new_remaining_query,
384                            new_remaining_case,
385                        ),
386                    )
387
                    # Prune the lowest-scoring partial solution once the
                    # queue grows beyond max_queue_size.
388                    if len(pq) > self.max_queue_size:
389                        heapq.heappop(pq)
390
391        return best_score

Implements an A* algorithm to find the best matching between query items and case items based on the provided similarity function, maximizing the overall similarity score.

Arguments:
  • element_similarity: A function that calculates the similarity between two elements.
  • max_queue_size: Maximum size of the priority queue. Defaults to 1000.
Returns:

A similarity function for sequences.

Examples:
>>> def example_similarity_function(x, y) -> float:
...     return 1.0 if x == y else 0.0
>>> sim_func = mapping(example_similarity_function)
>>> result = sim_func(["Monday", "Tuesday", "Wednesday"], ["Monday", "Tuesday", "Sunday"])
>>> print(f"Normalized Similarity Score: {result}")
Normalized Similarity Score: 0.6666666666666666
mapping( element_similarity: cbrkit.typing.SimFunc[V, float], max_queue_size: int = 1000)
element_similarity: cbrkit.typing.SimFunc[V, float]
max_queue_size: int
@dataclass(slots=True, frozen=True)
class isolated_mapping(cbrkit.typing.SimFunc[collections.abc.Sequence[V], float], typing.Generic[V]):
281@dataclass(slots=True, frozen=True)
282class isolated_mapping[V](SimFunc[Sequence[V], float]):
283    """
284    Isolated Mapping similarity function that compares each element in 'y' (query)
285    with all elements in 'x' (case)
286    and takes the maximum similarity for each element in 'y', then averages
287    these maximums. Assumes y -> x (query operated onto case).
288
289    Args:
290        element_similarity: A function that takes two elements (query_item, case_item)
291        and returns a similarity score between them.
292
293    Examples:
294        >>> from cbrkit.sim.strings import levenshtein
295        >>> sim = isolated_mapping(levenshtein())
296        >>> sim(["kitten", "sitting"], ["sitting", "fitted"])
297        0.8333333333333334
298    """
299
300    element_similarity: SimFunc[V, float]
301
302    @override
303    def __call__(self, x: Sequence[V], y: Sequence[V]) -> float:
304        # x = case sequence, y = query sequence
305        # Logic: For each element in query y, find its best match in case x. Average these best scores.
306
307        if not y:  # If the query is empty, similarity is undefined or trivially 0 or 1? Let's return 0.
308            return 0.0
309        # Avoid division by zero if x is empty but y is not. max default handles this.
310
311        total_similarity = 0.0
312        for yi in y:  # Iterate through QUERY (y)
313            # Find the best match for the current query element yi within the CASE sequence x
314            # Pass (query_item, case_item) to element_similarity, assuming sim(query, case) convention
315            # Use default=0.0 for max() in case the case sequence x is empty
316            max_similarity = max(
317                (self.element_similarity(yi, xi) for xi in x), default=0.0
318            )
319            total_similarity += max_similarity
320
321        # Normalize by the length of the QUERY sequence (y)
322        average_similarity = total_similarity / len(y)
323
324        return average_similarity

Isolated Mapping similarity function that compares each element in 'y' (query) with all elements in 'x' (case) and takes the maximum similarity for each element in 'y', then averages these maximums. Assumes y -> x (query operated onto case).

Arguments:
  • element_similarity: A function that takes two elements (query_item, case_item) and returns a similarity score between them.
Examples:
>>> from cbrkit.sim.strings import levenshtein
>>> sim = isolated_mapping(levenshtein())
>>> sim(["kitten", "sitting"], ["sitting", "fitted"])
0.8333333333333334
isolated_mapping(element_similarity: cbrkit.typing.SimFunc[V, float])
element_similarity: cbrkit.typing.SimFunc[V, float]
@dataclass(slots=True, frozen=True)
class sequence_mapping(cbrkit.typing.SimFunc[collections.abc.Sequence[V], cbrkit.sim.collections.SequenceSim[V, S]], cbrkit.typing.HasMetadata, typing.Generic[V, S]):
406@dataclass(slots=True, frozen=True)
407class sequence_mapping[V, S: Float](
408    SimFunc[Sequence[V], SequenceSim[V, S]], HasMetadata
409):
410    """
411    List Mapping similarity function.
412
413    Parameters:
414        element_similarity: The similarity function to use for comparing elements.
415        exact: Whether to use exact or inexact comparison. Default is False (inexact).
416        weights: Optional list of weights for weighted similarity calculation.
417
418    Examples:
419        >>> sim = sequence_mapping(lambda x, y: 1.0 if x == y else 0.0, True)
420        >>> result = sim(["a", "b", "c"], ["a", "b", "c"])
421        >>> result.value
422        1.0
423        >>> result.similarities
424        [1.0, 1.0, 1.0]
425    """
426
427    element_similarity: SimFunc[V, S]
428    exact: bool = False
429    weights: list[Weight] | None = None
430
431    @property
432    def metadata(self) -> JsonDict:
433        """Return metadata describing the sequence mapping configuration."""
434        return {
435            "element_similarity": get_metadata(self.element_similarity),
436            "exact": self.exact,
437            "weights": [asdict(weight) for weight in self.weights]
438            if self.weights
439            else None,
440        }
441
442    def compute_contains_exact(
443        self, query: Sequence[V], case: Sequence[V]
444    ) -> SequenceSim[V, S]:
445        """Compute element-wise similarity for sequences of equal length."""
446        if len(query) != len(case):
447            return SequenceSim(value=0.0, similarities=None, mapping=None)
448
449        sim_sum = 0.0
450        local_similarities: list[S] = []
451
452        for elem_q, elem_c in zip(query, case, strict=False):
453            sim = self.element_similarity(elem_q, elem_c)
454            sim_sum += unpack_float(sim)
455            local_similarities.append(sim)
456
457        return SequenceSim(
458            value=sim_sum / len(query),
459            similarities=local_similarities,
460            mapping=None,
461        )
462
463    def compute_contains_inexact(
464        self, case_list: Sequence[V], query_list: Sequence[V]
465    ) -> SequenceSim[V, S]:
466        """
467        Slides the *shorter* sequence across the *longer* one and always
468        evaluates   query → case   (i.e. query elements are compared against
469        the current window cut from the case list).
470        """
471        case_is_longer = len(case_list) >= len(query_list)
472        larger, smaller = (
473            (case_list, query_list) if case_is_longer else (query_list, case_list)
474        )
475
476        best_value = -1.0
477        best_sims: Sequence[S] = []
478
479        for start in range(len(larger) - len(smaller) + 1):
480            window = larger[start : start + len(smaller)]
481
482            if case_is_longer:
483                sim_res = self.compute_contains_exact(smaller, window)
484            else:
485                sim_res = self.compute_contains_exact(window, smaller)
486
487            if sim_res.value > best_value:
488                best_value = sim_res.value
489                best_sims = sim_res.similarities or []
490
491        return SequenceSim(value=best_value, similarities=best_sims, mapping=None)
492
493    def __call__(self, x: Sequence[V], y: Sequence[V]) -> SequenceSim[V, S]:
494        # x is the “case”, y is the “query”
495        if self.exact:
496            result = self.compute_contains_exact(y, x)
497        else:
498            result = self.compute_contains_inexact(x, y)
499
500        if self.weights and result.similarities:
501            total_weighted_sim = 0.0
502            total_weight = 0.0
503
504            # Normalize weights
505            for weight in self.weights:
506                weight_range = weight.upper_bound - weight.lower_bound
507                weight.normalized_weight = weight.weight / weight_range
508
509            # Apply weights
510            for sim in result.similarities:
511                sim_val = unpack_float(sim)
512
513                for weight in self.weights:
514                    lower_bound = weight.lower_bound
515                    upper_bound = weight.upper_bound
516                    inclusive_lower = weight.inclusive_lower
517                    inclusive_upper = weight.inclusive_upper
518
519                    if (
520                        (inclusive_lower and lower_bound <= sim_val <= upper_bound)
521                        or (
522                            not inclusive_lower and lower_bound < sim_val <= upper_bound
523                        )
524                    ) and (inclusive_upper or sim_val < upper_bound):
525                        assert weight.normalized_weight is not None
526                        weighted_sim = weight.normalized_weight * sim_val
527                        total_weighted_sim += weighted_sim
528                        total_weight += weight.normalized_weight
529
530            if total_weight > 0:
531                final_similarity = total_weighted_sim / total_weight
532            else:
533                final_similarity = result.value
534
535            return SequenceSim(
536                value=final_similarity,
537                similarities=result.similarities,
538                mapping=result.mapping,
539            )
540
541        return result

List Mapping similarity function.

Arguments:
  • element_similarity: The similarity function to use for comparing elements.
  • exact: Whether to use exact or inexact comparison. Default is False (inexact).
  • weights: Optional list of weights for weighted similarity calculation.
Examples:
>>> sim = sequence_mapping(lambda x, y: 1.0 if x == y else 0.0, True)
>>> result = sim(["a", "b", "c"], ["a", "b", "c"])
>>> result.value
1.0
>>> result.similarities
[1.0, 1.0, 1.0]
sequence_mapping( element_similarity: cbrkit.typing.SimFunc[V, S], exact: bool = False, weights: list[Weight] | None = None)
element_similarity: cbrkit.typing.SimFunc[V, S]
exact: bool
weights: list[Weight] | None
def compute_contains_exact( self, query: Sequence[V], case: Sequence[V]) -> SequenceSim[V, S]:
def compute_contains_exact(
    self, query: Sequence[V], case: Sequence[V]
) -> SequenceSim[V, S]:
    """Compute element-wise similarity for sequences of equal length.

    Returns zero similarity when the lengths differ; two empty sequences
    are treated as identical (similarity 1.0).
    """
    if len(query) != len(case):
        return SequenceSim(value=0.0, similarities=None, mapping=None)

    # Guard the empty case: the average below divides by len(query) and
    # would otherwise raise ZeroDivisionError for two empty sequences.
    if not query:
        return SequenceSim(value=1.0, similarities=[], mapping=None)

    sim_sum = 0.0
    local_similarities: list[S] = []

    for elem_q, elem_c in zip(query, case, strict=False):
        sim = self.element_similarity(elem_q, elem_c)
        sim_sum += unpack_float(sim)
        local_similarities.append(sim)

    return SequenceSim(
        value=sim_sum / len(query),
        similarities=local_similarities,
        mapping=None,
    )

Compute element-wise similarity for sequences of equal length.

def compute_contains_inexact( self, case_list: Sequence[V], query_list: Sequence[V]) -> SequenceSim[V, S]:
def compute_contains_inexact(
    self, case_list: Sequence[V], query_list: Sequence[V]
) -> SequenceSim[V, S]:
    """
    Slides the *shorter* sequence across the *longer* one and always
    evaluates   query → case   (i.e. query elements are compared against
    the current window cut from the case list).
    """
    case_is_longer = len(case_list) >= len(query_list)

    if case_is_longer:
        larger, smaller = case_list, query_list
    else:
        larger, smaller = query_list, case_list

    window_size = len(smaller)
    best_value = -1.0
    best_sims: Sequence[S] = []

    for offset in range(len(larger) - window_size + 1):
        window = larger[offset : offset + window_size]

        # Keep the (query, case) argument order of compute_contains_exact
        # regardless of which list is longer.
        result = (
            self.compute_contains_exact(smaller, window)
            if case_is_longer
            else self.compute_contains_exact(window, smaller)
        )

        if result.value > best_value:
            best_value = result.value
            best_sims = result.similarities or []

    return SequenceSim(value=best_value, similarities=best_sims, mapping=None)

Slides the shorter sequence across the longer one and always evaluates query → case (i.e. query elements are compared against the current window cut from the case list).

@dataclass(slots=True, frozen=True)
class sequence_correctness(cbrkit.typing.SimFunc[collections.abc.Sequence[V], float], typing.Generic[V]):
544@dataclass(slots=True, frozen=True)
545class sequence_correctness[V](SimFunc[Sequence[V], float]):
546    """List Correctness similarity function.
547
548    Parameters:
549    worst_case_sim (float): The similarity value to use when all pairs are discordant. Default is 0.0.
550
551    Examples:
552        >>> sim = sequence_correctness(0.5)
553        >>> sim(["Monday", "Tuesday", "Wednesday"], ["Monday", "Wednesday", "Tuesday"])
554        0.3333333333333333
555    """
556
557    worst_case_sim: float = 0.0
558
559    @override
560    def __call__(self, x: Sequence[V], y: Sequence[V]) -> float:
561        if len(x) != len(y):
562            return 0.0
563
564        count_concordant = 0
565        count_discordant = 0
566
567        for i, j in product(range(len(x)), repeat=2):
568            if i >= j:
569                continue
570
571            index_x1 = x.index(x[i])
572            index_x2 = x.index(x[j])
573            index_y1 = y.index(x[i])
574            index_y2 = y.index(x[j])
575
576            if index_y1 == -1 or index_y2 == -1:
577                continue
578            elif (index_x1 < index_x2 and index_y1 < index_y2) or (
579                index_x1 > index_x2 and index_y1 > index_y2
580            ):
581                count_concordant += 1
582            else:
583                count_discordant += 1
584
585        if count_concordant + count_discordant == 0:
586            return 0.0
587
588        correctness = (count_concordant - count_discordant) / (
589            count_concordant + count_discordant
590        )
591
592        if correctness >= 0:
593            return correctness
594        else:
595            return abs(correctness) * self.worst_case_sim

List Correctness similarity function.

Parameters:
  • worst_case_sim (float): The similarity value to use when all pairs are discordant. Default is 0.0.

Examples:
>>> sim = sequence_correctness(0.5)
>>> sim(["Monday", "Tuesday", "Wednesday"], ["Monday", "Wednesday", "Tuesday"])
0.3333333333333333
sequence_correctness(worst_case_sim: float = 0.0)
worst_case_sim: float
@dataclass(slots=True, frozen=True)
class SequenceSim(cbrkit.typing.StructuredValue[float], typing.Generic[V, S]):
106@dataclass(slots=True, frozen=True)
107class SequenceSim[V, S: Float](StructuredValue[float]):
108    """
109    A class representing sequence similarity with optional mapping and similarity scores.
110
111    Attributes:
112        value: The overall similarity score as a float.
113        similarities: Optional local similarity scores as a sequence of floats.
114        mapping: Optional alignment information as a sequence of tuples.
115    """
116
    # Per-element similarity scores, when the producing function computed them.
117    similarities: Sequence[S] | None = field(default=None)
    # Optional alignment pairs — e.g. dtw's backtracking emits (query, case)
    # tuples; other producers may differ, so confirm per call site.
118    mapping: Sequence[tuple[V, V]] | None = field(default=None)

A class representing sequence similarity with optional mapping and similarity scores.

Attributes:
  • value: The overall similarity score as a float.
  • similarities: Optional local similarity scores as a sequence of floats.
  • mapping: Optional alignment information as a sequence of tuples.
SequenceSim( value: T, similarities: Sequence[S] | None = None, mapping: Sequence[tuple[V, V]] | None = None)
similarities: Sequence[S] | None
mapping: Sequence[tuple[V, V]] | None
@dataclass
class Weight:
394@dataclass
395class Weight:
396    """A weighted interval with bounds for sequence mapping similarity."""
397
    # Raw weight applied to similarity values falling inside the interval.
398    weight: float
399    lower_bound: float
400    upper_bound: float
    # Whether the corresponding bound itself belongs to the interval.
401    inclusive_lower: bool
402    inclusive_upper: bool
    # Computed by sequence_mapping as weight / (upper_bound - lower_bound);
    # not meant to be set by callers.
403    normalized_weight: float | None = None

A weighted interval with bounds for sequence mapping similarity.

Weight( weight: float, lower_bound: float, upper_bound: float, inclusive_lower: bool, inclusive_upper: bool, normalized_weight: float | None = None)
weight: float
lower_bound: float
upper_bound: float
inclusive_lower: bool
inclusive_upper: bool
normalized_weight: float | None = None