LocalExecutor.java
@@ -20,7 +20,9 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.client.program.LocalMiniClusterFactory;
import org.apache.flink.client.program.PerJobMiniClusterFactory;
import org.apache.flink.client.program.SessionMiniClusterFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.core.execution.JobClient;
@@ -47,7 +49,9 @@
/** An {@link PipelineExecutor} for executing a {@link Pipeline} locally. */
@Internal
public class LocalExecutor implements PipelineExecutor {

private static final Logger LOG = LoggerFactory.getLogger(LocalExecutor.class);

private final ExecutorService executorService =
Executors.newFixedThreadPool(1, new ExecutorThreadFactory("Flink-LocalExecutor-IO"));

@@ -97,8 +101,19 @@ public CompletableFuture<JobClient> execute(
PipelineExecutorUtils.getStreamGraph(pipeline, configuration);

streamGraph.serializeUserDefinedInstances();
return PerJobMiniClusterFactory.createWithFactory(effectiveConfig, miniClusterFactory)
.submitJob(streamGraph, userCodeClassloader)

DeploymentOptions.LocalExecutionMode localExecutionMode =
configuration.get(DeploymentOptions.LOCAL_EXECUTION_MODE);

LocalMiniClusterFactory localMiniClusterFactory;
if (DeploymentOptions.LocalExecutionMode.PER_JOB.equals(localExecutionMode)) {
localMiniClusterFactory =
PerJobMiniClusterFactory.createWithFactory(effectiveConfig, miniClusterFactory);
} else {
localMiniClusterFactory = new SessionMiniClusterFactory();
}

return localMiniClusterFactory.submitJob(streamGraph, userCodeClassloader)
.whenComplete(
(ignored, throwable) -> {
if (throwable == null) {
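For context, a minimal sketch of how an application could opt into the new session mode for local execution. It assumes the DeploymentOptions.LOCAL_EXECUTION_MODE option and a SESSION value on LocalExecutionMode, which are introduced elsewhere in this PR and not visible in this excerpt (only PER_JOB appears here); the exact names may differ.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LocalSessionModeExample {
    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        // Assumed option and enum value from this PR; defined in DeploymentOptions in the full change.
        config.set(
                DeploymentOptions.LOCAL_EXECUTION_MODE,
                DeploymentOptions.LocalExecutionMode.SESSION);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(config);

        env.fromData(1, 2, 3).print();
        env.execute("first job"); // runs on the shared session MiniCluster

        env.fromData(4, 5, 6).print();
        env.execute("second job"); // reuses the same MiniCluster instead of starting a new one
    }
}

With PER_JOB (the previous behaviour), each execute() call would start and tear down its own MiniCluster; in session mode both jobs are submitted to the single MiniCluster held by SessionMiniClusterFactory.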
LocalMiniClusterFactory.java
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.flink.client.program;

import org.apache.flink.annotation.Internal;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.graph.ExecutionPlan;

import java.util.concurrent.CompletableFuture;

/** Factory for submitting jobs to a locally running MiniCluster. */
@Internal
public interface LocalMiniClusterFactory {

/** Submits the given {@link ExecutionPlan} and returns a {@link JobClient} for the running job. */
CompletableFuture<JobClient> submitJob(
ExecutionPlan executionPlan, ClassLoader userCodeClassloader) throws Exception;

}
PerJobMiniClusterFactory.java
@@ -42,7 +42,7 @@
* Starts a {@link MiniCluster} for every submitted job. This class guarantees to tear down the
* MiniCluster in case of normal or exceptional job completion.
*/
public final class PerJobMiniClusterFactory {
public final class PerJobMiniClusterFactory implements LocalMiniClusterFactory {

private static final Logger LOG = LoggerFactory.getLogger(PerJobMiniClusterFactory.class);

@@ -68,6 +68,7 @@ private PerJobMiniClusterFactory(
}

/** Starts a {@link MiniCluster} and submits a job. */
@Override
public CompletableFuture<JobClient> submitJob(
ExecutionPlan executionPlan, ClassLoader userCodeClassloader) throws Exception {
MiniClusterConfiguration miniClusterConfig =
@@ -153,7 +154,7 @@ private MiniClusterConfiguration getMiniClusterConfig(int maximumParallelism) {
.build();
}

private static void shutDownCluster(MiniCluster miniCluster) {
public static void shutDownCluster(MiniCluster miniCluster) {
miniCluster
.closeAsync()
.whenComplete(
SessionMiniClusterFactory.java
@@ -0,0 +1,139 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.client.program;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.runtime.minicluster.MiniClusterJobClient;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import org.apache.flink.streaming.api.graph.ExecutionPlan;
import org.apache.flink.util.function.FunctionUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

import static org.apache.flink.client.program.PerJobMiniClusterFactory.shutDownCluster;

/**
 * Starts a single {@link MiniCluster} that is shared by all submitted jobs. The MiniCluster is
 * only torn down when the JVM shuts down.
 */
public final class SessionMiniClusterFactory implements LocalMiniClusterFactory {

private static final Logger LOG = LoggerFactory.getLogger(SessionMiniClusterFactory.class);

public static final MiniCluster MINI_CLUSTER;

static {
MiniClusterConfiguration miniClusterConfig = getMiniClusterConfig();
MINI_CLUSTER = new MiniCluster(miniClusterConfig);
LOG.info("In SessionMiniClusterFactory, MINI_CLUSTER is : " + MINI_CLUSTER);
try {
MINI_CLUSTER.start();
} catch (Exception e) {
throw new RuntimeException(e);
}
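// Only tear the shared cluster down when the JVM exits; completing a job must not stop it.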
Runtime.getRuntime()
.addShutdownHook(
new Thread(
() -> {
LOG.info("JVM is shutting down, stopping the shared session MiniCluster.");
try {
shutDownCluster(MINI_CLUSTER);
} catch (Exception e) {
LOG.error("Error while shutting down the shared session MiniCluster.", e);
}
}));
}

/** Starts a {@link MiniCluster} and submits a job. */
@Override
public CompletableFuture<JobClient> submitJob(
ExecutionPlan executionPlan, ClassLoader userCodeClassloader) throws Exception {
return MINI_CLUSTER
.submitJob(executionPlan)
.thenApplyAsync(
FunctionUtils.uncheckedFunction(
submissionResult -> {
org.apache.flink.client.ClientUtils
.waitUntilJobInitializationFinished(
() ->
MINI_CLUSTER
.getJobStatus(
submissionResult
.getJobID())
.get(),
() ->
MINI_CLUSTER
.requestJobResult(
submissionResult
.getJobID())
.get(),
userCodeClassloader);
return submissionResult;
}))
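// Unlike the per-job factory, the JobClient performs no finalization here: the shared
// MiniCluster must remain running for subsequent job submissions.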
.thenApply(
result ->
new MiniClusterJobClient(
result.getJobID(),
MINI_CLUSTER,
userCodeClassloader,
MiniClusterJobClient.JobFinalizationBehavior
.NOTHING))
.whenComplete(
(ignored, throwable) -> {
if (throwable != null) {
LOG.error(
"Failed to submit the job. The shared session MiniCluster is retained for subsequent jobs.",
throwable);
}
})
.thenApply(Function.identity());
}

private static MiniClusterConfiguration getMiniClusterConfig() {
Configuration configuration = new Configuration();

if (!configuration.contains(RestOptions.BIND_PORT)) {
configuration.set(RestOptions.BIND_PORT, "0");
}

int numTaskManagers = configuration.get(TaskManagerOptions.MINI_CLUSTER_NUM_TASK_MANAGERS);

int numSlotsPerTaskManager =
configuration.get(TaskManagerOptions.MINI_CLUSTER_NUMBER_SLOTS_PER_TASK_MANAGER);

return new MiniClusterConfiguration.Builder()
.setConfiguration(configuration)
.setNumTaskManagers(numTaskManagers)
.setRpcServiceSharing(RpcServiceSharing.SHARED)
.setNumSlotsPerTaskManager(numSlotsPerTaskManager)
.build();
}
}
SessionMiniClusterFactoryTest.java
@@ -0,0 +1,139 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.client.program;

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.testutils.WaitingCancelableInvokable;
import org.junit.jupiter.api.Test;

import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ExecutionException;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Tests for {@link SessionMiniClusterFactory}. */
class SessionMiniClusterFactoryTest {

private final SessionMiniClusterFactory sessionMiniClusterFactory = initializeMiniCluster();

@Test
void testJobExecution() throws Exception {
JobClient jobClient =
sessionMiniClusterFactory
.submitJob(getNoopJobGraph(), ClassLoader.getSystemClassLoader())
.get();
JobExecutionResult jobExecutionResult = jobClient.getJobExecutionResult().get();
assertThat(jobExecutionResult).isNotNull();
Map<String, Object> actual = jobClient.getAccumulators().get();
assertThat(actual).isNotNull();
assertThatMiniClusterIsNotShutdown();
}

@Test
void testJobClient() throws Exception {
JobGraph cancellableJobGraph = getCancellableJobGraph();
JobClient jobClient =
sessionMiniClusterFactory
.submitJob(cancellableJobGraph, ClassLoader.getSystemClassLoader())
.get();
assertThat(jobClient.getJobID()).isEqualTo(cancellableJobGraph.getJobID());
assertThat(jobClient.getJobStatus().get()).isIn(JobStatus.CREATED, JobStatus.RUNNING);
jobClient.cancel().get();
assertThat(jobClient.getJobExecutionResult())
.failsWithin(Duration.ofSeconds(1))
.withThrowableOfType(ExecutionException.class)
.withMessageContaining("Job was cancelled");
assertThatMiniClusterIsNotShutdown();
}

@Test
void testJobClientSavepoint() throws Exception {
JobClient jobClient =
sessionMiniClusterFactory
.submitJob(getCancellableJobGraph(), ClassLoader.getSystemClassLoader())
.get();
while (jobClient.getJobStatus().get() != JobStatus.RUNNING) {
Thread.sleep(50);
}
assertThatThrownBy(
() -> jobClient.triggerSavepoint(null, SavepointFormatType.DEFAULT).get(),
"is not a streaming job.")
.isInstanceOf(ExecutionException.class);
assertThat(jobClient.stopWithSavepoint(true, null, SavepointFormatType.DEFAULT))
.failsWithin(Duration.ofSeconds(5L))
.withThrowableOfType(ExecutionException.class)
.withMessageContaining("is not a streaming job.");
}

@Test
void testMultipleExecutions() throws Exception {
{
JobClient jobClient =
sessionMiniClusterFactory
.submitJob(getNoopJobGraph(), ClassLoader.getSystemClassLoader())
.get();
jobClient.getJobExecutionResult().get();
assertThatMiniClusterIsNotShutdown();
}
{
JobClient jobClient =
sessionMiniClusterFactory
.submitJob(getNoopJobGraph(), ClassLoader.getSystemClassLoader())
.get();
jobClient.getJobExecutionResult().get();
assertThatMiniClusterIsNotShutdown();
}
}

@Test
void testJobClientAfterJobCompletion() throws Exception {
JobClient jobClient =
sessionMiniClusterFactory
.submitJob(getNoopJobGraph(), ClassLoader.getSystemClassLoader())
.get();
jobClient.getJobExecutionResult().get();
assertThatMiniClusterIsNotShutdown();
jobClient.cancel();
}

private SessionMiniClusterFactory initializeMiniCluster() {
return new SessionMiniClusterFactory();
}

private void assertThatMiniClusterIsNotShutdown() {
assertThat(SessionMiniClusterFactory.MINI_CLUSTER.isRunning()).isTrue();
}

private static JobGraph getNoopJobGraph() {
return JobGraphTestUtils.singleNoOpJobGraph();
}

private static JobGraph getCancellableJobGraph() {
JobVertex jobVertex = new JobVertex("jobVertex");
jobVertex.setInvokableClass(WaitingCancelableInvokable.class);
jobVertex.setParallelism(1);
return JobGraphTestUtils.streamingJobGraph(jobVertex);
}
}