Skip to content

Commit

Permalink
snapshot: Transfer files concurrently to speed up snapshot transfer
Browse files Browse the repository at this point in the history
Currently snapshot transfer happens by sending GetFileRequest for
every file known to be in the remote snapshot. This happens
sequentially for each file.

The only real configurations which allow tuning the throughput of this
transfer are the throttle which can be set when initializing the
braft::Node, or the runtime configuration raft_max_byte_count_per_rpc
which determines how many chunks a large file will be broken into
during the transfer. The default is 128KiB, so a 1MiB file will be
transfered in about 8 GetFileRequests.

This works great for snapshots which have a handful of large
files. But if a snapshot has hundreds or thousands of small files then
transfer of these snapshots can be pretty slow.

I locally create a snapshot with 100k files on my development machine,
for example, it can take up to 30 minutes to transfer all of those
files in that snapshot. Even though the latency per transfer is low,
there is a full round trip plus a flush of __raft_meta on the
receiving end for each file.

This patch adds concurrency to these transfers. When a remote snapshot
is transferred locally, up to raft_max_get_file_request_concurrency
GetFileRequests will be sent concurrently. This defaults to 64.

With this patch, the 100k file snapshot consistently transfers in
under 10 seconds on my development machine.

This should resolve baidu#362.
  • Loading branch information
ambroff committed Feb 6, 2025
1 parent ab0017f commit 4fe88dd
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 47 deletions.
181 changes: 135 additions & 46 deletions src/braft/snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
// Zheng,Pengfei([email protected])
// Xiong,Kai([email protected])

#include <gflags/gflags.h>
#include <butil/time.h>
#include <butil/string_printf.h> // butil::string_appendf
#include <brpc/uri.h>
Expand All @@ -34,6 +35,13 @@

namespace braft {

DEFINE_int32(raft_max_get_file_request_concurrency, 64,
"Maximum number of concurrent GetFileRequests which will be "
"in-flight when copying a snapshot from a "
"remote peer to local.");
BRPC_VALIDATE_GFLAG(raft_max_get_file_request_concurrency,
brpc::PositiveInteger);

const char* LocalSnapshotStorage::_s_temp_path = "temp";

LocalSnapshotMetaTable::LocalSnapshotMetaTable() {}
Expand Down Expand Up @@ -777,8 +785,30 @@ void LocalSnapshotCopier::copy() {
}
std::vector<std::string> files;
_remote_snapshot.list_files(&files);
for (size_t i = 0; i < files.size() && ok(); ++i) {
copy_file(files[i]);

// Do the copy in chunks. We don't want to submit 1 million concurrent
// GetFileRequests to the remote
// server. This is configurable via
// raft_max_get_file_request_concurrency. Making a copy of this onto the
// stack in case it changes while this loop is running.
std::span<std::string> files_span{files};
std::size_t chunk_size = FLAGS_raft_max_get_file_request_concurrency;
for (std::size_t i = 0; i < files_span.size(); i += chunk_size) {
{
BAIDU_SCOPED_LOCK(_mutex);
if (_cancelled) {
break;
}
}

if (!ok()) {
break;
}

auto remaining = files_span.size() - i;
auto current_chunk_size = std::min(chunk_size, remaining);
auto chunk = files_span.subspan(i, current_chunk_size);
copy_files(chunk);
}
} while (0);
if (!ok() && _writer && _writer->ok()) {
Expand Down Expand Up @@ -949,65 +979,124 @@ void LocalSnapshotCopier::filter() {
}
}

void LocalSnapshotCopier::copy_file(const std::string& filename) {
if (_writer->get_file_meta(filename, NULL) == 0) {
LOG(INFO) << "Skipped downloading " << filename
<< " path: " << _writer->get_path();
return;
void LocalSnapshotCopier::copy_files(std::span<std::string const> filenames) {
// We start a concurrent file copy session for each file we will download and then wait for all of them to complete or
// fail. This struct contains the context for each file in the snapshot, which is kept in the vector below.
struct RemoteFileContext {
std::string filename;
LocalFileMeta meta;
scoped_refptr<RemoteFileCopier::Session> session;

RemoteFileContext(std::string filename, LocalFileMeta meta, scoped_refptr<RemoteFileCopier::Session> session)
: filename{std::move(filename)}, meta{std::move(meta)}, session{std::move(session)} {}
};

std::vector<RemoteFileContext> sessions;
sessions.reserve(filenames.size());

// Create a copy session for each input file.
{
std::unique_lock lck{_mutex};

if (_cancelled) {
set_error(ECANCELED, "%s", berror(ECANCELED));
return;
}
std::string file_path = _writer->get_path() + '/' + filename;
butil::FilePath sub_path(filename);
if (sub_path != sub_path.DirName() && sub_path.DirName().value() != ".") {

for (const auto& filename : filenames) {
if (_writer->get_file_meta(filename, nullptr) == 0) {
LOG(INFO) << "Skipped downloading " << filename << " path: " << _writer->get_path();
continue;
}

std::string file_path = _writer->get_path() + '/' + filename;
butil::FilePath sub_path(filename);
if (sub_path != sub_path.DirName() && sub_path.DirName().value() != ".") {
butil::File::Error e;
bool rc = false;
if (FLAGS_raft_create_parent_directories) {
butil::FilePath sub_dir =
butil::FilePath(_writer->get_path()).Append(sub_path.DirName());
rc = _fs->create_directory(sub_dir.value(), &e, true);
butil::FilePath sub_dir = butil::FilePath(_writer->get_path()).Append(sub_path.DirName());
rc = _fs->create_directory(sub_dir.value(), &e, true);
} else {
rc = create_sub_directory(
_writer->get_path(), sub_path.DirName().value(), _fs, &e);
rc = create_sub_directory(_writer->get_path(), sub_path.DirName().value(), _fs, &e);
}
if (!rc) {
LOG(ERROR) << "Fail to create directory for " << file_path
<< " : " << butil::File::ErrorToString(e);
set_error(file_error_to_os_error(e),
"Fail to create directory");
LOG(ERROR) << "Fail to create directory for " << file_path << " : " << butil::File::ErrorToString(e);
set_error(file_error_to_os_error(e), "Fail to create directory");
}
}
LocalFileMeta meta;
_remote_snapshot.get_file_meta(filename, &meta);
std::unique_lock<raft_mutex_t> lck(_mutex);
if (_cancelled) {
set_error(ECANCELED, "%s", berror(ECANCELED));
return;
}
scoped_refptr<RemoteFileCopier::Session> session
= _copier.start_to_copy_to_file(filename, file_path, NULL);
if (session == NULL) {
LOG(WARNING) << "Fail to copy " << filename
<< " path: " << _writer->get_path();
}

LocalFileMeta meta;
_remote_snapshot.get_file_meta(filename, &meta);

scoped_refptr<RemoteFileCopier::Session> session = _copier.start_to_copy_to_file(filename, file_path, nullptr);
if (session == nullptr) {
LOG(WARNING) << "Fail to copy " << filename << " path: " << _writer->get_path();
set_error(-1, "Fail to copy %s", filename.c_str());
return;
}

sessions.emplace_back(filename, std::move(meta), std::move(session));
}
_cur_session = session.get();
lck.unlock();
session->join();
lck.lock();
_cur_session = NULL;
lck.unlock();
if (!session->status().ok()) {
set_error(session->status().error_code(), session->status().error_cstr());
return;
}

bool failed = false;

// Now wait for each concurrent session to complete.
for (const auto& remote_file_context : sessions) {
if (failed) {
remote_file_context.session->cancel();
remote_file_context.session->join();
}
if (_writer->add_file(filename, &meta) != 0) {
set_error(EIO, "Fail to add file to writer");
return;

// If LocalSnapshotCopier::cancel() has been called, just cancel the remaining items.
{
std::unique_lock lck{_mutex};
if (_cancelled) {
remote_file_context.session->cancel();
lck.unlock();
remote_file_context.session->join();
continue;
}
}

{
std::unique_lock lck{_mutex};
_cur_session = remote_file_context.session.get();
}

remote_file_context.session->join();

{
std::unique_lock lck{_mutex};
_cur_session = nullptr;
}

// If LocalSnapshotCopier::cancel() has been called, just continue cancelling the remaining ones.
if (_cancelled) {
continue;
}

if (!remote_file_context.session->status().ok()) {
set_error(remote_file_context.session->status().error_code(), "%s",
remote_file_context.session->status().error_cstr());
failed = true;
continue;
}

if (_writer->add_file(remote_file_context.filename, &remote_file_context.meta) != 0) {
set_error(EIO, "Fail to add file to writer");
failed = true;
continue;
}
}

if (!failed) {
if (_writer->sync() != 0) {
set_error(EIO, "Fail to sync writer");
return;
set_error(EIO, "Fail to sync writer");
return;
}
}
}

void LocalSnapshotCopier::start() {
Expand Down
2 changes: 1 addition & 1 deletion src/braft/snapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ friend class LocalSnapshotStorage;
int filter_before_copy(LocalSnapshotWriter* writer,
SnapshotReader* last_snapshot);
void filter();
void copy_file(const std::string& filename);
void copy_files(std::span<std::string const> filenames);

raft_mutex_t _mutex;
bthread_t _tid;
Expand Down

0 comments on commit 4fe88dd

Please sign in to comment.