flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject flink git commit: FLINK-2213: Makes the number of vcores per YARN container configurable.
Date Tue, 23 Feb 2016 12:58:39 GMT
Repository: flink
Updated Branches:
  refs/heads/master 1c48e3462 -> de2163060


FLINK-2213: Makes the number of vcores per YARN container configurable.

This closes #1588


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/de216306
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/de216306
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/de216306

Branch: refs/heads/master
Commit: de2163060d042c0a4ff1989c08fb16f78d040a8a
Parents: 1c48e34
Author: Kostas Kloudas <kkloudas@gmail.com>
Authored: Thu Feb 4 15:01:58 2016 +0100
Committer: Robert Metzger <rmetzger@apache.org>
Committed: Tue Feb 23 09:56:36 2016 +0100

----------------------------------------------------------------------
 docs/setup/config.md                            |   2 +
 .../flink/configuration/ConfigConstants.java    |   5 +
 .../YARNSessionCapacitySchedulerITCase.java     | 444 +++++++++++++++++++
 .../flink/yarn/YARNSessionFIFOITCase.java       | 431 ------------------
 .../org/apache/flink/yarn/YarnTestBase.java     |   6 +
 .../org/apache/flink/yarn/YarnJobManager.scala  |   6 +-
 6 files changed, 462 insertions(+), 432 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/de216306/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index 79a9527..70963bb 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -240,6 +240,8 @@ definition. This scheme is used **ONLY** if no other scheme is specified
(explic
 	
 	yarn.application-master.env.LD_LIBRARY_PATH: "/usr/lib/native"
 
+- `yarn.containers.vcores` The number of virtual cores (vcores) per YARN container. By default,
the number of `vcores` is set to the number of slots per TaskManager, if set, or to 1, otherwise.

+
 - `yarn.taskmanager.env.` Similar to the configuration prefix above, this prefix allows setting
custom environment variables for the TaskManager processes.
 
 - `yarn.application-master.port` (Default: 0, which lets the OS choose an ephemeral port)
With this configuration option, users can specify a port, a range of ports or a list of ports
for the  Application Master (and JobManager) RPC port. By default we recommend using the default
value (0) to let the operating system choose an appropriate port. In particular when multiple
AMs are running on the  same physical host, fixed port assignments prevent the AM from starting.

http://git-wip-us.apache.org/repos/asf/flink/blob/de216306/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index ccf90b5..f4e13f6 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -243,6 +243,11 @@ public final class ConfigConstants {
 	// ------------------------ YARN Configuration ------------------------
 
 	/**
+	 * The vcores exposed by YARN.
+	 */
+	public static final String YARN_VCORES = "yarn.containers.vcores";
+
+	/**
 	 * Percentage of heap space to remove from containers started by YARN.
 	 */
 	public static final String YARN_HEAP_CUTOFF_RATIO = "yarn.heap-cutoff-ratio";

http://git-wip-us.apache.org/repos/asf/flink/blob/de216306/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
index bf47dde..ca3a38b 100644
--- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
+++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
@@ -17,16 +17,53 @@
  */
 package org.apache.flink.yarn;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.google.common.base.Joiner;
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.runtime.client.JobClient;
+import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.security.NMTokenIdentifier;
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.log4j.Level;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Arrays;
+import java.util.concurrent.ConcurrentMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
 import static org.apache.flink.yarn.UtilsTest.addTestAppender;
 import static org.apache.flink.yarn.UtilsTest.checkForLogString;
 
@@ -62,6 +99,220 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
 		LOG.info("Finished testClientStartup()");
 	}
 
+	/**
+	 * Test per-job yarn cluster
+	 *
+	 * This also tests the prefixed CliFrontend options for the YARN case
+	 * We also test if the requested parallelism of 2 is passed through.
+	 * The parallelism is requested at the YARN client (-ys).
+	 */
+	@Test
+	public void perJobYarnCluster() {
+		LOG.info("Starting perJobYarnCluster()");
+		addTestAppender(JobClient.class, Level.INFO);
+		File exampleJarLocation = YarnTestBase.findFile("..", new ContainsName(new String[] {"-WordCount.jar"}
, "streaming")); // exclude streaming wordcount here.
+		Assert.assertNotNull("Could not find wordcount jar", exampleJarLocation);
+		runWithArgs(new String[]{"run", "-m", "yarn-cluster",
+				"-yj", flinkUberjar.getAbsolutePath(), "-yt", flinkLibFolder.getAbsolutePath(),
+				"-yn", "1",
+				"-ys", "2", //test that the job is executed with a DOP of 2
+				"-yjm", "768",
+				"-ytm", "1024", exampleJarLocation.getAbsolutePath()},
+				/* test succeeded after this string */
+			"Job execution complete",
+				/* prohibited strings: (we want to see (2/2)) */
+			new String[]{"System.out)(1/1) switched to FINISHED "},
+			RunTypes.CLI_FRONTEND, 0, true);
+		LOG.info("Finished perJobYarnCluster()");
+	}
+
+
+	/**
+	 * Test TaskManager failure and also if the vcores are set correctly (see issue FLINK-2213).
+	 */
+	@Test(timeout=100000) // timeout after 100 seconds
+	public void testTaskManagerFailure() {
+		LOG.info("Starting testTaskManagerFailure()");
+		Runner runner = startWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t",
flinkLibFolder.getAbsolutePath(),
+				"-n", "1",
+				"-jm", "768",
+				"-tm", "1024",
+				"-s", "3", // set the slots 3 to check if the vCores are set properly!
+				"-nm", "customName",
+				"-Dfancy-configuration-value=veryFancy",
+				"-Dyarn.maximum-failed-containers=3",
+				"-D" + ConfigConstants.YARN_VCORES + "=2"},
+			"Number of connected TaskManagers changed to 1. Slots available: 3",
+			RunTypes.YARN_SESSION);
+
+		Assert.assertEquals(2, getRunningContainers());
+
+		// ------------------------ Test if JobManager web interface is accessible -------
+
+		YarnClient yc = null;
+		try {
+			yc = YarnClient.createYarnClient();
+			yc.init(yarnConfiguration);
+			yc.start();
+
+			List<ApplicationReport> apps = yc.getApplications(EnumSet.of(YarnApplicationState.RUNNING));
+			Assert.assertEquals(1, apps.size()); // Only one running
+			ApplicationReport app = apps.get(0);
+			Assert.assertEquals("customName", app.getName());
+			String url = app.getTrackingUrl();
+			if(!url.endsWith("/")) {
+				url += "/";
+			}
+			if(!url.startsWith("http://")) {
+				url = "http://" + url;
+			}
+			LOG.info("Got application URL from YARN {}", url);
+
+			String response = TestBaseUtils.getFromHTTP(url + "taskmanagers/");
+
+			JsonNode parsedTMs = new ObjectMapper().readTree(response);
+			ArrayNode taskManagers = (ArrayNode) parsedTMs.get("taskmanagers");
+			Assert.assertNotNull(taskManagers);
+			Assert.assertEquals(1, taskManagers.size());
+			Assert.assertEquals(3, taskManagers.get(0).get("slotsNumber").asInt());
+
+			// get the configuration from webinterface & check if the dynamic properties from
YARN show up there.
+			String jsonConfig = TestBaseUtils.getFromHTTP(url + "jobmanager/config");
+			Map<String, String> parsedConfig = WebMonitorUtils.fromKeyValueJsonArray(jsonConfig);
+
+			Assert.assertEquals("veryFancy", parsedConfig.get("fancy-configuration-value"));
+			Assert.assertEquals("3", parsedConfig.get("yarn.maximum-failed-containers"));
+			Assert.assertEquals("2", parsedConfig.get(ConfigConstants.YARN_VCORES));
+
+			// -------------- FLINK-1902: check if jobmanager hostname/port are shown in web interface
+			// first, get the hostname/port
+			String oC = outContent.toString();
+			Pattern p = Pattern.compile("Flink JobManager is now running on ([a-zA-Z0-9.-]+):([0-9]+)");
+			Matcher matches = p.matcher(oC);
+			String hostname = null;
+			String port = null;
+			while(matches.find()) {
+				hostname = matches.group(1).toLowerCase();
+				port = matches.group(2);
+			}
+			LOG.info("Extracted hostname:port: {} {}", hostname, port);
+
+			Assert.assertEquals("unable to find hostname in " + jsonConfig, hostname,
+				parsedConfig.get(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY));
+			Assert.assertEquals("unable to find port in " + jsonConfig, port,
+				parsedConfig.get(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY));
+
+			// test logfile access
+			String logs = TestBaseUtils.getFromHTTP(url + "jobmanager/log");
+			Assert.assertTrue(logs.contains("Starting YARN ApplicationMaster/JobManager (Version"));
+		} catch(Throwable e) {
+			LOG.warn("Error while running test",e);
+			Assert.fail(e.getMessage());
+		}
+
+		// ------------------------ Kill container with TaskManager and check if vcores are set
correctly -------
+
+		// find container id of taskManager:
+		ContainerId taskManagerContainer = null;
+		NodeManager nodeManager = null;
+		UserGroupInformation remoteUgi = null;
+		NMTokenIdentifier nmIdent = null;
+		try {
+			remoteUgi = UserGroupInformation.getCurrentUser();
+		} catch (IOException e) {
+			LOG.warn("Unable to get curr user", e);
+			Assert.fail();
+		}
+		for(int nmId = 0; nmId < NUM_NODEMANAGERS; nmId++) {
+			NodeManager nm = yarnCluster.getNodeManager(nmId);
+			ConcurrentMap<ContainerId, Container> containers = nm.getNMContext().getContainers();
+			for(Map.Entry<ContainerId, Container> entry : containers.entrySet()) {
+				String command = Joiner.on(" ").join(entry.getValue().getLaunchContext().getCommands());
+				if(command.contains(YarnTaskManagerRunner.class.getSimpleName())) {
+					taskManagerContainer = entry.getKey();
+					nodeManager = nm;
+					nmIdent = new NMTokenIdentifier(taskManagerContainer.getApplicationAttemptId(), null,
"",0);
+					// allow myself to do stuff with the container
+					// remoteUgi.addCredentials(entry.getValue().getCredentials());
+					remoteUgi.addTokenIdentifier(nmIdent);
+				}
+			}
+			sleep(500);
+		}
+
+		Assert.assertNotNull("Unable to find container with TaskManager", taskManagerContainer);
+		Assert.assertNotNull("Illegal state", nodeManager);
+
+		try {
+			List<NodeReport> nodeReports = yc.getNodeReports(NodeState.RUNNING);
+
+			// we asked for one node with 2 vcores so we expect 2 vcores
+			int userVcores = 0;
+			for (NodeReport rep: nodeReports) {
+				userVcores += rep.getUsed().getVirtualCores();
+			}
+			Assert.assertEquals(2, userVcores);
+		} catch (Exception e) {
+			Assert.fail("Test failed: " + e.getMessage());
+		}
+
+		yc.stop();
+
+		List<ContainerId> toStop = new LinkedList<ContainerId>();
+		toStop.add(taskManagerContainer);
+		StopContainersRequest scr = StopContainersRequest.newInstance(toStop);
+
+		try {
+			nodeManager.getNMContext().getContainerManager().stopContainers(scr);
+		} catch (Throwable e) {
+			LOG.warn("Error stopping container", e);
+			Assert.fail("Error stopping container: "+e.getMessage());
+		}
+
+		// stateful termination check:
+		// wait until we saw a container being killed and AFTERWARDS a new one launched
+		boolean ok = false;
+		do {
+			LOG.debug("Waiting for correct order of events. Output: {}", errContent.toString());
+
+			String o = errContent.toString();
+			int killedOff = o.indexOf("Container killed by the ApplicationMaster");
+			if (killedOff != -1) {
+				o = o.substring(killedOff);
+				ok = o.indexOf("Launching container") > 0;
+			}
+			sleep(1000);
+		} while(!ok);
+
+
+		// send "stop" command to command line interface
+		runner.sendStop();
+		// wait for the thread to stop
+		try {
+			runner.join(1000);
+		} catch (InterruptedException e) {
+			LOG.warn("Interrupted while stopping runner", e);
+		}
+		LOG.warn("stopped");
+
+		// ----------- Send output to logger
+		System.setOut(originalStdout);
+		System.setErr(originalStderr);
+		String oC = outContent.toString();
+		String eC = errContent.toString();
+		LOG.info("Sending stdout content through logger: \n\n{}\n\n", oC);
+		LOG.info("Sending stderr content through logger: \n\n{}\n\n", eC);
+
+		// ------ Check if everything happened correctly
+		Assert.assertTrue("Expect to see failed container", eC.contains("New messages from the
YARN cluster"));
+		Assert.assertTrue("Expect to see failed container", eC.contains("Container killed by the
ApplicationMaster"));
+		Assert.assertTrue("Expect to see new container started", eC.contains("Launching container")
&& eC.contains("on host"));
+
+		// cleanup auth for the subsequent tests.
+		remoteUgi.getTokenIdentifiers().remove(nmIdent);
+
+		LOG.info("Finished testTaskManagerFailure()");
+	}
 
 	/**
 	 * Test deployment to non-existing queue. (user-reported error)
@@ -81,6 +332,199 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase
{
 		LOG.info("Finished testNonexistingQueue()");
 	}
 
+	/**
+	 * Test per-job yarn cluster with the parallelism set at the CliFrontend instead of the
YARN client.
+	 */
+	@Test
+	public void perJobYarnClusterWithParallelism() {
+		LOG.info("Starting perJobYarnClusterWithParallelism()");
+		// write log messages to stdout as well, so that the runWithArgs() method
+		// is catching the log output
+		addTestAppender(JobClient.class, Level.INFO);
+		File exampleJarLocation = YarnTestBase.findFile("..", new ContainsName(new String[] {"-WordCount.jar"},
"streaming")); // exclude streaming wordcount here.
+		Assert.assertNotNull("Could not find wordcount jar", exampleJarLocation);
+		runWithArgs(new String[]{"run",
+				"-p", "2", //test that the job is executed with a DOP of 2
+				"-m", "yarn-cluster",
+				"-yj", flinkUberjar.getAbsolutePath(),
+				"-yt", flinkLibFolder.getAbsolutePath(),
+				"-yn", "1",
+				"-yjm", "768",
+				"-ytm", "1024", exampleJarLocation.getAbsolutePath()},
+				/* test succeeded after this string */
+			"Job execution complete",
+				/* prohibited strings: (we want to see (2/2)) */
+			new String[]{"System.out)(1/1) switched to FINISHED "},
+			RunTypes.CLI_FRONTEND, 0, true);
+		LOG.info("Finished perJobYarnClusterWithParallelism()");
+	}
+
+	/**
+	 * Test a fire-and-forget job submission to a YARN cluster.
+	 */
+	@Test(timeout=60000)
+	public void testDetachedPerJobYarnCluster() {
+		LOG.info("Starting testDetachedPerJobYarnCluster()");
+
+		File exampleJarLocation = YarnTestBase.findFile(
+			".." + File.separator + "flink-examples" + File.separator + "flink-examples-batch",
+			new ContainsName(new String[] {"-WordCount.jar"}));
+
+		Assert.assertNotNull("Could not find batch wordcount jar", exampleJarLocation);
+
+		testDetachedPerJobYarnClusterInternal(exampleJarLocation.getAbsolutePath());
+
+		LOG.info("Finished testDetachedPerJobYarnCluster()");
+	}
+
+	/**
+	 * Test a fire-and-forget job submission to a YARN cluster.
+	 */
+	@Test(timeout=60000)
+	public void testDetachedPerJobYarnClusterWithStreamingJob() {
+		LOG.info("Starting testDetachedPerJobYarnClusterWithStreamingJob()");
+
+		File exampleJarLocation = YarnTestBase.findFile(
+			".." + File.separator + "flink-examples" + File.separator + "flink-examples-streaming",
+			new ContainsName(new String[] {"-WordCount.jar"}));
+		Assert.assertNotNull("Could not find streaming wordcount jar", exampleJarLocation);
+
+		testDetachedPerJobYarnClusterInternal(exampleJarLocation.getAbsolutePath());
+
+		LOG.info("Finished testDetachedPerJobYarnClusterWithStreamingJob()");
+	}
+
+	private void testDetachedPerJobYarnClusterInternal(String job) {
+		YarnClient yc = YarnClient.createYarnClient();
+		yc.init(yarnConfiguration);
+		yc.start();
+
+		// get temporary folder for writing output of wordcount example
+		File tmpOutFolder = null;
+		try{
+			tmpOutFolder = tmp.newFolder();
+		}
+		catch(IOException e) {
+			throw new RuntimeException(e);
+		}
+
+		// get temporary file for reading input data for wordcount example
+		File tmpInFile;
+		try{
+			tmpInFile = tmp.newFile();
+			FileUtils.writeStringToFile(tmpInFile, WordCountData.TEXT);
+		}
+		catch(IOException e) {
+			throw new RuntimeException(e);
+		}
+
+		Runner runner = startWithArgs(new String[]{"run", "-m", "yarn-cluster", "-yj", flinkUberjar.getAbsolutePath(),
+				"-yt", flinkLibFolder.getAbsolutePath(),
+				"-yn", "1",
+				"-yjm", "768",
+				"-yD", "yarn.heap-cutoff-ratio=0.5", // test if the cutoff is passed correctly
+				"-ytm", "1024",
+				"-ys", "2", // test requesting slots from YARN.
+				"--yarndetached", job, tmpInFile.getAbsoluteFile().toString() , tmpOutFolder.getAbsoluteFile().toString()},
+			"Job has been submitted with JobID",
+			RunTypes.CLI_FRONTEND);
+
+		// it should usually be 2, but on slow machines, the number varies
+		Assert.assertTrue("There should be at most 2 containers running", getRunningContainers()
<= 2);
+		// give the runner some time to detach
+		for (int attempt = 0; runner.isAlive() && attempt < 5; attempt++) {
+			try {
+				Thread.sleep(500);
+			} catch (InterruptedException e) {
+			}
+		}
+		Assert.assertFalse("The runner should detach.", runner.isAlive());
+		LOG.info("CLI Frontend has returned, so the job is running");
+
+		// find out the application id and wait until it has finished.
+		try {
+			List<ApplicationReport> apps = yc.getApplications(EnumSet.of(YarnApplicationState.RUNNING));
+
+			ApplicationId tmpAppId;
+			if (apps.size() == 1) {
+				// Better method to find the right appId. But sometimes the app is shutting down very
fast
+				// Only one running
+				tmpAppId = apps.get(0).getApplicationId();
+
+				LOG.info("waiting for the job with appId {} to finish", tmpAppId);
+				// wait until the app has finished
+				while(yc.getApplications(EnumSet.of(YarnApplicationState.RUNNING)).size() > 0) {
+					sleep(500);
+				}
+			} else {
+				// get appId by finding the latest finished appid
+				apps = yc.getApplications();
+				Collections.sort(apps, new Comparator<ApplicationReport>() {
+					@Override
+					public int compare(ApplicationReport o1, ApplicationReport o2) {
+						return o1.getApplicationId().compareTo(o2.getApplicationId())*-1;
+					}
+				});
+				tmpAppId = apps.get(0).getApplicationId();
+				LOG.info("Selected {} as the last appId from {}", tmpAppId, Arrays.toString(apps.toArray()));
+			}
+			final ApplicationId id = tmpAppId;
+
+			// now it has finished.
+			// check the output files.
+			File[] listOfOutputFiles = tmpOutFolder.listFiles();
+
+
+			Assert.assertNotNull("Taskmanager output not found", listOfOutputFiles);
+			LOG.info("The job has finished. TaskManager output files found in {}", tmpOutFolder );
+
+			// read all output files in output folder to one output string
+			String content = "";
+			for(File f:listOfOutputFiles)
+			{
+				if(f.isFile())
+				{
+					content += FileUtils.readFileToString(f) + "\n";
+				}
+			}
+			//String content = FileUtils.readFileToString(taskmanagerOut);
+			// check for some of the wordcount outputs.
+			Assert.assertTrue("Expected string 'da 5' or '(all,2)' not found in string '"+content+"'",
content.contains("da 5") || content.contains("(da,5)") || content.contains("(all,2)"));
+			Assert.assertTrue("Expected string 'der 29' or '(mind,1)' not found in string'"+content+"'",content.contains("der
29") || content.contains("(der,29)") || content.contains("(mind,1)"));
+
+			// check if the heap size for the TaskManager was set correctly
+			File jobmanagerLog = YarnTestBase.findFile("..", new FilenameFilter() {
+				@Override
+				public boolean accept(File dir, String name) {
+					return name.contains("jobmanager.log") && dir.getAbsolutePath().contains(id.toString());
+				}
+			});
+			Assert.assertNotNull("Unable to locate JobManager log", jobmanagerLog);
+			content = FileUtils.readFileToString(jobmanagerLog);
+			// TM was started with 1024 but we cut off 50% (NOT THE DEFAULT VALUE)
+			String expected = "Starting TM with command=$JAVA_HOME/bin/java -Xms424m -Xmx424m";
+			Assert.assertTrue("Expected string '" + expected + "' not found in JobManager log: '"+jobmanagerLog+"'",
+				content.contains(expected));
+			expected = " (2/2) (attempt #0) to ";
+			Assert.assertTrue("Expected string '" + expected + "' not found in JobManager log." +
+					"This string checks that the job has been started with a parallelism of 2. Log contents:
'"+jobmanagerLog+"'",
+				content.contains(expected));
+
+			// make sure the detached app is really finished.
+			LOG.info("Checking again that app has finished");
+			ApplicationReport rep;
+			do {
+				sleep(500);
+				rep = yc.getApplicationReport(id);
+				LOG.info("Got report {}", rep);
+			} while(rep.getYarnApplicationState() == YarnApplicationState.RUNNING);
+
+		} catch(Throwable t) {
+			LOG.warn("Error while detached yarn session was running", t);
+			Assert.fail(t.getMessage());
+		}
+	}
+
 	@After
 	public void checkForProhibitedLogContents() {
 		ensureNoProhibitedStringInLogFiles(PROHIBITED_STRINGS, WHITELISTED_STRINGS);

http://git-wip-us.apache.org/repos/asf/flink/blob/de216306/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
index 98dc85f..db9af8c 100644
--- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
+++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
@@ -18,36 +18,18 @@
 
 package org.apache.flink.yarn;
 
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.google.common.base.Joiner;
-
-import org.apache.commons.io.FileUtils;
-
 import org.apache.flink.client.FlinkYarnSessionCli;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.runtime.client.JobClient;
-import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
 import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
 import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
 import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus;
-import org.apache.flink.test.testdata.WordCountData;
-import org.apache.flink.test.util.TestBaseUtils;
 
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.security.NMTokenIdentifier;
-import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
 
@@ -63,18 +45,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
-import java.io.FilenameFilter;
-import java.io.IOException;
 import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
 import java.util.EnumSet;
-import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentMap;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
 import static org.apache.flink.yarn.UtilsTest.addTestAppender;
 import static org.apache.flink.yarn.UtilsTest.checkForLogString;
@@ -107,22 +80,6 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 	/**
 	 * Test regular operation, including command line parameter parsing.
 	 */
-	@Test
-	public void testClientStartup() {
-		LOG.info("Starting testClientStartup()");
-		runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(),
-						"-n", "1",
-						"-jm", "768",
-						"-tm", "1024",
-						"-s", "2" // Test that 2 slots are started on the TaskManager.
-				},
-				"Number of connected TaskManagers changed to 1. Slots available: 2", null, RunTypes.YARN_SESSION,
0);
-		LOG.info("Finished testClientStartup()");
-	}
-
-	/**
-	 * Test regular operation, including command line parameter parsing.
-	 */
 	@Test(timeout=60000) // timeout after a minute.
 	public void testDetachedMode() {
 		LOG.info("Starting testDetachedMode()");
@@ -172,173 +129,6 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 	}
 
 	/**
-	 * Test TaskManager failure
-	 */
-	@Test(timeout=100000) // timeout after 100 seconds
-	public void testTaskManagerFailure() {
-		LOG.info("Starting testTaskManagerFailure()");
-		Runner runner = startWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t",
flinkLibFolder.getAbsolutePath(),
-				"-n", "1",
-				"-jm", "768",
-				"-tm", "1024",
-				"-nm", "customName",
-				"-Dfancy-configuration-value=veryFancy",
-				"-Dyarn.maximum-failed-containers=3"},
-				"Number of connected TaskManagers changed to 1. Slots available: 1",
-				RunTypes.YARN_SESSION);
-
-		Assert.assertEquals(2, getRunningContainers());
-
-		// ------------------------ Test if JobManager web interface is accessible -------
-		try {
-			YarnClient yc = YarnClient.createYarnClient();
-			yc.init(yarnConfiguration);
-			yc.start();
-			List<ApplicationReport> apps = yc.getApplications(EnumSet.of(YarnApplicationState.RUNNING));
-			Assert.assertEquals(1, apps.size()); // Only one running
-			ApplicationReport app = apps.get(0);
-			Assert.assertEquals("customName", app.getName());
-			String url = app.getTrackingUrl();
-			if(!url.endsWith("/")) {
-				url += "/";
-			}
-			if(!url.startsWith("http://")) {
-				url = "http://" + url;
-			}
-			LOG.info("Got application URL from YARN {}", url);
-
-			String response = TestBaseUtils.getFromHTTP(url + "taskmanagers/");
-			
-			
-			JsonNode parsedTMs = new ObjectMapper().readTree(response);
-			ArrayNode taskManagers = (ArrayNode) parsedTMs.get("taskmanagers");
-			Assert.assertNotNull(taskManagers);
-			Assert.assertEquals(1, taskManagers.size());
-			Assert.assertEquals(1, taskManagers.get(0).get("slotsNumber").asInt());
-
-			// get the configuration from webinterface & check if the dynamic properties from
YARN show up there.
-			String jsonConfig = TestBaseUtils.getFromHTTP(url + "jobmanager/config");
-			Map<String, String> parsedConfig = WebMonitorUtils.fromKeyValueJsonArray(jsonConfig);
-
-			Assert.assertEquals("veryFancy", parsedConfig.get("fancy-configuration-value"));
-			Assert.assertEquals("3", parsedConfig.get("yarn.maximum-failed-containers"));
-
-			// -------------- FLINK-1902: check if jobmanager hostname/port are shown in web interface
-			// first, get the hostname/port
-			String oC = outContent.toString();
-			Pattern p = Pattern.compile("Flink JobManager is now running on ([a-zA-Z0-9.-]+):([0-9]+)");
-			Matcher matches = p.matcher(oC);
-			String hostname = null;
-			String port = null;
-			while(matches.find()) {
-				hostname = matches.group(1).toLowerCase();
-				port = matches.group(2);
-			}
-			LOG.info("Extracted hostname:port: {} {}", hostname, port);
-
-			Assert.assertEquals("unable to find hostname in " + jsonConfig, hostname,
-					parsedConfig.get(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY));
-			Assert.assertEquals("unable to find port in " + jsonConfig, port,
-					parsedConfig.get(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY));
-
-			// test logfile access
-			String logs = TestBaseUtils.getFromHTTP(url + "jobmanager/log");
-			Assert.assertTrue(logs.contains("Starting YARN ApplicationMaster/JobManager (Version"));
-		} catch(Throwable e) {
-			LOG.warn("Error while running test",e);
-			Assert.fail(e.getMessage());
-		}
-
-		// ------------------------ Kill container with TaskManager  -------
-
-		// find container id of taskManager:
-		ContainerId taskManagerContainer = null;
-		NodeManager nodeManager = null;
-		UserGroupInformation remoteUgi = null;
-		NMTokenIdentifier nmIdent = null;
-		try {
-			remoteUgi = UserGroupInformation.getCurrentUser();
-		} catch (IOException e) {
-			LOG.warn("Unable to get curr user", e);
-			Assert.fail();
-		}
-		for(int nmId = 0; nmId < NUM_NODEMANAGERS; nmId++) {
-			NodeManager nm = yarnCluster.getNodeManager(nmId);
-			ConcurrentMap<ContainerId, Container> containers = nm.getNMContext().getContainers();
-			for(Map.Entry<ContainerId, Container> entry : containers.entrySet()) {
-				String command = Joiner.on(" ").join(entry.getValue().getLaunchContext().getCommands());
-				if(command.contains(YarnTaskManagerRunner.class.getSimpleName())) {
-					taskManagerContainer = entry.getKey();
-					nodeManager = nm;
-					nmIdent = new NMTokenIdentifier(taskManagerContainer.getApplicationAttemptId(), null,
"",0);
-					// allow myself to do stuff with the container
-					// remoteUgi.addCredentials(entry.getValue().getCredentials());
-					remoteUgi.addTokenIdentifier(nmIdent);
-				}
-			}
-			sleep(500);
-		}
-
-		Assert.assertNotNull("Unable to find container with TaskManager", taskManagerContainer);
-		Assert.assertNotNull("Illegal state", nodeManager);
-
-		List<ContainerId> toStop = new LinkedList<ContainerId>();
-		toStop.add(taskManagerContainer);
-		StopContainersRequest scr = StopContainersRequest.newInstance(toStop);
-
-		try {
-			nodeManager.getNMContext().getContainerManager().stopContainers(scr);
-		} catch (Throwable e) {
-			LOG.warn("Error stopping container", e);
-			Assert.fail("Error stopping container: "+e.getMessage());
-		}
-
-		// stateful termination check:
-		// wait until we saw a container being killed and AFTERWARDS a new one launched
-		boolean ok = false;
-		do {
-			LOG.debug("Waiting for correct order of events. Output: {}", errContent.toString());
-
-			String o = errContent.toString();
-			int killedOff = o.indexOf("Container killed by the ApplicationMaster");
-			if (killedOff != -1) {
-				o = o.substring(killedOff);
-				ok = o.indexOf("Launching container") > 0;
-			}
-			sleep(1000);
-		} while(!ok);
-
-
-		// send "stop" command to command line interface
-		runner.sendStop();
-		// wait for the thread to stop
-		try {
-			runner.join(1000);
-		} catch (InterruptedException e) {
-			LOG.warn("Interrupted while stopping runner", e);
-		}
-		LOG.warn("stopped");
-
-		// ----------- Send output to logger
-		System.setOut(originalStdout);
-		System.setErr(originalStderr);
-		String oC = outContent.toString();
-		String eC = errContent.toString();
-		LOG.info("Sending stdout content through logger: \n\n{}\n\n", oC);
-		LOG.info("Sending stderr content through logger: \n\n{}\n\n", eC);
-
-		// ------ Check if everything happened correctly
-		Assert.assertTrue("Expect to see failed container", eC.contains("New messages from the
YARN cluster"));
-		Assert.assertTrue("Expect to see failed container", eC.contains("Container killed by the
ApplicationMaster"));
-		Assert.assertTrue("Expect to see new container started", eC.contains("Launching container")
&& eC.contains("on host"));
-
-		// cleanup auth for the subsequent tests.
-		remoteUgi.getTokenIdentifiers().remove(nmIdent);
-
-		LOG.info("Finished testTaskManagerFailure()");
-	}
-
-	/**
 	 * Test querying the YARN cluster.
 	 *
 	 * This test validates through 666*2 cores in the "cluster".
@@ -421,227 +211,6 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 	}
 
 	/**
-	 * Test per-job yarn cluster
-	 *
-	 * This also tests the prefixed CliFrontend options for the YARN case
-	 * We also test if the requested parallelism of 2 is passed through.
-	 * The parallelism is requested at the YARN client (-ys).
-	 */
-	@Test
-	public void perJobYarnCluster() {
-		LOG.info("Starting perJobYarnCluster()");
-		addTestAppender(JobClient.class, Level.INFO);
-		File exampleJarLocation = YarnTestBase.findFile("..", new ContainsName(new String[] {"-WordCount.jar"} , "streaming")); // exclude streaming wordcount here.
-		Assert.assertNotNull("Could not find wordcount jar", exampleJarLocation);
-		runWithArgs(new String[]{"run", "-m", "yarn-cluster",
-						"-yj", flinkUberjar.getAbsolutePath(), "-yt", flinkLibFolder.getAbsolutePath(),
-						"-yn", "1",
-						"-ys", "2", //test that the job is executed with a DOP of 2
-						"-yjm", "768",
-						"-ytm", "1024", exampleJarLocation.getAbsolutePath()},
-				/* test succeeded after this string */
-				"Job execution complete",
-				/* prohibited strings: (we want to see (2/2)) */
-				new String[]{"System.out)(1/1) switched to FINISHED "},
-				RunTypes.CLI_FRONTEND, 0, true);
-		LOG.info("Finished perJobYarnCluster()");
-	}
-
-	/**
-	 * Test per-job yarn cluster with the parallelism set at the CliFrontend instead of the YARN client.
-	 */
-	@Test
-	public void perJobYarnClusterWithParallelism() {
-		LOG.info("Starting perJobYarnClusterWithParallelism()");
-		// write log messages to stdout as well, so that the runWithArgs() method
-		// is catching the log output
-		addTestAppender(JobClient.class, Level.INFO);
-		File exampleJarLocation = YarnTestBase.findFile("..", new ContainsName(new String[] {"-WordCount.jar"}, "streaming")); // exclude streaming wordcount here.
-		Assert.assertNotNull("Could not find wordcount jar", exampleJarLocation);
-		runWithArgs(new String[]{"run",
-						"-p", "2", //test that the job is executed with a DOP of 2
-						"-m", "yarn-cluster",
-						"-yj", flinkUberjar.getAbsolutePath(),
-						"-yt", flinkLibFolder.getAbsolutePath(),
-						"-yn", "1",
-						"-yjm", "768",
-						"-ytm", "1024", exampleJarLocation.getAbsolutePath()},
-				/* test succeeded after this string */
-				"Job execution complete",
-				/* prohibited strings: (we want to see (2/2)) */
-				new String[]{"System.out)(1/1) switched to FINISHED "},
-				RunTypes.CLI_FRONTEND, 0, true);
-		LOG.info("Finished perJobYarnClusterWithParallelism()");
-	}
-
-	private void testDetachedPerJobYarnClusterInternal(String job) {
-		YarnClient yc = YarnClient.createYarnClient();
-		yc.init(yarnConfiguration);
-		yc.start();
-
-		// get temporary folder for writing output of wordcount example
-		File tmpOutFolder = null;
-		try{
-			tmpOutFolder = tmp.newFolder();
-		}
-		catch(IOException e) {
-			throw new RuntimeException(e);
-		}
-
-		// get temporary file for reading input data for wordcount example
-		File tmpInFile;
-		try{
-			tmpInFile = tmp.newFile();
-			FileUtils.writeStringToFile(tmpInFile,WordCountData.TEXT);
-		}
-		catch(IOException e) {
-			throw new RuntimeException(e);
-		}
-
-		Runner runner = startWithArgs(new String[]{"run", "-m", "yarn-cluster", "-yj", flinkUberjar.getAbsolutePath(),
-						"-yt", flinkLibFolder.getAbsolutePath(),
-						"-yn", "1",
-						"-yjm", "768",
-						"-yD", "yarn.heap-cutoff-ratio=0.5", // test if the cutoff is passed correctly
-						"-ytm", "1024",
-						"-ys", "2", // test requesting slots from YARN.
-						"--yarndetached", job, tmpInFile.getAbsoluteFile().toString() , tmpOutFolder.getAbsoluteFile().toString()},
-				"Job has been submitted with JobID",
-				RunTypes.CLI_FRONTEND);
-
-		// it should usually be 2, but on slow machines, the number varies
-		Assert.assertTrue("There should be at most 2 containers running", getRunningContainers() <= 2);
-		// give the runner some time to detach
-		for (int attempt = 0; runner.isAlive() && attempt < 5; attempt++) {
-			try {
-				Thread.sleep(500);
-			} catch (InterruptedException e) {
-			}
-		}
-		Assert.assertFalse("The runner should detach.", runner.isAlive());
-		LOG.info("CLI Frontend has returned, so the job is running");
-
-		// find out the application id and wait until it has finished.
-		try {
-			List<ApplicationReport> apps = yc.getApplications(EnumSet.of(YarnApplicationState.RUNNING));
-
-			ApplicationId tmpAppId;
-			if (apps.size() == 1) {
-				// Better method to find the right appId. But sometimes the app is shutting down very fast
-				// Only one running
-				tmpAppId = apps.get(0).getApplicationId();
-
-				LOG.info("waiting for the job with appId {} to finish", tmpAppId);
-				// wait until the app has finished
-				while(yc.getApplications(EnumSet.of(YarnApplicationState.RUNNING)).size() > 0) {
-					sleep(500);
-				}
-			} else {
-				// get appId by finding the latest finished appid
-				apps = yc.getApplications();
-				Collections.sort(apps, new Comparator<ApplicationReport>() {
-					@Override
-					public int compare(ApplicationReport o1, ApplicationReport o2) {
-						return o1.getApplicationId().compareTo(o2.getApplicationId())*-1;
-					}
-				});
-				tmpAppId = apps.get(0).getApplicationId();
-				LOG.info("Selected {} as the last appId from {}", tmpAppId, Arrays.toString(apps.toArray()));
-			}
-			final ApplicationId id = tmpAppId;
-
-			// now it has finished.
-			// check the output files.
-			File[] listOfOutputFiles = tmpOutFolder.listFiles();
-
-
-			Assert.assertNotNull("Taskmanager output not found", listOfOutputFiles);
-			LOG.info("The job has finished. TaskManager output files found in {}", tmpOutFolder );
-
-			// read all output files in output folder to one output string
-			String content = "";
-			for(File f:listOfOutputFiles)
-			{
-				if(f.isFile())
-				{
-					content += FileUtils.readFileToString(f) + "\n";
-				}
-			}
-			//String content = FileUtils.readFileToString(taskmanagerOut);
-			// check for some of the wordcount outputs.
-			Assert.assertTrue("Expected string 'da 5' or '(all,2)' not found in string '"+content+"'", content.contains("da 5") || content.contains("(da,5)") || content.contains("(all,2)"));
-			Assert.assertTrue("Expected string 'der 29' or '(mind,1)' not found in string'"+content+"'",content.contains("der 29") || content.contains("(der,29)") || content.contains("(mind,1)"));
-
-			// check if the heap size for the TaskManager was set correctly
-			File jobmanagerLog = YarnTestBase.findFile("..", new FilenameFilter() {
-				@Override
-				public boolean accept(File dir, String name) {
-					return name.contains("jobmanager.log") && dir.getAbsolutePath().contains(id.toString());
-				}
-			});
-			Assert.assertNotNull("Unable to locate JobManager log", jobmanagerLog);
-			content = FileUtils.readFileToString(jobmanagerLog);
-			// TM was started with 1024 but we cut off 50% (NOT THE DEFAULT VALUE)
-			String expected = "Starting TM with command=$JAVA_HOME/bin/java -Xms424m -Xmx424m";
-			Assert.assertTrue("Expected string '" + expected + "' not found in JobManager log: '"+jobmanagerLog+"'",
-					content.contains(expected));
-			expected = " (2/2) (attempt #0) to ";
-			Assert.assertTrue("Expected string '" + expected + "' not found in JobManager log." +
-							"This string checks that the job has been started with a parallelism of 2. Log contents: '"+jobmanagerLog+"'",
-					content.contains(expected));
-
-			// make sure the detached app is really finished.
-			LOG.info("Checking again that app has finished");
-			ApplicationReport rep;
-			do {
-				sleep(500);
-				rep = yc.getApplicationReport(id);
-				LOG.info("Got report {}", rep);
-			} while(rep.getYarnApplicationState() == YarnApplicationState.RUNNING);
-
-		} catch(Throwable t) {
-			LOG.warn("Error while detached yarn session was running", t);
-			Assert.fail(t.getMessage());
-		}
-	}
-
-	/**
-	 * Test a fire-and-forget job submission to a YARN cluster.
-	 */
-	@Test(timeout=60000)
-	public void testDetachedPerJobYarnCluster() {
-		LOG.info("Starting testDetachedPerJobYarnCluster()");
-
-		File exampleJarLocation = YarnTestBase.findFile(
-			".." + File.separator + "flink-examples" + File.separator + "flink-examples-batch",
-			new ContainsName(new String[] {"-WordCount.jar"}));
-		
-		Assert.assertNotNull("Could not find batch wordcount jar", exampleJarLocation);
-
-		testDetachedPerJobYarnClusterInternal(exampleJarLocation.getAbsolutePath());
-
-		LOG.info("Finished testDetachedPerJobYarnCluster()");
-	}
-
-	/**
-	 * Test a fire-and-forget job submission to a YARN cluster.
-	 */
-	@Test(timeout=60000)
-	public void testDetachedPerJobYarnClusterWithStreamingJob() {
-		LOG.info("Starting testDetachedPerJobYarnClusterWithStreamingJob()");
-
-		File exampleJarLocation = YarnTestBase.findFile(
-			".." + File.separator + "flink-examples" + File.separator + "flink-examples-streaming",
-			new ContainsName(new String[] {"-WordCount.jar"}));
-		Assert.assertNotNull("Could not find streaming wordcount jar", exampleJarLocation);
-
-		testDetachedPerJobYarnClusterInternal(exampleJarLocation.getAbsolutePath());
-
-		LOG.info("Finished testDetachedPerJobYarnClusterWithStreamingJob()");
-	}
-
-
-	/**
 	 * Test the YARN Java API
 	 */
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/de216306/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java
index d3132d7..a0a517c 100644
--- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java
@@ -121,6 +121,7 @@ public abstract class YarnTestBase extends TestLogger {
 		yarnConfiguration.setBoolean(YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME, true);
 		yarnConfiguration.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
 		yarnConfiguration.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 2);
+		yarnConfiguration.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, 4);
 		yarnConfiguration.setInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 3600);
 		yarnConfiguration.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false);
 		yarnConfiguration.setInt(YarnConfiguration.NM_VCORES, 666); // memory is overwritten in the MiniYARNCluster.
@@ -371,6 +372,11 @@ public abstract class YarnTestBase extends TestLogger {
 			TestBaseUtils.setEnv(map);
 
 			Assert.assertTrue(yarnCluster.getServiceState() == Service.STATE.STARTED);
+
+			// wait for the nodeManagers to connect
+			while(!yarnCluster.waitForNodeManagersToConnect(500)) {
+				LOG.info("Waiting for Nodemanagers to connect");
+			}
 		} catch (Exception ex) {
 			ex.printStackTrace();
 			LOG.error("setup failure", ex);

http://git-wip-us.apache.org/repos/asf/flink/blob/de216306/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
index 2663451..314c5bd 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
@@ -518,10 +518,14 @@ class YarnJobManager(
     val priority = Records.newRecord(classOf[Priority])
     priority.setPriority(0)
 
+    val taskManagerSlots = env.get(FlinkYarnClientBase.ENV_SLOTS).toInt
+    val vcores: Int = flinkConfiguration
+      .getInteger(ConfigConstants.YARN_VCORES, Math.max(taskManagerSlots, 1))
+
     // Resource requirements for worker containers
     val capability = Records.newRecord(classOf[Resource])
     capability.setMemory(memoryPerTaskManager)
-    capability.setVirtualCores(1) // hard-code that number (YARN is not accounting for CPUs)
+    capability.setVirtualCores(vcores)
     new ContainerRequest(capability, null, null, priority)
   }
 


Mime
View raw message