Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 2C4D0200B3C for ; Wed, 13 Jul 2016 10:43:56 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 2AD73160A6E; Wed, 13 Jul 2016 08:43:56 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id D025B160A57 for ; Wed, 13 Jul 2016 10:43:54 +0200 (CEST) Received: (qmail 2912 invoked by uid 500); 13 Jul 2016 08:43:54 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 2903 invoked by uid 99); 13 Jul 2016 08:43:54 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 13 Jul 2016 08:43:54 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id B1D96E04BE; Wed, 13 Jul 2016 08:43:53 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: rmetzger@apache.org To: commits@flink.apache.org Message-Id: <19fcb23f10fc4d60b6ae72254d958ec5@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: flink git commit: [FLINK-4127] Check API compatbility for 1.1 in flink-core Date: Wed, 13 Jul 2016 08:43:53 +0000 (UTC) archived-at: Wed, 13 Jul 2016 08:43:56 -0000 Repository: flink Updated Branches: refs/heads/master 74b09ce0d -> 6b7bb7614 [FLINK-4127] Check API compatbility for 1.1 in flink-core This closes #2177 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6b7bb761 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6b7bb761 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6b7bb761 Branch: refs/heads/master Commit: 6b7bb7614848303d31811496fcb7107a78d448c3 Parents: 74b09ce Author: Robert Metzger Authored: Tue Jun 28 15:12:05 2016 +0200 Committer: Robert Metzger Committed: Wed Jul 13 10:43:40 2016 +0200 ---------------------------------------------------------------------- docs/setup/config.md | 28 +++++++++++++++----- .../flink/api/common/io/BinaryInputFormat.java | 3 +++ .../common/io/CheckpointableInputFormat.java | 4 +-- .../api/common/io/DelimitedInputFormat.java | 3 +++ .../flink/configuration/ConfigConstants.java | 20 +++++++------- .../ContaineredTaskManagerParameters.java | 10 +++---- .../java/org/apache/flink/yarn/UtilsTest.java | 18 ++++++------- .../main/java/org/apache/flink/yarn/Utils.java | 12 ++++----- .../flink/yarn/YarnApplicationMasterRunner.java | 8 +++--- 9 files changed, 63 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/6b7bb761/docs/setup/config.md ---------------------------------------------------------------------- diff --git a/docs/setup/config.md b/docs/setup/config.md index 1e4f83a..1f0bf7e 100644 --- a/docs/setup/config.md +++ b/docs/setup/config.md @@ -41,9 +41,9 @@ The configuration files for the TaskManagers can be different, Flink does not as - `env.java.opts`: Set custom JVM options. This value is respected by Flink's start scripts, both JobManager and TaskManager, and Flink's YARN client. This can be used to set different garbage collectors or to include remote debuggers into the JVMs running Flink's services. Use `env.java.opts.jobmanager` and `env.java.opts.taskmanager` for JobManager or TaskManager-specific options, respectively. -- `env.java.opts.jobmanager`: JobManager-specific JVM options. These are used in addition to the regular `env.java.opts`. +- `env.java.opts.jobmanager`: JobManager-specific JVM options. These are used in addition to the regular `env.java.opts`. This configuration option is ignored by the YARN client. -- `env.java.opts.taskmanager`: TaskManager-specific JVM options. These are used in addition to the regular `env.java.opts`. +- `env.java.opts.taskmanager`: TaskManager-specific JVM options. These are used in addition to the regular `env.java.opts`. This configuration option is ignored by the YARN client. - `jobmanager.rpc.address`: The IP address of the JobManager, which is the master/coordinator of the distributed system (DEFAULT: localhost). @@ -71,7 +71,7 @@ without explicit scheme definition, such as `/user/USERNAME/in.txt`, is going to ### Managed Memory -By default, Flink allocates a fraction of `0.7` of the total memory configured via `taskmanager.heap.mb` for its managed memory. Managed memory helps Flink to run the operators efficiently. It prevents `OutOfMemoryException`s because Flink knows how much memory it can use to execute operations. If Flink runs out of managed memory, it utilizes disk space. Using managed memory, some operations can be performed directly on the raw data without having to deserialize the data to convert it into Java objects. All in all, managed memory improves the robustness and speed of the system. +By default, Flink allocates a fraction of `0.7` of the total memory configured via `taskmanager.heap.mb` for its managed memory. Managed memory helps Flink to run the batch operators efficiently. It prevents `OutOfMemoryException`s because Flink knows how much memory it can use to execute operations. If Flink runs out of managed memory, it utilizes disk space. Using managed memory, some operations can be performed directly on the raw data without having to deserialize the data to convert it into Java objects. All in all, managed memory improves the robustness and speed of the system. The default fraction for managed memory can be adjusted using the `taskmanager.memory.fraction` parameter. An absolute value may be set using `taskmanager.memory.size` (overrides the fraction parameter). If desired, the managed memory may be allocated outside the JVM heap. This may improve performance in setups with large memory sizes. @@ -107,12 +107,18 @@ Please make sure to set the maximum ticket life span high long running jobs. The If you are on YARN, then it is sufficient to authenticate the client with Kerberos. On a Flink standalone cluster you need to ensure that, initially, all nodes are authenticated with Kerberos using the `kinit` tool. + ### Other - `taskmanager.tmp.dirs`: The directory for temporary files, or a list of directories separated by the systems directory delimiter (for example ':' (colon) on Linux/Unix). If multiple directories are specified, then the temporary files will be distributed across the directories in a round-robin fashion. The I/O manager component will spawn one reading and one writing thread per directory. A directory may be listed multiple times to have the I/O manager use multiple threads for it (for example if it is physically stored on a very fast disc or RAID) (DEFAULT: The system's tmp dir). +- `taskmanager.log.path`: The config parameter defining the taskmanager log file location + - `jobmanager.web.port`: Port of the JobManager's web interface (DEFAULT: 8081). +- `jobmanager.web.tmpdir`: This configuration parameter allows defining the Flink web directory to be used by the web interface. The web interface +will copy its static files into the directory. Also uploaded job jars are stored in the directory. By default, the temporary directory is used. + - `fs.overwrite-files`: Specifies whether file output writers should overwrite existing files by default. Set to *true* to overwrite by default, *false* otherwise. (DEFAULT: false) - `fs.output.always-create-directory`: File writers running with a parallelism larger than one create a directory for the output file path and put the different result files (one per parallel writer task) into that directory. If this option is set to *true*, writers with a parallelism of 1 will also create a directory and place a single result file into it. If the option is set to *false*, the writer will directly create the file directly at the output path, without creating a containing directory. (DEFAULT: false) @@ -238,13 +244,19 @@ definition. This scheme is used **ONLY** if no other scheme is specified (explic - `taskmanager.runtime.max-fan`: The maximal fan-in for external merge joins and fan-out for spilling hash tables. Limits the number of file handles per operator, but may cause intermediate merging/partitioning, if set too small (DEFAULT: 128). - `taskmanager.runtime.sort-spilling-threshold`: A sort operation starts spilling when this fraction of its memory budget is full (DEFAULT: 0.8). +### Resource Manager + +The configuration keys in this section are independent of the used resource management framework (YARN, Mesos, Standalone, ...) + +- `resourcemanager.rpc.port`: The config parameter defining the network port to connect to for communication with the resource manager. By default, the port +of the JobManager, because the same ActorSystem is used. Its not possible to use this configuration key to define port ranges. + + ## YARN - `yarn.heap-cutoff-ratio`: (Default 0.25) Percentage of heap space to remove from containers started by YARN. When a user requests a certain amount of memory for each TaskManager container (for example 4 GB), we can not pass this amount as the maximum heap space for the JVM (`-Xmx` argument) because the JVM is also allocating memory outside the heap. YARN is very strict with killing containers which are using more memory than requested. Therefore, we remove a 15% of the memory from the requested heap as a safety margin. - `yarn.heap-cutoff-min`: (Default 384 MB) Minimum amount of memory to cut off the requested heap size. -- `yarn.reallocate-failed` (Default 'true') Controls whether YARN should reallocate failed containers - - `yarn.maximum-failed-containers` (Default: number of requested containers). Maximum number of containers the system is going to reallocate in case of a failure. - `yarn.application-attempts` (Default: 1). Number of ApplicationMaster restarts. Note that that the entire Flink cluster will restart and the YARN Client will loose the connection. Also, the JobManager address will change and you'll need to set the JM host:port manually. It is recommended to leave this option at 1. @@ -253,9 +265,11 @@ definition. This scheme is used **ONLY** if no other scheme is specified (explic - `yarn.properties-file.location` (Default: temp directory). When a Flink job is submitted to YARN, the JobManager's host and the number of available processing slots is written into a properties file, so that the Flink client is able to pick those details up. This configuration parameter allows changing the default location of that file (for example for environments sharing a Flink installation between users) +- `yarn.containers.vcores` The number of virtual cores (vcores) per YARN container. By default, the number of `vcores` is set to the number of slots per TaskManager, if set, or to 1, otherwise. + - `yarn.application-master.env.`*ENV_VAR1=value* Configuration values prefixed with `yarn.application-master.env.` will be passed as environment variables to the ApplicationMaster/JobManager process. For example for passing `LD_LIBRARY_PATH` as an env variable to the ApplicationMaster, set: - yarn.application-master.env.LD_LIBRARY_PATH: "/usr/lib/native" + `yarn.application-master.env.LD_LIBRARY_PATH: "/usr/lib/native"` - `yarn.containers.vcores` The number of virtual cores (vcores) per YARN container. By default, the number of `vcores` is set to the number of slots per TaskManager, if set, or to 1, otherwise. @@ -263,7 +277,7 @@ definition. This scheme is used **ONLY** if no other scheme is specified (explic - `yarn.application-master.port` (Default: 0, which lets the OS choose an ephemeral port) With this configuration option, users can specify a port, a range of ports or a list of ports for the Application Master (and JobManager) RPC port. By default we recommend using the default value (0) to let the operating system choose an appropriate port. In particular when multiple AMs are running on the same physical host, fixed port assignments prevent the AM from starting. -For example when running Flink on YARN on an environment with a restrictive firewall, this option allows specifying a range of allowed ports. + For example when running Flink on YARN on an environment with a restrictive firewall, this option allows specifying a range of allowed ports. ## High Availability Mode http://git-wip-us.apache.org/repos/asf/flink/blob/6b7bb761/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java index 96e0e0d..14280d9 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java @@ -19,6 +19,7 @@ package org.apache.flink.api.common.io; import org.apache.flink.annotation.Public; +import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.io.statistics.BaseStatistics; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; @@ -373,6 +374,7 @@ public abstract class BinaryInputFormat extends FileInputFormat // Checkpointing // -------------------------------------------------------------------------------------------- + @PublicEvolving @Override public Tuple2 getCurrentState() throws IOException { if (this.blockBasedInput == null) { @@ -385,6 +387,7 @@ public abstract class BinaryInputFormat extends FileInputFormat ); } + @PublicEvolving @Override public void reopen(FileInputSplit split, Tuple2 state) throws IOException { Preconditions.checkNotNull(split, "reopen() cannot be called on a null split."); http://git-wip-us.apache.org/repos/asf/flink/blob/6b7bb761/flink-core/src/main/java/org/apache/flink/api/common/io/CheckpointableInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/CheckpointableInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/CheckpointableInputFormat.java index 17b0625..266914b 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/CheckpointableInputFormat.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/CheckpointableInputFormat.java @@ -25,7 +25,7 @@ import java.io.IOException; import java.io.Serializable; /** - * An interface the describes {@link InputFormat}s that allow checkpointing/restoring their state. + * An interface that describes {@link InputFormat}s that allow checkpointing/restoring their state. * * @param The type of input split. * @param The type of the channel state to be checkpointed / included in the snapshot. @@ -40,7 +40,7 @@ public interface CheckpointableInputFormat extends FileInputFormat imple // Checkpointing // -------------------------------------------------------------------------------------------- + @PublicEvolving @Override public Long getCurrentState() throws IOException { return this.offset; } + @PublicEvolving @Override public void reopen(FileInputSplit split, Long state) throws IOException { Preconditions.checkNotNull(split, "reopen() cannot be called on a null split."); http://git-wip-us.apache.org/repos/asf/flink/blob/6b7bb761/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index 5715796..588423a 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -291,29 +291,27 @@ public final class ConfigConstants { * Percentage of heap space to remove from containers (YARN / Mesos), to compensate * for other JVM memory usage. */ - public static final String CONTAINERED_HEAP_CUTOFF_RATIO = "containered.heap-cutoff-ratio"; + public static final String CONTAINERIZED_HEAP_CUTOFF_RATIO = "containerized.heap-cutoff-ratio"; /** * Minimum amount of heap memory to remove in containers, as a safety margin. */ - public static final String CONTAINERED_HEAP_CUTOFF_MIN = "containered.heap-cutoff-min"; + public static final String CONTAINERIZED_HEAP_CUTOFF_MIN = "containerized.heap-cutoff-min"; /** * Prefix for passing custom environment variables to Flink's master process. * For example for passing LD_LIBRARY_PATH as an env variable to the AppMaster, set: - * yarn.application-master.env.LD_LIBRARY_PATH: "/usr/lib/native" + * containerized.master.env.LD_LIBRARY_PATH: "/usr/lib/native" * in the flink-conf.yaml. */ - public static final String CONTAINERED_MASTER_ENV_PREFIX = "containered.application-master.env."; + public static final String CONTAINERIZED_MASTER_ENV_PREFIX = "containerized.master.env."; /** - * Similar to the {@see CONTAINERED_MASTER_ENV_PREFIX}, this configuration prefix allows + * Similar to the {@see CONTAINERIZED_MASTER_ENV_PREFIX}, this configuration prefix allows * setting custom environment variables for the workers (TaskManagers) */ - public static final String CONTAINERED_TASK_MANAGER_ENV_PREFIX = "containered.taskmanager.env."; + public static final String CONTAINERIZED_TASK_MANAGER_ENV_PREFIX = "containerized.taskmanager.env."; - // --------------------------Standalone Setup ----------------------------- - // ------------------------ YARN Configuration ------------------------ @@ -324,12 +322,14 @@ public final class ConfigConstants { /** * Percentage of heap space to remove from containers started by YARN. + * @deprecated in favor of {@code #CONTAINERIZED_HEAP_CUTOFF_RATIO} */ @Deprecated public static final String YARN_HEAP_CUTOFF_RATIO = "yarn.heap-cutoff-ratio"; /** * Minimum amount of memory to remove from the heap space as a safety margin. + * @deprecated in favor of {@code #CONTAINERIZED_HEAP_CUTOFF_MIN} */ @Deprecated public static final String YARN_HEAP_CUTOFF_MIN = "yarn.heap-cutoff-min"; @@ -377,7 +377,7 @@ public final class ConfigConstants { * For example for passing LD_LIBRARY_PATH as an env variable to the AppMaster, set: * yarn.application-master.env.LD_LIBRARY_PATH: "/usr/lib/native" * in the flink-conf.yaml. - * @deprecated Please use {@code CONTAINERED_MASTER_ENV_PREFIX}. + * @deprecated Please use {@code CONTAINERIZED_MASTER_ENV_PREFIX}. */ @Deprecated public static final String YARN_APPLICATION_MASTER_ENV_PREFIX = "yarn.application-master.env."; @@ -391,7 +391,7 @@ public final class ConfigConstants { /** * Similar to the {@see YARN_APPLICATION_MASTER_ENV_PREFIX}, this configuration prefix allows * setting custom environment variables. - * @deprecated Please use {@code CONTAINERED_TASK_MANAGER_ENV_PREFIX}. + * @deprecated Please use {@code CONTAINERIZED_TASK_MANAGER_ENV_PREFIX}. */ @Deprecated public static final String YARN_TASK_MANAGER_ENV_PREFIX = "yarn.taskmanager.env."; http://git-wip-us.apache.org/repos/asf/flink/blob/6b7bb761/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java index 532b1af..3dc4394 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java @@ -114,22 +114,22 @@ public class ContaineredTaskManagerParameters implements java.io.Serializable { // (1) compute how much memory we subtract from the total memory, to get the Java memory final float memoryCutoffRatio = config.getFloat( - ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, + ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO, ConfigConstants.DEFAULT_YARN_HEAP_CUTOFF_RATIO); final int minCutoff = config.getInteger( - ConfigConstants.CONTAINERED_HEAP_CUTOFF_MIN, + ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN, ConfigConstants.DEFAULT_YARN_HEAP_CUTOFF); if (memoryCutoffRatio >= 1 || memoryCutoffRatio <= 0) { throw new IllegalArgumentException("The configuration value '" - + ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO + "' must be between 0 and 1. Value given=" + + ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO + "' must be between 0 and 1. Value given=" + memoryCutoffRatio); } if (minCutoff >= containerMemoryMB) { throw new IllegalArgumentException("The configuration value '" - + ConfigConstants.CONTAINERED_HEAP_CUTOFF_MIN + "'='" + minCutoff + + ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN + "'='" + minCutoff + "' is larger than the total container memory " + containerMemoryMB); } @@ -166,7 +166,7 @@ public class ContaineredTaskManagerParameters implements java.io.Serializable { // (3) obtain the additional environment variables from the configuration final HashMap envVars = new HashMap<>(); - final String prefix = ConfigConstants.CONTAINERED_TASK_MANAGER_ENV_PREFIX; + final String prefix = ConfigConstants.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX; for (String key : config.keySet()) { if (key.startsWith(prefix) && key.length() > prefix.length()) { http://git-wip-us.apache.org/repos/asf/flink/blob/6b7bb761/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java index 784bf24..c710064 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java @@ -57,8 +57,8 @@ public class UtilsTest { @Test public void testHeapCutoff() { Configuration conf = new Configuration(); - conf.setDouble(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, 0.15); - conf.setInteger(ConfigConstants.CONTAINERED_HEAP_CUTOFF_MIN, 384); + conf.setDouble(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO, 0.15); + conf.setInteger(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN, 384); Assert.assertEquals(616, Utils.calculateHeapSize(1000, conf) ); Assert.assertEquals(8500, Utils.calculateHeapSize(10000, conf) ); @@ -66,14 +66,14 @@ public class UtilsTest { // test different configuration Assert.assertEquals(3400, Utils.calculateHeapSize(4000, conf)); - conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_MIN, "1000"); - conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, "0.1"); + conf.setString(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN, "1000"); + conf.setString(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO, "0.1"); Assert.assertEquals(3000, Utils.calculateHeapSize(4000, conf)); - conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, "0.5"); + conf.setString(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO, "0.5"); Assert.assertEquals(2000, Utils.calculateHeapSize(4000, conf)); - conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, "1"); + conf.setString(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO, "1"); Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf)); // test also deprecated keys @@ -88,21 +88,21 @@ public class UtilsTest { @Test(expected = IllegalArgumentException.class) public void illegalArgument() { Configuration conf = new Configuration(); - conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, "1.1"); + conf.setString(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO, "1.1"); Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf)); } @Test(expected = IllegalArgumentException.class) public void illegalArgumentNegative() { Configuration conf = new Configuration(); - conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, "-0.01"); + conf.setString(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO, "-0.01"); Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf)); } @Test(expected = IllegalArgumentException.class) public void tooMuchCutoff() { Configuration conf = new Configuration(); - conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, "6000"); + conf.setString(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO, "6000"); Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf)); } http://git-wip-us.apache.org/repos/asf/flink/blob/6b7bb761/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java index f56c024..d5bad2f 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java @@ -67,23 +67,23 @@ public final class Utils { public static int calculateHeapSize(int memory, org.apache.flink.configuration.Configuration conf) { BootstrapTools.substituteDeprecatedConfigKey(conf, - ConfigConstants.YARN_HEAP_CUTOFF_RATIO, ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO); + ConfigConstants.YARN_HEAP_CUTOFF_RATIO, ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO); BootstrapTools.substituteDeprecatedConfigKey(conf, - ConfigConstants.YARN_HEAP_CUTOFF_MIN, ConfigConstants.CONTAINERED_HEAP_CUTOFF_MIN); + ConfigConstants.YARN_HEAP_CUTOFF_MIN, ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN); - float memoryCutoffRatio = conf.getFloat(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, + float memoryCutoffRatio = conf.getFloat(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO, ConfigConstants.DEFAULT_YARN_HEAP_CUTOFF_RATIO); - int minCutoff = conf.getInteger(ConfigConstants.CONTAINERED_HEAP_CUTOFF_MIN, + int minCutoff = conf.getInteger(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN, ConfigConstants.DEFAULT_YARN_HEAP_CUTOFF); if (memoryCutoffRatio > 1 || memoryCutoffRatio < 0) { throw new IllegalArgumentException("The configuration value '" - + ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO + + ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO + "' must be between 0 and 1. Value given=" + memoryCutoffRatio); } if (minCutoff > memory) { throw new IllegalArgumentException("The configuration value '" - + ConfigConstants.CONTAINERED_HEAP_CUTOFF_MIN + + ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN + "' is higher (" + minCutoff + ") than the requested amount of memory " + memory); } http://git-wip-us.apache.org/repos/asf/flink/blob/6b7bb761/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java index 563425f..582910b 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java @@ -439,19 +439,19 @@ public class YarnApplicationMasterRunner { BootstrapTools.substituteDeprecatedConfigKey(configuration, ConfigConstants.YARN_HEAP_CUTOFF_RATIO, - ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO); + ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO); BootstrapTools.substituteDeprecatedConfigKey(configuration, ConfigConstants.YARN_HEAP_CUTOFF_MIN, - ConfigConstants.CONTAINERED_HEAP_CUTOFF_MIN); + ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN); BootstrapTools.substituteDeprecatedConfigPrefix(configuration, ConfigConstants.YARN_APPLICATION_MASTER_ENV_PREFIX, - ConfigConstants.CONTAINERED_MASTER_ENV_PREFIX); + ConfigConstants.CONTAINERIZED_MASTER_ENV_PREFIX); BootstrapTools.substituteDeprecatedConfigPrefix(configuration, ConfigConstants.YARN_TASK_MANAGER_ENV_PREFIX, - ConfigConstants.CONTAINERED_TASK_MANAGER_ENV_PREFIX); + ConfigConstants.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX); return configuration; }