5454import org .apache .flink .runtime .jobgraph .JobVertex ;
5555import org .apache .flink .runtime .jobgraph .JobVertexID ;
5656import org .apache .flink .runtime .jobgraph .OperatorID ;
57+ import org .apache .flink .runtime .jobmaster .TestingExecutionDeploymentTrackerWrapper ;
5758import org .apache .flink .runtime .jobmaster .event .ExecutionVertexFinishedEvent ;
5859import org .apache .flink .runtime .jobmaster .event .FileSystemJobEventStore ;
5960import org .apache .flink .runtime .jobmaster .event .JobEvent ;
@@ -159,6 +160,9 @@ public class BatchJobRecoveryTest {
159160 private ScheduledExecutor delayedExecutor =
160161 new ScheduledExecutorServiceAdapter (EXECUTOR_RESOURCE .getExecutor ());
161162
163+ private TestingExecutionDeploymentTrackerWrapper executionDeploymentTracker =
164+ new TestingExecutionDeploymentTrackerWrapper ();
165+
162166 private static final OperatorID OPERATOR_ID = new OperatorID (1234L , 5678L );
163167 private static final int NUM_SPLITS = 10 ;
164168 private static final int SOURCE_PARALLELISM = 5 ;
@@ -216,6 +220,7 @@ void setUp() throws IOException {
216220
217221 this .serializedJobGraph = serializeJobGraph (createDefaultJobGraph ());
218222 allPartitionWithMetrics .clear ();
223+ executionDeploymentTracker = new TestingExecutionDeploymentTrackerWrapper ();
219224 }
220225
221226 @ AfterEach
@@ -238,11 +243,14 @@ void testRecoverFromJMFailover() throws Exception {
238243
239244 runInMainThread (scheduler ::startScheduling );
240245
246+ waitUntilAllExecutionsDeployed (SOURCE_ID , scheduler );
241247 runInMainThread (
242248 () -> {
243249 // transition all sources to finished.
244250 transitionExecutionsState (scheduler , ExecutionState .FINISHED , SOURCE_ID );
245251 });
252+
253+ waitUntilAllExecutionsDeployed (MIDDLE_ID , scheduler );
246254 runInMainThread (
247255 () -> {
248256 // transition all middle tasks to RUNNING state
@@ -338,11 +346,14 @@ void testJobVertexUnFinishedAndOperatorCoordinatorNotSupportBatchSnapshot() thro
338346
339347 runInMainThread (scheduler ::startScheduling );
340348
349+ waitUntilAllExecutionsDeployed (SOURCE_ID , scheduler );
341350 runInMainThread (
342351 () -> {
343352 // transition all sources to finished.
344353 transitionExecutionsState (scheduler , ExecutionState .FINISHED , SOURCE_ID );
345354 });
355+
356+ waitUntilAllExecutionsDeployed (MIDDLE_ID , scheduler );
346357 runInMainThread (
347358 () -> {
348359 // transition first middle task to finished.
@@ -451,6 +462,7 @@ void testJobVertexFinishedAndOperatorCoordinatorNotSupportBatchSnapshotAndPartit
451462
452463 runInMainThread (scheduler ::startScheduling );
453464
465+ waitUntilAllExecutionsDeployed (SOURCE_ID , scheduler );
454466 runInMainThread (
455467 () -> {
456468 // transition all sources to finished.
@@ -495,14 +507,13 @@ void testJobVertexFinishedAndOperatorCoordinatorNotSupportBatchSnapshotAndPartit
495507 }
496508 }
497509
498- for (ExecutionVertex taskVertex :
499- getExecutionVertices (MIDDLE_ID , newScheduler .getExecutionGraph ())) {
500- waitUntilExecutionVertexState (taskVertex , ExecutionState .DEPLOYING , 15000L );
501- }
510+ waitUntilAllExecutionsDeployed (MIDDLE_ID , newScheduler );
502511
512+ waitUntilAllExecutionsDeployed (MIDDLE_ID , scheduler );
503513 runInMainThread (
504514 () -> {
505515 // transition all middle tasks to running
516+ transitionExecutionsState (scheduler , ExecutionState .INITIALIZING , MIDDLE_ID );
506517 transitionExecutionsState (scheduler , ExecutionState .RUNNING , MIDDLE_ID );
507518 });
508519
@@ -539,6 +550,7 @@ void testRecoverFromJMFailoverAndPartitionsUnavailable() throws Exception {
539550
540551 runInMainThread (scheduler ::startScheduling );
541552
553+ waitUntilAllExecutionsDeployed (SOURCE_ID , scheduler );
542554 runInMainThread (
543555 () -> {
544556 // transition all sources to finished.
@@ -596,15 +608,20 @@ void testRecoverDecidedParallelismFromTheSameJobGraphInstance() throws Exception
596608
597609 runInMainThread (scheduler ::startScheduling );
598610
611+ waitUntilAllExecutionsDeployed (SOURCE_ID , scheduler );
599612 runInMainThread (
600613 () -> {
601614 // transition all sources to finished.
602615 transitionExecutionsState (scheduler , ExecutionState .FINISHED , SOURCE_ID );
603616 });
617+
618+ waitUntilAllExecutionsDeployed (MIDDLE_ID , scheduler );
604619 runInMainThread (
605620 () -> { // transition all middle tasks to finished.
606621 transitionExecutionsState (scheduler , ExecutionState .FINISHED , MIDDLE_ID );
607622 });
623+
624+ waitUntilAllExecutionsDeployed (SINK_ID , scheduler );
608625 runInMainThread (
609626 () -> {
610627 // transition all sinks to finished.
@@ -676,6 +693,7 @@ void testPartitionNotFoundTwiceAfterJMFailover() throws Exception {
676693 });
677694
678695 // transition all sources to finished.
696+ waitUntilAllExecutionsDeployed (SOURCE_ID , scheduler );
679697 runInMainThread (
680698 () -> transitionExecutionsState (scheduler , ExecutionState .FINISHED , SOURCE_ID ));
681699
@@ -1124,6 +1142,7 @@ private AdaptiveBatchScheduler createScheduler(
11241142 jobGraph ,
11251143 mainThreadExecutor .getMainThreadExecutor (),
11261144 EXECUTOR_RESOURCE .getExecutor ())
1145+ .setExecutionDeploymentTracker (executionDeploymentTracker )
11271146 .setRestartBackoffTimeStrategy (
11281147 new FixedDelayRestartBackoffTimeStrategy
11291148 .FixedDelayRestartBackoffTimeStrategyFactory (10 , 0 )
@@ -1212,4 +1231,30 @@ public Optional<ResourceID> storesLocalResourcesOn() {
12121231 };
12131232 }
12141233 }
1234+
1235+ private void waitUntilAllExecutionsDeployed (
1236+ JobVertexID vertexId , AdaptiveBatchScheduler scheduler ) throws Exception {
1237+ AtomicBoolean isAllExecutionDeployed = new AtomicBoolean (false );
1238+
1239+ while (!isAllExecutionDeployed .get ()) {
1240+ runInMainThread (
1241+ () -> {
1242+ List <ExecutionAttemptID > attemptIds =
1243+ Arrays .stream (
1244+ scheduler
1245+ .getExecutionJobVertex (vertexId )
1246+ .getTaskVertices ())
1247+ .map (ExecutionVertex ::getCurrentExecutionAttempt )
1248+ .map (Execution ::getAttemptId )
1249+ .collect (Collectors .toList ());
1250+ if (!attemptIds .isEmpty ()
1251+ && executionDeploymentTracker
1252+ .getDeployedExecutions ()
1253+ .containsAll (attemptIds )) {
1254+ isAllExecutionDeployed .set (true );
1255+ }
1256+ });
1257+ Thread .sleep (2 );
1258+ }
1259+ }
12151260}
0 commit comments