flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ches...@apache.org
Subject [1/7] flink git commit: [FLINK-6270] port some memory and network task manager options to ConfigOption
Date Thu, 06 Apr 2017 17:36:09 GMT
Repository: flink
Updated Branches:
  refs/heads/master ae17718fb -> e2a4f47ed


[FLINK-6270] port some memory and network task manager options to ConfigOption

This closes #3683.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e2a4f47e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e2a4f47e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e2a4f47e

Branch: refs/heads/master
Commit: e2a4f47ed8c95c7045f79bf9fe59cab39518710b
Parents: d1d761e
Author: Nico Kruber <nico@data-artisans.com>
Authored: Wed Apr 5 11:45:57 2017 +0200
Committer: zentol <chesnay@apache.org>
Committed: Thu Apr 6 19:35:50 2017 +0200

----------------------------------------------------------------------
 .../kafka/KafkaShortRetentionTestBase.java      |  3 +-
 .../connectors/kafka/KafkaTestBase.java         |  3 +-
 .../flink/storm/api/FlinkLocalCluster.java      |  3 +-
 .../flink/configuration/ConfigConstants.java    | 44 ++++++++++++++++----
 .../flink/configuration/TaskManagerOptions.java | 43 +++++++++++++++++++
 .../ContaineredTaskManagerParameters.java       | 12 ++----
 .../io/network/buffer/NetworkBufferPool.java    |  4 +-
 .../partition/SpillableSubpartition.java        | 11 +++--
 .../minicluster/MiniClusterConfiguration.java   | 32 +++++---------
 .../TaskManagerServicesConfiguration.java       | 34 +++++++--------
 .../minicluster/LocalFlinkMiniCluster.scala     | 20 ++++-----
 .../PartialConsumePipelinedResultTest.java      |  3 +-
 .../runtime/jobmanager/JobManagerTest.java      |  4 +-
 .../TaskCancelAsyncProducerConsumerITCase.java  |  5 ++-
 .../TaskManagerProcessReapingTestBase.java      |  5 ++-
 .../taskmanager/TaskManagerStartupTest.java     | 11 ++---
 .../runtime/taskmanager/TaskManagerTest.java    |  2 +-
 .../runtime/testutils/TaskManagerProcess.java   | 10 ++---
 .../runtime/testingUtils/TestingUtils.scala     |  4 +-
 .../Flip6LocalStreamEnvironment.java            |  4 +-
 .../api/environment/LocalStreamEnvironment.java |  3 +-
 .../apache/flink/test/util/TestBaseUtils.java   |  3 +-
 .../accumulators/AccumulatorErrorITCase.java    |  3 +-
 .../test/cancelling/CancelingTestBase.java      |  5 ++-
 ...tractEventTimeWindowCheckpointingITCase.java |  3 +-
 .../EventTimeAllWindowCheckpointingITCase.java  |  3 +-
 .../test/checkpointing/SavepointITCase.java     |  3 +-
 .../StreamFaultToleranceTestBase.java           |  3 +-
 .../WindowCheckpointingITCase.java              |  3 +-
 .../JobSubmissionFailsITCase.java               |  3 +-
 .../test/manual/NotSoMiniClusterIterations.java |  7 ++--
 .../manual/StreamingScalabilityAndLatency.java  |  5 ++-
 .../test/misc/CustomSerializationITCase.java    |  3 +-
 .../test/misc/MiscellaneousIssuesITCase.java    |  3 +-
 ...SuccessAfterNetworkBuffersFailureITCase.java |  5 ++-
 .../query/AbstractQueryableStateITCase.java     |  3 +-
 ...ctTaskManagerProcessFailureRecoveryTest.java |  5 ++-
 ...agerHAProcessFailureBatchRecoveryITCase.java |  5 ++-
 .../TaskManagerFailureRecoveryITCase.java       |  3 +-
 .../flink/test/runtime/IPv6HostnamesITCase.java |  3 +-
 .../test/streaming/runtime/TimestampITCase.java |  3 +-
 .../flink/test/web/WebFrontendITCase.java       |  3 +-
 42 files changed, 209 insertions(+), 128 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
index 1e85370..954dc7d 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
@@ -98,7 +99,7 @@ public class KafkaShortRetentionTestBase implements Serializable {
 		// start also a re-usable Flink mini cluster
 		flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
 		flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
-		flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
+		flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L);
 		flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
 
 		flink = new LocalFlinkMiniCluster(flinkConfig, false);

http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index 5cec4f0..0c6bfa9 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.kafka;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.metrics.jmx.JMXReporter;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
@@ -113,7 +114,7 @@ public abstract class KafkaTestBase extends TestLogger {
 		Configuration flinkConfig = new Configuration();
 		flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
 		flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
-		flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
+		flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L);
 		flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
 		flinkConfig.setString(ConfigConstants.METRICS_REPORTERS_LIST, "my_reporter");
 		flinkConfig.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "my_reporter." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());

http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java
index 367b313..d69d345 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.storm.api;
 
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.storm.LocalCluster;
 import org.apache.storm.generated.ClusterSummary;
 import org.apache.storm.generated.KillOptions;
@@ -92,7 +93,7 @@ public class FlinkLocalCluster {
 			Configuration configuration = new Configuration();
 			configuration.addAll(jobGraph.getJobConfiguration());
 
-			configuration.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L);
+			configuration.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, -1L);
 			configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());
 
 			this.flink = new LocalFlinkMiniCluster(configuration, true);

http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index ce44ab8..de06b59 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -210,34 +210,52 @@ public final class ConfigConstants {
 	 * The config parameter defining the amount of memory to be allocated by the task manager's
 	 * memory manager (in megabytes). If not set, a relative fraction will be allocated, as defined
 	 * by {@link #TASK_MANAGER_MEMORY_FRACTION_KEY}.
+	 *
+	 * @deprecated Use {@link TaskManagerOptions#MANAGED_MEMORY_SIZE} instead
 	 */
+	@Deprecated
 	public static final String TASK_MANAGER_MEMORY_SIZE_KEY = "taskmanager.memory.size";
 	
 	/**
 	 * The config parameter defining the fraction of free memory allocated by the memory manager.
+	 *
+	 * @deprecated Use {@link TaskManagerOptions#MANAGED_MEMORY_FRACTION} instead
 	 */
+	@Deprecated
 	public static final String TASK_MANAGER_MEMORY_FRACTION_KEY = "taskmanager.memory.fraction";
 
 	/**
 	 * The config parameter defining the memory allocation method (JVM heap or off-heap).
-	*/
+	 *
+	 * @deprecated Use {@link TaskManagerOptions#MEMORY_OFF_HEAP} instead
+	 */
+	@Deprecated
 	public static final String TASK_MANAGER_MEMORY_OFF_HEAP_KEY = "taskmanager.memory.off-heap";
 
 	/**
 	 * The config parameter for specifying whether TaskManager managed memory should be preallocated
 	 * when the TaskManager is starting. (default is false)
+	 *
+	 * @deprecated Use {@link TaskManagerOptions#MANAGED_MEMORY_PRE_ALLOCATE} instead
 	 */
+	@Deprecated
 	public static final String TASK_MANAGER_MEMORY_PRE_ALLOCATE_KEY = "taskmanager.memory.preallocate";
 
 	/**
 	 * The config parameter defining the number of buffers used in the network stack. This defines the
 	 * number of possible tasks and shuffles.
+	 *
+	 * @deprecated Use {@link TaskManagerOptions#NETWORK_NUM_BUFFERS} instead
 	 */
+	@Deprecated
 	public static final String TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY = "taskmanager.network.numberOfBuffers";
 
 	/**
 	 * Config parameter defining the size of memory buffers used by the network stack and the memory manager.
+	 *
+	 * @deprecated Use {@link TaskManagerOptions#MEMORY_SEGMENT_SIZE} instead
 	 */
+	@Deprecated
 	public static final String TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY = "taskmanager.memory.segment-size";
 	
 	/**
@@ -1126,20 +1144,29 @@ public final class ConfigConstants {
 	 * The default directory for temporary files of the task manager.
 	 */
 	public static final String DEFAULT_TASK_MANAGER_TMP_PATH = System.getProperty("java.io.tmpdir");
-	
+
 	/**
-	 * The default fraction of the free memory allocated by the task manager's memory manager.
+	 * Config key has been deprecated. Therefore, no default value required.
+	 *
+	 * @deprecated {@link TaskManagerOptions#MANAGED_MEMORY_FRACTION} provides the default value now
 	 */
+	@Deprecated
 	public static final float DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION = 0.7f;
-	
+
 	/**
-	 * Default number of buffers used in the network stack.
+	 * Config key has been deprecated. Therefore, no default value required.
+	 *
+	 * @deprecated {@link TaskManagerOptions#NETWORK_NUM_BUFFERS} provides the default value now
 	 */
+	@Deprecated
 	public static final int DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS = 2048;
 
 	/**
-	 * Default size of memory segments in the network stack and the memory manager.
+	 * Config key has been deprecated. Therefore, no default value required.
+	 *
+	 * @deprecated {@link TaskManagerOptions#MEMORY_SEGMENT_SIZE} provides the default value now
 	 */
+	@Deprecated
 	public static final int DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE = 32768;
 
 	/**
@@ -1179,8 +1206,11 @@ public final class ConfigConstants {
 	public static final String DEFAULT_TASK_MANAGER_REFUSED_REGISTRATION_PAUSE = "10 s";
 
 	/**
-	 * The default setting for TaskManager memory eager allocation of managed memory
+	 * Config key has been deprecated. Therefore, no default value required.
+	 *
+	 * @deprecated {@link TaskManagerOptions#MANAGED_MEMORY_PRE_ALLOCATE} provides the default value now
 	 */
+	@Deprecated
 	public static final boolean DEFAULT_TASK_MANAGER_MEMORY_PRE_ALLOCATE = false;
 
 	/** @deprecated Please use {@link TaskManagerOptions#TASK_CANCELLATION_INTERVAL}. */

http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
index b891e35..adfc8e9 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
@@ -39,10 +39,53 @@ public class TaskManagerOptions {
 			key("taskmanager.jvm-exit-on-oom")
 			.defaultValue(false);
 
+	/** Size of memory buffers used by the network stack and the memory manager (in bytes). */
+	public static final ConfigOption<Integer> MEMORY_SEGMENT_SIZE =
+			key("taskmanager.memory.segment-size")
+			.defaultValue(32768);
+
+	/**
+	 * Amount of memory to be allocated by the task manager's memory manager (in megabytes). If not
+	 * set, a relative fraction will be allocated, as defined by {@link #MANAGED_MEMORY_FRACTION}.
+	 */
+	public static final ConfigOption<Long> MANAGED_MEMORY_SIZE =
+			key("taskmanager.memory.size")
+			.defaultValue(-1L);
+
+	/**
+	 * Fraction of free memory allocated by the memory manager if {@link #MANAGED_MEMORY_SIZE} is
+	 * not set.
+	 */
+	public static final ConfigOption<Float> MANAGED_MEMORY_FRACTION =
+			key("taskmanager.memory.fraction")
+			.defaultValue(0.7f);
+
+	/**
+	 * Memory allocation method (JVM heap or off-heap), used for managed memory of the TaskManager
+	 * as well as the network buffers.
+	 **/
+	public static final ConfigOption<Boolean> MEMORY_OFF_HEAP =
+			key("taskmanager.memory.off-heap")
+			.defaultValue(false);
+
+	/** Whether TaskManager managed memory should be pre-allocated when the TaskManager is starting. */
+	public static final ConfigOption<Boolean> MANAGED_MEMORY_PRE_ALLOCATE =
+			key("taskmanager.memory.preallocate")
+			.defaultValue(false);
+
 	// ------------------------------------------------------------------------
 	//  Network Options
 	// ------------------------------------------------------------------------
 
+	/**
+	 * Number of buffers used in the network stack. This defines the number of possible tasks and
+	 * shuffles.
+	 */
+	public static final ConfigOption<Integer> NETWORK_NUM_BUFFERS =
+			key("taskmanager.network.numberOfBuffers")
+			.defaultValue(2048);
+
+
 	/** Minimum backoff for partition requests of input channels. */
 	public static final ConfigOption<Integer> NETWORK_REQUEST_BACKOFF_INITIAL =
 			key("taskmanager.net.request-backoff.initial")

http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
index 3dc4394..0fc0870 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.clusterframework;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -142,19 +143,14 @@ public class ContaineredTaskManagerParameters implements java.io.Serializable {
 
 		// (2) split the Java memory between heap and off-heap
 
-		final boolean useOffHeap = config.getBoolean(
-			ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_KEY, false);
+		final boolean useOffHeap = config.getBoolean(TaskManagerOptions.MEMORY_OFF_HEAP);
 
 		final long heapSizeMB;
 		if (useOffHeap) {
-			long offHeapSize = config.getLong(
-				ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L);
+			long offHeapSize = config.getLong(TaskManagerOptions.MANAGED_MEMORY_SIZE);
 
 			if (offHeapSize <= 0) {
-				double fraction = config.getFloat(
-					ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
-					ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION);
-
+				double fraction = config.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION);
 
 				offHeapSize = (long) (fraction * javaMemorySizeMB);
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
index 5f2da03..a36bdf4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.io.network.buffer;
 
-import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.core.memory.MemoryType;
@@ -199,7 +199,7 @@ public class NetworkBufferPool implements BufferPoolFactory {
 						numRequiredBuffers,
 						totalNumberOfMemorySegments - numTotalRequiredBuffers,
 						totalNumberOfMemorySegments,
-						ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY));
+						TaskManagerOptions.NETWORK_NUM_BUFFERS.key()));
 			}
 
 			this.numTotalRequiredBuffers += numRequiredBuffers;

http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
index ad04e97..ae97c42 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
-import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
@@ -50,9 +50,12 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * this state, different reader variants are returned (see
  * {@link SpillableSubpartitionView} and {@link SpilledSubpartitionView}).
  *
- * <p>Since the network buffer pool size is usually quite small (default is
- * {@link ConfigConstants#DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS}), most
- * spillable partitions will be spilled for real-world data sets.
+ * <p>Since the network buffer pool size for outgoing partitions is usually
+ * quite small, e.g. via the {@link TaskManagerOptions#NETWORK_BUFFERS_PER_CHANNEL}
+ * and {@link TaskManagerOptions#NETWORK_EXTRA_BUFFERS_PER_GATE} parameters
+ * for bounded channels or from the default value of
+ * {@link TaskManagerOptions#NETWORK_NUM_BUFFERS}, most spillable partitions
+ * will be spilled for real-world data sets.
  */
 class SpillableSubpartition extends ResultSubpartition {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
index 3a03ca3..823b3f2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
@@ -20,9 +20,8 @@ package org.apache.flink.runtime.minicluster;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import scala.concurrent.duration.FiniteDuration;
@@ -165,7 +164,7 @@ public class MiniClusterConfiguration {
 		Configuration newConfiguration = new Configuration(config);
 		// set the memory
 		long memory = getOrCalculateManagedMemoryPerTaskManager();
-		newConfiguration.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, memory);
+		newConfiguration.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, memory);
 
 		return newConfiguration;
 	}
@@ -196,29 +195,20 @@ public class MiniClusterConfiguration {
 	private long getOrCalculateManagedMemoryPerTaskManager() {
 		if (managedMemoryPerTaskManager == -1) {
 			// no memory set in the mini cluster configuration
-			final ConfigOption<Integer> memorySizeOption = ConfigOptions
-				.key(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY)
-				.defaultValue(-1);
 
-			int memorySize = config.getInteger(memorySizeOption);
+			long memorySize = config.getLong(TaskManagerOptions.MANAGED_MEMORY_SIZE);
 
-			if (memorySize == -1) {
+			// we could probably use config.contains() but the previous implementation compared to
+			// the default (-1) thus allowing the user to explicitly specify this as well
+			// -> don't change this behaviour now
+			if (memorySize == TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue()) {
 				// no memory set in the flink configuration
 				// share the available memory among all running components
-				final ConfigOption<Integer> bufferSizeOption = ConfigOptions
-					.key(ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY)
-					.defaultValue(ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE);
 
-				final ConfigOption<Long> bufferMemoryOption = ConfigOptions
-					.key(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY)
-					.defaultValue((long) ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS);
-
-				final ConfigOption<Float> memoryFractionOption = ConfigOptions
-					.key(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY)
-					.defaultValue(ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION);
-
-				float memoryFraction = config.getFloat(memoryFractionOption);
-				long networkBuffersMemory = config.getLong(bufferMemoryOption) * config.getInteger(bufferSizeOption);
+				float memoryFraction = config.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION);
+				long networkBuffersMemory =
+					(long) config.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS) *
+						(long) config.getInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE);
 
 				long freeMemory = EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
index 8ad318a..366be34 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
@@ -182,22 +182,20 @@ public class TaskManagerServicesConfiguration {
 				parseQueryableStateConfiguration(configuration);
 
 		// extract memory settings
-		long configuredMemory = configuration.getLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L);
-		checkConfigParameter(configuredMemory == -1 || configuredMemory > 0, configuredMemory,
-			ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY,
+		long configuredMemory = configuration.getLong(TaskManagerOptions.MANAGED_MEMORY_SIZE);
+		checkConfigParameter(
+			configuredMemory == TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue() ||
+				configuredMemory > 0, configuredMemory,
+			TaskManagerOptions.MANAGED_MEMORY_SIZE.key(),
 			"MemoryManager needs at least one MB of memory. " +
 				"If you leave this config parameter empty, the system automatically " +
 				"pick a fraction of the available memory.");
 
-		boolean preAllocateMemory = configuration.getBoolean(
-			ConfigConstants.TASK_MANAGER_MEMORY_PRE_ALLOCATE_KEY,
-			ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_PRE_ALLOCATE);
+		boolean preAllocateMemory = configuration.getBoolean(TaskManagerOptions.MANAGED_MEMORY_PRE_ALLOCATE);
 
-		float memoryFraction = configuration.getFloat(
-			ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
-			ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION);
+		float memoryFraction = configuration.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION);
 		checkConfigParameter(memoryFraction > 0.0f && memoryFraction < 1.0f, memoryFraction,
-			ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
+			TaskManagerOptions.MANAGED_MEMORY_FRACTION.key(),
 			"MemoryManager fraction of the free memory must be between 0.0 and 1.0");
 
 		final MetricRegistryConfiguration metricRegistryConfiguration = MetricRegistryConfiguration.fromConfiguration(configuration);
@@ -247,30 +245,26 @@ public class TaskManagerServicesConfiguration {
 		checkConfigParameter(slots >= 1, slots, ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS,
 			"Number of task slots must be at least one.");
 
-		final int numNetworkBuffers = configuration.getInteger(
-			ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY,
-			ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS);
+		final int numNetworkBuffers = configuration.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS);
 
 		checkConfigParameter(numNetworkBuffers > 0, numNetworkBuffers,
-			ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, "");
+			TaskManagerOptions.NETWORK_NUM_BUFFERS.key(), "");
 
-		final int pageSize = configuration.getInteger(
-			ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
-			ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE);
+		final int pageSize = configuration.getInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE);
 
 		// check page size of for minimum size
 		checkConfigParameter(pageSize >= MemoryManager.MIN_PAGE_SIZE, pageSize,
-			ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
+			TaskManagerOptions.MEMORY_SEGMENT_SIZE.key(),
 			"Minimum memory segment size is " + MemoryManager.MIN_PAGE_SIZE);
 
 		// check page size for power of two
 		checkConfigParameter(MathUtils.isPowerOf2(pageSize), pageSize,
-			ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
+			TaskManagerOptions.MEMORY_SEGMENT_SIZE.key(),
 			"Memory segment size must be a power of 2.");
 
 		// check whether we use heap or off-heap memory
 		final MemoryType memType;
-		if (configuration.getBoolean(ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_KEY, false)) {
+		if (configuration.getBoolean(TaskManagerOptions.MEMORY_OFF_HEAP)) {
 			memType = MemoryType.OFF_HEAP;
 		} else {
 			memType = MemoryType.HEAP;

http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index 21e0d28..3d43da5 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.{Executor, ScheduledExecutorService}
 import akka.actor.{ActorRef, ActorSystem, Props}
 import org.apache.flink.api.common.JobID
 import org.apache.flink.api.common.io.FileOutputFormat
-import org.apache.flink.configuration.{ConfigConstants, Configuration, QueryableStateOptions}
+import org.apache.flink.configuration.{ConfigConstants, Configuration, QueryableStateOptions, TaskManagerOptions}
 import org.apache.flink.core.fs.Path
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager
@@ -350,23 +350,19 @@ class LocalFlinkMiniCluster(
 
   def setMemory(config: Configuration): Unit = {
     // set this only if no memory was pre-configured
-    if (config.getInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1) == -1) {
+    if (config.getLong(TaskManagerOptions.MANAGED_MEMORY_SIZE) ==
+        TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue()) {
 
-      val bufferSize: Int = config.getInteger(
-        ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
-        ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE)
+      val bufferSize: Int = config.getInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE)
 
-      val bufferMem: Long = config.getLong(
-        ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY,
-        ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS) * bufferSize.toLong
+      val bufferMem: Long = config.getInteger(
+        TaskManagerOptions.NETWORK_NUM_BUFFERS).toLong * bufferSize.toLong
 
       val numTaskManager = config.getInteger(
         ConfigConstants.LOCAL_NUMBER_TASK_MANAGER,
         ConfigConstants.DEFAULT_LOCAL_NUMBER_TASK_MANAGER)
 
-      val memoryFraction = config.getFloat(
-        ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
-        ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION)
+      val memoryFraction = config.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION)
 
       // full memory size
       var memorySize: Long = EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag
@@ -379,7 +375,7 @@ class LocalFlinkMiniCluster(
       memorySize -= bufferMem
       memorySize = (memorySize * memoryFraction).toLong
       memorySize >>>= 20 // bytes to megabytes
-      config.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, memorySize)
+      config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, memorySize)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
index 4a826b7..f19ca4e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
@@ -51,7 +52,7 @@ public class PartialConsumePipelinedResultTest {
 		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUMBER_OF_TMS);
 		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUMBER_OF_SLOTS_PER_TM);
 		config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
-		config.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, NUMBER_OF_NETWORK_BUFFERS);
+		config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, NUMBER_OF_NETWORK_BUFFERS);
 
 		flink = new TestingCluster(config, true);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index 3944752c..4dec84b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -26,6 +26,7 @@ import com.typesafe.config.Config;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.checkpoint.CheckpointDeclineReason;
@@ -105,7 +106,6 @@ import scala.reflect.ClassTag$;
 import java.io.File;
 import java.net.InetAddress;
 import java.util.Collections;
-import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.runtime.io.network.partition.ResultPartitionType.PIPELINED;
@@ -588,7 +588,7 @@ public class JobManagerTest extends TestLogger {
 				AkkaUtils.getAkkaURL(system, jobManager.actor()));
 
 		Configuration tmConfig = new Configuration();
-		tmConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4);
+		tmConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
 		tmConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
 
 		ActorRef taskManager = TaskManager.startTaskManagerComponentsAndActor(

http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
index 5a14b40..4ea6511 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.taskmanager;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
@@ -78,8 +79,8 @@ public class TaskCancelAsyncProducerConsumerITCase extends TestLogger {
 			Configuration config = new Configuration();
 			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
-			config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, 4096);
-			config.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 8);
+			config.setInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE, 4096);
+			config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 8);
 
 			flink = new TestingCluster(config, true);
 			flink.start();

http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
index 2528e24..2fafe5b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
@@ -22,6 +22,7 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
 import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager;
@@ -228,8 +229,8 @@ public abstract class TaskManagerProcessReapingTestBase {
 				Configuration cfg = new Configuration();
 				cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
 				cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort);
-				cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4);
-				cfg.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 256);
+				cfg.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
+				cfg.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 256);
 
 				TaskManager.runTaskManager("localhost", ResourceID.generate(), taskManagerPort, cfg);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
index b2a905d..4df8db3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
@@ -24,6 +24,7 @@ import org.apache.commons.io.FileUtils;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.util.StartupUtils;
@@ -113,7 +114,7 @@ public class TaskManagerStartupTest {
 		try {
 			Configuration cfg = new Configuration();
 			cfg.setString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, nonWritable.getAbsolutePath());
-			cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4);
+			cfg.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
 			cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
 			cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 21656);
 
@@ -154,7 +155,7 @@ public class TaskManagerStartupTest {
 			cfg.setString(ConfigConstants.TASK_MANAGER_MEMORY_PRE_ALLOCATE_KEY, "true");
 
 			// something invalid
-			cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -42);
+			cfg.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, -42L);
 			try {
 				TaskManager.runTaskManager("localhost", ResourceID.generate(), 0, cfg);
 				fail("Should fail synchronously with an exception");
@@ -165,8 +166,8 @@ public class TaskManagerStartupTest {
 
 			// something ridiculously high
 			final long memSize = (((long) Integer.MAX_VALUE - 1) *
-					ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE) >> 20;
-			cfg.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, memSize);
+					TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()) >> 20;
+			cfg.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, memSize);
 			try {
 				TaskManager.runTaskManager("localhost", ResourceID.generate(), 0, cfg);
 				fail("Should fail synchronously with an exception");
@@ -197,7 +198,7 @@ public class TaskManagerStartupTest {
 			final Configuration cfg = new Configuration();
 			cfg.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, "localhost");
 			cfg.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, blocker.getLocalPort());
-			cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 1);
+			cfg.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 1L);
 
 			TaskManager.startTaskManagerComponentsAndActor(
 				cfg,

http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index a754cff..4530ade 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -1498,7 +1498,7 @@ public class TaskManagerTest extends TestLogger {
 			// set the memory segment to the smallest size possible, because we have to fill one
 			// memory buffer to trigger the schedule or update consumers message to the downstream
 			// operators
-			configuration.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, 4096);
+			configuration.setInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE, 4096);
 
 			final JobID jid = new JobID();
 			final JobVertexID vid = new JobVertexID();

http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java
index 58bc50e..36eb47f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java
@@ -19,8 +19,8 @@
 package org.apache.flink.runtime.testutils;
 
 import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.slf4j.Logger;
@@ -102,12 +102,12 @@ public class TaskManagerProcess extends TestJvmProcess {
 			try {
 				Configuration config = ParameterTool.fromArgs(args).getConfiguration();
 
-				if (!config.containsKey(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY)) {
-					config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4);
+				if (!config.contains(TaskManagerOptions.MANAGED_MEMORY_SIZE)) {
+					config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
 				}
 
-				if (!config.containsKey(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY)) {
-					config.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 100);
+				if (!config.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS)) {
+					config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100);
 				}
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index d6221f5..49baff1 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -28,7 +28,7 @@ import com.google.common.util.concurrent.MoreExecutors
 import com.typesafe.config.ConfigFactory
 import grizzled.slf4j.Logger
 import org.apache.flink.api.common.JobExecutionResult
-import org.apache.flink.configuration.{ConfigConstants, Configuration, HighAvailabilityOptions}
+import org.apache.flink.configuration.{ConfigConstants, Configuration, HighAvailabilityOptions, TaskManagerOptions}
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.client.JobClient
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager
@@ -304,7 +304,7 @@ object TestingUtils {
 
     val resultingConfiguration = new Configuration()
 
-    resultingConfiguration.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 10)
+    resultingConfiguration.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 10L)
 
     resultingConfiguration.addAll(configuration)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
index 4a5f20d..244660a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
@@ -22,8 +22,8 @@ import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.minicluster.MiniCluster;
@@ -98,7 +98,7 @@ public class Flip6LocalStreamEnvironment extends StreamExecutionEnvironment {
 
 		Configuration configuration = new Configuration();
 		configuration.addAll(jobGraph.getJobConfiguration());
-		configuration.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L);
+		configuration.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, -1L);
 
 		// add (and override) the settings with what the user defined
 		configuration.addAll(this.conf);

http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
index cb60552..117f6d8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 
@@ -113,7 +114,7 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment {
 		Configuration configuration = new Configuration();
 		configuration.addAll(jobGraph.getJobConfiguration());
 
-		configuration.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L);
+		configuration.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, -1L);
 		configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());
 		
 		// add (and override) the settings with what the user defined

http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index cc7c0e2..f96ab3d 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -30,6 +30,7 @@ import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.messages.TaskManagerMessages;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
@@ -141,7 +142,7 @@ public class TestBaseUtils extends TestLogger {
 		Path logFile = Files.createFile(new File(logDir, "jobmanager.log").toPath());
 		Files.createFile(new File(logDir, "jobmanager.out").toPath());
 
-		config.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, TASK_MANAGER_MEMORY_SIZE);
+		config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, TASK_MANAGER_MEMORY_SIZE);
 		config.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, true);
 
 		config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, DEFAULT_AKKA_ASK_TIMEOUT + "s");

http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
index cc70fee..0303202 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.junit.AfterClass;
@@ -52,7 +53,7 @@ public class AccumulatorErrorITCase {
 			Configuration config = new Configuration();
 			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 3);
-			config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12);
+			config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 12L);
 			cluster = new LocalFlinkMiniCluster(config, false);
 
 			cluster.start();

http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
index 8d8ee64..06233d6 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
@@ -22,6 +22,7 @@ package org.apache.flink.test.cancelling;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.optimizer.DataStatistics;
 import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
@@ -88,8 +89,8 @@ public abstract class CancelingTestBase extends TestLogger {
 		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
 		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
 		config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
-		config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, 4096);
-		config.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 2048);
+		config.setInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE, 4096);
+		config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 2048);
 
 		this.executor = new LocalFlinkMiniCluster(config, false);
 		this.executor.start();

http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
index 5fc2083..462d3a4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.runtime.state.AbstractStateBackend;
@@ -99,7 +100,7 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
 		Configuration config = new Configuration();
 		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
 		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM / 2);
-		config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 48);
+		config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 48L);
 
 		cluster = new LocalFlinkMiniCluster(config, false);
 		cluster.start();

http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
index 09c1437..3345b9c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.runtime.state.CheckpointListener;
@@ -70,7 +71,7 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
 		Configuration config = new Configuration();
 		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
 		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM / 2);
-		config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 48);
+		config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 48L);
 		config.setString(ConfigConstants.DEFAULT_AKKA_LOOKUP_TIMEOUT, "60 s");
 		config.setString(ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT, "60 s");
 		cluster = new LocalFlinkMiniCluster(config, false);

http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index a5c994a..3718a94 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -35,6 +35,7 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
@@ -783,7 +784,7 @@ public class SavepointITCase extends TestLogger {
 
 		Configuration config = new Configuration();
 		config.addAll(jobGraph.getJobConfiguration());
-		config.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L);
+		config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, -1L);
 		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2 * jobGraph.getMaximumParallelism());
 		final File checkpointDir = new File(tmpDir, "checkpoints");
 		final File savepointDir = new File(tmpDir, "savepoints");

http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
index 10f78d4..2839bc1 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
@@ -21,6 +21,7 @@ package org.apache.flink.test.checkpointing;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.test.util.TestUtils;
@@ -52,7 +53,7 @@ public abstract class StreamFaultToleranceTestBase extends TestLogger {
 			Configuration config = new Configuration();
 			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TASK_MANAGERS);
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_TASK_SLOTS);
-			config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12);
+			config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 12L);
 			
 			cluster = new LocalFlinkMiniCluster(config, false);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
index a45349d..56d8c66 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.runtime.state.CheckpointListener;
@@ -81,7 +82,7 @@ public class WindowCheckpointingITCase extends TestLogger {
 		Configuration config = new Configuration();
 		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
 		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM / 2);
-		config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 48);
+		config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 48L);
 
 		cluster = new LocalFlinkMiniCluster(config, false);
 		cluster.start();

http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
index 256b1ae..93ab29d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
@@ -21,6 +21,7 @@ package org.apache.flink.test.failingPrograms;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.client.JobSubmissionException;
 import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -54,7 +55,7 @@ public class JobSubmissionFailsITCase {
 	public static void setup() {
 		try {
 			Configuration config = new Configuration();
-			config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4);
+			config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
 			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS / 2);
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-tests/src/test/java/org/apache/flink/test/manual/NotSoMiniClusterIterations.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/NotSoMiniClusterIterations.java b/flink-tests/src/test/java/org/apache/flink/test/manual/NotSoMiniClusterIterations.java
index bd9955c..ee3b4b2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/manual/NotSoMiniClusterIterations.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/manual/NotSoMiniClusterIterations.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.java.operators.DeltaIteration;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.examples.java.graph.ConnectedComponents;
 import org.apache.flink.examples.java.graph.util.ConnectedComponentsData;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
@@ -50,10 +51,10 @@ public class NotSoMiniClusterIterations {
 		try {
 			Configuration config = new Configuration();
 			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, PARALLELISM);
-			config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 8);
+			config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 8L);
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
-			config.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 1000);
-			config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, 8 * 1024);
+			config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1000);
+			config.setInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE, 8 * 1024);
 			
 			config.setInteger("taskmanager.net.server.numThreads", 1);
 			config.setInteger("taskmanager.net.client.numThreads", 1);

http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java b/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java
index ec617b1..90dbe80 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -46,9 +47,9 @@ public class StreamingScalabilityAndLatency {
 		try {
 			Configuration config = new Configuration();
 			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, TASK_MANAGERS);
-			config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 80);
+			config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 80L);
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, SLOTS_PER_TASK_MANAGER);
-			config.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 20000);
+			config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 20000);
 
 			config.setInteger("taskmanager.net.server.numThreads", 1);
 			config.setInteger("taskmanager.net.client.numThreads", 1);

http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
index 51f3534..fda731e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
@@ -50,7 +51,7 @@ public class CustomSerializationITCase {
 		try {
 			Configuration config = new Configuration();
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARLLELISM);
-			config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 30);
+			config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 30L);
 			cluster = new LocalFlinkMiniCluster(config, false);
 			cluster.start();
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
index 06b93ea..d9cf574 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 
@@ -60,7 +61,7 @@ public class MiscellaneousIssuesITCase {
 			Configuration config = new Configuration();
 			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 3);
-			config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12);
+			config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 12L);
 			cluster = new LocalFlinkMiniCluster(config, false);
 
 			cluster.start();

http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
index a43bab6..5761bf2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.examples.java.clustering.KMeans;
 import org.apache.flink.examples.java.clustering.util.KMeansData;
 import org.apache.flink.examples.java.graph.ConnectedComponents;
@@ -48,9 +49,9 @@ public class SuccessAfterNetworkBuffersFailureITCase {
 		try {
 			Configuration config = new Configuration();
 			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
-			config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 80);
+			config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 80L);
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
-			config.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 840);
+			config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 840);
 			
 			cluster = new LocalFlinkMiniCluster(config, false);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java
index 1912c0f..3c8eb48 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java
@@ -36,6 +36,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.QueryableStateOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
@@ -109,7 +110,7 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 	public static void setup() {
 		try {
 			Configuration config = new Configuration();
-			config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4);
+			config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
 			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);
 			config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 1);

http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
index 515570d..27d1aa1 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
@@ -26,6 +26,7 @@ import akka.util.Timeout;
 import org.apache.commons.io.FileUtils;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.jobmanager.JobManager;
@@ -389,8 +390,8 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
 				Configuration cfg = new Configuration();
 				cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
 				cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort);
-				cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4);
-				cfg.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 100);
+				cfg.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
+				cfg.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100);
 				cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
 				cfg.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "100 s");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
index a51f88b..b6a1bd4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
@@ -31,6 +31,7 @@ import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.instance.ActorGateway;
@@ -258,8 +259,8 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
 			jmProcess[0].startProcess();
 
 			// Task manager configuration
-			config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4);
-			config.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 100);
+			config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
+			config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100);
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
 
 			// Start the task manager process

http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
index 5d29905..d256ccf 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.messages.TaskManagerMessages;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.junit.Test;
@@ -72,7 +73,7 @@ public class TaskManagerFailureRecoveryITCase {
 			Configuration config = new Configuration();
 			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM);
-			config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
+			config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L);
 			
 			config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "500 ms");
 			config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "20 s");

http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
index 0b008eb..4c77ef0 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.test.testdata.WordCountData;
@@ -73,7 +74,7 @@ public class IPv6HostnamesITCase extends TestLogger {
 			conf.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, addressString);
 			conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
 			conf.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
-			conf.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
+			conf.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L);
 			
 			flink = new LocalFlinkMiniCluster(conf, false);
 			flink.start();

http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
index 13add4b..229d3fd 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.testutils.MultiShotLatch;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.streaming.api.TimeCharacteristic;
@@ -89,7 +90,7 @@ public class TimestampITCase extends TestLogger {
 			Configuration config = new Configuration();
 			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TASK_MANAGERS);
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_TASK_SLOTS);
-			config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12);
+			config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 12L);
 
 			cluster = new LocalFlinkMiniCluster(config, false);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
index 3b0c364..003eb0c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
@@ -27,6 +27,7 @@ import org.apache.commons.io.FileUtils;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
@@ -71,7 +72,7 @@ public class WebFrontendITCase extends TestLogger {
 		Configuration config = new Configuration();
 		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TASK_MANAGERS);
 		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS);
-		config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12);
+		config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 12L);
 		config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
 
 		File logDir = File.createTempFile("TestBaseUtils-logdir", null);


Mime
View raw message