Skip to content

Commit 439b0fa

Browse files
committed
Add tests and fixes for swaps journal processing
1 parent c9a1785 commit 439b0fa

File tree

2 files changed

+312
-15
lines changed

2 files changed

+312
-15
lines changed

src/VecSim/algorithms/svs/svs_tiered.h

Lines changed: 74 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77

88
#include <chrono>
99
#include <condition_variable>
10+
#include <functional>
11+
#include <map>
1012
#include <memory>
1113
#include <mutex>
1214
#include <tuple>
@@ -204,12 +206,22 @@ class TieredSVSIndex : public VecSimTieredIndex<DataType, float> {
204206
using backend_index_t = VecSimIndexAbstract<DataType, float>;
205207
using svs_index_t = SVSIndexBase;
206208

207-
// label, oldId, newId
209+
// swaps_journal is used by updateSVSIndex() to track vectors swap operations that were done in
210+
// the Flat index during SVS index updating.
211+
// The journal contains tuples of (label, oldId, newId).
212+
// oldId is the index of the label in flat index before the swap.
213+
// newId is the index of the label in flat index after the swap.
214+
// if oldId == newId, it means that the vector was not moved in the Flat index, but was removed
215+
from the end of the Flat index.
216+
// if label == SKIP_LABEL, it means that the vector was not moved/removed in the Flat index, but
217+
// updated in-place
208218
using swap_record = std::tuple<labelType, idType, idType>;
219+
constexpr static size_t SKIP_LABEL = std::numeric_limits<labelType>::max();
220+
std::vector<swap_record> swaps_journal;
221+
209222
size_t trainingTriggerThreshold;
210223
size_t updateTriggerThreshold;
211224
size_t updateJobWaitTime;
212-
std::vector<swap_record> swaps_journal;
213225
// Used to prevent scheduling multiple index update jobs at the same time.
214226
// As far as the update job does a batch update, job queue should have just 1 job at the moment.
215227
std::atomic_flag indexUpdateScheduled = ATOMIC_FLAG_INIT;
@@ -487,6 +499,22 @@ class TieredSVSIndex : public VecSimTieredIndex<DataType, float> {
487499
backend_index_t *GetBackendIndex() { return this->backendIndex; }
488500
void submitSingleJob(AsyncJob *job) { Base::submitSingleJob(job); }
489501
void submitJobs(vecsim_stl::vector<AsyncJob *> &jobs) { Base::submitJobs(jobs); }
502+
503+
// Tracing helpers can be used to trace/inject code in the index update process.
504+
std::map<std::string, std::function<void()>> tracingCallbacks;
505+
// Registers (or replaces) the tracing callback stored under `name`.
// `callback` is taken by value and moved into the tracingCallbacks map; assigning through
// operator[] overwrites any callback previously registered under the same name.
void registerTracingCallback(const std::string &name, std::function<void()> callback) {
506+
tracingCallbacks[name] = std::move(callback);
507+
}
508+
// Runs the tracing callback registered under `name`, if there is one.
// A name with no registered callback is silently ignored.
// NOTE(review): an entry holding an empty std::function would throw std::bad_function_call
// when invoked here - confirm callers never register an empty callback.
void executeTracingCallback(const std::string &name) const {
509+
auto it = tracingCallbacks.find(name);
510+
if (it != tracingCallbacks.end()) {
511+
// Invoke the stored std::function<void()>.
it->second();
512+
}
513+
}
514+
#else
515+
// Stub compiled in the #else branch (the controlling #if is outside this view): it keeps the
// unconditional executeTracingCallback() call sites valid while doing no work; `name` is
// ignored.
void executeTracingCallback(const std::string &name) const {
516+
// In production, we do nothing.
517+
}
490518
#endif
491519

492520
private:
@@ -555,30 +583,52 @@ class TieredSVSIndex : public VecSimTieredIndex<DataType, float> {
555583
swaps_journal.clear();
556584
} // release frontend index
557585

586+
executeTracingCallback("UpdateJob::before_add_to_svs");
558587
{ // lock backend index for writing and add vectors there
559588
std::lock_guard lock(this->mainIndexGuard);
560589
auto svs_index = GetSVSIndex();
561590
assert(labels_to_move.size() == vectors_to_move.size() / this->frontendIndex->getDim());
562591
svs_index->addVectors(vectors_to_move.data(), labels_to_move.data(),
563592
labels_to_move.size());
564593
}
594+
executeTracingCallback("UpdateJob::after_add_to_svs");
565595
// clean-up frontend index
566596
{ // lock frontend index for writing and delete moved vectors
567597
std::lock_guard lock(this->flatIndexGuard);
568598

569-
// Enumerate journal and swap label in labels_to_move to newId position.
599+
// Enumerate journal and reflect swaps in the labels_to_move.
600+
// The journal contains tuples of (label, oldId, newId).
601+
// oldId is the index of the label in flat index before the swap.
602+
// newId is the index of the label in flat index after the swap.
570603
for (const auto &p : this->swaps_journal) {
571604
auto label = std::get<0>(p);
572605
auto oldId = std::get<1>(p);
573606
auto newId = std::get<2>(p);
607+
608+
// If label is SKIP_LABEL, it means that the vector was not moved but updated
609+
// in-place. We do not need to remove it from labels_to_move, just skip it in
610+
// deleteVector().
611+
if (label == SKIP_LABEL) {
612+
labels_to_move[oldId] = SKIP_LABEL;
613+
continue; // Next record.
614+
}
615+
616+
// If oldId is out of bounds and newId is in bounds - do not delete the vector at newId
574617
if (oldId >= labels_to_move.size()) {
575-
continue; // Skip if oldId is out of bounds
618+
if (newId < labels_to_move.size()) {
619+
labels_to_move[newId] = SKIP_LABEL;
620+
}
621+
continue; // Next record.
622+
}
623+
624+
// If recorded label does not match the label in labels_to_move,
625+
// it means that another vector was moved to the old position - do not delete it.
626+
if (label != labels_to_move[oldId]) {
627+
labels_to_move[oldId] = SKIP_LABEL;
628+
continue;
576629
}
577-
assert(label == labels_to_move[oldId] &&
578-
"Journal label does not match the label in labels_to_move");
579630

580-
// TODO: verify the assumption asserted below
581-
// Expecting oldId and label at the end of labels_to_move vectors.
631+
// Labels should be moved or removed from the end of index
582632
assert(oldId == labels_to_move.size() - 1 &&
583633
"Old ID is not at the end of labels_to_move vector");
584634

@@ -592,13 +642,17 @@ class TieredSVSIndex : public VecSimTieredIndex<DataType, float> {
592642
// delete vectors from the frontend index in reverse order
593643
// it increases the chance of avoiding swaps in the frontend index and performance
594644
// improvement
595-
size_t deleted = 0;
645+
int deleted = 0;
596646
for (idType i = labels_to_move.size(); i > 0; --i) {
597647
auto label = labels_to_move[i - 1];
598-
// Delete the vector from the frontend index
599-
deleted += this->frontendIndex->deleteVectorById(label, i - 1);
648+
// Delete the vector from the frontend index if not in-place updated.
649+
if (label != SKIP_LABEL) {
650+
deleted += this->frontendIndex->deleteVectorById(label, i - 1);
651+
}
600652
}
601-
assert(deleted == labels_to_move.size());
653+
assert(deleted == std::count_if(labels_to_move.begin(), labels_to_move.end(),
654+
[](labelType label) { return label != SKIP_LABEL; }) &&
655+
"Deleted vectors count does not match the number of labels to delete");
602656
}
603657
}
604658

@@ -693,7 +747,14 @@ class TieredSVSIndex : public VecSimTieredIndex<DataType, float> {
693747
}
694748
{ // Add vector to the frontend index.
695749
std::lock_guard lock(this->flatIndexGuard);
696-
ret = std::max(ret + this->frontendIndex->addVector(blob, label), 0);
750+
const auto ft_ret = this->frontendIndex->addVector(blob, label);
751+
752+
if (ft_ret == 0) { // Vector was overridden - add a 'skipping' swap to the journal.
753+
for (auto id : this->frontendIndex->getElementIds(label)) {
754+
this->swaps_journal.emplace_back(SKIP_LABEL, id, id);
755+
}
756+
}
757+
ret = std::max(ret + ft_ret, 0);
697758
// Check frontend index size to determine if an update job schedule is needed.
698759
frontend_index_size = this->frontendIndex->indexSize();
699760
}

0 commit comments

Comments
 (0)