flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [05/10] flink git commit: [FLINK-1827] [tests] Move test classes in test folders and fix scope of test dependencies.
Date Wed, 04 May 2016 19:22:20 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-test-utils/src/test/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/test/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala b/flink-test-utils/src/test/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
new file mode 100644
index 0000000..8a348a1
--- /dev/null
+++ b/flink-test-utils/src/test/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.util
+
+import java.util.concurrent.TimeoutException
+
+import akka.actor.{ActorRef, ActorSystem}
+import akka.pattern.Patterns._
+import akka.pattern.ask
+import org.apache.curator.test.TestingCluster
+import org.apache.flink.configuration.{ConfigConstants, Configuration}
+import org.apache.flink.runtime.akka.AkkaUtils
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager
+import org.apache.flink.runtime.clusterframework.types.ResourceID
+import org.apache.flink.runtime.jobmanager.{JobManager, RecoveryMode}
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
+import org.apache.flink.runtime.taskmanager.TaskManager
+import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager
+import org.apache.flink.runtime.testingUtils.{TestingJobManager, TestingMemoryArchivist, TestingTaskManager, TestingUtils}
+
+import org.apache.flink.runtime.testutils.TestingResourceManager
+
+import scala.concurrent.{Await, Future}
+
+/**
+ * A forkable mini cluster is a special case of the mini cluster, used for parallel test execution
+ * on build servers. If multiple tests run in parallel, the cluster picks up the fork number and
+ * uses it to avoid port conflicts.
+ *
+ * @param userConfiguration Configuration object with the user provided configuration values
+ * @param singleActorSystem true, if all actors (JobManager and TaskManager) shall be run in the
+ *                          same [[ActorSystem]], otherwise false.
+ */
+class ForkableFlinkMiniCluster(
+    userConfiguration: Configuration,
+    singleActorSystem: Boolean)
+  extends LocalFlinkMiniCluster(userConfiguration, singleActorSystem) {
+
+  /** Creates a cluster whose actors all share one [[ActorSystem]]. */
+  def this(userConfiguration: Configuration) = this(userConfiguration, singleActorSystem = true)
+
+  /** ZooKeeper test cluster launched by start(), if one was needed; closed again in stop(). */
+  var zookeeperCluster: Option[TestingCluster] = None
+
+  /**
+   * Builds the effective configuration. If the "forkNumber" system property parses to an
+   * integer other than -1, fork-specific RPC and data ports are set on a copy of the user
+   * configuration before it is handed to the parent implementation.
+   */
+  override def generateConfiguration(userConfiguration: Configuration): Configuration = {
+    val forkNumber =
+      try {
+        Integer.parseInt(System.getProperty("forkNumber"))
+      } catch {
+        // parseInt also raises NumberFormatException when the property is unset (null)
+        case _: NumberFormatException => -1
+      }
+
+    val config = userConfiguration.clone()
+    if (forkNumber != -1) {
+      // each fork gets its own block of 400 ports; components sit at fixed offsets within it
+      val basePort = 1024 + forkNumber * 400
+      config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, basePort)
+      config.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, basePort + 100)
+      config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, basePort + 200)
+      config.setInteger(ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY, basePort + 300)
+    }
+
+    super.generateConfiguration(config)
+  }
+
+  /**
+   * Starts the index-th JobManager and its archivist using the testing actor classes.
+   * A positive configured RPC port is shifted by `index` in a copy of the configuration.
+   */
+  override def startJobManager(index: Int, actorSystem: ActorSystem): ActorRef = {
+    val config = configuration.clone()
+    val jmName = getJobManagerName(index)
+    val archivistName = getArchiveName(index)
+
+    val configuredPort = config.getInteger(
+      ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
+      ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
+    if (configuredPort > 0) {
+      config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, configuredPort + index)
+    }
+
+    JobManager.startJobManagerActors(
+      config,
+      actorSystem,
+      Some(jmName),
+      Some(archivistName),
+      classOf[TestingJobManager],
+      classOf[TestingMemoryArchivist])._1
+  }
+
+  /**
+   * Starts the index-th ResourceManager as a [[TestingResourceManager]] actor.
+   * A positive configured RPC port is shifted by `index` in a copy of the configuration.
+   */
+  override def startResourceManager(index: Int, system: ActorSystem): ActorRef = {
+    val config = configuration.clone()
+    val rmName = getResourceManagerName(index)
+
+    val configuredPort = config.getInteger(
+      ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY,
+      ConfigConstants.DEFAULT_RESOURCE_MANAGER_IPC_PORT)
+    if (configuredPort > 0) {
+      config.setInteger(ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY, configuredPort + index)
+    }
+
+    FlinkResourceManager.startResourceManagerActors(
+      config,
+      system,
+      createLeaderRetrievalService(),
+      classOf[TestingResourceManager],
+      rmName)
+  }
+
+  /**
+   * Starts the index-th TaskManager as a [[TestingTaskManager]] actor. Positive configured RPC
+   * and data ports are each shifted by `index` in a copy of the configuration.
+   */
+  override def startTaskManager(index: Int, system: ActorSystem): ActorRef = {
+    val config = configuration.clone()
+    val configuredRpcPort = config.getInteger(
+      ConfigConstants.TASK_MANAGER_IPC_PORT_KEY,
+      ConfigConstants.DEFAULT_TASK_MANAGER_IPC_PORT)
+    val configuredDataPort = config.getInteger(
+      ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
+      ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT)
+    if (configuredRpcPort > 0) {
+      config.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, configuredRpcPort + index)
+    }
+    if (configuredDataPort > 0) {
+      config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, configuredDataPort + index)
+    }
+
+    val localExecution = numTaskManagers == 1 // true only for a single-TaskManager cluster
+    TaskManager.startTaskManagerComponentsAndActor(
+      config,
+      ResourceID.generate(),
+      system,
+      hostname,
+      Some(TaskManager.TASK_MANAGER_NAME + index),
+      Some(createLeaderRetrievalService()),
+      localExecution,
+      classOf[TestingTaskManager])
+  }
+
+  /**
+   * Gracefully stops the currently leading JobManager and starts a replacement under the same
+   * index, then creates and starts a new leader retrieval service (passing this cluster to it).
+   * Throws an Exception if the JobManager actors or actor systems are not available.
+   */
+  def restartLeadingJobManager(): Unit = {
+    this.synchronized {
+      (jobManagerActorSystems, jobManagerActors) match {
+        case (Some(jmActorSystems), Some(jmActors)) =>
+          val leader = getLeaderGateway(AkkaUtils.getTimeout(configuration))
+          val index = getLeaderIndex(AkkaUtils.getTimeout(configuration))
+
+          clearLeader()
+
+          val stopped = gracefulStop(leader.actor(), TestingUtils.TESTING_DURATION)
+          Await.result(stopped, TestingUtils.TESTING_DURATION)
+
+          // with separate actor systems, the leader's system is shut down and recreated
+          val newJobManagerActorSystem =
+            if (singleActorSystem) {
+              jmActorSystems.head
+            } else {
+              jmActorSystems(index).shutdown()
+              jmActorSystems(index).awaitTermination()
+              startJobManagerActorSystem(index)
+            }
+
+          val newJobManagerActor = startJobManager(index, newJobManagerActorSystem)
+          jobManagerActors = Some(jmActors.patch(index, Seq(newJobManagerActor), 1))
+          jobManagerActorSystems =
+            Some(jmActorSystems.patch(index, Seq(newJobManagerActorSystem), 1))
+
+          val lrs = createLeaderRetrievalService()
+          jobManagerLeaderRetrievalService = Some(lrs)
+          lrs.start(this)
+
+        case _ => throw new Exception("The JobManager of the ForkableFlinkMiniCluster have not " +
+          "been started properly.")
+      }
+    }
+  }
+
+
+  /**
+   * Gracefully stops the TaskManager at `index` and starts a replacement in its place.
+   * Throws an Exception if the TaskManager actors or actor systems are not available.
+   */
+  def restartTaskManager(index: Int): Unit = {
+    (taskManagerActorSystems, taskManagerActors) match {
+      case (Some(tmActorSystems), Some(tmActors)) =>
+        val stopped = gracefulStop(tmActors(index), TestingUtils.TESTING_DURATION)
+        Await.result(stopped, TestingUtils.TESTING_DURATION)
+        val taskManagerActorSystem =
+          if (singleActorSystem) {
+            tmActorSystems.head
+          } else {
+            tmActorSystems(index).shutdown()
+            tmActorSystems(index).awaitTermination()
+            startTaskManagerActorSystem(index)
+          }
+
+        val taskManagerActor = startTaskManager(index, taskManagerActorSystem)
+        taskManagerActors = Some(tmActors.patch(index, Seq(taskManagerActor), 1))
+        taskManagerActorSystems = Some(tmActorSystems.patch(index, Seq(taskManagerActorSystem), 1))
+
+      case _ => throw new Exception("The TaskManager of the ForkableFlinkMiniCluster have not " +
+        "been started properly.")
+    }
+  }
+
+  /**
+   * Starts the cluster. In ZooKeeper recovery mode with no quorum configured, a
+   * [[TestingCluster]] is started first and its connect string is written to the configuration.
+   */
+  override def start(): Unit = {
+    val zookeeperURL = configuration.getString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, "")
+    val needsZooKeeper = recoveryMode == RecoveryMode.ZOOKEEPER && zookeeperURL.equals("")
+    zookeeperCluster = if (needsZooKeeper) {
+      LOG.info("Starting ZooKeeper cluster.")
+      val testingCluster = new TestingCluster(1)
+      configuration.setString(
+        ConfigConstants.ZOOKEEPER_QUORUM_KEY, testingCluster.getConnectString)
+      testingCluster.start()
+      Some(testingCluster)
+    } else {
+      None
+    }
+    super.start()
+  }
+
+  override def stop(): Unit = {
+    super.stop()
+    // Bind the cluster explicitly: with `_.close()` the log statement ran even for None.
+    zookeeperCluster.foreach { cluster =>
+      LOG.info("Stopping ZooKeeper cluster.")
+      cluster.close()
+    }
+  }
+
+  /**
+   * Asks every TaskManager actor to notify once it is registered at `jobManager` and blocks
+   * until all requests completed. Throws an Exception (caused by the TimeoutException) if
+   * `timeout` elapses first.
+   */
+  def waitForTaskManagersToBeRegisteredAtJobManager(jobManager: ActorRef): Unit = {
+    val futures = taskManagerActors.map {
+      _.map(tm => (tm ? NotifyWhenRegisteredAtJobManager(jobManager))(timeout))
+    }.getOrElse(Seq())
+    try {
+      Await.ready(Future.sequence(futures), timeout)
+    } catch {
+      case t: TimeoutException => // keep the original timeout as cause for diagnosis
+        throw new Exception(
+          s"Timeout while waiting for TaskManagers to register at ${jobManager.path}", t)
+    }
+  }
+}
+
+/** Factory helpers for [[ForkableFlinkMiniCluster]]. */
+object ForkableFlinkMiniCluster {
+  import org.apache.flink.runtime.testingUtils.TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT
+
+  /**
+   * Creates and starts a cluster configured with `numSlots` task slots, `numTaskManagers`
+   * local TaskManagers and `timeout` as the Akka ask timeout.
+   */
+  def startCluster(
+      numSlots: Int,
+      numTaskManagers: Int,
+      timeout: String = DEFAULT_AKKA_ASK_TIMEOUT): ForkableFlinkMiniCluster = {
+    val config = new Configuration()
+    config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
+    config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers)
+    config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, timeout)
+
+    val cluster = new ForkableFlinkMiniCluster(config)
+    cluster.start()
+    cluster
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml
index ee7d0a3..0459039 100644
--- a/flink-tests/pom.xml
+++ b/flink-tests/pom.xml
@@ -105,6 +105,7 @@ under the License.
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-test-utils_2.10</artifactId>
 			<version>${project.version}</version>
+			<type>test-jar</type>
 			<scope>test</scope>
 		</dependency>
 		
@@ -539,6 +540,26 @@ under the License.
 										<ignore/>
 									</action>
 								</pluginExecution>
+								<pluginExecution>
+									<pluginExecutionFilter>
+										<groupId>
+											net.alchim31.maven
+										</groupId>
+										<artifactId>
+											scala-maven-plugin
+										</artifactId>
+										<versionRange>
+											[3.1.4,)
+										</versionRange>
+										<goals>
+											<goal>compile</goal>
+											<goal>testCompile</goal>
+										</goals>
+									</pluginExecutionFilter>
+									<action>
+										<ignore></ignore>
+									</action>
+								</pluginExecution>
 							</pluginExecutions>
 						</lifecycleMappingMetadata>
 					</configuration>

http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
index 4431106..6faee45 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
@@ -69,7 +69,8 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
 		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
 		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM / 2);
 		config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 48);
-
+		config.setString(ConfigConstants.DEFAULT_AKKA_LOOKUP_TIMEOUT, "60 s");
+		config.setString(ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT, "60 s");
 		cluster = new ForkableFlinkMiniCluster(config, false);
 		cluster.start();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-yarn-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/pom.xml b/flink-yarn-tests/pom.xml
index 1d0951d..255eeee 100644
--- a/flink-yarn-tests/pom.xml
+++ b/flink-yarn-tests/pom.xml
@@ -38,17 +38,12 @@ under the License.
 	<packaging>jar</packaging>
 
 	<dependencies>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-runtime_2.10</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-clients_2.10</artifactId>
+			<artifactId>flink-test-utils_2.10</artifactId>
 			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
 		</dependency>
 
 		<!-- Needed for the streaming wordcount example -->
@@ -66,12 +61,6 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-test-utils_2.10</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
 			<artifactId>${shading-artifact.name}</artifactId>
 			<version>${project.version}</version>
 		</dependency>
@@ -87,20 +76,7 @@ under the License.
 				</exclusion>
 			</exclusions>
 		</dependency>
-				
-		<dependency>
-			<groupId>junit</groupId>
-			<artifactId>junit</artifactId>
-			<version>4.11</version>
-			<type>jar</type>
-		</dependency>
-
-		<dependency>
-			<groupId>com.google.guava</groupId>
-			<artifactId>guava</artifactId>
-			<version>${guava.version}</version>
-		</dependency>
-
+		
 		<dependency>
 			<groupId>com.typesafe.akka</groupId>
 			<artifactId>akka-testkit_${scala.binary.version}</artifactId>

http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-yarn-tests/src/main/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
deleted file mode 100644
index 30116af..0000000
--- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.yarn;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.PosixParser;
-
-import org.apache.flink.client.CliFrontend;
-import org.apache.flink.client.FlinkYarnSessionCli;
-import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
-import org.apache.flink.test.util.TestBaseUtils;
-
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-public class FlinkYarnSessionCliTest {
-
-	@Rule
-	public TemporaryFolder tmp = new TemporaryFolder();
-
-	@Test
-	public void testDynamicProperties() throws IOException {
-
-		Map<String, String> map = new HashMap<String, String>(System.getenv());
-		File tmpFolder = tmp.newFolder();
-		File fakeConf = new File(tmpFolder, "flink-conf.yaml");
-		fakeConf.createNewFile();
-		map.put("FLINK_CONF_DIR", tmpFolder.getAbsolutePath());
-		TestBaseUtils.setEnv(map);
-		Options options = new Options();
-		FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", "");
-		cli.getYARNSessionCLIOptions(options);
-
-		CommandLineParser parser = new PosixParser();
-		CommandLine cmd = null;
-		try {
-			cmd = parser.parse(options, new String[]{"run", "-j", "fake.jar", "-n", "15", "-D", "akka.ask.timeout=5 min"});
-		} catch(Exception e) {
-			e.printStackTrace();
-			Assert.fail("Parsing failed with " + e.getMessage());
-		}
-
-		AbstractFlinkYarnClient flinkYarnClient = cli.createFlinkYarnClient(cmd);
-
-		Assert.assertNotNull(flinkYarnClient);
-
-		Map<String, String> dynProperties = CliFrontend.getDynamicProperties(flinkYarnClient.getDynamicPropertiesEncoded());
-		Assert.assertEquals(1, dynProperties.size());
-		Assert.assertEquals("5 min", dynProperties.get("akka.ask.timeout"));
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingApplicationMaster.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingApplicationMaster.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingApplicationMaster.java
deleted file mode 100644
index b0757f5..0000000
--- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingApplicationMaster.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.yarn;
-
-import org.apache.flink.runtime.jobmanager.JobManager;
-import org.apache.flink.runtime.jobmanager.MemoryArchivist;
-import org.apache.flink.runtime.taskmanager.TaskManager;
-import org.apache.flink.runtime.testingUtils.TestingMemoryArchivist;
-import org.apache.flink.runtime.testutils.TestingResourceManager;
-import org.apache.flink.runtime.util.EnvironmentInformation;
-import org.apache.flink.runtime.util.SignalHandler;
-
-/**
- * Yarn application master which starts the {@link TestingYarnJobManager},
- * {@link TestingResourceManager}, and the {@link TestingMemoryArchivist}.
- */
-public class TestingApplicationMaster extends YarnApplicationMasterRunner {
-
-	@Override
-	public Class<? extends JobManager> getJobManagerClass() {
-		return TestingYarnJobManager.class;
-	}
-
-	@Override
-	public Class<? extends MemoryArchivist> getArchivistClass() {
-		return TestingMemoryArchivist.class;
-	}
-
-	@Override
-	protected Class<? extends TaskManager> getTaskManagerClass() {
-		return TestingYarnTaskManager.class;
-	}
-
-	@Override
-	public Class<? extends YarnFlinkResourceManager> getResourceManagerClass() {
-		return TestingYarnFlinkResourceManager.class;
-	}
-
-	public static void main(String[] args) {
-		EnvironmentInformation.logEnvironmentInfo(LOG, "YARN ApplicationMaster / JobManager", args);
-		SignalHandler.register(LOG);
-
-		// run and exit with the proper return code
-		int returnCode = new TestingApplicationMaster().run(args);
-		System.exit(returnCode);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingFlinkYarnClient.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingFlinkYarnClient.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingFlinkYarnClient.java
deleted file mode 100644
index 1efc336..0000000
--- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingFlinkYarnClient.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.yarn;
-
-import com.google.common.base.Preconditions;
-
-import java.io.File;
-import java.io.FilenameFilter;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Yarn client which starts a {@link TestingApplicationMaster}. Additionally the client adds the
- * flink-yarn-tests-XXX-tests.jar and the flink-runtime-XXX-tests.jar to the set of files which
- * are shipped to the yarn cluster. This is necessary to load the testing classes.
- */
-public class TestingFlinkYarnClient extends FlinkYarnClientBase {
-
-	public TestingFlinkYarnClient() {
-		List<File> filesToShip = new ArrayList<>();
-
-		File testingJar = YarnTestBase.findFile("..", new TestJarFinder("flink-yarn-tests"));
-		Preconditions.checkNotNull(testingJar, "Could not find the flink-yarn-tests tests jar. " +
-			"Make sure to package the flink-yarn-tests module.");
-
-		File testingRuntimeJar = YarnTestBase.findFile("..", new TestJarFinder("flink-runtime"));
-		Preconditions.checkNotNull(testingRuntimeJar, "Could not find the flink-runtime tests " +
-			"jar. Make sure to package the flink-runtime module.");
-
-		filesToShip.add(testingJar);
-		filesToShip.add(testingRuntimeJar);
-
-		setShipFiles(filesToShip);
-	}
-
-	@Override
-	protected Class<?> getApplicationMasterClass() {
-		return TestingApplicationMaster.class;
-	}
-
-	public static class TestJarFinder implements FilenameFilter {
-
-		private final String jarName;
-
-		public TestJarFinder(final String jarName) {
-			this.jarName = jarName;
-		}
-
-		@Override
-		public boolean accept(File dir, String name) {
-			return name.startsWith(jarName) && name.endsWith("-tests.jar") &&
-				dir.getAbsolutePath().contains(dir.separator + jarName + dir.separator);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingYarnFlinkResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingYarnFlinkResourceManager.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingYarnFlinkResourceManager.java
deleted file mode 100644
index 5a61b8f..0000000
--- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingYarnFlinkResourceManager.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.yarn;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-
-/**
- * Flink's testing resource manager for Yarn.
- */
-public class TestingYarnFlinkResourceManager extends YarnFlinkResourceManager {
-
-	public TestingYarnFlinkResourceManager(
-		Configuration flinkConfig,
-		YarnConfiguration yarnConfig,
-		LeaderRetrievalService leaderRetrievalService,
-		String applicationMasterHostName,
-		String webInterfaceURL,
-		ContaineredTaskManagerParameters taskManagerParameters,
-		ContainerLaunchContext taskManagerLaunchContext,
-		int yarnHeartbeatIntervalMillis,
-		int maxFailedContainers,
-		int numInitialTaskManagers) {
-
-		super(
-			flinkConfig,
-			yarnConfig,
-			leaderRetrievalService,
-			applicationMasterHostName,
-			webInterfaceURL,
-			taskManagerParameters,
-			taskManagerLaunchContext,
-			yarnHeartbeatIntervalMillis,
-			maxFailedContainers,
-			numInitialTaskManagers);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingYarnTaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingYarnTaskManagerRunner.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingYarnTaskManagerRunner.java
deleted file mode 100644
index 8586a77..0000000
--- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingYarnTaskManagerRunner.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.yarn;
-
-import java.io.IOException;
-
-/**
- * Yarn TaskManager runner which starts a {@link TestingYarnTaskManager}.
- */
-public class TestingYarnTaskManagerRunner {
-	public static void main(String[] args) throws IOException {
-		YarnTaskManagerRunner.runYarnTaskManager(args, TestingYarnTaskManager.class);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-yarn-tests/src/main/java/org/apache/flink/yarn/UtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/UtilsTest.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/UtilsTest.java
deleted file mode 100644
index 784bf24..0000000
--- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/UtilsTest.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.yarn;
-
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.log4j.AppenderSkeleton;
-import org.apache.log4j.Level;
-import org.apache.log4j.spi.LoggingEvent;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-
-public class UtilsTest {
-	private static final Logger LOG = LoggerFactory.getLogger(UtilsTest.class);
-
-	@Test
-	public void testUberjarLocator() {
-		File dir = YarnTestBase.findFile("..", new YarnTestBase.RootDirFilenameFilter());
-		Assert.assertNotNull(dir);
-		Assert.assertTrue(dir.getName().endsWith(".jar"));
-		dir = dir.getParentFile().getParentFile(); // from uberjar to lib to root
-		Assert.assertTrue(dir.exists());
-		Assert.assertTrue(dir.isDirectory());
-		List<String> files = Arrays.asList(dir.list());
-		Assert.assertTrue(files.contains("lib"));
-		Assert.assertTrue(files.contains("bin"));
-		Assert.assertTrue(files.contains("conf"));
-	}
-
-	/**
-	 * Remove 15% of the heap, at least 384MB.
-	 *
-	 */
-	@Test
-	public void testHeapCutoff() {
-		Configuration conf = new Configuration();
-		conf.setDouble(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, 0.15);
-		conf.setInteger(ConfigConstants.CONTAINERED_HEAP_CUTOFF_MIN, 384);
-
-		Assert.assertEquals(616, Utils.calculateHeapSize(1000, conf) );
-		Assert.assertEquals(8500, Utils.calculateHeapSize(10000, conf) );
-
-		// test different configuration
-		Assert.assertEquals(3400, Utils.calculateHeapSize(4000, conf));
-
-		conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_MIN, "1000");
-		conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, "0.1");
-		Assert.assertEquals(3000, Utils.calculateHeapSize(4000, conf));
-
-		conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, "0.5");
-		Assert.assertEquals(2000, Utils.calculateHeapSize(4000, conf));
-
-		conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, "1");
-		Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf));
-
-		// test also deprecated keys
-		conf = new Configuration();
-		conf.setDouble(ConfigConstants.YARN_HEAP_CUTOFF_RATIO, 0.15);
-		conf.setInteger(ConfigConstants.YARN_HEAP_CUTOFF_MIN, 384);
-
-		Assert.assertEquals(616, Utils.calculateHeapSize(1000, conf) );
-		Assert.assertEquals(8500, Utils.calculateHeapSize(10000, conf) );
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void illegalArgument() {
-		Configuration conf = new Configuration();
-		conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, "1.1");
-		Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf));
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void illegalArgumentNegative() {
-		Configuration conf = new Configuration();
-		conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, "-0.01");
-		Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf));
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void tooMuchCutoff() {
-		Configuration conf = new Configuration();
-		conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, "6000");
-		Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf));
-	}
-
-	@Test
-	public void testGetEnvironmentVariables() {
-		Configuration testConf = new Configuration();
-		testConf.setString("yarn.application-master.env.LD_LIBRARY_PATH", "/usr/lib/native");
-
-		Map<String, String> res = Utils.getEnvironmentVariables("yarn.application-master.env.", testConf);
-
-		Assert.assertEquals(1, res.size());
-		Map.Entry<String, String> entry = res.entrySet().iterator().next();
-		Assert.assertEquals("LD_LIBRARY_PATH", entry.getKey());
-		Assert.assertEquals("/usr/lib/native", entry.getValue());
-	}
-
-	@Test
-	public void testGetEnvironmentVariablesErroneous() {
-		Configuration testConf = new Configuration();
-		testConf.setString("yarn.application-master.env.", "/usr/lib/native");
-
-		Map<String, String> res = Utils.getEnvironmentVariables("yarn.application-master.env.", testConf);
-
-		Assert.assertEquals(0, res.size());
-	}
-
-	//
-	// --------------- Tools to test if a certain string has been logged with Log4j. -------------
-	// See :  http://stackoverflow.com/questions/3717402/how-to-test-w-junit-that-warning-was-logged-w-log4j
-	//
-	private static TestAppender testAppender;
-	public static void addTestAppender(Class target, Level level) {
-		testAppender = new TestAppender();
-		testAppender.setThreshold(level);
-		org.apache.log4j.Logger lg = org.apache.log4j.Logger.getLogger(target);
-		lg.setLevel(level);
-		lg.addAppender(testAppender);
-		//org.apache.log4j.Logger.getRootLogger().addAppender(testAppender);
-	}
-
-	public static void checkForLogString(String expected) {
-		LoggingEvent found = getEventContainingString(expected);
-		if(found != null) {
-			LOG.info("Found expected string '"+expected+"' in log message "+found);
-			return;
-		}
-		Assert.fail("Unable to find expected string '" + expected + "' in log messages");
-	}
-
-	public static LoggingEvent getEventContainingString(String expected) {
-		if(testAppender == null) {
-			throw new NullPointerException("Initialize test appender first");
-		}
-		LoggingEvent found = null;
-		// make sure that different threads are not logging while the logs are checked
-		synchronized (testAppender.events) {
-			for (LoggingEvent event : testAppender.events) {
-				if (event.getMessage().toString().contains(expected)) {
-					found = event;
-					break;
-				}
-			}
-		}
-		return found;
-	}
-
-	public static class TestAppender extends AppenderSkeleton {
-		public final List<LoggingEvent> events = new ArrayList<>();
-		public void close() {}
-		public boolean requiresLayout() {return false;}
-		@Override
-		protected void append(LoggingEvent event) {
-			synchronized (events){
-				events.add(event);
-			}
-		}
-	}
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
deleted file mode 100644
index a93abf0..0000000
--- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.yarn;
-
-import akka.actor.ActorSystem;
-import akka.actor.PoisonPill;
-import akka.testkit.JavaTestKit;
-import org.apache.curator.test.TestingServer;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.runtime.messages.Messages;
-import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
-import org.apache.flink.runtime.util.LeaderRetrievalUtils;
-import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.concurrent.TimeUnit;
-
-public class YARNHighAvailabilityITCase extends YarnTestBase {
-
-	private static TestingServer zkServer;
-
-	private static ActorSystem actorSystem;
-
-	private static final int numberApplicationAttempts = 10;
-
-	@Rule
-	public TemporaryFolder tmp = new TemporaryFolder();
-
-	@BeforeClass
-	public static void setup() {
-		actorSystem = AkkaUtils.createDefaultActorSystem();
-
-		try {
-			zkServer = new TestingServer();
-			zkServer.start();
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("Could not start ZooKeeper testing cluster.");
-		}
-
-		yarnConfiguration.set(YarnTestBase.TEST_CLUSTER_NAME_KEY, "flink-yarn-tests-ha");
-		yarnConfiguration.set(YarnConfiguration.RM_AM_MAX_ATTEMPTS, "" + numberApplicationAttempts);
-
-		startYARNWithConfig(yarnConfiguration);
-	}
-
-	@AfterClass
-	public static void teardown() throws IOException {
-		if(zkServer != null) {
-			zkServer.stop();
-		}
-
-		JavaTestKit.shutdownActorSystem(actorSystem);
-		actorSystem = null;
-	}
-
-	/**
-	 * Tests that the application master can be killed multiple times and that the surviving
-	 * TaskManager succesfully reconnects to the newly started JobManager.
-	 * @throws Exception
-	 */
-	@Test
-	public void testMultipleAMKill() throws Exception {
-		final int numberKillingAttempts = numberApplicationAttempts - 1;
-
-		TestingFlinkYarnClient flinkYarnClient = new TestingFlinkYarnClient();
-
-		Assert.assertNotNull("unable to get yarn client", flinkYarnClient);
-		flinkYarnClient.setTaskManagerCount(1);
-		flinkYarnClient.setJobManagerMemory(768);
-		flinkYarnClient.setTaskManagerMemory(1024);
-		flinkYarnClient.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
-		flinkYarnClient.setShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
-
-		String confDirPath = System.getenv("FLINK_CONF_DIR");
-		flinkYarnClient.setConfigurationDirectory(confDirPath);
-
-		String fsStateHandlePath = tmp.getRoot().getPath();
-
-		flinkYarnClient.setFlinkConfiguration(GlobalConfiguration.getConfiguration());
-		flinkYarnClient.setDynamicPropertiesEncoded("recovery.mode=zookeeper@@recovery.zookeeper.quorum=" +
-			zkServer.getConnectString() + "@@yarn.application-attempts=" + numberApplicationAttempts +
-			"@@" + ConfigConstants.STATE_BACKEND + "=FILESYSTEM" +
-			"@@" + FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY + "=" + fsStateHandlePath + "/checkpoints" +
-			"@@" + ConfigConstants.ZOOKEEPER_RECOVERY_PATH + "=" + fsStateHandlePath + "/recovery");
-		flinkYarnClient.setConfigurationFilePath(new Path(confDirPath + File.separator + "flink-conf.yaml"));
-
-		AbstractFlinkYarnCluster yarnCluster = null;
-
-		final FiniteDuration timeout = new FiniteDuration(2, TimeUnit.MINUTES);
-
-		try {
-			yarnCluster = flinkYarnClient.deploy();
-			yarnCluster.connectToCluster();
-			final Configuration config = yarnCluster.getFlinkConfiguration();
-
-			new JavaTestKit(actorSystem) {{
-				for (int attempt = 0; attempt < numberKillingAttempts; attempt++) {
-					new Within(timeout) {
-						@Override
-						protected void run() {
-							try {
-								LeaderRetrievalService lrs = LeaderRetrievalUtils.createLeaderRetrievalService(config);
-								ActorGateway gateway = LeaderRetrievalUtils.retrieveLeaderGateway(lrs, actorSystem, timeout);
-								ActorGateway selfGateway = new AkkaActorGateway(getRef(), gateway.leaderSessionID());
-
-								gateway.tell(new TestingJobManagerMessages.NotifyWhenAtLeastNumTaskManagerAreRegistered(1), selfGateway);
-
-								expectMsgEquals(Messages.getAcknowledge());
-
-								gateway.tell(PoisonPill.getInstance());
-							} catch (Exception e) {
-								throw new AssertionError("Could not complete test.", e);
-							}
-						}
-					};
-				}
-
-				new Within(timeout) {
-					@Override
-					protected void run() {
-						try {
-							LeaderRetrievalService lrs = LeaderRetrievalUtils.createLeaderRetrievalService(config);
-							ActorGateway gateway2 = LeaderRetrievalUtils.retrieveLeaderGateway(lrs, actorSystem, timeout);
-							ActorGateway selfGateway = new AkkaActorGateway(getRef(), gateway2.leaderSessionID());
-							gateway2.tell(new TestingJobManagerMessages.NotifyWhenAtLeastNumTaskManagerAreRegistered(1), selfGateway);
-
-							expectMsgEquals(Messages.getAcknowledge());
-						} catch (Exception e) {
-							throw new AssertionError("Could not complete test.", e);
-						}
-					}
-				};
-
-			}};
-		} finally {
-			if (yarnCluster != null) {
-				yarnCluster.shutdown(false);
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
deleted file mode 100644
index 38e17a5..0000000
--- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
+++ /dev/null
@@ -1,539 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.yarn;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.google.common.base.Joiner;
-import org.apache.commons.io.FileUtils;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.runtime.client.JobClient;
-import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
-import org.apache.flink.test.testdata.WordCountData;
-import org.apache.flink.test.util.TestBaseUtils;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
-import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.NodeReport;
-import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-import org.apache.hadoop.yarn.api.records.NodeState;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.client.api.YarnClient;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.security.NMTokenIdentifier;
-import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
-import org.apache.log4j.Level;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.FilenameFilter;
-import java.io.IOException;
-import java.util.EnumSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Arrays;
-import java.util.concurrent.ConcurrentMap;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import static org.apache.flink.yarn.UtilsTest.addTestAppender;
-import static org.apache.flink.yarn.UtilsTest.checkForLogString;
-
-
-/**
- * This test starts a MiniYARNCluster with a CapacityScheduler.
- * Is has, by default a queue called "default". The configuration here adds another queue: "qa-team".
- */
-public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
-	private static final Logger LOG = LoggerFactory.getLogger(YARNSessionCapacitySchedulerITCase.class);
-
-	@BeforeClass
-	public static void setup() {
-		yarnConfiguration.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, ResourceScheduler.class);
-		yarnConfiguration.set("yarn.scheduler.capacity.root.queues", "default,qa-team");
-		yarnConfiguration.setInt("yarn.scheduler.capacity.root.default.capacity", 40);
-		yarnConfiguration.setInt("yarn.scheduler.capacity.root.qa-team.capacity", 60);
-		yarnConfiguration.set(YarnTestBase.TEST_CLUSTER_NAME_KEY, "flink-yarn-tests-capacityscheduler");
-		startYARNWithConfig(yarnConfiguration);
-	}
-
-	/**
-	 * Test regular operation, including command line parameter parsing.
-	 */
-	@Test
-	public void testClientStartup() {
-		LOG.info("Starting testClientStartup()");
-		runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(),
-						"-n", "1",
-						"-jm", "768",
-						"-tm", "1024", "-qu", "qa-team"},
-				"Number of connected TaskManagers changed to 1. Slots available: 1", null, RunTypes.YARN_SESSION, 0);
-		LOG.info("Finished testClientStartup()");
-	}
-
-	/**
-	 * Test per-job yarn cluster
-	 *
-	 * This also tests the prefixed CliFrontend options for the YARN case
-	 * We also test if the requested parallelism of 2 is passed through.
-	 * The parallelism is requested at the YARN client (-ys).
-	 */
-	@Test
-	public void perJobYarnCluster() {
-		LOG.info("Starting perJobYarnCluster()");
-		addTestAppender(JobClient.class, Level.INFO);
-		File exampleJarLocation = YarnTestBase.findFile("..", new ContainsName(new String[] {"-WordCount.jar"} , "streaming")); // exclude streaming wordcount here.
-		Assert.assertNotNull("Could not find wordcount jar", exampleJarLocation);
-		runWithArgs(new String[]{"run", "-m", "yarn-cluster",
-				"-yj", flinkUberjar.getAbsolutePath(), "-yt", flinkLibFolder.getAbsolutePath(),
-				"-yn", "1",
-				"-ys", "2", //test that the job is executed with a DOP of 2
-				"-yjm", "768",
-				"-ytm", "1024", exampleJarLocation.getAbsolutePath()},
-				/* test succeeded after this string */
-			"Job execution complete",
-				/* prohibited strings: (we want to see (2/2)) */
-			new String[]{"System.out)(1/1) switched to FINISHED "},
-			RunTypes.CLI_FRONTEND, 0, true);
-		LOG.info("Finished perJobYarnCluster()");
-	}
-
-
-	/**
-	 * Test TaskManager failure and also if the vcores are set correctly (see issue FLINK-2213).
-	 */
-	@Test(timeout=100000) // timeout after 100 seconds
-	public void testTaskManagerFailure() {
-		LOG.info("Starting testTaskManagerFailure()");
-		Runner runner = startWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(),
-				"-n", "1",
-				"-jm", "768",
-				"-tm", "1024",
-				"-s", "3", // set the slots 3 to check if the vCores are set properly!
-				"-nm", "customName",
-				"-Dfancy-configuration-value=veryFancy",
-				"-Dyarn.maximum-failed-containers=3",
-				"-D" + ConfigConstants.YARN_VCORES + "=2"},
-			"Number of connected TaskManagers changed to 1. Slots available: 3",
-			RunTypes.YARN_SESSION);
-
-		Assert.assertEquals(2, getRunningContainers());
-
-		// ------------------------ Test if JobManager web interface is accessible -------
-
-		YarnClient yc = null;
-		try {
-			yc = YarnClient.createYarnClient();
-			yc.init(yarnConfiguration);
-			yc.start();
-
-			List<ApplicationReport> apps = yc.getApplications(EnumSet.of(YarnApplicationState.RUNNING));
-			Assert.assertEquals(1, apps.size()); // Only one running
-			ApplicationReport app = apps.get(0);
-			Assert.assertEquals("customName", app.getName());
-			String url = app.getTrackingUrl();
-			if(!url.endsWith("/")) {
-				url += "/";
-			}
-			if(!url.startsWith("http://")) {
-				url = "http://" + url;
-			}
-			LOG.info("Got application URL from YARN {}", url);
-
-			String response = TestBaseUtils.getFromHTTP(url + "taskmanagers/");
-
-			JsonNode parsedTMs = new ObjectMapper().readTree(response);
-			ArrayNode taskManagers = (ArrayNode) parsedTMs.get("taskmanagers");
-			Assert.assertNotNull(taskManagers);
-			Assert.assertEquals(1, taskManagers.size());
-			Assert.assertEquals(3, taskManagers.get(0).get("slotsNumber").asInt());
-
-			// get the configuration from webinterface & check if the dynamic properties from YARN show up there.
-			String jsonConfig = TestBaseUtils.getFromHTTP(url + "jobmanager/config");
-			Map<String, String> parsedConfig = WebMonitorUtils.fromKeyValueJsonArray(jsonConfig);
-
-			Assert.assertEquals("veryFancy", parsedConfig.get("fancy-configuration-value"));
-			Assert.assertEquals("3", parsedConfig.get("yarn.maximum-failed-containers"));
-			Assert.assertEquals("2", parsedConfig.get(ConfigConstants.YARN_VCORES));
-
-			// -------------- FLINK-1902: check if jobmanager hostname/port are shown in web interface
-			// first, get the hostname/port
-			String oC = outContent.toString();
-			Pattern p = Pattern.compile("Flink JobManager is now running on ([a-zA-Z0-9.-]+):([0-9]+)");
-			Matcher matches = p.matcher(oC);
-			String hostname = null;
-			String port = null;
-			while(matches.find()) {
-				hostname = matches.group(1).toLowerCase();
-				port = matches.group(2);
-			}
-			LOG.info("Extracted hostname:port: {} {}", hostname, port);
-
-			Assert.assertEquals("unable to find hostname in " + jsonConfig, hostname,
-				parsedConfig.get(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY));
-			Assert.assertEquals("unable to find port in " + jsonConfig, port,
-				parsedConfig.get(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY));
-
-			// test logfile access
-			String logs = TestBaseUtils.getFromHTTP(url + "jobmanager/log");
-			Assert.assertTrue(logs.contains("Starting YARN ApplicationMaster"));
-			Assert.assertTrue(logs.contains("Starting JobManager"));
-			Assert.assertTrue(logs.contains("Starting JobManager Web Frontend"));
-		} catch(Throwable e) {
-			LOG.warn("Error while running test",e);
-			Assert.fail(e.getMessage());
-		}
-
-		// ------------------------ Kill container with TaskManager and check if vcores are set correctly -------
-
-		// find container id of taskManager:
-		ContainerId taskManagerContainer = null;
-		NodeManager nodeManager = null;
-		UserGroupInformation remoteUgi = null;
-		NMTokenIdentifier nmIdent = null;
-		try {
-			remoteUgi = UserGroupInformation.getCurrentUser();
-		} catch (IOException e) {
-			LOG.warn("Unable to get curr user", e);
-			Assert.fail();
-		}
-		for(int nmId = 0; nmId < NUM_NODEMANAGERS; nmId++) {
-			NodeManager nm = yarnCluster.getNodeManager(nmId);
-			ConcurrentMap<ContainerId, Container> containers = nm.getNMContext().getContainers();
-			for(Map.Entry<ContainerId, Container> entry : containers.entrySet()) {
-				String command = Joiner.on(" ").join(entry.getValue().getLaunchContext().getCommands());
-				if(command.contains(YarnTaskManager.class.getSimpleName())) {
-					taskManagerContainer = entry.getKey();
-					nodeManager = nm;
-					nmIdent = new NMTokenIdentifier(taskManagerContainer.getApplicationAttemptId(), null, "",0);
-					// allow myself to do stuff with the container
-					// remoteUgi.addCredentials(entry.getValue().getCredentials());
-					remoteUgi.addTokenIdentifier(nmIdent);
-				}
-			}
-			sleep(500);
-		}
-
-		Assert.assertNotNull("Unable to find container with TaskManager", taskManagerContainer);
-		Assert.assertNotNull("Illegal state", nodeManager);
-
-		try {
-			List<NodeReport> nodeReports = yc.getNodeReports(NodeState.RUNNING);
-
-			// we asked for one node with 2 vcores so we expect 2 vcores
-			int userVcores = 0;
-			for (NodeReport rep: nodeReports) {
-				userVcores += rep.getUsed().getVirtualCores();
-			}
-			Assert.assertEquals(2, userVcores);
-		} catch (Exception e) {
-			Assert.fail("Test failed: " + e.getMessage());
-		}
-
-		yc.stop();
-
-		List<ContainerId> toStop = new LinkedList<ContainerId>();
-		toStop.add(taskManagerContainer);
-		StopContainersRequest scr = StopContainersRequest.newInstance(toStop);
-
-		try {
-			nodeManager.getNMContext().getContainerManager().stopContainers(scr);
-		} catch (Throwable e) {
-			LOG.warn("Error stopping container", e);
-			Assert.fail("Error stopping container: "+e.getMessage());
-		}
-
-		// stateful termination check:
-		// wait until we saw a container being killed and AFTERWARDS a new one launched
-		boolean ok = false;
-		do {
-			LOG.debug("Waiting for correct order of events. Output: {}", errContent.toString());
-
-			String o = errContent.toString();
-			int killedOff = o.indexOf("Container killed by the ApplicationMaster");
-			if (killedOff != -1) {
-				o = o.substring(killedOff);
-				ok = o.indexOf("Launching TaskManager") > 0;
-			}
-			sleep(1000);
-		} while(!ok);
-
-
-		// send "stop" command to command line interface
-		runner.sendStop();
-		// wait for the thread to stop
-		try {
-			runner.join(1000);
-		} catch (InterruptedException e) {
-			LOG.warn("Interrupted while stopping runner", e);
-		}
-		LOG.warn("stopped");
-
-		// ----------- Send output to logger
-		System.setOut(originalStdout);
-		System.setErr(originalStderr);
-		String oC = outContent.toString();
-		String eC = errContent.toString();
-		LOG.info("Sending stdout content through logger: \n\n{}\n\n", oC);
-		LOG.info("Sending stderr content through logger: \n\n{}\n\n", eC);
-
-		// ------ Check if everything happened correctly
-		Assert.assertTrue("Expect to see failed container",
-			eC.contains("New messages from the YARN cluster"));
-
-		Assert.assertTrue("Expect to see failed container",
-			eC.contains("Container killed by the ApplicationMaster"));
-
-		Assert.assertTrue("Expect to see new container started",
-			eC.contains("Launching TaskManager") && eC.contains("on host"));
-
-		// cleanup auth for the subsequent tests.
-		remoteUgi.getTokenIdentifiers().remove(nmIdent);
-
-		LOG.info("Finished testTaskManagerFailure()");
-	}
-
-	/**
-	 * Test deployment to non-existing queue. (user-reported error)
-	 * Deployment to the queue is possible because there are no queues, so we don't check.
-	 */
-	@Test
-	public void testNonexistingQueue() {
-		LOG.info("Starting testNonexistingQueue()");
-		addTestAppender(FlinkYarnClient.class, Level.WARN);
-		runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
-				"-t", flinkLibFolder.getAbsolutePath(),
-				"-n", "1",
-				"-jm", "768",
-				"-tm", "1024",
-				"-qu", "doesntExist"}, "to unknown queue: doesntExist", null, RunTypes.YARN_SESSION, 1);
-		checkForLogString("The specified queue 'doesntExist' does not exist. Available queues: default, qa-team");
-		LOG.info("Finished testNonexistingQueue()");
-	}
-
-	/**
-	 * Test per-job yarn cluster with the parallelism set at the CliFrontend instead of the YARN client.
-	 */
-	@Test
-	public void perJobYarnClusterWithParallelism() {
-		LOG.info("Starting perJobYarnClusterWithParallelism()");
-		// write log messages to stdout as well, so that the runWithArgs() method
-		// is catching the log output
-		addTestAppender(JobClient.class, Level.INFO);
-		File exampleJarLocation = YarnTestBase.findFile("..", new ContainsName(new String[] {"-WordCount.jar"}, "streaming")); // exclude streaming wordcount here.
-		Assert.assertNotNull("Could not find wordcount jar", exampleJarLocation);
-		runWithArgs(new String[]{"run",
-				"-p", "2", //test that the job is executed with a DOP of 2
-				"-m", "yarn-cluster",
-				"-yj", flinkUberjar.getAbsolutePath(),
-				"-yt", flinkLibFolder.getAbsolutePath(),
-				"-yn", "1",
-				"-yjm", "768",
-				"-ytm", "1024", exampleJarLocation.getAbsolutePath()},
-				/* test succeeded after this string */
-			"Job execution complete",
-				/* prohibited strings: (we want to see (2/2)) */
-			new String[]{"System.out)(1/1) switched to FINISHED "},
-			RunTypes.CLI_FRONTEND, 0, true);
-		LOG.info("Finished perJobYarnClusterWithParallelism()");
-	}
-
-	/**
-	 * Test a fire-and-forget job submission to a YARN cluster.
-	 */
-	@Test(timeout=60000)
-	public void testDetachedPerJobYarnCluster() {
-		LOG.info("Starting testDetachedPerJobYarnCluster()");
-
-		File exampleJarLocation = YarnTestBase.findFile(
-			".." + File.separator + "flink-examples" + File.separator + "flink-examples-batch",
-			new ContainsName(new String[] {"-WordCount.jar"}));
-
-		Assert.assertNotNull("Could not find batch wordcount jar", exampleJarLocation);
-
-		testDetachedPerJobYarnClusterInternal(exampleJarLocation.getAbsolutePath());
-
-		LOG.info("Finished testDetachedPerJobYarnCluster()");
-	}
-
-	/**
-	 * Test a fire-and-forget job submission to a YARN cluster.
-	 */
-	@Test(timeout=60000)
-	public void testDetachedPerJobYarnClusterWithStreamingJob() {
-		LOG.info("Starting testDetachedPerJobYarnClusterWithStreamingJob()");
-
-		File exampleJarLocation = YarnTestBase.findFile(
-			".." + File.separator + "flink-examples" + File.separator + "flink-examples-streaming",
-			new ContainsName(new String[] {"-WordCount.jar"}));
-		Assert.assertNotNull("Could not find streaming wordcount jar", exampleJarLocation);
-
-		testDetachedPerJobYarnClusterInternal(exampleJarLocation.getAbsolutePath());
-
-		LOG.info("Finished testDetachedPerJobYarnClusterWithStreamingJob()");
-	}
-
-	private void testDetachedPerJobYarnClusterInternal(String job) {
-		YarnClient yc = YarnClient.createYarnClient();
-		yc.init(yarnConfiguration);
-		yc.start();
-
-		// get temporary folder for writing output of wordcount example
-		File tmpOutFolder = null;
-		try{
-			tmpOutFolder = tmp.newFolder();
-		}
-		catch(IOException e) {
-			throw new RuntimeException(e);
-		}
-
-		// get temporary file for reading input data for wordcount example
-		File tmpInFile;
-		try{
-			tmpInFile = tmp.newFile();
-			FileUtils.writeStringToFile(tmpInFile, WordCountData.TEXT);
-		}
-		catch(IOException e) {
-			throw new RuntimeException(e);
-		}
-
-		Runner runner = startWithArgs(new String[]{"run", "-m", "yarn-cluster", "-yj", flinkUberjar.getAbsolutePath(),
-				"-yt", flinkLibFolder.getAbsolutePath(),
-				"-yn", "1",
-				"-yjm", "768",
-				"-yD", "yarn.heap-cutoff-ratio=0.5", // test if the cutoff is passed correctly
-				"-ytm", "1024",
-				"-ys", "2", // test requesting slots from YARN.
-				"--yarndetached", job, "--input", tmpInFile.getAbsoluteFile().toString(), "--output", tmpOutFolder.getAbsoluteFile().toString()},
-			"Job has been submitted with JobID",
-			RunTypes.CLI_FRONTEND);
-
-		// it should usually be 2, but on slow machines, the number varies
-		Assert.assertTrue("There should be at most 2 containers running", getRunningContainers() <= 2);
-		// give the runner some time to detach
-		for (int attempt = 0; runner.isAlive() && attempt < 5; attempt++) {
-			try {
-				Thread.sleep(500);
-			} catch (InterruptedException e) {
-			}
-		}
-		Assert.assertFalse("The runner should detach.", runner.isAlive());
-		LOG.info("CLI Frontend has returned, so the job is running");
-
-		// find out the application id and wait until it has finished.
-		try {
-			List<ApplicationReport> apps = yc.getApplications(EnumSet.of(YarnApplicationState.RUNNING));
-
-			ApplicationId tmpAppId;
-			if (apps.size() == 1) {
-				// Better method to find the right appId. But sometimes the app is shutting down very fast
-				// Only one running
-				tmpAppId = apps.get(0).getApplicationId();
-
-				LOG.info("waiting for the job with appId {} to finish", tmpAppId);
-				// wait until the app has finished
-				while(yc.getApplications(EnumSet.of(YarnApplicationState.RUNNING)).size() > 0) {
-					sleep(500);
-				}
-			} else {
-				// get appId by finding the latest finished appid
-				apps = yc.getApplications();
-				Collections.sort(apps, new Comparator<ApplicationReport>() {
-					@Override
-					public int compare(ApplicationReport o1, ApplicationReport o2) {
-						return o1.getApplicationId().compareTo(o2.getApplicationId())*-1;
-					}
-				});
-				tmpAppId = apps.get(0).getApplicationId();
-				LOG.info("Selected {} as the last appId from {}", tmpAppId, Arrays.toString(apps.toArray()));
-			}
-			final ApplicationId id = tmpAppId;
-
-			// now it has finished.
-			// check the output files.
-			File[] listOfOutputFiles = tmpOutFolder.listFiles();
-
-
-			Assert.assertNotNull("Taskmanager output not found", listOfOutputFiles);
-			LOG.info("The job has finished. TaskManager output files found in {}", tmpOutFolder );
-
-			// read all output files in output folder to one output string
-			String content = "";
-			for(File f:listOfOutputFiles)
-			{
-				if(f.isFile())
-				{
-					content += FileUtils.readFileToString(f) + "\n";
-				}
-			}
-			//String content = FileUtils.readFileToString(taskmanagerOut);
-			// check for some of the wordcount outputs.
-			Assert.assertTrue("Expected string 'da 5' or '(all,2)' not found in string '"+content+"'", content.contains("da 5") || content.contains("(da,5)") || content.contains("(all,2)"));
-			Assert.assertTrue("Expected string 'der 29' or '(mind,1)' not found in string'"+content+"'",content.contains("der 29") || content.contains("(der,29)") || content.contains("(mind,1)"));
-
-			// check if the heap size for the TaskManager was set correctly
-			File jobmanagerLog = YarnTestBase.findFile("..", new FilenameFilter() {
-				@Override
-				public boolean accept(File dir, String name) {
-					return name.contains("jobmanager.log") && dir.getAbsolutePath().contains(id.toString());
-				}
-			});
-			Assert.assertNotNull("Unable to locate JobManager log", jobmanagerLog);
-			content = FileUtils.readFileToString(jobmanagerLog);
-			// TM was started with 1024 but we cut off 50% (NOT THE DEFAULT VALUE)
-			String expected = "Starting TaskManagers with command: $JAVA_HOME/bin/java -Xms424m -Xmx424m";
-			Assert.assertTrue("Expected string '" + expected + "' not found in JobManager log: '"+jobmanagerLog+"'",
-				content.contains(expected));
-			expected = " (2/2) (attempt #0) to ";
-			Assert.assertTrue("Expected string '" + expected + "' not found in JobManager log." +
-					"This string checks that the job has been started with a parallelism of 2. Log contents: '"+jobmanagerLog+"'",
-				content.contains(expected));
-
-			// make sure the detached app is really finished.
-			LOG.info("Checking again that app has finished");
-			ApplicationReport rep;
-			do {
-				sleep(500);
-				rep = yc.getApplicationReport(id);
-				LOG.info("Got report {}", rep);
-			} while(rep.getYarnApplicationState() == YarnApplicationState.RUNNING);
-
-		} catch(Throwable t) {
-			LOG.warn("Error while detached yarn session was running", t);
-			Assert.fail(t.getMessage());
-		}
-	}
-
-	@After
-	public void checkForProhibitedLogContents() {
-		ensureNoProhibitedStringInLogFiles(PROHIBITED_STRINGS, WHITELISTED_STRINGS);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
deleted file mode 100644
index cb402a3..0000000
--- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
+++ /dev/null
@@ -1,269 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.yarn;
-
-import org.apache.flink.client.FlinkYarnSessionCli;
-import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
-import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
-import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-import org.apache.hadoop.yarn.client.api.YarnClient;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
-
-import org.apache.log4j.Level;
-
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.util.Arrays;
-import java.util.EnumSet;
-import java.util.List;
-
-import static org.apache.flink.yarn.UtilsTest.addTestAppender;
-import static org.apache.flink.yarn.UtilsTest.checkForLogString;
-
-
-/**
- * This test starts a MiniYARNCluster with a FIFO scheduler.
- * There are no queues for that scheduler.
- */
-public class YARNSessionFIFOITCase extends YarnTestBase {
-	private static final Logger LOG = LoggerFactory.getLogger(YARNSessionFIFOITCase.class);
-
-	/*
-	Override init with FIFO scheduler.
-	 */
-	@BeforeClass
-	public static void setup() {
-		yarnConfiguration.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class, ResourceScheduler.class);
-		yarnConfiguration.setInt(YarnConfiguration.NM_PMEM_MB, 768);
-		yarnConfiguration.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
-		yarnConfiguration.set(YarnTestBase.TEST_CLUSTER_NAME_KEY, "flink-yarn-tests-fifo");
-		startYARNWithConfig(yarnConfiguration);
-	}
-
-	@After
-	public void checkForProhibitedLogContents() {
-		ensureNoProhibitedStringInLogFiles(PROHIBITED_STRINGS, WHITELISTED_STRINGS);
-	}
-
-	/**
-	 * Test regular operation, including command line parameter parsing.
-	 */
-	@Test(timeout=60000) // timeout after a minute.
-	public void testDetachedMode() {
-		LOG.info("Starting testDetachedMode()");
-		addTestAppender(FlinkYarnSessionCli.class, Level.INFO);
-		Runner runner = startWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
-						"-t", flinkLibFolder.getAbsolutePath(),
-						"-n", "1",
-						"-jm", "768",
-						"-tm", "1024",
-						"--name", "MyCustomName", // test setting a custom name
-						"--detached"},
-				"Flink JobManager is now running on", RunTypes.YARN_SESSION);
-
-		checkForLogString("The Flink YARN client has been started in detached mode");
-
-		Assert.assertFalse("The runner should detach.", runner.isAlive());
-
-		LOG.info("Waiting until two containers are running");
-		// wait until two containers are running
-		while(getRunningContainers() < 2) {
-			sleep(500);
-		}
-		LOG.info("Two containers are running. Killing the application");
-
-		// kill application "externally".
-		try {
-			YarnClient yc = YarnClient.createYarnClient();
-			yc.init(yarnConfiguration);
-			yc.start();
-			List<ApplicationReport> apps = yc.getApplications(EnumSet.of(YarnApplicationState.RUNNING));
-			Assert.assertEquals(1, apps.size()); // Only one running
-			ApplicationReport app = apps.get(0);
-
-			Assert.assertEquals("MyCustomName", app.getName());
-			ApplicationId id = app.getApplicationId();
-			yc.killApplication(id);
-
-			while(yc.getApplications(EnumSet.of(YarnApplicationState.KILLED)).size() == 0) {
-				sleep(500);
-			}
-		} catch(Throwable t) {
-			LOG.warn("Killing failed", t);
-			Assert.fail();
-		}
-
-		LOG.info("Finished testDetachedMode()");
-	}
-
-	/**
-	 * Test querying the YARN cluster.
-	 *
-	 * This test validates through 666*2 cores in the "cluster".
-	 */
-	@Test
-	public void testQueryCluster() {
-		LOG.info("Starting testQueryCluster()");
-		runWithArgs(new String[] {"-q"}, "Summary: totalMemory 8192 totalCores 1332",null, RunTypes.YARN_SESSION, 0); // we have 666*2 cores.
-		LOG.info("Finished testQueryCluster()");
-	}
-
-	/**
-	 * Test deployment to non-existing queue. (user-reported error)
-	 * Deployment to the queue is possible because there are no queues, so we don't check.
-	 */
-	@Test
-	public void testNonexistingQueue() {
-		LOG.info("Starting testNonexistingQueue()");
-		runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
-				"-t", flinkLibFolder.getAbsolutePath(),
-				"-n", "1",
-				"-jm", "768",
-				"-tm", "1024",
-				"-qu", "doesntExist"}, "Number of connected TaskManagers changed to 1. Slots available: 1", null, RunTypes.YARN_SESSION, 0);
-		LOG.info("Finished testNonexistingQueue()");
-	}
-
-	/**
-	 * The test cluster has the following resources:
-	 * - 2 Nodes with 4096 MB each.
-	 * - RM_SCHEDULER_MINIMUM_ALLOCATION_MB is 512
-	 *
-	 * We allocate:
-	 * 1 JobManager with 256 MB (will be automatically upgraded to 512 due to min alloc mb)
-	 * 5 TaskManagers with 1585 MB
-	 *
-	 * user sees a total request of: 8181 MB (fits)
-	 * system sees a total request of: 8437 (doesn't fit due to min alloc mb)
-	 */
-	@Ignore("The test is too resource consuming (8.5 GB of memory)")
-	@Test
-	public void testResourceComputation() {
-		addTestAppender(FlinkYarnClient.class, Level.WARN);
-		LOG.info("Starting testResourceComputation()");
-		runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(),
-				"-n", "5",
-				"-jm", "256",
-				"-tm", "1585"}, "Number of connected TaskManagers changed to", null, RunTypes.YARN_SESSION, 0);
-		LOG.info("Finished testResourceComputation()");
-		checkForLogString("This YARN session requires 8437MB of memory in the cluster. There are currently only 8192MB available.");
-	}
-
-	/**
-	 * The test cluster has the following resources:
-	 * - 2 Nodes with 4096 MB each.
-	 * - RM_SCHEDULER_MINIMUM_ALLOCATION_MB is 512
-	 *
-	 * We allocate:
-	 * 1 JobManager with 256 MB (will be automatically upgraded to 512 due to min alloc mb)
-	 * 2 TaskManagers with 3840 MB
-	 *
-	 * the user sees a total request of: 7936 MB (fits)
-	 * the system sees a request of: 8192 MB (fits)
-	 * HOWEVER: one machine is going to need 3840 + 512 = 4352 MB, which doesn't fit.
-	 *
-	 * --> check if the system properly rejects allocating this session.
-	 */
-	@Ignore("The test is too resource consuming (8 GB of memory)")
-	@Test
-	public void testfullAlloc() {
-		addTestAppender(FlinkYarnClient.class, Level.WARN);
-		LOG.info("Starting testfullAlloc()");
-		runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(),
-				"-n", "2",
-				"-jm", "256",
-				"-tm", "3840"}, "Number of connected TaskManagers changed to", null, RunTypes.YARN_SESSION, 0);
-		LOG.info("Finished testfullAlloc()");
-		checkForLogString("There is not enough memory available in the YARN cluster. The TaskManager(s) require 3840MB each. NodeManagers available: [4096, 4096]\n" +
-				"After allocating the JobManager (512MB) and (1/2) TaskManagers, the following NodeManagers are available: [3584, 256]");
-	}
-
-	/**
-	 * Test the YARN Java API
-	 */
-	@Test
-	public void testJavaAPI() {
-		final int WAIT_TIME = 15;
-		LOG.info("Starting testJavaAPI()");
-
-		AbstractFlinkYarnClient flinkYarnClient = FlinkYarnSessionCli.getFlinkYarnClient();
-		Assert.assertNotNull("unable to get yarn client", flinkYarnClient);
-		flinkYarnClient.setTaskManagerCount(1);
-		flinkYarnClient.setJobManagerMemory(768);
-		flinkYarnClient.setTaskManagerMemory(1024);
-		flinkYarnClient.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
-		flinkYarnClient.setShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
-		String confDirPath = System.getenv("FLINK_CONF_DIR");
-		flinkYarnClient.setConfigurationDirectory(confDirPath);
-		flinkYarnClient.setFlinkConfiguration(GlobalConfiguration.getConfiguration());
-		flinkYarnClient.setConfigurationFilePath(new Path(confDirPath + File.separator + "flink-conf.yaml"));
-
-		// deploy
-		AbstractFlinkYarnCluster yarnCluster = null;
-		try {
-			yarnCluster = flinkYarnClient.deploy();
-			yarnCluster.connectToCluster();
-		} catch (Exception e) {
-			LOG.warn("Failing test", e);
-			Assert.fail("Error while deploying YARN cluster: "+e.getMessage());
-		}
-		GetClusterStatusResponse expectedStatus = new GetClusterStatusResponse(1, 1);
-		for(int second = 0; second < WAIT_TIME * 2; second++) { // run "forever"
-			try {
-				Thread.sleep(1000);
-			} catch (InterruptedException e) {
-				LOG.warn("Interrupted", e);
-			}
-			GetClusterStatusResponse status = yarnCluster.getClusterStatus();
-			if(status != null && status.equals(expectedStatus)) {
-				LOG.info("Cluster reached status " + status);
-				break; // all good, cluster started
-			}
-			if(second > WAIT_TIME) {
-				// we waited for 15 seconds. cluster didn't come up correctly
-				Assert.fail("The custer didn't start after " + WAIT_TIME + " seconds");
-			}
-		}
-
-		// use the cluster
-		Assert.assertNotNull(yarnCluster.getJobManagerAddress());
-		Assert.assertNotNull(yarnCluster.getWebInterfaceURL());
-
-		LOG.info("Shutting down cluster. All tests passed");
-		// shutdown cluster
-		yarnCluster.shutdown(false);
-		LOG.info("Finished testJavaAPI()");
-	}
-}


Mime
View raw message