flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [2/2] flink git commit: [FLINK-3073] Replace Streaming Mode by Memory Allocation Mode
Date Wed, 09 Dec 2015 14:33:38 GMT
[FLINK-3073] Replace Streaming Mode by Memory Allocation Mode

Before, the streaming mode setting (either batch or streaming) determined how
memory was allocated on task managers.

This introduces a new boolean configuration value taskmanager.memory.preallocate
that controls whether task managers allocate all managed memory eagerly at
startup (true) or lazily on demand (false, the default).


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4f12356e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4f12356e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4f12356e

Branch: refs/heads/master
Commit: 4f12356eb64af37909d18a5c2abb4a31a4c19472
Parents: 718a17b
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Tue Dec 1 16:13:55 2015 +0100
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Wed Dec 9 15:33:10 2015 +0100

----------------------------------------------------------------------
 docs/quickstart/setup_quickstart.md             | 23 +++---
 docs/setup/cluster_setup.md                     | 28 ++------
 docs/setup/config.md                            | 39 +++++-----
 docs/setup/jobmanager_high_availability.md      | 24 +++----
 docs/setup/local_setup.md                       |  3 -
 docs/setup/yarn_setup.md                        | 19 +++--
 .../flink/client/FlinkYarnSessionCli.java       |  3 -
 .../OperatorStatsAccumulatorTest.java           |  3 +-
 .../flink/storm/api/FlinkLocalCluster.java      |  3 +-
 .../flink/configuration/ConfigConstants.java    | 11 +++
 flink-dist/src/main/flink-bin/bin/config.sh     | 20 ++++--
 flink-dist/src/main/flink-bin/bin/jobmanager.sh | 10 +--
 .../flink-bin/bin/start-cluster-streaming.sh    | 24 -------
 .../src/main/flink-bin/bin/start-cluster.sh     | 14 ++--
 .../main/flink-bin/bin/start-local-streaming.sh | 24 -------
 .../src/main/flink-bin/bin/start-local.bat      |  2 +-
 .../src/main/flink-bin/bin/start-local.sh       |  6 +-
 .../src/main/flink-bin/bin/taskmanager.sh       | 22 +++---
 flink-dist/src/main/resources/flink-conf.yaml   | 10 ++-
 .../webmonitor/WebRuntimeMonitorITCase.java     |  2 -
 .../org/apache/flink/runtime/StreamingMode.java | 34 ---------
 .../jobmanager/JobManagerCliOptions.java        | 21 ------
 .../taskmanager/TaskManagerCliOptions.java      | 22 +-----
 .../runtime/yarn/AbstractFlinkYarnClient.java   |  6 --
 .../flink/runtime/jobmanager/JobManager.scala   | 35 ++-------
 .../runtime/minicluster/FlinkMiniCluster.scala  | 13 +---
 .../minicluster/LocalFlinkMiniCluster.scala     | 14 +---
 .../flink/runtime/taskmanager/TaskManager.scala | 75 ++++++++++----------
 .../JobManagerProcessReapingTest.java           |  3 +-
 .../jobmanager/JobManagerStartupTest.java       |  7 +-
 ...ManagerSubmittedJobGraphsRecoveryITCase.java |  7 +-
 .../flink/runtime/jobmanager/JobSubmitTest.java |  2 -
 .../JobManagerLeaderElectionTest.java           |  3 -
 .../LeaderChangeStateCleanupTest.java           |  3 +-
 .../LeaderElectionRetrievalTestingCluster.java  | 13 +---
 ...askManagerComponentsStartupShutdownTest.java |  2 -
 .../TaskManagerProcessReapingTest.java          |  4 +-
 .../TaskManagerRegistrationTest.java            |  3 -
 .../taskmanager/TaskManagerStartupTest.java     | 15 ++--
 .../runtime/testutils/JobManagerProcess.java    |  7 +-
 .../runtime/testutils/TaskManagerProcess.java   |  9 +--
 .../jobmanager/JobManagerRegistrationTest.scala |  2 -
 .../runtime/testingUtils/TestingCluster.scala   | 19 ++---
 .../testingUtils/TestingJobManager.scala        |  4 --
 .../runtime/testingUtils/TestingUtils.scala     |  5 +-
 .../flink/api/scala/ScalaShellITCase.scala      |  2 -
 .../flink/tez/test/TezProgramTestBase.java      |  3 +-
 .../connectors/kafka/KafkaTestBase.java         |  3 +-
 .../api/environment/LocalStreamEnvironment.java |  3 +-
 .../util/StreamingMultipleProgramsTestBase.java |  3 +-
 .../util/StreamingProgramTestBase.java          |  3 +-
 ...ScalaStreamingMultipleProgramsTestBase.scala |  2 -
 .../flink/test/util/AbstractTestBase.java       |  8 +--
 .../flink/test/util/JavaProgramTestBase.java    |  3 +-
 .../test/util/MultipleProgramsTestBase.java     |  2 -
 .../apache/flink/test/util/TestBaseUtils.java   |  7 +-
 .../apache/flink/test/util/FlinkTestBase.scala  |  2 -
 .../test/util/ForkableFlinkMiniCluster.scala    | 13 +---
 .../EventTimeAllWindowCheckpointingITCase.java  |  3 +-
 .../EventTimeWindowCheckpointingITCase.java     | 21 +++++-
 .../WindowCheckpointingITCase.java              |  6 +-
 .../manual/StreamingScalabilityAndLatency.java  |  3 +-
 ...tJobManagerProcessFailureRecoveryITCase.java |  3 +-
 ...ctTaskManagerProcessFailureRecoveryTest.java |  4 +-
 .../JobManagerCheckpointRecoveryITCase.java     |  3 +-
 .../recovery/ProcessFailureCancelingITCase.java |  2 -
 .../flink/test/runtime/IPv6HostnamesITCase.java |  3 +-
 .../flink/yarn/TestingYarnJobManager.scala      |  3 -
 .../apache/flink/yarn/FlinkYarnClientBase.java  |  7 --
 .../flink/yarn/YarnTaskManagerRunner.java       | 10 +--
 .../flink/yarn/ApplicationMasterBase.scala      | 10 ---
 .../org/apache/flink/yarn/YarnJobManager.scala  | 36 ++++------
 72 files changed, 234 insertions(+), 552 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/docs/quickstart/setup_quickstart.md
----------------------------------------------------------------------
diff --git a/docs/quickstart/setup_quickstart.md b/docs/quickstart/setup_quickstart.md
index 43faa91..6fcb729 100644
--- a/docs/quickstart/setup_quickstart.md
+++ b/docs/quickstart/setup_quickstart.md
@@ -53,7 +53,7 @@ Download the ready to run binary package. Choose the Flink distribution that __m
 
 
 ## Start
-  
+
 1. Go to the download directory.
 2. Unpack the downloaded archive.
 3. Start Flink.
@@ -69,9 +69,6 @@ $ bin/start-local.sh    # Start Flink
 Check the __JobManager's web frontend__ at [http://localhost:8081](http://localhost:8081) and make
 sure everything is up and running.
 
-Instead of starting Flink with `bin/start-local.sh` you can also start Flink in an streaming optimized
-mode, using `bin/start-local-streaming.sh`.
-
 ## Run Example
 
 Run the __Word Count example__ to see Flink at work.
@@ -80,20 +77,20 @@ Run the __Word Count example__ to see Flink at work.
 
   ~~~bash
   $ wget -O hamlet.txt http://www.gutenberg.org/cache/epub/1787/pg1787.txt
-  ~~~ 
+  ~~~
 
 * You now have a text file called _hamlet.txt_ in your working directory.
 * __Start the example program__:
-  
+
   ~~~bash
   $ bin/flink run ./examples/WordCount.jar file://`pwd`/hamlet.txt file://`pwd`/wordcount-result.txt
   ~~~
 
 * You will find a file called __wordcount-result.txt__ in your current directory.
-  
+
 
 ## Cluster Setup
-  
+
 __Running Flink on a cluster__ is as easy as running it locally. Having __passwordless SSH__ and
 __the same directory structure__ on all your cluster nodes lets you use our scripts to control
 everything.
@@ -106,9 +103,7 @@ on each node of your setup.
 3. Add the IPs or hostnames (one per line) of all __worker nodes__ (TaskManager) to the slaves files
 in `conf/slaves`.
 
-You can now __start the cluster__ at your master node with `bin/start-cluster.sh`. If you are planning
-to run only streaming jobs with Flink, you can also an optimized streaming mode: `start-cluster-streaming.sh`.
-
+You can now __start the cluster__ at your master node with `bin/start-cluster.sh`.
 
 The following __example__ illustrates the setup with three nodes (with IP addresses from _10.0.0.1_
 to _10.0.0.3_ and hostnames _master_, _worker1_, _worker2_) and shows the contents of the
@@ -139,9 +134,9 @@ configuration files, which need to be accessible at the same path on all machine
 Have a look at the [Configuration]({{ site.baseurl }}/setup/config.html) section of the documentation to see other available configuration options.
 For Flink to run efficiently, a few configuration values need to be set.
 
-In particular, 
+In particular,
 
- * the amount of available memory per TaskManager (`taskmanager.heap.mb`), 
+ * the amount of available memory per TaskManager (`taskmanager.heap.mb`),
  * the number of available CPUs per machine (`taskmanager.numberOfTaskSlots`),
  * the total number of CPUs in the cluster (`parallelism.default`) and
  * the temporary directories (`taskmanager.tmp.dirs`)
@@ -150,7 +145,7 @@ In particular,
 are very important configuration values.
 
 ## Flink on YARN
-You can easily deploy Flink on your existing __YARN cluster__. 
+You can easily deploy Flink on your existing __YARN cluster__.
 
 1. Download the __Flink Hadoop2 package__: [Flink with Hadoop 2]({{site.FLINK_DOWNLOAD_URL_HADOOP2_STABLE}})
 2. Make sure your __HADOOP_HOME__ (or _YARN_CONF_DIR_ or _HADOOP_CONF_DIR_) __environment variable__ is set to read your YARN and HDFS configuration.

http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/docs/setup/cluster_setup.md
----------------------------------------------------------------------
diff --git a/docs/setup/cluster_setup.md b/docs/setup/cluster_setup.md
index 3e56dd0..3ee6630 100644
--- a/docs/setup/cluster_setup.md
+++ b/docs/setup/cluster_setup.md
@@ -51,7 +51,7 @@ For example, on Ubuntu Linux, type in the following commands to install Java and
 ssh:
 
 ~~~bash
-sudo apt-get install ssh 
+sudo apt-get install ssh
 sudo apt-get install openjdk-7-jre
 ~~~
 
@@ -210,9 +210,9 @@ entire Flink directory to every worker node.
 Please see the [configuration page](config.html) for details and additional
 configuration options.
 
-In particular, 
+In particular,
 
- * the amount of available memory per TaskManager (`taskmanager.heap.mb`), 
+ * the amount of available memory per TaskManager (`taskmanager.heap.mb`),
  * the number of available CPUs per machine (`taskmanager.numberOfTaskSlots`),
  * the total number of CPUs in the cluster (`parallelism.default`) and
  * the temporary directories (`taskmanager.tmp.dirs`)
@@ -236,34 +236,18 @@ bin/start-cluster.sh
 
 To stop Flink, there is also a `stop-cluster.sh` script.
 
-
-### Starting Flink in the streaming mode
-
-~~~bash
-bin/start-cluster-streaming.sh
-~~~
-
-The streaming mode changes the startup behavior of Flink: The system is not 
-bringing up the managed memory services with preallocated memory at the beginning.
-Flink streaming is not using the managed memory employed by the batch operators.
-By not starting these services with preallocated memory, streaming jobs can benefit
-from more heap space being available.
-
-Note that you can still start batch jobs in the streaming mode. The memory manager
-will then allocate memory segments from the Java heap as needed.
-
 ### Optional: Adding JobManager/TaskManager instances to a cluster
 
 You can add both TaskManager or JobManager instances to your running cluster with the `bin/taskmanager.sh` and `bin/jobmanager.sh` scripts.
 
 #### Adding a TaskManager
 <pre>
-bin/taskmanager.sh (start [batch|streaming])|stop|stop-all)
+bin/taskmanager.sh start|stop|stop-all
 </pre>
 
 #### Adding a JobManager
 <pre>
-bin/jobmanager.sh (start cluster [batch|streaming])|stop|stop-all)
+bin/jobmanager.sh (start (local|cluster))|stop|stop-all
 </pre>
 
 Make sure to call these scripts on the hosts, on which you want to start/stop the respective instance.
@@ -281,7 +265,7 @@ setup step is not needed.
 The following instructions are a general overview of usual required settings. Please consult one of the
 many installation guides available online for more detailed instructions.
 
-__Note that the following instructions are based on Hadoop 1.2 and might differ 
+__Note that the following instructions are based on Hadoop 1.2 and might differ
 for Hadoop 2.__
 
 ### Downloading, Installing, and Configuring HDFS

http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index 93e6870..bc70e1d 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -36,7 +36,7 @@ with format `key: value`.
 The system and run scripts parse the config at startup time. Changes to the configuration
 file require restarting the Flink JobManager and TaskManagers.
 
-The configuration files for the TaskManagers can be different, Flink does not assume 
+The configuration files for the TaskManagers can be different, Flink does not assume
 uniform machines in the cluster.
 
 
@@ -66,9 +66,9 @@ contrast to Hadoop, Flink runs operators (e.g., join, aggregate) and
 user-defined functions (e.g., Map, Reduce, CoGroup) inside the TaskManager
 (including sorting/hashing/caching), so this value should be as
 large as possible. If the cluster is exclusively running Flink,
-the total amount of available memory per machine minus some memory for the 
+the total amount of available memory per machine minus some memory for the
 operating system (maybe 1-2 GB) is a good value.
-On YARN setups, this value is automatically configured to the size of 
+On YARN setups, this value is automatically configured to the size of
 the TaskManager's YARN container, minus a certain tolerance value.
 
 - `taskmanager.numberOfTaskSlots`: The number of parallel operator or
@@ -142,9 +142,12 @@ results outside of the JVM heap. For setups with larger quantities of memory,
 this can improve the efficiency of the operations performed on the memory
 (DEFAULT: false).
 
-- `taskmanager.memory.segment-size`: The size of memory buffers used by the 
+- `taskmanager.memory.segment-size`: The size of memory buffers used by the
 memory manager and the network stack in bytes (DEFAULT: 32768 (= 32 KiBytes)).
 
+- `taskmanager.memory.preallocate`: Can be either of `true` or `false`. Specifies whether task
+managers should allocate all managed memory when starting up. (DEFAULT: false)
+
 
 ### Kerberos
 
@@ -203,18 +206,18 @@ available, increase this value (DEFAULT: 2048).
 
 - `env.java.opts`: Set custom JVM options. This value is respected by Flink's start scripts
 and Flink's YARN client.
-This can be used to set different garbage collectors or to include remote debuggers into 
+This can be used to set different garbage collectors or to include remote debuggers into
 the JVMs running Flink's services.
 
-- `state.backend`: The backend that will be used to store operator state checkpoints if checkpointing is enabled. 
-  
-  Supported backends: 
-  
+- `state.backend`: The backend that will be used to store operator state checkpoints if checkpointing is enabled.
+
+  Supported backends:
+
    -  `jobmanager`: In-memory state, backup to JobManager's/ZooKeeper's memory. Should be used only for minimal state (Kafka offsets) or testing and local debugging.
    -  `filesystem`: State is in-memory on the TaskManagers, and state snapshots are stored in a file system. Supported are all filesystems supported by Flink, for example HDFS, S3, ...
 
 - `state.backend.fs.checkpointdir`: Directory for storing checkpoints in a flink supported filesystem
-Note: State backend must be accessible from the JobManager, use file:// only for local setups. 
+Note: State backend must be accessible from the JobManager, use file:// only for local setups.
 
 - `blob.storage.directory`: Directory for storing blobs (such as user jar's) on the TaskManagers.
 
@@ -235,7 +238,7 @@ specify that value on the execution environment. Default value is zero.
 ### HDFS
 
 These parameters configure the default HDFS used by Flink. Setups that do not
-specify a HDFS configuration have to specify the full path to 
+specify a HDFS configuration have to specify the full path to
 HDFS files (`hdfs://address:port/path/to/files`) Files will also be written
 with default HDFS parameters (block size, replication factor).
 
@@ -409,10 +412,10 @@ to set the JM host:port manually. It is recommended to leave this option at 1.
 
 - `yarn.heartbeat-delay` (Default: 5 seconds). Time between heartbeats with the ResourceManager.
 
-- `yarn.properties-file.location` (Default: temp directory). When a Flink job is submitted to YARN, 
-the JobManager's host and the number of available processing slots is written into a properties file, 
-so that the Flink client is able to pick those details up. This configuration parameter allows 
-changing the default location of that file (for example for environments sharing a Flink 
+- `yarn.properties-file.location` (Default: temp directory). When a Flink job is submitted to YARN,
+the JobManager's host and the number of available processing slots is written into a properties file,
+so that the Flink client is able to pick those details up. This configuration parameter allows
+changing the default location of that file (for example for environments sharing a Flink
 installation between users)
 
 - `yarn.application-master.env.`*ENV_VAR1=value* Configuration values prefixed with `yarn.application-master.env.`
@@ -436,7 +439,7 @@ In order to use the 'zookeeper' mode, it is mandatory to also define the `recove
 
 - `recovery.zookeeper.quorum`: Defines the ZooKeeper quorum URL which is used to connet to the ZooKeeper cluster when the 'zookeeper' recovery mode is selected
 
-- `recovery.zookeeper.path.root`: (Default '/flink') Defines the root dir under which the ZooKeeper recovery mode will create znodes. 
+- `recovery.zookeeper.path.root`: (Default '/flink') Defines the root dir under which the ZooKeeper recovery mode will create znodes.
 
 - `recovery.zookeeper.path.latch`: (Default '/leaderlatch') Defines the znode of the leader latch which is used to elect the leader.
 
@@ -512,12 +515,12 @@ Flink executes a program in parallel by splitting it into subtasks and schedulin
 
 Each Flink TaskManager provides processing slots in the cluster. The number of slots
 is typically proportional to the number of available CPU cores __of each__ TaskManager.
-As a general recommendation, the number of available CPU cores is a good default for 
+As a general recommendation, the number of available CPU cores is a good default for
 `taskmanager.numberOfTaskSlots`.
 
 When starting a Flink application, users can supply the default number of slots to use for that job.
 The command line value therefore is called `-p` (for parallelism). In addition, it is possible
-to [set the number of slots in the programming APIs]({{site.baseurl}}/apis/programming_guide.html#parallel-execution) for 
+to [set the number of slots in the programming APIs]({{site.baseurl}}/apis/programming_guide.html#parallel-execution) for
 the whole application and individual operators.
 
 <img src="fig/slots_parallelism.svg" class="img-responsive" />

http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/docs/setup/jobmanager_high_availability.md
----------------------------------------------------------------------
diff --git a/docs/setup/jobmanager_high_availability.md b/docs/setup/jobmanager_high_availability.md
index 125f163..ca3bdb2 100644
--- a/docs/setup/jobmanager_high_availability.md
+++ b/docs/setup/jobmanager_high_availability.md
@@ -59,22 +59,22 @@ jobManagerAddressX:webUIPortX
 
 In order to start an HA-cluster add the following configuration keys to `conf/flink-conf.yaml`:
 
-- **Recovery mode** (required): The *recovery mode* has to be set in `conf/flink-conf.yaml` to *zookeeper* in order to enable high availability mode. 
-  
+- **Recovery mode** (required): The *recovery mode* has to be set in `conf/flink-conf.yaml` to *zookeeper* in order to enable high availability mode.
+
   <pre>recovery.mode: zookeeper</pre>
 
 - **ZooKeeper quorum** (required): A *ZooKeeper quorum* is a replicated group of ZooKeeper servers, which provide the distributed coordination service.
-  
+
   <pre>recovery.zookeeper.quorum: address1:2181[,...],addressX:2181</pre>
 
   Each *addressX:port* refers to a ZooKeeper server, which is reachable by Flink at the given address and port.
 
 - **ZooKeeper root** (recommended): The *root ZooKeeper node*, under which all required coordination data is placed.
-  
+
   <pre>recovery.zookeeper.path.root: /flink # important: customize per cluster</pre>
 
   **Important**: if you are running multiple Flink HA clusters, you have to manually configure seperate root nodes for each cluster.
-  
+
 - **State backend and storage directory** (required): JobManager meta data is persisted in the *state backend* and only a pointer to this state is stored in ZooKeeper. Currently, only the file system state backend is supported in HA mode.
 
     <pre>
@@ -83,13 +83,13 @@ state.backend.fs.checkpointdir: hdfs:///flink/checkpoints
 recovery.zookeeper.storageDir: hdfs:///flink/recovery/</pre>
 
     The `storageDir` stores all meta data needed to recover a JobManager failure.
-    
+
 After configuring the masters and the ZooKeeper quorum, you can use the provided cluster startup scripts as usual. They will start an HA-cluster. Keep in mind that the **ZooKeeper quorum has to be running** when you call the scripts and make sure to **configure a seperate ZooKeeper root path** for each HA cluster you are starting.
 
 #### Example: Standalone Cluster with 2 JobManagers
 
 1. **Configure recovery mode and ZooKeeper quorum** in `conf/flink-conf.yaml`:
-   
+
    <pre>
 recovery.mode: zookeeper
 recovery.zookeeper.quorum: localhost:2181
@@ -115,10 +115,10 @@ $ bin/start-zookeeper-quorum.sh
 Starting zookeeper daemon on host localhost.</pre>
 
 5. **Start an HA-cluster**:
-   
+
    <pre>
-$ bin/start-cluster-streaming.sh
-Starting HA cluster (streaming mode) with 2 masters and 1 peers in ZooKeeper quorum.
+$ bin/start-cluster.sh
+Starting HA cluster with 2 masters and 1 peers in ZooKeeper quorum.
 Starting jobmanager daemon on host localhost.
 Starting jobmanager daemon on host localhost.
 Starting taskmanager daemon on host localhost.</pre>
@@ -172,7 +172,7 @@ This means that the application can be restarted 10 times before YARN fails the
 #### Example: Highly Available YARN Session
 
 1. **Configure recovery mode and ZooKeeper quorum** in `conf/flink-conf.yaml`:
-   
+
    <pre>
 recovery.mode: zookeeper
 recovery.zookeeper.quorum: localhost:2181
@@ -193,7 +193,7 @@ $ bin/start-zookeeper-quorum.sh
 Starting zookeeper daemon on host localhost.</pre>
 
 5. **Start an HA-cluster**:
-   
+
    <pre>
 $ bin/yarn-session.sh -n 2</pre>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/docs/setup/local_setup.md
----------------------------------------------------------------------
diff --git a/docs/setup/local_setup.md b/docs/setup/local_setup.md
index 036bf39..d2c5345 100644
--- a/docs/setup/local_setup.md
+++ b/docs/setup/local_setup.md
@@ -77,9 +77,6 @@ INFO ... - Starting web info server for JobManager on port 8081
 
 The JobManager will also start a web frontend on port 8081, which you can check with your browser at `http://localhost:8081`.
 
-Instead of starting Flink with `bin/start-local.sh` you can also start Flink in an streaming optimized
-mode, using `bin/start-local-streaming.sh`.
-
 ## Flink on Windows
 
 If you want to run Flink on Windows you need to download, unpack and configure the Flink archive as mentioned above. After that you can either use the **Windows Batch** file (`.bat`) or use **Cygwin**  to run the Flink Jobmanager.

http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/docs/setup/yarn_setup.md
----------------------------------------------------------------------
diff --git a/docs/setup/yarn_setup.md b/docs/setup/yarn_setup.md
index 12f1986..b95b8a5 100644
--- a/docs/setup/yarn_setup.md
+++ b/docs/setup/yarn_setup.md
@@ -79,7 +79,7 @@ tar xvzf flink-{{ site.version }}-bin-hadoop2.tgz
 cd flink-{{site.version }}/
 ~~~
 
-If you want to build the YARN .tgz file from sources, follow the [build instructions](building.html). 
+If you want to build the YARN .tgz file from sources, follow the [build instructions](building.html).
 You can find the result of the build in `flink-dist/target/flink-{{ site.version }}-bin/flink-{{ site.version }}/` (*Note: The version might be different for you* ).
 
 
@@ -105,7 +105,6 @@ Usage:
      -q,--query                      Display available YARN resources (memory, cores)
      -qu,--queue <arg>               Specify YARN queue.
      -s,--slots <arg>                Number of slots per TaskManager
-     -st,--streaming                 Start Flink in streaming mode
      -tm,--taskManagerMemory <arg>   Memory per TaskManager Container [in MB]
 
 ~~~
@@ -118,7 +117,7 @@ Please note that the Client requires the `YARN_CONF_DIR` or `HADOOP_CONF_DIR` en
 ./bin/yarn-session.sh -n 10 -tm 8192 -s 32
 ~~~
 
-The system will use the configuration in `conf/flink-config.yaml`. Please follow our [configuration guide](config.html) if you want to change something. 
+The system will use the configuration in `conf/flink-config.yaml`. Please follow our [configuration guide](config.html) if you want to change something.
 
 Flink on YARN will overwrite the following configuration parameters `jobmanager.rpc.address` (because the JobManager is always allocated at different machines), `taskmanager.tmp.dirs` (we are using the tmp directories given by YARN) and `parallelism.default` if the number of slots has been specified.
 
@@ -132,7 +131,7 @@ Stop the YARN session by stopping the unix process (using CTRL+C) or by entering
 
 #### Detached YARN session
 
-If you do not want to keep the Flink YARN client running all the time, its also possible to start a *detached* YARN session. 
+If you do not want to keep the Flink YARN client running all the time, its also possible to start a *detached* YARN session.
 The parameter for that is called `-d` or `--detached`.
 
 In that case, the Flink YARN client will only submit Flink to the cluster and then close itself.
@@ -206,13 +205,13 @@ Please note that the client then expects the `-yn` value to be set (number of Ta
 ***Example:***
 
 ~~~bash
-./bin/flink run -m yarn-cluster -yn 2 ./examples/WordCount.jar 
+./bin/flink run -m yarn-cluster -yn 2 ./examples/WordCount.jar
 ~~~
 
-The command line options of the YARN session are also available with the `./bin/flink` tool. 
+The command line options of the YARN session are also available with the `./bin/flink` tool.
 They are prefixed with a `y` or `yarn` (for the long argument options).
 
-Note: You can use a different configuration directory per job by setting the environment variable `FLINK_CONF_DIR`. 
+Note: You can use a different configuration directory per job by setting the environment variable `FLINK_CONF_DIR`.
 To use this copy the `conf` directory from the Flink distribution and modify, for example, the logging settings on a per-job basis.
 
 Note: It is possible to combine `-m yarn-cluster` with a detached YARN submission (`-yd`) to "fire and forget" a Flink job
@@ -234,7 +233,7 @@ There are many reasons why a Flink YARN session deployment can fail. A misconfig
 
 ### Log Files
 
-In cases where the Flink YARN session fails during the deployment itself, users have to rely on the logging capabilities of Hadoop YARN. The most useful feature for that is the [YARN log aggregation](http://hortonworks.com/blog/simplifying-user-logs-management-and-access-in-yarn/). 
+In cases where the Flink YARN session fails during the deployment itself, users have to rely on the logging capabilities of Hadoop YARN. The most useful feature for that is the [YARN log aggregation](http://hortonworks.com/blog/simplifying-user-logs-management-and-access-in-yarn/).
 To enable it, users have to set the `yarn.log-aggregation-enable` property to `true` in the `yarn-site.xml` file.
 Once that is enabled, users can use the following command to retrieve all log files of a (failed) YARN session.
 
@@ -248,7 +247,7 @@ Note that it takes a few seconds after the session has finished until the logs s
 
 The Flink YARN client also prints error messages in the terminal if errors occur during runtime (for example if a TaskManager stops working after some time).
 
-In addition to that, there is the YARN Resource Manager webinterface (by default on port 8088). The port of the Resource Manager web interface is determined by the `yarn.resourcemanager.webapp.address` configuration value. 
+In addition to that, there is the YARN Resource Manager webinterface (by default on port 8088). The port of the Resource Manager web interface is determined by the `yarn.resourcemanager.webapp.address` configuration value.
 
 It allows to access log files for running YARN applications and shows diagnostics for failed apps.
 
@@ -260,7 +259,7 @@ Users using Hadoop distributions from companies like Hortonworks, Cloudera or Ma
 
 ## Background / Internals
 
-This section briefly describes how Flink and YARN interact. 
+This section briefly describes how Flink and YARN interact.
 
 <img src="fig/FlinkOnYarn.svg" class="img-responsive">
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java b/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
index a9a20ae..8183df4 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
@@ -220,9 +220,6 @@ public class FlinkYarnSessionCli {
 			flinkYarnClient.setDetachedMode(detachedMode);
 		}
 
-		if (cmd.hasOption(STREAMING.getOpt())) {
-			flinkYarnClient.setStreamingMode(true);
-		}
 		if(cmd.hasOption(NAME.getOpt())) {
 			flinkYarnClient.setName(cmd.getOptionValue(NAME.getOpt()));
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-contrib/flink-operator-stats/src/test/java/org/apache/flink/contrib/operatorstatistics/OperatorStatsAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-operator-stats/src/test/java/org/apache/flink/contrib/operatorstatistics/OperatorStatsAccumulatorTest.java b/flink-contrib/flink-operator-stats/src/test/java/org/apache/flink/contrib/operatorstatistics/OperatorStatsAccumulatorTest.java
index 41a036f..6f488f6 100644
--- a/flink-contrib/flink-operator-stats/src/test/java/org/apache/flink/contrib/operatorstatistics/OperatorStatsAccumulatorTest.java
+++ b/flink-contrib/flink-operator-stats/src/test/java/org/apache/flink/contrib/operatorstatistics/OperatorStatsAccumulatorTest.java
@@ -25,7 +25,6 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.StreamingMode;
 import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.util.Collector;
 import org.junit.Assert;
@@ -45,7 +44,7 @@ public class OperatorStatsAccumulatorTest extends AbstractTestBase {
 	private static final String ACCUMULATOR_NAME = "op-stats";
 
 	public OperatorStatsAccumulatorTest(){
-		super(new Configuration(), StreamingMode.BATCH_ONLY);
+		super(new Configuration());
 	}
 	
 	public static class StringToInt extends RichFlatMapFunction<String, Tuple1<Integer>> {

http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java
index 00e1d03..ad45f77 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java
@@ -28,7 +28,6 @@ import backtype.storm.generated.TopologyInfo;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.StreamingMode;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.minicluster.FlinkMiniCluster;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
@@ -86,7 +85,7 @@ public class FlinkLocalCluster {
 			configuration.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L);
 			configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());
 
-			flink = new LocalFlinkMiniCluster(configuration, true, StreamingMode.STREAMING);
+			flink = new LocalFlinkMiniCluster(configuration, true);
 			this.flink.start();
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index da1dfdd..11a9478 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -132,6 +132,12 @@ public final class ConfigConstants {
 	public static final String TASK_MANAGER_MEMORY_OFF_HEAP_KEY = "taskmanager.memory.off-heap";
 
 	/**
+	 * The config parameter for specifying whether TaskManager managed memory should be preallocated
+	 * when the TaskManager is starting. (default is false)
+	 */
+	public static final String TASK_MANAGER_MEMORY_PRE_ALLOCATE_KEY = "taskmanager.memory.preallocate";
+
+	/**
 	 * The config parameter defining the number of buffers used in the network stack. This defines the
 	 * number of possible tasks and shuffles.
 	 */
@@ -582,6 +588,11 @@ public final class ConfigConstants {
 	 */
 	public static final String DEFAULT_TASK_MANAGER_MAX_REGISTRATION_DURATION = "Inf";
 
+	/**
+	 * The default setting for TaskManager memory eager allocation of managed memory
+	 */
+	public static final boolean DEFAULT_TASK_MANAGER_MEMORY_PRE_ALLOCATE = false;
+
 	// ------------------------ Runtime Algorithms ------------------------
 	
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-dist/src/main/flink-bin/bin/config.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/config.sh b/flink-dist/src/main/flink-bin/bin/config.sh
index 3b2b4d7..8faf951 100755
--- a/flink-dist/src/main/flink-bin/bin/config.sh
+++ b/flink-dist/src/main/flink-bin/bin/config.sh
@@ -30,7 +30,7 @@ constructFlinkClassPath() {
     echo $FLINK_CLASSPATH
 }
 
-# These are used to mangle paths that are passed to java when using 
+# These are used to mangle paths that are passed to java when using
 # cygwin. Cygwin paths are like linux paths, i.e. /path/to/somewhere
 # but the windows java version expects them in Windows Format, i.e. C:\bla\blub.
 # "cygpath" can do the conversion.
@@ -61,11 +61,11 @@ readFromConfig() {
     local key=$1
     local defaultValue=$2
     local configFile=$3
-    
+
     # first extract the value with the given key (1st sed), then trim the result (2nd sed)
     # if a key exists multiple times, take the "last" one (tail)
     local value=`sed -n "s/^[ ]*${key}[ ]*: \([^#]*\).*$/\1/p" "${configFile}" | sed "s/^ *//;s/ *$//" | tail -n 1`
-    
+
     [ -z "$value" ] && echo "$defaultValue" || echo "$value"
 }
 
@@ -92,6 +92,7 @@ KEY_TASKM_MEM_SIZE="taskmanager.heap.mb"
 KEY_TASKM_MEM_MANAGED_SIZE="taskmanager.memory.size"
 KEY_TASKM_MEM_MANAGED_FRACTION="taskmanager.memory.fraction"
 KEY_TASKM_OFFHEAP="taskmanager.memory.off-heap"
+KEY_TASKM_MEM_PRE_ALLOCATE="taskmanager.memory.preallocate"
 
 KEY_ENV_PID_DIR="env.pid.dir"
 KEY_ENV_LOG_MAX="env.log.max"
@@ -130,7 +131,7 @@ FLINK_ROOT_DIR=`dirname "$SYMLINK_RESOLVED_BIN"`
 FLINK_LIB_DIR=$FLINK_ROOT_DIR/lib
 
 # These need to be mangled because they are directly passed to java.
-# The above lib path is used by the shell script to retrieve jars in a 
+# The above lib path is used by the shell script to retrieve jars in a
 # directory, so it needs to be unmangled.
 FLINK_ROOT_DIR_MANGLED=`manglePath "$FLINK_ROOT_DIR"`
 if [ -z "$FLINK_CONF_DIR" ]; then FLINK_CONF_DIR=$FLINK_ROOT_DIR_MANGLED/conf; fi
@@ -144,11 +145,11 @@ YAML_CONF=${FLINK_CONF_DIR}/${FLINK_CONF_FILE}
 ########################################################################################################################
 
 # read JAVA_HOME from config with no default value
-MY_JAVA_HOME=$(readFromConfig ${KEY_ENV_JAVA_HOME} "" "${YAML_CONF}")  
+MY_JAVA_HOME=$(readFromConfig ${KEY_ENV_JAVA_HOME} "" "${YAML_CONF}")
 # check if config specified JAVA_HOME
 if [ -z "${MY_JAVA_HOME}" ]; then
     # config did not specify JAVA_HOME. Use system JAVA_HOME
-    MY_JAVA_HOME=${JAVA_HOME} 
+    MY_JAVA_HOME=${JAVA_HOME}
 fi
 # check if we have a valid JAVA_HOME and if java is not available
 if [ -z "${MY_JAVA_HOME}" ] && ! type java > /dev/null 2> /dev/null; then
@@ -201,6 +202,11 @@ if [ -z "${FLINK_TM_OFFHEAP}" ]; then
     FLINK_TM_OFFHEAP=$(readFromConfig ${KEY_TASKM_OFFHEAP} "false" "${YAML_CONF}")
 fi
 
+# Define FLINK_TM_MEM_PRE_ALLOCATE if it is not already set
+if [ -z "${FLINK_TM_MEM_PRE_ALLOCATE}" ]; then
+    FLINK_TM_MEM_PRE_ALLOCATE=$(readFromConfig ${KEY_TASKM_MEM_PRE_ALLOCATE} "false" "${YAML_CONF}")
+fi
+
 if [ -z "${MAX_LOG_FILE_NUMBER}" ]; then
     MAX_LOG_FILE_NUMBER=$(readFromConfig ${KEY_ENV_LOG_MAX} ${DEFAULT_ENV_LOG_MAX} "${YAML_CONF}")
 fi
@@ -339,4 +345,4 @@ readSlaves() {
 
 useOffHeapMemory() {
     [[ "`echo ${FLINK_TM_OFFHEAP} | tr '[:upper:]' '[:lower:]'`" == "true" ]]
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-dist/src/main/flink-bin/bin/jobmanager.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/jobmanager.sh b/flink-dist/src/main/flink-bin/bin/jobmanager.sh
index b0157c8..525fe48 100755
--- a/flink-dist/src/main/flink-bin/bin/jobmanager.sh
+++ b/flink-dist/src/main/flink-bin/bin/jobmanager.sh
@@ -18,7 +18,7 @@
 ################################################################################
 
 # Start/stop a Flink JobManager.
-USAGE="Usage: jobmanager.sh (start (local|cluster) [batch|streaming] [host] [webui-port])|stop|stop-all)"
+USAGE="Usage: jobmanager.sh (start (local|cluster) [host] [webui-port]|stop|stop-all)"
 
 STARTSTOP=$1
 EXECUTIONMODE=$2
@@ -37,12 +37,6 @@ if [[ $STARTSTOP == "start" ]]; then
         exit 1
     fi
 
-    # Use batch mode as default
-    if [ -z $STREAMINGMODE ]; then
-        echo "Missing streaming mode (batch|streaming) argument. Using 'batch'."
-        STREAMINGMODE="batch"
-    fi
-
     if [[ ! ${FLINK_JM_HEAP} =~ $IS_NUMBER ]] || [[ "${FLINK_JM_HEAP}" -lt "0" ]]; then
         echo "[ERROR] Configured JobManager memory size is not a valid value. Please set '${KEY_JOBM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
         exit 1
@@ -62,7 +56,7 @@ if [[ $STARTSTOP == "start" ]]; then
     fi
 
     # Startup parameters
-    args=("--configDir" "${FLINK_CONF_DIR}" "--executionMode" "${EXECUTIONMODE}" "--streamingMode" "${STREAMINGMODE}")
+    args=("--configDir" "${FLINK_CONF_DIR}" "--executionMode" "${EXECUTIONMODE}")
     if [ ! -z $HOST ]; then
         args+=("--host")
         args+=("${HOST}")

http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-dist/src/main/flink-bin/bin/start-cluster-streaming.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/start-cluster-streaming.sh b/flink-dist/src/main/flink-bin/bin/start-cluster-streaming.sh
deleted file mode 100755
index 5305b8c..0000000
--- a/flink-dist/src/main/flink-bin/bin/start-cluster-streaming.sh
+++ /dev/null
@@ -1,24 +0,0 @@
-#!/usr/bin/env bash
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-bin=`dirname "$0"`
-bin=`cd "$bin"; pwd`
-
-# Start a Flink cluster in streaming mode
-"${bin}"/start-cluster.sh streaming

http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-dist/src/main/flink-bin/bin/start-cluster.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/start-cluster.sh b/flink-dist/src/main/flink-bin/bin/start-cluster.sh
index e377dcf..9eccdca 100755
--- a/flink-dist/src/main/flink-bin/bin/start-cluster.sh
+++ b/flink-dist/src/main/flink-bin/bin/start-cluster.sh
@@ -22,10 +22,6 @@ USAGE="Usage: start-cluster.sh [batch|streaming]"
 
 STREAMING_MODE=$1
 
-if [[ -z $STREAMING_MODE ]]; then
-    STREAMING_MODE="batch"
-fi
-
 bin=`dirname "$0"`
 bin=`cd "$bin"; pwd`
 
@@ -37,19 +33,19 @@ if [[ $RECOVERY_MODE == "zookeeper" ]]; then
     # HA Mode
     readMasters
 
-    echo "Starting HA cluster (${STREAMING_MODE} mode) with ${#MASTERS[@]} masters and ${#ZK_QUORUM[@]} peers in ZooKeeper quorum."
+    echo "Starting HA cluster with ${#MASTERS[@]} masters and ${#ZK_QUORUM[@]} peers in ZooKeeper quorum."
 
     for ((i=0;i<${#MASTERS[@]};++i)); do
         master=${MASTERS[i]}
         webuiport=${WEBUIPORTS[i]}
-        ssh -n $FLINK_SSH_OPTS $master -- "nohup /bin/bash -l \"${FLINK_BIN_DIR}/jobmanager.sh\" start cluster ${STREAMING_MODE} ${master} ${webuiport} &"
+        ssh -n $FLINK_SSH_OPTS $master -- "nohup /bin/bash -l \"${FLINK_BIN_DIR}/jobmanager.sh\" start cluster ${master} ${webuiport} &"
     done
 
 else
-    echo "Starting cluster (${STREAMING_MODE} mode)."
+    echo "Starting cluster."
 
     # Start single JobManager on this machine
-    "$FLINK_BIN_DIR"/jobmanager.sh start cluster ${STREAMING_MODE}
+    "$FLINK_BIN_DIR"/jobmanager.sh start cluster
 fi
 shopt -u nocasematch
 
@@ -57,5 +53,5 @@ shopt -u nocasematch
 readSlaves
 
 for slave in ${SLAVES[@]}; do
-    ssh -n $FLINK_SSH_OPTS $slave -- "nohup /bin/bash -l \"${FLINK_BIN_DIR}/taskmanager.sh\" start ${STREAMING_MODE} &"
+    ssh -n $FLINK_SSH_OPTS $slave -- "nohup /bin/bash -l \"${FLINK_BIN_DIR}/taskmanager.sh\" start &"
 done

http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-dist/src/main/flink-bin/bin/start-local-streaming.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/start-local-streaming.sh b/flink-dist/src/main/flink-bin/bin/start-local-streaming.sh
deleted file mode 100755
index c6aef03..0000000
--- a/flink-dist/src/main/flink-bin/bin/start-local-streaming.sh
+++ /dev/null
@@ -1,24 +0,0 @@
-#!/usr/bin/env bash
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-bin=`dirname "$0"`
-bin=`cd "$bin"; pwd`
-
-# Start a local Flink cluster in streaming mode
-"${bin}"/start-local.sh streaming

http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-dist/src/main/flink-bin/bin/start-local.bat
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/start-local.bat b/flink-dist/src/main/flink-bin/bin/start-local.bat
index 202c7d9..2c78307 100644
--- a/flink-dist/src/main/flink-bin/bin/start-local.bat
+++ b/flink-dist/src/main/flink-bin/bin/start-local.bat
@@ -57,6 +57,6 @@ if not defined FOUND (
 echo Starting Flink job manager. Webinterface by default on http://localhost:8081/.
 echo Don't close this batch window. Stop job manager by pressing Ctrl+C.
 
-java %JVM_ARGS% %log_setting% -cp "%FLINK_JM_CLASSPATH%"; org.apache.flink.runtime.jobmanager.JobManager --configDir "%FLINK_CONF_DIR%" --executionMode local --streamingMode batch > "%out%" 2>&1
+java %JVM_ARGS% %log_setting% -cp "%FLINK_JM_CLASSPATH%"; org.apache.flink.runtime.jobmanager.JobManager --configDir "%FLINK_CONF_DIR%" --executionMode local > "%out%" 2>&1
 
 endlocal

http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-dist/src/main/flink-bin/bin/start-local.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/start-local.sh b/flink-dist/src/main/flink-bin/bin/start-local.sh
index 626fa69..b3d41bf 100755
--- a/flink-dist/src/main/flink-bin/bin/start-local.sh
+++ b/flink-dist/src/main/flink-bin/bin/start-local.sh
@@ -19,14 +19,10 @@
 
 STREAMING_MODE=$1
 
-if [[ -z $STREAMING_MODE ]]; then
-	STREAMING_MODE="batch"
-fi
-
 bin=`dirname "$0"`
 bin=`cd "$bin"; pwd`
 
 . "$bin"/config.sh
 
 # local mode, only bring up job manager. The job manager will start an internal task manager
-"$FLINK_BIN_DIR"/jobmanager.sh start local ${STREAMING_MODE}
+"$FLINK_BIN_DIR"/jobmanager.sh start local

http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-dist/src/main/flink-bin/bin/taskmanager.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/taskmanager.sh b/flink-dist/src/main/flink-bin/bin/taskmanager.sh
index f69dd1c..300dcb8 100755
--- a/flink-dist/src/main/flink-bin/bin/taskmanager.sh
+++ b/flink-dist/src/main/flink-bin/bin/taskmanager.sh
@@ -18,10 +18,9 @@
 ################################################################################
 
 # Start/stop a Flink TaskManager.
-USAGE="Usage: taskmanager.sh (start [batch|streaming])|stop|stop-all)"
+USAGE="Usage: taskmanager.sh (start|stop|stop-all)"
 
 STARTSTOP=$1
-STREAMINGMODE=$2
 
 bin=`dirname "$0"`
 bin=`cd "$bin"; pwd`
@@ -30,17 +29,12 @@ bin=`cd "$bin"; pwd`
 
 if [[ $STARTSTOP == "start" ]]; then
 
-    # Use batch mode as default
-    if [ -z $STREAMINGMODE ]; then
-        echo "Missing streaming mode (batch|streaming). Using 'batch'."
-        STREAMINGMODE="batch"
-    fi
-    
-    # if mode is streaming and no other JVM options are set, set the 'Concurrent Mark Sweep GC'
-    if [[ $STREAMINGMODE == "streaming" ]] && [ -z $FLINK_ENV_JAVA_OPTS ]; then
-    
+    # if memory allocation mode is lazy and no other JVM options are set,
+    # set the 'Concurrent Mark Sweep GC'
+    if [[ $FLINK_TM_MEM_PRE_ALLOCATE == "false" ]] && [ -z $FLINK_ENV_JAVA_OPTS ]; then
+
         JAVA_VERSION=$($JAVA_RUN -version 2>&1 | sed 's/.*version "\(.*\)\.\(.*\)\..*"/\1\2/; 1q')
-    
+
         # set the GC to G1 in Java 8 and to CMS in Java 7
         if [[ ${JAVA_VERSION} =~ ${IS_NUMBER} ]]; then
             if [ "$JAVA_VERSION" -lt 18 ]; then
@@ -63,7 +57,7 @@ if [[ $STARTSTOP == "start" ]]; then
         #
         TM_MAX_OFFHEAP_SIZE="8388607T"
 
-        if [[ "${STREAMINGMODE}" == "batch" ]] && useOffHeapMemory; then
+        if [[ "${FLINK_TM_MEM_PRE_ALLOCATE}" == "true" ]] && useOffHeapMemory; then
             if [[ "${FLINK_TM_MEM_MANAGED_SIZE}" -gt "0" ]]; then
                 # We split up the total memory in heap and off-heap memory
                 if [[ "${FLINK_TM_HEAP}" -le "${FLINK_TM_MEM_MANAGED_SIZE}" ]]; then
@@ -94,7 +88,7 @@ if [[ $STARTSTOP == "start" ]]; then
     fi
 
     # Startup parameters
-    args=("--configDir" "${FLINK_CONF_DIR}" "--streamingMode" "${STREAMINGMODE}")
+    args=("--configDir" "${FLINK_CONF_DIR}")
 fi
 
 "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP taskmanager "${args[@]}"

http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-dist/src/main/resources/flink-conf.yaml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/resources/flink-conf.yaml b/flink-dist/src/main/resources/flink-conf.yaml
index 6f56e89..08adf04 100644
--- a/flink-dist/src/main/resources/flink-conf.yaml
+++ b/flink-dist/src/main/resources/flink-conf.yaml
@@ -47,6 +47,10 @@ taskmanager.heap.mb: 512
 
 taskmanager.numberOfTaskSlots: 1
 
+# Specify whether TaskManager memory should be allocated when starting up (true) or when
+# memory is required in the memory manager (false)
+
+taskmanager.memory.preallocate: false
 
 # The parallelism used for programs that did not specify any other parallelism.
 
@@ -73,10 +77,10 @@ webclient.port: 8080
 # Streaming state checkpointing
 #==============================================================================
 
-# The backend that will be used to store operator state checkpoints if 
-# checkpointing is enabled. 
+# The backend that will be used to store operator state checkpoints if
+# checkpointing is enabled.
 #
-# Supported backends: jobmanager, filesystem, <class-name-of-factory> 
+# Supported backends: jobmanager, filesystem, <class-name-of-factory>
 #
 #state.backend: filesystem
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
index a3f152d..0575bba 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
@@ -24,7 +24,6 @@ import io.netty.handler.codec.http.HttpResponseStatus;
 import org.apache.curator.test.TestingServer;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.StreamingMode;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
@@ -177,7 +176,6 @@ public class WebRuntimeMonitorITCase extends TestLogger {
 				jobManager[i] = JobManager.startJobManagerActors(
 					jmConfig,
 					jobManagerSystem[i],
-					StreamingMode.STREAMING,
 					JobManager.class,
 					MemoryArchivist.class)._1();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-runtime/src/main/java/org/apache/flink/runtime/StreamingMode.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/StreamingMode.java b/flink-runtime/src/main/java/org/apache/flink/runtime/StreamingMode.java
deleted file mode 100644
index bdcbcf9..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/StreamingMode.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime;
-
-/**
- * The streaming mode defines whether the system starts in streaming mode,
- * or in pure batch mode. Note that streaming mode can execute batch programs
- * as well.
- */
-public enum StreamingMode {
-	
-	/** This mode indicates the system can run streaming tasks, of which batch
-	 * tasks are a special case. */
-	STREAMING,
-	
-	/** This mode indicates that the system can run only batch tasks */
-	BATCH_ONLY;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerCliOptions.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerCliOptions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerCliOptions.java
index 1202499..437693e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerCliOptions.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerCliOptions.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.jobmanager;
 
-import org.apache.flink.runtime.StreamingMode;
-
 import static com.google.common.base.Preconditions.checkNotNull;
 
 /**
@@ -31,8 +29,6 @@ public class JobManagerCliOptions {
 
 	private JobManagerMode jobManagerMode;
 	
-	private StreamingMode streamingMode = StreamingMode.BATCH_ONLY;
-
 	private String host;
 
 	private int webUIPort = -1;
@@ -64,23 +60,6 @@ public class JobManagerCliOptions {
 		}
 	}
 
-	public StreamingMode getStreamingMode() {
-		return streamingMode;
-	}
-
-	public void setStreamingMode(String modeName) {
-		if (modeName.equalsIgnoreCase("streaming")) {
-			this.streamingMode = StreamingMode.STREAMING;
-		}
-		else if (modeName.equalsIgnoreCase("batch")) {
-			this.streamingMode = StreamingMode.BATCH_ONLY;
-		}
-		else {
-			throw new IllegalArgumentException(
-					"Unknown streaming mode. Streaming mode must be one of 'BATCH' or 'STREAMING'.");
-		}
-	}
-
 	public String getHost() {
 		return host;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerCliOptions.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerCliOptions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerCliOptions.java
index a648caf..e7120fd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerCliOptions.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerCliOptions.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.taskmanager;
 
-import org.apache.flink.runtime.StreamingMode;
-
 /**
  * The command line parameters passed to the TaskManager.
  */
@@ -27,8 +25,6 @@ public class TaskManagerCliOptions {
 
 	private String configDir;
 	
-	private StreamingMode mode = StreamingMode.BATCH_ONLY;
-	
 	// ------------------------------------------------------------------------
 
 	public String getConfigDir() {
@@ -38,20 +34,4 @@ public class TaskManagerCliOptions {
 	public void setConfigDir(String configDir) {
 		this.configDir = configDir;
 	}
-
-	public StreamingMode getMode() {
-		return mode;
-	}
-
-	public void setMode(String modeName) {
-		if (modeName.equalsIgnoreCase("streaming")) {
-			this.mode = StreamingMode.STREAMING;
-		}
-		else if (modeName.equalsIgnoreCase("batch")) {
-			this.mode = StreamingMode.BATCH_ONLY;
-		}
-		else {
-			throw new IllegalArgumentException("Mode must be one of 'BATCH' or 'STREAMING'.");
-		}
-	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java
index 65ae2be..83a976d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java
@@ -133,12 +133,6 @@ public abstract class AbstractFlinkYarnClient {
 	public abstract String getSessionFilesDir();
 
 	/**
-	 * Instruct Flink to start in streaming mode
-	 * @param streamingMode
-	 */
-	public abstract  void setStreamingMode(boolean streamingMode);
-
-	/**
 	 * Set a name for the YARN application
 	 * @param name
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 8cbb13a..2645a7c 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -60,7 +60,7 @@ import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner
 import org.apache.flink.runtime.taskmanager.TaskManager
 import org.apache.flink.runtime.util._
 import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils}
-import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages, StreamingMode}
+import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages}
 import org.apache.flink.util.{ExceptionUtils, InstantiationUtil, NetUtils}
 
 import scala.collection.JavaConverters._
@@ -107,7 +107,6 @@ class JobManager(
     protected val defaultExecutionRetries: Int,
     protected val delayBetweenRetries: Long,
     protected val timeout: FiniteDuration,
-    protected val mode: StreamingMode,
     protected val leaderElectionService: LeaderElectionService,
     protected val submittedJobGraphs : SubmittedJobGraphStore,
     protected val checkpointRecoveryFactory : CheckpointRecoveryFactory)
@@ -1389,7 +1388,6 @@ object JobManager {
     // parsing the command line arguments
     val (configuration: Configuration,
          executionMode: JobManagerMode,
-         streamingMode: StreamingMode,
          listeningHost: String, listeningPort: Int) =
     try {
       parseArgs(args)
@@ -1439,7 +1437,6 @@ object JobManager {
             runJobManager(
               configuration,
               executionMode,
-              streamingMode,
               listeningHost,
               listeningPort)
           }
@@ -1450,7 +1447,6 @@ object JobManager {
         runJobManager(
           configuration,
           executionMode,
-          streamingMode,
           listeningHost,
           listeningPort)
       }
@@ -1474,14 +1470,12 @@ object JobManager {
    * @param configuration The configuration object for the JobManager.
   * @param executionMode The execution mode in which to run. Execution mode LOCAL will spawn
   *                      an additional TaskManager in the same process.
-   * @param streamingMode The streaming mode to run the system in (streaming vs. batch-only)
    * @param listeningAddress The hostname where the JobManager should listen for messages.
    * @param listeningPort The port where the JobManager should listen for messages.
    */
   def runJobManager(
       configuration: Configuration,
       executionMode: JobManagerMode,
-      streamingMode: StreamingMode,
       listeningAddress: String,
       listeningPort: Int)
     : Unit = {
@@ -1489,7 +1483,6 @@ object JobManager {
     val (jobManagerSystem, _, _, _) = startActorSystemAndJobManagerActors(
       configuration,
       executionMode,
-      streamingMode,
       listeningAddress,
       listeningPort,
       classOf[JobManager],
@@ -1505,7 +1498,6 @@ object JobManager {
     * @param configuration The configuration object for the JobManager
    * @param executionMode The execution mode in which to run. Execution mode LOCAL will spawn an
    *                      additional TaskManager in the same process.
-    * @param streamingMode The streaming mode to run the system in (streaming vs. batch-only)
    * @param listeningAddress The hostname where the JobManager should listen for messages.
     * @param listeningPort The port where the JobManager should listen for messages
     * @param jobManagerClass The class of the JobManager to be started
@@ -1516,7 +1508,6 @@ object JobManager {
   def startActorSystemAndJobManagerActors(
       configuration: Configuration,
       executionMode: JobManagerMode,
-      streamingMode: StreamingMode,
       listeningAddress: String,
       listeningPort: Int,
       jobManagerClass: Class[_ <: JobManager],
@@ -1587,7 +1578,6 @@ object JobManager {
       val (jobManager, archive) = startJobManagerActors(
         configuration,
         jobManagerSystem,
-        streamingMode,
         jobManagerClass,
         archiveClass)
 
@@ -1613,7 +1603,6 @@ object JobManager {
           Some(TaskManager.TASK_MANAGER_NAME),
           None,
           true,
-          streamingMode,
           classOf[TaskManager])
 
         LOG.debug("Starting TaskManager process reaper")
@@ -1654,8 +1643,7 @@ object JobManager {
    * @param args command line arguments
    * @return Quadruple of configuration, execution mode and an optional listening address
    */
-  def parseArgs(args: Array[String]):
-                     (Configuration, JobManagerMode, StreamingMode, String, Int) = {
+  def parseArgs(args: Array[String]): (Configuration, JobManagerMode, String, Int) = {
     val parser = new scopt.OptionParser[JobManagerCliOptions]("JobManager") {
       head("Flink JobManager")
 
@@ -1673,13 +1661,6 @@ object JobManager {
         "The execution mode of the JobManager (CLUSTER / LOCAL)"
       }
 
-      opt[String]("streamingMode").optional().action { (arg, conf) =>
-        conf.setStreamingMode(arg)
-        conf
-      } text {
-        "The streaming mode of the JobManager (STREAMING / BATCH)"
-      }
-
       opt[String]("host").optional().action { (arg, conf) =>
         conf.setHost(arg)
         conf
@@ -1743,13 +1724,11 @@ object JobManager {
       }
 
     val executionMode = config.getJobManagerMode
-    val streamingMode = config.getStreamingMode
     val hostPortUrl = NetUtils.hostAndPortToUrlString(host, port)
     
-    LOG.info(s"Starting JobManager on $hostPortUrl with execution mode $executionMode and " +
-      s"streaming mode $streamingMode")
+    LOG.info(s"Starting JobManager on $hostPortUrl with execution mode $executionMode")
 
-    (configuration, executionMode, streamingMode, host, port)
+    (configuration, executionMode, host, port)
   }
 
   /**
@@ -1884,7 +1863,6 @@ object JobManager {
    *
    * @param configuration The configuration for the JobManager
    * @param actorSystem The actor system running the JobManager
-   * @param streamingMode The execution mode
    * @param jobManagerClass The class of the JobManager to be started
    * @param archiveClass The class of the MemoryArchivist to be started
    *
@@ -1893,7 +1871,6 @@ object JobManager {
   def startJobManagerActors(
       configuration: Configuration,
       actorSystem: ActorSystem,
-      streamingMode: StreamingMode,
       jobManagerClass: Class[_ <: JobManager],
       archiveClass: Class[_ <: MemoryArchivist])
     : (ActorRef, ActorRef) = {
@@ -1903,7 +1880,6 @@ object JobManager {
       actorSystem,
       Some(JOB_MANAGER_NAME),
       Some(ARCHIVE_NAME),
-      streamingMode,
       jobManagerClass,
       archiveClass)
   }
@@ -1918,7 +1894,6 @@ object JobManager {
    *                          the actor will have the name generated by the actor system.
    * @param archiveActorName Optionally the name of the archive actor. If none is given,
    *                          the actor will have the name generated by the actor system.
-   * @param streamingMode The mode to run the system in (streaming vs. batch-only)
    * @param jobManagerClass The class of the JobManager to be started
    * @param archiveClass The class of the MemoryArchivist to be started
    *
@@ -1929,7 +1904,6 @@ object JobManager {
       actorSystem: ActorSystem,
       jobMangerActorName: Option[String],
       archiveActorName: Option[String],
-      streamingMode: StreamingMode,
       jobManagerClass: Class[_ <: JobManager],
       archiveClass: Class[_ <: MemoryArchivist])
     : (ActorRef, ActorRef) = {
@@ -1967,7 +1941,6 @@ object JobManager {
       executionRetries,
       delayBetweenRetries,
       timeout,
-      streamingMode,
       leaderElectionService,
       submittedJobGraphs,
       checkpointRecoveryFactory)

http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index cefb462..660c813 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -29,16 +29,15 @@ import com.typesafe.config.Config
 
 import org.apache.flink.api.common.{JobID, JobExecutionResult, JobSubmissionResult}
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
-import org.apache.flink.runtime.StreamingMode
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.client.{JobExecutionException, JobClient}
 import org.apache.flink.runtime.instance.{AkkaActorGateway, ActorGateway}
 import org.apache.flink.runtime.jobgraph.JobGraph
-import org.apache.flink.runtime.jobmanager.{JobManager, RecoveryMode}
+import org.apache.flink.runtime.jobmanager.RecoveryMode
 import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalService, LeaderRetrievalListener,
 StandaloneLeaderRetrievalService}
 import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtAnyJobManager
-import org.apache.flink.runtime.util.{LeaderRetrievalUtils, StandaloneUtils, ZooKeeperUtils}
+import org.apache.flink.runtime.util.ZooKeeperUtils
 import org.apache.flink.runtime.webmonitor.{WebMonitorUtils, WebMonitor}
 
 import org.slf4j.LoggerFactory
@@ -55,18 +54,12 @@ import scala.concurrent._
  * @param userConfiguration Configuration object with the user provided configuration values
  * @param useSingleActorSystem true if all actors (JobManager and TaskManager) shall be run in the
  *                             same [[ActorSystem]], otherwise false
- * @param streamingMode True, if the system should be started in streaming mode, false if
- *                      in pure batch mode.
  */
 abstract class FlinkMiniCluster(
     val userConfiguration: Configuration,
-    val useSingleActorSystem: Boolean,
-    val streamingMode: StreamingMode)
+    val useSingleActorSystem: Boolean)
   extends LeaderRetrievalListener {
 
-  def this(userConfiguration: Configuration, singleActorSystem: Boolean) 
-         = this(userConfiguration, singleActorSystem, StreamingMode.BATCH_ONLY)
-  
   protected val LOG = LoggerFactory.getLogger(classOf[FlinkMiniCluster])
 
   // --------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index 38e3efb..a27c840 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -22,15 +22,12 @@ import akka.actor.{ActorRef, ActorSystem}
 
 import org.apache.flink.api.common.io.FileOutputFormat
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
-import org.apache.flink.runtime.StreamingMode
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.io.network.netty.NettyConfig
 import org.apache.flink.runtime.jobmanager.{MemoryArchivist, JobManager}
 import org.apache.flink.runtime.taskmanager.TaskManager
 import org.apache.flink.runtime.util.EnvironmentInformation
 
-import org.slf4j.LoggerFactory
-
 /**
  * Local Flink mini cluster which executes all [[TaskManager]]s and the [[JobManager]] in the same
  * JVM. It extends the [[FlinkMiniCluster]] by having convenience functions to setup Flink's
@@ -39,18 +36,11 @@ import org.slf4j.LoggerFactory
  * @param userConfiguration Configuration object with the user provided configuration values
  * @param singleActorSystem true if all actors (JobManager and TaskManager) shall be run in the same
  *                          [[ActorSystem]], otherwise false
- * @param streamingMode Defines the execution mode of Flink's components (JobManager and
- *                      TaskManager)
  */
 class LocalFlinkMiniCluster(
     userConfiguration: Configuration,
-    singleActorSystem: Boolean,
-    streamingMode: StreamingMode)
-  extends FlinkMiniCluster(userConfiguration, singleActorSystem, streamingMode) {
+    singleActorSystem: Boolean) extends FlinkMiniCluster(userConfiguration, singleActorSystem) {
 
-  def this(userConfiguration: Configuration, singleActorSystem: Boolean)
-       = this(userConfiguration, singleActorSystem, StreamingMode.BATCH_ONLY)
-  
   def this(userConfiguration: Configuration) = this(userConfiguration, true)
 
   // --------------------------------------------------------------------------
@@ -84,7 +74,6 @@ class LocalFlinkMiniCluster(
       system,
       Some(jobManagerName),
       Some(archiveName),
-      streamingMode,
       classOf[JobManager],
       classOf[MemoryArchivist])
 
@@ -124,7 +113,6 @@ class LocalFlinkMiniCluster(
       Some(taskManagerActorName), // actor name
       Some(createLeaderRetrievalService), // job manager leader retrieval service
       localExecution, // start network stack?
-      streamingMode,
       classOf[TaskManager])
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 58547ad..78051b8 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -42,7 +42,7 @@ import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
 import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, LeaderRetrievalService}
 import org.apache.flink.runtime.messages.TaskMessages._
 import org.apache.flink.runtime.messages.checkpoint.{NotifyCheckpointComplete, TriggerCheckpoint, AbstractCheckpointMessage}
-import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages, StreamingMode}
+import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages}
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.blob.{BlobService, BlobCache}
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager
@@ -1214,8 +1214,7 @@ object TaskManager {
     }
     
     // try to parse the command line arguments
-    val (configuration: Configuration,
-         mode: StreamingMode) = try {
+    val configuration: Configuration = try {
       parseArgsAndLoadConfig(args)
     }
     catch {
@@ -1232,13 +1231,13 @@ object TaskManager {
         LOG.info("Security is enabled. Starting secure TaskManager.")
         SecurityUtils.runSecured(new FlinkSecuredRunner[Unit] {
           override def run(): Unit = {
-            selectNetworkInterfaceAndRunTaskManager(configuration, mode, classOf[TaskManager])
+            selectNetworkInterfaceAndRunTaskManager(configuration, classOf[TaskManager])
           }
         })
       }
       else {
         LOG.info("Security is not enabled. Starting non-authenticated TaskManager.")
-        selectNetworkInterfaceAndRunTaskManager(configuration, mode, classOf[TaskManager])
+        selectNetworkInterfaceAndRunTaskManager(configuration, classOf[TaskManager])
       }
     }
     catch {
@@ -1256,7 +1255,7 @@ object TaskManager {
    * @return The parsed configuration.
    */
   @throws(classOf[Exception])
-  def parseArgsAndLoadConfig(args: Array[String]): (Configuration, StreamingMode) = {
+  def parseArgsAndLoadConfig(args: Array[String]): Configuration = {
     
     // set up the command line parser
     val parser = new scopt.OptionParser[TaskManagerCliOptions]("TaskManager") {
@@ -1268,13 +1267,6 @@ object TaskManager {
       } text {
         "Specify configuration directory."
       }
-
-      opt[String]("streamingMode").optional().action { (param, conf) =>
-        conf.setMode(param)
-        conf
-      } text {
-        "The streaming mode of the JobManager (STREAMING / BATCH)"
-      }
     }
 
     // parse the CLI arguments
@@ -1293,7 +1285,7 @@ object TaskManager {
       case e: Exception => throw new Exception("Could not load configuration", e)
     }
     
-    (conf, cliConfig.getMode)
+    conf
   }
 
   // --------------------------------------------------------------------------
@@ -1315,14 +1307,12 @@ object TaskManager {
    * (library cache, shuffle network stack, ...), and starts the TaskManager itself.
 
    * @param configuration The configuration for the TaskManager.
-   * @param streamingMode The streaming mode to start the TaskManager in (streaming/batch-only)
    * @param taskManagerClass The actor class to instantiate.
    *                         Allows to use TaskManager subclasses for example for YARN.
    */
   @throws(classOf[Exception])
   def selectNetworkInterfaceAndRunTaskManager(
       configuration: Configuration,
-      streamingMode: StreamingMode,
       taskManagerClass: Class[_ <: TaskManager])
     : Unit = {
 
@@ -1332,7 +1322,6 @@ object TaskManager {
       taskManagerHostname,
       actorSystemPort,
       configuration,
-      streamingMode,
       taskManagerClass)
   }
 
@@ -1384,22 +1373,19 @@ object TaskManager {
    * @param taskManagerHostname The hostname/address of the interface where the actor system
    *                         will communicate.
    * @param actorSystemPort The port at which the actor system will communicate.
-   * @param streamingMode The streaming mode to start the TaskManager in (streaming/batch-only)
    * @param configuration The configuration for the TaskManager.
    */
   @throws(classOf[Exception])
   def runTaskManager(
       taskManagerHostname: String,
       actorSystemPort: Int,
-      configuration: Configuration,
-      streamingMode: StreamingMode)
+      configuration: Configuration)
     : Unit = {
 
     runTaskManager(
       taskManagerHostname,
       actorSystemPort,
       configuration,
-      streamingMode,
       classOf[TaskManager])
   }
 
@@ -1415,7 +1401,6 @@ object TaskManager {
    *                         will communicate.
    * @param actorSystemPort The port at which the actor system will communicate.
    * @param configuration The configuration for the TaskManager.
-   * @param streamingMode The streaming mode to start the TaskManager in (streaming/batch-only)
    * @param taskManagerClass The actor class to instantiate. Allows the use of TaskManager
    *                         subclasses for example for YARN.
    */
@@ -1424,11 +1409,10 @@ object TaskManager {
       taskManagerHostname: String,
       actorSystemPort: Int,
       configuration: Configuration,
-      streamingMode: StreamingMode,
       taskManagerClass: Class[_ <: TaskManager])
     : Unit = {
 
-    LOG.info(s"Starting TaskManager in streaming mode $streamingMode")
+    LOG.info(s"Starting TaskManager")
 
     // Bring up the TaskManager actor system first, bind it to the given address.
     
@@ -1470,7 +1454,6 @@ object TaskManager {
         Some(TASK_MANAGER_NAME),
         None,
         false,
-        streamingMode,
         taskManagerClass)
 
       // start a process reaper that watches the JobManager. If the TaskManager actor dies,
@@ -1524,7 +1507,6 @@ object TaskManager {
    *                                     constructed from the configuration.
    * @param localTaskManagerCommunication If true, the TaskManager will not initiate the
    *                                      TCP network stack.
-   * @param streamingMode The streaming mode to start the TaskManager in (streaming/batch-only)
    * @param taskManagerClass The class of the TaskManager actor. May be used to give
    *                         subclasses that understand additional actor messages.
    *
@@ -1548,7 +1530,6 @@ object TaskManager {
       taskManagerActorName: Option[String],
       leaderRetrievalServiceOption: Option[LeaderRetrievalService],
       localTaskManagerCommunication: Boolean,
-      streamingMode: StreamingMode,
       taskManagerClass: Class[_ <: TaskManager])
     : ActorRef = {
 
@@ -1580,8 +1561,18 @@ object TaskManager {
         "If you leave this config parameter empty, the system automatically " +
         "pick a fraction of the available memory.")
 
+
+    val preAllocateMemory = configuration.getBoolean(
+      ConfigConstants.TASK_MANAGER_MEMORY_PRE_ALLOCATE_KEY,
+      ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_PRE_ALLOCATE)
+
     val memorySize = if (configuredMemory > 0) {
-      LOG.info(s"Using $configuredMemory MB for Flink managed memory.")
+      if (preAllocateMemory) {
+        LOG.info(s"Using $configuredMemory MB for managed memory.")
+      } else {
+        LOG.info(s"Limiting managed memory to $configuredMemory MB, " +
+          s"memory will be allocated lazily.")
+      }
       configuredMemory << 20 // megabytes to bytes
     }
     else {
@@ -1596,8 +1587,13 @@ object TaskManager {
         val relativeMemSize = (EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
           fraction).toLong
 
-        LOG.info(s"Using $fraction of the currently free heap space for Flink managed " +
-          s"heap memory (${relativeMemSize >> 20} MB).")
+        if (preAllocateMemory) {
+          LOG.info(s"Using $fraction of the currently free heap space for managed " +
+            s"heap memory (${relativeMemSize >> 20} MB).")
+        } else {
+          LOG.info(s"Limiting managed memory to $fraction of the currently free heap space " +
+            s"(${relativeMemSize >> 20} MB), memory will be allocated lazily.")
+        }
 
         relativeMemSize
       }
@@ -1607,8 +1603,13 @@ object TaskManager {
         val maxMemory = EnvironmentInformation.getMaxJvmHeapMemory()
         val directMemorySize = (maxMemory / (1.0 - fraction) * fraction).toLong
 
-        LOG.info(s"Using $fraction of the maximum memory size for " +
-          s"Flink managed off-heap memory (${directMemorySize >> 20} MB).")
+        if (preAllocateMemory) {
+          LOG.info(s"Using $fraction of the maximum memory size for " +
+            s"managed off-heap memory (${directMemorySize >> 20} MB).")
+        } else {
+          LOG.info(s"Limiting managed memory to $fraction of the maximum memory size " +
+            s"(${directMemorySize >> 20} MB), memory will be allocated lazily.")
+        }
 
         directMemorySize
       }
@@ -1617,8 +1618,6 @@ object TaskManager {
       }
     }
 
-    val preAllocateMemory: Boolean = streamingMode == StreamingMode.BATCH_ONLY
-
     // now start the memory manager
     val memoryManager = try {
       new MemoryManager(
@@ -1629,17 +1628,17 @@ object TaskManager {
         preAllocateMemory)
     }
     catch {
-      case e: OutOfMemoryError => 
+      case e: OutOfMemoryError =>
         memType match {
           case MemoryType.HEAP =>
-            throw new Exception(s"OutOfMemory error (${e.getMessage()})" + 
+            throw new Exception(s"OutOfMemory error (${e.getMessage()})" +
               s" while allocating the TaskManager heap memory (${memorySize} bytes).", e)
-            
+
           case MemoryType.OFF_HEAP =>
             throw new Exception(s"OutOfMemory error (${e.getMessage()})" +
               s" while allocating the TaskManager off-heap memory (${memorySize} bytes). " +
               s"Try increasing the maximum direct memory (-XX:MaxDirectMemorySize)", e)
-            
+
           case _ => throw e
         }
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
index 93f7b9a..063e4c2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
@@ -27,7 +27,6 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.PoisonPill;
 import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.runtime.StreamingMode;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.junit.Test;
@@ -198,7 +197,7 @@ public class JobManagerProcessReapingTest {
 				Configuration config = new Configuration();
 				config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, -1);
 
-				JobManager.runJobManager(config, JobManagerMode.CLUSTER, StreamingMode.BATCH_ONLY, "localhost", 0);
+				JobManager.runJobManager(config, JobManagerMode.CLUSTER, "localhost", 0);
 				System.exit(0);
 			}
 			catch (Throwable t) {

http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
index c52ec2e..7f965fd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
@@ -29,7 +29,6 @@ import com.google.common.io.Files;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.StreamingMode;
 import org.apache.flink.util.NetUtils;
 
 import org.apache.flink.util.OperatingSystem;
@@ -85,8 +84,7 @@ public class JobManagerStartupTest {
 		}
 		
 		try {
-			JobManager.runJobManager(new Configuration(), JobManagerMode.CLUSTER,
-									StreamingMode.BATCH_ONLY, "localhost", portNum);
+			JobManager.runJobManager(new Configuration(), JobManagerMode.CLUSTER, "localhost", portNum);
 			fail("this should throw an exception");
 		}
 		catch (Exception e) {
@@ -125,8 +123,7 @@ public class JobManagerStartupTest {
 		failConfig.setString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, nonExistDirectory);
 
 		try {
-			JobManager.runJobManager(failConfig, JobManagerMode.CLUSTER,
-										StreamingMode.BATCH_ONLY, "localhost", portNum);
+			JobManager.runJobManager(failConfig, JobManagerMode.CLUSTER, "localhost", portNum);
 			fail("this should fail with an exception");
 		}
 		catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerSubmittedJobGraphsRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerSubmittedJobGraphsRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerSubmittedJobGraphsRecoveryITCase.java
index e6156e5..99f7bd7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerSubmittedJobGraphsRecoveryITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerSubmittedJobGraphsRecoveryITCase.java
@@ -27,7 +27,6 @@ import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.filefilter.TrueFileFilter;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.StreamingMode;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.instance.ActorGateway;
@@ -125,7 +124,7 @@ public class JobManagerSubmittedJobGraphsRecoveryITCase extends TestLogger {
 		config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 1);
 		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
 
-		TestingCluster flink = new TestingCluster(config, false, false, StreamingMode.STREAMING);
+		TestingCluster flink = new TestingCluster(config, false, false);
 
 		try {
 			final Deadline deadline = TestTimeOut.fromNow();
@@ -164,7 +163,7 @@ public class JobManagerSubmittedJobGraphsRecoveryITCase extends TestLogger {
 		config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 2);
 		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
 
-		TestingCluster flink = new TestingCluster(config, false, false, StreamingMode.STREAMING);
+		TestingCluster flink = new TestingCluster(config, false, false);
 
 		try {
 			final Deadline deadline = TestTimeOut.fromNow();
@@ -267,7 +266,7 @@ public class JobManagerSubmittedJobGraphsRecoveryITCase extends TestLogger {
 			TaskManager.startTaskManagerComponentsAndActor(
 					config, taskManagerSystem, "localhost",
 					Option.<String>empty(), Option.<LeaderRetrievalService>empty(),
-					false, StreamingMode.STREAMING, TaskManager.class);
+					false, TaskManager.class);
 
 			// Client test actor
 			TestActorRef<RecordingTestClient> clientRef = TestActorRef.create(

http://git-wip-us.apache.org/repos/asf/flink/blob/4f12356e/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
index 61f536c..adaff29 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
@@ -22,7 +22,6 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.StreamingMode;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.blob.BlobClient;
@@ -77,7 +76,6 @@ public class JobSubmitTest {
 		ActorRef jobManagerActorRef = JobManager.startJobManagerActors(
 				config,
 				jobManagerSystem,
-				StreamingMode.BATCH_ONLY,
 				JobManager.class,
 				MemoryArchivist.class)._1();
 


Mime
View raw message