Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id EFF7B17E3F for ; Wed, 27 May 2015 07:57:38 +0000 (UTC) Received: (qmail 89626 invoked by uid 500); 27 May 2015 07:57:38 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 89516 invoked by uid 500); 27 May 2015 07:57:38 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 89467 invoked by uid 99); 27 May 2015 07:57:38 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 27 May 2015 07:57:38 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id B59F2E0AFA; Wed, 27 May 2015 07:57:38 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: rmetzger@apache.org To: commits@flink.apache.org Date: Wed, 27 May 2015 07:57:40 -0000 Message-Id: <16fab62e181245ccaec26ee2a8ea8f57@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [3/3] flink git commit: [yarn] Adjust default values for YARN heap memory cutoff [yarn] Adjust default values for YARN heap memory cutoff This closes #717 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3c037be7 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3c037be7 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3c037be7 Branch: refs/heads/master Commit: 3c037be7d2884ff0276952b30638852a01ba687b Parents: 11b021b Author: Robert Metzger Authored: Fri May 22 18:10:15 2015 +0200 Committer: Robert Metzger Committed: Wed May 27 09:56:54 2015 +0200 ---------------------------------------------------------------------- .../flink/configuration/ConfigConstants.java | 12 ++++++++-- .../YARNSessionCapacitySchedulerITCase.java | 4 ++-- .../flink/yarn/YARNSessionFIFOITCase.java | 24 ++++++++++---------- .../org/apache/flink/yarn/FlinkYarnClient.java | 8 +++---- 4 files changed, 28 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/3c037be7/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index 92acd3f..9d8da21 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -553,9 +553,17 @@ public final class ConfigConstants { // ------------------------ YARN Configuration ------------------------ - public static final int DEFAULT_YARN_MIN_HEAP_CUTOFF = 384; + /** + * Minimum amount of Heap memory to subtract from the requested TaskManager size. + * We came up with these values experimentally. + * Flink fails when the cutoff is set only to 500 mb. + */ + public static final int DEFAULT_YARN_MIN_HEAP_CUTOFF = 600; - public static final float DEFAULT_YARN_HEAP_CUTOFF_RATIO = 0.15f; + /** + * Relative amount of memory to subtract from the requested memory. + */ + public static final float DEFAULT_YARN_HEAP_CUTOFF_RATIO = 0.25f; // ------------------------ File System Behavior ------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/3c037be7/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java index 9c6748a..4051ff5 100644 --- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java +++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java @@ -56,7 +56,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase { LOG.info("Starting testClientStartup()"); runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-n", "1", - "-jm", "512", + "-jm", "768", "-tm", "1024", "-qu", "qa-team"}, "Number of connected TaskManagers changed to 1. Slots available: 1", null, RunTypes.YARN_SESSION, 0); LOG.info("Finished testClientStartup()"); @@ -73,7 +73,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase { addTestAppender(FlinkYarnClient.class, Level.WARN); runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-n", "1", - "-jm", "512", + "-jm", "768", "-tm", "1024", "-qu", "doesntExist"}, "to unknown queue: doesntExist", null, RunTypes.YARN_SESSION, 1); checkForLogString("The specified queue 'doesntExist' does not exist. Available queues: default, qa-team"); http://git-wip-us.apache.org/repos/asf/flink/blob/3c037be7/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java index e22e0ef..f4febed 100644 --- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java +++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java @@ -102,7 +102,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase { LOG.info("Starting testClientStartup()"); runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-n", "1", - "-jm", "512", + "-jm", "768", "-tm", "1024", "-s", "2" // Test that 2 slots are started on the TaskManager. }, @@ -119,7 +119,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase { addTestAppender(FlinkYarnSessionCli.class, Level.INFO); Runner runner = startWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-n", "1", - "-jm", "512", + "-jm", "768", "-tm", "1024", "--detached"}, "Flink JobManager is now running on", RunTypes.YARN_SESSION); @@ -164,7 +164,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase { LOG.info("Starting testTaskManagerFailure()"); Runner runner = startWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-n", "1", - "-jm", "512", + "-jm", "768", "-tm", "1024", "-Dfancy-configuration-value=veryFancy", "-Dyarn.maximum-failed-containers=3"}, @@ -332,7 +332,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase { LOG.info("Starting testNonexistingQueue()"); runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-n", "1", - "-jm", "512", + "-jm", "768", "-tm", "1024", "-qu", "doesntExist"}, "Number of connected TaskManagers changed to 1. Slots available: 1", null, RunTypes.YARN_SESSION, 0); LOG.info("Finished testNonexistingQueue()"); @@ -408,7 +408,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase { "-yj", flinkUberjar.getAbsolutePath(), "-yn", "1", "-ys", "2", //test that the job is executed with a DOP of 2 - "-yjm", "512", + "-yjm", "768", "-ytm", "1024", exampleJarLocation.getAbsolutePath()}, /* test succeeded after this string */ "Job execution switched to status FINISHED.", @@ -431,7 +431,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase { "-m", "yarn-cluster", "-yj", flinkUberjar.getAbsolutePath(), "-yn", "1", - "-yjm", "512", + "-yjm", "768", "-ytm", "1024", exampleJarLocation.getAbsolutePath()}, /* test succeeded after this string */ "Job execution switched to status FINISHED.", @@ -467,7 +467,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase { Runner runner = startWithArgs(new String[]{"run", "-m", "yarn-cluster", "-yj", flinkUberjar.getAbsolutePath(), "-yn", "1", - "-yjm", "512", + "-yjm", "768", "-yD", "yarn.heap-cutoff-ratio=0.5", // test if the cutoff is passed correctly "-ytm", "1024", "-ys", "2", // test requesting slots from YARN. @@ -541,8 +541,8 @@ public class YARNSessionFIFOITCase extends YarnTestBase { Assert.assertNotNull("Unable to locate JobManager log", jobmanagerLog); content = FileUtils.readFileToString(jobmanagerLog); // expecting 512 mb, because TM was started with 1024, we cut off 50% (NOT THE DEFAULT VALUE). - Assert.assertTrue("Expected string 'Starting TM with command=$JAVA_HOME/bin/java -Xms512m -Xmx512m' not found in JobManager log: '"+jobmanagerLog+"'", - content.contains("Starting TM with command=$JAVA_HOME/bin/java -Xms512m -Xmx512m")); + Assert.assertTrue("Expected string 'Starting TM with command=$JAVA_HOME/bin/java -Xms424m -Xmx424m' not found in JobManager log: '"+jobmanagerLog+"'", + content.contains("Starting TM with command=$JAVA_HOME/bin/java -Xms424m -Xmx424m")); Assert.assertTrue("Expected string ' (2/2) (attempt #0) to ' not found in JobManager log." + "This string checks that the job has been started with a parallelism of 2. Log contents: '"+jobmanagerLog+"'", content.contains(" (2/2) (attempt #0) to ")); @@ -558,7 +558,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase { } catch(Throwable t) { LOG.warn("Error while detached yarn session was running", t); - Assert.fail(); + Assert.fail(t.getMessage()); } } @@ -604,8 +604,8 @@ public class YARNSessionFIFOITCase extends YarnTestBase { AbstractFlinkYarnClient flinkYarnClient = FlinkYarnSessionCli.getFlinkYarnClient(); Assert.assertNotNull("unable to get yarn client", flinkYarnClient); flinkYarnClient.setTaskManagerCount(1); - flinkYarnClient.setJobManagerMemory(512); - flinkYarnClient.setTaskManagerMemory(512); + flinkYarnClient.setJobManagerMemory(768); + flinkYarnClient.setTaskManagerMemory(768); flinkYarnClient.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath())); String confDirPath = System.getenv("FLINK_CONF_DIR"); flinkYarnClient.setConfigurationDirectory(confDirPath); http://git-wip-us.apache.org/repos/asf/flink/blob/3c037be7/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java index 502d72d..88b2987 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java @@ -99,8 +99,8 @@ public class FlinkYarnClient extends AbstractFlinkYarnClient { /** * Minimum memory requirements, checked by the Client. */ - private static final int MIN_JM_MEMORY = 128; - private static final int MIN_TM_MEMORY = 128; + private static final int MIN_JM_MEMORY = 768; // the minimum memory should be higher than the min heap cutoff + private static final int MIN_TM_MEMORY = 768; private Configuration conf; private YarnClient yarnClient; @@ -164,7 +164,7 @@ public class FlinkYarnClient extends AbstractFlinkYarnClient { @Override public void setJobManagerMemory(int memoryMb) { if(memoryMb < MIN_JM_MEMORY) { - throw new IllegalArgumentException("The JobManager memory is below the minimum required memory amount " + throw new IllegalArgumentException("The JobManager memory ("+memoryMb+") is below the minimum required memory amount " + "of "+MIN_JM_MEMORY+" MB"); } this.jobManagerMemoryMb = memoryMb; @@ -173,7 +173,7 @@ public class FlinkYarnClient extends AbstractFlinkYarnClient { @Override public void setTaskManagerMemory(int memoryMb) { if(memoryMb < MIN_TM_MEMORY) { - throw new IllegalArgumentException("The TaskManager memory is below the minimum required memory amount " + throw new IllegalArgumentException("The TaskManager memory ("+memoryMb+") is below the minimum required memory amount " + "of "+MIN_TM_MEMORY+" MB"); } this.taskManagerMemoryMb = memoryMb;