Skip to content

Commit

Permalink
[fix](cloud) Fix tablet_id is zero when txn lazy commit
Browse files Browse the repository at this point in the history
* when ry and ms convert tmp rowsets at the same times, and finish convert task
  with different partitions severally, may cause tablet_id variable is zero
  • Loading branch information
SWJTU-ZhangLei committed Oct 17, 2024
1 parent 0bbf502 commit 7c5333b
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 15 deletions.
4 changes: 2 additions & 2 deletions cloud/src/meta-service/meta_service_txn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1122,7 +1122,7 @@ void commit_txn_immediately(
std::tie(code, msg) = task->wait();
if (code != MetaServiceCode::OK) {
LOG(WARNING) << "advance_last_txn failed last_txn=" << last_pending_txn_id
<< " code=" << code << "msg=" << msg;
<< " code=" << code << " msg=" << msg;
return;
}
last_pending_txn_id = 0;
Expand Down Expand Up @@ -1654,7 +1654,7 @@ void commit_txn_eventually(
std::tie(code, msg) = task->wait();
if (code != MetaServiceCode::OK) {
LOG(WARNING) << "advance_last_txn failed last_txn=" << last_pending_txn_id
<< " code=" << code << "msg=" << msg;
<< " code=" << code << " msg=" << msg;
return;
}

Expand Down
58 changes: 45 additions & 13 deletions cloud/src/meta-service/txn_lazy_committer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -303,31 +303,33 @@ void TxnLazyCommitTask::commit() {
code_ = MetaServiceCode::OK;
msg_.clear();
int64_t db_id;
std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>> tmp_rowset_metas;
scan_tmp_rowset(instance_id_, txn_id_, txn_kv_, code_, msg_, &db_id, &tmp_rowset_metas);
std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>> all_tmp_rowset_metas;
scan_tmp_rowset(instance_id_, txn_id_, txn_kv_, code_, msg_, &db_id,
&all_tmp_rowset_metas);
if (code_ != MetaServiceCode::OK) {
LOG(WARNING) << "scan_tmp_rowset failed, txn_id=" << txn_id_ << " code=" << code_;
break;
}

VLOG_DEBUG << "txn_id=" << txn_id_
<< " tmp_rowset_metas.size()=" << tmp_rowset_metas.size();
if (tmp_rowset_metas.size() == 0) {
<< " tmp_rowset_metas.size()=" << all_tmp_rowset_metas.size();
if (all_tmp_rowset_metas.size() == 0) {
LOG(INFO) << "empty tmp_rowset_metas, txn_id=" << txn_id_;
}

// <partition_id, tmp_rowsets>
std::unordered_map<int64_t,
std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>>
partition_to_tmp_rowset_metas;
for (auto& [tmp_rowset_key, tmp_rowset_pb] : tmp_rowset_metas) {
for (auto& [tmp_rowset_key, tmp_rowset_pb] : all_tmp_rowset_metas) {
partition_to_tmp_rowset_metas[tmp_rowset_pb.partition_id()].emplace_back();
partition_to_tmp_rowset_metas[tmp_rowset_pb.partition_id()].back().first =
tmp_rowset_key;
partition_to_tmp_rowset_metas[tmp_rowset_pb.partition_id()].back().second =
tmp_rowset_pb;
}

int64_t table_id = -1;
// tablet_id -> TabletIndexPB
std::unordered_map<int64_t, TabletIndexPB> tablet_ids;
for (auto& [partition_id, tmp_rowset_metas] : partition_to_tmp_rowset_metas) {
Expand All @@ -346,7 +348,6 @@ void TxnLazyCommitTask::commit() {
}
if (code_ != MetaServiceCode::OK) break;

DCHECK(tmp_rowset_metas.size() > 0);
std::unique_ptr<Transaction> txn;
TxnErrorCode err = txn_kv_->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
Expand All @@ -357,14 +358,39 @@ void TxnLazyCommitTask::commit() {
break;
}

int64_t table_id = -1;
for (auto& [tmp_rowset_key, tmp_rowset_pb] : tmp_rowset_metas) {
if (table_id <= 0) {
table_id = tablet_ids[tmp_rowset_pb.tablet_id()].table_id();
DCHECK(tmp_rowset_metas.size() > 0);
if (table_id <= 0) {
if (tablet_ids.size() > 0) {
LOG(INFO) << "zhangleixxx" << tablet_ids.begin()->second.table_id();
table_id = tablet_ids.begin()->second.table_id();
} else {
// get table_id from storage
int64_t first_tablet_id = tmp_rowset_metas.begin()->second.tablet_id();
std::string tablet_idx_key =
meta_tablet_idx_key({instance_id_, first_tablet_id});
std::string tablet_idx_val;
err = txn->get(tablet_idx_key, &tablet_idx_val, true);
if (TxnErrorCode::TXN_OK != err) {
code_ = err == TxnErrorCode::TXN_KEY_NOT_FOUND
? MetaServiceCode::TXN_ID_NOT_FOUND
: cast_as<ErrCategory::READ>(err);
ss << "failed to get tablet idx, txn_id=" << txn_id_
<< " key=" << hex(tablet_idx_key) << " err=" << err;
msg_ = ss.str();
LOG(WARNING) << msg_;
break;
}

TabletIndexPB tablet_idx_pb;
if (!tablet_idx_pb.ParseFromString(tablet_idx_val)) {
code_ = MetaServiceCode::PROTOBUF_PARSE_ERR;
ss << "failed to parse tablet idx pb txn_id=" << txn_id_
<< " key=" << hex(tablet_idx_key);
msg_ = ss.str();
break;
}
table_id = tablet_idx_pb.table_id();
}
txn->remove(tmp_rowset_key);
LOG(INFO) << "remove tmp_rowset_key=" << hex(tmp_rowset_key)
<< " txn_id=" << txn_id_;
}

DCHECK(table_id > 0);
Expand Down Expand Up @@ -415,6 +441,12 @@ void TxnLazyCommitTask::commit() {
LOG(INFO) << "put ver_key=" << hex(ver_key) << " txn_id=" << txn_id_
<< " version_pb=" << version_pb.ShortDebugString();

for (auto& [tmp_rowset_key, tmp_rowset_pb] : tmp_rowset_metas) {
txn->remove(tmp_rowset_key);
LOG(INFO) << "remove tmp_rowset_key=" << hex(tmp_rowset_key)
<< " txn_id=" << txn_id_;
}

err = txn->commit();
if (err != TxnErrorCode::TXN_OK) {
code_ = cast_as<ErrCategory::COMMIT>(err);
Expand Down

0 comments on commit 7c5333b

Please sign in to comment.