Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix](migrate) Copy binlog files #41083

Merged
merged 1 commit into from
Sep 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions be/src/olap/rowset/rowset_meta_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,69 @@ Status RowsetMetaManager::_get_rowset_binlog_metas(OlapMeta* meta, const TabletU
return status;
}

// Collects the binlog metas of tablet `tablet_uid` whose version lies in the closed
// range [version.first, version.second] and appends one entry per binlog meta to
// `metas_pb`.
//
// Each appended entry carries the rowset id, version and segment count parsed from
// the stored BinlogMetaEntryPB, the raw meta key/value, and the binlog data key plus
// the binlog data looked up in `meta`.
//
// Returns the error reported by `meta->iterate` if the scan itself fails. Otherwise
// returns the status recorded while handling entries: the first failure (invalid
// key, unparsable value, failed data lookup) or the untouched/last successful status.
Status RowsetMetaManager::get_rowset_binlog_metas(OlapMeta* meta, TabletUid tablet_uid,
                                                  Version version, RowsetBinlogMetasPB* metas_pb) {
    Status status;
    auto tablet_uid_str = tablet_uid.to_string();
    auto prefix_key = make_binlog_meta_key_prefix(tablet_uid);
    auto begin_key = make_binlog_meta_key_prefix(tablet_uid, version.first);
    // Prefix of the first version past the requested range; any key greater than it
    // is outside the range.
    auto end_key = make_binlog_meta_key_prefix(tablet_uid, version.second + 1);

    // Callback contract used below: returning false stops the scan, returning true
    // asks for the next entry.
    auto traverse_func = [meta, metas_pb, &status, &tablet_uid_str, &end_key](
                                 std::string_view key, std::string_view value) -> bool {
        VLOG_DEBUG << fmt::format("get rowset binlog metas, key={}, value={}", key, value);
        if (key.compare(end_key) > 0) { // the binlog meta key is binary comparable.
            // All binlog meta has been scanned
            return false;
        }

        if (!starts_with_binlog_meta(key)) {
            auto err_msg = fmt::format("invalid binlog meta key:{}", key);
            status = Status::InternalError(err_msg);
            LOG(WARNING) << err_msg;
            return false;
        }

        BinlogMetaEntryPB binlog_meta_entry_pb;
        if (!binlog_meta_entry_pb.ParseFromArray(value.data(), value.size())) {
            auto err_msg = fmt::format("fail to parse binlog meta value:{}", value);
            status = Status::InternalError(err_msg);
            LOG(WARNING) << err_msg;
            return false;
        }

        const auto& rowset_id = binlog_meta_entry_pb.rowset_id_v2();
        auto* binlog_meta_pb = metas_pb->add_rowset_binlog_metas();
        binlog_meta_pb->set_rowset_id(rowset_id);
        binlog_meta_pb->set_version(binlog_meta_entry_pb.version());
        binlog_meta_pb->set_num_segments(binlog_meta_entry_pb.num_segments());
        binlog_meta_pb->set_meta_key(std::string {key});
        binlog_meta_pb->set_meta(std::string {value});

        // Fetch the binlog data stored under the companion data key.
        auto binlog_data_key =
                make_binlog_data_key(tablet_uid_str, binlog_meta_entry_pb.version(), rowset_id);
        std::string binlog_data;
        status = meta->get(META_COLUMN_FAMILY_INDEX, binlog_data_key, &binlog_data);
        if (!status.ok()) {
            LOG(WARNING) << status.to_string();
            return false;
        }
        binlog_meta_pb->set_data_key(binlog_data_key);
        binlog_meta_pb->set_data(binlog_data);

        // Keep scanning. Returning false here would end the scan after the first
        // entry, so a range spanning several versions would yield only one binlog
        // meta and the end_key bound above would never take effect.
        return true;
    };

    // Seek to the first key of the range; `prefix_key` is the tablet-wide prefix
    // handed to iterate() alongside the seek key.
    Status iterStatus =
            meta->iterate(META_COLUMN_FAMILY_INDEX, begin_key, prefix_key, traverse_func);
    if (!iterStatus.ok()) {
        LOG(WARNING) << fmt::format(
                "fail to iterate binlog meta. prefix_key:{}, version:{}, status:{}", prefix_key,
                version.to_string(), iterStatus.to_string());
        return iterStatus;
    }
    return status;
}

Status RowsetMetaManager::_get_all_rowset_binlog_metas(OlapMeta* meta, const TabletUid tablet_uid,
RowsetBinlogMetasPB* metas_pb) {
Status status;
Expand Down
3 changes: 3 additions & 0 deletions be/src/olap/rowset/rowset_meta_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ class RowsetMetaManager {
static Status get_rowset_binlog_metas(OlapMeta* meta, const TabletUid tablet_uid,
const std::vector<int64_t>& binlog_versions,
RowsetBinlogMetasPB* metas_pb);
// Get all binlog metas of a tablet whose versions fall within the given version range.
static Status get_rowset_binlog_metas(OlapMeta* meta, const TabletUid tablet_uid,
Version version, RowsetBinlogMetasPB* metas_pb);
static Status remove_binlog(OlapMeta* meta, const std::string& suffix);
static Status ingest_binlog_metas(OlapMeta* meta, TabletUid tablet_uid,
RowsetBinlogMetasPB* metas_pb);
Expand Down
5 changes: 5 additions & 0 deletions be/src/olap/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2512,6 +2512,11 @@ Status Tablet::get_rowset_binlog_metas(const std::vector<int64_t>& binlog_versio
binlog_versions, metas_pb);
}

// Version-range overload: forwards to RowsetMetaManager using the meta store of this
// tablet's data dir and this tablet's uid; results are appended to `metas_pb`.
Status Tablet::get_rowset_binlog_metas(Version binlog_versions, RowsetBinlogMetasPB* metas_pb) {
    auto* olap_meta = _data_dir->get_meta();
    const auto uid = tablet_uid();
    return RowsetMetaManager::get_rowset_binlog_metas(olap_meta, uid, binlog_versions, metas_pb);
}

std::string Tablet::get_segment_filepath(std::string_view rowset_id,
std::string_view segment_index) const {
return fmt::format("{}/_binlog/{}_{}.dat", _tablet_path, rowset_id, segment_index);
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,7 @@ class Tablet final : public BaseTablet {
std::string_view rowset_id) const;
Status get_rowset_binlog_metas(const std::vector<int64_t>& binlog_versions,
RowsetBinlogMetasPB* metas_pb);
Status get_rowset_binlog_metas(Version binlog_versions, RowsetBinlogMetasPB* metas_pb);
std::string get_segment_filepath(std::string_view rowset_id,
std::string_view segment_index) const;
std::string get_segment_filepath(std::string_view rowset_id, int64_t segment_index) const;
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/tablet_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -972,6 +972,7 @@ Status TabletManager::load_tablet_from_dir(DataDir* store, TTabletId tablet_id,
if (binlog_meta_filesize > 0) {
contain_binlog = true;
RETURN_IF_ERROR(read_pb(binlog_metas_file, &rowset_binlog_metas_pb));
VLOG_DEBUG << "load rowset binlog metas from file. file_path=" << binlog_metas_file;
}
RETURN_IF_ERROR(io::global_local_filesystem()->delete_file(binlog_metas_file));
}
Expand Down
102 changes: 99 additions & 3 deletions be/src/olap/task/engine_storage_migration_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include "olap/data_dir.h"
#include "olap/olap_common.h"
#include "olap/olap_define.h"
#include "olap/pb_helper.h"
#include "olap/rowset/rowset_meta.h"
#include "olap/snapshot_manager.h"
#include "olap/storage_engine.h"
Expand Down Expand Up @@ -262,9 +263,11 @@ Status EngineStorageMigrationTask::_migrate() {
}

std::vector<RowsetSharedPtr> temp_consistent_rowsets(consistent_rowsets);
RowsetBinlogMetasPB rowset_binlog_metas_pb;
do {
// migrate all index and data files but header file
res = _copy_index_and_data_files(full_path, temp_consistent_rowsets);
res = _copy_index_and_data_files(full_path, temp_consistent_rowsets,
&rowset_binlog_metas_pb);
if (!res.ok()) {
break;
}
Expand Down Expand Up @@ -292,7 +295,8 @@ Status EngineStorageMigrationTask::_migrate() {
// we take the lock to complete it to avoid long-term competition with other tasks
if (_is_rowsets_size_less_than_threshold(temp_consistent_rowsets)) {
// force to copy the remaining data and index
res = _copy_index_and_data_files(full_path, temp_consistent_rowsets);
res = _copy_index_and_data_files(full_path, temp_consistent_rowsets,
&rowset_binlog_metas_pb);
if (!res.ok()) {
break;
}
Expand All @@ -307,6 +311,16 @@ Status EngineStorageMigrationTask::_migrate() {
}
}

// save rowset binlog metas
if (rowset_binlog_metas_pb.rowset_binlog_metas_size() > 0) {
auto rowset_binlog_metas_pb_filename =
fmt::format("{}/rowset_binlog_metas.pb", full_path);
res = write_pb(rowset_binlog_metas_pb_filename, rowset_binlog_metas_pb);
if (!res.ok()) {
break;
}
}

// generate new tablet meta and write to hdr file
res = _gen_and_write_header_to_hdr_file(shard, full_path, consistent_rowsets, end_version);
if (!res.ok()) {
Expand Down Expand Up @@ -350,10 +364,92 @@ void EngineStorageMigrationTask::_generate_new_header(
}

// Copies the files of every rowset in `consistent_rowsets` into `full_path`, then
// copies the binlog segment files (and inverted index files, when the schema has
// them) that belong to the binlog metas found for those rowsets' versions.
//
// The binlog metas collected here are moved into `all_binlog_metas_pb` only after
// every copy succeeded; any failing step returns its error immediately.
Status EngineStorageMigrationTask::_copy_index_and_data_files(
// NOTE(review): this span comes from a diff view; the next line appears to be the
// pre-change parameter list and the two lines after it the current signature.
const string& full_path, const std::vector<RowsetSharedPtr>& consistent_rowsets) const {
const string& full_path, const std::vector<RowsetSharedPtr>& consistent_rowsets,
RowsetBinlogMetasPB* all_binlog_metas_pb) const {
// Binlog metas gathered during this call only; merged into the out-param at the end.
RowsetBinlogMetasPB rowset_binlog_metas_pb;
for (const auto& rs : consistent_rowsets) {
RETURN_IF_ERROR(rs->copy_files_to(full_path, rs->rowset_id()));

// Look up the binlog metas recorded for this rowset's version range.
Version binlog_versions = rs->version();
RETURN_IF_ERROR(_tablet->get_rowset_binlog_metas(binlog_versions, &rowset_binlog_metas_pb));
}

// Copy the binlog segment files and their index files for every collected binlog meta.
for (const auto& rowset_binlog_meta : rowset_binlog_metas_pb.rowset_binlog_metas()) {
auto num_segments = rowset_binlog_meta.num_segments();
std::string_view rowset_id = rowset_binlog_meta.rowset_id();

// The binlog data is parsed as a RowsetMetaPB; its tablet schema decides which
// index files have to be copied below.
RowsetMetaPB rowset_meta_pb;
if (!rowset_meta_pb.ParseFromString(rowset_binlog_meta.data())) {
auto err_msg = fmt::format("fail to parse binlog meta data value:{}",
rowset_binlog_meta.data());
LOG(WARNING) << err_msg;
return Status::InternalError(err_msg);
}
const auto& tablet_schema_pb = rowset_meta_pb.tablet_schema();
TabletSchema tablet_schema;
tablet_schema.init_from_pb(tablet_schema_pb);

// copy segment files and index files
for (int64_t segment_index = 0; segment_index < num_segments; ++segment_index) {
std::string segment_file_path = _tablet->get_segment_filepath(rowset_id, segment_index);
// Destination name: <full_path>/<rowset_id>_<segment_index>.binlog
auto snapshot_segment_file_path =
fmt::format("{}/{}_{}.binlog", full_path, rowset_id, segment_index);

Status status = io::global_local_filesystem()->copy_path(segment_file_path,
snapshot_segment_file_path);
if (!status.ok()) {
LOG(WARNING) << "fail to copy binlog segment file. [src=" << segment_file_path
<< ", dest=" << snapshot_segment_file_path << "]" << status;
return status;
}
VLOG_DEBUG << "copy " << segment_file_path << " to " << snapshot_segment_file_path;

// V1 storage format: one index file per inverted index, copied to
// <full_path>/<rowset_id>_<segment_index>_<index_id>.binlog-index
if (tablet_schema.get_inverted_index_storage_format() ==
InvertedIndexStorageFormatPB::V1) {
for (const auto& index : tablet_schema.indexes()) {
// Only inverted indexes are copied here.
if (index.index_type() != IndexType::INVERTED) {
continue;
}
auto index_id = index.index_id();
auto index_file =
_tablet->get_segment_index_filepath(rowset_id, segment_index, index_id);
auto snapshot_segment_index_file_path =
fmt::format("{}/{}_{}_{}.binlog-index", full_path, rowset_id,
segment_index, index_id);
VLOG_DEBUG << "copy " << index_file << " to "
<< snapshot_segment_index_file_path;
status = io::global_local_filesystem()->copy_path(
index_file, snapshot_segment_index_file_path);
if (!status.ok()) {
LOG(WARNING)
<< "fail to copy binlog index file. [src=" << index_file
<< ", dest=" << snapshot_segment_index_file_path << "]" << status;
return status;
}
}
// Any other storage format with an inverted index: a single index file derived
// from the segment file path, copied to
// <full_path>/<rowset_id>_<segment_index>.binlog-index
} else if (tablet_schema.has_inverted_index()) {
auto index_file = InvertedIndexDescriptor::get_index_file_path_v2(
InvertedIndexDescriptor::get_index_file_path_prefix(segment_file_path));
auto snapshot_segment_index_file_path =
fmt::format("{}/{}_{}.binlog-index", full_path, rowset_id, segment_index);
VLOG_DEBUG << "copy " << index_file << " to " << snapshot_segment_index_file_path;
status = io::global_local_filesystem()->copy_path(index_file,
snapshot_segment_index_file_path);
if (!status.ok()) {
LOG(WARNING) << "fail to copy binlog index file. [src=" << index_file
<< ", dest=" << snapshot_segment_index_file_path << "]" << status;
return status;
}
}
}
}

// All copies succeeded: hand the collected binlog metas over to the caller by
// moving them onto the end of its list.
std::move(rowset_binlog_metas_pb.mutable_rowset_binlog_metas()->begin(),
rowset_binlog_metas_pb.mutable_rowset_binlog_metas()->end(),
google::protobuf::RepeatedFieldBackInserter(
all_binlog_metas_pb->mutable_rowset_binlog_metas()));

return Status::OK();
}

Expand Down
5 changes: 4 additions & 1 deletion be/src/olap/task/engine_storage_migration_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#pragma once

#include <gen_cpp/olap_file.pb.h>

#include <mutex>
#include <shared_mutex>
#include <string>
Expand Down Expand Up @@ -69,7 +71,8 @@ class EngineStorageMigrationTask final : public EngineTask {
// TODO: hkp
// rewrite this function
Status _copy_index_and_data_files(const std::string& full_path,
const std::vector<RowsetSharedPtr>& consistent_rowsets) const;
const std::vector<RowsetSharedPtr>& consistent_rowsets,
RowsetBinlogMetasPB* all_binlog_metas_pb) const;

private:
StorageEngine& _engine;
Expand Down
Loading