flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject flink git commit: [FLINK-7110] [client] Add per-job cluster deployment interface
Date Wed, 26 Jul 2017 15:58:11 GMT
Repository: flink
Updated Branches:
  refs/heads/master 94559bae0 -> ee16ee5a2


[FLINK-7110] [client] Add per-job cluster deployment interface

Rename deploy to deploySessionCluster and add the deployJobCluster method; add ClusterDeploymentException,
which is declared by the deployJobCluster method

This closes #4270.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ee16ee5a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ee16ee5a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ee16ee5a

Branch: refs/heads/master
Commit: ee16ee5a2d18413f00bf4bc3702e010c864e0e74
Parents: 94559ba
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Thu Jul 6 09:33:18 2017 +0200
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Wed Jul 26 17:39:52 2017 +0200

----------------------------------------------------------------------
 .../org/apache/flink/client/cli/DefaultCLI.java |  2 +-
 .../deployment/ClusterDeploymentException.java  | 41 ++++++++++++++++++++
 .../client/deployment/ClusterDescriptor.java    | 11 +++++-
 .../deployment/StandaloneClusterDescriptor.java |  8 +++-
 .../yarn/TestingYarnClusterDescriptor.java      |  6 +++
 .../flink/yarn/YARNHighAvailabilityITCase.java  |  2 +-
 .../flink/yarn/YARNSessionFIFOITCase.java       |  2 +-
 .../yarn/AbstractYarnClusterDescriptor.java     |  2 +-
 .../flink/yarn/YarnClusterDescriptor.java       |  7 ++++
 .../flink/yarn/YarnClusterDescriptorV2.java     |  6 +++
 .../flink/yarn/cli/FlinkYarnSessionCli.java     |  4 +-
 .../flink/yarn/YarnClusterDescriptorTest.java   |  4 +-
 12 files changed, 85 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ee16ee5a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
index 49e9752..042a44a 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
@@ -83,6 +83,6 @@ public class DefaultCLI implements CustomCommandLine<StandaloneClusterClient> {
 			List<URL> userJarFiles) throws UnsupportedOperationException {
 
 		StandaloneClusterDescriptor descriptor = new StandaloneClusterDescriptor(config);
-		return descriptor.deploy();
+		return descriptor.deploySessionCluster();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ee16ee5a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDeploymentException.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDeploymentException.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDeploymentException.java
new file mode 100644
index 0000000..536ea7d
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDeploymentException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment;
+
+import org.apache.flink.util.FlinkException;
+
+/**
+ * Class which indicates a problem when deploying a Flink cluster.
+ */
+public class ClusterDeploymentException extends FlinkException {
+
+	private static final long serialVersionUID = -4327724979766139208L;
+
+	public ClusterDeploymentException(String message) {
+		super(message);
+	}
+
+	public ClusterDeploymentException(Throwable cause) {
+		super(cause);
+	}
+
+	public ClusterDeploymentException(String message, Throwable cause) {
+		super(message, cause);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ee16ee5a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
index 29836a4..e2ffafa 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
@@ -19,6 +19,7 @@
 package org.apache.flink.client.deployment;
 
 import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.runtime.jobgraph.JobGraph;
 
 /**
  * A descriptor to deploy a cluster (e.g. Yarn or Mesos) and return a Client for Cluster communication.
@@ -44,5 +45,13 @@ public interface ClusterDescriptor<ClientType extends ClusterClient> {
 	 * @return Client for the cluster
 	 * @throws UnsupportedOperationException if this cluster descriptor doesn't support the operation
 	 */
-	ClientType deploy() throws UnsupportedOperationException;
+	ClientType deploySessionCluster() throws UnsupportedOperationException;
+
+	/**
+	 * Deploys a per-job cluster with the given job on the cluster.
+	 *
+	 * @return Cluster client to talk to the Flink cluster
+	 * @throws ClusterDeploymentException if the cluster could not be deployed
+	 */
+	ClientType deployJobCluster(final JobGraph jobGraph) throws ClusterDeploymentException;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ee16ee5a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
index 0507c3f..ebdd0a8 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
@@ -21,6 +21,7 @@ package org.apache.flink.client.deployment;
 import org.apache.flink.client.program.StandaloneClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.jobgraph.JobGraph;
 
 /**
  * A deployment descriptor for an existing cluster.
@@ -50,7 +51,12 @@ public class StandaloneClusterDescriptor implements ClusterDescriptor<Standalone
 	}
 
 	@Override
-	public StandaloneClusterClient deploy() throws UnsupportedOperationException {
+	public StandaloneClusterClient deploySessionCluster() throws UnsupportedOperationException {
 		throw new UnsupportedOperationException("Can't deploy a standalone cluster.");
 	}
+
+	@Override
+	public StandaloneClusterClient deployJobCluster(JobGraph jobGraph) throws ClusterDeploymentException {
+		throw new UnsupportedOperationException("Can't deploy a standalone per-job cluster.");
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ee16ee5a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
index 377edaa..cde13ff 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.yarn;
 
+import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.util.Preconditions;
 
 import java.io.File;
@@ -59,6 +60,11 @@ public class TestingYarnClusterDescriptor extends AbstractYarnClusterDescriptor
 		return TestingApplicationMaster.class;
 	}
 
+	@Override
+	public YarnClusterClient deployJobCluster(JobGraph jobGraph) {
+		throw new UnsupportedOperationException("Cannot deploy a per-job cluster yet.");
+	}
+
 	private static class TestJarFinder implements FilenameFilter {
 
 		private final String jarName;

http://git-wip-us.apache.org/repos/asf/flink/blob/ee16ee5a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
index 327b376..df8133d 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
@@ -137,7 +137,7 @@ public class YARNHighAvailabilityITCase extends YarnTestBase {
 		HighAvailabilityServices highAvailabilityServices = null;
 
 		try {
-			yarnCluster = flinkYarnClient.deploy();
+			yarnCluster = flinkYarnClient.deploySessionCluster();
 
 			final ClusterClient finalYarnCluster = yarnCluster;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ee16ee5a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
index 64fc5d1..8e1e877 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
@@ -238,7 +238,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 		// deploy
 		ClusterClient yarnCluster = null;
 		try {
-			yarnCluster = flinkYarnClient.deploy();
+			yarnCluster = flinkYarnClient.deploySessionCluster();
 		} catch (Exception e) {
 			LOG.warn("Failing test", e);
 			Assert.fail("Error while deploying YARN cluster: " + e.getMessage());

http://git-wip-us.apache.org/repos/asf/flink/blob/ee16ee5a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index c86565b..c5277a1 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -429,7 +429,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 	}
 
 	@Override
-	public YarnClusterClient deploy() {
+	public YarnClusterClient deploySessionCluster() {
 		try {
 			if (UserGroupInformation.isSecurityEnabled()) {
 				// note: UGI::hasKerberosCredentials inaccurately reports false

http://git-wip-us.apache.org/repos/asf/flink/blob/ee16ee5a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index db5206a..dae0f64 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.yarn;
 
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
 /**
  * Default implementation of {@link AbstractYarnClusterDescriptor} which starts an {@link YarnApplicationMasterRunner}.
  */
@@ -27,4 +29,9 @@ public class YarnClusterDescriptor extends AbstractYarnClusterDescriptor {
 	protected Class<?> getApplicationMasterClass() {
 		return YarnApplicationMasterRunner.class;
 	}
+
+	@Override
+	public YarnClusterClient deployJobCluster(JobGraph jobGraph) {
+		throw new UnsupportedOperationException("Cannot deploy a per-job yarn cluster yet.");
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ee16ee5a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java
index b22b163..9807273 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.yarn;
 
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
 /**
  * Implementation of {@link org.apache.flink.yarn.AbstractYarnClusterDescriptor} which is used to start the new application master for a job under flip-6.
  * This implementation is now however tricky, since YarnClusterDescriptorV2 is related YarnClusterClientV2, but AbstractYarnClusterDescriptor is related
@@ -31,4 +33,8 @@ public class YarnClusterDescriptorV2 extends AbstractYarnClusterDescriptor {
 		return YarnFlinkApplicationMasterRunner.class;
 	}
 
+	@Override
+	public YarnClusterClient deployJobCluster(JobGraph jobGraph) {
+		throw new UnsupportedOperationException("Cannot yet deploy a per-job yarn cluster.");
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ee16ee5a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 73279da..60223ef 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -551,7 +551,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 		yarnClusterDescriptor.setProvidedUserJarFiles(userJarFiles);
 
 		try {
-			return yarnClusterDescriptor.deploy();
+			return yarnClusterDescriptor.deploySessionCluster();
 		} catch (Exception e) {
 			throw new RuntimeException("Error deploying the YARN cluster", e);
 		}
@@ -627,7 +627,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 			}
 
 			try {
-				yarnCluster = yarnDescriptor.deploy();
+				yarnCluster = yarnDescriptor.deploySessionCluster();
 			} catch (Exception e) {
 				System.err.println("Error while deploying YARN cluster: " + e.getMessage());
 				e.printStackTrace(System.err);

http://git-wip-us.apache.org/repos/asf/flink/blob/ee16ee5a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
index 9326723..14738e6 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
@@ -69,7 +69,7 @@ public class YarnClusterDescriptorTest {
 		clusterDescriptor.setTaskManagerSlots(Integer.MAX_VALUE);
 
 		try {
-			clusterDescriptor.deploy();
+			clusterDescriptor.deploySessionCluster();
 
 			fail("The deploy call should have failed.");
 		} catch (RuntimeException e) {
@@ -98,7 +98,7 @@ public class YarnClusterDescriptorTest {
 		clusterDescriptor.setTaskManagerSlots(1);
 
 		try {
-			clusterDescriptor.deploy();
+			clusterDescriptor.deploySessionCluster();
 
 			fail("The deploy call should have failed.");
 		} catch (RuntimeException e) {


Mime
View raw message