flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [2/2] flink git commit: [FLINK-1580] [FLINK-1590] [runtime] Various cleanups and improvements in the TaskManager initialization
Date Tue, 24 Feb 2015 21:09:59 GMT
[FLINK-1580] [FLINK-1590] [runtime] Various cleanups and improvements in the TaskManager initialization

 - Better checks during TaskManager startup
 - More robust initialization of TaskManager actor system and actor
 - Fix memory accounting during TaskManager startup
 - Better logging for TaskManagers started through YARN
 - Remove command line parameter hacking for YARN TaskManagers


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ed8b26bf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ed8b26bf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ed8b26bf

Branch: refs/heads/master
Commit: ed8b26bf2e8dd7c187c24ad0d8ff3e67f6a7478c
Parents: 4883af6
Author: Stephan Ewen <sewen@apache.org>
Authored: Tue Feb 24 20:23:47 2015 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Tue Feb 24 20:30:29 2015 +0100

----------------------------------------------------------------------
 .../org/apache/flink/runtime/net/NetUtils.java  |   2 +-
 .../runtime/util/EnvironmentInformation.java    |  98 ++-
 .../apache/flink/runtime/util/MathUtils.java    |  10 +
 .../flink/runtime/jobmanager/JobManager.scala   |  50 +-
 .../runtime/minicluster/FlinkMiniCluster.scala  |  57 +-
 .../minicluster/LocalFlinkMiniCluster.scala     |  56 +-
 .../flink/runtime/taskmanager/TaskManager.scala | 644 +++++++++++++------
 .../TaskManagerCLIConfiguration.scala           |   3 +-
 .../taskmanager/TaskManagerConfiguration.scala  |   7 +-
 .../taskmanager/TaskManagerProfiler.scala       |   4 +-
 .../TaskManagerProcessReapingTest.java          |   6 +-
 .../runtime/taskmanager/TaskManagerTest.java    |   5 +-
 .../apache/flink/runtime/util/MathUtilTest.java |  34 +-
 .../TaskManagerRegistrationITCase.scala         |  10 +-
 .../runtime/testingUtils/TestingCluster.scala   |  18 +-
 .../testingUtils/TestingTaskManager.scala       |  25 +-
 .../runtime/testingUtils/TestingUtils.scala     |  38 +-
 .../test/util/ForkableFlinkMiniCluster.scala    |  32 +-
 .../yarn/appMaster/YarnTaskManagerRunner.java   |  63 +-
 .../org/apache/flink/yarn/YarnTaskManager.scala |  18 +-
 .../scala/org/apache/flink/yarn/YarnUtils.scala |  45 --
 21 files changed, 723 insertions(+), 502 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ed8b26bf/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java
index 73504e9..8e0a41a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java
@@ -107,7 +107,7 @@ public class NetUtils {
 							break;
 							
 						default:
-							throw new RuntimeException("Unkown address detection strategy: " + strategy);
+							throw new RuntimeException("Unknown address detection strategy: " + strategy);
 					}
 				}
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/ed8b26bf/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java
index 535c756..d2147e4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java
@@ -36,12 +36,6 @@ public class EnvironmentInformation {
 	private static final Logger LOG = LoggerFactory.getLogger(EnvironmentInformation.class);
 
 	private static final String UNKNOWN = "<unknown>";
-	
-	private static final String[] IGNORED_STARTUP_OPTIONS = {
-													"-Dlog.file",
-													"-Dlogback.configurationFile",
-													"-Dlog4j.configuration"
-												};
 
 	/**
 	 * Returns the version of the code as String. If version == null, then the JobManager does not run from a
@@ -55,7 +49,7 @@ public class EnvironmentInformation {
 	}
 
 	/**
-	 * Returns the code revision (commit and commit date) of Flink.
+	 * Returns the code revision (commit and commit date) of Flink, as generated by the Maven builds.
 	 * 
 	 * @return The code revision.
 	 */
@@ -83,11 +77,6 @@ public class EnvironmentInformation {
 		return info;
 	}
 
-	public static class RevisionInformation {
-		public String commitId;
-		public String commitDate;
-	}
-
 	/**
 	 * Gets the name of the user that is running the JVM.
 	 * 
@@ -149,8 +138,12 @@ public class EnvironmentInformation {
 		Runtime r = Runtime.getRuntime();
 		return r.maxMemory() - r.totalMemory() + r.freeMemory();
 	}
-	
-	
+
+	/**
+	 * Gets the version of the JVM in the form "VM_Name - Vendor  - Spec/Version".
+	 *
+	 * @return The JVM version.
+	 */
 	public static String getJvmVersion() {
 		try {
 			final RuntimeMXBean bean = ManagementFactory.getRuntimeMXBean();
@@ -161,24 +154,18 @@ public class EnvironmentInformation {
 		}
 	}
 
+	/**
+	 * Gets the system parameters and environment parameters that were passed to the JVM on startup.
+	 *
+	 * @return The options passed to the JVM on startup.
+	 */
 	public static String getJvmStartupOptions() {
 		try {
 			final RuntimeMXBean bean = ManagementFactory.getRuntimeMXBean();
 			final StringBuilder bld = new StringBuilder();
 			
 			for (String s : bean.getInputArguments()) {
-				
-				boolean append = true;
-				for (String ignored : IGNORED_STARTUP_OPTIONS) {
-					if (s.startsWith(ignored)) {
-						append = false;
-						break;
-					}
-				}
-				
-				if (append) {
-					bld.append(s).append(' ');
-				}
+				bld.append(s).append(' ');
 			}
 
 			return bld.toString();
@@ -187,11 +174,23 @@ public class EnvironmentInformation {
 			return UNKNOWN;
 		}
 	}
-	
+
+	/**
+	 * Gets the directory for temporary files, as returned by the JVM system property "java.io.tmpdir".
+	 *
+	 * @return The directory for temporary files.
+	 */
 	public static String getTemporaryFileDirectory() {
 		return System.getProperty("java.io.tmpdir");
 	}
 
+	/**
+	 * Logs a information about the environment, like code revision, current user, java version,
+	 * and JVM parameters.
+	 *
+	 * @param log The logger to log the information to.
+	 * @param componentName The component name to mention in the log.
+	 */
 	public static void logEnvironmentInfo(Logger log, String componentName) {
 		if (log.isInfoEnabled()) {
 			RevisionInformation rev = getRevisionInformation();
@@ -206,7 +205,7 @@ public class EnvironmentInformation {
 			
 			long maxHeapMegabytes = getMaxJvmHeapMemory() >>> 20;
 			
-			log.info("-------------------------------------------------------");
+			log.info("--------------------------------------------------------------------------------");
 			log.info(" Starting " + componentName + " (Version: " + version + ", "
 					+ "Rev:" + rev.commitId + ", " + "Date:" + rev.commitDate + ")");
 			log.info(" Current user: " + user);
@@ -214,15 +213,46 @@ public class EnvironmentInformation {
 			log.info(" Startup Options: " + options);
 			log.info(" Maximum heap size: " + maxHeapMegabytes + " MiBytes");
 			log.info(" JAVA_HOME: " + (javaHome == null ? "not set" : javaHome));
-			log.info("-------------------------------------------------------");
+			log.info("--------------------------------------------------------------------------------");
+		}
+	}
+
+	/**
+	 * Checks whether the Java version is lower than Java 7 (Java 1.7) and
+	 * prints a warning message in that case.
+	 */
+	public static void checkJavaVersion() {
+		try {
+			String versionString = System.getProperty("java.version").substring(0, 3);
+			double versionDouble = Double.parseDouble(versionString);
+			if (versionDouble < 1.7) {
+				LOG.warn("Flink has been started with Java 6. " +
+						"Java 6 is not maintained any more by Oracle or the OpenJDK community. " +
+						"Flink may drop support for Java 6 in future releases, due to the " +
+						"unavailability of bug fixes security patches.");
+			}
+		}
+		catch (Exception e) {
+			LOG.warn("Could not parse java version for startup checks");
+			LOG.debug("Exception when parsing java version", e);
 		}
 	}
 
 	// --------------------------------------------------------------------------------------------
-	
+
+	/** Don't instantiate this class */
 	private EnvironmentInformation() {}
-	
-	public static void main(String[] args) {
-		logEnvironmentInfo(LOG, "Test");
+
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Revision information encapsulates information about the source code revision of the Flink
+	 * code.
+	 */
+	public static class RevisionInformation {
+		/** The git commit id (hash) */
+		public String commitId;
+		/** The git commit date */
+		public String commitDate;
 	}
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/ed8b26bf/flink-runtime/src/main/java/org/apache/flink/runtime/util/MathUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/MathUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/MathUtils.java
index c8ab69c..ed3cdc2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/MathUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/MathUtils.java
@@ -99,6 +99,16 @@ public final class MathUtils {
 		}
 		return (int) value;
 	}
+
+	/**
+	 * Checks whether the given value is a power of two.
+	 *
+	 * @param value The value to check.
+	 * @return True, if the value is a power of two, false otherwise.
+	 */
+	public static boolean isPowerOf2(long value) {
+		return (value & (value - 1)) == 0;
+	}
 	
 	// ============================================================================================
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/ed8b26bf/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 2671f2d..0630115 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -609,7 +609,7 @@ object JobManager {
 
     // startup checks and logging
     EnvironmentInformation.logEnvironmentInfo(LOG, "JobManager")
-    checkJavaVersion()
+    EnvironmentInformation.checkJavaVersion()
 
     // parsing the command line arguments
     val (configuration: Configuration,
@@ -651,7 +651,7 @@ object JobManager {
     }
     catch {
       case t: Throwable => {
-        LOG.error("Failed to start JobManager.", t)
+        LOG.error("Failed to run JobManager.", t)
         System.exit(STARTUP_FAILURE_RETURN_CODE)
       }
     }
@@ -711,10 +711,10 @@ object JobManager {
 
       // bring up a local task manager, if needed
       if (executionMode.equals(LOCAL)) {
-        LOG.info("Starting embedded TaskManager for JobManager's LOCAL mode execution")
+        LOG.info("Starting embedded TaskManager for JobManager's LOCAL execution mode")
 
-        TaskManager.startActorWithConfiguration("", TaskManager.TASK_MANAGER_NAME, configuration,
-          localAkkaCommunication = false, localTaskManagerCommunication = true)(jobManagerSystem)
+        TaskManager.startTaskManagerActor(configuration, jobManagerSystem, listeningAddress,
+          TaskManager.TASK_MANAGER_NAME, true, true, classOf[TaskManager])
       }
 
       // start the job manager web frontend
@@ -746,10 +746,10 @@ object JobManager {
    * @return Quadruple of configuration, execution mode and an optional listening address
    */
   def parseArgs(args: Array[String]): (Configuration, ExecutionMode, String, Int) = {
-    val parser = new scopt.OptionParser[JobManagerCLIConfiguration]("jobmanager") {
-      head("flink jobmanager")
+    val parser = new scopt.OptionParser[JobManagerCLIConfiguration]("JobManager") {
+      head("Flink JobManager")
       opt[String]("configDir") action { (arg, c) => c.copy(configDir = arg) } text ("Specify " +
-        "configuration directory.")
+        "the configuration directory.")
       opt[String]("executionMode") optional() action { (arg, c) =>
         if(arg.equals("local")){
           c.copy(executionMode = LOCAL)
@@ -757,7 +757,7 @@ object JobManager {
           c.copy(executionMode = CLUSTER)
         }
       } text {
-        "Specify execution mode of job manager"
+        "Specify the execution mode of the JobManager (CLUSTER / LOCAL)"
       }
     }
 
@@ -777,7 +777,7 @@ object JobManager {
 
         (configuration, config.executionMode, hostname, port)
     } getOrElse {
-      throw new Exception("Wrong arguments. Usage: " + parser.usage)
+      throw new Exception("Invalid command line arguments. Usage: " + parser.usage)
     }
   }
 
@@ -986,34 +986,4 @@ object JobManager {
     val timeout = AkkaUtils.getLookupTimeout(config)
     getJobManagerRemoteReference(address, system, timeout)
   }
-
-
-
-  // --------------------------------------------------------------------------
-  //  Miscellaneous Utils
-  // --------------------------------------------------------------------------
-
-  /**
-   * Checks whether the Java version is lower than Java 7 (Java 1.7) and
-   * prints a warning message in that case.
-   */
-  private def checkJavaVersion(): Unit = {
-    try {
-      if (System.getProperty("java.version").substring(0, 3).toDouble < 1.7) {
-        LOG.warn("Flink has been started with Java 6. " +
-          "Java 6 is not maintained any more by Oracle or the OpenJDK community. " +
-          "Flink may drop support for Java 6 in future releases, due to the " +
-          "unavailability of bug fixes security patches.")
-      }
-    }
-    catch {
-      case e: Exception => 
-        LOG.warn("Could not parse java version for startup checks")
-        LOG.debug("Exception when parsing java version", e)
-    }
-  }
-
-  // --------------------------------------------------------------------------
-
-  class ParseException(message: String) extends Exception(message) {}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ed8b26bf/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 3679f02..0e29345 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -41,58 +41,67 @@ import scala.concurrent.{Future, Await}
  * @param singleActorSystem true if all actors (JobManager and TaskManager) shall be run in the same
  *                          [[ActorSystem]], otherwise false
  */
-abstract class FlinkMiniCluster(userConfiguration: Configuration,
+abstract class FlinkMiniCluster(val userConfiguration: Configuration,
                                 val singleActorSystem: Boolean) {
-  import FlinkMiniCluster._
+
+  protected val LOG = LoggerFactory.getLogger(classOf[FlinkMiniCluster])
+
+  // --------------------------------------------------------------------------
+  //                           Construction
+  // --------------------------------------------------------------------------
 
   // NOTE: THIS MUST BE getByName("localhost"), which is 127.0.0.1 and
   // not getLocalHost(), which may be 127.0.1.1
   val HOSTNAME = InetAddress.getByName("localhost").getHostAddress()
 
-  implicit val timeout = AkkaUtils.getTimeout(userConfiguration)
+  val timeout = AkkaUtils.getTimeout(userConfiguration)
 
   val configuration = generateConfiguration(userConfiguration)
 
   var jobManagerActorSystem = startJobManagerActorSystem()
   var jobManagerActor = startJobManager(jobManagerActorSystem)
 
-  val numTaskManagers = configuration.getInteger(ConfigConstants
-    .LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 1)
-
-  val actorSystemsTaskManagers = for(i <- 0 until numTaskManagers) yield {
-    val actorSystem = if(singleActorSystem) {
-      jobManagerActorSystem
-    } else {
-      startTaskManagerActorSystem(i)
-    }
+  val numTaskManagers = configuration.getInteger(
+     ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 1)
 
-    (actorSystem, startTaskManager(i)(actorSystem))
-  }
+  var (taskManagerActorSystems, taskManagerActors) =
+    (for(i <- 0 until numTaskManagers) yield {
+      val actorSystem = if(singleActorSystem) {
+        jobManagerActorSystem
+      } else {
+        startTaskManagerActorSystem(i)
+      }
 
-  var (taskManagerActorSystems, taskManagerActors) = actorSystemsTaskManagers.unzip
+      (actorSystem, startTaskManager(i, actorSystem))
+    }).unzip
 
   waitForTaskManagersToBeRegistered()
 
+
+  // --------------------------------------------------------------------------
+  //                           Construction
+  // --------------------------------------------------------------------------
+
   def generateConfiguration(userConfiguration: Configuration): Configuration
 
   def startJobManager(system: ActorSystem): ActorRef
 
-  def startTaskManager(index: Int)(implicit system: ActorSystem): ActorRef
+  def startTaskManager(index: Int, system: ActorSystem): ActorRef
 
   def getJobManagerAkkaConfig: Config = {
-    val port = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
-      ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
-
-    if(singleActorSystem){
+    if (singleActorSystem) {
       AkkaUtils.getAkkaConfig(configuration, None)
-    }else{
+    }
+    else {
+      val port = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
+        ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
+
       AkkaUtils.getAkkaConfig(configuration, Some((HOSTNAME, port)))
     }
   }
 
   def startJobManagerActorSystem(): ActorSystem = {
     val config = getJobManagerAkkaConfig
-
     AkkaUtils.createActorSystem(config)
   }
 
@@ -170,7 +179,3 @@ abstract class FlinkMiniCluster(userConfiguration: Configuration,
     Await.ready(Future.sequence(futures), timeout)
   }
 }
-
-object FlinkMiniCluster{
-  val LOG = LoggerFactory.getLogger(classOf[FlinkMiniCluster])
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ed8b26bf/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index 66ff4f3..37e41e2 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -40,10 +40,9 @@ import org.slf4j.LoggerFactory
  *                          [[ActorSystem]], otherwise false
  */
 class LocalFlinkMiniCluster(userConfiguration: Configuration, singleActorSystem: Boolean = true)
-  extends FlinkMiniCluster(userConfiguration, singleActorSystem){
-  import LocalFlinkMiniCluster._
+  extends FlinkMiniCluster(userConfiguration, singleActorSystem) {
 
-  val jobClientActorSystem = if(singleActorSystem){
+  val jobClientActorSystem = if (singleActorSystem) {
     jobManagerActorSystem
   } else {
     // create an actor system listening on a random port
@@ -52,25 +51,24 @@ class LocalFlinkMiniCluster(userConfiguration: Configuration, singleActorSystem:
 
   var jobClient: Option[ActorRef] = None
 
+
+
   override def generateConfiguration(userConfiguration: Configuration): Configuration = {
     val config = getDefaultConfig
 
     config.addAll(userConfiguration)
-
     setMemory(config)
-
     initializeIOFormatClasses(config)
 
     config
   }
 
   override def startJobManager(system: ActorSystem): ActorRef = {
-    val config = configuration.clone()
-    val (jobManager, _) = JobManager.startJobManagerActors(config, system)
+    val (jobManager, _) = JobManager.startJobManagerActors(configuration, system)
     jobManager
   }
 
-  override def startTaskManager(index: Int)(implicit system: ActorSystem): ActorRef = {
+  override def startTaskManager(index: Int, system: ActorSystem): ActorRef = {
     val config = configuration.clone()
 
     val rpcPort = config.getInteger(
@@ -81,26 +79,23 @@ class LocalFlinkMiniCluster(userConfiguration: Configuration, singleActorSystem:
       ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
       ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT)
 
-    if(rpcPort > 0){
+    if (rpcPort > 0) {
       config.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, rpcPort + index)
     }
-    if(dataPort > 0){
+    if (dataPort > 0) {
       config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort + index)
     }
 
     val localExecution = numTaskManagers == 1
 
-    val taskManagerName = if(singleActorSystem) {
+    val taskManagerActorName = if (singleActorSystem) {
       TaskManager.TASK_MANAGER_NAME + "_" + (index + 1)
     } else {
       TaskManager.TASK_MANAGER_NAME
     }
 
-    TaskManager.startActorWithConfiguration(HOSTNAME,
-      taskManagerName,
-      config,
-      singleActorSystem,
-      localExecution)(system)
+    TaskManager.startTaskManagerActor(config, system, HOSTNAME, taskManagerActorName,
+                                      singleActorSystem, localExecution, classOf[TaskManager])
   }
 
   def getJobClient(): ActorRef ={
@@ -177,7 +172,7 @@ class LocalFlinkMiniCluster(userConfiguration: Configuration, singleActorSystem:
       memorySize /= numTaskManager + 1 // the +1 is the job manager
 
       // for each TaskManager, subtract the memory needed for memory buffers
-      memorySize -= bufferMem;
+      memorySize -= bufferMem
       memorySize = (memorySize * memoryFraction).toLong
       memorySize >>>= 20  // bytes to megabytes
       config.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, memorySize)
@@ -189,31 +184,8 @@ class LocalFlinkMiniCluster(userConfiguration: Configuration, singleActorSystem:
 
     config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, HOSTNAME)
 
-    config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
-      ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
-
-    config.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY,
-      ConfigConstants.DEFAULT_TASK_MANAGER_IPC_PORT)
-
-    config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
-      ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT)
-
-    config.setBoolean(ConfigConstants.TASK_MANAGER_MEMORY_LAZY_ALLOCATION_KEY,
-      ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_LAZY_ALLOCATION)
-
-    config.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY,
-      ConfigConstants.DEFAULT_FILESYSTEM_OVERWRITE)
-
-    config.setBoolean(ConfigConstants.FILESYSTEM_OUTPUT_ALWAYS_CREATE_DIRECTORY_KEY,
-      ConfigConstants.DEFAULT_FILESYSTEM_ALWAYS_CREATE_DIRECTORY)
-
-    config.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1)
-
     config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 1)
 
-    config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS,
-      ConfigConstants.DEFAULT_TASK_MANAGER_NUM_TASK_SLOTS)
-
     // Reduce number of threads for local execution
     config.setInteger(NettyConfig.NUM_THREADS_CLIENT, 1)
     config.setInteger(NettyConfig.NUM_THREADS_SERVER, 2)
@@ -221,7 +193,3 @@ class LocalFlinkMiniCluster(userConfiguration: Configuration, singleActorSystem:
     config
   }
 }
-
-object LocalFlinkMiniCluster{
-  val LOG = LoggerFactory.getLogger(classOf[LocalFlinkMiniCluster])
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ed8b26bf/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 52a16bd..74c607e 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -47,23 +47,23 @@ import org.apache.flink.runtime.jobmanager.JobManager
 import org.apache.flink.runtime.memorymanager.DefaultMemoryManager
 import org.apache.flink.runtime.messages.JobManagerMessages.UpdateTaskExecutionState
 import org.apache.flink.runtime.messages.Messages.{Disconnect, Acknowledge}
-import org.apache.flink.runtime.messages.RegistrationMessages.{AlreadyRegistered,
-RefuseRegistration, AcknowledgeRegistration, RegisterTaskManager}
+import org.apache.flink.runtime.messages.RegistrationMessages.{AlreadyRegistered, RefuseRegistration, AcknowledgeRegistration, RegisterTaskManager}
 import org.apache.flink.runtime.messages.TaskManagerMessages._
-import org.apache.flink.runtime.messages.TaskManagerProfilerMessages
-.{UnregisterProfilingListener, UnmonitorTask, MonitorTask, RegisterProfilingListener}
+import org.apache.flink.runtime.messages.TaskManagerProfilerMessages.{UnregisterProfilingListener, UnmonitorTask, MonitorTask, RegisterProfilingListener}
 import org.apache.flink.runtime.net.NetUtils
 import org.apache.flink.runtime.process.ProcessReaper
 import org.apache.flink.runtime.profiling.ProfilingUtils
 import org.apache.flink.runtime.security.SecurityUtils
 import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner
-import org.apache.flink.runtime.util.EnvironmentInformation
+import org.apache.flink.runtime.util.{MathUtils, EnvironmentInformation}
 import org.apache.flink.util.ExceptionUtils
 import org.slf4j.LoggerFactory
 
 import scala.concurrent._
 import scala.concurrent.duration._
 import scala.util.{Failure, Success}
+import scala.collection.JavaConverters._
+
 import scala.language.postfixOps
 
 /**
@@ -87,7 +87,8 @@ import scala.language.postfixOps
  *
  * - ...
  */
-class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkkaURL: String,
+class TaskManager(val connectionInfo: InstanceConnectionInfo,
+                  val jobManagerAkkaURL: String,
                   val taskManagerConfig: TaskManagerConfiguration,
                   val networkConfig: NetworkEnvironmentConfiguration)
   extends Actor with ActorLogMessages with ActorLogging {
@@ -95,7 +96,7 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
   import context._
   import taskManagerConfig.{timeout => tmTimeout, _}
 
-import scala.collection.JavaConverters._
+
 
   implicit val timeout = tmTimeout
 
@@ -109,7 +110,6 @@ import scala.collection.JavaConverters._
   var registrationDuration = 0 seconds
   var registrationAttempts: Int = 0
 
-  TaskManager.checkTempDirs(tmpDirPaths)
   val ioManager = new IOManagerAsync(tmpDirPaths)
   val memoryManager = new DefaultMemoryManager(memorySize, numberOfSlots, pageSize)
   val bcVarManager = new BroadcastVariableManager()
@@ -123,7 +123,7 @@ import scala.collection.JavaConverters._
   val profiler = profilingInterval match {
     case Some(interval) =>
       log.info("Profiling of jobs is enabled.")
-      Some(TaskManager.startProfiler(self.path.toSerializationFormat, interval))
+      Some(TaskManager.startProfiler(self.path.toSerializationFormat, interval, context.system))
     case None =>
       log.info("Profiling of jobs is disabled.")
       None
@@ -140,11 +140,6 @@ import scala.collection.JavaConverters._
   var instanceID: InstanceID = null
   var heartbeatScheduler: Option[Cancellable] = None
 
-  memoryLogggingIntervalMs.foreach {
-    interval =>
-      val d = FiniteDuration(interval, TimeUnit.MILLISECONDS)
-      context.system.scheduler.schedule(d, d, self, LogMemoryUsage)
-  }
 
   override def preStart(): Unit = {
     tryJobManagerRegistration()
@@ -717,7 +712,7 @@ import scala.collection.JavaConverters._
 
 /**
  * TaskManager companion object. Contains TaskManager executable entry point, command
- * line parsing, and constants.
+ * line parsing, constants, and setup methods for the TaskManager.
  */
 object TaskManager {
 
@@ -734,296 +729,522 @@ object TaskManager {
   val MAX_REGISTRATION_ATTEMPTS = 10
   val HEARTBEAT_INTERVAL = 5000 millisecond
 
+
+  // --------------------------------------------------------------------------
+  //  TaskManager standalone entry point
+  // --------------------------------------------------------------------------
+
+  /**
+   * Entry point (main method) to run the TaskManager in a standalone fashion.
+   *
+   * @param args The command line arguments.
+   */
   def main(args: Array[String]): Unit = {
+    // startup checks and logging
     EnvironmentInformation.logEnvironmentInfo(LOG, "TaskManager")
-    val (hostname, port, configuration) = parseArgs(args)
+    EnvironmentInformation.checkJavaVersion()
 
-    if(SecurityUtils.isSecurityEnabled) {
-      LOG.info("Security is enabled. Starting secure TaskManager.")
-      SecurityUtils.runSecured(new FlinkSecuredRunner[Unit] {
-        override def run(): Unit = {
-          startActor(hostname, port, configuration, TaskManager.TASK_MANAGER_NAME)
-        }
-      })
-    } else {
-      startActor(hostname, port, configuration, TaskManager.TASK_MANAGER_NAME)
+    // try to parse the command line arguments (exit the process if that fails)
+    val configuration = try {
+      parseArgsAndLoadConfig(args)
+    }
+    catch {
+      case t: Throwable => {
+        LOG.error(t.getMessage(), t)
+        System.exit(STARTUP_FAILURE_RETURN_CODE)
+        null
+      }
     }
-  }
-
-  def startActor(hostname: String, port: Int, configuration: Configuration,
-                 taskManagerName: String) : Unit = {
-
-    val (taskManagerSystem, taskManager) = startActorSystemAndActor(hostname, port, configuration,
-      taskManagerName, localAkkaCommunication = false, localTaskManagerCommunication = false)
-
-    // start a process reaper that watches the JobManager. If the JobManager actor dies,
-    // the process reaper will kill the JVM process (to ensure easy failure detection)
-    taskManagerSystem.actorOf(
-      Props(classOf[ProcessReaper], taskManager, LOG, RUNTIME_FAILURE_RETURN_CODE),
-      "TaskManager_Process_Reaper")
 
-    // block until everything is done
-    taskManagerSystem.awaitTermination()
+    // run the TaskManager (inside an authenticated context if security is enabled)
+    try {
+      if (SecurityUtils.isSecurityEnabled) {
+        LOG.info("Security is enabled. Starting secure TaskManager.")
+        SecurityUtils.runSecured(new FlinkSecuredRunner[Unit] {
+          override def run(): Unit = {
+            runTaskManager(configuration, classOf[TaskManager])
+          }
+        })
+      }
+      else {
+        LOG.info("Security is not enabled. Starting non-authenticated TaskManager.")
+        runTaskManager(configuration, classOf[TaskManager])
+      }
+    }
+    catch {
+      case t: Throwable => {
+        LOG.error("Failed to run TaskManager.", t)
+        System.exit(STARTUP_FAILURE_RETURN_CODE)
+      }
+    }
   }
 
   /**
-   * Parse the command line arguments of the [[TaskManager]]. The method loads the configuration,
-   * extracts the hostname and port on which the actor system shall listen.
+   * Parses the command line arguments of the [[TaskManager]] and loads the configuration.
    *
    * @param args Command line arguments
-   * @return Tuple of (hostname, port, configuration)
+   * @return The configuration loaded from the directory given via the 'configDir' option.
    */
-  def parseArgs(args: Array[String]): (String, Int, Configuration) = {
+  @throws(classOf[Exception])
+  def parseArgsAndLoadConfig(args: Array[String]): Configuration = {
+
+    // set up the command line parser
     val parser = new scopt.OptionParser[TaskManagerCLIConfiguration]("taskmanager") {
       head("flink task manager")
       opt[String]("configDir") action { (x, c) =>
         c.copy(configDir = x)
       } text "Specify configuration directory."
+    }
 
-      opt[String]("tempDir") optional() action { (x, c) =>
-        c.copy(tmpDir = x)
-      } text "Specify temporary directory."
+    // parse the CLI arguments
+    val cliConfig = parser.parse(args, TaskManagerCLIConfiguration()).getOrElse {
+      throw new Exception(
+        s"Invalid command line arguments: ${args.mkString(" ")}. Usage: ${parser.usage}")
     }
 
+    // load the configuration
+    try {
+      GlobalConfiguration.loadConfiguration(cliConfig.configDir)
+      GlobalConfiguration.getConfiguration()
+    }
+    catch {
+      case e: Exception => throw new Exception("Could not load configuration", e)
+    }
+  }
 
-    parser.parse(args, TaskManagerCLIConfiguration()) map {
-      config =>
-        GlobalConfiguration.loadConfiguration(config.configDir)
+  // --------------------------------------------------------------------------
+  //  Starting and running the TaskManager
+  // --------------------------------------------------------------------------
 
-        val configuration = GlobalConfiguration.getConfiguration
+  /**
+   * Starts and runs the TaskManager. Brings up an actor system for the TaskManager and its
+   * actors, starts the TaskManager's services (library cache, shuffle network stack, ...),
+   * and starts the TaskManager itself.
 
-        if (config.tmpDir != null && GlobalConfiguration.getString(ConfigConstants
-          .TASK_MANAGER_TMP_DIR_KEY,
-          null) == null) {
-          configuration.setString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, config.tmpDir)
-        }
+   * @param configuration The configuration for the TaskManager.
+   * @param taskManagerClass The actor class to instantiate. Allows the use of TaskManager
+   *                         subclasses, for example for YARN.
+   */
+  @throws(classOf[Exception])
+  def runTaskManager(configuration: Configuration,
+                     taskManagerClass: Class[_ <: TaskManager]) : Unit = {
+
+    val (jobManagerHostname, jobManagerPort) = getAndCheckJobManagerAddress(configuration)
+
+    // try to find out the hostname of the interface from which the TaskManager
+    // can connect to the JobManager. This involves a reverse name lookup
+    LOG.info("Trying to determine network interface and address/hostname to use")
+    val jobManagerAddress = new InetSocketAddress(jobManagerHostname, jobManagerPort)
+    val taskManagerHostname = try {
+      NetUtils.resolveAddress(jobManagerAddress).getHostName()
+    }
+    catch {
+      case t: Throwable => throw new Exception("TaskManager cannot find a network interface " +
+        "that can communicate with the JobManager (" + jobManagerAddress + ")", t)
+    }
 
-        val jobManagerHostname = configuration.getString(
-          ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null)
+    LOG.info("TaskManager will use hostname/address '{}' for communication.", taskManagerHostname)
 
-        val jobManagerPort = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
-          ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
+    // if no task manager port has been configured, use 0 (system will pick any free port)
+    val actorSystemPort = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0)
+    if (actorSystemPort < 0) {
+      throw new Exception("Invalid value for '" + ConfigConstants.TASK_MANAGER_IPC_PORT_KEY +
+        "' (port for the TaskManager actor system) : " + actorSystemPort +
+        " - Leave config parameter empty or use 0 to let the system choose a port automatically.")
+    }
 
-        val jobManagerAddress = new InetSocketAddress(jobManagerHostname, jobManagerPort)
+    runTaskManager(taskManagerHostname, actorSystemPort, configuration, taskManagerClass)
+  }
 
-        val port = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0)
-        // try to find out the TaskManager's own hostname by connecting to jobManagerAddress
-        val hostname = NetUtils.resolveAddress(jobManagerAddress).getHostName
+  /**
+   * Starts and runs the TaskManager. Brings up an actor system for the TaskManager and its
+   * actors, starts the TaskManager's services (library cache, shuffle network stack, ...),
+   * and starts the TaskManager itself.
+   *
+   * This method will also spawn a process reaper for the TaskManager (kill the process if
+   * the actor fails) and optionally start the JVM memory logging thread.
+   *
+   * @param taskManagerHostname The hostname/address of the interface where the actor system
+   *                         will communicate.
+   * @param actorSystemPort The port at which the actor system will communicate.
+   * @param configuration The configuration for the TaskManager.
+   */
+  @throws(classOf[Exception])
+  def runTaskManager(taskManagerHostname: String,
+                     actorSystemPort: Int,
+                     configuration: Configuration) : Unit = {
 
-        (hostname, port, configuration)
-    } getOrElse {
-      LOG.error(s"TaskManager parseArgs called with ${args.mkString(" ")}.")
-      LOG.error("CLI parsing failed. Usage: " + parser.usage)
-      sys.exit(STARTUP_FAILURE_RETURN_CODE)
+    runTaskManager(taskManagerHostname, actorSystemPort, configuration, classOf[TaskManager])
+  }
+
+  /**
+   * Starts and runs the TaskManager. Brings up an actor system for the TaskManager and its
+   * actors, starts the TaskManager's services (library cache, shuffle network stack, ...),
+   * and starts the TaskManager itself.
+   *
+   * This method will also spawn a process reaper for the TaskManager (kill the process if
+   * the actor fails) and optionally start the JVM memory logging thread.
+   *
+   * @param taskManagerHostname The hostname/address of the interface where the actor system
+   *                         will communicate.
+   * @param actorSystemPort The port at which the actor system will communicate.
+   * @param configuration The configuration for the TaskManager.
+   * @param taskManagerClass The actor class to instantiate. Allows the use of TaskManager
+   *                         subclasses for example for YARN.
+   */
+  @throws(classOf[Exception])
+  def runTaskManager(taskManagerHostname: String,
+                     actorSystemPort: Int,
+                     configuration: Configuration,
+                     taskManagerClass: Class[_ <: TaskManager]) : Unit = {
+
+    LOG.info("Starting TaskManager")
+
+    // Bring up the TaskManager actor system first, bind it to the given address.
+    LOG.info("Starting TaskManager actor system")
+
+    val taskManagerSystem = try {
+      AkkaUtils.createActorSystem(configuration, Some((taskManagerHostname, actorSystemPort)))
+    }
+    catch {
+      case t: Throwable => {
+        if (t.isInstanceOf[org.jboss.netty.channel.ChannelException]) {
+          val cause = t.getCause()
+          if (cause != null && cause.isInstanceOf[java.net.BindException]) {
+            val address = taskManagerHostname + ":" + actorSystemPort
+            throw new Exception("Unable to bind TaskManager actor system to address " +
+              address + " - " + cause.getMessage(), t)
+          }
+        }
+        throw new Exception("Could not create TaskManager actor system", t)
+      }
+    }
+
+    // start all the TaskManager services (network stack, library cache, ...)
+    // and the TaskManager actor
+    try {
+      LOG.info("Starting TaskManager actor")
+      val taskManager = startTaskManagerActor(configuration, taskManagerSystem, taskManagerHostname,
+        TASK_MANAGER_NAME, false, false, taskManagerClass)
+
+      // start a process reaper that watches the JobManager. If the JobManager actor dies,
+      // the process reaper will kill the JVM process (to ensure easy failure detection)
+      LOG.debug("Starting TaskManager process reaper")
+      taskManagerSystem.actorOf(
+        Props(classOf[ProcessReaper], taskManager, LOG, RUNTIME_FAILURE_RETURN_CODE),
+        "TaskManager_Process_Reaper")
+
+      // if desired, start the logging daemon that periodically logs the
+      // memory usage information
+      if (LOG.isInfoEnabled && configuration.getBoolean(
+        ConfigConstants.TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD,
+        ConfigConstants.DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD)) {
+        LOG.info("Starting periodic memory usage logger")
+
+        val interval = configuration.getLong(
+          ConfigConstants.TASK_MANAGER_DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS,
+          ConfigConstants.DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS)
+
+        val logger = new Thread("Memory Usage Logger") {
+          override def run(): Unit = {
+            try {
+              val memoryMXBean = ManagementFactory.getMemoryMXBean
+              val gcMXBeans = ManagementFactory.getGarbageCollectorMXBeans.asScala
+
+              while (!taskManagerSystem.isTerminated) {
+                Thread.sleep(interval)
+                LOG.info(getMemoryUsageStatsAsString(memoryMXBean))
+                LOG.info(TaskManager.getGarbageCollectorStatsAsString(gcMXBeans))
+              }
+            }
+            catch {
+              case t: Throwable => LOG.error("Memory usage logging thread died", t)
+            }
+          }
+        }
+        logger.setDaemon(true)
+        logger.start()
+      }
+
+      // block until everything is done
+      taskManagerSystem.awaitTermination()
+    }
+    catch {
+      case t: Throwable => {
+        LOG.error("Error while starting up taskManager", t)
+        try {
+          taskManagerSystem.shutdown()
+        } catch {
+          case tt: Throwable => LOG.warn("Could not cleanly shut down actor system", tt)
+        }
+        throw t
+      }
     }
   }
 
-  def startActorSystemAndActor(hostname: String, port: Int, configuration: Configuration,
-                               taskManagerName: String,
-                               localAkkaCommunication: Boolean,
-                               localTaskManagerCommunication: Boolean): (ActorSystem, ActorRef) = {
-    implicit val actorSystem = AkkaUtils.createActorSystem(configuration, Some((hostname, port)))
+  @throws(classOf[Exception])
+  def startTaskManagerActor(configuration: Configuration,
+                            actorSystem: ActorSystem,
+                            taskManagerHostname: String,
+                            taskManagerActorName: String,
+                            localAkkaCommunication: Boolean,
+                            localTaskManagerCommunication: Boolean,
+                            taskManagerClass: Class[_ <: TaskManager]): ActorRef = {
 
-    val (connectionInfo, jobManagerURL, taskManagerConfig, networkConfig) =
-      parseConfiguration(hostname, configuration, localAkkaCommunication,
-        localTaskManagerCommunication)
+    val (tmConfig, netConfig, connectionInfo, jmAkkaURL) = parseTaskManagerConfiguration(
+      configuration, taskManagerHostname, localAkkaCommunication, localTaskManagerCommunication)
+    // build Props from the class + constructor args so TaskManager subclasses can be started
+    val tmProps = Props(taskManagerClass, connectionInfo, jmAkkaURL, tmConfig, netConfig)
+    actorSystem.actorOf(tmProps, taskManagerActorName)
+  }
+
+  /**
+   * Starts the profiler actor.
+   *
+   * @param instanceActorPath The actor path of the taskManager that is profiled.
+   * @param reportInterval The interval in which the profiler runs.
+   * @param actorSystem The actor system for the profiler actor
+   * @return The profiler actor ref.
+   */
+  private def startProfiler(instanceActorPath: String,
+                            reportInterval: Long,
+                            actorSystem: ActorSystem): ActorRef = {
+
+    val profilerProps = Props(classOf[TaskManagerProfiler], instanceActorPath, reportInterval)
+    actorSystem.actorOf(profilerProps, PROFILER_NAME)
+  }
+
+  // --------------------------------------------------------------------------
+  //  Resolving the TaskManager actor
+  // --------------------------------------------------------------------------
+
+  /**
+   * Resolves the TaskManager actor reference in a blocking fashion.
+   *
+   * @param taskManagerUrl The akka URL of the TaskManager.
+   * @param system The local actor system that should perform the lookup.
+   * @param timeout The maximum time to wait until the lookup fails.
+   * @throws java.io.IOException Thrown, if the lookup fails.
+   * @return The ActorRef to the TaskManager
+   */
+  @throws(classOf[IOException])
+  def getTaskManagerRemoteReference(taskManagerUrl: String,
+                                   system: ActorSystem,
+                                   timeout: FiniteDuration): ActorRef = {
+    try {
+      val future = AkkaUtils.getReference(taskManagerUrl, system, timeout)
+      Await.result(future, timeout)
+    }
+    catch {
+      case e @ (_ : ActorNotFound | _ : TimeoutException) =>
+        throw new IOException(
+          s"TaskManager at $taskManagerUrl not reachable. " +
+            s"Please make sure that the TaskManager is running and its port is reachable.", e)
 
-    (actorSystem, startActor(taskManagerName, connectionInfo, jobManagerURL, taskManagerConfig,
-      networkConfig))
+      case e: IOException =>
+        throw new IOException("Could not connect to TaskManager at " + taskManagerUrl, e)
+    }
   }
 
+  // --------------------------------------------------------------------------
+  //  Miscellaneous Utilities
+  // --------------------------------------------------------------------------
+
   /**
-   * Extracts from the configuration the TaskManager's settings. Returns the TaskManager's
-   * connection information, the JobManager's Akka URL, the task manager configuration and the
-   * network connection configuration.
+   * Utility method to extract TaskManager config parameters from the configuration and to
+   * sanity check them.
    *
-   * @param hostname Hostname of the instance on which the TaskManager runs
-   * @param configuration Configuration instance containing the user provided configuration values
-   * @param localAkkaCommunication true if the TaskManager runs in the same [[ActorSystem]] as the
-   *                               JobManager, otherwise false
-   * @param localTaskManagerCommunication true if all TaskManager run in the same JVM, otherwise
-   *                                      false
-   * @return Tuple of (TaskManager's connection information, JobManager's Akka URL, TaskManager's
-   *         configuration, network connection configuration)
+   * @param configuration The configuration.
+   * @param taskManagerHostname The host name under which the TaskManager communicates.
+   * @param localAkkaCommunication True, if the TaskManager runs in the same actor
+   *                               system as its JobManager.
+   * @param localTaskManagerCommunication True, to skip initializing the network stack.
+   *                                      Use only when only one task manager is used.
+   * @return A tuple (TaskManagerConfiguration, network configuration,
+   *                  InstanceConnectionInfo, JobManager actor Akka URL).
    */
-  def parseConfiguration(hostname: String, configuration: Configuration,
-                         localAkkaCommunication: Boolean,
-                         localTaskManagerCommunication: Boolean):
-  (InstanceConnectionInfo, String, TaskManagerConfiguration, NetworkEnvironmentConfiguration) = {
+  @throws(classOf[Exception])
+  def parseTaskManagerConfiguration(configuration: Configuration,
+                                    taskManagerHostname: String,
+                                    localAkkaCommunication: Boolean,
+                                    localTaskManagerCommunication: Boolean):
+  (TaskManagerConfiguration, NetworkEnvironmentConfiguration, InstanceConnectionInfo, String) = {
+
+    // ------- read values from the config and check them ---------
+    //                      (a lot of them)
+
+    // ----> hosts / ports for communication and data exchange
+
     val dataport = configuration.getInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
       ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT) match {
-      case 0 => NetUtils.getAvailablePort
+      case 0 => NetUtils.getAvailablePort()
       case x => x
     }
 
-    val connectionInfo = new InstanceConnectionInfo(InetAddress.getByName(hostname), dataport)
+    checkConfigParameter(dataport > 0, dataport, ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
+      "Leave config parameter empty or use 0 to let the system choose a port automatically.")
+
+    val taskManagerAddress = InetAddress.getByName(taskManagerHostname)
+    val connectionInfo = new InstanceConnectionInfo(taskManagerAddress, dataport)
 
-    val jobManagerURL = if (localAkkaCommunication) {
+    val jobManagerActorURL = if (localAkkaCommunication) {
       // JobManager and TaskManager are in the same ActorSystem -> Use local Akka URL
       JobManager.getLocalJobManagerAkkaURL
     }
     else {
-      val jobManagerAddress = configuration.getString(
-        ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null)
-
-      val jobManagerRPCPort = configuration.getInteger(
-        ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
-        ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
-
-      if (jobManagerAddress == null) {
-        throw new RuntimeException(
-          "JobManager address has not been specified in the configuration.")
-      }
-
-      val hostPort = new InetSocketAddress(InetAddress.getByName(jobManagerAddress),
-                                           jobManagerRPCPort)
+      // both run in different actor systems -> remote Akka URL of the configured JobManager
+      val (jobManagerHostname, jobManagerPort) = getAndCheckJobManagerAddress(configuration)
+      val hostPort = new InetSocketAddress(jobManagerHostname, jobManagerPort)
       JobManager.getRemoteJobManagerAkkaURL(hostPort)
     }
 
-    val slots = configuration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1)
+    // ----> memory / network stack (shuffles/broadcasts), task slots, temp directories
 
-    val numberOfSlots = if (slots > 0) slots else 1
+    // we need this because many configs have been written with a "-1" entry
+    val slots = configuration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1) match {
+      case -1 => 1
+      case x => x
+    }
 
     val pageSize = configuration.getInteger(ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY,
       ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_BUFFER_SIZE)
-
-    val tmpDirs = configuration.getString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY,
-      ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH).split(",|" + File.pathSeparator)
-
     val numNetworkBuffers = configuration.getInteger(
       ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY,
       ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS)
 
-    val nettyConfig = localTaskManagerCommunication match {
-      case true => None
-      case false => Some(new NettyConfig(
+    val configuredMemory = configuration.getLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L)
+
+    checkConfigParameter(slots >= 1, slots, ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS,
+      "Number of task slots must be at least one.")
+
+    checkConfigParameter(numNetworkBuffers > 0, numNetworkBuffers,
+      ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY)
+
+    checkConfigParameter(pageSize >= DefaultMemoryManager.MIN_PAGE_SIZE, pageSize,
+      ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY,
+      "Minimum buffer size is " + DefaultMemoryManager.MIN_PAGE_SIZE)
+
+    checkConfigParameter(MathUtils.isPowerOf2(pageSize), pageSize,
+      ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY,
+      "Buffer size must be a power of 2.")
+
+    checkConfigParameter(configuredMemory == -1 || configuredMemory > 0, configuredMemory,
+      ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY,
+      "MemoryManager needs at least one MB of memory. " +
+        "Leave this config parameter empty to let the system automatically " +
+        "pick a fraction of the available memory.")
+
+    val tmpDirs = configuration.getString(
+      ConfigConstants.TASK_MANAGER_TMP_DIR_KEY,
+      ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH)
+      .split(",|" + File.pathSeparator)
+
+    checkTempDirs(tmpDirs)
+
+    val nettyConfig = if (localTaskManagerCommunication) {
+      None
+    } else {
+      Some(new NettyConfig(
         connectionInfo.address(), connectionInfo.dataPort(), pageSize, configuration))
     }
 
     val networkConfig = NetworkEnvironmentConfiguration(numNetworkBuffers, pageSize, nettyConfig)
 
-    val networkBufferMem = if (localTaskManagerCommunication) 0 else numNetworkBuffers * pageSize
-
-    val configuredMemory: Long = configuration.getInteger(
-      ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1
-    )
+    val networkBufferMem = numNetworkBuffers.toLong * pageSize // Long: Int * Int can overflow
 
     val memorySize = if (configuredMemory > 0) {
-      configuredMemory << 20
-    } else {
+      LOG.info("Using {} MB for Flink managed memory.", configuredMemory)
+      configuredMemory << 20 // megabytes to bytes
+    }
+    else {
       val fraction = configuration.getFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
         ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION)
+      checkConfigParameter(fraction > 0.0f, fraction,
+        ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
+        "MemoryManager fraction of the free memory must be positive.")
 
-      LOG.info("Using {} of the free heap space for managed memory.", fraction)
+      val relativeMemSize = ((EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() -
+        networkBufferMem) * fraction).toLong
 
-      ((EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag - networkBufferMem) * fraction)
-        .toLong
-    }
+      LOG.info("Using {} of the currently free heap space for Flink managed memory ({} MB).",
+        fraction, relativeMemSize >> 20)
 
-    val memoryLoggingIntervalMs = configuration.getBoolean(
-      ConfigConstants.TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD,
-      ConfigConstants.DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD
-    ) match {
-      case true => Some(
-        configuration.getLong(ConfigConstants.TASK_MANAGER_DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS,
-          ConfigConstants.DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS)
-      )
-      case false => None
+      relativeMemSize
     }
 
-    val profilingInterval = configuration.getBoolean(
-      ProfilingUtils.ENABLE_PROFILING_KEY, false
-    ) match {
-      case true => Some(configuration.getInteger(ProfilingUtils.TASKMANAGER_REPORTINTERVAL_KEY,
-        ProfilingUtils.DEFAULT_TASKMANAGER_REPORTINTERVAL).toLong)
-      case false => None
+    // ----> timeouts, library caching, profiling
+
+    val timeout = try {
+      AkkaUtils.getTimeout(configuration)
+    }
+    catch {
+      case e: Exception => throw new Exception(
+        s"Invalid format for '${ConfigConstants.AKKA_ASK_TIMEOUT}'. " +
+          s"Use formats like '50 s' or '1 min' to specify the timeout.", e)
     }
+    LOG.info("Messages between TaskManager and JobManager have a max timeout of " + timeout)
+
+    val profilingInterval =
+      if (configuration.getBoolean(ProfilingUtils.ENABLE_PROFILING_KEY, false)) {
+        Some(configuration.getLong(ProfilingUtils.TASKMANAGER_REPORTINTERVAL_KEY,
+          ProfilingUtils.DEFAULT_TASKMANAGER_REPORTINTERVAL))
+      } else {
+        None
+      }
 
     val cleanupInterval = configuration.getLong(
       ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL,
       ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000
 
-    val timeout = AkkaUtils.getTimeout(configuration)
+
 
     val maxRegistrationDuration = Duration(configuration.getString(
       ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION,
       ConfigConstants.DEFAULT_TASK_MANAGER_MAX_REGISTRATION_DURATION))
 
-    val taskManagerConfig = TaskManagerConfiguration(numberOfSlots, memorySize, pageSize,
-      tmpDirs, cleanupInterval, memoryLoggingIntervalMs, profilingInterval, timeout,
-      maxRegistrationDuration, configuration)
-
-    (connectionInfo, jobManagerURL, taskManagerConfig, networkConfig)
-  }
-
-  def startActor(taskManagerName: String,
-                 connectionInfo: InstanceConnectionInfo,
-                 jobManagerURL: String,
-                 taskManagerConfig: TaskManagerConfiguration,
-                 networkConfig: NetworkEnvironmentConfiguration)
-                (implicit actorSystem: ActorSystem): ActorRef = {
-    startActor(taskManagerName,
-      Props(new TaskManager(connectionInfo, jobManagerURL, taskManagerConfig, networkConfig)))
-  }
+    val taskManagerConfig = TaskManagerConfiguration(slots, memorySize, pageSize,
+      tmpDirs, cleanupInterval, profilingInterval, timeout, maxRegistrationDuration,
+      configuration)
 
-  def startActor(taskManagerName: String, props: Props)
-                (implicit actorSystem: ActorSystem): ActorRef = {
-    actorSystem.actorOf(props, taskManagerName)
+    (taskManagerConfig, networkConfig, connectionInfo, jobManagerActorURL)
   }
 
-  def startActorWithConfiguration(hostname: String, taskManagerName: String,
-                                  configuration: Configuration,
-                                  localAkkaCommunication: Boolean,
-                                  localTaskManagerCommunication: Boolean)
-                                 (implicit system: ActorSystem) = {
-    val (connectionInfo, jobManagerURL, taskManagerConfig, networkConnectionConfiguration) =
-      parseConfiguration(hostname, configuration, localAkkaCommunication,
-        localTaskManagerCommunication)
-
-    startActor(taskManagerName, connectionInfo, jobManagerURL, taskManagerConfig,
-      networkConnectionConfiguration)
-  }
-
-  def startProfiler(instancePath: String, reportInterval: Long)(implicit system: ActorSystem):
-  ActorRef = {
-    system.actorOf(Props(classOf[TaskManagerProfiler], instancePath, reportInterval), PROFILER_NAME)
-  }
-
-  // --------------------------------------------------------------------------
-  //  Resolving the TaskManager actor
-  // --------------------------------------------------------------------------
-
   /**
-   * Resolves the TaskManager actor reference in a blocking fashion.
+   * Gets the hostname and port of the JobManager from the configuration. Also checks that
+   * the hostname is not null and the port is positive.
    *
-   * @param taskManagerUrl The akka URL of the JobManager.
-   * @param system The local actor system that should perform the lookup.
-   * @param timeout The maximum time to wait until the lookup fails.
-   * @throws java.io.IOException Thrown, if the lookup fails.
-   * @return The ActorRef to the TaskManager
+   * @param configuration The configuration to read the config values from.
+   * @return A 2-tuple (hostname, port).
    */
-  @throws(classOf[IOException])
-  def getTaskManagerRemoteReference(taskManagerUrl: String,
-                                   system: ActorSystem,
-                                   timeout: FiniteDuration): ActorRef = {
-    try {
-      val future = AkkaUtils.getReference(taskManagerUrl, system, timeout)
-      Await.result(future, timeout)
+  private def getAndCheckJobManagerAddress(configuration: Configuration) : (String, Int) = {
+
+    val hostname = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null)
+
+    val port = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
+      ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
+
+    if (hostname == null) {
+      throw new Exception("Config parameter '" + ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY +
+        "' is missing (hostname/address of JobManager to connect to).")
     }
-    catch {
-      case e @ (_ : ActorNotFound | _ : TimeoutException) =>
-        throw new IOException(
-          s"TaskManager at $taskManagerUrl not reachable. " +
-            s"Please make sure that the TaskManager is running and its port is reachable.", e)
 
-      case e: IOException =>
-        throw new IOException("Could not connect to TaskManager at " + taskManagerUrl, e)
+    if (port <= 0) {
+      throw new Exception("Invalid value for '" + ConfigConstants.JOB_MANAGER_IPC_PORT_KEY +
+        "' (port of the JobManager actor system) : " + port)
     }
+
+    (hostname, port)
   }
 
-  // --------------------------------------------------------------------------
-  //  Miscellaneous Utilities
-  // --------------------------------------------------------------------------
+  private def checkConfigParameter(condition: Boolean, parameter: Any, name: String,
+                                   errorMessage: String = ""): Unit = {
+    // Fail fast, naming both the offending config key and the rejected value.
+    // An empty errorMessage still leaves the trailing " - " separator in the text.
+    if (!condition) {
+      val details = s"'$name' : $parameter - $errorMessage"
+      throw new Exception(s"Invalid configuration value for $details")
+    }
+  }
 
   private def checkTempDirs(tmpDirs: Array[String]): Unit = {
     tmpDirs.zipWithIndex.foreach {
@@ -1052,7 +1273,6 @@ object TaskManager {
             f"usable $usableSpaceGb GB ($usablePercentage%.2f%% usable)")
         }
       case (_, id) => throw new Exception(s"Temporary file directory #$id is null.")
-
     }
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ed8b26bf/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerCLIConfiguration.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerCLIConfiguration.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerCLIConfiguration.scala
index d2d9cf4..5c71f5e 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerCLIConfiguration.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerCLIConfiguration.scala
@@ -22,6 +22,5 @@ package org.apache.flink.runtime.taskmanager
  * Command line configuration object for the [[TaskManager]]
  *
  * @param configDir Path to configuration directory
- * @param tmpDir Path to temporary directory
  */
-case class TaskManagerCLIConfiguration(configDir: String = null, tmpDir: String = null)
+case class TaskManagerCLIConfiguration(configDir: String = null) // NOTE(review): null = unset?

http://git-wip-us.apache.org/repos/asf/flink/blob/ed8b26bf/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala
index 8c1217e..19d55f7 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala
@@ -22,9 +22,10 @@ import org.apache.flink.configuration.Configuration
 
 import scala.concurrent.duration.{Duration, FiniteDuration}
 
-case class TaskManagerConfiguration(numberOfSlots: Int, memorySize: Long, pageSize: Int,
-                                    tmpDirPaths: Array[String], cleanupInterval: Long,
-                                    memoryLogggingIntervalMs: Option[Long],
+case class TaskManagerConfiguration(numberOfSlots: Int,
+                                    memorySize: Long, pageSize: Int,
+                                    tmpDirPaths: Array[String],
+                                    cleanupInterval: Long,
                                     profilingInterval: Option[Long],
                                     timeout: FiniteDuration,
                                     maxRegistrationDuration: Duration,

http://git-wip-us.apache.org/repos/asf/flink/blob/ed8b26bf/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerProfiler.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerProfiler.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerProfiler.scala
index abd44b1..51e99f9 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerProfiler.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerProfiler.scala
@@ -43,8 +43,8 @@ import scala.concurrent.duration.FiniteDuration
  * @param instancePath Akka URL to [[TaskManager]] instance
  * @param reportInterval Interval of profiling action
  */
-class TaskManagerProfiler(val instancePath: String, val reportInterval: Int) extends Actor with
-ActorLogMessages with ActorLogging {
+class TaskManagerProfiler(val instancePath: String, val reportInterval: Int)
+   extends Actor with ActorLogMessages with ActorLogging {
 
   import context.dispatcher
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ed8b26bf/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
index b92cfd4..2d752eb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
@@ -55,8 +55,6 @@ import static org.apache.flink.runtime.testutils.CommonTestUtils.isProcessAlive;
  */
 public class TaskManagerProcessReapingTest {
 
-	private static final String TASK_MANAGER_ACTOR_NAME = "TEST_TM";
-
 	@Test
 	public void testReapProcessOnFailure() {
 		Process taskManagerProcess = null;
@@ -108,7 +106,7 @@ public class TaskManagerProcessReapingTest {
 			// grab the reference to the TaskManager. try multiple times, until the process
 			// is started and the TaskManager is up
 			String taskManagerActorName = String.format("akka.tcp://flink@%s:%d/user/%s",
-					"127.0.0.1", taskManagerPort, TASK_MANAGER_ACTOR_NAME);
+					"127.0.0.1", taskManagerPort, TaskManager.TASK_MANAGER_NAME());
 
 			ActorRef taskManagerRef = null;
 			for (int i = 0; i < 20; i++) {
@@ -191,7 +189,7 @@ public class TaskManagerProcessReapingTest {
 				cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4);
 				cfg.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 100);
 
-				TaskManager.startActor("localhost", taskManagerPort, cfg, TASK_MANAGER_ACTOR_NAME);
+				TaskManager.runTaskManager("localhost", taskManagerPort, cfg);
 
 				// wait forever
 				Object lock = new Object();

http://git-wip-us.apache.org/repos/asf/flink/blob/ed8b26bf/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index e6a487d..d6724ee 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -539,7 +539,7 @@ public class TaskManagerTest {
 	public static ActorRef createTaskManager(ActorRef jm) {
 		Configuration cfg = new Configuration();
 		cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 10);
-		GlobalConfiguration.includeConfiguration(cfg);
+
 		String jobManagerURL = jm.path().toString();
 
 		ActorRef taskManager = TestingUtils.startTestingTaskManagerWithConfiguration("localhost",
@@ -551,7 +551,8 @@ public class TaskManagerTest {
 		try {
 			FiniteDuration d = new FiniteDuration(20, TimeUnit.SECONDS);
 			Await.ready(response, d);
-		}catch(Exception e){
+		}
+		catch(Exception e) {
 			throw new RuntimeException("Exception while waiting for the task manager registration.", e);
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ed8b26bf/flink-runtime/src/test/java/org/apache/flink/runtime/util/MathUtilTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/MathUtilTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/MathUtilTest.java
index c3d2c0f..6ffb64c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/MathUtilTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/MathUtilTest.java
@@ -16,20 +16,19 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.util;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.fail;
 
-import org.apache.flink.runtime.util.MathUtils;
 import org.junit.Test;
 
-public class MathUtilTest
-{
+public class MathUtilTest {
+
 	@Test
-	public void testLog2Computation()
-	{
+	public void testLog2Computation() {
 		assertEquals(0, MathUtils.log2floor(1));
 		assertEquals(1, MathUtils.log2floor(2));
 		assertEquals(1, MathUtils.log2floor(3));
@@ -52,8 +51,7 @@ public class MathUtilTest
 	}
 	
 	@Test
-	public void testRoundDownToPowerOf2()
-	{
+	public void testRoundDownToPowerOf2() {
 		assertEquals(0, MathUtils.roundDownToPowerOf2(0));
 		assertEquals(1, MathUtils.roundDownToPowerOf2(1));
 		assertEquals(2, MathUtils.roundDownToPowerOf2(2));
@@ -80,4 +78,24 @@ public class MathUtilTest
 		assertEquals(1073741824, MathUtils.roundDownToPowerOf2(1852987883));
 		assertEquals(1073741824, MathUtils.roundDownToPowerOf2(Integer.MAX_VALUE));
 	}
+
+	/**
+	 * Checks that {@link MathUtils#isPowerOf2} accepts powers of two both inside and
+	 * beyond the int range, and rejects values that are not powers of two.
+	 */
+	@Test
+	public void testPowerOfTwo() {
+		assertTrue(MathUtils.isPowerOf2(1));
+		assertTrue(MathUtils.isPowerOf2(2));
+		assertTrue(MathUtils.isPowerOf2(4));
+		assertTrue(MathUtils.isPowerOf2(8));
+		assertTrue(MathUtils.isPowerOf2(32768));
+		assertTrue(MathUtils.isPowerOf2(65536));
+		assertTrue(MathUtils.isPowerOf2(1 << 30));
+		// 2^31: the smallest power of two that no longer fits into an int
+		assertTrue(MathUtils.isPowerOf2(1L + Integer.MAX_VALUE));
+		assertTrue(MathUtils.isPowerOf2(1L << 41));
+		assertTrue(MathUtils.isPowerOf2(1L << 62));
+
+		assertFalse(MathUtils.isPowerOf2(3));
+		assertFalse(MathUtils.isPowerOf2(5));
+		assertFalse(MathUtils.isPowerOf2(567923));
+		// 2^31 - 1 and 2^63 - 1: every bit below the sign bit is set
+		assertFalse(MathUtils.isPowerOf2(Integer.MAX_VALUE));
+		assertFalse(MathUtils.isPowerOf2(Long.MAX_VALUE));
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ed8b26bf/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerRegistrationITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerRegistrationITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerRegistrationITCase.scala
index ccd326f..58905ef 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerRegistrationITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerRegistrationITCase.scala
@@ -45,7 +45,7 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll {
   "The JobManager" should {
     "notify already registered TaskManagers" in {
 
-      val jm = TestingUtils.startTestingJobManager
+      val jm = TestingUtils.startTestingJobManager(_system)
 
       val connectionInfo = new InstanceConnectionInfo(InetAddress.getLocalHost,1)
       val hardwareDescription = HardwareDescription.extractFromSystem(10)
@@ -67,7 +67,7 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll {
   "The TaskManager" should {
     "shutdown if its registration is refused by the JobManager" in {
 
-      val tm = TestingUtils.startTestingTaskManager(self)
+      val tm = TestingUtils.startTestingTaskManager(self, _system)
 
       watch(tm)
 
@@ -83,7 +83,7 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll {
 
     "ignore RefuseRegistration messages after it has been successfully registered" in {
 
-      val tm = TestingUtils.startTestingTaskManager(self)
+      val tm = TestingUtils.startTestingTaskManager(self, _system)
 
       try {
         ignoreMsg{
@@ -112,8 +112,8 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll {
       val config = new Configuration()
       config.setString(ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION, "1 second")
 
-      val tm = TestingUtils.startTestingTaskManagerWithConfiguration("LOCALHOST",
-        self.path.toString, config)
+      val tm = TestingUtils.startTestingTaskManagerWithConfiguration("localhost",
+        self.path.toString, config, _system)
 
       watch(tm)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ed8b26bf/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index 87fca99..9e53bcb 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -33,13 +33,13 @@ import org.apache.flink.runtime.taskmanager.TaskManager
  * @param singleActorSystem true if all actors shall be running in the same [[ActorSystem]],
  *                          otherwise false
  */
-class TestingCluster(userConfiguration: Configuration, singleActorSystem: Boolean = true) extends
-FlinkMiniCluster(userConfiguration, singleActorSystem) {
+class TestingCluster(userConfiguration: Configuration, singleActorSystem: Boolean = true)
+  extends FlinkMiniCluster(userConfiguration, singleActorSystem) {
 
   override def generateConfiguration(userConfig: Configuration): Configuration = {
     val cfg = new Configuration()
     cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost")
-    cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, NetUtils.getAvailablePort)
+    cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, NetUtils.getAvailablePort())
     cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 10)
 
     cfg.addAll(userConfig)
@@ -62,13 +62,11 @@ FlinkMiniCluster(userConfiguration, singleActorSystem) {
     actorSystem.actorOf(jobManagerProps, JobManager.JOB_MANAGER_NAME)
   }
 
-  override def startTaskManager(index: Int)(implicit system: ActorSystem) = {
-    val (connectionInfo, jobManagerURL, taskManagerConfig, networkConnectionConfig) =
-      TaskManager.parseConfiguration(HOSTNAME, configuration,
-        localAkkaCommunication = singleActorSystem, localTaskManagerCommunication = true)
+  /**
+   * Starts the TaskManager with the given index as a [[TestingTaskManager]] actor.
+   *
+   * @param index Index of the TaskManager; the actor is named
+   *              TaskManager.TASK_MANAGER_NAME + "_" + (index + 1).
+   * @param system Actor system in which the TaskManager actor is started.
+   * @return Whatever TaskManager.startTaskManagerActor returns for the new actor.
+   */
+  override def startTaskManager(index: Int, system: ActorSystem) = {
 
-    system.actorOf(Props(new TaskManager(connectionInfo, jobManagerURL, taskManagerConfig,
-      networkConnectionConfig) with TestingTaskManager), TaskManager.TASK_MANAGER_NAME + "_" +
-      (index + 1))
+    val tmActorName = TaskManager.TASK_MANAGER_NAME + "_" + (index + 1)
+
+    // NOTE(review): the two flags presumably mirror the removed parseConfiguration call
+    // (localAkkaCommunication = singleActorSystem, localTaskManagerCommunication = true)
+    TaskManager.startTaskManagerActor(configuration, system, HOSTNAME, tmActorName,
+      singleActorSystem, true, classOf[TestingTaskManager])
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ed8b26bf/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
index 59106d3..06062f4 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
@@ -19,14 +19,13 @@
 package org.apache.flink.runtime.testingUtils
 
 import akka.actor.{Terminated, ActorRef}
-import org.apache.flink.runtime.ActorLogMessages
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
+import org.apache.flink.runtime.instance.InstanceConnectionInfo
 import org.apache.flink.runtime.jobgraph.JobID
 import org.apache.flink.runtime.messages.Messages.Disconnect
 import org.apache.flink.runtime.messages.TaskManagerMessages.UnregisterTask
-import org.apache.flink.runtime.taskmanager.TaskManager
+import org.apache.flink.runtime.taskmanager.{NetworkEnvironmentConfiguration, TaskManagerConfiguration, TaskManager}
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobRemoved
-import org.apache.flink.runtime.ActorLogMessages
 import org.apache.flink.runtime.testingUtils.TestingMessages.DisableDisconnect
 import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages._
 
@@ -34,10 +33,14 @@ import scala.concurrent.duration._
 import scala.language.postfixOps
 
 /**
- * Mixin for the [[TaskManager]] to support testing messages
+ * Subclass of the [[TaskManager]] to support testing messages
  */
-trait TestingTaskManager extends ActorLogMessages {
-  that: TaskManager =>
+class TestingTaskManager(connectionInfo: InstanceConnectionInfo,
+                         jobManagerAkkaURL: String,
+                         taskManagerConfig: TaskManagerConfiguration,
+                         networkConfig: NetworkEnvironmentConfiguration)
+  extends TaskManager(connectionInfo, jobManagerAkkaURL, taskManagerConfig, networkConfig) {
+
 
   val waitForRemoval = scala.collection.mutable.HashMap[ExecutionAttemptID, Set[ActorRef]]()
   val waitForJobRemoval = scala.collection.mutable.HashMap[JobID, Set[ActorRef]]()
@@ -45,12 +48,16 @@ trait TestingTaskManager extends ActorLogMessages {
 
   var disconnectDisabled = false
 
-  abstract override def receiveWithLogMessages = {
+
+  /**
+   * Combines the testing-related handler with the one inherited from [[TaskManager]]:
+   * [[receiveTestMessages]] is tried first, and messages it does not match fall through
+   * (via `orElse`) to `super.receiveWithLogMessages`.
+   */
+  override def receiveWithLogMessages = {
     receiveTestMessages orElse super.receiveWithLogMessages
   }
 
+  /**
+   * Handler for testing related messages
+   */
   def receiveTestMessages: Receive = {
-    
+
     case RequestRunningTasks =>
       sender ! ResponseRunningTasks(runningTasks.toMap)
       
@@ -134,6 +141,4 @@ trait TestingTaskManager extends ActorLogMessages {
     case DisableDisconnect =>
       disconnectDisabled = true
   }
-
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ed8b26bf/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index 147cc8a..7d682f9 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -57,17 +57,7 @@ object TestingUtils {
 
   def getDefaultTestingActorSystemConfig = testConfig
 
-  def startTestingTaskManagerWithConfiguration(hostname: String, jobManagerURL: String,
-                                               config: Configuration)
-                                              (implicit system: ActorSystem) = {
-    val (connectionInfo, _, taskManagerConfig, networkConnectionConfig) =
-      TaskManager.parseConfiguration(hostname, config,
-        localAkkaCommunication = true, localTaskManagerCommunication = false)
-    system.actorOf(Props(new TaskManager(connectionInfo, jobManagerURL, taskManagerConfig,
-      networkConnectionConfig) with TestingTaskManager))
-  }
-
-  def startTestingJobManager(implicit system: ActorSystem): ActorRef = {
+  def startTestingJobManager(system: ActorSystem): ActorRef = {
     val config = new Configuration()
 
     val (instanceManager, scheduler, libraryCacheManager, _, accumulatorManager, _ ,
@@ -84,15 +74,29 @@ object TestingUtils {
     system.actorOf(jobManagerProps, JobManager.JOB_MANAGER_NAME)
   }
 
-  def startTestingTaskManager(jobManager: ActorRef)(implicit system: ActorSystem): ActorRef = {
+  /**
+   * Starts a [[TestingTaskManager]] actor whose settings are parsed from the given configuration.
+   *
+   * @param hostname Hostname handed to TaskManager.parseTaskManagerConfiguration.
+   * @param jobManagerURL Akka URL of the JobManager, passed to the TaskManager's constructor.
+   * @param config Configuration from which the TaskManager settings are parsed.
+   * @param system Actor system in which the TaskManager actor is created.
+   * @return Reference to the newly created TaskManager actor.
+   */
+  def startTestingTaskManagerWithConfiguration(hostname: String,
+                                               jobManagerURL: String,
+                                               config: Configuration,
+                                               system: ActorSystem): ActorRef = {
+
+    // NOTE(review): the flags presumably mean localAkkaCommunication = true and
+    // localTaskManagerCommunication = false, as in the removed parseConfiguration call
+    val (tmConfig, netConfig, connectionInfo, _) =
+      TaskManager.parseTaskManagerConfiguration(config, hostname, true, false)
+
+    val tmProps = Props(classOf[TestingTaskManager], connectionInfo,
+                        jobManagerURL, tmConfig, netConfig)
+    system.actorOf(tmProps)
+  }
+
+  /**
+   * Starts a [[TestingTaskManager]] actor with an empty configuration, on host "localhost",
+   * that is given the JobManager's actor path as its JobManager URL.
+   *
+   * @param jobManager The JobManager actor; its path is passed to the TaskManager.
+   * @param system Actor system in which the TaskManager actor is created.
+   * @return Reference to the newly created TaskManager actor.
+   */
+  def startTestingTaskManager(jobManager: ActorRef, system: ActorSystem): ActorRef = {
+
     val jmURL = jobManager.path.toString
     val config = new Configuration()
-    val (connectionInfo, _, taskManagerConfig, networkConnectionConfig) =
-      TaskManager.parseConfiguration("localhost", config,
-        localAkkaCommunication = true, localTaskManagerCommunication = true)
 
-    system.actorOf(Props(new TaskManager(connectionInfo, jmURL, taskManagerConfig,
-      networkConnectionConfig) with TestingTaskManager))
+    // NOTE(review): presumably localAkkaCommunication = true, localTaskManagerCommunication = true
+    val (tmConfig, netConfig, connectionInfo, _) =
+      TaskManager.parseTaskManagerConfiguration(config,  "localhost", true, true)
+
+    val tmProps = Props(classOf[TestingTaskManager], connectionInfo, jmURL, tmConfig, netConfig)
+    system.actorOf(tmProps)
   }
 
   def startTestingCluster(numSlots: Int, numTMs: Int = 1,

http://git-wip-us.apache.org/repos/asf/flink/blob/ed8b26bf/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
index eeb622a..1b088fc 100644
--- a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
+++ b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
@@ -48,13 +48,14 @@ class ForkableFlinkMiniCluster(userConfiguration: Configuration, singleActorSyst
 
     val forkNumber = try {
       Integer.parseInt(forNumberString)
-    }catch{
+    }
+    catch {
       case e: NumberFormatException => -1
     }
 
     val config = userConfiguration.clone()
 
-    if(forkNumber != -1){
+    if (forkNumber != -1) {
       val jobManagerRPC = 1024 + forkNumber*300
       val taskManagerRPC = 1024 + forkNumber*300 + 100
       val taskManagerData = 1024 + forkNumber*300 + 200
@@ -83,29 +84,27 @@ class ForkableFlinkMiniCluster(userConfiguration: Configuration, singleActorSyst
     actorSystem.actorOf(jobManagerProps, JobManager.JOB_MANAGER_NAME)
   }
 
-  override def startTaskManager(index: Int)(implicit system: ActorSystem): ActorRef = {
+  /**
+   * Starts the TaskManager with the given index as a [[TestingTaskManager]] actor named
+   * TaskManager.TASK_MANAGER_NAME + index.
+   *
+   * @param index Index of the TaskManager; added to the configured IPC and data ports.
+   * @param system Actor system in which the TaskManager actor is started.
+   * @return Reference to the started TaskManager actor.
+   */
+  override def startTaskManager(index: Int, system: ActorSystem): ActorRef = {
+    // work on a copy so the per-TaskManager port changes do not touch the shared configuration
     val config = configuration.clone()
 
-    val rpcPort = config.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, ConfigConstants
-      .DEFAULT_TASK_MANAGER_IPC_PORT)
-    val dataPort = config.getInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, ConfigConstants
-      .DEFAULT_TASK_MANAGER_DATA_PORT)
+    val rpcPort = config.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY,
+                                    ConfigConstants.DEFAULT_TASK_MANAGER_IPC_PORT)
+
+    val dataPort = config.getInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
+                                     ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT)
 
-    if(rpcPort > 0){
+    // offset positive ports by the index (presumably so TaskManagers of this cluster get
+    // distinct ports); non-positive port values are left unchanged
+    if (rpcPort > 0) {
       config.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, rpcPort + index)
     }
-
-    if(dataPort > 0){
+    if (dataPort > 0) {
       config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort + index)
     }
 
+    // only a cluster with exactly one TaskManager is treated as local execution
     val localExecution = numTaskManagers == 1
 
-    val (connectionInfo, jobManagerAkkaURL, taskManagerConfig, networkConnectionConfig) =
-      TaskManager.parseConfiguration(HOSTNAME, config, singleActorSystem, localExecution)
-
-    system.actorOf(Props(new TaskManager(connectionInfo, jobManagerAkkaURL, taskManagerConfig,
-      networkConnectionConfig) with TestingTaskManager), TaskManager.TASK_MANAGER_NAME + index)
+    TaskManager.startTaskManagerActor(config, system, HOSTNAME,
+        TaskManager.TASK_MANAGER_NAME + index, singleActorSystem, localExecution,
+         classOf[TestingTaskManager])
   }
 
   def restartJobManager(): Unit = {
@@ -127,7 +126,7 @@ class ForkableFlinkMiniCluster(userConfiguration: Configuration, singleActorSyst
     taskManagerActorSystems(index).awaitTermination()
 
     val taskManagerActorSystem  = startTaskManagerActorSystem(index)
-    val taskManagerActor = startTaskManager(index)(jobManagerActorSystem)
+    val taskManagerActor = startTaskManager(index, taskManagerActorSystem)
 
     taskManagerActors = taskManagerActors.patch(index, Seq(taskManagerActor), 1)
     taskManagerActorSystems = taskManagerActorSystems.patch(index, Seq(taskManagerActorSystem), 1)
@@ -141,6 +140,7 @@ object ForkableFlinkMiniCluster {
   def startCluster(numSlots: Int,
                    numTaskManagers: Int,
                    timeout: String = DEFAULT_AKKA_ASK_TIMEOUT): ForkableFlinkMiniCluster = {
+
     val config = new Configuration()
     config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
     config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskManagers)


Mime
View raw message