[Store] feat: store key-value data in buckets #968
Conversation
Code Review
This pull request introduces a SequentialStorageBackend for managing key-value data in buckets, which is a solid approach to handle numerous small files efficiently. The implementation includes functionalities for batch operations like offloading, querying, and loading, and is accompanied by new tests, which is commendable.
However, the review has uncovered several critical thread-safety issues. Specifically, the bucket ID generation mechanism is not thread-safe, and several methods that modify shared data structures do so without adequate locking, creating potential race conditions and data corruption risks. Additionally, there are opportunities to improve error handling by properly checking the results of functions returning tl::expected instead of unsafely calling .value().
I have provided detailed comments on these points with suggestions for remediation. Addressing these concerns will significantly improve the robustness and reliability of the new storage backend.
```cpp
LOG(ERROR) << "Storage backend already initialized";
return tl::unexpected(ErrorCode::INTERNAL_ERROR);
}
std::shared_lock lock(mutex_);
```
This function modifies shared class members (object_bucket_map_, buckets_, total_size_) under a std::shared_lock. A shared_lock is intended for concurrent read access only. Using it during write operations leads to a race condition and undefined behavior. You must use a std::unique_lock to ensure exclusive access and maintain thread safety.
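To make the distinction concrete, here is a self-contained sketch (invented class, not the PR's code) of the usual split on a `std::shared_mutex`: writers take `std::unique_lock`, readers take `std::shared_lock`.

```cpp
#include <mutex>
#include <shared_mutex>
#include <unordered_map>

// Illustrative only: a map guarded by std::shared_mutex. Writers must
// take std::unique_lock; std::shared_lock grants concurrent read-only
// access and gives no protection against concurrent mutation.
class SizeIndex {
 public:
  void Put(int key, int value) {
    std::unique_lock lock(mutex_);  // exclusive: we modify the map
    map_[key] = value;
  }
  bool Get(int key, int* out) const {
    std::shared_lock lock(mutex_);  // shared: read-only access
    auto it = map_.find(key);
    if (it == map_.end()) return false;
    *out = it->second;
    return true;
  }
 private:
  mutable std::shared_mutex mutex_;
  std::unordered_map<int, int> map_;
};
```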
```cpp
std::unique_lock lock(mutex_);
```

```cpp
if (!write_result) {
    LOG(ERROR) << "vector_write failed for: " << bucket_id
               << ", error: " << write_result.error();
    buckets_.erase(bucket_id);
```
The buckets_ map is being modified here with erase() without holding a lock on mutex_. Since buckets_ is a shared resource accessed by multiple threads, this operation is not thread-safe and can lead to race conditions or data corruption. You should acquire a std::unique_lock before modifying the map.
```cpp
{
    std::unique_lock lock(mutex_);
    buckets_.erase(bucket_id);
}
```

```cpp
tl::expected<int64_t, ErrorCode> SequentialStorageBackend::CreateBucketId() {
    auto cur_time_stamp = time_gen();
    if (cur_time_stamp <= m_i64LastTimeStamp) {
        m_i64SequenceID = (m_i64SequenceID + 1) & SEQUENCE_MASK;
    } else {
        m_i64SequenceID = 0;
    }
    m_i64LastTimeStamp = cur_time_stamp;
    return (cur_time_stamp << TIMESTAMP_SHIFT)
           | (m_i64SequenceID << SEQUENCE_ID_SHIFT);
}
```
This function for creating bucket IDs is not thread-safe. It reads and modifies the shared member variables m_i64LastTimeStamp and m_i64SequenceID without any synchronization. If CreateBucketId is called concurrently from multiple threads (e.g., via BatchOffload), it can lead to race conditions and result in duplicate bucket IDs. You must protect this critical section with a mutex.
```cpp
tl::expected<int64_t, ErrorCode> SequentialStorageBackend::CreateBucketId() {
    std::unique_lock lock(mutex_);
    auto cur_time_stamp = time_gen();
    if (cur_time_stamp <= m_i64LastTimeStamp) {
        m_i64SequenceID = (m_i64SequenceID + 1) & SEQUENCE_MASK;
    } else {
        m_i64SequenceID = 0;
    }
    m_i64LastTimeStamp = cur_time_stamp;
    return (cur_time_stamp << TIMESTAMP_SHIFT)
           | (m_i64SequenceID << SEQUENCE_ID_SHIFT);
}
```

```cpp
char *buf = new char[object_it->second.size + 1];
buf[object_it->second.size] = '\0';
memcpy(buf, object_it->second.ptr, object_it->second.size);
auto data = std::string(buf);
```
This code allocates memory for buf using new char[] but never deallocates it, causing a memory leak in the test. Additionally, it's inefficient as it involves multiple data copies. A safer and more efficient approach is to construct the std::string directly from the buffer pointer and size. This avoids both the memory leak and the unnecessary memcpy.
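As a side note on the suggested fix: constructing `std::string` from a pointer and an explicit size also preserves embedded NUL bytes, which the `char*`-only constructor would silently truncate. A minimal sketch (the `FromBuffer` helper is hypothetical, not part of the PR):

```cpp
#include <cstddef>
#include <string>

// Copies exactly `size` bytes into the string, embedded '\0' included.
// The single-argument std::string(const char*) constructor would stop
// at the first '\0' instead.
inline std::string FromBuffer(const void* ptr, std::size_t size) {
  return std::string(static_cast<const char*>(ptr), size);
}
```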
```diff
- char *buf = new char[object_it->second.size+1];
- buf[object_it->second.size] = '\0';
- memcpy(buf, object_it->second.ptr, object_it->second.size);
- auto data = std::string(buf);
+ std::string data(static_cast<char*>(object_it->second.ptr), object_it->second.size);
```
```cpp
LOG(ERROR) << "batch object is empty";
return tl::make_unexpected(ErrorCode::INVALID_KEY);
}
auto bucket_id = CreateBucketId().value();
```
Calling .value() on a tl::expected without first checking if it contains a value is unsafe. If CreateBucketId() were to return an error, this would throw a tl::bad_expected_access exception and crash the program. It's crucial to handle the error case properly. This pattern of unchecked .value() calls appears multiple times in this file and should be addressed throughout.
```cpp
auto bucket_id_result = CreateBucketId();
if (!bucket_id_result) {
    return tl::make_unexpected(bucket_id_result.error());
}
auto bucket_id = bucket_id_result.value();
```

```cpp
tl::expected<void, ErrorCode> BatchQuery(const std::vector<std::string>& keys,
    std::unordered_map<std::string, SequentialObjectMetadata> &batche_object_metadata);
```
```cpp
int64_t m_i64SequenceID = 0;
int64_t m_i64LastTimeStamp = 0;
```
These member variables use Hungarian notation (m_i64...), which is generally discouraged in modern C++. It would be more consistent with the rest of the class to use a trailing underscore for private members (e.g., sequence_id_, last_timestamp_).
```cpp
int64_t sequence_id_ = 0;
int64_t last_timestamp_ = 0;
```

```cpp
}
std::unique_lock lock(mutex_);
total_size_ += bucket->data_size + bucket->meta_size;
for (auto key : bucket->keys) {
```
```cpp
auto file = std::move(open_file_result.value());
for (const auto& key : keys) {
    size_t offset;
    auto slice = batched_slices[key];
```
Using the [] operator on batched_slices is risky because it will insert a default-constructed Slice if the key doesn't exist, which could hide bugs. It's safer to use find() and check for the key's existence, or use at() which would throw an exception if the key is missing, making debugging easier.
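To illustrate the pitfall, here is a small self-contained sketch (the `LookupStrict` helper is hypothetical, not the PR's code) contrasting `find()` with `operator[]`, which default-inserts on a miss:

```cpp
#include <string>
#include <unordered_map>

// operator[] on an unordered_map silently inserts a default-constructed
// value for a missing key; find() lets the caller detect the miss
// explicitly without mutating the map.
inline bool LookupStrict(const std::unordered_map<std::string, int>& m,
                         const std::string& key, int* out) {
  auto it = m.find(key);
  if (it == m.end()) return false;  // key absent: report, don't insert
  *out = it->second;
  return true;
}
```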
```cpp
auto slice_it = batched_slices.find(key);
if (slice_it == batched_slices.end()) {
    LOG(ERROR) << "Slice for key " << key << " not found in batched_slices";
    return tl::make_unexpected(ErrorCode::INVALID_KEY);
}
auto& slice = slice_it->second;
```

```cpp
tl::expected<std::string, ErrorCode> SequentialStorageBackend::GetBucketDataPath(int64_t bucket_id) {
    std::string sep = storage_path_.empty() || storage_path_.back() == '/' ? "" : "/";
    return storage_path_ + sep + std::to_string(bucket_id);
}
```
This function is declared to return tl::expected<std::string, ErrorCode>, but it never actually returns an error. The implementation can be simplified by changing the return type to std::string. This also applies to GetBucketMetadataPath.
```cpp
std::string SequentialStorageBackend::GetBucketDataPath(int64_t bucket_id) {
    std::string sep = storage_path_.empty() || storage_path_.back() == '/' ? "" : "/";
    return storage_path_ + sep + std::to_string(bucket_id);
}
```
Thanks for the great work! While reviewing the PR, I found several parts of the implementation that I couldn’t fully understand in terms of the underlying motivation and design reasoning. Would it be possible to provide a high-level design description, along with an explanation of the bucket data structure and how it fits into the overall storage architecture? That would really help make the review more effective and ensure we’re aligned on the intended design direction.

Maybe you could resolve Gemini's Critical and High Priority comments. For the lock, you could try

Thank you, I'll fix this issue.
Thanks for this work! I have left some comments.
```cpp
 * - total_size_: cumulative data size of all stored objects
 */
mutable std::shared_mutex mutex_;
std::string storage_path_;
```
We can use the "GUARDED_BY" to ensure these objects are accessed correctly.
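For reference, a sketch of how such annotations look with Clang's thread-safety analysis. The macro and class names here are assumptions, not the project's; many projects use annotated mutex wrappers such as absl::Mutex rather than a raw `std::shared_mutex`.

```cpp
#include <mutex>
#include <shared_mutex>
#include <string>

// With -Wthread-safety, Clang flags any access to a GUARDED_BY member
// made without holding the named mutex. The macro expands to nothing
// on other compilers, so the code stays portable.
#if defined(__clang__)
#define GUARDED_BY(x) __attribute__((guarded_by(x)))
#else
#define GUARDED_BY(x)
#endif

class BackendState {
 public:
  void SetPath(std::string p) {
    std::unique_lock lock(mutex_);
    storage_path_ = std::move(p);  // ok: mutex_ held exclusively
  }
  std::string Path() const {
    std::shared_lock lock(mutex_);
    return storage_path_;          // ok: mutex_ held shared
  }
 private:
  mutable std::shared_mutex mutex_;
  std::string storage_path_ GUARDED_BY(mutex_);
};
```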
```cpp
    FileMode mode) const;
};

class SequentialStorageBackend {
```
"Sequential" in this context is a little bit misleading. Shall we consider using another name? e.g., BucketStorageBackend
```cpp
auto meta_path = GetBucketMetadataPath(id).value();
auto open_file_result = OpenFile(meta_path, FileMode::Read);
if (!open_file_result) {
    LOG(INFO) << "Failed to open file for reading: " << meta_path;
```
This should be a LOG(ERROR) message.
```cpp
return tl::make_unexpected(ErrorCode::FILE_OPEN_FAIL);
}
auto file = std::move(open_file_result.value());
LOG(INFO) << "Writing bucket with path: " << bucket_data_path;
```
If there is no error, emitting a log line for every successful operation would produce too much output and disturb users. Consider using VLOG(1) for debugging. The same applies to other places.
```cpp
auto write_bucket_result = WriteBucket(bucket_id, bucket, iovs);
if (!write_bucket_result) {
    LOG(ERROR) << "Failed to write bucket with id: " << bucket_id;
    buckets_.erase(bucket_id);
```
Shouldn’t we add the bucket information to `buckets_` after all files have been written?
- Once we add it to `buckets_`, read requests can already access it even though the data hasn’t been fully written yet.
- If the write operation fails, this bucket shouldn’t be added to `buckets_` at all.
- Since the bucket ID is used as the file name in the storage backend, reserving a slot in `buckets_` upfront prevents multiple threads from concurrently writing to the same bucket file.
- A read operation can only proceed after the key has been inserted into `object_bucket_map_`.
Holding the lock from before writing until write completion would result in excessively long lock duration.
```cpp
LOG(INFO) << "Writing bucket with path: " << bucket_data_path;

auto write_result = file->vector_write(iovs.data(), iovs.size(), 0);
if (!write_result) {
```
If write fails, shall we try to delete this file? It seems like this file will never be used. This also applies to the metadata file.
> If write fails, shall we try to delete this file? It seems like this file will never be used. This also applies to the metadata file.

If FILE_WRITE_FAIL occurred, the PosixFile destructor will delete the file.
I see. That makes sense.
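For readers unfamiliar with the pattern: this cleanup relies on RAII, where the file wrapper's destructor removes the partially written file once a write error has been recorded. A minimal stdlib-only sketch (the `ScratchFile` class is illustrative, not the actual PosixFile):

```cpp
#include <cstdio>
#include <fstream>
#include <string>

// Destructor-based cleanup: if any write failed, remove the
// half-written file so no unusable file is left behind.
class ScratchFile {
 public:
  explicit ScratchFile(std::string path) : path_(std::move(path)), out_(path_) {}
  void Write(const std::string& data) {
    out_ << data;
    if (!out_) failed_ = true;  // remember any stream error
  }
  void MarkFailed() { failed_ = true; }  // e.g. on FILE_WRITE_FAIL
  ~ScratchFile() {
    out_.close();
    if (failed_) std::remove(path_.c_str());  // delete the partial file
  }

 private:
  std::string path_;
  std::ofstream out_;
  bool failed_ = false;
};

inline bool FileExists(const std::string& path) {
  return std::ifstream(path).good();
}
```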
Use

A file was missing, now fixed.

/gemini review
Code Review
This pull request introduces a BucketStorageBackend for more efficient key-value data management by grouping data into buckets. The implementation is well-structured, including a new SharedMutex wrapper, a BucketIdGenerator, and comprehensive unit tests. My review focuses on improving robustness and maintainability by addressing a critical issue with serialization, improving error handling consistency, removing unsafe coding patterns, and fixing a test that invokes undefined behavior.
```cpp
YLT_REFL(BucketObjectMetadata, offset, key_size, data_size);

struct BucketMetadata {
    mutable std::shared_mutex statistics_mutex;
```
The BucketMetadata struct contains a std::shared_mutex statistics_mutex. std::shared_mutex is not serializable, but this struct is marked for serialization with YLT_REFL. This will likely cause compilation errors or undefined behavior at runtime when struct_pb::to_pb is called on a BucketMetadata instance. Since statistics_mutex does not appear to be used, it should be removed to avoid this issue. If a mutex is needed for BucketMetadata instances, it should be managed separately from the serializable struct.
fix @xiaguan
```cpp
} else {
    LOG(ERROR) << "Key " << key << " does not exist";
}
```
In BatchQuery, when a key is not found, an error is logged, but the function continues and returns a success status. This is inconsistent with BatchLoad, which returns ErrorCode::INVALID_KEY in a similar scenario. This behavior can be misleading for callers. Consider returning an error, such as ErrorCode::OBJECT_NOT_FOUND, if any key is not found.
```cpp
} else {
    LOG(ERROR) << "Key " << key << " does not exist";
    return tl::make_unexpected(ErrorCode::OBJECT_NOT_FOUND);
}
```
fix @xiaguan
```cpp
GetBucketDataPath(bucket_id).value();
auto bucket_meta_path =
    GetBucketMetadataPath(bucket_id).value();
```
The code frequently uses .value() on tl::expected objects (e.g., GetBucketDataPath(bucket_id).value()) without first checking if they contain a value. This is a fragile pattern that can lead to crashes if the function is ever modified to return an error. It's safer to check for an error before accessing the value. This pattern appears in multiple places within this file (Init, WriteBucket, StoreBucketMetadata, etc.).
fix @xiaguan
```cpp
auto bucket_data_path = GetBucketDataPath(bucket_id);
return *bucket_data_path + ".meta";
```
Unsafe dereferencing of tl::expected. The GetBucketDataPath function returns a tl::expected, which could contain an error. Dereferencing it with * without checking for an error first is unsafe and will lead to a crash if an error is present. You should check for the error before dereferencing.
```cpp
auto bucket_data_path = GetBucketDataPath(bucket_id);
if (!bucket_data_path) {
    return tl::make_unexpected(bucket_data_path.error());
}
return bucket_data_path.value() + ".meta";
```
fix @xiaguan
```cpp
BucketStorageBackend::BuildBucket(
    const std::unordered_map<std::string, std::vector<Slice>>& batch_object,
    std::vector<iovec>& iovs) {
    SharedMutexLocker lock(&mutex_);
```
The BuildBucket method acquires an exclusive lock on mutex_, but it doesn't access any shared member variables of the BucketStorageBackend class. This lock is unnecessary and could create a performance bottleneck by serializing bucket-building operations. This method could be a static helper function or a free function that doesn't take a lock.
fix @xiaguan
```cpp
auto bucket = buckets_.find(bucket_id);
if (bucket == buckets_.end()) {
    LOG(ERROR) << "Failed to open file for reading: "
               << storage_filepath;
    return tl::make_unexpected(ErrorCode::FILE_OPEN_FAIL);
}
auto object_metadata = buckets_[bucket_id]->object_metadata.find(key);
if (object_metadata == buckets_[bucket_id]->object_metadata.end()) {
    LOG(ERROR) << "Failed to open file for reading: "
               << storage_filepath;
    return tl::make_unexpected(ErrorCode::FILE_OPEN_FAIL);
}
```
There are a couple of issues in this section:
- Misleading errors: when a bucket or object metadata is not found, the code logs a generic "Failed to open file for reading" message and returns `ErrorCode::FILE_OPEN_FAIL`. The message and error code should be more specific, such as "Bucket not found" (`BUCKET_NOT_FOUND`).
- Redundant lookup: `buckets_.find(bucket_id)` is called, and then `buckets_[bucket_id]` is used, which performs a second lookup. It's more efficient to use the iterator from the `find` call.

Here is a suggested change that addresses both points.
```cpp
auto bucket_it = buckets_.find(bucket_id);
if (bucket_it == buckets_.end()) {
    LOG(ERROR) << "Bucket not found with id: " << bucket_id;
    return tl::make_unexpected(ErrorCode::BUCKET_NOT_FOUND);
}
auto object_metadata = bucket_it->second->object_metadata.find(key);
if (object_metadata == bucket_it->second->object_metadata.end()) {
    LOG(ERROR) << "Object metadata not found for key '" << key
               << "' in bucket " << bucket_id;
    return tl::make_unexpected(ErrorCode::OBJECT_NOT_FOUND);
}
```

```cpp
EXPECT_TRUE(
    mtx.try_lock_shared());  // Multiple shared locks should be allowed
// Note: This test does not attempt recursive locking (UB), just checks
// concurrent shared access.
```
This test attempts to acquire a shared lock on mtx while it's already held by a SharedMutexLocker in the same thread. Calling try_lock_shared on a std::shared_mutex that is already owned by the calling thread (in any mode) results in undefined behavior. This test should be removed or rewritten to test concurrency from a different thread, similar to how SharedAccessIsConcurrent is implemented.
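A safe version of this check runs the second lock attempt on a separate thread. A self-contained sketch (helper names are invented, using plain asserts rather than GTest macros):

```cpp
#include <shared_mutex>
#include <thread>

// The second lock attempt happens on a *different* thread, so no
// thread ever re-locks a mutex it already holds (which is UB).
inline bool SharedLockAllowedFromOtherThread(std::shared_mutex& mtx) {
  std::shared_lock lock(mtx);  // this thread holds a shared lock
  bool ok = false;
  std::thread t([&] {
    std::shared_lock other(mtx);  // concurrent shared access succeeds
    ok = true;
  });
  t.join();
  return ok;
}

inline bool UniqueLockBlockedFromOtherThread(std::shared_mutex& mtx) {
  std::shared_lock lock(mtx);  // a reader is active
  bool acquired = true;
  std::thread t([&] {
    acquired = mtx.try_lock();  // exclusive lock must fail while shared is held
    if (acquired) mtx.unlock();
  });
  t.join();
  return !acquired;
}
```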
```cpp
struct StorageObjectMetadata {
    int64_t bucket_id;
    size_t offset;
```
Use uint64_t or int64_t since size_t varies in size across different machines.
I think int64_t is better?
ok
So here are my several points of confusion:

I think we could also refer to CacheLib's Navy Block Cache documentation for more best practices on efficiently storing large key/value pairs on SSD: https://cachelib.org/docs/Cache_Library_Architecture_Guide/navy_overview#block-cache
The code looks good to me. We can merge this PR for now and leave further optimization to subsequent work.
Summary
Implement a SequentialStorageBackend to store key-value data in buckets for efficient data management and maintenance.
Motivation
Behavior & Compatibility