flink-commits mailing list archives

From rmetz...@apache.org
Subject flink git commit: [FLINK-4127] Check API compatbility for 1.1 in flink-core
Date Wed, 13 Jul 2016 08:43:53 GMT
Repository: flink
Updated Branches:
  refs/heads/master 74b09ce0d -> 6b7bb7614


[FLINK-4127] Check API compatbility for 1.1 in flink-core

This closes #2177


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6b7bb761
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6b7bb761
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6b7bb761

Branch: refs/heads/master
Commit: 6b7bb7614848303d31811496fcb7107a78d448c3
Parents: 74b09ce
Author: Robert Metzger <rmetzger@apache.org>
Authored: Tue Jun 28 15:12:05 2016 +0200
Committer: Robert Metzger <rmetzger@apache.org>
Committed: Wed Jul 13 10:43:40 2016 +0200

----------------------------------------------------------------------
 docs/setup/config.md                            | 28 +++++++++++++++-----
 .../flink/api/common/io/BinaryInputFormat.java  |  3 +++
 .../common/io/CheckpointableInputFormat.java    |  4 +--
 .../api/common/io/DelimitedInputFormat.java     |  3 +++
 .../flink/configuration/ConfigConstants.java    | 20 +++++++-------
 .../ContaineredTaskManagerParameters.java       | 10 +++----
 .../java/org/apache/flink/yarn/UtilsTest.java   | 18 ++++++-------
 .../main/java/org/apache/flink/yarn/Utils.java  | 12 ++++-----
 .../flink/yarn/YarnApplicationMasterRunner.java |  8 +++---
 9 files changed, 63 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6b7bb761/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index 1e4f83a..1f0bf7e 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -41,9 +41,9 @@ The configuration files for the TaskManagers can be different, Flink does not as
 
 - `env.java.opts`: Set custom JVM options. This value is respected by Flink's start scripts, both JobManager and TaskManager, and Flink's YARN client. This can be used to set different garbage collectors or to include remote debuggers into the JVMs running Flink's services. Use `env.java.opts.jobmanager` and `env.java.opts.taskmanager` for JobManager or TaskManager-specific options, respectively.
 
-- `env.java.opts.jobmanager`: JobManager-specific JVM options. These are used in addition to the regular `env.java.opts`.
+- `env.java.opts.jobmanager`: JobManager-specific JVM options. These are used in addition to the regular `env.java.opts`. This configuration option is ignored by the YARN client.
 
-- `env.java.opts.taskmanager`: TaskManager-specific JVM options. These are used in addition to the regular `env.java.opts`.
+- `env.java.opts.taskmanager`: TaskManager-specific JVM options. These are used in addition to the regular `env.java.opts`. This configuration option is ignored by the YARN client.
 
 - `jobmanager.rpc.address`: The IP address of the JobManager, which is the master/coordinator of the distributed system (DEFAULT: localhost).
 
@@ -71,7 +71,7 @@ without explicit scheme definition, such as `/user/USERNAME/in.txt`, is going to
 
 ### Managed Memory
 
-By default, Flink allocates a fraction of `0.7` of the total memory configured via `taskmanager.heap.mb` for its managed memory. Managed memory helps Flink to run the operators efficiently. It prevents `OutOfMemoryException`s because Flink knows how much memory it can use to execute operations. If Flink runs out of managed memory, it utilizes disk space. Using managed memory, some operations can be performed directly on the raw data without having to deserialize the data to convert it into Java objects. All in all, managed memory improves the robustness and speed of the system.
+By default, Flink allocates a fraction of `0.7` of the total memory configured via `taskmanager.heap.mb` for its managed memory. Managed memory helps Flink to run the batch operators efficiently. It prevents `OutOfMemoryException`s because Flink knows how much memory it can use to execute operations. If Flink runs out of managed memory, it utilizes disk space. Using managed memory, some operations can be performed directly on the raw data without having to deserialize the data to convert it into Java objects. All in all, managed memory improves the robustness and speed of the system.
 
 The default fraction for managed memory can be adjusted using the `taskmanager.memory.fraction` parameter. An absolute value may be set using `taskmanager.memory.size` (overrides the fraction parameter). If desired, the managed memory may be allocated outside the JVM heap. This may improve performance in setups with large memory sizes.
 
@@ -107,12 +107,18 @@ Please make sure to set the maximum ticket life span high long running jobs. The
 
 If you are on YARN, then it is sufficient to authenticate the client with Kerberos. On a Flink standalone cluster you need to ensure that, initially, all nodes are authenticated with Kerberos using the `kinit` tool.
 
+
 ### Other
 
 - `taskmanager.tmp.dirs`: The directory for temporary files, or a list of directories separated by the systems directory delimiter (for example ':' (colon) on Linux/Unix). If multiple directories are specified, then the temporary files will be distributed across the directories in a round-robin fashion. The I/O manager component will spawn one reading and one writing thread per directory. A directory may be listed multiple times to have the I/O manager use multiple threads for it (for example if it is physically stored on a very fast disc or RAID) (DEFAULT: The system's tmp dir).
 
+- `taskmanager.log.path`: The config parameter defining the taskmanager log file location
+
 - `jobmanager.web.port`: Port of the JobManager's web interface (DEFAULT: 8081).
 
+- `jobmanager.web.tmpdir`: This configuration parameter allows defining the Flink web directory to be used by the web interface. The web interface
+will copy its static files into the directory. Also uploaded job jars are stored in the directory. By default, the temporary directory is used.
+
 - `fs.overwrite-files`: Specifies whether file output writers should overwrite existing files by default. Set to *true* to overwrite by default, *false* otherwise. (DEFAULT: false)
 
 - `fs.output.always-create-directory`: File writers running with a parallelism larger than one create a directory for the output file path and put the different result files (one per parallel writer task) into that directory. If this option is set to *true*, writers with a parallelism of 1 will also create a directory and place a single result file into it. If the option is set to *false*, the writer will directly create the file directly at the output path, without creating a containing directory. (DEFAULT: false)
@@ -238,13 +244,19 @@ definition. This scheme is used **ONLY** if no other scheme is specified (explic
 - `taskmanager.runtime.max-fan`: The maximal fan-in for external merge joins and fan-out for spilling hash tables. Limits the number of file handles per operator, but may cause intermediate merging/partitioning, if set too small (DEFAULT: 128).
 - `taskmanager.runtime.sort-spilling-threshold`: A sort operation starts spilling when this fraction of its memory budget is full (DEFAULT: 0.8).
 
+### Resource Manager
+
+The configuration keys in this section are independent of the used resource management framework (YARN, Mesos, Standalone, ...)
+
+- `resourcemanager.rpc.port`: The config parameter defining the network port to connect to for communication with the resource manager. By default, the port
+of the JobManager, because the same ActorSystem is used. Its not possible to use this configuration key to define port ranges.
+
+
 ## YARN
 
 - `yarn.heap-cutoff-ratio`: (Default 0.25) Percentage of heap space to remove from containers started by YARN. When a user requests a certain amount of memory for each TaskManager container (for example 4 GB), we can not pass this amount as the maximum heap space for the JVM (`-Xmx` argument) because the JVM is also allocating memory outside the heap. YARN is very strict with killing containers which are using more memory than requested. Therefore, we remove a 15% of the memory from the requested heap as a safety margin.
 - `yarn.heap-cutoff-min`: (Default 384 MB) Minimum amount of memory to cut off the requested heap size.
 
-- `yarn.reallocate-failed` (Default 'true') Controls whether YARN should reallocate failed containers
-
 - `yarn.maximum-failed-containers` (Default: number of requested containers). Maximum number of containers the system is going to reallocate in case of a failure.
 
 - `yarn.application-attempts` (Default: 1). Number of ApplicationMaster restarts. Note that that the entire Flink cluster will restart and the YARN Client will loose the connection. Also, the JobManager address will change and you'll need to set the JM host:port manually. It is recommended to leave this option at 1.
@@ -253,9 +265,11 @@ definition. This scheme is used **ONLY** if no other scheme is specified (explic
 
 - `yarn.properties-file.location` (Default: temp directory). When a Flink job is submitted to YARN, the JobManager's host and the number of available processing slots is written into a properties file, so that the Flink client is able to pick those details up. This configuration parameter allows changing the default location of that file (for example for environments sharing a Flink installation between users)
 
+- `yarn.containers.vcores` The number of virtual cores (vcores) per YARN container. By default, the number of `vcores` is set to the number of slots per TaskManager, if set, or to 1, otherwise.
+
 - `yarn.application-master.env.`*ENV_VAR1=value* Configuration values prefixed with `yarn.application-master.env.` will be passed as environment variables to the ApplicationMaster/JobManager process. For example for passing `LD_LIBRARY_PATH` as an env variable to the ApplicationMaster, set:
 
-	yarn.application-master.env.LD_LIBRARY_PATH: "/usr/lib/native"
+	`yarn.application-master.env.LD_LIBRARY_PATH: "/usr/lib/native"`
 
 - `yarn.containers.vcores` The number of virtual cores (vcores) per YARN container. By default, the number of `vcores` is set to the number of slots per TaskManager, if set, or to 1, otherwise.
 
@@ -263,7 +277,7 @@ definition. This scheme is used **ONLY** if no other scheme is specified (explic
 
 - `yarn.application-master.port` (Default: 0, which lets the OS choose an ephemeral port) With this configuration option, users can specify a port, a range of ports or a list of ports for the  Application Master (and JobManager) RPC port. By default we recommend using the default value (0) to let the operating system choose an appropriate port. In particular when multiple AMs are running on the  same physical host, fixed port assignments prevent the AM from starting.
 
-For example when running Flink on YARN on an environment with a restrictive firewall, this option allows specifying a range of allowed ports.
+  For example when running Flink on YARN on an environment with a restrictive firewall, this option allows specifying a range of allowed ports.
 
 
 ## High Availability Mode

http://git-wip-us.apache.org/repos/asf/flink/blob/6b7bb761/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java
index 96e0e0d..14280d9 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java
@@ -19,6 +19,7 @@
 package org.apache.flink.api.common.io;
 
 import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
@@ -373,6 +374,7 @@ public abstract class BinaryInputFormat<T> extends FileInputFormat<T>
 	//  Checkpointing
 	// --------------------------------------------------------------------------------------------
 
+	@PublicEvolving
 	@Override
 	public Tuple2<Long, Long> getCurrentState() throws IOException {
 		if (this.blockBasedInput == null) {
@@ -385,6 +387,7 @@ public abstract class BinaryInputFormat<T> extends FileInputFormat<T>
 		);
 	}
 
+	@PublicEvolving
 	@Override
 	public void reopen(FileInputSplit split, Tuple2<Long, Long> state) throws IOException {
 		Preconditions.checkNotNull(split, "reopen() cannot be called on a null split.");

http://git-wip-us.apache.org/repos/asf/flink/blob/6b7bb761/flink-core/src/main/java/org/apache/flink/api/common/io/CheckpointableInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/CheckpointableInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/CheckpointableInputFormat.java
index 17b0625..266914b 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/CheckpointableInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/CheckpointableInputFormat.java
@@ -25,7 +25,7 @@ import java.io.IOException;
 import java.io.Serializable;
 
 /**
- * An interface the describes {@link InputFormat}s that allow checkpointing/restoring their state.
+ * An interface that describes {@link InputFormat}s that allow checkpointing/restoring their state.
  *
  * @param <S> The type of input split.
  * @param <T> The type of the channel state to be checkpointed / included in the snapshot.
@@ -40,7 +40,7 @@ public interface CheckpointableInputFormat<S extends InputSplit, T extends Seria
 	 *
 	 * @return The state of the channel.
 	 *
-	 * @throws Exception Thrown if the creation of the state object failed.
+	 * @throws IOException Thrown if the creation of the state object failed.
 	 */
 	T getCurrentState() throws IOException;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6b7bb761/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
index 4cd200d..59c1730 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
@@ -19,6 +19,7 @@
 package org.apache.flink.api.common.io;
 
 import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -629,11 +630,13 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple
 	//  Checkpointing
 	// --------------------------------------------------------------------------------------------
 
+	@PublicEvolving
 	@Override
 	public Long getCurrentState() throws IOException {
 		return this.offset;
 	}
 
+	@PublicEvolving
 	@Override
 	public void reopen(FileInputSplit split, Long state) throws IOException {
 		Preconditions.checkNotNull(split, "reopen() cannot be called on a null split.");

http://git-wip-us.apache.org/repos/asf/flink/blob/6b7bb761/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 5715796..588423a 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -291,29 +291,27 @@ public final class ConfigConstants {
 	 * Percentage of heap space to remove from containers (YARN / Mesos), to compensate
 	 * for other JVM memory usage.
 	 */
-	public static final String CONTAINERED_HEAP_CUTOFF_RATIO = "containered.heap-cutoff-ratio";
+	public static final String CONTAINERIZED_HEAP_CUTOFF_RATIO = "containerized.heap-cutoff-ratio";
 
 	/**
 	 * Minimum amount of heap memory to remove in containers, as a safety margin.
 	 */
-	public static final String CONTAINERED_HEAP_CUTOFF_MIN = "containered.heap-cutoff-min";
+	public static final String CONTAINERIZED_HEAP_CUTOFF_MIN = "containerized.heap-cutoff-min";
 
 	/**
 	 * Prefix for passing custom environment variables to Flink's master process.
 	 * For example for passing LD_LIBRARY_PATH as an env variable to the AppMaster, set:
-	 * yarn.application-master.env.LD_LIBRARY_PATH: "/usr/lib/native"
+	 * containerized.master.env.LD_LIBRARY_PATH: "/usr/lib/native"
 	 * in the flink-conf.yaml.
 	 */
-	public static final String CONTAINERED_MASTER_ENV_PREFIX = "containered.application-master.env.";
+	public static final String CONTAINERIZED_MASTER_ENV_PREFIX = "containerized.master.env.";
 
 	/**
-	 * Similar to the {@see CONTAINERED_MASTER_ENV_PREFIX}, this configuration prefix allows
+	 * Similar to the {@see CONTAINERIZED_MASTER_ENV_PREFIX}, this configuration prefix allows
 	 * setting custom environment variables for the workers (TaskManagers)
 	 */
-	public static final String CONTAINERED_TASK_MANAGER_ENV_PREFIX = "containered.taskmanager.env.";
+	public static final String CONTAINERIZED_TASK_MANAGER_ENV_PREFIX = "containerized.taskmanager.env.";
 
-	// --------------------------Standalone Setup -----------------------------
-	
 	
 	// ------------------------ YARN Configuration ------------------------
 
@@ -324,12 +322,14 @@ public final class ConfigConstants {
 
 	/**
 	 * Percentage of heap space to remove from containers started by YARN.
+	 * @deprecated in favor of {@code #CONTAINERIZED_HEAP_CUTOFF_RATIO}
 	 */
 	@Deprecated
 	public static final String YARN_HEAP_CUTOFF_RATIO = "yarn.heap-cutoff-ratio";
 
 	/**
 	 * Minimum amount of memory to remove from the heap space as a safety margin.
+	 * @deprecated in favor of {@code #CONTAINERIZED_HEAP_CUTOFF_MIN}
 	 */
 	@Deprecated
 	public static final String YARN_HEAP_CUTOFF_MIN = "yarn.heap-cutoff-min";
@@ -377,7 +377,7 @@ public final class ConfigConstants {
 	 * For example for passing LD_LIBRARY_PATH as an env variable to the AppMaster, set:
 	 * 	yarn.application-master.env.LD_LIBRARY_PATH: "/usr/lib/native"
 	 * in the flink-conf.yaml.
-	 * @deprecated Please use {@code CONTAINERED_MASTER_ENV_PREFIX}.
+	 * @deprecated Please use {@code CONTAINERIZED_MASTER_ENV_PREFIX}.
 	 */
 	@Deprecated
 	public static final String YARN_APPLICATION_MASTER_ENV_PREFIX = "yarn.application-master.env.";
@@ -391,7 +391,7 @@ public final class ConfigConstants {
 	/**
 	 * Similar to the {@see YARN_APPLICATION_MASTER_ENV_PREFIX}, this configuration prefix allows
 	 * setting custom environment variables.
-	 * @deprecated Please use {@code CONTAINERED_TASK_MANAGER_ENV_PREFIX}.
+	 * @deprecated Please use {@code CONTAINERIZED_TASK_MANAGER_ENV_PREFIX}.
 	 */
 	@Deprecated
 	public static final String YARN_TASK_MANAGER_ENV_PREFIX = "yarn.taskmanager.env.";

http://git-wip-us.apache.org/repos/asf/flink/blob/6b7bb761/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
index 532b1af..3dc4394 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
@@ -114,22 +114,22 @@ public class ContaineredTaskManagerParameters implements java.io.Serializable {
 		// (1) compute how much memory we subtract from the total memory, to get the Java memory
 
 		final float memoryCutoffRatio = config.getFloat(
-			ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO,
+			ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO,
 			ConfigConstants.DEFAULT_YARN_HEAP_CUTOFF_RATIO);
 
 		final int minCutoff = config.getInteger(
-			ConfigConstants.CONTAINERED_HEAP_CUTOFF_MIN,
+			ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN,
 			ConfigConstants.DEFAULT_YARN_HEAP_CUTOFF);
 
 		if (memoryCutoffRatio >= 1 || memoryCutoffRatio <= 0) {
 			throw new IllegalArgumentException("The configuration value '"
-				+ ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO + "' must be between 0 and 1. Value given="
+				+ ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO + "' must be between 0 and 1. Value given="
 				+ memoryCutoffRatio);
 		}
 
 		if (minCutoff >= containerMemoryMB) {
 			throw new IllegalArgumentException("The configuration value '"
-				+ ConfigConstants.CONTAINERED_HEAP_CUTOFF_MIN + "'='" + minCutoff
+				+ ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN + "'='" + minCutoff
 				+ "' is larger than the total container memory " + containerMemoryMB);
 		}
 
@@ -166,7 +166,7 @@ public class ContaineredTaskManagerParameters implements java.io.Serializable {
 		
 		// (3) obtain the additional environment variables from the configuration
 		final HashMap<String, String> envVars = new HashMap<>();
-		final String prefix = ConfigConstants.CONTAINERED_TASK_MANAGER_ENV_PREFIX;
+		final String prefix = ConfigConstants.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX;
 		
 		for (String key : config.keySet()) {
 			if (key.startsWith(prefix) && key.length() > prefix.length()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/6b7bb761/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java
index 784bf24..c710064 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java
@@ -57,8 +57,8 @@ public class UtilsTest {
 	@Test
 	public void testHeapCutoff() {
 		Configuration conf = new Configuration();
-		conf.setDouble(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, 0.15);
-		conf.setInteger(ConfigConstants.CONTAINERED_HEAP_CUTOFF_MIN, 384);
+		conf.setDouble(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO, 0.15);
+		conf.setInteger(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN, 384);
 
 		Assert.assertEquals(616, Utils.calculateHeapSize(1000, conf) );
 		Assert.assertEquals(8500, Utils.calculateHeapSize(10000, conf) );
@@ -66,14 +66,14 @@ public class UtilsTest {
 		// test different configuration
 		Assert.assertEquals(3400, Utils.calculateHeapSize(4000, conf));
 
-		conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_MIN, "1000");
-		conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, "0.1");
+		conf.setString(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN, "1000");
+		conf.setString(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO, "0.1");
 		Assert.assertEquals(3000, Utils.calculateHeapSize(4000, conf));
 
-		conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, "0.5");
+		conf.setString(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO, "0.5");
 		Assert.assertEquals(2000, Utils.calculateHeapSize(4000, conf));
 
-		conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, "1");
+		conf.setString(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO, "1");
 		Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf));
 
 		// test also deprecated keys
@@ -88,21 +88,21 @@ public class UtilsTest {
 	@Test(expected = IllegalArgumentException.class)
 	public void illegalArgument() {
 		Configuration conf = new Configuration();
-		conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, "1.1");
+		conf.setString(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO, "1.1");
 		Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf));
 	}
 
 	@Test(expected = IllegalArgumentException.class)
 	public void illegalArgumentNegative() {
 		Configuration conf = new Configuration();
-		conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, "-0.01");
+		conf.setString(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO, "-0.01");
 		Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf));
 	}
 
 	@Test(expected = IllegalArgumentException.class)
 	public void tooMuchCutoff() {
 		Configuration conf = new Configuration();
-		conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, "6000");
+		conf.setString(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO, "6000");
 		Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf));
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6b7bb761/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
index f56c024..d5bad2f 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
@@ -67,23 +67,23 @@ public final class Utils {
 	public static int calculateHeapSize(int memory, org.apache.flink.configuration.Configuration conf) {
 
 		BootstrapTools.substituteDeprecatedConfigKey(conf,
-			ConfigConstants.YARN_HEAP_CUTOFF_RATIO, ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO);
+			ConfigConstants.YARN_HEAP_CUTOFF_RATIO, ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO);
 		BootstrapTools.substituteDeprecatedConfigKey(conf,
-			ConfigConstants.YARN_HEAP_CUTOFF_MIN, ConfigConstants.CONTAINERED_HEAP_CUTOFF_MIN);
+			ConfigConstants.YARN_HEAP_CUTOFF_MIN, ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN);
 
-		float memoryCutoffRatio = conf.getFloat(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO,
+		float memoryCutoffRatio = conf.getFloat(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO,
 			ConfigConstants.DEFAULT_YARN_HEAP_CUTOFF_RATIO);
-		int minCutoff = conf.getInteger(ConfigConstants.CONTAINERED_HEAP_CUTOFF_MIN,
+		int minCutoff = conf.getInteger(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN,
 			ConfigConstants.DEFAULT_YARN_HEAP_CUTOFF);
 
 		if (memoryCutoffRatio > 1 || memoryCutoffRatio < 0) {
 			throw new IllegalArgumentException("The configuration value '"
-				+ ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO
+				+ ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO
 				+ "' must be between 0 and 1. Value given=" + memoryCutoffRatio);
 		}
 		if (minCutoff > memory) {
 			throw new IllegalArgumentException("The configuration value '"
-				+ ConfigConstants.CONTAINERED_HEAP_CUTOFF_MIN
+				+ ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN
 				+ "' is higher (" + minCutoff + ") than the requested amount of memory " + memory);
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6b7bb761/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index 563425f..582910b 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -439,19 +439,19 @@ public class YarnApplicationMasterRunner {
 
 		BootstrapTools.substituteDeprecatedConfigKey(configuration,
 			ConfigConstants.YARN_HEAP_CUTOFF_RATIO,
-			ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO);
+			ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO);
 
 		BootstrapTools.substituteDeprecatedConfigKey(configuration,
 			ConfigConstants.YARN_HEAP_CUTOFF_MIN,
-			ConfigConstants.CONTAINERED_HEAP_CUTOFF_MIN);
+			ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN);
 
 		BootstrapTools.substituteDeprecatedConfigPrefix(configuration,
 			ConfigConstants.YARN_APPLICATION_MASTER_ENV_PREFIX,
-			ConfigConstants.CONTAINERED_MASTER_ENV_PREFIX);
+			ConfigConstants.CONTAINERIZED_MASTER_ENV_PREFIX);
 
 		BootstrapTools.substituteDeprecatedConfigPrefix(configuration,
 			ConfigConstants.YARN_TASK_MANAGER_ENV_PREFIX,
-			ConfigConstants.CONTAINERED_TASK_MANAGER_ENV_PREFIX);
+			ConfigConstants.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX);
 
 		return configuration;
 	}
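
For readers tracing the renamed `containerized.heap-cutoff-*` keys, the arithmetic implied by the assertions in `UtilsTest.testHeapCutoff` can be sketched as below. This is a minimal sketch, not the body of `Utils.calculateHeapSize`: the diff shows only that method's validation checks. The class name `HeapCutoffSketch`, the plain-parameter signature, and the rule "subtract the larger of the minimum cutoff and `memory * ratio`" are assumptions inferred from the expected values in the test.

```java
/** Hypothetical stand-in for the heap cutoff arithmetic; not Flink's actual class. */
public final class HeapCutoffSketch {

    private HeapCutoffSketch() {}

    /**
     * Returns the JVM heap size in MB for a container of the given size.
     * Assumption: the cutoff is max(minCutoffMb, memoryMb * cutoffRatio).
     */
    public static int calculateHeapSize(int memoryMb, float cutoffRatio, int minCutoffMb) {
        // Same range checks as the diff shows in Utils.calculateHeapSize.
        if (cutoffRatio > 1 || cutoffRatio < 0) {
            throw new IllegalArgumentException(
                "The cutoff ratio must be between 0 and 1. Value given=" + cutoffRatio);
        }
        if (minCutoffMb > memoryMb) {
            throw new IllegalArgumentException(
                "The minimum cutoff is higher (" + minCutoffMb
                    + ") than the requested amount of memory " + memoryMb);
        }

        // Ratio-based cutoff, raised to the configured minimum if it is smaller.
        int cutoff = (int) (memoryMb * cutoffRatio);
        if (cutoff < minCutoffMb) {
            cutoff = minCutoffMb;
        }
        return memoryMb - cutoff;
    }

    public static void main(String[] args) {
        // Values taken from UtilsTest.testHeapCutoff (ratio 0.15, minimum 384 MB).
        System.out.println(calculateHeapSize(1000, 0.15f, 384));  // 616: the minimum applies
        System.out.println(calculateHeapSize(10000, 0.15f, 384)); // 8500: the ratio applies
        System.out.println(calculateHeapSize(4000, 0.15f, 384));  // 3400: the ratio applies
    }
}
```

With a 1000 MB container the ratio-based cutoff (150 MB) is below the 384 MB minimum, so the minimum is subtracted. For larger containers the ratio dominates.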

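
The `BootstrapTools.substituteDeprecatedConfigKey` calls in this commit keep the deprecated `yarn.*` keys working after the rename to `containerized.*`. The helper's body is not part of this diff. The sketch below assumes it copies the deprecated key's value to the new key only when the new key is unset, and it models the configuration as a plain `Map` rather than Flink's `Configuration`. The class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical illustration of deprecated-key substitution; not Flink's BootstrapTools. */
public final class DeprecatedKeySketch {

    private DeprecatedKeySketch() {}

    /**
     * Copies the value of the deprecated key to the designated key,
     * unless the designated key already has a value (assumed precedence).
     */
    public static void substituteDeprecatedKey(
            Map<String, String> config, String deprecated, String designated) {
        if (!config.containsKey(designated)) {
            String oldValue = config.get(deprecated);
            if (oldValue != null) {
                config.put(designated, oldValue);
            }
        }
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        // A user configuration that still uses the deprecated YARN-specific key.
        config.put("yarn.heap-cutoff-ratio", "0.15");

        substituteDeprecatedKey(
            config, "yarn.heap-cutoff-ratio", "containerized.heap-cutoff-ratio");

        // The value is now readable under the new key as well.
        System.out.println(config.get("containerized.heap-cutoff-ratio"));
    }
}
```

Under this assumption, an explicitly set new key takes precedence over the deprecated one.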
