flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [1/4] flink git commit: [FLINK-2084] [core] Add an option to start Flink in streaming mode
Date Tue, 26 May 2015 14:23:18 GMT
Repository: flink
Updated Branches:
  refs/heads/master 924830ffa -> 043772244


[FLINK-2084] [core] Add an option to start Flink in streaming mode

 - Streaming mode sets the memory manager to lazy memory allocation to ensure
   heap is not blocked by batch memory manager


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/efec2297
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/efec2297
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/efec2297

Branch: refs/heads/master
Commit: efec2297b46b83e0dc6111a91a3f80f9d5375e0d
Parents: ea60678
Author: Stephan Ewen <sewen@apache.org>
Authored: Fri May 22 17:12:45 2015 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Tue May 26 16:22:22 2015 +0200

----------------------------------------------------------------------
 flink-dist/src/main/flink-bin/bin/jobmanager.sh |   5 +-
 .../flink-bin/bin/start-cluster-streaming.sh    |  48 +++++++
 .../src/main/flink-bin/bin/start-cluster.sh     |   2 +-
 .../main/flink-bin/bin/start-local-streaming.sh |  27 ++++
 .../src/main/flink-bin/bin/start-local.bat      |   2 +-
 .../src/main/flink-bin/bin/start-local.sh       |   2 +-
 .../src/main/flink-bin/bin/taskmanager.sh       |   5 +-
 .../org/apache/flink/runtime/StreamingMode.java |  34 +++++
 .../jobmanager/JobManagerCliOptions.java        |  77 ++++++++++++
 .../apache/flink/runtime/taskmanager/Task.java  |   2 +-
 .../taskmanager/TaskManagerCliOptions.java      |  57 +++++++++
 .../flink/runtime/util/SignalHandler.java       |   2 +-
 .../flink/runtime/jobmanager/JobManager.scala   | 125 ++++++++++---------
 .../jobmanager/JobManagerCLIConfiguration.scala |  28 -----
 .../runtime/minicluster/FlinkMiniCluster.scala  |   9 +-
 .../minicluster/LocalFlinkMiniCluster.scala     |  31 +++--
 .../flink/runtime/taskmanager/TaskManager.scala |  69 ++++++----
 .../TaskManagerCLIConfiguration.scala           |  26 ----
 18 files changed, 402 insertions(+), 149 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/efec2297/flink-dist/src/main/flink-bin/bin/jobmanager.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/jobmanager.sh b/flink-dist/src/main/flink-bin/bin/jobmanager.sh
index 273cdd3..0f1e4fa 100755
--- a/flink-dist/src/main/flink-bin/bin/jobmanager.sh
+++ b/flink-dist/src/main/flink-bin/bin/jobmanager.sh
@@ -20,6 +20,7 @@
 
 STARTSTOP=$1
 EXECUTIONMODE=$2
+STREAMINGMODE=$3
 
 bin=`dirname "$0"`
 bin=`cd "$bin"; pwd`
@@ -80,7 +81,7 @@ case $STARTSTOP in
         rotateLogFile $out
 
         echo "Starting Job Manager"
-        $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_JM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.runtime.jobmanager.JobManager --executionMode $EXECUTIONMODE --configDir "$FLINK_CONF_DIR"  > "$out" 2>&1 < /dev/null &
+        $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_JM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.runtime.jobmanager.JobManager --configDir "$FLINK_CONF_DIR" --executionMode $EXECUTIONMODE --streamingMode "$STREAMINGMODE" > "$out" 2>&1 < /dev/null &
         echo $! > $pid
 
     ;;
@@ -99,7 +100,7 @@ case $STARTSTOP in
     ;;
 
     (*)
-        echo "Please specify 'start (cluster|local)' or stop"
+        echo "Please specify 'start (cluster|local) [batch|streaming]' or 'stop'"
     ;;
 
 esac

http://git-wip-us.apache.org/repos/asf/flink/blob/efec2297/flink-dist/src/main/flink-bin/bin/start-cluster-streaming.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/start-cluster-streaming.sh b/flink-dist/src/main/flink-bin/bin/start-cluster-streaming.sh
new file mode 100755
index 0000000..86a87cd
--- /dev/null
+++ b/flink-dist/src/main/flink-bin/bin/start-cluster-streaming.sh
@@ -0,0 +1,48 @@
+#!/usr/bin/env bash
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+. "$bin"/config.sh
+
+HOSTLIST=$FLINK_SLAVES
+
+if [ "$HOSTLIST" = "" ]; then
+    HOSTLIST="${FLINK_CONF_DIR}/slaves"
+fi
+
+if [ ! -f "$HOSTLIST" ]; then
+    echo $HOSTLIST is not a valid slave list
+    exit 1
+fi
+
+# cluster mode, bring up job manager locally and a task manager on every slave host
+"$FLINK_BIN_DIR"/jobmanager.sh start cluster streaming
+
+GOON=true
+while $GOON
+do
+    read line || GOON=false
+    if [ -n "$line" ]; then
+        HOST=$( extractHostName $line)
+        ssh -n $FLINK_SSH_OPTS $HOST -- "nohup /bin/bash -l $FLINK_BIN_DIR/taskmanager.sh start streaming &"
+    fi
+done < "$HOSTLIST"

http://git-wip-us.apache.org/repos/asf/flink/blob/efec2297/flink-dist/src/main/flink-bin/bin/start-cluster.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/start-cluster.sh b/flink-dist/src/main/flink-bin/bin/start-cluster.sh
index db65032..666edeb 100755
--- a/flink-dist/src/main/flink-bin/bin/start-cluster.sh
+++ b/flink-dist/src/main/flink-bin/bin/start-cluster.sh
@@ -43,6 +43,6 @@ do
     read line || GOON=false
     if [ -n "$line" ]; then
         HOST=$( extractHostName $line)
-        ssh -n $FLINK_SSH_OPTS $HOST -- "nohup /bin/bash -l $FLINK_BIN_DIR/taskmanager.sh start &"
+        ssh -n $FLINK_SSH_OPTS $HOST -- "nohup /bin/bash -l $FLINK_BIN_DIR/taskmanager.sh start batch &"
     fi
 done < "$HOSTLIST"

http://git-wip-us.apache.org/repos/asf/flink/blob/efec2297/flink-dist/src/main/flink-bin/bin/start-local-streaming.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/start-local-streaming.sh b/flink-dist/src/main/flink-bin/bin/start-local-streaming.sh
new file mode 100755
index 0000000..2cb4d4a
--- /dev/null
+++ b/flink-dist/src/main/flink-bin/bin/start-local-streaming.sh
@@ -0,0 +1,27 @@
+#!/usr/bin/env bash
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+. "$bin"/config.sh
+
+# local mode, only bring up job manager. The job manager will start an internal task manager
+"$FLINK_BIN_DIR"/jobmanager.sh start local streaming

http://git-wip-us.apache.org/repos/asf/flink/blob/efec2297/flink-dist/src/main/flink-bin/bin/start-local.bat
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/start-local.bat b/flink-dist/src/main/flink-bin/bin/start-local.bat
index 386a631..202c7d9 100644
--- a/flink-dist/src/main/flink-bin/bin/start-local.bat
+++ b/flink-dist/src/main/flink-bin/bin/start-local.bat
@@ -57,6 +57,6 @@ if not defined FOUND (
 echo Starting Flink job manager. Webinterface by default on http://localhost:8081/.
 echo Don't close this batch window. Stop job manager by pressing Ctrl+C.
 
-java %JVM_ARGS% %log_setting% -cp "%FLINK_JM_CLASSPATH%"; org.apache.flink.runtime.jobmanager.JobManager --executionMode local --configDir "%FLINK_CONF_DIR%" > "%out%" 2>&1
+java %JVM_ARGS% %log_setting% -cp "%FLINK_JM_CLASSPATH%"; org.apache.flink.runtime.jobmanager.JobManager --configDir "%FLINK_CONF_DIR%" --executionMode local --streamingMode batch > "%out%" 2>&1
 
 endlocal

http://git-wip-us.apache.org/repos/asf/flink/blob/efec2297/flink-dist/src/main/flink-bin/bin/start-local.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/start-local.sh b/flink-dist/src/main/flink-bin/bin/start-local.sh
index f382763..7ea3ff4 100755
--- a/flink-dist/src/main/flink-bin/bin/start-local.sh
+++ b/flink-dist/src/main/flink-bin/bin/start-local.sh
@@ -24,4 +24,4 @@ bin=`cd "$bin"; pwd`
 . "$bin"/config.sh
 
 # local mode, only bring up job manager. The job manager will start an internal task manager
-"$FLINK_BIN_DIR"/jobmanager.sh start local
+"$FLINK_BIN_DIR"/jobmanager.sh start local batch

http://git-wip-us.apache.org/repos/asf/flink/blob/efec2297/flink-dist/src/main/flink-bin/bin/taskmanager.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/taskmanager.sh b/flink-dist/src/main/flink-bin/bin/taskmanager.sh
index 557ea2b..a99d39d 100755
--- a/flink-dist/src/main/flink-bin/bin/taskmanager.sh
+++ b/flink-dist/src/main/flink-bin/bin/taskmanager.sh
@@ -19,6 +19,7 @@
 
 
 STARTSTOP=$1
+STREAMINGMODE=$2
 
 bin=`dirname "$0"`
 bin=`cd "$bin"; pwd`
@@ -68,7 +69,7 @@ case $STARTSTOP in
         rotateLogFile $out
 
         echo Starting task manager on host $HOSTNAME
-        $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.runtime.taskmanager.TaskManager --configDir "$FLINK_CONF_DIR" > "$out" 2>&1 < /dev/null &
+        $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.runtime.taskmanager.TaskManager --configDir "$FLINK_CONF_DIR" --streamingMode "$STREAMINGMODE" > "$out" 2>&1 < /dev/null &
         echo $! > $pid
 
     ;;
@@ -87,7 +88,7 @@ case $STARTSTOP in
     ;;
 
     (*)
-        echo Please specify start or stop
+        echo "Please specify 'start [batch|streaming]' or 'stop'"
     ;;
 
 esac

http://git-wip-us.apache.org/repos/asf/flink/blob/efec2297/flink-runtime/src/main/java/org/apache/flink/runtime/StreamingMode.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/StreamingMode.java b/flink-runtime/src/main/java/org/apache/flink/runtime/StreamingMode.java
new file mode 100644
index 0000000..bdcbcf9
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/StreamingMode.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime;
+
+/**
+ * The streaming mode defines whether the system starts in streaming mode,
+ * or in pure batch mode. Note that streaming mode can execute batch programs
+ * as well.
+ */
+public enum StreamingMode {
+	
+	/** This mode indicates the system can run streaming tasks, of which batch
+	 * tasks are a special case. */
+	STREAMING,
+	
+	/** This mode indicates that the system can run only batch tasks */
+	BATCH_ONLY;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/efec2297/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerCliOptions.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerCliOptions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerCliOptions.java
new file mode 100644
index 0000000..988e3a7
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerCliOptions.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.runtime.StreamingMode;
+
+/**
+ * The command line parameters passed to the TaskManager.
+ */
+public class JobManagerCliOptions {
+
+	private String configDir;
+
+	private JobManagerMode jobManagerMode;
+	
+	private StreamingMode streamingMode = StreamingMode.BATCH_ONLY;
+
+	// ------------------------------------------------------------------------
+
+	public String getConfigDir() {
+		return configDir;
+	}
+
+	public void setConfigDir(String configDir) {
+		this.configDir = configDir;
+	}
+
+	public JobManagerMode getJobManagerMode() {
+		return jobManagerMode;
+	}
+
+	public void setJobManagerMode(String modeName) {
+		if (modeName.equalsIgnoreCase("cluster")) {
+			this.jobManagerMode = JobManagerMode.CLUSTER;
+		}
+		else if (modeName.equalsIgnoreCase("local")) {
+			this.jobManagerMode = JobManagerMode.LOCAL;
+		}
+		else {
+			throw new IllegalArgumentException(
+					"Unknown execution mode. Execution mode must be one of 'cluster' or 'local'.");
+		}
+	}
+
+	public StreamingMode getStreamingMode() {
+		return streamingMode;
+	}
+
+	public void setStreamingMode(String modeName) {
+		if (modeName.equalsIgnoreCase("streaming")) {
+			this.streamingMode = StreamingMode.STREAMING;
+		}
+		else if (modeName.equalsIgnoreCase("batch")) {
+			this.streamingMode = StreamingMode.BATCH_ONLY;
+		}
+		else {
+			throw new IllegalArgumentException(
+					"Unknown streaming mode. Streaming mode must be one of 'BATCH' or 'STREAMING'.");
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/efec2297/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 51ce91f..40198dc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -74,7 +74,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
 /**
  * The Task represents one execution of a parallel subtask on a TaskManager.
  * A Task wraps a Flink operator (which may be a user function) and
- * runs it, providing all service necessary for example to consume input data,
+ * runs it, providing all services necessary for example to consume input data,
  * produce its results (intermediate result partitions) and communicate
  * with the JobManager.
  *

http://git-wip-us.apache.org/repos/asf/flink/blob/efec2297/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerCliOptions.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerCliOptions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerCliOptions.java
new file mode 100644
index 0000000..a648caf
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerCliOptions.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import org.apache.flink.runtime.StreamingMode;
+
+/**
+ * The command line parameters passed to the TaskManager.
+ */
+public class TaskManagerCliOptions {
+
+	private String configDir;
+	
+	private StreamingMode mode = StreamingMode.BATCH_ONLY;
+	
+	// ------------------------------------------------------------------------
+
+	public String getConfigDir() {
+		return configDir;
+	}
+
+	public void setConfigDir(String configDir) {
+		this.configDir = configDir;
+	}
+
+	public StreamingMode getMode() {
+		return mode;
+	}
+
+	public void setMode(String modeName) {
+		if (modeName.equalsIgnoreCase("streaming")) {
+			this.mode = StreamingMode.STREAMING;
+		}
+		else if (modeName.equalsIgnoreCase("batch")) {
+			this.mode = StreamingMode.BATCH_ONLY;
+		}
+		else {
+			throw new IllegalArgumentException("Mode must be one of 'BATCH' or 'STREAMING'.");
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/efec2297/flink-runtime/src/main/java/org/apache/flink/runtime/util/SignalHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/SignalHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/SignalHandler.java
index 546e142..bcd3dc4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/SignalHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/SignalHandler.java
@@ -21,7 +21,7 @@ import org.slf4j.Logger;
 import sun.misc.Signal;
 
 /**
- * This signal handler / signal logger is based on Apache Hadoops org.apache.hadoop.util.SignalLogger.
+ * This signal handler / signal logger is based on Apache Hadoop's org.apache.hadoop.util.SignalLogger.
  */
 public class SignalHandler {
 	private static boolean registered = false;

http://git-wip-us.apache.org/repos/asf/flink/blob/efec2297/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 0c71938..ba819ca 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -45,7 +45,7 @@ import org.apache.flink.runtime.security.SecurityUtils
 import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner
 import org.apache.flink.runtime.taskmanager.TaskManager
 import org.apache.flink.runtime.util.{SerializedValue, EnvironmentInformation}
-import org.apache.flink.runtime.{ActorSynchronousLogging, ActorLogMessages}
+import org.apache.flink.runtime.{StreamingMode, ActorSynchronousLogging, ActorLogMessages}
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
 import org.apache.flink.runtime.instance.InstanceManager
@@ -99,7 +99,8 @@ class JobManager(protected val flinkConfiguration: Configuration,
                  protected val accumulatorManager: AccumulatorManager,
                  protected val defaultExecutionRetries: Int,
                  protected val delayBetweenRetries: Long,
-                 protected val timeout: FiniteDuration)
+                 protected val timeout: FiniteDuration,
+                 protected val mode: StreamingMode)
   extends Actor with ActorLogMessages with ActorSynchronousLogging {
 
   /** List of current jobs running jobs */
@@ -759,10 +760,11 @@ object JobManager {
   val STARTUP_FAILURE_RETURN_CODE = 1
   val RUNTIME_FAILURE_RETURN_CODE = 2
 
+  /** Name of the JobManager actor */
   val JOB_MANAGER_NAME = "jobmanager"
-  val EVENT_COLLECTOR_NAME = "eventcollector"
+
+  /** Name of the archive actor */
   val ARCHIVE_NAME = "archive"
-  val PROFILER_NAME = "profiler"
 
 
   /**
@@ -778,6 +780,7 @@ object JobManager {
     // parsing the command line arguments
     val (configuration: Configuration,
          executionMode: JobManagerMode,
+         streamingMode: StreamingMode,
          listeningHost: String, listeningPort: Int) =
     try {
       parseArgs(args)
@@ -814,13 +817,15 @@ object JobManager {
         LOG.info("Security is enabled. Starting secure JobManager.")
         SecurityUtils.runSecured(new FlinkSecuredRunner[Unit] {
           override def run(): Unit = {
-            runJobManager(configuration, executionMode, listeningHost, listeningPort)
+            runJobManager(configuration, executionMode, streamingMode,
+                          listeningHost, listeningPort)
           }
         })
       }
       else {
         LOG.info("Security is not enabled. Starting non-authenticated JobManager.")
-        runJobManager(configuration, executionMode, listeningHost, listeningPort)
+        runJobManager(configuration, executionMode, streamingMode,
+                      listeningHost, listeningPort)
       }
     }
     catch {
@@ -842,11 +847,13 @@ object JobManager {
    * @param configuration The configuration object for the JobManager.
    * @param executionMode The execution mode in which to run. Execution mode LOCAL will spawn an
    *                      an additional TaskManager in the same process.
+   * @param streamingMode The streaming mode to run the system in (streaming vs. batch-only)
    * @param listeningAddress The hostname where the JobManager should listen for messages.
    * @param listeningPort The port where the JobManager should listen for messages.
    */
   def runJobManager(configuration: Configuration,
                     executionMode: JobManagerMode,
+                    streamingMode: StreamingMode,
                     listeningAddress: String,
                     listeningPort: Int) : Unit = {
 
@@ -880,7 +887,8 @@ object JobManager {
     try {
       // bring up the job manager actor
       LOG.info("Starting JobManager actor")
-      val (jobManager, archiver) = startJobManagerActors(configuration, jobManagerSystem)
+      val (jobManager, archiver) = startJobManagerActors(configuration, 
+                                                         jobManagerSystem, streamingMode)
 
       // start a process reaper that watches the JobManager. If the JobManager actor dies,
       // the process reaper will kill the JVM process (to ensure easy failure detection)
@@ -898,7 +906,8 @@ object JobManager {
                         listeningAddress,
                         Some(TaskManager.TASK_MANAGER_NAME),
                         Some(jobManager.path.toString),
-                        true, classOf[TaskManager])
+                        true, streamingMode,
+                        classOf[TaskManager])
 
         LOG.debug("Starting TaskManager process reaper")
         jobManagerSystem.actorOf(
@@ -936,61 +945,59 @@ object JobManager {
    * @param args command line arguments
    * @return Quadruple of configuration, execution mode and an optional listening address
    */
-  def parseArgs(args: Array[String]): (Configuration, JobManagerMode, String, Int) = {
-    val parser = new scopt.OptionParser[JobManagerCLIConfiguration]("JobManager") {
+  def parseArgs(args: Array[String]): (Configuration, JobManagerMode, StreamingMode, String, Int) = {
+    val parser = new scopt.OptionParser[JobManagerCliOptions]("JobManager") {
       head("Flink JobManager")
 
-      opt[String]("configDir") action { (arg, c) => c.copy(configDir = arg) } text {
-        "The configuration directory." }
-
-      opt[String]("executionMode") action { (arg, c) =>
-        val argLower = arg.toLowerCase()
-        var result: JobManagerCLIConfiguration = null
-
-        for (mode <- JobManagerMode.values() if result == null) {
-          val modeName = mode.name().toLowerCase()
-
-          if (modeName.equals(argLower)) {
-            result = c.copy(executionMode = mode)
-          }
-        }
+      opt[String]("configDir") action { (arg, conf) => 
+        conf.setConfigDir(arg)
+        conf
+      } text {
+        "The configuration directory."
+      }
 
-        if (result == null) {
-          throw new Exception("Unknown execution mode: " + arg)
-        } else {
-          result
-        }
+      opt[String]("executionMode") action { (arg, conf) =>
+        conf.setJobManagerMode(arg)
+        conf
       } text {
         "The execution mode of the JobManager (CLUSTER / LOCAL)"
       }
-    }
 
-    parser.parse(args, JobManagerCLIConfiguration()) map {
-      config =>
+      opt[String]("streamingMode").optional().action { (arg, conf) =>
+        conf.setStreamingMode(arg)
+        conf
+      } text {
+        "The streaming mode of the JobManager (STREAMING / BATCH)"
+      }
+    }
 
-        if (config.configDir == null) {
-          throw new Exception("Missing parameter '--configDir'")
-        }
-        if (config.executionMode == null) {
-          throw new Exception("Missing parameter '--executionMode'")
-        }
+    val config = parser.parse(args, new JobManagerCliOptions()).getOrElse {
+      throw new Exception(
+        s"Invalid command line agruments: ${args.mkString(" ")}. Usage: ${parser.usage}")
+    }
+    
+    val configDir = config.getConfigDir()
+    
+    if (configDir == null) {
+      throw new Exception("Missing parameter '--configDir'")
+    }
+    if (config.getJobManagerMode() == null) {
+      throw new Exception("Missing parameter '--executionMode'")
+    }
 
-        LOG.info("Loading configuration from " + config.configDir)
-        GlobalConfiguration.loadConfiguration(config.configDir)
-        val configuration = GlobalConfiguration.getConfiguration
+    LOG.info("Loading configuration from " + configDir)
+    GlobalConfiguration.loadConfiguration(configDir)
+    val configuration = GlobalConfiguration.getConfiguration()
 
-        if (config.configDir != null && new File(config.configDir).isDirectory) {
-          configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, config.configDir + "/..")
-        }
+    if (new File(configDir).isDirectory) {
+      configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, configDir + "/..")
+    }
 
-        val hostname = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null)
-        val port = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
-          ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
+    val hostname = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null)
+    val port = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
+      ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
 
-        (configuration, config.executionMode, hostname, port)
-    } getOrElse {
-      throw new Exception("Invalid command line arguments: " + parser.usage)
-    }
+    (configuration, config.getJobManagerMode(), config.getStreamingMode(), hostname, port)
   }
 
   /**
@@ -1082,9 +1089,12 @@ object JobManager {
    * @return A tuple of references (JobManager Ref, Archiver Ref)
    */
   def startJobManagerActors(configuration: Configuration,
-                            actorSystem: ActorSystem): (ActorRef, ActorRef) = {
+                            actorSystem: ActorSystem,
+                            streamingMode: StreamingMode): (ActorRef, ActorRef) = {
 
-    startJobManagerActors(configuration,actorSystem, Some(JOB_MANAGER_NAME), Some(ARCHIVE_NAME))
+    startJobManagerActors(configuration, actorSystem,
+                          Some(JOB_MANAGER_NAME), Some(ARCHIVE_NAME),
+                          streamingMode)
   }
   /**
    * Starts the JobManager and job archiver based on the given configuration, in the
@@ -1096,18 +1106,21 @@ object JobManager {
    *                          the actor will have the name generated by the actor system.
    * @param archiverActorName Optionally the name of the archive actor. If none is given,
    *                          the actor will have the name generated by the actor system.
+   * @param streamingMode The mode to run the system in (streaming vs. batch-only)
+   * 
    * @return A tuple of references (JobManager Ref, Archiver Ref)
    */
   def startJobManagerActors(configuration: Configuration,
                             actorSystem: ActorSystem,
                             jobMangerActorName: Option[String],
-                            archiverActorName: Option[String]): (ActorRef, ActorRef) = {
+                            archiverActorName: Option[String],
+                            streamingMode: StreamingMode): (ActorRef, ActorRef) = {
 
     val (instanceManager, scheduler, libraryCacheManager, archiveProps, accumulatorManager,
       executionRetries, delayBetweenRetries,
       timeout, _) = createJobManagerComponents(configuration)
 
-    // start the archiver wither with the given name, or without (avoid name conflicts)
+    // start the archiver with the given name, or without (avoid name conflicts)
     val archiver: ActorRef = archiverActorName match {
       case Some(actorName) => actorSystem.actorOf(archiveProps, actorName)
       case None => actorSystem.actorOf(archiveProps)
@@ -1115,7 +1128,7 @@ object JobManager {
 
     val jobManagerProps = Props(classOf[JobManager], configuration, instanceManager, scheduler,
         libraryCacheManager, archiver, accumulatorManager, executionRetries,
-        delayBetweenRetries, timeout)
+        delayBetweenRetries, timeout, streamingMode)
 
     val jobManager: ActorRef = jobMangerActorName match {
       case Some(actorName) => actorSystem.actorOf(jobManagerProps, actorName)

http://git-wip-us.apache.org/repos/asf/flink/blob/efec2297/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManagerCLIConfiguration.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManagerCLIConfiguration.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManagerCLIConfiguration.scala
deleted file mode 100644
index 4cd02c5..0000000
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManagerCLIConfiguration.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager
-
-/**
- * Holder for command line parameters of the JobManager.
- *
- * @param configDir The directory to load the configuration from.
- * @param executionMode Mode for the JobManager.
- */
-case class JobManagerCLIConfiguration(configDir: String = null,
-                                      executionMode: JobManagerMode = null) {}

http://git-wip-us.apache.org/repos/asf/flink/blob/efec2297/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 8a6c394..edc12a1 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -26,6 +26,7 @@ import akka.actor.{ActorRef, ActorSystem}
 import com.typesafe.config.Config
 import org.apache.flink.api.common.JobSubmissionResult
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
+import org.apache.flink.runtime.StreamingMode
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.client.{JobExecutionException, JobClient, SerializedJobExecutionResult}
 import org.apache.flink.runtime.jobgraph.JobGraph
@@ -44,10 +45,16 @@ import scala.concurrent.{Future, Await}
  * @param userConfiguration Configuration object with the user provided configuration values
  * @param singleActorSystem true if all actors (JobManager and TaskManager) shall be run in the same
  *                          [[ActorSystem]], otherwise false
+ * @param streamingMode True, if the system should be started in streaming mode, false if
+ *                      in pure batch mode.
  */
 abstract class FlinkMiniCluster(val userConfiguration: Configuration,
-                                val singleActorSystem: Boolean) {
+                                val singleActorSystem: Boolean,
+                                val streamingMode: StreamingMode) {
 
+  def this(userConfiguration: Configuration, singleActorSystem: Boolean) 
+         = this(userConfiguration, singleActorSystem, StreamingMode.BATCH_ONLY)
+  
   protected val LOG = LoggerFactory.getLogger(classOf[FlinkMiniCluster])
 
   // --------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/efec2297/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index e2d7cc1..663307d 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -18,18 +18,19 @@
 
 package org.apache.flink.runtime.minicluster
 
-import akka.actor.{ActorRef, ActorSystem}
+import akka.actor.{ActorRef, ActorSystem, ExtendedActorSystem}
+
 import org.apache.flink.api.common.io.FileOutputFormat
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
-import org.apache.flink.runtime.akka.AkkaUtils
+import org.apache.flink.runtime.StreamingMode
 import org.apache.flink.runtime.client.JobClient
 import org.apache.flink.runtime.io.network.netty.NettyConfig
 import org.apache.flink.runtime.jobmanager.JobManager
 import org.apache.flink.runtime.jobmanager.web.WebInfoServer
 import org.apache.flink.runtime.taskmanager.TaskManager
 import org.apache.flink.runtime.util.EnvironmentInformation
+
 import org.slf4j.LoggerFactory
-import akka.actor.ExtendedActorSystem
 
 /**
  * Local Flink mini cluster which executes all [[TaskManager]]s and the [[JobManager]] in the same
@@ -41,9 +42,20 @@ import akka.actor.ExtendedActorSystem
  * @param singleActorSystem true if all actors (JobManager and TaskManager) shall be run in the same
  *                          [[ActorSystem]], otherwise false
  */
-class LocalFlinkMiniCluster(userConfiguration: Configuration, singleActorSystem: Boolean = true)
-  extends FlinkMiniCluster(userConfiguration, singleActorSystem) {
-
+class LocalFlinkMiniCluster(userConfiguration: Configuration,
+                            singleActorSystem: Boolean,
+                            streamingMode: StreamingMode)
+  extends FlinkMiniCluster(userConfiguration, singleActorSystem, streamingMode) {
+
+  
+  def this(userConfiguration: Configuration, singleActorSystem: Boolean)
+       = this(userConfiguration, singleActorSystem, StreamingMode.BATCH_ONLY)
+  
+  def this(userConfiguration: Configuration) = this(userConfiguration, true)
+
+  // --------------------------------------------------------------------------
+  
+  
   val jobClientActorSystem = if (singleActorSystem) {
     jobManagerActorSystem
   } else {
@@ -64,7 +76,9 @@ class LocalFlinkMiniCluster(userConfiguration: Configuration, singleActorSystem:
 
   override def startJobManager(system: ActorSystem): ActorRef = {
     val config = configuration.clone()
-    val (jobManager, archiver) = JobManager.startJobManagerActors(config, system)
+       
+    val (jobManager, archiver) = JobManager.startJobManagerActors(config, system, streamingMode)
+    
     if (config.getBoolean(ConfigConstants.LOCAL_INSTANCE_MANAGER_START_WEBSERVER, false)) {
       val webServer = new WebInfoServer(configuration, jobManager, archiver)
       webServer.start()
@@ -103,12 +117,13 @@ class LocalFlinkMiniCluster(userConfiguration: Configuration, singleActorSystem:
     } else {
       None
     }
-
+    
     TaskManager.startTaskManagerComponentsAndActor(config, system,
                                                    HOSTNAME, // network interface to bind to
                                                    Some(taskManagerActorName), // actor name
                                                    jobManagerPath, // job manager akka URL
                                                    localExecution, // start network stack?
+                                                   streamingMode,
                                                    classOf[TaskManager])
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/efec2297/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index a95b5cb..8a45fa4 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -37,7 +37,7 @@ import grizzled.slf4j.Logger
 
 import org.apache.flink.configuration._
 import org.apache.flink.runtime.messages.checkpoint.{ConfirmCheckpoint, TriggerCheckpoint, AbstractCheckpointMessage}
-import org.apache.flink.runtime.{ActorSynchronousLogging, ActorLogMessages}
+import org.apache.flink.runtime.{StreamingMode, ActorSynchronousLogging, ActorLogMessages}
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.blob.{BlobService, BlobCache}
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager
@@ -999,8 +999,8 @@ object TaskManager {
   /** Return code for critical errors during the runtime */
   val RUNTIME_FAILURE_RETURN_CODE = 2
 
+  /** The name of the TaskManager actor */
   val TASK_MANAGER_NAME = "taskmanager"
-  val PROFILER_NAME = "profiler"
 
   /** Maximum time (msecs) that the TaskManager will spend searching for a
     * suitable network interface to use for communication */
@@ -1033,7 +1033,8 @@ object TaskManager {
     EnvironmentInformation.checkJavaVersion()
 
     // try to parse the command line arguments
-    val configuration = try {
+    val (configuration: Configuration,
+         mode: StreamingMode) = try {
       parseArgsAndLoadConfig(args)
     }
     catch {
@@ -1050,13 +1051,13 @@ object TaskManager {
         LOG.info("Security is enabled. Starting secure TaskManager.")
         SecurityUtils.runSecured(new FlinkSecuredRunner[Unit] {
           override def run(): Unit = {
-            selectNetworkInterfaceAndRunTaskManager(configuration, classOf[TaskManager])
+            selectNetworkInterfaceAndRunTaskManager(configuration, mode, classOf[TaskManager])
           }
         })
       }
       else {
         LOG.info("Security is not enabled. Starting non-authenticated TaskManager.")
-        selectNetworkInterfaceAndRunTaskManager(configuration, classOf[TaskManager])
+        selectNetworkInterfaceAndRunTaskManager(configuration, mode, classOf[TaskManager])
       }
     }
     catch {
@@ -1074,31 +1075,44 @@ object TaskManager {
    * @return The parsed configuration.
    */
   @throws(classOf[Exception])
-  def parseArgsAndLoadConfig(args: Array[String]): Configuration = {
-
+  def parseArgsAndLoadConfig(args: Array[String]): (Configuration, StreamingMode) = {
+    
     // set up the command line parser
-    val parser = new scopt.OptionParser[TaskManagerCLIConfiguration]("taskmanager") {
-      head("flink task manager")
-      opt[String]("configDir") action { (x, c) =>
-        c.copy(configDir = x)
-      } text "Specify configuration directory."
+    val parser = new scopt.OptionParser[TaskManagerCliOptions]("TaskManager") {
+      head("Flink TaskManager")
+      
+      opt[String]("configDir") action { (param, conf) =>
+        conf.setConfigDir(param)
+        conf
+      } text {
+        "Specify configuration directory."
+      }
+
+      opt[String]("streamingMode").optional().action { (param, conf) =>
+        conf.setMode(param)
+        conf
+      } text {
+        "The streaming mode of the JobManager (STREAMING / BATCH)"
+      }
     }
 
     // parse the CLI arguments
-    val cliConfig = parser.parse(args, TaskManagerCLIConfiguration()).getOrElse {
+    val cliConfig = parser.parse(args, new TaskManagerCliOptions()).getOrElse {
       throw new Exception(
         s"Invalid command line agruments: ${args.mkString(" ")}. Usage: ${parser.usage}")
     }
 
     // load the configuration
-    try {
-      LOG.info("Loading configuration from " + cliConfig.configDir)
-      GlobalConfiguration.loadConfiguration(cliConfig.configDir)
+    val conf: Configuration = try {
+      LOG.info("Loading configuration from " + cliConfig.getConfigDir())
+      GlobalConfiguration.loadConfiguration(cliConfig.getConfigDir())
       GlobalConfiguration.getConfiguration()
     }
     catch {
       case e: Exception => throw new Exception("Could not load configuration", e)
     }
+    
+    (conf, cliConfig.getMode)
   }
 
   // --------------------------------------------------------------------------
@@ -1120,11 +1134,13 @@ object TaskManager {
    * (library cache, shuffle network stack, ...), and starts the TaskManager itself.
 
    * @param configuration The configuration for the TaskManager.
+   * @param streamingMode The streaming mode to start the TaskManager in (streaming/batch-only)
    * @param taskManagerClass The actor class to instantiate.
    *                         Allows to use TaskManager subclasses for example for YARN.
    */
   @throws(classOf[Exception])
   def selectNetworkInterfaceAndRunTaskManager(configuration: Configuration,
+                                              streamingMode: StreamingMode,
                                               taskManagerClass: Class[_ <: TaskManager]) : Unit = {
 
     val (jobManagerHostname, jobManagerPort) = getAndCheckJobManagerAddress(configuration)
@@ -1132,7 +1148,8 @@ object TaskManager {
     val (taskManagerHostname, actorSystemPort) =
        selectNetworkInterfaceAndPort(configuration, jobManagerHostname, jobManagerPort)
 
-    runTaskManager(taskManagerHostname, actorSystemPort, configuration, taskManagerClass)
+    runTaskManager(taskManagerHostname, actorSystemPort, configuration,
+                   streamingMode, taskManagerClass)
   }
 
   @throws(classOf[IOException])
@@ -1196,14 +1213,17 @@ object TaskManager {
    * @param taskManagerHostname The hostname/address of the interface where the actor system
    *                         will communicate.
    * @param actorSystemPort The port at which the actor system will communicate.
+   * @param streamingMode The streaming mode to start the TaskManager in (streaming/batch-only)
    * @param configuration The configuration for the TaskManager.
    */
   @throws(classOf[Exception])
   def runTaskManager(taskManagerHostname: String,
-                     actorSystemPort: Int,
-                     configuration: Configuration) : Unit = {
+                     actorSystemPort: Int, 
+                     configuration: Configuration,
+                     streamingMode: StreamingMode) : Unit = {
 
-    runTaskManager(taskManagerHostname, actorSystemPort, configuration, classOf[TaskManager])
+    runTaskManager(taskManagerHostname, actorSystemPort, configuration,
+                   streamingMode, classOf[TaskManager])
   }
 
   /**
@@ -1218,6 +1238,7 @@ object TaskManager {
    *                         will communicate.
    * @param actorSystemPort The port at which the actor system will communicate.
    * @param configuration The configuration for the TaskManager.
+   * @param streamingMode The streaming mode to start the TaskManager in (streaming/batch-only)
    * @param taskManagerClass The actor class to instantiate. Allows the use of TaskManager
    *                         subclasses for example for YARN.
    */
@@ -1225,6 +1246,7 @@ object TaskManager {
   def runTaskManager(taskManagerHostname: String,
                      actorSystemPort: Int,
                      configuration: Configuration,
+                     streamingMode: StreamingMode,
                      taskManagerClass: Class[_ <: TaskManager]) : Unit = {
 
     LOG.info("Starting TaskManager")
@@ -1264,6 +1286,7 @@ object TaskManager {
                                                            taskManagerHostname,
                                                            Some(TASK_MANAGER_NAME),
                                                            None, false,
+                                                           streamingMode,
                                                            taskManagerClass)
 
       // start a process reaper that watches the JobManager. If the JobManager actor dies,
@@ -1317,6 +1340,7 @@ object TaskManager {
    *                       JobManager hostname an port specified in the configuration.
    * @param localTaskManagerCommunication If true, the TaskManager will not initiate the
    *                                      TCP network stack.
+   * @param streamingMode The streaming mode to start the TaskManager in (streaming/batch-only)
    * @param taskManagerClass The class of the TaskManager actor. May be used to give
    *                         subclasses that understand additional actor messages.
    *
@@ -1339,6 +1363,7 @@ object TaskManager {
                                          taskManagerActorName: Option[String],
                                          jobManagerPath: Option[String],
                                          localTaskManagerCommunication: Boolean,
+                                         streamingMode: StreamingMode,
                                          taskManagerClass: Class[_ <: TaskManager]): ActorRef = {
 
     // get and check the JobManager config
@@ -1391,13 +1416,15 @@ object TaskManager {
 
       relativeMemSize
     }
+    
+    val preAllocateMemory: Boolean = streamingMode == StreamingMode.BATCH_ONLY
 
     // now start the memory manager
     val memoryManager = try {
       new DefaultMemoryManager(memorySize,
                                taskManagerConfig.numberOfSlots,
                                netConfig.networkBufferSize,
-                               true)
+                               preAllocateMemory)
     }
     catch {
       case e: OutOfMemoryError => throw new Exception(

http://git-wip-us.apache.org/repos/asf/flink/blob/efec2297/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerCLIConfiguration.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerCLIConfiguration.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerCLIConfiguration.scala
deleted file mode 100644
index 5c71f5e..0000000
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerCLIConfiguration.scala
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.taskmanager
-
-/**
- * Command line configuration object for the [[TaskManager]]
- *
- * @param configDir Path to configuration directory
- */
-case class TaskManagerCLIConfiguration(configDir: String = null)


Mime
View raw message