Skip to content

[WIP] Fix #639 Top-K Nearest Neighbors to Matrix Profile (normalize=False) #714

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
f7c65a9
Add support for top-k to naive aamp
NimaSarajpoor Nov 10, 2022
203c8f6
Add test function for TopK. Expect Error.
NimaSarajpoor Nov 10, 2022
aa00a20
Add TopK support to aamp. Error Resolved
NimaSarajpoor Nov 10, 2022
c6d6992
Add TopK test function for AB_join
NimaSarajpoor Nov 10, 2022
7c53997
(Temporarily) fix scraamp to adapt the changes in _aamp
NimaSarajpoor Nov 11, 2022
ad4b5a0
fix calling function _aamp
NimaSarajpoor Nov 11, 2022
b6ce4f2
(Temporarily) fixed aamped
NimaSarajpoor Nov 11, 2022
4dc59ab
Avoid unnecessary passing k and minor changes
NimaSarajpoor Nov 11, 2022
3c4fb23
fix docstring in aamped
NimaSarajpoor Nov 11, 2022
da771e5
Add support for top-k to naive prescraamp
NimaSarajpoor Nov 12, 2022
6862103
Add test function for prescraamp top-k. Expect Error
NimaSarajpoor Nov 12, 2022
63f4b92
Add top-k support for prescraamp
NimaSarajpoor Nov 12, 2022
bb5021b
Add top-k test function for AB_join prescraamp
NimaSarajpoor Nov 12, 2022
eca1acc
Add top-k support to scraamp and update test function
NimaSarajpoor Nov 12, 2022
05bf2ab
Add new test functions and minor fixes
NimaSarajpoor Nov 12, 2022
ca8c002
Merge main to update branch
NimaSarajpoor Nov 12, 2022
59b2319
update test function to adapt to new changes
NimaSarajpoor Nov 12, 2022
3e4b7b6
update test function to adapt to new changes
NimaSarajpoor Nov 12, 2022
6a2d025
Add Top-K support to gpu_aamp
NimaSarajpoor Nov 12, 2022
e5e2d29
fix import
NimaSarajpoor Nov 12, 2022
07f76ff
Merge main to update local branch
NimaSarajpoor Nov 12, 2022
0ac3bda
fix minor bug
NimaSarajpoor Nov 12, 2022
fe5e4bc
minor fixes
NimaSarajpoor Nov 13, 2022
8936d7e
Add test function for top-k
NimaSarajpoor Nov 13, 2022
a814b44
Empty Commit
NimaSarajpoor Nov 13, 2022
087bc6a
Add top-k support to naive
NimaSarajpoor Nov 15, 2022
3950dfc
fix issues to pass existing unit tests aampi
NimaSarajpoor Nov 15, 2022
a6f7d6d
Add top-k test. Expect Error
NimaSarajpoor Nov 15, 2022
8ed4cc0
Add top-k support to performant aampi
NimaSarajpoor Nov 15, 2022
15e7fff
Add top-k test function to aampi with egress
NimaSarajpoor Nov 15, 2022
a6f517d
Merge main
NimaSarajpoor Nov 15, 2022
359a2e5
minor changes
NimaSarajpoor Nov 16, 2022
fb559e5
Revised signature
NimaSarajpoor Nov 16, 2022
2570bee
replace param with class attribute
NimaSarajpoor Nov 16, 2022
870c362
fixed style
NimaSarajpoor Nov 16, 2022
79e459b
Merge main
NimaSarajpoor Nov 17, 2022
759db68
fixed typo
NimaSarajpoor Nov 18, 2022
a31a508
merge main
NimaSarajpoor Nov 18, 2022
b3ed48f
Merge branch remote
NimaSarajpoor Nov 18, 2022
0e13fbc
Merge rmote branch
NimaSarajpoor Nov 18, 2022
d4dc920
Move device function to core to fix circular import error
NimaSarajpoor Nov 20, 2022
e58dffd
Update test functions
NimaSarajpoor Nov 20, 2022
5054c76
fix bug
NimaSarajpoor Dec 1, 2022
7d74c45
replace wrong variable with the correct one
NimaSarajpoor Dec 1, 2022
049198a
fix docstring
NimaSarajpoor Dec 2, 2022
0f6d342
fix docstrings
NimaSarajpoor Dec 2, 2022
a068f99
Empty commit
NimaSarajpoor Dec 2, 2022
f6cf8bc
Merge remote branch 'TopK_MatrixProfile_Non_Normalized'
NimaSarajpoor Dec 2, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
204 changes: 147 additions & 57 deletions stumpy/aamp.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

@njit(
# "(f8[:], f8[:], i8, b1[:], b1[:], f8, i8[:], i8, i8, i8, f8[:, :, :],"
# "i8[:, :, :], b1)",
# "f8[:, :], f8[:, :], i8[:, :, :], i8[:, :], i8[:, :], b1)",
fastmath=True,
)
def _compute_diagonal(
Expand All @@ -30,12 +30,17 @@ def _compute_diagonal(
diags_stop_idx,
thread_idx,
P,
PL,
PR,
I,
IL,
IR,
ignore_trivial,
):
"""
Compute (Numba JIT-compiled) and update P, I along a single diagonal using a single
thread and avoiding race conditions
Compute (Numba JIT-compiled) and update the (top-k) matrix profile P,
PL, PR, I, IL, and IR sequentially along individual diagonals using a single
thread and avoiding race conditions.

Parameters
----------
Expand All @@ -49,12 +54,6 @@ def _compute_diagonal(
m : int
Window size

P : numpy.ndarray
Matrix profile

I : numpy.ndarray
Matrix profile indices

T_A_subseq_isfinite : numpy.ndarray
A boolean array that indicates whether a subsequence in `T_A` contains a
`np.nan`/`np.inf` value (False)
Expand All @@ -78,6 +77,24 @@ def _compute_diagonal(
thread_idx : int
The thread index

P : numpy.ndarray
The (top-k) matrix profile, sorted in ascending order per row

PL : numpy.ndarray
The top-1 left marix profile

PR : numpy.ndarray
The top-1 right marix profile

I : numpy.ndarray
The (top-k) matrix profile indices

IL : numpy.ndarray
The top-1 left matrix profile indices

IR : numpy.ndarray
The top-1 right matrix profile indices

ignore_trivial : bool
Set to `True` if this is a self-join. Otherwise, for AB-join, set this to
`False`. Default is `True`.
Expand All @@ -92,16 +109,16 @@ def _compute_diagonal(
uint64_1 = np.uint64(1)

for diag_idx in range(diags_start_idx, diags_stop_idx):
k = diags[diag_idx]
g = diags[diag_idx]

if k >= 0:
iter_range = range(0, min(n_A - m + 1, n_B - m + 1 - k))
if g >= 0:
iter_range = range(0, min(n_A - m + 1, n_B - m + 1 - g))
else:
iter_range = range(-k, min(n_A - m + 1, n_B - m + 1 - k))
iter_range = range(-g, min(n_A - m + 1, n_B - m + 1 - g))

for i in iter_range:
uint64_i = np.uint64(i)
uint64_j = np.uint64(i + k)
uint64_j = np.uint64(i + g)

if uint64_i == 0 or uint64_j == 0:
p_norm = (
Expand Down Expand Up @@ -129,36 +146,59 @@ def _compute_diagonal(

if T_A_subseq_isfinite[uint64_i] and T_B_subseq_isfinite[uint64_j]:
# Neither subsequence contains NaNs
if p_norm < P[thread_idx, uint64_i, 0]:
P[thread_idx, uint64_i, 0] = p_norm
I[thread_idx, uint64_i, 0] = uint64_j

if ignore_trivial:
if p_norm < P[thread_idx, uint64_j, 0]:
P[thread_idx, uint64_j, 0] = p_norm
I[thread_idx, uint64_j, 0] = uint64_i
# `P[thread_idx, i, :]` is sorted ascendingly and MUST be updated
# when the newly-calculated `p_norm` value becomes smaller than the
# last (i.e. greatest) element in this array. Note that the goal
# is to have top-k smallest distancs for each subsequence.
if p_norm < P[thread_idx, uint64_i, -1]:
idx = np.searchsorted(P[thread_idx, uint64_i], p_norm)
core._shift_insert_at_index(
P[thread_idx, uint64_i], idx, p_norm, shift="right"
)
core._shift_insert_at_index(
I[thread_idx, uint64_i], idx, uint64_j, shift="right"
)

if ignore_trivial: # self-joins only
if p_norm < P[thread_idx, uint64_j, -1]:
idx = np.searchsorted(P[thread_idx, uint64_j], p_norm)
core._shift_insert_at_index(
P[thread_idx, uint64_j], idx, p_norm, shift="right"
)
core._shift_insert_at_index(
I[thread_idx, uint64_j], idx, uint64_i, shift="right"
)

if uint64_i < uint64_j:
# left matrix profile and left matrix profile index
if p_norm < P[thread_idx, uint64_j, 1]:
P[thread_idx, uint64_j, 1] = p_norm
I[thread_idx, uint64_j, 1] = uint64_i
if p_norm < PL[thread_idx, uint64_j]:
PL[thread_idx, uint64_j] = p_norm
IL[thread_idx, uint64_j] = uint64_i

# right matrix profile and right matrix profile index
if p_norm < P[thread_idx, uint64_i, 2]:
P[thread_idx, uint64_i, 2] = p_norm
I[thread_idx, uint64_i, 2] = uint64_j
if p_norm < PR[thread_idx, uint64_i]:
PR[thread_idx, uint64_i] = p_norm
IR[thread_idx, uint64_i] = uint64_j

return


@njit(
# "(f8[:], f8[:], i8, b1[:], b1[:], i8[:], b1)",
# "(f8[:], f8[:], i8, b1[:], b1[:], i8[:], b1, i8)",
parallel=True,
fastmath=True,
)
def _aamp(
T_A, T_B, m, T_A_subseq_isfinite, T_B_subseq_isfinite, p, diags, ignore_trivial
T_A,
T_B,
m,
T_A_subseq_isfinite,
T_B_subseq_isfinite,
p,
diags,
ignore_trivial,
k,
):
"""
A Numba JIT-compiled version of AAMP for parallel computation of the matrix
Expand Down Expand Up @@ -194,13 +234,30 @@ def _aamp(
Set to `True` if this is a self-join. Otherwise, for AB-join, set this to
`False`. Default is `True`.

k : int
The number of top `k` smallest distances used to construct the matrix profile.
Note that this will increase the total computational time and memory usage
when k > 1.

Returns
-------
P : numpy.ndarray
Matrix profile
out1 : numpy.ndarray
The (top-k) matrix profile

I : numpy.ndarray
Matrix profile indices
out2 : numpy.ndarray
The (top-1) left matrix profile

out3 : numpy.ndarray
The (top-1) right matrix profile

out4 : numpy.ndarray
The (top-k) matrix profile indices

out5 : numpy.ndarray
The (top-1) left matrix profile indices

out6 : numpy.ndarray
The (top-1) right matrix profile indices

Notes
-----
Expand All @@ -213,8 +270,15 @@ def _aamp(
n_B = T_B.shape[0]
l = n_A - m + 1
n_threads = numba.config.NUMBA_NUM_THREADS
P = np.full((n_threads, l, 3), np.inf, dtype=np.float64)
I = np.full((n_threads, l, 3), -1, dtype=np.int64)

P = np.full((n_threads, l, k), np.inf, dtype=np.float64)
I = np.full((n_threads, l, k), -1, dtype=np.int64)

PL = np.full((n_threads, l), np.inf, dtype=np.float64)
IL = np.full((n_threads, l), -1, dtype=np.int64)

PR = np.full((n_threads, l), np.inf, dtype=np.float64)
IR = np.full((n_threads, l), -1, dtype=np.int64)

ndist_counts = core._count_diagonal_ndist(diags, m, n_A, n_B)
diags_ranges = core._get_array_ranges(ndist_counts, n_threads, False)
Expand All @@ -233,26 +297,37 @@ def _aamp(
diags_ranges[thread_idx, 1],
thread_idx,
P,
PL,
PR,
I,
IL,
IR,
ignore_trivial,
)

# Reduction of results from all threads
for thread_idx in range(1, n_threads):
for i in prange(l):
if P[0, i, 0] > P[thread_idx, i, 0]:
P[0, i, 0] = P[thread_idx, i, 0]
I[0, i, 0] = I[thread_idx, i, 0]
# left matrix profile and left matrix profile indices
if P[0, i, 1] > P[thread_idx, i, 1]:
P[0, i, 1] = P[thread_idx, i, 1]
I[0, i, 1] = I[thread_idx, i, 1]
# right matrix profile and right matrix profile indices
if P[0, i, 2] > P[thread_idx, i, 2]:
P[0, i, 2] = P[thread_idx, i, 2]
I[0, i, 2] = I[thread_idx, i, 2]

return np.power(P[0, :, :], 1.0 / p), I[0, :, :]
# update top-k arrays
core._merge_topk_PI(P[0], P[thread_idx], I[0], I[thread_idx])

# update left matrix profile and matrix profile indices
mask = PL[0] > PL[thread_idx]
PL[0][mask] = PL[thread_idx][mask]
IL[0][mask] = IL[thread_idx][mask]

# update right matrix profile and matrix profile indices
mask = PR[0] > PR[thread_idx]
PR[0][mask] = PR[thread_idx][mask]
IR[0][mask] = IR[thread_idx][mask]

return (
np.power(P[0], 1.0 / p),
np.power(PL[0], 1.0 / p),
np.power(PR[0], 1.0 / p),
I[0],
IL[0],
IR[0],
)


def aamp(T_A, m, T_B=None, ignore_trivial=True, p=2.0, k=1):
Expand Down Expand Up @@ -291,8 +366,16 @@ def aamp(T_A, m, T_B=None, ignore_trivial=True, p=2.0, k=1):
Returns
-------
out : numpy.ndarray
The first column consists of the matrix profile, the second column
consists of the matrix profile indices.
When k = 1 (default), the first column consists of the matrix profile,
the second column consists of the matrix profile indices, the third column
consists of the left matrix profile indices, and the fourth column consists
of the right matrix profile indices. However, when k > 1, the output array
will contain exactly 2 * k + 2 columns. The first k columns (i.e., out[:, :k])
consists of the top-k matrix profile, the next set of k columns
(i.e., out[:, k:2k]) consists of the corresponding top-k matrix profile
indices, and the last two columns (i.e., out[:, 2k] and out[:, 2k+1] or,
equivalently, out[:, -2] and out[:, -1]) correspond to the top-1 left
matrix profile indices and the top-1 right matrix profile indices, respectively.

Notes
-----
Expand Down Expand Up @@ -331,19 +414,26 @@ def aamp(T_A, m, T_B=None, ignore_trivial=True, p=2.0, k=1):
l = n_A - m + 1

excl_zone = int(np.ceil(m / config.STUMPY_EXCL_ZONE_DENOM))
out = np.empty((l, 4), dtype=object)

if ignore_trivial:
diags = np.arange(excl_zone + 1, n_A - m + 1, dtype=np.int64)
else:
diags = np.arange(-(n_A - m + 1) + 1, n_B - m + 1, dtype=np.int64)

P, I = _aamp(
T_A, T_B, m, T_A_subseq_isfinite, T_B_subseq_isfinite, p, diags, ignore_trivial
P, PL, PR, I, IL, IR = _aamp(
T_A,
T_B,
m,
T_A_subseq_isfinite,
T_B_subseq_isfinite,
p,
diags,
ignore_trivial,
k,
)

out[:, 0] = P[:, 0]
out[:, 1:] = I[:, :]
out = np.empty((l, 2 * k + 2), dtype=object)
out[:, :k] = P
out[:, k:] = np.column_stack((I, IL, IR))

core._check_P(out[:, 0])

Expand Down
44 changes: 30 additions & 14 deletions stumpy/aamped.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,16 @@ def aamped(dask_client, T_A, m, T_B=None, ignore_trivial=True, p=2.0, k=1):
Returns
-------
out : numpy.ndarray
The first column consists of the matrix profile, the second column
consists of the matrix profile indices.
When k = 1 (default), the first column consists of the matrix profile,
the second column consists of the matrix profile indices, the third column
consists of the left matrix profile indices, and the fourth column consists
of the right matrix profile indices. However, when k > 1, the output array
will contain exactly 2 * k + 2 columns. The first k columns (i.e., out[:, :k])
consists of the top-k matrix profile, the next set of k columns
(i.e., out[:, k:2k]) consists of the corresponding top-k matrix profile
indices, and the last two columns (i.e., out[:, 2k] and out[:, 2k+1] or,
equivalently, out[:, -2] and out[:, -1]) correspond to the top-1 left
matrix profile indices and the top-1 right matrix profile indices, respectively.

Notes
-----
Expand Down Expand Up @@ -94,12 +102,10 @@ def aamped(dask_client, T_A, m, T_B=None, ignore_trivial=True, p=2.0, k=1):
n_B = T_B.shape[0]
l = n_A - m + 1

excl_zone = int(np.ceil(m / config.STUMPY_EXCL_ZONE_DENOM))
out = np.empty((l, 4), dtype=object)

hosts = list(dask_client.ncores().keys())
nworkers = len(hosts)

excl_zone = int(np.ceil(m / config.STUMPY_EXCL_ZONE_DENOM))
if ignore_trivial:
diags = np.arange(excl_zone + 1, n_A - m + 1, dtype=np.int64)
else:
Expand Down Expand Up @@ -141,20 +147,30 @@ def aamped(dask_client, T_A, m, T_B=None, ignore_trivial=True, p=2.0, k=1):
p,
diags_futures[i],
ignore_trivial,
k,
)
)

results = dask_client.gather(futures)
profile, indices = results[0]
profile, profile_L, profile_R, indices, indices_L, indices_R = results[0]
for i in range(1, len(hosts)):
P, I = results[i]
for col in range(P.shape[1]): # pragma: no cover
cond = P[:, col] < profile[:, col]
profile[:, col] = np.where(cond, P[:, col], profile[:, col])
indices[:, col] = np.where(cond, I[:, col], indices[:, col])

out[:, 0] = profile[:, 0]
out[:, 1:4] = indices
P, PL, PR, I, IL, IR = results[i]
# Update top-k matrix profile and matrix profile indices
core._merge_topk_PI(profile, P, indices, I)

# Update top-1 left matrix profile and matrix profile index
mask = PL < profile_L
profile_L[mask] = PL[mask]
indices_L[mask] = IL[mask]

# Update top-1 right matrix profile and matrix profile index
mask = PR < profile_R
profile_R[mask] = PR[mask]
indices_R[mask] = IR[mask]

out = np.empty((l, 2 * k + 2), dtype=object)
out[:, :k] = profile
out[:, k : 2 * k + 2] = np.column_stack((indices, indices_L, indices_R))

core._check_P(out[:, 0])

Expand Down
Loading