Skip to content

Conversation

@swallowCXY
Copy link

Description

Type of Change

  • Types
    • Bug fix
    • New feature
      • Transfer Engine
      • Mooncake Store
      • Mooncake EP
      • Integration
      • P2P Store
      • Python Wheel
    • Breaking change
    • CI/CD
    • Documentation update
    • Other

How Has This Been Tested?

Checklist

  • I have performed a self-review of my own code.
  • I have updated the documentation.
  • I have added tests to prove my changes are effective.

@swallowCXY swallowCXY changed the title client rpc接口定义和data_manager初步实现 [WIP]client rpc接口定义和data_manager初步实现 Jan 5, 2026
@gemini-code-assist
Copy link
Contributor

Summary of Changes

Hello @swallowCXY, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

此拉取请求为mooncake-store系统引入了客户端间RPC通信和数据管理的核心组件。它定义了远程数据操作的RPC接口,并提供了DataManager的初步实现,以协调数据存储和客户端之间高效的零拷贝数据传输。这为分布式数据访问和高效数据移动奠定了基础。

Highlights

  • 客户端RPC服务定义: 引入了新的ClientRpcService类及其相关类型(RemoteBufferDesc, RemoteReadRequest, RemoteWriteRequest, BatchRemoteReadRequest, BatchRemoteWriteRequest),用于处理客户端之间远程数据读写请求的RPC通信。
  • DataManager初步实现: 实现了新的DataManager类,用于管理数据访问操作。它集成了TieredBackend进行分层存储,并利用TransferEngine实现零拷贝RDMA数据传输,支持本地的Put/Get/Delete以及远程的ReadData/WriteData操作。
  • 客户端集成: 更新了Client类,添加了GetRpcEndpoint()GetPeerClient()方法,并引入了coro_rpc_serverClientRpcServiceDataManager以及PeerClient实例的映射,为客户端间的RPC通信奠定了基础。

🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console.

Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

这个PR引入了客户端RPC服务和用于处理远程数据操作的DataManager。整体结构是一个不错的起点。但是,有几个关键问题需要解决。其中包括由于std::expectedtl::expected使用不一致导致的编译错误,以及rpc_client_pool缺少定义。此外,还存在严重的性能问题,例如批量操作的顺序实现以及使用低效的轮询循环来等待传输完成。数据传输逻辑中也存在大量重复代码,应该进行重构。我提供了具体的评论和建议来解决这些问题。

Comment on lines +39 to +46
std::expected<void, ErrorCode> ReadRemoteData(const RemoteReadRequest& request);

/**
* @brief Write remote data: Client A requests Client B to write data from A
* @param request RemoteWriteRequest containing key, source buffers, and target_tier_id
* @return ErrorCode indicating success or failure
*/
std::expected<void, ErrorCode> WriteRemoteData(const RemoteWriteRequest& request);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

ReadRemoteDataWriteRemoteData的声明使用了std::expected,但它们在client_rpc_service.cpp中的定义使用了tl::expected。这将导致编译错误。请统一使用tl::expected以保持一致性。

Suggested change
std::expected<void, ErrorCode> ReadRemoteData(const RemoteReadRequest& request);
/**
* @brief Write remote data: Client A requests Client B to write data from A
* @param request RemoteWriteRequest containing key, source buffers, and target_tier_id
* @return ErrorCode indicating success or failure
*/
std::expected<void, ErrorCode> WriteRemoteData(const RemoteWriteRequest& request);
tl::expected<void, ErrorCode> ReadRemoteData(const RemoteReadRequest& request);
/**
* @brief Write remote data: Client A requests Client B to write data from A
* @param request RemoteWriteRequest containing key, source buffers, and target_tier_id
* @return ErrorCode indicating success or failure
*/
tl::expected<void, ErrorCode> WriteRemoteData(const RemoteWriteRequest& request);

Comment on lines +390 to +441
// Wait for transfer completion
bool all_completed = false;
const int max_wait_iterations = 1000; // Timeout protection
int iteration = 0;

while (!all_completed && iteration < max_wait_iterations) {
all_completed = true;
for (size_t i = 0; i < transfer_requests.size(); ++i) {
TransferStatus status;
Status status_result = transfer_engine_->getTransferStatus(batch_id, i, status);

if (!status_result.ok()) {
LOG(ERROR) << "TransferDataFromRemote: Failed to get transfer status for task " << i;
transfer_engine_->freeBatchID(batch_id);
transfer_engine_->closeSegment(remote_segment_id);
return std::unexpected(ErrorCode::TRANSFER_FAIL);
}

if (status.s == TransferStatusEnum::FAILED ||
status.s == TransferStatusEnum::CANCELED ||
status.s == TransferStatusEnum::TIMEOUT) {
LOG(ERROR) << "TransferDataFromRemote: Transfer failed for task " << i
<< ", status: " << static_cast<int>(status.s);
transfer_engine_->freeBatchID(batch_id);
transfer_engine_->closeSegment(remote_segment_id);
return std::unexpected(ErrorCode::TRANSFER_FAIL);
}

if (status.s != TransferStatusEnum::COMPLETED) {
all_completed = false;
break;
}
}

if (!all_completed) {
std::this_thread::sleep_for(std::chrono::microseconds(100));
++iteration;
}
}

if (!all_completed) {
LOG(ERROR) << "TransferDataFromRemote: Transfer timeout";
transfer_engine_->freeBatchID(batch_id);
transfer_engine_->closeSegment(remote_segment_id);
return std::unexpected(ErrorCode::TRANSFER_FAIL);
}

// Cleanup
transfer_engine_->freeBatchID(batch_id);
transfer_engine_->closeSegment(remote_segment_id);

return {};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

TransferDataToRemote函数类似,这里也使用了低效的轮询循环来等待传输完成。这会浪费CPU资源并增加延迟。建议改用TransferEngine可能提供的更有效的同步机制,如阻塞等待或回调。

Comment on lines +12 to +15
std::expected<void, ErrorCode> ReadRemoteData(const RemoteReadRequest& request);
std::expected<void, ErrorCode> WriteRemoteData(const RemoteWriteRequest& request);
std::vector<tl::expected<void, ErrorCode>> BatchReadRemoteData(const BatchRemoteReadRequest& request);
std::vector<tl::expected<void, ErrorCode>> BatchWriteRemoteData(const BatchRemoteWriteRequest& request);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

std::expectedtl::expected的使用不一致。为了与代码库的其他部分(如client_rpc_service.cpp)保持一致,最好统一使用tl::expected

Suggested change
std::expected<void, ErrorCode> ReadRemoteData(const RemoteReadRequest& request);
std::expected<void, ErrorCode> WriteRemoteData(const RemoteWriteRequest& request);
std::vector<tl::expected<void, ErrorCode>> BatchReadRemoteData(const BatchRemoteReadRequest& request);
std::vector<tl::expected<void, ErrorCode>> BatchWriteRemoteData(const BatchRemoteWriteRequest& request);
tl::expected<void, ErrorCode> ReadRemoteData(const RemoteReadRequest& request);
tl::expected<void, ErrorCode> WriteRemoteData(const RemoteWriteRequest& request);
std::vector<tl::expected<void, ErrorCode>> BatchReadRemoteData(const BatchRemoteReadRequest& request);
std::vector<tl::expected<void, ErrorCode>> BatchWriteRemoteData(const BatchRemoteWriteRequest& request);

std::vector<tl::expected<void, ErrorCode>> BatchReadRemoteData(const BatchRemoteReadRequest& request);
std::vector<tl::expected<void, ErrorCode>> BatchWriteRemoteData(const BatchRemoteWriteRequest& request);
private:
rpc_client_pool client_pool_;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

类型rpc_client_pool在此文件中未定义或包含,这将导致编译错误。您可能需要包含<ylt/coro_rpc/coro_rpc_client.hpp>并使用coro_rpc::coro_rpc_client_pool

Suggested change
rpc_client_pool client_pool_;
coro_rpc::coro_rpc_client_pool client_pool_;

Comment on lines +257 to +308
// Wait for transfer completion
bool all_completed = false;
const int max_wait_iterations = 1000; // Timeout protection
int iteration = 0;

while (!all_completed && iteration < max_wait_iterations) {
all_completed = true;
for (size_t i = 0; i < transfer_requests.size(); ++i) {
TransferStatus status;
Status status_result = transfer_engine_->getTransferStatus(batch_id, i, status);

if (!status_result.ok()) {
LOG(ERROR) << "TransferDataToRemote: Failed to get transfer status for task " << i;
transfer_engine_->freeBatchID(batch_id);
transfer_engine_->closeSegment(remote_segment_id);
return std::unexpected(ErrorCode::TRANSFER_FAIL);
}

if (status.s == TransferStatusEnum::FAILED ||
status.s == TransferStatusEnum::CANCELED ||
status.s == TransferStatusEnum::TIMEOUT) {
LOG(ERROR) << "TransferDataToRemote: Transfer failed for task " << i
<< ", status: " << static_cast<int>(status.s);
transfer_engine_->freeBatchID(batch_id);
transfer_engine_->closeSegment(remote_segment_id);
return std::unexpected(ErrorCode::TRANSFER_FAIL);
}

if (status.s != TransferStatusEnum::COMPLETED) {
all_completed = false;
break;
}
}

if (!all_completed) {
std::this_thread::sleep_for(std::chrono::microseconds(100));
++iteration;
}
}

if (!all_completed) {
LOG(ERROR) << "TransferDataToRemote: Transfer timeout after " << max_wait_iterations << " iterations";
transfer_engine_->freeBatchID(batch_id);
transfer_engine_->closeSegment(remote_segment_id);
return std::unexpected(ErrorCode::TRANSFER_FAIL);
}

// Cleanup
transfer_engine_->freeBatchID(batch_id);
transfer_engine_->closeSegment(remote_segment_id);

return {};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

此函数使用带有std::this_thread::sleep_for的轮询循环来等待传输完成。这种方式效率低下,因为它消耗CPU周期并引入不必要的延迟。如果TransferEngine API允许,使用更高效的阻塞机制(如future、条件变量或回调)会好得多。这种忙等待可能成为一个严重的性能瓶颈。

Comment on lines +117 to +123
auto handle_result = tiered_backend_->Get(key);
if (!handle_result.has_value()) {
LOG(ERROR) << "ReadData: Failed to get data for key: " << key
<< ", error: " << toString(handle_result.error());
timer.LogResponse("error_code=", handle_result.error());
return std::unexpected(ErrorCode::OBJECT_NOT_FOUND);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

在这里,来自tiered_backend_->Get()的特定错误被捕获、记录,然后被替换为更通用的ErrorCode::OBJECT_NOT_FOUND。虽然记录原始错误是好的,但将特定的错误代码(handle_result.error())传播给调用者将为上游的错误处理和调试提供更详细的信息。

    if (!handle_result.has_value()) {
        LOG(ERROR) << "ReadData: Failed to get data for key: " << key
                   << ", error: " << toString(handle_result.error());
        timer.LogResponse("error_code=", handle_result.error());
        return std::unexpected(handle_result.error());
    }

Comment on lines +152 to +157
auto handle_result = tiered_backend_->Allocate(total_size, tier_id);
if (!handle_result.has_value()) {
LOG(ERROR) << "WriteData: Failed to allocate space for key: " << key;
timer.LogResponse("error_code=", handle_result.error());
return std::unexpected(ErrorCode::NO_AVAILABLE_HANDLE);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

ReadData中的情况类似,这里将来自tiered_backend_->Allocate的特定错误替换为通用的ErrorCode::NO_AVAILABLE_HANDLE。为了更好的错误追溯,建议向上层传播原始的错误码handle_result.error()

    if (!handle_result.has_value()) {
        LOG(ERROR) << "WriteData: Failed to allocate space for key: " << key;
        timer.LogResponse("error_code=", handle_result.error());
        return std::unexpected(handle_result.error());
    }

}

// Open remote segment
SegmentHandle remote_segment_id = transfer_engine_->openSegment(remote_segment_name);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

SegmentHandleBatchID资源是手动管理的,closeSegmentfreeBatchID在多个退出路径中被调用。这很容易出错。使用RAII包装器(如带有自定义删除器的std::unique_ptr)将确保这些资源总是被正确清理,即使在异常或提前返回的情况下也是如此。


// Convert uint64_t tier_id to UUID
// UUID is pair<uint64_t, uint64_t>, so we use the first part for tier_id
// TODO: This is a temporary solution. Should use proper UUID mapping or change RPC protocol
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

这里的TODO正确地指出了将uint64_t转换为UUID是一个临时解决方案。如果UUID需要唯一或具有不同的结构,这可能会导致问题。为了使设计更加健壮,RPC协议应更新为直接使用UUID类型,这将消除这种转换和占位符uuid.second = 0;的需要。

Comment on lines +451 to +454
coro_rpc_server rpc_server_;
ClientRpcService rpc_service_;
DataManager data_manager_;
std::map<std::string, std::unique_ptr<PeerClient>> peer_clients_;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

rpc_service_依赖于data_manager_,因此data_manager_应该在rpc_service_之前声明,以确保正确的初始化顺序。此外,DataManagerClientRpcService没有默认构造函数,因此它们必须在Client构造函数的成员初始化列表中进行初始化。按依赖顺序声明可以提高代码的清晰度和安全性。

Suggested change
coro_rpc_server rpc_server_;
ClientRpcService rpc_service_;
DataManager data_manager_;
std::map<std::string, std::unique_ptr<PeerClient>> peer_clients_;
coro_rpc_server rpc_server_;
DataManager data_manager_;
ClientRpcService rpc_service_;
std::map<std::string, std::unique_ptr<PeerClient>> peer_clients_;

@swallowCXY swallowCXY closed this Jan 5, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant