Skip to content

Commit

Permalink
[fix](replica num) Fix the decrease in the number of replicas and une… (
Browse files Browse the repository at this point in the history
#48704)

…ven distribution of replicas among bes
  • Loading branch information
deardeng authored Mar 12, 2025
1 parent 3391d1a commit 74c8eed
Show file tree
Hide file tree
Showing 4 changed files with 313 additions and 1 deletion.
118 changes: 117 additions & 1 deletion fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,18 @@
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MTMV;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.MysqlTable;
import org.apache.doris.catalog.OdbcTable;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.PartitionInfo;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.ReplicaAllocation;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.View;
import org.apache.doris.cloud.alter.CloudSchemaChangeHandler;
import org.apache.doris.common.AnalysisException;
Expand All @@ -82,6 +85,8 @@
import org.apache.doris.persist.ReplaceTableOperationLog;
import org.apache.doris.policy.StoragePolicy;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.resource.Tag;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TOdbcTableType;
import org.apache.doris.thrift.TSortType;
import org.apache.doris.thrift.TTabletType;
Expand All @@ -101,6 +106,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

public class Alter {
private static final Logger LOG = LogManager.getLogger(Alter.class);
Expand Down Expand Up @@ -988,6 +994,15 @@ public void modifyPartitionsProperty(Database db,

// modify meta here
PartitionInfo partitionInfo = olapTable.getPartitionInfo();
Map<Long, Long> tableBeToReplicaNumMap = Maps.newHashMap();
for (String partitionName : partitionNames) {
Partition partition = olapTable.getPartition(partitionName, isTempPartition);
Map<Long, Long> partitionBeToReplicaNumMap = getReplicaCountByBackend(partition);

for (Map.Entry<Long, Long> entry : partitionBeToReplicaNumMap.entrySet()) {
tableBeToReplicaNumMap.merge(entry.getKey(), entry.getValue(), Long::sum);
}
}
for (String partitionName : partitionNames) {
Partition partition = olapTable.getPartition(partitionName, isTempPartition);
// 4. data property
Expand Down Expand Up @@ -1040,6 +1055,10 @@ public void modifyPartitionsProperty(Database db,
}
// 2. replica allocation
if (!replicaAlloc.isNotSet()) {
if (Config.isNotCloudMode() && !olapTable.isColocateTable()) {
setReplicasToDrop(partition, partitionInfo.getReplicaAllocation(partition.getId()),
replicaAlloc, tableBeToReplicaNumMap);
}
partitionInfo.setReplicaAllocation(partition.getId(), replicaAlloc);
}
// 3. in memory
Expand All @@ -1062,11 +1081,108 @@ public void modifyPartitionsProperty(Database db,
Env.getCurrentEnv().getEditLog().logBatchModifyPartition(info);
}

public void setReplicasToDrop(Partition partition,
ReplicaAllocation oldReplicaAlloc,
ReplicaAllocation newReplicaAlloc,
Map<Long, Long> tableBeToReplicaNumMap) {
if (newReplicaAlloc.getAllocMap().entrySet().stream().noneMatch(
entry -> entry.getValue() < oldReplicaAlloc.getReplicaNumByTag(entry.getKey()))) {
return;
}

SystemInfoService systemInfoService = Env.getCurrentSystemInfo();
List<Long> aliveBes = systemInfoService.getAllBackendIds(true);

processReplicasInPartition(partition,
tableBeToReplicaNumMap, systemInfoService, oldReplicaAlloc, newReplicaAlloc, aliveBes);
}

private void processReplicasInPartition(Partition partition,
Map<Long, Long> tableBeToReplicaNumMap, SystemInfoService systemInfoService,
ReplicaAllocation oldReplicaAlloc, ReplicaAllocation newReplicaAlloc,
List<Long> aliveBes) {
List<Tag> changeTags = newReplicaAlloc.getAllocMap().entrySet().stream()
.filter(entry -> entry.getValue() < oldReplicaAlloc.getReplicaNumByTag(entry.getKey()))
.map(Map.Entry::getKey).collect(Collectors.toList());
Map<Long, Long> partitionBeToReplicaNumMap = getReplicaCountByBackend(partition);
for (MaterializedIndex index : partition.getMaterializedIndices(MaterializedIndex.IndexExtState.VISIBLE)) {
for (Tablet tablet : index.getTablets()) {
if (!isTabletHealthy(tablet, systemInfoService, partition, oldReplicaAlloc, aliveBes)) {
continue;
}
Map<Tag, List<Replica>> tagToReplicaMap = getReplicasWithTag(tablet);
for (Tag tag : changeTags) {
List<Replica> toDealReplicas = tagToReplicaMap.get(tag);
if (toDealReplicas == null || toDealReplicas.isEmpty()) {
continue;
}
sortReplicasByBackendCount(toDealReplicas, tableBeToReplicaNumMap, partitionBeToReplicaNumMap);
int replicasToDrop = oldReplicaAlloc.getReplicaNumByTag(tag)
- newReplicaAlloc.getReplicaNumByTag(tag);
markReplicasForDropping(toDealReplicas, replicasToDrop,
tableBeToReplicaNumMap, partitionBeToReplicaNumMap);
}
}
}
}

private boolean isTabletHealthy(Tablet tablet, SystemInfoService systemInfoService,
Partition partition, ReplicaAllocation oldReplicaAlloc,
List<Long> aliveBes) {
return tablet.getHealth(systemInfoService, partition.getVisibleVersion(), oldReplicaAlloc, aliveBes)
.status == Tablet.TabletStatus.HEALTHY;
}


private Map<Tag, List<Replica>> getReplicasWithTag(Tablet tablet) {
return tablet.getReplicas().stream()
.collect(Collectors.groupingBy(replica -> Env.getCurrentSystemInfo()
.getBackend(replica.getBackendIdWithoutException()).getLocationTag()));
}

private void sortReplicasByBackendCount(List<Replica> replicas,
Map<Long, Long> tableBeToReplicaNumMap,
Map<Long, Long> partitionBeToReplicaNumMap) {
replicas.sort((Replica r1, Replica r2) -> {
long countPartition1 = partitionBeToReplicaNumMap.getOrDefault(r1.getBackendIdWithoutException(), 0L);
long countPartition2 = partitionBeToReplicaNumMap.getOrDefault(r2.getBackendIdWithoutException(), 0L);
if (countPartition1 != countPartition2) {
return Long.compare(countPartition2, countPartition1);
}
long countTable1 = tableBeToReplicaNumMap.getOrDefault(r1.getBackendIdWithoutException(), 0L);
long countTable2 = tableBeToReplicaNumMap.getOrDefault(r2.getBackendIdWithoutException(), 0L);
return Long.compare(countTable2, countTable1); // desc sort
});
}

private void markReplicasForDropping(List<Replica> replicas, int replicasToDrop,
Map<Long, Long> tableBeToReplicaNumMap,
Map<Long, Long> partitionBeToReplicaNumMap) {
for (int i = 0; i < replicas.size(); i++) {
Replica r = replicas.get(i);
long beId = r.getBackendIdWithoutException();
if (i >= replicasToDrop) {
r.setScaleInDropTimeStamp(-1); // Mark for not dropping
} else {
r.setScaleInDropTimeStamp(System.currentTimeMillis()); // Mark for dropping
tableBeToReplicaNumMap.put(beId, tableBeToReplicaNumMap.get(beId) - 1);
partitionBeToReplicaNumMap.put(beId, partitionBeToReplicaNumMap.get(beId) - 1);
}
}
}

public static Map<Long, Long> getReplicaCountByBackend(Partition partition) {
return partition.getMaterializedIndices(MaterializedIndex.IndexExtState.VISIBLE).stream()
.flatMap(index -> index.getTablets().stream())
.flatMap(tablet -> tablet.getBackendIds().stream())
.collect(Collectors.groupingBy(id -> id, Collectors.counting()));
}

public void checkNoForceProperty(Map<String, String> properties) throws DdlException {
for (RewriteProperty property : PropertyAnalyzer.getInstance().getForceProperties()) {
if (properties.containsKey(property.key())) {
throw new DdlException("Cann't modify property '" + property.key() + "'"
+ (Config.isCloudMode() ? " in cloud mode" : "") + ".");
+ (Config.isCloudMode() ? " in cloud mode" : "") + ".");
}
}
}
Expand Down
18 changes: 18 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,8 @@ public static class ReplicaContext {

private long userDropTime = -1;

private long scaleInDropTime = -1;

private long lastReportVersion = 0;

public Replica() {
Expand Down Expand Up @@ -862,6 +864,22 @@ public boolean isUserDrop() {
return false;
}

public void setScaleInDropTimeStamp(long scaleInDropTime) {
this.scaleInDropTime = scaleInDropTime;
}

public boolean isScaleInDrop() {
if (this.scaleInDropTime > 0) {
if (System.currentTimeMillis() - this.scaleInDropTime
< Config.manual_drop_replica_valid_second * 1000L) {
return true;
}
this.scaleInDropTime = -1;
}
return false;
}


public boolean isAlive() {
return getState() != ReplicaState.CLONE
&& getState() != ReplicaState.DECOMMISSION
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -900,6 +900,7 @@ private void handleRedundantReplica(TabletSchedCtx tabletCtx, boolean force) thr
|| deleteReplicaNotInValidTag(tabletCtx, force)
|| deleteReplicaChosenByRebalancer(tabletCtx, force)
|| deleteReplicaOnUrgentHighDisk(tabletCtx, force)
|| deleteFromScaleInDropReplicas(tabletCtx, force)
|| deleteReplicaOnHighLoadBackend(tabletCtx, force)) {
// if we delete at least one redundant replica, we still throw a SchedException with status FINISHED
// to remove this tablet from the pendingTablets(consider it as finished)
Expand Down Expand Up @@ -1089,6 +1090,17 @@ private boolean deleteReplicaOnHighLoadBackend(TabletSchedCtx tabletCtx, boolean
return deleteFromHighLoadBackend(tabletCtx, tabletCtx.getReplicas(), force, statistic);
}

private boolean deleteFromScaleInDropReplicas(TabletSchedCtx tabletCtx, boolean force) throws SchedException {
// Check if there are any scale drop replicas
for (Replica replica : tabletCtx.getReplicas()) {
if (replica.isScaleInDrop()) {
deleteReplicaInternal(tabletCtx, replica, "scale drop replica", force);
return true;
}
}
return false;
}

private boolean deleteFromHighLoadBackend(TabletSchedCtx tabletCtx, List<Replica> replicas,
boolean force, LoadStatisticForTag statistic) throws SchedException {
Replica chosenReplica = null;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.clone;

import org.apache.doris.alter.Alter;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.Config;
import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.resource.Tag;
import org.apache.doris.system.Backend;
import org.apache.doris.utframe.TestWithFeService;

import com.google.common.collect.Lists;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class DecreaseReplicationNumTest extends TestWithFeService {

private Database db;

@Override
protected void beforeCreatingConnectContext() throws Exception {
Config.enable_debug_points = true;
Config.disable_balance = true;
Config.drop_backend_after_decommission = false;
Config.tablet_schedule_interval_ms = 1000L;
Config.tablet_checker_interval_ms = 1000L;
}

@Override
protected int backendNum() {
return 5;
}

@Override
protected void runBeforeAll() throws Exception {
Thread.sleep(1000);
createDatabase("test");
useDatabase("test");
db = Env.getCurrentInternalCatalog().getDbOrMetaException("test");
}

@Override
protected void runBeforeEach() throws Exception {
// set back to default value
Config.max_scheduling_tablets = 2000;
for (Table table : db.getTables()) {
dropTable(table.getName(), true);
}
Env.getCurrentEnv().getTabletScheduler().clear();
DebugPointUtil.clearDebugPoints();
Assertions.assertTrue(checkBEHeartbeat(Env.getCurrentSystemInfo().getBackendsByTag(Tag.DEFAULT_BACKEND_TAG)));
}


@Test
public void testDecreaseReplicaNum() throws Exception {
createTable("CREATE TABLE tbl1 (k INT) DISTRIBUTED BY HASH(k) BUCKETS 10"
+ " PROPERTIES ('replication_num' = '5')");

OlapTable table = (OlapTable) db.getTableOrMetaException("tbl1");
Partition partition = table.getPartitions().iterator().next();
Map<Long, Long> beIdToTabletNum = Alter.getReplicaCountByBackend(partition);
Assertions.assertEquals(5, beIdToTabletNum.size());
Assertions.assertEquals(Lists.newArrayList(10L, 10L, 10L, 10L, 10L), new ArrayList<>(beIdToTabletNum.values()));
beIdToTabletNum.forEach((key, value) -> Assertions.assertEquals(value, 10L));

List<Backend> backends = Env.getCurrentSystemInfo().getBackendsByTag(Tag.DEFAULT_BACKEND_TAG);
Assertions.assertEquals(backendNum(), backends.size());
Backend highLoadBe = backends.get(0);
DebugPointUtil.addDebugPointWithValue("FE.HIGH_LOAD_BE_ID", highLoadBe.getId());

alterTableSync("ALTER TABLE tbl1 MODIFY PARTITION(*) SET ('replication_num' = '3')");
boolean succ = false;
for (int i = 0; i < 100; i++) {
beIdToTabletNum = Alter.getReplicaCountByBackend(partition);
Set<Long> afterAlter = new HashSet<>(beIdToTabletNum.values());
// wait for scheduler
if (afterAlter.size() == 1 && !beIdToTabletNum.containsValue(10L)) {
Assertions.assertTrue(afterAlter.contains(6L));
Assertions.assertEquals(Lists.newArrayList(6L, 6L, 6L, 6L, 6L), new ArrayList<>(beIdToTabletNum.values()));
succ = true;
break;
}
Thread.sleep(1000);
}
Assertions.assertTrue(succ);
}

@Test
public void testDecreaseMultiPartitionReplicaNum() throws Exception {
createTable("create table test_multi(id int, part int) "
+ "partition by range(part) ("
+ " partition p1 values[('1'), ('2')),"
+ " partition p2 values[('2'), ('3')),"
+ " partition p3 values[('3'), ('4'))"
+ ") "
+ "distributed by hash(id) BUCKETS 9 "
+ "properties ('replication_num'='4')");

OlapTable table = (OlapTable) db.getTableOrMetaException("test_multi");
List<Partition> partitions = table.getAllPartitions();
partitions.forEach(p -> {
Map<Long, Long> beIdToTabletNum = Alter.getReplicaCountByBackend(p);
Assertions.assertEquals(5, beIdToTabletNum.size());
List<Long> sortedValues = new ArrayList<>(beIdToTabletNum.values());
sortedValues.sort(Collections.reverseOrder());
Assertions.assertEquals(Lists.newArrayList(8L, 7L, 7L, 7L, 7L), sortedValues);
});

List<Backend> backends = Env.getCurrentSystemInfo().getBackendsByTag(Tag.DEFAULT_BACKEND_TAG);
Assertions.assertEquals(backendNum(), backends.size());
Backend highLoadBe = backends.get(0);
DebugPointUtil.addDebugPointWithValue("FE.HIGH_LOAD_BE_ID", highLoadBe.getId());

alterTableSync("ALTER TABLE test_multi MODIFY PARTITION(*) SET ('replication_num' = '2')");
partitions.forEach(p -> {
boolean succ = false;
for (int i = 0; i < 100; i++) {
Map<Long, Long> beIdToTabletNum = Alter.getReplicaCountByBackend(p);
Set<Long> afterAlter = new HashSet<>(beIdToTabletNum.values());
List<Long> sortedValues = new ArrayList<>(beIdToTabletNum.values());
sortedValues.sort(Collections.reverseOrder());
// wait for scheduler
if (afterAlter.size() == 2 && beIdToTabletNum.containsValue(4L) && beIdToTabletNum.containsValue(3L)) {
Assertions.assertEquals(Lists.newArrayList(4L, 4L, 4L, 3L, 3L), sortedValues);
succ = true;
break;
}
try {
Thread.sleep(1000);
} catch (InterruptedException ignored) {
System.out.println(ignored);
}
}
Assertions.assertTrue(succ);
});

}
}

0 comments on commit 74c8eed

Please sign in to comment.