flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [05/12] flink git commit: [FLINK-4545] [network] replace the network buffers parameter
Date Sat, 06 May 2017 17:47:49 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/0bb49e53/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index 3db91b7..8677307 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -363,11 +363,6 @@ class LocalFlinkMiniCluster(
     if (config.getLong(TaskManagerOptions.MANAGED_MEMORY_SIZE) ==
         TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue()) {
 
-      val bufferSize: Int = config.getInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE)
-
-      val bufferMem: Long = config.getInteger(
-        TaskManagerOptions.NETWORK_NUM_BUFFERS).toLong * bufferSize.toLong
-
       val numTaskManager = config.getInteger(
         ConfigConstants.LOCAL_NUMBER_TASK_MANAGER,
         ConfigConstants.DEFAULT_LOCAL_NUMBER_TASK_MANAGER)
@@ -381,10 +376,10 @@ class LocalFlinkMiniCluster(
       // each TaskManagers and each JobManager
       memorySize /= numTaskManager + 1 // the +1 is the job manager
 
-      // for each TaskManager, subtract the memory needed for memory buffers
-      memorySize -= bufferMem
+      // for each TaskManager, subtract the memory needed for network memory buffers
+      memorySize -= TaskManagerServices.calculateNetworkBufferMemory(memorySize, config)
       memorySize = (memorySize * memoryFraction).toLong
-      memorySize >>>= 20 // bytes to megabytes
+      memorySize >>= 20 // bytes to megabytes
       config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, memorySize)
     }
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/0bb49e53/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
index 4ecfe59..d74bb3b 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
@@ -23,7 +23,9 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode
 import org.apache.flink.runtime.io.network.netty.NettyConfig
 
 case class NetworkEnvironmentConfiguration(
-    numNetworkBuffers: Int,
+    networkBufFraction: Float,
+    networkBufMin: Long,
+    networkBufMax: Long,
     networkBufferSize: Int,
     memoryType: MemoryType,
     ioMode: IOMode,

http://git-wip-us.apache.org/repos/asf/flink/blob/0bb49e53/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfigurationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfigurationTest.java
new file mode 100644
index 0000000..ddd6462
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfigurationTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+/**
+ * Unit test for {@link TaskManagerServicesConfiguration}.
+ */
+public class TaskManagerServicesConfigurationTest {
+	/**
+	 * Verifies that {@link TaskManagerServicesConfiguration#hasNewNetworkBufConf(Configuration)}
+	 * returns the correct result for old configurations via
+	 * {@link TaskManagerOptions#NETWORK_NUM_BUFFERS}.
+	 */
+	@SuppressWarnings("deprecation")
+	@Test
+	public void hasNewNetworkBufConfOld() throws Exception {
+		Configuration config = new Configuration();
+		config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1);
+
+		assertFalse(TaskManagerServicesConfiguration.hasNewNetworkBufConf(config));
+	}
+
+	/**
+	 * Verifies that {@link TaskManagerServicesConfiguration#hasNewNetworkBufConf(Configuration)}
+	 * returns the correct result for new configurations via
+	 * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},
+	 * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN} and {@link
+	 * TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}.
+	 */
+	@Test
+	public void hasNewNetworkBufConfNew() throws Exception {
+		Configuration config = new Configuration();
+		assertTrue(TaskManagerServicesConfiguration.hasNewNetworkBufConf(config));
+
+		// fully defined:
+		config.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
+		config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, 1024);
+		config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 2048);
+
+		assertTrue(TaskManagerServicesConfiguration.hasNewNetworkBufConf(config));
+
+		// partly defined:
+		config = new Configuration();
+		config.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
+		assertTrue(TaskManagerServicesConfiguration.hasNewNetworkBufConf(config));
+		config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 1024);
+		assertTrue(TaskManagerServicesConfiguration.hasNewNetworkBufConf(config));
+
+		config = new Configuration();
+		config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, 1024);
+		assertTrue(TaskManagerServicesConfiguration.hasNewNetworkBufConf(config));
+		config.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
+		assertTrue(TaskManagerServicesConfiguration.hasNewNetworkBufConf(config));
+
+		config = new Configuration();
+		config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 1024);
+		assertTrue(TaskManagerServicesConfiguration.hasNewNetworkBufConf(config));
+		config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, 1024);
+		assertTrue(TaskManagerServicesConfiguration.hasNewNetworkBufConf(config));
+	}
+
+	/**
+	 * Verifies that {@link TaskManagerServicesConfiguration#hasNewNetworkBufConf(Configuration)}
+	 * returns the correct result for mixed old/new configurations.
+	 */
+	@SuppressWarnings("deprecation")
+	@Test
+	public void hasNewNetworkBufConfMixed() throws Exception {
+		Configuration config = new Configuration();
+		assertTrue(TaskManagerServicesConfiguration.hasNewNetworkBufConf(config));
+
+		config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1);
+		assertFalse(TaskManagerServicesConfiguration.hasNewNetworkBufConf(config));
+
+		// old + 1 new parameter = new:
+		Configuration config1 = config.clone();
+		config1.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
+		assertTrue(TaskManagerServicesConfiguration.hasNewNetworkBufConf(config1));
+
+		config1 = config.clone();
+		config1.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, 1024);
+		assertTrue(TaskManagerServicesConfiguration.hasNewNetworkBufConf(config1));
+
+		config1 = config.clone();
+		config1.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 1024);
+		assertTrue(TaskManagerServicesConfiguration.hasNewNetworkBufConf(config1));
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0bb49e53/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java
new file mode 100644
index 0000000..bf90634
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.net.InetAddress;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+/**
+ * Unit test for {@link TaskManagerServices}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(EnvironmentInformation.class)
+public class TaskManagerServicesTest {
+
+	/**
+	 * Test for {@link TaskManagerServices#calculateNetworkBufferMemory(long, Configuration)} using old
+	 * configurations via {@link TaskManagerOptions#NETWORK_NUM_BUFFERS}.
+	 */
+	@SuppressWarnings("deprecation")
+	@Test
+	public void calculateNetworkBufOld() throws Exception {
+		Configuration config = new Configuration();
+		config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1);
+
+		// note: actual network buffer memory size is independent of the totalJavaMemorySize
+		assertEquals(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue().longValue(),
+			TaskManagerServices.calculateNetworkBufferMemory(10L << 20, config));
+		assertEquals(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue().longValue(),
+			TaskManagerServices.calculateNetworkBufferMemory(64L << 20, config));
+
+		// test integer overflow in the memory size
+		int numBuffers = (int) ((2L << 32) / TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()); // 2^33
+		config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, numBuffers);
+		assertEquals(2L << 32, TaskManagerServices.calculateNetworkBufferMemory(2L << 33, config));
+	}
+
+	/**
+	 * Test for {@link TaskManagerServices#calculateNetworkBufferMemory(long, Configuration)} using new
+	 * configurations via {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},
+	 * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN} and
+	 * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}.
+	 */
+	@Test
+	public void calculateNetworkBufNew() throws Exception {
+		Configuration config = new Configuration();
+
+		// (1) defaults
+		final Float defaultFrac = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.defaultValue();
+		final Long defaultMin = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.defaultValue();
+		final Long defaultMax = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.defaultValue();
+		assertEquals(enforceBounds((long) (defaultFrac * (10L << 20)), defaultMin, defaultMax),
+			TaskManagerServices.calculateNetworkBufferMemory((64L << 20 + 1), config));
+		assertEquals(enforceBounds((long) (defaultFrac * (10L << 30)), defaultMin, defaultMax),
+			TaskManagerServices.calculateNetworkBufferMemory((10L << 30), config));
+
+		calculateNetworkBufNew(config);
+	}
+
+	/**
+	 * Helper to test {@link TaskManagerServices#calculateNetworkBufferMemory(long, Configuration)} with the
+	 * new configuration parameters.
+	 *
+	 * @param config configuration object
+	 */
+	private static void calculateNetworkBufNew(final Configuration config) {
+		// (2) fixed size memory
+		config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, 1L << 20); // 1MB
+		config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 1L << 20); // 1MB
+
+		// note: actual network buffer memory size is independent of the totalJavaMemorySize
+		assertEquals(1 << 20, TaskManagerServices.calculateNetworkBufferMemory(10L << 20, config));
+		assertEquals(1 << 20, TaskManagerServices.calculateNetworkBufferMemory(64L << 20, config));
+		assertEquals(1 << 20, TaskManagerServices.calculateNetworkBufferMemory(1L << 30, config));
+
+		// (3) random fraction, min, and max values
+		Random ran = new Random();
+		for (int i = 0; i < 1_000; ++i){
+			float frac = Math.max(ran.nextFloat(), Float.MIN_VALUE);
+			config.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, frac);
+
+			long min = Math.max(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue(), ran.nextLong());
+			config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, min);
+
+			long max = Math.max(min, ran.nextLong());
+			config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, max);
+
+			long javaMem = Math.max(max + 1, ran.nextLong());
+
+			final long networkBufMem = TaskManagerServices.calculateNetworkBufferMemory(javaMem, config);
+			assertTrue("Lower bound not met with configuration: " + config.toString(),
+				networkBufMem >= min);
+			assertTrue("Upper bound not met with configuration: " + config.toString(),
+				networkBufMem <= max);
+			if (networkBufMem > min && networkBufMem < max) {
+				assertEquals(
+					"Wrong network buffer memory size with configuration: " + config.toString(),
+					(long) (javaMem * frac), networkBufMem);
+			}
+		}
+	}
+
+	/**
+	 * Test for {@link TaskManagerServices#calculateNetworkBufferMemory(long, Configuration)} using mixed
+	 * old/new configurations.
+	 */
+	@SuppressWarnings("deprecation")
+	@Test
+	public void calculateNetworkBufMixed() throws Exception {
+		Configuration config = new Configuration();
+		config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1);
+
+		final Float defaultFrac = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.defaultValue();
+		final Long defaultMin = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.defaultValue();
+		final Long defaultMax = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.defaultValue();
+
+		// old + 1 new parameter = new:
+		Configuration config1 = config.clone();
+		config1.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
+		assertEquals(enforceBounds((long) (0.1f * (10L << 20)), defaultMin, defaultMax),
+			TaskManagerServices.calculateNetworkBufferMemory((64L << 20 + 1), config1));
+		assertEquals(enforceBounds((long) (0.1f * (10L << 30)), defaultMin, defaultMax),
+			TaskManagerServices.calculateNetworkBufferMemory((10L << 30), config1));
+
+		config1 = config.clone();
+		long newMin = TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue(); // smallest value possible
+		config1.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, newMin);
+		assertEquals(enforceBounds((long) (defaultFrac * (10L << 20)), newMin, defaultMax),
+			TaskManagerServices.calculateNetworkBufferMemory((10L << 20), config1));
+		assertEquals(enforceBounds((long) (defaultFrac * (10L << 30)), newMin, defaultMax),
+			TaskManagerServices.calculateNetworkBufferMemory((10L << 30), config1));
+
+		config1 = config.clone();
+		long newMax = Math.max(64L << 20 + 1, TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.defaultValue());
+		config1.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, newMax);
+		assertEquals(enforceBounds((long) (defaultFrac * (10L << 20)), defaultMin, newMax),
+			TaskManagerServices.calculateNetworkBufferMemory((64L << 20 + 1), config1));
+		assertEquals(enforceBounds((long) (defaultFrac * (10L << 30)), defaultMin, newMax),
+			TaskManagerServices.calculateNetworkBufferMemory((10L << 30), config1));
+		assertTrue(TaskManagerServicesConfiguration.hasNewNetworkBufConf(config1));
+
+		// old + any new parameter = new:
+		calculateNetworkBufNew(config);
+	}
+
+	/**
+	 * Returns the value or the lower/upper bound in case the value is less/greater than the lower/upper bound, respectively.
+	 *
+	 * @param value value to inspect
+	 * @param lower lower bound
+	 * @param upper upper bound
+	 *
+	 * @return <tt>min(upper, max(lower, value))</tt>
+	 */
+	private static long enforceBounds(final long value, final long lower, final long upper) {
+		return Math.min(upper, Math.max(lower, value));
+	}
+
+	/**
+	 * Test for {@link TaskManagerServices#calculateNetworkBufferMemory(TaskManagerServicesConfiguration)}
+	 * using the same (manual) test cases as in {@link #calculateHeapSizeMB()}.
+	 */
+	@Test
+	public void calculateNetworkBufFromHeapSize() throws Exception {
+		PowerMockito.mockStatic(EnvironmentInformation.class);
+		// some defaults:
+		when(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag()).thenReturn(1000L << 20); // 1000MB
+		when(EnvironmentInformation.getMaxJvmHeapMemory()).thenReturn(1000L << 20); // 1000MB
+
+		TaskManagerServicesConfiguration tmConfig;
+
+		tmConfig = getTmConfig(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue(),
+			TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue(),
+			0.1f, 60L << 20, 1L << 30, MemoryType.HEAP);
+		when(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag()).thenReturn(1000L << 20); // 1000MB
+		assertEquals(100L << 20, TaskManagerServices.calculateNetworkBufferMemory(tmConfig));
+
+		tmConfig = getTmConfig(10, TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue(),
+			0.1f, 60L << 20, 1L << 30, MemoryType.OFF_HEAP);
+		when(EnvironmentInformation.getMaxJvmHeapMemory()).thenReturn(890L << 20); // 890MB
+		assertEquals((100L << 20) + 1 /* one too many due to floating point imprecision */,
+			TaskManagerServices.calculateNetworkBufferMemory(tmConfig));
+
+		tmConfig = getTmConfig(-1, 0.1f,
+			0.1f, 60L << 20, 1L << 30, MemoryType.OFF_HEAP);
+		when(EnvironmentInformation.getMaxJvmHeapMemory()).thenReturn(810L << 20); // 810MB
+		assertEquals((100L << 20) + 1 /* one too many due to floating point imprecision */,
+			TaskManagerServices.calculateNetworkBufferMemory(tmConfig));
+	}
+
+	/**
+	 * Returns a task manager services configuration for the tests
+	 *
+	 * @param managedMemory         see {@link TaskManagerOptions#MANAGED_MEMORY_SIZE}
+	 * @param managedMemoryFraction see {@link TaskManagerOptions#MANAGED_MEMORY_FRACTION}
+	 * @param networkBufFraction	see {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION}
+	 * @param networkBufMin			see {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN}
+	 * @param networkBufMax			see {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}
+	 * @param memType				on-heap or off-heap
+	 *
+	 * @return configuration object
+	 */
+	private static TaskManagerServicesConfiguration getTmConfig(
+		final long managedMemory, final float managedMemoryFraction, float networkBufFraction,
+		long networkBufMin, long networkBufMax,
+		final MemoryType memType) {
+
+		final NetworkEnvironmentConfiguration networkConfig = new NetworkEnvironmentConfiguration(
+			networkBufFraction,
+			networkBufMin,
+			networkBufMax,
+			TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue(),
+			memType,
+			null,
+			TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL.defaultValue(),
+			TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX.defaultValue(),
+			TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue(),
+			TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE.defaultValue(),
+			null);
+
+		return new TaskManagerServicesConfiguration(
+			mock(InetAddress.class),
+			new String[] {},
+			networkConfig,
+			QueryableStateConfiguration.disabled(),
+			1,
+			managedMemory,
+			false,
+			managedMemoryFraction,
+			mock(MetricRegistryConfiguration.class),
+			0);
+	}
+
+	/**
+	 * Test for {@link TaskManagerServices#calculateHeapSizeMB(long, Configuration)} with some
+	 * manually calculated scenarios.
+	 */
+	@Test
+	public void calculateHeapSizeMB() throws Exception {
+		Configuration config = new Configuration();
+		config.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
+		config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, 64L << 20); // 64MB
+		config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 1L << 30); // 1GB
+
+		config.setBoolean(TaskManagerOptions.MEMORY_OFF_HEAP, false);
+		assertEquals(1000, TaskManagerServices.calculateHeapSizeMB(1000, config));
+
+		config.setBoolean(TaskManagerOptions.MEMORY_OFF_HEAP, true);
+		config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 10); // 10MB
+		assertEquals(890, TaskManagerServices.calculateHeapSizeMB(1000, config));
+
+		config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, -1); // use fraction of given memory
+		config.setFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION, 0.1f); // 10%
+		assertEquals(810, TaskManagerServices.calculateHeapSizeMB(1000, config));
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0bb49e53/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index 973fddf..7837b27 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -121,18 +121,21 @@ public class TaskManagerComponentsStartupShutdownTest extends TestLogger {
 				config,
 				false); // exit-jvm-on-fatal-error
 
+			final int networkBufNum = 32;
+			// note: the network buffer memory configured here is not actually used below but set
+			// accordingly to be consistent
 			final NetworkEnvironmentConfiguration netConf = new NetworkEnvironmentConfiguration(
-					32, BUFFER_SIZE, MemoryType.HEAP, IOManager.IOMode.SYNC,
+					0.1f, networkBufNum * BUFFER_SIZE, networkBufNum * BUFFER_SIZE, BUFFER_SIZE, MemoryType.HEAP, IOManager.IOMode.SYNC,
 					0, 0, 2, 8, null);
 
 			ResourceID taskManagerId = ResourceID.generate();
 			
 			final TaskManagerLocation connectionInfo = new TaskManagerLocation(taskManagerId, InetAddress.getLocalHost(), 10000);
 
-			final MemoryManager memManager = new MemoryManager(32 * BUFFER_SIZE, 1, BUFFER_SIZE, MemoryType.HEAP, false);
+			final MemoryManager memManager = new MemoryManager(networkBufNum * BUFFER_SIZE, 1, BUFFER_SIZE, MemoryType.HEAP, false);
 			final IOManager ioManager = new IOManagerAsync(TMP_DIR);
 			final NetworkEnvironment network = new NetworkEnvironment(
-				new NetworkBufferPool(netConf.numNetworkBuffers(), netConf.networkBufferSize(), netConf.memoryType()),
+				new NetworkBufferPool(32, netConf.networkBufferSize(), netConf.memoryType()),
 				new LocalConnectionManager(),
 				new ResultPartitionManager(),
 				new TaskEventDispatcher(),

http://git-wip-us.apache.org/repos/asf/flink/blob/0bb49e53/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
index dbef01f..fea2b79 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
@@ -103,6 +103,8 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
 		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
 		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM / 2);
 		config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 48L);
+		// the default network buffers size (10% of heap max =~ 150MB) seems too much for this test case
+		config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 80L << 20); // 80 MB
 
 		cluster = new LocalFlinkMiniCluster(config, false);
 		cluster.start();

http://git-wip-us.apache.org/repos/asf/flink/blob/0bb49e53/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnCLI.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnCLI.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnCLI.java
index 557fa38..6ce8d17 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnCLI.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnCLI.java
@@ -66,7 +66,7 @@ public class FlinkYarnCLI implements CustomCommandLine<YarnClusterClientV2> {
 
 	/**
 	 * Dynamic properties allow the user to specify additional configuration values with -D, such as
-	 *  -D fs.overwrite-files=true  -D taskmanager.network.numberOfBuffers=16368
+	 * <tt> -Dfs.overwrite-files=true  -Dtaskmanager.network.memory.min=536346624</tt>
 	 */
 	private final Option DYNAMIC_PROPERTIES;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0bb49e53/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 9277d21..3d82132 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -119,7 +119,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 
 	/**
 	 * Dynamic properties allow the user to specify additional configuration values with -D, such as
-	 *  -D fs.overwrite-files=true  -D taskmanager.network.numberOfBuffers=16368
+	 * <tt> -Dfs.overwrite-files=true  -Dtaskmanager.network.memory.min=536346624</tt>
 	 */
 	private final Option DYNAMIC_PROPERTIES;
 


Mime
View raw message