Skip to content

Commit 1437e02

Browse files
committed
fixup! fixup! [FLINK-38272][runtime] Fix unstable BatchJobRecoveryTest
1 parent 90dd583 commit 1437e02

File tree

2 files changed

+16
-33
lines changed

2 files changed

+16
-33
lines changed

flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingExecutionDeploymentTrackerWrapper.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.Set;
2828
import java.util.concurrent.CompletableFuture;
2929

30+
/** Testing implementation of the {@link ExecutionDeploymentTracker}. */
3031
public class TestingExecutionDeploymentTrackerWrapper implements ExecutionDeploymentTracker {
3132
private final ExecutionDeploymentTracker originalTracker;
3233
private final CompletableFuture<ExecutionAttemptID> taskDeploymentFuture;

flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/BatchJobRecoveryTest.java

Lines changed: 15 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -124,15 +124,13 @@
124124
import java.util.Set;
125125
import java.util.concurrent.CompletableFuture;
126126
import java.util.concurrent.ScheduledExecutorService;
127-
import java.util.concurrent.TimeoutException;
128127
import java.util.concurrent.atomic.AtomicBoolean;
129128
import java.util.stream.Collectors;
130129
import java.util.stream.StreamSupport;
131130

132131
import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilExecutionVertexState;
133132
import static org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder.createCustomParallelismDecider;
134133
import static org.apache.flink.runtime.util.JobVertexConnectionUtils.connectNewDataSetAsInput;
135-
import static org.apache.flink.util.Preconditions.checkArgument;
136134
import static org.apache.flink.util.Preconditions.checkState;
137135
import static org.assertj.core.api.Assertions.assertThat;
138136
import static org.junit.jupiter.api.Assertions.fail;
@@ -170,7 +168,6 @@ public class BatchJobRecoveryTest {
170168
private static final int SOURCE_PARALLELISM = 5;
171169
private static final int MIDDLE_PARALLELISM = 5;
172170
private static final int DECIDED_SINK_PARALLELISM = 2;
173-
private static final long TIMEOUT_MILLIS = 15000L;
174171
private static final JobVertexID SOURCE_ID = new JobVertexID();
175172
private static final JobVertexID MIDDLE_ID = new JobVertexID();
176173
private static final JobVertexID SINK_ID = new JobVertexID();
@@ -246,14 +243,14 @@ void testRecoverFromJMFailover() throws Exception {
246243

247244
runInMainThread(scheduler::startScheduling);
248245

249-
waitUntilAllExecutionsDeployed(SOURCE_ID, scheduler, TIMEOUT_MILLIS);
246+
waitUntilAllExecutionsDeployed(SOURCE_ID, scheduler);
250247
runInMainThread(
251248
() -> {
252249
// transition all sources to finished.
253250
transitionExecutionsState(scheduler, ExecutionState.FINISHED, SOURCE_ID);
254251
});
255252

256-
waitUntilAllExecutionsDeployed(MIDDLE_ID, scheduler, TIMEOUT_MILLIS);
253+
waitUntilAllExecutionsDeployed(MIDDLE_ID, scheduler);
257254
runInMainThread(
258255
() -> {
259256
// transition all middle tasks to RUNNING state
@@ -349,14 +346,14 @@ void testJobVertexUnFinishedAndOperatorCoordinatorNotSupportBatchSnapshot() thro
349346

350347
runInMainThread(scheduler::startScheduling);
351348

352-
waitUntilAllExecutionsDeployed(SOURCE_ID, scheduler, TIMEOUT_MILLIS);
349+
waitUntilAllExecutionsDeployed(SOURCE_ID, scheduler);
353350
runInMainThread(
354351
() -> {
355352
// transition all sources to finished.
356353
transitionExecutionsState(scheduler, ExecutionState.FINISHED, SOURCE_ID);
357354
});
358355

359-
waitUntilAllExecutionsDeployed(MIDDLE_ID, scheduler, TIMEOUT_MILLIS);
356+
waitUntilAllExecutionsDeployed(MIDDLE_ID, scheduler);
360357
runInMainThread(
361358
() -> {
362359
// transition first middle task to finished.
@@ -465,7 +462,7 @@ void testJobVertexFinishedAndOperatorCoordinatorNotSupportBatchSnapshotAndPartit
465462

466463
runInMainThread(scheduler::startScheduling);
467464

468-
waitUntilAllExecutionsDeployed(SOURCE_ID, scheduler, TIMEOUT_MILLIS);
465+
waitUntilAllExecutionsDeployed(SOURCE_ID, scheduler);
469466
runInMainThread(
470467
() -> {
471468
// transition all sources to finished.
@@ -510,9 +507,9 @@ void testJobVertexFinishedAndOperatorCoordinatorNotSupportBatchSnapshotAndPartit
510507
}
511508
}
512509

513-
waitUntilAllExecutionsDeployed(MIDDLE_ID, newScheduler, TIMEOUT_MILLIS);
510+
waitUntilAllExecutionsDeployed(MIDDLE_ID, newScheduler);
514511

515-
waitUntilAllExecutionsDeployed(MIDDLE_ID, scheduler, TIMEOUT_MILLIS);
512+
waitUntilAllExecutionsDeployed(MIDDLE_ID, scheduler);
516513
runInMainThread(
517514
() -> {
518515
// transition all middle tasks to running
@@ -553,7 +550,7 @@ void testRecoverFromJMFailoverAndPartitionsUnavailable() throws Exception {
553550

554551
runInMainThread(scheduler::startScheduling);
555552

556-
waitUntilAllExecutionsDeployed(SOURCE_ID, scheduler, TIMEOUT_MILLIS);
553+
waitUntilAllExecutionsDeployed(SOURCE_ID, scheduler);
557554
runInMainThread(
558555
() -> {
559556
// transition all sources to finished.
@@ -611,20 +608,20 @@ void testRecoverDecidedParallelismFromTheSameJobGraphInstance() throws Exception
611608

612609
runInMainThread(scheduler::startScheduling);
613610

614-
waitUntilAllExecutionsDeployed(SOURCE_ID, scheduler, TIMEOUT_MILLIS);
611+
waitUntilAllExecutionsDeployed(SOURCE_ID, scheduler);
615612
runInMainThread(
616613
() -> {
617614
// transition all sources to finished.
618615
transitionExecutionsState(scheduler, ExecutionState.FINISHED, SOURCE_ID);
619616
});
620617

621-
waitUntilAllExecutionsDeployed(MIDDLE_ID, scheduler, TIMEOUT_MILLIS);
618+
waitUntilAllExecutionsDeployed(MIDDLE_ID, scheduler);
622619
runInMainThread(
623620
() -> { // transition all middle tasks to finished.
624621
transitionExecutionsState(scheduler, ExecutionState.FINISHED, MIDDLE_ID);
625622
});
626623

627-
waitUntilAllExecutionsDeployed(SINK_ID, scheduler, TIMEOUT_MILLIS);
624+
waitUntilAllExecutionsDeployed(SINK_ID, scheduler);
628625
runInMainThread(
629626
() -> {
630627
// transition all sinks to finished.
@@ -696,7 +693,7 @@ void testPartitionNotFoundTwiceAfterJMFailover() throws Exception {
696693
});
697694

698695
// transition all sources to finished.
699-
waitUntilAllExecutionsDeployed(SOURCE_ID, scheduler, TIMEOUT_MILLIS);
696+
waitUntilAllExecutionsDeployed(SOURCE_ID, scheduler);
700697
runInMainThread(
701698
() -> transitionExecutionsState(scheduler, ExecutionState.FINISHED, SOURCE_ID));
702699

@@ -1236,19 +1233,10 @@ public Optional<ResourceID> storesLocalResourcesOn() {
12361233
}
12371234

12381235
private void waitUntilAllExecutionsDeployed(
1239-
JobVertexID vertexId, AdaptiveBatchScheduler scheduler, long maxWaitMillis)
1240-
throws Exception {
1241-
checkArgument(maxWaitMillis >= 0);
1242-
1243-
// this is a poor implementation - we may want to improve it eventually
1244-
final long deadline =
1245-
maxWaitMillis == 0
1246-
? Long.MAX_VALUE
1247-
: System.nanoTime() + (maxWaitMillis * 1_000_000);
1248-
1236+
JobVertexID vertexId, AdaptiveBatchScheduler scheduler) throws Exception {
12491237
AtomicBoolean isAllExecutionDeployed = new AtomicBoolean(false);
12501238

1251-
while (System.nanoTime() < deadline && !isAllExecutionDeployed.get()) {
1239+
while (!isAllExecutionDeployed.get()) {
12521240
runInMainThread(
12531241
() -> {
12541242
List<ExecutionAttemptID> attemptIds =
@@ -1266,13 +1254,7 @@ private void waitUntilAllExecutionsDeployed(
12661254
isAllExecutionDeployed.set(true);
12671255
}
12681256
});
1269-
try {
1270-
Thread.sleep(2);
1271-
} catch (InterruptedException ignored) {
1272-
}
1273-
}
1274-
if (!isAllExecutionDeployed.get()) {
1275-
throw new TimeoutException("Not all execution vertices deployed");
1257+
Thread.sleep(2);
12761258
}
12771259
}
12781260
}

0 commit comments

Comments
 (0)