From 02ff295fc4f76d3b2a524c6f789765c716b2875e Mon Sep 17 00:00:00 2001 From: Kevin Cai Date: Fri, 8 Nov 2024 10:48:37 +0800 Subject: [PATCH 1/2] [BugFix] fix backend node lastStartTime updated by hbTime unexpectedly (#52704) Signed-off-by: Kevin Xiaohua Cai (cherry picked from commit b2cf5e7cd29485e35e4c6f29a36983e7260a473d) # Conflicts: # fe/fe-core/src/main/java/com/starrocks/system/ComputeNode.java # fe/fe-core/src/test/java/com/starrocks/system/ComputeNodeTest.java --- .../com/starrocks/system/ComputeNode.java | 16 + .../com/starrocks/system/ComputeNodeTest.java | 397 ++++++++++++++++++ 2 files changed, 413 insertions(+) diff --git a/fe/fe-core/src/main/java/com/starrocks/system/ComputeNode.java b/fe/fe-core/src/main/java/com/starrocks/system/ComputeNode.java index d9df174053008..12d57b1665a21 100644 --- a/fe/fe-core/src/main/java/com/starrocks/system/ComputeNode.java +++ b/fe/fe-core/src/main/java/com/starrocks/system/ComputeNode.java @@ -496,6 +496,7 @@ public boolean handleHbResponse(BackendHbResponse hbResponse, boolean isReplay) } this.lastUpdateMs = hbResponse.getHbTime(); +<<<<<<< HEAD if (!isAlive.get()) { isChanged = true; // From version 2.5 we not use isAlive to determine whether to update the lastStartTime @@ -507,6 +508,9 @@ public boolean handleHbResponse(BackendHbResponse hbResponse, boolean isReplay) this.lastStartTime = hbResponse.getHbTime(); } +======= + // RebootTime will be `-1` if not set from backend. +>>>>>>> b2cf5e7cd2 ([BugFix] fix backend node lastStartTime updated by hbTime unexpectedly (#52704)) if (hbResponse.getRebootTime() > this.lastStartTime) { this.lastStartTime = hbResponse.getRebootTime(); isChanged = true; @@ -516,6 +520,18 @@ public boolean handleHbResponse(BackendHbResponse hbResponse, boolean isReplay) becomeDead = true; } + if (!isAlive.get()) { + isChanged = true; + if (hbResponse.getRebootTime() == -1) { + // Only update lastStartTime by hbResponse.hbTime if the RebootTime is not set from an OK-response. + // Just for backwards compatibility purpose in case the response is from an ancient version + this.lastStartTime = hbResponse.getHbTime(); + } + LOG.info("{} is alive, last start time: {}, hbTime: {}", this.toString(), this.lastStartTime, + hbResponse.getHbTime()); + setAlive(true); + } + if (this.cpuCores != hbResponse.getCpuCores()) { isChanged = true; this.cpuCores = hbResponse.getCpuCores(); diff --git a/fe/fe-core/src/test/java/com/starrocks/system/ComputeNodeTest.java b/fe/fe-core/src/test/java/com/starrocks/system/ComputeNodeTest.java index f2a6793edd919..75a5fc2a7da0b 100644 --- a/fe/fe-core/src/test/java/com/starrocks/system/ComputeNodeTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/system/ComputeNodeTest.java @@ -118,4 +118,401 @@ public void testUpdateStartTime() { Assert.assertTrue(node.getLastStartTime() == 1000000L); Assert.assertTrue(needSync); } +<<<<<<< HEAD +======= + + @Test + public void testShutdownStatus() { + ComputeNode node = new ComputeNode(); + ComputeNode nodeInFollower = new ComputeNode(); + long hbTimestamp = System.currentTimeMillis(); + BackendHbResponse hbResponse = + new BackendHbResponse(node.getId(), node.getBePort(), node.getHttpPort(), node.getBrpcPort(), + node.getStarletPort(), hbTimestamp, node.getVersion(), node.getCpuCores(), 0); + + node.handleHbResponse(hbResponse, false); + { // regular HbResponse + Assert.assertFalse(node.handleHbResponse(hbResponse, false)); + Assert.assertTrue(node.isAlive()); + Assert.assertEquals(ComputeNode.Status.OK, node.getStatus()); + } + { // first shutdown HbResponse + BackendHbResponse shutdownResponse = + new BackendHbResponse(node.getId(), TStatusCode.SHUTDOWN, "BE is in shutting down"); + + Assert.assertTrue(node.handleHbResponse(shutdownResponse, false)); + Assert.assertFalse(node.isAlive()); + Assert.assertEquals(ComputeNode.Status.SHUTDOWN, node.getStatus()); + Assert.assertEquals(shutdownResponse.getHbTime(), node.getLastUpdateMs()); + } + { // second shutdown HbResponse + BackendHbResponse shutdownResponse = + new BackendHbResponse(node.getId(), TStatusCode.SHUTDOWN, "BE is in shutting down"); + + Assert.assertTrue(node.handleHbResponse(shutdownResponse, false)); + Assert.assertTrue(nodeInFollower.handleHbResponse(shutdownResponse, true)); + Assert.assertFalse(node.isAlive()); + Assert.assertEquals(ComputeNode.Status.SHUTDOWN, node.getStatus()); + Assert.assertEquals(ComputeNode.Status.SHUTDOWN, nodeInFollower.getStatus()); + Assert.assertEquals(shutdownResponse.getHbTime(), node.getLastUpdateMs()); + Assert.assertEquals(shutdownResponse.getHbTime(), nodeInFollower.getLastUpdateMs()); + } + long lastUpdateTime = node.getLastUpdateMs(); + for (int i = 0; i <= Config.heartbeat_retry_times + 2; ++i) { + BackendHbResponse errorResponse = + new BackendHbResponse(node.getId(), TStatusCode.INTERNAL_ERROR, "Internal Error"); + + Assert.assertTrue(node.handleHbResponse(errorResponse, false)); + Assert.assertTrue(nodeInFollower.handleHbResponse(errorResponse, true)); + Assert.assertFalse(node.isAlive()); + // lasUpdateTime will not be updated + Assert.assertEquals(lastUpdateTime, node.getLastUpdateMs()); + + // start from the (heartbeat_retry_times)th response (started from 0), the status changed to disconnected. + if (i >= Config.heartbeat_retry_times) { + Assert.assertEquals(String.format("i=%d", i), ComputeNode.Status.DISCONNECTED, node.getStatus()); + Assert.assertEquals(String.format("i=%d", i), ComputeNode.Status.DISCONNECTED, nodeInFollower.getStatus()); + } else { + Assert.assertEquals(String.format("i=%d", i), ComputeNode.Status.SHUTDOWN, node.getStatus()); + Assert.assertEquals(String.format("i=%d", i), ComputeNode.Status.SHUTDOWN, nodeInFollower.getStatus()); + } + } + } + + private BackendHbResponse generateReplayResponse(BackendHbResponse response) { + return GsonUtils.GSON.fromJson(GsonUtils.GSON.toJson(response), BackendHbResponse.class); + } + + @Test + public void testShutdownCancelQuery() { + CoordinatorMonitor coordinatorMonitor = CoordinatorMonitor.getInstance(); + ResourceUsageMonitor resourceUsageMonitor = GlobalStateMgr.getCurrentState().getResourceUsageMonitor(); + + int oldHeartbeatRetry = Config.heartbeat_retry_times; + Config.heartbeat_retry_times = 1; + + ComputeNode nodeInLeader = new ComputeNode(); + ComputeNode nodeInFollower = new ComputeNode(); + long hbTimestamp = System.currentTimeMillis(); + BackendHbResponse hbResponse = + new BackendHbResponse(nodeInLeader.getId(), nodeInLeader.getBePort(), nodeInLeader.getHttpPort(), + nodeInLeader.getBrpcPort(), + nodeInLeader.getStarletPort(), hbTimestamp, nodeInLeader.getVersion(), + nodeInLeader.getCpuCores(), 0); + + try { + // first OK hbResponse for both leader and follower + nodeInLeader.handleHbResponse(hbResponse, false); + nodeInFollower.handleHbResponse(generateReplayResponse(hbResponse), true); + + // second OK hbResponse for both leader and follower + { + Assert.assertFalse(nodeInLeader.handleHbResponse(hbResponse, false)); + Assert.assertTrue(nodeInLeader.isAlive()); + Assert.assertEquals(ComputeNode.Status.OK, nodeInLeader.getStatus()); + + Assert.assertFalse(nodeInFollower.handleHbResponse(generateReplayResponse(hbResponse), true)); + Assert.assertTrue(nodeInFollower.isAlive()); + Assert.assertEquals(ComputeNode.Status.OK, nodeInFollower.getStatus()); + } + + // second OK hbResponse for both leader and follower + { + Assert.assertFalse(nodeInLeader.handleHbResponse(hbResponse, false)); + Assert.assertTrue(nodeInLeader.isAlive()); + Assert.assertEquals(ComputeNode.Status.OK, nodeInLeader.getStatus()); + + Assert.assertFalse(nodeInFollower.handleHbResponse(generateReplayResponse(hbResponse), true)); + Assert.assertTrue(nodeInFollower.isAlive()); + Assert.assertEquals(ComputeNode.Status.OK, nodeInFollower.getStatus()); + } + + // first shutdown hbResponse + { + BackendHbResponse shutdownResponse = + new BackendHbResponse(nodeInLeader.getId(), TStatusCode.SHUTDOWN, "BE is in shutting down"); + new Expectations(resourceUsageMonitor) { + { + resourceUsageMonitor.notifyBackendDead(); + times = 2; // one for leader and one for the follower + } + }; + new Expectations(coordinatorMonitor) { + { + coordinatorMonitor.addDeadBackend(anyLong); + times = 0; // should not call at all + } + }; + Assert.assertTrue(nodeInLeader.handleHbResponse(shutdownResponse, false)); + Assert.assertFalse(nodeInLeader.isAlive()); + Assert.assertEquals(ComputeNode.Status.SHUTDOWN, nodeInLeader.getStatus()); + Assert.assertEquals(shutdownResponse.getHbTime(), nodeInLeader.getLastUpdateMs()); + + Assert.assertTrue(nodeInFollower.handleHbResponse(generateReplayResponse(shutdownResponse), true)); + Assert.assertFalse(nodeInFollower.isAlive()); + Assert.assertEquals(ComputeNode.Status.SHUTDOWN, nodeInFollower.getStatus()); + Assert.assertEquals(shutdownResponse.getHbTime(), nodeInFollower.getLastUpdateMs()); + } + // second shutdown hbResponse + { + BackendHbResponse shutdownResponse = + new BackendHbResponse(nodeInLeader.getId(), TStatusCode.SHUTDOWN, "BE is in shutting down"); + // triggers nothing + new Expectations(resourceUsageMonitor) { + { + resourceUsageMonitor.notifyBackendDead(); + times = 0; + } + }; + new Expectations(coordinatorMonitor) { + { + coordinatorMonitor.addDeadBackend(anyLong); + times = 0; + } + }; + Assert.assertTrue(nodeInLeader.handleHbResponse(shutdownResponse, false)); + Assert.assertFalse(nodeInLeader.isAlive()); + Assert.assertEquals(ComputeNode.Status.SHUTDOWN, nodeInLeader.getStatus()); + Assert.assertEquals(shutdownResponse.getHbTime(), nodeInLeader.getLastUpdateMs()); + + Assert.assertTrue(nodeInFollower.handleHbResponse(generateReplayResponse(shutdownResponse), true)); + Assert.assertFalse(nodeInFollower.isAlive()); + Assert.assertEquals(ComputeNode.Status.SHUTDOWN, nodeInFollower.getStatus()); + Assert.assertEquals(shutdownResponse.getHbTime(), nodeInFollower.getLastUpdateMs()); + } + // first Error hbResponse + { + long hbTime = nodeInLeader.getLastUpdateMs(); + BackendHbResponse errorResponse = + new BackendHbResponse(nodeInLeader.getId(), TStatusCode.THRIFT_RPC_ERROR, "rpc error"); + // triggers nothing + new Expectations(resourceUsageMonitor) { + { + resourceUsageMonitor.notifyBackendDead(); + times = 0; + } + }; + new Expectations(coordinatorMonitor) { + { + coordinatorMonitor.addDeadBackend(anyLong); + times = 0; + } + }; + Assert.assertTrue(nodeInLeader.handleHbResponse(errorResponse, false)); + Assert.assertFalse(nodeInLeader.isAlive()); + Assert.assertEquals(ComputeNode.Status.SHUTDOWN, nodeInLeader.getStatus()); + // lastUpdateMs not updated + Assert.assertNotEquals(errorResponse.getHbTime(), nodeInFollower.getLastUpdateMs()); + Assert.assertEquals(hbTime, nodeInFollower.getLastUpdateMs()); + + Assert.assertTrue(nodeInFollower.handleHbResponse(generateReplayResponse(errorResponse), true)); + Assert.assertFalse(nodeInFollower.isAlive()); + Assert.assertEquals(ComputeNode.Status.SHUTDOWN, nodeInFollower.getStatus()); + Assert.assertNotEquals(errorResponse.getHbTime(), nodeInFollower.getLastUpdateMs()); + Assert.assertEquals(hbTime, nodeInFollower.getLastUpdateMs()); + } + // second Error hbResponse + { + long hbTime = nodeInLeader.getLastUpdateMs(); + BackendHbResponse errorResponse = + new BackendHbResponse(nodeInLeader.getId(), TStatusCode.THRIFT_RPC_ERROR, "rpc error"); + new Expectations(resourceUsageMonitor) { + { + resourceUsageMonitor.notifyBackendDead(); + times = 0; + } + }; + new Expectations(coordinatorMonitor) { + { + coordinatorMonitor.addDeadBackend(anyLong); + times = 2; + } + }; + Assert.assertTrue(nodeInLeader.handleHbResponse(errorResponse, false)); + Assert.assertFalse(nodeInLeader.isAlive()); + Assert.assertEquals(ComputeNode.Status.DISCONNECTED, nodeInLeader.getStatus()); + // lastUpdateMs not updated + Assert.assertNotEquals(errorResponse.getHbTime(), nodeInFollower.getLastUpdateMs()); + Assert.assertEquals(hbTime, nodeInFollower.getLastUpdateMs()); + + Assert.assertTrue(nodeInFollower.handleHbResponse(generateReplayResponse(errorResponse), true)); + Assert.assertFalse(nodeInFollower.isAlive()); + Assert.assertEquals(ComputeNode.Status.DISCONNECTED, nodeInFollower.getStatus()); + Assert.assertNotEquals(errorResponse.getHbTime(), nodeInFollower.getLastUpdateMs()); + Assert.assertEquals(hbTime, nodeInFollower.getLastUpdateMs()); + } + } finally { + Config.heartbeat_retry_times = oldHeartbeatRetry; + } + } + + private static class VerifyComputeNodeStatus { + public boolean isAlive; + public HeartbeatResponse.AliveStatus aliveStatus; + public ComputeNode.Status status; + + public VerifyComputeNodeStatus() { + // nothing + } + public VerifyComputeNodeStatus(boolean isAlive, HeartbeatResponse.AliveStatus aliveStatus, ComputeNode.Status status) { + this.isAlive = isAlive; + this.aliveStatus = aliveStatus; + this.status = status; + } + } + + @Test + public void testRpcErrorReplayRetryAndAliveStatusMismatch() throws InterruptedException { + int prevHeartbeatRetryTimes = Config.heartbeat_retry_times; + Config.heartbeat_retry_times = 3; + ComputeNode node = new ComputeNode(); + + List replayResponses = new ArrayList<>(); + List verifyReplayNodeStatus = new ArrayList<>(); + + BackendHbResponse hbResponse = + new BackendHbResponse(node.getId(), node.getBePort(), node.getHttpPort(), node.getBrpcPort(), + node.getStarletPort(), System.currentTimeMillis(), node.getVersion(), node.getCpuCores(), 0); + + node.handleHbResponse(hbResponse, false); + + { // regular HbResponse + Assert.assertFalse(node.handleHbResponse(hbResponse, false)); + Assert.assertTrue(node.isAlive()); + Assert.assertEquals(ComputeNode.Status.OK, node.getStatus()); + + verifyReplayNodeStatus.add( + new VerifyComputeNodeStatus(node.isAlive(), hbResponse.aliveStatus, node.getStatus())); + replayResponses.add(generateReplayResponse(hbResponse)); + } + + // 4 hbResponses + // the first 3 hbResponses still record the node as ALIVE + // the 4th hbResponses records the node as NOT_ALIVE + for (int i = 0; i <= Config.heartbeat_retry_times; ++i) { + BackendHbResponse errorResponse = + new BackendHbResponse(node.getId(), TStatusCode.INTERNAL_ERROR, "Internal Error"); + Assert.assertTrue(node.handleHbResponse(errorResponse, false)); + + VerifyComputeNodeStatus verifyStatus = new VerifyComputeNodeStatus(); + verifyStatus.isAlive = node.isAlive(); + verifyStatus.status = node.getStatus(); + + if (i == Config.heartbeat_retry_times) { + Assert.assertFalse(node.isAlive()); + Assert.assertEquals(HeartbeatResponse.AliveStatus.NOT_ALIVE, errorResponse.aliveStatus); + verifyStatus.aliveStatus = HeartbeatResponse.AliveStatus.NOT_ALIVE; + } else { + Assert.assertTrue(node.isAlive()); + Assert.assertEquals(HeartbeatResponse.AliveStatus.ALIVE, errorResponse.aliveStatus); + verifyStatus.aliveStatus = HeartbeatResponse.AliveStatus.ALIVE; + } + replayResponses.add(generateReplayResponse(errorResponse)); + verifyReplayNodeStatus.add(verifyStatus); + // delay a few milliseconds + Thread.sleep(1); + } + + // now reduce the retry config to 2 + // the first 2 hbResponses still detect the node as ALIVE + // the 3th hbResponse will mark the node as NOT_ALIVE but then dictated by the aliveStatus, force to ALIVE + // the 4th hbResponse will mark the node as NOT_ALIVE + Config.heartbeat_retry_times = Config.heartbeat_retry_times - 1; + ComputeNode replayNode = new ComputeNode(); + Assert.assertEquals(replayResponses.size(), verifyReplayNodeStatus.size()); + for (int i = 0; i < replayResponses.size(); ++i) { + // even though the `heartbeat_retry_times` changed, the replay result + // should be consistent with the one when generating the edit log. + BackendHbResponse hb = replayResponses.get(i); + VerifyComputeNodeStatus verifyStatus = verifyReplayNodeStatus.get(i); + + replayNode.handleHbResponse(hb, true); + Assert.assertEquals(verifyStatus.isAlive, replayNode.isAlive()); + Assert.assertEquals(verifyStatus.status, replayNode.getStatus()); + Assert.assertEquals(verifyStatus.aliveStatus, hb.aliveStatus); + } + + Config.heartbeat_retry_times = prevHeartbeatRetryTimes; + } + + void verifyNodeAliveAndStatus(ComputeNode node, boolean expectedAlive, ComputeNode.Status expectedStatus) { + Assert.assertEquals(expectedAlive, node.isAlive()); + Assert.assertEquals(expectedStatus, node.getStatus()); + } + + @Test + public void testSetAliveInterface() { + ComputeNode node = new ComputeNode(); + verifyNodeAliveAndStatus(node, false, ComputeNode.Status.CONNECTING); + Assert.assertTrue(node.setAlive(true)); + verifyNodeAliveAndStatus(node, true, ComputeNode.Status.OK); + // set again, nothing changed + Assert.assertFalse(node.setAlive(true)); + verifyNodeAliveAndStatus(node, true, ComputeNode.Status.OK); + + Assert.assertTrue(node.setAlive(false)); + verifyNodeAliveAndStatus(node, false, ComputeNode.Status.DISCONNECTED); + // set again, nothing changed + Assert.assertFalse(node.setAlive(false)); + verifyNodeAliveAndStatus(node, false, ComputeNode.Status.DISCONNECTED); + + node = new ComputeNode(); + // isAlive: true, Status: Connecting + node.getIsAlive().set(true); + // setAlive will only change the alive variable, keep the status unchanged + Assert.assertTrue(node.setAlive(false)); + verifyNodeAliveAndStatus(node, false, ComputeNode.Status.CONNECTING); + } + + BackendHbResponse generateHbResponse(ComputeNode node, TStatusCode tStatusCode, long rebootTimeSecs) { + if (tStatusCode != TStatusCode.OK) { + return new BackendHbResponse(node.getId(), tStatusCode, "Unknown Error"); + } + BackendHbResponse hbResponse = + new BackendHbResponse(node.getId(), node.getBePort(), node.getHttpPort(), node.getBrpcPort(), + node.getStarletPort(), System.currentTimeMillis(), node.getVersion(), node.getCpuCores(), 0); + hbResponse.setRebootTime(rebootTimeSecs); + return hbResponse; + } + + @Test + public void testComputeNodeLastStartTimeUpdate() { + int previousRetryTimes = Config.heartbeat_retry_times; + // no retry, set to not_alive for the first hb failure + Config.heartbeat_retry_times = 0; + + long nodeId = 1000; + ComputeNode node = new ComputeNode(nodeId, "127.0.0.1", 9050); + long rebootTimeA = System.currentTimeMillis() / 1000 - 60; + BackendHbResponse hbResponse; + + hbResponse = generateHbResponse(node, TStatusCode.OK, rebootTimeA); + Assert.assertTrue(node.handleHbResponse(hbResponse, false)); + Assert.assertEquals(rebootTimeA * 1000, node.getLastStartTime()); + Assert.assertTrue(node.isAlive()); + + // simulate that a few intermittent heartbeat probing failures due to high load or unstable network. + // FE marks the BE as dead and then the node is back alive + hbResponse = generateHbResponse(node, TStatusCode.THRIFT_RPC_ERROR, 0); + Assert.assertTrue(node.handleHbResponse(hbResponse, false)); + Assert.assertFalse(node.isAlive()); + // BE reports heartbeat with the same rebootTime + hbResponse = generateHbResponse(node, TStatusCode.OK, rebootTimeA); + Assert.assertTrue(node.handleHbResponse(hbResponse, false)); + Assert.assertTrue(node.isAlive()); + // reboot time should not change + Assert.assertEquals(rebootTimeA * 1000, node.getLastStartTime()); + + // response an OK status, but the rebootTime changed. + // This simulates the case that the BE is restarted quickly in between the two heartbeat probes. + long rebootTimeB = System.currentTimeMillis() / 1000; + hbResponse = generateHbResponse(node, TStatusCode.OK, rebootTimeB); + Assert.assertTrue(node.handleHbResponse(hbResponse, false)); + Assert.assertTrue(node.isAlive()); + // reboot time changed + Assert.assertEquals(rebootTimeB * 1000, node.getLastStartTime()); + + Config.heartbeat_retry_times = previousRetryTimes; + } +>>>>>>> b2cf5e7cd2 ([BugFix] fix backend node lastStartTime updated by hbTime unexpectedly (#52704)) } From d2106e21fdc3d3ad6096d4236c65a8029c7da22a Mon Sep 17 00:00:00 2001 From: Kevin Xiaohua Cai Date: Sat, 9 Nov 2024 19:15:01 +0800 Subject: [PATCH 2/2] fix conflict Signed-off-by: Kevin Xiaohua Cai --- .../com/starrocks/system/ComputeNode.java | 16 +- .../com/starrocks/system/ComputeNodeTest.java | 351 +----------------- 2 files changed, 4 insertions(+), 363 deletions(-) diff --git a/fe/fe-core/src/main/java/com/starrocks/system/ComputeNode.java b/fe/fe-core/src/main/java/com/starrocks/system/ComputeNode.java index 12d57b1665a21..88d82df189399 100644 --- a/fe/fe-core/src/main/java/com/starrocks/system/ComputeNode.java +++ b/fe/fe-core/src/main/java/com/starrocks/system/ComputeNode.java @@ -496,21 +496,7 @@ public boolean handleHbResponse(BackendHbResponse hbResponse, boolean isReplay) } this.lastUpdateMs = hbResponse.getHbTime(); -<<<<<<< HEAD - if (!isAlive.get()) { - isChanged = true; - // From version 2.5 we not use isAlive to determine whether to update the lastStartTime - // This line to set 'lastStartTime' will be removed in due time - this.lastStartTime = hbResponse.getHbTime(); - LOG.info("{} is alive, last start time: {}", this.toString(), hbResponse.getHbTime()); - this.isAlive.set(true); - } else if (this.lastStartTime <= 0) { - this.lastStartTime = hbResponse.getHbTime(); - } - -======= // RebootTime will be `-1` if not set from backend. ->>>>>>> b2cf5e7cd2 ([BugFix] fix backend node lastStartTime updated by hbTime unexpectedly (#52704)) if (hbResponse.getRebootTime() > this.lastStartTime) { this.lastStartTime = hbResponse.getRebootTime(); isChanged = true; @@ -529,7 +515,7 @@ >>>>>>> b2cf5e7cd2 ([BugFix] fix backend node lastStartTime updated by hbTime un } LOG.info("{} is alive, last start time: {}, hbTime: {}", this.toString(), this.lastStartTime, hbResponse.getHbTime()); - setAlive(true); + this.isAlive.set(true); } if (this.cpuCores != hbResponse.getCpuCores()) { diff --git a/fe/fe-core/src/test/java/com/starrocks/system/ComputeNodeTest.java b/fe/fe-core/src/test/java/com/starrocks/system/ComputeNodeTest.java index 75a5fc2a7da0b..1baec44152ebd 100644 --- a/fe/fe-core/src/test/java/com/starrocks/system/ComputeNodeTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/system/ComputeNodeTest.java @@ -18,6 +18,7 @@ import com.starrocks.common.Config; import com.starrocks.qe.CoordinatorMonitor; import com.starrocks.system.HeartbeatResponse.HbStatus; +import com.starrocks.thrift.TStatusCode; import mockit.Expectations; import org.junit.Assert; import org.junit.Test; @@ -118,359 +119,14 @@ public void testUpdateStartTime() { Assert.assertTrue(node.getLastStartTime() == 1000000L); Assert.assertTrue(needSync); } -<<<<<<< HEAD -======= - - @Test - public void testShutdownStatus() { - ComputeNode node = new ComputeNode(); - ComputeNode nodeInFollower = new ComputeNode(); - long hbTimestamp = System.currentTimeMillis(); - BackendHbResponse hbResponse = - new BackendHbResponse(node.getId(), node.getBePort(), node.getHttpPort(), node.getBrpcPort(), - node.getStarletPort(), hbTimestamp, node.getVersion(), node.getCpuCores(), 0); - - node.handleHbResponse(hbResponse, false); - { // regular HbResponse - Assert.assertFalse(node.handleHbResponse(hbResponse, false)); - Assert.assertTrue(node.isAlive()); - Assert.assertEquals(ComputeNode.Status.OK, node.getStatus()); - } - { // first shutdown HbResponse - BackendHbResponse shutdownResponse = - new BackendHbResponse(node.getId(), TStatusCode.SHUTDOWN, "BE is in shutting down"); - - Assert.assertTrue(node.handleHbResponse(shutdownResponse, false)); - Assert.assertFalse(node.isAlive()); - Assert.assertEquals(ComputeNode.Status.SHUTDOWN, node.getStatus()); - Assert.assertEquals(shutdownResponse.getHbTime(), node.getLastUpdateMs()); - } - { // second shutdown HbResponse - BackendHbResponse shutdownResponse = - new BackendHbResponse(node.getId(), TStatusCode.SHUTDOWN, "BE is in shutting down"); - - Assert.assertTrue(node.handleHbResponse(shutdownResponse, false)); - Assert.assertTrue(nodeInFollower.handleHbResponse(shutdownResponse, true)); - Assert.assertFalse(node.isAlive()); - Assert.assertEquals(ComputeNode.Status.SHUTDOWN, node.getStatus()); - Assert.assertEquals(ComputeNode.Status.SHUTDOWN, nodeInFollower.getStatus()); - Assert.assertEquals(shutdownResponse.getHbTime(), node.getLastUpdateMs()); - Assert.assertEquals(shutdownResponse.getHbTime(), nodeInFollower.getLastUpdateMs()); - } - long lastUpdateTime = node.getLastUpdateMs(); - for (int i = 0; i <= Config.heartbeat_retry_times + 2; ++i) { - BackendHbResponse errorResponse = - new BackendHbResponse(node.getId(), TStatusCode.INTERNAL_ERROR, "Internal Error"); - - Assert.assertTrue(node.handleHbResponse(errorResponse, false)); - Assert.assertTrue(nodeInFollower.handleHbResponse(errorResponse, true)); - Assert.assertFalse(node.isAlive()); - // lasUpdateTime will not be updated - Assert.assertEquals(lastUpdateTime, node.getLastUpdateMs()); - - // start from the (heartbeat_retry_times)th response (started from 0), the status changed to disconnected. - if (i >= Config.heartbeat_retry_times) { - Assert.assertEquals(String.format("i=%d", i), ComputeNode.Status.DISCONNECTED, node.getStatus()); - Assert.assertEquals(String.format("i=%d", i), ComputeNode.Status.DISCONNECTED, nodeInFollower.getStatus()); - } else { - Assert.assertEquals(String.format("i=%d", i), ComputeNode.Status.SHUTDOWN, node.getStatus()); - Assert.assertEquals(String.format("i=%d", i), ComputeNode.Status.SHUTDOWN, nodeInFollower.getStatus()); - } - } - } - - private BackendHbResponse generateReplayResponse(BackendHbResponse response) { - return GsonUtils.GSON.fromJson(GsonUtils.GSON.toJson(response), BackendHbResponse.class); - } - - @Test - public void testShutdownCancelQuery() { - CoordinatorMonitor coordinatorMonitor = CoordinatorMonitor.getInstance(); - ResourceUsageMonitor resourceUsageMonitor = GlobalStateMgr.getCurrentState().getResourceUsageMonitor(); - - int oldHeartbeatRetry = Config.heartbeat_retry_times; - Config.heartbeat_retry_times = 1; - - ComputeNode nodeInLeader = new ComputeNode(); - ComputeNode nodeInFollower = new ComputeNode(); - long hbTimestamp = System.currentTimeMillis(); - BackendHbResponse hbResponse = - new BackendHbResponse(nodeInLeader.getId(), nodeInLeader.getBePort(), nodeInLeader.getHttpPort(), - nodeInLeader.getBrpcPort(), - nodeInLeader.getStarletPort(), hbTimestamp, nodeInLeader.getVersion(), - nodeInLeader.getCpuCores(), 0); - - try { - // first OK hbResponse for both leader and follower - nodeInLeader.handleHbResponse(hbResponse, false); - nodeInFollower.handleHbResponse(generateReplayResponse(hbResponse), true); - - // second OK hbResponse for both leader and follower - { - Assert.assertFalse(nodeInLeader.handleHbResponse(hbResponse, false)); - Assert.assertTrue(nodeInLeader.isAlive()); - Assert.assertEquals(ComputeNode.Status.OK, nodeInLeader.getStatus()); - - Assert.assertFalse(nodeInFollower.handleHbResponse(generateReplayResponse(hbResponse), true)); - Assert.assertTrue(nodeInFollower.isAlive()); - Assert.assertEquals(ComputeNode.Status.OK, nodeInFollower.getStatus()); - } - - // second OK hbResponse for both leader and follower - { - Assert.assertFalse(nodeInLeader.handleHbResponse(hbResponse, false)); - Assert.assertTrue(nodeInLeader.isAlive()); - Assert.assertEquals(ComputeNode.Status.OK, nodeInLeader.getStatus()); - - Assert.assertFalse(nodeInFollower.handleHbResponse(generateReplayResponse(hbResponse), true)); - Assert.assertTrue(nodeInFollower.isAlive()); - Assert.assertEquals(ComputeNode.Status.OK, nodeInFollower.getStatus()); - } - - // first shutdown hbResponse - { - BackendHbResponse shutdownResponse = - new BackendHbResponse(nodeInLeader.getId(), TStatusCode.SHUTDOWN, "BE is in shutting down"); - new Expectations(resourceUsageMonitor) { - { - resourceUsageMonitor.notifyBackendDead(); - times = 2; // one for leader and one for the follower - } - }; - new Expectations(coordinatorMonitor) { - { - coordinatorMonitor.addDeadBackend(anyLong); - times = 0; // should not call at all - } - }; - Assert.assertTrue(nodeInLeader.handleHbResponse(shutdownResponse, false)); - Assert.assertFalse(nodeInLeader.isAlive()); - Assert.assertEquals(ComputeNode.Status.SHUTDOWN, nodeInLeader.getStatus()); - Assert.assertEquals(shutdownResponse.getHbTime(), nodeInLeader.getLastUpdateMs()); - - Assert.assertTrue(nodeInFollower.handleHbResponse(generateReplayResponse(shutdownResponse), true)); - Assert.assertFalse(nodeInFollower.isAlive()); - Assert.assertEquals(ComputeNode.Status.SHUTDOWN, nodeInFollower.getStatus()); - Assert.assertEquals(shutdownResponse.getHbTime(), nodeInFollower.getLastUpdateMs()); - } - // second shutdown hbResponse - { - BackendHbResponse shutdownResponse = - new BackendHbResponse(nodeInLeader.getId(), TStatusCode.SHUTDOWN, "BE is in shutting down"); - // triggers nothing - new Expectations(resourceUsageMonitor) { - { - resourceUsageMonitor.notifyBackendDead(); - times = 0; - } - }; - new Expectations(coordinatorMonitor) { - { - coordinatorMonitor.addDeadBackend(anyLong); - times = 0; - } - }; - Assert.assertTrue(nodeInLeader.handleHbResponse(shutdownResponse, false)); - Assert.assertFalse(nodeInLeader.isAlive()); - Assert.assertEquals(ComputeNode.Status.SHUTDOWN, nodeInLeader.getStatus()); - Assert.assertEquals(shutdownResponse.getHbTime(), nodeInLeader.getLastUpdateMs()); - - Assert.assertTrue(nodeInFollower.handleHbResponse(generateReplayResponse(shutdownResponse), true)); - Assert.assertFalse(nodeInFollower.isAlive()); - Assert.assertEquals(ComputeNode.Status.SHUTDOWN, nodeInFollower.getStatus()); - Assert.assertEquals(shutdownResponse.getHbTime(), nodeInFollower.getLastUpdateMs()); - } - // first Error hbResponse - { - long hbTime = nodeInLeader.getLastUpdateMs(); - BackendHbResponse errorResponse = - new BackendHbResponse(nodeInLeader.getId(), TStatusCode.THRIFT_RPC_ERROR, "rpc error"); - // triggers nothing - new Expectations(resourceUsageMonitor) { - { - resourceUsageMonitor.notifyBackendDead(); - times = 0; - } - }; - new Expectations(coordinatorMonitor) { - { - coordinatorMonitor.addDeadBackend(anyLong); - times = 0; - } - }; - Assert.assertTrue(nodeInLeader.handleHbResponse(errorResponse, false)); - Assert.assertFalse(nodeInLeader.isAlive()); - Assert.assertEquals(ComputeNode.Status.SHUTDOWN, nodeInLeader.getStatus()); - // lastUpdateMs not updated - Assert.assertNotEquals(errorResponse.getHbTime(), nodeInFollower.getLastUpdateMs()); - Assert.assertEquals(hbTime, nodeInFollower.getLastUpdateMs()); - - Assert.assertTrue(nodeInFollower.handleHbResponse(generateReplayResponse(errorResponse), true)); - Assert.assertFalse(nodeInFollower.isAlive()); - Assert.assertEquals(ComputeNode.Status.SHUTDOWN, nodeInFollower.getStatus()); - Assert.assertNotEquals(errorResponse.getHbTime(), nodeInFollower.getLastUpdateMs()); - Assert.assertEquals(hbTime, nodeInFollower.getLastUpdateMs()); - } - // second Error hbResponse - { - long hbTime = nodeInLeader.getLastUpdateMs(); - BackendHbResponse errorResponse = - new BackendHbResponse(nodeInLeader.getId(), TStatusCode.THRIFT_RPC_ERROR, "rpc error"); - new Expectations(resourceUsageMonitor) { - { - resourceUsageMonitor.notifyBackendDead(); - times = 0; - } - }; - new Expectations(coordinatorMonitor) { - { - coordinatorMonitor.addDeadBackend(anyLong); - times = 2; - } - }; - Assert.assertTrue(nodeInLeader.handleHbResponse(errorResponse, false)); - Assert.assertFalse(nodeInLeader.isAlive()); - Assert.assertEquals(ComputeNode.Status.DISCONNECTED, nodeInLeader.getStatus()); - // lastUpdateMs not updated - Assert.assertNotEquals(errorResponse.getHbTime(), nodeInFollower.getLastUpdateMs()); - Assert.assertEquals(hbTime, nodeInFollower.getLastUpdateMs()); - - Assert.assertTrue(nodeInFollower.handleHbResponse(generateReplayResponse(errorResponse), true)); - Assert.assertFalse(nodeInFollower.isAlive()); - Assert.assertEquals(ComputeNode.Status.DISCONNECTED, nodeInFollower.getStatus()); - Assert.assertNotEquals(errorResponse.getHbTime(), nodeInFollower.getLastUpdateMs()); - Assert.assertEquals(hbTime, nodeInFollower.getLastUpdateMs()); - } - } finally { - Config.heartbeat_retry_times = oldHeartbeatRetry; - } - } - - private static class VerifyComputeNodeStatus { - public boolean isAlive; - public HeartbeatResponse.AliveStatus aliveStatus; - public ComputeNode.Status status; - - public VerifyComputeNodeStatus() { - // nothing - } - public VerifyComputeNodeStatus(boolean isAlive, HeartbeatResponse.AliveStatus aliveStatus, ComputeNode.Status status) { - this.isAlive = isAlive; - this.aliveStatus = aliveStatus; - this.status = status; - } - } - - @Test - public void testRpcErrorReplayRetryAndAliveStatusMismatch() throws InterruptedException { - int prevHeartbeatRetryTimes = Config.heartbeat_retry_times; - Config.heartbeat_retry_times = 3; - ComputeNode node = new ComputeNode(); - - List replayResponses = new ArrayList<>(); - List verifyReplayNodeStatus = new ArrayList<>(); - - BackendHbResponse hbResponse = - new BackendHbResponse(node.getId(), node.getBePort(), node.getHttpPort(), node.getBrpcPort(), - node.getStarletPort(), System.currentTimeMillis(), node.getVersion(), node.getCpuCores(), 0); - - node.handleHbResponse(hbResponse, false); - - { // regular HbResponse - Assert.assertFalse(node.handleHbResponse(hbResponse, false)); - Assert.assertTrue(node.isAlive()); - Assert.assertEquals(ComputeNode.Status.OK, node.getStatus()); - - verifyReplayNodeStatus.add( - new VerifyComputeNodeStatus(node.isAlive(), hbResponse.aliveStatus, node.getStatus())); - replayResponses.add(generateReplayResponse(hbResponse)); - } - - // 4 hbResponses - // the first 3 hbResponses still record the node as ALIVE - // the 4th hbResponses records the node as NOT_ALIVE - for (int i = 0; i <= Config.heartbeat_retry_times; ++i) { - BackendHbResponse errorResponse = - new BackendHbResponse(node.getId(), TStatusCode.INTERNAL_ERROR, "Internal Error"); - Assert.assertTrue(node.handleHbResponse(errorResponse, false)); - - VerifyComputeNodeStatus verifyStatus = new VerifyComputeNodeStatus(); - verifyStatus.isAlive = node.isAlive(); - verifyStatus.status = node.getStatus(); - - if (i == Config.heartbeat_retry_times) { - Assert.assertFalse(node.isAlive()); - Assert.assertEquals(HeartbeatResponse.AliveStatus.NOT_ALIVE, errorResponse.aliveStatus); - verifyStatus.aliveStatus = HeartbeatResponse.AliveStatus.NOT_ALIVE; - } else { - Assert.assertTrue(node.isAlive()); - Assert.assertEquals(HeartbeatResponse.AliveStatus.ALIVE, errorResponse.aliveStatus); - verifyStatus.aliveStatus = HeartbeatResponse.AliveStatus.ALIVE; - } - replayResponses.add(generateReplayResponse(errorResponse)); - verifyReplayNodeStatus.add(verifyStatus); - // delay a few milliseconds - Thread.sleep(1); - } - - // now reduce the retry config to 2 - // the first 2 hbResponses still detect the node as ALIVE - // the 3th hbResponse will mark the node as NOT_ALIVE but then dictated by the aliveStatus, force to ALIVE - // the 4th hbResponse will mark the node as NOT_ALIVE - Config.heartbeat_retry_times = Config.heartbeat_retry_times - 1; - ComputeNode replayNode = new ComputeNode(); - Assert.assertEquals(replayResponses.size(), verifyReplayNodeStatus.size()); - for (int i = 0; i < replayResponses.size(); ++i) { - // even though the `heartbeat_retry_times` changed, the replay result - // should be consistent with the one when generating the edit log. - BackendHbResponse hb = replayResponses.get(i); - VerifyComputeNodeStatus verifyStatus = verifyReplayNodeStatus.get(i); - - replayNode.handleHbResponse(hb, true); - Assert.assertEquals(verifyStatus.isAlive, replayNode.isAlive()); - Assert.assertEquals(verifyStatus.status, replayNode.getStatus()); - Assert.assertEquals(verifyStatus.aliveStatus, hb.aliveStatus); - } - - Config.heartbeat_retry_times = prevHeartbeatRetryTimes; - } - - void verifyNodeAliveAndStatus(ComputeNode node, boolean expectedAlive, ComputeNode.Status expectedStatus) { - Assert.assertEquals(expectedAlive, node.isAlive()); - Assert.assertEquals(expectedStatus, node.getStatus()); - } - - @Test - public void testSetAliveInterface() { - ComputeNode node = new ComputeNode(); - verifyNodeAliveAndStatus(node, false, ComputeNode.Status.CONNECTING); - Assert.assertTrue(node.setAlive(true)); - verifyNodeAliveAndStatus(node, true, ComputeNode.Status.OK); - // set again, nothing changed - Assert.assertFalse(node.setAlive(true)); - verifyNodeAliveAndStatus(node, true, ComputeNode.Status.OK); - - Assert.assertTrue(node.setAlive(false)); - verifyNodeAliveAndStatus(node, false, ComputeNode.Status.DISCONNECTED); - // set again, nothing changed - Assert.assertFalse(node.setAlive(false)); - verifyNodeAliveAndStatus(node, false, ComputeNode.Status.DISCONNECTED); - - node = new ComputeNode(); - // isAlive: true, Status: Connecting - node.getIsAlive().set(true); - // setAlive will only change the alive variable, keep the status unchanged - Assert.assertTrue(node.setAlive(false)); - verifyNodeAliveAndStatus(node, false, ComputeNode.Status.CONNECTING); - } BackendHbResponse generateHbResponse(ComputeNode node, TStatusCode tStatusCode, long rebootTimeSecs) { if (tStatusCode != TStatusCode.OK) { - return new BackendHbResponse(node.getId(), tStatusCode, "Unknown Error"); + return new BackendHbResponse(node.getId(), "Unknown Error"); } BackendHbResponse hbResponse = new BackendHbResponse(node.getId(), node.getBePort(), node.getHttpPort(), node.getBrpcPort(), - node.getStarletPort(), System.currentTimeMillis(), node.getVersion(), node.getCpuCores(), 0); + node.getStarletPort(), System.currentTimeMillis(), node.getVersion(), node.getCpuCores()); hbResponse.setRebootTime(rebootTimeSecs); return hbResponse; } @@ -514,5 +170,4 @@ public void testComputeNodeLastStartTimeUpdate() { Config.heartbeat_retry_times = previousRetryTimes; } ->>>>>>> b2cf5e7cd2 ([BugFix] fix backend node lastStartTime updated by hbTime unexpectedly (#52704)) }