diff --git a/mooncake-store/include/tiered_cache/cache_tier.h b/mooncake-store/include/tiered_cache/cache_tier.h
new file mode 100644
index 000000000..d0bbd0da5
--- /dev/null
+++ b/mooncake-store/include/tiered_cache/cache_tier.h
@@ -0,0 +1,88 @@
+#pragma once
+
+#include <cstddef>
+#include <cstdint>
+#include <string>
+#include <vector>
+
+#include "transfer_engine.h"
+
+namespace mooncake {
+
+struct DataSource;
+enum class MemoryType;
+class TieredBackend;
+
+/**
+ * @enum MemoryType
+ * @brief Defines the physical storage medium type for a cache tier.
+ */
+enum class MemoryType { DRAM, NVME, UNKNOWN };
+
+static inline std::string MemoryTypeToString(MemoryType type) {
+    switch (type) {
+        case MemoryType::DRAM:
+            return "DRAM";
+        case MemoryType::NVME:
+            return "NVME";
+        default:
+            return "UNKNOWN";
+    }
+}
+
+/**
+ * @struct DataSource
+ * @brief Describes a source of data for copy/write operations.
+ */
+struct DataSource {
+    uint64_t ptr;     // Pointer to data (if in memory) / file descriptor
+    uint64_t offset;  // Offset within the source (for files/SSDs)
+    size_t size;      // Size in bytes
+    MemoryType type;  // Source memory type
+};
+
+/**
+ * @class CacheTier
+ * @brief Abstract base class for a single tier (e.g., DRAM, SSD).
+ *
+ * Update: Supports decoupled Allocation/Write/Commit operations to allow
+ * flexible placement strategies (client-centric vs. master-centric).
+ */
+class CacheTier {
+   public:
+    virtual ~CacheTier() = default;
+
+    /**
+     * @brief Initializes the cache tier.
+     */
+    virtual bool Init(TieredBackend* backend, TransferEngine* engine) = 0;
+
+    /**
+     * @brief Reserve space (allocation).
+     * Finds free space of `size` bytes. Does NOT copy data.
+     *
+     * @param size Bytes to allocate.
+     * @param data DataSource struct to fill with allocation info.
+     * @return true if allocation succeeds.
+     */
+    virtual bool Allocate(size_t size, DataSource& data) = 0;
+
+    /**
+     * @brief Free space (rollback/cleanup).
+     * Releases space at the given offset. Used when writes fail or when
+     * explicitly freeing anonymous blocks.
+     */
+    virtual bool Free(DataSource data) = 0;
+
+    // --- Accessors & Metadata ---
+    virtual uint64_t GetTierId() const = 0;
+    virtual size_t GetCapacity() const = 0;
+    virtual size_t GetUsage() const = 0;
+    virtual MemoryType GetMemoryType() const = 0;
+    virtual const std::vector<std::string>& GetTags() const = 0;
+
+   protected:
+    // A pointer to the parent backend, allowing tiers to access shared
+    // services such as the DataCopier.
+    TieredBackend* backend_ = nullptr;
+};
+
+}  // namespace mooncake
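To make the contract concrete, here is a minimal sketch of what a concrete tier could look like. `MallocCacheTier` is hypothetical (this PR deliberately leaves tier implementations to a later factory); it backs each allocation with `malloc`, which is just enough to satisfy the `Allocate`/`Free` contract:

```cpp
// Hypothetical illustration only -- not part of this PR.
#include <cstdlib>

class MallocCacheTier : public mooncake::CacheTier {
   public:
    bool Init(mooncake::TieredBackend* backend, TransferEngine*) override {
        backend_ = backend;
        return true;
    }
    bool Allocate(size_t size, mooncake::DataSource& data) override {
        if (usage_ + size > kCapacity) return false;
        void* p = std::malloc(size);
        if (!p) return false;
        data = {reinterpret_cast<uint64_t>(p), 0, size,
                mooncake::MemoryType::DRAM};
        usage_ += size;  // NOTE: not thread-safe; a real tier needs locking.
        return true;
    }
    bool Free(mooncake::DataSource data) override {
        std::free(reinterpret_cast<void*>(data.ptr));
        usage_ -= data.size;
        return true;
    }
    uint64_t GetTierId() const override { return 0; }
    size_t GetCapacity() const override { return kCapacity; }
    size_t GetUsage() const override { return usage_; }
    mooncake::MemoryType GetMemoryType() const override {
        return mooncake::MemoryType::DRAM;
    }
    const std::vector<std::string>& GetTags() const override { return tags_; }

   private:
    static constexpr size_t kCapacity = 1ull << 30;  // 1 GiB, arbitrary
    size_t usage_ = 0;
    std::vector<std::string> tags_;
};
```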
diff --git a/mooncake-store/include/tiered_cache/copier_registry.h b/mooncake-store/include/tiered_cache/copier_registry.h
new file mode 100644
index 000000000..80c518ae7
--- /dev/null
+++ b/mooncake-store/include/tiered_cache/copier_registry.h
@@ -0,0 +1,86 @@
+#pragma once
+
+#include "tiered_cache/cache_tier.h"
+#include "tiered_cache/data_copier.h"
+
+#include <functional>
+#include <memory>
+#include <string>
+#include <vector>
+
+namespace mooncake {
+
+// Forward declaration from data_copier.h to avoid a circular dependency.
+class DataCopierBuilder;
+
+// Holds the registration information for a memory type.
+struct MemoryTypeRegistration {
+    MemoryType type;
+    CopyFunction to_dram_func;
+    CopyFunction from_dram_func;
+};
+
+// Holds the registration for an optimized direct path.
+struct DirectPathRegistration {
+    MemoryType src_type;
+    MemoryType dest_type;
+    CopyFunction func;
+};
+
+/**
+ * @brief A singleton registry for data copier functions.
+ *
+ * Modules can register their copy functions here during static
+ * initialization. The DataCopierBuilder will then use this registry to
+ * construct a DataCopier.
+ */
+class CopierRegistry {
+   public:
+    /**
+     * @brief Get the singleton instance of the registry.
+     */
+    static CopierRegistry& GetInstance();
+
+    /**
+     * @brief Registers the to/from DRAM copy functions for a memory type.
+     */
+    void RegisterMemoryType(MemoryType type, CopyFunction to_dram,
+                            CopyFunction from_dram);
+
+    /**
+     * @brief Registers an optional, optimized direct copy path.
+     */
+    void RegisterDirectPath(MemoryType src, MemoryType dest, CopyFunction func);
+
+    // These methods are used by the DataCopierBuilder to collect all
+    // registrations.
+    const std::vector<MemoryTypeRegistration>& GetMemoryTypeRegistrations()
+        const;
+    const std::vector<DirectPathRegistration>& GetDirectPathRegistrations()
+        const;
+
+   private:
+    friend class DataCopierBuilder;
+
+    CopierRegistry() = default;
+    ~CopierRegistry() = default;
+    CopierRegistry(const CopierRegistry&) = delete;
+    CopierRegistry& operator=(const CopierRegistry&) = delete;
+
+    std::vector<MemoryTypeRegistration> memory_type_regs_;
+    std::vector<DirectPathRegistration> direct_path_regs_;
+};
+
+/**
+ * @brief A helper class that registers copiers automatically at static
+ * initialization time.
+ *
+ * To register a new memory type, declare a static instance of this class in
+ * the corresponding .cpp file, providing the type and its to/from DRAM
+ * copiers.
+ */
+class CopierRegistrar {
+   public:
+    CopierRegistrar(MemoryType type, CopyFunction to_dram,
+                    CopyFunction from_dram);
+};
+
+}  // namespace mooncake
\ No newline at end of file
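For reference, this is how a module would typically hook into the registry, following the pattern the header comment describes. The translation unit and the memcpy-based copiers below are illustrative, not part of this PR:

```cpp
// Hypothetical dram_copiers.cpp -- illustration only.
#include <cstring>

#include "tiered_cache/copier_registry.h"

namespace mooncake {
namespace {

// For DRAM, "to DRAM" and "from DRAM" are both a plain memcpy.
bool CopyDram(const DataSource& src, const DataSource& dst) {
    if (dst.size < src.size) return false;
    std::memcpy(reinterpret_cast<void*>(dst.ptr + dst.offset),
                reinterpret_cast<const void*>(src.ptr + src.offset),
                src.size);
    return true;
}

// Runs during static initialization, before main(), so the registration is
// already visible when a DataCopierBuilder is constructed.
static CopierRegistrar g_dram_registrar(MemoryType::DRAM, CopyDram, CopyDram);

}  // namespace
}  // namespace mooncake
```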
diff --git a/mooncake-store/include/tiered_cache/data_copier.h b/mooncake-store/include/tiered_cache/data_copier.h
new file mode 100644
index 000000000..cd843ec09
--- /dev/null
+++ b/mooncake-store/include/tiered_cache/data_copier.h
@@ -0,0 +1,90 @@
+#pragma once
+
+#include "tiered_cache/cache_tier.h"
+
+#include <functional>
+#include <map>
+#include <memory>
+#include <stdexcept>
+#include <string>
+#include <utility>
+
+namespace mooncake {
+
+using CopyFunction =
+    std::function<bool(const DataSource& src, const DataSource& dst)>;
+
+class DataCopier;
+
+/**
+ * @brief A helper class to build a valid DataCopier.
+ *
+ * This builder enforces the rule that for any new memory type added, its
+ * copy functions to and from DRAM *must* be provided via the CopierRegistry.
+ */
+class DataCopierBuilder {
+   public:
+    /**
+     * @brief Constructs a builder. It automatically pulls all existing
+     * registrations from the global CopierRegistry.
+     */
+    DataCopierBuilder();
+
+    /**
+     * @brief (Optional) Registers a highly optimized direct copy path.
+     * This will be used instead of the DRAM fallback. Can be used for
+     * testing or for paths that are not self-registered.
+     * @return A reference to the builder for chaining.
+     */
+    DataCopierBuilder& AddDirectPath(MemoryType src_type, MemoryType dest_type,
+                                     CopyFunction func);
+
+    /**
+     * @brief Builds the final, immutable DataCopier object.
+     * It verifies that every memory type registered via the registry has
+     * to/from DRAM copiers before creating the object.
+     * @return A unique_ptr to the new DataCopier.
+     * @throws std::logic_error if a required to/from DRAM copier is missing.
+     */
+    std::unique_ptr<DataCopier> Build() const;
+
+   private:
+    std::map<std::pair<MemoryType, MemoryType>, CopyFunction> copy_matrix_;
+};
+
+/**
+ * @brief A central utility for copying data between different memory types.
+ * It supports a fallback mechanism via DRAM for any copy paths that are not
+ * explicitly registered as a direct path.
+ */
+class DataCopier {
+   public:
+    // The constructor is private. Use DataCopierBuilder to create an
+    // instance.
+    ~DataCopier() = default;
+    DataCopier(const DataCopier&) = delete;
+    DataCopier& operator=(const DataCopier&) = delete;
+
+    /**
+     * @brief Executes a copy from a source to a destination.
+     * It first attempts to find a direct copy function (e.g., VRAM -> VRAM).
+     * If none is found, it automatically falls back to a two-step copy via a
+     * temporary DRAM buffer (e.g., VRAM -> DRAM -> SSD).
+     * @param src The source descriptor.
+     * @param dst The destination descriptor (memory type, address/handle,
+     * offset, and size).
+     * @return True if the copy was successful, false otherwise.
+     */
+    bool Copy(const DataSource& src, const DataSource& dst) const;
+
+   private:
+    friend class DataCopierBuilder;  // Allow the builder to access the
+                                     // constructor.
+    DataCopier(
+        std::map<std::pair<MemoryType, MemoryType>, CopyFunction> copy_matrix);
+
+    CopyFunction FindCopier(MemoryType src_type, MemoryType dest_type) const;
+
+    const std::map<std::pair<MemoryType, MemoryType>, CopyFunction>
+        copy_matrix_;
+};
+
+}  // namespace mooncake
\ No newline at end of file
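The intended call pattern, assuming the relevant memory types have already self-registered via `CopierRegistrar` (a sketch, not code from this PR):

```cpp
// Sketch: build a copier, optionally short-circuiting one path for tests.
mooncake::DataCopierBuilder builder;
builder.AddDirectPath(
    mooncake::MemoryType::NVME, mooncake::MemoryType::NVME,
    [](const mooncake::DataSource& src, const mooncake::DataSource& dst) {
        // Hypothetical optimized SSD-to-SSD path would go here.
        return true;
    });

// Throws std::logic_error if a registered type lacks its DRAM copiers.
std::unique_ptr<mooncake::DataCopier> copier = builder.Build();

mooncake::DataSource src{/*ptr=*/0, /*offset=*/0, /*size=*/4096,
                         mooncake::MemoryType::NVME};
mooncake::DataSource dst{/*ptr=*/0, /*offset=*/0, /*size=*/4096,
                         mooncake::MemoryType::DRAM};
bool ok = copier->Copy(src, dst);  // direct path, or the DRAM pivot if needed
```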
diff --git a/mooncake-store/include/tiered_cache/tiered_backend.h b/mooncake-store/include/tiered_cache/tiered_backend.h
new file mode 100644
index 000000000..a43a1f20c
--- /dev/null
+++ b/mooncake-store/include/tiered_cache/tiered_backend.h
@@ -0,0 +1,195 @@
+#pragma once
+
+#include <functional>
+#include <json/json.h>
+#include <memory>
+#include <optional>
+#include <shared_mutex>
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+#include "tiered_cache/cache_tier.h"
+#include "tiered_cache/data_copier.h"
+
+namespace mooncake {
+
+class TieredBackend;  // Forward declaration
+
+/**
+ * @struct TieredLocation
+ * @brief Describes the physical location of a segment within the tiered
+ * storage.
+ */
+struct TieredLocation {
+    uint64_t tier_id;
+    DataSource data;
+};
+
+/**
+ * @struct TierView
+ * @brief A snapshot of a tier's status, used for reporting topology to the
+ * Master.
+ */
+struct TierView {
+    uint64_t id;
+    MemoryType type;
+    size_t capacity;
+    size_t usage;
+    size_t free_space;
+    int priority;
+    std::vector<std::string> tags;
+};
+
+/**
+ * @struct AllocationEntry
+ * @brief The internal state of an allocation; acts as the "control block"
+ * for the resource.
+ * When the last shared_ptr pointing to this entry dies, the destructor
+ * triggers the physical release of the resource via the backend.
+ */
+struct AllocationEntry {
+    TieredBackend* backend;
+    TieredLocation loc;
+
+    AllocationEntry(TieredBackend* b, TieredLocation l) : backend(b), loc(l) {}
+
+    // Destructor: automatically releases the resource if valid.
+    ~AllocationEntry();
+};
+
+/**
+ * @typedef AllocationHandle
+ * @brief A reference-counted handle to a storage resource.
+ */
+using AllocationHandle = std::shared_ptr<AllocationEntry>;
+
+/**
+ * @brief Callback for metadata synchronization.
+ * Invoked after the data copy is complete.
+ * Returns true if the sync succeeds, false otherwise.
+ */
+using MetadataSyncCallback =
+    std::function<bool(const std::string& key, const TieredLocation& loc)>;
+
+/**
+ * @class TieredBackend
+ * @brief Data-plane management class supporting tiered storage with
+ * RAII-based resource management.
+ */
+class TieredBackend {
+   public:
+    TieredBackend();
+    ~TieredBackend() = default;
+
+    bool Init(Json::Value root, TransferEngine* engine);
+
+    // --- Client-Centric Operations ---
+    // The following operations are client-centric: the client manages the
+    // resource itself and synchronizes with the Master when needed.
+
+    /**
+     * @brief Allocation.
+     * Reserves storage space and returns a handle.
+     * If the handle goes out of scope without being committed, the space is
+     * freed automatically.
+     */
+    AllocationHandle Allocate(
+        size_t size, std::optional<uint64_t> preferred_tier = std::nullopt);
+
+    /**
+     * @brief Execution (Write).
+     * Writes data to the location specified by the handle.
+     */
+    bool Write(const DataSource& source, AllocationHandle handle);
+
+    /**
+     * @brief Commit (Register).
+     * Registers the handle in the local metadata index.
+     * Supports multiple tiers: the key (SegmentID) can exist in several
+     * tiers simultaneously. If a replica already exists on the same tier, it
+     * is replaced.
+     */
+    bool Commit(const std::string& key, AllocationHandle handle);
+
+    /**
+     * @brief Get.
+     * Returns a handle.
+     * @param tier_id If specified, returns the handle on that specific tier.
+     * If nullopt, returns the handle from the highest-priority tier
+     * available.
+     */
+    AllocationHandle Get(const std::string& key,
+                         std::optional<uint64_t> tier_id = std::nullopt);
+
+    /**
+     * @brief Delete.
+     * Removes the key from the metadata index.
+     * @param tier_id If specified, removes only the replica on that tier.
+     * If nullopt, removes ALL replicas for this key (and the key entry
+     * itself).
+     */
+    bool Delete(const std::string& key,
+                std::optional<uint64_t> tier_id = std::nullopt);
+
+    // --- Composite Operations ---
+
+    /**
+     * @brief Data migration / replication.
+     * Creates a copy of the data on dest_tier_id.
+     * Note: this adds a new replica. It does NOT automatically delete the
+     * source replica.
+     */
+    bool CopyData(const std::string& key, const DataSource& source,
+                  uint64_t dest_tier_id, MetadataSyncCallback sync_cb);
+
+    // --- Introspection & Internal ---
+
+    std::vector<TierView> GetTierViews() const;
+    const CacheTier* GetTier(uint64_t tier_id) const;
+    const DataCopier& GetDataCopier() const;
+
+    // Internal API called by the AllocationEntry destructor.
+    void FreeInternal(const TieredLocation& loc);
+
+   private:
+    struct TierInfo {
+        int priority;
+        std::vector<std::string> tags;
+    };
+
+    /**
+     * @struct MetadataEntry
+     * @brief Holds all replicas for a specific key.
+     * Uses a dedicated mutex to allow per-key concurrency.
+     */
+    struct MetadataEntry {
+        mutable std::shared_mutex mutex;  // Entry-level lock
+        std::vector<std::pair<uint64_t, AllocationHandle>>
+            replicas;  // (tier_id, handle), sorted by tier priority
+    };
+
+    // Get the list of tier IDs sorted by priority (descending).
+    std::vector<uint64_t> GetSortedTiers() const;
+
+    // Low-level allocation logic.
+    bool AllocateInternalRaw(size_t size,
+                             std::optional<uint64_t> preferred_tier,
+                             TieredLocation* out_loc);
+
+    // Map from tier ID to the actual CacheTier instance.
+    std::unordered_map<uint64_t, std::unique_ptr<CacheTier>> tiers_;
+
+    // Map from tier ID to static config info.
+    std::unordered_map<uint64_t, TierInfo> tier_info_;
+
+    // Global metadata index: key -> entry.
+    // map_mutex_ only protects the structure of this map (insertion and
+    // removal of keys); access to existing keys is highly concurrent.
+    std::unordered_map<std::string, std::shared_ptr<MetadataEntry>>
+        metadata_index_;
+    mutable std::shared_mutex map_mutex_;
+
+    std::unique_ptr<DataCopier> data_copier_;
+};
+
+}  // namespace mooncake
\ No newline at end of file
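Putting the pieces together, the intended client-side flow is allocate, then write, then commit. A sketch using only the API above (the key, tier ID, and helper function are made up):

```cpp
// Sketch: client-centric placement with RAII rollback.
bool PutObject(mooncake::TieredBackend& backend, const std::string& key,
               const mooncake::DataSource& payload) {
    // 1. Reserve space, preferring hypothetical tier 0 (e.g., DRAM).
    mooncake::AllocationHandle handle =
        backend.Allocate(payload.size, /*preferred_tier=*/0);
    if (!handle) return false;  // no tier had room

    // 2. Copy the payload into the reserved location.
    if (!backend.Write(payload, handle)) {
        return false;  // handle dies here -> space is freed automatically
    }

    // 3. Publish the replica in the local metadata index.
    return backend.Commit(key, handle);
}
```

If any step fails before `Commit`, the handle's destructor releases the reservation, which is exactly the RAII behavior the `AllocationEntry` comment describes.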
diff --git a/mooncake-store/src/CMakeLists.txt b/mooncake-store/src/CMakeLists.txt
index 05968b56d..3ae31244e 100644
--- a/mooncake-store/src/CMakeLists.txt
+++ b/mooncake-store/src/CMakeLists.txt
@@ -26,6 +26,9 @@ set(MOONCAKE_STORE_SOURCES
     dummy_client.cpp
     http_metadata_server.cpp
    file_storage.cpp
+    tiered_cache/copier_registry.cpp
+    tiered_cache/data_copier.cpp
+    tiered_cache/tiered_backend.cpp
 )
 
 set(EXTRA_LIBS "")
diff --git a/mooncake-store/src/tiered_cache/copier_registry.cpp b/mooncake-store/src/tiered_cache/copier_registry.cpp
new file mode 100644
index 000000000..d6554bdfd
--- /dev/null
+++ b/mooncake-store/src/tiered_cache/copier_registry.cpp
@@ -0,0 +1,41 @@
+#include "tiered_cache/copier_registry.h"
+#include "tiered_cache/data_copier.h"
+
+#include <utility>
+
+namespace mooncake {
+
+CopierRegistry& CopierRegistry::GetInstance() {
+    static CopierRegistry instance;
+    return instance;
+}
+
+void CopierRegistry::RegisterMemoryType(MemoryType type, CopyFunction to_dram,
+                                        CopyFunction from_dram) {
+    memory_type_regs_.push_back(
+        {type, std::move(to_dram), std::move(from_dram)});
+}
+
+void CopierRegistry::RegisterDirectPath(MemoryType src, MemoryType dest,
+                                        CopyFunction func) {
+    direct_path_regs_.push_back({src, dest, std::move(func)});
+}
+
+const std::vector<MemoryTypeRegistration>&
+CopierRegistry::GetMemoryTypeRegistrations() const {
+    return memory_type_regs_;
+}
+
+const std::vector<DirectPathRegistration>&
+CopierRegistry::GetDirectPathRegistrations() const {
+    return direct_path_regs_;
+}
+
+CopierRegistrar::CopierRegistrar(MemoryType type, CopyFunction to_dram,
+                                 CopyFunction from_dram) {
+    // When a static CopierRegistrar object is created, it registers the
+    // memory type.
+    CopierRegistry::GetInstance().RegisterMemoryType(type, std::move(to_dram),
+                                                     std::move(from_dram));
+}
+
+}  // namespace mooncake
\ No newline at end of file
diff --git a/mooncake-store/src/tiered_cache/data_copier.cpp b/mooncake-store/src/tiered_cache/data_copier.cpp
new file mode 100644
index 000000000..36fde3f14
--- /dev/null
+++ b/mooncake-store/src/tiered_cache/data_copier.cpp
@@ -0,0 +1,116 @@
+#include "tiered_cache/data_copier.h"
+#include "tiered_cache/copier_registry.h"
+
+#include <glog/logging.h>
+#include <memory>
+#include <stdexcept>
+
+namespace mooncake {
+
+DataCopierBuilder::DataCopierBuilder() {
+    // Process all registrations from the global registry.
+    const auto& registry = CopierRegistry::GetInstance();
+
+    for (const auto& reg : registry.GetMemoryTypeRegistrations()) {
+        copy_matrix_[{reg.type, MemoryType::DRAM}] = reg.to_dram_func;
+        copy_matrix_[{MemoryType::DRAM, reg.type}] = reg.from_dram_func;
+    }
+    for (const auto& reg : registry.GetDirectPathRegistrations()) {
+        copy_matrix_[{reg.src_type, reg.dest_type}] = reg.func;
+    }
+}
+
+DataCopierBuilder& DataCopierBuilder::AddDirectPath(MemoryType src_type,
+                                                    MemoryType dest_type,
+                                                    CopyFunction func) {
+    copy_matrix_[{src_type, dest_type}] = std::move(func);
+    return *this;
+}
+
+std::unique_ptr<DataCopier> DataCopierBuilder::Build() const {
+    const auto& registry = CopierRegistry::GetInstance();
+    for (const auto& reg : registry.GetMemoryTypeRegistrations()) {
+        if (reg.type == MemoryType::DRAM) {
+            continue;
+        }
+        if (copy_matrix_.find({reg.type, MemoryType::DRAM}) ==
+            copy_matrix_.end()) {
+            throw std::logic_error(
+                "DataCopierBuilder Error: Missing copy function for type " +
+                MemoryTypeToString(reg.type) + " TO DRAM.");
+        }
+        if (copy_matrix_.find({MemoryType::DRAM, reg.type}) ==
+            copy_matrix_.end()) {
+            throw std::logic_error(
+                "DataCopierBuilder Error: Missing copy function for DRAM TO "
+                "type " +
+                MemoryTypeToString(reg.type) + ".");
+        }
+    }
+
+    return std::unique_ptr<DataCopier>(new DataCopier(copy_matrix_));
+}
+
+DataCopier::DataCopier(
+    std::map<std::pair<MemoryType, MemoryType>, CopyFunction> copy_matrix)
+    : copy_matrix_(std::move(copy_matrix)) {}
+
+CopyFunction DataCopier::FindCopier(MemoryType src_type,
+                                    MemoryType dest_type) const {
+    auto it = copy_matrix_.find({src_type, dest_type});
+    return (it != copy_matrix_.end()) ? it->second : nullptr;
+}
+
+bool DataCopier::Copy(const DataSource& src, const DataSource& dest) const {
+    MemoryType dest_type = dest.type;
+    // Try to find a direct copy function.
+    if (auto direct_copier = FindCopier(src.type, dest_type)) {
+        VLOG(1) << "Using direct copier for " << MemoryTypeToString(src.type)
+                << " -> " << MemoryTypeToString(dest_type);
+        return direct_copier(src, dest);
+    }
+
+    // If there is no direct copier, try the fallback via DRAM.
+    if (src.type != MemoryType::DRAM && dest_type != MemoryType::DRAM) {
+        VLOG(1) << "No direct copier. Attempting fallback via DRAM for "
+                << MemoryTypeToString(src.type) << " -> "
+                << MemoryTypeToString(dest_type);
+
+        auto to_dram_copier = FindCopier(src.type, MemoryType::DRAM);
+        auto from_dram_copier = FindCopier(MemoryType::DRAM, dest_type);
+
+        if (to_dram_copier && from_dram_copier) {
+            std::unique_ptr<char[]> temp_dram_buffer(
+                new (std::nothrow) char[src.size]);
+            if (!temp_dram_buffer) {
+                LOG(ERROR) << "Failed to allocate temporary DRAM buffer for "
+                              "fallback copy.";
+                return false;
+            }
+
+            // Step A: Source -> DRAM
+            DataSource temp_dram = {
+                reinterpret_cast<uint64_t>(temp_dram_buffer.get()), 0,
+                src.size, MemoryType::DRAM};
+            if (!to_dram_copier(src, temp_dram)) {
+                LOG(ERROR)
+                    << "Fallback copy failed at Step A (Source -> DRAM)";
+                return false;
+            }
+
+            // Step B: DRAM -> Destination
+            if (!from_dram_copier(temp_dram, dest)) {
+                LOG(ERROR)
+                    << "Fallback copy failed at Step B (DRAM -> Destination)";
+                return false;
+            }
+            return true;
+        }
+    }
+
+    LOG(ERROR) << "No copier registered for transfer from memory type "
+               << MemoryTypeToString(src.type) << " to "
+               << MemoryTypeToString(dest_type)
+               << ", and fallback path is not available.";
+    return false;
+}
+
+}  // namespace mooncake
\ No newline at end of file
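Since `DataSource::ptr` doubles as a file descriptor for file-backed tiers (per the comment in cache_tier.h), the to/from-DRAM functions that feed this fallback might look roughly like the following for an NVMe tier. This is hypothetical code assuming that fd/offset convention, not part of the PR:

```cpp
// Hypothetical NVMe copiers -- illustration only.
#include <unistd.h>  // pread / pwrite

bool NvmeToDram(const mooncake::DataSource& src,
                const mooncake::DataSource& dst) {
    // src.ptr holds a file descriptor; dst.ptr is a DRAM address.
    ssize_t n = pread(static_cast<int>(src.ptr),
                      reinterpret_cast<void*>(dst.ptr + dst.offset), src.size,
                      static_cast<off_t>(src.offset));
    return n == static_cast<ssize_t>(src.size);
}

bool DramToNvme(const mooncake::DataSource& src,
                const mooncake::DataSource& dst) {
    // dst.ptr holds a file descriptor; src.ptr is a DRAM address.
    ssize_t n = pwrite(static_cast<int>(dst.ptr),
                       reinterpret_cast<const void*>(src.ptr + src.offset),
                       src.size, static_cast<off_t>(dst.offset));
    return n == static_cast<ssize_t>(src.size);
}
```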
diff --git a/mooncake-store/src/tiered_cache/tiered_backend.cpp b/mooncake-store/src/tiered_cache/tiered_backend.cpp
new file mode 100644
index 000000000..9c53dad4d
--- /dev/null
+++ b/mooncake-store/src/tiered_cache/tiered_backend.cpp
@@ -0,0 +1,358 @@
+#include <algorithm>
+#include <glog/logging.h>
+#include <memory>
+#include <utility>
+
+#include "tiered_cache/tiered_backend.h"
+#include "tiered_cache/cache_tier.h"
+
+namespace mooncake {
+
+AllocationEntry::~AllocationEntry() {
+    if (backend) {
+        // When the ref count drops to 0, call back into the backend to free
+        // the physical resource.
+        backend->FreeInternal(loc);
+    }
+}
+
+TieredBackend::TieredBackend() = default;
+
+bool TieredBackend::Init(Json::Value root, TransferEngine* engine) {
+    // Initialize the DataCopier.
+    try {
+        DataCopierBuilder builder;
+        data_copier_ = builder.Build();
+    } catch (const std::logic_error& e) {
+        LOG(ERROR) << "Failed to build DataCopier: " << e.what();
+        return false;
+    }
+
+    // Initialize the tiers.
+    if (!root.isMember("tiers")) {
+        LOG(ERROR) << "Tiered cache config is missing 'tiers' array.";
+        return false;
+    }
+
+    for (const auto& tier_config : root["tiers"]) {
+        uint64_t id = tier_config["id"].asUInt();
+        // std::string type = tier_config["type"].asString(); // Unused for now
+        int priority = tier_config["priority"].asInt();
+        std::vector<std::string> tags;
+        if (tier_config.isMember("tags")) {
+            for (const auto& tag : tier_config["tags"])
+                tags.push_back(tag.asString());
+        }
+
+        // TODO: Logic to instantiate specific CacheTier types (DRAM/SSD)
+        // goes here. For example:
+        //   std::unique_ptr<CacheTier> tier =
+        //       CacheTierFactory::Create(tier_config);
+        //   tier->Init(this, engine);
+        //   tiers_[id] = std::move(tier);
+
+        // Placeholder for compilation if the factory is not ready:
+        // tiers_[id] = std::make_unique<...>();
+
+        tier_info_[id] = {priority, tags};
+    }
+
+    LOG(INFO) << "TieredBackend initialized successfully with "
+              << tiers_.size() << " tiers.";
+    return true;
+}
+
+std::vector<uint64_t> TieredBackend::GetSortedTiers() const {
+    std::vector<uint64_t> ids;
+    for (const auto& [id, _] : tiers_) ids.push_back(id);
+
+    // Sort by priority, descending (higher priority first).
+    std::sort(ids.begin(), ids.end(), [this](uint64_t a, uint64_t b) {
+        return tier_info_.at(a).priority > tier_info_.at(b).priority;
+    });
+    return ids;
+}
+
+bool TieredBackend::AllocateInternalRaw(
+    size_t size, std::optional<uint64_t> preferred_tier,
+    TieredLocation* out_loc) {
+    if (!out_loc) return false;
+
+    // Try the preferred tier first.
+    if (preferred_tier.has_value()) {
+        auto it = tiers_.find(*preferred_tier);
+        if (it != tiers_.end()) {
+            if (it->second->Allocate(size, out_loc->data)) {
+                out_loc->tier_id = *preferred_tier;
+                return true;
+            }
+        }
+    }
+
+    // Fallback: auto-tiering based on priority.
+    auto sorted_tiers = GetSortedTiers();
+    for (uint64_t tier_id : sorted_tiers) {
+        if (preferred_tier.has_value() && tier_id == *preferred_tier) continue;
+
+        auto& tier = tiers_[tier_id];
+        if (tier->Allocate(size, out_loc->data)) {
+            out_loc->tier_id = tier_id;
+            return true;
+        }
+    }
+    return false;
+}
+
+void TieredBackend::FreeInternal(const TieredLocation& loc) {
+    auto it = tiers_.find(loc.tier_id);
+    if (it != tiers_.end()) {
+        it->second->Free(loc.data);
+    }
+}
+
+AllocationHandle TieredBackend::Allocate(
+    size_t size, std::optional<uint64_t> preferred_tier) {
+    TieredLocation loc;
+    if (AllocateInternalRaw(size, preferred_tier, &loc)) {
+        // Create the handle (ref count = 1).
+        // If this handle dies without being committed, the AllocationEntry
+        // destructor triggers FreeInternal.
+        return std::make_shared<AllocationEntry>(this, loc);
+    }
+    return nullptr;
+}
+
+bool TieredBackend::Write(const DataSource& source, AllocationHandle handle) {
+    if (!handle) return false;
+    // Validate that the handle's tier still exists.
+    auto it = tiers_.find(handle->loc.tier_id);
+    if (it == tiers_.end()) return false;
+
+    return data_copier_->Copy(source, handle->loc.data);
+}
+
+bool TieredBackend::Commit(const std::string& key, AllocationHandle handle) {
+    if (!handle) return false;
+
+    std::shared_ptr<MetadataEntry> entry = nullptr;
+
+    // Try to find an existing entry (global read lock).
+    {
+        std::shared_lock read_lock(map_mutex_);
+        auto it = metadata_index_.find(key);
+        if (it != metadata_index_.end()) {
+            entry = it->second;
+        }
+    }
+
+    // Create the entry if it does not exist (global write lock).
+    if (!entry) {
+        std::unique_lock write_lock(map_mutex_);
+        // Double-check: another thread may have created it in the meantime.
+        auto it = metadata_index_.find(key);
+        if (it != metadata_index_.end()) {
+            entry = it->second;
+        } else {
+            entry = std::make_shared<MetadataEntry>();
+            metadata_index_[key] = entry;
+        }
+    }
+
+    // Update the entry (entry write lock).
+    // The global lock is released; we only lock this specific key's entry.
+    {
+        std::unique_lock entry_lock(entry->mutex);
+        // Insert or replace the handle for this tier.
+        bool found = false;
+        for (auto& replica : entry->replicas) {
+            if (replica.first == handle->loc.tier_id) {
+                replica.second = handle;
+                found = true;
+                break;
+            }
+        }
+
+        if (!found) {
+            entry->replicas.emplace_back(handle->loc.tier_id, handle);
+            std::sort(entry->replicas.begin(), entry->replicas.end(),
+                      [this](const std::pair<uint64_t, AllocationHandle>& a,
+                             const std::pair<uint64_t, AllocationHandle>& b) {
+                          return tier_info_.at(a.first).priority >
+                                 tier_info_.at(b.first).priority;
+                      });
+        }
+    }
+
+    return true;
+}
+
+AllocationHandle TieredBackend::Get(const std::string& key,
+                                    std::optional<uint64_t> tier_id) {
+    std::shared_ptr<MetadataEntry> entry = nullptr;
+
+    // Find the entry (global read lock).
+    {
+        std::shared_lock read_lock(map_mutex_);
+        auto it = metadata_index_.find(key);
+        if (it == metadata_index_.end()) {
+            return nullptr;
+        }
+        entry = it->second;
+    }
+
+    // Read the entry (entry read lock).
+    std::shared_lock entry_read_lock(entry->mutex);
+
+    if (entry->replicas.empty()) return nullptr;
+
+    if (tier_id.has_value()) {
+        for (const auto& replica : entry->replicas) {
+            if (replica.first == *tier_id) {
+                return replica.second;
+            }
+        }
+        return nullptr;
+    }
+
+    // Fallback: return the highest-priority replica.
+    return entry->replicas.begin()->second;
+}
+
+bool TieredBackend::Delete(const std::string& key,
+                           std::optional<uint64_t> tier_id) {
+    // Hold references locally so that destruction happens OUTSIDE the locks.
+    // This is crucial for non-blocking deletions.
+    AllocationHandle handle_ref = nullptr;
+    std::vector<AllocationHandle> handles_to_free;
+
+    if (tier_id.has_value()) {
+        // Delete a specific replica.
+
+        bool need_cleanup = false;
+        bool found_tier = false;
+
+        // Optimistic delete (global read lock + entry write lock).
+        // This is fast and allows high concurrency.
+        {
+            std::shared_lock read_lock(map_mutex_);
+            auto it = metadata_index_.find(key);
+            if (it != metadata_index_.end()) {
+                auto entry = it->second;
+
+                std::unique_lock entry_write_lock(entry->mutex);
+                auto tier_it = entry->replicas.end();
+                for (auto rit = entry->replicas.begin();
+                     rit != entry->replicas.end(); ++rit) {
+                    if (rit->first == *tier_id) {
+                        tier_it = rit;
+                        break;
+                    }
+                }
+
+                if (tier_it != entry->replicas.end()) {
+                    handle_ref =
+                        tier_it->second;  // Capture reference (+1 ref count)
+                    entry->replicas.erase(tier_it);
+                    found_tier = true;
+                }
+
+                // Mark for cleanup if the entry becomes empty.
+                if (entry->replicas.empty()) {
+                    need_cleanup = true;
+                }
+            }
+        }  // Read lock released here.
+
+        // Retry with the write lock.
+        // If the entry is empty, we upgrade to a global write lock to remove
+        // it. This prevents memory leaks from empty "zombie" entries.
+        if (need_cleanup) {
+            std::unique_lock write_lock(map_mutex_);
+
+            auto it = metadata_index_.find(key);
+            if (it != metadata_index_.end()) {
+                auto entry = it->second;
+
+                // Double-checked locking: another thread might have added a
+                // replica by now.
+                std::unique_lock entry_lock(entry->mutex);
+
+                if (entry->replicas.empty()) {
+                    // Confirmed empty; safe to remove from the global index.
+                    metadata_index_.erase(it);
+                }
+            }
+        }
+
+        return found_tier;
+
+    } else {
+        // Delete all replicas (full key deletion).
+        // Requires the global write lock since we are modifying the map
+        // structure.
+        std::unique_lock global_write_lock(map_mutex_);
+        auto it = metadata_index_.find(key);
+        if (it == metadata_index_.end()) return false;
+
+        auto entry = it->second;
+
+        {
+            std::unique_lock entry_lock(entry->mutex);
+            handles_to_free.reserve(entry->replicas.size());
+            for (auto& replica : entry->replicas) {
+                handles_to_free.push_back(replica.second);
+            }
+            entry->replicas.clear();
+        }
+
+        // Remove the entry from the global index.
+        metadata_index_.erase(it);
+    }
+
+    // The handles go out of scope here.
+    // The ref count drops to 0 -> ~AllocationEntry() -> FreeInternal().
+    // This happens without holding any locks.
+    return true;
+}
+
+bool TieredBackend::CopyData(const std::string& key, const DataSource& source,
+                             uint64_t dest_tier_id,
+                             MetadataSyncCallback sync_cb) {
+    if (source.size == 0) return false;
+    auto dest_handle = Allocate(source.size, dest_tier_id);
+    if (!dest_handle) return false;
+
+    if (!Write(source, dest_handle)) return false;
+
+    // Sync with the Master (critical step).
+    if (sync_cb) {
+        bool sync_success = sync_cb(key, dest_handle->loc);
+        if (!sync_success) {
+            LOG(ERROR) << "CopyData aborted: Master sync failed for key "
+                       << key;
+            return false;
+        }
+    }
+
+    // Commit (add the replica).
+    // Hands ownership of dest_handle over to the metadata index.
+    return Commit(key, dest_handle);
+}
+
+std::vector<TierView> TieredBackend::GetTierViews() const {
+    std::vector<TierView> views;
+    for (const auto& [id, tier] : tiers_) {
+        const auto& info = tier_info_.at(id);
+        size_t cap = tier->GetCapacity();
+        size_t used = tier->GetUsage();
+        views.push_back({id, tier->GetMemoryType(), cap, used, cap - used,
+                         info.priority, info.tags});
+    }
+    return views;
+}
+
+const CacheTier* TieredBackend::GetTier(uint64_t tier_id) const {
+    auto it = tiers_.find(tier_id);
+    return (it != tiers_.end()) ? it->second.get() : nullptr;
+}
+
+const DataCopier& TieredBackend::GetDataCopier() const {
+    return *data_copier_;
+}
+
+}  // namespace mooncake
\ No newline at end of file
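To close the loop, a few usage sketches for the `TieredBackend` API as a whole. None of this is code from the PR; the config values, tier IDs, and the Master RPC helper are hypothetical.

First, the configuration shape that `Init()` expects, with field names taken from the parsing code and parsed here via jsoncpp:

```cpp
// Sketch: a config that Init() would accept.
#include <sstream>

const char* kTiersJson = R"({
  "tiers": [
    { "id": 0, "type": "DRAM", "priority": 10, "tags": ["hot"] },
    { "id": 1, "type": "NVME", "priority": 5,  "tags": ["warm"] }
  ]
})";

Json::Value root;
Json::CharReaderBuilder rb;
std::string errs;
std::istringstream iss(kTiersJson);
if (Json::parseFromStream(rb, iss, &root, &errs)) {
    mooncake::TieredBackend backend;
    backend.Init(root, /*engine=*/nullptr);  // engine wiring omitted
}
```

Second, the two deletion modes:

```cpp
// Remove only the DRAM replica (tier 0); the NVMe copy stays readable.
backend.Delete("my_key", /*tier_id=*/0);

// Remove every replica and the key itself; physical space is reclaimed by
// the AllocationEntry destructors once the last handle goes out of scope.
backend.Delete("my_key", std::nullopt);
```

Finally, replicating a key to a slower tier with Master synchronization, where `NotifyMasterOfReplica` stands in for the real RPC:

```cpp
mooncake::AllocationHandle src = backend.Get("my_key");  // highest priority
if (src) {
    bool ok = backend.CopyData(
        "my_key", src->loc.data, /*dest_tier_id=*/1,
        [](const std::string& key, const mooncake::TieredLocation& loc) {
            // Returning false aborts the commit; RAII then releases the
            // freshly allocated destination space.
            return NotifyMasterOfReplica(key, loc);  // hypothetical RPC
        });
}
```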