flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [1/5] flink git commit: [FLINK-9121] [flip6] Remove Flip6 prefixes and other references
Date Mon, 02 Apr 2018 15:13:18 GMT
Repository: flink
Updated Branches:
  refs/heads/master 3d371787d -> aa88a425b


http://git-wip-us.apache.org/repos/asf/flink/blob/af5279e9/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
index 4a2219a..302fe3e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
@@ -41,7 +41,7 @@ import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.test.util.MiniClusterResource;
-import org.apache.flink.testutils.category.Flip6;
+import org.apache.flink.testutils.category.New;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
 
@@ -62,7 +62,7 @@ import java.util.concurrent.TimeUnit;
 /**
  * Tests the availability of accumulator results during runtime.
  */
-@Category(Flip6.class)
+@Category(New.class)
 public class AccumulatorLiveITCase extends TestLogger {
 
 	private static final Logger LOG = LoggerFactory.getLogger(AccumulatorLiveITCase.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/af5279e9/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
index dd1c398..074b721 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
@@ -81,7 +81,7 @@ public class AutoParallelismITCase extends TestLogger {
 			assertEquals(PARALLELISM, resultCollection.size());
 		}
 		catch (Exception ex) {
-			if (MINI_CLUSTER_RESOURCE.getMiniClusterType().equals(MiniClusterType.OLD)) {
+			if (MINI_CLUSTER_RESOURCE.getMiniClusterType().equals(MiniClusterType.LEGACY)) {
 				throw ex;
 			}
 			assertTrue(

http://git-wip-us.apache.org/repos/asf/flink/blob/af5279e9/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java
index aeff578..a3a551c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java
@@ -78,7 +78,7 @@ public class RemoteEnvironmentITCase extends TestLogger {
 	public static void setupCluster() throws Exception {
 		configuration = new Configuration();
 
-		if (CoreOptions.FLIP6_MODE.equals(configuration.getString(CoreOptions.MODE))) {
+		if (CoreOptions.NEW_MODE.equals(configuration.getString(CoreOptions.MODE))) {
 			configuration.setInteger(WebOptions.PORT, 0);
 			final MiniCluster miniCluster = new MiniCluster(
 				new MiniClusterConfiguration.Builder()
@@ -115,7 +115,7 @@ public class RemoteEnvironmentITCase extends TestLogger {
 	 */
 	@Test(expected = FlinkException.class)
 	public void testInvalidAkkaConfiguration() throws Throwable {
-		assumeTrue(CoreOptions.OLD_MODE.equalsIgnoreCase(configuration.getString(CoreOptions.MODE)));
+		assumeTrue(CoreOptions.LEGACY_MODE.equalsIgnoreCase(configuration.getString(CoreOptions.MODE)));
 		Configuration config = new Configuration();
 		config.setString(AkkaOptions.STARTUP_TIMEOUT, INVALID_STARTUP_TIMEOUT);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/af5279e9/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
index 2b97de8..d217a2a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
@@ -157,7 +157,7 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
 	 */
 	public void testJobManagerFailure(String zkQuorum, final File coordinateDir) throws Exception {
 		Configuration config = new Configuration();
-		config.setString(CoreOptions.MODE, CoreOptions.OLD_MODE);
+		config.setString(CoreOptions.MODE, CoreOptions.LEGACY_MODE);
 		config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
 		config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkQuorum);
 		config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, FileStateBackendBasePath.getAbsolutePath());

http://git-wip-us.apache.org/repos/asf/flink/blob/af5279e9/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
index f76375d..b85a410 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
@@ -149,7 +149,7 @@ public class ProcessFailureCancelingITCase extends TestLogger {
 			final Throwable[] errorRef = new Throwable[1];
 
 			final Configuration configuration = new Configuration();
-			configuration.setString(CoreOptions.MODE, CoreOptions.OLD_MODE);
+			configuration.setString(CoreOptions.MODE, CoreOptions.LEGACY_MODE);
 
 			// start the test program, which infinitely blocks
 			Runnable programRunner = new Runnable() {

http://git-wip-us.apache.org/repos/asf/flink/blob/af5279e9/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java
index 69fe7d6..7dc6f0c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java
@@ -68,7 +68,7 @@ public class TaskManagerProcessFailureBatchRecoveryITCase extends AbstractTaskMa
 	public void testTaskManagerFailure(int jobManagerPort, final File coordinateDir) throws Exception {
 
 		final Configuration configuration = new Configuration();
-		configuration.setString(CoreOptions.MODE, CoreOptions.OLD_MODE);
+		configuration.setString(CoreOptions.MODE, CoreOptions.LEGACY_MODE);
 		ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", jobManagerPort, configuration);
 		env.setParallelism(PARALLELISM);
 		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 10000));

http://git-wip-us.apache.org/repos/asf/flink/blob/af5279e9/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java
index 1ecbff3..766a799 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java
@@ -67,7 +67,7 @@ public class TaskManagerProcessFailureStreamingRecoveryITCase extends AbstractTa
 		final File tempCheckpointDir = tempFolder.newFolder();
 
 		final Configuration configuration = new Configuration();
-		configuration.setString(CoreOptions.MODE, CoreOptions.OLD_MODE);
+		configuration.setString(CoreOptions.MODE, CoreOptions.LEGACY_MODE);
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
 			"localhost",
 			jobManagerPort,

http://git-wip-us.apache.org/repos/asf/flink/blob/af5279e9/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
index 72f700a..b5c2aaf 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
@@ -144,8 +144,8 @@ public abstract class AbstractOperatorRestoreTestBase extends TestLogger {
 			} catch (Exception e) {
 				String exceptionString = ExceptionUtils.stringifyException(e);
 				if (!(exceptionString.matches("(.*\n)*.*savepoint for the job .* failed(.*\n)*") // legacy
-						|| exceptionString.matches("(.*\n)*.*Not all required tasks are currently running(.*\n)*") // flip6
-						|| exceptionString.matches("(.*\n)*.*Checkpoint was declined \\(tasks not ready\\)(.*\n)*"))) { // flip6
+						|| exceptionString.matches("(.*\n)*.*Not all required tasks are currently running(.*\n)*") // new
+						|| exceptionString.matches("(.*\n)*.*Checkpoint was declined \\(tasks not ready\\)(.*\n)*"))) { // new
 					throw e;
 				}
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/af5279e9/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
index 4d2aaa0..37b8d41 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
@@ -36,7 +36,7 @@ import java.util.List;
  * flink-yarn-tests-X-tests.jar and the flink-runtime-X-tests.jar to the set of files which
  * are shipped to the yarn cluster. This is necessary to load the testing classes.
  */
-public class TestingYarnClusterDescriptor extends YarnClusterDescriptor {
+public class TestingYarnClusterDescriptor extends LegacyYarnClusterDescriptor {
 
 	public TestingYarnClusterDescriptor(
 			Configuration configuration,

http://git-wip-us.apache.org/repos/asf/flink/blob/af5279e9/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
index f9c03f9..18bcfeb 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
@@ -106,7 +106,7 @@ public class YARNHighAvailabilityITCase extends YarnTestBase {
 	 */
 	@Test
 	public void testMultipleAMKill() throws Exception {
-		assumeTrue("This test only works with the old actor based code.", !flip6);
+		assumeTrue("This test only works with the old actor based code.", !isNewMode);
 		final int numberKillingAttempts = numberApplicationAttempts - 1;
 		String confDirPath = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
 		final Configuration configuration = GlobalConfiguration.loadConfiguration();

http://git-wip-us.apache.org/repos/asf/flink/blob/af5279e9/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
index ef6706a..758a098 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
@@ -57,15 +57,15 @@ public class YARNITCase extends YarnTestBase {
 		configuration.setString(AkkaOptions.ASK_TIMEOUT, "30 s");
 		final YarnClient yarnClient = getYarnClient();
 
-		try (final Flip6YarnClusterDescriptor flip6YarnClusterDescriptor = new Flip6YarnClusterDescriptor(
+		try (final YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(
 			configuration,
 			getYarnConfiguration(),
 			System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR),
 			yarnClient,
 			true)) {
 
-			flip6YarnClusterDescriptor.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
-			flip6YarnClusterDescriptor.addShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
+			yarnClusterDescriptor.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
+			yarnClusterDescriptor.addShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
 
 			final ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
 				.setMasterMemoryMB(768)
@@ -87,7 +87,7 @@ public class YARNITCase extends YarnTestBase {
 
 			jobGraph.addJar(new org.apache.flink.core.fs.Path(testingJar.toURI()));
 
-			ClusterClient<ApplicationId> clusterClient = flip6YarnClusterDescriptor.deployJobCluster(
+			ClusterClient<ApplicationId> clusterClient = yarnClusterDescriptor.deployJobCluster(
 				clusterSpecification,
 				jobGraph,
 				true);

http://git-wip-us.apache.org/repos/asf/flink/blob/af5279e9/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
index d00a9c4..3767629 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
@@ -102,7 +102,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
 	 */
 	@Test
 	public void testClientStartup() throws IOException {
-		assumeTrue("Flip-6 does not start TMs upfront.", !flip6);
+		assumeTrue("The new mode does not start TMs upfront.", !isNewMode);
 		LOG.info("Starting testClientStartup()");
 		runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(),
 						"-n", "1",
@@ -192,7 +192,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
 	 */
 	@Test(timeout = 100000) // timeout after 100 seconds
 	public void testTaskManagerFailure() throws Exception {
-		assumeTrue("Flip-6 does not start TMs upfront.", !flip6);
+		assumeTrue("The new mode does not start TMs upfront.", !isNewMode);
 		LOG.info("Starting testTaskManagerFailure()");
 		Runner runner = startWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(),
 				"-n", "1",

http://git-wip-us.apache.org/repos/asf/flink/blob/af5279e9/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
index 464e73c..d9b02fb 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
@@ -99,7 +99,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 		runner.join();
 		checkForLogString("The Flink YARN client has been started in detached mode");
 
-		if (!flip6) {
+		if (!isNewMode) {
 			LOG.info("Waiting until two containers are running");
 			// wait until two containers are running
 			while (getRunningContainers() < 2) {
@@ -241,7 +241,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 		String confDirPath = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
 		Configuration configuration = GlobalConfiguration.loadConfiguration();
 
-		try (final AbstractYarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(
+		try (final AbstractYarnClusterDescriptor clusterDescriptor = new LegacyYarnClusterDescriptor(
 			configuration,
 			getYarnConfiguration(),
 			confDirPath,

http://git-wip-us.apache.org/repos/asf/flink/blob/af5279e9/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
index 2a1b099..c5d683e 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
@@ -90,7 +90,7 @@ public class YarnConfigurationITCase extends YarnTestBase {
 		configuration.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, (4L << 20));
 
 		final YarnConfiguration yarnConfiguration = getYarnConfiguration();
-		final Flip6YarnClusterDescriptor clusterDescriptor = new Flip6YarnClusterDescriptor(
+		final YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(
 			configuration,
 			yarnConfiguration,
 			CliFrontend.getConfigurationDirectoryFromEnv(),

http://git-wip-us.apache.org/repos/asf/flink/blob/af5279e9/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index 73abc87..421e4c0 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -81,7 +81,7 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
 import java.util.regex.Pattern;
 
-import static org.apache.flink.configuration.CoreOptions.OLD_MODE;
+import static org.apache.flink.configuration.CoreOptions.LEGACY_MODE;
 
 /**
  * This base class allows to use the MiniYARNCluster.
@@ -153,7 +153,7 @@ public abstract class YarnTestBase extends TestLogger {
 
 	protected org.apache.flink.configuration.Configuration flinkConfiguration;
 
-	protected boolean flip6;
+	protected boolean isNewMode;
 
 	static {
 		YARN_CONFIGURATION = new YarnConfiguration();
@@ -220,7 +220,7 @@ public abstract class YarnTestBase extends TestLogger {
 		}
 
 		flinkConfiguration = new org.apache.flink.configuration.Configuration(globalConfiguration);
-		flip6 = CoreOptions.FLIP6_MODE.equalsIgnoreCase(flinkConfiguration.getString(CoreOptions.MODE));
+		isNewMode = CoreOptions.NEW_MODE.equalsIgnoreCase(flinkConfiguration.getString(CoreOptions.MODE));
 	}
 
 	@Nullable
@@ -528,7 +528,7 @@ public abstract class YarnTestBase extends TestLogger {
 
 				globalConfiguration.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB.key(), keytab);
 				globalConfiguration.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL.key(), principal);
-				globalConfiguration.setString(CoreOptions.MODE.key(), OLD_MODE);
+				globalConfiguration.setString(CoreOptions.MODE.key(), LEGACY_MODE);
 
 				BootstrapTools.writeConfiguration(
 					globalConfiguration,

http://git-wip-us.apache.org/repos/asf/flink/blob/af5279e9/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index c80818a..ef266f0 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -1042,7 +1042,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			LOG.debug("Application State: {}", appState);
 			switch(appState) {
 				case FAILED:
-				case FINISHED: //TODO: the finished state may be valid in flip-6
+				case FINISHED:
 				case KILLED:
 					throw new YarnDeploymentException("The YARN application unexpectedly switched to state "
 						+ appState + " during deployment. \n" +

http://git-wip-us.apache.org/repos/asf/flink/blob/af5279e9/flink-yarn/src/main/java/org/apache/flink/yarn/Flip6YarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Flip6YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/Flip6YarnClusterDescriptor.java
deleted file mode 100644
index 1374ca2..0000000
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/Flip6YarnClusterDescriptor.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.yarn;
-
-import org.apache.flink.client.deployment.ClusterDeploymentException;
-import org.apache.flink.client.deployment.ClusterSpecification;
-import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.client.program.rest.RestClusterClient;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint;
-import org.apache.flink.yarn.entrypoint.YarnSessionClusterEntrypoint;
-
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.client.api.YarnClient;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-
-/**
- * Implementation of {@link org.apache.flink.yarn.AbstractYarnClusterDescriptor} which is used to start the
- * new application master for a job under flip-6.
- */
-public class Flip6YarnClusterDescriptor extends AbstractYarnClusterDescriptor {
-
-	public Flip6YarnClusterDescriptor(
-			Configuration flinkConfiguration,
-			YarnConfiguration yarnConfiguration,
-			String configurationDirectory,
-			YarnClient yarnClient,
-			boolean sharedYarnClient) {
-		super(
-			flinkConfiguration,
-			yarnConfiguration,
-			configurationDirectory,
-			yarnClient,
-			sharedYarnClient);
-	}
-
-	@Override
-	protected String getYarnSessionClusterEntrypoint() {
-		return YarnSessionClusterEntrypoint.class.getName();
-	}
-
-	@Override
-	protected String getYarnJobClusterEntrypoint() {
-		return YarnJobClusterEntrypoint.class.getName();
-	}
-
-	@Override
-	public ClusterClient<ApplicationId> deployJobCluster(
-		ClusterSpecification clusterSpecification,
-		JobGraph jobGraph,
-		boolean detached) throws ClusterDeploymentException {
-
-		// this is required to work with Flip-6 because the slots are allocated
-		// lazily
-		jobGraph.setAllowQueuedScheduling(true);
-
-		try {
-			return deployInternal(
-				clusterSpecification,
-				"Flink per-job cluster",
-				getYarnJobClusterEntrypoint(),
-				jobGraph,
-				detached);
-		} catch (Exception e) {
-			throw new ClusterDeploymentException("Could not deploy Yarn job cluster.", e);
-		}
-	}
-
-	@Override
-	protected ClusterClient<ApplicationId> createYarnClusterClient(
-			AbstractYarnClusterDescriptor descriptor,
-			int numberTaskManagers,
-			int slotsPerTaskManager,
-			ApplicationReport report,
-			Configuration flinkConfiguration,
-			boolean perJobCluster) throws Exception {
-		return new RestClusterClient<>(
-			flinkConfiguration,
-			report.getApplicationId());
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/af5279e9/flink-yarn/src/main/java/org/apache/flink/yarn/LegacyYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/LegacyYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/LegacyYarnClusterDescriptor.java
new file mode 100644
index 0000000..443e6a5
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/LegacyYarnClusterDescriptor.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.client.deployment.ClusterSpecification;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+/**
+ * Legacy implementation of {@link AbstractYarnClusterDescriptor} which starts an {@link YarnApplicationMasterRunner}.
+ */
+public class LegacyYarnClusterDescriptor extends AbstractYarnClusterDescriptor {
+
+	public LegacyYarnClusterDescriptor(
+			Configuration flinkConfiguration,
+			YarnConfiguration yarnConfiguration,
+			String configurationDirectory,
+			YarnClient yarnClient,
+			boolean sharedYarnClient) {
+		super(
+			flinkConfiguration,
+			yarnConfiguration,
+			configurationDirectory,
+			yarnClient,
+			sharedYarnClient);
+	}
+
+	@Override
+	protected String getYarnSessionClusterEntrypoint() {
+		return YarnApplicationMasterRunner.class.getName();
+	}
+
+	@Override
+	protected String getYarnJobClusterEntrypoint() {
+		throw new UnsupportedOperationException("The old Yarn descriptor does not support proper per-job mode.");
+	}
+
+	@Override
+	public YarnClusterClient deployJobCluster(
+			ClusterSpecification clusterSpecification,
+			JobGraph jobGraph,
+			boolean detached) {
+		throw new UnsupportedOperationException("Cannot deploy a per-job yarn cluster yet.");
+	}
+
+	@Override
+	protected ClusterClient<ApplicationId> createYarnClusterClient(AbstractYarnClusterDescriptor descriptor, int numberTaskManagers, int slotsPerTaskManager, ApplicationReport report, Configuration flinkConfiguration, boolean perJobCluster) throws Exception {
+		return new YarnClusterClient(
+			descriptor,
+			numberTaskManagers,
+			slotsPerTaskManager,
+			report,
+			flinkConfiguration,
+			perJobCluster);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/af5279e9/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index 8625cee..3dff72f 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -18,10 +18,14 @@
 
 package org.apache.flink.yarn;
 
+import org.apache.flink.client.deployment.ClusterDeploymentException;
 import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint;
+import org.apache.flink.yarn.entrypoint.YarnSessionClusterEntrypoint;
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
@@ -29,7 +33,8 @@ import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 
 /**
- * Default implementation of {@link AbstractYarnClusterDescriptor} which starts an {@link YarnApplicationMasterRunner}.
+ * Implementation of {@link AbstractYarnClusterDescriptor} which is used to start the
+ * application master.
  */
 public class YarnClusterDescriptor extends AbstractYarnClusterDescriptor {
 
@@ -49,30 +54,45 @@ public class YarnClusterDescriptor extends AbstractYarnClusterDescriptor {
 
 	@Override
 	protected String getYarnSessionClusterEntrypoint() {
-		return YarnApplicationMasterRunner.class.getName();
+		return YarnSessionClusterEntrypoint.class.getName();
 	}
 
 	@Override
 	protected String getYarnJobClusterEntrypoint() {
-		throw new UnsupportedOperationException("The old Yarn descriptor does not support proper per-job mode.");
+		return YarnJobClusterEntrypoint.class.getName();
 	}
 
 	@Override
-	public YarnClusterClient deployJobCluster(
-			ClusterSpecification clusterSpecification,
-			JobGraph jobGraph,
-			boolean detached) {
-		throw new UnsupportedOperationException("Cannot deploy a per-job yarn cluster yet.");
+	public ClusterClient<ApplicationId> deployJobCluster(
+		ClusterSpecification clusterSpecification,
+		JobGraph jobGraph,
+		boolean detached) throws ClusterDeploymentException {
+
+		// this is required because the slots are allocated lazily
+		jobGraph.setAllowQueuedScheduling(true);
+
+		try {
+			return deployInternal(
+				clusterSpecification,
+				"Flink per-job cluster",
+				getYarnJobClusterEntrypoint(),
+				jobGraph,
+				detached);
+		} catch (Exception e) {
+			throw new ClusterDeploymentException("Could not deploy Yarn job cluster.", e);
+		}
 	}
 
 	@Override
-	protected ClusterClient<ApplicationId> createYarnClusterClient(AbstractYarnClusterDescriptor descriptor, int numberTaskManagers, int slotsPerTaskManager, ApplicationReport report, Configuration flinkConfiguration, boolean perJobCluster) throws Exception {
-		return new YarnClusterClient(
-			descriptor,
-			numberTaskManagers,
-			slotsPerTaskManager,
-			report,
+	protected ClusterClient<ApplicationId> createYarnClusterClient(
+			AbstractYarnClusterDescriptor descriptor,
+			int numberTaskManagers,
+			int slotsPerTaskManager,
+			ApplicationReport report,
+			Configuration flinkConfiguration,
+			boolean perJobCluster) throws Exception {
+		return new RestClusterClient<>(
 			flinkConfiguration,
-			perJobCluster);
+			report.getApplicationId());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/af5279e9/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 446377f..16abffa 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -40,7 +40,7 @@ import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.yarn.AbstractYarnClusterDescriptor;
-import org.apache.flink.yarn.Flip6YarnClusterDescriptor;
+import org.apache.flink.yarn.LegacyYarnClusterDescriptor;
 import org.apache.flink.yarn.YarnClusterDescriptor;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
 
@@ -158,7 +158,7 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
 
 	private final String yarnPropertiesFileLocation;
 
-	private final boolean flip6;
+	private final boolean isNewMode;
 
 	private final YarnConfiguration yarnConfiguration;
 
@@ -183,7 +183,7 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
 		this.configurationDirectory = Preconditions.checkNotNull(configurationDirectory);
 		this.acceptInteractiveInput = acceptInteractiveInput;
 
-		this.flip6 = configuration.getString(CoreOptions.MODE).equalsIgnoreCase(CoreOptions.FLIP6_MODE);
+		this.isNewMode = configuration.getString(CoreOptions.MODE).equalsIgnoreCase(CoreOptions.NEW_MODE);
 
 		// Create the command line options
 
@@ -366,7 +366,7 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
 	}
 
 	private ClusterSpecification createClusterSpecification(Configuration configuration, CommandLine cmd) {
-		if (!flip6 && !cmd.hasOption(container.getOpt())) { // number of containers is required option!
+		if (!isNewMode && !cmd.hasOption(container.getOpt())) { // number of containers is required option!
 			LOG.error("Missing required argument {}", container.getOpt());
 			printUsage();
 			throw new IllegalArgumentException("Missing required argument " + container.getOpt());
@@ -971,15 +971,15 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
 		yarnClient.init(yarnConfiguration);
 		yarnClient.start();
 
-		if (flip6) {
-			return new Flip6YarnClusterDescriptor(
+		if (isNewMode) {
+			return new YarnClusterDescriptor(
 				configuration,
 				yarnConfiguration,
 				configurationDirectory,
 				yarnClient,
 				false);
 		} else {
-			return new YarnClusterDescriptor(
+			return new LegacyYarnClusterDescriptor(
 				configuration,
 				yarnConfiguration,
 				configurationDirectory,

http://git-wip-us.apache.org/repos/asf/flink/blob/af5279e9/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
index 52bf8bb..f206b66 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
@@ -57,7 +57,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
 /**
- * Tests for the {@link YarnClusterDescriptor}.
+ * Tests for the {@link LegacyYarnClusterDescriptor}.
  */
 public class YarnClusterDescriptorTest extends TestLogger {
 
@@ -95,7 +95,7 @@ public class YarnClusterDescriptorTest extends TestLogger {
 		final Configuration flinkConfiguration = new Configuration();
 		flinkConfiguration.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, 0);
 
-		YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(
+		LegacyYarnClusterDescriptor clusterDescriptor = new LegacyYarnClusterDescriptor(
 			flinkConfiguration,
 			yarnConfiguration,
 			temporaryFolder.getRoot().getAbsolutePath(),
@@ -132,7 +132,7 @@ public class YarnClusterDescriptorTest extends TestLogger {
 		configuration.setInteger(YarnConfigOptions.VCORES, Integer.MAX_VALUE);
 		configuration.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, 0);
 
-		YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(
+		LegacyYarnClusterDescriptor clusterDescriptor = new LegacyYarnClusterDescriptor(
 			configuration,
 			yarnConfiguration,
 			temporaryFolder.getRoot().getAbsolutePath(),
@@ -166,7 +166,7 @@ public class YarnClusterDescriptorTest extends TestLogger {
 	@Test
 	public void testSetupApplicationMasterContainer() {
 		Configuration cfg = new Configuration();
-		YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(
+		LegacyYarnClusterDescriptor clusterDescriptor = new LegacyYarnClusterDescriptor(
 			cfg,
 			yarnConfiguration,
 			temporaryFolder.getRoot().getAbsolutePath(),
@@ -417,7 +417,7 @@ public class YarnClusterDescriptorTest extends TestLogger {
 	 */
 	@Test
 	public void testExplicitLibShipping() throws Exception {
-		AbstractYarnClusterDescriptor descriptor = new YarnClusterDescriptor(
+		AbstractYarnClusterDescriptor descriptor = new LegacyYarnClusterDescriptor(
 			new Configuration(),
 			yarnConfiguration,
 			temporaryFolder.getRoot().getAbsolutePath(),
@@ -460,7 +460,7 @@ public class YarnClusterDescriptorTest extends TestLogger {
 	 */
 	@Test
 	public void testEnvironmentLibShipping() throws Exception {
-		AbstractYarnClusterDescriptor descriptor = new YarnClusterDescriptor(
+		AbstractYarnClusterDescriptor descriptor = new LegacyYarnClusterDescriptor(
 			new Configuration(),
 			yarnConfiguration,
 			temporaryFolder.getRoot().getAbsolutePath(),
@@ -500,7 +500,7 @@ public class YarnClusterDescriptorTest extends TestLogger {
 	 */
 	@Test
 	public void testYarnClientShutDown() {
-		YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(
+		LegacyYarnClusterDescriptor yarnClusterDescriptor = new LegacyYarnClusterDescriptor(
 			new Configuration(),
 			yarnConfiguration,
 			temporaryFolder.getRoot().getAbsolutePath(),
@@ -515,7 +515,7 @@ public class YarnClusterDescriptorTest extends TestLogger {
 		closableYarnClient.init(yarnConfiguration);
 		closableYarnClient.start();
 
-		yarnClusterDescriptor = new YarnClusterDescriptor(
+		yarnClusterDescriptor = new LegacyYarnClusterDescriptor(
 			new Configuration(),
 			yarnConfiguration,
 			temporaryFolder.getRoot().getAbsolutePath(),

http://git-wip-us.apache.org/repos/asf/flink/blob/af5279e9/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index dab7262..189c033 100644
--- a/pom.xml
+++ b/pom.xml
@@ -127,9 +127,9 @@ under the License.
 		<powermock.version>1.6.5</powermock.version>
 		<hamcrest.version>1.3</hamcrest.version>
 		<japicmp.skip>false</japicmp.skip>
-		<!-- run all groups except flip6 by default -->
-		<test.excludedGroups>org.apache.flink.testutils.category.Flip6</test.excludedGroups>
-		<codebase>old</codebase>
+		<!-- run all groups except new by default -->
+		<test.excludedGroups>org.apache.flink.testutils.category.New</test.excludedGroups>
+		<codebase>legacy</codebase>
 		<!--
 			Keeping the MiniKDC version fixed instead of taking hadoop version dependency
 			to support testing Kafka, ZK etc., modules that does not have Hadoop dependency
@@ -593,16 +593,16 @@ under the License.
 	<profiles>
 
 		<profile>
-			<id>flip6</id>
+			<id>new</id>
 			<activation>
 				<property>
-					<name>flip6</name>
+					<name>new</name>
 				</property>
 			</activation>
 			<properties>
 				<!-- clear the excluded groups list -->
 				<test.excludedGroups></test.excludedGroups>
-				<codebase>flip6</codebase>
+				<codebase>new</codebase>
 			</properties>
 		</profile>
 


Mime
View raw message