flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [3/3] flink git commit: [yarn] Adjust default values for YARN heap memory cutoff
Date Wed, 27 May 2015 07:57:40 GMT
[yarn] Adjust default values for YARN heap memory cutoff

This closes #717


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3c037be7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3c037be7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3c037be7

Branch: refs/heads/master
Commit: 3c037be7d2884ff0276952b30638852a01ba687b
Parents: 11b021b
Author: Robert Metzger <rmetzger@apache.org>
Authored: Fri May 22 18:10:15 2015 +0200
Committer: Robert Metzger <rmetzger@apache.org>
Committed: Wed May 27 09:56:54 2015 +0200

----------------------------------------------------------------------
 .../flink/configuration/ConfigConstants.java    | 12 ++++++++--
 .../YARNSessionCapacitySchedulerITCase.java     |  4 ++--
 .../flink/yarn/YARNSessionFIFOITCase.java       | 24 ++++++++++----------
 .../org/apache/flink/yarn/FlinkYarnClient.java  |  8 +++----
 4 files changed, 28 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3c037be7/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 92acd3f..9d8da21 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -553,9 +553,17 @@ public final class ConfigConstants {
 	// ------------------------ YARN Configuration ------------------------
 
 
-	public static final int DEFAULT_YARN_MIN_HEAP_CUTOFF = 384;
+	/**
+	 * Minimum amount of Heap memory to subtract from the requested TaskManager size.
+	 * We came up with these values experimentally.
+	 * Flink fails when the cutoff is set only to 500 mb.
+	 */
+	public static final int DEFAULT_YARN_MIN_HEAP_CUTOFF = 600;
 
-	public static final float DEFAULT_YARN_HEAP_CUTOFF_RATIO = 0.15f;
+	/**
+	 * Relative amount of memory to subtract from the requested memory.
+	 */
+	public static final float DEFAULT_YARN_HEAP_CUTOFF_RATIO = 0.25f;
 	
 	
 	// ------------------------ File System Behavior ------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/3c037be7/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
index 9c6748a..4051ff5 100644
--- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
+++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
@@ -56,7 +56,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
 		LOG.info("Starting testClientStartup()");
 		runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
 						"-n", "1",
-						"-jm", "512",
+						"-jm", "768",
 						"-tm", "1024", "-qu", "qa-team"},
 				"Number of connected TaskManagers changed to 1. Slots available: 1", null, RunTypes.YARN_SESSION, 0);
 		LOG.info("Finished testClientStartup()");
@@ -73,7 +73,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
 		addTestAppender(FlinkYarnClient.class, Level.WARN);
 		runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
 				"-n", "1",
-				"-jm", "512",
+				"-jm", "768",
 				"-tm", "1024",
 				"-qu", "doesntExist"}, "to unknown queue: doesntExist", null, RunTypes.YARN_SESSION, 1);
 		checkForLogString("The specified queue 'doesntExist' does not exist. Available queues: default, qa-team");

http://git-wip-us.apache.org/repos/asf/flink/blob/3c037be7/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
index e22e0ef..f4febed 100644
--- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
+++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
@@ -102,7 +102,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 		LOG.info("Starting testClientStartup()");
 		runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
 						"-n", "1",
-						"-jm", "512",
+						"-jm", "768",
 						"-tm", "1024",
 						"-s", "2" // Test that 2 slots are started on the TaskManager.
 				},
@@ -119,7 +119,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 		addTestAppender(FlinkYarnSessionCli.class, Level.INFO);
 		Runner runner = startWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
 						"-n", "1",
-						"-jm", "512",
+						"-jm", "768",
 						"-tm", "1024",
 						"--detached"},
 				"Flink JobManager is now running on", RunTypes.YARN_SESSION);
@@ -164,7 +164,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 		LOG.info("Starting testTaskManagerFailure()");
 		Runner runner = startWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
 				"-n", "1",
-				"-jm", "512",
+				"-jm", "768",
 				"-tm", "1024",
 				"-Dfancy-configuration-value=veryFancy",
 				"-Dyarn.maximum-failed-containers=3"},
@@ -332,7 +332,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 		LOG.info("Starting testNonexistingQueue()");
 		runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
 				"-n", "1",
-				"-jm", "512",
+				"-jm", "768",
 				"-tm", "1024",
 				"-qu", "doesntExist"}, "Number of connected TaskManagers changed to 1. Slots available: 1", null, RunTypes.YARN_SESSION, 0);
 		LOG.info("Finished testNonexistingQueue()");
@@ -408,7 +408,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 						"-yj", flinkUberjar.getAbsolutePath(),
 						"-yn", "1",
 						"-ys", "2", //test that the job is executed with a DOP of 2
-						"-yjm", "512",
+						"-yjm", "768",
 						"-ytm", "1024", exampleJarLocation.getAbsolutePath()},
 				/* test succeeded after this string */
 				"Job execution switched to status FINISHED.",
@@ -431,7 +431,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 						"-m", "yarn-cluster",
 						"-yj", flinkUberjar.getAbsolutePath(),
 						"-yn", "1",
-						"-yjm", "512",
+						"-yjm", "768",
 						"-ytm", "1024", exampleJarLocation.getAbsolutePath()},
 				/* test succeeded after this string */
 				"Job execution switched to status FINISHED.",
@@ -467,7 +467,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 
 		Runner runner = startWithArgs(new String[]{"run", "-m", "yarn-cluster", "-yj", flinkUberjar.getAbsolutePath(),
 						"-yn", "1",
-						"-yjm", "512",
+						"-yjm", "768",
 						"-yD", "yarn.heap-cutoff-ratio=0.5", // test if the cutoff is passed correctly
 						"-ytm", "1024",
 						"-ys", "2", // test requesting slots from YARN.
@@ -541,8 +541,8 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 			Assert.assertNotNull("Unable to locate JobManager log", jobmanagerLog);
 			content = FileUtils.readFileToString(jobmanagerLog);
 			// expecting 512 mb, because TM was started with 1024, we cut off 50% (NOT THE DEFAULT VALUE).
-			Assert.assertTrue("Expected string 'Starting TM with command=$JAVA_HOME/bin/java -Xms512m -Xmx512m' not found in JobManager log: '"+jobmanagerLog+"'",
-					content.contains("Starting TM with command=$JAVA_HOME/bin/java -Xms512m -Xmx512m"));
+			Assert.assertTrue("Expected string 'Starting TM with command=$JAVA_HOME/bin/java -Xms424m -Xmx424m' not found in JobManager log: '"+jobmanagerLog+"'",
+					content.contains("Starting TM with command=$JAVA_HOME/bin/java -Xms424m -Xmx424m"));
 			Assert.assertTrue("Expected string ' (2/2) (attempt #0) to ' not found in JobManager log." +
 							"This string checks that the job has been started with a parallelism of 2. Log contents: '"+jobmanagerLog+"'",
 					content.contains(" (2/2) (attempt #0) to "));
@@ -558,7 +558,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 
 		} catch(Throwable t) {
 			LOG.warn("Error while detached yarn session was running", t);
-			Assert.fail();
+			Assert.fail(t.getMessage());
 		}
 	}
 
@@ -604,8 +604,8 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 		AbstractFlinkYarnClient flinkYarnClient = FlinkYarnSessionCli.getFlinkYarnClient();
 		Assert.assertNotNull("unable to get yarn client", flinkYarnClient);
 		flinkYarnClient.setTaskManagerCount(1);
-		flinkYarnClient.setJobManagerMemory(512);
-		flinkYarnClient.setTaskManagerMemory(512);
+		flinkYarnClient.setJobManagerMemory(768);
+		flinkYarnClient.setTaskManagerMemory(768);
 		flinkYarnClient.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
 		String confDirPath = System.getenv("FLINK_CONF_DIR");
 		flinkYarnClient.setConfigurationDirectory(confDirPath);

http://git-wip-us.apache.org/repos/asf/flink/blob/3c037be7/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
index 502d72d..88b2987 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
@@ -99,8 +99,8 @@ public class FlinkYarnClient extends AbstractFlinkYarnClient {
 	/**
 	 * Minimum memory requirements, checked by the Client.
 	 */
-	private static final int MIN_JM_MEMORY = 128;
-	private static final int MIN_TM_MEMORY = 128;
+	private static final int MIN_JM_MEMORY = 768; // the minimum memory should be higher than the min heap cutoff
+	private static final int MIN_TM_MEMORY = 768;
 
 	private Configuration conf;
 	private YarnClient yarnClient;
@@ -164,7 +164,7 @@ public class FlinkYarnClient extends AbstractFlinkYarnClient {
 	@Override
 	public void setJobManagerMemory(int memoryMb) {
 		if(memoryMb < MIN_JM_MEMORY) {
-			throw new IllegalArgumentException("The JobManager memory is below the minimum required memory amount "
+			throw new IllegalArgumentException("The JobManager memory ("+memoryMb+") is below the minimum required memory amount "
 					+ "of "+MIN_JM_MEMORY+" MB");
 		}
 		this.jobManagerMemoryMb = memoryMb;
@@ -173,7 +173,7 @@ public class FlinkYarnClient extends AbstractFlinkYarnClient {
 	@Override
 	public void setTaskManagerMemory(int memoryMb) {
 		if(memoryMb < MIN_TM_MEMORY) {
-			throw new IllegalArgumentException("The TaskManager memory is below the minimum required memory amount "
+			throw new IllegalArgumentException("The TaskManager memory ("+memoryMb+") is below the minimum required memory amount "
 					+ "of "+MIN_TM_MEMORY+" MB");
 		}
 		this.taskManagerMemoryMb = memoryMb;


Mime
View raw message