flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject flink git commit: [FLINK-7400][cluster] fix cut-off memory not used for off-heap reserve as intended
Date Wed, 01 Nov 2017 12:11:59 GMT
Repository: flink
Updated Branches:
  refs/heads/master 0ba528c71 -> 0df8e0797


[FLINK-7400][cluster] fix cut-off memory not used for off-heap reserve as intended

+ fix description of `containerized.heap-cutoff-ratio`

[FLINK-7400][yarn] add an integration test for yarn container memory restrictions using off-heap memory

[FLINK-7400] address PR comments

This closes #4506.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0df8e079
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0df8e079
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0df8e079

Branch: refs/heads/master
Commit: 0df8e0797459ef9e8dfa177920def08bc2f11d65
Parents: 0ba528c
Author: Nico Kruber <nico@data-artisans.com>
Authored: Wed Aug 9 11:53:03 2017 +0200
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Wed Nov 1 13:11:18 2017 +0100

----------------------------------------------------------------------
 docs/ops/config.md                              |  2 +-
 .../ContaineredTaskManagerParameters.java       |  3 +-
 .../YARNSessionCapacitySchedulerITCase.java     | 55 +++++++++++++++++++-
 3 files changed, 56 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0df8e079/docs/ops/config.md
----------------------------------------------------------------------
diff --git a/docs/ops/config.md b/docs/ops/config.md
index b72bc10..9ee7106 100644
--- a/docs/ops/config.md
+++ b/docs/ops/config.md
@@ -451,7 +451,7 @@ of the JobManager, because the same ActorSystem is used. Its not possible
to use
 
 ### YARN
 
-- `containerized.heap-cutoff-ratio`: (Default 0.25) Percentage of heap space to remove from
containers started by YARN. When a user requests a certain amount of memory for each TaskManager
container (for example 4 GB), we can not pass this amount as the maximum heap space for the
JVM (`-Xmx` argument) because the JVM is also allocating memory outside the heap. YARN is
very strict with killing containers which are using more memory than requested. Therefore,
we remove a 15% of the memory from the requested heap as a safety margin.
+- `containerized.heap-cutoff-ratio`: (Default 0.25) Fraction of the requested container memory to cut off from
the heap of containers started by YARN. When a user requests a certain amount of memory for each TaskManager
container (for example 4 GB), we cannot pass this amount as the maximum heap space for the
JVM (`-Xmx` argument) because the JVM also allocates memory outside the heap. YARN is
very strict about killing containers that use more memory than requested. Therefore,
we remove this fraction of the requested memory from the heap as a safety margin and add it
to the memory used off-heap.
 
 - `containerized.heap-cutoff-min`: (Default 600 MB) Minimum amount of memory to cut off the
requested heap size.
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0df8e079/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
index 7e9891f..c35cf81 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
@@ -141,7 +141,8 @@ public class ContaineredTaskManagerParameters implements java.io.Serializable
{
 
 		// (2) split the remaining Java memory between heap and off-heap
 		final long heapSizeMB = TaskManagerServices.calculateHeapSizeMB(javaMemorySizeMB, config);
-		final long offHeapSize = javaMemorySizeMB == heapSizeMB ? -1L : javaMemorySizeMB - heapSizeMB;

+		// use the cut-off memory for off-heap (that was its intention)
+		final long offHeapSize = javaMemorySizeMB == heapSizeMB ? -1L : containerMemoryMB - heapSizeMB;
 
 		// (3) obtain the additional environment variables from the configuration
 		final HashMap<String, String> envVars = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/flink/blob/0df8e079/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
index 4441d8a..03c61e8 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
@@ -18,9 +18,12 @@
 
 package org.apache.flink.yarn;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.ResourceManagerOptions;
 import org.apache.flink.runtime.client.JobClient;
+import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
 import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
 import org.apache.flink.test.testdata.WordCountData;
 import org.apache.flink.test.util.TestBaseUtils;
@@ -124,9 +127,57 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase
{
 				"-ys", "2", //test that the job is executed with a DOP of 2
 				"-yjm", "768",
 				"-ytm", "1024", exampleJarLocation.getAbsolutePath()},
-				/* test succeeded after this string */
+			/* test succeeded after this string */
 			"Job execution complete",
-			/* prohibited strings: (we want to see "DataSink (...) (2/2) switched to FINISHED") */
+			/* prohibited strings: (to verify the parallelism) */
+			// (we should see "DataSink (...) (1/2)" and "DataSink (...) (2/2)" instead)
+			new String[]{"DataSink \\(.*\\) \\(1/1\\) switched to FINISHED"},
+			RunTypes.CLI_FRONTEND, 0, true);
+		LOG.info("Finished perJobYarnCluster()");
+	}
+
+	/**
+	 * Test per-job yarn cluster and memory calculations for off-heap use (see FLINK-7400) with
the
+	 * same job as {@link #perJobYarnCluster()}.
+	 *
+	 * <p>This ensures that with (any) pre-allocated off-heap memory by us, there is some
off-heap
+	 * memory remaining for Flink's libraries. Creating task managers will thus fail if no off-heap
+	 * memory remains.
+	 */
+	@Test
+	public void perJobYarnClusterOffHeap() {
+		LOG.info("Starting perJobYarnCluster()");
+		addTestAppender(JobClient.class, Level.INFO);
+		File exampleJarLocation = new File("target/programs/BatchWordCount.jar");
+		Assert.assertNotNull("Could not find wordcount jar", exampleJarLocation);
+
+		// set memory constraints (otherwise this is the same test as perJobYarnCluster() above)
+		final long taskManagerMemoryMB = 1024;
+		//noinspection NumericOverflow if the calculation of the total Java memory size overflows,
default configuration parameters are wrong in the first place, so we can ignore this inspection
+		final long networkBuffersMB = TaskManagerServices
+			.calculateNetworkBufferMemory(
+				(taskManagerMemoryMB -
+					ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN.defaultValue()) << 20,
+				new Configuration()) >> 20;
+		final long offHeapMemory = taskManagerMemoryMB
+			- ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN.defaultValue()
+			// cutoff memory (will be added automatically)
+			- networkBuffersMB // amount of memory used for network buffers
+			- 100; // reserve something for the Java heap space
+
+		runWithArgs(new String[]{"run", "-m", "yarn-cluster",
+				"-yj", flinkUberjar.getAbsolutePath(), "-yt", flinkLibFolder.getAbsolutePath(),
+				"-yn", "1",
+				"-ys", "2", //test that the job is executed with a DOP of 2
+				"-yjm", "768",
+				"-ytm", String.valueOf(taskManagerMemoryMB),
+				"-yD", "taskmanager.memory.off-heap=true",
+				"-yD", "taskmanager.memory.size=" + offHeapMemory,
+				"-yD", "taskmanager.memory.preallocate=true", exampleJarLocation.getAbsolutePath()},
+			/* test succeeded after this string */
+			"Job execution complete",
+			/* prohibited strings: (to verify the parallelism) */
+			// (we should see "DataSink (...) (1/2)" and "DataSink (...) (2/2)" instead)
 			new String[]{"DataSink \\(.*\\) \\(1/1\\) switched to FINISHED"},
 			RunTypes.CLI_FRONTEND, 0, true);
 		LOG.info("Finished perJobYarnCluster()");


Mime
View raw message