flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [09/22] flink git commit: [FLINK-8609] [flip6] Enable Flip-6 job mode in CliFrontend
Date Thu, 15 Feb 2018 10:42:52 GMT
[FLINK-8609] [flip6] Enable Flip-6 job mode in CliFrontend

This commit allows deploying detached job mode clusters via the
CliFrontend. In order to do that, it first extracts the JobGraph
from the PackagedProgram and then uses the ClusterDescriptor to
deploy the job mode cluster.

This closes #5432.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c9bc9749
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c9bc9749
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c9bc9749

Branch: refs/heads/master
Commit: c9bc97490ed8a658f03cb65a776fba2053b2d46f
Parents: 84418d0
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Tue Feb 6 16:47:28 2018 +0100
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Thu Feb 15 08:44:19 2018 +0100

----------------------------------------------------------------------
 .../apache/flink/client/cli/CliFrontend.java    | 166 +++++++++++++++----
 1 file changed, 130 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c9bc9749/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index 560bc6f..23e82bc 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -23,10 +23,13 @@ import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
 import org.apache.flink.client.deployment.ClusterDescriptor;
 import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.JobWithJars;
+import org.apache.flink.client.program.OptimizerPlanEnvironment;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.client.program.ProgramMissingJobException;
@@ -37,6 +40,7 @@ import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.optimizer.DataStatistics;
 import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.costs.DefaultCostEstimator;
@@ -44,9 +48,11 @@ import org.apache.flink.optimizer.plan.FlinkPlan;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.StreamingPlan;
 import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.JobManagerMessages;
@@ -68,6 +74,7 @@ import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.net.InetSocketAddress;
+import java.net.URISyntaxException;
 import java.net.URL;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
@@ -113,6 +120,8 @@ public class CliFrontend {
 
 	private final int defaultParallelism;
 
+	private final boolean flip6;
+
 	public CliFrontend(
 			Configuration configuration,
 			List<CustomCommandLine<?>> customCommandLines) throws Exception {
@@ -135,6 +144,8 @@ public class CliFrontend {
 
 		this.clientTimeout = AkkaUtils.getClientTimeout(this.configuration);
 		this.defaultParallelism = configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM);
+
+		this.flip6 = CoreOptions.FLIP6_MODE.equalsIgnoreCase(configuration.getString(CoreOptions.MODE));
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -195,7 +206,11 @@ public class CliFrontend {
 
 		final CustomCommandLine<?> customCommandLine = getActiveCustomCommandLine(commandLine);
 
-		runProgram(customCommandLine, commandLine, runOptions, program);
+		try {
+			runProgram(customCommandLine, commandLine, runOptions, program);
+		} finally {
+			program.deleteExtractedLibraries();
+		}
 	}
 
 	private <T> void runProgram(
@@ -210,51 +225,72 @@ public class CliFrontend {
 
 			final ClusterClient<T> client;
 
-			if (clusterId != null) {
-				client = clusterDescriptor.retrieve(clusterId);
-			} else {
-				final ClusterSpecification clusterSpecification = customCommandLine.getClusterSpecification(commandLine);
-				client = clusterDescriptor.deploySessionCluster(clusterSpecification);
-			}
+			// directly deploy the job if the cluster is started in job mode and detached
+			if (flip6 && clusterId == null && runOptions.getDetachedMode()) {
+				int parallelism = runOptions.getParallelism() == -1 ? defaultParallelism : runOptions.getParallelism();
 
-			try {
-				client.setPrintStatusDuringExecution(runOptions.getStdoutLogging());
-				client.setDetached(runOptions.getDetachedMode());
-				LOG.debug("Client slots is set to {}", client.getMaxSlots());
-
-				LOG.debug(runOptions.getSavepointRestoreSettings().toString());
-
-				int userParallelism = runOptions.getParallelism();
-				LOG.debug("User parallelism is set to {}", userParallelism);
-				if (client.getMaxSlots() != -1 && userParallelism == -1) {
-					logAndSysout("Using the parallelism provided by the remote cluster ("
-						+ client.getMaxSlots() + "). "
-						+ "To use another parallelism, set it at the ./bin/flink client.");
-					userParallelism = client.getMaxSlots();
-				} else if (ExecutionConfig.PARALLELISM_DEFAULT == userParallelism) {
-					userParallelism = defaultParallelism;
-				}
+				final JobGraph jobGraph = createJobGraph(configuration, program, parallelism);
 
-				executeProgram(program, client, userParallelism);
-			} finally {
-				if (clusterId == null && !client.isDetached()) {
-					// terminate the cluster only if we have started it before and if it's not detached
-					try {
-						clusterDescriptor.terminateCluster(client.getClusterId());
-					} catch (FlinkException e) {
-						LOG.info("Could not properly terminate the Flink cluster.", e);
-					}
-				}
+				final ClusterSpecification clusterSpecification = customCommandLine.getClusterSpecification(commandLine);
+				client = clusterDescriptor.deployJobCluster(
+					clusterSpecification,
+					jobGraph,
+					runOptions.getDetachedMode());
+
+				logAndSysout("Job has been submitted with JobID " + jobGraph.getJobID());
 
 				try {
 					client.shutdown();
 				} catch (Exception e) {
 					LOG.info("Could not properly shut down the client.", e);
 				}
+			} else {
+				if (clusterId != null) {
+					client = clusterDescriptor.retrieve(clusterId);
+				} else {
+					// also in job mode we have to deploy a session cluster because the job
+					// might consist of multiple parts (e.g. when using collect)
+					final ClusterSpecification clusterSpecification = customCommandLine.getClusterSpecification(commandLine);
+					client = clusterDescriptor.deploySessionCluster(clusterSpecification);
+				}
+
+				try {
+					client.setPrintStatusDuringExecution(runOptions.getStdoutLogging());
+					client.setDetached(runOptions.getDetachedMode());
+					LOG.debug("Client slots is set to {}", client.getMaxSlots());
+
+					LOG.debug("{}", runOptions.getSavepointRestoreSettings());
+
+					int userParallelism = runOptions.getParallelism();
+					LOG.debug("User parallelism is set to {}", userParallelism);
+					if (client.getMaxSlots() != -1 && userParallelism == -1) {
+						logAndSysout("Using the parallelism provided by the remote cluster ("
+							+ client.getMaxSlots() + "). "
+							+ "To use another parallelism, set it at the ./bin/flink client.");
+						userParallelism = client.getMaxSlots();
+					} else if (ExecutionConfig.PARALLELISM_DEFAULT == userParallelism) {
+						userParallelism = defaultParallelism;
+					}
+
+					executeProgram(program, client, userParallelism);
+				} finally {
+					if (clusterId == null && !client.isDetached()) {
+						// terminate the cluster only if we have started it before and if it's not detached
+						try {
+							clusterDescriptor.terminateCluster(client.getClusterId());
+						} catch (FlinkException e) {
+							LOG.info("Could not properly terminate the Flink cluster.", e);
+						}
+					}
+
+					try {
+						client.shutdown();
+					} catch (Exception e) {
+						LOG.info("Could not properly shut down the client.", e);
+					}
+				}
 			}
 		} finally {
-			program.deleteExtractedLibraries();
-
 			try {
 				clusterDescriptor.close();
 			} catch (Exception e) {
@@ -1081,6 +1117,64 @@ public class CliFrontend {
 		return customCommandLines;
 	}
 
+	/**
+	 * Creates a {@link JobGraph} from the given {@link PackagedProgram}.
+	 *
+	 * @param configuration to use for the optimizer and job graph generator
+	 * @param packagedProgram to extract the JobGraph from
+	 * @param defaultParallelism for the JobGraph
+	 * @return JobGraph extracted from the PackagedProgram
+	 * @throws ProgramInvocationException if the JobGraph generation failed
+	 */
+	private static JobGraph createJobGraph(Configuration configuration, PackagedProgram packagedProgram, int defaultParallelism) throws ProgramInvocationException {
+		Thread.currentThread().setContextClassLoader(packagedProgram.getUserCodeClassLoader());
+		final Optimizer optimizer = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), configuration);
+		final FlinkPlan flinkPlan;
+
+		if (packagedProgram.isUsingProgramEntryPoint()) {
+
+			final JobWithJars jobWithJars = packagedProgram.getPlanWithJars();
+
+			final Plan plan = jobWithJars.getPlan();
+
+			if (plan.getDefaultParallelism() <= 0) {
+				plan.setDefaultParallelism(defaultParallelism);
+			}
+
+			flinkPlan = optimizer.compile(jobWithJars.getPlan());
+		} else if (packagedProgram.isUsingInteractiveMode()) {
+			final OptimizerPlanEnvironment optimizerPlanEnvironment = new OptimizerPlanEnvironment(optimizer);
+
+			optimizerPlanEnvironment.setParallelism(defaultParallelism);
+
+			flinkPlan = optimizerPlanEnvironment.getOptimizedPlan(packagedProgram);
+		} else {
+			throw new ProgramInvocationException("PackagedProgram does not have a valid invocation mode.");
+		}
+
+		final JobGraph jobGraph;
+
+		if (flinkPlan instanceof StreamingPlan) {
+			jobGraph = ((StreamingPlan) flinkPlan).getJobGraph();
+			jobGraph.setSavepointRestoreSettings(packagedProgram.getSavepointSettings());
+		} else {
+			final JobGraphGenerator jobGraphGenerator = new JobGraphGenerator(configuration);
+			jobGraph = jobGraphGenerator.compileJobGraph((OptimizedPlan) flinkPlan);
+		}
+
+		for (URL url : packagedProgram.getAllLibraries()) {
+			try {
+				jobGraph.addJar(new Path(url.toURI()));
+			} catch (URISyntaxException e) {
+				throw new ProgramInvocationException("Invalid URL for jar file: " + url + '.', e);
+			}
+		}
+
+		jobGraph.setClasspaths(packagedProgram.getClasspaths());
+
+		return jobGraph;
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//  Custom command-line
 	// --------------------------------------------------------------------------------------------


Mime
View raw message