diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphCoLocationRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphCoLocationRestartTest.java index 27d735967ffce..6aaa644e91e99 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphCoLocationRestartTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphCoLocationRestartTest.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.executiongraph; import org.apache.flink.api.common.JobStatus; -import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.failover.FixedDelayRestartBackoffTimeStrategy; @@ -57,6 +56,13 @@ class ExecutionGraphCoLocationRestartTest { static final TestExecutorExtension EXECUTOR_RESOURCE = TestingUtils.defaultExecutorExtension(); + @RegisterExtension + static final TestingComponentMainThreadExecutor.Extension MAIN_EXECUTOR_RESOURCE = + new TestingComponentMainThreadExecutor.Extension(); + + private final TestingComponentMainThreadExecutor mainThreadExecutor = + MAIN_EXECUTOR_RESOURCE.getComponentMainThreadTestExecutor(); + private static final int NUM_TASKS = 31; @Test @@ -85,7 +91,7 @@ void testConstraintsAfterRestart() throws Exception { final SchedulerBase scheduler = new DefaultSchedulerBuilder( jobGraph, - ComponentMainThreadExecutorServiceAdapter.forMainThread(), + mainThreadExecutor.getMainThreadExecutor(), EXECUTOR_RESOURCE.getExecutor()) .setExecutionSlotAllocatorFactory( SchedulerTestingUtils.newSlotSharingExecutionSlotAllocatorFactory( @@ -106,7 +112,7 @@ void testConstraintsAfterRestart() throws Exception { // enable the queued scheduling for the slot pool assertThat(eg.getState()).isEqualTo(JobStatus.CREATED); - scheduler.startScheduling(); + mainThreadExecutor.execute(scheduler::startScheduling); Predicate isDeploying = ExecutionGraphTestUtils.isInExecutionState(ExecutionState.DEPLOYING); @@ -117,7 +123,13 @@ void testConstraintsAfterRestart() throws Exception { // sanity checks validateConstraints(eg); - eg.getAllExecutionVertices().iterator().next().fail(new FlinkException("Test exception")); + mainThreadExecutor.execute( + () -> { + eg.getAllExecutionVertices() + .iterator() + .next() + .fail(new FlinkException("Test exception")); + }); assertThat(eg.getState()).isEqualTo(JobStatus.RESTARTING); @@ -125,11 +137,14 @@ void testConstraintsAfterRestart() throws Exception { // cancellation. This ensures the restarting actions to be performed in main thread. delayExecutor.triggerNonPeriodicScheduledTask(); - for (ExecutionVertex vertex : eg.getAllExecutionVertices()) { - if (vertex.getExecutionState() == ExecutionState.CANCELING) { - vertex.getCurrentExecutionAttempt().completeCancelling(); - } - } + mainThreadExecutor.execute( + () -> { + for (ExecutionVertex vertex : eg.getAllExecutionVertices()) { + if (vertex.getExecutionState() == ExecutionState.CANCELING) { + vertex.getCurrentExecutionAttempt().completeCancelling(); + } + } + }); // wait until we have restarted ExecutionGraphTestUtils.waitUntilJobStatus(eg, JobStatus.RUNNING, timeout); @@ -139,7 +154,10 @@ void testConstraintsAfterRestart() throws Exception { // checking execution vertex properties validateConstraints(eg); - ExecutionGraphTestUtils.finishAllVertices(eg); + mainThreadExecutor.execute( + () -> { + ExecutionGraphTestUtils.finishAllVertices(eg); + }); assertThat(eg.getState()).isEqualTo(FINISHED); }