Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[opt](log) Print last failure status for unhealthy replica #38153

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1705,6 +1705,7 @@ void PublishVersionWorkerPool::publish_version_callback(const TAgentTaskRequest&
VLOG_NOTICE << "get publish version task. signature=" << req.signature;

std::set<TTabletId> error_tablet_ids;
std::unordered_map<TTabletId, Status> error_tablet_id_to_status;
std::map<TTabletId, TVersion> succ_tablets;
// partition_id, tablet_id, publish_version
std::vector<std::tuple<int64_t, int64_t, int64_t>> discontinuous_version_tablets;
Expand All @@ -1716,8 +1717,10 @@ void PublishVersionWorkerPool::publish_version_callback(const TAgentTaskRequest&
succ_tablets.clear();
error_tablet_ids.clear();
table_id_to_tablet_id_to_num_delta_rows.clear();
error_tablet_id_to_status.clear();
EnginePublishVersionTask engine_task(_engine, publish_version_req, &error_tablet_ids,
&succ_tablets, &discontinuous_version_tablets,
&error_tablet_id_to_status, &succ_tablets,
&discontinuous_version_tablets,
&table_id_to_tablet_id_to_num_delta_rows);
SCOPED_ATTACH_TASK(engine_task.mem_tracker());
status = engine_task.execute();
Expand Down Expand Up @@ -1819,8 +1822,14 @@ void PublishVersionWorkerPool::publish_version_callback(const TAgentTaskRequest&
finish_task_request.__set_signature(req.signature);
finish_task_request.__set_report_version(s_report_version);
finish_task_request.__set_succ_tablets(succ_tablets);
finish_task_request.__set_error_tablet_ids(
std::vector<TTabletId>(error_tablet_ids.begin(), error_tablet_ids.end()));
auto error_tablets = std::vector<TTabletId>(error_tablet_ids.begin(), error_tablet_ids.end());
finish_task_request.__set_error_tablet_ids(error_tablets);
std::vector<TStatus> error_statuses;
error_statuses.reserve(error_tablets.size());
for (const auto& tablet_id : error_tablets) {
error_statuses.emplace_back(error_tablet_id_to_status[tablet_id].to_thrift());
}
finish_task_request.__set_error_statuses(error_statuses);
finish_task_request.__set_table_id_to_tablet_id_to_delta_num_rows(
table_id_to_tablet_id_to_num_delta_rows);
finish_task(finish_task_request);
Expand Down
39 changes: 25 additions & 14 deletions be/src/olap/task/engine_publish_version_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,22 +73,26 @@ void TabletPublishStatistics::record_in_bvar() {

EnginePublishVersionTask::EnginePublishVersionTask(
StorageEngine& engine, const TPublishVersionRequest& publish_version_req,
std::set<TTabletId>* error_tablet_ids, std::map<TTabletId, TVersion>* succ_tablets,
std::set<TTabletId>* error_tablet_ids,
std::unordered_map<TTabletId, Status>* error_tablet_id_to_status,
std::map<TTabletId, TVersion>* succ_tablets,
std::vector<std::tuple<int64_t, int64_t, int64_t>>* discontinuous_version_tablets,
std::map<TTableId, std::map<TTabletId, int64_t>>* table_id_to_tablet_id_to_num_delta_rows)
: _engine(engine),
_publish_version_req(publish_version_req),
_error_tablet_ids(error_tablet_ids),
_error_tablet_id_to_status(error_tablet_id_to_status),
_succ_tablets(succ_tablets),
_discontinuous_version_tablets(discontinuous_version_tablets),
_table_id_to_tablet_id_to_num_delta_rows(table_id_to_tablet_id_to_num_delta_rows) {
_mem_tracker = MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::OTHER,
"TabletPublishTxnTask");
}

void EnginePublishVersionTask::add_error_tablet_id(int64_t tablet_id) {
// Marks `tablet_id` as failed for this publish task and remembers the status
// that describes the failure. Both output containers are updated while
// holding _tablet_ids_mutex.
void EnginePublishVersionTask::add_error_tablet_id(int64_t tablet_id, Status st) {
    std::lock_guard<std::mutex> guard(_tablet_ids_mutex);
    // If a status was already recorded for this tablet it is kept; the new
    // one is only stored when the tablet has no entry yet.
    _error_tablet_id_to_status->try_emplace(tablet_id, std::move(st));
    _error_tablet_ids->insert(tablet_id);
}

Status EnginePublishVersionTask::execute() {
Expand Down Expand Up @@ -158,19 +162,19 @@ Status EnginePublishVersionTask::execute() {
// and receive fe's publish version task
// this be must return as an error tablet
if (rowset == nullptr) {
add_error_tablet_id(tablet_info.tablet_id);
res = Status::Error<PUSH_ROWSET_NOT_FOUND>(
"could not find related rowset for tablet {}, txn id {}",
tablet_info.tablet_id, transaction_id);
add_error_tablet_id(tablet_info.tablet_id, res);
continue;
}
TabletSharedPtr tablet = _engine.tablet_manager()->get_tablet(tablet_info.tablet_id,
tablet_info.tablet_uid);
if (tablet == nullptr) {
add_error_tablet_id(tablet_info.tablet_id);
res = Status::Error<PUSH_TABLE_NOT_EXIST>(
res = Status::Error<TABLET_MISSING>(
"can't get tablet when publish version. tablet_id={}",
tablet_info.tablet_id);
add_error_tablet_id(tablet_info.tablet_id, res);
continue;
}
// in uniq key model with merge-on-write, we should see all
Expand All @@ -197,7 +201,6 @@ Status EnginePublishVersionTask::execute() {
continue;
}
auto handle_version_not_continuous = [&]() {
add_error_tablet_id(tablet_info.tablet_id);
// When there are too many missing versions, do not directly retry the
// publish and handle it through async publish.
if (max_version + config::mow_publish_max_discontinuous_version_num <
Expand All @@ -209,10 +212,6 @@ Status EnginePublishVersionTask::execute() {
_discontinuous_version_tablets->emplace_back(
partition_id, tablet_info.tablet_id, version.first);
}
res = Status::Error<PUBLISH_VERSION_NOT_CONTINUOUS>(
"version not continuous for mow, tablet_id={}, "
"tablet_max_version={}, txn_version={}",
tablet_info.tablet_id, max_version, version.first);
int64_t missed_version = max_version + 1;
int64_t missed_txn_id = _engine.txn_manager()->get_txn_by_tablet_version(
tablet->tablet_id(), missed_version);
Expand All @@ -222,11 +221,16 @@ Status EnginePublishVersionTask::execute() {
"version={}, tablet_id={}, transaction_id={}",
missed_version, missed_txn_id, version.second, tablet->tablet_id(),
_publish_version_req.transaction_id);
res = Status::Error<PUBLISH_VERSION_NOT_CONTINUOUS>(
"version not continuous for mow, tablet_id={}, "
"tablet_max_version={}, txn_version={}",
tablet_info.tablet_id, max_version, version.first);
if (first_time_update) {
LOG(INFO) << msg;
} else {
LOG_EVERY_SECOND(INFO) << msg;
}
add_error_tablet_id(tablet_info.tablet_id, res);
};
// The versions during the schema change period need to be also continuous
if (tablet_state == TabletState::TABLET_NOTREADY) {
Expand Down Expand Up @@ -290,7 +294,10 @@ Status EnginePublishVersionTask::execute() {
TabletSharedPtr tablet = _engine.tablet_manager()->get_tablet(tablet_info.tablet_id);
auto tablet_id = tablet_info.tablet_id;
if (tablet == nullptr) {
add_error_tablet_id(tablet_id);
Status st = Status::Error<TABLET_MISSING, false>(
"can't get tablet when publish version. tablet_id={}",
tablet_info.tablet_id);
add_error_tablet_id(tablet_id, st);
_succ_tablets->erase(tablet_id);
LOG(WARNING) << "publish version failed on transaction, not found tablet. "
<< "transaction_id=" << transaction_id << ", tablet_id=" << tablet_id
Expand All @@ -304,7 +311,11 @@ Status EnginePublishVersionTask::execute() {
// current just report 0
(*_succ_tablets)[tablet_id] = 0;
} else {
add_error_tablet_id(tablet_id);
Status st = Status::Error<VERSION_NOT_EXIST, false>(
"check_version_exist failed when publish version. tablet_id={}, "
"version={}",
tablet_info.tablet_id, version.to_string());
add_error_tablet_id(tablet_id, st);
if (!res.is<PUBLISH_VERSION_NOT_CONTINUOUS>()) {
LOG(WARNING)
<< "publish version failed on transaction, tablet version not "
Expand Down Expand Up @@ -387,7 +398,7 @@ void TabletPublishTxnTask::handle() {
LOG(WARNING) << "failed to publish version. rowset_id=" << _rowset->rowset_id()
<< ", tablet_id=" << _tablet_info.tablet_id << ", txn_id=" << _transaction_id
<< ", res=" << _result;
_engine_publish_version_task->add_error_tablet_id(_tablet_info.tablet_id);
_engine_publish_version_task->add_error_tablet_id(_tablet_info.tablet_id, _result);
return;
}

Expand All @@ -401,7 +412,7 @@ void TabletPublishTxnTask::handle() {
LOG(WARNING) << "fail to add visible rowset to tablet. rowset_id=" << _rowset->rowset_id()
<< ", tablet_id=" << _tablet_info.tablet_id << ", txn_id=" << _transaction_id
<< ", res=" << _result;
_engine_publish_version_task->add_error_tablet_id(_tablet_info.tablet_id);
_engine_publish_version_task->add_error_tablet_id(_tablet_info.tablet_id, _result);
return;
}
int64_t cost_us = MonotonicMicros() - _stats.submit_time_us;
Expand Down
7 changes: 5 additions & 2 deletions be/src/olap/task/engine_publish_version_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,15 +90,17 @@ class EnginePublishVersionTask final : public EngineTask {
public:
EnginePublishVersionTask(
StorageEngine& engine, const TPublishVersionRequest& publish_version_req,
std::set<TTabletId>* error_tablet_ids, std::map<TTabletId, TVersion>* succ_tablets,
std::set<TTabletId>* error_tablet_ids,
std::unordered_map<TTabletId, Status>* error_tablet_id_to_status,
std::map<TTabletId, TVersion>* succ_tablets,
std::vector<std::tuple<int64_t, int64_t, int64_t>>* discontinous_version_tablets,
std::map<TTableId, std::map<TTabletId, int64_t>>*
table_id_to_tablet_id_to_num_delta_rows);
~EnginePublishVersionTask() override = default;

Status execute() override;

void add_error_tablet_id(int64_t tablet_id);
void add_error_tablet_id(int64_t tablet_id, Status st);

private:
void _calculate_tbl_num_delta_rows(
Expand All @@ -108,6 +110,7 @@ class EnginePublishVersionTask final : public EngineTask {
const TPublishVersionRequest& _publish_version_req;
std::mutex _tablet_ids_mutex;
std::set<TTabletId>* _error_tablet_ids = nullptr;
std::unordered_map<TTabletId, Status>* _error_tablet_id_to_status;
std::map<TTabletId, TVersion>* _succ_tablets;
std::vector<std::tuple<int64_t, int64_t, int64_t>>* _discontinuous_version_tablets = nullptr;
std::map<TTableId, std::map<TTabletId, int64_t>>* _table_id_to_tablet_id_to_num_delta_rows =
Expand Down
18 changes: 18 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.doris.catalog;

import org.apache.doris.common.Config;
import org.apache.doris.common.Status;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.system.Backend;
Expand Down Expand Up @@ -109,6 +110,7 @@ public static class ReplicaContext {
// the last load failed version
@SerializedName(value = "lfv", alternate = {"lastFailedVersion"})
private long lastFailedVersion = -1L;
private Status lastFailedStatus = null;
@Deprecated
@SerializedName(value = "lfvh", alternate = {"lastFailedVersionHash"})
private long lastFailedVersionHash = 0L;
Expand Down Expand Up @@ -292,6 +294,14 @@ public long getLastFailedVersion() {
return lastFailedVersion;
}

/**
 * Stores the status associated with this replica's last failure.
 *
 * @param st the status to remember; replaces any previously stored value
 */
public void setLastFailedStatus(Status st) {
this.lastFailedStatus = st;
}

/**
 * Returns the status stored by {@link #setLastFailedStatus(Status)}.
 *
 * @return the last failed status, or {@code null} when none is currently recorded
 */
public Status getLastFailedStatus() {
return lastFailedStatus;
}

public long getLastFailedTimestamp() {
return lastFailedTimestamp;
}
Expand Down Expand Up @@ -407,6 +417,7 @@ public synchronized void adminUpdateVersionInfo(Long version, Long lastFailedVer
this.lastFailedVersion = -1;
this.lastFailedTimestamp = -1;
this.lastFailedVersionHash = 0;
this.lastFailedStatus = null;
}
if (this.lastFailedVersion > 0
&& this.lastSuccessVersion > this.lastFailedVersion) {
Expand Down Expand Up @@ -516,6 +527,7 @@ private void updateReplicaVersion(long newVersion, long lastFailedVersion, long
this.lastFailedVersion = -1;
this.lastFailedVersionHash = 0;
this.lastFailedTimestamp = -1;
this.lastFailedStatus = null;
if (this.version < this.lastSuccessVersion) {
this.version = this.lastSuccessVersion;
}
Expand Down Expand Up @@ -650,6 +662,8 @@ public String toString() {
strBuffer.append(lastSuccessVersion);
strBuffer.append(", lastFailedTimestamp=");
strBuffer.append(lastFailedTimestamp);
strBuffer.append(", lastFailedStatus=");
strBuffer.append(lastFailedStatus);
strBuffer.append(", schemaHash=");
strBuffer.append(schemaHash);
strBuffer.append(", state=");
Expand Down Expand Up @@ -686,6 +700,10 @@ public String toStringSimple(boolean checkBeAlive) {
strBuffer.append(lastSuccessVersion);
strBuffer.append(", lastFailedTimestamp=");
strBuffer.append(lastFailedTimestamp);
if (lastFailedStatus != null) {
strBuffer.append(", lastFailedStatus=");
strBuffer.append(lastFailedStatus);
}
}
if (isBad()) {
strBuffer.append(", isBad=true");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,11 @@ private void finishPublishVersion(AgentTask task, TFinishTaskRequest request) {
errorTabletIds = request.getErrorTabletIds();
}

List<TStatus> errorStatuses = null;
if (request.isSetErrorStatuses()) {
errorStatuses = request.getErrorStatuses();
}

if (request.isSetReportVersion()) {
// report version is required. here we check if set, for compatibility.
long reportVersion = request.getReportVersion();
Expand All @@ -521,6 +526,7 @@ private void finishPublishVersion(AgentTask task, TFinishTaskRequest request) {
PublishVersionTask publishVersionTask = (PublishVersionTask) task;
publishVersionTask.setSuccTablets(succTablets);
publishVersionTask.addErrorTablets(errorTabletIds);
publishVersionTask.addErrorStatuses(errorTabletIds, errorStatuses);
publishVersionTask.setFinished(true);

if (request.getTaskStatus().getStatusCode() != TStatusCode.OK) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@

package org.apache.doris.task;

import org.apache.doris.common.Status;
import org.apache.doris.thrift.TPartitionVersionInfo;
import org.apache.doris.thrift.TPublishVersionRequest;
import org.apache.doris.thrift.TStatus;
import org.apache.doris.thrift.TTaskType;

import com.google.common.collect.Maps;
Expand Down Expand Up @@ -46,6 +48,8 @@ public class PublishVersionTask extends AgentTask {

private List<Long> errorTablets;

private Map<Long, Status> errorStatuses = Maps.newHashMap();

// tabletId => version, current version = 0
private Map<Long, Long> succTablets;

Expand Down Expand Up @@ -99,6 +103,20 @@ public synchronized void addErrorTablets(List<Long> errorTablets) {
this.errorTablets.addAll(errorTablets);
}

/**
 * Returns the tablet id to error status map recorded for this task.
 *
 * <p>The returned object is the internal map itself, not a copy.
 *
 * @return map from tablet id to the status recorded for that tablet
 */
public synchronized Map<Long, Status> getErrorStatuses() {
return errorStatuses;
}

/**
 * Replaces the recorded per-tablet error statuses with the given ones.
 *
 * <p>The two lists are treated as parallel: {@code errorStatuses.get(i)} is the
 * status of {@code errorTablets.get(i)}. Previously recorded statuses are always
 * cleared first. If either list is {@code null}, nothing is recorded. If the
 * lists differ in length, only the pairs that exist in both are recorded.
 *
 * @param errorTablets  ids of the tablets that failed; may be {@code null}
 * @param errorStatuses statuses matching {@code errorTablets} by index; may be {@code null}
 */
public synchronized void addErrorStatuses(List<Long> errorTablets, List<TStatus> errorStatuses) {
    this.errorStatuses.clear();
    if (errorTablets == null || errorStatuses == null) {
        return;
    }
    // The lists arrive as independent optional fields of the request, so do not
    // assume they have the same length; pairing by the shorter one avoids an
    // IndexOutOfBoundsException on a mismatched report.
    int pairCount = Math.min(errorTablets.size(), errorStatuses.size());
    for (int i = 0; i < pairCount; i++) {
        this.errorStatuses.put(errorTablets.get(i), new Status(errorStatuses.get(i)));
    }
}

public void setTableIdTabletsDeltaRows(Map<Long, Map<Long, Long>> tableIdToTabletDeltaRows) {
this.tableIdToTabletDeltaRows.putAll(tableIdToTabletDeltaRows);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.QuotaExceedException;
import org.apache.doris.common.Status;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.common.util.DebugUtil;
Expand Down Expand Up @@ -2177,6 +2178,7 @@ private boolean updateCatalogAfterVisible(TransactionState transactionState, Dat
}
Map<Long, Triple<Long, Long, Partition>> partitionMap = new HashMap<>();
Map<Long, Triple<Long, Long, OlapTable>> tableMap = new HashMap<>();
Map<Long, Status> replicaLastErrorStatuses = transactionState.getReplicaLastErrorStatuses();
for (TableCommitInfo tableCommitInfo : tableCommitInfos) {
long tableId = tableCommitInfo.getTableId();
OlapTable table = (OlapTable) db.getTableNullable(tableId);
Expand Down Expand Up @@ -2232,6 +2234,10 @@ private boolean updateCatalogAfterVisible(TransactionState transactionState, Dat
lastFailedVersion = newCommitVersion;
}
failedVersionSetReplicas.add(replica.getId());
Status st = replicaLastErrorStatuses.get(replica.getId());
if (st != null) {
replica.setLastFailedStatus(st);
}
if (LOG.isDebugEnabled()) {
LOG.debug("txn_id={}, set replica={}, last_failed_version={}",
transactionState.getTransactionId(), replica.getId(),
Expand Down
Loading
Loading