diff --git a/tools/tdgpt/taosanalytics/algo/tool/profile_search.py b/tools/tdgpt/taosanalytics/algo/tool/profile_search.py index cfede385665a..097e13b0bfc4 100644 --- a/tools/tdgpt/taosanalytics/algo/tool/profile_search.py +++ b/tools/tdgpt/taosanalytics/algo/tool/profile_search.py @@ -388,11 +388,8 @@ def _validate_params(parsed_input): has_threshold, top_n = _validate_result_constraints(result_obj, algo_type) source_arr, source_ts_window = _parse_source_data(parsed_input["source_data"]) - exclude_contained = _validate_bool_field(result_obj, "exclude_contained") - if algo_type != "dtw" and "exclude_contained" in result_obj: - raise ValueError('"result.exclude_contained" can only be set for dtw algorithm') - exclude_source = _validate_bool_field(result_obj, "exclude_source") + exclude_overlap = _validate_bool_field(result_obj, "exclude_overlap") min_window, max_window = _validate_min_max_window( algo_params.get("min_window", None), @@ -434,8 +431,8 @@ def _validate_params(parsed_input): "max_window": max_window, "window_size_step": window_size_step, "window_sliding_step": window_sliding_step, - "exclude_contained": exclude_contained, "exclude_source": exclude_source, + "exclude_overlap": exclude_overlap, "is_profile_list": is_profile_list } @@ -461,42 +458,44 @@ def _validate_possible_candidates(source_arr, data_list_size, min_window, max_wi ) -def _is_interval_contained(inner_window, outer_window): - """Return whether ``outer_window`` strictly contains ``inner_window``. +def _is_interval_overlapping(window_a, window_b): + """Return whether ``window_a`` and ``window_b`` overlap. - "Strict" means ``outer_window`` fully covers ``inner_window`` and the two - windows are not identical. At least one outer bound must extend beyond the - corresponding inner bound, so equal start/end bounds do not count as - containment. + Endpoint-touching multi-point windows (e.g. [1,5] and [5,8]) are treated as + adjacent and not overlapping. Two single-point windows [t,t] are considered + overlapping when they share the same timestamp. """ - return (outer_window[0] <= inner_window[0] - and outer_window[1] >= inner_window[1] - and (outer_window[0] < inner_window[0] or outer_window[1] > inner_window[1])) + # Single-point windows with identical timestamps are the same point and overlap. + if window_a[0] == window_a[1] and window_b[0] == window_b[1]: + return window_a[0] == window_b[0] + return window_a[0] < window_b[1] and window_b[0] < window_a[1] + +def _filter_exclude_overlap(matches, limit=None): + """Greedily keep matches whose ts_window does not overlap with any already-kept match. -def _filter_exclude_contained(matches, limit=None): + matches must be sorted best-first. For each candidate, it is discarded if its + ts_window overlaps with an already-kept match's ts_window (adjacent windows + sharing only an endpoint are not considered overlapping). + """ if len(matches) <= 1: return matches - # matches are expected to be sorted by criteria ascending (best first for DTW). - # We greedily keep each match unless it is in a strict containment relationship - # (either direction) with an already-kept match. Because we process best-first, - # every already-kept match has a better (or equal) criteria value, so the current - # match is always the worse one in any containment pair and should be discarded. kept = [] # list of (ts_window, original_index) for idx, match in enumerate(matches): ts_window = match.get("ts_window") if not isinstance(ts_window, (list, tuple)) or len(ts_window) != 2: raise ValueError(f'matches[{idx}].ts_window must be a [start_ts, end_ts] pair') + if ts_window[0] > ts_window[1]: + raise ValueError(f'matches[{idx}].ts_window must satisfy start_ts <= end_ts') - in_containment = any( - _is_interval_contained(ts_window, k_window) - or _is_interval_contained(k_window, ts_window) + has_overlap = any( + _is_interval_overlapping(ts_window, k_window) for k_window, _ in kept ) - if not in_containment: + if not has_overlap: kept.append((ts_window, idx)) if limit is not None and len(kept) >= limit: @@ -506,9 +505,9 @@ def _filter_exclude_contained(matches, limit=None): return [m for i, m in enumerate(matches) if i in kept_indices] -# When exclude_contained is active, the heap is oversampled by this factor so -# that containment filtering still yields target_rows results in most cases. -_CONTAINMENT_OVERSAMPLE = 8 +# When exclusion filters are active, the heap is oversampled by this factor so +# that filtering still yields target_rows results in most cases. +_EXCLUSION_OVERSAMPLE = 8 def _heap_key(algo_type, criteria_val, seq_idx): # Higher heap key means a better candidate after normalization of the metric: @@ -536,14 +535,14 @@ def do_profile_search_impl(req_json): max_window = parsed["max_window"] window_size_step = parsed["window_size_step"] window_sliding_step = parsed["window_sliding_step"] - exclude_contained = parsed["exclude_contained"] exclude_source = parsed["exclude_source"] + exclude_overlap = parsed["exclude_overlap"] source_norm = _normalize_series(source_arr, norm_type) metric_type = "dtw_distance" if algo_type == "dtw" else "cosine_similarity" threshold = float(result_obj["threshold"]) if has_threshold else None target_rows = ProfileSearchLimits.MAX_PROFILE_SEARCH_RESULTS if top_n is None else top_n - need_exclusion_filter = (algo_type == "dtw" and exclude_contained) + need_exclusion_filter = exclude_overlap def _build_candidates(): if parsed["is_profile_list"]: @@ -558,10 +557,10 @@ def _build_candidates(): ) # Score all candidates once. - # - Without exclude_contained: stream results directly into a fixed-size heap, + # - Without exclude_overlap: stream results directly into a fixed-size heap, # discarding weaker candidates on the fly. No retry is needed so there is no # reason to accumulate a separate all_passed list. - # - With exclude_contained: every passing result is saved in all_passed so that + # - With exclude_overlap: every passing result is saved in all_passed so that # the retry loop can rebuild the heap with a larger limit without recomputing # any distances. all_passed = [] if need_exclusion_filter else None @@ -611,9 +610,9 @@ def _build_candidates(): top_heap.sort(key=lambda x: (-x[2]["criteria"], x[1])) matches = [x[2] for x in top_heap] else: - # exclude_contained is active: rebuild the heap from all_passed with a - # progressively larger heap_limit until containment filtering yields enough results. - oversample = _CONTAINMENT_OVERSAMPLE + # Exclusion filters are active: rebuild the heap from all_passed with a + # progressively larger heap_limit until filtering yields enough results. + oversample = _EXCLUSION_OVERSAMPLE matches = [] total_passed = len(all_passed) @@ -628,21 +627,20 @@ def _build_candidates(): elif key > top_heap[0][0]: heapq.heapreplace(top_heap, heap_item) - if algo_type != "dtw": - raise RuntimeError('exclude_contained logic requires algo_type to be "dtw"') - - top_heap.sort(key=lambda x: (x[2]["criteria"], x[1])) + if algo_type == "dtw": + top_heap.sort(key=lambda x: (x[2]["criteria"], x[1])) + else: + top_heap.sort(key=lambda x: (-x[2]["criteria"], x[1])) matches = [x[2] for x in top_heap] - matches = _filter_exclude_contained(matches, limit=target_rows) - matches = matches[:target_rows] + matches = _filter_exclude_overlap(matches, limit=target_rows) # Got enough results, or all passing candidates already fit in the heap. if len(matches) >= target_rows or total_passed <= heap_limit: break - # The heap was saturated and containment filtering removed too many entries. + # The heap was saturated and filtering removed too many entries. # Double the oversample factor and rebuild from the cached scored list. oversample *= 2 diff --git a/tools/tdgpt/taosanalytics/app.py b/tools/tdgpt/taosanalytics/app.py index c90cf621ab9c..134b984712f3 100644 --- a/tools/tdgpt/taosanalytics/app.py +++ b/tools/tdgpt/taosanalytics/app.py @@ -310,8 +310,8 @@ def do_profile_search(request, api_version): - Or return all profiles with distance below the threshold when using dtw. - Or return all profiles with similarity above the threshold when using cosine similarity. - "num" and "threshold" cannot be set at the same time. - - "exclude_contained" is only applicable for dtw and means whether to exclude the worse matched profile in a strict-containment pair, keeping the better one (the match with the smaller distance). For example, if there are two matched profiles with ts window [1, 5] and [2, 4], and one strictly contains the other, the worse match will be excluded if "exclude_contained" is set to true. - "exclude_source" is applicable for all algorithms and means whether to exclude the matched profile that contains the source profile. For example, if the source profile has ts window [2, 4], the matched profile with ts window [2, 4] will be excluded if "exclude_source" is set to true. + - "exclude_overlap" is applicable for all algorithms and means whether to exclude any matched profile that overlaps with a better-ranked result. For example, if there are two matched profiles with ts window [1, 5] and [4, 6], the profile [4, 6] will be excluded if "exclude_overlap" is set to true. Endpoint-touching windows are treated as adjacent/non-overlapping, so windows such as [1, 5] and [5, 9] are not excluded by "exclude_overlap". - Threshold-based results are capped at 500 matches. target_data.ts may be either: - a unix timestamp list, such as [1, 2, 3, 4, 5, 6] @@ -330,8 +330,8 @@ def do_profile_search(request, api_version): }, "result": { "num": 3, - "exclude_contained": true, - "exclude_source": true + "exclude_source": true, + "exclude_overlap": true }, "source_data": { "ts": [1000, 2000, 3000, 4000, 5000], diff --git a/tools/tdgpt/taosanalytics/test/unit_test.py b/tools/tdgpt/taosanalytics/test/unit_test.py index 75a04b6aae1a..271abac66623 100644 --- a/tools/tdgpt/taosanalytics/test/unit_test.py +++ b/tools/tdgpt/taosanalytics/test/unit_test.py @@ -624,9 +624,9 @@ def test_exclude_source_uses_source_data_ts_window(self): result = do_profile_search_impl(req_json) self.assertEqual(result["rows"], 0) - def test_exclude_contained_keeps_better_outer(self): - # Outer window [1,5] is a better (smaller distance) match than inner [2,4]. - # exclude_contained should keep the outer and discard the inner. + def test_exclude_overlap_removes_overlapping_worse_window(self): + # [1,5] (distance 0.0) overlaps/contains [2,4] (worse match). + # exclude_overlap should keep [1,5] and discard [2,4]. req_json = { "normalization": "none", "algo": { @@ -635,7 +635,7 @@ def test_exclude_contained_keeps_better_outer(self): }, "result": { "num": 20, - "exclude_contained": True, + "exclude_overlap": True, }, "source_data": {"ts": [1, 2, 3, 4, 5], "data": [1, 2, 3, 4, 5]}, "target_data": { @@ -652,9 +652,9 @@ def test_exclude_contained_keeps_better_outer(self): self.assertIn([1, 5], matched_windows) self.assertNotIn([2, 4], matched_windows) - def test_exclude_contained_keeps_better_inner(self): - # Inner window [2,4] is a better (smaller distance) match than outer [1,5]. - # exclude_contained should keep the inner and discard the outer. + def test_exclude_overlap_keeps_better_window_discards_worse(self): + # [2,4] (distance 0.0) is a better match than outer [1,5]. They overlap. + # exclude_overlap should keep [2,4] (processed first, best-first) and discard [1,5]. req_json = { "normalization": "none", "algo": { @@ -663,7 +663,7 @@ def test_exclude_contained_keeps_better_inner(self): }, "result": { "num": 20, - "exclude_contained": True, + "exclude_overlap": True, }, "source_data": {"ts": [2, 3, 4], "data": [2, 3, 4]}, "target_data": { @@ -715,12 +715,12 @@ def test_exclude_source_with_profile_list(self): self.assertNotIn([2, 4], matched_windows) self.assertIn([4, 6], matched_windows) - def test_exclude_contained_oversample_prevents_underfill(self): - """The retry loop doubles the oversample when _filter_exclude_contained removes + def test_exclude_overlap_oversample_prevents_underfill(self): + """The retry loop doubles the oversample when _filter_exclude_overlap removes too many candidates, ensuring target_rows results are returned even when the - initial heap would be too small to cover all non-contained profiles. + initial heap would be too small to cover all non-overlapping profiles. - Setup: 1 best match + 15 near-clones that all contain [10,14] + 2 independent + Setup: 1 best match + 15 near-clones that all overlap with [10,14] + 2 independent profiles at ranks 17-18. With an initial oversample of 8 the heap holds only 16 entries; both independent profiles are evicted before scoring completes, so the first scan under-fills. The retry loop detects this (total_passed > heap @@ -736,9 +736,9 @@ def test_exclude_contained_oversample_prevents_underfill(self): ts_list.append([10, 14]) data_list.append(list(source)) - # 15 near-clone profiles whose ts_windows all contain [10, 14]. - # They rank better than the two independent profiles but form a containment - # cluster with [10, 14], so _filter_exclude_contained discards all 15. + # 15 near-clone profiles whose ts_windows all overlap with [10, 14]. + # They rank better than the two independent profiles but overlap with + # [10, 14], so _filter_exclude_overlap discards all 15. for i in range(15): ts_list.append([10 - (i + 1), 14 + (i + 1)]) # [9,15], [8,16], ..., [-5,29] data_list.append([v + 0.01 * (i + 1) for v in source]) @@ -753,23 +753,23 @@ def test_exclude_contained_oversample_prevents_underfill(self): "source_data": source, "target_data": {"ts": ts_list, "data": data_list}, "algo": {"type": "dtw", "params": {"radius": 1}}, - "result": {"num": 2, "exclude_contained": True}, + "result": {"num": 2, "exclude_overlap": True}, } - original = ps._CONTAINMENT_OVERSAMPLE + original = ps._EXCLUSION_OVERSAMPLE try: - # Start with oversample=8 (heap_limit=16). All 18 profiles pass threshold - # filtering, so the heap is saturated and both independent profiles are - # evicted. The retry loop must detect the under-fill, double the - # oversample, and rescan until it returns 2. - ps._CONTAINMENT_OVERSAMPLE = 8 + # Start with oversample=8 (heap_limit=16). All 18 profiles remain + # eligible candidates during scoring, so the heap is saturated and both + # independent profiles are evicted. The retry loop must detect the + # under-fill, double the oversample, and rescan until it returns 2. + ps._EXCLUSION_OVERSAMPLE = 8 result = ps.do_profile_search_impl(req) self.assertEqual( result["rows"], 2, "retry loop must compensate for the under-filled initial heap and return both independent profiles", ) finally: - ps._CONTAINMENT_OVERSAMPLE = original + ps._EXCLUSION_OVERSAMPLE = original def test_window_size_step_skips_intermediate_sizes(self): @@ -875,6 +875,166 @@ def test_window_size_step_non_integer_is_invalid(self): with self.assertRaises(ValueError): do_profile_search_impl(req_json) + def test_exclude_overlap_partial_overlap(self): + """Partial overlap (not containment) between [1,5] and [4,8] should still + cause the worse match to be excluded when exclude_overlap is True.""" + req_json = { + "normalization": "none", + "algo": { + "type": "dtw", + "params": {"radius": 1}, + }, + "result": { + "num": 10, + "exclude_overlap": True, + }, + "source_data": [1, 2, 3, 4, 5], + "target_data": { + "ts": [[1, 5], [4, 8], [10, 14]], + "data": [ + [1, 2, 3, 4, 5], # best match (distance 0.0) + [4, 5, 6, 7, 8], # ts_window [4,8] partially overlaps [1,5] at timestamps 4 and 5 → excluded + [1, 2, 3, 4, 5], # non-overlapping, same distance → kept + ], + }, + } + + result = do_profile_search_impl(req_json) + matched_windows = [m["ts_window"] for m in result["matches"]] + self.assertIn([1, 5], matched_windows) + self.assertNotIn([4, 8], matched_windows) + self.assertIn([10, 14], matched_windows) + + def test_exclude_overlap_works_with_cosine(self): + """exclude_overlap must work for the cosine algorithm (it was not restricted + to dtw, unlike the former exclude_contained option).""" + req_json = { + "normalization": "none", + "algo": { + "type": "cosine", + "params": {}, + }, + "result": { + "num": 10, + "exclude_overlap": True, + }, + "source_data": [1, 0, -1], + "target_data": { + "ts": [[1, 3], [2, 4], [10, 12]], + "data": [ + [2, 0, -2], # cosine similarity 1.0 (best) + [1, 0, -1], # cosine similarity 1.0, overlaps with [1,3] → excluded + [-1, 0, 1], # cosine similarity -1.0, non-overlapping → kept + ], + }, + } + + result = do_profile_search_impl(req_json) + matched_windows = [m["ts_window"] for m in result["matches"]] + self.assertIn([1, 3], matched_windows) + self.assertNotIn([2, 4], matched_windows) + self.assertIn([10, 12], matched_windows) + + def test_exclude_overlap_non_overlapping_windows_all_kept(self): + """When no windows overlap, exclude_overlap must not remove any result.""" + req_json = { + "normalization": "none", + "algo": { + "type": "dtw", + "params": {"radius": 1}, + }, + "result": { + "num": 10, + "exclude_overlap": True, + }, + "source_data": [1, 2, 3], + "target_data": { + "ts": [[1, 3], [5, 7], [9, 11]], + "data": [ + [1, 2, 3], + [1, 2, 3], + [1, 2, 3], + ], + }, + } + result = do_profile_search_impl(req_json) + self.assertEqual(result["rows"], 3) + matched_windows = [m["ts_window"] for m in result["matches"]] + self.assertIn([1, 3], matched_windows) + self.assertIn([5, 7], matched_windows) + self.assertIn([9, 11], matched_windows) + + def test_exclude_overlap_identical_single_point_windows(self): + """Two identical single-point windows [t,t] must be treated as overlapping + so that exclude_overlap discards the worse-ranked duplicate.""" + from taosanalytics.algo.tool.profile_search import _is_interval_overlapping + # Identical single-point windows overlap. + self.assertTrue(_is_interval_overlapping([5, 5], [5, 5])) + # Different single-point windows do not overlap. + self.assertFalse(_is_interval_overlapping([5, 5], [6, 6])) + # Adjacency rule still holds for multi-point windows. + self.assertFalse(_is_interval_overlapping([1, 5], [5, 9])) + + def test_exclude_overlap_single_shared_timestamp_not_overlap(self): + """Two profiles that share only one endpoint timestamp (e.g. [1,5] and [5,9]) + must NOT be considered overlapping — a single touching point is adjacent, + not a true overlap.""" + req_json = { + "normalization": "none", + "algo": { + "type": "dtw", + "params": {"radius": 1}, + }, + "result": { + "num": 10, + "exclude_overlap": True, + }, + "source_data": [1, 2, 3, 4, 5], + "target_data": { + "ts": [[1, 5], [5, 9]], + "data": [ + [1, 2, 3, 4, 5], # best match (distance 0.0) + [5, 6, 7, 8, 9], # touches at ts=5 only — should NOT be excluded + ], + }, + } + + result = do_profile_search_impl(req_json) + self.assertEqual(result["rows"], 2) + matched_windows = [m["ts_window"] for m in result["matches"]] + self.assertIn([1, 5], matched_windows) + self.assertIn([5, 9], matched_windows) + + def test_exclude_overlap_single_point_window_edge_case(self): + """Single-point windows [t,t] must be treated consistently by exclude_overlap: + exact duplicates overlap and should be excluded, while [t,t] and [t,t+k] + only touch at an endpoint and should both be kept.""" + req_json = { + "normalization": "none", + "algo": { + "type": "dtw", + "params": {"radius": 1}, + }, + "result": { + "num": 10, + "exclude_overlap": True, + }, + "source_data": [7], + "target_data": { + "ts": [[1, 1], [1, 1], [1, 2]], + "data": [ + [7], # best match + [7], # exact same single-point window; should be excluded + [7, 8], # shares only endpoint t=1 with [1,1]; should be kept + ], + }, + } + + result = do_profile_search_impl(req_json) + self.assertEqual(result["rows"], 2) + matched_windows = [m["ts_window"] for m in result["matches"]] + self.assertEqual(matched_windows.count([1, 1]), 1) + self.assertIn([1, 2], matched_windows) if __name__ == '__main__': unittest.main()