Skip to content

Commit 2f74b17

Browse files
authored
NIFI-15903 Improved flaky tests ClusteredConnectorIT and ContentClaimTruncationIT (apache#11203)
- Added test method for robust cluster node removal based on coordinator reported status Signed-off-by: David Handermann <exceptionfactory@apache.org>
1 parent 7b25800 commit 2f74b17

4 files changed

Lines changed: 44 additions & 16 deletions

File tree

nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1171,6 +1171,23 @@ public void waitForProcessorState(final String processorId, final String expecte
11711171
final Integer terminatedThreadCount = snapshotDto.getTerminatedThreadCount();
11721172

11731173
if ("RUNNING".equals(expectedState) || (activeThreadCount == 0 && terminatedThreadCount == 0)) {
1174+
// The logical state masks the framework's physical STOPPING state as STOPPED. The framework's
1175+
// verifyCanStart check evaluates the physical state, so when the caller is waiting for STOPPED
1176+
// we additionally require the physical state to have settled to STOPPED or DISABLED. Without
1177+
// this, a tight loop of runProcessorOnce + waitForStoppedProcessor can race against an
1178+
// in-flight stop transition and the next start request fails with "cannot be started because
1179+
// it is not stopped. Current state is STOPPING".
1180+
if ("STOPPED".equalsIgnoreCase(expectedState)) {
1181+
final String physicalState = entity.getComponent().getPhysicalState();
1182+
final boolean physicalStateSettled = physicalState == null
1183+
|| "STOPPED".equalsIgnoreCase(physicalState)
1184+
|| "DISABLED".equalsIgnoreCase(physicalState);
1185+
if (!physicalStateSettled) {
1186+
Thread.sleep(10L);
1187+
continue;
1188+
}
1189+
}
1190+
11741191
logger.info("Processor {} is now in desired state of {} with {} active threads and {} terminated threads",
11751192
processorId, expectedState, activeThreadCount, terminatedThreadCount);
11761193
return;

nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -703,6 +703,30 @@ protected void reconnectNode(final int nodeIndex) throws NiFiClientException, IO
703703
getNifiClient().getControllerClient().connectNode(nodeEntity.getNode().getNodeId(), nodeEntity);
704704
}
705705

706+
/**
707+
* Removes a node from the cluster and waits until the cluster coordinator no longer reports the node.
708+
* Mutating cluster requests (such as deleting a component) reject while a non-CONNECTED node is still
709+
* known to the coordinator, so callers that need to issue follow-up mutating requests must wait for
710+
* the removal to be reflected in the cluster state rather than relying on the synchronous response of
711+
* {@code deleteNode} alone.
712+
*
713+
* @param nodeIndex the 1-based index of the node
714+
*/
715+
protected void removeNodeFromCluster(final int nodeIndex) throws NiFiClientException, IOException, InterruptedException {
716+
final NodeEntity nodeEntity = getNodeEntity(nodeIndex);
717+
final int expectedPort = getClientApiPort() + nodeIndex - 1;
718+
getNifiClient().getControllerClient().deleteNode(nodeEntity.getNode().getNodeId());
719+
720+
waitFor(() -> {
721+
try {
722+
return getNifiClient().getControllerClient().getNodes().getCluster().getNodes().stream()
723+
.noneMatch(dto -> dto.getApiPort() == expectedPort);
724+
} catch (final Exception e) {
725+
return false;
726+
}
727+
});
728+
}
729+
706730
protected NodeEntity getNodeEntity(final int nodeIndex) throws NiFiClientException, IOException {
707731
final ClusterEntity clusterEntity = getNifiClient().getControllerClient().getNodes();
708732
final int expectedPort = getClientApiPort() + nodeIndex - 1;

nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/JoinClusterWithMissingConnectionWithDataIT.java

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -99,8 +99,7 @@ public void testFailsToJoinWithMissingConnectionThatHasData() throws NiFiClientE
9999
node2.stop();
100100

101101
// Remove node from the cluster
102-
getNifiClient().getControllerClient().deleteNode(node2Dto.getNodeId());
103-
waitFor(() -> isNodeRemoved(5672));
102+
removeNodeFromCluster(2);
104103

105104
// Drop the data in the queue and delete the queue.
106105
getClientUtil().emptyQueue(CONNECTION_UUID);
@@ -113,15 +112,6 @@ public void testFailsToJoinWithMissingConnectionThatHasData() throws NiFiClientE
113112
waitFor(() -> isNodeDisconnected(5672));
114113
}
115114

116-
private boolean isNodeRemoved(final int apiPort) {
117-
try {
118-
return getNifiClient().getControllerClient().getNodes().getCluster().getNodes().stream()
119-
.noneMatch(dto -> dto.getApiPort() == apiPort);
120-
} catch (Exception e) {
121-
return false;
122-
}
123-
}
124-
125115
private boolean isNodeDisconnected(final int apiPort) {
126116
try {
127117
final NodeDTO nodeDto = getNifiClient().getControllerClient().getNodes().getCluster().getNodes().stream()

nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ClusteredConnectorIT.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import org.apache.nifi.web.api.dto.ConnectorConfigurationDTO;
2525
import org.apache.nifi.web.api.dto.ConnectorValueReferenceDTO;
2626
import org.apache.nifi.web.api.entity.ConnectorEntity;
27-
import org.apache.nifi.web.api.entity.NodeEntity;
2827
import org.junit.jupiter.api.Test;
2928
import org.slf4j.Logger;
3029
import org.slf4j.LoggerFactory;
@@ -130,8 +129,7 @@ public void testDeleteConnectorOnConnect() throws NiFiClientException, IOExcepti
130129
final ConnectorClient connectorClient = getNifiClient().getConnectorClient();
131130
assertThrows(NiFiClientException.class, () -> connectorClient.deleteConnector(connector));
132131

133-
final NodeEntity node2Entity = getNodeEntity(2);
134-
getNifiClient().getControllerClient().deleteNode(node2Entity.getNode().getNodeId());
132+
removeNodeFromCluster(2);
135133

136134
// Should now be able to delete connector
137135
connectorClient.deleteConnector(connector);
@@ -172,8 +170,7 @@ public void testDeleteConnectorOnConnectWithDataQueued() throws NiFiClientExcept
172170
assertThrows(NiFiClientException.class, () -> connectorClient.deleteConnector(connector));
173171

174172
// Remove node 2 from cluster.
175-
final NodeEntity node2Entity = getNodeEntity(2);
176-
getNifiClient().getControllerClient().deleteNode(node2Entity.getNode().getNodeId());
173+
removeNodeFromCluster(2);
177174

178175
// We cannot delete the connector directly because it has data queued. Stop Node 1, delete the flow.json.gz file, and restart Node 1.
179176
getNiFiInstance().getNodeInstance(1).stop();

0 commit comments

Comments
 (0)