diff --git a/.github/pull_request_template.md b/.github/pull_request_template.md
new file mode 100644
index 000000000..26bdfedd5
--- /dev/null
+++ b/.github/pull_request_template.md
@@ -0,0 +1,29 @@
+## Description
+
+
+
+## Type of Change
+
+* Types
+ - [ ] Bug fix
+ - [ ] New feature
+ - [ ] Transfer Engine
+ - [ ] Mooncake Store
+ - [ ] Mooncake EP
+ - [ ] Integration
+ - [ ] P2P Store
+ - [ ] Python Wheel
+ - [ ] Breaking change
+ - [ ] CI/CD
+ - [ ] Documentation update
+ - [ ] Other
+
+## How Has This Been Tested?
+
+
+
+## Checklist
+
+- [ ] I have performed a self-review of my own code.
+- [ ] I have updated the documentation.
+- [ ] I have added tests to prove my changes are effective.
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 11670d901..91151b3ec 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -166,6 +166,28 @@ jobs:
deactivate
shell: bash
+ - name: Start Mooncake Master
+ run: |
+ source test_env/bin/activate
+ mkdir -p /tmp/mooncake_storage
+ mooncake_master \
+ --eviction_high_watermark_ratio=0.95 \
+ --cluster_id=ci_test_cluster \
+ --port 50051 &
+ sleep 3
+ shell: bash
+
+ - name: Run Python Tensor API Performance Test (CI check)
+ env:
+ MOONCAKE_MASTER: "127.0.0.1:50051"
+ MOONCAKE_TE_META_DATA_SERVER: "http://127.0.0.1:8080/metadata"
+ MOONCAKE_PROTOCOL: "tcp"
+ LOCAL_HOSTNAME: "127.0.0.1"
+ run: |
+ source test_env/bin/activate
+ python scripts/test_tensor_api.py -n 1
+ shell: bash
+
build-flags:
runs-on: ubuntu-22.04
strategy:
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 13179e792..a664909d3 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -12,6 +12,7 @@ if (BUILD_UNIT_TESTS)
enable_testing()
endif()
+option(WITH_TE "build mooncake transfer engine and sample code" ON)
option(WITH_STORE "build mooncake store library and sample code" ON)
option(WITH_P2P_STORE "build p2p store library and sample code" OFF)
option(WITH_RUST_EXAMPLE "build the Rust interface and sample code for the transfer engine" OFF)
@@ -45,8 +46,10 @@ add_subdirectory(mooncake-common)
include_directories(mooncake-common/etcd)
include_directories(mooncake-common/include)
-add_subdirectory(mooncake-transfer-engine)
-include_directories(mooncake-transfer-engine/include)
+if (WITH_TE)
+ add_subdirectory(mooncake-transfer-engine)
+ include_directories(mooncake-transfer-engine/include)
+endif()
if (WITH_STORE)
message(STATUS "Mooncake Store will be built")
diff --git a/CODE_OF_CONDUCT.md b/CODE_OF_CONDUCT.md
new file mode 100644
index 000000000..18c914718
--- /dev/null
+++ b/CODE_OF_CONDUCT.md
@@ -0,0 +1,128 @@
+# Contributor Covenant Code of Conduct
+
+## Our Pledge
+
+We as members, contributors, and leaders pledge to make participation in our
+community a harassment-free experience for everyone, regardless of age, body
+size, visible or invisible disability, ethnicity, sex characteristics, gender
+identity and expression, level of experience, education, socio-economic status,
+nationality, personal appearance, race, religion, or sexual identity
+and orientation.
+
+We pledge to act and interact in ways that contribute to an open, welcoming,
+diverse, inclusive, and healthy community.
+
+## Our Standards
+
+Examples of behavior that contributes to a positive environment for our
+community include:
+
+* Demonstrating empathy and kindness toward other people
+* Being respectful of differing opinions, viewpoints, and experiences
+* Giving and gracefully accepting constructive feedback
+* Accepting responsibility and apologizing to those affected by our mistakes,
+ and learning from the experience
+* Focusing on what is best not just for us as individuals, but for the
+ overall community
+
+Examples of unacceptable behavior include:
+
+* The use of sexualized language or imagery, and sexual attention or
+ advances of any kind
+* Trolling, insulting or derogatory comments, and personal or political attacks
+* Public or private harassment
+* Publishing others' private information, such as a physical or email
+ address, without their explicit permission
+* Other conduct which could reasonably be considered inappropriate in a
+ professional setting
+
+## Enforcement Responsibilities
+
+Community leaders are responsible for clarifying and enforcing our standards of
+acceptable behavior and will take appropriate and fair corrective action in
+response to any behavior that they deem inappropriate, threatening, offensive,
+or harmful.
+
+Community leaders have the right and responsibility to remove, edit, or reject
+comments, commits, code, wiki edits, issues, and other contributions that are
+not aligned to this Code of Conduct, and will communicate reasons for moderation
+decisions when appropriate.
+
+## Scope
+
+This Code of Conduct applies within all community spaces, and also applies when
+an individual is officially representing the community in public spaces.
+Examples of representing our community include using an official e-mail address,
+posting via an official social media account, or acting as an appointed
+representative at an online or offline event.
+
+## Enforcement
+
+Instances of abusive, harassing, or otherwise unacceptable behavior may be
+reported to the community leaders responsible for enforcement at
+[INSERT CONTACT METHOD].
+All complaints will be reviewed and investigated promptly and fairly.
+
+All community leaders are obligated to respect the privacy and security of the
+reporter of any incident.
+
+## Enforcement Guidelines
+
+Community leaders will follow these Community Impact Guidelines in determining
+the consequences for any action they deem in violation of this Code of Conduct:
+
+### 1. Correction
+
+**Community Impact**: Use of inappropriate language or other behavior deemed
+unprofessional or unwelcome in the community.
+
+**Consequence**: A private, written warning from community leaders, providing
+clarity around the nature of the violation and an explanation of why the
+behavior was inappropriate. A public apology may be requested.
+
+### 2. Warning
+
+**Community Impact**: A violation through a single incident or series
+of actions.
+
+**Consequence**: A warning with consequences for continued behavior. No
+interaction with the people involved, including unsolicited interaction with
+those enforcing the Code of Conduct, for a specified period of time. This
+includes avoiding interactions in community spaces as well as external channels
+like social media. Violating these terms may lead to a temporary or
+permanent ban.
+
+### 3. Temporary Ban
+
+**Community Impact**: A serious violation of community standards, including
+sustained inappropriate behavior.
+
+**Consequence**: A temporary ban from any sort of interaction or public
+communication with the community for a specified period of time. No public or
+private interaction with the people involved, including unsolicited interaction
+with those enforcing the Code of Conduct, is allowed during this period.
+Violating these terms may lead to a permanent ban.
+
+### 4. Permanent Ban
+
+**Community Impact**: Demonstrating a pattern of violation of community
+standards, including sustained inappropriate behavior, harassment of an
+individual, or aggression toward or disparagement of classes of individuals.
+
+**Consequence**: A permanent ban from any sort of public interaction within
+the community.
+
+## Attribution
+
+This Code of Conduct is adapted from the [Contributor Covenant][homepage],
+version 2.0, available at
+https://www.contributor-covenant.org/version/2/0/code_of_conduct.html.
+
+Community Impact Guidelines were inspired by [Mozilla's code of conduct
+enforcement ladder](https://github.com/mozilla/diversity).
+
+[homepage]: https://www.contributor-covenant.org
+
+For answers to common questions about this code of conduct, see the FAQ at
+https://www.contributor-covenant.org/faq. Translations are available at
+https://www.contributor-covenant.org/translations.
diff --git a/README.md b/README.md
index 0be1e7800..a91b3e7c5 100644
--- a/README.md
+++ b/README.md
@@ -8,7 +8,7 @@
| Traces
| Technical Report
| Blog
- | Slack
+ | Slack
@@ -18,6 +18,7 @@
[](https://pypi.org/project/mooncake-transfer-engine)
[](https://deepwiki.com/kvcache-ai/Mooncake)
[](https://github.com/kvcache-ai/Mooncake/graphs/commit-activity)
+ [](https://github.com/kvcache-ai/Mooncake/blob/main/LICENSE-APACHE)
diff --git a/doc/en/mooncake-store.md b/doc/en/mooncake-store.md
index 30a2a903e..02827b4b6 100644
--- a/doc/en/mooncake-store.md
+++ b/doc/en/mooncake-store.md
@@ -488,6 +488,18 @@ There are two startup parameters in `master_service` related to the soft pin mec
Notably, soft pinned objects can still be removed using APIs such as `Remove` or `RemoveAll`.
+### Zombie Object Cleanup
+
+If a Client crashes or experiences a network failure after sending a `PutStart` request but before it can send the corresponding `PutEnd` or `PutRevoke` request to the Master, the object initiated by `PutStart` enters a "zombie" state—rendering it neither usable nor deletable. The existence of such "zombie objects" not only consumes storage space but also prevents subsequent `Put` operations on the same keys. To mitigate these issues, the Master records the start time of each `PutStart` request and employs two timeout thresholds—`put_start_discard_timeout` and `put_start_release_timeout`—to clean up zombie objects.
+
+#### `PutStart` Preemption
+
+If an object receives neither a `PutEnd` nor a `PutRevoke` request within `put_start_discard_timeout` (default: 30 seconds) after its `PutStart`, any subsequent `PutStart` request for the same object will be allowed to "preempt" the previous `PutStart`. This enables the new request to proceed with writing the object, thereby preventing a single faulty Client from permanently blocking access to that object. Note that during such preemption, the storage space allocated by the old `PutStart` is not reused; instead, new space is allocated for the preempting `PutStart`. The space previously allocated by the old `PutStart` will be reclaimed via the mechanism described below.
+
+#### Space Reclaim
+
+Replica space allocated during a `PutStart` is considered releasable by the Master if the write operation is neither completed (via `PutEnd`) nor canceled (via `PutRevoke`) within `put_start_release_timeout` (default: 10 minutes) after the `PutStart`. When object eviction is triggered—either due to allocation failures or because storage utilization exceeds the configured threshold—these releasable replica spaces are prioritized for release to reclaim storage capacity.
+
### Preferred Segment Allocation
Mooncake Store provides a **preferred segment allocation** feature that allows users to specify a preferred storage segment (node) for object allocation. This feature is particularly useful for optimizing data locality and reducing network overhead in distributed scenarios.
diff --git a/doc/en/transfer-engine.md b/doc/en/transfer-engine.md
index 6ccacd92f..4f9bad7e7 100644
--- a/doc/en/transfer-engine.md
+++ b/doc/en/transfer-engine.md
@@ -443,3 +443,4 @@ For advanced users, TransferEngine provides the following advanced runtime optio
- `MC_MIN_PRC_PORT` Specifies the minimum port number for RPC service. The default value is 15000.
- `MC_MAX_PRC_PORT` Specifies the maximum port number for RPC service. The default value is 17000.
- `MC_PATH_ROUNDROBIN` Use round-robin mode in the RDMA path selection. This may be beneficial for transferring large bulks.
+- `MC_ENDPOINT_STORE_TYPE` Choose FIFO Endpoint Store (`FIFO`) or Sieve Endpoint Store (`SIEVE`), default is `SIEVE`.
diff --git a/doc/en/troubleshooting.md b/doc/en/troubleshooting.md
index 10cad63a7..bd7d5bf90 100644
--- a/doc/en/troubleshooting.md
+++ b/doc/en/troubleshooting.md
@@ -64,6 +64,13 @@ In addition, if the error `Failed to get description of XXX` is displayed, it in
## SGLang Common Questions
+### Do I need RDMA to run SGLang and Mooncake?
+
+When using Mooncake for KV cache transfer in SGLang PD disaggregation deployments, GPUDirect RDMA (GDR) is required.
+
+When using Mooncake as a KV cache storage backend in SGLang HiCache, RDMA is recommended for better performance.
+However, if RDMA NICs are not available, the TCP protocol is also supported.
+
### How to make sure GPUDirect RDMA (GDR) is supported
1. Verify the presence of an RDMA-capable NIC (e.g., Mellanox, ERDMA) and drivers.
@@ -84,7 +91,7 @@ lsmod | grep peer_mem
lsmod | grep nvidia_peer_mem
```
-3. If you use container to run SGLang, please make sure RDMA and GDR driver are installed in the container and run container in previledge mode. Requirements: (1) privileged mode must be enabled. (2) RDMA devices/NVIDIA devices mounted into container
+3. If you use container to run SGLang, please make sure RDMA and GDR driver are installed in the container and run container in privileged mode. Requirements: (1) privileged mode must be enabled. (2) RDMA devices/NVIDIA devices mounted into container
4. Check the connectivity
Benchmark end-to-end performance using ib_write_bw.
@@ -93,7 +100,6 @@ apt install perftest
# server side
ib_write_bw -d [rdma_device] -R -x gdr
# client side
-# server side
ib_write_bw -d [rdma_device] -R -x gdr [server_ip]
```
Expected Output:
diff --git a/doc/zh/mooncake-store.md b/doc/zh/mooncake-store.md
index 1568eb60c..1801dfe9d 100644
--- a/doc/zh/mooncake-store.md
+++ b/doc/zh/mooncake-store.md
@@ -492,6 +492,18 @@ virtual std::unique_ptr Allocate(
被软固定的对象仍然可以通过 `Remove`、`RemoveAll` 等 API 主动删除。
+### 僵尸对象清理机制
+
+如果Client因为进程崩溃或网络故障等原因,在发送完`PutStart`请求后无法向Master发送对应的`PutEnd`或`PutRevoke`请求,就会导致`PutStart`的对象处于无法使用也无法删除的“僵尸”状态。“僵尸对象”的存在不仅会占用存储空间,还会导致后续对相同key的`Put`操作无法进行。为了避免这些问题,Master会记录每个对象`PutStart`请求的开始时间,并基于两个超时时间:`put_start_discard_timeout`和`put_start_release_timeout`,对僵尸对象进行清理。
+
+#### `PutStart`顶替
+
+如果一个对象在`PutStart`后的`put_start_discard_timeout`(默认为30秒)时间内没有收到任何的`PutEnd`或是`PutRevoke`请求,那么后续新来的对该对象的`PutStart`操作将能够“顶替”旧的`PutStart`操作,继续进行对该对象的写入,从而避免单个Client的故障导致一些对象永远无法使用。需要注意的是,在发生“顶替”时,不会复用旧`PutStart`分配的空间,而是会重新分配空间供新`PutStart`使用,旧`PutStart`分配的空间将通过下述机制进行回收。
+
+#### 空间回收
+
+在`PutStart`中为对象分配的副本空间,如果在`PutStart`后的`put_start_release_timeout`(默认为10分钟)时间内没有完成写入(收到`PutEnd`)或被撤销(收到`PutRevoke`),将会被Master视为是可释放的。在因空间分配失败或空间使用率高于设定水位而触发对象淘汰时,这些可释放的对象副本空间将会被优先释放以回收存储空间。
+
### 首选段分配
Mooncake Store 提供了**首选段分配**功能,允许用户为对象分配指定首选的存储段(节点)。此功能特别适用于优化数据局部性和减少分布式场景中的网络开销。
diff --git a/doc/zh/transfer-engine.md b/doc/zh/transfer-engine.md
index 97341601f..ba9437536 100644
--- a/doc/zh/transfer-engine.md
+++ b/doc/zh/transfer-engine.md
@@ -415,4 +415,5 @@ int init(const std::string &metadata_conn_string,
- `MC_FORCE_TCP` 强制使用 TCP 作为主要传输方式,无论是否安装了有效的 RDMA 网卡
- `MC_MIN_PRC_PORT` 指定 RPC 服务使用的最小端口号。默认值为 15000。
- `MC_MAX_PRC_PORT` 指定 RPC 服务使用的最大端口号。默认值为 17000。
-- `MC_PATH_ROUNDROBIN` 指定 RDMA 路径选择使用 Round Robin 模式,这对于传输大块数据可能有利。
\ No newline at end of file
+- `MC_PATH_ROUNDROBIN` 指定 RDMA 路径选择使用 Round Robin 模式,这对于传输大块数据可能有利。
+- `MC_ENDPOINT_STORE_TYPE` 选择 FIFO Endpoint Store (`FIFO`) 或者 Sieve Endpoint Store (`SIEVE`),默认是 `SIEVE`。
\ No newline at end of file
diff --git a/docs/source/design/mooncake-store.md b/docs/source/design/mooncake-store.md
index b78a604c8..17a252914 100644
--- a/docs/source/design/mooncake-store.md
+++ b/docs/source/design/mooncake-store.md
@@ -488,6 +488,18 @@ There are two startup parameters in `master_service` related to the soft pin mec
Notably, soft pinned objects can still be removed using APIs such as `Remove` or `RemoveAll`.
+### Zombie Object Cleanup
+
+If a Client crashes or experiences a network failure after sending a `PutStart` request but before it can send the corresponding `PutEnd` or `PutRevoke` request to the Master, the object initiated by `PutStart` enters a "zombie" state—rendering it neither usable nor deletable. The existence of such "zombie objects" not only consumes storage space but also prevents subsequent `Put` operations on the same keys. To mitigate these issues, the Master records the start time of each `PutStart` request and employs two timeout thresholds—`put_start_discard_timeout` and `put_start_release_timeout`—to clean up zombie objects.
+
+#### `PutStart` Preemption
+
+If an object receives neither a `PutEnd` nor a `PutRevoke` request within `put_start_discard_timeout` (default: 30 seconds) after its `PutStart`, any subsequent `PutStart` request for the same object will be allowed to "preempt" the previous `PutStart`. This enables the new request to proceed with writing the object, thereby preventing a single faulty Client from permanently blocking access to that object. Note that during such preemption, the storage space allocated by the old `PutStart` is not reused; instead, new space is allocated for the preempting `PutStart`. The space previously allocated by the old `PutStart` will be reclaimed via the mechanism described below.
+
+#### Space Reclaim
+
+Replica space allocated during a `PutStart` is considered releasable by the Master if the write operation is neither completed (via `PutEnd`) nor canceled (via `PutRevoke`) within `put_start_release_timeout` (default: 10 minutes) after the `PutStart`. When object eviction is triggered—either due to allocation failures or because storage utilization exceeds the configured threshold—these releasable replica spaces are prioritized for release to reclaim storage capacity.
+
### Preferred Segment Allocation
Mooncake Store provides a **preferred segment allocation** feature that allows users to specify a preferred storage segment (node) for object allocation. This feature is particularly useful for optimizing data locality and reducing network overhead in distributed scenarios.
diff --git a/docs/source/getting_started/examples/sglang-integration/hicache-integration-v1.md b/docs/source/getting_started/examples/sglang-integration/hicache-integration-v1.md
index 02d9cde19..806a67c70 100644
--- a/docs/source/getting_started/examples/sglang-integration/hicache-integration-v1.md
+++ b/docs/source/getting_started/examples/sglang-integration/hicache-integration-v1.md
@@ -102,28 +102,43 @@ For more details, please refer to [Mooncake official installation guide](https:/
**Mooncake** is a distributed system that efficiently aggregates memory resources across multiple servers. It can also be deployed on a single server for simpler setups.
-When integrated with **SGLang**, the system conceptually consists of four key components: `the master service`, `metadata service`, `store service`, and the `SGLang server`. Among them, the `master service` and `metadata service` are responsible for object and metadata maintenance. The `store service` manages a contiguous memory segment that contributes to the distributed KV cache, making its memory accessible to both local and remote `SGLang servers`. Data transfer occurs directly between the `store service` and `SGLang servers`, bypassing the `master service`.
+When integrated with **SGLang**, the system conceptually consists of four key components: `the master service`, `metadata service` (Optional), `store service` (Optional), and the `SGLang server`. Among them, the `master service` and `metadata service` are responsible for object and metadata maintenance. The `store service` manages a contiguous memory segment that contributes to the distributed KV cache, making its memory accessible to both local and remote `SGLang servers`. Data transfer occurs directly between the `store service` and `SGLang servers`, bypassing the `master service`.
### Single Server Deployment
-**Launch Mooncake `metadata service`:**
+**Launch Mooncake `metadata service` (Optional):**
```bash
python -m mooncake.http_metadata_server
```
+This service is responsible for centralized metadata management including internal connection status and related metadata.
+
+Deployment of the `metadata service` can be skipped in the following cases:
+* Mooncake supports non-centralized metadata management via a P2P handshake mechanism to exchange metadata. When using this mode, deployment of the `metadata service` can be skipped.
+* Mooncake also supports embedding the `metadata service` into the `master service`. In this case, only the `master service` needs to be started.
+
**Launch Mooncake `master service`:**
+The `master service` orchestrates the logical storage space pool across the entire cluster, managing KV cache space allocation and eviction.
+
+To start `mooncake_master`:
+
```bash
mooncake_master --eviction_high_watermark_ratio=0.95
```
+To start `mooncake_master` with embedded `metadata service` (so that a separate `metadata service` deployment can be skipped):
+
+```bash
+mooncake_master --enable_http_metadata_server=true --http_metadata_server_port=8080 --eviction_high_watermark_ratio=0.95
+```
+
**Understanding `eviction_high_watermark_ratio`:**
When a `PutStart` request fails due to insufficient memory, or when the eviction thread detects that space usage has reached the configured high watermark ratio, an eviction task is triggered to free up space by evicting a portion of objects.
-Due to memory fragmentation, allocation failures may occur even when memory usage has not yet reached 100%. The actual threshold depends on the workload. This [benchmark document](https://kvcache-ai.github.io/Mooncake/performance/allocator-benchmark-result.html)
- provides memory allocation efficiency results under different scenarios. if excessive allocation failures are observed, consider lowering this parameter accordingly.
+Due to memory fragmentation, allocation failures may occur even when memory usage has not yet reached 100%. The actual threshold depends on the workload. This [benchmark document](https://kvcache-ai.github.io/Mooncake/performance/allocator-benchmark-result.html) provides memory allocation efficiency results under different scenarios. If excessive allocation failures are observed, consider lowering this parameter accordingly.
**Launch Mooncake `store service` (Optional):**
@@ -132,88 +147,125 @@ First, create and save a configuration file in JSON format. For example:
```json
{
"local_hostname": "localhost",
- "metadata_server": "http://localhost:8080/metadata",
- "master_server_address": "localhost:50051",
+ "metadata_server": "http://127.0.0.1:8080/metadata",
+ "master_server_address": "127.0.0.1:50051",
"protocol": "rdma",
- "device_name": "mlx5_0,mlx5_1",
- "global_segment_size": 2684354560,
+ "device_name": "",
+ "global_segment_size": "4gb",
"local_buffer_size": 0
}
```
-Parameter Explanation:
+Note: If the `metadata service` is not deployed, set this field to:
-* `local_hostname`: The hostname of the `store service`.
-* `metadata_server`: The network address of the `metadata service`. The default port is 8080.
-* `master_server_address`: The network address of the `master service`. The default port is 50051.
-* `protocol`: The protocol used by the Mooncake. Supported values are `"rdma"` or `"tcp"`. For optimal performance, `"rdma"` is recommended.
-* `device_name`: The RDMA devices used by Mooncake. This parameter is required only when the protocol is set to `"rdma"`. Available devices can be listed using the `ibv_devices` command.
-* `global_segment_size`: The amount of memory (in bytes) contributed to the global memory pool. A larger value allows Mooncake to cache more KV tensors.
-* `local_buffer_size`: Local buffer is used to do request operations such as `Get` or `Put`. In this case, it is set to 0 because the instance functions solely as a storage server, contributing memory to the global pool without issuing any request operations.
+```json
+ "metadata_server": "P2PHANDSHAKE",
+```
Then start the `store service`:
```bash
-python -m mooncake.mooncake_store_service --config=[config_path]
+python -m mooncake.mooncake_store_service --config=[config_path] --port=8081
```
Mooncake `store service` configuration can also be provided via environment variables:
```bash
+MOONCAKE_LOCAL_HOSTNAME="localhost" \
MOONCAKE_TE_META_DATA_SERVER="http://127.0.0.1:8080/metadata" \
-MOONCAKE_GLOBAL_SEGMENT_SIZE=4294967296 \
+MOONCAKE_MASTER="127.0.0.1:50051" \
MOONCAKE_PROTOCOL="rdma" \
-MOONCAKE_DEVICE="erdma_0,erdma_1" \
-MOONCAKE_MASTER=127.0.0.1:50051 \
-python -m mooncake.mooncake_store_service
+MOONCAKE_DEVICE="" \
+MOONCAKE_GLOBAL_SEGMENT_SIZE="4gb" \
+MOONCAKE_LOCAL_BUFFER_SIZE=0 \
+python -m mooncake.mooncake_store_service --port=8081
```
+**Parameter Explanation:**
+
+* `local_hostname`, `MOONCAKE_LOCAL_HOSTNAME`: The hostname of the `store service`.
+* `metadata_server`, `MOONCAKE_TE_META_DATA_SERVER` : The network address of the `metadata service`. The default port is 8080. If the `metadata service` is not deployed, set this field to: `"metadata_server": "P2PHANDSHAKE"`.
+* `master_server_address`, `MOONCAKE_MASTER`: The network address of the `master service`. The default port is 50051.
+* `protocol`, `MOONCAKE_PROTOCOL`: The protocol used by Mooncake. Supported values are `"rdma"` or `"tcp"`. For optimal performance, `"rdma"` is recommended.
+* `device_name`, `MOONCAKE_DEVICE`: The RDMA devices used by Mooncake. This field can usually be left empty, as Mooncake automatically discovers available NICs by default. This parameter is required only when the protocol is set to `"rdma"` **and** a specific set of NICs needs to be used. Example: `"device_name": "mlx5_0,mlx5_1"`. To list available devices, run `ibv_devices`. **Note:** If the environment variable `MC_MS_AUTO_DISC` is set to `1`, any `device_name` or `MOONCAKE_DEVICE` configuration will be overridden, and Mooncake will switch to auto-discovery mode.
+* `global_segment_size`, `MOONCAKE_GLOBAL_SEGMENT_SIZE`: The amount of memory contributed to the global memory pool. Accepts either bytes (integer) or a string with the `gb` suffix, e.g., `"4294967296"` or `"4gb"`. A larger value allows Mooncake to cache more KV tensors.
+* `local_buffer_size`, `MOONCAKE_LOCAL_BUFFER_SIZE`: Local buffer is used to do request operations such as `Get` or `Put`. In this case, it is set to 0 because the instance functions solely as a storage server, contributing memory to the global pool without issuing any request operations.
+
+**Important: Understanding Global Segment Size**
+
+`global_segment_size` and `MOONCAKE_GLOBAL_SEGMENT_SIZE`: This parameter specifies the amount of memory each instance contributes to the distributed memory pool. The total memory available for KV cache storage across the cluster is the sum of the memory contributed by all instances.
+
+Adjust this value according to the system’s available memory and expected cache requirements.
Note: If `MOONCAKE_GLOBAL_SEGMENT_SIZE` is set to a non-zero value when starting the `SGLang server`, launching the `store service` can be skipped. In this case, the `SGLang server` also takes on the role of the `store service`, which simplifies deployment but couples the two components together. Users can choose the deployment approach that best fits their needs.
**Start the `SGLang server` with Mooncake enabled:**
-Mooncake configuration can be provided via environment variables. Note that, for optimal performance, the Mooncake backend currently supports only the `page_first` layout (which optimizes memory access patterns for KV cache operations).
+There are three ways to configure Mooncake:
-There are two ways to configure Mooncake: 1. Using environment variables; 2. Using extra-config of sglang arguments.
+1. Via extra configuration passed through sglang parameters
+2. Using JSON configuration files
+3. Using environment variables
-**Using env variables to configure Mooncake**
+Mooncake loads configuration in the following priority order:
+
+1. If Mooncake-specific options are provided in `--hicache-storage-backend-extra-config`, they are used first.
+2. If not, Mooncake checks whether the environment variable `DEFAULT_MOONCAKE_CONFIG_PATH_ENV` is set, and loads the JSON config file from that path.
+3. If neither of the above is provided, Mooncake falls back to environment variables.
+
+**Using extra-config of sglang arguments to configure Mooncake**
```bash
-MOONCAKE_TE_META_DATA_SERVER="http://127.0.0.1:8080/metadata" \
-MOONCAKE_MASTER=127.0.0.1:50051 \
-MOONCAKE_PROTOCOL="rdma" \
-MOONCAKE_DEVICE="mlx5_0,mlx5_1" \
-MOONCAKE_GLOBAL_SEGMENT_SIZE=4294967296 \
python -m sglang.launch_server \
--enable-hierarchical-cache \
- --hicache-storage-backend mooncake\
- --model-path [model_path]
+ --hicache-storage-backend mooncake \
+ --model-path [model_path] \
+ --hicache-storage-backend-extra-config '{"master_server_address": "127.0.0.1:50051", "local_hostname": "localhost", "metadata_server": "http://127.0.0.1:8080/metadata", "global_segment_size": "4gb", "protocol": "rdma", "device_name": ""}'
```
-Parameter Explanation:
-
-* `MOONCAKE_TE_META_DATA_SERVER`: The network address of the `metadata service`. The default port is 8080.
-* `MOONCAKE_MASTER`: The network address of the `master service`. The default port is 50051.
-* `MOONCAKE_PROTOCOL`: The protocol used by Mooncake. Supported values are `"rdma"` or `"tcp"`. For optimal performance, `"rdma"` is recommended.
-* `MOONCAKE_DEVICE`: The RDMA devices used by Mooncake. This parameter is required only when the protocol is set to `"rdma"`. Available devices can be listed using the `ibv_devices` command.
-* `MOONCAKE_GLOBAL_SEGMENT_SIZE`: The amount of memory (in bytes) contributed to the global memory pool. If at least one `store service` is launched, then this value could be set to `0`. In this case, the `SGLang server` will not contribute any memory to the system. Note that KV tensors cached in the contributed memory will be lost once this process terminates; however, this will not cause any system errors.
+**Using JSON file to configure Mooncake**
-**Using extra-config of sglang arguments to configure Mooncake**
+SGLang server can load Mooncake config from `SGLANG_HICACHE_MOONCAKE_CONFIG_PATH`.
```bash
+export SGLANG_HICACHE_MOONCAKE_CONFIG_PATH=/sgl-workspace/sglang/benchmark/hicache/mooncake_config.json
+
+echo '{
+ "local_hostname": "localhost",
+ "metadata_server": "http://127.0.0.1:8080/metadata",
+ "master_server_address": "127.0.0.1:50051",
+ "protocol": "rdma",
+ "device_name": "",
+ "global_segment_size": "4gb"
+}' > ${SGLANG_HICACHE_MOONCAKE_CONFIG_PATH}
+
python -m sglang.launch_server \
--enable-hierarchical-cache \
--hicache-storage-backend mooncake \
- --model-path [model_path] \
- --hicache-storage-backend-extra-config '{"master_server_address": "127.0.0.1:50051", "local_hostname": "localhost", "metadata_server": "http://127.0.0.1:8080/metadata", "global_segment_size": 4294967296, "local_buffer_size": 16777216, "protocol": "rdma", "device_name": "mlx5_0,mlx5_1"}'
+ --model-path [model_path]
```
-**Important: Understanding Global Segment Size**
+**Using env variables to configure Mooncake**
-`global_segment_size` for `store service` and `MOONCAKE_GLOBAL_SEGMENT_SIZE` for `SGLang service`: This parameter specifies the amount of memory each instance contributes to the distributed memory pool. The total memory available for KV cache storage across the cluster is the sum of the memory contributed by all instances.
+```bash
+MOONCAKE_TE_META_DATA_SERVER="http://127.0.0.1:8080/metadata" \
+MOONCAKE_MASTER="127.0.0.1:50051" \
+MOONCAKE_PROTOCOL="rdma" \
+MOONCAKE_DEVICE="" \
+MOONCAKE_GLOBAL_SEGMENT_SIZE="4gb" \
+python -m sglang.launch_server \
+ --enable-hierarchical-cache \
+    --hicache-storage-backend mooncake \
+ --model-path [model_path]
+```
-Adjust this value according to system’s available memory and expected cache requirements.
+**Parameter Explanation:**
+
+The Mooncake parameters used here are essentially the same as those configured for the `store service`.
+
+In particular, for the `global segment size`, if at least one `store service` instance is running, this value can be set to `0`. In this case, the SGLang server will not contribute any memory to the system. Note that KV tensors stored in this contributed memory will be lost when the process exits; however, this will **not** cause any system errors.
+
+**Important:** when `tp > 1`, each Tensor Parallel (TP) rank launches its own Mooncake backend instance and contributes `global_segment_size / tp` of memory, so the total memory consumption equals `global_segment_size`.
**HiCache Related Parameters for SGLang Server**
@@ -281,11 +333,23 @@ python -m sglang_router.launch_router \
## Troubleshooting
-**RDMA Registration Failure**:
+**RDMA Registration Failure:**
* In some environments, RDMA registration may require root privileges. In this case, try running the program as root.
* In certain environments (e.g., eRDMA), there is an upper limit on the total amount of RDMA memory that can be registered. Once this limit is exceeded, registration will fail. To resolve this, you can lower the value of `MOONCAKE_GLOBAL_SEGMENT_SIZE`, or reduce the host memory allocated to HiCache in the `SGLang server` (since this memory is fully registered with RDMA to enable zero-copy).
+**HiCache CPU Memory Usage:**
+
+When using HiCache, the default L2 host DRAM (CPU memory) size for KV cache is **2 times** the size of the L1 device memory (GPU memory) for KV cache.
+
+If the model is small but the GPU memory is large — especially in multi-TP (tensor parallel) setups — this may cause the L1 KV cache to become very large, which in turn can consume excessive CPU DRAM.
+
+In such cases, you should manually configure an appropriate L2 cache size based on your hardware. This can be done by setting `--hicache-ratio` or `--hicache-size`.
+
+**More Information:**
+
+Additional troubleshooting information can be found [here](https://kvcache-ai.github.io/Mooncake/troubleshooting/troubleshooting.html).
+
## Test Mooncake Store
This test is intended for developers to quickly verify that the MooncakeStore class interfaces are functioning correctly.
@@ -296,7 +360,6 @@ First, start the `metadata service` and `master service`. Then run the `test_moo
MOONCAKE_TE_META_DATA_SERVER="http://127.0.0.1:8080/metadata" \
MOONCAKE_MASTER=127.0.0.1:50051 \
MOONCAKE_PROTOCOL="rdma" \
-MOONCAKE_DEVICE="mlx5_0,mlx5_1" \
MOONCAKE_GLOBAL_SEGMENT_SIZE=16777216 \
python3 [path of test_mooncake_store.py]
```
diff --git a/docs/source/http-api-reference/http-service.md b/docs/source/http-api-reference/http-service.md
new file mode 100644
index 000000000..578b65408
--- /dev/null
+++ b/docs/source/http-api-reference/http-service.md
@@ -0,0 +1,126 @@
+# Mooncake Store HTTP Service
+
+The Mooncake Store HTTP Service provides RESTful endpoints for cluster management, monitoring, and data operations. This service is embedded within the `mooncake_master` process and can be enabled alongside the primary RPC services.
+
+## Overview
+
+The HTTP service serves multiple purposes:
+- **Metrics & Monitoring**: Prometheus-compatible metrics endpoints
+- **Cluster Management**: Query and manage distributed storage segments
+- **Data Inspection**: Examine stored objects and their replicas
+- **Health Checks**: Service availability and status verification
+
+## HTTP Endpoints
+
+### Metrics Endpoints
+
+#### `/metrics`
+Prometheus-compatible metrics endpoint providing detailed system metrics in text format.
+
+**Method**: `GET`
+**Content-Type**: `text/plain; version=0.0.4`
+**Response**: Comprehensive metrics including request counts, error rates, latency statistics, and resource utilization
+
+**Example**:
+```bash
+curl http://localhost:8080/metrics
+```
+
+#### `/metrics/summary`
+Human-readable metrics summary with key performance indicators.
+
+**Method**: `GET`
+**Content-Type**: `text/plain; version=0.0.4`
+**Response**: Condensed overview of system health and performance metrics
+
+**Example**:
+```bash
+curl http://localhost:8080/metrics/summary
+```
+
+### Data Management Endpoints
+
+#### `/query_key`
+Retrieve replica information for a specific key, including memory locations and transport endpoints.
+
+**Method**: `GET`
+**Parameters**: `key` (query parameter) - The object key to query
+**Content-Type**: `text/plain; version=0.0.4`
+**Response**: JSON-formatted replica descriptors for memory replicas
+
+**Example**:
+```bash
+curl "http://localhost:8080/query_key?key=my_object"
+```
+
+**Response Format**:
+```json
+{
+ "transport_endpoint_": "hostname:port",
+ "buffer_descriptors": [...]
+}
+```
+
+#### `/get_all_keys`
+List all keys currently stored in the distributed system.
+
+**Method**: `GET`
+**Content-Type**: `text/plain; version=0.0.4`
+**Response**: Newline-separated list of all stored keys
+
+**Example**:
+```bash
+curl http://localhost:8080/get_all_keys
+```
+
+### Segment Management Endpoints
+
+#### `/get_all_segments`
+List all mounted segments in the cluster.
+
+**Method**: `GET`
+**Content-Type**: `text/plain; version=0.0.4`
+**Response**: Newline-separated list of segment names
+
+**Example**:
+```bash
+curl http://localhost:8080/get_all_segments
+```
+
+#### `/query_segment`
+Query detailed information about a specific segment, including used and available capacity.
+
+**Method**: `GET`
+**Parameters**: `segment` (query parameter) - Segment name to query
+**Content-Type**: `text/plain; version=0.0.4`
+**Response**: Multi-line text with segment details
+
+**Example**:
+```bash
+curl "http://localhost:8080/query_segment?segment=segment_name"
+```
+
+**Response Format**:
+```
+segment_name
+Used(bytes): 1073741824
+Capacity(bytes): 4294967296
+```
+
+### Health Check Endpoints
+
+#### `/health`
+Basic health check endpoint for service availability verification.
+
+**Method**: `GET`
+**Content-Type**: `text/plain; version=0.0.4`
+**Response**: `OK` when service is healthy
+**Status Codes**:
+- `200 OK`: Service is healthy
+- Other: Service may be experiencing issues
+
+**Example**:
+```bash
+curl http://localhost:8080/health
+```
+
diff --git a/docs/source/index.md b/docs/source/index.md
index e09185d59..4014d7c7e 100644
--- a/docs/source/index.md
+++ b/docs/source/index.md
@@ -86,6 +86,7 @@ performance/allocator-benchmark-result.md
python-api-reference/mooncake-store
python-api-reference/transfer-engine
+http-api-reference/http-service
:::
% Explanation of Mooncake internals
diff --git a/docs/source/troubleshooting/troubleshooting.md b/docs/source/troubleshooting/troubleshooting.md
index 3f04a7dd5..e6c50f595 100644
--- a/docs/source/troubleshooting/troubleshooting.md
+++ b/docs/source/troubleshooting/troubleshooting.md
@@ -71,3 +71,47 @@ If the network state is unstable, some requests may not be delivered, displaying
Note: In most cases, the errors output, except for the first occurrence, are `work request flushed error`. This is because when the first error occurs, the RDMA driver sets the connection to an unavailable state, so tasks in the submission queue are blocked from execution and subsequent errors are reported. Therefore, it is recommended to locate the first occurrence of the error and check it.
In addition, if the error `Failed to get description of XXX` is displayed, it indicates that the Segment name input by the user when calling the `openSegment` interface cannot be found in the etcd database. For memory read/write scenarios, the Segment name needs to strictly match the `local_hostname` field filled in by the other node during initialization.
+
+## SGLang Common Questions
+
+### Do I need RDMA to run SGLang and Mooncake?
+
+When using Mooncake for KV cache transfer in SGLang PD disaggregation deployments, GPUDirect RDMA (GDR) is required.
+
+When using Mooncake as a KV cache storage backend in SGLang HiCache, RDMA is recommended for better performance.
+However, if RDMA NICs are not available, the TCP protocol is also supported.
+
+### How to make sure GPUDirect RDMA (GDR) is supported
+
+1. Verify the presence of an RDMA-capable NIC (e.g., Mellanox, ERDMA) and drivers.
+```
+ibv_devices
+lspci | grep rdma
+lsmod | grep -E 'ib_core|mlx4_core|mlx5_core|nvidia_peer_mem'
+```
+If no RDMA devices appear: (1) Confirm physical NIC presence via lspci
+(2) Install vendor-specific drivers (e.g., Mellanox MLNX_OFED)
+
+2. Check that the GDR driver is ready; the peer_memory module (part of MLNX_OFED) should be installed
+```
+# Check peer_memory module (from MLNX_OFED)
+lsmod | grep peer_mem
+
+# Verify NVIDIA peer memory module
+lsmod | grep nvidia_peer_mem
+```
+
+3. If you use a container to run SGLang, make sure the RDMA and GDR drivers are installed in the container and run the container in privileged mode. Requirements: (1) privileged mode must be enabled; (2) the RDMA and NVIDIA devices must be mounted into the container.
+
+4. Check the connectivity
+Benchmark end-to-end performance using ib_write_bw.
+```
+apt install perftest
+# server side
+ib_write_bw -d [rdma_device] -R -x gdr
+# client side
+ib_write_bw -d [rdma_device] -R -x gdr [server_ip]
+```
+Expected output:
+A successful bidirectional transfer with "BW peak" reported.
+Errors when using `-x gdr` indicate GDR setup failures.
diff --git a/mooncake-common/common.cmake b/mooncake-common/common.cmake
index a35104d38..d12bdbde3 100644
--- a/mooncake-common/common.cmake
+++ b/mooncake-common/common.cmake
@@ -74,6 +74,7 @@ option(WITH_RUST_EXAMPLE "build the Rust interface and sample code for the trans
option(WITH_METRICS "enable metrics and metrics reporting thread" ON)
option(USE_3FS "option for using 3FS storage backend" OFF)
option(WITH_NVIDIA_PEERMEM "disable to support RDMA without nvidia-peermem. If WITH_NVIDIA_PEERMEM=OFF then USE_CUDA=ON is required." ON)
+option(USE_EVENT_DRIVEN_COMPLETION "option for using event-driven completion (store & transfer engine)" OFF)
option(USE_LRU_MASTER "option for using LRU in master service" OFF)
set(LRU_MAX_CAPACITY 1000)
@@ -83,6 +84,12 @@ if (USE_LRU_MASTER)
add_compile_definitions(LRU_MAX_CAPACITY)
endif()
+if (USE_EVENT_DRIVEN_COMPLETION)
+ add_compile_definitions(USE_EVENT_DRIVEN_COMPLETION)
+ message(STATUS "Event-driven completion is enabled")
+else()
+ message(STATUS "Event-driven completion is disabled")
+endif()
if (USE_NVMEOF)
set(USE_CUDA ON)
diff --git a/mooncake-common/src/CMakeLists.txt b/mooncake-common/src/CMakeLists.txt
index 13dc70d67..63bb25d05 100644
--- a/mooncake-common/src/CMakeLists.txt
+++ b/mooncake-common/src/CMakeLists.txt
@@ -10,4 +10,8 @@ add_library(mooncake_common
target_link_libraries(mooncake_common PUBLIC
yaml-cpp
jsoncpp
-)
\ No newline at end of file
+)
+
+if (BUILD_SHARED_LIBS)
+ install(TARGETS mooncake_common DESTINATION lib)
+endif()
diff --git a/mooncake-ep/include/mooncake_backend.h b/mooncake-ep/include/mooncake_backend.h
index 47f9faff5..b012eb6f4 100644
--- a/mooncake-ep/include/mooncake_backend.h
+++ b/mooncake-ep/include/mooncake_backend.h
@@ -8,8 +8,6 @@
namespace mooncake {
-std::string getCudaTopologyJson(const std::vector& filter);
-
class MooncakeBackend final : public ::c10d::Backend {
public:
struct MooncakeBackendOptions final : ::c10d::Backend::Options {
@@ -63,7 +61,6 @@ class MooncakeBackend final : public ::c10d::Backend {
static void setHostIp(const std::string& hostIp) { hostIp_ = hostIp; }
static void setDeviceFilter(std::vector filters) {
- hca_filters_ = filters;
engine_.setWhitelistFilters(std::move(filters));
}
@@ -75,7 +72,6 @@ class MooncakeBackend final : public ::c10d::Backend {
private:
static TransferEngine engine_;
static Transport* transport_;
- static std::vector hca_filters_;
static int backendIndex_;
bool isCpu_{false};
static std::string hostIp_;
diff --git a/mooncake-ep/src/CMakeLists.txt b/mooncake-ep/src/CMakeLists.txt
index cfbda215e..be988ca5b 100644
--- a/mooncake-ep/src/CMakeLists.txt
+++ b/mooncake-ep/src/CMakeLists.txt
@@ -1,4 +1,4 @@
-add_library(mooncake_ep mooncake_backend.cpp mooncake_cuda_topology.cpp mooncake_ep_buffer.cpp mooncake_ep_kernel.cu mooncake_worker.cu mooncake_worker_thread.cpp mooncake_ibgda/mlx5gda.cpp)
+add_library(mooncake_ep mooncake_backend.cpp mooncake_ep_buffer.cpp mooncake_ep_kernel.cu mooncake_worker.cu mooncake_worker_thread.cpp mooncake_ibgda/mlx5gda.cpp)
set_target_properties(mooncake_ep PROPERTIES POSITION_INDEPENDENT_CODE ON)
target_link_libraries(mooncake_ep PUBLIC ${TORCH_LIBRARIES} transfer_engine ibverbs mlx5)
diff --git a/mooncake-ep/src/mooncake_backend.cpp b/mooncake-ep/src/mooncake_backend.cpp
index 0d56a8d28..707ab3fca 100644
--- a/mooncake-ep/src/mooncake_backend.cpp
+++ b/mooncake-ep/src/mooncake_backend.cpp
@@ -16,9 +16,8 @@ constexpr const char* SPARSE_ERROR_MSG = "Sparse op not supported.";
constexpr const char* REDUCE_DTYPE_ERROR_MSG = "Unsupported reduce dtype: ";
std::string MooncakeBackend::hostIp_ = "127.0.0.1";
-TransferEngine MooncakeBackend::engine_ = TransferEngine();
+TransferEngine MooncakeBackend::engine_ = TransferEngine(true);
Transport* MooncakeBackend::transport_ = nullptr;
-std::vector MooncakeBackend::hca_filters_;
int MooncakeBackend::backendIndex_ = 0;
MooncakeWorker MooncakeBackend::worker_;
@@ -34,11 +33,7 @@ MooncakeBackend::MooncakeBackend(
// Initialize transfer engine
if (!transport_) {
engine_.init(P2PHANDSHAKE, hostIp_);
- std::string topology = getCudaTopologyJson(hca_filters_);
- void** args = (void**)malloc(2 * sizeof(void*));
- args[0] = (void*)topology.c_str();
- args[1] = nullptr;
- transport_ = engine_.installTransport("rdma", args);
+ transport_ = engine_.installTransport("rdma", nullptr);
TORCH_CHECK(transport_ != nullptr,
c10::str("Failed to install transport"));
}
diff --git a/mooncake-ep/src/mooncake_cuda_topology.cpp b/mooncake-ep/src/mooncake_cuda_topology.cpp
deleted file mode 100644
index c7ada6140..000000000
--- a/mooncake-ep/src/mooncake_cuda_topology.cpp
+++ /dev/null
@@ -1,147 +0,0 @@
-#include
-#include
-#include
-#include
-#include
-
-namespace mooncake {
-
-struct InfinibandDevice {
- std::string name;
- std::string pci_bus_id;
-};
-
-static std::vector listInfiniBandDevices(
- const std::vector &filter) {
- int num_devices = 0;
- std::vector devices;
-
- struct ibv_device **device_list = ibv_get_device_list(&num_devices);
- if (!device_list) {
- LOG(WARNING) << "No RDMA devices found, check your device installation";
- return {};
- }
- if (device_list && num_devices <= 0) {
- LOG(WARNING) << "No RDMA devices found, check your device installation";
- ibv_free_device_list(device_list);
- return {};
- }
-
- for (int i = 0; i < num_devices; ++i) {
- std::string device_name = ibv_get_device_name(device_list[i]);
- if (!filter.empty() && std::find(filter.begin(), filter.end(),
- device_name) == filter.end())
- continue;
- char path[PATH_MAX + 32];
- char resolved_path[PATH_MAX];
- // Get the PCI bus id for the infiniband device. Note that
- // "/sys/class/infiniband/mlx5_X/" is a symlink to
- // "/sys/devices/pciXXXX:XX/XXXX:XX:XX.X/infiniband/mlx5_X/".
- snprintf(path, sizeof(path), "/sys/class/infiniband/%s/../..",
- device_name.c_str());
- if (realpath(path, resolved_path) == NULL) {
- PLOG(ERROR) << "listInfiniBandDevices: realpath " << path
- << " failed";
- continue;
- }
- std::string pci_bus_id = basename(resolved_path);
-
- devices.push_back(
- InfinibandDevice{.name = std::move(device_name),
- .pci_bus_id = std::move(pci_bus_id)});
- }
- ibv_free_device_list(device_list);
- return devices;
-}
-
-static int getPciDistance(const char *bus1, const char *bus2) {
- char buf[PATH_MAX];
- char path1[PATH_MAX];
- char path2[PATH_MAX];
- snprintf(buf, sizeof(buf), "/sys/bus/pci/devices/%s", bus1);
- if (realpath(buf, path1) == NULL) {
- return -1;
- }
- snprintf(buf, sizeof(buf), "/sys/bus/pci/devices/%s", bus2);
- if (realpath(buf, path2) == NULL) {
- return -1;
- }
-
- char *ptr1 = path1;
- char *ptr2 = path2;
- while (*ptr1 && *ptr1 == *ptr2) {
- ptr1++;
- ptr2++;
- }
- int distance = 0;
- for (; *ptr1; ptr1++) {
- distance += (*ptr1 == '/');
- }
- for (; *ptr2; ptr2++) {
- distance += (*ptr2 == '/');
- }
-
- return distance;
-}
-
-static std::vector discoverCudaTopology(
- const std::vector &all_hca) {
- std::vector topology;
- int device_count;
- if (cudaGetDeviceCount(&device_count) != cudaSuccess) {
- device_count = 0;
- }
- for (int i = 0; i < device_count; i++) {
- char pci_bus_id[20];
- if (cudaDeviceGetPCIBusId(pci_bus_id, sizeof(pci_bus_id), i) !=
- cudaSuccess) {
- continue;
- }
- for (char *ch = pci_bus_id; (*ch = tolower(*ch)); ch++);
-
- std::vector preferred_hca;
- std::vector avail_hca;
-
- // Find HCAs with minimum distance in one pass
- int min_distance = INT_MAX;
- std::vector min_distance_hcas;
-
- for (const auto &hca : all_hca) {
- int distance = getPciDistance(hca.pci_bus_id.c_str(), pci_bus_id);
- if (distance >= 0) {
- if (distance < min_distance) {
- min_distance = distance;
- min_distance_hcas.clear();
- min_distance_hcas.push_back(hca.name);
- } else if (distance == min_distance) {
- min_distance_hcas.push_back(hca.name);
- }
- }
- }
-
- // Add HCAs with minimum distance to preferred_hca, others to avail_hca
- for (const auto &hca : all_hca) {
- if (std::find(min_distance_hcas.begin(), min_distance_hcas.end(),
- hca.name) != min_distance_hcas.end()) {
- preferred_hca.push_back(hca.name);
- } else {
- avail_hca.push_back(hca.name);
- }
- }
- topology.push_back(
- TopologyEntry{.name = "cuda:" + std::to_string(i),
- .preferred_hca = std::move(preferred_hca),
- .avail_hca = std::move(avail_hca)});
- }
- return topology;
-}
-
-std::string getCudaTopologyJson(const std::vector &filter) {
- Json::Value topology(Json::objectValue);
- auto all_hca = listInfiniBandDevices(filter);
- for (auto &ent : discoverCudaTopology(all_hca)) {
- topology[ent.name] = ent.toJson();
- }
- return topology.toStyledString();
-}
-} // namespace mooncake
\ No newline at end of file
diff --git a/mooncake-integration/CMakeLists.txt b/mooncake-integration/CMakeLists.txt
index 5a79ff087..ccdc66606 100644
--- a/mooncake-integration/CMakeLists.txt
+++ b/mooncake-integration/CMakeLists.txt
@@ -29,15 +29,17 @@ message("${PYTHON_SYS_PATH}")
set(PYTHON_PACKAGE_NAME "mooncake")
-pybind11_add_module(engine ${SOURCES} ${CACHE_ALLOCATOR_SOURCES}
- transfer_engine/transfer_engine_py.cpp
-)
+if (WITH_TE)
+ pybind11_add_module(engine ${SOURCES} ${CACHE_ALLOCATOR_SOURCES}
+ transfer_engine/transfer_engine_py.cpp
+ )
-target_link_libraries(engine PUBLIC
- transfer_engine
- glog::glog
- gflags::gflags
-)
+ target_link_libraries(engine PUBLIC
+ transfer_engine
+ glog::glog
+ gflags::gflags
+ )
+endif()
set(ALLOCATOR_SO_PATH "${CMAKE_BINARY_DIR}/mooncake-transfer-engine/nvlink-allocator/nvlink_allocator.so")
if(USE_MNNVL)
@@ -113,4 +115,6 @@ if (WITH_STORE)
install(TARGETS store DESTINATION ${PYTHON_SYS_PATH}/${PYTHON_PACKAGE_NAME})
endif()
-install(TARGETS engine DESTINATION ${PYTHON_SYS_PATH}/${PYTHON_PACKAGE_NAME})
+if (WITH_TE)
+ install(TARGETS engine DESTINATION ${PYTHON_SYS_PATH}/${PYTHON_PACKAGE_NAME})
+endif()
diff --git a/mooncake-integration/store/store_py.cpp b/mooncake-integration/store/store_py.cpp
index 20c66ee00..ad285c8aa 100644
--- a/mooncake-integration/store/store_py.cpp
+++ b/mooncake-integration/store/store_py.cpp
@@ -171,6 +171,113 @@ class MooncakeStorePyWrapper {
}
}
+ pybind11::list batch_get_tensor(const std::vector &keys) {
+ if (!store_ || !store_->client_) {
+ LOG(ERROR) << "Client is not initialized";
+ py::list empty_list;
+ for (size_t i = 0; i < keys.size(); ++i) {
+ empty_list.append(py::none());
+ }
+ return empty_list;
+ }
+
+ // Phase 1: Batch Get Buffers (GIL Released)
+ std::vector> buffer_handles;
+ {
+ py::gil_scoped_release release_gil;
+ // This internal call already handles logging for query failures
+ buffer_handles = store_->batch_get_buffer(keys);
+ }
+
+ py::list results_list;
+
+ try {
+ py::gil_scoped_acquire acquire_gil;
+ auto torch = torch_module();
+
+ for (const auto &buffer_handle : buffer_handles) {
+ if (!buffer_handle) {
+ results_list.append(py::none());
+ continue;
+ }
+
+ auto total_length = buffer_handle->size();
+ if (total_length <= sizeof(TensorMetadata)) {
+ LOG(ERROR) << "Invalid data format: insufficient data for "
+ "metadata";
+ results_list.append(py::none());
+ continue;
+ }
+
+ char *exported_data = new char[total_length];
+ if (!exported_data) {
+ LOG(ERROR) << "Failed to allocate memory for tensor data";
+ results_list.append(py::none());
+ continue;
+ }
+
+ memcpy(exported_data, buffer_handle->ptr(), total_length);
+
+ TensorMetadata metadata;
+ memcpy(&metadata, exported_data, sizeof(TensorMetadata));
+
+ if (metadata.ndim < 0 || metadata.ndim > 4) {
+ delete[] exported_data;
+ LOG(ERROR)
+ << "Invalid tensor metadata: ndim=" << metadata.ndim;
+ results_list.append(py::none());
+ continue;
+ }
+
+ TensorDtype dtype_enum =
+ static_cast(metadata.dtype);
+ if (dtype_enum == TensorDtype::UNKNOWN) {
+ delete[] exported_data;
+ LOG(ERROR) << "Unknown tensor dtype!";
+ results_list.append(py::none());
+ continue;
+ }
+
+ size_t tensor_size = total_length - sizeof(TensorMetadata);
+ if (tensor_size == 0) {
+ delete[] exported_data;
+ LOG(ERROR) << "Invalid data format: no tensor data found";
+ results_list.append(py::none());
+ continue;
+ }
+
+ pybind11::object np_array;
+ int dtype_index = static_cast(dtype_enum);
+ if (dtype_index >= 0 &&
+ dtype_index < static_cast(array_creators.size())) {
+ // This call MUST take ownership of exported_data
+ np_array = array_creators[dtype_index](
+ exported_data, sizeof(TensorMetadata), tensor_size);
+ } else {
+ delete[] exported_data; // Free memory on error
+ LOG(ERROR) << "Unsupported dtype enum: " << dtype_index;
+ results_list.append(py::none());
+ continue;
+ }
+
+ if (metadata.ndim > 0) {
+ std::vector shape_vec;
+ for (int i = 0; i < metadata.ndim; i++) {
+ shape_vec.push_back(metadata.shape[i]);
+ }
+ py::tuple shape_tuple = py::cast(shape_vec);
+ np_array = np_array.attr("reshape")(shape_tuple);
+ }
+ pybind11::object tensor = torch.attr("from_numpy")(np_array);
+ results_list.append(tensor);
+ }
+ } catch (const pybind11::error_already_set &e) {
+ LOG(ERROR) << "Failed during batch tensor deserialization: "
+ << e.what();
+ }
+ return results_list;
+ }
+
int put_tensor(const std::string &key, pybind11::object tensor) {
if (!store_ || !store_->client_) {
LOG(ERROR) << "Client is not initialized";
@@ -240,6 +347,160 @@ class MooncakeStorePyWrapper {
return -static_cast(ErrorCode::INVALID_PARAMS);
}
}
+
+ std::vector batch_put_tensor(const std::vector &keys,
+ const pybind11::list &tensors_list) {
+ if (!store_ || !store_->client_) {
+ LOG(ERROR) << "Client is not initialized";
+ return std::vector(
+ keys.size(), -static_cast(ErrorCode::INVALID_PARAMS));
+ }
+
+ if (keys.size() != tensors_list.size()) {
+ LOG(ERROR) << "Keys and tensors list size mismatch. keys="
+ << keys.size() << ", tensors=" << tensors_list.size();
+ return std::vector(
+ keys.size(), -static_cast(ErrorCode::INVALID_PARAMS));
+ }
+
+ if (keys.empty()) {
+ return std::vector();
+ }
+
+ struct TensorInfo {
+ uintptr_t data_ptr;
+ size_t tensor_size;
+ TensorMetadata metadata;
+ bool valid = false; // Mark if metadata extraction was successful
+ };
+
+ std::vector infos(keys.size());
+ std::vector results(keys.size(), 0); // Default to success
+
+ // Phase 1: Extract Metadata (GIL Held)
+ try {
+ for (size_t i = 0; i < keys.size(); ++i) {
+ py::object tensor = tensors_list[i];
+
+ if (!(tensor.attr("__class__")
+ .attr("__name__")
+ .cast()
+ .find("Tensor") != std::string::npos)) {
+ LOG(ERROR)
+ << "Input at index " << i << " is not a PyTorch tensor";
+ results[i] = -static_cast(ErrorCode::INVALID_PARAMS);
+ continue;
+ }
+
+ uintptr_t data_ptr =
+ tensor.attr("data_ptr")().cast();
+ size_t numel = tensor.attr("numel")().cast();
+ size_t element_size =
+ tensor.attr("element_size")().cast();
+ size_t tensor_size = numel * element_size;
+
+ pybind11::object shape_obj = tensor.attr("shape");
+ pybind11::object dtype_obj = tensor.attr("dtype");
+
+ TensorDtype dtype_enum = get_tensor_dtype(dtype_obj);
+ if (dtype_enum == TensorDtype::UNKNOWN) {
+ LOG(ERROR)
+ << "Unsupported tensor dtype for key " << keys[i];
+ results[i] = -static_cast(ErrorCode::INVALID_PARAMS);
+ continue;
+ }
+
+ pybind11::tuple shape_tuple =
+ pybind11::cast(shape_obj);
+ int32_t ndim = static_cast(shape_tuple.size());
+ if (ndim > 4) {
+ LOG(ERROR) << "Tensor " << keys[i]
+ << " has more than 4 dimensions: " << ndim;
+ results[i] = -static_cast(ErrorCode::INVALID_PARAMS);
+ continue;
+ }
+
+ TensorMetadata metadata;
+ metadata.dtype = static_cast(dtype_enum);
+ metadata.ndim = ndim;
+
+ for (int j = 0; j < 4; j++) {
+ metadata.shape[j] =
+ (j < ndim) ? shape_tuple[j].cast() : -1;
+ }
+
+ infos[i] = TensorInfo{data_ptr, tensor_size, metadata, true};
+ }
+ } catch (const pybind11::error_already_set &e) {
+ LOG(ERROR) << "Failed to access tensor data during batch put: "
+ << e.what();
+ return results;
+ }
+
+ std::vector valid_keys;
+ std::vector buffer_ptrs;
+ std::vector buffer_sizes;
+ std::vector>
+ temp_handles; // Manages lifetime of allocated buffers
+ std::vector valid_indices; // To map results back
+
+ {
+ py::gil_scoped_release release_gil;
+
+ for (size_t i = 0; i < infos.size(); ++i) {
+ if (!infos[i].valid) {
+ continue; // Skip items that failed metadata extraction
+ }
+
+ const auto &info = infos[i];
+ size_t total_size = sizeof(TensorMetadata) + info.tensor_size;
+
+ // Allocate a contiguous buffer for this tensor (metadata +
+ // data)
+ auto alloc_result =
+ store_->client_buffer_allocator_->allocate(total_size);
+
+ if (!alloc_result) {
+ LOG(ERROR)
+ << "Failed to allocate buffer for key: " << keys[i]
+ << "size is: " << total_size;
+ results[i] = -static_cast(ErrorCode::INVALID_PARAMS);
+ continue; // Skip this item
+ }
+
+ auto &handle = *alloc_result;
+
+ // Copy metadata
+ memcpy(handle.ptr(), &info.metadata, sizeof(TensorMetadata));
+ // Copy tensor data
+ memcpy(
+ static_cast(handle.ptr()) + sizeof(TensorMetadata),
+ reinterpret_cast(info.data_ptr), info.tensor_size);
+
+ // Add to the list for batch_put_from
+ valid_keys.push_back(keys[i]);
+ buffer_ptrs.push_back(handle.ptr());
+ buffer_sizes.push_back(total_size);
+ temp_handles.push_back(
+ std::make_unique(std::move(handle)));
+ valid_indices.push_back(i);
+ }
+
+ if (valid_keys.empty()) {
+ return results;
+ }
+
+ std::vector batch_op_results =
+ store_->batch_put_from(valid_keys, buffer_ptrs, buffer_sizes);
+
+ for (size_t i = 0; i < batch_op_results.size(); ++i) {
+ size_t original_index = valid_indices[i];
+ results[original_index] = batch_op_results[i];
+ }
+ }
+
+ return results;
+ }
};
PYBIND11_MODULE(store, m) {
@@ -400,6 +661,11 @@ PYBIND11_MODULE(store, m) {
"Get a PyTorch tensor from the store")
.def("put_tensor", &MooncakeStorePyWrapper::put_tensor, py::arg("key"),
py::arg("tensor"), "Put a PyTorch tensor into the store")
+ .def("batch_get_tensor", &MooncakeStorePyWrapper::batch_get_tensor,
+ py::arg("keys"), "Get a batch of PyTorch tensors from the store")
+ .def("batch_put_tensor", &MooncakeStorePyWrapper::batch_put_tensor,
+ py::arg("keys"), py::arg("tensors_list"),
+ "Put a batch of PyTorch tensors into the store")
.def(
"register_buffer",
[](MooncakeStorePyWrapper &self, uintptr_t buffer_ptr,
diff --git a/mooncake-store/include/allocation_strategy.h b/mooncake-store/include/allocation_strategy.h
index 3e7fa65dc..4c2b19495 100644
--- a/mooncake-store/include/allocation_strategy.h
+++ b/mooncake-store/include/allocation_strategy.h
@@ -1,13 +1,12 @@
#pragma once
#include
-#include
#include
-#include
#include
#include
#include
-#include
+#include
+#include
#include
#include "allocator.h" // Contains BufferAllocator declaration
@@ -18,7 +17,7 @@ namespace mooncake {
/**
* @brief Abstract interface for allocation strategy, responsible for
- * allocating multiple slices across multiple replicas using available
+ * allocating a slice (with one or more replicas) using available
* BufferAllocators.
*
* The allocation strategy follows best-effort semantics: if the requested
@@ -31,9 +30,8 @@ class AllocationStrategy {
virtual ~AllocationStrategy() = default;
/**
- * @brief Allocates multiple slices across the requested number of replicas
- * using best-effort semantics. Each replica will contain all
- * requested slices.
+ * @brief Allocates a slice across the requested number of replicas
+ * using best-effort semantics.
*
* The allocation follows best-effort semantics: if the full requested
* replica count cannot be satisfied, the method will allocate as many
@@ -44,7 +42,7 @@ class AllocationStrategy {
* @param allocators_by_name Container of mounted allocators, key is
* segment_name, value is the corresponding
* allocators
- * @param slice_sizes Sizes of slices to be allocated in each replica
+ * @param slice_length Length of the slice to be allocated
* @param config Replica configuration containing number of replicas and
* placement constraints
* @return tl::expected, ErrorCode> containing
@@ -60,8 +58,7 @@ class AllocationStrategy {
const std::unordered_map<
std::string, std::vector>>&
allocators_by_name,
- const std::vector& slice_sizes,
- const ReplicateConfig& config) = 0;
+ const size_t slice_length, const ReplicateConfig& config) = 0;
};
/**
@@ -87,189 +84,110 @@ class RandomAllocationStrategy : public AllocationStrategy {
const std::unordered_map<
std::string, std::vector>>&
allocators_by_name,
- const std::vector& slice_sizes, const ReplicateConfig& config) {
- if (auto validation_error =
- validateInput(slice_sizes, config.replica_num)) {
- return tl::make_unexpected(*validation_error);
+ const size_t slice_length, const ReplicateConfig& config) {
+ // Validate input parameters
+ if (slice_length == 0 || config.replica_num == 0) {
+ return tl::make_unexpected(ErrorCode::INVALID_PARAMS);
}
- std::vector>>
- replica_buffers(config.replica_num);
- for (auto& replica_buffer : replica_buffers) {
- replica_buffer.reserve(slice_sizes.size());
- }
-
- // Track the actual number of replicas we can allocate
- size_t actual_replica_count = config.replica_num;
-
- // Allocate each slice across replicas
- for (size_t slice_idx = 0; slice_idx < slice_sizes.size();
- ++slice_idx) {
- auto slice_replicas = allocateSlice(allocators, allocators_by_name,
- slice_sizes[slice_idx],
- actual_replica_count, config);
-
- if (slice_replicas.empty()) {
- return tl::make_unexpected(ErrorCode::NO_AVAILABLE_HANDLE);
- }
-
- if (slice_replicas.size() < actual_replica_count) {
- actual_replica_count = slice_replicas.size();
- // NOTE: replica allocation is best effort
- VLOG(1) << "Failed to allocate all replicas for slice "
- << slice_idx << ", reducing replica count to "
- << actual_replica_count;
-
- // Resize replica_buffers to match the new count
- replica_buffers.resize(actual_replica_count);
- }
-
- for (size_t replica_idx = 0; replica_idx < actual_replica_count;
- ++replica_idx) {
- replica_buffers[replica_idx].push_back(
- std::move(slice_replicas[replica_idx]));
+ // Fast path: single allocator case
+ if (allocators.size() == 1) {
+ if (auto buffer = allocators[0]->allocate(slice_length)) {
+ std::vector result;
+ result.emplace_back(std::move(buffer),
+ ReplicaStatus::PROCESSING);
+ return result;
}
+ return tl::make_unexpected(ErrorCode::NO_AVAILABLE_HANDLE);
}
std::vector replicas;
- replicas.reserve(actual_replica_count);
- for (size_t replica_idx = 0; replica_idx < actual_replica_count;
- ++replica_idx) {
- replicas.emplace_back(std::move(replica_buffers[replica_idx]),
- ReplicaStatus::PROCESSING);
- }
-
- return replicas;
- }
-
- std::optional validateInput(
- const std::vector& slice_sizes, size_t replica_num) const {
- if (replica_num == 0 || slice_sizes.empty() ||
- std::count(slice_sizes.begin(), slice_sizes.end(), 0) > 0) {
- return ErrorCode::INVALID_PARAMS;
- }
-
- return std::nullopt;
- }
-
- /**
- * @brief Allocates replicas for a single slice across different segments
- */
- std::vector> allocateSlice(
- const std::vector>& allocators,
- const std::unordered_map<
- std::string, std::vector>>&
- allocators_by_name,
- size_t slice_size, size_t replica_num, const ReplicateConfig& config,
- std::unordered_set& used_segments) {
- std::vector> buffers;
- buffers.reserve(replica_num);
-
- for (size_t i = 0; i < replica_num; ++i) {
- auto buffer =
- allocateSingleBuffer(allocators, allocators_by_name, slice_size,
- config, used_segments);
+ replicas.reserve(config.replica_num);
- if (!buffer) {
- break;
- }
-
- used_segments.insert(buffer->getSegmentName());
- buffers.push_back(std::move(buffer));
- }
-
- return buffers;
- }
-
- std::vector> allocateSlice(
- const std::vector>& allocators,
- const std::unordered_map<
- std::string, std::vector>>&
- allocators_by_name,
- size_t slice_size, size_t replica_num, const ReplicateConfig& config) {
- std::unordered_set empty_segments;
- return allocateSlice(allocators, allocators_by_name, slice_size,
- replica_num, config, empty_segments);
- }
-
- /**
- * @brief Allocates a single buffer respecting preferences and exclusions
- */
- std::unique_ptr allocateSingleBuffer(
- const std::vector>& allocators,
- const std::unordered_map<
- std::string, std::vector>>&
- allocators_by_name,
- size_t size, const ReplicateConfig& config,
- const std::unordered_set& excluded_segments) {
- // Try preferred segment first
- if (!config.preferred_segment.empty() &&
- !excluded_segments.contains(config.preferred_segment)) {
+ // Try preferred segment first if specified
+ if (!config.preferred_segment.empty()) {
auto preferred_it =
allocators_by_name.find(config.preferred_segment);
if (preferred_it != allocators_by_name.end()) {
for (auto& allocator : preferred_it->second) {
- if (auto buffer = allocator->allocate(size)) {
- return buffer;
+ if (auto buffer = allocator->allocate(slice_length)) {
+ replicas.emplace_back(std::move(buffer),
+ ReplicaStatus::PROCESSING);
+ break;
}
}
}
}
- return tryRandomAllocate(allocators, size, excluded_segments);
- }
-
- /**
- * @brief Attempts allocation with random selection from allocators that can
- * fit the size
- */
- std::unique_ptr tryRandomAllocate(
- const std::vector>& allocators,
- size_t size, const std::unordered_set& excluded_segments) {
- std::vector eligible_indices;
- eligible_indices.reserve(allocators.size());
- for (size_t i = 0; i < allocators.size(); ++i) {
- if (!excluded_segments.contains(allocators[i]->getSegmentName()) &&
- allocators[i]->getLargestFreeRegion() >= size) {
- eligible_indices.push_back(i);
- }
+ if (replicas.size() == config.replica_num) {
+ return replicas;
}
- if (eligible_indices.empty()) {
- return nullptr;
+ // If replica_num is not satisfied, allocate the remaining replicas
+ // randomly. Randomly select a starting point from allocators_by_name
+ if (allocators_by_name.empty()) {
+ if (replicas.empty()) {
+ return tl::make_unexpected(ErrorCode::NO_AVAILABLE_HANDLE);
+ }
+ return replicas;
}
- // Thread-local random number generator for thread safety
- thread_local std::mt19937 rng(std::random_device{}());
- std::shuffle(eligible_indices.begin(), eligible_indices.end(), rng);
+ static thread_local std::mt19937 generator(clock());
+ std::uniform_int_distribution distribution(
+ 0, allocators_by_name.size() - 1);
+ size_t start_idx = distribution(generator);
+
+ // Get iterator to the starting point
+ auto start_it = allocators_by_name.begin();
+ std::advance(start_it, start_idx);
+
+ auto it = start_it;
+ size_t max_retry = std::min(kMaxRetryLimit, allocators_by_name.size());
+ size_t retry_count = 0;
+
+ // Try to allocate remaining replicas, starting from random position
+ // TODO: Change the segment data structure to avoid traversing the
+ // entire map every time
+ while (replicas.size() < config.replica_num &&
+ retry_count < max_retry) {
+ // Skip preferred segment if it was already allocated
+ if (it->first != config.preferred_segment) {
+ // Try each allocator in this segment
+ bool allocated = false;
+ for (auto& allocator : it->second) {
+ if (auto buffer = allocator->allocate(slice_length)) {
+ replicas.emplace_back(std::move(buffer),
+ ReplicaStatus::PROCESSING);
+ // Allocate at most one replica per segment
+ allocated = true;
+ break;
+ }
+ }
+ if (!allocated) {
+ ++retry_count;
+ }
+ }
+ // Move to next segment (circular)
+ ++it;
+ if (it == allocators_by_name.end()) {
+ it = allocators_by_name.begin();
+ }
- const size_t max_tries =
- std::min(kMaxRetryLimit, eligible_indices.size());
- for (size_t i = 0; i < max_tries; ++i) {
- auto& allocator = allocators[eligible_indices[i]];
- if (auto buffer = allocator->allocate(size)) {
- return buffer;
+ // If we have cycled through all segments, break
+ if (it == start_it) {
+ break;
}
- retry_counter_.fetch_add(1); // Track allocation attempts
}
- return nullptr;
+ // Return allocated replicas (may be fewer than requested)
+ if (replicas.empty()) {
+ return tl::make_unexpected(ErrorCode::NO_AVAILABLE_HANDLE);
+ }
+ return replicas;
}
- /**
- * @brief Get the number of allocation retry attempts
- */
- uint64_t getRetryCount() const { return retry_counter_.load(); }
-
- /**
- * @brief Reset the retry counter
- */
- void resetRetryCount() { retry_counter_.store(0); }
-
private:
- static constexpr size_t kMaxRetryLimit = 10;
- // Observer for allocation retries
- std::atomic_uint64_t retry_counter_{0};
+ static constexpr size_t kMaxRetryLimit = 100;
};
-} // namespace mooncake
+} // namespace mooncake
\ No newline at end of file
diff --git a/mooncake-store/include/client.h b/mooncake-store/include/client.h
index bb19d0ccf..2aefd76c7 100644
--- a/mooncake-store/include/client.h
+++ b/mooncake-store/include/client.h
@@ -19,6 +19,7 @@
#include "transfer_task.h"
#include "types.h"
#include "replica.h"
+#include "master_metric_manager.h"
namespace mooncake {
@@ -254,6 +255,11 @@ class Client {
return metrics_->summary_metrics();
}
+ tl::expected
+ CalcCacheStats() {
+ return master_client_.CalcCacheStats();
+ }
+
// For Prometheus-style metrics
tl::expected SerializeMetrics() {
if (metrics_ == nullptr) {
@@ -283,6 +289,7 @@ class Client {
const std::string& local_hostname,
const std::string& metadata_connstring, const std::string& protocol,
const std::optional& device_names);
+ void InitTransferSubmitter();
ErrorCode TransferData(const Replica::Descriptor& replica_descriptor,
std::vector& slices,
TransferRequest::OpCode op_code);
@@ -333,6 +340,9 @@ class Client {
const std::vector& query_results,
std::unordered_map>& slices);
+ // Client identification
+ const UUID client_id_;
+
// Client-side metrics
std::unique_ptr metrics_;
@@ -358,10 +368,6 @@ class Client {
std::thread ping_thread_;
std::atomic ping_running_{false};
void PingThreadMain(bool is_ha_mode, std::string current_master_address);
-
- // Client identification
- UUID client_id_;
- bool te_initialized_{false};
};
} // namespace mooncake
diff --git a/mooncake-store/include/client_buffer.hpp b/mooncake-store/include/client_buffer.hpp
index 0a1ca1e4c..79c393604 100644
--- a/mooncake-store/include/client_buffer.hpp
+++ b/mooncake-store/include/client_buffer.hpp
@@ -103,11 +103,11 @@ uint64_t calculate_total_size(const Replica::Descriptor& replica);
* @brief Allocate slices from a buffer handle based on replica descriptor
* @param slices Output vector to store the allocated slices
* @param replica The replica descriptor defining the slice structure
- * @param buffer_handle The buffer handle to allocate slices from
+ * @param buffer_ptr The buffer pointer to allocate slices from
* @return 0 on success, non-zero on error
*/
int allocateSlices(std::vector& slices,
const Replica::Descriptor& replica,
- BufferHandle& buffer_handle);
+ void* buffer_ptr);
} // namespace mooncake
\ No newline at end of file
diff --git a/mooncake-store/include/master_client.h b/mooncake-store/include/master_client.h
index c7f26ecc8..8c1d20cb8 100644
--- a/mooncake-store/include/master_client.h
+++ b/mooncake-store/include/master_client.h
@@ -11,6 +11,7 @@
#include "replica.h"
#include "types.h"
#include "rpc_types.h"
+#include "master_metric_manager.h"
namespace mooncake {
@@ -21,7 +22,8 @@ static const std::string kDefaultMasterAddress = "localhost:50051";
*/
class MasterClient {
public:
- MasterClient(MasterClientMetric* metrics = nullptr) : metrics_(metrics) {
+ MasterClient(const UUID& client_id, MasterClientMetric* metrics = nullptr)
+ : client_id_(client_id), metrics_(metrics) {
coro_io::client_pool::pool_config
pool_conf{};
const char* value = std::getenv("MC_RPC_PROTOCOL");
@@ -62,6 +64,14 @@ class MasterClient {
[[nodiscard]] std::vector> BatchExistKey(
const std::vector& object_keys);
+ /**
+ * @brief Calculate cache hit rate metrics
+ * This RPC takes no parameters.
+ * @return Map containing metrics
+ */
+ [[nodiscard]] tl::expected
+ CalcCacheStats();
+
/**
* @brief Gets object metadata without transferring data
* @param object_key Key to query
@@ -179,11 +189,10 @@ class MasterClient {
/**
* @brief Registers a segment to master for allocation
* @param segment Segment to register
- * @param client_id The uuid of the client
* @return tl::expected indicating success/failure
*/
[[nodiscard]] tl::expected MountSegment(
- const Segment& segment, const UUID& client_id);
+ const Segment& segment);
/**
* @brief Re-mount segments, invoked when the client is the first time to
@@ -191,20 +200,18 @@ class MasterClient {
* to remount. This function is idempotent. Client should retry if the
* return code is not ErrorCode::OK.
* @param segments Segments to remount
- * @param client_id The uuid of the client
* @return tl::expected indicating success/failure
*/
[[nodiscard]] tl::expected ReMountSegment(
- const std::vector& segments, const UUID& client_id);
+ const std::vector& segments);
/**
* @brief Unregisters a memory segment from master
* @param segment_id ID of the segment to unmount
- * @param client_id The uuid of the client
* @return tl::expected indicating success/failure
*/
[[nodiscard]] tl::expected UnmountSegment(
- const UUID& segment_id, const UUID& client_id);
+ const UUID& segment_id);
/**
* @brief Gets the cluster ID for the current client to use as subdirectory
@@ -215,12 +222,10 @@ class MasterClient {
/**
* @brief Pings master to check its availability
- * @param client_id The uuid of the client
* @return tl::expected
* containing view version and client status
*/
- [[nodiscard]] tl::expected Ping(
- const UUID& client_id);
+ [[nodiscard]] tl::expected Ping();
private:
/**
@@ -275,6 +280,9 @@ class MasterClient {
};
RpcClientAccessor client_accessor_;
+ // The client identification.
+ const UUID client_id_;
+
// Metrics for tracking RPC operations
MasterClientMetric* metrics_;
std::shared_ptr>
diff --git a/mooncake-store/include/master_config.h b/mooncake-store/include/master_config.h
index c3fdbf3bc..025e111e3 100644
--- a/mooncake-store/include/master_config.h
+++ b/mooncake-store/include/master_config.h
@@ -37,6 +37,9 @@ struct MasterConfig {
bool enable_http_metadata_server;
uint32_t http_metadata_server_port;
std::string http_metadata_server_host;
+
+ uint64_t put_start_discard_timeout_sec;
+ uint64_t put_start_release_timeout_sec;
};
class MasterServiceSupervisorConfig {
@@ -66,6 +69,8 @@ class MasterServiceSupervisorConfig {
std::string root_fs_dir = DEFAULT_ROOT_FS_DIR;
int64_t global_file_segment_size = DEFAULT_GLOBAL_FILE_SEGMENT_SIZE;
BufferAllocatorType memory_allocator = BufferAllocatorType::OFFSET;
+ uint64_t put_start_discard_timeout_sec = DEFAULT_PUT_START_DISCARD_TIMEOUT;
+ uint64_t put_start_release_timeout_sec = DEFAULT_PUT_START_RELEASE_TIMEOUT;
MasterServiceSupervisorConfig() = default;
@@ -102,6 +107,9 @@ class MasterServiceSupervisorConfig {
memory_allocator = BufferAllocatorType::OFFSET;
}
+ put_start_discard_timeout_sec = config.put_start_discard_timeout_sec;
+ put_start_release_timeout_sec = config.put_start_release_timeout_sec;
+
validate();
}
@@ -166,6 +174,8 @@ class WrappedMasterServiceConfig {
std::string root_fs_dir = DEFAULT_ROOT_FS_DIR;
int64_t global_file_segment_size = DEFAULT_GLOBAL_FILE_SEGMENT_SIZE;
BufferAllocatorType memory_allocator = BufferAllocatorType::OFFSET;
+ uint64_t put_start_discard_timeout_sec = DEFAULT_PUT_START_DISCARD_TIMEOUT;
+ uint64_t put_start_release_timeout_sec = DEFAULT_PUT_START_RELEASE_TIMEOUT;
WrappedMasterServiceConfig() = default;
@@ -196,6 +206,9 @@ class WrappedMasterServiceConfig {
} else {
memory_allocator = mooncake::BufferAllocatorType::OFFSET;
}
+
+ put_start_discard_timeout_sec = config.put_start_discard_timeout_sec;
+ put_start_release_timeout_sec = config.put_start_release_timeout_sec;
}
// From MasterServiceSupervisorConfig, enable_ha is set to true
@@ -221,6 +234,8 @@ class WrappedMasterServiceConfig {
root_fs_dir = config.root_fs_dir;
global_file_segment_size = config.global_file_segment_size;
memory_allocator = config.memory_allocator;
+ put_start_discard_timeout_sec = config.put_start_discard_timeout_sec;
+ put_start_release_timeout_sec = config.put_start_release_timeout_sec;
}
};
@@ -244,6 +259,8 @@ class MasterServiceConfigBuilder {
std::string root_fs_dir_ = DEFAULT_ROOT_FS_DIR;
int64_t global_file_segment_size_ = DEFAULT_GLOBAL_FILE_SEGMENT_SIZE;
BufferAllocatorType memory_allocator_ = BufferAllocatorType::OFFSET;
+ uint64_t put_start_discard_timeout_sec_ = DEFAULT_PUT_START_DISCARD_TIMEOUT;
+ uint64_t put_start_release_timeout_sec_ = DEFAULT_PUT_START_RELEASE_TIMEOUT;
public:
MasterServiceConfigBuilder() = default;
@@ -312,6 +329,18 @@ class MasterServiceConfigBuilder {
return *this;
}
+ MasterServiceConfigBuilder& set_put_start_discard_timeout_sec(
+ uint64_t put_start_discard_timeout_sec) {
+ put_start_discard_timeout_sec_ = put_start_discard_timeout_sec;
+ return *this;
+ }
+
+ MasterServiceConfigBuilder& set_put_start_release_timeout_sec(
+ uint64_t put_start_release_timeout_sec) {
+ put_start_release_timeout_sec_ = put_start_release_timeout_sec;
+ return *this;
+ }
+
MasterServiceConfig build() const;
};
@@ -331,6 +360,8 @@ class MasterServiceConfig {
std::string root_fs_dir = DEFAULT_ROOT_FS_DIR;
int64_t global_file_segment_size = DEFAULT_GLOBAL_FILE_SEGMENT_SIZE;
BufferAllocatorType memory_allocator = BufferAllocatorType::OFFSET;
+ uint64_t put_start_discard_timeout_sec = DEFAULT_PUT_START_DISCARD_TIMEOUT;
+ uint64_t put_start_release_timeout_sec = DEFAULT_PUT_START_RELEASE_TIMEOUT;
MasterServiceConfig() = default;
@@ -349,6 +380,8 @@ class MasterServiceConfig {
root_fs_dir = config.root_fs_dir;
global_file_segment_size = config.global_file_segment_size;
memory_allocator = config.memory_allocator;
+ put_start_discard_timeout_sec = config.put_start_discard_timeout_sec;
+ put_start_release_timeout_sec = config.put_start_release_timeout_sec;
}
// Static factory method to create a builder
@@ -370,6 +403,8 @@ inline MasterServiceConfig MasterServiceConfigBuilder::build() const {
config.root_fs_dir = root_fs_dir_;
config.global_file_segment_size = global_file_segment_size_;
config.memory_allocator = memory_allocator_;
+ config.put_start_discard_timeout_sec = put_start_discard_timeout_sec_;
+ config.put_start_release_timeout_sec = put_start_release_timeout_sec_;
return config;
}
diff --git a/mooncake-store/include/master_metric_manager.h b/mooncake-store/include/master_metric_manager.h
index e11ceaa33..bcfdca871 100644
--- a/mooncake-store/include/master_metric_manager.h
+++ b/mooncake-store/include/master_metric_manager.h
@@ -19,6 +19,39 @@ class MasterMetricManager {
MasterMetricManager(MasterMetricManager&&) = delete;
MasterMetricManager& operator=(MasterMetricManager&&) = delete;
+ // Memory Storage Metrics (global & segment)
+ void inc_allocated_mem_size(const std::string& segment, int64_t val = 1);
+ void dec_allocated_mem_size(const std::string& segment, int64_t val = 1);
+ void reset_allocated_mem_size();
+ void inc_total_mem_capacity(const std::string& segment, int64_t val = 1);
+ void dec_total_mem_capacity(const std::string& segment, int64_t val = 1);
+ void reset_total_mem_capacity();
+ double get_global_mem_used_ratio(void);
+
+ void inc_mem_cache_hit_nums(int64_t val = 1);
+ void inc_file_cache_hit_nums(int64_t val = 1);
+ void inc_mem_cache_nums(int64_t val = 1);
+ void inc_file_cache_nums(int64_t val = 1);
+ void dec_mem_cache_nums(int64_t val = 1);
+ void dec_file_cache_nums(int64_t val = 1);
+
+ void inc_valid_get_nums(int64_t val = 1);
+ void inc_total_get_nums(int64_t val = 1);
+
+ enum class CacheHitStat {
+ MEMORY_HITS,
+ SSD_HITS,
+ MEMORY_TOTAL,
+ SSD_TOTAL,
+ MEMORY_HIT_RATE,
+ SSD_HIT_RATE,
+ OVERALL_HIT_RATE,
+ VALID_GET_RATE
+ };
+ using CacheHitStatDict = std::unordered_map;
+ void add_stat_to_dict(CacheHitStatDict&, CacheHitStat, double);
+ CacheHitStatDict calculate_cache_stats();
+
// Memory Storage Metrics
void inc_allocated_mem_size(int64_t val = 1);
void dec_allocated_mem_size(int64_t val = 1);
@@ -26,7 +59,9 @@ class MasterMetricManager {
void dec_total_mem_capacity(int64_t val = 1);
int64_t get_allocated_mem_size();
int64_t get_total_mem_capacity();
- double get_global_mem_used_ratio(void);
+ double get_segment_mem_used_ratio(const std::string& segment);
+ int64_t get_segment_allocated_mem_size(const std::string& segment);
+ int64_t get_segment_total_mem_capacity(const std::string& segment);
// File Storage Metrics
void inc_allocated_file_size(int64_t val = 1);
@@ -161,6 +196,15 @@ class MasterMetricManager {
int64_t get_evicted_key_count();
int64_t get_evicted_size();
+ // PutStart Discard Metrics
+ void inc_put_start_discard_cnt(int64_t count, int64_t size);
+ void inc_put_start_release_cnt(int64_t count, int64_t size);
+
+ // PutStart Discard Metrics Getters
+ int64_t get_put_start_discard_cnt();
+ int64_t get_put_start_release_cnt();
+ int64_t get_put_start_discarded_staging_size();
+
// --- Serialization ---
/**
* @brief Serializes all managed metrics into Prometheus text format.
@@ -185,8 +229,16 @@ class MasterMetricManager {
// --- Metric Members ---
// Memory Storage Metrics
- ylt::metric::gauge_t mem_allocated_size_; // Use update for gauge
- ylt::metric::gauge_t mem_total_capacity_; // Use update for gauge
+ ylt::metric::gauge_t
+ mem_allocated_size_; // Overall memory usage update for gauge
+ ylt::metric::gauge_t
+ mem_total_capacity_; // Overall memory capacity update for gauge
+ ylt::metric::dynamic_gauge_1t
+ mem_allocated_size_per_segment_; // Segment memory usage update for
+ // gauge
+ ylt::metric::dynamic_gauge_1t
+ mem_total_capacity_per_segment_; // Segment memory capacity update for
+ // gauge
// File Storage Metrics
ylt::metric::gauge_t file_allocated_size_;
@@ -255,12 +307,36 @@ class MasterMetricManager {
ylt::metric::counter_t batch_put_revoke_items_;
ylt::metric::counter_t batch_put_revoke_failed_items_;
+ // cache hit Statistics
+ ylt::metric::counter_t mem_cache_hit_nums_;
+ ylt::metric::counter_t file_cache_hit_nums_;
+ ylt::metric::gauge_t mem_cache_nums_;
+ ylt::metric::gauge_t file_cache_nums_;
+
+ ylt::metric::counter_t valid_get_nums_;
+ ylt::metric::counter_t total_get_nums_;
+
+ static const inline std::unordered_map
+ stat_names_ = {{CacheHitStat::MEMORY_HITS, "memory_hits"},
+ {CacheHitStat::SSD_HITS, "ssd_hits"},
+ {CacheHitStat::MEMORY_TOTAL, "memory_total"},
+ {CacheHitStat::SSD_TOTAL, "ssd_total"},
+ {CacheHitStat::MEMORY_HIT_RATE, "memory_hit_rate"},
+ {CacheHitStat::SSD_HIT_RATE, "ssd_hit_rate"},
+ {CacheHitStat::OVERALL_HIT_RATE, "overall_hit_rate"},
+ {CacheHitStat::VALID_GET_RATE, "valid_get_rate"}};
+
// Eviction Metrics
ylt::metric::counter_t eviction_success_;
ylt::metric::counter_t eviction_attempts_;
ylt::metric::counter_t evicted_key_count_;
ylt::metric::counter_t evicted_size_;
+ // PutStart Discard Metrics
+ ylt::metric::counter_t put_start_discard_cnt_;
+ ylt::metric::counter_t put_start_release_cnt_;
+ ylt::metric::gauge_t put_start_discarded_staging_size_;
+
// Some metrics are used only in HA mode. Use a flag to control the output
// content.
bool enable_ha_{false};
diff --git a/mooncake-store/include/master_service.h b/mooncake-store/include/master_service.h
index 28c318bbe..348407c99 100644
--- a/mooncake-store/include/master_service.h
+++ b/mooncake-store/include/master_service.h
@@ -5,6 +5,7 @@
#include
#include
#include
+#include
#include
#include
#include
@@ -132,14 +133,14 @@ class MasterService {
/**
* @brief Start a put operation for an object
- * @param[out] replica_list Vector to store replica information for slices
+ * @param[out] replica_list Vector to store replica information for the
+ * slice
* @return ErrorCode::OK on success, ErrorCode::OBJECT_NOT_FOUND if exists,
* ErrorCode::NO_AVAILABLE_HANDLE if allocation fails,
* ErrorCode::INVALID_PARAMS if slice size is invalid
*/
- auto PutStart(const std::string& key,
- const std::vector& slice_lengths,
- const ReplicateConfig& config)
+ auto PutStart(const UUID& client_id, const std::string& key,
+ const uint64_t slice_length, const ReplicateConfig& config)
-> tl::expected, ErrorCode>;
/**
@@ -148,8 +149,8 @@ class MasterService {
* @return ErrorCode::OK on success, ErrorCode::OBJECT_NOT_FOUND if not
* found, ErrorCode::INVALID_WRITE if replica status is invalid
*/
- auto PutEnd(const std::string& key, ReplicaType replica_type)
- -> tl::expected;
+ auto PutEnd(const UUID& client_id, const std::string& key,
+ ReplicaType replica_type) -> tl::expected;
/**
* @brief Revoke a put operation, replica_type indicates the type of
@@ -157,8 +158,8 @@ class MasterService {
* @return ErrorCode::OK on success, ErrorCode::OBJECT_NOT_FOUND if not
* found, ErrorCode::INVALID_WRITE if replica status is invalid
*/
- auto PutRevoke(const std::string& key, ReplicaType replica_type)
- -> tl::expected;
+ auto PutRevoke(const UUID& client_id, const std::string& key,
+ ReplicaType replica_type) -> tl::expected;
/**
* @brief Complete a batch of put operations
@@ -166,7 +167,7 @@ class MasterService {
* found, ErrorCode::INVALID_WRITE if replica status is invalid
*/
std::vector> BatchPutEnd(
- const std::vector& keys);
+ const UUID& client_id, const std::vector& keys);
/**
* @brief Revoke a batch of put operations
@@ -174,7 +175,7 @@ class MasterService {
* found, ErrorCode::INVALID_WRITE if replica status is invalid
*/
std::vector> BatchPutRevoke(
- const std::vector& keys);
+ const UUID& client_id, const std::vector& keys);
/**
* @brief Remove an object and its replicas
@@ -245,15 +246,18 @@ class MasterService {
if (soft_pin_timeout) {
MasterMetricManager::instance().dec_soft_pin_key_count(1);
}
- MasterMetricManager::instance().dec_allocated_file_size(
- disk_replica_size);
}
ObjectMetadata() = delete;
- ObjectMetadata(size_t value_length, std::vector&& reps,
- bool enable_soft_pin)
- : replicas(std::move(reps)),
+ ObjectMetadata(
+ const UUID& client_id_,
+ const std::chrono::steady_clock::time_point put_start_time_,
+ size_t value_length, std::vector&& reps,
+ bool enable_soft_pin)
+ : client_id(client_id_),
+ put_start_time(put_start_time_),
+ replicas(std::move(reps)),
size(value_length),
lease_timeout(),
soft_pin_timeout(std::nullopt) {
@@ -263,16 +267,6 @@ class MasterService {
MasterMetricManager::instance().inc_soft_pin_key_count(1);
}
MasterMetricManager::instance().observe_value_size(value_length);
- // Automatic update allocated_file_size via RAII
- for (const auto& replica : replicas) {
- if (replica.is_disk_replica()) {
- disk_replica_size += replica.get_descriptor()
- .get_disk_descriptor()
- .object_size;
- }
- }
- MasterMetricManager::instance().inc_allocated_file_size(
- disk_replica_size);
}
ObjectMetadata(const ObjectMetadata&) = delete;
@@ -280,6 +274,9 @@ class MasterService {
ObjectMetadata(ObjectMetadata&&) = delete;
ObjectMetadata& operator=(ObjectMetadata&&) = delete;
+ const UUID client_id;
+ const std::chrono::steady_clock::time_point put_start_time;
+
std::vector replicas;
size_t size;
// Default constructor, creates a time_point representing
@@ -287,7 +284,6 @@ class MasterService {
std::chrono::steady_clock::time_point lease_timeout; // hard lease
std::optional
soft_pin_timeout; // optional soft pin, only set for vip objects
- uint64_t disk_replica_size = 0;
// Check if there are some replicas with a different status than the
// given value. If there are, return the status of the first replica
@@ -374,6 +370,31 @@ class MasterService {
return replica.status() == ReplicaStatus::COMPLETE;
});
}
+
+ bool HasCompletedReplicas() const {
+ return std::any_of(
+ replicas.begin(), replicas.end(), [](const Replica& replica) {
+ return replica.status() == ReplicaStatus::COMPLETE;
+ });
+ }
+
+ std::vector DiscardProcessingReplicas() {
+ auto partition_point = std::partition(
+ replicas.begin(), replicas.end(), [](const Replica& replica) {
+ return replica.status() != ReplicaStatus::PROCESSING;
+ });
+
+ std::vector discarded_replicas;
+ if (partition_point != replicas.end()) {
+ discarded_replicas.reserve(
+ std::distance(partition_point, replicas.end()));
+ std::move(partition_point, replicas.end(),
+ std::back_inserter(discarded_replicas));
+ replicas.erase(partition_point, replicas.end());
+ }
+
+ return discarded_replicas;
+ }
};
static constexpr size_t kNumShards = 1024; // Number of metadata shards
@@ -383,6 +404,7 @@ class MasterService {
mutable Mutex mutex;
std::unordered_map metadata
GUARDED_BY(mutex);
+ std::unordered_set processing_keys GUARDED_BY(mutex);
};
std::array metadata_shards_;
@@ -394,6 +416,19 @@ class MasterService {
// Helper to clean up stale handles pointing to unmounted segments
bool CleanupStaleHandles(ObjectMetadata& metadata);
+ /**
+ * @brief Helper to discard expired processing keys.
+ */
+ void DiscardExpiredProcessingKeys(
+ MetadataShard& shard, const std::chrono::steady_clock::time_point& now);
+
+ /**
+ * @brief Helper to release space of expired discarded replicas.
+ * @return Number of released objects that have memory replicas
+ */
+ uint64_t ReleaseExpiredDiscardedReplicas(
+ const std::chrono::steady_clock::time_point& now);
+
// Eviction thread function
void EvictionThreadFunc();
@@ -421,20 +456,29 @@ class MasterService {
: service_(service),
key_(key),
shard_idx_(service_->getShardIndex(key)),
- lock_(&service_->metadata_shards_[shard_idx_].mutex),
- it_(service_->metadata_shards_[shard_idx_].metadata.find(key)) {
+ shard_(service_->metadata_shards_[shard_idx_]),
+ lock_(&shard_.mutex),
+ it_(shard_.metadata.find(key)),
+ processing_it_(shard_.processing_keys.find(key)) {
// Automatically clean up invalid handles
- if (it_ != service_->metadata_shards_[shard_idx_].metadata.end()) {
+ if (it_ != shard_.metadata.end()) {
if (service_->CleanupStaleHandles(it_->second)) {
- service_->metadata_shards_[shard_idx_].metadata.erase(it_);
- it_ = service_->metadata_shards_[shard_idx_].metadata.end();
+ this->Erase();
+
+ if (processing_it_ != shard_.processing_keys.end()) {
+ this->EraseFromProcessing();
+ }
}
}
}
// Check if metadata exists
bool Exists() const NO_THREAD_SAFETY_ANALYSIS {
- return it_ != service_->metadata_shards_[shard_idx_].metadata.end();
+ return it_ != shard_.metadata.end();
+ }
+
+ bool InProcessing() const NO_THREAD_SAFETY_ANALYSIS {
+ return processing_it_ != shard_.processing_keys.end();
}
// Get metadata (only call when Exists() is true)
@@ -442,16 +486,23 @@ class MasterService {
// Delete current metadata (for PutRevoke or Remove operations)
void Erase() NO_THREAD_SAFETY_ANALYSIS {
- service_->metadata_shards_[shard_idx_].metadata.erase(it_);
- it_ = service_->metadata_shards_[shard_idx_].metadata.end();
+ shard_.metadata.erase(it_);
+ it_ = shard_.metadata.end();
+ }
+
+ void EraseFromProcessing() NO_THREAD_SAFETY_ANALYSIS {
+ shard_.processing_keys.erase(processing_it_);
+ processing_it_ = shard_.processing_keys.end();
}
private:
MasterService* service_;
std::string key_;
size_t shard_idx_;
+ MetadataShard& shard_;
MutexLocker lock_;
std::unordered_map::iterator it_;
+ std::unordered_set::iterator processing_it_;
};
friend class MetadataAccessor;
@@ -493,6 +544,43 @@ class MasterService {
SegmentManager segment_manager_;
BufferAllocatorType memory_allocator_type_;
std::shared_ptr allocation_strategy_;
+
+ // Discarded replicas management
+ const std::chrono::seconds put_start_discard_timeout_sec_;
+ const std::chrono::seconds put_start_release_timeout_sec_;
+ class DiscardedReplicas {
+ public:
+ DiscardedReplicas() = delete;
+
+ DiscardedReplicas(std::vector&& replicas,
+ std::chrono::steady_clock::time_point ttl)
+ : replicas_(std::move(replicas)), ttl_(ttl), mem_size_(0) {
+ for (auto& replica : replicas_) {
+ mem_size_ += replica.get_memory_buffer_size();
+ }
+ MasterMetricManager::instance().inc_put_start_discard_cnt(
+ 1, mem_size_);
+ }
+
+ ~DiscardedReplicas() {
+ MasterMetricManager::instance().inc_put_start_release_cnt(
+ 1, mem_size_);
+ }
+
+ uint64_t memSize() const { return mem_size_; }
+
+ bool isExpired(const std::chrono::steady_clock::time_point& now) const {
+ return ttl_ <= now;
+ }
+
+ private:
+ std::vector replicas_;
+ std::chrono::steady_clock::time_point ttl_;
+ uint64_t mem_size_;
+ };
+ std::mutex discarded_replicas_mutex_;
+ std::list discarded_replicas_
+ GUARDED_BY(discarded_replicas_mutex_);
};
-} // namespace mooncake
+} // namespace mooncake
\ No newline at end of file
diff --git a/mooncake-store/include/replica.h b/mooncake-store/include/replica.h
index c37e6149a..740d0f6df 100644
--- a/mooncake-store/include/replica.h
+++ b/mooncake-store/include/replica.h
@@ -87,7 +87,7 @@ struct ReplicateConfig {
};
struct MemoryReplicaData {
- std::vector> buffers;
+ std::unique_ptr buffer;
};
struct DiskReplicaData {
@@ -96,8 +96,8 @@ struct DiskReplicaData {
};
struct MemoryDescriptor {
- std::vector buffer_descriptors;
- YLT_REFL(MemoryDescriptor, buffer_descriptors);
+ AllocatedBuffer::Descriptor buffer_descriptor;
+ YLT_REFL(MemoryDescriptor, buffer_descriptor);
};
struct DiskDescriptor {
@@ -111,14 +111,57 @@ class Replica {
struct Descriptor;
// memory replica constructor
- Replica(std::vector> buffers,
- ReplicaStatus status)
- : data_(MemoryReplicaData{std::move(buffers)}), status_(status) {}
+ Replica(std::unique_ptr buffer, ReplicaStatus status)
+ : data_(MemoryReplicaData{std::move(buffer)}), status_(status) {}
// disk replica constructor
Replica(std::string file_path, uint64_t object_size, ReplicaStatus status)
: data_(DiskReplicaData{std::move(file_path), object_size}),
- status_(status) {}
+ status_(status) {
+ // Automatic update allocated_file_size via RAII
+ MasterMetricManager::instance().inc_allocated_file_size(object_size);
+ }
+
+ ~Replica() {
+ if (status_ != ReplicaStatus::UNDEFINED && is_disk_replica()) {
+ const auto& disk_data = std::get(data_);
+ MasterMetricManager::instance().dec_allocated_file_size(
+ disk_data.object_size);
+ }
+ }
+
+ // Copy-construction is not allowed.
+ Replica(const Replica&) = delete;
+ Replica& operator=(const Replica&) = delete;
+
+ // Move-construction is allowed.
+ Replica(Replica&& src) noexcept
+ : data_(std::move(src.data_)), status_(src.status_) {
+ // Mark the source as moved-from so its destructor doesn't
+ // double-decrement metrics.
+ src.status_ = ReplicaStatus::UNDEFINED;
+ }
+
+ Replica& operator=(Replica&& src) noexcept {
+ if (this == &src) {
+ // Same object, skip moving.
+ return *this;
+ }
+
+ // Decrement metric for the current object before overwriting.
+ if (status_ != ReplicaStatus::UNDEFINED && is_disk_replica()) {
+ const auto& disk_data = std::get(data_);
+ MasterMetricManager::instance().dec_allocated_file_size(
+ disk_data.object_size);
+ }
+
+ data_ = std::move(src.data_);
+ status_ = src.status_;
+ // Mark src as moved-from.
+ src.status_ = ReplicaStatus::UNDEFINED;
+
+ return *this;
+ }
[[nodiscard]] Descriptor get_descriptor() const;
@@ -139,15 +182,21 @@ class Replica {
[[nodiscard]] bool has_invalid_mem_handle() const {
if (is_memory_replica()) {
const auto& mem_data = std::get(data_);
- return std::any_of(
- mem_data.buffers.begin(), mem_data.buffers.end(),
- [](const std::unique_ptr& buf_ptr) {
- return !buf_ptr->isAllocatorValid();
- });
+ return !mem_data.buffer->isAllocatorValid();
}
return false; // DiskReplicaData does not have handles
}
+ [[nodiscard]] size_t get_memory_buffer_size() const {
+ if (is_memory_replica()) {
+ const auto& mem_data = std::get(data_);
+ return mem_data.buffer->size();
+ } else {
+ LOG(ERROR) << "Invalid replica type: " << type();
+ return 0;
+ }
+ }
+
[[nodiscard]] std::vector> get_segment_names()
const;
@@ -237,12 +286,13 @@ inline Replica::Descriptor Replica::get_descriptor() const {
if (is_memory_replica()) {
const auto& mem_data = std::get(data_);
MemoryDescriptor mem_desc;
- mem_desc.buffer_descriptors.reserve(mem_data.buffers.size());
- for (const auto& buf_ptr : mem_data.buffers) {
- if (buf_ptr) {
- mem_desc.buffer_descriptors.push_back(
- buf_ptr->get_descriptor());
- }
+ if (mem_data.buffer) {
+ mem_desc.buffer_descriptor = mem_data.buffer->get_descriptor();
+ } else {
+ mem_desc.buffer_descriptor.size_ = 0;
+ mem_desc.buffer_descriptor.buffer_address_ = 0;
+ mem_desc.buffer_descriptor.transport_endpoint_ = "";
+ LOG(ERROR) << "Trying to get invalid memory replica descriptor";
}
desc.descriptor_variant = std::move(mem_desc);
} else if (is_disk_replica()) {
@@ -260,15 +310,11 @@ inline std::vector> Replica::get_segment_names()
const {
if (is_memory_replica()) {
const auto& mem_data = std::get(data_);
- std::vector> segment_names(
- mem_data.buffers.size());
- for (size_t i = 0; i < mem_data.buffers.size(); ++i) {
- if (mem_data.buffers[i] &&
- mem_data.buffers[i]->isAllocatorValid()) {
- segment_names[i] = mem_data.buffers[i]->getSegmentName();
- } else {
- segment_names[i] = std::nullopt;
- }
+ std::vector> segment_names;
+ if (mem_data.buffer && mem_data.buffer->isAllocatorValid()) {
+ segment_names.push_back(mem_data.buffer->getSegmentName());
+ } else {
+ segment_names.push_back(std::nullopt);
}
return segment_names;
}
@@ -281,10 +327,8 @@ inline std::ostream& operator<<(std::ostream& os, const Replica& replica) {
if (replica.is_memory_replica()) {
const auto& mem_data = std::get(replica.data_);
os << "type: MEMORY, buffers: [";
- for (const auto& buf_ptr : mem_data.buffers) {
- if (buf_ptr) {
- os << *buf_ptr;
- }
+ if (mem_data.buffer) {
+ os << *mem_data.buffer;
}
os << "]";
} else if (replica.is_disk_replica()) {
diff --git a/mooncake-store/include/rpc_service.h b/mooncake-store/include/rpc_service.h
index 355e444c1..95d32ffa1 100644
--- a/mooncake-store/include/rpc_service.h
+++ b/mooncake-store/include/rpc_service.h
@@ -26,6 +26,9 @@ class WrappedMasterService {
tl::expected ExistKey(const std::string& key);
+ tl::expected
+ CalcCacheStats();
+
std::vector> BatchExistKey(
const std::vector& keys);
@@ -41,25 +44,27 @@ class WrappedMasterService {
BatchGetReplicaList(const std::vector& keys);
tl::expected, ErrorCode> PutStart(
- const std::string& key, const std::vector& slice_lengths,
- const ReplicateConfig& config);
+ const UUID& client_id, const std::string& key,
+ const uint64_t slice_length, const ReplicateConfig& config);
- tl::expected PutEnd(const std::string& key,
+ tl::expected PutEnd(const UUID& client_id,
+ const std::string& key,
ReplicaType replica_type);
- tl::expected PutRevoke(const std::string& key,
+ tl::expected PutRevoke(const UUID& client_id,
+ const std::string& key,
ReplicaType replica_type);
std::vector, ErrorCode>>
- BatchPutStart(const std::vector& keys,
- const std::vector>& slice_lengths,
+ BatchPutStart(const UUID& client_id, const std::vector& keys,
+ const std::vector& slice_lengths,
const ReplicateConfig& config);
std::vector> BatchPutEnd(
- const std::vector& keys);
+ const UUID& client_id, const std::vector& keys);
std::vector> BatchPutRevoke(
- const std::vector& keys);
+ const UUID& client_id, const std::vector& keys);
tl::expected Remove(const std::string& key);
@@ -92,4 +97,4 @@ class WrappedMasterService {
void RegisterRpcService(coro_rpc::coro_rpc_server& server,
mooncake::WrappedMasterService& wrapped_master_service);
-} // namespace mooncake
+} // namespace mooncake
\ No newline at end of file
diff --git a/mooncake-store/include/transfer_task.h b/mooncake-store/include/transfer_task.h
index a97b47bc8..6b8fc82c3 100644
--- a/mooncake-store/include/transfer_task.h
+++ b/mooncake-store/include/transfer_task.h
@@ -400,36 +400,35 @@ class TransferSubmitter {
/**
* @brief Select the optimal transfer strategy
*/
- TransferStrategy selectStrategy(
- const std::vector& handles,
- const std::vector& slices) const;
+ TransferStrategy selectStrategy(const AllocatedBuffer::Descriptor& handle,
+ const std::vector& slices) const;
/**
* @brief Check if all handles refer to local segments
*/
- bool isLocalTransfer(
- const std::vector& handles) const;
+ bool isLocalTransfer(const AllocatedBuffer::Descriptor& handle) const;
/**
* @brief Validate transfer parameters
*/
- bool validateTransferParams(
- const std::vector& handles,
- const std::vector& slices, bool is_multi_buffers = false) const;
+ bool validateTransferParams(const AllocatedBuffer::Descriptor& handle,
+ const std::vector& slices) const;
/**
* @brief Submit memcpy operation asynchronously
*/
std::optional submitMemcpyOperation(
- const std::vector& handles,
- std::vector& slices, TransferRequest::OpCode op_code);
+ const AllocatedBuffer::Descriptor& handle,
+ const std::vector& slices,
+ const TransferRequest::OpCode op_code);
/**
* @brief Submit transfer engine operation asynchronously
*/
std::optional submitTransferEngineOperation(
- const std::vector& handles,
- std::vector& slices, TransferRequest::OpCode op_code);
+ const AllocatedBuffer::Descriptor& handle,
+ const std::vector& slices,
+ const TransferRequest::OpCode op_code);
std::optional submitFileReadOperation(
const Replica::Descriptor& replica, std::vector& slices,
diff --git a/mooncake-store/include/types.h b/mooncake-store/include/types.h
index 5f9f540ca..100cc13de 100644
--- a/mooncake-store/include/types.h
+++ b/mooncake-store/include/types.h
@@ -41,6 +41,9 @@ static const int64_t DEFAULT_GLOBAL_FILE_SEGMENT_SIZE =
static const std::string PUT_NO_SPACE_HELPER_STR = // A helpful string
" due to insufficient space. Consider lowering "
"eviction_high_watermark_ratio or mounting more segments.";
+static constexpr uint64_t DEFAULT_PUT_START_DISCARD_TIMEOUT = 30; // 30 seconds
+static constexpr uint64_t DEFAULT_PUT_START_RELEASE_TIMEOUT =
+ 600; // 10 minutes
// Forward declarations
class BufferAllocatorBase;
@@ -113,6 +116,7 @@ enum class ErrorCode : int32_t {
// Parameter errors (Range: -600 to -699)
INVALID_PARAMS = -600, ///< Invalid parameters.
+ ILLEGAL_CLIENT = -601, ///< Illegal client to do the operation.
// Engine operation errors (Range: -700 to -799)
INVALID_WRITE = -700, ///< Invalid write operation.
diff --git a/mooncake-store/src/CMakeLists.txt b/mooncake-store/src/CMakeLists.txt
index 385c84045..7d6300909 100644
--- a/mooncake-store/src/CMakeLists.txt
+++ b/mooncake-store/src/CMakeLists.txt
@@ -48,6 +48,10 @@ if (STORE_USE_ETCD)
add_dependencies(mooncake_store build_etcd_wrapper)
endif()
+if (BUILD_SHARED_LIBS)
+ install(TARGETS mooncake_store DESTINATION lib)
+endif()
+
# Master binary
add_executable(mooncake_master master.cpp)
@@ -66,6 +70,7 @@ target_link_libraries(mooncake_master PRIVATE
mooncake_store
cachelib_memory_allocator
pthread
+ ibverbs
mooncake_common
${ETCD_WRAPPER_LIB}
${MASTER_EXTRA_LIBS}
@@ -75,4 +80,4 @@ if (STORE_USE_ETCD)
add_dependencies(mooncake_master build_etcd_wrapper)
endif()
-install(TARGETS mooncake_master DESTINATION bin)
\ No newline at end of file
+install(TARGETS mooncake_master DESTINATION bin)
diff --git a/mooncake-store/src/allocator.cpp b/mooncake-store/src/allocator.cpp
index 18ac7e00a..6bb951bcc 100644
--- a/mooncake-store/src/allocator.cpp
+++ b/mooncake-store/src/allocator.cpp
@@ -18,13 +18,13 @@ std::string AllocatedBuffer::getSegmentName() const noexcept {
}
AllocatedBuffer::~AllocatedBuffer() {
+ // Note: This is an edge case. If the 'weak_ptr' is released, the segment
+ // has already been deallocated at this point, and its memory usage details
+ // (capacity/allocated) no longer need to be maintained.
auto alloc = allocator_.lock();
if (alloc) {
alloc->deallocate(this);
VLOG(1) << "buf_handle_deallocated size=" << size_;
- } else {
- MasterMetricManager::instance().dec_allocated_mem_size(size_);
- VLOG(1) << "allocator=expired_or_null in buf_handle_destructor";
}
}
@@ -92,7 +92,10 @@ CachelibBufferAllocator::CachelibBufferAllocator(std::string segment_name,
<< static_cast(pool_id_);
}
-CachelibBufferAllocator::~CachelibBufferAllocator() = default;
+CachelibBufferAllocator::~CachelibBufferAllocator() {
+    MasterMetricManager::instance().dec_allocated_mem_size(segment_name_,
+                                                           cur_size_);
+}
std::unique_ptr CachelibBufferAllocator::allocate(
size_t size) {
@@ -117,7 +120,7 @@ std::unique_ptr CachelibBufferAllocator::allocate(
VLOG(1) << "allocation_succeeded size=" << size
<< " segment=" << segment_name_ << " address=" << buffer;
cur_size_.fetch_add(size);
- MasterMetricManager::instance().inc_allocated_mem_size(size);
+ MasterMetricManager::instance().inc_allocated_mem_size(segment_name_, size);
return std::make_unique(shared_from_this(), buffer, size);
}
@@ -128,7 +131,8 @@ void CachelibBufferAllocator::deallocate(AllocatedBuffer* handle) {
size_t freed_size =
handle->size_; // Store size before handle might become invalid
cur_size_.fetch_sub(freed_size);
- MasterMetricManager::instance().dec_allocated_mem_size(freed_size);
+ MasterMetricManager::instance().dec_allocated_mem_size(segment_name_,
+ freed_size);
VLOG(1) << "deallocation_succeeded address=" << handle->buffer_ptr_
<< " size=" << freed_size << " segment=" << segment_name_;
} catch (const std::exception& e) {
@@ -180,7 +184,10 @@ OffsetBufferAllocator::OffsetBufferAllocator(std::string segment_name,
}
}
-OffsetBufferAllocator::~OffsetBufferAllocator() = default;
+OffsetBufferAllocator::~OffsetBufferAllocator() {
+    MasterMetricManager::instance().dec_allocated_mem_size(segment_name_,
+                                                           cur_size_);
+}
std::unique_ptr OffsetBufferAllocator::allocate(size_t size) {
if (!offset_allocator_) {
@@ -217,7 +224,7 @@ std::unique_ptr OffsetBufferAllocator::allocate(size_t size) {
}
cur_size_.fetch_add(size);
- MasterMetricManager::instance().inc_allocated_mem_size(size);
+ MasterMetricManager::instance().inc_allocated_mem_size(segment_name_, size);
return allocated_buffer;
}
@@ -228,7 +235,8 @@ void OffsetBufferAllocator::deallocate(AllocatedBuffer* handle) {
size_t freed_size = handle->size();
handle->offset_handle_.reset();
cur_size_.fetch_sub(freed_size);
- MasterMetricManager::instance().dec_allocated_mem_size(freed_size);
+ MasterMetricManager::instance().dec_allocated_mem_size(segment_name_,
+ freed_size);
VLOG(1) << "deallocation_succeeded address=" << handle->data()
<< " size=" << freed_size << " segment=" << segment_name_;
} catch (const std::exception& e) {
diff --git a/mooncake-store/src/client.cpp b/mooncake-store/src/client.cpp
index 52bf75215..79a705ea3 100644
--- a/mooncake-store/src/client.cpp
+++ b/mooncake-store/src/client.cpp
@@ -36,12 +36,13 @@ namespace mooncake {
Client::Client(const std::string& local_hostname,
const std::string& metadata_connstring)
- : metrics_(ClientMetric::Create()),
- master_client_(metrics_ ? &metrics_->master_client_metric : nullptr),
+ : client_id_(generate_uuid()),
+ metrics_(ClientMetric::Create()),
+ master_client_(client_id_,
+ metrics_ ? &metrics_->master_client_metric : nullptr),
local_hostname_(local_hostname),
metadata_connstring_(metadata_connstring),
write_thread_pool_(2) {
- client_id_ = generate_uuid();
LOG(INFO) << "client_id=" << client_id_;
if (metrics_) {
@@ -126,16 +127,10 @@ static inline void rtrim(std::string& s) {
s.end());
}
-static std::vector get_auto_discover_filters(bool auto_discover) {
+static std::vector get_auto_discover_filters() {
std::vector whitelst_filters;
char* ev_ad = std::getenv("MC_MS_FILTERS");
if (ev_ad) {
- if (!auto_discover) {
- LOG(WARNING)
- << "auto discovery not set, but find whitelist filters: "
- << ev_ad;
- return whitelst_filters;
- }
LOG(INFO) << "whitelist filters: " << ev_ad;
char delimiter = ',';
char* end = ev_ad + std::strlen(ev_ad);
@@ -233,130 +228,132 @@ ErrorCode Client::InitTransferEngine(
const std::string& local_hostname, const std::string& metadata_connstring,
const std::string& protocol,
const std::optional& device_names) {
- if (!te_initialized_) {
- // get auto_discover and filters from env
- std::optional env_auto_discover = get_auto_discover();
- bool auto_discover = false;
- if (env_auto_discover.has_value()) {
- // Use user-specified auto-discover setting
- auto_discover = env_auto_discover.value();
- } else {
- // Enable auto-discover for RDMA if no devices are specified
- if (protocol == "rdma" && !device_names.has_value()) {
- LOG(INFO)
- << "Set auto discovery ON by default for RDMA protocol, "
- "since no "
- "device names provided";
- auto_discover = true;
- }
+ // get auto_discover and filters from env
+ std::optional env_auto_discover = get_auto_discover();
+ bool auto_discover = false;
+ if (env_auto_discover.has_value()) {
+ // Use user-specified auto-discover setting
+ auto_discover = env_auto_discover.value();
+ } else {
+ // Enable auto-discover for RDMA if no devices are specified
+ if (protocol == "rdma" && !device_names.has_value()) {
+ LOG(INFO) << "Set auto discovery ON by default for RDMA protocol, "
+ "since no "
+ "device names provided";
+ auto_discover = true;
+ }
+ }
+ transfer_engine_->setAutoDiscover(auto_discover);
+
+ // Honor filters when auto-discovery is enabled; otherwise warn once
+ if (auto_discover) {
+ LOG(INFO) << "Transfer engine auto discovery is enabled for protocol: "
+ << protocol;
+ auto filters = get_auto_discover_filters();
+ transfer_engine_->setWhitelistFilters(std::move(filters));
+ } else {
+ const char* env_filters = std::getenv("MC_MS_FILTERS");
+ if (env_filters && *env_filters != '\0') {
+ LOG(WARNING)
+ << "MC_MS_FILTERS is set but auto discovery is disabled; "
+ << "ignoring whitelist: " << env_filters;
}
- transfer_engine_->setAutoDiscover(auto_discover);
+ }
- auto [hostname, port] = parseHostNameWithPort(local_hostname);
- int rc = transfer_engine_->init(metadata_connstring, local_hostname,
- hostname, port);
- if (rc != 0) {
- LOG(ERROR) << "Failed to initialize transfer engine, rc=" << rc;
- return ErrorCode::INTERNAL_ERROR;
- }
+ auto [hostname, port] = parseHostNameWithPort(local_hostname);
+ int rc = transfer_engine_->init(metadata_connstring, local_hostname,
+ hostname, port);
+ if (rc != 0) {
+ LOG(ERROR) << "Failed to initialize transfer engine, rc=" << rc;
+ return ErrorCode::INTERNAL_ERROR;
+ }
- if (auto_discover) {
- LOG(INFO)
- << "Transfer engine auto discovery is enabled for protocol: "
- << protocol;
- auto filters = get_auto_discover_filters(auto_discover);
- transfer_engine_->setWhitelistFilters(std::move(filters));
- } else {
- LOG(INFO)
- << "Transfer engine auto discovery is disabled for protocol: "
- << protocol;
+ if (!auto_discover) {
+ LOG(INFO) << "Transfer engine auto discovery is disabled for protocol: "
+ << protocol;
- Transport* transport = nullptr;
+ Transport* transport = nullptr;
- if (protocol == "rdma") {
- if (!device_names.has_value() || device_names->empty()) {
- LOG(ERROR)
- << "RDMA protocol requires device names when auto "
- "discovery is disabled";
- return ErrorCode::INVALID_PARAMS;
- }
+ if (protocol == "rdma") {
+ if (!device_names.has_value() || device_names->empty()) {
+ LOG(ERROR) << "RDMA protocol requires device names when auto "
+ "discovery is disabled";
+ return ErrorCode::INVALID_PARAMS;
+ }
- LOG(INFO) << "Using specified RDMA devices: "
- << device_names.value();
+ LOG(INFO) << "Using specified RDMA devices: "
+ << device_names.value();
- std::vector devices =
- splitString(device_names.value(), ',', /*skip_empty=*/true);
+ std::vector devices =
+ splitString(device_names.value(), ',', /*skip_empty=*/true);
- // Manually discover topology with specified devices only
- auto topology = transfer_engine_->getLocalTopology();
- if (topology) {
- topology->discover(devices);
- LOG(INFO) << "Topology discovery complete with specified "
- "devices. Found "
- << topology->getHcaList().size() << " HCAs";
- }
+ // Manually discover topology with specified devices only
+ auto topology = transfer_engine_->getLocalTopology();
+ if (topology) {
+ topology->discover(devices);
+ LOG(INFO) << "Topology discovery complete with specified "
+ "devices. Found "
+ << topology->getHcaList().size() << " HCAs";
+ }
- transport = transfer_engine_->installTransport("rdma", nullptr);
- if (!transport) {
- LOG(ERROR)
- << "Failed to install RDMA transport with specified "
- "devices";
- return ErrorCode::INTERNAL_ERROR;
- }
- } else if (protocol == "tcp") {
- if (device_names.has_value()) {
- LOG(WARNING)
- << "TCP protocol does not use device names, ignoring";
- }
+ transport = transfer_engine_->installTransport("rdma", nullptr);
+ if (!transport) {
+ LOG(ERROR) << "Failed to install RDMA transport with specified "
+ "devices";
+ return ErrorCode::INTERNAL_ERROR;
+ }
+ } else if (protocol == "tcp") {
+ if (device_names.has_value()) {
+ LOG(WARNING)
+ << "TCP protocol does not use device names, ignoring";
+ }
- try {
- transport =
- transfer_engine_->installTransport("tcp", nullptr);
- } catch (std::exception& e) {
- LOG(ERROR)
- << "tcp_transport_install_failed error_message=\""
- << e.what() << "\"";
- return ErrorCode::INTERNAL_ERROR;
- }
+ try {
+ transport = transfer_engine_->installTransport("tcp", nullptr);
+ } catch (std::exception& e) {
+ LOG(ERROR) << "tcp_transport_install_failed error_message=\""
+ << e.what() << "\"";
+ return ErrorCode::INTERNAL_ERROR;
+ }
- if (!transport) {
- LOG(ERROR) << "Failed to install TCP transport";
- return ErrorCode::INTERNAL_ERROR;
- }
- } else if (protocol == "ascend") {
- if (device_names.has_value()) {
- LOG(WARNING) << "Ascend protocol does not use device "
- "names, ignoring";
- }
- try {
- transport =
- transfer_engine_->installTransport("ascend", nullptr);
- } catch (std::exception& e) {
- LOG(ERROR)
- << "ascend_transport_install_failed error_message=\""
- << e.what() << "\"";
- return ErrorCode::INTERNAL_ERROR;
- }
+ if (!transport) {
+ LOG(ERROR) << "Failed to install TCP transport";
+ return ErrorCode::INTERNAL_ERROR;
+ }
+ } else if (protocol == "ascend") {
+ if (device_names.has_value()) {
+ LOG(WARNING) << "Ascend protocol does not use device "
+ "names, ignoring";
+ }
+ try {
+ transport =
+ transfer_engine_->installTransport("ascend", nullptr);
+ } catch (std::exception& e) {
+ LOG(ERROR) << "ascend_transport_install_failed error_message=\""
+ << e.what() << "\"";
+ return ErrorCode::INTERNAL_ERROR;
+ }
- if (!transport) {
- LOG(ERROR) << "Failed to install Ascend transport";
- return ErrorCode::INTERNAL_ERROR;
- }
- } else {
- LOG(ERROR) << "unsupported_protocol protocol=" << protocol;
- return ErrorCode::INVALID_PARAMS;
+ if (!transport) {
+ LOG(ERROR) << "Failed to install Ascend transport";
+ return ErrorCode::INTERNAL_ERROR;
}
+ } else {
+ LOG(ERROR) << "unsupported_protocol protocol=" << protocol;
+ return ErrorCode::INVALID_PARAMS;
}
}
+ return ErrorCode::OK;
+}
+
+void Client::InitTransferSubmitter() {
// Initialize TransferSubmitter after transfer engine is ready
// Keep using logical local_hostname for name-based behaviors; endpoint is
// used separately where needed.
transfer_submitter_ = std::make_unique(
*transfer_engine_, storage_backend_,
metrics_ ? &metrics_->transfer_metric : nullptr);
-
- return ErrorCode::OK;
}
std::optional> Client::Create(
@@ -397,18 +394,20 @@ std::optional> Client::Create(
// Initialize transfer engine
if (transfer_engine == nullptr) {
client->transfer_engine_ = std::make_shared();
+ err = client->InitTransferEngine(local_hostname, metadata_connstring,
+ protocol, device_names);
+ if (err != ErrorCode::OK) {
+ LOG(ERROR) << "Failed to initialize transfer engine";
+ return std::nullopt;
+ }
} else {
client->transfer_engine_ = transfer_engine;
- client->te_initialized_ = true;
- LOG(INFO) << "Use exist transfer engine instance";
- }
- err = client->InitTransferEngine(local_hostname, metadata_connstring,
- protocol, device_names);
- if (err != ErrorCode::OK) {
- LOG(ERROR) << "Failed to initialize transfer engine";
- return std::nullopt;
+ LOG(INFO) << "Use existing transfer engine instance. Skip its "
+ "initialization.";
}
+ client->InitTransferSubmitter();
+
return client;
}
@@ -594,11 +593,11 @@ std::vector> Client::BatchGetWhenPreferSameNode(
continue;
}
auto& memory_descriptor = replica.get_memory_descriptor();
- if (memory_descriptor.buffer_descriptors.empty()) {
+ if (memory_descriptor.buffer_descriptor.size_ == 0) {
results[i] = tl::unexpected(ErrorCode::INVALID_REPLICA);
continue;
}
- auto& buffer_descriptor = memory_descriptor.buffer_descriptors[0];
+ auto& buffer_descriptor = memory_descriptor.buffer_descriptor;
auto seg = buffer_descriptor.transport_endpoint_;
auto& op = seg_to_op_map[seg];
op.replicas.emplace_back(replica);
@@ -1245,12 +1244,11 @@ std::vector> Client::BatchPutWhenPreferSameNode(
continue;
}
auto& memory_descriptor = replica.get_memory_descriptor();
- if (memory_descriptor.buffer_descriptors.empty()) {
- op.SetError(ErrorCode::INVALID_PARAMS,
- "buffer descriptors is empty.");
+ if (memory_descriptor.buffer_descriptor.size_ == 0) {
+ op.SetError(ErrorCode::INVALID_PARAMS, "buffer size is 0.");
continue;
}
- auto& buffer_descriptor = memory_descriptor.buffer_descriptors[0];
+ auto& buffer_descriptor = memory_descriptor.buffer_descriptor;
auto seg = buffer_descriptor.transport_endpoint_;
if (seg_to_ops.find(seg) == seg_to_ops.end()) {
seg_to_ops.emplace(seg, PutOperation(op.key, op.slices));
@@ -1291,7 +1289,7 @@ std::vector> Client::BatchPutWhenPreferSameNode(
WaitForTransfers(merged_ops);
for (auto& op : merged_ops) {
auto& memory_descriptor = op.replicas[0].get_memory_descriptor();
- auto& buffer_descriptor = memory_descriptor.buffer_descriptors[0];
+ auto& buffer_descriptor = memory_descriptor.buffer_descriptor;
auto seg = buffer_descriptor.transport_endpoint_;
seg_to_ops.at(seg).state = op.state;
}
@@ -1300,7 +1298,7 @@ std::vector> Client::BatchPutWhenPreferSameNode(
continue;
}
auto& memory_descriptor = op.replicas[0].get_memory_descriptor();
- auto& buffer_descriptor = memory_descriptor.buffer_descriptors[0];
+ auto& buffer_descriptor = memory_descriptor.buffer_descriptor;
auto seg = buffer_descriptor.transport_endpoint_;
op.state = seg_to_ops.at(seg).state;
auto state = std::make_shared();
@@ -1424,7 +1422,7 @@ tl::expected Client::MountSegment(const void* buffer,
segment.te_endpoint = local_hostname_;
}
- auto mount_result = master_client_.MountSegment(segment, client_id_);
+ auto mount_result = master_client_.MountSegment(segment);
if (!mount_result) {
ErrorCode err = mount_result.error();
LOG(ERROR) << "mount_segment_to_master_failed base=" << buffer
@@ -1454,8 +1452,7 @@ tl::expected Client::UnmountSegment(const void* buffer,
return tl::unexpected(ErrorCode::INVALID_PARAMS);
}
- auto unmount_result =
- master_client_.UnmountSegment(segment->second.id, client_id_);
+ auto unmount_result = master_client_.UnmountSegment(segment->second.id);
if (!unmount_result) {
ErrorCode err = unmount_result.error();
LOG(ERROR) << "Failed to unmount segment from master: "
@@ -1617,9 +1614,7 @@ ErrorCode Client::TransferRead(const Replica::Descriptor& replica_descriptor,
size_t total_size = 0;
if (replica_descriptor.is_memory_replica()) {
auto& mem_desc = replica_descriptor.get_memory_descriptor();
- for (const auto& handle : mem_desc.buffer_descriptors) {
- total_size += handle.size_;
- }
+ total_size = mem_desc.buffer_descriptor.size_;
} else {
auto& disk_desc = replica_descriptor.get_disk_descriptor();
total_size = disk_desc.object_size;
@@ -1657,8 +1652,7 @@ void Client::PingThreadMain(bool is_ha_mode,
auto& segment = it.second;
segments.emplace_back(segment);
}
- auto remount_result =
- master_client_.ReMountSegment(segments, client_id_);
+ auto remount_result = master_client_.ReMountSegment(segments);
if (!remount_result) {
ErrorCode err = remount_result.error();
LOG(ERROR) << "Failed to remount segments: " << err;
@@ -1677,7 +1671,7 @@ void Client::PingThreadMain(bool is_ha_mode,
}
// Ping master
- auto ping_result = master_client_.Ping(client_id_);
+ auto ping_result = master_client_.Ping();
if (ping_result) {
// Reset ping failure count
ping_fail_count = 0;
diff --git a/mooncake-store/src/client_buffer.cpp b/mooncake-store/src/client_buffer.cpp
index 5acfc2932..23fad52da 100644
--- a/mooncake-store/src/client_buffer.cpp
+++ b/mooncake-store/src/client_buffer.cpp
@@ -72,36 +72,29 @@ uint64_t calculate_total_size(const Replica::Descriptor& replica) {
auto& disk_descriptor = replica.get_disk_descriptor();
total_length = disk_descriptor.object_size;
} else {
- for (auto& handle :
- replica.get_memory_descriptor().buffer_descriptors) {
- total_length += handle.size_;
- }
+ total_length = replica.get_memory_descriptor().buffer_descriptor.size_;
}
return total_length;
}
int allocateSlices(std::vector& slices,
- const Replica::Descriptor& replica,
- BufferHandle& buffer_handle) {
- uint64_t offset = 0;
+ const Replica::Descriptor& replica, void* buffer_ptr) {
if (replica.is_memory_replica() == false) {
// For disk-based replica, split into slices based on file size
+ uint64_t offset = 0;
uint64_t total_length = replica.get_disk_descriptor().object_size;
while (offset < total_length) {
auto chunk_size = std::min(total_length - offset, kMaxSliceSize);
- void* chunk_ptr = static_cast(buffer_handle.ptr()) + offset;
+ void* chunk_ptr = static_cast(buffer_ptr) + offset;
slices.emplace_back(Slice{chunk_ptr, chunk_size});
offset += chunk_size;
}
} else {
// For memory-based replica, split into slices based on buffer
// descriptors
- for (auto& handle :
- replica.get_memory_descriptor().buffer_descriptors) {
- void* chunk_ptr = static_cast(buffer_handle.ptr()) + offset;
- slices.emplace_back(Slice{chunk_ptr, handle.size_});
- offset += handle.size_;
- }
+ auto& handle = replica.get_memory_descriptor().buffer_descriptor;
+ void* chunk_ptr = buffer_ptr;
+ slices.emplace_back(Slice{chunk_ptr, handle.size_});
}
return 0;
}
diff --git a/mooncake-store/src/http_metadata_server.cpp b/mooncake-store/src/http_metadata_server.cpp
index 2dd1bbe80..e35152edf 100644
--- a/mooncake-store/src/http_metadata_server.cpp
+++ b/mooncake-store/src/http_metadata_server.cpp
@@ -11,7 +11,7 @@ namespace mooncake {
HttpMetadataServer::HttpMetadataServer(uint16_t port, const std::string& host)
: port_(port),
host_(host),
- server_(std::make_unique(4, port)),
+ server_(std::make_unique(4, port, host_)),
running_(false) {
init_server();
}
diff --git a/mooncake-store/src/master.cpp b/mooncake-store/src/master.cpp
index b96b52be2..80f886b48 100644
--- a/mooncake-store/src/master.cpp
+++ b/mooncake-store/src/master.cpp
@@ -86,6 +86,14 @@ DEFINE_int32(http_metadata_server_port, 8080,
DEFINE_string(http_metadata_server_host, "0.0.0.0",
"Host for HTTP metadata server to bind to");
+DEFINE_uint64(put_start_discard_timeout_sec,
+ mooncake::DEFAULT_PUT_START_DISCARD_TIMEOUT,
+ "Timeout for discarding uncompleted PutStart operations");
+DEFINE_uint64(put_start_release_timeout_sec,
+ mooncake::DEFAULT_PUT_START_RELEASE_TIMEOUT,
+ "Timeout for releasing space allocated in uncompleted PutStart "
+ "operations");
+
void InitMasterConf(const mooncake::DefaultConfig& default_config,
mooncake::MasterConfig& master_config) {
// Initialize the master service configuration from the default config
@@ -147,6 +155,12 @@ void InitMasterConf(const mooncake::DefaultConfig& default_config,
default_config.GetString("http_metadata_server_host",
&master_config.http_metadata_server_host,
FLAGS_http_metadata_server_host);
+ default_config.GetUInt64("put_start_discard_timeout_sec",
+ &master_config.put_start_discard_timeout_sec,
+ FLAGS_put_start_discard_timeout_sec);
+ default_config.GetUInt64("put_start_release_timeout_sec",
+ &master_config.put_start_release_timeout_sec,
+ FLAGS_put_start_release_timeout_sec);
}
void LoadConfigFromCmdline(mooncake::MasterConfig& master_config,
@@ -303,6 +317,20 @@ void LoadConfigFromCmdline(mooncake::MasterConfig& master_config,
master_config.http_metadata_server_host =
FLAGS_http_metadata_server_host;
}
+ if ((google::GetCommandLineFlagInfo("put_start_discard_timeout_sec",
+ &info) &&
+ !info.is_default) ||
+ !conf_set) {
+ master_config.put_start_discard_timeout_sec =
+ FLAGS_put_start_discard_timeout_sec;
+ }
+ if ((google::GetCommandLineFlagInfo("put_start_release_timeout_sec",
+ &info) &&
+ !info.is_default) ||
+ !conf_set) {
+ master_config.put_start_release_timeout_sec =
+ FLAGS_put_start_release_timeout_sec;
+ }
}
// Function to start HTTP metadata server
@@ -404,7 +432,11 @@ int main(int argc, char* argv[]) {
<< ", http_metadata_server_port="
<< master_config.http_metadata_server_port
<< ", http_metadata_server_host="
- << master_config.http_metadata_server_host;
+ << master_config.http_metadata_server_host
+ << ", put_start_discard_timeout_sec="
+ << master_config.put_start_discard_timeout_sec
+ << ", put_start_release_timeout_sec="
+ << master_config.put_start_release_timeout_sec;
// Start HTTP metadata server if enabled
std::unique_ptr http_metadata_server;
diff --git a/mooncake-store/src/master_client.cpp b/mooncake-store/src/master_client.cpp
index cb9923409..cb5408bd8 100644
--- a/mooncake-store/src/master_client.cpp
+++ b/mooncake-store/src/master_client.cpp
@@ -13,6 +13,7 @@
#include "rpc_service.h"
#include "types.h"
#include "utils/scoped_vlog_timer.h"
+#include "master_metric_manager.h"
namespace mooncake {
@@ -34,6 +35,11 @@ struct RpcNameTraits<&WrappedMasterService::GetReplicaList> {
static constexpr const char* value = "GetReplicaList";
};
+template <>
+struct RpcNameTraits<&WrappedMasterService::CalcCacheStats> {
+ static constexpr const char* value = "CalcCacheStats";
+};
+
template <>
struct RpcNameTraits<&WrappedMasterService::GetReplicaListByRegex> {
static constexpr const char* value = "GetReplicaListByRegex";
@@ -253,6 +259,12 @@ std::vector> MasterClient::BatchExistKey(
return result;
}
+tl::expected
+MasterClient::CalcCacheStats() {
+ return invoke_rpc<&WrappedMasterService::CalcCacheStats,
+ MasterMetricManager::CacheHitStatDict>();
+}
+
tl::expected>,
ErrorCode>
MasterClient::GetReplicaListByRegex(const std::string& str) {
@@ -297,16 +309,14 @@ MasterClient::PutStart(const std::string& key,
ScopedVLogTimer timer(1, "MasterClient::PutStart");
timer.LogRequest("key=", key, ", slice_count=", slice_lengths.size());
- // Convert size_t to uint64_t for RPC
- std::vector rpc_slice_lengths;
- rpc_slice_lengths.reserve(slice_lengths.size());
- for (const auto& length : slice_lengths) {
- rpc_slice_lengths.push_back(length);
+ uint64_t total_slice_length = 0;
+ for (const auto& slice_length : slice_lengths) {
+ total_slice_length += slice_length;
}
auto result = invoke_rpc<&WrappedMasterService::PutStart,
std::vector>(
- key, rpc_slice_lengths, config);
+ client_id_, key, total_slice_length, config);
timer.LogResponseExpected(result);
return result;
}
@@ -319,9 +329,19 @@ MasterClient::BatchPutStart(
ScopedVLogTimer timer(1, "MasterClient::BatchPutStart");
timer.LogRequest("keys_count=", keys.size());
+ std::vector total_slice_lengths;
+ total_slice_lengths.reserve(slice_lengths.size());
+    for (const auto& lengths : slice_lengths) {
+        uint64_t total_slice_length = 0;
+        for (const auto& slice_length : lengths) {
+ total_slice_length += slice_length;
+ }
+ total_slice_lengths.emplace_back(total_slice_length);
+ }
+
auto result = invoke_batch_rpc<&WrappedMasterService::BatchPutStart,
std::vector>(
- keys.size(), keys, slice_lengths, config);
+ keys.size(), client_id_, keys, total_slice_lengths, config);
timer.LogResponse("result=", result.size(), " operations");
return result;
}
@@ -331,8 +351,8 @@ tl::expected MasterClient::PutEnd(const std::string& key,
ScopedVLogTimer timer(1, "MasterClient::PutEnd");
timer.LogRequest("key=", key);
- auto result =
- invoke_rpc<&WrappedMasterService::PutEnd, void>(key, replica_type);
+ auto result = invoke_rpc<&WrappedMasterService::PutEnd, void>(
+ client_id_, key, replica_type);
timer.LogResponseExpected(result);
return result;
}
@@ -343,7 +363,7 @@ std::vector