flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [3/4] flink git commit: [FLINK-1415] [runtime] Akka cleanups
Date Thu, 05 Feb 2015 13:59:42 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 79b9a74..c9a60b4 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -20,11 +20,9 @@ package org.apache.flink.runtime.jobmanager
 
 import java.io.{IOException, File}
 import java.net.InetSocketAddress
-import java.util.concurrent.TimeUnit
 
 import akka.actor._
-import akka.pattern.Patterns
-import akka.pattern.{ask, pipe}
+import akka.pattern.ask
 import org.apache.flink.configuration.{ConfigConstants, GlobalConfiguration, Configuration}
 import org.apache.flink.core.io.InputSplitAssigner
 import org.apache.flink.runtime.blob.BlobServer
@@ -37,7 +35,7 @@ import org.apache.flink.runtime.{JobException, ActorLogMessages}
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
 import org.apache.flink.runtime.instance.InstanceManager
-import org.apache.flink.runtime.jobgraph.{JobStatus, JobID}
+import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus, JobID}
 import org.apache.flink.runtime.jobmanager.accumulators.AccumulatorManager
 import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler}
 import org.apache.flink.runtime.messages.JobManagerMessages._
@@ -48,14 +46,43 @@ import org.slf4j.LoggerFactory
 
 import scala.concurrent.Future
 import scala.concurrent.duration._
+import scala.language.postfixOps
 
-class JobManager(val configuration: Configuration)
-  extends Actor with ActorLogMessages with ActorLogging {
+/**
+ * The job manager is responsible for receiving Flink jobs, scheduling the tasks, gathering the
+ * job status and managing the task managers. It is realized as an actor and receives amongst others
+ * the following messages:
+ *
+ *  - [[RegisterTaskManager]] is sent by a TaskManager which wants to register at the job manager.
+ *  A successful registration at the instance manager is acknowledged by [[AcknowledgeRegistration]]
+ *
+ *  - [[SubmitJob]] is sent by a client which wants to submit a job to the system. The submit
+ *  message contains the job description in the form of the JobGraph. The JobGraph is appended to
+ *  the ExecutionGraph and the corresponding JobExecutionVertices are scheduled for execution on
+ *  the TaskManagers.
+ *
+ *  - [[CancelJob]] requests to cancel the job with the specified jobID. A successful cancellation
+ *  is indicated by [[CancellationSuccess]] and a failure by [[CancellationFailure]]
+ *
+ * - [[UpdateTaskExecutionState]] is sent by a TaskManager to update the state of an
+ * [[org.apache.flink.runtime.executiongraph.ExecutionVertex]] contained in the [[ExecutionGraph]].
+ * A successful update is acknowledged by true and otherwise false.
+ *
+ * - [[RequestNextInputSplit]] requests the next input split for a running task on a
+ * [[TaskManager]]. The assigned input split or null is sent to the sender in the form of the
+ * message [[NextInputSplit]].
+ *
+ * - [[JobStatusChanged]] indicates that the status of a job (RUNNING, CANCELING, FINISHED, etc.) has
+ * changed. This message is sent by the ExecutionGraph.
+ *
+ * @param configuration object with user provided configuration values
+ */
+class JobManager(val configuration: Configuration) extends 
+Actor with ActorLogMessages with ActorLogging {
   import context._
   import scala.collection.JavaConverters._
 
-  implicit val timeout = FiniteDuration(configuration.getInteger(ConfigConstants.AKKA_ASK_TIMEOUT,
-    ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT), TimeUnit.SECONDS)
+  implicit val timeout = AkkaUtils.getTimeout(configuration)
 
   log.info(s"Starting job manager at ${self.path}.")
 
@@ -117,7 +144,7 @@ class JobManager(val configuration: Configuration)
   }
 
   override def receiveWithLogMessages: Receive = {
-    case RegisterTaskManager(connectionInfo, hardwareInformation, numberOfSlots) => {
+    case RegisterTaskManager(connectionInfo, hardwareInformation, numberOfSlots) =>
       val taskManager = sender
       val instanceID = instanceManager.registerTaskManager(taskManager, connectionInfo,
         hardwareInformation, numberOfSlots)
@@ -132,146 +159,35 @@ class JobManager(val configuration: Configuration)
 
         taskManager ! AcknowledgeRegistration(instanceID, libraryCacheManager.getBlobServerPort)
       }
-    }
 
-    case RequestNumberRegisteredTaskManager => {
+
+    case RequestNumberRegisteredTaskManager =>
       sender ! instanceManager.getNumberOfRegisteredTaskManagers
-    }
 
-    case RequestTotalNumberOfSlots => {
+    case RequestTotalNumberOfSlots =>
       sender ! instanceManager.getTotalNumberOfSlots
-    }
-
-    case SubmitJob(jobGraph, listenToEvents, detach) => {
-      try {
-        if (jobGraph == null) {
-          sender ! akka.actor.Status.Failure(new IllegalArgumentException("JobGraph must not be" +
-            " null."))
-        } else {
-          log.info("Received job {} ({}).", jobGraph.getJobID, jobGraph.getName)
-
-          if (jobGraph.getNumberOfVertices == 0) {
-            sender ! SubmissionFailure(jobGraph.getJobID, new IllegalArgumentException("Job is " +
-              "empty."))
-          } else {
-            // Create the user code class loader
-            libraryCacheManager.registerJob(jobGraph.getJobID, jobGraph.getUserJarBlobKeys)
-
-            val userCodeLoader = libraryCacheManager.getClassLoader(jobGraph.getJobID)
-
-            val (executionGraph, jobInfo) = currentJobs.getOrElseUpdate(jobGraph.getJobID(),
-              (new ExecutionGraph(jobGraph.getJobID, jobGraph.getName,
-                jobGraph.getJobConfiguration, timeout, jobGraph.getUserJarBlobKeys, userCodeLoader),
-                JobInfo(sender, System.currentTimeMillis())))
-
-            val jobNumberRetries = if (jobGraph.getNumberOfExecutionRetries >= 0) {
-              jobGraph.getNumberOfExecutionRetries
-            } else {
-              defaultExecutionRetries
-            }
-
-            executionGraph.setNumberOfRetriesLeft(jobNumberRetries)
-            executionGraph.setDelayBeforeRetrying(delayBetweenRetries)
-
-            if (userCodeLoader == null) {
-              throw new JobException("The user code class loader could not be initialized.")
-            }
-
-            if (log.isDebugEnabled) {
-              log.debug("Running master initialization of job {} ({}).",
-                jobGraph.getJobID, jobGraph.getName)
-            }
-
-            for (vertex <- jobGraph.getVertices.asScala) {
-              val executableClass = vertex.getInvokableClassName
-              if (executableClass == null || executableClass.length == 0) {
-                throw new JobException(s"The vertex ${vertex.getID} (${vertex.getName}) has no " +
-                  s"invokable class.")
-              }
-
-              vertex.initializeOnMaster(userCodeLoader)
-            }
-
-            // topological sorting of the job vertices
-            val sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources
-
-            if (log.isDebugEnabled) {
-              log.debug("Adding {} vertices from job graph {} ({}).",
-                sortedTopology.size(), jobGraph.getJobID, jobGraph.getName)
-            }
 
-            executionGraph.attachJobGraph(sortedTopology)
+    case SubmitJob(jobGraph, listen, d) =>
+      submitJob(jobGraph, listenToEvents = listen, detached = d)
 
-            if (log.isDebugEnabled) {
-              log.debug("Successfully created execution graph from job graph {} ({}).",
-                jobGraph.getJobID, jobGraph.getName)
-            }
-
-            executionGraph.setScheduleMode(jobGraph.getScheduleMode)
-            executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling)
-
-            // get notified about job status changes
-            executionGraph.registerJobStatusListener(self)
-
-            if (listenToEvents) {
-              // the sender will be notified about state changes
-              executionGraph.registerExecutionListener(sender)
-              executionGraph.registerJobStatusListener(sender)
-            }
-
-            jobInfo.detach = detach
-
-            log.info("Scheduling job {}.", jobGraph.getName)
-
-            executionGraph.scheduleForExecution(scheduler)
-
-            sender ! SubmissionSuccess(jobGraph.getJobID)
-          }
-        }
-      } catch {
-        case t: Throwable =>
-          log.error(t, "Job submission failed.")
-
-          currentJobs.get(jobGraph.getJobID) match {
-            case Some((executionGraph, jobInfo)) =>
-              executionGraph.fail(t)
-
-              // don't send the client the final job status because we will send him
-              // SubmissionFailure
-              jobInfo.detach = true
-
-              val status = Patterns.ask(self, RequestFinalJobStatus(jobGraph.getJobID), 10 second)
-              status.onFailure{
-                case _: Throwable => self ! JobStatusChanged(executionGraph.getJobID,
-                  JobStatus.FAILED, System.currentTimeMillis(),
-                  s"Cleanup job ${jobGraph.getJobID}.")
-              }
-            case None =>
-              libraryCacheManager.unregisterJob(jobGraph.getJobID)
-              currentJobs.remove(jobGraph.getJobID)
-          }
-
-          sender ! SubmissionFailure(jobGraph.getJobID, t)
-      }
-    }
-
-    case CancelJob(jobID) => {
+    case CancelJob(jobID) =>
       log.info("Trying to cancel job with ID {}.", jobID)
 
       currentJobs.get(jobID) match {
         case Some((executionGraph, _)) =>
+          // execute the cancellation asynchronously
           Future {
             executionGraph.cancel()
           }
+
           sender ! CancellationSuccess(jobID)
         case None =>
           log.info("No job found with ID {}.", jobID)
           sender ! CancellationFailure(jobID, new IllegalArgumentException("No job found with " +
-            s"ID ${jobID}."))
+            s"ID $jobID."))
       }
-    }
 
-    case UpdateTaskExecutionState(taskExecutionState) => {
+    case UpdateTaskExecutionState(taskExecutionState) =>
       if(taskExecutionState == null){
         sender ! false
       }else {
@@ -286,12 +202,11 @@ class JobManager(val configuration: Configuration)
             sender ! false
         }
       }
-    }
 
-    case RequestNextInputSplit(jobID, vertexID, executionAttempt) => {
+    case RequestNextInputSplit(jobID, vertexID, executionAttempt) =>
       val nextInputSplit = currentJobs.get(jobID) match {
         case Some((executionGraph,_)) =>
-          val execution = executionGraph.getRegisteredExecutions().get(executionAttempt)
+          val execution = executionGraph.getRegisteredExecutions.get(executionAttempt)
 
           if(execution == null){
             log.error("Can not find Execution for attempt {}.", executionAttempt)
@@ -328,9 +243,8 @@ class JobManager(val configuration: Configuration)
         log.debug("Send next input split {}.", nextInputSplit)
       }
       sender ! NextInputSplit(nextInputSplit)
-    }
 
-    case JobStatusChanged(jobID, newJobStatus, timeStamp, optionalMessage) => {
+    case JobStatusChanged(jobID, newJobStatus, timeStamp, optionalMessage) =>
       currentJobs.get(jobID) match {
         case Some((executionGraph, jobInfo)) => executionGraph.getJobName
           log.info("Status of job {} ({}) changed to {}{}.",
@@ -340,7 +254,8 @@ class JobManager(val configuration: Configuration)
           if(newJobStatus.isTerminalState) {
             jobInfo.end = timeStamp
 
-            if(!jobInfo.detach) {
+            // is the client waiting for the job result?
+            if(!jobInfo.detached) {
               newJobStatus match {
                 case JobStatus.FINISHED =>
                   val accumulatorResults = accumulatorManager.getJobAccumulatorResults(jobID)
@@ -364,86 +279,220 @@ class JobManager(val configuration: Configuration)
         case None =>
           removeJob(jobID)
       }
-    }
 
-    case RequestFinalJobStatus(jobID) => {
+    case RequestFinalJobStatus(jobID) =>
       currentJobs.get(jobID) match {
         case Some(_) =>
           val listeners = finalJobStatusListener.getOrElse(jobID, Set())
           finalJobStatusListener += jobID -> (listeners + sender)
         case None =>
-          archive ! RequestJobStatus(jobID)
+          // There is no job running with this job ID. Check the archive.
+          archive forward RequestJobStatus(jobID)
       }
-    }
 
-    case ScheduleOrUpdateConsumers(jobId, executionId, partitionIndex) => {
+    case ScheduleOrUpdateConsumers(jobId, executionId, partitionIndex) =>
       currentJobs.get(jobId) match {
         case Some((executionGraph, _)) =>
-          sender ! ConsumerNotificationResult(executionGraph
-            .scheduleOrUpdateConsumers(executionId, partitionIndex))
+          sender ! ConsumerNotificationResult(
+            executionGraph.scheduleOrUpdateConsumers(executionId, partitionIndex)
+          )
         case None =>
           log.error("Cannot find execution graph for job ID {}.", jobId)
-          sender ! ConsumerNotificationResult(false, Some(
+          sender ! ConsumerNotificationResult(success = false, Some(
             new IllegalStateException("Cannot find execution graph for job ID " + jobId)))
       }
-    }
 
-    case ReportAccumulatorResult(accumulatorEvent) => {
+    case ReportAccumulatorResult(accumulatorEvent) =>
       accumulatorManager.processIncomingAccumulators(accumulatorEvent.getJobID,
-        accumulatorEvent.getAccumulators
-        (libraryCacheManager.getClassLoader(accumulatorEvent.getJobID)))
-    }
+        accumulatorEvent.getAccumulators(
+          libraryCacheManager.getClassLoader(accumulatorEvent.getJobID)
+        )
+      )
 
-    case RequestAccumulatorResults(jobID) => {
+    case RequestAccumulatorResults(jobID) =>
       import scala.collection.JavaConverters._
       sender ! AccumulatorResultsFound(jobID, accumulatorManager.getJobAccumulatorResults
         (jobID).asScala.toMap)
-    }
 
-    case RequestJobStatus(jobID) => {
+    case RequestJobStatus(jobID) =>
       currentJobs.get(jobID) match {
         case Some((executionGraph,_)) => sender ! CurrentJobStatus(jobID, executionGraph.getState)
-        case None => (archive ? RequestJobStatus(jobID))(timeout) pipeTo sender
+        case None =>
+          // check the archive
+          archive forward RequestJobStatus(jobID)
       }
-    }
 
-    case RequestRunningJobs => {
+    case RequestRunningJobs =>
       val executionGraphs = currentJobs map {
         case (_, (eg, jobInfo)) => eg
       }
 
       sender ! RunningJobs(executionGraphs)
-    }
 
-    case RequestJob(jobID) => {
+    case RequestJob(jobID) =>
       currentJobs.get(jobID) match {
         case Some((eg, _)) => sender ! JobFound(jobID, eg)
-        case None => (archive ? RequestJob(jobID))(timeout) pipeTo sender
+        case None =>
+          // check the archive
+          archive forward RequestJob(jobID)
       }
-    }
 
-    case RequestBlobManagerPort => {
+    case RequestBlobManagerPort =>
       sender ! libraryCacheManager.getBlobServerPort
-    }
 
-    case RequestRegisteredTaskManagers => {
+    case RequestRegisteredTaskManagers =>
       import scala.collection.JavaConverters._
       sender ! RegisteredTaskManagers(instanceManager.getAllRegisteredInstances.asScala)
-    }
 
-    case Heartbeat(instanceID) => {
+    case Heartbeat(instanceID) =>
       instanceManager.reportHeartBeat(instanceID)
-    }
 
-    case Terminated(taskManager) => {
+    case Terminated(taskManager) =>
       log.info("Task manager {} terminated.", taskManager.path)
       JobManager.LOG.warn(s"Task manager ${taskManager.path} terminated.")
       instanceManager.unregisterTaskManager(taskManager)
       context.unwatch(taskManager)
-    }
 
-    case RequestJobManagerStatus => {
+    case RequestJobManagerStatus =>
       sender ! JobManagerStatusAlive
+  }
+
+  /**
+   * Submits a job to the job manager. The job is registered at the libraryCacheManager which
+   * creates the job's class loader. The job graph is appended to the corresponding execution
+   * graph and the execution vertices are queued for scheduling.
+   *
+   * @param jobGraph representing the Flink job
+   * @param listenToEvents true if the sender wants to listen to job status and execution state
+   *                       change notifications. false if not.
+   * @param detached true if the job runs in detached mode, meaning that the sender does not wait
+   *                 for the result of the job. false otherwise.
+   */
+  private def submitJob(jobGraph: JobGraph, listenToEvents: Boolean, detached: Boolean): Unit = {
+    try {
+      if (jobGraph == null) {
+        sender ! akka.actor.Status.Failure(new IllegalArgumentException("JobGraph must not be" +
+          " null."))
+      } else {
+        log.info(s"Received job ${jobGraph.getJobID} (${jobGraph.getName}).")
+
+        if (jobGraph.getNumberOfVertices == 0) {
+          sender ! SubmissionFailure(jobGraph.getJobID, new IllegalArgumentException("Job is " +
+            "empty."))
+        } else {
+          // Create the user code class loader
+          libraryCacheManager.registerJob(jobGraph.getJobID, jobGraph.getUserJarBlobKeys)
+
+          val userCodeLoader = libraryCacheManager.getClassLoader(jobGraph.getJobID)
+
+          // see if there already exists an ExecutionGraph for the corresponding job ID
+          val (executionGraph, jobInfo) = currentJobs.getOrElseUpdate(jobGraph.getJobID,
+            (new ExecutionGraph(jobGraph.getJobID, jobGraph.getName,
+              jobGraph.getJobConfiguration, timeout, jobGraph.getUserJarBlobKeys, userCodeLoader),
+              JobInfo(sender, System.currentTimeMillis())))
+
+          val jobNumberRetries = if (jobGraph.getNumberOfExecutionRetries >= 0) {
+            jobGraph.getNumberOfExecutionRetries
+          } else {
+            defaultExecutionRetries
+          }
+
+          executionGraph.setNumberOfRetriesLeft(jobNumberRetries)
+          executionGraph.setDelayBeforeRetrying(delayBetweenRetries)
+
+          if (userCodeLoader == null) {
+            throw new JobException("The user code class loader could not be initialized.")
+          }
+
+          if (log.isDebugEnabled) {
+            log.debug(s"Running master initialization of job ${jobGraph.getJobID} (${
+              jobGraph
+                .getName
+            }}).")
+          }
+
+          for (vertex <- jobGraph.getVertices.asScala) {
+            val executableClass = vertex.getInvokableClassName
+            if (executableClass == null || executableClass.length == 0) {
+              throw new JobException(s"The vertex ${vertex.getID} (${vertex.getName}) has no " +
+                s"invokable class.")
+            }
+
+            vertex.initializeOnMaster(userCodeLoader)
+          }
+
+          // topological sorting of the job vertices
+          val sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources
+
+          if (log.isDebugEnabled) {
+            log.debug(s"Adding ${sortedTopology.size()} vertices from job graph " +
+              s"${jobGraph.getJobID} (${jobGraph.getName}).")
+          }
+
+          executionGraph.attachJobGraph(sortedTopology)
+
+          if (log.isDebugEnabled) {
+            log.debug(s"Successfully created execution graph from job graph " +
+              s"${jobGraph.getJobID} (${jobGraph.getName}).")
+          }
+
+          executionGraph.setScheduleMode(jobGraph.getScheduleMode)
+          executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling)
+
+          // get notified about job status changes
+          executionGraph.registerJobStatusListener(self)
+
+          if (listenToEvents) {
+            // the sender wants to be notified about state changes
+            executionGraph.registerExecutionListener(sender)
+            executionGraph.registerJobStatusListener(sender)
+          }
+
+          jobInfo.detached = detached
+
+          log.info(s"Scheduling job ${jobGraph.getName}.")
+
+          executionGraph.scheduleForExecution(scheduler)
+
+          sender ! SubmissionSuccess(jobGraph.getJobID)
+        }
+      }
+    } catch {
+      case t: Throwable =>
+        log.error(t, "Job submission failed.")
+
+        currentJobs.get(jobGraph.getJobID) match {
+          case Some((executionGraph, jobInfo)) =>
+            /*
+             * Register self to be notified about job status changes in case that it did not happen
+             * before. That way the proper cleanup of the job is triggered in the JobStatusChanged
+             * handler.
+             */
+            val status = (self ? RequestFinalJobStatus(jobGraph.getJobID))(10 second)
+
+            /*
+             * if we cannot register as final job status listener, then send manually a
+             * JobStatusChanged message with JobStatus.FAILED.
+             */
+            val selfActorRef = self
+            status.onFailure{
+              case _: Throwable => selfActorRef ! JobStatusChanged(executionGraph.getJobID,
+                JobStatus.FAILED, System.currentTimeMillis(), s"Cleanup job ${jobGraph.getJobID}.")
+            }
+
+            /*
+             * Don't send the client the final job status because we will send him a
+             * SubmissionFailure.
+             */
+            jobInfo.detached = true
+
+            executionGraph.fail(t)
+          case None =>
+            libraryCacheManager.unregisterJob(jobGraph.getJobID)
+            currentJobs.remove(jobGraph.getJobID)
+        }
+
+        sender ! SubmissionFailure(jobGraph.getJobID, t)
     }
   }
 
@@ -460,10 +509,10 @@ class JobManager(val configuration: Configuration)
    */
   private def removeJob(jobID: JobID): Unit = {
     currentJobs.remove(jobID) match {
-      case Some((eg, _)) => {
+      case Some((eg, _)) =>
         eg.prepareForArchiving()
         archive ! ArchiveExecutionGraph(jobID, eg)
-      }
+
       case None =>
     }
 
@@ -475,7 +524,7 @@ class JobManager(val configuration: Configuration)
     }
   }
 
-  private def checkJavaVersion {
+  private def checkJavaVersion(): Unit = {
     if (System.getProperty("java.version").substring(0, 3).toDouble < 1.7) {
       log.warning("Warning: Flink is running with Java 6. " +
         "Java 6 is not maintained any more by Oracle or the OpenJDK community. " +
@@ -496,21 +545,29 @@ object JobManager {
 
   def main(args: Array[String]): Unit = {
     EnvironmentInformation.logEnvironmentInfo(LOG, "JobManager")
+    val (configuration, executionMode, listeningAddress) = parseArgs(args)
 
-    val (hostname, port, configuration, executionMode) = parseArgs(args)
 
-    val jobManagerSystem = AkkaUtils.createActorSystem(hostname, port, configuration)
+    val jobManagerSystem = AkkaUtils.createActorSystem(configuration, listeningAddress)
 
     startActor(Props(new JobManager(configuration) with WithWebServer))(jobManagerSystem)
 
     if(executionMode.equals(LOCAL)){
-      TaskManager.startActorWithConfiguration(hostname, configuration, true)(jobManagerSystem)
+      TaskManager.startActorWithConfiguration("", configuration,
+        localAkkaCommunication = true, localTaskManagerCommunication = true)(jobManagerSystem)
     }
 
     jobManagerSystem.awaitTermination()
   }
 
-  def parseArgs(args: Array[String]): (String, Int, Configuration, ExecutionMode) = {
+  /**
+   * Loads the configuration, execution mode and the listening address from the provided command
+   * line arguments.
+   *
+   * @param args command line arguments
+   * @return triple of configuration, execution mode and an optional listening address
+   */
+  def parseArgs(args: Array[String]): (Configuration, ExecutionMode, Option[(String, Int)]) = {
     val parser = new scopt.OptionParser[JobManagerCLIConfiguration]("jobmanager") {
       head("flink jobmanager")
       opt[String]("configDir") action { (x, c) => c.copy(configDir = x) } text ("Specify " +
@@ -530,28 +587,38 @@ object JobManager {
       config =>
         GlobalConfiguration.loadConfiguration(config.configDir)
 
-        val configuration = GlobalConfiguration.getConfiguration()
+        val configuration = GlobalConfiguration.getConfiguration
+
         if (config.configDir != null && new File(config.configDir).isDirectory) {
           configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, config.configDir + "/..")
         }
 
-        val hostname = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null)
-        val port = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
-          ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
+        val listeningAddress = if(config.executionMode.equals(LOCAL)){
+          // All communication happens within the same actor system
+          None
+        }else{
+          val hostname = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null)
+          val port = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
+            ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
 
-        (hostname, port, configuration, config.executionMode)
+          // Listening address on which the actor system listens for remote messages
+          Some((hostname, port))
+        }
+
+        (configuration, config.executionMode, listeningAddress)
     } getOrElse {
       LOG.error("CLI Parsing failed. Usage: " + parser.usage)
       sys.exit(FAILURE_RETURN_CODE)
     }
   }
 
-  def startActorSystemAndActor(hostname: String, port: Int, configuration: Configuration):
-  (ActorSystem, ActorRef) = {
-    implicit val actorSystem = AkkaUtils.createActorSystem(hostname, port, configuration)
-    (actorSystem, startActor(configuration))
-  }
-
+  /**
+   * Extracts the job manager configuration values from a configuration instance.
+   *
+   * @param configuration Object with the user provided configuration values
+   * @return Tuple of (number of archived jobs, profiling enabled, cleanup interval of the library
+   *         cache manager, default number of execution retries, delay between retries)
+   */
   def parseConfiguration(configuration: Configuration): (Int, Boolean, Long, Int, Long) = {
     val archiveCount = configuration.getInteger(ConfigConstants.JOB_MANAGER_WEB_ARCHIVE_COUNT,
       ConfigConstants.DEFAULT_JOB_MANAGER_WEB_ARCHIVE_COUNT)
@@ -580,7 +647,11 @@ object JobManager {
   }
 
   def getRemoteAkkaURL(address: String): String = {
-    s"akka.tcp://flink@${address}/user/${JOB_MANAGER_NAME}"
+    s"akka.tcp://flink@$address/user/$JOB_MANAGER_NAME"
+  }
+
+  def getLocalAkkaURL: String = {
+    s"akka://flink/user/$JOB_MANAGER_NAME"
   }
 
   def getProfiler(jobManager: ActorRef)(implicit system: ActorSystem, timeout: FiniteDuration):

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManagerProfiler.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManagerProfiler.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManagerProfiler.scala
index 89e8b54..e44f7e9 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManagerProfiler.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManagerProfiler.scala
@@ -26,19 +26,19 @@ import org.apache.flink.runtime.profiling.types.ThreadProfilingEvent
 
 import scala.collection.convert.WrapAsScala
 
+/**
+ * Basic skeleton for the JobManager profiler. Currently, it simply logs the received messages.
+ */
 class JobManagerProfiler extends Actor with ActorLogMessages with ActorLogging with WrapAsScala {
   override def receiveWithLogMessages: Receive = {
-    case ReportProfilingData(profilingContainer) => {
-
+    case ReportProfilingData(profilingContainer) =>
       profilingContainer.getIterator foreach {
         case x: InternalExecutionVertexThreadProfilingData =>
-          log.info(s"Received InternalExecutionVertexThreadProfilingData ${x}.")
+          log.info(s"Received InternalExecutionVertexThreadProfilingData $x.")
         case x: InternalInstanceProfilingData =>
-          log.info(s"Received InternalInstanceProfilingData ${x}.")
-
+          log.info(s"Received InternalInstanceProfilingData $x.")
         case x =>
           log.error(s"Received unknown profiling data: ${x.getClass.getName}" )
       }
-    }
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
index 88dc927..2d055ed 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
@@ -25,52 +25,67 @@ import org.apache.flink.runtime.jobgraph.JobID
 import org.apache.flink.runtime.messages.ArchiveMessages._
 import org.apache.flink.runtime.messages.JobManagerMessages._
 
-import scala.collection.mutable.LinkedHashMap
+import scala.collection.mutable
 import scala.ref.SoftReference
 
-class MemoryArchivist(private val max_entries: Int) extends Actor 
-      with ActorLogMessages with ActorLogging {
-
-  /**
+/**
+ * Actor which stores terminated Flink jobs. The number of stored Flink jobs is set by max_entries.
+ * If this number is exceeded, the oldest job will be discarded. One can interact with the actor by
+ * the following messages:
+ *
+ *  - [[ArchiveExecutionGraph]] archives the attached [[ExecutionGraph]]
+ *
+ *  - [[RequestArchivedJobs]] returns all currently stored [[ExecutionGraph]]s to the sender
+ *  encapsulated in an [[ArchivedJobs]] message.
+ *
+ *  - [[RequestJob]] returns the corresponding [[ExecutionGraph]]
+ *  encapsulated in [[JobFound]] if the job is stored by the MemoryArchivist. If not, then
+ *  [[JobNotFound]] is returned.
+ *
+ *  - [[RequestJobStatus]] returns the last state of the corresponding job. If the job can be found,
+ *  then a [[CurrentJobStatus]] message with the last state is returned to the sender, otherwise
+ *  a [[JobNotFound]] message is returned.
+ *
+ * @param max_entries Maximum number of stored Flink jobs
+ */
+class MemoryArchivist(private val max_entries: Int) extends Actor with ActorLogMessages with
+ActorLogging {
+  /*
    * Map of execution graphs belonging to recently started jobs with the time stamp of the last
    * received job event. The insert order is preserved through a LinkedHashMap.
    */
-  val graphs = LinkedHashMap[JobID, SoftReference[ExecutionGraph]]()
+  val graphs = mutable.LinkedHashMap[JobID, SoftReference[ExecutionGraph]]()
 
   override def receiveWithLogMessages: Receive = {
     
     /* Receive Execution Graph to archive */
-    case ArchiveExecutionGraph(jobID, graph) => {
+    case ArchiveExecutionGraph(jobID, graph) => 
       // wrap graph inside a soft reference
       graphs.update(jobID, new SoftReference(graph))
 
       trimHistory()
-    }
 
-    case RequestArchivedJobs => {
-      sender ! ArchivedJobs(getAllGraphs())
-    }
+    case RequestArchivedJobs =>
+      sender ! ArchivedJobs(getAllGraphs)
 
-    case RequestJob(jobID) => {
+    case RequestJob(jobID) =>
       getGraph(jobID) match {
         case Some(graph) => sender ! JobFound(jobID, graph)
         case None => sender ! JobNotFound(jobID)
       }
-    }
 
-    case RequestJobStatus(jobID) => {
+    case RequestJobStatus(jobID) =>
       getGraph(jobID) match {
         case Some(graph) => sender ! CurrentJobStatus(jobID, graph.getState)
         case None => sender ! JobNotFound(jobID)
       }
-    }
   }
 
   /**
    * Gets all graphs that have not been garbage collected.
    * @return An iterable with all valid ExecutionGraphs
    */
-  protected def getAllGraphs(): Iterable[ExecutionGraph] = graphs.values.flatMap(_.get)
+  protected def getAllGraphs: Iterable[ExecutionGraph] = graphs.values.flatMap(_.get)
 
   /**
    * Gets a graph with a jobID if it has not been garbage collected.

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/WithWebServer.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/WithWebServer.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/WithWebServer.scala
index 715fc0c..62e56fe 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/WithWebServer.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/WithWebServer.scala
@@ -21,6 +21,9 @@ package org.apache.flink.runtime.jobmanager
 import akka.actor.Actor
 import org.apache.flink.runtime.jobmanager.web.WebInfoServer
 
+/**
+ * Mixin for the [[JobManager]] which starts a [[WebInfoServer]] for the JobManager.
+ */
 trait WithWebServer extends Actor {
   that: JobManager =>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala
index 884bc2a..704bf86 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala
@@ -37,7 +37,7 @@ object ArchiveMessages {
    * Response to [[RequestArchivedJobs]] message. The response contains the archived jobs.
    * @param jobs
    */
-  case class ArchivedJobs(val jobs: Iterable[ExecutionGraph]){
+  case class ArchivedJobs(jobs: Iterable[ExecutionGraph]){
     def asJavaIterable: java.lang.Iterable[ExecutionGraph] = {
       import scala.collection.JavaConverters._
       jobs.asJava
@@ -53,7 +53,7 @@ object ArchiveMessages {
   // Utility methods to allow simpler case object access from Java
   // --------------------------------------------------------------------------
   
-  def getRequestArchivedJobs() : AnyRef = {
+  def getRequestArchivedJobs : AnyRef = {
     RequestArchivedJobs
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala
index 51767e4..ef5b99c 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.messages
 import java.text.SimpleDateFormat
 import java.util.Date
 
-import org.apache.flink.runtime.execution.{ExecutionState}
+import org.apache.flink.runtime.execution.ExecutionState
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
 import org.apache.flink.runtime.jobgraph.{JobStatus, JobVertexID, JobID}
 
@@ -56,7 +56,7 @@ object ExecutionGraphMessages {
         ""
       }
       s"${timestampToString(timestamp)}\t$taskName(${subtaskIndex +
-        1}/${totalNumberOfSubTasks}) switched to $newExecutionState $oMsg"
+        1}/$totalNumberOfSubTasks) switched to $newExecutionState $oMsg"
     }
   }
 
@@ -71,7 +71,7 @@ object ExecutionGraphMessages {
   case class JobStatusChanged(jobID: JobID, newJobStatus: JobStatus, timestamp: Long,
                               optionalMessage: String){
     override def toString: String = {
-      s"${timestampToString(timestamp)}\tJob execution switched to status ${newJobStatus}."
+      s"${timestampToString(timestamp)}\tJob execution switched to status $newJobStatus."
     }
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobmanagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobmanagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobmanagerMessages.scala
index 7ce013b..5189a02 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobmanagerMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobmanagerMessages.scala
@@ -31,7 +31,7 @@ object JobManagerMessages {
 
   /**
    * Submits a job to the job manager. If [[registerForEvents]] is true,
-   * then the sender will be registered as listener for the state change messages. If [[detach]]
+   * then the sender will be registered as a listener for the state change messages. If [[detached]]
    * is set to true, then the sender will detach from the job execution. Consequently,
    * he will not receive the job execution result [[JobResult]]. The submission result will be sent
    * back to the
@@ -39,10 +39,10 @@ object JobManagerMessages {
    *
    * @param jobGraph
    * @param registerForEvents if true, then register for state change events
-   * @param detach if true, then detach from the job execution
+   * @param detached if true, then detach from the job execution
    */
   case class SubmitJob(jobGraph: JobGraph, registerForEvents: Boolean = false,
-                       detach: Boolean = false)
+                       detached: Boolean = false)
 
   /**
    * Cancels a job with the given [[jobID]] at the JobManager. The result of the cancellation is
@@ -330,31 +330,31 @@ object JobManagerMessages {
   // Utility methods to allow simpler case object access from Java
   // --------------------------------------------------------------------------
   
-  def getRequestNumberRegisteredTaskManager() : AnyRef = {
+  def getRequestNumberRegisteredTaskManager : AnyRef = {
     RequestNumberRegisteredTaskManager
   }
   
-  def getRequestTotalNumberOfSlots() : AnyRef = {
+  def getRequestTotalNumberOfSlots : AnyRef = {
     RequestTotalNumberOfSlots
   }
   
-  def getRequestBlobManagerPort() : AnyRef = {
+  def getRequestBlobManagerPort : AnyRef = {
     RequestBlobManagerPort
   }
   
-  def getRequestRunningJobs() : AnyRef = {
+  def getRequestRunningJobs : AnyRef = {
     RequestRunningJobs
   }
   
-  def getRequestRegisteredTaskManagers() : AnyRef = {
+  def getRequestRegisteredTaskManagers : AnyRef = {
     RequestRegisteredTaskManagers
   }
   
-  def getRequestJobManagerStatus() : AnyRef = {
+  def getRequestJobManagerStatus : AnyRef = {
     RequestJobManagerStatus
   }
   
-  def getJobManagerStatusAlive() : AnyRef = {
+  def getJobManagerStatusAlive : AnyRef = {
     JobManagerStatusAlive
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
index d5e7e9b..968dc46 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
@@ -134,23 +134,23 @@ object TaskManagerMessages {
   // Utility methods to allow simpler case object access from Java
   // --------------------------------------------------------------------------
   
-  def getNotifyWhenRegisteredAtJobManagerMessage() : AnyRef = {
+  def getNotifyWhenRegisteredAtJobManagerMessage : AnyRef = {
     NotifyWhenRegisteredAtJobManager
   }
   
-  def getRegisteredAtJobManagerMessage() : AnyRef = {
+  def getRegisteredAtJobManagerMessage : AnyRef = {
     RegisteredAtJobManager
   }
   
-  def getRegisterAtJobManagerMessage() : AnyRef = {
+  def getRegisterAtJobManagerMessage : AnyRef = {
     RegisterAtJobManager
   }
 
-  def getSendHeartbeatMessage() : AnyRef = {
+  def getSendHeartbeatMessage : AnyRef = {
     SendHeartbeat
   }
 
-  def getLogMemoryUsageMessage() : AnyRef = {
+  def getLogMemoryUsageMessage : AnyRef = {
     RegisteredAtJobManager
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 16884ca..dd158cb 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -18,36 +18,38 @@
 
 package org.apache.flink.runtime.minicluster
 
-import java.util.concurrent.TimeUnit
-
 import akka.pattern.ask
 import akka.actor.{ActorRef, ActorSystem}
-import com.typesafe.config.{ConfigFactory}
+import com.typesafe.config.Config
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtJobManager
 import org.slf4j.LoggerFactory
 
-import scala.concurrent.duration.FiniteDuration
 import scala.concurrent.{Future, Await}
 
+/**
+ * Abstract base class for Flink's mini cluster. The mini cluster starts a
+ * [[org.apache.flink.runtime.jobmanager.JobManager]] and one or multiple
+ * [[org.apache.flink.runtime.taskmanager.TaskManager]]. Depending on the settings, the different
+ * actors can all be run in the same [[ActorSystem]] or each one in its own.
+ *
+ * @param userConfiguration Configuration object with the user provided configuration values
+ * @param singleActorSystem true if all actors (JobManager and TaskManager) shall be run in the same
+ *                          [[ActorSystem]], otherwise false
+ */
 abstract class FlinkMiniCluster(userConfiguration: Configuration,
                                 val singleActorSystem: Boolean) {
   import FlinkMiniCluster._
 
   val HOSTNAME = "localhost"
 
-  implicit val timeout = FiniteDuration(userConfiguration.getInteger(ConfigConstants
-    .AKKA_ASK_TIMEOUT, ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT), TimeUnit.SECONDS)
+  implicit val timeout = AkkaUtils.getTimeout(userConfiguration)
 
   val configuration = generateConfiguration(userConfiguration)
 
-  if(singleActorSystem){
-    configuration.setString(ConfigConstants.JOB_MANAGER_AKKA_URL, "akka://flink/user/jobmanager")
-  }
-
-  val jobManagerActorSystem = startJobManagerActorSystem()
-  val jobManagerActor = startJobManager(jobManagerActorSystem)
+  var jobManagerActorSystem = startJobManagerActorSystem()
+  var jobManagerActor = startJobManager(jobManagerActorSystem)
 
   val numTaskManagers = configuration.getInteger(ConfigConstants
     .LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 1)
@@ -63,46 +65,43 @@ abstract class FlinkMiniCluster(userConfiguration: Configuration,
     (actorSystem, startTaskManager(i)(actorSystem))
   }
 
-  val (taskManagerActorSystems, taskManagerActors) = actorSystemsTaskManagers.unzip
+  var (taskManagerActorSystems, taskManagerActors) = actorSystemsTaskManagers.unzip
 
   waitForTaskManagersToBeRegistered()
 
   def generateConfiguration(userConfiguration: Configuration): Configuration
 
   def startJobManager(implicit system: ActorSystem): ActorRef
-  def startTaskManager(index: Int)(implicit system: ActorSystem):
-  ActorRef
+  def startTaskManager(index: Int)(implicit system: ActorSystem): ActorRef
 
-  def getJobManagerAkkaConfigString(): String = {
-    val port = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, ConfigConstants
-      .DEFAULT_JOB_MANAGER_IPC_PORT)
+  def getJobManagerAkkaConfig: Config = {
+    val port = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
+      ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
 
     if(singleActorSystem){
-      AkkaUtils.getLocalConfigString(configuration)
+      AkkaUtils.getAkkaConfig(configuration, None)
     }else{
-      AkkaUtils.getConfigString(HOSTNAME, port, configuration)
+      AkkaUtils.getAkkaConfig(configuration, Some((HOSTNAME, port)))
     }
-
   }
 
   def startJobManagerActorSystem(): ActorSystem = {
-    val configString = getJobManagerAkkaConfigString()
-
-    val config = ConfigFactory.parseString(getJobManagerAkkaConfigString())
+    val config = getJobManagerAkkaConfig
 
     AkkaUtils.createActorSystem(config)
   }
 
-  def getTaskManagerAkkaConfigString(index: Int): String = {
+  def getTaskManagerAkkaConfig(index: Int): Config = {
     val port = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY,
       ConfigConstants.DEFAULT_TASK_MANAGER_IPC_PORT)
 
-    AkkaUtils.getConfigString(HOSTNAME, if(port != 0) port + index else port,
-      configuration)
+    val resolvedPort = if(port != 0) port + index else port
+
+    AkkaUtils.getAkkaConfig(configuration, Some((HOSTNAME, resolvedPort)))
   }
 
   def startTaskManagerActorSystem(index: Int): ActorSystem = {
-    val config = ConfigFactory.parseString(getTaskManagerAkkaConfigString(index))
+    val config = getTaskManagerAkkaConfig(index)
 
     AkkaUtils.createActorSystem(config)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index 59174f6..06d611a 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -29,6 +29,16 @@ import org.apache.flink.runtime.taskmanager.TaskManager
 import org.apache.flink.runtime.util.EnvironmentInformation
 import org.slf4j.LoggerFactory
 
+/**
+ * Local Flink mini cluster which executes all [[TaskManager]]s and the [[JobManager]] in the same
+ * JVM. It extends the [[FlinkMiniCluster]] by providing a [[JobClient]], having convenience
+ * functions to setup Flink's configuration and implementations to create [[JobManager]] and
+ * [[TaskManager]].
+ *
+ * @param userConfiguration Configuration object with the user provided configuration values
+ * @param singleActorSystem true if all actors (JobManager and TaskManager) shall be run in the same
+ *                          [[ActorSystem]], otherwise false
+ */
 class LocalFlinkMiniCluster(userConfiguration: Configuration, singleActorSystem: Boolean = true)
   extends FlinkMiniCluster(userConfiguration, singleActorSystem){
   import LocalFlinkMiniCluster._
@@ -36,7 +46,8 @@ class LocalFlinkMiniCluster(userConfiguration: Configuration, singleActorSystem:
   val jobClientActorSystem = if(singleActorSystem){
     jobManagerActorSystem
   }else{
-    AkkaUtils.createActorSystem()
+    // create an actor system listening on a random port
+    AkkaUtils.createDefaultActorSystem()
   }
 
   var jobClient: Option[ActorRef] = None
@@ -81,7 +92,10 @@ class LocalFlinkMiniCluster(userConfiguration: Configuration, singleActorSystem:
       false
     }
 
-    TaskManager.startActorWithConfiguration(HOSTNAME, config, localExecution)(system)
+    TaskManager.startActorWithConfiguration(HOSTNAME,
+      config,
+      singleActorSystem,
+      localExecution)(system)
   }
 
   def getJobClient(): ActorRef ={
@@ -93,11 +107,8 @@ class LocalFlinkMiniCluster(userConfiguration: Configuration, singleActorSystem:
         config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, HOSTNAME)
         config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, getJobManagerRPCPort)
 
-        if(singleActorSystem){
-          config.setString(ConfigConstants.JOB_MANAGER_AKKA_URL, "akka://flink/user/jobmanager")
-        }
-
-        val jc = JobClient.startActorWithConfiguration(config)(jobClientActorSystem)
+        val jc = JobClient.startActorWithConfiguration(config,
+          singleActorSystem)(jobClientActorSystem)
         jobClient = Some(jc)
         jc
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/MemoryUsageLogging.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/MemoryUsageLogging.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/MemoryUsageLogging.scala
deleted file mode 100644
index 05f0f9a..0000000
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/MemoryUsageLogging.scala
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.taskmanager
-
-case class MemoryUsageLogging(logIntervalMs: Option[Int]= None)

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index ed1da80..91eac35 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -42,14 +42,15 @@ import org.apache.flink.runtime.instance.{HardwareDescription, InstanceConnectio
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync
 import org.apache.flink.runtime.io.network.NetworkEnvironment
 import org.apache.flink.runtime.io.network.netty.NettyConfig
-import org.apache.flink.runtime.jobgraph.{IntermediateDataSetID, JobID}
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID
 import org.apache.flink.runtime.jobmanager.JobManager
 import org.apache.flink.runtime.memorymanager.DefaultMemoryManager
 import org.apache.flink.runtime.messages.JobManagerMessages.UpdateTaskExecutionState
 import org.apache.flink.runtime.messages.RegistrationMessages.{AlreadyRegistered,
 RefuseRegistration, AcknowledgeRegistration, RegisterTaskManager}
 import org.apache.flink.runtime.messages.TaskManagerMessages._
-import org.apache.flink.runtime.messages.TaskManagerProfilerMessages.{MonitorTask, RegisterProfilingListener, UnmonitorTask}
+import org.apache.flink.runtime.messages.TaskManagerProfilerMessages
+.{UnregisterProfilingListener, UnmonitorTask, MonitorTask, RegisterProfilingListener}
 import org.apache.flink.runtime.net.NetUtils
 import org.apache.flink.runtime.profiling.ProfilingUtils
 import org.apache.flink.runtime.util.EnvironmentInformation
@@ -59,11 +60,11 @@ import org.slf4j.LoggerFactory
 import scala.concurrent.Future
 import scala.concurrent.duration._
 import scala.util.{Failure, Success}
+import scala.language.postfixOps
 
 /**
- *
- *
- * The TaskManager has the following phases:
+ * The TaskManager is responsible for executing the individual tasks of a Flink job. It is
+ * implemented as an actor. The TaskManager has the following phases:
  *
  * - Waiting to be registered with its JobManager. In that phase, it periodically sends
  * [[RegisterAtJobManager]] messages to itself, which trigger the sending of
@@ -73,6 +74,13 @@ import scala.util.{Failure, Success}
  * message. This stops the registration messages and initializes all fields
  * that require the JobManager's actor reference
  *
+ *  - [[SubmitTask]] is sent from the JobManager and contains the next Task to be executed on this
+ *  TaskManager
+ *
+ *  - [[CancelTask]] requests to cancel the corresponding task
+ *
+ *  - [[FailTask]] requests to fail the corresponding task
+ *
  * - ...
  */
 class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkkaURL: String,
@@ -117,7 +125,7 @@ import scala.collection.JavaConverters._
   }
 
   if (log.isInfoEnabled) {
-    log.info(TaskManager.getMemoryUsageStatsAsString(ManagementFactory.getMemoryMXBean()))
+    log.info(TaskManager.getMemoryUsageStatsAsString(ManagementFactory.getMemoryMXBean))
   }
 
   var libraryCacheManager: LibraryCacheManager = null
@@ -184,7 +192,7 @@ import scala.collection.JavaConverters._
   }
 
   override def receiveWithLogMessages: Receive = {
-    case RegisterAtJobManager => {
+    case RegisterAtJobManager =>
       if(!registered) {
         registrationDuration += registrationDelay
         // double delay for exponential backoff
@@ -196,9 +204,9 @@ import scala.collection.JavaConverters._
             maxRegistrationDuration)
 
           self ! PoisonPill
-        } else if (!registered) {
-          log.info(s"Try to register at master ${jobManagerAkkaURL}. ${registrationAttempts}. " +
-            s"Attempt")
+        } else {
+          log.info("Try to register at master {}. {}. Attempt", jobManagerAkkaURL,
+            registrationAttempts)
           val jobManager = context.actorSelection(jobManagerAkkaURL)
 
           jobManager ! RegisterTaskManager(connectionInfo, hardwareDescription, numberOfSlots)
@@ -206,25 +214,21 @@ import scala.collection.JavaConverters._
           context.system.scheduler.scheduleOnce(registrationDelay, self, RegisterAtJobManager)
         }
       }
-    }
 
-    case AcknowledgeRegistration(id, blobPort) => {
+    case AcknowledgeRegistration(id, blobPort) =>
       if(!registered) {
-        finishRegistration(id, blobPort)
-        registered = true
+        finishRegistration(sender, id, blobPort)
       } else {
         log.info("The TaskManager {} is already registered at the JobManager {}, but received " +
           "another AcknowledgeRegistration message.", self.path, currentJobManager.path)
       }
-    }
 
     case AlreadyRegistered(id, blobPort) =>
       if(!registered) {
         log.warning("The TaskManager {} seems to be already registered at the JobManager {} even" +
           "though it has not yet finished the registration process.", self.path, sender.path)
 
-        finishRegistration(id, blobPort)
-        registered = true
+        finishRegistration(sender, id, blobPort)
       } else {
         // ignore AlreadyRegistered messages which arrived after AcknowledgeRegistration
         log.info("The TaskManager {} has already been registered at the JobManager {}.",
@@ -244,17 +248,16 @@ import scala.collection.JavaConverters._
           "registered")
       }
 
-    case SubmitTask(tdd) => {
+    case SubmitTask(tdd) =>
       submitTask(tdd)
-    }
 
-    case UpdateTask(executionId, resultId, partitionInfo) => {
+    case UpdateTask(executionId, resultId, partitionInfo) =>
       updateTask(executionId, resultId, partitionInfo)
-    }
 
-    case CancelTask(executionID) => {
+    case CancelTask(executionID) =>
       runningTasks.get(executionID) match {
         case Some(task) =>
+          // execute cancel operation concurrently
           Future {
             task.cancelExecution()
           }
@@ -263,73 +266,74 @@ import scala.collection.JavaConverters._
           sender ! new TaskOperationResult(executionID, false,
             "No task with that execution ID was found.")
       }
-    }
 
-    case UnregisterTask(executionID) => {
+    case UnregisterTask(executionID) =>
       unregisterTask(executionID)
-    }
 
-    case SendHeartbeat => {
+    case updateMsg:UpdateTaskExecutionState =>
+      val futureResponse = (currentJobManager ? updateMsg)(timeout)
+
+      val jobID = updateMsg.taskExecutionState.getJobID
+      val executionID = updateMsg.taskExecutionState.getID
+      val executionState = updateMsg.taskExecutionState.getExecutionState
+
+      futureResponse.mapTo[Boolean].onComplete{
+        case Success(result) =>
+          if(!result){
+            self ! FailTask(executionID,
+              new IllegalStateException("Task has been disposed on JobManager."))
+          }
+
+          if (!result || executionState == ExecutionState.FINISHED || executionState ==
+            ExecutionState.CANCELED || executionState == ExecutionState.FAILED) {
+            self ! UnregisterTask(executionID)
+          }
+        case Failure(t) =>
+          log.warning(s"Execution state change notification failed for task $executionID " +
+            s"of job $jobID. Cause ${t.getMessage}.")
+          self ! UnregisterTask(executionID)
+      }
+
+    case SendHeartbeat =>
       currentJobManager ! Heartbeat(instanceID)
-    }
 
-    case LogMemoryUsage => {
+    case LogMemoryUsage =>
       logMemoryStats()
-    }
 
-    case NotifyWhenRegisteredAtJobManager => {
+    case NotifyWhenRegisteredAtJobManager =>
       if (registered) {
         sender ! RegisteredAtJobManager
       } else {
         waitForRegistration += sender
       }
-    }
 
-    case FailTask(executionID, cause) => {
+    case FailTask(executionID, cause) =>
       runningTasks.get(executionID) match {
         case Some(task) =>
+          // execute failing operation concurrently
           Future {
             task.failExternally(cause)
           }
         case None =>
       }
-    }
 
-    case Terminated(jobManager) => {
+    case Terminated(jobManager) =>
       log.info("Job manager {} is no longer reachable. Cancelling all tasks and trying to " +
         "reregister.", jobManager.path)
 
       cancelAndClearEverything(new Throwable("Lost connection to JobManager"))
-      tryJobManagerRegistration()
-    }
-  }
 
-  def notifyExecutionStateChange(jobID: JobID, executionID: ExecutionAttemptID,
-                                 executionState: ExecutionState,
-                                 optionalError: Throwable): Unit = {
-    log.info("Update execution state to {}.", executionState)
-    val futureResponse = (currentJobManager ? UpdateTaskExecutionState(new TaskExecutionState
-    (jobID, executionID, executionState, optionalError)))(timeout)
-
-    futureResponse.mapTo[Boolean].onComplete {
-      case Success(result) =>
-        if (!result) {
-          self ! FailTask(executionID, new IllegalStateException("Task has been disposed on " +
-            "JobManager."))
-        }
+      cleanupTaskManager()
 
-        if (!result || executionState == ExecutionState.FINISHED || executionState ==
-          ExecutionState.CANCELED || executionState == ExecutionState.FAILED) {
-          self ! UnregisterTask(executionID)
-        }
-      case Failure(t) => {
-        log.warning("Execution state change notification failed for task {} of job {}. Cause {}.",
-          executionID, jobID, t.getMessage)
-        self ! UnregisterTask(executionID)
-      }
-    }
+      tryJobManagerRegistration()
   }
 
+  /**
+   * Receives a [[TaskDeploymentDescriptor]] describing the task to be executed. Sets up a
+   * [[RuntimeEnvironment]] for the task and starts its execution in a separate thread.
+   *
+   * @param tdd TaskDeploymentDescriptor describing the task to be executed on this [[TaskManager]]
+   */
   private def submitTask(tdd: TaskDeploymentDescriptor): Unit = {
     val jobID = tdd.getJobID
     val vertexID = tdd.getVertexID
@@ -343,7 +347,9 @@ import scala.collection.JavaConverters._
       if (log.isDebugEnabled) {
         startRegisteringTask = System.currentTimeMillis()
       }
-      libraryCacheManager.registerTask(jobID, executionID, tdd.getRequiredJarFiles())
+      // triggers the download of all missing jar files from the job manager
+      libraryCacheManager.registerTask(jobID, executionID, tdd.getRequiredJarFiles)
 
       if (log.isDebugEnabled) {
         log.debug("Register task {} took {}s", executionID,
@@ -357,11 +363,11 @@ import scala.collection.JavaConverters._
       }
 
       task = new Task(jobID, vertexID, taskIndex, numSubtasks, executionID,
-        tdd.getTaskName, this)
+        tdd.getTaskName, self)
 
       runningTasks.put(executionID, task) match {
         case Some(_) => throw new RuntimeException(
-          s"TaskManager contains already a task with executionID ${executionID}.")
+          s"TaskManager contains already a task with executionID $executionID.")
         case None =>
       }
 
@@ -384,7 +390,7 @@ import scala.collection.JavaConverters._
 
       if (jobConfig.getBoolean(ProfilingUtils.PROFILE_JOB_KEY, true)) {
         profiler match {
-          case Some(profiler) => profiler ! MonitorTask(task)
+          case Some(profilerActorRef) => profilerActorRef ! MonitorTask(task)
           case None => log.info("There is no profiling enabled for the task manager.")
         }
       }
@@ -401,7 +407,7 @@ import scala.collection.JavaConverters._
         throw new RuntimeException("Cannot start task. Task was canceled or failed.")
       }
 
-      sender ! TaskOperationResult(executionID, true)
+      sender ! TaskOperationResult(executionID, success = true)
     } catch {
       case t: Throwable =>
         val message = if (t.isInstanceOf[CancelTaskException]) {
@@ -426,12 +432,42 @@ import scala.collection.JavaConverters._
     }
   }
 
+  private def cleanupTaskManager(): Unit = {
+    context.unwatch(currentJobManager)
+
+    networkEnvironment foreach {
+      _.shutdown()
+    }
+
+    networkEnvironment = None
+
+    if(libraryCacheManager != null){
+      libraryCacheManager.shutdown()
+    }
+
+    libraryCacheManager = null
+
+    heartbeatScheduler foreach {
+      _.cancel()
+    }
+
+    heartbeatScheduler = None
+
+    profiler foreach {
+      _.tell(UnregisterProfilingListener, JobManager.getProfiler(currentJobManager))
+    }
+
+    currentJobManager = ActorRef.noSender
+    instanceID = null
+    registered = false
+  }
+
   private def updateTask(executionId: ExecutionAttemptID, resultId: IntermediateDataSetID,
                          partitionInfo: PartitionInfo): Unit = {
 
     partitionInfo.getProducerLocation match {
       case PartitionInfo.PartitionLocation.UNKNOWN =>
-        sender ! TaskOperationResult(executionId, false,
+        sender ! TaskOperationResult(executionId, success = false,
           "Tried to update task with UNKNOWN channel.")
 
       case _ =>
@@ -455,42 +491,45 @@ import scala.collection.JavaConverters._
                       }
                   }
                 }
-                sender ! TaskOperationResult(executionId, true)
-              case None => sender ! TaskOperationResult(executionId, false, "No reader with ID " +
-                resultId + " was found.")
+                sender ! TaskOperationResult(executionId, success = true)
+              case None => sender ! TaskOperationResult(executionId, success = false,
+                s"No reader with ID $resultId  was found.")
             }
 
-          case None => sender ! TaskOperationResult(executionId, false, "No task with execution" +
-            "ID " + executionId + " was found.")
+          case None => sender ! TaskOperationResult(executionId, success = false,
+            s"No task with execution ID $executionId was found.")
         }
     }
   }
 
-  private def finishRegistration(id: InstanceID, blobPort: Int): Unit = {
-    currentJobManager = sender
+  private def finishRegistration(jobManager: ActorRef, id: InstanceID, blobPort: Int): Unit = {
+    setupTaskManager(jobManager, id, blobPort)
+
+    for (listener <- waitForRegistration) {
+      listener ! RegisteredAtJobManager
+    }
+
+    waitForRegistration.clear()
+  }
+
+  private def setupTaskManager(jobManager: ActorRef, id: InstanceID, blobPort: Int): Unit = {
+    registered = true
+    currentJobManager = jobManager
     instanceID = id
 
+    // watch job manager to detect when it dies
     context.watch(currentJobManager)
 
-    log.info(s"TaskManager successfully registered at JobManager ${
-      currentJobManager.path.toString
-    }.")
-
     setupNetworkEnvironment()
     setupLibraryCacheManager(blobPort)
 
+    // schedule regular heartbeat message for oneself
     heartbeatScheduler = Some(context.system.scheduler.schedule(
       TaskManager.HEARTBEAT_INTERVAL, TaskManager.HEARTBEAT_INTERVAL, self, SendHeartbeat))
 
     profiler foreach {
       _.tell(RegisterProfilingListener, JobManager.getProfiler(currentJobManager))
     }
-
-    for (listener <- waitForRegistration) {
-      listener ! RegisteredAtJobManager
-    }
-
-    waitForRegistration.clear()
   }
 
   private def setupNetworkEnvironment(): Unit = {
@@ -514,7 +553,7 @@ import scala.collection.JavaConverters._
   }
 
   private def setupLibraryCacheManager(blobPort: Int): Unit = {
-    // shutdown existing library cache manager
+    // shutdown existing library cache manager first
     if (libraryCacheManager != null) {
       try {
         libraryCacheManager.shutdown()
@@ -525,6 +564,7 @@ import scala.collection.JavaConverters._
       libraryCacheManager = null
     }
 
+    // Check if a blob server is specified
     if (blobPort > 0) {
       val address = new InetSocketAddress(currentJobManager.path.address.host.getOrElse
         ("localhost"), blobPort)
@@ -543,6 +583,7 @@ import scala.collection.JavaConverters._
   private def cancelAndClearEverything(cause: Throwable) {
     if (runningTasks.size > 0) {
       log.info("Cancelling all computations and discarding all cached data.")
+
       for (t <- runningTasks.values) {
         t.failExternally(cause)
         runningTasks.remove(t.getExecutionId)
@@ -552,6 +593,7 @@ import scala.collection.JavaConverters._
 
   private def unregisterTask(executionID: ExecutionAttemptID): Unit = {
     log.info("Unregister task with execution ID {}.", executionID)
+
     runningTasks.remove(executionID) match {
       case Some(task) =>
         removeAllTaskResources(task)
@@ -566,8 +608,8 @@ import scala.collection.JavaConverters._
   private def removeAllTaskResources(task: Task): Unit = {
     if (task.getEnvironment != null) {
       try {
-        for (entry <- DistributedCache.readFileInfoFromConfig(task.getEnvironment
-          .getJobConfiguration).asScala) {
+        for (entry <- DistributedCache.readFileInfoFromConfig(
+          task.getEnvironment.getJobConfiguration).asScala) {
           fileCache.deleteTmpFile(entry.getKey, entry.getValue, task.getJobID)
         }
       } catch {
@@ -589,8 +631,8 @@ import scala.collection.JavaConverters._
 
   private def logMemoryStats(): Unit = {
     if (log.isInfoEnabled) {
-      val memoryMXBean = ManagementFactory.getMemoryMXBean()
-      val gcMXBeans = ManagementFactory.getGarbageCollectorMXBeans().asScala
+      val memoryMXBean = ManagementFactory.getMemoryMXBean
+      val gcMXBeans = ManagementFactory.getGarbageCollectorMXBeans.asScala
 
       log.info(TaskManager.getMemoryUsageStatsAsString(memoryMXBean))
       log.info(TaskManager.getGarbageCollectorStatsAsString(gcMXBeans))
@@ -620,21 +662,29 @@ object TaskManager {
 
     val (hostname, port, configuration) = parseArgs(args)
 
-    val (taskManagerSystem, _) = startActorSystemAndActor(hostname, port, configuration)
+    val (taskManagerSystem, _) = startActorSystemAndActor(hostname, port, configuration,
+      localAkkaCommunication = false, localTaskManagerCommunication = false)
 
     taskManagerSystem.awaitTermination()
   }
 
+  /**
+   * Parse the command line arguments of the [[TaskManager]]. The method loads the configuration,
+   * extracts the hostname and port on which the actor system shall listen.
+   *
+   * @param args Command line arguments
+   * @return Tuple of (hostname, port, configuration)
+   */
   def parseArgs(args: Array[String]): (String, Int, Configuration) = {
     val parser = new scopt.OptionParser[TaskManagerCLIConfiguration]("taskmanager") {
       head("flink task manager")
       opt[String]("configDir") action { (x, c) =>
         c.copy(configDir = x)
-      } text ("Specify configuration directory.")
+      } text "Specify configuration directory."
 
       opt[String]("tempDir") optional() action { (x, c) =>
         c.copy(tmpDir = x)
-      } text ("Specify temporary directory.")
+      } text "Specify temporary directory."
     }
 
 
@@ -642,7 +692,7 @@ object TaskManager {
       config =>
         GlobalConfiguration.loadConfiguration(config.configDir)
 
-        val configuration = GlobalConfiguration.getConfiguration()
+        val configuration = GlobalConfiguration.getConfiguration
 
         if (config.tmpDir != null && GlobalConfiguration.getString(ConfigConstants
           .TASK_MANAGER_TMP_DIR_KEY,
@@ -650,14 +700,16 @@ object TaskManager {
           configuration.setString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, config.tmpDir)
         }
 
-        val jobManagerHostname = configuration.getString(ConfigConstants
-          .JOB_MANAGER_IPC_ADDRESS_KEY, null)
+        val jobManagerHostname = configuration.getString(
+          ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null)
+
         val jobManagerPort = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
           ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
 
         val jobManagerAddress = new InetSocketAddress(jobManagerHostname, jobManagerPort)
 
         val port = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0)
+        // try to find out the TaskManager's own hostname by connecting to jobManagerAddress
         val hostname = NetUtils.resolveAddress(jobManagerAddress).getHostName
 
         (hostname, port, configuration)
@@ -669,18 +721,35 @@ object TaskManager {
   }
 
   def startActorSystemAndActor(hostname: String, port: Int, configuration: Configuration,
-                               localExecution: Boolean = false): (ActorSystem, ActorRef) = {
-    implicit val actorSystem = AkkaUtils.createActorSystem(hostname, port, configuration)
+                               localAkkaCommunication: Boolean,
+                               localTaskManagerCommunication: Boolean): (ActorSystem, ActorRef) = {
+    implicit val actorSystem = AkkaUtils.createActorSystem(configuration, Some((hostname, port)))
 
     val (connectionInfo, jobManagerURL, taskManagerConfig, networkConfig) =
-      parseConfiguration(hostname, configuration, localExecution)
+      parseConfiguration(hostname, configuration, localAkkaCommunication,
+        localTaskManagerCommunication)
 
     (actorSystem, startActor(connectionInfo, jobManagerURL, taskManagerConfig,
       networkConfig))
   }
 
+  /**
+   * Extracts from the configuration the TaskManager's settings. Returns the TaskManager's
+   * connection information, the JobManager's Akka URL, the task manager configuration and the
+   * network connection configuration.
+   *
+   * @param hostname Hostname of the instance on which the TaskManager runs
+   * @param configuration Configuration instance containing the user provided configuration values
+   * @param localAkkaCommunication true if the TaskManager runs in the same [[ActorSystem]] as the
+   *                               JobManager, otherwise false
+   * @param localTaskManagerCommunication true if all TaskManager run in the same JVM, otherwise
+   *                                      false
+   * @return Tuple of (TaskManager's connection information, JobManager's Akka URL, TaskManager's
+   *         configuration, network connection configuration)
+   */
   def parseConfiguration(hostname: String, configuration: Configuration,
-                         localExecution: Boolean = false):
+                         localAkkaCommunication: Boolean,
+                         localTaskManagerCommunication: Boolean):
   (InstanceConnectionInfo, String, TaskManagerConfiguration, NetworkEnvironmentConfiguration) = {
     val dataport = configuration.getInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
       ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT) match {
@@ -690,20 +759,21 @@ object TaskManager {
 
     val connectionInfo = new InstanceConnectionInfo(InetAddress.getByName(hostname), dataport)
 
-    val jobManagerURL = configuration.getString(ConfigConstants.JOB_MANAGER_AKKA_URL, null) match {
-      case url: String => url
-      case _ =>
-        val jobManagerAddress = configuration.getString(ConfigConstants
+    val jobManagerURL = if (localAkkaCommunication) {
+      // JobManager and TaskManager are in the same ActorSystem -> Use local Akka URL
+      JobManager.getLocalAkkaURL
+    } else {
+      val jobManagerAddress = configuration.getString(ConfigConstants
           .JOB_MANAGER_IPC_ADDRESS_KEY, null)
-        val jobManagerRPCPort = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
+      val jobManagerRPCPort = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
           ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
 
-        if (jobManagerAddress == null) {
-          throw new RuntimeException("JobManager address has not been specified in the " +
-            "configuration.")
-        }
+      if (jobManagerAddress == null) {
+        throw new RuntimeException("JobManager address has not been specified in the " +
+          "configuration.")
+      }
 
-        JobManager.getRemoteAkkaURL(jobManagerAddress + ":" + jobManagerRPCPort)
+      JobManager.getRemoteAkkaURL(jobManagerAddress + ":" + jobManagerRPCPort)
     }
 
     val slots = configuration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1)
@@ -720,7 +790,7 @@ object TaskManager {
       ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY,
       ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS)
 
-    val nettyConfig = localExecution match {
+    val nettyConfig = localTaskManagerCommunication match {
       case true => None
       case false => Some(new NettyConfig(
         connectionInfo.address(), connectionInfo.dataPort(), pageSize, configuration))
@@ -728,10 +798,11 @@ object TaskManager {
 
     val networkConfig = NetworkEnvironmentConfiguration(numNetworkBuffers, pageSize, nettyConfig)
 
-    val networkBufferMem = if (localExecution) 0 else numNetworkBuffers * pageSize
+    val networkBufferMem = if (localTaskManagerCommunication) 0 else numNetworkBuffers * pageSize
 
-    val configuredMemory: Long = configuration.getInteger(ConfigConstants
-      .TASK_MANAGER_MEMORY_SIZE_KEY, -1)
+    val configuredMemory: Long = configuration.getInteger(
+      ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1
+    )
 
     val memorySize = if (configuredMemory > 0) {
       configuredMemory << 20
@@ -745,9 +816,10 @@ object TaskManager {
         .toLong
     }
 
-    val memoryLoggingIntervalMs = configuration.getBoolean(ConfigConstants
-      .TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD,
-      ConfigConstants.DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD) match {
+    val memoryLoggingIntervalMs = configuration.getBoolean(
+      ConfigConstants.TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD,
+      ConfigConstants.DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD
+    ) match {
       case true => Some(
         configuration.getLong(ConfigConstants.TASK_MANAGER_DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS,
           ConfigConstants.DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS)
@@ -755,19 +827,19 @@ object TaskManager {
       case false => None
     }
 
-    val profilingInterval = configuration.getBoolean(ProfilingUtils.ENABLE_PROFILING_KEY,
-      false) match {
+    val profilingInterval = configuration.getBoolean(
+      ProfilingUtils.ENABLE_PROFILING_KEY, false
+    ) match {
       case true => Some(configuration.getInteger(ProfilingUtils.TASKMANAGER_REPORTINTERVAL_KEY,
         ProfilingUtils.DEFAULT_TASKMANAGER_REPORTINTERVAL).toLong)
       case false => None
     }
 
-    val cleanupInterval = configuration.getLong(ConfigConstants
-      .LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL,
+    val cleanupInterval = configuration.getLong(
+      ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL,
       ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000
 
-    val timeout = FiniteDuration(configuration.getInteger(ConfigConstants.AKKA_ASK_TIMEOUT,
-      ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT), TimeUnit.SECONDS)
+    val timeout = AkkaUtils.getTimeout(configuration)
 
     val maxRegistrationDuration = Duration(configuration.getString(
       ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION,
@@ -793,10 +865,12 @@ object TaskManager {
   }
 
   def startActorWithConfiguration(hostname: String, configuration: Configuration,
-                                  localExecution: Boolean = false)
+                                  localAkkaCommunication: Boolean,
+                                  localTaskManagerCommunication: Boolean)
                                  (implicit system: ActorSystem) = {
     val (connectionInfo, jobManagerURL, taskManagerConfig, networkConnectionConfiguration) =
-      parseConfiguration(hostname, configuration, localExecution)
+      parseConfiguration(hostname, configuration, localAkkaCommunication,
+        localTaskManagerCommunication)
 
     startActor(connectionInfo, jobManagerURL, taskManagerConfig, networkConnectionConfiguration)
   }
@@ -832,7 +906,7 @@ object TaskManager {
           LOG.info(f"Temporary file directory '$path': total $totalSpaceGb GB, " +
             f"usable $usableSpaceGb GB ($usablePercentage%.2f%% usable)")
         }
-      case (_, id) => throw new Exception(s"Temporary file directory #${id} is null.")
+      case (_, id) => throw new Exception(s"Temporary file directory #$id is null.")
 
     }
   }
@@ -859,7 +933,7 @@ object TaskManager {
       bean =>
         s"[${bean.getName}, GC TIME (ms): ${bean.getCollectionTime}, " +
           s"GC COUNT: ${bean.getCollectionCount}]"
-    } mkString (", ")
+    } mkString ", "
 
     "Garbage collector stats: " + beans
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerCLIConfiguration.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerCLIConfiguration.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerCLIConfiguration.scala
index 9d061d7..d2d9cf4 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerCLIConfiguration.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerCLIConfiguration.scala
@@ -18,4 +18,10 @@
 
 package org.apache.flink.runtime.taskmanager
 
+/**
+ * Command line configuration object for the [[TaskManager]]
+ *
+ * @param configDir Path to configuration directory
+ * @param tmpDir Path to temporary directory
+ */
 case class TaskManagerCLIConfiguration(configDir: String = null, tmpDir: String = null)

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerProfiler.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerProfiler.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerProfiler.scala
index 616fb61..1a7c31d 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerProfiler.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerProfiler.scala
@@ -35,6 +35,14 @@ import org.apache.flink.runtime.profiling.impl.{EnvironmentThreadSet, InstancePr
 
 import scala.concurrent.duration.FiniteDuration
 
+/**
+ * Actor which is responsible for profiling task threads on the [[TaskManager]]. The monitoring
+ * is triggered by the self-addressed message [[ProfileTasks]] which is scheduled to be sent
+ * repeatedly.
+ *
+ * @param instancePath Akka URL to [[TaskManager]] instance
+ * @param reportInterval Interval of profiling action
+ */
 class TaskManagerProfiler(val instancePath: String, val reportInterval: Int) extends Actor with
 ActorLogMessages with ActorLogging {
 
@@ -57,30 +65,26 @@ ActorLogMessages with ActorLogging {
 
 
   override def receiveWithLogMessages: Receive = {
-    case MonitorTask(task) => {
+    case MonitorTask(task) =>
       task.registerExecutionListener(self)
       environments += task.getExecutionId -> task.getEnvironment
-    }
 
-    case UnmonitorTask(executionAttemptID) => {
+    case UnmonitorTask(executionAttemptID) =>
       environments.remove(executionAttemptID)
-    }
 
-    case RegisterProfilingListener => {
+    case RegisterProfilingListener =>
       listeners += sender
       if (monitoringScheduler.isEmpty) {
-        startMonitoring
+        startMonitoring()
       }
-    }
 
-    case UnregisterProfilingListener => {
+    case UnregisterProfilingListener =>
       listeners -= sender
       if (listeners.isEmpty) {
-        stopMonitoring
+        stopMonitoring()
       }
-    }
 
-    case ProfileTasks => {
+    case ProfileTasks =>
       val timestamp = System.currentTimeMillis()
 
       val profilingDataContainer = new ProfilingDataContainer()
@@ -96,10 +100,9 @@ ActorLogMessages with ActorLogging {
           val instanceProfilingData = try {
             Some(instanceProfiler.generateProfilingData(timestamp))
           } catch {
-            case e: ProfilingException => {
+            case e: ProfilingException =>
               log.error(e, "Error while retrieving instance profiling data.")
               None
-            }
           }
 
           instanceProfilingData foreach {
@@ -115,10 +118,9 @@ ActorLogMessages with ActorLogging {
           profilingDataContainer.clear()
         }
       }
-    }
 
     case ExecutionStateChanged(_, vertexID, _, _, subtaskIndex, executionID, newExecutionState,
-    _, _) => {
+    _, _) =>
       import ExecutionState._
 
       environments.get(executionID) match {
@@ -131,14 +133,15 @@ ActorLogMessages with ActorLogging {
             case _ =>
           }
         case None =>
-          log.warning(s"Could not find environment for execution id ${executionID}.")
+          log.warning(s"Could not find environment for execution id $executionID.")
       }
-    }
   }
 
   def startMonitoring(): Unit = {
     val interval = new FiniteDuration(reportInterval, TimeUnit.MILLISECONDS)
     val delay = new FiniteDuration((reportInterval * Math.random()).toLong, TimeUnit.MILLISECONDS)
+
+    // schedule ProfileTasks message to be sent repeatedly to oneself
     monitoringScheduler = Some(context.system.scheduler.schedule(delay, interval, self,
       ProfileTasks))
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java
index dc81cbe..4d10585 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java
@@ -45,13 +45,13 @@ public class AllVerticesIteratorTest {
 			ExecutionGraph eg = Mockito.mock(ExecutionGraph.class);
 					
 			ExecutionJobVertex ejv1 = new ExecutionJobVertex(eg, v1, 1,
-					AkkaUtils.DEFAULT_TIMEOUT());
+					AkkaUtils.getDefaultTimeout());
 			ExecutionJobVertex ejv2 = new ExecutionJobVertex(eg, v2, 1,
-					AkkaUtils.DEFAULT_TIMEOUT());
+					AkkaUtils.getDefaultTimeout());
 			ExecutionJobVertex ejv3 = new ExecutionJobVertex(eg, v3, 1,
-					AkkaUtils.DEFAULT_TIMEOUT());
+					AkkaUtils.getDefaultTimeout());
 			ExecutionJobVertex ejv4 = new ExecutionJobVertex(eg, v4, 1,
-					AkkaUtils.DEFAULT_TIMEOUT());
+					AkkaUtils.getDefaultTimeout());
 			
 			AllVerticesIterator iter = new AllVerticesIterator(Arrays.asList(ejv1, ejv2, ejv3, ejv4).iterator());
 			


Mime
View raw message