flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [04/10] flink git commit: [FLINK-1827] [tests] Move test classes in test folders and fix scope of test dependencies.
Date Wed, 04 May 2016 19:22:19 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java
deleted file mode 100644
index fc1e5bc..0000000
--- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java
+++ /dev/null
@@ -1,619 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.yarn;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.flink.client.CliFrontend;
-import org.apache.flink.client.FlinkYarnSessionCli;
-import org.apache.flink.test.util.TestBaseUtils;
-import org.apache.flink.util.TestLogger;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.Service;
-import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-import org.apache.hadoop.yarn.client.api.YarnClient;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.server.MiniYARNCluster;
-import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
-import org.apache.log4j.spi.LoggingEvent;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.rules.TemporaryFolder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.slf4j.Marker;
-import org.slf4j.MarkerFactory;
-
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileWriter;
-import java.io.FilenameFilter;
-import java.io.IOException;
-import java.io.PrintStream;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Scanner;
-import java.util.concurrent.ConcurrentMap;
-
-
-/**
- * This base class allows to use the MiniYARNCluster.
- * The cluster is re-used for all tests.
- *
- * This class is located in a different package which is build after flink-dist. This way,
- * we can use the YARN uberjar of flink to start a Flink YARN session.
- *
- * The test is not thread-safe. Parallel execution of tests is not possible!
- */
-public abstract class YarnTestBase extends TestLogger {
-	private static final Logger LOG = LoggerFactory.getLogger(YarnTestBase.class);
-
-	protected final static PrintStream originalStdout = System.out;
-	protected final static PrintStream originalStderr = System.err;
-
-	protected static String TEST_CLUSTER_NAME_KEY = "flink-yarn-minicluster-name";
-
-	protected final static int NUM_NODEMANAGERS = 2;
-
-	/** The tests are scanning for these strings in the final output. */
-	protected final static String[] PROHIBITED_STRINGS = {
-			"Exception", // we don't want any exceptions to happen
-			"Started SelectChannelConnector@0.0.0.0:8081" // Jetty should start on a random port in YARN mode.
-	};
-
-	/** These strings are white-listed, overriding teh prohibited strings */
-	protected final static String[] WHITELISTED_STRINGS = {
-			"akka.remote.RemoteTransportExceptionNoStackTrace",
-			// workaround for annoying InterruptedException logging:
-		    // https://issues.apache.org/jira/browse/YARN-1022
-			"java.lang.InterruptedException"
-	};
-
-	// Temp directory which is deleted after the unit test.
-	@ClassRule
-	public static TemporaryFolder tmp = new TemporaryFolder();
-
-	protected static MiniYARNCluster yarnCluster = null;
-
-	/**
-	 * Uberjar (fat jar) file of Flink
-	 */
-	protected static File flinkUberjar;
-
-	protected static final Configuration yarnConfiguration;
-
-	/**
-	 * lib/ folder of the flink distribution.
-	 */
-	protected static File flinkLibFolder;
-
-	static {
-		yarnConfiguration = new YarnConfiguration();
-		yarnConfiguration.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
-		yarnConfiguration.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 4096); // 4096 is the available memory anyways
-		yarnConfiguration.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true);
-		yarnConfiguration.setBoolean(YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME, true);
-		yarnConfiguration.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
-		yarnConfiguration.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 2);
-		yarnConfiguration.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, 4);
-		yarnConfiguration.setInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 3600);
-		yarnConfiguration.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false);
-		yarnConfiguration.setInt(YarnConfiguration.NM_VCORES, 666); // memory is overwritten in the MiniYARNCluster.
-		// so we have to change the number of cores for testing.
-		yarnConfiguration.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 20000); // 20 seconds expiry (to ensure we properly heartbeat with YARN).
-	}
-
-
-
-	/**
-	 * Sleep a bit between the tests (we are re-using the YARN cluster for the tests)
-	 */
-	@After
-	public void sleep() {
-		try {
-			Thread.sleep(500);
-		} catch (InterruptedException e) {
-			Assert.fail("Should not happen");
-		}
-	}
-
-	private YarnClient yarnClient = null;
-	@Before
-	public void checkClusterEmpty() throws IOException, YarnException {
-		if(yarnClient == null) {
-			yarnClient = YarnClient.createYarnClient();
-			yarnClient.init(yarnConfiguration);
-			yarnClient.start();
-		}
-
-		List<ApplicationReport> apps = yarnClient.getApplications();
-		for(ApplicationReport app : apps) {
-			if(app.getYarnApplicationState() != YarnApplicationState.FINISHED
-					&& app.getYarnApplicationState() != YarnApplicationState.KILLED
-					&& app.getYarnApplicationState() != YarnApplicationState.FAILED) {
-				Assert.fail("There is at least one application on the cluster is not finished." +
-						"App "+app.getApplicationId()+" is in state "+app.getYarnApplicationState());
-			}
-		}
-	}
-
-	/**
-	 * Locate a file or directory
-	 */
-	public static File findFile(String startAt, FilenameFilter fnf) {
-		File root = new File(startAt);
-		String[] files = root.list();
-		if(files == null) {
-			return null;
-		}
-		for(String file : files) {
-			File f = new File(startAt + File.separator + file);
-			if(f.isDirectory()) {
-				File r = findFile(f.getAbsolutePath(), fnf);
-				if(r != null) {
-					return r;
-				}
-			} else if (fnf.accept(f.getParentFile(), f.getName())) {
-				return f;
-			}
-		}
-		return null;
-	}
-
-	/**
-	 * Filter to find root dir of the flink-yarn dist.
-	 */
-	public static class RootDirFilenameFilter implements FilenameFilter {
-		@Override
-		public boolean accept(File dir, String name) {
-			return name.startsWith("flink-dist") && name.endsWith(".jar") && dir.toString().contains("/lib");
-		}
-	}
-
-	public static class ContainsName implements FilenameFilter {
-		private String[] names;
-		private String excludeInPath = null;
-
-		/**
-		 * @param names which have to be included in the filename.
-		 */
-		public ContainsName(String[] names) {
-			this.names = names;
-		}
-
-		public ContainsName(String[] names, String excludeInPath) {
-			this.names = names;
-			this.excludeInPath = excludeInPath;
-		}
-
-		@Override
-		public boolean accept(File dir, String name) {
-			if(excludeInPath == null) {
-				for(String n: names) {
-					if(!name.contains(n)) {
-						return false;
-					}
-				}
-				return true;
-			} else {
-				for(String n: names) {
-					if(!name.contains(n)) {
-						return false;
-					}
-				}
-				return !dir.toString().contains(excludeInPath);
-			}
-		}
-	}
-
-	public static File writeYarnSiteConfigXML(Configuration yarnConf) throws IOException {
-		tmp.create();
-		File yarnSiteXML = new File(tmp.newFolder().getAbsolutePath() + "/yarn-site.xml");
-
-		FileWriter writer = new FileWriter(yarnSiteXML);
-		yarnConf.writeXml(writer);
-		writer.flush();
-		writer.close();
-		return yarnSiteXML;
-	}
-
-	/**
-	 * This method checks the written TaskManager and JobManager log files
-	 * for exceptions.
-	 *
-	 * WARN: Please make sure the tool doesn't find old logfiles from previous test runs.
-	 * So always run "mvn clean" before running the tests here.
-	 *
-	 */
-	public static void ensureNoProhibitedStringInLogFiles(final String[] prohibited, final String[] whitelisted) {
-		File cwd = new File("target/" + yarnConfiguration.get(TEST_CLUSTER_NAME_KEY));
-		Assert.assertTrue("Expecting directory " + cwd.getAbsolutePath() + " to exist", cwd.exists());
-		Assert.assertTrue("Expecting directory " + cwd.getAbsolutePath() + " to be a directory", cwd.isDirectory());
-		
-		File foundFile = findFile(cwd.getAbsolutePath(), new FilenameFilter() {
-			@Override
-			public boolean accept(File dir, String name) {
-			// scan each file for prohibited strings.
-			File f = new File(dir.getAbsolutePath()+ "/" + name);
-			try {
-				Scanner scanner = new Scanner(f);
-				while (scanner.hasNextLine()) {
-					final String lineFromFile = scanner.nextLine();
-					for (String aProhibited : prohibited) {
-						if (lineFromFile.contains(aProhibited)) {
-							
-							boolean whitelistedFound = false;
-							for (String white : whitelisted) {
-								if (lineFromFile.contains(white)) {
-									whitelistedFound = true;
-									break;
-								}
-							}
-							
-							if (!whitelistedFound) {
-								// logging in FATAL to see the actual message in TRAVIS tests.
-								Marker fatal = MarkerFactory.getMarker("FATAL");
-								LOG.error(fatal, "Prohibited String '{}' in line '{}'", aProhibited, lineFromFile);
-								return true;
-							}
-						}
-					}
-
-				}
-			} catch (FileNotFoundException e) {
-				LOG.warn("Unable to locate file: "+e.getMessage()+" file: "+f.getAbsolutePath());
-			}
-
-			return false;
-			}
-		});
-		if(foundFile != null) {
-			Scanner scanner =  null;
-			try {
-				scanner = new Scanner(foundFile);
-			} catch (FileNotFoundException e) {
-				Assert.fail("Unable to locate file: "+e.getMessage()+" file: "+foundFile.getAbsolutePath());
-			}
-			LOG.warn("Found a file with a prohibited string. Printing contents:");
-			while (scanner.hasNextLine()) {
-				LOG.warn("LINE: "+scanner.nextLine());
-			}
-			Assert.fail("Found a file "+foundFile+" with a prohibited string: "+Arrays.toString(prohibited));
-		}
-	}
-
-	public static void sleep(int time) {
-		try {
-			Thread.sleep(time);
-		} catch (InterruptedException e) {
-			LOG.warn("Interruped",e);
-		}
-	}
-
-	public static int getRunningContainers() {
-		int count = 0;
-		for(int nmId = 0; nmId < NUM_NODEMANAGERS; nmId++) {
-			NodeManager nm = yarnCluster.getNodeManager(nmId);
-			ConcurrentMap<ContainerId, Container> containers = nm.getNMContext().getContainers();
-			count += containers.size();
-		}
-		return count;
-	}
-
-	public static void startYARNWithConfig(Configuration conf) {
-		// set the home directory to a tmp directory. Flink on YARN is using the home dir to distribute the file
-		File homeDir = null;
-		try {
-			homeDir = tmp.newFolder();
-		} catch (IOException e) {
-			e.printStackTrace();
-			Assert.fail(e.getMessage());
-		}
-		System.setProperty("user.home", homeDir.getAbsolutePath());
-		String uberjarStartLoc = "..";
-		LOG.info("Trying to locate uberjar in {}", new File(uberjarStartLoc));
-		flinkUberjar = findFile(uberjarStartLoc, new RootDirFilenameFilter());
-		Assert.assertNotNull("Flink uberjar not found", flinkUberjar);
-		String flinkDistRootDir = flinkUberjar.getParentFile().getParent();
-		flinkLibFolder = flinkUberjar.getParentFile(); // the uberjar is located in lib/
-		Assert.assertNotNull("Flink flinkLibFolder not found", flinkLibFolder);
-		Assert.assertTrue("lib folder not found", flinkLibFolder.exists());
-		Assert.assertTrue("lib folder not found", flinkLibFolder.isDirectory());
-
-		if (!flinkUberjar.exists()) {
-			Assert.fail("Unable to locate yarn-uberjar.jar");
-		}
-
-		try {
-			LOG.info("Starting up MiniYARNCluster");
-			if (yarnCluster == null) {
-				yarnCluster = new MiniYARNCluster(conf.get(YarnTestBase.TEST_CLUSTER_NAME_KEY), NUM_NODEMANAGERS, 1, 1);
-
-				yarnCluster.init(conf);
-				yarnCluster.start();
-			}
-
-			Map<String, String> map = new HashMap<String, String>(System.getenv());
-
-			File flinkConfDirPath = findFile(flinkDistRootDir, new ContainsName(new String[]{"flink-conf.yaml"}));
-			Assert.assertNotNull(flinkConfDirPath);
-
-			map.put("FLINK_CONF_DIR", flinkConfDirPath.getParent());
-
-			File yarnConfFile = writeYarnSiteConfigXML(conf);
-			map.put("YARN_CONF_DIR", yarnConfFile.getParentFile().getAbsolutePath());
-			map.put("IN_TESTS", "yes we are in tests"); // see FlinkYarnClient() for more infos
-			TestBaseUtils.setEnv(map);
-
-			Assert.assertTrue(yarnCluster.getServiceState() == Service.STATE.STARTED);
-
-			// wait for the nodeManagers to connect
-			while(!yarnCluster.waitForNodeManagersToConnect(500)) {
-				LOG.info("Waiting for Nodemanagers to connect");
-			}
-		} catch (Exception ex) {
-			ex.printStackTrace();
-			LOG.error("setup failure", ex);
-			Assert.fail();
-		}
-	}
-
-	/**
-	 * Default @BeforeClass impl. Overwrite this for passing a different configuration
-	 */
-	@BeforeClass
-	public static void setup() {
-		startYARNWithConfig(yarnConfiguration);
-	}
-
-	// -------------------------- Runner -------------------------- //
-
-	protected static ByteArrayOutputStream outContent;
-	protected static ByteArrayOutputStream errContent;
-	enum RunTypes {
-		YARN_SESSION, CLI_FRONTEND
-	}
-
-	/**
-	 * This method returns once the "startedAfterString" has been seen.
-	 */
-	protected Runner startWithArgs(String[] args, String startedAfterString, RunTypes type) {
-		LOG.info("Running with args {}", Arrays.toString(args));
-
-		outContent = new ByteArrayOutputStream();
-		errContent = new ByteArrayOutputStream();
-		System.setOut(new PrintStream(outContent));
-		System.setErr(new PrintStream(errContent));
-
-
-		final int START_TIMEOUT_SECONDS = 60;
-
-		Runner runner = new Runner(args, type);
-		runner.setName("Frontend (CLI/YARN Client) runner thread (startWithArgs()).");
-		runner.start();
-
-		for(int second = 0; second <  START_TIMEOUT_SECONDS; second++) {
-			sleep(1000);
-			// check output for correct TaskManager startup.
-			if(outContent.toString().contains(startedAfterString)
-					|| errContent.toString().contains(startedAfterString) ) {
-				LOG.info("Found expected output in redirected streams");
-				return runner;
-			}
-			// check if thread died
-			if(!runner.isAlive()) {
-				sendOutput();
-				Assert.fail("Runner thread died before the test was finished. Return value = "+runner.getReturnValue());
-			}
-		}
-
-		sendOutput();
-		Assert.fail("During the timeout period of " + START_TIMEOUT_SECONDS + " seconds the " +
-				"expected string did not show up");
-		return null;
-	}
-
-	protected void runWithArgs(String[] args, String terminateAfterString, String[] failOnStrings, RunTypes type, int returnCode) {
-		runWithArgs(args,terminateAfterString, failOnStrings, type, returnCode, false);
-	}
-	/**
-	 * The test has been passed once the "terminateAfterString" has been seen.
-	 * @param args Command line arguments for the runner
-	 * @param terminateAfterString the runner is searching the stdout and stderr for this string. as soon as it appears, the test has passed
-	 * @param failOnStrings The runner is searching stdout and stderr for the strings specified here. If one appears, the test has failed
-	 * @param type Set the type of the runner
-	 * @param returnCode Expected return code from the runner.
-	 * @param checkLogForTerminateString  If true, the runner checks also the log4j logger for the terminate string
-	 */
-	protected void runWithArgs(String[] args, String terminateAfterString, String[] failOnStrings, RunTypes type, int returnCode, boolean checkLogForTerminateString) {
-		LOG.info("Running with args {}", Arrays.toString(args));
-
-		outContent = new ByteArrayOutputStream();
-		errContent = new ByteArrayOutputStream();
-		System.setOut(new PrintStream(outContent));
-		System.setErr(new PrintStream(errContent));
-
-
-		// we wait for at most three minutes
-		final int START_TIMEOUT_SECONDS = 180;
-		final long deadline = System.currentTimeMillis() + (START_TIMEOUT_SECONDS * 1000);
-		
-		Runner runner = new Runner(args, type);
-		runner.start();
-
-		boolean expectedStringSeen = false;
-		boolean testPassedFromLog4j = false;
-		do {
-			sleep(1000);
-			String outContentString = outContent.toString();
-			String errContentString = errContent.toString();
-			if(failOnStrings != null) {
-				for (String failOnString : failOnStrings) {
-					if (outContentString.contains(failOnString)
-							|| errContentString.contains(failOnString)) {
-						LOG.warn("Failing test. Output contained illegal string '" + failOnString + "'");
-						sendOutput();
-						// stopping runner.
-						runner.sendStop();
-						Assert.fail("Output contained illegal string '" + failOnString + "'");
-					}
-				}
-			}
-			// check output for the expected terminateAfterString.
-			if(checkLogForTerminateString) {
-				LoggingEvent matchedEvent = UtilsTest.getEventContainingString(terminateAfterString);
-				if(matchedEvent != null) {
-					testPassedFromLog4j = true;
-					LOG.info("Found expected output in logging event {}", matchedEvent);
-				}
-
-			}
-
-			if (outContentString.contains(terminateAfterString) || errContentString.contains(terminateAfterString) || testPassedFromLog4j ) {
-				expectedStringSeen = true;
-				LOG.info("Found expected output in redirected streams");
-				// send "stop" command to command line interface
-				LOG.info("RunWithArgs: request runner to stop");
-				runner.sendStop();
-				// wait for the thread to stop
-				try {
-					runner.join(30000);
-				}
-				catch (InterruptedException e) {
-					LOG.warn("Interrupted while stopping runner", e);
-				}
-				LOG.warn("RunWithArgs runner stopped.");
-			}
-			else {
-				// check if thread died
-				if (!runner.isAlive()) {
-					if (runner.getReturnValue() != 0) {
-						Assert.fail("Runner thread died before the test was finished. Return value = "
-								+ runner.getReturnValue());
-					} else {
-						LOG.info("Runner stopped earlier than expected with return value = 0");
-					}
-					// leave loop: the runner died, so we can not expect new strings to show up.
-					break;
-				}
-			}
-		}
-		while (!expectedStringSeen && System.currentTimeMillis() < deadline);
-		
-		sendOutput();
-		Assert.assertTrue("During the timeout period of " + START_TIMEOUT_SECONDS + " seconds the " +
-				"expected string did not show up", expectedStringSeen);
-
-		// check for 0 return code
-		Assert.assertEquals("Expected return value", returnCode, runner.getReturnValue());
-		LOG.info("Test was successful");
-	}
-
-	protected static void sendOutput() {
-		System.setOut(originalStdout);
-		System.setErr(originalStderr);
-
-		LOG.info("Sending stdout content through logger: \n\n{}\n\n", outContent.toString());
-		LOG.info("Sending stderr content through logger: \n\n{}\n\n", errContent.toString());
-	}
-
-	public static class Runner extends Thread {
-		private final String[] args;
-		private int returnValue;
-		private RunTypes type;
-		private FlinkYarnSessionCli yCli;
-
-		public Runner(String[] args, RunTypes type) {
-			this.args = args;
-			this.type = type;
-		}
-
-		public int getReturnValue() {
-			return returnValue;
-		}
-
-		@Override
-		public void run() {
-			switch(type) {
-				case YARN_SESSION:
-					yCli = new FlinkYarnSessionCli("", "");
-					returnValue = yCli.run(args);
-					break;
-				case CLI_FRONTEND:
-					try {
-						CliFrontend cli = new CliFrontend();
-						returnValue = cli.parseParameters(args);
-					} catch (Exception e) {
-						throw new RuntimeException(e);
-					}
-					break;
-				default:
-					throw new RuntimeException("Unknown type " + type);
-			}
-
-			if(returnValue != 0) {
-				Assert.fail("The YARN session returned with non-null value="+returnValue);
-			}
-		}
-
-		public void sendStop() {
-			if(yCli != null) {
-				yCli.stop();
-			}
-		}
-	}
-
-	// -------------------------- Tear down -------------------------- //
-
-	@AfterClass
-	public static void copyOnTravis() {
-		// When we are on travis, we copy the tmp files of JUnit (containing the MiniYARNCluster log files)
-		// to <flinkRoot>/target/flink-yarn-tests-*.
-		// The files from there are picked up by the ./tools/travis_watchdog.sh script
-		// to upload them to Amazon S3.
-		if(isOnTravis()) {
-			File target = new File("../target" + yarnConfiguration.get(TEST_CLUSTER_NAME_KEY));
-			if(!target.mkdirs()) {
-				LOG.warn("Error creating dirs to {}", target);
-			}
-			File src = tmp.getRoot();
-			LOG.info("copying the final files from {} to {}", src.getAbsolutePath(), target.getAbsolutePath());
-			try {
-				FileUtils.copyDirectoryToDirectory(src, target);
-			} catch (IOException e) {
-				LOG.warn("Error copying the final files from {} to {}: msg: {}", src.getAbsolutePath(), target.getAbsolutePath(), e.getMessage(), e);
-			}
-		}
-	}
-
-	public static boolean isOnTravis() {
-		return System.getenv("TRAVIS") != null && System.getenv("TRAVIS").equals("true");
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-yarn-tests/src/main/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/resources/log4j-test.properties b/flink-yarn-tests/src/main/resources/log4j-test.properties
deleted file mode 100644
index ebe0d37..0000000
--- a/flink-yarn-tests/src/main/resources/log4j-test.properties
+++ /dev/null
@@ -1,35 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-log4j.rootLogger=INFO, console
-
-# Log all infos in the given file
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
-
-# suppress the irrelevant (wrong) warnings from the netty channel handler
-log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, console
-
-# log whats going on between the tests
-log4j.logger.org.apache.flink.yarn.YARNSessionFIFOITCase=INFO
-log4j.logger.org.apache.flink.yarn.YARNSessionCapacitySchedulerITCase=INFO
-log4j.logger.org.apache.flink.yarn.YarnHighAvailability=INFO
-log4j.logger.org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch=INFO
-log4j.logger.org.apache.flink.runtime.leaderelection=INFO
-log4j.logger.org.apache.flink.runtime.leaderretrieval=INFO

http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala b/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
deleted file mode 100644
index 2f93785..0000000
--- a/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.yarn
-
-import java.util.concurrent.ExecutorService
-
-import akka.actor.ActorRef
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.runtime.checkpoint.{SavepointStore, CheckpointRecoveryFactory}
-import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
-import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
-import org.apache.flink.runtime.instance.InstanceManager
-import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
-import org.apache.flink.runtime.leaderelection.LeaderElectionService
-import org.apache.flink.runtime.testingUtils.TestingJobManagerLike
-
-import scala.concurrent.duration.FiniteDuration
-
-/** [[YarnJobManager]] implementation which mixes in the [[TestingJobManagerLike]] mixin.
-  *
-  * This actor class is used for testing purposes on Yarn. Here we use an explicit class definition
-  * instead of an anonymous class with the respective mixin to obtain a more readable logger name.
-  *
-  * @param flinkConfiguration Configuration object for the actor
-  * @param executorService Execution context which is used to execute concurrent tasks in the
-  *                         [[org.apache.flink.runtime.executiongraph.ExecutionGraph]]
-  * @param instanceManager Instance manager to manage the registered
-  *                        [[org.apache.flink.runtime.taskmanager.TaskManager]]
-  * @param scheduler Scheduler to schedule Flink jobs
-  * @param libraryCacheManager Manager to manage uploaded jar files
-  * @param archive Archive for finished Flink jobs
-  * @param restartStrategyFactory Default restart strategy for job restarts
-  * @param timeout Timeout for futures
-  * @param leaderElectionService LeaderElectionService to participate in the leader election
-  */
-class TestingYarnJobManager(
-    flinkConfiguration: Configuration,
-    executorService: ExecutorService,
-    instanceManager: InstanceManager,
-    scheduler: Scheduler,
-    libraryCacheManager: BlobLibraryCacheManager,
-    archive: ActorRef,
-    restartStrategyFactory: RestartStrategyFactory,
-    timeout: FiniteDuration,
-    leaderElectionService: LeaderElectionService,
-    submittedJobGraphs : SubmittedJobGraphStore,
-    checkpointRecoveryFactory : CheckpointRecoveryFactory,
-    savepointStore: SavepointStore,
-    jobRecoveryTimeout: FiniteDuration)
-  extends YarnJobManager(
-    flinkConfiguration,
-    executorService,
-    instanceManager,
-    scheduler,
-    libraryCacheManager,
-    archive,
-    restartStrategyFactory,
-    timeout,
-    leaderElectionService,
-    submittedJobGraphs,
-    checkpointRecoveryFactory,
-    savepointStore,
-    jobRecoveryTimeout)
-  with TestingJobManagerLike {}

http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala b/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
deleted file mode 100644
index 73ab7eb..0000000
--- a/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.yarn
-
-import org.apache.flink.runtime.clusterframework.types.ResourceID
-import org.apache.flink.runtime.instance.InstanceConnectionInfo
-import org.apache.flink.runtime.io.disk.iomanager.IOManager
-import org.apache.flink.runtime.io.network.NetworkEnvironment
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
-import org.apache.flink.runtime.memory.MemoryManager
-import org.apache.flink.runtime.taskmanager.TaskManagerConfiguration
-import org.apache.flink.runtime.testingUtils.TestingTaskManagerLike
-
-/** [[YarnTaskManager]] implementation which mixes in the [[TestingTaskManagerLike]] mixin.
-  *
-  * This actor class is used for testing purposes on Yarn. Here we use an explicit class definition
-  * instead of an anonymous class with the respective mixin to obtain a more readable logger name.
-  *
-  * @param config Configuration object for the actor
-  * @param resourceID The Yarn container id
-  * @param connectionInfo Connection information of this actor
-  * @param memoryManager MemoryManager which is responsibel for Flink's managed memory allocation
-  * @param ioManager IOManager responsible for I/O
-  * @param network NetworkEnvironment for this actor
-  * @param numberOfSlots Number of slots for this TaskManager
-  * @param leaderRetrievalService [[LeaderRetrievalService]] to retrieve the current leading
-  *                              JobManager
-  */
-class TestingYarnTaskManager(
-    config: TaskManagerConfiguration,
-    resourceID: ResourceID,
-    connectionInfo: InstanceConnectionInfo,
-    memoryManager: MemoryManager,
-    ioManager: IOManager,
-    network: NetworkEnvironment,
-    numberOfSlots: Int,
-    leaderRetrievalService: LeaderRetrievalService)
-  extends YarnTaskManager(
-    config,
-    resourceID,
-    connectionInfo,
-    memoryManager,
-    ioManager,
-    network,
-    numberOfSlots,
-    leaderRetrievalService)
-  with TestingTaskManagerLike {
-
-  object YarnTaskManager {
-
-    /** Entry point (main method) to run the TaskManager on YARN.
-      * @param args The command line arguments.
-      */
-    def main(args: Array[String]): Unit = {
-      YarnTaskManagerRunner.runYarnTaskManager(args, classOf[TestingYarnTaskManager])
-    }
-
-  }
-}
-
-

http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
new file mode 100644
index 0000000..30116af
--- /dev/null
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+
+import org.apache.flink.client.CliFrontend;
+import org.apache.flink.client.FlinkYarnSessionCli;
+import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
+import org.apache.flink.test.util.TestBaseUtils;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+public class FlinkYarnSessionCliTest {
+
+	@Rule
+	public TemporaryFolder tmp = new TemporaryFolder();
+
+	@Test
+	public void testDynamicProperties() throws IOException {
+
+		Map<String, String> map = new HashMap<String, String>(System.getenv());
+		File tmpFolder = tmp.newFolder();
+		File fakeConf = new File(tmpFolder, "flink-conf.yaml");
+		fakeConf.createNewFile();
+		map.put("FLINK_CONF_DIR", tmpFolder.getAbsolutePath());
+		TestBaseUtils.setEnv(map);
+		Options options = new Options();
+		FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", "");
+		cli.getYARNSessionCLIOptions(options);
+
+		CommandLineParser parser = new PosixParser();
+		CommandLine cmd = null;
+		try {
+			cmd = parser.parse(options, new String[]{"run", "-j", "fake.jar", "-n", "15", "-D", "akka.ask.timeout=5 min"});
+		} catch(Exception e) {
+			e.printStackTrace();
+			Assert.fail("Parsing failed with " + e.getMessage());
+		}
+
+		AbstractFlinkYarnClient flinkYarnClient = cli.createFlinkYarnClient(cmd);
+
+		Assert.assertNotNull(flinkYarnClient);
+
+		Map<String, String> dynProperties = CliFrontend.getDynamicProperties(flinkYarnClient.getDynamicPropertiesEncoded());
+		Assert.assertEquals(1, dynProperties.size());
+		Assert.assertEquals("5 min", dynProperties.get("akka.ask.timeout"));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingApplicationMaster.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingApplicationMaster.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingApplicationMaster.java
new file mode 100644
index 0000000..b0757f5
--- /dev/null
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingApplicationMaster.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.jobmanager.MemoryArchivist;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.testingUtils.TestingMemoryArchivist;
+import org.apache.flink.runtime.testutils.TestingResourceManager;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.SignalHandler;
+
+/**
+ * Yarn application master which starts the {@link TestingYarnJobManager},
+ * {@link TestingResourceManager}, and the {@link TestingMemoryArchivist}.
+ */
+public class TestingApplicationMaster extends YarnApplicationMasterRunner {
+
+	@Override
+	public Class<? extends JobManager> getJobManagerClass() {
+		return TestingYarnJobManager.class;
+	}
+
+	@Override
+	public Class<? extends MemoryArchivist> getArchivistClass() {
+		return TestingMemoryArchivist.class;
+	}
+
+	@Override
+	protected Class<? extends TaskManager> getTaskManagerClass() {
+		return TestingYarnTaskManager.class;
+	}
+
+	@Override
+	public Class<? extends YarnFlinkResourceManager> getResourceManagerClass() {
+		return TestingYarnFlinkResourceManager.class;
+	}
+
+	public static void main(String[] args) {
+		EnvironmentInformation.logEnvironmentInfo(LOG, "YARN ApplicationMaster / JobManager", args);
+		SignalHandler.register(LOG);
+
+		// run and exit with the proper return code
+		int returnCode = new TestingApplicationMaster().run(args);
+		System.exit(returnCode);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingFlinkYarnClient.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingFlinkYarnClient.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingFlinkYarnClient.java
new file mode 100644
index 0000000..1efc336
--- /dev/null
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingFlinkYarnClient.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import com.google.common.base.Preconditions;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Yarn client which starts a {@link TestingApplicationMaster}. Additionally the client adds the
+ * flink-yarn-tests-XXX-tests.jar and the flink-runtime-XXX-tests.jar to the set of files which
+ * are shipped to the yarn cluster. This is necessary to load the testing classes.
+ */
+public class TestingFlinkYarnClient extends FlinkYarnClientBase {
+
+	public TestingFlinkYarnClient() {
+		List<File> filesToShip = new ArrayList<>();
+
+		File testingJar = YarnTestBase.findFile("..", new TestJarFinder("flink-yarn-tests"));
+		Preconditions.checkNotNull(testingJar, "Could not find the flink-yarn-tests tests jar. " +
+			"Make sure to package the flink-yarn-tests module.");
+
+		File testingRuntimeJar = YarnTestBase.findFile("..", new TestJarFinder("flink-runtime"));
+		Preconditions.checkNotNull(testingRuntimeJar, "Could not find the flink-runtime tests " +
+			"jar. Make sure to package the flink-runtime module.");
+
+		filesToShip.add(testingJar);
+		filesToShip.add(testingRuntimeJar);
+
+		setShipFiles(filesToShip);
+	}
+
+	@Override
+	protected Class<?> getApplicationMasterClass() {
+		return TestingApplicationMaster.class;
+	}
+
+	public static class TestJarFinder implements FilenameFilter {
+
+		private final String jarName;
+
+		public TestJarFinder(final String jarName) {
+			this.jarName = jarName;
+		}
+
+		@Override
+		public boolean accept(File dir, String name) {
+			return name.startsWith(jarName) && name.endsWith("-tests.jar") &&
+				dir.getAbsolutePath().contains(dir.separator + jarName + dir.separator);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnFlinkResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnFlinkResourceManager.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnFlinkResourceManager.java
new file mode 100644
index 0000000..5a61b8f
--- /dev/null
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnFlinkResourceManager.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+/**
+ * Flink's testing resource manager for Yarn.
+ */
+public class TestingYarnFlinkResourceManager extends YarnFlinkResourceManager {
+
+	public TestingYarnFlinkResourceManager(
+		Configuration flinkConfig,
+		YarnConfiguration yarnConfig,
+		LeaderRetrievalService leaderRetrievalService,
+		String applicationMasterHostName,
+		String webInterfaceURL,
+		ContaineredTaskManagerParameters taskManagerParameters,
+		ContainerLaunchContext taskManagerLaunchContext,
+		int yarnHeartbeatIntervalMillis,
+		int maxFailedContainers,
+		int numInitialTaskManagers) {
+
+		super(
+			flinkConfig,
+			yarnConfig,
+			leaderRetrievalService,
+			applicationMasterHostName,
+			webInterfaceURL,
+			taskManagerParameters,
+			taskManagerLaunchContext,
+			yarnHeartbeatIntervalMillis,
+			maxFailedContainers,
+			numInitialTaskManagers);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnTaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnTaskManagerRunner.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnTaskManagerRunner.java
new file mode 100644
index 0000000..8586a77
--- /dev/null
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnTaskManagerRunner.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import java.io.IOException;
+
+/**
+ * Yarn TaskManager runner which starts a {@link TestingYarnTaskManager}.
+ */
+public class TestingYarnTaskManagerRunner {
+	public static void main(String[] args) throws IOException {
+		YarnTaskManagerRunner.runYarnTaskManager(args, TestingYarnTaskManager.class);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java
new file mode 100644
index 0000000..784bf24
--- /dev/null
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.yarn;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.log4j.AppenderSkeleton;
+import org.apache.log4j.Level;
+import org.apache.log4j.spi.LoggingEvent;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+public class UtilsTest {
+	private static final Logger LOG = LoggerFactory.getLogger(UtilsTest.class);
+
+	@Test
+	public void testUberjarLocator() {
+		File dir = YarnTestBase.findFile("..", new YarnTestBase.RootDirFilenameFilter());
+		Assert.assertNotNull(dir);
+		Assert.assertTrue(dir.getName().endsWith(".jar"));
+		dir = dir.getParentFile().getParentFile(); // from uberjar to lib to root
+		Assert.assertTrue(dir.exists());
+		Assert.assertTrue(dir.isDirectory());
+		List<String> files = Arrays.asList(dir.list());
+		Assert.assertTrue(files.contains("lib"));
+		Assert.assertTrue(files.contains("bin"));
+		Assert.assertTrue(files.contains("conf"));
+	}
+
+	/**
+	 * Remove 15% of the heap, at least 384MB.
+	 *
+	 */
+	@Test
+	public void testHeapCutoff() {
+		Configuration conf = new Configuration();
+		conf.setDouble(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, 0.15);
+		conf.setInteger(ConfigConstants.CONTAINERED_HEAP_CUTOFF_MIN, 384);
+
+		Assert.assertEquals(616, Utils.calculateHeapSize(1000, conf) );
+		Assert.assertEquals(8500, Utils.calculateHeapSize(10000, conf) );
+
+		// test different configuration
+		Assert.assertEquals(3400, Utils.calculateHeapSize(4000, conf));
+
+		conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_MIN, "1000");
+		conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, "0.1");
+		Assert.assertEquals(3000, Utils.calculateHeapSize(4000, conf));
+
+		conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, "0.5");
+		Assert.assertEquals(2000, Utils.calculateHeapSize(4000, conf));
+
+		conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, "1");
+		Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf));
+
+		// test also deprecated keys
+		conf = new Configuration();
+		conf.setDouble(ConfigConstants.YARN_HEAP_CUTOFF_RATIO, 0.15);
+		conf.setInteger(ConfigConstants.YARN_HEAP_CUTOFF_MIN, 384);
+
+		Assert.assertEquals(616, Utils.calculateHeapSize(1000, conf) );
+		Assert.assertEquals(8500, Utils.calculateHeapSize(10000, conf) );
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void illegalArgument() {
+		Configuration conf = new Configuration();
+		conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, "1.1");
+		Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf));
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void illegalArgumentNegative() {
+		Configuration conf = new Configuration();
+		conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, "-0.01");
+		Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf));
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void tooMuchCutoff() {
+		Configuration conf = new Configuration();
+		conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, "6000");
+		Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf));
+	}
+
+	@Test
+	public void testGetEnvironmentVariables() {
+		Configuration testConf = new Configuration();
+		testConf.setString("yarn.application-master.env.LD_LIBRARY_PATH", "/usr/lib/native");
+
+		Map<String, String> res = Utils.getEnvironmentVariables("yarn.application-master.env.", testConf);
+
+		Assert.assertEquals(1, res.size());
+		Map.Entry<String, String> entry = res.entrySet().iterator().next();
+		Assert.assertEquals("LD_LIBRARY_PATH", entry.getKey());
+		Assert.assertEquals("/usr/lib/native", entry.getValue());
+	}
+
+	@Test
+	public void testGetEnvironmentVariablesErroneous() {
+		Configuration testConf = new Configuration();
+		testConf.setString("yarn.application-master.env.", "/usr/lib/native");
+
+		Map<String, String> res = Utils.getEnvironmentVariables("yarn.application-master.env.", testConf);
+
+		Assert.assertEquals(0, res.size());
+	}
+
+	//
+	// --------------- Tools to test if a certain string has been logged with Log4j. -------------
+	// See :  http://stackoverflow.com/questions/3717402/how-to-test-w-junit-that-warning-was-logged-w-log4j
+	//
+	private static TestAppender testAppender;
+	public static void addTestAppender(Class target, Level level) {
+		testAppender = new TestAppender();
+		testAppender.setThreshold(level);
+		org.apache.log4j.Logger lg = org.apache.log4j.Logger.getLogger(target);
+		lg.setLevel(level);
+		lg.addAppender(testAppender);
+		//org.apache.log4j.Logger.getRootLogger().addAppender(testAppender);
+	}
+
+	public static void checkForLogString(String expected) {
+		LoggingEvent found = getEventContainingString(expected);
+		if(found != null) {
+			LOG.info("Found expected string '"+expected+"' in log message "+found);
+			return;
+		}
+		Assert.fail("Unable to find expected string '" + expected + "' in log messages");
+	}
+
+	public static LoggingEvent getEventContainingString(String expected) {
+		if(testAppender == null) {
+			throw new NullPointerException("Initialize test appender first");
+		}
+		LoggingEvent found = null;
+		// make sure that different threads are not logging while the logs are checked
+		synchronized (testAppender.events) {
+			for (LoggingEvent event : testAppender.events) {
+				if (event.getMessage().toString().contains(expected)) {
+					found = event;
+					break;
+				}
+			}
+		}
+		return found;
+	}
+
+	public static class TestAppender extends AppenderSkeleton {
+		public final List<LoggingEvent> events = new ArrayList<>();
+		public void close() {}
+		public boolean requiresLayout() {return false;}
+		@Override
+		protected void append(LoggingEvent event) {
+			synchronized (events){
+				events.add(event);
+			}
+		}
+	}
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
new file mode 100644
index 0000000..a93abf0
--- /dev/null
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import akka.testkit.JavaTestKit;
+import org.apache.curator.test.TestingServer;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.messages.Messages;
+import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+
+public class YARNHighAvailabilityITCase extends YarnTestBase {
+
+	private static TestingServer zkServer;
+
+	private static ActorSystem actorSystem;
+
+	private static final int numberApplicationAttempts = 10;
+
+	@Rule
+	public TemporaryFolder tmp = new TemporaryFolder();
+
+	@BeforeClass
+	public static void setup() {
+		actorSystem = AkkaUtils.createDefaultActorSystem();
+
+		try {
+			zkServer = new TestingServer();
+			zkServer.start();
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("Could not start ZooKeeper testing cluster.");
+		}
+
+		yarnConfiguration.set(YarnTestBase.TEST_CLUSTER_NAME_KEY, "flink-yarn-tests-ha");
+		yarnConfiguration.set(YarnConfiguration.RM_AM_MAX_ATTEMPTS, "" + numberApplicationAttempts);
+
+		startYARNWithConfig(yarnConfiguration);
+	}
+
+	@AfterClass
+	public static void teardown() throws IOException {
+		if(zkServer != null) {
+			zkServer.stop();
+		}
+
+		JavaTestKit.shutdownActorSystem(actorSystem);
+		actorSystem = null;
+	}
+
+	/**
+	 * Tests that the application master can be killed multiple times and that the surviving
+	 * TaskManager successfully reconnects to the newly started JobManager.
+	 * @throws Exception
+	 */
+	@Test
+	public void testMultipleAMKill() throws Exception {
+		final int numberKillingAttempts = numberApplicationAttempts - 1;
+
+		TestingFlinkYarnClient flinkYarnClient = new TestingFlinkYarnClient();
+
+		Assert.assertNotNull("unable to get yarn client", flinkYarnClient);
+		flinkYarnClient.setTaskManagerCount(1);
+		flinkYarnClient.setJobManagerMemory(768);
+		flinkYarnClient.setTaskManagerMemory(1024);
+		flinkYarnClient.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
+		flinkYarnClient.setShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
+
+		String confDirPath = System.getenv("FLINK_CONF_DIR");
+		flinkYarnClient.setConfigurationDirectory(confDirPath);
+
+		String fsStateHandlePath = tmp.getRoot().getPath();
+
+		flinkYarnClient.setFlinkConfiguration(GlobalConfiguration.getConfiguration());
+		flinkYarnClient.setDynamicPropertiesEncoded("recovery.mode=zookeeper@@recovery.zookeeper.quorum=" +
+			zkServer.getConnectString() + "@@yarn.application-attempts=" + numberApplicationAttempts +
+			"@@" + ConfigConstants.STATE_BACKEND + "=FILESYSTEM" +
+			"@@" + FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY + "=" + fsStateHandlePath + "/checkpoints" +
+			"@@" + ConfigConstants.ZOOKEEPER_RECOVERY_PATH + "=" + fsStateHandlePath + "/recovery");
+		flinkYarnClient.setConfigurationFilePath(new Path(confDirPath + File.separator + "flink-conf.yaml"));
+
+		AbstractFlinkYarnCluster yarnCluster = null;
+
+		final FiniteDuration timeout = new FiniteDuration(2, TimeUnit.MINUTES);
+
+		try {
+			yarnCluster = flinkYarnClient.deploy();
+			yarnCluster.connectToCluster();
+			final Configuration config = yarnCluster.getFlinkConfiguration();
+
+			new JavaTestKit(actorSystem) {{
+				for (int attempt = 0; attempt < numberKillingAttempts; attempt++) {
+					new Within(timeout) {
+						@Override
+						protected void run() {
+							try {
+								LeaderRetrievalService lrs = LeaderRetrievalUtils.createLeaderRetrievalService(config);
+								ActorGateway gateway = LeaderRetrievalUtils.retrieveLeaderGateway(lrs, actorSystem, timeout);
+								ActorGateway selfGateway = new AkkaActorGateway(getRef(), gateway.leaderSessionID());
+
+								gateway.tell(new TestingJobManagerMessages.NotifyWhenAtLeastNumTaskManagerAreRegistered(1), selfGateway);
+
+								expectMsgEquals(Messages.getAcknowledge());
+
+								gateway.tell(PoisonPill.getInstance());
+							} catch (Exception e) {
+								throw new AssertionError("Could not complete test.", e);
+							}
+						}
+					};
+				}
+
+				new Within(timeout) {
+					@Override
+					protected void run() {
+						try {
+							LeaderRetrievalService lrs = LeaderRetrievalUtils.createLeaderRetrievalService(config);
+							ActorGateway gateway2 = LeaderRetrievalUtils.retrieveLeaderGateway(lrs, actorSystem, timeout);
+							ActorGateway selfGateway = new AkkaActorGateway(getRef(), gateway2.leaderSessionID());
+							gateway2.tell(new TestingJobManagerMessages.NotifyWhenAtLeastNumTaskManagerAreRegistered(1), selfGateway);
+
+							expectMsgEquals(Messages.getAcknowledge());
+						} catch (Exception e) {
+							throw new AssertionError("Could not complete test.", e);
+						}
+					}
+				};
+
+			}};
+		} finally {
+			if (yarnCluster != null) {
+				yarnCluster.shutdown(false);
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
new file mode 100644
index 0000000..38e17a5
--- /dev/null
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
@@ -0,0 +1,539 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.yarn;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.google.common.base.Joiner;
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.runtime.client.JobClient;
+import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.security.NMTokenIdentifier;
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.log4j.Level;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Arrays;
+import java.util.concurrent.ConcurrentMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.apache.flink.yarn.UtilsTest.addTestAppender;
+import static org.apache.flink.yarn.UtilsTest.checkForLogString;
+
+
+/**
+ * This test starts a MiniYARNCluster with a CapacityScheduler.
+ * It has, by default, a queue called "default". The configuration here adds another queue: "qa-team".
+ */
+public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
+	private static final Logger LOG = LoggerFactory.getLogger(YARNSessionCapacitySchedulerITCase.class);
+
+	@BeforeClass
+	public static void setup() {
+		yarnConfiguration.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, ResourceScheduler.class);
+		yarnConfiguration.set("yarn.scheduler.capacity.root.queues", "default,qa-team");
+		// capacities of sibling queues must sum to 100 (percent) for the CapacityScheduler to start
+		yarnConfiguration.setInt("yarn.scheduler.capacity.root.default.capacity", 40);
+		yarnConfiguration.setInt("yarn.scheduler.capacity.root.qa-team.capacity", 60);
+		yarnConfiguration.set(YarnTestBase.TEST_CLUSTER_NAME_KEY, "flink-yarn-tests-capacityscheduler");
+		startYARNWithConfig(yarnConfiguration);
+	}
+
+	/**
+	 * Test regular operation, including command line parameter parsing.
+	 */
+	@Test
+	public void testClientStartup() {
+		LOG.info("Starting testClientStartup()");
+		runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(),
+						"-n", "1",
+						"-jm", "768",
+						"-tm", "1024", "-qu", "qa-team"},
+				"Number of connected TaskManagers changed to 1. Slots available: 1", null, RunTypes.YARN_SESSION, 0);
+		LOG.info("Finished testClientStartup()");
+	}
+
+	/**
+	 * Test per-job yarn cluster
+	 *
+	 * This also tests the prefixed CliFrontend options for the YARN case
+	 * We also test if the requested parallelism of 2 is passed through.
+	 * The parallelism is requested at the YARN client (-ys).
+	 */
+	@Test
+	public void perJobYarnCluster() {
+		LOG.info("Starting perJobYarnCluster()");
+		addTestAppender(JobClient.class, Level.INFO);
+		File exampleJarLocation = YarnTestBase.findFile("..", new ContainsName(new String[] {"-WordCount.jar"} , "streaming")); // exclude streaming wordcount here.
+		Assert.assertNotNull("Could not find wordcount jar", exampleJarLocation);
+		runWithArgs(new String[]{"run", "-m", "yarn-cluster",
+				"-yj", flinkUberjar.getAbsolutePath(), "-yt", flinkLibFolder.getAbsolutePath(),
+				"-yn", "1",
+				"-ys", "2", //test that the job is executed with a DOP of 2
+				"-yjm", "768",
+				"-ytm", "1024", exampleJarLocation.getAbsolutePath()},
+				/* test succeeded after this string */
+			"Job execution complete",
+				/* prohibited strings: (we want to see (2/2)) */
+			new String[]{"System.out)(1/1) switched to FINISHED "},
+			RunTypes.CLI_FRONTEND, 0, true);
+		LOG.info("Finished perJobYarnCluster()");
+	}
+
+
+	/**
+	 * Test TaskManager failure and also if the vcores are set correctly (see issue FLINK-2213).
+	 */
+	@Test(timeout=100000) // timeout after 100 seconds
+	public void testTaskManagerFailure() {
+		LOG.info("Starting testTaskManagerFailure()");
+		Runner runner = startWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(),
+				"-n", "1",
+				"-jm", "768",
+				"-tm", "1024",
+				"-s", "3", // set the slots 3 to check if the vCores are set properly!
+				"-nm", "customName",
+				"-Dfancy-configuration-value=veryFancy",
+				"-Dyarn.maximum-failed-containers=3",
+				"-D" + ConfigConstants.YARN_VCORES + "=2"},
+			"Number of connected TaskManagers changed to 1. Slots available: 3",
+			RunTypes.YARN_SESSION);
+
+		// expect the ApplicationMaster/JobManager container plus one TaskManager container
+		Assert.assertEquals(2, getRunningContainers());
+
+		// ------------------------ Test if JobManager web interface is accessible -------
+
+		YarnClient yc = null;
+		try {
+			yc = YarnClient.createYarnClient();
+			yc.init(yarnConfiguration);
+			yc.start();
+
+			List<ApplicationReport> apps = yc.getApplications(EnumSet.of(YarnApplicationState.RUNNING));
+			Assert.assertEquals(1, apps.size()); // Only one running
+			ApplicationReport app = apps.get(0);
+			Assert.assertEquals("customName", app.getName());
+			String url = app.getTrackingUrl();
+			if(!url.endsWith("/")) {
+				url += "/";
+			}
+			if(!url.startsWith("http://")) {
+				url = "http://" + url;
+			}
+			LOG.info("Got application URL from YARN {}", url);
+
+			String response = TestBaseUtils.getFromHTTP(url + "taskmanagers/");
+
+			JsonNode parsedTMs = new ObjectMapper().readTree(response);
+			ArrayNode taskManagers = (ArrayNode) parsedTMs.get("taskmanagers");
+			Assert.assertNotNull(taskManagers);
+			Assert.assertEquals(1, taskManagers.size());
+			Assert.assertEquals(3, taskManagers.get(0).get("slotsNumber").asInt());
+
+			// get the configuration from webinterface & check if the dynamic properties from YARN show up there.
+			String jsonConfig = TestBaseUtils.getFromHTTP(url + "jobmanager/config");
+			Map<String, String> parsedConfig = WebMonitorUtils.fromKeyValueJsonArray(jsonConfig);
+
+			Assert.assertEquals("veryFancy", parsedConfig.get("fancy-configuration-value"));
+			Assert.assertEquals("3", parsedConfig.get("yarn.maximum-failed-containers"));
+			Assert.assertEquals("2", parsedConfig.get(ConfigConstants.YARN_VCORES));
+
+			// -------------- FLINK-1902: check if jobmanager hostname/port are shown in web interface
+			// first, get the hostname/port
+			String oC = outContent.toString();
+			Pattern p = Pattern.compile("Flink JobManager is now running on ([a-zA-Z0-9.-]+):([0-9]+)");
+			Matcher matches = p.matcher(oC);
+			String hostname = null;
+			String port = null;
+			// keep the LAST match in case the line appears more than once in the captured output
+			while(matches.find()) {
+				hostname = matches.group(1).toLowerCase();
+				port = matches.group(2);
+			}
+			LOG.info("Extracted hostname:port: {} {}", hostname, port);
+
+			Assert.assertEquals("unable to find hostname in " + jsonConfig, hostname,
+				parsedConfig.get(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY));
+			Assert.assertEquals("unable to find port in " + jsonConfig, port,
+				parsedConfig.get(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY));
+
+			// test logfile access
+			String logs = TestBaseUtils.getFromHTTP(url + "jobmanager/log");
+			Assert.assertTrue(logs.contains("Starting YARN ApplicationMaster"));
+			Assert.assertTrue(logs.contains("Starting JobManager"));
+			Assert.assertTrue(logs.contains("Starting JobManager Web Frontend"));
+		} catch(Throwable e) {
+			LOG.warn("Error while running test",e);
+			Assert.fail(e.getMessage());
+		}
+
+		// ------------------------ Kill container with TaskManager and check if vcores are set correctly -------
+
+		// find container id of taskManager:
+		ContainerId taskManagerContainer = null;
+		NodeManager nodeManager = null;
+		UserGroupInformation remoteUgi = null;
+		NMTokenIdentifier nmIdent = null;
+		try {
+			remoteUgi = UserGroupInformation.getCurrentUser();
+		} catch (IOException e) {
+			LOG.warn("Unable to get curr user", e);
+			Assert.fail();
+		}
+		for(int nmId = 0; nmId < NUM_NODEMANAGERS; nmId++) {
+			NodeManager nm = yarnCluster.getNodeManager(nmId);
+			ConcurrentMap<ContainerId, Container> containers = nm.getNMContext().getContainers();
+			for(Map.Entry<ContainerId, Container> entry : containers.entrySet()) {
+				// identify the TaskManager container by its launch command
+				String command = Joiner.on(" ").join(entry.getValue().getLaunchContext().getCommands());
+				if(command.contains(YarnTaskManager.class.getSimpleName())) {
+					taskManagerContainer = entry.getKey();
+					nodeManager = nm;
+					nmIdent = new NMTokenIdentifier(taskManagerContainer.getApplicationAttemptId(), null, "",0);
+					// allow myself to do stuff with the container
+					// remoteUgi.addCredentials(entry.getValue().getCredentials());
+					remoteUgi.addTokenIdentifier(nmIdent);
+				}
+			}
+			sleep(500);
+		}
+
+		Assert.assertNotNull("Unable to find container with TaskManager", taskManagerContainer);
+		Assert.assertNotNull("Illegal state", nodeManager);
+
+		try {
+			List<NodeReport> nodeReports = yc.getNodeReports(NodeState.RUNNING);
+
+			// we asked for one node with 2 vcores so we expect 2 vcores
+			int userVcores = 0;
+			for (NodeReport rep: nodeReports) {
+				userVcores += rep.getUsed().getVirtualCores();
+			}
+			Assert.assertEquals(2, userVcores);
+		} catch (Exception e) {
+			Assert.fail("Test failed: " + e.getMessage());
+		}
+
+		yc.stop();
+
+		List<ContainerId> toStop = new LinkedList<ContainerId>();
+		toStop.add(taskManagerContainer);
+		StopContainersRequest scr = StopContainersRequest.newInstance(toStop);
+
+		try {
+			nodeManager.getNMContext().getContainerManager().stopContainers(scr);
+		} catch (Throwable e) {
+			LOG.warn("Error stopping container", e);
+			Assert.fail("Error stopping container: "+e.getMessage());
+		}
+
+		// stateful termination check:
+		// wait until we saw a container being killed and AFTERWARDS a new one launched
+		// NOTE(review): this loop has no internal bound; the @Test(timeout=100000) above
+		// is what aborts the test if the expected event sequence never appears in stderr.
+		boolean ok = false;
+		do {
+			LOG.debug("Waiting for correct order of events. Output: {}", errContent.toString());
+
+			String o = errContent.toString();
+			int killedOff = o.indexOf("Container killed by the ApplicationMaster");
+			if (killedOff != -1) {
+				o = o.substring(killedOff);
+				ok = o.indexOf("Launching TaskManager") > 0;
+			}
+			sleep(1000);
+		} while(!ok);
+
+
+		// send "stop" command to command line interface
+		runner.sendStop();
+		// wait for the thread to stop
+		try {
+			runner.join(1000);
+		} catch (InterruptedException e) {
+			LOG.warn("Interrupted while stopping runner", e);
+		}
+		LOG.warn("stopped");
+
+		// ----------- Send output to logger
+		System.setOut(originalStdout);
+		System.setErr(originalStderr);
+		String oC = outContent.toString();
+		String eC = errContent.toString();
+		LOG.info("Sending stdout content through logger: \n\n{}\n\n", oC);
+		LOG.info("Sending stderr content through logger: \n\n{}\n\n", eC);
+
+		// ------ Check if everything happened correctly
+		Assert.assertTrue("Expect to see failed container",
+			eC.contains("New messages from the YARN cluster"));
+
+		Assert.assertTrue("Expect to see failed container",
+			eC.contains("Container killed by the ApplicationMaster"));
+
+		Assert.assertTrue("Expect to see new container started",
+			eC.contains("Launching TaskManager") && eC.contains("on host"));
+
+		// cleanup auth for the subsequent tests.
+		remoteUgi.getTokenIdentifiers().remove(nmIdent);
+
+		LOG.info("Finished testTaskManagerFailure()");
+	}
+
+	/**
+	 * Test deployment to non-existing queue. (user-reported error)
+	 * Deployment to the queue is possible because there are no queues, so we don't check.
+	 */
+	@Test
+	public void testNonexistingQueue() {
+		LOG.info("Starting testNonexistingQueue()");
+		addTestAppender(FlinkYarnClient.class, Level.WARN);
+		runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
+				"-t", flinkLibFolder.getAbsolutePath(),
+				"-n", "1",
+				"-jm", "768",
+				"-tm", "1024",
+				"-qu", "doesntExist"}, "to unknown queue: doesntExist", null, RunTypes.YARN_SESSION, 1);
+		checkForLogString("The specified queue 'doesntExist' does not exist. Available queues: default, qa-team");
+		LOG.info("Finished testNonexistingQueue()");
+	}
+
+	/**
+	 * Test per-job yarn cluster with the parallelism set at the CliFrontend instead of the YARN client.
+	 */
+	@Test
+	public void perJobYarnClusterWithParallelism() {
+		LOG.info("Starting perJobYarnClusterWithParallelism()");
+		// write log messages to stdout as well, so that the runWithArgs() method
+		// is catching the log output
+		addTestAppender(JobClient.class, Level.INFO);
+		File exampleJarLocation = YarnTestBase.findFile("..", new ContainsName(new String[] {"-WordCount.jar"}, "streaming")); // exclude streaming wordcount here.
+		Assert.assertNotNull("Could not find wordcount jar", exampleJarLocation);
+		runWithArgs(new String[]{"run",
+				"-p", "2", //test that the job is executed with a DOP of 2
+				"-m", "yarn-cluster",
+				"-yj", flinkUberjar.getAbsolutePath(),
+				"-yt", flinkLibFolder.getAbsolutePath(),
+				"-yn", "1",
+				"-yjm", "768",
+				"-ytm", "1024", exampleJarLocation.getAbsolutePath()},
+				/* test succeeded after this string */
+			"Job execution complete",
+				/* prohibited strings: (we want to see (2/2)) */
+			new String[]{"System.out)(1/1) switched to FINISHED "},
+			RunTypes.CLI_FRONTEND, 0, true);
+		LOG.info("Finished perJobYarnClusterWithParallelism()");
+	}
+
+	/**
+	 * Test a fire-and-forget job submission to a YARN cluster.
+	 */
+	@Test(timeout=60000)
+	public void testDetachedPerJobYarnCluster() {
+		LOG.info("Starting testDetachedPerJobYarnCluster()");
+
+		File exampleJarLocation = YarnTestBase.findFile(
+			".." + File.separator + "flink-examples" + File.separator + "flink-examples-batch",
+			new ContainsName(new String[] {"-WordCount.jar"}));
+
+		Assert.assertNotNull("Could not find batch wordcount jar", exampleJarLocation);
+
+		testDetachedPerJobYarnClusterInternal(exampleJarLocation.getAbsolutePath());
+
+		LOG.info("Finished testDetachedPerJobYarnCluster()");
+	}
+
+	/**
+	 * Test a fire-and-forget job submission to a YARN cluster.
+	 */
+	@Test(timeout=60000)
+	public void testDetachedPerJobYarnClusterWithStreamingJob() {
+		LOG.info("Starting testDetachedPerJobYarnClusterWithStreamingJob()");
+
+		File exampleJarLocation = YarnTestBase.findFile(
+			".." + File.separator + "flink-examples" + File.separator + "flink-examples-streaming",
+			new ContainsName(new String[] {"-WordCount.jar"}));
+		Assert.assertNotNull("Could not find streaming wordcount jar", exampleJarLocation);
+
+		testDetachedPerJobYarnClusterInternal(exampleJarLocation.getAbsolutePath());
+
+		LOG.info("Finished testDetachedPerJobYarnClusterWithStreamingJob()");
+	}
+
+	/**
+	 * Submits the given wordcount job jar in detached mode, waits for the YARN application
+	 * to finish, and then verifies the job output, the configured heap cutoff and the
+	 * requested parallelism by inspecting the output files and the JobManager log.
+	 *
+	 * @param job absolute path to the wordcount example jar to submit
+	 */
+	private void testDetachedPerJobYarnClusterInternal(String job) {
+		YarnClient yc = YarnClient.createYarnClient();
+		yc.init(yarnConfiguration);
+		yc.start();
+
+		// get temporary folder for writing output of wordcount example
+		File tmpOutFolder = null;
+		try{
+			tmpOutFolder = tmp.newFolder();
+		}
+		catch(IOException e) {
+			throw new RuntimeException(e);
+		}
+
+		// get temporary file for reading input data for wordcount example
+		File tmpInFile;
+		try{
+			tmpInFile = tmp.newFile();
+			FileUtils.writeStringToFile(tmpInFile, WordCountData.TEXT);
+		}
+		catch(IOException e) {
+			throw new RuntimeException(e);
+		}
+
+		Runner runner = startWithArgs(new String[]{"run", "-m", "yarn-cluster", "-yj", flinkUberjar.getAbsolutePath(),
+				"-yt", flinkLibFolder.getAbsolutePath(),
+				"-yn", "1",
+				"-yjm", "768",
+				"-yD", "yarn.heap-cutoff-ratio=0.5", // test if the cutoff is passed correctly
+				"-ytm", "1024",
+				"-ys", "2", // test requesting slots from YARN.
+				"--yarndetached", job, "--input", tmpInFile.getAbsoluteFile().toString(), "--output", tmpOutFolder.getAbsoluteFile().toString()},
+			"Job has been submitted with JobID",
+			RunTypes.CLI_FRONTEND);
+
+		// it should usually be 2, but on slow machines, the number varies
+		Assert.assertTrue("There should be at most 2 containers running", getRunningContainers() <= 2);
+		// give the runner some time to detach
+		for (int attempt = 0; runner.isAlive() && attempt < 5; attempt++) {
+			try {
+				Thread.sleep(500);
+			} catch (InterruptedException e) {
+				// NOTE(review): interruption is swallowed without re-interrupting the thread;
+				// consider Thread.currentThread().interrupt() to preserve the interrupt status.
+			}
+		}
+		Assert.assertFalse("The runner should detach.", runner.isAlive());
+		LOG.info("CLI Frontend has returned, so the job is running");
+
+		// find out the application id and wait until it has finished.
+		try {
+			List<ApplicationReport> apps = yc.getApplications(EnumSet.of(YarnApplicationState.RUNNING));
+
+			ApplicationId tmpAppId;
+			if (apps.size() == 1) {
+				// Better method to find the right appId. But sometimes the app is shutting down very fast
+				// Only one running
+				tmpAppId = apps.get(0).getApplicationId();
+
+				LOG.info("waiting for the job with appId {} to finish", tmpAppId);
+				// wait until the app has finished
+				while(yc.getApplications(EnumSet.of(YarnApplicationState.RUNNING)).size() > 0) {
+					sleep(500);
+				}
+			} else {
+				// get appId by finding the latest finished appid
+				apps = yc.getApplications();
+				Collections.sort(apps, new Comparator<ApplicationReport>() {
+					@Override
+					public int compare(ApplicationReport o1, ApplicationReport o2) {
+						// negate natural order => descending, so the newest application id comes first
+						return o1.getApplicationId().compareTo(o2.getApplicationId())*-1;
+					}
+				});
+				tmpAppId = apps.get(0).getApplicationId();
+				LOG.info("Selected {} as the last appId from {}", tmpAppId, Arrays.toString(apps.toArray()));
+			}
+			final ApplicationId id = tmpAppId;
+
+			// now it has finished.
+			// check the output files.
+			File[] listOfOutputFiles = tmpOutFolder.listFiles();
+
+
+			Assert.assertNotNull("Taskmanager output not found", listOfOutputFiles);
+			LOG.info("The job has finished. TaskManager output files found in {}", tmpOutFolder );
+
+			// read all output files in output folder to one output string
+			String content = "";
+			for(File f:listOfOutputFiles)
+			{
+				if(f.isFile())
+				{
+					content += FileUtils.readFileToString(f) + "\n";
+				}
+			}
+			//String content = FileUtils.readFileToString(taskmanagerOut);
+			// check for some of the wordcount outputs.
+			Assert.assertTrue("Expected string 'da 5' or '(all,2)' not found in string '"+content+"'", content.contains("da 5") || content.contains("(da,5)") || content.contains("(all,2)"));
+			Assert.assertTrue("Expected string 'der 29' or '(mind,1)' not found in string'"+content+"'",content.contains("der 29") || content.contains("(der,29)") || content.contains("(mind,1)"));
+
+			// check if the heap size for the TaskManager was set correctly
+			File jobmanagerLog = YarnTestBase.findFile("..", new FilenameFilter() {
+				@Override
+				public boolean accept(File dir, String name) {
+					return name.contains("jobmanager.log") && dir.getAbsolutePath().contains(id.toString());
+				}
+			});
+			Assert.assertNotNull("Unable to locate JobManager log", jobmanagerLog);
+			content = FileUtils.readFileToString(jobmanagerLog);
+			// TM was started with 1024 but we cut off 50% (NOT THE DEFAULT VALUE)
+			String expected = "Starting TaskManagers with command: $JAVA_HOME/bin/java -Xms424m -Xmx424m";
+			Assert.assertTrue("Expected string '" + expected + "' not found in JobManager log: '"+jobmanagerLog+"'",
+				content.contains(expected));
+			expected = " (2/2) (attempt #0) to ";
+			Assert.assertTrue("Expected string '" + expected + "' not found in JobManager log." +
+					"This string checks that the job has been started with a parallelism of 2. Log contents: '"+jobmanagerLog+"'",
+				content.contains(expected));
+
+			// make sure the detached app is really finished.
+			LOG.info("Checking again that app has finished");
+			ApplicationReport rep;
+			do {
+				sleep(500);
+				rep = yc.getApplicationReport(id);
+				LOG.info("Got report {}", rep);
+			} while(rep.getYarnApplicationState() == YarnApplicationState.RUNNING);
+
+		} catch(Throwable t) {
+			LOG.warn("Error while detached yarn session was running", t);
+			Assert.fail(t.getMessage());
+		}
+	}
+
+	/** Verify after each test that the collected log files contain none of the prohibited strings. */
+	@After
+	public void checkForProhibitedLogContents() {
+		ensureNoProhibitedStringInLogFiles(PROHIBITED_STRINGS, WHITELISTED_STRINGS);
+	}
+}


Mime
View raw message