Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.flink.runtime.executiongraph;

import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.failover.FixedDelayRestartBackoffTimeStrategy;
Expand Down Expand Up @@ -57,6 +56,13 @@ class ExecutionGraphCoLocationRestartTest {
static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
TestingUtils.defaultExecutorExtension();

@RegisterExtension
static final TestingComponentMainThreadExecutor.Extension MAIN_EXECUTOR_RESOURCE =
new TestingComponentMainThreadExecutor.Extension();

private final TestingComponentMainThreadExecutor mainThreadExecutor =
MAIN_EXECUTOR_RESOURCE.getComponentMainThreadTestExecutor();

private static final int NUM_TASKS = 31;

@Test
Expand Down Expand Up @@ -85,7 +91,7 @@ void testConstraintsAfterRestart() throws Exception {
final SchedulerBase scheduler =
new DefaultSchedulerBuilder(
jobGraph,
ComponentMainThreadExecutorServiceAdapter.forMainThread(),
mainThreadExecutor.getMainThreadExecutor(),
EXECUTOR_RESOURCE.getExecutor())
.setExecutionSlotAllocatorFactory(
SchedulerTestingUtils.newSlotSharingExecutionSlotAllocatorFactory(
Expand All @@ -106,7 +112,7 @@ void testConstraintsAfterRestart() throws Exception {
// enable the queued scheduling for the slot pool
assertThat(eg.getState()).isEqualTo(JobStatus.CREATED);

scheduler.startScheduling();
mainThreadExecutor.execute(scheduler::startScheduling);

Predicate<AccessExecution> isDeploying =
ExecutionGraphTestUtils.isInExecutionState(ExecutionState.DEPLOYING);
Expand All @@ -117,19 +123,28 @@ void testConstraintsAfterRestart() throws Exception {
// sanity checks
validateConstraints(eg);

eg.getAllExecutionVertices().iterator().next().fail(new FlinkException("Test exception"));
mainThreadExecutor.execute(
() -> {
eg.getAllExecutionVertices()
.iterator()
.next()
.fail(new FlinkException("Test exception"));
});

assertThat(eg.getState()).isEqualTo(JobStatus.RESTARTING);

// trigger registration of restartTasks(...) callback to cancelFuture before completing the
// cancellation. This ensures the restarting actions to be performed in main thread.
delayExecutor.triggerNonPeriodicScheduledTask();

for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
if (vertex.getExecutionState() == ExecutionState.CANCELING) {
vertex.getCurrentExecutionAttempt().completeCancelling();
}
}
mainThreadExecutor.execute(
() -> {
for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
if (vertex.getExecutionState() == ExecutionState.CANCELING) {
vertex.getCurrentExecutionAttempt().completeCancelling();
}
}
});

// wait until we have restarted
ExecutionGraphTestUtils.waitUntilJobStatus(eg, JobStatus.RUNNING, timeout);
Expand All @@ -139,7 +154,10 @@ void testConstraintsAfterRestart() throws Exception {
// checking execution vertex properties
validateConstraints(eg);

ExecutionGraphTestUtils.finishAllVertices(eg);
mainThreadExecutor.execute(
() -> {
ExecutionGraphTestUtils.finishAllVertices(eg);
});

assertThat(eg.getState()).isEqualTo(FINISHED);
}
Expand Down