Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[bug](schema change)fix schema change cause load failed due to err -215 #23836

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.doris.catalog.Table;
import org.apache.doris.common.Config;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.DebugPointUtil;
Expand Down Expand Up @@ -105,6 +106,10 @@ public enum JobType {
@SerializedName(value = "failedTabletBackends")
protected Map<Long, List<Long>> failedTabletBackends = Maps.newHashMap();

// watermark txn id used to decide when decommissioned tablets can be deleted
@SerializedName(value = "deleteTabletWatermarkTxnId")
protected long deleteTabletWatermarkTxnId = -1;

public AlterJobV2(String rawSql, long jobId, JobType jobType, long dbId, long tableId, String tableName,
long timeoutMs) {
this.rawSql = rawSql;
Expand Down Expand Up @@ -319,4 +324,12 @@ public static AlterJobV2 read(DataInput in) throws IOException {
/**
 * Serializes this job to its JSON representation via {@code GsonUtils.GSON}.
 *
 * @return the JSON string produced for this object
 */
public String toJson() {
    final String json = GsonUtils.GSON.toJson(this);
    return json;
}

/**
 * Captures the next global transaction id as this job's delete-tablet watermark.
 *
 * <p>The id is stored in {@code deleteTabletWatermarkTxnId}. If the transaction manager
 * throws a {@link UserException}, the field keeps its previous value (initially -1) and
 * the failure is logged together with its cause.
 */
protected void assignDeleteTabletWatermarkTxnId() {
    try {
        this.deleteTabletWatermarkTxnId = Env.getCurrentGlobalTransactionMgr().getNextTransactionId();
    } catch (UserException e) {
        // Best effort: continue with the existing watermark, but keep the cause and
        // stack trace in the log instead of silently dropping them.
        LOG.warn("get next transaction id failed", e);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.IdGeneratorUtil;
import org.apache.doris.common.util.ListComparator;
import org.apache.doris.common.util.PropertyAnalyzer;
Expand Down Expand Up @@ -946,10 +947,14 @@ public void processBatchDropRollup(List<AlterClause> dropRollupClauses, Database
// drop data in memory
Set<Long> indexIdSet = new HashSet<>();
Set<String> rollupNameSet = new HashSet<>();

// Watermark for tablet deletion: the tablets are deleted once all transactions before watermarkTxnId have finished.
long deleteTabletWatermarkTxnId =
Env.getCurrentGlobalTransactionMgr().getNextTransactionId();
for (AlterClause alterClause : dropRollupClauses) {
DropRollupClause dropRollupClause = (DropRollupClause) alterClause;
String rollupIndexName = dropRollupClause.getRollupName();
long rollupIndexId = dropMaterializedView(rollupIndexName, olapTable);
long rollupIndexId = dropMaterializedView(rollupIndexName, olapTable, deleteTabletWatermarkTxnId);
indexIdSet.add(rollupIndexId);
rollupNameSet.add(rollupIndexName);
}
Expand All @@ -959,10 +964,13 @@ public void processBatchDropRollup(List<AlterClause> dropRollupClauses, Database
long dbId = db.getId();
long tableId = olapTable.getId();
String tableName = olapTable.getName();
editLog.logBatchDropRollup(new BatchDropInfo(dbId, tableId, tableName, indexIdSet));
editLog.logBatchDropRollup(new BatchDropInfo(dbId, tableId, tableName, indexIdSet,
deleteTabletWatermarkTxnId));
deleteIndexList = indexIdSet.stream().collect(Collectors.toList());
LOG.info("finished drop rollup index[{}] in table[{}]",
String.join("", rollupNameSet), olapTable.getName());
} catch (UserException e) {
throw new DdlException(e.getMessage());
} finally {
olapTable.writeUnlock();
}
Expand All @@ -979,11 +987,14 @@ public void processDropMaterializedView(DropMaterializedViewStmt dropMaterialize
// Step1: check drop mv index operation
checkDropMaterializedView(mvName, olapTable);
// Step2; drop data in memory
long mvIndexId = dropMaterializedView(mvName, olapTable);
long deleteTabletWatermarkTxnId =
Env.getCurrentGlobalTransactionMgr().getNextTransactionId();
long mvIndexId = dropMaterializedView(mvName, olapTable, deleteTabletWatermarkTxnId);
// Step3: log drop mv operation
EditLog editLog = Env.getCurrentEnv().getEditLog();
editLog.logDropRollup(
new DropInfo(db.getId(), olapTable.getId(), olapTable.getName(), mvIndexId, false, 0));
new DropInfo(db.getId(), olapTable.getId(), olapTable.getName(), mvIndexId, false, 0,
deleteTabletWatermarkTxnId));
deleteIndexList.add(mvIndexId);
LOG.info("finished drop materialized view [{}] in table [{}]", mvName, olapTable.getName());
} catch (MetaNotFoundException e) {
Expand All @@ -992,6 +1003,8 @@ public void processDropMaterializedView(DropMaterializedViewStmt dropMaterialize
} else {
throw e;
}
} catch (UserException e) {
throw new DdlException(e.getMessage());
} finally {
olapTable.writeUnlock();
}
Expand Down Expand Up @@ -1033,17 +1046,17 @@ private void checkDropMaterializedView(String mvName, OlapTable olapTable)
* @param olapTable
* @return
*/
private long dropMaterializedView(String mvName, OlapTable olapTable) {
private long dropMaterializedView(String mvName, OlapTable olapTable, long deleteTabletWatermarkTxnId) {
long mvIndexId = olapTable.getIndexIdByName(mvName);
TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
for (Partition partition : olapTable.getPartitions()) {
MaterializedIndex rollupIndex = partition.getIndex(mvIndexId);
// delete rollup index
partition.deleteRollupIndex(mvIndexId);
// remove tablets from inverted index
// set watermarkTxnId for each tablet
for (Tablet tablet : rollupIndex.getTablets()) {
long tabletId = tablet.getId();
invertedIndex.deleteTablet(tabletId);
invertedIndex.addDecommissionTablet(tabletId, deleteTabletWatermarkTxnId);
}
}
olapTable.deleteIndexInfo(mvName);
Expand All @@ -1070,9 +1083,9 @@ public void replayDropRollup(DropInfo dropInfo, Env env) throws MetaNotFoundExce
for (Partition partition : olapTable.getPartitions()) {
MaterializedIndex rollupIndex = partition.deleteRollupIndex(rollupIndexId);

// remove from inverted index
// set watermarkTxnId for each tablet
for (Tablet tablet : rollupIndex.getTablets()) {
invertedIndex.deleteTablet(tablet.getId());
invertedIndex.addDecommissionTablet(tablet.getId(), dropInfo.getDeleteTabletWatermarkTxnId());
}
}
String rollupIndexName = olapTable.getIndexNameById(rollupIndexId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -646,6 +646,7 @@ protected boolean cancelImpl(String errMsg) {
return false;
}

assignDeleteTabletWatermarkTxnId();
cancelInternal();

jobState = JobState.CANCELLED;
Expand Down Expand Up @@ -673,7 +674,7 @@ private void cancelInternal() {
for (Long partitionId : partitionIdToRollupIndex.keySet()) {
MaterializedIndex rollupIndex = partitionIdToRollupIndex.get(partitionId);
for (Tablet rollupTablet : rollupIndex.getTablets()) {
invertedIndex.deleteTablet(rollupTablet.getId());
invertedIndex.addDecommissionTablet(rollupTablet.getId(), deleteTabletWatermarkTxnId);
}
Partition partition = tbl.getPartition(partitionId);
partition.deleteRollupIndex(rollupIndexId);
Expand Down Expand Up @@ -793,6 +794,7 @@ private void replayRunningJob(RollupJobV2 replayedJob) {
* Replay job in CANCELLED state.
*/
private void replayCancelled(RollupJobV2 replayedJob) {
this.deleteTabletWatermarkTxnId = replayedJob.deleteTabletWatermarkTxnId;
cancelInternal();
// try best to drop roll index, when job is cancelled
onCancel();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -661,6 +661,7 @@ protected void runRunningJob() throws AlterCancelException {
} // end for partitions
commitShadowIndex();
// all partitions are good
assignDeleteTabletWatermarkTxnId();
onFinished(tbl);
} finally {
tbl.writeUnlock();
Expand Down Expand Up @@ -715,8 +716,9 @@ private void onFinished(OlapTable tbl) {
partition.visualiseShadowIndex(shadowIdxId, originIdxId == partition.getBaseIndex().getId());

// delete origin replicas
TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
for (Tablet originTablet : droppedIdx.getTablets()) {
Env.getCurrentInvertedIndex().deleteTablet(originTablet.getId());
invertedIndex.addDecommissionTablet(originTablet.getId(), deleteTabletWatermarkTxnId);
}
}
}
Expand Down Expand Up @@ -786,7 +788,7 @@ protected synchronized boolean cancelImpl(String errMsg) {
if (jobState.isFinalState()) {
return false;
}

assignDeleteTabletWatermarkTxnId();
cancelInternal();

pruneMeta();
Expand Down Expand Up @@ -821,7 +823,7 @@ private void cancelInternal() {
for (Map.Entry<Long, MaterializedIndex> entry : shadowIndexMap.entrySet()) {
MaterializedIndex shadowIdx = entry.getValue();
for (Tablet shadowTablet : shadowIdx.getTablets()) {
invertedIndex.deleteTablet(shadowTablet.getId());
invertedIndex.addDecommissionTablet(shadowTablet.getId(), deleteTabletWatermarkTxnId);
}
partition.deleteRollupIndex(shadowIdx.getId());
}
Expand Down Expand Up @@ -916,6 +918,7 @@ private void replayPendingJob(SchemaChangeJobV2 replayedJob) throws MetaNotFound
* Should replay all changes in runRunningJob()
*/
private void replayRunningJob(SchemaChangeJobV2 replayedJob) {
this.deleteTabletWatermarkTxnId = replayedJob.deleteTabletWatermarkTxnId;
Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
if (db != null) {
OlapTable tbl = (OlapTable) db.getTableNullable(tableId);
Expand All @@ -941,6 +944,7 @@ private void replayRunningJob(SchemaChangeJobV2 replayedJob) {
* Replay job in CANCELLED state.
*/
private void replayCancelled(SchemaChangeJobV2 replayedJob) {
this.deleteTabletWatermarkTxnId = replayedJob.deleteTabletWatermarkTxnId;
cancelInternal();
// try best to drop shadow index
onCancel();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.doris.common.Pair;
import org.apache.doris.cooldown.CooldownConf;
import org.apache.doris.master.PartitionInfoCollector.PartitionCollectInfo;
import org.apache.doris.persist.DeleteTabletInfo;
import org.apache.doris.task.PublishVersionTask;
import org.apache.doris.thrift.TPartitionVersionInfo;
import org.apache.doris.thrift.TStorageMedium;
Expand Down Expand Up @@ -106,6 +107,9 @@ public class TabletInvertedIndex {
// Note: only non-cloud mode uses this map, for BE tablet reports. It is empty in cloud mode.
private volatile ImmutableMap<Long, PartitionCollectInfo> partitionCollectInfoMap = ImmutableMap.of();

// tablet id -> watermark id
private Map<Long, Long> decommissionTabletMap = Maps.newConcurrentMap();

private ForkJoinPool taskPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());

public TabletInvertedIndex() {
Expand Down Expand Up @@ -142,9 +146,25 @@ public void tabletReport(long backendId, Map<Long, TTablet> backendTablets,
List<CooldownConf> cooldownConfToPush,
List<CooldownConf> cooldownConfToUpdate) {
List<Pair<TabletMeta, TTabletInfo>> cooldownTablets = new ArrayList<>();
long start = System.currentTimeMillis();
// delete decommission tablet when all transactions finished
try {
for (Map.Entry<Long, Long> entry : decommissionTabletMap.entrySet()) {
long tabletId = entry.getKey();
long watermarkId = entry.getValue();
TabletMeta tabletMeta = getTabletMeta(tabletId);
if (Env.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(
watermarkId, tabletMeta.getDbId(), tabletMeta.getTableId(),
tabletMeta.getPartitionId())) {
deleteDecommissionTablet(tabletId);
Env.getCurrentEnv().getEditLog().logDeleteDecommissionTablet(new DeleteTabletInfo(tabletId));
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
long feTabletNum = 0;
long stamp = readLock();
long start = System.currentTimeMillis();
try {
if (LOG.isDebugEnabled()) {
LOG.debug("begin to do tablet diff with backend[{}]. num: {}", backendId, backendTablets.size());
Expand Down Expand Up @@ -627,6 +647,21 @@ public void deleteTablet(long tabletId) {
}
}

/**
 * Records a tablet in the decommission map together with its watermark.
 *
 * @param tabletId id of the tablet being decommissioned
 * @param deleteTabletWatermark watermark value stored for this tablet
 */
public void addDecommissionTablet(long tabletId, long deleteTabletWatermark) {
    decommissionTabletMap.put(tabletId, deleteTabletWatermark);
    if (!LOG.isDebugEnabled()) {
        return;
    }
    LOG.debug("decommission tablet: {}, watermark: {}", tabletId, deleteTabletWatermark);
}

/**
 * Deletes a tablet via {@code deleteTablet} and then drops its entry from the
 * decommission map.
 *
 * @param tabletId id of the decommissioned tablet to delete
 */
public void deleteDecommissionTablet(long tabletId) {
    deleteTablet(tabletId);
    decommissionTabletMap.remove(tabletId);
    if (!LOG.isDebugEnabled()) {
        return;
    }
    LOG.debug("delete decommission tablet: {}", tabletId);
}

public void addReplica(long tabletId, Replica replica) {
long stamp = writeLock();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1001,7 +1001,10 @@ private void dropTableInternal(Database db, Table table, boolean forceDrop,

Env.getCurrentEnv().getQueryStats().clear(Env.getCurrentEnv().getCurrentCatalog().getId(),
db.getId(), table.getId());
DropInfo info = new DropInfo(db.getId(), table.getId(), tableName, -1L, forceDrop, recycleTime);

Env.getCurrentEnv().getAnalysisManager().removeTableStats(table.getId());

DropInfo info = new DropInfo(db.getId(), table.getId(), tableName, -1L, forceDrop, recycleTime, -1);
Env.getCurrentEnv().getEditLog().logDropTable(info);
Env.getCurrentEnv().getMtmvService().dropTable(table);
}
Expand Down Expand Up @@ -3229,7 +3232,7 @@ private boolean createOlapTable(Database db, CreateTableStmt stmt) throws UserEx
try {
dropTable(db, tableId, true, false, 0L);
if (hadLogEditCreateTable) {
DropInfo info = new DropInfo(db.getId(), tableId, olapTable.getName(), -1L, true, 0L);
DropInfo info = new DropInfo(db.getId(), tableId, olapTable.getName(), -1L, true, 0L, -1);
Env.getCurrentEnv().getEditLog().logDropTable(info);
}
} catch (Exception ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import org.apache.doris.persist.ConsistencyCheckInfo;
import org.apache.doris.persist.CreateTableInfo;
import org.apache.doris.persist.DatabaseInfo;
import org.apache.doris.persist.DeleteTabletInfo;
import org.apache.doris.persist.DropDbInfo;
import org.apache.doris.persist.DropInfo;
import org.apache.doris.persist.DropPartitionInfo;
Expand Down Expand Up @@ -955,6 +956,11 @@ public void readFields(DataInput in) throws IOException {
isRead = true;
break;
}
case OperationType.OP_DELETE_DECOMMISSION_TABLET: {
data = DeleteTabletInfo.read(in);
isRead = true;
break;
}
default: {
IOException e = new IOException();
LOG.error("UNKNOWN Operation Type {}", opCode, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,17 @@ public class BatchDropInfo implements Writable {
@SerializedName(value = "indexIdSet")
private Set<Long> indexIdSet;

public BatchDropInfo(long dbId, long tableId, String tableName, Set<Long> indexIdSet) {
// watermark txn id used to decide when decommissioned tablets can be deleted
@SerializedName(value = "deleteTabletWatermarkTxnId")
private long deleteTabletWatermarkTxnId = -1;

public BatchDropInfo(long dbId, long tableId, String tableName, Set<Long> indexIdSet,
long deleteTabletWatermarkTxnId) {
this.dbId = dbId;
this.tableId = tableId;
this.tableName = tableName;
this.indexIdSet = indexIdSet;
this.deleteTabletWatermarkTxnId = deleteTabletWatermarkTxnId;
}

@Override
Expand All @@ -65,7 +71,8 @@ public boolean equals(Object other) {
}
BatchDropInfo otherBatchDropInfo = (BatchDropInfo) other;
return this.dbId == otherBatchDropInfo.dbId && this.tableId == otherBatchDropInfo.tableId
&& this.indexIdSet.equals(otherBatchDropInfo.indexIdSet);
&& this.indexIdSet.equals(otherBatchDropInfo.indexIdSet)
&& this.deleteTabletWatermarkTxnId == otherBatchDropInfo.deleteTabletWatermarkTxnId;
}

@Override
Expand Down Expand Up @@ -94,6 +101,10 @@ public String getTableName() {
return tableName;
}

/**
 * Returns the watermark transaction id recorded with this drop operation.
 *
 * @return the stored {@code deleteTabletWatermarkTxnId}
 */
public long getDeleteTabletWatermarkTxnId() {
    return this.deleteTabletWatermarkTxnId;
}

/**
 * Serializes this drop info to its JSON representation via {@code GsonUtils.GSON}.
 *
 * @return the JSON string produced for this object
 */
public String toJson() {
    final String json = GsonUtils.GSON.toJson(this);
    return json;
}
Expand Down
Loading
Loading