flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [3/3] flink git commit: [FLINK-2641] integrate off-heap memory configuration - add offheap configuration parameter taskmanager.memory.off-heap - remove offheap ratio parameter and reuse memory fraction parameter - set JVM -XX:MaxDirectMemorySize parameter correctly
Date Wed, 16 Sep 2015 14:57:22 GMT
[FLINK-2641] integrate off-heap memory configuration
- add off-heap configuration parameter taskmanager.memory.off-heap
- remove the off-heap ratio parameter (taskmanager.memory.off-heap-ratio) and reuse the memory fraction parameter (taskmanager.memory.fraction)
- set the JVM -XX:MaxDirectMemorySize parameter correctly

This closes #1129


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/76a40d59
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/76a40d59
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/76a40d59

Branch: refs/heads/master
Commit: 76a40d59e6623cfbc6e265d26da4e739e5e7ed18
Parents: a3150a3
Author: Maximilian Michels <mxm@apache.org>
Authored: Mon Sep 14 15:07:23 2015 +0200
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Wed Sep 16 16:16:03 2015 +0200

----------------------------------------------------------------------
 .../flink/configuration/ConfigConstants.java    | 10 ---
 flink-dist/src/main/flink-bin/bin/config.sh     | 51 +++++++++++++--
 flink-dist/src/main/flink-bin/bin/jobmanager.sh | 10 +--
 .../src/main/flink-bin/bin/taskmanager.sh       | 38 ++++++++++--
 .../io/network/buffer/NetworkBufferPool.java    |  9 +--
 .../flink/runtime/taskmanager/TaskManager.scala | 65 +++++++++++++-------
 6 files changed, 132 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/76a40d59/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index bbaf71a..cd7fd76 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -125,11 +125,6 @@ public final class ConfigConstants {
 	public static final String TASK_MANAGER_MEMORY_FRACTION_KEY = "taskmanager.memory.fraction";
 
 	/**
-	 * The fraction of off-heap memory relative to the heap size.
-	 */
-	public static final String TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY = "taskmanager.memory.off-heap-ratio";
-	
-	/**
 	 * The config parameter defining the memory allocation method (JVM heap or off-heap).
 	*/
 	public static final String TASK_MANAGER_MEMORY_OFF_HEAP_KEY = "taskmanager.memory.off-heap";
@@ -546,11 +541,6 @@ public final class ConfigConstants {
 	 * The default fraction of the free memory allocated by the task manager's memory manager.
 	 */
 	public static final float DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION = 0.7f;
-
-	/**
-	 * The default ratio of heap to off-heap memory, when the TaskManager is started with off-heap
memory.
-	 */
-	public static final float DEFAULT_MEMORY_MANAGER_MEMORY_OFF_HEAP_RATIO = 3.0f;
 	
 	/**
 	 * Default number of buffers used in the network stack.

http://git-wip-us.apache.org/repos/asf/flink/blob/76a40d59/flink-dist/src/main/flink-bin/bin/config.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/config.sh b/flink-dist/src/main/flink-bin/bin/config.sh
index 2aa9c78..f4f58f2 100755
--- a/flink-dist/src/main/flink-bin/bin/config.sh
+++ b/flink-dist/src/main/flink-bin/bin/config.sh
@@ -87,8 +87,17 @@ DEFAULT_ENV_SSH_OPTS=""                             # Optional SSH parameters ru
 # CONFIG KEYS: The default values can be overwritten by the following keys in conf/flink-conf.yaml
 ########################################################################################################################
 
-KEY_JOBM_HEAP_MB="jobmanager.heap.mb"
-KEY_TASKM_HEAP_MB="taskmanager.heap.mb"
+KEY_JOBM_MEM_SIZE="jobmanager.heap.mb"
+KEY_TASKM_MEM_SIZE="taskmanager.heap.mb"
+KEY_TASKM_MEM_MANAGED_SIZE="taskmanager.memory.size"
+KEY_TASKM_MEM_MANAGED_FRACTION="taskmanager.memory.fraction"
+KEY_TASKM_MEM_NETWORK_BUFFERS="taskmanager.network.numberOfBuffers"
+# BEGIN:deprecated
+KEY_TASKM_MEM_NETWORK_BUFFER_SIZE="taskmanager.network.bufferSizeInBytes"
+# END:deprecated
+KEY_TASKM_MEM_SEGMENT_SIZE="taskmanager.memory.segment-size"
+KEY_TASKM_OFFHEAP="taskmanager.memory.off-heap"
+
 KEY_ENV_PID_DIR="env.pid.dir"
 KEY_ENV_LOG_MAX="env.log.max"
 KEY_ENV_JAVA_HOME="env.java.home"
@@ -132,7 +141,8 @@ FLINK_ROOT_DIR_MANGLED=`manglePath "$FLINK_ROOT_DIR"`
 if [ -z "$FLINK_CONF_DIR" ]; then FLINK_CONF_DIR=$FLINK_ROOT_DIR_MANGLED/conf; fi
 FLINK_BIN_DIR=$FLINK_ROOT_DIR_MANGLED/bin
 FLINK_LOG_DIR=$FLINK_ROOT_DIR_MANGLED/log
-YAML_CONF=${FLINK_CONF_DIR}/flink-conf.yaml
+FLINK_CONF_FILE="flink-conf.yaml"
+YAML_CONF=${FLINK_CONF_DIR}/${FLINK_CONF_FILE}
 
 ########################################################################################################################
 # ENVIRONMENT VARIABLES
@@ -173,12 +183,37 @@ IS_NUMBER="^[0-9]+$"
 
 # Define FLINK_JM_HEAP if it is not already set
 if [ -z "${FLINK_JM_HEAP}" ]; then
-    FLINK_JM_HEAP=$(readFromConfig ${KEY_JOBM_HEAP_MB} 0 "${YAML_CONF}")
+    FLINK_JM_HEAP=$(readFromConfig ${KEY_JOBM_MEM_SIZE} 0 "${YAML_CONF}")
 fi
 
 # Define FLINK_TM_HEAP if it is not already set
 if [ -z "${FLINK_TM_HEAP}" ]; then
-    FLINK_TM_HEAP=$(readFromConfig ${KEY_TASKM_HEAP_MB} 0 "${YAML_CONF}")
+    FLINK_TM_HEAP=$(readFromConfig ${KEY_TASKM_MEM_SIZE} 0 "${YAML_CONF}")
+fi
+
+# Define FLINK_TM_MEM_MANAGED_SIZE if it is not already set
+if [ -z "${FLINK_TM_MEM_MANAGED_SIZE}" ]; then
+    FLINK_TM_MEM_MANAGED_SIZE=$(readFromConfig ${KEY_TASKM_MEM_MANAGED_SIZE} 0 "${YAML_CONF}")
+fi
+
+# Define FLINK_TM_MEM_MANAGED_FRACTION if it is not already set
+if [ -z "${FLINK_TM_MEM_MANAGED_FRACTION}" ]; then
+    FLINK_TM_MEM_MANAGED_FRACTION=$(readFromConfig ${KEY_TASKM_MEM_MANAGED_FRACTION} 0 "${YAML_CONF}")
+fi
+
+# Define FLINK_TM_MEM_NETWORK_SIZE if it is not already set
+if [ -z "${FLINK_TM_MEM_NETWORK_SIZE}" ]; then
+    BUFFER_SIZE=$(readFromConfig ${KEY_TASKM_MEM_SEGMENT_SIZE} "0" "${YAML_CONF}")
+    if [ "${BUFFER_SIZE}" -eq "0" ]; then
+        BUFFER_SIZE=$(readFromConfig ${KEY_TASKM_MEM_NETWORK_BUFFER_SIZE} "$((32 * 1024))"
"${YAML_CONF}")
+    fi
+    NUM_BUFFERS=$(readFromConfig ${KEY_TASKM_MEM_NETWORK_BUFFERS} "2048" "${YAML_CONF}")
+    FLINK_TM_MEM_NETWORK_SIZE=$((((NUM_BUFFERS * BUFFER_SIZE) >> 20) + 1))
+fi
+
+# Define FLINK_TM_OFFHEAP if it is not already set
+if [ -z "${FLINK_TM_OFFHEAP}" ]; then
+    FLINK_TM_OFFHEAP=$(readFromConfig ${KEY_TASKM_OFFHEAP} 0 "${YAML_CONF}")
 fi
 
 if [ -z "${MAX_LOG_FILE_NUMBER}" ]; then
@@ -211,7 +246,7 @@ fi
 
 # Arguments for the JVM. Used for job and task manager JVMs.
 # DO NOT USE FOR MEMORY SETTINGS! Use conf/flink-conf.yaml with keys
-# KEY_JOBM_HEAP_MB and KEY_TASKM_HEAP_MB for that!
+# KEY_JOBM_MEM_SIZE and KEY_TASKM_MEM_SIZE for that!
 if [ -z "${JVM_ARGS}" ]; then
     JVM_ARGS=""
 fi
@@ -308,3 +343,7 @@ readSlaves() {
         fi
     done < "$SLAVES_FILE"
 }
+
+useOffHeapMemory() {
+    [[ "`echo ${FLINK_TM_OFFHEAP} | tr '[:upper:]' '[:lower:]'`" == "true" ]]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/76a40d59/flink-dist/src/main/flink-bin/bin/jobmanager.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/jobmanager.sh b/flink-dist/src/main/flink-bin/bin/jobmanager.sh
index c18a909..45b8e79 100755
--- a/flink-dist/src/main/flink-bin/bin/jobmanager.sh
+++ b/flink-dist/src/main/flink-bin/bin/jobmanager.sh
@@ -43,21 +43,21 @@ if [[ $STARTSTOP == "start" ]]; then
         STREAMINGMODE="batch"
     fi
 
-    if [[ ! ${FLINK_JM_HEAP} =~ $IS_NUMBER ]]; then
-        echo "[ERROR] Configured JobManager JVM heap size is not a number. Please set '$KEY_JOBM_HEAP_MB'
in $FLINK_CONF_FILE."
+    if [[ ! ${FLINK_JM_HEAP} =~ $IS_NUMBER ]] || [[ "${FLINK_JM_HEAP}" -lt "0" ]]; then
+        echo "[ERROR] Configured JobManager memory size is not a valid value. Please set
'${KEY_JOBM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
         exit 1
     fi
 
     if [ "$EXECUTIONMODE" = "local" ]; then
-        if [[ ! ${FLINK_TM_HEAP} =~ $IS_NUMBER ]]; then
-            echo "[ERROR] Configured JobManager JVM heap size is not a number. Please set
'$KEY_TASKM_HEAP_MB' in $FLINK_CONF_FILE."
+        if [[ ! ${FLINK_TM_HEAP} =~ $IS_NUMBER ]] || [[ "${FLINK_TM_HEAP}" -lt "0" ]]; then
+            echo "[ERROR] Configured TaskManager memory size is not a valid value. Please
set ${KEY_TASKM_MEM_SIZE} in ${FLINK_CONF_FILE}."
             exit 1
         fi
 
         FLINK_JM_HEAP=`expr $FLINK_JM_HEAP + $FLINK_TM_HEAP`
     fi
 
-    if [ "$FLINK_JM_HEAP" -gt 0 ]; then
+    if [ "${FLINK_JM_HEAP}" -gt "0" ]; then
         export JVM_ARGS="$JVM_ARGS -Xms"$FLINK_JM_HEAP"m -Xmx"$FLINK_JM_HEAP"m"
     fi
 

http://git-wip-us.apache.org/repos/asf/flink/blob/76a40d59/flink-dist/src/main/flink-bin/bin/taskmanager.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/taskmanager.sh b/flink-dist/src/main/flink-bin/bin/taskmanager.sh
index c41270d..f5aecc6 100755
--- a/flink-dist/src/main/flink-bin/bin/taskmanager.sh
+++ b/flink-dist/src/main/flink-bin/bin/taskmanager.sh
@@ -51,13 +51,43 @@ if [[ $STARTSTOP == "start" ]]; then
         fi
     fi
 
-    if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]]; then
-        echo "[ERROR] Configured TaskManager JVM heap size is not a number. Please set '$KEY_TASKM_HEAP_MB'
in $FLINK_CONF_FILE."
+    if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]] || [[ "${FLINK_TM_HEAP}" -lt "0" ]]; then
+        echo "[ERROR] Configured TaskManager JVM heap size is not a number. Please set '${KEY_TASKM_MEM_SIZE}'
in ${FLINK_CONF_FILE}."
         exit 1
     fi
 
-    if [ "$FLINK_TM_HEAP" -gt 0 ]; then
-        export JVM_ARGS="$JVM_ARGS -Xms"$FLINK_TM_HEAP"m -Xmx"$FLINK_TM_HEAP"m"
+    if [ "${FLINK_TM_HEAP}" -gt "0" ]; then
+
+        TM_HEAP_SIZE=${FLINK_TM_HEAP}
+        TM_OFFHEAP_SIZE=0
+        # some space for Netty initialization
+        NETTY_BUFFERS=1
+
+        if [[ "${STREAMINGMODE}" == "batch" ]] && useOffHeapMemory; then
+            if [[ "${FLINK_TM_MEM_MANAGED_SIZE}" -gt "0" ]]; then
+                # We split up the total memory in heap and off-heap memory
+                if [[ "${FLINK_TM_HEAP}" -le "${FLINK_TM_MEM_MANAGED_SIZE}" ]]; then
+                    echo "[ERROR] Configured TaskManager memory size ('${KEY_TASKM_MEM_SIZE}')
must be larger than the managed memory size ('${KEY_TASKM_MEM_MANAGED_SIZE}')."
+                    exit 1
+                fi
+                TM_OFFHEAP_SIZE=${FLINK_TM_MEM_MANAGED_SIZE}
+                TM_HEAP_SIZE=$((FLINK_TM_HEAP - FLINK_TM_MEM_MANAGED_SIZE))
+            else
+                # We calculate the memory using a fraction of the total memory
+                if [[ `bc -l <<< "${FLINK_TM_MEM_MANAGED_FRACTION} >= 1.0"` !=
"0" ]] || [[ `bc -l <<< "${FLINK_TM_MEM_MANAGED_FRACTION} <= 0.0"` != "0" ]];
then
+                    echo "[ERROR] Configured TaskManager managed memory fraction is not a
valid value. Please set '${KEY_TASKM_MEM_MANAGED_FRACTION}' in ${FLINK_CONF_FILE}"
+                    exit 1
+                fi
+                # recalculate the JVM heap memory by taking the off-heap ratio into account
+                TM_OFFHEAP_SIZE=`printf '%.0f\n' $(bc -l <<< "${FLINK_TM_HEAP} *
${FLINK_TM_MEM_MANAGED_FRACTION}")`
+                TM_HEAP_SIZE=$((FLINK_TM_HEAP - TM_OFFHEAP_SIZE))
+            fi
+        fi
+
+        TM_HEAP_SIZE=$((TM_HEAP_SIZE - FLINK_TM_MEM_NETWORK_SIZE - NETTY_BUFFERS))
+        echo export JVM_ARGS="${JVM_ARGS} -Xms${TM_HEAP_SIZE}M -Xmx${TM_HEAP_SIZE}M -XX:MaxDirectMemorySize=$((TM_OFFHEAP_SIZE
+ FLINK_TM_MEM_NETWORK_SIZE + NETTY_BUFFERS))M"
+        export JVM_ARGS="${JVM_ARGS} -Xms${TM_HEAP_SIZE}M -Xmx${TM_HEAP_SIZE}M -XX:MaxDirectMemorySize=$((TM_OFFHEAP_SIZE
+ FLINK_TM_MEM_NETWORK_SIZE + NETTY_BUFFERS))M"
+
     fi
 
     # Startup parameters

http://git-wip-us.apache.org/repos/asf/flink/blob/76a40d59/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
index 209d925..641d13e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
@@ -196,10 +196,11 @@ public class NetworkBufferPool implements BufferPoolFactory {
 				throw new IOException(String.format("Insufficient number of network buffers: " +
 								"required %d, but only %d available. The total number of network " +
 								"buffers is currently set to %d. You can increase this " +
-								"number by setting the configuration key '" +
-								ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY +  "'.",
-						numRequiredBuffers, totalNumberOfMemorySegments - numTotalRequiredBuffers,
-						totalNumberOfMemorySegments));
+								"number by setting the configuration key '%s'.",
+						numRequiredBuffers,
+						totalNumberOfMemorySegments - numTotalRequiredBuffers,
+						totalNumberOfMemorySegments,
+						ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY));
 			}
 
 			this.numTotalRequiredBuffers += numRequiredBuffers;

http://git-wip-us.apache.org/repos/asf/flink/blob/76a40d59/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index f11b933..1563a7a 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -1578,7 +1578,7 @@ object TaskManager {
       LOG.info(s"Using $configuredMemory MB for Flink managed memory.")
       configuredMemory << 20 // megabytes to bytes
     }
-    else if (memType == MemoryType.HEAP) {
+    else {
       val fraction = configuration.getFloat(
         ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
         ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION)
@@ -1586,32 +1586,53 @@ object TaskManager {
                            ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
                            "MemoryManager fraction of the free memory must be between 0.0
and 1.0")
 
-      val relativeMemSize = (EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
-        fraction).toLong
+      if (memType == MemoryType.HEAP) {
+        val relativeMemSize = (EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag()
*
+          fraction).toLong
 
-      LOG.info(s"Using $fraction of the currently free heap space for Flink managed " +
-        s" heap memory (${relativeMemSize >> 20} MB).")
+        LOG.info(s"Using $fraction of the currently free heap space for Flink managed " +
+          s" heap memory (${relativeMemSize >> 20} MB).")
 
-      relativeMemSize
-    }
-    else {
-      val ratio = configuration.getFloat(
-        ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY,
-        ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_OFF_HEAP_RATIO)
-      
-      checkConfigParameter(ratio > 0.0f,
-        ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY,
-        "MemoryManager ratio (off-heap memory / heap size) must be larger than zero")
-      
-      val maxHeapSize = EnvironmentInformation.getMaxJvmHeapMemory()
-      val relativeMemSize = (maxHeapSize * ratio).toLong
+        relativeMemSize
+      }
+      else if (memType == MemoryType.OFF_HEAP) {
+
+        val networkBufferSizeNew = configuration.getLong(
+          ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
+          ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE)
+
+        val networkBufferSizeOld = configuration.getLong(
+          ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY, -1)
+        val networkBufferSize =
+          if (networkBufferSizeNew != -1) {
+            networkBufferSizeNew
+          } else if (networkBufferSizeOld == -1) {
+            ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE
+          } else {
+            networkBufferSizeOld
+          }
+
+        val numNetworkBuffers = configuration.getLong(
+          ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY,
+          ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS)
+
+        // direct memory for Netty's off-heap buffers
+        val networkMemory = (numNetworkBuffers * networkBufferSize) + (1 << 20)
 
-      LOG.info(s"Using $ratio time the heap size (${maxHeapSize} bytes) for Flink " +
-        s"managed off-heap memory (${relativeMemSize >> 20} MB).")
+        // The maximum heap memory has been adjusted according to the fraction
+        val maxMemory = EnvironmentInformation.getMaxJvmHeapMemory() + networkMemory
+        val directMemorySize = (maxMemory / (1.0 - fraction) * fraction).toLong
 
-      relativeMemSize
+        LOG.info(s"Using $fraction of the maximum memory size for " +
+          s"Flink managed off-heap memory (${directMemorySize >> 20} MB).")
+
+        directMemorySize
+      }
+      else {
+        throw new RuntimeException("No supported memory type detected.")
+      }
     }
-    
+
     val preAllocateMemory: Boolean = streamingMode == StreamingMode.BATCH_ONLY
 
     // now start the memory manager


Mime
View raw message