flink-commits mailing list archives

From se...@apache.org
Subject [06/12] flink git commit: [FLINK-4545] [network] replace the network buffers parameter
Date Sat, 06 May 2017 17:47:50 GMT
[FLINK-4545] [network] replace the network buffers parameter

Instead, allow the configuration with the following three new (more flexible)
parameters:
 * "taskmanager.network.memory.fraction": fraction of JVM memory to use for network buffers (default: 0.1)
 * "taskmanager.network.memory.min": minimum memory size for network buffers (default: 64 MB)
 * "taskmanager.network.memory.max": maximum memory size for network buffers (default: 1 GB)
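
How the three parameters might combine can be sketched as follows. This is a minimal illustration modelled on the awk expression added to config.sh in this commit, not the code in TaskManagerServices. The class and method names are hypothetical, and the rounding is an assumption: the sketch truncates, whereas the shell script rounds with printf "%.0f". The legacy helper mirrors the documented mapping of "taskmanager.network.numberOfBuffers" via "taskmanager.memory.segment-size".

```java
// Hypothetical sketch; names are illustrative and not part of Flink.
final class NetworkBufferMemorySketch {

    /** Clamps (total memory * fraction) into the range [minBytes, maxBytes]. */
    static long networkBufferBytes(long totalMemoryBytes, double fraction,
                                   long minBytes, long maxBytes) {
        if (!(fraction > 0.0 && fraction < 1.0)) {
            throw new IllegalArgumentException("fraction must be between 0.0 and 1.0");
        }
        if (minBytes > maxBytes) {
            throw new IllegalArgumentException("min must be less than or equal to max");
        }
        // Assumption: truncate towards zero (the shell script rounds instead).
        long byFraction = (long) (totalMemoryBytes * fraction);
        return Math.max(minBytes, Math.min(maxBytes, byFraction));
    }

    /** Legacy mapping: a fixed buffer count times the segment size. */
    static long legacyBufferBytes(int numberOfBuffers, int segmentSizeBytes) {
        return (long) numberOfBuffers * segmentSizeBytes;
    }

    public static void main(String[] args) {
        long min = 64L << 20;   // 64 MB default
        long max = 1024L << 20; // 1 GB default
        // 1 GB of total memory with a fraction of 0.25 stays inside [min, max].
        System.out.println(networkBufferBytes(1L << 30, 0.25, min, max));
        // The old default of 2048 buffers of 32 KiB each equals the new minimum.
        System.out.println(legacyBufferBytes(2048, 32768));
    }
}
```

With the defaults, small task managers are therefore pinned to the 64 MB minimum and very large ones to the 1 GB maximum. The fraction only takes effect in between.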

This closes #3721


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0bb49e53
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0bb49e53
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0bb49e53

Branch: refs/heads/master
Commit: 0bb49e538c118b8265377355a9667789a3971966
Parents: ac72450
Author: Nico Kruber <nico@data-artisans.com>
Authored: Thu Apr 6 14:41:52 2017 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Sat May 6 19:40:29 2017 +0200

----------------------------------------------------------------------
 docs/setup/config.md                            |  80 ++++-
 docs/setup/yarn_setup.md                        |   2 +-
 .../addons/hbase/HBaseConnectorITCase.java      |  37 +++
 .../flink/configuration/TaskManagerOptions.java |  20 ++
 flink-dist/pom.xml                              |  24 ++
 flink-dist/src/main/flink-bin/bin/config.sh     | 133 ++++++++
 .../src/main/flink-bin/bin/taskmanager.sh       |  31 +-
 flink-dist/src/test/bin/calcTMHeapSizeMB.sh     |  42 +++
 flink-dist/src/test/bin/calcTMNetBufMem.sh      |  39 +++
 ...kManagerHeapSizeCalculationJavaBashTest.java | 306 +++++++++++++++++++
 .../src/test/resources/log4j-test.properties    |  35 +++
 .../clusterframework/BootstrapTools.java        |   2 +-
 .../ContaineredTaskManagerParameters.java       |  26 +-
 .../io/network/buffer/NetworkBufferPool.java    |   9 +-
 .../partition/SpillableSubpartition.java        |   6 +-
 .../minicluster/MiniClusterConfiguration.java   |   9 +-
 .../taskexecutor/TaskManagerServices.java       | 235 +++++++++++++-
 .../TaskManagerServicesConfiguration.java       | 114 ++++++-
 .../minicluster/LocalFlinkMiniCluster.scala     |  11 +-
 .../NetworkEnvironmentConfiguration.scala       |   4 +-
 .../TaskManagerServicesConfigurationTest.java   | 111 +++++++
 .../taskexecutor/TaskManagerServicesTest.java   | 289 ++++++++++++++++++
 ...askManagerComponentsStartupShutdownTest.java |   9 +-
 ...tractEventTimeWindowCheckpointingITCase.java |   2 +
 .../org/apache/flink/yarn/cli/FlinkYarnCLI.java |   2 +-
 .../flink/yarn/cli/FlinkYarnSessionCli.java     |   2 +-
 26 files changed, 1477 insertions(+), 103 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0bb49e53/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index a3daac0..c4a7354 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -82,13 +82,13 @@ without explicit scheme definition, such as `/user/USERNAME/in.txt`, is going to
 
 ### Managed Memory
 
-By default, Flink allocates a fraction of `0.7` of the total memory configured via `taskmanager.heap.mb` for its managed memory. Managed memory helps Flink to run the batch operators efficiently. It prevents `OutOfMemoryException`s because Flink knows how much memory it can use to execute operations. If Flink runs out of managed memory, it utilizes disk space. Using managed memory, some operations can be performed directly on the raw data without having to deserialize the data to convert it into Java objects. All in all, managed memory improves the robustness and speed of the system.
+By default, Flink allocates a fraction of `0.7` of the free memory (total memory configured via `taskmanager.heap.mb` minus memory used for network buffers) for its managed memory. Managed memory helps Flink to run the batch operators efficiently. It prevents `OutOfMemoryException`s because Flink knows how much memory it can use to execute operations. If Flink runs out of managed memory, it utilizes disk space. Using managed memory, some operations can be performed directly on the raw data without having to deserialize the data to convert it into Java objects. All in all, managed memory improves the robustness and speed of the system.
 
 The default fraction for managed memory can be adjusted using the `taskmanager.memory.fraction` parameter. An absolute value may be set using `taskmanager.memory.size` (overrides the fraction parameter). If desired, the managed memory may be allocated outside the JVM heap. This may improve performance in setups with large memory sizes.
 
 - `taskmanager.memory.size`: The amount of memory (in megabytes) that the task manager reserves on-heap or off-heap (depending on `taskmanager.memory.off-heap`) for sorting, hash tables, and caching of intermediate results. If unspecified (-1), the memory manager will take a fixed ratio with respect to the size of the task manager JVM as specified by `taskmanager.memory.fraction`. (DEFAULT: -1)
 
-- `taskmanager.memory.fraction`: The relative amount of memory (with respect to `taskmanager.heap.mb`) that the task manager reserves for sorting, hash tables, and caching of intermediate results. For example, a value of `0.8` means that a task manager reserves 80% of its memory (on-heap or off-heap depending on `taskmanager.memory.off-heap`) for internal data buffers, leaving 20% of free memory for the task manager's heap for objects created by user-defined functions. (DEFAULT: 0.7) This parameter is only evaluated, if `taskmanager.memory.size` is not set.
+- `taskmanager.memory.fraction`: The relative amount of memory (with respect to `taskmanager.heap.mb`, after subtracting the amount of memory used by network buffers) that the task manager reserves for sorting, hash tables, and caching of intermediate results. For example, a value of `0.8` means that a task manager reserves 80% of its memory (on-heap or off-heap depending on `taskmanager.memory.off-heap`) for internal data buffers, leaving 20% of free memory for the task manager's heap for objects created by user-defined functions. (DEFAULT: 0.7) This parameter is only evaluated, if `taskmanager.memory.size` is not set.
 
 - `taskmanager.memory.off-heap`: If set to `true`, the task manager allocates memory which is used for sorting, hash tables, and caching of intermediate results outside of the JVM heap. For setups with larger quantities of memory, this can improve the efficiency of the operations performed on the memory (DEFAULT: false).
 
@@ -174,7 +174,11 @@ will be used under the directory specified by jobmanager.web.tmpdir.
 
 - `fs.output.always-create-directory`: File writers running with a parallelism larger than one create a directory for the output file path and put the different result files (one per parallel writer task) into that directory. If this option is set to *true*, writers with a parallelism of 1 will also create a directory and place a single result file into it. If the option is set to *false*, the writer will directly create the file directly at the output path, without creating a containing directory. (DEFAULT: false)
 
-- `taskmanager.network.numberOfBuffers`: The number of buffers available to the network stack. This number determines how many streaming data exchange channels a TaskManager can have at the same time and how well buffered the channels are. If a job is rejected or you get a warning that the system has not enough buffers available, increase this value (DEFAULT: 2048).
+- `taskmanager.network.memory.fraction`: Fraction of JVM memory to use for network buffers. This determines how many streaming data exchange channels a TaskManager can have at the same time and how well buffered the channels are. If a job is rejected or you get a warning that the system has not enough buffers available, increase this value or the min/max values below. (DEFAULT: 0.1)
+
+- `taskmanager.network.memory.min`: Minimum memory size for network buffers in bytes (DEFAULT: 64 MB)
+
+- `taskmanager.network.memory.max`: Maximum memory size for network buffers in bytes (DEFAULT: 1 GB)
 
 - `state.backend`: The backend that will be used to store operator state checkpoints if checkpointing is enabled. Supported backends:
    -  `jobmanager`: In-memory state, backup to JobManager's/ZooKeeper's memory. Should be used only for minimal state (Kafka offsets) or testing and local debugging.
@@ -257,11 +261,17 @@ The following parameters configure Flink's JobManager and TaskManagers.
 
 - `taskmanager.tmp.dirs`: The directory for temporary files, or a list of directories separated by the system's directory delimiter (for example ':' (colon) on Linux/Unix). If multiple directories are specified, then the temporary files will be distributed across the directories in a round robin fashion. The I/O manager component will spawn one reading and one writing thread per directory. A directory may be listed multiple times to have the I/O manager use multiple threads for it (for example if it is physically stored on a very fast disc or RAID) (DEFAULT: **The system's tmp dir**).
 
-- `taskmanager.network.numberOfBuffers`: The number of buffers available to the network stack. This number determines how many streaming data exchange channels a TaskManager can have at the same time and how well buffered the channels are. If a job is rejected or you get a warning that the system has not enough buffers available, increase this value (DEFAULT: **2048**).
+- `taskmanager.network.memory.fraction`: Fraction of JVM memory to use for network buffers. This determines how many streaming data exchange channels a TaskManager can have at the same time and how well buffered the channels are. If a job is rejected or you get a warning that the system has not enough buffers available, increase this value or the min/max values below. Also note, that `taskmanager.network.memory.min` and `taskmanager.network.memory.max` may override this fraction. (DEFAULT: **0.1**)
+
+- `taskmanager.network.memory.min`: Minimum memory size for network buffers in bytes (DEFAULT: **64 MB**). Previously, this was determined from `taskmanager.network.numberOfBuffers` and `taskmanager.memory.segment-size`.
+
+- `taskmanager.network.memory.max`: Maximum memory size for network buffers in bytes (DEFAULT: **1 GB**). Previously, this was determined from `taskmanager.network.numberOfBuffers` and `taskmanager.memory.segment-size`.
+
+- `taskmanager.network.numberOfBuffers` (deprecated, replaced by the three parameters above): The number of buffers available to the network stack. This number determines how many streaming data exchange channels a TaskManager can have at the same time and how well buffered the channels are. If a job is rejected or you get a warning that the system has not enough buffers available, increase this value (DEFAULT: **2048**). If set, it will be mapped to `taskmanager.network.memory.min` and `taskmanager.network.memory.max` based on `taskmanager.memory.segment-size`.
 
 - `taskmanager.memory.size`: The amount of memory (in megabytes) that the task manager reserves on the JVM's heap space for sorting, hash tables, and caching of intermediate results. If unspecified (-1), the memory manager will take a fixed ratio of the heap memory available to the JVM, as specified by `taskmanager.memory.fraction`. (DEFAULT: **-1**)
 
-- `taskmanager.memory.fraction`: The relative amount of memory that the task manager reserves for sorting, hash tables, and caching of intermediate results. For example, a value of 0.8 means that TaskManagers reserve 80% of the JVM's heap space for internal data buffers, leaving 20% of the JVM's heap space free for objects created by user-defined functions. (DEFAULT: **0.7**) This parameter is only evaluated, if `taskmanager.memory.size` is not set.
+- `taskmanager.memory.fraction`: The relative amount of memory (with respect to `taskmanager.heap.mb`, after subtracting the amount of memory used by network buffers) that the task manager reserves for sorting, hash tables, and caching of intermediate results. For example, a value of `0.8` means that a task manager reserves 80% of its memory (on-heap or off-heap depending on `taskmanager.memory.off-heap`) for internal data buffers, leaving 20% of free memory for the task manager's heap for objects created by user-defined functions. (DEFAULT: 0.7) This parameter is only evaluated, if `taskmanager.memory.size` is not set.
 
 - `taskmanager.debug.memory.startLogThread`: Causes the TaskManagers to periodically log memory and Garbage collection statistics. The statistics include current heap-, off-heap, and other memory pool utilization, as well as the time spent on garbage collection, by heap memory pool.
 
@@ -614,9 +624,54 @@ You have to configure `jobmanager.archive.fs.dir` in order to archive terminated
 
 ## Background
 
+
 ### Configuring the Network Buffers
 
-If you ever see the Exception `java.io.IOException: Insufficient number of network buffers`, please use the following formula to adjust the number of network buffers:
+If you ever see the Exception `java.io.IOException: Insufficient number of network buffers`, you
+need to adapt the amount of memory used for network buffers in order for your program to run on your
+task managers.
+
+Network buffers are a critical resource for the communication layers. They are used to buffer
+records before transmission over a network, and to buffer incoming data before dissecting it into
+records and handing them to the application. A sufficient number of network buffers is critical to
+achieve a good throughput.
+
+<div class="alert alert-info">
+Since Flink 1.3, you may follow the idiom "more is better" without any penalty on the latency (we
+prevent excessive buffering in each outgoing and incoming channel, i.e. *buffer bloat*, by limiting
+the actual number of buffers used by each channel).
+</div>
+
+In general, configure the task manager to have enough buffers that each logical network connection
+you expect to be open at the same time has a dedicated buffer. A logical network connection exists
+for each point-to-point exchange of data over the network, which typically happens at
+repartitioning or broadcasting steps (shuffle phase). In those, each parallel task inside the
+TaskManager has to be able to talk to all other parallel tasks.
+
+#### Setting Memory Fractions
+
+Previously, the number of network buffers was set manually which became a quite error-prone task
+(see below). Since Flink 1.3, it is possible to define a fraction of memory that is being used for
+network buffers with the following configuration parameters:
+
+- `taskmanager.network.memory.fraction`: Fraction of JVM memory to use for network buffers (DEFAULT: 0.1),
+- `taskmanager.network.memory.min`: Minimum memory size for network buffers in bytes (DEFAULT: 64 MB),
+- `taskmanager.network.memory.max`: Maximum memory size for network buffers in bytes (DEFAULT: 1 GB), and
+- `taskmanager.memory.segment-size`: Size of memory buffers used by the memory manager and the
+network stack in bytes (DEFAULT: 32768 (= 32 KiBytes)).
+
+#### Setting the Number of Network Buffers directly
+
+<div class="alert alert-warning">
+  <strong>Note:</strong> This way of configuring the amount of memory used for network buffers is deprecated. Please consider using the method above by defining a fraction of memory to use.
+</div>
+
+The required number of buffers on a task manager is
+*total-degree-of-parallelism* (number of targets) \* *intra-node-parallelism* (number of sources in one task manager) \* *n*
+with *n* being a constant that defines how many repartitioning-/broadcasting steps you expect to be
+active at the same time. Since the *intra-node-parallelism* is typically the number of cores, and
+more than 4 repartitioning or broadcasting channels are rarely active in parallel, it frequently
+boils down to
 
 ```
 #slots-per-TM^2 * #TMs * 4
@@ -624,16 +679,11 @@ If you ever see the Exception `java.io.IOException: Insufficient number of netwo
 
 Where `#slots per TM` are the [number of slots per TaskManager](#configuring-taskmanager-processing-slots) and `#TMs` are the total number of task managers.
 
-Network buffers are a critical resource for the communication layers. They are used to buffer records before transmission over a network, and to buffer incoming data before dissecting it into records and handing them to the
-application. A sufficient number of network buffers is critical to achieve a good throughput.
-
-In general, configure the task manager to have enough buffers that each logical network connection you expect to be open at the same time has a dedicated buffer. A logical network connection exists for each point-to-point exchange of data over the network, which typically happens at repartitioning- or broadcasting steps (shuffle phase). In those, each parallel task inside the TaskManager has to be able to talk to all other parallel tasks. Hence, the required number of buffers on a task manager is *total-degree-of-parallelism* (number of targets) \* *intra-node-parallelism* (number of sources in one task manager) \* *n*. Here, *n* is a constant that defines how many repartitioning-/broadcasting steps you expect to be active at the same time.
-
-Since the *intra-node-parallelism* is typically the number of cores, and more than 4 repartitioning or broadcasting channels are rarely active in parallel, it frequently boils down to `#slots-per-TM^2 * #TMs * 4`.
-
-To support for example a cluster of 20 8-slot machines, you should use roughly 5000 network buffers for optimal throughput.
+To support, for example, a cluster of 20 8-slot machines, you should use roughly 5000 network
+buffers for optimal throughput.
 
-Each network buffer has by default a size of 32 KiBytes. In the above example, the system would allocate roughly 300 MiBytes for network buffers.
+Each network buffer has by default a size of 32 KiBytes. In the example above, the system would thus
+allocate roughly 160 MiBytes for network buffers.
 
 The number and size of network buffers can be configured with the following parameters:
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0bb49e53/docs/setup/yarn_setup.md
----------------------------------------------------------------------
diff --git a/docs/setup/yarn_setup.md b/docs/setup/yarn_setup.md
index 3149ec2..1ce45ad 100644
--- a/docs/setup/yarn_setup.md
+++ b/docs/setup/yarn_setup.md
@@ -122,7 +122,7 @@ The system will use the configuration in `conf/flink-conf.yaml`. Please follow o
 
 Flink on YARN will overwrite the following configuration parameters `jobmanager.rpc.address` (because the JobManager is always allocated at different machines), `taskmanager.tmp.dirs` (we are using the tmp directories given by YARN) and `parallelism.default` if the number of slots has been specified.
 
-If you don't want to change the configuration file to set configuration parameters, there is the option to pass dynamic properties via the `-D` flag. So you can pass parameters this way: `-Dfs.overwrite-files=true -Dtaskmanager.network.numberOfBuffers=16368`.
+If you don't want to change the configuration file to set configuration parameters, there is the option to pass dynamic properties via the `-D` flag. So you can pass parameters this way: `-Dfs.overwrite-files=true -Dtaskmanager.network.memory.min=536346624`.
 
 The example invocation starts 11 containers (even though only 10 containers were requested), since there is one additional container for the ApplicationMaster and Job Manager.
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0bb49e53/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java
index c1aa9a0..33bbe12 100644
--- a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java
@@ -22,7 +22,11 @@ package org.apache.flink.addons.hbase;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.ExecutionEnvironmentFactory;
+import org.apache.flink.api.java.LocalEnvironment;
 import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableEnvironment;
@@ -36,6 +40,7 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -74,6 +79,12 @@ public class HBaseConnectorITCase extends HBaseTestingClusterAutostarter {
 	public static void activateHBaseCluster() throws IOException {
 		registerHBaseMiniClusterInClasspath();
 		prepareTable();
+		LimitNetworkBuffersTestEnvironment.setAsContext();
+	}
+
+	@AfterClass
+	public static void resetExecutionEnvironmentFactory() {
+		LimitNetworkBuffersTestEnvironment.unsetAsContext();
 	}
 
 	private static void prepareTable() throws IOException {
@@ -335,4 +346,30 @@ public class HBaseConnectorITCase extends HBaseTestingClusterAutostarter {
 	}
 
 
+	/**
+	 * Allows the tests to use {@link ExecutionEnvironment#getExecutionEnvironment()} but with a
+	 * configuration that limits the maximum memory used for network buffers since the current
+	 * defaults are too high for Travis-CI.
+	 */
+	private static abstract class LimitNetworkBuffersTestEnvironment extends ExecutionEnvironment {
+
+		public static void setAsContext() {
+			Configuration config = new Configuration();
+			// the default network buffers size (10% of heap max =~ 150MB) seems too much for this test case
+			config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 80L << 20); // 80 MB
+			final LocalEnvironment le = new LocalEnvironment(config);
+
+			initializeContextEnvironment(new ExecutionEnvironmentFactory() {
+				@Override
+				public ExecutionEnvironment createExecutionEnvironment() {
+					return le;
+				}
+			});
+		}
+
+		public static void unsetAsContext() {
+			resetContextEnvironment();
+		}
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0bb49e53/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
index e915c0b..c5063d1 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
@@ -85,11 +85,30 @@ public class TaskManagerOptions {
 	/**
 	 * Number of buffers used in the network stack. This defines the number of possible tasks and
 	 * shuffles.
+	 *
+	 * @deprecated use {@link #NETWORK_BUFFERS_MEMORY_FRACTION}, {@link #NETWORK_BUFFERS_MEMORY_MIN},
+	 * and {@link #NETWORK_BUFFERS_MEMORY_MAX} instead
 	 */
+	@Deprecated
 	public static final ConfigOption<Integer> NETWORK_NUM_BUFFERS =
 			key("taskmanager.network.numberOfBuffers")
 			.defaultValue(2048);
 
+	/** Fraction of JVM memory to use for network buffers. */
+	public static final ConfigOption<Float> NETWORK_BUFFERS_MEMORY_FRACTION =
+			key("taskmanager.network.memory.fraction")
+			.defaultValue(0.1f);
+
+	/** Minimum memory size for network buffers (in bytes) */
+	public static final ConfigOption<Long> NETWORK_BUFFERS_MEMORY_MIN =
+			key("taskmanager.network.memory.min")
+			.defaultValue(64L << 20); // 64 MB
+
+	/** Maximum memory size for network buffers (in bytes) */
+	public static final ConfigOption<Long> NETWORK_BUFFERS_MEMORY_MAX =
+			key("taskmanager.network.memory.max")
+			.defaultValue(1024L << 20); // 1 GB
+
 
 	/** Minimum backoff for partition requests of input channels. */
 	public static final ConfigOption<Integer> NETWORK_REQUEST_BACKOFF_INITIAL =
@@ -101,6 +120,7 @@ public class TaskManagerOptions {
 			key("taskmanager.net.request-backoff.max")
 			.defaultValue(10000);
 
+
 	/**
 	 * Number of network buffers to use for each outgoing/ingoing channel (subpartition/input channel).
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/0bb49e53/flink-dist/pom.xml
----------------------------------------------------------------------
diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml
index ff87d53..5f6a39b 100644
--- a/flink-dist/pom.xml
+++ b/flink-dist/pom.xml
@@ -254,6 +254,16 @@ under the License.
 			<scope>provided</scope>
 		</dependency>
 		<!-- end optional Flink libraries -->
+
+		<!-- test dependencies -->
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils-junit</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+		<!-- end test dependencies -->
 	</dependencies>
 
 	<profiles>
@@ -312,6 +322,20 @@ under the License.
 	<build>
 		<plugins>
 
+			<!--unit tests-->
+			<!--plugin must appear BEFORE the shade-plugin to not mess up package order and include the non-uber JAR into the assembly-->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-surefire-plugin</artifactId>
+				<configuration>
+					<reuseForks>false</reuseForks>
+					<!-- <workingDirectory>${project.build.testOutputDirectory}</workingDirectory> -->
+					<systemPropertyVariables>
+						<log.level>WARN</log.level>
+					</systemPropertyVariables>
+				</configuration>
+			</plugin>
+
 			<!-- binary compatibility checks -->
 			<plugin>
 				<groupId>com.github.siom79.japicmp</groupId>

http://git-wip-us.apache.org/repos/asf/flink/blob/0bb49e53/flink-dist/src/main/flink-bin/bin/config.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/config.sh b/flink-dist/src/main/flink-bin/bin/config.sh
index 0894481..7e3c1d4 100755
--- a/flink-dist/src/main/flink-bin/bin/config.sh
+++ b/flink-dist/src/main/flink-bin/bin/config.sh
@@ -108,6 +108,11 @@ KEY_TASKM_MEM_MANAGED_FRACTION="taskmanager.memory.fraction"
 KEY_TASKM_OFFHEAP="taskmanager.memory.off-heap"
 KEY_TASKM_MEM_PRE_ALLOCATE="taskmanager.memory.preallocate"
 
+KEY_TASKM_NET_BUF_FRACTION="taskmanager.network.memory.fraction"
+KEY_TASKM_NET_BUF_MIN="taskmanager.network.memory.min"
+KEY_TASKM_NET_BUF_MAX="taskmanager.network.memory.max"
+KEY_TASKM_NET_BUF_NR="taskmanager.network.numberOfBuffers" # fallback
+
 KEY_TASKM_COMPUTE_NUMA="taskmanager.compute.numa"
 
 KEY_ENV_PID_DIR="env.pid.dir"
@@ -231,6 +236,31 @@ if [ -z "${FLINK_TM_MEM_PRE_ALLOCATE}" ]; then
     FLINK_TM_MEM_PRE_ALLOCATE=$(readFromConfig ${KEY_TASKM_MEM_PRE_ALLOCATE} "false" "${YAML_CONF}")
 fi
 
+
+# Define FLINK_TM_NET_BUF_FRACTION if it is not already set
+if [ -z "${FLINK_TM_NET_BUF_FRACTION}" ]; then
+    FLINK_TM_NET_BUF_FRACTION=$(readFromConfig ${KEY_TASKM_NET_BUF_FRACTION} 0.1 "${YAML_CONF}")
+fi
+
+# Define FLINK_TM_NET_BUF_MIN and FLINK_TM_NET_BUF_MAX if not already set (as a fallback)
+if [ -z "${FLINK_TM_NET_BUF_MIN}" -a -z "${FLINK_TM_NET_BUF_MAX}" ]; then
+    FLINK_TM_NET_BUF_MIN=$(readFromConfig ${KEY_TASKM_NET_BUF_NR} -1 "${YAML_CONF}")
+    FLINK_TM_NET_BUF_MAX=${FLINK_TM_NET_BUF_MIN}
+fi
+
+# Define FLINK_TM_NET_BUF_MIN if it is not already set
+if [ -z "${FLINK_TM_NET_BUF_MIN}" -o "${FLINK_TM_NET_BUF_MIN}" = "-1" ]; then
+    # default: 64MB = 67108864 bytes (same as the previous default with 2048 buffers of 32k each)
+    FLINK_TM_NET_BUF_MIN=$(readFromConfig ${KEY_TASKM_NET_BUF_MIN} 67108864 "${YAML_CONF}")
+fi
+
+# Define FLINK_TM_NET_BUF_MAX if it is not already set
+if [ -z "${FLINK_TM_NET_BUF_MAX}" -o "${FLINK_TM_NET_BUF_MAX}" = "-1" ]; then
+    # default: 1GB = 1073741824 bytes
+    FLINK_TM_NET_BUF_MAX=$(readFromConfig ${KEY_TASKM_NET_BUF_MAX} 1073741824 "${YAML_CONF}")
+fi
+
+
 # Verify that NUMA tooling is available
 command -v numactl >/dev/null 2>&1
 if [[ $? -ne 0 ]]; then
@@ -463,3 +493,106 @@ TMSlaves() {
 useOffHeapMemory() {
     [[ "`echo ${FLINK_TM_OFFHEAP} | tr '[:upper:]' '[:lower:]'`" == "true" ]]
 }
+
+HAVE_AWK=
+# same as org.apache.flink.runtime.taskexecutor.TaskManagerServices.calculateNetworkBufferMemory(long totalJavaMemorySize, Configuration config)
+calculateNetworkBufferMemory() {
+    local network_buffers_bytes
+    if [ "${FLINK_TM_HEAP}" -le "0" ]; then
+        echo "Variable 'FLINK_TM_HEAP' not set (usually read from '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE})."
+        exit 1
+    fi
+
+    if [[ "${FLINK_TM_NET_BUF_MIN}" = "${FLINK_TM_NET_BUF_MAX}" ]]; then
+        # fix memory size for network buffers
+        network_buffers_bytes=${FLINK_TM_NET_BUF_MIN}
+    else
+        if [[ "${FLINK_TM_NET_BUF_MIN}" -gt "${FLINK_TM_NET_BUF_MAX}" ]]; then
+            echo "[ERROR] Configured TaskManager network buffer memory min/max '${FLINK_TM_NET_BUF_MIN}'/'${FLINK_TM_NET_BUF_MAX}' are not valid."
+            echo "Min must be less than or equal to max."
+            echo "Please set '${KEY_TASKM_NET_BUF_MIN}' and '${KEY_TASKM_NET_BUF_MAX}' in ${FLINK_CONF_FILE}."
+            exit 1
+        fi
+
+        # Bash only performs integer arithmetic so floating point computation is performed using awk
+        if [[ -z "${HAVE_AWK}" ]] ; then
+            command -v awk >/dev/null 2>&1
+            if [[ $? -ne 0 ]]; then
+                echo "[ERROR] Program 'awk' not found."
+                echo "Please install 'awk' or define '${KEY_TASKM_NET_BUF_MIN}' and '${KEY_TASKM_NET_BUF_MAX}' instead of '${KEY_TASKM_NET_BUF_FRACTION}' in ${FLINK_CONF_FILE}."
+                exit 1
+            fi
+            HAVE_AWK=true
+        fi
+
+        # We calculate the memory using a fraction of the total memory
+        if [[ `awk '{ if ($1 > 0.0 && $1 < 1.0) print "1"; }' <<< "${FLINK_TM_NET_BUF_FRACTION}"` != "1" ]]; then
+            echo "[ERROR] Configured TaskManager network buffer memory fraction '${FLINK_TM_NET_BUF_FRACTION}' is not a valid value."
+            echo "It must be between 0.0 and 1.0."
+            echo "Please set '${KEY_TASKM_NET_BUF_FRACTION}' in ${FLINK_CONF_FILE}."
+            exit 1
+        fi
+
+        network_buffers_bytes=`awk "BEGIN { x = lshift(${FLINK_TM_HEAP},20) * ${FLINK_TM_NET_BUF_FRACTION}; netbuf = x > ${FLINK_TM_NET_BUF_MAX} ? ${FLINK_TM_NET_BUF_MAX} : x < ${FLINK_TM_NET_BUF_MIN} ? ${FLINK_TM_NET_BUF_MIN} : x; printf \"%.0f\n\", netbuf }"`
+    fi
+
+    # recalculate the JVM heap memory by taking the network buffers into account
+    local tm_heap_size_bytes=$((${FLINK_TM_HEAP} << 20)) # megabytes to bytes
+    if [[ "${tm_heap_size_bytes}" -le "${network_buffers_bytes}" ]]; then
+        echo "[ERROR] Configured TaskManager memory size (${FLINK_TM_HEAP} MB, from '${KEY_TASKM_MEM_SIZE}') must be larger than the network buffer memory size (${network_buffers_bytes} bytes, from: '${KEY_TASKM_NET_BUF_FRACTION}', '${KEY_TASKM_NET_BUF_MIN}', '${KEY_TASKM_NET_BUF_MAX}', and '${KEY_TASKM_NET_BUF_NR}')."
+        exit 1
+    fi
+
+    echo ${network_buffers_bytes}
+}
+
+# same as org.apache.flink.runtime.taskexecutor.TaskManagerServices.calculateHeapSizeMB(long totalJavaMemorySizeMB, Configuration config)
+calculateTaskManagerHeapSizeMB() {
+    if [ "${FLINK_TM_HEAP}" -le "0" ]; then
+        echo "Variable 'FLINK_TM_HEAP' not set (usually read from '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE})."
+        exit 1
+    fi
+
+    local tm_heap_size_mb=${FLINK_TM_HEAP}
+
+    if useOffHeapMemory; then
+
+        local network_buffers_mb=$(($(calculateNetworkBufferMemory) >> 20)) # bytes to megabytes
+        tm_heap_size_mb=$((tm_heap_size_mb - network_buffers_mb))
+
+        if [[ "${FLINK_TM_MEM_MANAGED_SIZE}" -gt "0" ]]; then
+            # We split up the total memory in heap and off-heap memory
+            if [[ "${tm_heap_size_mb}" -le "${FLINK_TM_MEM_MANAGED_SIZE}" ]]; then
+                echo "[ERROR] Remaining TaskManager memory size (${tm_heap_size_mb} MB, from: '${KEY_TASKM_MEM_SIZE}' (${FLINK_TM_HEAP} MB) minus network buffer memory size (${network_buffers_mb} MB, from: '${KEY_TASKM_NET_BUF_FRACTION}', '${KEY_TASKM_NET_BUF_MIN}', '${KEY_TASKM_NET_BUF_MAX}', and '${KEY_TASKM_NET_BUF_NR}')) must be larger than the managed memory size (${FLINK_TM_MEM_MANAGED_SIZE} MB, from: '${KEY_TASKM_MEM_MANAGED_SIZE}')."
+                exit 1
+            fi
+
+            tm_heap_size_mb=$((tm_heap_size_mb - FLINK_TM_MEM_MANAGED_SIZE))
+        else
+            # Bash only performs integer arithmetic so floating point computation is performed using awk
+            if [[ -z "${HAVE_AWK}" ]] ; then
+                command -v awk >/dev/null 2>&1
+                if [[ $? -ne 0 ]]; then
+                    echo "[ERROR] Program 'awk' not found."
+                    echo "Please install 'awk' or define '${KEY_TASKM_MEM_MANAGED_SIZE}' instead of '${KEY_TASKM_MEM_MANAGED_FRACTION}' in ${FLINK_CONF_FILE}."
+                    exit 1
+                fi
+                HAVE_AWK=true
+            fi
+
+            # We calculate the memory using a fraction of the total memory
+            if [[ `awk '{ if ($1 > 0.0 && $1 < 1.0) print "1"; }' <<< "${FLINK_TM_MEM_MANAGED_FRACTION}"` != "1" ]]; then
+                echo "[ERROR] Configured TaskManager managed memory fraction '${FLINK_TM_MEM_MANAGED_FRACTION}' is not a valid value."
+                echo "It must be between 0.0 and 1.0 (both exclusive)."
+                echo "Please set '${KEY_TASKM_MEM_MANAGED_FRACTION}' in ${FLINK_CONF_FILE}."
+                exit 1
+            fi
+
+            # recalculate the JVM heap memory by taking the off-heap ratio into account
+            local offheap_managed_memory_size=`awk "BEGIN { printf \"%.0f\n\", ${tm_heap_size_mb} * ${FLINK_TM_MEM_MANAGED_FRACTION} }"`
+            tm_heap_size_mb=$((tm_heap_size_mb - offheap_managed_memory_size))
+        fi
+    fi
+
+    echo ${tm_heap_size_mb}
+}

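For readers skimming the config.sh hunk above, the fraction-based branch of calculateNetworkBufferMemory boils down to clamping "total memory * fraction" between the configured minimum and maximum. The following is a minimal sketch of that idea, not the committed code: the function name clamp_network_buffer_bytes and its positional parameters are hypothetical, and it multiplies by 1048576 rather than calling lshift so that it does not depend on a gawk extension.

```shell
#!/usr/bin/env bash
# Hypothetical helper (not part of the commit): clamp(total * fraction, min, max).
# $1 = total memory in MB, $2 = fraction, $3 = minimum in bytes, $4 = maximum in bytes
clamp_network_buffer_bytes() {
    local total_mb=$1 fraction=$2 min_bytes=$3 max_bytes=$4
    # bash only does integer arithmetic, so the fraction is applied in awk
    awk -v mb="${total_mb}" -v frac="${fraction}" \
        -v min="${min_bytes}" -v max="${max_bytes}" \
        'BEGIN {
            x = mb * 1048576 * frac      # megabytes to bytes, then take the fraction
            if (x > max) x = max         # never exceed the configured maximum
            if (x < min) x = min         # never fall below the configured minimum
            printf "%.0f\n", x
        }'
}
```

With the defaults from the commit message (fraction 0.1, minimum 64 MB, maximum 1 GB), a 1000 MB TaskManager would land between the two bounds, while a very small or very large one would be pinned to the minimum or maximum respectively.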
http://git-wip-us.apache.org/repos/asf/flink/blob/0bb49e53/flink-dist/src/main/flink-bin/bin/taskmanager.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/taskmanager.sh b/flink-dist/src/main/flink-bin/bin/taskmanager.sh
index 8431408..b16abc9 100755
--- a/flink-dist/src/main/flink-bin/bin/taskmanager.sh
+++ b/flink-dist/src/main/flink-bin/bin/taskmanager.sh
@@ -57,39 +57,10 @@ if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then
 
     if [ "${FLINK_TM_HEAP}" -gt "0" ]; then
 
-        TM_HEAP_SIZE=${FLINK_TM_HEAP}
+        TM_HEAP_SIZE=$(calculateTaskManagerHeapSizeMB)
         # Long.MAX_VALUE in TB: This is an upper bound, much less direct memory will be used
         TM_MAX_OFFHEAP_SIZE="8388607T"
 
-        if useOffHeapMemory; then
-            if [[ "${FLINK_TM_MEM_MANAGED_SIZE}" -gt "0" ]]; then
-                # We split up the total memory in heap and off-heap memory
-                if [[ "${FLINK_TM_HEAP}" -le "${FLINK_TM_MEM_MANAGED_SIZE}" ]]; then
-                    echo "[ERROR] Configured TaskManager memory size ('${KEY_TASKM_MEM_SIZE}') must be larger than the managed memory size ('${KEY_TASKM_MEM_MANAGED_SIZE}')."
-                    exit 1
-                fi
-                TM_HEAP_SIZE=$((FLINK_TM_HEAP - FLINK_TM_MEM_MANAGED_SIZE))
-            else
-                # Bash only performs integer arithmetic so floating point computation is performed using awk
-                command -v awk >/dev/null 2>&1
-                if [[ $? -ne 0 ]]; then
-                    echo "[ERROR] Program 'awk' not found."
-                    echo "Please install 'awk' or define '${KEY_TASKM_MEM_MANAGED_SIZE}' instead of '${KEY_TASKM_MEM_MANAGED_FRACTION}' in ${FLINK_CONF_FILE}."
-                    exit 1
-                fi
-                # We calculate the memory using a fraction of the total memory
-                if [[ `awk '{ if ($1 > 0.0 && $1 < 1.0) print "1"; }' <<< "${FLINK_TM_MEM_MANAGED_FRACTION}"` != "1" ]]; then
-                    echo "[ERROR] Configured TaskManager managed memory fraction '${FLINK_TM_MEM_MANAGED_FRACTION}' is not a valid value."
-                    echo "It must be between 0.0 and 1.0."
-                    echo "Please set '${KEY_TASKM_MEM_MANAGED_FRACTION}' in ${FLINK_CONF_FILE}."
-                    exit 1
-                fi
-                # recalculate the JVM heap memory by taking the off-heap ratio into account
-                OFFHEAP_MANAGED_MEMORY_SIZE=`awk "BEGIN { printf \"%.0f\n\", ${FLINK_TM_HEAP} * ${FLINK_TM_MEM_MANAGED_FRACTION} }"`
-                TM_HEAP_SIZE=$((FLINK_TM_HEAP - OFFHEAP_MANAGED_MEMORY_SIZE))
-            fi
-        fi
-
         export JVM_ARGS="${JVM_ARGS} -Xms${TM_HEAP_SIZE}M -Xmx${TM_HEAP_SIZE}M -XX:MaxDirectMemorySize=${TM_MAX_OFFHEAP_SIZE}"
 
     fi

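After this change taskmanager.sh delegates the heap computation to calculateTaskManagerHeapSizeMB. As a rough illustration of the off-heap case only, the sketch below subtracts the network buffer memory first and then either a fixed managed-memory size or a fraction of what remains. The function name sketch_heap_size_mb and its parameters are assumptions made for this example; validation and error messages from the real function are omitted.

```shell
#!/usr/bin/env bash
# Hypothetical sketch of the off-heap case; all sizes are in MB.
# $1 = total memory, $2 = network buffer memory,
# $3 = fixed managed memory size (<= 0 means "use the fraction"), $4 = managed memory fraction
sketch_heap_size_mb() {
    local total_mb=$1 network_mb=$2 managed_mb=$3 managed_frac=$4
    # network buffers are taken off the total first
    local heap_mb=$((total_mb - network_mb))
    if [[ "${managed_mb}" -gt 0 ]]; then
        # fixed-size managed memory
        heap_mb=$((heap_mb - managed_mb))
    else
        # managed memory as a fraction of the remaining memory (awk for floating point)
        local managed
        managed=$(awk -v h="${heap_mb}" -v f="${managed_frac}" 'BEGIN { printf "%.0f\n", h * f }')
        heap_mb=$((heap_mb - managed))
    fi
    echo "${heap_mb}"
}
```

Note that the fraction is applied to the memory left after the network buffers are removed, which matches the new function in config.sh, whereas the removed code in taskmanager.sh applied it to FLINK_TM_HEAP directly.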
http://git-wip-us.apache.org/repos/asf/flink/blob/0bb49e53/flink-dist/src/test/bin/calcTMHeapSizeMB.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/test/bin/calcTMHeapSizeMB.sh b/flink-dist/src/test/bin/calcTMHeapSizeMB.sh
new file mode 100755
index 0000000..3956643
--- /dev/null
+++ b/flink-dist/src/test/bin/calcTMHeapSizeMB.sh
@@ -0,0 +1,42 @@
+#!/usr/bin/env bash
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Wrapper script to compare the TM heap size calculation of config.sh with Java
+USAGE="Usage: calcTMHeapSizeMB.sh <memTotal> <offHeap> <netBufFrac> <netBufMin> <netBufMax> <managedMemMB> <managedMemFrac>"
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+FLINK_TM_HEAP=$1
+FLINK_TM_OFFHEAP=$2
+FLINK_TM_NET_BUF_FRACTION=$3
+FLINK_TM_NET_BUF_MIN=$4
+FLINK_TM_NET_BUF_MAX=$5
+FLINK_TM_MEM_MANAGED_SIZE=$6
+FLINK_TM_MEM_MANAGED_FRACTION=$7
+
+if [[ -z "${FLINK_TM_MEM_MANAGED_FRACTION}" ]]; then
+  echo "$USAGE"
+  exit 1
+fi
+
+FLINK_CONF_DIR=${bin}/../../main/resources
+. ${bin}/../../main/flink-bin/bin/config.sh
+
+calculateTaskManagerHeapSizeMB

http://git-wip-us.apache.org/repos/asf/flink/blob/0bb49e53/flink-dist/src/test/bin/calcTMNetBufMem.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/test/bin/calcTMNetBufMem.sh b/flink-dist/src/test/bin/calcTMNetBufMem.sh
new file mode 100755
index 0000000..9948d9c
--- /dev/null
+++ b/flink-dist/src/test/bin/calcTMNetBufMem.sh
@@ -0,0 +1,39 @@
+#!/usr/bin/env bash
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Wrapper script to compare the TM network buffer memory calculation of config.sh with Java
+USAGE="Usage: calcTMNetBufMem.sh <memTotal> <netBufFrac> <netBufMin> <netBufMax>"
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+FLINK_TM_HEAP=$1
+FLINK_TM_NET_BUF_FRACTION=$2
+FLINK_TM_NET_BUF_MIN=$3
+FLINK_TM_NET_BUF_MAX=$4
+
+if [[ -z "${FLINK_TM_NET_BUF_MAX}" ]]; then
+  echo "$USAGE"
+  exit 1
+fi
+
+FLINK_CONF_DIR=${bin}/../../main/resources
+. ${bin}/../../main/flink-bin/bin/config.sh
+
+calculateNetworkBufferMemory

http://git-wip-us.apache.org/repos/asf/flink/blob/0bb49e53/flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java
----------------------------------------------------------------------
diff --git a/flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java b/flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java
new file mode 100644
index 0000000..11d8ec7
--- /dev/null
+++ b/flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.dist;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.TestLogger;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Random;
+
+import static org.hamcrest.CoreMatchers.allOf;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Unit test that verifies that the task manager heap size calculation used by the bash script
+ * <tt>taskmanager.sh</tt> returns the same values as the heap size calculation of
+ * {@link TaskManagerServices#calculateHeapSizeMB(long, Configuration)}.
+ *
+ * NOTE: the shell script uses <tt>awk</tt> to perform floating-point arithmetic, which uses
+ * <tt>double</tt> precision, but our Java code is restricted to <tt>float</tt> because we do
+ * not actually need high precision.
+ */
+public class TaskManagerHeapSizeCalculationJavaBashTest extends TestLogger {
+
+	/** Key that is used by <tt>config.sh</tt>. */
+	private static final String KEY_TASKM_MEM_SIZE = "taskmanager.heap.mb";
+
+	/**
+	 * Number of tests with random values.
+	 *
+	 * NOTE: calling the external test script is slow and thus low numbers are preferred for general
+	 * testing.
+	 */
+	private static final int NUM_RANDOM_TESTS = 20;
+
+	@Before
+	public void checkOperatingSystem() {
+		Assume.assumeTrue("This test checks shell scripts which are not available on Windows.",
+			!OperatingSystem.isWindows());
+	}
+
+	/**
+	 * Tests that {@link TaskManagerServices#calculateNetworkBufferMemory(long, Configuration)} has the same
+	 * result as the shell script.
+	 */
+	@Test
+	public void compareNetworkBufShellScriptWithJava() throws Exception {
+		int managedMemSize = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue().intValue();
+		float managedMemFrac = TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue();
+
+		// manual tests from org.apache.flink.runtime.taskexecutor.TaskManagerServices.calculateHeapSizeMB()
+
+		compareNetworkBufJavaVsScript(
+			getConfig(1000, false, 0.1f, 64L << 20, 1L << 30, managedMemSize, managedMemFrac), 0.0f);
+
+		compareNetworkBufJavaVsScript(
+			getConfig(1000, true, 0.1f, 64L << 20, 1L << 30, 10 /*MB*/, managedMemFrac), 0.0f);
+
+		compareNetworkBufJavaVsScript(
+			getConfig(1000, true, 0.1f, 64L << 20, 1L << 30, managedMemSize, 0.1f), 0.0f);
+
+		// some automated tests with random (but valid) values
+
+		Random ran = new Random();
+		for (int i = 0; i < NUM_RANDOM_TESTS; ++i) {
+			// tolerate that values differ by 1% (due to different floating point precisions)
+			compareNetworkBufJavaVsScript(getRandomConfig(ran), 0.01f);
+		}
+	}
+
+	/**
+	 * Tests that {@link TaskManagerServices#calculateHeapSizeMB(long, Configuration)} has the same
+	 * result as the shell script.
+	 */
+	@Test
+	public void compareHeapSizeShellScriptWithJava() throws Exception {
+		int managedMemSize = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue().intValue();
+		float managedMemFrac = TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue();
+
+		// manual tests from org.apache.flink.runtime.taskexecutor.TaskManagerServices.calculateHeapSizeMB()
+
+		compareHeapSizeJavaVsScript(
+			getConfig(1000, false, 0.1f, 64L << 20, 1L << 30, managedMemSize, managedMemFrac), 0.0f);
+
+		compareHeapSizeJavaVsScript(
+			getConfig(1000, true, 0.1f, 64L << 20, 1L << 30, 10 /*MB*/, managedMemFrac), 0.0f);
+
+		compareHeapSizeJavaVsScript(
+			getConfig(1000, true, 0.1f, 64L << 20, 1L << 30, managedMemSize, 0.1f), 0.0f);
+
+		// some automated tests with random (but valid) values
+
+		Random ran = new Random();
+		for (int i = 0; i < NUM_RANDOM_TESTS; ++i) {
+			// tolerate that values differ by 1% (due to different floating point precisions)
+			compareHeapSizeJavaVsScript(getRandomConfig(ran), 0.01f);
+		}
+	}
+
+	/**
+	 * Returns a flink configuration object with the given values.
+	 *
+	 * @param javaMemMB
+	 * 		total JVM memory to use (in megabytes)
+	 * @param useOffHeap
+	 * 		whether to use off-heap memory (<tt>true</tt>) or not (<tt>false</tt>)
+	 * @param netBufMemFrac
+	 * 		fraction of JVM memory to use for network buffers
+	 * @param netBufMemMin
+	 * 		minimum memory size for network buffers (in bytes)
+	 * @param netBufMemMax
+	 * 		maximum memory size for network buffers (in bytes)
+	 * @param managedMemSizeMB
+	 * 		amount of managed memory (in megabytes)
+	 * @param managedMemFrac
+	 * 		fraction of free memory to use for managed memory (if <tt>managedMemSizeMB</tt> is
+	 * 		<tt>-1</tt>)
+	 *
+	 * @return flink configuration
+	 */
+	private static Configuration getConfig(
+			final int javaMemMB, final boolean useOffHeap, final float netBufMemFrac,
+			final long netBufMemMin, final long netBufMemMax, final int managedMemSizeMB,
+			final float managedMemFrac) {
+
+		Configuration config = new Configuration();
+
+		config.setLong(KEY_TASKM_MEM_SIZE, javaMemMB);
+		config.setBoolean(TaskManagerOptions.MEMORY_OFF_HEAP, useOffHeap);
+
+		config.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, netBufMemFrac);
+		config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, netBufMemMin);
+		config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, netBufMemMax);
+
+		config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, managedMemSizeMB);
+		config.setFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION, managedMemFrac);
+
+		return config;
+	}
+
+	/**
+	 * Returns a flink configuration object with random values (only those relevant to the tests in
+	 * this class).
+	 *
+	 * @param ran  random number generator
+	 *
+	 * @return flink configuration
+	 */
+	private static Configuration getRandomConfig(final Random ran) {
+
+		float frac = Math.max(ran.nextFloat(), Float.MIN_VALUE);
+
+		// note: we are testing with integers only here to avoid overly complicated checks for
+		// overflowing or negative Long values - this should be enough for any practical scenario
+		// though
+		long min = TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue() + ran.nextInt(Integer.MAX_VALUE);
+		long max = ran.nextInt(Integer.MAX_VALUE) + min;
+
+		int javaMemMB = Math.max((int) (max >> 20), ran.nextInt(Integer.MAX_VALUE)) + 1;
+		boolean useOffHeap = ran.nextBoolean();
+
+		int managedMemSize = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue().intValue();
+		float managedMemFrac = TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue();
+
+		if (ran.nextBoolean()) {
+			// use fixed-size managed memory
+			Configuration config = getConfig(javaMemMB, useOffHeap, frac, min, max, managedMemSize, managedMemFrac);
+			long totalJavaMemorySize = ((long) javaMemMB) << 20; // megabytes to bytes
+			final int networkBufMB =
+				(int) (TaskManagerServices.calculateNetworkBufferMemory(totalJavaMemorySize, config) >> 20);
+			// max (exclusive): total - netbuf
+			managedMemSize = Math.min(javaMemMB - networkBufMB - 1, ran.nextInt(Integer.MAX_VALUE));
+		} else {
+			// use fraction of given memory
+			managedMemFrac = Math.max(ran.nextFloat(), Float.MIN_VALUE);
+		}
+
+		return getConfig(javaMemMB, useOffHeap, frac, min, max, managedMemSize, managedMemFrac);
+	}
+
+	// Helper functions
+
+	/**
+	 * Calculates the network buffer memory size via
+	 * {@link TaskManagerServices#calculateNetworkBufferMemory(long, Configuration)} and the shell
+	 * script and verifies that these are equal.
+	 *
+	 * @param config     flink configuration
+	 * @param tolerance  tolerate values that are off by this factor (0.01 = 1%)
+	 */
+	private void compareNetworkBufJavaVsScript(final Configuration config, final float tolerance)
+			throws IOException {
+
+		final long totalJavaMemorySizeMB = config.getLong(KEY_TASKM_MEM_SIZE, 0L);
+
+		long javaNetworkBufMem = TaskManagerServices.calculateNetworkBufferMemory(totalJavaMemorySizeMB << 20, config);
+
+		String[] command = {"src/test/bin/calcTMNetBufMem.sh",
+			String.valueOf(totalJavaMemorySizeMB),
+			String.valueOf(config.getFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION)),
+			String.valueOf(config.getLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN)),
+			String.valueOf(config.getLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX))};
+		String scriptOutput = executeScript(command);
+
+		long absoluteTolerance = (long) (javaNetworkBufMem * tolerance);
+		if (absoluteTolerance < 1) {
+			assertEquals(
+				"Different network buffer memory sizes with configuration: " + config.toString(),
+				String.valueOf(javaNetworkBufMem), scriptOutput);
+		} else {
+			Long scriptNetworkBufMem = Long.valueOf(scriptOutput);
+			assertThat(
+				"Different network buffer memory sizes (Java: " + javaNetworkBufMem + ", Script: " + scriptNetworkBufMem +
+					") with configuration: " + config.toString(), scriptNetworkBufMem,
+				allOf(greaterThanOrEqualTo(javaNetworkBufMem - absoluteTolerance),
+					lessThanOrEqualTo(javaNetworkBufMem + absoluteTolerance)));
+		}
+	}
+
+	/**
+	 * Calculates the heap size via
+	 * {@link TaskManagerServices#calculateHeapSizeMB(long, Configuration)} and the shell script
+	 * and verifies that these are equal.
+	 *
+	 * @param config     flink configuration
+	 * @param tolerance  tolerate values that are off by this factor (0.01 = 1%)
+	 */
+	private void compareHeapSizeJavaVsScript(final Configuration config, float tolerance)
+			throws IOException {
+
+		final long totalJavaMemorySizeMB = config.getLong(KEY_TASKM_MEM_SIZE, 0L);
+
+		long javaHeapSizeMB = TaskManagerServices.calculateHeapSizeMB(totalJavaMemorySizeMB, config);
+
+		String[] command = {"src/test/bin/calcTMHeapSizeMB.sh",
+			String.valueOf(totalJavaMemorySizeMB),
+			String.valueOf(config.getBoolean(TaskManagerOptions.MEMORY_OFF_HEAP)),
+			String.valueOf(config.getFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION)),
+			String.valueOf(config.getLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN)),
+			String.valueOf(config.getLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX)),
+			String.valueOf(config.getLong(TaskManagerOptions.MANAGED_MEMORY_SIZE)),
+			String.valueOf(config.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION))};
+		String scriptOutput = executeScript(command);
+
+		long absoluteTolerance = (long) (javaHeapSizeMB * tolerance);
+		if (absoluteTolerance < 1) {
+			assertEquals("Different heap sizes with configuration: " + config.toString(),
+				String.valueOf(javaHeapSizeMB), scriptOutput);
+		} else {
+			Long scriptHeapSizeMB = Long.valueOf(scriptOutput);
+			assertThat(
+				"Different heap sizes (Java: " + javaHeapSizeMB + ", Script: " + scriptHeapSizeMB +
+					") with configuration: " + config.toString(), scriptHeapSizeMB,
+				allOf(greaterThanOrEqualTo(javaHeapSizeMB - absoluteTolerance),
+					lessThanOrEqualTo(javaHeapSizeMB + absoluteTolerance)));
+		}
+	}
+
+	/**
+	 * Executes the given shell script wrapper and returns its output.
+	 *
+	 * @param command  command to run
+	 *
+	 * @return raw script output
+	 */
+	private String executeScript(final String[] command) throws IOException {
+		ProcessBuilder pb = new ProcessBuilder(command);
+		pb.redirectErrorStream(true);
+		Process process = pb.start();
+		BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
+		StringBuilder sb = new StringBuilder();
+		String s;
+		while ((s = reader.readLine()) != null) {
+			sb.append(s);
+		}
+		return sb.toString();
+	}
+
+}

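The two compare* helpers in the test above share one acceptance rule: if the absolute tolerance rounds down to less than 1, the values must match exactly; otherwise the script result must lie within plus or minus that tolerance of the Java result. A minimal sketch of that rule follows. It is a simplification under stated assumptions: the name within_tolerance is hypothetical, and the tolerance is given as a whole-number percentage so that plain bash integer arithmetic suffices, whereas the Java test uses a float factor.

```shell
#!/usr/bin/env bash
# Hypothetical sketch of the tolerance rule; succeeds (exit status 0) if the values agree.
# $1 = expected value, $2 = actual value, $3 = tolerance in whole percent
within_tolerance() {
    local expected=$1 actual=$2 percent=$3
    # integer division truncates, like the (long) cast in the Java test
    local abs_tol=$((expected * percent / 100))
    if (( abs_tol < 1 )); then
        # tolerance too small to matter: require an exact match
        [[ "${expected}" == "${actual}" ]]
    else
        (( actual >= expected - abs_tol && actual <= expected + abs_tol ))
    fi
}
```

For small expected values the rule therefore degrades to strict equality, which is why the hand-written test cases can pass a tolerance of 0.0f.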
http://git-wip-us.apache.org/repos/asf/flink/blob/0bb49e53/flink-dist/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-dist/src/test/resources/log4j-test.properties b/flink-dist/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..9f24837
--- /dev/null
+++ b/flink-dist/src/test/resources/log4j-test.properties
@@ -0,0 +1,35 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=OFF, console
+
+# -----------------------------------------------------------------------------
+# Console (use 'console')
+# -----------------------------------------------------------------------------
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+
+# -----------------------------------------------------------------------------
+# File (use 'file')
+# -----------------------------------------------------------------------------
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.file=${log.dir}/${mvn.forkNumber}.log
+log4j.appender.file.append=false
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n

http://git-wip-us.apache.org/repos/asf/flink/blob/0bb49e53/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index e9d3cbd..ea508d1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -309,7 +309,7 @@ public class BootstrapTools {
 	 * Get an instance of the dynamic properties option.
 	 *
 	 * Dynamic properties allow the user to specify additional configuration values with -D, such as
-	 *  -Dfs.overwrite-files=true  -Dtaskmanager.network.numberOfBuffers=16368
+	 * <tt> -Dfs.overwrite-files=true  -Dtaskmanager.network.memory.min=536346624</tt>
      */
 	public static Option newDynamicPropertiesOption() {
 		return new Option(DYNAMIC_PROPERTIES_OPT, true, "Dynamic properties");

http://git-wip-us.apache.org/repos/asf/flink/blob/0bb49e53/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
index 8ff3c25..9d679cf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.clusterframework;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -141,26 +141,10 @@ public class ContaineredTaskManagerParameters implements java.io.Serializable {
 
 		final long javaMemorySizeMB = containerMemoryMB - cutoff;
 
-		// (2) split the Java memory between heap and off-heap
+		// (2) split the remaining Java memory between heap and off-heap
+		final long heapSizeMB = TaskManagerServices.calculateHeapSizeMB(javaMemorySizeMB, config);
+		final long offHeapSize = javaMemorySizeMB == heapSizeMB ? -1L : javaMemorySizeMB - heapSizeMB; 
 
-		final boolean useOffHeap = config.getBoolean(TaskManagerOptions.MEMORY_OFF_HEAP);
-
-		final long heapSizeMB;
-		long offHeapSize = -1;
-		if (useOffHeap) {
-			offHeapSize = config.getLong(TaskManagerOptions.MANAGED_MEMORY_SIZE);
-
-			if (offHeapSize <= 0) {
-				double fraction = config.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION);
-
-				offHeapSize = (long) (fraction * javaMemorySizeMB);
-			}
-
-			heapSizeMB = javaMemorySizeMB - offHeapSize;
-		} else {
-			heapSizeMB = javaMemorySizeMB;
-		}
-		
 		// (3) obtain the additional environment variables from the configuration
 		final HashMap<String, String> envVars = new HashMap<>();
 		final String prefix = ConfigConstants.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX;
@@ -172,7 +156,7 @@ public class ContaineredTaskManagerParameters implements java.io.Serializable {
 				envVars.put(envVarKey, config.getString(key, null));
 			}
 		}
-		
+
 		// done
 		return new ContaineredTaskManagerParameters(
 			containerMemoryMB, heapSizeMB, offHeapSize, numSlots, envVars);

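The replacement logic in ContaineredTaskManagerParameters derives the off-heap size from the computed heap size: when the heap takes all of the Java memory, the off-heap size is reported as -1, otherwise it is the difference. A small sketch of that rule, written in shell to match the scripts in this commit (the function name sketch_off_heap_size_mb is hypothetical):

```shell
#!/usr/bin/env bash
# Hypothetical sketch; both arguments are in MB.
# $1 = Java memory after the container cutoff, $2 = computed heap size
sketch_off_heap_size_mb() {
    local java_memory_mb=$1 heap_mb=$2
    if (( java_memory_mb == heap_mb )); then
        # nothing was carved out of the heap, so no off-heap size is set
        echo -1
    else
        echo $((java_memory_mb - heap_mb))
    fi
}
```

Under this rule the off-heap size covers both the network buffers and any off-heap managed memory, since both are subtracted when computing the heap size.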
http://git-wip-us.apache.org/repos/asf/flink/blob/0bb49e53/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
index a36bdf4..1eb44c3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
@@ -194,12 +194,15 @@ public class NetworkBufferPool implements BufferPoolFactory {
 			if (numTotalRequiredBuffers + numRequiredBuffers > totalNumberOfMemorySegments) {
 				throw new IOException(String.format("Insufficient number of network buffers: " +
 								"required %d, but only %d available. The total number of network " +
-								"buffers is currently set to %d. You can increase this " +
-								"number by setting the configuration key '%s'.",
+								"buffers is currently set to %d of %d bytes each. You can increase this " +
+								"number by setting the configuration keys '%s', '%s', and '%s'.",
 						numRequiredBuffers,
 						totalNumberOfMemorySegments - numTotalRequiredBuffers,
 						totalNumberOfMemorySegments,
-						TaskManagerOptions.NETWORK_NUM_BUFFERS.key()));
+						memorySegmentSize,
+						TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key(),
+						TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key(),
+						TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key()));
 			}
 
 			this.numTotalRequiredBuffers += numRequiredBuffers;

http://git-wip-us.apache.org/repos/asf/flink/blob/0bb49e53/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
index 11c6d16..654d528 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
@@ -52,8 +52,10 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * <p>Since the network buffer pool size for outgoing partitions is usually
  * quite small, e.g. via the {@link TaskManagerOptions#NETWORK_BUFFERS_PER_CHANNEL}
  * and {@link TaskManagerOptions#NETWORK_EXTRA_BUFFERS_PER_GATE} parameters
- * for bounded channels or from the default value of
- * {@link TaskManagerOptions#NETWORK_NUM_BUFFERS}, most spillable partitions
+ * for bounded channels or from the default values of
+ * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},
+ * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN}, and
+ * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}, most spillable partitions
  * will be spilled for real-world data sets.
  */
 class SpillableSubpartition extends ResultSubpartition {

http://git-wip-us.apache.org/repos/asf/flink/blob/0bb49e53/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
index 823b3f2..b8d6bbb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -190,7 +191,7 @@ public class MiniClusterConfiguration {
 	 * 3. Distribute the available free memory equally among all components (JMs, RMs and TMs) and
 	 * calculate the managed memory from the share of memory for a single task manager.
 	 *
-	 * @return
+	 * @return size of managed memory per task manager (in megabytes)
 	 */
 	private long getOrCalculateManagedMemoryPerTaskManager() {
 		if (managedMemoryPerTaskManager == -1) {
@@ -206,9 +207,6 @@ public class MiniClusterConfiguration {
 				// share the available memory among all running components
 
 				float memoryFraction = config.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION);
-				long networkBuffersMemory =
-					(long) config.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS) *
-						(long) config.getInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE);
 
 				long freeMemory = EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag();
 
@@ -217,12 +215,13 @@ public class MiniClusterConfiguration {
 				long memoryPerComponent = freeMemory / (numTaskManagers + numResourceManagers + numJobManagers);
 
 				// subtract the network buffer memory
+				long networkBuffersMemory = TaskManagerServices.calculateNetworkBufferMemory(memoryPerComponent, config);
 				long memoryMinusNetworkBuffers = memoryPerComponent - networkBuffersMemory;
 
 				// calculate the managed memory size
 				long managedMemoryBytes = (long) (memoryMinusNetworkBuffers * memoryFraction);
 
-				return managedMemoryBytes >>> 20;
+				return managedMemoryBytes >> 20; // bytes to megabytes
 			} else {
 				return memorySize;
 			}
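
The managed-memory calculation in the hunk above can be traced with a small standalone sketch. It is not the Flink code: the class and method names are hypothetical, the free heap size is passed in instead of being read from EnvironmentInformation, the fraction is a double rather than a float, and the values used in main (0.1 / 64 MB / 1 GB for the network settings, 0.7 for the managed memory fraction) are assumptions chosen for illustration.

```java
// Hypothetical sketch of the MiniCluster managed-memory arithmetic; not Flink code.
final class MiniClusterManagedMemorySketch {

    // Clamp fraction * total into [minBytes, maxBytes], as the commit does for network buffers.
    static long clampNetworkBytes(long totalBytes, double fraction, long minBytes, long maxBytes) {
        return Math.min(maxBytes, Math.max(minBytes, (long) (fraction * totalBytes)));
    }

    // Returns the managed memory per task manager in megabytes.
    static long managedMemoryMB(
            long freeHeapBytes, int numComponents,
            double networkFraction, long networkMinBytes, long networkMaxBytes,
            double managedFraction) {
        // share the available memory among all running components (JMs, RMs and TMs)
        long memoryPerComponent = freeHeapBytes / numComponents;
        // subtract the network buffer memory
        long networkBytes = clampNetworkBytes(
            memoryPerComponent, networkFraction, networkMinBytes, networkMaxBytes);
        long memoryMinusNetworkBuffers = memoryPerComponent - networkBytes;
        // take the managed fraction of the remainder and convert bytes to megabytes
        long managedBytes = (long) (memoryMinusNetworkBuffers * managedFraction);
        return managedBytes >> 20;
    }

    public static void main(String[] args) {
        // assumed example: 3 GB free heap shared by one JM, one RM and one TM
        long mb = managedMemoryMB(3L << 30, 3, 0.1, 64L << 20, 1L << 30, 0.7);
        System.out.println(mb + " MB managed memory per task manager");
    }
}
```

Because the network share is now derived from memoryPerComponent, it has to be computed after the per-component split, which is why the commit moves that line below the division.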

http://git-wip-us.apache.org/repos/asf/flink/blob/0bb49e53/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index e3c8345..ecf81d9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.taskexecutor;
 
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
@@ -46,7 +48,6 @@ import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.util.Preconditions;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -256,9 +257,11 @@ public class TaskManagerServices {
 			}
 			memorySize = configuredMemory << 20; // megabytes to bytes
 		} else {
+			// similar to #calculateNetworkBufferMemory(TaskManagerServicesConfiguration tmConfig)
 			float memoryFraction = taskManagerServicesConfiguration.getMemoryFraction();
 
 			if (memType == MemoryType.HEAP) {
+				// network buffers already allocated -> use memoryFraction of the remaining heap:
 				long relativeMemSize = (long) (EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() * memoryFraction);
 				if (preAllocateMemory) {
 					LOG.info("Using {} of the currently free heap space for managed heap memory ({} MB)." ,
@@ -269,7 +272,10 @@ public class TaskManagerServices {
 				}
 				memorySize = relativeMemSize;
 			} else if (memType == MemoryType.OFF_HEAP) {
-				// The maximum heap memory has been adjusted according to the fraction
+				// The maximum heap memory has been adjusted according to the fraction (see
+				// calculateHeapSizeMB(long totalJavaMemorySizeMB, Configuration config)), i.e.
+				// maxJvmHeap = jvmHeapNoNet - jvmHeapNoNet * memoryFraction = jvmHeapNoNet * (1 - memoryFraction)
+				// directMemorySize = jvmHeapNoNet * memoryFraction
 				long maxMemory = EnvironmentInformation.getMaxJvmHeapMemory();
 				long directMemorySize = (long) (maxMemory / (1.0 - memoryFraction) * memoryFraction);
 				if (preAllocateMemory) {
@@ -321,9 +327,19 @@ public class TaskManagerServices {
 
 		NetworkEnvironmentConfiguration networkEnvironmentConfiguration = taskManagerServicesConfiguration.getNetworkConfig();
 
+		final long networkBuf = calculateNetworkBufferMemory(taskManagerServicesConfiguration);
+		int segmentSize = networkEnvironmentConfiguration.networkBufferSize();
+
+		// tolerate offcuts between intended and allocated memory due to segmentation (will be available to the user-space memory)
+		final long numNetBuffersLong = networkBuf / segmentSize;
+		if (numNetBuffersLong > Integer.MAX_VALUE) {
+			throw new IllegalArgumentException("The given number of memory bytes (" + networkBuf
+				+ ") corresponds to more than MAX_INT pages.");
+		}
+
 		NetworkBufferPool networkBufferPool = new NetworkBufferPool(
-			networkEnvironmentConfiguration.numNetworkBuffers(),
-			networkEnvironmentConfiguration.networkBufferSize(),
+			(int) numNetBuffersLong,
+			segmentSize,
 			networkEnvironmentConfiguration.memoryType());
 
 		ConnectionManager connectionManager;
@@ -376,6 +392,217 @@ public class TaskManagerServices {
 	}
 
 	/**
+	 * Calculates the amount of memory used for network buffers based on the total memory to use and
+	 * the corresponding configuration parameters.
+	 *
+	 * The following configuration parameters are involved:
+	 * <ul>
+	 *  <li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},</li>
+	 * 	<li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN},</li>
+	 * 	<li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}, and</li>
+	 *  <li>{@link TaskManagerOptions#NETWORK_NUM_BUFFERS} (fallback if the ones above do not exist)</li>
+	 * </ul>
+	 *
+	 * @param totalJavaMemorySize
+	 * 		overall available memory to use (heap and off-heap, in bytes)
+	 * @param config
+	 * 		configuration object
+	 *
+	 * @return memory to use for network buffers (in bytes)
+	 */
+	@SuppressWarnings("deprecation")
+	public static long calculateNetworkBufferMemory(long totalJavaMemorySize, Configuration config) {
+		Preconditions.checkArgument(totalJavaMemorySize > 0);
+
+		int segmentSize = config.getInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE);
+
+		final long networkBufBytes;
+		if (TaskManagerServicesConfiguration.hasNewNetworkBufConf(config)) {
+			// new configuration based on fractions of available memory with selectable min and max
+			float networkBufFraction = config.getFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION);
+			long networkBufMin = config.getLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN);
+			long networkBufMax = config.getLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX);
+
+			TaskManagerServicesConfiguration
+				.checkNetworkBufferConfig(segmentSize, networkBufFraction, networkBufMin, networkBufMax);
+
+			networkBufBytes = Math.min(networkBufMax, Math.max(networkBufMin,
+				(long) (networkBufFraction * totalJavaMemorySize)));
+
+			TaskManagerServicesConfiguration
+				.checkConfigParameter(networkBufBytes < totalJavaMemorySize,
+					"(" + networkBufFraction + ", " + networkBufMin + ", " + networkBufMax + ")",
+					"(" + TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key() + ", " +
+						TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key() + ", " +
+						TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key() + ")",
+					"Network buffer memory size too large: " + networkBufBytes + " >= " +
+						totalJavaMemorySize + " (total JVM memory size)");
+		} else {
+			// use old (deprecated) network buffers parameter
+			int numNetworkBuffers = config.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS);
+			networkBufBytes = (long) numNetworkBuffers * (long) segmentSize;
+
+			TaskManagerServicesConfiguration.checkNetworkConfigOld(numNetworkBuffers);
+
+			TaskManagerServicesConfiguration
+				.checkConfigParameter(networkBufBytes < totalJavaMemorySize,
+					networkBufBytes, TaskManagerOptions.NETWORK_NUM_BUFFERS.key(),
+					"Network buffer memory size too large: " + networkBufBytes + " >= " +
+						totalJavaMemorySize + " (total JVM memory size)");
+		}
+
+		return networkBufBytes;
+	}
+
+	/**
+	 * Calculates the amount of memory used for network buffers inside the current JVM instance
+	 * based on the available heap or the max heap size and the corresponding configuration parameters.
+	 *
+	 * For containers or when started via scripts, if started with a memory limit and set to use
+	 * off-heap memory, the maximum heap size for the JVM is adjusted accordingly and we are able
+	 * to extract the intended values from this.
+	 *
+	 * The following configuration parameters are involved:
+	 * <ul>
+	 *  <li>{@link TaskManagerOptions#MANAGED_MEMORY_FRACTION},</li>
+	 *  <li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},</li>
+	 * 	<li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN},</li>
+	 * 	<li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}, and</li>
+	 *  <li>{@link TaskManagerOptions#NETWORK_NUM_BUFFERS} (fallback if the ones above do not exist)</li>
+	 * </ul>
+	 *
+	 * @param tmConfig task manager services configuration object
+	 *
+	 * @return memory to use for network buffers (in bytes)
+	 */
+	public static long calculateNetworkBufferMemory(TaskManagerServicesConfiguration tmConfig) {
+		final NetworkEnvironmentConfiguration networkConfig = tmConfig.getNetworkConfig();
+
+		final float networkBufFraction = networkConfig.networkBufFraction();
+		final long networkBufMin = networkConfig.networkBufMin();
+		final long networkBufMax = networkConfig.networkBufMax();
+
+		if (networkBufMin == networkBufMax) {
+			// fixed network buffer pool size
+			return networkBufMin;
+		}
+
+		// relative network buffer pool size using the fraction
+
+		final MemoryType memType = networkConfig.memoryType();
+
+		final long networkBufBytes;
+		if (memType == MemoryType.HEAP) {
+			// use fraction parts of the available heap memory
+
+			final long relativeMemSize = EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag();
+			networkBufBytes = Math.min(networkBufMax, Math.max(networkBufMin,
+				(long) (networkBufFraction * relativeMemSize)));
+
+			TaskManagerServicesConfiguration
+				.checkConfigParameter(networkBufBytes < relativeMemSize,
+					"(" + networkBufFraction + ", " + networkBufMin + ", " + networkBufMax + ")",
+					"(" + TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key() + ", " +
+						TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key() + ", " +
+						TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key() + ")",
+					"Network buffer memory size too large: " + networkBufBytes + " >= " +
+						relativeMemSize + " (free JVM heap size)");
+		} else if (memType == MemoryType.OFF_HEAP) {
+			// The maximum heap memory has been adjusted accordingly as in
+			// calculateHeapSizeMB(long totalJavaMemorySizeMB, Configuration config)
+			// and we need to invert these calculations.
+
+			final long maxMemory = EnvironmentInformation.getMaxJvmHeapMemory();
+
+			// check if a value has been configured
+			long configuredMemory = tmConfig.getConfiguredMemory() << 20; // megabytes to bytes
+
+			final long jvmHeapNoNet;
+			if (configuredMemory > 0) {
+				// The maximum heap memory has been adjusted according to configuredMemory, i.e.
+				// maxJvmHeap = jvmHeapNoNet - configuredMemory
+
+				jvmHeapNoNet = maxMemory + configuredMemory;
+			} else {
+				// The maximum heap memory has been adjusted according to the fraction, i.e.
+				// maxJvmHeap = jvmHeapNoNet - jvmHeapNoNet * managedFraction = jvmHeapNoNet * (1 - managedFraction)
+
+				final float managedFraction = tmConfig.getMemoryFraction();
+				jvmHeapNoNet = (long) (maxMemory / (1.0 - managedFraction));
+			}
+
+			// finally extract the network buffer memory size again from:
+			// jvmHeapNoNet = jvmHeap - networkBufBytes
+			//              = jvmHeap - Math.min(networkBufMax, Math.max(networkBufMin, jvmHeap * netFraction))
+			networkBufBytes = Math.min(networkBufMax, Math.max(networkBufMin,
+				(long) (jvmHeapNoNet / (1.0 - networkBufFraction) * networkBufFraction)));
+
+			TaskManagerServicesConfiguration
+				.checkConfigParameter(networkBufBytes < maxMemory,
+					"(" + networkBufFraction + ", " + networkBufMin + ", " + networkBufMax + ")",
+					"(" + TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key() + ", " +
+						TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key() + ", " +
+						TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key() + ")",
+					"Network buffer memory size too large: " + networkBufBytes + " >= " +
+						maxMemory + " (maximum JVM heap size)");
+		} else {
+			throw new RuntimeException("No supported memory type detected.");
+		}
+
+		return networkBufBytes;
+	}
+
+	/**
+	 * Calculates the amount of heap memory to use (to set via <tt>-Xmx</tt> and <tt>-Xms</tt>)
+	 * based on the total memory to use and the given configuration parameters.
+	 *
+	 * @param totalJavaMemorySizeMB
+	 * 		overall available memory to use (heap and off-heap, in megabytes)
+	 * @param config
+	 * 		configuration object
+	 *
+	 * @return heap memory to use (in megabytes)
+	 */
+	public static long calculateHeapSizeMB(long totalJavaMemorySizeMB, Configuration config) {
+		Preconditions.checkArgument(totalJavaMemorySizeMB > 0);
+
+		final long totalJavaMemorySize = totalJavaMemorySizeMB << 20; // megabytes to bytes
+
+		// split the available Java memory between heap and off-heap
+
+		final boolean useOffHeap = config.getBoolean(TaskManagerOptions.MEMORY_OFF_HEAP);
+
+		final long heapSizeMB;
+		if (useOffHeap) {
+
+			// subtract the Java memory used for network buffers
+			final long networkBufMB = calculateNetworkBufferMemory(totalJavaMemorySize, config) >> 20; // bytes to megabytes
+			final long remainingJavaMemorySizeMB = totalJavaMemorySizeMB - networkBufMB;
+
+			long offHeapSize = config.getLong(TaskManagerOptions.MANAGED_MEMORY_SIZE);
+
+			if (offHeapSize <= 0) {
+				// calculate off-heap section via fraction
+				double fraction = config.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION);
+				offHeapSize = (long) (fraction * remainingJavaMemorySizeMB);
+			}
+
+			TaskManagerServicesConfiguration
+				.checkConfigParameter(offHeapSize < remainingJavaMemorySizeMB, offHeapSize,
+					TaskManagerOptions.MANAGED_MEMORY_SIZE.key(),
+					"Managed memory size too large for " + networkBufMB +
+						" MB network buffer memory and a total of " + totalJavaMemorySizeMB +
+						" MB JVM memory");
+
+			heapSizeMB = remainingJavaMemorySizeMB - offHeapSize;
+		} else {
+			heapSizeMB = totalJavaMemorySizeMB;
+		}
+
+		return heapSizeMB;
+	}
+
+	/**
 	 * Validates that all the directories denoted by the strings do actually exist, are proper
 	 * directories (not files), and are writable.
 	 *
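
The sizing rule introduced in calculateNetworkBufferMemory(long, Configuration) above amounts to clamping a fraction of the total memory between a minimum and a maximum, then dividing by the segment size to get a buffer count. Below is a minimal standalone sketch of that rule. The class and method names are hypothetical, the fraction is a double rather than the float used in the commit, and the 32 KiB segment size in main is an assumption; 0.1 / 64 MB / 1 GB are the defaults stated in the commit message.

```java
// Hypothetical sketch of the fraction/min/max sizing rule; not Flink code.
final class NetworkBufferClampSketch {

    // Network buffer memory in bytes: fraction of total, clamped into [minBytes, maxBytes].
    static long networkBufferBytes(long totalBytes, double fraction, long minBytes, long maxBytes) {
        if (totalBytes <= 0) {
            throw new IllegalArgumentException("total memory must be positive");
        }
        long bytes = Math.min(maxBytes, Math.max(minBytes, (long) (fraction * totalBytes)));
        if (bytes >= totalBytes) {
            // mirrors the "Network buffer memory size too large" check in the commit
            throw new IllegalArgumentException(
                "Network buffer memory size too large: " + bytes + " >= " + totalBytes);
        }
        return bytes;
    }

    // Number of whole buffers that fit; the remainder is left to other memory consumers.
    static int numBuffers(long networkBytes, int segmentSize) {
        long n = networkBytes / segmentSize;
        if (n > Integer.MAX_VALUE) {
            throw new IllegalArgumentException("too many buffers: " + n);
        }
        return (int) n;
    }

    public static void main(String[] args) {
        long min = 64L << 20; // 64 MB
        long max = 1L << 30;  // 1 GB
        long[] totals = {256L << 20, 4L << 30, 20L << 30};
        for (long total : totals) {
            long bytes = networkBufferBytes(total, 0.1, min, max);
            System.out.println(total + " bytes total -> " + bytes + " bytes, "
                + numBuffers(bytes, 32768) + " buffers");
        }
    }
}
```

With these inputs the small JVM is raised to the minimum, the large one is capped at the maximum, and only the middle one actually uses the fraction.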

http://git-wip-us.apache.org/repos/asf/flink/blob/0bb49e53/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
index 366be34..3fee689 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
@@ -34,6 +34,8 @@ import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
 import org.apache.flink.util.MathUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.net.InetAddress;
@@ -47,6 +49,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * the io manager and the metric registry
  */
 public class TaskManagerServicesConfiguration {
+	private static final Logger LOG = LoggerFactory.getLogger(TaskManagerServicesConfiguration.class);
 
 	private final InetAddress taskManagerAddress;
 
@@ -58,6 +61,11 @@ public class TaskManagerServicesConfiguration {
 
 	private final QueryableStateConfiguration queryableStateConfig;
 
+	/**
+	 * Managed memory (in megabytes).
+	 *
+	 * @see TaskManagerOptions#MANAGED_MEMORY_SIZE
+	 */
 	private final long configuredMemory;
 
 	private final boolean preAllocateMemory;
@@ -126,6 +134,13 @@ public class TaskManagerServicesConfiguration {
 		return memoryFraction;
 	}
 
+	/**
+	 * Returns the size of the managed memory (in megabytes), if configured.
+	 *
+	 * @return managed memory or a default value (currently <tt>-1</tt>) if not configured
+	 *
+	 * @see TaskManagerOptions#MANAGED_MEMORY_SIZE
+	 */
 	public long getConfiguredMemory() {
 		return configuredMemory;
 	}
@@ -228,6 +243,7 @@ public class TaskManagerServicesConfiguration {
 	 * @param slots to start the task manager with
 	 * @return Network environment configuration
 	 */
+	@SuppressWarnings("deprecation")
 	private static NetworkEnvironmentConfiguration parseNetworkEnvironmentConfiguration(
 		Configuration configuration,
 		boolean localTaskManagerCommunication,
@@ -245,11 +261,6 @@ public class TaskManagerServicesConfiguration {
 		checkConfigParameter(slots >= 1, slots, ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS,
 			"Number of task slots must be at least one.");
 
-		final int numNetworkBuffers = configuration.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS);
-
-		checkConfigParameter(numNetworkBuffers > 0, numNetworkBuffers,
-			TaskManagerOptions.NETWORK_NUM_BUFFERS.key(), "");
-
 		final int pageSize = configuration.getInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE);
 
 		// check page size for minimum size
@@ -284,6 +295,27 @@ public class TaskManagerServicesConfiguration {
 			}
 		}
 
+		// network buffer memory fraction
+
+		float networkBufFraction = configuration.getFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION);
+		long networkBufMin = configuration.getLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN);
+		long networkBufMax = configuration.getLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX);
+		checkNetworkBufferConfig(pageSize, networkBufFraction, networkBufMin, networkBufMax);
+
+		// fallback: number of network buffers
+		final int numNetworkBuffers = configuration.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS);
+		checkNetworkConfigOld(numNetworkBuffers);
+
+		if (!hasNewNetworkBufConf(configuration)) {
+			// map old config to new one:
+			networkBufMin = networkBufMax = ((long) numNetworkBuffers) * pageSize;
+		} else {
+			if (configuration.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS)) {
+				LOG.info("Ignoring old (but still present) network buffer configuration via {}.",
+					TaskManagerOptions.NETWORK_NUM_BUFFERS.key());
+			}
+		}
+
 		final NettyConfig nettyConfig;
 		if (!localTaskManagerCommunication) {
 			final InetSocketAddress taskManagerInetSocketAddress = new InetSocketAddress(taskManagerAddress, dataport);
@@ -317,7 +349,9 @@ public class TaskManagerServicesConfiguration {
 			TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE);
 
 		return new NetworkEnvironmentConfiguration(
-			numNetworkBuffers,
+			networkBufFraction,
+			networkBufMin,
+			networkBufMax,
 			pageSize,
 			memType,
 			ioMode,
@@ -329,6 +363,69 @@ public class TaskManagerServicesConfiguration {
 	}
 
 	/**
+	 * Validates the (old) network buffer configuration.
+	 *
+	 * @param numNetworkBuffers		number of buffers used in the network stack
+	 *
+	 * @throws IllegalConfigurationException if the condition does not hold
+	 */
+	@SuppressWarnings("deprecation")
+	protected static void checkNetworkConfigOld(final int numNetworkBuffers) {
+		checkConfigParameter(numNetworkBuffers > 0, numNetworkBuffers,
+			TaskManagerOptions.NETWORK_NUM_BUFFERS.key(),
+			"Must have at least one network buffer");
+	}
+
+	/**
+	 * Validates the (new) network buffer configuration.
+	 *
+	 * @param pageSize 				size of memory buffers
+	 * @param networkBufFraction	fraction of JVM memory to use for network buffers
+	 * @param networkBufMin 		minimum memory size for network buffers (in bytes)
+	 * @param networkBufMax 		maximum memory size for network buffers (in bytes)
+	 *
+	 * @throws IllegalConfigurationException if the condition does not hold
+	 */
+	protected static void checkNetworkBufferConfig(
+			final int pageSize, final float networkBufFraction, final long networkBufMin,
+			final long networkBufMax) throws IllegalConfigurationException {
+
+		checkConfigParameter(networkBufFraction > 0.0f && networkBufFraction < 1.0f, networkBufFraction,
+			TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key(),
+			"Network buffer memory fraction of the free memory must be between 0.0 and 1.0");
+
+		checkConfigParameter(networkBufMin >= pageSize, networkBufMin,
+			TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key(),
+			"Minimum memory for network buffers must allow at least one network " +
+				"buffer with respect to the memory segment size");
+
+		checkConfigParameter(networkBufMax >= pageSize, networkBufMax,
+			TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key(),
+			"Maximum memory for network buffers must allow at least one network " +
+				"buffer with respect to the memory segment size");
+
+		checkConfigParameter(networkBufMax >= networkBufMin, networkBufMax,
+			TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key(),
+			"Maximum memory for network buffers must not be smaller than minimum memory (" +
+				TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key() + ": " + networkBufMin + ")");
+	}
+
+	/**
+	 * Returns whether the new network buffer memory configuration is present in the configuration
+	 * object, i.e. at least one new parameter is given or the old one is not present.
+	 *
+	 * @param config configuration object
+	 * @return <tt>true</tt> if the new configuration method is used, <tt>false</tt> otherwise
+	 */
+	@SuppressWarnings("deprecation")
+	public static boolean hasNewNetworkBufConf(final Configuration config) {
+		return config.contains(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION) ||
+			config.contains(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN) ||
+			config.contains(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX) ||
+			!config.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS);
+	}
+
+	/**
 	 * Creates the {@link QueryableStateConfiguration} from the given Configuration.
 	 */
 	private static QueryableStateConfiguration parseQueryableStateConfiguration(Configuration config) {
@@ -353,8 +450,11 @@ public class TaskManagerServicesConfiguration {
 	 * @param parameter         The parameter value. Will be shown in the exception message.
 	 * @param name              The name of the config parameter. Will be shown in the exception message.
 	 * @param errorMessage  The optional custom error message to append to the exception message.
+	 *
+	 * @throws IllegalConfigurationException if the condition does not hold
 	 */
-	private static void checkConfigParameter(boolean condition, Object parameter, String name, String errorMessage) {
+	static void checkConfigParameter(boolean condition, Object parameter, String name, String errorMessage)
+			throws IllegalConfigurationException {
 		if (!condition) {
 			throw new IllegalConfigurationException("Invalid configuration value for " + 
 					name + " : " + parameter + " - " + errorMessage);
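
The fallback between the old and new parameters in hasNewNetworkBufConf and parseNetworkEnvironmentConfiguration above can be sketched without Flink classes. This is only an illustration: a plain Map stands in for Flink's Configuration, the class and method names are hypothetical, and the old key string "taskmanager.network.numberOfBuffers" is an assumption, since the diff only shows the constant NETWORK_NUM_BUFFERS.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the old/new parameter fallback; not Flink code.
final class NetworkConfigFallbackSketch {
    static final String FRACTION_KEY = "taskmanager.network.memory.fraction";
    static final String MIN_KEY = "taskmanager.network.memory.min";
    static final String MAX_KEY = "taskmanager.network.memory.max";
    // Assumed key string for the deprecated NETWORK_NUM_BUFFERS option.
    static final String OLD_KEY = "taskmanager.network.numberOfBuffers";

    // New-style config applies if any new key is set or the old key is absent.
    static boolean hasNewNetworkBufConf(Map<String, String> config) {
        return config.containsKey(FRACTION_KEY)
            || config.containsKey(MIN_KEY)
            || config.containsKey(MAX_KEY)
            || !config.containsKey(OLD_KEY);
    }

    // Returns {minBytes, maxBytes}; the old setting maps to a fixed size (min == max).
    static long[] resolveMinMax(Map<String, String> config, int pageSize) {
        if (!hasNewNetworkBufConf(config)) {
            // widen to long before multiplying so large buffer counts cannot overflow an int
            long fixed = (long) Integer.parseInt(config.get(OLD_KEY)) * pageSize;
            return new long[] {fixed, fixed};
        }
        long min = Long.parseLong(config.getOrDefault(MIN_KEY, String.valueOf(64L << 20)));
        long max = Long.parseLong(config.getOrDefault(MAX_KEY, String.valueOf(1L << 30)));
        return new long[] {min, max};
    }

    public static void main(String[] args) {
        Map<String, String> legacy = new HashMap<>();
        legacy.put(OLD_KEY, "2048");
        long[] fixed = resolveMinMax(legacy, 32768);
        System.out.println("legacy: min=" + fixed[0] + " max=" + fixed[1]);

        legacy.put(FRACTION_KEY, "0.2"); // any new key makes the old one ignored
        long[] range = resolveMinMax(legacy, 32768);
        System.out.println("mixed:  min=" + range[0] + " max=" + range[1]);
    }
}
```

Setting min equal to max is what lets calculateNetworkBufferMemory(TaskManagerServicesConfiguration) return early with a fixed pool size for legacy configurations.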

