flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject flink git commit: [FLINK-7080] [yarn] Add Yarn per job mode deployment
Date Sun, 30 Jul 2017 17:45:35 GMT
Repository: flink
Updated Branches:
  refs/heads/master bc9418656 -> a954ea113


[FLINK-7080] [yarn] Add Yarn per job mode deployment

Upload user code jar from JobGraph

This closes #4284.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a954ea11
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a954ea11
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a954ea11

Branch: refs/heads/master
Commit: a954ea113bc29a4480af579387c6e9b81bd76f85
Parents: bc94186
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Thu Jul 6 18:26:41 2017 +0200
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Sun Jul 30 19:44:49 2017 +0200

----------------------------------------------------------------------
 .../client/deployment/ClusterDescriptor.java    |   6 +-
 .../deployment/StandaloneClusterDescriptor.java |   2 +-
 .../clusterframework/BootstrapTools.java        |  10 +-
 .../runtime/entrypoint/ClusterEntrypoint.java   |   4 +
 .../runtime/rpc/akka/AkkaRpcServiceUtils.java   |  15 ++
 .../yarn/TestingYarnClusterDescriptor.java      |  16 +-
 .../java/org/apache/flink/yarn/YARNITCase.java  | 110 +++++++++
 .../yarn/AbstractYarnClusterDescriptor.java     | 233 +++++++++++--------
 .../flink/yarn/YarnClusterDescriptor.java       |  12 +-
 .../flink/yarn/YarnClusterDescriptorV2.java     |  13 +-
 .../entrypoint/YarnJobClusterEntrypoint.java    |   2 +-
 .../flink/yarn/YarnClusterDescriptorTest.java   |  16 +-
 12 files changed, 324 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a954ea11/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
index ba83607..a62ceff 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
@@ -51,8 +51,12 @@ public interface ClusterDescriptor<ClientType extends ClusterClient> {
 	/**
 	 * Deploys a per-job cluster with the given job on the cluster.
 	 *
+	 * @param clusterSpecification Initial cluster specification with which the Flink cluster is launched
+	 * @param jobGraph JobGraph with which the job cluster is started
 	 * @return Cluster client to talk to the Flink cluster
 	 * @throws ClusterDeploymentException if the cluster could not be deployed
 	 */
-	ClientType deployJobCluster(final JobGraph jobGraph) throws ClusterDeploymentException;
+	ClientType deployJobCluster(
+		final ClusterSpecification clusterSpecification,
+		final JobGraph jobGraph);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a954ea11/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
index 435692f..51e267a 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
@@ -56,7 +56,7 @@ public class StandaloneClusterDescriptor implements ClusterDescriptor<Standalone
 	}
 
 	@Override
-	public StandaloneClusterClient deployJobCluster(JobGraph jobGraph) throws ClusterDeploymentException {
+	public StandaloneClusterClient deployJobCluster(ClusterSpecification clusterSpecification, JobGraph jobGraph) {
 		throw new UnsupportedOperationException("Can't deploy a standalone per-job cluster.");
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a954ea11/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index fd30e43..4cf8166 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -228,8 +228,14 @@ public class BootstrapTools {
 
 		Configuration cfg = baseConfig.clone();
 
-		cfg.setString(JobManagerOptions.ADDRESS, jobManagerHostname);
-		cfg.setInteger(JobManagerOptions.PORT, jobManagerPort);
+		if (jobManagerHostname != null && !jobManagerHostname.isEmpty()) {
+			cfg.setString(JobManagerOptions.ADDRESS, jobManagerHostname);
+		}
+
+		if (jobManagerPort > 0) {
+			cfg.setInteger(JobManagerOptions.PORT, jobManagerPort);
+		}
+
 		cfg.setString(ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION, registrationTimeout.toString());
 		if (numSlots != -1){
 			cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots);

http://git-wip-us.apache.org/repos/asf/flink/blob/a954ea11/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index 98348ef..2538f20 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -127,6 +127,10 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 		synchronized (lock) {
 			initializeServices(configuration);
 
+			// write host information into configuration
+			configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
+			configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());
+
 			startClusterComponents(
 				configuration,
 				commonRpcService,

http://git-wip-us.apache.org/repos/asf/flink/blob/a954ea11/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
index 810efff..dbf61a2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
@@ -38,6 +38,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.net.InetAddress;
+import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -200,6 +201,20 @@ public class AkkaRpcServiceUtils {
 		return prefix + '_' + nameOffset;
 	}
 
+	/**
+	 * Extracts the hostname and the port of the remote actor system from the given Akka URL. The
+	 * result is an {@link InetSocketAddress} instance containing the extracted hostname and port. If
+	 * the Akka URL does not contain the hostname and port information, e.g. a local Akka URL is
+	 * provided, then an {@link Exception} is thrown.
+	 *
+	 * @param akkaURL The URL to extract the host and port from.
+	 * @return The InetSocketAddress with teh extracted host and port.
+	 * @throws Exception Thrown, if the given string does not represent a proper url
+	 */
+	public static InetSocketAddress createInetSocketAddressFromAkkaURL(String akkaURL) throws Exception {
+		return AkkaUtils.getInetSockeAddressFromAkkaURL(akkaURL);
+	}
+
 	// ------------------------------------------------------------------------
 
 	/** This class is not meant to be instantiated */

http://git-wip-us.apache.org/repos/asf/flink/blob/a954ea11/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
index b2fe133..30d2798 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.yarn;
 
+import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.util.Preconditions;
@@ -58,20 +59,25 @@ public class TestingYarnClusterDescriptor extends AbstractYarnClusterDescriptor
 	}
 
 	@Override
-	protected Class<?> getApplicationMasterClass() {
-		return TestingApplicationMaster.class;
+	protected String getYarnSessionClusterEntrypoint() {
+		return TestingApplicationMaster.class.getName();
 	}
 
 	@Override
-	public YarnClusterClient deployJobCluster(JobGraph jobGraph) {
+	protected String getYarnJobClusterEntrypoint() {
+		throw new UnsupportedOperationException("Does not support Yarn per-job clusters.");
+	}
+
+	@Override
+	public YarnClusterClient deployJobCluster(ClusterSpecification clusterSpecification, JobGraph jobGraph) {
 		throw new UnsupportedOperationException("Cannot deploy a per-job cluster yet.");
 	}
 
-	private static class TestJarFinder implements FilenameFilter {
+	static class TestJarFinder implements FilenameFilter {
 
 		private final String jarName;
 
-		public TestJarFinder(final String jarName) {
+		TestJarFinder(final String jarName) {
 			this.jarName = jarName;
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a954ea11/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
new file mode 100644
index 0000000..bc28c5b
--- /dev/null
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.client.deployment.ClusterSpecification;
+import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+
+import org.apache.hadoop.fs.Path;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Random;
+
+/**
+ * Test cases for the deployment of Yarn Flink clusters.
+ */
+public class YARNITCase extends YarnTestBase {
+
+	@BeforeClass
+	public static void setup() {
+		YARN_CONFIGURATION.set(YarnTestBase.TEST_CLUSTER_NAME_KEY, "flink-yarn-tests-ha");
+		startYARNWithConfig(YARN_CONFIGURATION);
+	}
+
+	@Ignore("The cluster cannot be stopped yet.")
+	@Test
+	public void testPerJobMode() {
+		Configuration configuration = new Configuration();
+		configuration.setString(AkkaOptions.ASK_TIMEOUT, "30 s");
+		YarnClusterDescriptorV2 yarnClusterDescriptorV2 = new YarnClusterDescriptorV2(configuration, System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR));
+
+		yarnClusterDescriptorV2.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
+		yarnClusterDescriptorV2.addShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
+
+		final ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
+			.setMasterMemoryMB(768)
+			.setTaskManagerMemoryMB(1024)
+			.setSlotsPerTaskManager(1)
+			.setNumberTaskManagers(1)
+			.createClusterSpecification();
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(2);
+
+		env.addSource(new InfiniteSource())
+			.shuffle()
+			.addSink(new DiscardingSink<Integer>());
+
+		final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+
+		File testingJar = YarnTestBase.findFile("..", new TestingYarnClusterDescriptor.TestJarFinder("flink-yarn-tests"));
+
+		jobGraph.addJar(new org.apache.flink.core.fs.Path(testingJar.toURI()));
+
+		YarnClusterClient clusterClient = yarnClusterDescriptorV2.deployJobCluster(clusterSpecification, jobGraph);
+	}
+
+	private static class InfiniteSource implements ParallelSourceFunction<Integer> {
+
+		private static final long serialVersionUID = 1642561062000662861L;
+		private volatile boolean running;
+		private final Random random;
+
+		InfiniteSource() {
+			running = true;
+			random = new Random();
+		}
+
+		@Override
+		public void run(SourceContext<Integer> ctx) throws Exception {
+			while (running) {
+				synchronized (ctx.getCheckpointLock()) {
+					ctx.collect(random.nextInt());
+				}
+
+				Thread.sleep(5L);
+			}
+		}
+
+		@Override
+		public void cancel() {
+			running = false;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a954ea11/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index 908e91e..dfc8b6a 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -62,6 +62,8 @@ import org.apache.hadoop.yarn.util.Records;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileOutputStream;
@@ -159,7 +161,9 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 	/**
 	 * The class to bootstrap the application master of the Yarn cluster (runs main method).
 	 */
-	protected abstract Class<?> getApplicationMasterClass();
+	protected abstract String getYarnSessionClusterEntrypoint();
+
+	protected abstract String getYarnJobClusterEntrypoint();
 
 	public org.apache.flink.configuration.Configuration getFlinkConfiguration() {
 		return flinkConfiguration;
@@ -355,31 +359,130 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 	@Override
 	public YarnClusterClient deploySessionCluster(ClusterSpecification clusterSpecification) {
 		try {
-			if (UserGroupInformation.isSecurityEnabled()) {
-				// note: UGI::hasKerberosCredentials inaccurately reports false
-				// for logins based on a keytab (fixed in Hadoop 2.6.1, see HADOOP-10786),
-				// so we check only in ticket cache scenario.
-				boolean useTicketCache = flinkConfiguration.getBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE);
-
-				UserGroupInformation loginUser = UserGroupInformation.getCurrentUser();
-				if (loginUser.getAuthenticationMethod() == UserGroupInformation.AuthenticationMethod.KERBEROS
-						&& useTicketCache && !loginUser.hasKerberosCredentials()) {
-					LOG.error("Hadoop security with Kerberos is enabled but the login user does not have Kerberos credentials");
-					throw new RuntimeException("Hadoop security with Kerberos is enabled but the login user " +
-							"does not have Kerberos credentials");
-				}
-			}
-			return deployInternal(clusterSpecification);
+			return deployInternal(
+				clusterSpecification,
+				getYarnSessionClusterEntrypoint(),
+				null);
 		} catch (Exception e) {
-			throw new RuntimeException("Couldn't deploy Yarn cluster", e);
+			throw new RuntimeException("Couldn't deploy Yarn session cluster", e);
 		}
 	}
 
-	protected ClusterSpecification validateClusterResources(
+	@Override
+	public YarnClusterClient deployJobCluster(ClusterSpecification clusterSpecification, JobGraph jobGraph) {
+		try {
+			return deployInternal(
+				clusterSpecification,
+				getYarnJobClusterEntrypoint(),
+				jobGraph);
+		} catch (Exception e) {
+			throw new RuntimeException("Could not deploy Yarn job cluster.", e);
+		}
+	}
+
+	/**
+	 * This method will block until the ApplicationMaster/JobManager have been
+	 * deployed on YARN.
+	 *
+	 * @param clusterSpecification Initial cluster specification for the to be deployed Flink cluster
+	 * @param jobGraph A job graph which is deployed with the Flink cluster, null if none
+	 */
+	protected YarnClusterClient deployInternal(
 			ClusterSpecification clusterSpecification,
-			int yarnMinAllocationMB,
-			Resource maximumResourceCapability,
-			ClusterResourceDescription freeClusterResources) throws YarnDeploymentException {
+			String yarnClusterEntrypoint,
+			@Nullable JobGraph jobGraph) throws Exception {
+
+		if (UserGroupInformation.isSecurityEnabled()) {
+			// note: UGI::hasKerberosCredentials inaccurately reports false
+			// for logins based on a keytab (fixed in Hadoop 2.6.1, see HADOOP-10786),
+			// so we check only in ticket cache scenario.
+			boolean useTicketCache = flinkConfiguration.getBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE);
+
+			UserGroupInformation loginUser = UserGroupInformation.getCurrentUser();
+			if (loginUser.getAuthenticationMethod() == UserGroupInformation.AuthenticationMethod.KERBEROS
+				&& useTicketCache && !loginUser.hasKerberosCredentials()) {
+				LOG.error("Hadoop security with Kerberos is enabled but the login user does not have Kerberos credentials");
+				throw new RuntimeException("Hadoop security with Kerberos is enabled but the login user " +
+					"does not have Kerberos credentials");
+			}
+		}
+
+		isReadyForDeployment(clusterSpecification);
+
+		final YarnClient yarnClient = getYarnClient();
+
+		// ------------------ Check if the specified queue exists --------------------
+
+		checkYarnQueues(yarnClient);
+
+		// ------------------ Add dynamic properties to local flinkConfiguraton ------
+		Map<String, String> dynProperties = getDynamicProperties(dynamicPropertiesEncoded);
+		for (Map.Entry<String, String> dynProperty : dynProperties.entrySet()) {
+			flinkConfiguration.setString(dynProperty.getKey(), dynProperty.getValue());
+		}
+
+		// ------------------ Check if the YARN ClusterClient has the requested resources --------------
+
+		// Create application via yarnClient
+		final YarnClientApplication yarnApplication = yarnClient.createApplication();
+		final GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse();
+
+		Resource maxRes = appResponse.getMaximumResourceCapability();
+
+		final ClusterResourceDescription freeClusterMem;
+		try {
+			freeClusterMem = getCurrentFreeClusterResources(yarnClient);
+		} catch (YarnException | IOException e) {
+			failSessionDuringDeployment(yarnClient, yarnApplication);
+			throw new YarnDeploymentException("Could not retrieve information about free cluster resources.", e);
+		}
+
+		final int yarnMinAllocationMB = conf.getInt("yarn.scheduler.minimum-allocation-mb", 0);
+
+		final ClusterSpecification validClusterSpecification;
+		try {
+			validClusterSpecification = validateClusterResources(
+				clusterSpecification,
+				yarnMinAllocationMB,
+				maxRes,
+				freeClusterMem);
+		} catch (YarnDeploymentException yde) {
+			failSessionDuringDeployment(yarnClient, yarnApplication);
+			throw yde;
+		}
+
+		LOG.info("Cluster specification: {}", validClusterSpecification);
+
+		ApplicationReport report = startAppMaster(
+			yarnClusterEntrypoint,
+			jobGraph,
+			yarnClient,
+			yarnApplication,
+			clusterSpecification);
+
+		String host = report.getHost();
+		int port = report.getRpcPort();
+
+		// Correctly initialize the Flink config
+		flinkConfiguration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, host);
+		flinkConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, port);
+
+		// the Flink cluster is deployed in YARN. Represent cluster
+		return createYarnClusterClient(
+			this,
+			clusterSpecification.getNumberTaskManagers(),
+			clusterSpecification.getSlotsPerTaskManager(),
+			yarnClient,
+			report,
+			flinkConfiguration,
+			true);
+	}
+
+	protected ClusterSpecification validateClusterResources(
+		ClusterSpecification clusterSpecification,
+		int yarnMinAllocationMB,
+		Resource maximumResourceCapability,
+		ClusterResourceDescription freeClusterResources) throws YarnDeploymentException {
 
 		int taskManagerCount = clusterSpecification.getNumberTaskManagers();
 		int jobManagerMemoryMb = clusterSpecification.getMasterMemoryMB();
@@ -470,82 +573,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 
 	}
 
-	/**
-	 * This method will block until the ApplicationMaster/JobManager have been
-	 * deployed on YARN.
-	 */
-	protected YarnClusterClient deployInternal(ClusterSpecification clusterSpecification) throws Exception {
-
-		isReadyForDeployment(clusterSpecification);
-
-		final YarnClient yarnClient = getYarnClient();
-
-		// ------------------ Check if the specified queue exists --------------------
-
-		checkYarnQueues(yarnClient);
-
-		// ------------------ Add dynamic properties to local flinkConfiguraton ------
-		Map<String, String> dynProperties = getDynamicProperties(dynamicPropertiesEncoded);
-		for (Map.Entry<String, String> dynProperty : dynProperties.entrySet()) {
-			flinkConfiguration.setString(dynProperty.getKey(), dynProperty.getValue());
-		}
-
-		// ------------------ Check if the YARN ClusterClient has the requested resources --------------
-
-		// Create application via yarnClient
-		final YarnClientApplication yarnApplication = yarnClient.createApplication();
-		final GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse();
-
-		Resource maxRes = appResponse.getMaximumResourceCapability();
-
-		final ClusterResourceDescription freeClusterMem;
-		try {
-			freeClusterMem = getCurrentFreeClusterResources(yarnClient);
-		} catch (YarnException | IOException e) {
-			failSessionDuringDeployment(yarnClient, yarnApplication);
-			throw new YarnDeploymentException("Could not retrieve information about free cluster resources.", e);
-		}
-
-		final int yarnMinAllocationMB = conf.getInt("yarn.scheduler.minimum-allocation-mb", 0);
-
-		final ClusterSpecification validClusterSpecification;
-		try {
-			validClusterSpecification = validateClusterResources(
-				clusterSpecification,
-				yarnMinAllocationMB,
-				maxRes,
-				freeClusterMem);
-		} catch (YarnDeploymentException yde) {
-			failSessionDuringDeployment(yarnClient, yarnApplication);
-			throw yde;
-		}
-
-		LOG.info("Cluster specification: {}", validClusterSpecification);
-
-		ApplicationReport report = startAppMaster(
-			null,
-			yarnClient,
-			yarnApplication,
-			clusterSpecification);
-
-		String host = report.getHost();
-		int port = report.getRpcPort();
-
-		// Correctly initialize the Flink config
-		flinkConfiguration.setString(JobManagerOptions.ADDRESS, host);
-		flinkConfiguration.setInteger(JobManagerOptions.PORT, port);
-
-		// the Flink cluster is deployed in YARN. Represent cluster
-		return createYarnClusterClient(
-			this,
-			clusterSpecification.getNumberTaskManagers(),
-			clusterSpecification.getSlotsPerTaskManager(),
-			yarnClient,
-			report,
-			flinkConfiguration,
-			true);
-	}
-
 	private void checkYarnQueues(YarnClient yarnClient) {
 		try {
 			List<QueueInfo> queues = yarnClient.getAllQueues();
@@ -577,6 +604,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 	}
 
 	public ApplicationReport startAppMaster(
+			String yarnClusterEntrypoint,
 			JobGraph jobGraph,
 			YarnClient yarnClient,
 			YarnClientApplication yarnApplication,
@@ -661,6 +689,13 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 					1));
 		}
 
+		if (jobGraph != null) {
+			// add the user code jars from the provided JobGraph
+			for (org.apache.flink.core.fs.Path path : jobGraph.getUserJars()) {
+				userJarFiles.add(new File(path.toUri()));
+			}
+		}
+
 		// local resource map for Yarn
 		final Map<String, LocalResource> localResources = new HashMap<>(2 + systemShipFiles.size() + userJarFiles.size());
 		// list of remote paths (after upload)
@@ -803,6 +838,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		}
 
 		final ContainerLaunchContext amContainer = setupApplicationMasterContainer(
+			yarnClusterEntrypoint,
 			hasLogback,
 			hasLog4j,
 			hasKrb5,
@@ -1293,6 +1329,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 	}
 
 	protected ContainerLaunchContext setupApplicationMasterContainer(
+			String yarnClusterEntrypoint,
 			boolean hasLogback,
 			boolean hasLog4j,
 			boolean hasKrb5,
@@ -1334,7 +1371,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		}
 
 		startCommandValues.put("logging", logging);
-		startCommandValues.put("class", getApplicationMasterClass().getName());
+		startCommandValues.put("class", yarnClusterEntrypoint);
 		startCommandValues.put("redirects",
 			"1> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.out " +
 			"2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.err");

http://git-wip-us.apache.org/repos/asf/flink/blob/a954ea11/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index b518f9e..8759c3e 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.yarn;
 
+import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 
@@ -31,12 +32,17 @@ public class YarnClusterDescriptor extends AbstractYarnClusterDescriptor {
 	}
 
 	@Override
-	protected Class<?> getApplicationMasterClass() {
-		return YarnApplicationMasterRunner.class;
+	protected String getYarnSessionClusterEntrypoint() {
+		return YarnApplicationMasterRunner.class.getName();
 	}
 
 	@Override
-	public YarnClusterClient deployJobCluster(JobGraph jobGraph) {
+	protected String getYarnJobClusterEntrypoint() {
+		throw new UnsupportedOperationException("The old Yarn descriptor does not support proper per-job mode.");
+	}
+
+	@Override
+	public YarnClusterClient deployJobCluster(ClusterSpecification clusterSpecification, JobGraph jobGraph) {
 		throw new UnsupportedOperationException("Cannot deploy a per-job yarn cluster yet.");
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a954ea11/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java
index dd60f53..00b73a8 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java
@@ -18,8 +18,10 @@
 
 package org.apache.flink.yarn;
 
+import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint;
 import org.apache.flink.yarn.entrypoint.YarnSessionClusterEntrypoint;
 
 /**
@@ -35,12 +37,17 @@ public class YarnClusterDescriptorV2 extends AbstractYarnClusterDescriptor {
 	}
 
 	@Override
-	protected Class<?> getApplicationMasterClass() {
-		return YarnSessionClusterEntrypoint.class;
+	protected String getYarnSessionClusterEntrypoint() {
+		return YarnSessionClusterEntrypoint.class.getName();
 	}
 
 	@Override
-	public YarnClusterClient deployJobCluster(JobGraph jobGraph) {
+	protected String getYarnJobClusterEntrypoint() {
+		return YarnJobClusterEntrypoint.class.getName();
+	}
+
+	@Override
+	public YarnClusterClient deployJobCluster(ClusterSpecification clusterSpecification, JobGraph jobGraph) {
 		throw new UnsupportedOperationException("Cannot yet deploy a per-job yarn cluster.");
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a954ea11/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
index 7eabdd5..489998e 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
@@ -46,7 +46,7 @@ import java.io.ObjectInputStream;
 import java.util.Map;
 
 /**
- * Entry point ofr Yarn per-job clusters.
+ * Entry point for Yarn per-job clusters.
  */
 public class YarnJobClusterEntrypoint extends JobClusterEntrypoint {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a954ea11/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
index 86122d6..bcb8559 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
@@ -134,7 +134,7 @@ public class YarnClusterDescriptorTest extends TestLogger {
 			"-Dlogback.configurationFile=file:" + FlinkYarnSessionCli.CONFIG_FILE_LOGBACK_NAME; // if set
 		final String log4j =
 			"-Dlog4j.configuration=file:" + FlinkYarnSessionCli.CONFIG_FILE_LOG4J_NAME; // if set
-		final String mainClass = clusterDescriptor.getApplicationMasterClass().getName();
+		final String mainClass = clusterDescriptor.getYarnSessionClusterEntrypoint();
 		final String args = "";
 		final String redirects =
 			"1> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.out " +
@@ -149,6 +149,7 @@ public class YarnClusterDescriptorTest extends TestLogger {
 				" " + mainClass + " " + args + " " + redirects,
 			clusterDescriptor
 				.setupApplicationMasterContainer(
+					mainClass,
 					false,
 					false,
 					false,
@@ -162,6 +163,7 @@ public class YarnClusterDescriptorTest extends TestLogger {
 				" " + mainClass + " " + args + " " + redirects,
 			clusterDescriptor
 				.setupApplicationMasterContainer(
+					mainClass,
 					false,
 					false,
 					true,
@@ -176,6 +178,7 @@ public class YarnClusterDescriptorTest extends TestLogger {
 				" " + mainClass + " " + args + " " + redirects,
 			clusterDescriptor
 				.setupApplicationMasterContainer(
+					mainClass,
 					true,
 					false,
 					false,
@@ -189,6 +192,7 @@ public class YarnClusterDescriptorTest extends TestLogger {
 				" " + mainClass + " " + args + " " + redirects,
 			clusterDescriptor
 				.setupApplicationMasterContainer(
+					mainClass,
 					true,
 					false,
 					true,
@@ -203,6 +207,7 @@ public class YarnClusterDescriptorTest extends TestLogger {
 				" " + mainClass + " " + args + " " + redirects,
 			clusterDescriptor
 				.setupApplicationMasterContainer(
+					mainClass,
 					false,
 					true,
 					false,
@@ -216,6 +221,7 @@ public class YarnClusterDescriptorTest extends TestLogger {
 				" " + mainClass + " " + args + " " + redirects,
 			clusterDescriptor
 				.setupApplicationMasterContainer(
+					mainClass,
 					false,
 					true,
 					true,
@@ -230,6 +236,7 @@ public class YarnClusterDescriptorTest extends TestLogger {
 				" " + mainClass + " " + args + " " + redirects,
 			clusterDescriptor
 				.setupApplicationMasterContainer(
+					mainClass,
 					true,
 					true,
 					false,
@@ -243,6 +250,7 @@ public class YarnClusterDescriptorTest extends TestLogger {
 				" " + mainClass + " " + args + " " + redirects,
 			clusterDescriptor
 				.setupApplicationMasterContainer(
+					mainClass,
 					true,
 					true,
 					true,
@@ -260,6 +268,7 @@ public class YarnClusterDescriptorTest extends TestLogger {
 				" " + mainClass + " "  + args + " " + redirects,
 			clusterDescriptor
 				.setupApplicationMasterContainer(
+					mainClass,
 					true,
 					true,
 					false,
@@ -273,6 +282,7 @@ public class YarnClusterDescriptorTest extends TestLogger {
 				" " + mainClass + " "  + args + " " + redirects,
 			clusterDescriptor
 				.setupApplicationMasterContainer(
+					mainClass,
 					true,
 					true,
 					true,
@@ -289,6 +299,7 @@ public class YarnClusterDescriptorTest extends TestLogger {
 				" " + mainClass + " "  + args + " " + redirects,
 			clusterDescriptor
 				.setupApplicationMasterContainer(
+					mainClass,
 					true,
 					true,
 					false,
@@ -302,6 +313,7 @@ public class YarnClusterDescriptorTest extends TestLogger {
 				" " + mainClass + " "  + args + " " + redirects,
 			clusterDescriptor
 				.setupApplicationMasterContainer(
+					mainClass,
 					true,
 					true,
 					true,
@@ -319,6 +331,7 @@ public class YarnClusterDescriptorTest extends TestLogger {
 				" 4 " + mainClass + " 5 " + args + " 6 " + redirects,
 			clusterDescriptor
 				.setupApplicationMasterContainer(
+					mainClass,
 					true,
 					true,
 					true,
@@ -336,6 +349,7 @@ public class YarnClusterDescriptorTest extends TestLogger {
 				" " + mainClass + " " + args + " " + redirects,
 			clusterDescriptor
 				.setupApplicationMasterContainer(
+					mainClass,
 					true,
 					true,
 					true,


Mime
View raw message