[yarn] Adjust default values for YARN heap memory cutoff
This closes #717
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3c037be7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3c037be7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3c037be7
Branch: refs/heads/master
Commit: 3c037be7d2884ff0276952b30638852a01ba687b
Parents: 11b021b
Author: Robert Metzger <rmetzger@apache.org>
Authored: Fri May 22 18:10:15 2015 +0200
Committer: Robert Metzger <rmetzger@apache.org>
Committed: Wed May 27 09:56:54 2015 +0200
----------------------------------------------------------------------
.../flink/configuration/ConfigConstants.java | 12 ++++++++--
.../YARNSessionCapacitySchedulerITCase.java | 4 ++--
.../flink/yarn/YARNSessionFIFOITCase.java | 24 ++++++++++----------
.../org/apache/flink/yarn/FlinkYarnClient.java | 8 +++----
4 files changed, 28 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/3c037be7/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 92acd3f..9d8da21 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -553,9 +553,17 @@ public final class ConfigConstants {
// ------------------------ YARN Configuration ------------------------
- public static final int DEFAULT_YARN_MIN_HEAP_CUTOFF = 384;
+ /**
+ * Minimum amount of heap memory (in MB) to subtract from the requested TaskManager size.
+ * These values were determined experimentally:
+ * Flink fails when the cutoff is set to only 500 MB.
+ */
+ public static final int DEFAULT_YARN_MIN_HEAP_CUTOFF = 600;
- public static final float DEFAULT_YARN_HEAP_CUTOFF_RATIO = 0.15f;
+ /**
+ * Fraction of the requested memory to subtract as heap cutoff (e.g. 0.25 = 25%).
+ */
+ public static final float DEFAULT_YARN_HEAP_CUTOFF_RATIO = 0.25f;
// ------------------------ File System Behavior ------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/3c037be7/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
index 9c6748a..4051ff5 100644
--- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
+++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
@@ -56,7 +56,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
LOG.info("Starting testClientStartup()");
runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
"-n", "1",
- "-jm", "512",
+ "-jm", "768",
"-tm", "1024", "-qu", "qa-team"},
"Number of connected TaskManagers changed to 1. Slots available: 1", null, RunTypes.YARN_SESSION,
0);
LOG.info("Finished testClientStartup()");
@@ -73,7 +73,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
addTestAppender(FlinkYarnClient.class, Level.WARN);
runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
"-n", "1",
- "-jm", "512",
+ "-jm", "768",
"-tm", "1024",
"-qu", "doesntExist"}, "to unknown queue: doesntExist", null, RunTypes.YARN_SESSION,
1);
checkForLogString("The specified queue 'doesntExist' does not exist. Available queues: default, qa-team");
http://git-wip-us.apache.org/repos/asf/flink/blob/3c037be7/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
index e22e0ef..f4febed 100644
--- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
+++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
@@ -102,7 +102,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
LOG.info("Starting testClientStartup()");
runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
"-n", "1",
- "-jm", "512",
+ "-jm", "768",
"-tm", "1024",
"-s", "2" // Test that 2 slots are started on the TaskManager.
},
@@ -119,7 +119,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
addTestAppender(FlinkYarnSessionCli.class, Level.INFO);
Runner runner = startWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
"-n", "1",
- "-jm", "512",
+ "-jm", "768",
"-tm", "1024",
"--detached"},
"Flink JobManager is now running on", RunTypes.YARN_SESSION);
@@ -164,7 +164,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
LOG.info("Starting testTaskManagerFailure()");
Runner runner = startWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
"-n", "1",
- "-jm", "512",
+ "-jm", "768",
"-tm", "1024",
"-Dfancy-configuration-value=veryFancy",
"-Dyarn.maximum-failed-containers=3"},
@@ -332,7 +332,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
LOG.info("Starting testNonexistingQueue()");
runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
"-n", "1",
- "-jm", "512",
+ "-jm", "768",
"-tm", "1024",
"-qu", "doesntExist"}, "Number of connected TaskManagers changed to 1. Slots available: 1", null, RunTypes.YARN_SESSION, 0);
LOG.info("Finished testNonexistingQueue()");
@@ -408,7 +408,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
"-yj", flinkUberjar.getAbsolutePath(),
"-yn", "1",
"-ys", "2", //test that the job is executed with a DOP of 2
- "-yjm", "512",
+ "-yjm", "768",
"-ytm", "1024", exampleJarLocation.getAbsolutePath()},
/* test succeeded after this string */
"Job execution switched to status FINISHED.",
@@ -431,7 +431,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
"-m", "yarn-cluster",
"-yj", flinkUberjar.getAbsolutePath(),
"-yn", "1",
- "-yjm", "512",
+ "-yjm", "768",
"-ytm", "1024", exampleJarLocation.getAbsolutePath()},
/* test succeeded after this string */
"Job execution switched to status FINISHED.",
@@ -467,7 +467,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
Runner runner = startWithArgs(new String[]{"run", "-m", "yarn-cluster", "-yj", flinkUberjar.getAbsolutePath(),
"-yn", "1",
- "-yjm", "512",
+ "-yjm", "768",
"-yD", "yarn.heap-cutoff-ratio=0.5", // test if the cutoff is passed correctly
"-ytm", "1024",
"-ys", "2", // test requesting slots from YARN.
@@ -541,8 +541,8 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
Assert.assertNotNull("Unable to locate JobManager log", jobmanagerLog);
content = FileUtils.readFileToString(jobmanagerLog);
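- // expecting 512 mb, because TM was started with 1024, we cut off 50% (NOT THE DEFAULT VALUE).
+ // expecting 424 mb: TM was started with 1024 mb and a 50% cutoff ratio (NOT THE DEFAULT VALUE), but 50% (512 mb) is below the 600 mb minimum cutoff, so 600 mb are cut off.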
- Assert.assertTrue("Expected string 'Starting TM with command=$JAVA_HOME/bin/java -Xms512m -Xmx512m' not found in JobManager log: '"+jobmanagerLog+"'",
- content.contains("Starting TM with command=$JAVA_HOME/bin/java -Xms512m -Xmx512m"));
+ Assert.assertTrue("Expected string 'Starting TM with command=$JAVA_HOME/bin/java -Xms424m -Xmx424m' not found in JobManager log: '"+jobmanagerLog+"'",
+ content.contains("Starting TM with command=$JAVA_HOME/bin/java -Xms424m -Xmx424m"));
Assert.assertTrue("Expected string ' (2/2) (attempt #0) to ' not found in JobManager log."
+
"This string checks that the job has been started with a parallelism of 2. Log contents: '"+jobmanagerLog+"'",
content.contains(" (2/2) (attempt #0) to "));
@@ -558,7 +558,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
} catch(Throwable t) {
LOG.warn("Error while detached yarn session was running", t);
- Assert.fail();
+ Assert.fail(t.getMessage());
}
}
@@ -604,8 +604,8 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
AbstractFlinkYarnClient flinkYarnClient = FlinkYarnSessionCli.getFlinkYarnClient();
Assert.assertNotNull("unable to get yarn client", flinkYarnClient);
flinkYarnClient.setTaskManagerCount(1);
- flinkYarnClient.setJobManagerMemory(512);
- flinkYarnClient.setTaskManagerMemory(512);
+ flinkYarnClient.setJobManagerMemory(768);
+ flinkYarnClient.setTaskManagerMemory(768);
flinkYarnClient.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
String confDirPath = System.getenv("FLINK_CONF_DIR");
flinkYarnClient.setConfigurationDirectory(confDirPath);
http://git-wip-us.apache.org/repos/asf/flink/blob/3c037be7/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
index 502d72d..88b2987 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
@@ -99,8 +99,8 @@ public class FlinkYarnClient extends AbstractFlinkYarnClient {
/**
* Minimum memory requirements, checked by the Client.
*/
- private static final int MIN_JM_MEMORY = 128;
- private static final int MIN_TM_MEMORY = 128;
+ private static final int MIN_JM_MEMORY = 768; // must be higher than the minimum heap cutoff (ConfigConstants.DEFAULT_YARN_MIN_HEAP_CUTOFF)
+ private static final int MIN_TM_MEMORY = 768;
private Configuration conf;
private YarnClient yarnClient;
@@ -164,7 +164,7 @@ public class FlinkYarnClient extends AbstractFlinkYarnClient {
@Override
public void setJobManagerMemory(int memoryMb) {
if(memoryMb < MIN_JM_MEMORY) {
- throw new IllegalArgumentException("The JobManager memory is below the minimum required memory amount "
+ throw new IllegalArgumentException("The JobManager memory ("+memoryMb+") is below the minimum required memory amount "
+ "of "+MIN_JM_MEMORY+" MB");
}
this.jobManagerMemoryMb = memoryMb;
@@ -173,7 +173,7 @@ public class FlinkYarnClient extends AbstractFlinkYarnClient {
@Override
public void setTaskManagerMemory(int memoryMb) {
if(memoryMb < MIN_TM_MEMORY) {
- throw new IllegalArgumentException("The TaskManager memory is below the minimum required memory amount "
+ throw new IllegalArgumentException("The TaskManager memory ("+memoryMb+") is below the minimum required memory amount "
+ "of "+MIN_TM_MEMORY+" MB");
}
this.taskManagerMemoryMb = memoryMb;
|