Skip to content

Commit 8da71ec

Browse files
[WIP] Fixed #639 Top-K Nearest Neighbors to Matrix Profile (normalize=False) (#714)
* Add support for top-k to naive aamp * Add test function for TopK. Expect Error. * Add TopK support to aamp. Error Resolved * Add TopK test function for AB_join * (Temporarily) fix scraamp to adapt the changes in _aamp * fix calling function _aamp * (Temporarily) fixed aamped * Avoid unnecessary passing k and minor changes * fix docstring in aamped * Add support for top-k to naive prescraamp * Add test function for prescraamp top-k. Expect Error * Add top-k support for prescraamp * Add top-k test function for AB_join prescraamp * Add top-k support to scraamp and update test function * Add new test functions and minor fixes * update test function to adapt to new changes * update test function to adapt to new changes * Add Top-K support to gpu_aamp * fix import * fix minor bug * minor fixes * Add test function for top-k * Empty Commit * Add top-k support to naive * fix issues to pass existing unit tests aampi * Add top-k test. Expect Error * Add top-k support to performant aampi * Add top-k test function to aampi with egress * minor changes * Revised signature * replace param with class attribute * fixed style * fixed typo * Move device function to core to fix circular import error * Update test functions * fix bug * replace wrong variable with the correct one * fix docstring * fix docstrings * Empty commit
1 parent 9cd0984 commit 8da71ec

18 files changed

+1602
-696
lines changed

stumpy/aamp.py

Lines changed: 147 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616
@njit(
1717
# "(f8[:], f8[:], i8, b1[:], b1[:], f8, i8[:], i8, i8, i8, f8[:, :, :],"
18-
# "i8[:, :, :], b1)",
18+
# "f8[:, :], f8[:, :], i8[:, :, :], i8[:, :], i8[:, :], b1)",
1919
fastmath=True,
2020
)
2121
def _compute_diagonal(
@@ -30,12 +30,17 @@ def _compute_diagonal(
3030
diags_stop_idx,
3131
thread_idx,
3232
P,
33+
PL,
34+
PR,
3335
I,
36+
IL,
37+
IR,
3438
ignore_trivial,
3539
):
3640
"""
37-
Compute (Numba JIT-compiled) and update P, I along a single diagonal using a single
38-
thread and avoiding race conditions
41+
Compute (Numba JIT-compiled) and update the (top-k) matrix profile P,
42+
PL, PR, I, IL, and IR sequentially along individual diagonals using a single
43+
thread and avoiding race conditions.
3944
4045
Parameters
4146
----------
@@ -49,12 +54,6 @@ def _compute_diagonal(
4954
m : int
5055
Window size
5156
52-
P : numpy.ndarray
53-
Matrix profile
54-
55-
I : numpy.ndarray
56-
Matrix profile indices
57-
5857
T_A_subseq_isfinite : numpy.ndarray
5958
A boolean array that indicates whether a subsequence in `T_A` contains a
6059
`np.nan`/`np.inf` value (False)
@@ -78,6 +77,24 @@ def _compute_diagonal(
7877
thread_idx : int
7978
The thread index
8079
80+
P : numpy.ndarray
81+
The (top-k) matrix profile, sorted in ascending order per row
82+
83+
PL : numpy.ndarray
84+
The top-1 left marix profile
85+
86+
PR : numpy.ndarray
87+
The top-1 right marix profile
88+
89+
I : numpy.ndarray
90+
The (top-k) matrix profile indices
91+
92+
IL : numpy.ndarray
93+
The top-1 left matrix profile indices
94+
95+
IR : numpy.ndarray
96+
The top-1 right matrix profile indices
97+
8198
ignore_trivial : bool
8299
Set to `True` if this is a self-join. Otherwise, for AB-join, set this to
83100
`False`. Default is `True`.
@@ -92,16 +109,16 @@ def _compute_diagonal(
92109
uint64_1 = np.uint64(1)
93110

94111
for diag_idx in range(diags_start_idx, diags_stop_idx):
95-
k = diags[diag_idx]
112+
g = diags[diag_idx]
96113

97-
if k >= 0:
98-
iter_range = range(0, min(n_A - m + 1, n_B - m + 1 - k))
114+
if g >= 0:
115+
iter_range = range(0, min(n_A - m + 1, n_B - m + 1 - g))
99116
else:
100-
iter_range = range(-k, min(n_A - m + 1, n_B - m + 1 - k))
117+
iter_range = range(-g, min(n_A - m + 1, n_B - m + 1 - g))
101118

102119
for i in iter_range:
103120
uint64_i = np.uint64(i)
104-
uint64_j = np.uint64(i + k)
121+
uint64_j = np.uint64(i + g)
105122

106123
if uint64_i == 0 or uint64_j == 0:
107124
p_norm = (
@@ -129,36 +146,59 @@ def _compute_diagonal(
129146

130147
if T_A_subseq_isfinite[uint64_i] and T_B_subseq_isfinite[uint64_j]:
131148
# Neither subsequence contains NaNs
132-
if p_norm < P[thread_idx, uint64_i, 0]:
133-
P[thread_idx, uint64_i, 0] = p_norm
134-
I[thread_idx, uint64_i, 0] = uint64_j
135149

136-
if ignore_trivial:
137-
if p_norm < P[thread_idx, uint64_j, 0]:
138-
P[thread_idx, uint64_j, 0] = p_norm
139-
I[thread_idx, uint64_j, 0] = uint64_i
150+
# `P[thread_idx, i, :]` is sorted ascendingly and MUST be updated
151+
# when the newly-calculated `p_norm` value becomes smaller than the
152+
# last (i.e. greatest) element in this array. Note that the goal
153+
# is to have top-k smallest distancs for each subsequence.
154+
if p_norm < P[thread_idx, uint64_i, -1]:
155+
idx = np.searchsorted(P[thread_idx, uint64_i], p_norm)
156+
core._shift_insert_at_index(
157+
P[thread_idx, uint64_i], idx, p_norm, shift="right"
158+
)
159+
core._shift_insert_at_index(
160+
I[thread_idx, uint64_i], idx, uint64_j, shift="right"
161+
)
162+
163+
if ignore_trivial: # self-joins only
164+
if p_norm < P[thread_idx, uint64_j, -1]:
165+
idx = np.searchsorted(P[thread_idx, uint64_j], p_norm)
166+
core._shift_insert_at_index(
167+
P[thread_idx, uint64_j], idx, p_norm, shift="right"
168+
)
169+
core._shift_insert_at_index(
170+
I[thread_idx, uint64_j], idx, uint64_i, shift="right"
171+
)
140172

141173
if uint64_i < uint64_j:
142174
# left matrix profile and left matrix profile index
143-
if p_norm < P[thread_idx, uint64_j, 1]:
144-
P[thread_idx, uint64_j, 1] = p_norm
145-
I[thread_idx, uint64_j, 1] = uint64_i
175+
if p_norm < PL[thread_idx, uint64_j]:
176+
PL[thread_idx, uint64_j] = p_norm
177+
IL[thread_idx, uint64_j] = uint64_i
146178

147179
# right matrix profile and right matrix profile index
148-
if p_norm < P[thread_idx, uint64_i, 2]:
149-
P[thread_idx, uint64_i, 2] = p_norm
150-
I[thread_idx, uint64_i, 2] = uint64_j
180+
if p_norm < PR[thread_idx, uint64_i]:
181+
PR[thread_idx, uint64_i] = p_norm
182+
IR[thread_idx, uint64_i] = uint64_j
151183

152184
return
153185

154186

155187
@njit(
156-
# "(f8[:], f8[:], i8, b1[:], b1[:], i8[:], b1)",
188+
# "(f8[:], f8[:], i8, b1[:], b1[:], i8[:], b1, i8)",
157189
parallel=True,
158190
fastmath=True,
159191
)
160192
def _aamp(
161-
T_A, T_B, m, T_A_subseq_isfinite, T_B_subseq_isfinite, p, diags, ignore_trivial
193+
T_A,
194+
T_B,
195+
m,
196+
T_A_subseq_isfinite,
197+
T_B_subseq_isfinite,
198+
p,
199+
diags,
200+
ignore_trivial,
201+
k,
162202
):
163203
"""
164204
A Numba JIT-compiled version of AAMP for parallel computation of the matrix
@@ -194,13 +234,30 @@ def _aamp(
194234
Set to `True` if this is a self-join. Otherwise, for AB-join, set this to
195235
`False`. Default is `True`.
196236
237+
k : int
238+
The number of top `k` smallest distances used to construct the matrix profile.
239+
Note that this will increase the total computational time and memory usage
240+
when k > 1.
241+
197242
Returns
198243
-------
199-
P : numpy.ndarray
200-
Matrix profile
244+
out1 : numpy.ndarray
245+
The (top-k) matrix profile
201246
202-
I : numpy.ndarray
203-
Matrix profile indices
247+
out2 : numpy.ndarray
248+
The (top-1) left matrix profile
249+
250+
out3 : numpy.ndarray
251+
The (top-1) right matrix profile
252+
253+
out4 : numpy.ndarray
254+
The (top-k) matrix profile indices
255+
256+
out5 : numpy.ndarray
257+
The (top-1) left matrix profile indices
258+
259+
out6 : numpy.ndarray
260+
The (top-1) right matrix profile indices
204261
205262
Notes
206263
-----
@@ -213,8 +270,15 @@ def _aamp(
213270
n_B = T_B.shape[0]
214271
l = n_A - m + 1
215272
n_threads = numba.config.NUMBA_NUM_THREADS
216-
P = np.full((n_threads, l, 3), np.inf, dtype=np.float64)
217-
I = np.full((n_threads, l, 3), -1, dtype=np.int64)
273+
274+
P = np.full((n_threads, l, k), np.inf, dtype=np.float64)
275+
I = np.full((n_threads, l, k), -1, dtype=np.int64)
276+
277+
PL = np.full((n_threads, l), np.inf, dtype=np.float64)
278+
IL = np.full((n_threads, l), -1, dtype=np.int64)
279+
280+
PR = np.full((n_threads, l), np.inf, dtype=np.float64)
281+
IR = np.full((n_threads, l), -1, dtype=np.int64)
218282

219283
ndist_counts = core._count_diagonal_ndist(diags, m, n_A, n_B)
220284
diags_ranges = core._get_array_ranges(ndist_counts, n_threads, False)
@@ -233,26 +297,37 @@ def _aamp(
233297
diags_ranges[thread_idx, 1],
234298
thread_idx,
235299
P,
300+
PL,
301+
PR,
236302
I,
303+
IL,
304+
IR,
237305
ignore_trivial,
238306
)
239307

240308
# Reduction of results from all threads
241309
for thread_idx in range(1, n_threads):
242-
for i in prange(l):
243-
if P[0, i, 0] > P[thread_idx, i, 0]:
244-
P[0, i, 0] = P[thread_idx, i, 0]
245-
I[0, i, 0] = I[thread_idx, i, 0]
246-
# left matrix profile and left matrix profile indices
247-
if P[0, i, 1] > P[thread_idx, i, 1]:
248-
P[0, i, 1] = P[thread_idx, i, 1]
249-
I[0, i, 1] = I[thread_idx, i, 1]
250-
# right matrix profile and right matrix profile indices
251-
if P[0, i, 2] > P[thread_idx, i, 2]:
252-
P[0, i, 2] = P[thread_idx, i, 2]
253-
I[0, i, 2] = I[thread_idx, i, 2]
254-
255-
return np.power(P[0, :, :], 1.0 / p), I[0, :, :]
310+
# update top-k arrays
311+
core._merge_topk_PI(P[0], P[thread_idx], I[0], I[thread_idx])
312+
313+
# update left matrix profile and matrix profile indices
314+
mask = PL[0] > PL[thread_idx]
315+
PL[0][mask] = PL[thread_idx][mask]
316+
IL[0][mask] = IL[thread_idx][mask]
317+
318+
# update right matrix profile and matrix profile indices
319+
mask = PR[0] > PR[thread_idx]
320+
PR[0][mask] = PR[thread_idx][mask]
321+
IR[0][mask] = IR[thread_idx][mask]
322+
323+
return (
324+
np.power(P[0], 1.0 / p),
325+
np.power(PL[0], 1.0 / p),
326+
np.power(PR[0], 1.0 / p),
327+
I[0],
328+
IL[0],
329+
IR[0],
330+
)
256331

257332

258333
def aamp(T_A, m, T_B=None, ignore_trivial=True, p=2.0, k=1):
@@ -291,8 +366,16 @@ def aamp(T_A, m, T_B=None, ignore_trivial=True, p=2.0, k=1):
291366
Returns
292367
-------
293368
out : numpy.ndarray
294-
The first column consists of the matrix profile, the second column
295-
consists of the matrix profile indices.
369+
When k = 1 (default), the first column consists of the matrix profile,
370+
the second column consists of the matrix profile indices, the third column
371+
consists of the left matrix profile indices, and the fourth column consists
372+
of the right matrix profile indices. However, when k > 1, the output array
373+
will contain exactly 2 * k + 2 columns. The first k columns (i.e., out[:, :k])
374+
consists of the top-k matrix profile, the next set of k columns
375+
(i.e., out[:, k:2k]) consists of the corresponding top-k matrix profile
376+
indices, and the last two columns (i.e., out[:, 2k] and out[:, 2k+1] or,
377+
equivalently, out[:, -2] and out[:, -1]) correspond to the top-1 left
378+
matrix profile indices and the top-1 right matrix profile indices, respectively.
296379
297380
Notes
298381
-----
@@ -331,19 +414,26 @@ def aamp(T_A, m, T_B=None, ignore_trivial=True, p=2.0, k=1):
331414
l = n_A - m + 1
332415

333416
excl_zone = int(np.ceil(m / config.STUMPY_EXCL_ZONE_DENOM))
334-
out = np.empty((l, 4), dtype=object)
335-
336417
if ignore_trivial:
337418
diags = np.arange(excl_zone + 1, n_A - m + 1, dtype=np.int64)
338419
else:
339420
diags = np.arange(-(n_A - m + 1) + 1, n_B - m + 1, dtype=np.int64)
340421

341-
P, I = _aamp(
342-
T_A, T_B, m, T_A_subseq_isfinite, T_B_subseq_isfinite, p, diags, ignore_trivial
422+
P, PL, PR, I, IL, IR = _aamp(
423+
T_A,
424+
T_B,
425+
m,
426+
T_A_subseq_isfinite,
427+
T_B_subseq_isfinite,
428+
p,
429+
diags,
430+
ignore_trivial,
431+
k,
343432
)
344433

345-
out[:, 0] = P[:, 0]
346-
out[:, 1:] = I[:, :]
434+
out = np.empty((l, 2 * k + 2), dtype=object)
435+
out[:, :k] = P
436+
out[:, k:] = np.column_stack((I, IL, IR))
347437

348438
core._check_P(out[:, 0])
349439

stumpy/aamped.py

Lines changed: 30 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,16 @@ def aamped(dask_client, T_A, m, T_B=None, ignore_trivial=True, p=2.0, k=1):
5555
Returns
5656
-------
5757
out : numpy.ndarray
58-
The first column consists of the matrix profile, the second column
59-
consists of the matrix profile indices.
58+
When k = 1 (default), the first column consists of the matrix profile,
59+
the second column consists of the matrix profile indices, the third column
60+
consists of the left matrix profile indices, and the fourth column consists
61+
of the right matrix profile indices. However, when k > 1, the output array
62+
will contain exactly 2 * k + 2 columns. The first k columns (i.e., out[:, :k])
63+
consists of the top-k matrix profile, the next set of k columns
64+
(i.e., out[:, k:2k]) consists of the corresponding top-k matrix profile
65+
indices, and the last two columns (i.e., out[:, 2k] and out[:, 2k+1] or,
66+
equivalently, out[:, -2] and out[:, -1]) correspond to the top-1 left
67+
matrix profile indices and the top-1 right matrix profile indices, respectively.
6068
6169
Notes
6270
-----
@@ -94,12 +102,10 @@ def aamped(dask_client, T_A, m, T_B=None, ignore_trivial=True, p=2.0, k=1):
94102
n_B = T_B.shape[0]
95103
l = n_A - m + 1
96104

97-
excl_zone = int(np.ceil(m / config.STUMPY_EXCL_ZONE_DENOM))
98-
out = np.empty((l, 4), dtype=object)
99-
100105
hosts = list(dask_client.ncores().keys())
101106
nworkers = len(hosts)
102107

108+
excl_zone = int(np.ceil(m / config.STUMPY_EXCL_ZONE_DENOM))
103109
if ignore_trivial:
104110
diags = np.arange(excl_zone + 1, n_A - m + 1, dtype=np.int64)
105111
else:
@@ -141,20 +147,30 @@ def aamped(dask_client, T_A, m, T_B=None, ignore_trivial=True, p=2.0, k=1):
141147
p,
142148
diags_futures[i],
143149
ignore_trivial,
150+
k,
144151
)
145152
)
146153

147154
results = dask_client.gather(futures)
148-
profile, indices = results[0]
155+
profile, profile_L, profile_R, indices, indices_L, indices_R = results[0]
149156
for i in range(1, len(hosts)):
150-
P, I = results[i]
151-
for col in range(P.shape[1]): # pragma: no cover
152-
cond = P[:, col] < profile[:, col]
153-
profile[:, col] = np.where(cond, P[:, col], profile[:, col])
154-
indices[:, col] = np.where(cond, I[:, col], indices[:, col])
155-
156-
out[:, 0] = profile[:, 0]
157-
out[:, 1:4] = indices
157+
P, PL, PR, I, IL, IR = results[i]
158+
# Update top-k matrix profile and matrix profile indices
159+
core._merge_topk_PI(profile, P, indices, I)
160+
161+
# Update top-1 left matrix profile and matrix profile index
162+
mask = PL < profile_L
163+
profile_L[mask] = PL[mask]
164+
indices_L[mask] = IL[mask]
165+
166+
# Update top-1 right matrix profile and matrix profile index
167+
mask = PR < profile_R
168+
profile_R[mask] = PR[mask]
169+
indices_R[mask] = IR[mask]
170+
171+
out = np.empty((l, 2 * k + 2), dtype=object)
172+
out[:, :k] = profile
173+
out[:, k : 2 * k + 2] = np.column_stack((indices, indices_L, indices_R))
158174

159175
core._check_P(out[:, 0])
160176

0 commit comments

Comments
 (0)