diff --git a/src/VecSim/CMakeLists.txt b/src/VecSim/CMakeLists.txt index 00ea92143..8eab3e271 100644 --- a/src/VecSim/CMakeLists.txt +++ b/src/VecSim/CMakeLists.txt @@ -28,7 +28,6 @@ add_library(VectorSimilarity ${VECSIM_LIBTYPE} index_factories/tiered_factory.cpp index_factories/svs_factory.cpp index_factories/index_factory.cpp - index_factories/components/components_factory.cpp algorithms/hnsw/visited_nodes_handler.cpp vec_sim.cpp vec_sim_debug.cpp diff --git a/src/VecSim/index_factories/components/components_factory.cpp b/src/VecSim/index_factories/components/components_factory.cpp deleted file mode 100644 index caf485005..000000000 --- a/src/VecSim/index_factories/components/components_factory.cpp +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Copyright (c) 2006-Present, Redis Ltd. - * All rights reserved. - * - * Licensed under your choice of the Redis Source Available License 2.0 - * (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the - * GNU Affero General Public License v3 (AGPLv3). - */ -#include "VecSim/index_factories/components/components_factory.h" - -PreprocessorsContainerParams CreatePreprocessorsContainerParams(VecSimMetric metric, size_t dim, - bool is_normalized, - unsigned char alignment) { - // If the index metric is Cosine, and is_normalized == true, we will skip normalizing vectors - // and query blobs. - VecSimMetric pp_metric; - if (is_normalized && metric == VecSimMetric_Cosine) { - pp_metric = VecSimMetric_IP; - } else { - pp_metric = metric; - } - return {.metric = pp_metric, .dim = dim, .alignment = alignment}; -} diff --git a/src/VecSim/index_factories/components/components_factory.h b/src/VecSim/index_factories/components/components_factory.h index 1135d85cc..066bd0bca 100644 --- a/src/VecSim/index_factories/components/components_factory.h +++ b/src/VecSim/index_factories/components/components_factory.h @@ -14,9 +14,52 @@ #include "VecSim/index_factories/components/preprocessors_factory.h" #include "VecSim/spaces/computer/calculator.h" +/** + * @brief Creates parameters for a preprocessors container based on the given metric, dimension, + * normalization flag, and alignment. + * + * @tparam DataType The data type of the vector elements (e.g., float, int). + * @param metric The similarity metric to be used (e.g., Cosine, Inner Product). + * @param dim The dimensionality of the vectors. + * @param is_normalized A flag indicating whether the vectors are already normalized. + * @param alignment The alignment requirement for the data. + * @return A PreprocessorsContainerParams object containing the processed parameters: + * - metric: The adjusted metric based on the input and normalization flag. + * - dim: The dimensionality of the vectors. + * - alignment: The alignment requirement for the data. + * - processed_bytes_count: The size of the processed data blob in bytes. + * + * @details + * If the metric is Cosine and the data type is integral, the processed bytes count may include + * additional space for normalization. If the vectors are already + * normalized (is_normalized == true), the metric is adjusted to Inner Product (IP) to skip + * redundant normalization during preprocessing. + */ +template PreprocessorsContainerParams CreatePreprocessorsContainerParams(VecSimMetric metric, size_t dim, bool is_normalized, - unsigned char alignment); + unsigned char alignment) { + // By default the processed blob size is the same as the original blob size. + size_t processed_bytes_count = dim * sizeof(DataType); + + VecSimMetric pp_metric = metric; + if (metric == VecSimMetric_Cosine) { + // if metric is cosine and DataType is integral, the processed_bytes_count includes the + // norm appended to the vector. + if (std::is_integral::value) { + processed_bytes_count += sizeof(float); + } + // if is_normalized == true, we will enforce skipping normalizing vector and query blobs by + // setting the metric to IP. + if (is_normalized) { + pp_metric = VecSimMetric_IP; + } + } + return {.metric = pp_metric, + .dim = dim, + .alignment = alignment, + .processed_bytes_count = processed_bytes_count}; +} template IndexComponents @@ -29,7 +72,7 @@ CreateIndexComponents(std::shared_ptr allocator, VecSimMetric m auto indexCalculator = new (allocator) DistanceCalculatorCommon(allocator, distFunc); PreprocessorsContainerParams ppParams = - CreatePreprocessorsContainerParams(metric, dim, is_normalized, alignment); + CreatePreprocessorsContainerParams(metric, dim, is_normalized, alignment); auto preprocessors = CreatePreprocessorsContainer(allocator, ppParams); return {indexCalculator, preprocessors}; diff --git a/src/VecSim/index_factories/components/preprocessors_factory.h b/src/VecSim/index_factories/components/preprocessors_factory.h index ba8d2a286..c11e935d8 100644 --- a/src/VecSim/index_factories/components/preprocessors_factory.h +++ b/src/VecSim/index_factories/components/preprocessors_factory.h @@ -15,6 +15,7 @@ struct PreprocessorsContainerParams { VecSimMetric metric; size_t dim; unsigned char alignment; + size_t processed_bytes_count; }; template @@ -25,8 +26,8 @@ CreatePreprocessorsContainer(std::shared_ptr allocator, if (params.metric == VecSimMetric_Cosine) { auto multiPPContainer = new (allocator) MultiPreprocessorsContainer(allocator, params.alignment); - auto cosine_preprocessor = - new (allocator) CosinePreprocessor(allocator, params.dim); + auto cosine_preprocessor = new (allocator) + CosinePreprocessor(allocator, params.dim, params.processed_bytes_count); int next_valid_pp_index = multiPPContainer->addPreprocessor(cosine_preprocessor); UNUSED(next_valid_pp_index); assert(next_valid_pp_index != -1 && "Cosine preprocessor was not added correctly"); diff --git a/src/VecSim/spaces/computer/preprocessor_container.cpp b/src/VecSim/spaces/computer/preprocessor_container.cpp index 8b61e1107..0ec44e36d 100644 --- a/src/VecSim/spaces/computer/preprocessor_container.cpp +++ b/src/VecSim/spaces/computer/preprocessor_container.cpp @@ -9,37 +9,37 @@ #include "VecSim/spaces/computer/preprocessor_container.h" ProcessedBlobs PreprocessorsContainerAbstract::preprocess(const void *original_blob, - size_t processed_bytes_count) const { - return ProcessedBlobs(preprocessForStorage(original_blob, processed_bytes_count), - preprocessQuery(original_blob, processed_bytes_count)); + size_t input_blob_size) const { + return ProcessedBlobs(preprocessForStorage(original_blob, input_blob_size), + preprocessQuery(original_blob, input_blob_size)); } MemoryUtils::unique_blob PreprocessorsContainerAbstract::preprocessForStorage(const void *original_blob, - size_t processed_bytes_count) const { + size_t input_blob_size) const { return wrapWithDummyDeleter(const_cast(original_blob)); } -MemoryUtils::unique_blob PreprocessorsContainerAbstract::preprocessQuery( - const void *original_blob, size_t processed_bytes_count, bool force_copy) const { - return maybeCopyToAlignedMem(original_blob, processed_bytes_count, force_copy); +MemoryUtils::unique_blob PreprocessorsContainerAbstract::preprocessQuery(const void *original_blob, + size_t input_blob_size, + bool force_copy) const { + return maybeCopyToAlignedMem(original_blob, input_blob_size, force_copy); } void PreprocessorsContainerAbstract::preprocessQueryInPlace(void *blob, - size_t processed_bytes_count) const {} + size_t input_blob_size) const {} void PreprocessorsContainerAbstract::preprocessStorageInPlace(void *blob, - size_t processed_bytes_count) const {} + size_t input_blob_size) const {} MemoryUtils::unique_blob PreprocessorsContainerAbstract::maybeCopyToAlignedMem( - const void *original_blob, size_t blob_bytes_count, bool force_copy) const { + const void *original_blob, size_t input_blob_size, bool force_copy) const { bool needs_copy = force_copy || (this->alignment && ((uintptr_t)original_blob % this->alignment != 0)); if (needs_copy) { - auto aligned_mem = this->allocator->allocate_aligned(blob_bytes_count, this->alignment); - // TODO: handle original_blob_size != processed_bytes_count - memcpy(aligned_mem, original_blob, blob_bytes_count); + auto aligned_mem = this->allocator->allocate_aligned(input_blob_size, this->alignment); + memcpy(aligned_mem, original_blob, input_blob_size); return this->wrapAllocated(aligned_mem); } diff --git a/src/VecSim/spaces/computer/preprocessor_container.h b/src/VecSim/spaces/computer/preprocessor_container.h index 5d80a321f..cd2f0dec8 100644 --- a/src/VecSim/spaces/computer/preprocessor_container.h +++ b/src/VecSim/spaces/computer/preprocessor_container.h @@ -22,19 +22,19 @@ class PreprocessorsContainerAbstract : public VecsimBaseObject { PreprocessorsContainerAbstract(std::shared_ptr allocator, unsigned char alignment) : VecsimBaseObject(allocator), alignment(alignment) {} - virtual ProcessedBlobs preprocess(const void *original_blob, - size_t processed_bytes_count) const; + // It is assumed that the resulted query blob is aligned. + virtual ProcessedBlobs preprocess(const void *original_blob, size_t input_blob_size) const; virtual MemoryUtils::unique_blob preprocessForStorage(const void *original_blob, - size_t processed_bytes_count) const; + size_t input_blob_size) const; + // It is assumed that the resulted query blob is aligned. virtual MemoryUtils::unique_blob preprocessQuery(const void *original_blob, - size_t processed_bytes_count, + size_t input_blob_size, bool force_copy = false) const; - virtual void preprocessQueryInPlace(void *blob, size_t processed_bytes_count) const; - - virtual void preprocessStorageInPlace(void *blob, size_t processed_bytes_count) const; + virtual void preprocessQueryInPlace(void *blob, size_t input_blob_size) const; + virtual void preprocessStorageInPlace(void *blob, size_t input_blob_size) const; unsigned char getAlignment() const { return alignment; } @@ -43,7 +43,7 @@ class PreprocessorsContainerAbstract : public VecsimBaseObject { // Allocate and copy the blob only if the original blob is not aligned. MemoryUtils::unique_blob maybeCopyToAlignedMem(const void *original_blob, - size_t blob_bytes_count, + size_t input_blob_size, bool force_copy = false) const; MemoryUtils::unique_blob wrapAllocated(void *blob) const { @@ -82,19 +82,17 @@ class MultiPreprocessorsContainer : public PreprocessorsContainerAbstract { */ int addPreprocessor(PreprocessorInterface *preprocessor); - ProcessedBlobs preprocess(const void *original_blob, - size_t processed_bytes_count) const override; + ProcessedBlobs preprocess(const void *original_blob, size_t input_blob_size) const override; MemoryUtils::unique_blob preprocessForStorage(const void *original_blob, - size_t processed_bytes_count) const override; + size_t input_blob_size) const override; - MemoryUtils::unique_blob preprocessQuery(const void *original_blob, - size_t processed_bytes_count, + MemoryUtils::unique_blob preprocessQuery(const void *original_blob, size_t input_blob_size, bool force_copy = false) const override; - void preprocessQueryInPlace(void *blob, size_t processed_bytes_count) const override; + void preprocessQueryInPlace(void *blob, size_t input_blob_size) const override; - void preprocessStorageInPlace(void *blob, size_t processed_bytes_count) const override; + void preprocessStorageInPlace(void *blob, size_t input_blob_size) const override; #ifdef BUILD_TESTS std::array getPreprocessors() const { @@ -159,12 +157,13 @@ int MultiPreprocessorsContainer::addPreprocessor( } template -ProcessedBlobs MultiPreprocessorsContainer::preprocess( - const void *original_blob, size_t processed_bytes_count) const { +ProcessedBlobs +MultiPreprocessorsContainer::preprocess(const void *original_blob, + size_t input_blob_size) const { // No preprocessors were added yet. if (preprocessors[0] == nullptr) { // query might need to be aligned - auto query_ptr = this->maybeCopyToAlignedMem(original_blob, processed_bytes_count); + auto query_ptr = this->maybeCopyToAlignedMem(original_blob, input_blob_size); return ProcessedBlobs( std::move(Base::wrapWithDummyDeleter(const_cast(original_blob))), std::move(query_ptr)); @@ -175,8 +174,7 @@ ProcessedBlobs MultiPreprocessorsContainer::preproces for (auto pp : preprocessors) { if (!pp) break; - pp->preprocess(original_blob, storage_blob, query_blob, processed_bytes_count, - this->alignment); + pp->preprocess(original_blob, storage_blob, query_blob, input_blob_size, this->alignment); } // At least one blob was allocated. @@ -194,7 +192,7 @@ ProcessedBlobs MultiPreprocessorsContainer::preproces if (query_blob == nullptr) { // we processed only the storage // query might need to be aligned - auto query_ptr = this->maybeCopyToAlignedMem(original_blob, processed_bytes_count); + auto query_ptr = this->maybeCopyToAlignedMem(original_blob, input_blob_size); return ProcessedBlobs(std::move(this->wrapAllocated(storage_blob)), std::move(query_ptr)); } @@ -206,13 +204,13 @@ ProcessedBlobs MultiPreprocessorsContainer::preproces template MemoryUtils::unique_blob MultiPreprocessorsContainer::preprocessForStorage( - const void *original_blob, size_t processed_bytes_count) const { + const void *original_blob, size_t input_blob_size) const { void *storage_blob = nullptr; for (auto pp : preprocessors) { if (!pp) break; - pp->preprocessForStorage(original_blob, storage_blob, processed_bytes_count); + pp->preprocessForStorage(original_blob, storage_blob, input_blob_size); } return storage_blob ? std::move(this->wrapAllocated(storage_blob)) @@ -221,40 +219,40 @@ MultiPreprocessorsContainer::preprocessForStorage( template MemoryUtils::unique_blob MultiPreprocessorsContainer::preprocessQuery( - const void *original_blob, size_t processed_bytes_count, bool force_copy) const { + const void *original_blob, size_t input_blob_size, bool force_copy) const { void *query_blob = nullptr; for (auto pp : preprocessors) { if (!pp) break; // modifies the memory in place - pp->preprocessQuery(original_blob, query_blob, processed_bytes_count, this->alignment); + pp->preprocessQuery(original_blob, query_blob, input_blob_size, this->alignment); } - return query_blob ? std::move(this->wrapAllocated(query_blob)) - : std::move(this->maybeCopyToAlignedMem(original_blob, processed_bytes_count, - force_copy)); + return query_blob + ? std::move(this->wrapAllocated(query_blob)) + : std::move(this->maybeCopyToAlignedMem(original_blob, input_blob_size, force_copy)); } template void MultiPreprocessorsContainer::preprocessQueryInPlace( - void *blob, size_t processed_bytes_count) const { + void *blob, size_t input_blob_size) const { for (auto pp : preprocessors) { if (!pp) break; // modifies the memory in place - pp->preprocessQueryInPlace(blob, processed_bytes_count, this->alignment); + pp->preprocessQueryInPlace(blob, input_blob_size, this->alignment); } } template void MultiPreprocessorsContainer::preprocessStorageInPlace( - void *blob, size_t processed_bytes_count) const { + void *blob, size_t input_blob_size) const { for (auto pp : preprocessors) { if (!pp) break; // modifies the memory in place - pp->preprocessStorageInPlace(blob, processed_bytes_count); + pp->preprocessStorageInPlace(blob, input_blob_size); } } diff --git a/src/VecSim/spaces/computer/preprocessors.h b/src/VecSim/spaces/computer/preprocessors.h index db2de5ad0..b1bb4932e 100644 --- a/src/VecSim/spaces/computer/preprocessors.h +++ b/src/VecSim/spaces/computer/preprocessors.h @@ -28,43 +28,44 @@ class PreprocessorInterface : public VecsimBaseObject { // TODO: handle a dynamic processed_bytes_count, as the allocation size of the blob might change // down the preprocessors pipeline (such as in quantization preprocessor that compresses the // vector). + // Note: input_blob_size is relevant for both storage blob and query blob, as we assume results + // are the same size. virtual void preprocess(const void *original_blob, void *&storage_blob, void *&query_blob, - size_t processed_bytes_count, unsigned char alignment) const = 0; + size_t &input_blob_size, unsigned char alignment) const = 0; virtual void preprocessForStorage(const void *original_blob, void *&storage_blob, - size_t processed_bytes_count) const = 0; + size_t &input_blob_size) const = 0; virtual void preprocessQuery(const void *original_blob, void *&query_blob, - size_t processed_bytes_count, unsigned char alignment) const = 0; - virtual void preprocessQueryInPlace(void *original_blob, size_t processed_bytes_count, + size_t &input_blob_size, unsigned char alignment) const = 0; + virtual void preprocessQueryInPlace(void *original_blob, size_t input_blob_size, unsigned char alignment) const = 0; - virtual void preprocessStorageInPlace(void *original_blob, - size_t processed_bytes_count) const = 0; + virtual void preprocessStorageInPlace(void *original_blob, size_t input_blob_size) const = 0; }; template class CosinePreprocessor : public PreprocessorInterface { public: - CosinePreprocessor(std::shared_ptr allocator, size_t dim) + // This preprocessor assumes storage_blob and query_blob + // both are preprocessed in the same way, and yield a blob in the same size. + CosinePreprocessor(std::shared_ptr allocator, size_t dim, + size_t processed_bytes_count) : PreprocessorInterface(allocator), normalize_func(spaces::GetNormalizeFunc()), - dim(dim) {} + dim(dim), processed_bytes_count(processed_bytes_count) {} // If a blob (storage_blob or query_blob) is not nullptr, it means a previous preprocessor // already allocated and processed it. So, we process it inplace. If it's null, we need to // allocate memory for it and copy the original_blob to it. void preprocess(const void *original_blob, void *&storage_blob, void *&query_blob, - size_t processed_bytes_count, unsigned char alignment) const override { - + size_t &input_blob_size, unsigned char alignment) const override { // Case 1: Blobs are different (one might be null, or both are allocated and processed // separately). if (storage_blob != query_blob) { // If one of them is null, allocate memory for it and copy the original_blob to it. if (storage_blob == nullptr) { storage_blob = this->allocator->allocate(processed_bytes_count); - // TODO: handle original_blob_size != processed_bytes_count - memcpy(storage_blob, original_blob, processed_bytes_count); + memcpy(storage_blob, original_blob, input_blob_size); } else if (query_blob == nullptr) { query_blob = this->allocator->allocate_aligned(processed_bytes_count, alignment); - // TODO: handle original_blob_size != processed_bytes_count - memcpy(query_blob, original_blob, processed_bytes_count); + memcpy(query_blob, original_blob, input_blob_size); } // Normalize both blobs. @@ -74,47 +75,53 @@ class CosinePreprocessor : public PreprocessorInterface { if (query_blob == nullptr) { // If both blobs are null, allocate query_blob and set // storage_blob to point to it. query_blob = this->allocator->allocate_aligned(processed_bytes_count, alignment); - // TODO: handle original_blob_size != processed_bytes_count - memcpy(query_blob, original_blob, processed_bytes_count); + memcpy(query_blob, original_blob, input_blob_size); storage_blob = query_blob; } // normalize one of them (since they point to the same memory). normalize_func(query_blob, this->dim); } + + input_blob_size = processed_bytes_count; } void preprocessForStorage(const void *original_blob, void *&blob, - size_t processed_bytes_count) const override { + size_t &input_blob_size) const override { if (blob == nullptr) { blob = this->allocator->allocate(processed_bytes_count); - // TODO: handle original_blob_size != processed_bytes_count - memcpy(blob, original_blob, processed_bytes_count); + memcpy(blob, original_blob, input_blob_size); } normalize_func(blob, this->dim); + input_blob_size = processed_bytes_count; } - void preprocessQuery(const void *original_blob, void *&blob, size_t processed_bytes_count, + void preprocessQuery(const void *original_blob, void *&blob, size_t &input_blob_size, unsigned char alignment) const override { if (blob == nullptr) { blob = this->allocator->allocate_aligned(processed_bytes_count, alignment); - // TODO: handle original_blob_size != processed_bytes_count - memcpy(blob, original_blob, processed_bytes_count); + memcpy(blob, original_blob, input_blob_size); } normalize_func(blob, this->dim); + input_blob_size = processed_bytes_count; } - void preprocessQueryInPlace(void *blob, size_t processed_bytes_count, + void preprocessQueryInPlace(void *blob, size_t input_blob_size, unsigned char alignment) const override { assert(blob); + // TODO: replace with a debug assert and log the exact values of input_blob_size and + // processed_bytes_count to improve observability + assert(input_blob_size == this->processed_bytes_count); normalize_func(blob, this->dim); } - void preprocessStorageInPlace(void *blob, size_t processed_bytes_count) const override { + void preprocessStorageInPlace(void *blob, size_t input_blob_size) const override { assert(blob); + assert(input_blob_size == this->processed_bytes_count); normalize_func(blob, this->dim); } private: spaces::normalizeVector_f normalize_func; const size_t dim; + const size_t processed_bytes_count; }; diff --git a/tests/unit/test_components.cpp b/tests/unit/test_components.cpp index 95679a906..a483e75fa 100644 --- a/tests/unit/test_components.cpp +++ b/tests/unit/test_components.cpp @@ -12,6 +12,7 @@ #include "VecSim/spaces/computer/preprocessor_container.h" #include "VecSim/spaces/computer/calculator.h" #include "unit_test_utils.h" +#include "tests_utils.h" class IndexCalculatorTest : public ::testing::Test {}; namespace dummyCalcultor { @@ -57,8 +58,8 @@ enum pp_mode { STORAGE_ONLY, QUERY_ONLY, BOTH, EMPTY }; template class DummyStoragePreprocessor : public PreprocessorInterface { public: - DummyStoragePreprocessor(std::shared_ptr allocator, int value_to_add_storage, - int value_to_add_query = 0) + DummyStoragePreprocessor(std::shared_ptr allocator, + DataType value_to_add_storage, DataType value_to_add_query = 0) : PreprocessorInterface(allocator), value_to_add_storage(value_to_add_storage), value_to_add_query(value_to_add_query) { if (!value_to_add_query) @@ -66,36 +67,36 @@ class DummyStoragePreprocessor : public PreprocessorInterface { } void preprocess(const void *original_blob, void *&storage_blob, void *&query_blob, - size_t processed_bytes_count, unsigned char alignment) const override { + size_t &input_blob_size, unsigned char alignment) const override { - this->preprocessForStorage(original_blob, storage_blob, processed_bytes_count); + this->preprocessForStorage(original_blob, storage_blob, input_blob_size); } void preprocessForStorage(const void *original_blob, void *&blob, - size_t processed_bytes_count) const override { + size_t &input_blob_size) const override { // If the blob was not allocated yet, allocate it. if (blob == nullptr) { - blob = this->allocator->allocate(processed_bytes_count); - memcpy(blob, original_blob, processed_bytes_count); + blob = this->allocator->allocate(input_blob_size); + memcpy(blob, original_blob, input_blob_size); } static_cast(blob)[0] += value_to_add_storage; } - void preprocessQueryInPlace(void *blob, size_t processed_bytes_count, + void preprocessQueryInPlace(void *blob, size_t input_blob_size, unsigned char alignment) const override {} - void preprocessStorageInPlace(void *blob, size_t processed_bytes_count) const override { + void preprocessStorageInPlace(void *blob, size_t input_blob_size) const override { assert(blob); static_cast(blob)[0] += value_to_add_storage; } - void preprocessQuery(const void *original_blob, void *&blob, size_t processed_bytes_count, + void preprocessQuery(const void *original_blob, void *&blob, size_t &input_blob_size, unsigned char alignment) const override { /* do nothing*/ } private: - int value_to_add_storage; - int value_to_add_query; + DataType value_to_add_storage; + DataType value_to_add_query; }; // Dummy query preprocessor @@ -111,25 +112,25 @@ class DummyQueryPreprocessor : public PreprocessorInterface { } void preprocess(const void *original_blob, void *&storage_blob, void *&query_blob, - size_t processed_bytes_count, unsigned char alignment) const override { - this->preprocessQuery(original_blob, query_blob, processed_bytes_count, alignment); + size_t &input_blob_size, unsigned char alignment) const override { + this->preprocessQuery(original_blob, query_blob, input_blob_size, alignment); } void preprocessForStorage(const void *original_blob, void *&blob, - size_t processed_bytes_count) const override { + size_t &input_blob_size) const override { /* do nothing*/ } - void preprocessQueryInPlace(void *blob, size_t processed_bytes_count, + void preprocessQueryInPlace(void *blob, size_t input_blob_size, unsigned char alignment) const override { static_cast(blob)[0] += value_to_add_query; } - void preprocessStorageInPlace(void *blob, size_t processed_bytes_count) const override {} - void preprocessQuery(const void *original_blob, void *&blob, size_t processed_bytes_count, + void preprocessStorageInPlace(void *blob, size_t input_blob_size) const override {} + void preprocessQuery(const void *original_blob, void *&blob, size_t &input_blob_size, unsigned char alignment) const override { // If the blob was not allocated yet, allocate it. if (blob == nullptr) { - blob = this->allocator->allocate_aligned(processed_bytes_count, alignment); - memcpy(blob, original_blob, processed_bytes_count); + blob = this->allocator->allocate_aligned(input_blob_size, alignment); + memcpy(blob, original_blob, input_blob_size); } static_cast(blob)[0] += value_to_add_query; } @@ -148,41 +149,41 @@ class DummyMixedPreprocessor : public PreprocessorInterface { : PreprocessorInterface(allocator), value_to_add_storage(value_to_add_storage), value_to_add_query(value_to_add_query) {} void preprocess(const void *original_blob, void *&storage_blob, void *&query_blob, - size_t processed_bytes_count, unsigned char alignment) const override { + size_t &input_blob_size, unsigned char alignment) const override { // One blob was already allocated by a previous preprocessor(s) that process both blobs the // same. The blobs are pointing to the same memory, we need to allocate another memory slot // to split them. if ((storage_blob == query_blob) && (query_blob != nullptr)) { - storage_blob = this->allocator->allocate(processed_bytes_count); - memcpy(storage_blob, query_blob, processed_bytes_count); + storage_blob = this->allocator->allocate(input_blob_size); + memcpy(storage_blob, query_blob, input_blob_size); } // Either both are nullptr or they are pointing to different memory slots. Both cases are // handled by the designated functions. - this->preprocessForStorage(original_blob, storage_blob, processed_bytes_count); - this->preprocessQuery(original_blob, query_blob, processed_bytes_count, alignment); + this->preprocessForStorage(original_blob, storage_blob, input_blob_size); + this->preprocessQuery(original_blob, query_blob, input_blob_size, alignment); } void preprocessForStorage(const void *original_blob, void *&blob, - size_t processed_bytes_count) const override { + size_t &input_blob_size) const override { // If the blob was not allocated yet, allocate it. if (blob == nullptr) { - blob = this->allocator->allocate(processed_bytes_count); - memcpy(blob, original_blob, processed_bytes_count); + blob = this->allocator->allocate(input_blob_size); + memcpy(blob, original_blob, input_blob_size); } static_cast(blob)[0] += value_to_add_storage; } - void preprocessQueryInPlace(void *blob, size_t processed_bytes_count, + void preprocessQueryInPlace(void *blob, size_t input_blob_size, unsigned char alignment) const override {} - void preprocessStorageInPlace(void *blob, size_t processed_bytes_count) const override {} - void preprocessQuery(const void *original_blob, void *&blob, size_t processed_bytes_count, + void preprocessStorageInPlace(void *blob, size_t input_blob_size) const override {} + void preprocessQuery(const void *original_blob, void *&blob, size_t &input_blob_size, unsigned char alignment) const override { // If the blob was not allocated yet, allocate it. if (blob == nullptr) { - blob = this->allocator->allocate_aligned(processed_bytes_count, alignment); - memcpy(blob, original_blob, processed_bytes_count); + blob = this->allocator->allocate_aligned(input_blob_size, alignment); + memcpy(blob, original_blob, input_blob_size); } static_cast(blob)[0] += value_to_add_query; } @@ -191,6 +192,130 @@ class DummyMixedPreprocessor : public PreprocessorInterface { int value_to_add_storage; int value_to_add_query; }; + +// TODO: test increase allocation size ( we don't really need another pp class for this) +// A preprocessor that changes the allocation size of the blobs in the same manner. +// set excess bytes to (char)2 +template +class DummyChangeAllocSizePreprocessor : public PreprocessorInterface { +private: + size_t processed_bytes_count; + static constexpr unsigned char excess_value = 2; + +public: + DummyChangeAllocSizePreprocessor(std::shared_ptr allocator, + size_t processed_bytes_count) + : PreprocessorInterface(allocator), processed_bytes_count(processed_bytes_count) {} + + static constexpr unsigned char getExcessValue() { return excess_value; } + + void preprocess(const void *original_blob, void *&storage_blob, void *&query_blob, + size_t &input_blob_size, unsigned char alignment) const override { + // if the blobs are equal, + if (storage_blob == query_blob) { + preprocessGeneral(original_blob, storage_blob, input_blob_size, alignment); + query_blob = storage_blob; + return; + } + // The blobs are not equal + + // If the input blob size is not enough + if (input_blob_size < processed_bytes_count) { + auto alloc_and_process = [&](void *&blob) { + auto new_blob = this->allocator->allocate_aligned(processed_bytes_count, alignment); + if (blob == nullptr) { + memcpy(new_blob, original_blob, input_blob_size); + } else { + // copy the blob to the new blob, and free the old one + memcpy(new_blob, blob, input_blob_size); + this->allocator->free_allocation(blob); + } + blob = new_blob; + memset((char *)blob + input_blob_size, excess_value, + processed_bytes_count - input_blob_size); + }; + + alloc_and_process(storage_blob); + alloc_and_process(query_blob); + } else { + auto alloc_and_process = [&](void *&blob) { + if (blob == nullptr) { + blob = this->allocator->allocate_aligned(processed_bytes_count, alignment); + memcpy(blob, original_blob, processed_bytes_count); + } else { + memset((char *)blob + processed_bytes_count, excess_value, + input_blob_size - processed_bytes_count); + } + }; + + alloc_and_process(storage_blob); + alloc_and_process(query_blob); + } + + // update the input blob size + input_blob_size = processed_bytes_count; + } + + void preprocessForStorage(const void *original_blob, void *&blob, + size_t &input_blob_size) const override { + + this->preprocessGeneral(original_blob, blob, input_blob_size); + } + void preprocessQueryInPlace(void *blob, size_t input_blob_size, + unsigned char alignment) const override { + // only supported if the blob in already large enough + assert(input_blob_size >= processed_bytes_count); + // set excess bytes to 0 + memset((char *)blob + processed_bytes_count, excess_value, + input_blob_size - processed_bytes_count); + } + + void preprocessStorageInPlace(void *blob, size_t input_blob_size) const override { + // only supported if the blob in already large enough + assert(input_blob_size >= processed_bytes_count); + // set excess bytes to 0 + memset((char *)blob + processed_bytes_count, excess_value, + input_blob_size - processed_bytes_count); + } + void preprocessQuery(const void *original_blob, void *&blob, size_t &input_blob_size, + unsigned char alignment) const override { + this->preprocessGeneral(original_blob, blob, input_blob_size, alignment); + } + +private: + void preprocessGeneral(const void *original_blob, void *&blob, size_t &input_blob_size, + unsigned char alignment = 0) const { + // if the size of the input is not enough. + if (input_blob_size < processed_bytes_count) { + // calloc doesn't have an alignment version, so we need to allocate aligned memory and + // cset the excess bytes to 0. + auto new_blob = this->allocator->allocate_aligned(processed_bytes_count, alignment); + if (blob == nullptr) { + // copy thr original blob + memcpy(new_blob, original_blob, input_blob_size); + } else { + // copy the blob to the new blob + memcpy(new_blob, blob, input_blob_size); + this->allocator->free_allocation(blob); + } + blob = new_blob; + // set excess bytes to 0 + memset((char *)blob + input_blob_size, excess_value, + processed_bytes_count - input_blob_size); + } else { // input size is larger than output + if (blob == nullptr) { + blob = this->allocator->allocate_aligned(processed_bytes_count, alignment); + memcpy(blob, original_blob, processed_bytes_count); + } else { + // set excess bytes to 0 + memset((char *)blob + processed_bytes_count, excess_value, + input_blob_size - processed_bytes_count); + } + } + // update the input blob size + input_blob_size = processed_bytes_count; + } +}; } // namespace dummyPreprocessors TEST(PreprocessorsTest, PreprocessorsTestBasicAlignmentTest) { @@ -207,8 +332,6 @@ TEST(PreprocessorsTest, PreprocessorsTestBasicAlignmentTest) { unsigned char address_alignment = (uintptr_t)(aligned_query.get()) % alignment; ASSERT_EQ(address_alignment, 0); } - - // The index computer is responsible for releasing the distance calculator. } template @@ -471,12 +594,15 @@ TEST(PreprocessorsTest, multiPPContainerCosineThenMixedPreprocess) { float value_to_add_storage = 7.0f; float value_to_add_query = 2.0f; const float original_blob[dim] = {initial_value, initial_value, initial_value, initial_value}; + // TODo: change this test so that original_blob_size != processed_bytes_count + constexpr size_t processed_bytes_count = sizeof(original_blob); auto multiPPContainer = MultiPreprocessorsContainer(allocator, alignment); // adding cosine preprocessor - auto cosine_preprocessor = new (allocator) CosinePreprocessor(allocator, dim); + auto cosine_preprocessor = + new (allocator) CosinePreprocessor(allocator, dim, processed_bytes_count); multiPPContainer.addPreprocessor(cosine_preprocessor); { ProcessedBlobs processed_blobs = @@ -536,6 +662,7 @@ TEST(PreprocessorsTest, multiPPContainerMixedThenCosinePreprocess) { float value_to_add_storage = 7.0f; float value_to_add_query = 2.0f; const float original_blob[dim] = {initial_value, initial_value, initial_value, initial_value}; + constexpr size_t processed_bytes_count = sizeof(original_blob); // Creating multi preprocessors container auto mixed_preprocessor = new (allocator) @@ -568,7 +695,8 @@ TEST(PreprocessorsTest, multiPPContainerMixedThenCosinePreprocess) { } // adding cosine preprocessor - auto cosine_preprocessor = new (allocator) CosinePreprocessor(allocator, dim); + auto cosine_preprocessor = + new (allocator) CosinePreprocessor(allocator, dim, processed_bytes_count); multiPPContainer.addPreprocessor(cosine_preprocessor); { ProcessedBlobs processed_blobs = @@ -597,3 +725,151 @@ TEST(PreprocessorsTest, multiPPContainerMixedThenCosinePreprocess) { } // The preprocessors should be released by the preprocessors container. } +TEST(PreprocessorsTest, decrease_size_STORAGE_then_cosine_no_size_change) {} +TEST(PreprocessorsTest, decrease_size_QUERY_then_cosine_no_size_change) {} + +TEST(PreprocessorsTest, DecreaseSizeThenFloatNormalize) { + using namespace dummyPreprocessors; + std::shared_ptr allocator = VecSimAllocator::newVecsimAllocator(); + + constexpr size_t n_preprocessors = 2; + constexpr size_t alignment = 5; + constexpr size_t elements = 8; + constexpr size_t decrease_amount = 2; + constexpr size_t new_elem_amount = elements - decrease_amount; + + // valgrind detects out of bound reads only if the considered memory is allocated on the heap, + // rather than on the stack. + constexpr size_t original_blob_size = elements * sizeof(float); + auto original_blob_alloc = allocator->allocate_unique(original_blob_size); + float *original_blob = static_cast(original_blob_alloc.get()); + test_utils::populate_float_vec(original_blob, elements); + constexpr size_t new_processed_bytes_count = + original_blob_size - decrease_amount * sizeof(float); + + // Processed blob expected output + float expected_processed_blob[elements] = {0}; + memcpy(expected_processed_blob, original_blob, new_processed_bytes_count); + // Only use half of the blob for normalization + VecSim_Normalize(expected_processed_blob, new_elem_amount, VecSimType_FLOAT32); + + // A pp that decreases the original blob size + auto decrease_size_preprocessor = new (allocator) + DummyChangeAllocSizePreprocessor(allocator, new_processed_bytes_count); + // A normalize pp + auto cosine_preprocessor = new (allocator) + CosinePreprocessor(allocator, new_elem_amount, new_processed_bytes_count); + // Creating multi preprocessors container + auto multiPPContainer = + MultiPreprocessorsContainer(allocator, alignment); + multiPPContainer.addPreprocessor(decrease_size_preprocessor); + multiPPContainer.addPreprocessor(cosine_preprocessor); + + { + ProcessedBlobs processed_blobs = + multiPPContainer.preprocess(original_blob, original_blob_size); + const void *storage_blob = processed_blobs.getStorageBlob(); + const void *query_blob = processed_blobs.getQueryBlob(); + // blobs should point to the same memory slot + ASSERT_EQ(storage_blob, query_blob); + ASSERT_NE(storage_blob, nullptr); + + // memory should be aligned + unsigned char address_alignment = (uintptr_t)(query_blob) % alignment; + ASSERT_EQ(address_alignment, 0); + + // They need to be allocated and processed + EXPECT_NO_FATAL_FAILURE(CompareVectors(static_cast(storage_blob), + expected_processed_blob, new_elem_amount)); + } +} + +TEST(PreprocessorsTest, Int8NormalizeThenIncreaseSize) { + using namespace dummyPreprocessors; + std::shared_ptr allocator = VecSimAllocator::newVecsimAllocator(); + + constexpr size_t n_preprocessors = 2; + constexpr size_t alignment = 5; + constexpr size_t elements = 8; + + // valgrind detects out of bound reads only if the considered memory is allocated on the heap, + // rather than on the stack. + constexpr size_t original_blob_size = elements * sizeof(int8_t); + auto original_blob_alloc = allocator->allocate_unique(original_blob_size); + int8_t *original_blob = static_cast(original_blob_alloc.get()); + test_utils::populate_int8_vec(original_blob, elements); + // size after normalization + constexpr size_t normalized_blob_bytes_count = original_blob_size + sizeof(float); + // size after increasing pp + constexpr size_t elements_addition = 3; + constexpr size_t final_blob_bytes_count = + normalized_blob_bytes_count + elements_addition * sizeof(unsigned char); + + // Processed blob expected output + int8_t expected_processed_blob[elements + sizeof(float) + 3] = {0}; + memcpy(expected_processed_blob, original_blob, original_blob_size); + // normalize the original blob + VecSim_Normalize(expected_processed_blob, elements, VecSimType_INT8); + // add the values of the pp that increases the blob size + unsigned char added_value = DummyChangeAllocSizePreprocessor::getExcessValue(); + for (size_t i = 0; i < elements_addition; i++) { + expected_processed_blob[elements + sizeof(float) + i] = added_value; + } + + // A normalize pp - will allocate the blob + sizeof(float) + auto cosine_preprocessor = new (allocator) + CosinePreprocessor(allocator, elements, normalized_blob_bytes_count); + // A pp that will increase the *normalized* blob size + auto increase_size_preprocessor = + new (allocator) DummyChangeAllocSizePreprocessor(allocator, final_blob_bytes_count); + // Creating multi preprocessors container + auto multiPPContainer = + MultiPreprocessorsContainer(allocator, alignment); + multiPPContainer.addPreprocessor(cosine_preprocessor); + multiPPContainer.addPreprocessor(increase_size_preprocessor); + + { + ProcessedBlobs processed_blobs = + multiPPContainer.preprocess(original_blob, sizeof(original_blob)); + const void *storage_blob = processed_blobs.getStorageBlob(); + const void *query_blob = processed_blobs.getQueryBlob(); + // blobs should point to the same memory slot + ASSERT_EQ(storage_blob, query_blob); + ASSERT_NE(storage_blob, nullptr); + + // memory should be aligned + unsigned char address_alignment = (uintptr_t)(query_blob) % alignment; + ASSERT_EQ(address_alignment, 0); + + // They need to be allocated and processed + EXPECT_NO_FATAL_FAILURE(CompareVectors(static_cast(storage_blob), + expected_processed_blob, + final_blob_bytes_count)); + } +} + +TEST(PreprocessorsTest, cosine_then_change_size) { + // cosine (not changing) + // pp that changes the blob size +} + +TEST(PreprocessorsTest, cosine_change_then_pp_change) { + // cosine ( changing) + // pp that also changes the blob size +} + +// TEST(PreprocessorsTest, multiPPContainerMixedThenCosinePreprocess) { +// // add cosine pp that changes the original blob size +// // add a pp that preprocesses the normalized blob (same size) +// // add a pp that changes the storage_blob size, but not changing the query_blob size +// } + +// TEST(PreprocessorsTest, multiPPContainerMixedThenCosinePreprocess) { +// // add a pp that changes the storage_blob size, but not changing the query_blob size +// // add a pp that preprocesses the normalized blob (same size) +// // add cosine pp that changes the original blob size +// } + +// TEST(PreprocessorsTest, multiPPContainerMixedThenCosinePreprocess) { +// // pp multi container where cosine is only needed for the query blob (not supported yet) +// } diff --git a/tests/unit/test_hnsw_tiered.cpp b/tests/unit/test_hnsw_tiered.cpp index a6540dba5..0b7157c0c 100644 --- a/tests/unit/test_hnsw_tiered.cpp +++ b/tests/unit/test_hnsw_tiered.cpp @@ -4205,52 +4205,52 @@ class PreprocessorDoubleValue : public PreprocessorInterface { PreprocessorDoubleValue(std::shared_ptr allocator, size_t dim) : PreprocessorInterface(allocator), dim(dim) {} void preprocess(const void *original_blob, void *&storage_blob, void *&query_blob, - size_t processed_bytes_count, unsigned char alignment) const override { + size_t &input_blob_size, unsigned char alignment) const override { // One blob was already allocated by a previous preprocessor(s) that process both blobs the // same. The blobs are pointing to the same memory, we need to allocate another memory slot // to split them. if ((storage_blob == query_blob) && (query_blob != nullptr)) { - storage_blob = this->allocator->allocate(processed_bytes_count); - memcpy(storage_blob, query_blob, processed_bytes_count); + storage_blob = this->allocator->allocate(input_blob_size); + memcpy(storage_blob, query_blob, input_blob_size); } // Either both are nullptr or they are pointing to different memory slots. Both cases are // handled by the designated functions. - this->preprocessForStorage(original_blob, storage_blob, processed_bytes_count); - this->preprocessQuery(original_blob, query_blob, processed_bytes_count, alignment); + this->preprocessForStorage(original_blob, storage_blob, input_blob_size); + this->preprocessQuery(original_blob, query_blob, input_blob_size, alignment); } void preprocessForStorage(const void *original_blob, void *&blob, - size_t processed_bytes_count) const override { + size_t &input_blob_size) const override { // If the blob was not allocated yet, allocate it. if (blob == nullptr) { - blob = this->allocator->allocate(processed_bytes_count); - memcpy(blob, original_blob, processed_bytes_count); + blob = this->allocator->allocate(input_blob_size); + memcpy(blob, original_blob, input_blob_size); } for (size_t i = 0; i < dim; i++) { static_cast(blob)[i] *= 2; } } - void preprocessQueryInPlace(void *blob, size_t processed_bytes_count, + void preprocessQueryInPlace(void *blob, size_t input_blob_size, unsigned char alignment) const override { for (size_t i = 0; i < dim; i++) { static_cast(blob)[i] *= 2; } } - void preprocessStorageInPlace(void *blob, size_t processed_bytes_count) const override { + void preprocessStorageInPlace(void *blob, size_t input_blob_size) const override { for (size_t i = 0; i < dim; i++) { static_cast(blob)[i] *= 2; } } - void preprocessQuery(const void *original_blob, void *&blob, size_t processed_bytes_count, + void preprocessQuery(const void *original_blob, void *&blob, size_t &input_blob_size, unsigned char alignment) const override { // If the blob was not allocated yet, allocate it. if (blob == nullptr) { - blob = this->allocator->allocate_aligned(processed_bytes_count, alignment); - memcpy(blob, original_blob, processed_bytes_count); + blob = this->allocator->allocate_aligned(input_blob_size, alignment); + memcpy(blob, original_blob, input_blob_size); } for (size_t i = 0; i < dim; i++) { static_cast(blob)[i] *= 2;