Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1033,6 +1033,8 @@ private void finishActiveMasterInitialization() throws IOException, InterruptedE
Map<Class<?>, List<Procedure<MasterProcedureEnv>>> procsByType = procedureExecutor
.getActiveProceduresNoCopy().stream().collect(Collectors.groupingBy(p -> p.getClass()));

// This manager must be accessed AFTER hbase:meta is confirmed on line..
this.tableStateManager = new TableStateManager(this);
// Create Assignment Manager
this.assignmentManager = createAssignmentManager(this, masterRegion);
this.assignmentManager.start();
Expand All @@ -1056,8 +1058,6 @@ private void finishActiveMasterInitialization() throws IOException, InterruptedE
.map(p -> (ServerCrashProcedure) p).map(p -> p.getServerName()).collect(Collectors.toSet()),
Sets.union(rsListStorage.getAll(), walManager.getLiveServersFromWALDir()),
walManager.getSplittingServersFromWALDir());
// This manager must be accessed AFTER hbase:meta is confirmed on line..
this.tableStateManager = new TableStateManager(this);

startupTaskGroup.addTask("Initializing ZK system trackers");
initializeZKBasedSystemTrackers();
Expand Down Expand Up @@ -2007,7 +2007,7 @@ private void balanceThrottling(long nextBalanceStartTime, int maxRegionsInTransi
// But if there are zero regions in transition, it can skip sleep to speed up.
while (
!interrupted && EnvironmentEdgeManager.currentTime() < nextBalanceStartTime
&& this.assignmentManager.getRegionStates().hasRegionsInTransition()
&& this.assignmentManager.getRegionTransitScheduledCount() > 0
) {
try {
Thread.sleep(100);
Expand All @@ -2019,8 +2019,7 @@ private void balanceThrottling(long nextBalanceStartTime, int maxRegionsInTransi
// Throttling by max number regions in transition
while (
!interrupted && maxRegionsInTransition > 0
&& this.assignmentManager.getRegionStates().getRegionsInTransitionCount()
>= maxRegionsInTransition
&& this.assignmentManager.getRegionTransitScheduledCount() >= maxRegionsInTransition
&& EnvironmentEdgeManager.currentTime() <= cutoffTime
) {
try {
Expand Down Expand Up @@ -2099,7 +2098,7 @@ public BalanceResponse balance(BalanceRequest request) throws IOException {

synchronized (this.balancer) {
// Only allow one balance run at at time.
if (this.assignmentManager.hasRegionsInTransition()) {
if (this.assignmentManager.getRegionTransitScheduledCount() > 0) {
List<RegionStateNode> regionsInTransition = assignmentManager.getRegionsInTransition();
// if hbase:meta region is in transition, result of assignment cannot be recorded
// ignore the force flag in that case
Expand All @@ -2114,8 +2113,8 @@ public BalanceResponse balance(BalanceRequest request) throws IOException {

if (!request.isIgnoreRegionsInTransition() || metaInTransition) {
LOG.info("Not running balancer (ignoreRIT=false" + ", metaRIT=" + metaInTransition
+ ") because " + regionsInTransition.size() + " region(s) in transition: " + toPrint
+ (truncated ? "(truncated list)" : ""));
+ ") because " + assignmentManager.getRegionTransitScheduledCount()
+ " region(s) in transition: " + toPrint + (truncated ? "(truncated list)" : ""));
return responseBuilder.build();
}
}
Expand Down Expand Up @@ -2251,7 +2250,7 @@ public boolean normalizeRegions(final NormalizeTableFilterParams ntfp,
if (skipRegionManagementAction("region normalizer")) {
return false;
}
if (assignmentManager.hasRegionsInTransition()) {
if (assignmentManager.getRegionTransitScheduledCount() > 0) {
return false;
}

Expand Down Expand Up @@ -3081,7 +3080,7 @@ public ClusterMetrics getClusterMetricsWithoutCoprocessor(EnumSet<Option> option
case REGIONS_IN_TRANSITION: {
if (assignmentManager != null) {
builder.setRegionsInTransition(
assignmentManager.getRegionStates().getRegionsStateInTransition());
new ArrayList<>(assignmentManager.getRegionsStateInTransition()));
}
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,14 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -232,12 +235,15 @@ public class AssignmentManager {

private final int forceRegionRetainmentRetries;

private final RegionInTransitionTracker regionInTransitionTracker;

public AssignmentManager(MasterServices master, MasterRegion masterRegion) {
this(master, masterRegion, new RegionStateStore(master, masterRegion));
}

AssignmentManager(MasterServices master, MasterRegion masterRegion, RegionStateStore stateStore) {
this.master = master;
regionInTransitionTracker = new RegionInTransitionTracker(master.getTableStateManager());
this.regionStateStore = stateStore;
this.metrics = new MetricsAssignmentManager();
this.masterRegion = masterRegion;
Expand Down Expand Up @@ -331,6 +337,8 @@ public void start() throws IOException, KeeperException {
regionNode.setLastHost(lastHost);
regionNode.setRegionLocation(regionLocation);
regionNode.setOpenSeqNum(openSeqNum);
regionInTransitionTracker.handleRegionStateNodeOperation(regionNode);

if (regionNode.getProcedure() != null) {
regionNode.getProcedure().stateLoaded(this, regionNode);
}
Expand Down Expand Up @@ -382,7 +390,7 @@ public void setupRIT(List<TransitRegionStateProcedure> procs) {
return;
}
}
LOG.info("Attach {} to {} to restore RIT", proc, regionNode);
LOG.info("Attach {} to {} to restore", proc, regionNode);
regionNode.setProcedure(proc);
});
}
Expand Down Expand Up @@ -411,6 +419,7 @@ public void stop() {

// Stop the RegionStateStore
regionStates.clear();
regionInTransitionTracker.stop();

// Update meta events (for testing)
if (hasProcExecutor) {
Expand Down Expand Up @@ -1093,7 +1102,7 @@ private int submitUnassignProcedure(TableName tableName,
regionNode.lock();
try {
if (shouldSubmit.apply(regionNode)) {
if (regionNode.isInTransition()) {
if (regionNode.isTransitionScheduled()) {
logRIT.accept(regionNode);
inTransitionCount++;
continue;
Expand Down Expand Up @@ -1702,10 +1711,8 @@ public boolean isRegionTwiceOverThreshold(final RegionInfo regionInfo) {
}

protected void update(final AssignmentManager am) {
final RegionStates regionStates = am.getRegionStates();
this.statTimestamp = EnvironmentEdgeManager.currentTime();
update(regionStates.getRegionsStateInTransition(), statTimestamp);
update(regionStates.getRegionFailedOpen(), statTimestamp);
update(am.getRegionsStateInTransition(), statTimestamp);

if (LOG.isDebugEnabled() && ritsOverThreshold != null && !ritsOverThreshold.isEmpty()) {
LOG.debug("RITs over threshold: {}",
Expand Down Expand Up @@ -1873,6 +1880,11 @@ public void visitRegionState(Result result, final RegionInfo regionInfo, final S
if (regionNode.getProcedure() != null) {
regionNode.getProcedure().stateLoaded(AssignmentManager.this, regionNode);
}
// add regions to RIT while visiting the meta
regionInTransitionTracker.handleRegionStateNodeOperation(regionNode);
if (master.getServerManager().isServerDead(regionNode.getRegionLocation())) {
regionInTransitionTracker.regionCrashed(regionNode);
}
}
};

Expand Down Expand Up @@ -2046,15 +2058,52 @@ public Pair<Integer, Integer> getReopenStatus(TableName tableName) {
return new Pair<Integer, Integer>(ritCount, states.size());
}

// This comparator sorts the RegionStates by time stamp then Region name.
// Comparing by timestamp alone can lead us to discard different RegionStates that happen
// to share a timestamp.
private final static class RegionStateStampComparator implements Comparator<RegionState> {
@Override
public int compare(final RegionState l, final RegionState r) {
int stampCmp = Long.compare(l.getStamp(), r.getStamp());
return stampCmp != 0 ? stampCmp : RegionInfo.COMPARATOR.compare(l.getRegion(), r.getRegion());
}
}

public final static RegionStateStampComparator REGION_STATE_STAMP_COMPARATOR =
new RegionStateStampComparator();

// ============================================================================================
// TODO: Region State In Transition
// ============================================================================================
public boolean hasRegionsInTransition() {
return regionStates.hasRegionsInTransition();
return regionInTransitionTracker.hasRegionsInTransition();
}

public List<RegionStateNode> getRegionsInTransition() {
return regionStates.getRegionsInTransition();
return regionInTransitionTracker.getRegionsInTransition();
}

public boolean isRegionInTransition(final RegionInfo regionInfo) {
return regionInTransitionTracker.isRegionInTransition(regionInfo);
}

public int getRegionTransitScheduledCount() {
return regionStates.getRegionTransitScheduledCount();
}

/**
* Get the number of regions in transition.
*/
public int getRegionsInTransitionCount() {
return regionInTransitionTracker.getRegionsInTransition().size();
}

public SortedSet<RegionState> getRegionsStateInTransition() {
final SortedSet<RegionState> rit = new TreeSet<RegionState>(REGION_STATE_STAMP_COMPARATOR);
for (RegionStateNode node : getRegionsInTransition()) {
rit.add(node.toRegionState());
}
return rit;
}

public List<RegionInfo> getAssignedRegions() {
Expand Down Expand Up @@ -2122,6 +2171,8 @@ private CompletableFuture<Void> transitStateAndUpdate(RegionStateNode regionNode
if (e != null) {
// revert
regionNode.setState(state);
} else {
regionInTransitionTracker.handleRegionStateNodeOperation(regionNode);
}
});
return future;
Expand Down Expand Up @@ -2170,6 +2221,7 @@ CompletableFuture<Void> regionFailedOpen(RegionStateNode regionNode, boolean giv
if (regionLocation != null) {
regionStates.removeRegionFromServer(regionLocation, regionNode);
}
regionInTransitionTracker.handleRegionStateNodeOperation(regionNode);
} else {
// revert
regionNode.setState(state);
Expand Down Expand Up @@ -2230,6 +2282,7 @@ CompletableFuture<Void> persistToMeta(RegionStateNode regionNode) {
// on table that contains state.
setMetaAssigned(regionInfo, true);
}
regionInTransitionTracker.handleRegionStateNodeOperation(regionNode);
});
}

Expand All @@ -2247,6 +2300,7 @@ public CompletableFuture<Void> regionClosedAbnormally(RegionStateNode regionNode
regionNode.setLastHost(regionLocation);
regionStates.removeRegionFromServer(regionLocation, regionNode);
}
regionInTransitionTracker.handleRegionStateNodeOperation(regionNode);
} else {
// revert
regionNode.setState(state);
Expand All @@ -2260,6 +2314,18 @@ public CompletableFuture<Void> regionClosedAbnormally(RegionStateNode regionNode
// The above methods can only be called in TransitRegionStateProcedure(and related procedures)
// ============================================================================================

// As soon as a server a crashed, region hosting on that are un-available, this method helps to
// track those un-available regions. This method can only be called from ServerCrashProcedure.
public void markRegionsAsCrashed(List<RegionInfo> regionsOnCrashedServer,
ServerName crashedServerName) {
for (RegionInfo regionInfo : regionsOnCrashedServer) {
RegionStateNode node = regionStates.getOrCreateRegionStateNode(regionInfo);
if (node.getRegionLocation() == crashedServerName) {
regionInTransitionTracker.regionCrashed(node);
}
}
}

public void markRegionAsSplit(final RegionInfo parent, final ServerName serverName,
final RegionInfo daughterA, final RegionInfo daughterB) throws IOException {
// Update hbase:meta. Parent will be marked offline and split up in hbase:meta.
Expand All @@ -2284,6 +2350,9 @@ public void markRegionAsSplit(final RegionInfo parent, final ServerName serverNa
// it is a split parent. And usually only one of them can match, as after restart, the region
// state will be changed from SPLIT to CLOSED.
regionStateStore.splitRegion(parent, daughterA, daughterB, serverName, td);
regionInTransitionTracker.handleRegionStateNodeOperation(node);
regionInTransitionTracker.handleRegionStateNodeOperation(nodeA);
regionInTransitionTracker.handleRegionStateNodeOperation(nodeB);
if (shouldAssignFavoredNodes(parent)) {
List<ServerName> onlineServers = this.master.getServerManager().getOnlineServersList();
getFavoredNodePromoter().generateFavoredNodesForDaughter(onlineServers, parent, daughterA,
Expand All @@ -2303,12 +2372,14 @@ public void markRegionAsSplit(final RegionInfo parent, final ServerName serverNa
public void markRegionAsMerged(final RegionInfo child, final ServerName serverName,
RegionInfo[] mergeParents) throws IOException {
final RegionStateNode node = regionStates.getOrCreateRegionStateNode(child);
node.setState(State.MERGED);
for (RegionInfo ri : mergeParents) {
regionStates.deleteRegion(ri);
regionInTransitionTracker.handleRegionDelete(ri);
}

TableDescriptor td = master.getTableDescriptors().get(child.getTable());
regionStateStore.mergeRegions(child, mergeParents, serverName, td);
regionInTransitionTracker.handleRegionStateNodeOperation(node);
if (shouldAssignFavoredNodes(child)) {
getFavoredNodePromoter().generateFavoredNodesForMergedRegion(child, mergeParents);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,15 +154,15 @@ private static TransitRegionStateProcedure[] createAssignProcedures(MasterProced
regionNode.lock();
try {
if (ignoreIfInTransition) {
if (regionNode.isInTransition()) {
if (regionNode.isTransitionScheduled()) {
return null;
}
} else {
// should never fail, as we have the exclusive region lock, and the region is newly
// created, or has been successfully closed so should not be on any servers, so SCP
// will
// not process it either.
assert !regionNode.isInTransition();
assert !regionNode.isTransitionScheduled();
}
regionNode.setProcedure(proc);
} finally {
Expand All @@ -184,7 +184,7 @@ private static TransitRegionStateProcedure[] createAssignProcedures(MasterProced
// apply ignoreRITs to replica regions as well.
if (
!ignoreIfInTransition || !env.getAssignmentManager().getRegionStates()
.getOrCreateRegionStateNode(ri).isInTransition()
.getOrCreateRegionStateNode(ri).isTransitionScheduled()
) {
replicaRegionInfos.add(ri);
}
Expand Down Expand Up @@ -232,7 +232,7 @@ private static TransitRegionStateProcedure[] createRoundRobinAssignProcedures(
for (RegionInfo region : regionsAndReplicas) {
if (
env.getAssignmentManager().getRegionStates().getOrCreateRegionStateNode(region)
.isInTransition()
.isTransitionScheduled()
) {
return null;
}
Expand Down
Loading