diff --git a/be/src/olap/rowset/rowset_meta_manager.cpp b/be/src/olap/rowset/rowset_meta_manager.cpp index 9d1cbd8858983b..9ba6f9540db7fc 100644 --- a/be/src/olap/rowset/rowset_meta_manager.cpp +++ b/be/src/olap/rowset/rowset_meta_manager.cpp @@ -357,6 +357,69 @@ Status RowsetMetaManager::_get_rowset_binlog_metas(OlapMeta* meta, const TabletU return status; } +Status RowsetMetaManager::get_rowset_binlog_metas(OlapMeta* meta, TabletUid tablet_uid, + Version version, RowsetBinlogMetasPB* metas_pb) { + Status status; + auto tablet_uid_str = tablet_uid.to_string(); + auto prefix_key = make_binlog_meta_key_prefix(tablet_uid); + auto begin_key = make_binlog_meta_key_prefix(tablet_uid, version.first); + auto end_key = make_binlog_meta_key_prefix(tablet_uid, version.second + 1); + auto traverse_func = [meta, metas_pb, &status, &tablet_uid_str, &end_key]( + std::string_view key, std::string_view value) -> bool { + VLOG_DEBUG << fmt::format("get rowset binlog metas, key={}, value={}", key, value); + if (key.compare(end_key) > 0) { // the binlog meta key is binary comparable. + // All binlog meta has been scanned + return false; + } + + if (!starts_with_binlog_meta(key)) { + auto err_msg = fmt::format("invalid binlog meta key:{}", key); + status = Status::InternalError(err_msg); + LOG(WARNING) << err_msg; + return false; + } + + BinlogMetaEntryPB binlog_meta_entry_pb; + if (!binlog_meta_entry_pb.ParseFromArray(value.data(), value.size())) { + auto err_msg = fmt::format("fail to parse binlog meta value:{}", value); + status = Status::InternalError(err_msg); + LOG(WARNING) << err_msg; + return false; + } + + const auto& rowset_id = binlog_meta_entry_pb.rowset_id_v2(); + auto* binlog_meta_pb = metas_pb->add_rowset_binlog_metas(); + binlog_meta_pb->set_rowset_id(rowset_id); + binlog_meta_pb->set_version(binlog_meta_entry_pb.version()); + binlog_meta_pb->set_num_segments(binlog_meta_entry_pb.num_segments()); + binlog_meta_pb->set_meta_key(std::string {key}); + binlog_meta_pb->set_meta(std::string {value}); + + auto binlog_data_key = + make_binlog_data_key(tablet_uid_str, binlog_meta_entry_pb.version(), rowset_id); + std::string binlog_data; + status = meta->get(META_COLUMN_FAMILY_INDEX, binlog_data_key, &binlog_data); + if (!status.ok()) { + LOG(WARNING) << status.to_string(); + return false; + } + binlog_meta_pb->set_data_key(binlog_data_key); + binlog_meta_pb->set_data(binlog_data); + + return false; + }; + + Status iterStatus = + meta->iterate(META_COLUMN_FAMILY_INDEX, begin_key, prefix_key, traverse_func); + if (!iterStatus.ok()) { + LOG(WARNING) << fmt::format( + "fail to iterate binlog meta. prefix_key:{}, version:{}, status:{}", prefix_key, + version.to_string(), iterStatus.to_string()); + return iterStatus; + } + return status; +} + Status RowsetMetaManager::_get_all_rowset_binlog_metas(OlapMeta* meta, const TabletUid tablet_uid, RowsetBinlogMetasPB* metas_pb) { Status status; diff --git a/be/src/olap/rowset/rowset_meta_manager.h b/be/src/olap/rowset/rowset_meta_manager.h index b61e8c0276949f..eb04128fdedaf2 100644 --- a/be/src/olap/rowset/rowset_meta_manager.h +++ b/be/src/olap/rowset/rowset_meta_manager.h @@ -72,6 +72,9 @@ class RowsetMetaManager { static Status get_rowset_binlog_metas(OlapMeta* meta, const TabletUid tablet_uid, const std::vector& binlog_versions, RowsetBinlogMetasPB* metas_pb); + // get all binlog metas of a tablet in version. + static Status get_rowset_binlog_metas(OlapMeta* meta, const TabletUid tablet_uid, + Version version, RowsetBinlogMetasPB* metas_pb); static Status remove_binlog(OlapMeta* meta, const std::string& suffix); static Status ingest_binlog_metas(OlapMeta* meta, TabletUid tablet_uid, RowsetBinlogMetasPB* metas_pb); diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index b23404583f7ce2..51eabe5495ef89 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -2512,6 +2512,11 @@ Status Tablet::get_rowset_binlog_metas(const std::vector& binlog_versio binlog_versions, metas_pb); } +Status Tablet::get_rowset_binlog_metas(Version binlog_versions, RowsetBinlogMetasPB* metas_pb) { + return RowsetMetaManager::get_rowset_binlog_metas(_data_dir->get_meta(), tablet_uid(), + binlog_versions, metas_pb); +} + std::string Tablet::get_segment_filepath(std::string_view rowset_id, std::string_view segment_index) const { return fmt::format("{}/_binlog/{}_{}.dat", _tablet_path, rowset_id, segment_index); diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index 800c720a1c4431..33253e82ced2b5 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -436,6 +436,7 @@ class Tablet final : public BaseTablet { std::string_view rowset_id) const; Status get_rowset_binlog_metas(const std::vector& binlog_versions, RowsetBinlogMetasPB* metas_pb); + Status get_rowset_binlog_metas(Version binlog_versions, RowsetBinlogMetasPB* metas_pb); std::string get_segment_filepath(std::string_view rowset_id, std::string_view segment_index) const; std::string get_segment_filepath(std::string_view rowset_id, int64_t segment_index) const; diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp index e7679da060361a..468a6b2fb126f0 100644 --- a/be/src/olap/tablet_manager.cpp +++ b/be/src/olap/tablet_manager.cpp @@ -972,6 +972,7 @@ Status TabletManager::load_tablet_from_dir(DataDir* store, TTabletId tablet_id, if (binlog_meta_filesize > 0) { contain_binlog = true; RETURN_IF_ERROR(read_pb(binlog_metas_file, &rowset_binlog_metas_pb)); + VLOG_DEBUG << "load rowset binlog metas from file. file_path=" << binlog_metas_file; } RETURN_IF_ERROR(io::global_local_filesystem()->delete_file(binlog_metas_file)); } diff --git a/be/src/olap/task/engine_storage_migration_task.cpp b/be/src/olap/task/engine_storage_migration_task.cpp index 7c870a5e8ea5fc..21be34a334dd8d 100644 --- a/be/src/olap/task/engine_storage_migration_task.cpp +++ b/be/src/olap/task/engine_storage_migration_task.cpp @@ -37,6 +37,7 @@ #include "olap/data_dir.h" #include "olap/olap_common.h" #include "olap/olap_define.h" +#include "olap/pb_helper.h" #include "olap/rowset/rowset_meta.h" #include "olap/snapshot_manager.h" #include "olap/storage_engine.h" @@ -262,9 +263,11 @@ Status EngineStorageMigrationTask::_migrate() { } std::vector temp_consistent_rowsets(consistent_rowsets); + RowsetBinlogMetasPB rowset_binlog_metas_pb; do { // migrate all index and data files but header file - res = _copy_index_and_data_files(full_path, temp_consistent_rowsets); + res = _copy_index_and_data_files(full_path, temp_consistent_rowsets, + &rowset_binlog_metas_pb); if (!res.ok()) { break; } @@ -292,7 +295,8 @@ Status EngineStorageMigrationTask::_migrate() { // we take the lock to complete it to avoid long-term competition with other tasks if (_is_rowsets_size_less_than_threshold(temp_consistent_rowsets)) { // force to copy the remaining data and index - res = _copy_index_and_data_files(full_path, temp_consistent_rowsets); + res = _copy_index_and_data_files(full_path, temp_consistent_rowsets, + &rowset_binlog_metas_pb); if (!res.ok()) { break; } @@ -307,6 +311,16 @@ Status EngineStorageMigrationTask::_migrate() { } } + // save rowset binlog metas + if (rowset_binlog_metas_pb.rowset_binlog_metas_size() > 0) { + auto rowset_binlog_metas_pb_filename = + fmt::format("{}/rowset_binlog_metas.pb", full_path); + res = write_pb(rowset_binlog_metas_pb_filename, rowset_binlog_metas_pb); + if (!res.ok()) { + break; + } + } + // generate new tablet meta and write to hdr file res = _gen_and_write_header_to_hdr_file(shard, full_path, consistent_rowsets, end_version); if (!res.ok()) { @@ -350,10 +364,92 @@ void EngineStorageMigrationTask::_generate_new_header( } Status EngineStorageMigrationTask::_copy_index_and_data_files( - const string& full_path, const std::vector& consistent_rowsets) const { + const string& full_path, const std::vector& consistent_rowsets, + RowsetBinlogMetasPB* all_binlog_metas_pb) const { + RowsetBinlogMetasPB rowset_binlog_metas_pb; for (const auto& rs : consistent_rowsets) { RETURN_IF_ERROR(rs->copy_files_to(full_path, rs->rowset_id())); + + Version binlog_versions = rs->version(); + RETURN_IF_ERROR(_tablet->get_rowset_binlog_metas(binlog_versions, &rowset_binlog_metas_pb)); + } + + // copy index binlog files. + for (const auto& rowset_binlog_meta : rowset_binlog_metas_pb.rowset_binlog_metas()) { + auto num_segments = rowset_binlog_meta.num_segments(); + std::string_view rowset_id = rowset_binlog_meta.rowset_id(); + + RowsetMetaPB rowset_meta_pb; + if (!rowset_meta_pb.ParseFromString(rowset_binlog_meta.data())) { + auto err_msg = fmt::format("fail to parse binlog meta data value:{}", + rowset_binlog_meta.data()); + LOG(WARNING) << err_msg; + return Status::InternalError(err_msg); + } + const auto& tablet_schema_pb = rowset_meta_pb.tablet_schema(); + TabletSchema tablet_schema; + tablet_schema.init_from_pb(tablet_schema_pb); + + // copy segment files and index files + for (int64_t segment_index = 0; segment_index < num_segments; ++segment_index) { + std::string segment_file_path = _tablet->get_segment_filepath(rowset_id, segment_index); + auto snapshot_segment_file_path = + fmt::format("{}/{}_{}.binlog", full_path, rowset_id, segment_index); + + Status status = io::global_local_filesystem()->copy_path(segment_file_path, + snapshot_segment_file_path); + if (!status.ok()) { + LOG(WARNING) << "fail to copy binlog segment file. [src=" << segment_file_path + << ", dest=" << snapshot_segment_file_path << "]" << status; + return status; + } + VLOG_DEBUG << "copy " << segment_file_path << " to " << snapshot_segment_file_path; + + if (tablet_schema.get_inverted_index_storage_format() == + InvertedIndexStorageFormatPB::V1) { + for (const auto& index : tablet_schema.indexes()) { + if (index.index_type() != IndexType::INVERTED) { + continue; + } + auto index_id = index.index_id(); + auto index_file = + _tablet->get_segment_index_filepath(rowset_id, segment_index, index_id); + auto snapshot_segment_index_file_path = + fmt::format("{}/{}_{}_{}.binlog-index", full_path, rowset_id, + segment_index, index_id); + VLOG_DEBUG << "copy " << index_file << " to " + << snapshot_segment_index_file_path; + status = io::global_local_filesystem()->copy_path( + index_file, snapshot_segment_index_file_path); + if (!status.ok()) { + LOG(WARNING) + << "fail to copy binlog index file. [src=" << index_file + << ", dest=" << snapshot_segment_index_file_path << "]" << status; + return status; + } + } + } else if (tablet_schema.has_inverted_index()) { + auto index_file = InvertedIndexDescriptor::get_index_file_path_v2( + InvertedIndexDescriptor::get_index_file_path_prefix(segment_file_path)); + auto snapshot_segment_index_file_path = + fmt::format("{}/{}_{}.binlog-index", full_path, rowset_id, segment_index); + VLOG_DEBUG << "copy " << index_file << " to " << snapshot_segment_index_file_path; + status = io::global_local_filesystem()->copy_path(index_file, + snapshot_segment_index_file_path); + if (!status.ok()) { + LOG(WARNING) << "fail to copy binlog index file. [src=" << index_file + << ", dest=" << snapshot_segment_index_file_path << "]" << status; + return status; + } + } + } } + + std::move(rowset_binlog_metas_pb.mutable_rowset_binlog_metas()->begin(), + rowset_binlog_metas_pb.mutable_rowset_binlog_metas()->end(), + google::protobuf::RepeatedFieldBackInserter( + all_binlog_metas_pb->mutable_rowset_binlog_metas())); + return Status::OK(); } diff --git a/be/src/olap/task/engine_storage_migration_task.h b/be/src/olap/task/engine_storage_migration_task.h index 8858854de921d4..7578b7de94f352 100644 --- a/be/src/olap/task/engine_storage_migration_task.h +++ b/be/src/olap/task/engine_storage_migration_task.h @@ -17,6 +17,8 @@ #pragma once +#include + #include #include #include @@ -69,7 +71,8 @@ class EngineStorageMigrationTask final : public EngineTask { // TODO: hkp // rewrite this function Status _copy_index_and_data_files(const std::string& full_path, - const std::vector& consistent_rowsets) const; + const std::vector& consistent_rowsets, + RowsetBinlogMetasPB* all_binlog_metas_pb) const; private: StorageEngine& _engine;