flink-commits mailing list archives

From m..@apache.org
Subject [2/7] flink git commit: [FLINK-3544][runtime] introduce ResourceManager component
Date Tue, 29 Mar 2016 10:52:14 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/92ff2b15/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 518336c..b487e30 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -39,6 +39,10 @@ import org.apache.flink.runtime.blob.BlobServer
 import org.apache.flink.runtime.checkpoint._
 import org.apache.flink.runtime.client._
 import org.apache.flink.runtime.execution.SuppressRestartsException
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager
+import org.apache.flink.runtime.clusterframework.messages._
+import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager
+import org.apache.flink.runtime.clusterframework.types.ResourceID
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
 import org.apache.flink.runtime.executiongraph.restart.{RestartStrategy, RestartStrategyFactory}
 import org.apache.flink.runtime.executiongraph.{ExecutionGraph, ExecutionJobVertex}
@@ -48,15 +52,18 @@ import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus, JobVertexID}
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore.SubmittedJobGraphListener
 import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler}
 import org.apache.flink.runtime.leaderelection.{LeaderContender, LeaderElectionService, StandaloneLeaderElectionService}
+
 import org.apache.flink.runtime.messages.ArchiveMessages.ArchiveExecutionGraph
 import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
 import org.apache.flink.runtime.messages.JobManagerMessages._
-import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
+import org.apache.flink.runtime.messages.Messages.{Disconnect, Acknowledge}
 import org.apache.flink.runtime.messages.RegistrationMessages._
 import org.apache.flink.runtime.messages.TaskManagerMessages.{Heartbeat, SendStackTrace}
 import org.apache.flink.runtime.messages.TaskMessages.{PartitionState, UpdateTaskExecutionState}
 import org.apache.flink.runtime.messages.accumulators.{AccumulatorMessage, AccumulatorResultStringsFound, AccumulatorResultsErroneous, AccumulatorResultsFound, RequestAccumulatorResults, RequestAccumulatorResultsStringified}
 import org.apache.flink.runtime.messages.checkpoint.{DeclineCheckpoint, AbstractCheckpointMessage, AcknowledgeCheckpoint}
+
+import org.apache.flink.runtime.messages.webmonitor.InfoMessage
 import org.apache.flink.runtime.messages.webmonitor._
 import org.apache.flink.runtime.process.ProcessReaper
 import org.apache.flink.runtime.security.SecurityUtils
@@ -65,7 +72,7 @@ import org.apache.flink.runtime.taskmanager.TaskManager
 import org.apache.flink.runtime.util._
 import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils}
 import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages}
-import org.apache.flink.util.{ExceptionUtils, InstantiationUtil, NetUtils}
+import org.apache.flink.util.{InstantiationUtil, NetUtils}
 
 import org.jboss.netty.channel.ChannelException
 
@@ -154,6 +161,9 @@ class JobManager(
   val webMonitorPort : Int = flinkConfiguration.getInteger(
     ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, -1)
 
+  /** The resource manager actor responsible for allocating and managing task manager resources. */
+  var currentResourceManager: Option[ActorRef] = None
+
   /**
    * Run when the job manager is started. Simply logs an informational message.
    * The method also starts the leader election service.
@@ -312,59 +322,138 @@ class JobManager(
 
       leaderSessionID = None
 
-    case RegisterTaskManager(
-      connectionInfo,
-      hardwareInformation,
-      numberOfSlots) =>
+    case msg: RegisterResourceManager =>
+      log.debug(s"Resource manager registration: $msg")
+
+      // ditch current resource manager (if any)
+      currentResourceManager = Option(msg.resourceManager())
+
+      val taskManagerResources = instanceManager.getAllRegisteredInstances.asScala.map(
+        instance => instance.getResourceId).toList.asJava
+
+      // confirm registration and send known task managers with their resource ids
+      sender ! decorateMessage(new RegisterResourceManagerSuccessful(self, taskManagerResources))
+
+    case msg: ReconnectResourceManager =>
+      log.debug(s"Resource manager reconnect: $msg")
+
+      /**
+        * In most cases the ResourceManager handles the reconnect itself (e.g. after a leader
+        * change), but if it does not, we keep sending TriggerRegistrationAtJobManager messages
+        * until we receive a registration from this or another ResourceManager.
+        */
+      def reconnectRepeatedly(): Unit = {
+        msg.resourceManager() ! decorateMessage(new TriggerRegistrationAtJobManager(self))
+        // try again after some delay
+        context.system.scheduler.scheduleOnce(2 seconds) {
+          self ! decorateMessage(msg)
+        }(context.dispatcher)
+      }
+
+      currentResourceManager match {
+        case Some(rm) if rm.equals(msg.resourceManager()) =>
+          // we should ditch the current resource manager
+          log.debug(s"Disconnecting resource manager $rm and forcing a reconnect.")
+          currentResourceManager = None
+          reconnectRepeatedly()
+        case Some(rm) =>
+          // we have registered with another ResourceManager in the meantime, stop sending
+          // TriggerRegistrationAtJobManager messages to the old ResourceManager
+        case None =>
+          log.warn(s"No resource manager ${msg.resourceManager()} connected. " +
+            s"Telling old ResourceManager to register again.")
+          reconnectRepeatedly()
+      }
+
+    case msg @ RegisterTaskManager(
+          resourceId,
+          connectionInfo,
+          hardwareInformation,
+          numberOfSlots) =>
+      // a TaskManager wants to register; its resource is checked with the ResourceManager first
+      log.debug(s"RegisterTaskManager: $msg")
 
       val taskManager = sender()
 
+      currentResourceManager match {
+        case Some(rm) =>
+          val future = (rm ? decorateMessage(new RegisterResource(taskManager, msg)))(timeout)
+          future.onComplete {
+            case scala.util.Success(response) =>
+              // the resource manager is available and answered
+              self ! response
+            case scala.util.Failure(t) =>
+              log.error("Failure while asking ResourceManager for RegisterResource", t)
+              // slow or unreachable resource manager, register anyway and let the rm reconnect
+              self ! decorateMessage(new RegisterResourceSuccessful(taskManager, msg))
+              self ! decorateMessage(new ReconnectResourceManager(rm))
+          }(context.dispatcher)
+
+        case None =>
+          log.info("Task Manager Registration but not connected to ResourceManager")
+          // ResourceManager not yet available
+          // sending task manager information later upon ResourceManager registration
+          self ! decorateMessage(new RegisterResourceSuccessful(taskManager, msg))
+      }
+
+    case msg: RegisterResourceSuccessful =>
+
+      val originalMsg = msg.getRegistrationMessage
+      val taskManager = msg.getTaskManager
+
+      // ResourceManager knows about the resource, now let's try to register TaskManager
       if (instanceManager.isRegistered(taskManager)) {
         val instanceID = instanceManager.getRegisteredInstance(taskManager).getId
 
-        // IMPORTANT: Send the response to the "sender", which is not the
-        //            TaskManager actor, but the ask future!
-        sender() ! decorateMessage(
+        taskManager ! decorateMessage(
           AlreadyRegistered(
             instanceID,
-            libraryCacheManager.getBlobServerPort)
-        )
-      }
-      else {
+            libraryCacheManager.getBlobServerPort))
+      } else {
         try {
           val instanceID = instanceManager.registerTaskManager(
             taskManager,
-            connectionInfo,
-            hardwareInformation,
-            numberOfSlots,
+            originalMsg.resourceId,
+            originalMsg.connectionInfo,
+            originalMsg.resources,
+            originalMsg.numberOfSlots,
             leaderSessionID.orNull)
 
-          // IMPORTANT: Send the response to the "sender", which is not the
-          //            TaskManager actor, but the ask future!
-          sender() ! decorateMessage(
-            AcknowledgeRegistration(
-              instanceID,
-              libraryCacheManager.getBlobServerPort)
-          )
+          taskManager ! decorateMessage(
+            AcknowledgeRegistration(instanceID, libraryCacheManager.getBlobServerPort))
 
           // to be notified when the taskManager is no longer reachable
           context.watch(taskManager)
-        }
-        catch {
+        } catch {
           // registerTaskManager throws an IllegalStateException if it is already shut down
           // let the actor crash and restart itself in this case
           case e: Exception =>
             log.error("Failed to register TaskManager at instance manager", e)
 
-            // IMPORTANT: Send the response to the "sender", which is not the
-            //            TaskManager actor, but the ask future!
-            sender() ! decorateMessage(
-              RefuseRegistration(
-                ExceptionUtils.stringifyException(e))
-            )
+            taskManager ! decorateMessage(
+              RefuseRegistration(e))
         }
       }
 
+    case msg: RegisterResourceFailed =>
+
+      val taskManager = msg.getTaskManager
+      val resourceId = msg.getResourceID
+      log.warn(s"TaskManager's resource id $resourceId is not registered with ResourceManager. " +
+        s"Refusing registration.")
+
+      taskManager ! decorateMessage(
+        RefuseRegistration(new IllegalStateException(
+            s"Resource $resourceId not registered with resource manager.")))
+
+    case msg: ResourceRemoved =>
+      // we're being informed by the resource manager that a resource has become unavailable
+      val resourceID = msg.resourceId()
+      log.debug(s"Resource has been removed: $resourceID")
+      val instance = instanceManager.getRegisteredInstance(resourceID)
+      // trigger removal of task manager
+      handleTaskManagerTerminated(instance.getActorGateway.actor())
+
     case RequestNumberRegisteredTaskManager =>
       sender ! decorateMessage(instanceManager.getNumberOfRegisteredTaskManagers)
 
@@ -815,7 +904,6 @@ class JobManager(
       sender ! decorateMessage(ResponseArchive(archive))
 
     case RequestRegisteredTaskManagers =>
-      import scala.collection.JavaConverters._
       sender ! decorateMessage(
         RegisteredTaskManagers(
           instanceManager.getAllRegisteredInstances.asScala
@@ -842,13 +930,8 @@ class JobManager(
       val gateway = instanceManager.getRegisteredInstanceById(instanceID).getActorGateway
       gateway.forward(SendStackTrace, new AkkaActorGateway(sender, leaderSessionID.orNull))
 
-    case Terminated(taskManager) =>
-      if (instanceManager.isRegistered(taskManager)) {
-        log.info(s"Task manager ${taskManager.path} terminated.")
-
-        instanceManager.unregisterTaskManager(taskManager, true)
-        context.unwatch(taskManager)
-      }
+    case Terminated(taskManagerActorRef) =>
+      handleTaskManagerTerminated(taskManagerActorRef)
 
     case RequestJobManagerStatus =>
       sender() ! decorateMessage(JobManagerStatusAlive)
@@ -890,6 +973,34 @@ class JobManager(
         context.unwatch(taskManager)
       }
 
+    case msg: StopCluster =>
+
+      log.info(s"Stopping JobManager with final application status ${msg.finalStatus()} " +
+        s"and diagnostics: ${msg.message()}")
+
+      // stop all task managers
+      instanceManager.getAllRegisteredInstances.asScala foreach {
+        instance =>
+          instance.getActorGateway.tell(msg)
+      }
+
+      // notify the resource manager and acknowledge the stop request
+      currentResourceManager match {
+        case Some(rm) =>
+
+          // inform rm
+          rm ! decorateMessage(msg)
+
+          sender() ! decorateMessage(StopClusterSuccessful.getInstance())
+
+          // trigger shutdown
+          shutdown()
+
+        case None =>
+          // ResourceManager not available
+          // we choose not to wait here because it might block the shutdown forever
+      }
+
     case RequestLeaderSessionID =>
       sender() ! ResponseLeaderSessionID(leaderSessionID.orNull)
 
@@ -898,6 +1009,21 @@ class JobManager(
   }
 
   /**
+    * Handler to be executed when a task manager terminates.
+    * (Akka DeathWatch or notification from the ResourceManager)
+    *
+    * @param taskManager The ActorRef of the taskManager
+    */
+  private def handleTaskManagerTerminated(taskManager: ActorRef): Unit = {
+    if (instanceManager.isRegistered(taskManager)) {
+      log.info(s"Task manager ${taskManager.path} terminated.")
+
+      instanceManager.unregisterTaskManager(taskManager, true)
+      context.unwatch(taskManager)
+    }
+  }
+
+  /**
    * Submits a job to the job manager. The job is registered at the libraryCacheManager which
    * creates the job's class loader. The job graph is appended to the corresponding execution
    * graph and the execution vertices are queued for scheduling.
@@ -1554,8 +1680,8 @@ class JobManager(
   
   /**
    * Updates the accumulators reported from a task manager via the Heartbeat message.
-    *
-    * @param accumulators list of accumulator snapshots
+   *
+   * @param accumulators list of accumulator snapshots
    */
   private def updateAccumulators(accumulators : Seq[AccumulatorSnapshot]) = {
     accumulators foreach {
@@ -1570,6 +1696,20 @@ class JobManager(
         }
     }
   }
+
+  /**
+    * Shutdown method which may be overridden for testing.
+    */
+  protected def shutdown() : Unit = {
+    // Await actor system termination and shut down JVM
+    new ProcessShutDownThread(
+      log.logger,
+      context.system,
+      FiniteDuration(10, SECONDS)).start()
+
+    // Shutdown and discard all queued messages
+    context.system.shutdown()
+  }
 }
 
 /**
@@ -1689,14 +1829,15 @@ object JobManager {
       listeningAddress: String,
       listeningPort: Int)
     : Unit = {
-    
-    val (jobManagerSystem, _, _, _) = startActorSystemAndJobManagerActors(
+
+    val (jobManagerSystem, _, _, _, _) = startActorSystemAndJobManagerActors(
       configuration,
       executionMode,
       listeningAddress,
       listeningPort,
       classOf[JobManager],
-      classOf[MemoryArchivist]
+      classOf[MemoryArchivist],
+      Option(classOf[StandaloneResourceManager])
     )
 
     // block until everything is shut down
@@ -1810,6 +1951,7 @@ object JobManager {
     * @param listeningPort The port where the JobManager should listen for messages
     * @param jobManagerClass The class of the JobManager to be started
     * @param archiveClass The class of the Archivist to be started
+    * @param resourceManagerClass Optional class of resource manager if one should be started
     * @return A tuple containing the started ActorSystem, ActorRefs to the JobManager and the
     *         Archivist and an Option containing a possibly started WebMonitor
     */
@@ -1819,8 +1961,9 @@ object JobManager {
       listeningAddress: String,
       listeningPort: Int,
       jobManagerClass: Class[_ <: JobManager],
-      archiveClass: Class[_ <: MemoryArchivist])
-    : (ActorSystem, ActorRef, ActorRef, Option[WebMonitor]) = {
+      archiveClass: Class[_ <: MemoryArchivist],
+      resourceManagerClass: Option[Class[_ <: FlinkResourceManager[_]]])
+    : (ActorSystem, ActorRef, ActorRef, Option[WebMonitor], Option[ActorRef]) = {
 
     LOG.info("Starting JobManager")
 
@@ -1905,6 +2048,7 @@ object JobManager {
 
         val taskManagerActor = TaskManager.startTaskManagerComponentsAndActor(
           configuration,
+          ResourceID.generate(),
           jobManagerSystem,
           listeningAddress,
           Some(TaskManager.TASK_MANAGER_NAME),
@@ -1922,13 +2066,29 @@ object JobManager {
           "TaskManager_Process_Reaper")
       }
 
+      // start web monitor
       webMonitor.foreach {
         monitor =>
           val jobManagerAkkaUrl = JobManager.getRemoteJobManagerAkkaURL(configuration)
           monitor.start(jobManagerAkkaUrl)
       }
 
-      (jobManagerSystem, jobManager, archive, webMonitor)
+      val resourceManager =
+        resourceManagerClass match {
+          case Some(rmClass) =>
+            LOG.debug("Starting Resource manager actor")
+            Option(
+              FlinkResourceManager.startResourceManagerActors(
+                configuration,
+                jobManagerSystem,
+                LeaderRetrievalUtils.createLeaderRetrievalService(configuration),
+                rmClass))
+          case None =>
+            LOG.info("Resource Manager class not provided. No resource manager will be started.")
+            None
+        }
+
+      (jobManagerSystem, jobManager, archive, webMonitor, resourceManager)
     }
     catch {
       case t: Throwable =>
@@ -2197,8 +2357,8 @@ object JobManager {
   }
 
   /**
-   * Starts the JobManager and job archiver based on the given configuration, in the
-   * given actor system.
+   * Starts the JobManager and job archiver based on the given configuration, in
+   * the given actor system.
    *
    * @param configuration The configuration for the JobManager
    * @param actorSystem The actor system running the JobManager
@@ -2292,10 +2452,6 @@ object JobManager {
     (jobManager, archive)
   }
 
-  def startActor(props: Props, actorSystem: ActorSystem): ActorRef = {
-    actorSystem.actorOf(props, JOB_MANAGER_NAME)
-  }
-
   // --------------------------------------------------------------------------
   //  Resolving the JobManager endpoint
   // --------------------------------------------------------------------------
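
For readers tracing the new control flow above: the JobManager no longer registers a TaskManager directly, but first asks the ResourceManager to acknowledge the resource and only then completes the registration with its InstanceManager. Below is a minimal, self-contained sketch of that hand-off. It is illustrative only; the message classes are simplified stand-ins for the real clusterframework types, and the helper registerAtInstanceManager is hypothetical.

import akka.actor.{Actor, ActorRef}
import akka.pattern.ask
import akka.util.Timeout

import scala.concurrent.duration._
import scala.util.{Failure, Success}

// Stand-ins for the clusterframework messages referenced in the diff above.
case class RegisterTaskManager(resourceId: String, numberOfSlots: Int)
case class RegisterResource(taskManager: ActorRef, original: RegisterTaskManager)
case class RegisterResourceSuccessful(taskManager: ActorRef, original: RegisterTaskManager)
case class ReconnectResourceManager(resourceManager: ActorRef)

class RegistrationFlowSketch extends Actor {
  import context.dispatcher
  implicit val timeout: Timeout = Timeout(10.seconds)

  var currentResourceManager: Option[ActorRef] = None

  override def receive: Receive = {
    case msg: RegisterTaskManager =>
      val taskManager = sender()
      currentResourceManager match {
        case Some(rm) =>
          // ask the ResourceManager to acknowledge the resource before registering
          (rm ? RegisterResource(taskManager, msg)).onComplete {
            case Success(response) => self ! response
            case Failure(_) =>
              // slow or unreachable ResourceManager: register anyway and force a reconnect
              self ! RegisterResourceSuccessful(taskManager, msg)
              self ! ReconnectResourceManager(rm)
          }
        case None =>
          // no ResourceManager connected yet: register now, inform it upon its registration
          self ! RegisterResourceSuccessful(taskManager, msg)
      }

    case RegisterResourceSuccessful(taskManager, original) =>
      // in the real JobManager this is where the InstanceManager registration and the
      // AcknowledgeRegistration reply to the TaskManager happen
      registerAtInstanceManager(taskManager, original)
  }

  // hypothetical helper standing in for the InstanceManager interaction
  private def registerAtInstanceManager(tm: ActorRef, msg: RegisterTaskManager): Unit = ()
}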

http://git-wip-us.apache.org/repos/asf/flink/blob/92ff2b15/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala
index 941d63f..b48bcf9 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala
@@ -18,12 +18,14 @@
 
 package org.apache.flink.runtime.messages
 
+import akka.actor.ActorRef
+import org.apache.flink.runtime.clusterframework.types.ResourceID
 import org.apache.flink.runtime.instance.{InstanceConnectionInfo, InstanceID, HardwareDescription}
 
 import scala.concurrent.duration.{Deadline, FiniteDuration}
 
 /**
- * A set of messages from the between TaskManager and JobManager handle the
+ * A set of messages between TaskManager and JobManager to handle the
  * registration of the TaskManager at the JobManager.
  */
 object RegistrationMessages {
@@ -49,7 +51,7 @@ object RegistrationMessages {
     extends RegistrationMessage
 
   /**
-   * Registers a task manager at the job manager. A successful registration is acknowledged by
+   * Registers a task manager at the JobManager. A successful registration is acknowledged by
    * [[AcknowledgeRegistration]].
    *
    * @param connectionInfo The TaskManagers connection information.
@@ -57,14 +59,16 @@ object RegistrationMessages {
    * @param numberOfSlots The number of processing slots offered by the TaskManager.
    */
   case class RegisterTaskManager(
+      resourceId: ResourceID,
       connectionInfo: InstanceConnectionInfo,
       resources: HardwareDescription,
       numberOfSlots: Int)
     extends RegistrationMessage
 
   /**
-   * Denotes the successful registration of a task manager at the job manager. This is the
-   * response triggered by the [[RegisterTaskManager]] message.
+   * Denotes the successful registration of a task manager at the JobManager. This is the
+   * response triggered by the [[RegisterTaskManager]] message when the JobManager has registered
+   * the task manager with the resource manager.
    *
    * @param instanceID The instance ID under which the TaskManager is registered at the
    *                   JobManager.
@@ -87,11 +91,11 @@ object RegistrationMessages {
     extends RegistrationMessage
 
   /**
-   * Denotes the unsuccessful registration of a task manager at the job manager. This is the
+   * Denotes the unsuccessful registration of a task manager at the JobManager. This is the
    * response triggered by the [[RegisterTaskManager]] message.
    *
    * @param reason Reason why the task manager registration was refused
    */
-  case class RefuseRegistration(reason: String)
-    extends RegistrationMessage
+  case class RefuseRegistration(reason: Throwable) extends RegistrationMessage
+
 }
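
To make the revised message shapes concrete, here is a small illustrative fragment (not part of the commit) that pattern-matches on the updated messages: RegisterTaskManager now carries the ResourceID of the registering TaskManager, and RefuseRegistration carries the causing Throwable rather than a String.

import org.apache.flink.runtime.messages.RegistrationMessages.{RefuseRegistration, RegisterTaskManager}

object RegistrationMessageExample {
  // illustrative handler: shows the fields the updated messages expose
  def describe(msg: Any): String = msg match {
    case RegisterTaskManager(resourceId, connectionInfo, resources, numberOfSlots) =>
      s"TaskManager $resourceId at $connectionInfo offers $numberOfSlots slots ($resources)"
    case RefuseRegistration(reason) =>
      s"registration refused: ${reason.getMessage}"
    case other =>
      s"unrelated message: $other"
  }
}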

http://git-wip-us.apache.org/repos/asf/flink/blob/92ff2b15/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
index 30c82fe..9ffaca0 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
@@ -44,6 +44,7 @@ object TaskManagerMessages {
 
     /**
      * Accessor for the case object instance, to simplify Java interoperability.
+     *
      * @return The SendHeartbeat case object instance.
      */
     def get() : SendHeartbeat.type = SendHeartbeat
@@ -74,6 +75,7 @@ object TaskManagerMessages {
 
     /**
      * Accessor for the case object instance, to simplify Java interoperability.
+     *
      * @return The SendStackTrace case object instance.
      */
     def get() : SendStackTrace.type = SendStackTrace
@@ -95,14 +97,14 @@ object TaskManagerMessages {
 
   /**
    * Requests a notification from the task manager as soon as the task manager has been
-   * registered at any job manager. Once the task manager is registered at any job manager a
+   * registered at a job manager. Once the task manager is registered at a job manager a
    * [[RegisteredAtJobManager]] message will be sent to the sender.
    */
-  case object NotifyWhenRegisteredAtAnyJobManager
+  case object NotifyWhenRegisteredAtJobManager
 
   /**
    * Acknowledges that the task manager has been successfully registered at any job manager. This
-   * message is a response to [[NotifyWhenRegisteredAtAnyJobManager]].
+   * message is a response to [[NotifyWhenRegisteredAtJobManager]].
    */
   case object RegisteredAtJobManager
 
@@ -121,13 +123,15 @@ object TaskManagerMessages {
 
   /**
    * Accessor for the case object instance, to simplify Java interoperability.
+   *
    * @return The NotifyWhenRegisteredAtJobManager case object instance.
    */
   def getNotifyWhenRegisteredAtJobManagerMessage:
-            NotifyWhenRegisteredAtAnyJobManager.type = NotifyWhenRegisteredAtAnyJobManager
+  NotifyWhenRegisteredAtJobManager.type = NotifyWhenRegisteredAtJobManager
 
   /**
    * Accessor for the case object instance, to simplify Java interoperability.
+   *
    * @return The RegisteredAtJobManager case object instance.
    */
   def getRegisteredAtJobManagerMessage:

http://git-wip-us.apache.org/repos/asf/flink/blob/92ff2b15/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 0346d6d..5074b8c 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -36,7 +36,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph
 import org.apache.flink.runtime.jobmanager.RecoveryMode
 import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalService, LeaderRetrievalListener,
 StandaloneLeaderRetrievalService}
-import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtAnyJobManager
+import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtJobManager
 import org.apache.flink.runtime.util.ZooKeeperUtils
 import org.apache.flink.runtime.webmonitor.{WebMonitorUtils, WebMonitor}
 
@@ -102,8 +102,11 @@ abstract class FlinkMiniCluster(
   var taskManagerActorSystems: Option[Seq[ActorSystem]] = None
   var taskManagerActors: Option[Seq[ActorRef]] = None
 
-  protected var leaderRetrievalService: Option[LeaderRetrievalService] = None
-  
+  var resourceManagerActorSystems: Option[Seq[ActorSystem]] = None
+  var resourceManagerActors: Option[Seq[ActorRef]] = None
+
+  protected var jobManagerLeaderRetrievalService: Option[LeaderRetrievalService] = None
+
   private var isRunning = false
 
   // --------------------------------------------------------------------------
@@ -112,6 +115,8 @@ abstract class FlinkMiniCluster(
 
   def generateConfiguration(userConfiguration: Configuration): Configuration
 
+  def startResourceManager(index: Int, system: ActorSystem): ActorRef
+
   def startJobManager(index: Int, system: ActorSystem): ActorRef
 
   def startTaskManager(index: Int, system: ActorSystem): ActorRef
@@ -131,6 +136,17 @@ abstract class FlinkMiniCluster(
     }
   }
 
+  def getNumberOfResourceManagers: Int = {
+    if(recoveryMode == RecoveryMode.STANDALONE) {
+      1
+    } else {
+      configuration.getInteger(
+        ConfigConstants.LOCAL_NUMBER_RESOURCE_MANAGER,
+        ConfigConstants.DEFAULT_LOCAL_NUMBER_RESOURCE_MANAGER
+      )
+    }
+  }
+
   def getJobManagersAsJava = {
     import collection.JavaConverters._
     jobManagerActors.getOrElse(Seq()).asJava
@@ -165,6 +181,19 @@ abstract class FlinkMiniCluster(
     Await.result(indexFuture, timeout)
   }
 
+  def getResourceManagerAkkaConfig(index: Int): Config = {
+    if (useSingleActorSystem) {
+      AkkaUtils.getAkkaConfig(configuration, None)
+    } else {
+      val port = configuration.getInteger(ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY,
+        ConfigConstants.DEFAULT_RESOURCE_MANAGER_IPC_PORT)
+
+      val resolvedPort = if(port != 0) port + index else port
+
+      AkkaUtils.getAkkaConfig(configuration, Some((hostname, resolvedPort)))
+    }
+  }
+
   def getJobManagerAkkaConfig(index: Int): Config = {
     if (useSingleActorSystem) {
       AkkaUtils.getAkkaConfig(configuration, None)
@@ -208,6 +237,11 @@ abstract class FlinkMiniCluster(
   //                          Start/Stop Methods
   // --------------------------------------------------------------------------
 
+  def startResourceManagerActorSystem(index: Int): ActorSystem = {
+    val config = getResourceManagerAkkaConfig(index)
+    AkkaUtils.createActorSystem(config)
+  }
+
   def startJobManagerActorSystem(index: Int): ActorSystem = {
     val config = getJobManagerAkkaConfig(index)
     AkkaUtils.createActorSystem(config)
@@ -215,7 +249,6 @@ abstract class FlinkMiniCluster(
 
   def startTaskManagerActorSystem(index: Int): ActorSystem = {
     val config = getTaskManagerAkkaConfig(index)
-
     AkkaUtils.createActorSystem(config)
   }
 
@@ -254,11 +287,27 @@ abstract class FlinkMiniCluster(
     jobManagerActorSystems = Some(jmActorSystems)
     jobManagerActors = Some(jmActors)
 
+    // start leader retrieval service
     val lrs = createLeaderRetrievalService()
-
-    leaderRetrievalService = Some(lrs)
+    jobManagerLeaderRetrievalService = Some(lrs)
     lrs.start(this)
 
+    // start the configured number of resource managers
+    val (rmActorSystems, rmActors) =
+      (for(i <- 0 until getNumberOfResourceManagers) yield {
+        val actorSystem = if(useSingleActorSystem) {
+          jmActorSystems(0)
+        } else {
+          startResourceManagerActorSystem(i)
+        }
+
+        (actorSystem, startResourceManager(i, actorSystem))
+      }).unzip
+
+    resourceManagerActorSystems = Some(rmActorSystems)
+    resourceManagerActors = Some(rmActors)
+
+    // start task managers
     val (tmActorSystems, tmActors) =
       (for(i <- 0 until numTaskManagers) yield {
         val actorSystem = if(useSingleActorSystem) {
@@ -292,7 +341,7 @@ abstract class FlinkMiniCluster(
     if(
       config.getBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false) &&
         config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) >= 0) {
-      
+
       // TODO: Add support for HA: Make web server work independently from the JM
       val leaderRetrievalService = new StandaloneLeaderRetrievalService(jobManagerAkkaURL)
 
@@ -319,7 +368,7 @@ abstract class FlinkMiniCluster(
     shutdown()
     awaitTermination()
 
-    leaderRetrievalService.foreach(_.stop())
+    jobManagerLeaderRetrievalService.foreach(_.stop())
     isRunning = false
   }
 
@@ -337,17 +386,26 @@ abstract class FlinkMiniCluster(
       _.map(gracefulStop(_, timeout))
     } getOrElse(Seq())
 
-    Await.ready(Future.sequence(jmFutures ++ tmFutures), timeout)
+    val rmFutures = resourceManagerActors map {
+      _.map(gracefulStop(_, timeout))
+    } getOrElse(Seq())
+
+    Await.ready(Future.sequence(jmFutures ++ tmFutures ++ rmFutures), timeout)
 
     if (!useSingleActorSystem) {
       taskManagerActorSystems foreach {
         _ foreach(_.shutdown())
       }
+
+      resourceManagerActorSystems foreach {
+        _ foreach(_.shutdown())
+      }
     }
 
     jobManagerActorSystems foreach {
       _ foreach(_.shutdown())
     }
+
   }
 
   def awaitTermination(): Unit = {
@@ -355,6 +413,10 @@ abstract class FlinkMiniCluster(
       _ foreach(_.awaitTermination())
     }
 
+    resourceManagerActorSystems foreach {
+      _ foreach(_.awaitTermination())
+    }
+
     taskManagerActorSystems foreach {
       _ foreach(_.awaitTermination())
     }
@@ -387,7 +449,7 @@ abstract class FlinkMiniCluster(
   @throws(classOf[InterruptedException])
   def waitForTaskManagersToBeRegistered(timeout: FiniteDuration): Unit = {
     val futures = taskManagerActors map {
-      _ map(taskManager => (taskManager ? NotifyWhenRegisteredAtAnyJobManager)(timeout))
+      _ map(taskManager => (taskManager ? NotifyWhenRegisteredAtJobManager)(timeout))
     } getOrElse(Seq())
 
     Await.ready(Future.sequence(futures), timeout)

http://git-wip-us.apache.org/repos/asf/flink/blob/92ff2b15/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index a4c10e7..1b398de 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -26,6 +26,9 @@ import org.apache.flink.api.common.JobID
 import org.apache.flink.api.common.io.FileOutputFormat
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import org.apache.flink.runtime.akka.AkkaUtils
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager
+import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager
+import org.apache.flink.runtime.clusterframework.types.ResourceID
 import org.apache.flink.runtime.io.network.netty.NettyConfig
 import org.apache.flink.runtime.jobmanager.{MemoryArchivist, JobManager}
 import org.apache.flink.runtime.messages.JobManagerMessages
@@ -89,6 +92,29 @@ class LocalFlinkMiniCluster(
     jobManager
   }
 
+  override def startResourceManager(index: Int, system: ActorSystem): ActorRef = {
+    val config = configuration.clone()
+
+    val resourceManagerName = getResourceManagerName(index)
+
+    val resourceManagerPort = config.getInteger(
+      ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY,
+      ConfigConstants.DEFAULT_RESOURCE_MANAGER_IPC_PORT)
+
+    if(resourceManagerPort > 0) {
+      config.setInteger(ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY, resourceManagerPort + index)
+    }
+
+    val resourceManager = FlinkResourceManager.startResourceManagerActors(
+      config,
+      system,
+      createLeaderRetrievalService(),
+      classOf[StandaloneResourceManager],
+      resourceManagerName)
+
+    resourceManager
+  }
+
   override def startTaskManager(index: Int, system: ActorSystem): ActorRef = {
     val config = configuration.clone()
 
@@ -117,10 +143,11 @@ class LocalFlinkMiniCluster(
     
     TaskManager.startTaskManagerComponentsAndActor(
       config,
+      ResourceID.generate(), // generate random resource id
       system,
       hostname, // network interface to bind to
       Some(taskManagerActorName), // actor name
-      Some(createLeaderRetrievalService), // job manager leader retrieval service
+      Some(createLeaderRetrievalService()), // job manager leader retrieval service
       localExecution, // start network stack?
       classOf[TaskManager])
   }
@@ -211,6 +238,15 @@ class LocalFlinkMiniCluster(
       JobManager.JOB_MANAGER_NAME
     }
   }
+
+  protected def getResourceManagerName(index: Int): String = {
+    if(singleActorSystem) {
+      FlinkResourceManager.RESOURCE_MANAGER_NAME + "_" + (index + 1)
+    } else {
+      FlinkResourceManager.RESOURCE_MANAGER_NAME
+    }
+  }
+
   protected def getArchiveName(index: Int): String = {
     if(singleActorSystem) {
       JobManager.ARCHIVE_NAME + "_" + (index + 1)

http://git-wip-us.apache.org/repos/asf/flink/blob/92ff2b15/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 8745f26..20a83b0 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -38,6 +38,8 @@ import org.apache.flink.configuration._
 import org.apache.flink.core.fs.FileSystem
 import org.apache.flink.core.memory.{HybridMemorySegment, HeapMemorySegment, MemorySegmentFactory, MemoryType}
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
+import org.apache.flink.runtime.clusterframework.messages.StopCluster
+import org.apache.flink.runtime.clusterframework.types.ResourceID
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.blob.{BlobCache, BlobService}
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager
@@ -63,7 +65,7 @@ import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage,
 import org.apache.flink.runtime.process.ProcessReaper
 import org.apache.flink.runtime.security.SecurityUtils
 import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner
-import org.apache.flink.runtime.util.{EnvironmentInformation, LeaderRetrievalUtils, MathUtils, SignalHandler}
+import org.apache.flink.runtime.util._
 import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages}
 import org.apache.flink.util.NetUtils
 
@@ -118,6 +120,7 @@ import scala.util.{Failure, Success}
  */
 class TaskManager(
     protected val config: TaskManagerConfiguration,
+    protected val resourceID: ResourceID,
     protected val connectionInfo: InstanceConnectionInfo,
     protected val memoryManager: MemoryManager,
     protected val ioManager: IOManager,
@@ -159,21 +162,23 @@ class TaskManager(
         MetricFilter.ALL))
 
   /** Actors which want to be notified once this task manager has been
-      registered at the job manager */
+    * registered at the job manager */
   private val waitForRegistration = scala.collection.mutable.Set[ActorRef]()
 
   private var blobService: Option[BlobService] = None
   private var libraryCacheManager: Option[LibraryCacheManager] = None
+
+  /* The current leading JobManager actor this TaskManager is associated with */
   protected var currentJobManager: Option[ActorRef] = None
+  /* The current leading JobManager URL */
   private var jobManagerAkkaURL: Option[String] = None
- 
+
   private var instanceID: InstanceID = null
 
   private var heartbeatScheduler: Option[Cancellable] = None
 
   var leaderSessionID: Option[UUID] = None
 
-
   private val runtimeInfo = new TaskManagerRuntimeInfo(
        connectionInfo.getHostname(),
        new UnmodifiableConfiguration(config.configuration))
@@ -212,7 +217,7 @@ class TaskManager(
    */
   override def postStop(): Unit = {
     log.info(s"Stopping TaskManager ${self.path.toSerializationFormat}.")
-    
+
     cancelAndClearEverything(new Exception("TaskManager is shutting down."))
 
     if (isConnected) {
@@ -286,7 +291,7 @@ class TaskManager(
 
     // registers the message sender to be notified once this TaskManager has completed
     // its registration at the JobManager
-    case NotifyWhenRegisteredAtAnyJobManager =>
+    case NotifyWhenRegisteredAtJobManager =>
       if (isConnected) {
         sender ! decorateMessage(RegisteredAtJobManager)
       } else {
@@ -296,18 +301,28 @@ class TaskManager(
     // this message indicates that some actor watched by this TaskManager has died
     case Terminated(actor: ActorRef) =>
       if (isConnected && actor == currentJobManager.orNull) {
-        handleJobManagerDisconnect(sender(), "JobManager is no longer reachable")
-        triggerTaskManagerRegistration()
-      }
-      else {
+          handleJobManagerDisconnect(sender(), "JobManager is no longer reachable")
+          triggerTaskManagerRegistration()
+      } else {
         log.warn(s"Received unrecognized disconnect message " +
-          s"from ${if (actor == null) null else actor.path}.")
+            s"from ${if (actor == null) null else actor.path}.")
       }
 
     case Disconnect(msg) =>
-      handleJobManagerDisconnect(sender(), "JobManager requested disconnect: " + msg)
+      handleJobManagerDisconnect(sender(), s"ResourceManager requested disconnect: $msg")
       triggerTaskManagerRegistration()
 
+    case msg: StopCluster =>
+      log.info(s"Stopping TaskManager with final application status ${msg.finalStatus()} " +
+        s"and diagnostics: ${msg.message()}")
+      context.system.shutdown()
+
+      // Await actor system termination and shut down JVM
+      new ProcessShutDownThread(
+        log.logger,
+        context.system,
+        FiniteDuration(10, SECONDS)).start()
+
     case FatalError(message, cause) =>
       killTaskManagerFatal(message, cause)
   }
@@ -485,7 +500,7 @@ class TaskManager(
         val taskExecutionId = message.getTaskExecutionId
         val checkpointId = message.getCheckpointId
         val timestamp = message.getTimestamp
-        
+
         log.debug(s"Receiver TriggerCheckpoint $checkpointId@$timestamp for $taskExecutionId.")
 
         val task = runningTasks.get(taskExecutionId)
@@ -516,8 +531,7 @@ class TaskManager(
   }
 
   /**
-   * Handler for messages concerning the registration of the TaskManager at
-   * the JobManager.
+   * Handler for messages concerning the registration of the TaskManager at the JobManager.
    *
    * Errors must not propagate out of the handler, but need to be handled internally.
    *
@@ -536,16 +550,14 @@ class TaskManager(
           // in the meantime, the registration is acknowledged
           log.debug(
             "TaskManager was triggered to register at JobManager, but is already registered")
-        }
-        else if (deadline.exists(_.isOverdue())) {
+        } else if (deadline.exists(_.isOverdue())) {
           // we failed to register in time. that means we should quit
           log.error("Failed to register at the JobManager withing the defined maximum " +
             "connect time. Shutting down ...")
 
           // terminate ourselves (hasta la vista)
           self ! decorateMessage(PoisonPill)
-        }
-        else {
+        } else {
           if (!jobManagerAkkaURL.equals(Option(jobManagerURL))) {
             throw new Exception("Invalid internal state: Trying to register at JobManager " +
               s"$jobManagerURL even though the current JobManagerAkkaURL is set to " +
@@ -559,6 +571,7 @@ class TaskManager(
 
           jobManager ! decorateMessage(
             RegisterTaskManager(
+              resourceID,
               connectionInfo,
               resources,
               numberOfSlots)
@@ -592,15 +605,14 @@ class TaskManager(
             log.warn(s"Ignoring 'AcknowledgeRegistration' message from ${jobManager.path} , " +
               s"because the TaskManager is already registered at ${currentJobManager.orNull}")
           }
-        }
-        else {
+        } else {
           // not yet connected, so let's associate with that JobManager
           try {
             associateWithJobManager(jobManager, id, blobPort)
           } catch {
             case t: Throwable =>
               killTaskManagerFatal(
-                "Unable to start TaskManager components after registering at JobManager", t)
+                "Unable to start TaskManager components and associate with the JobManager", t)
           }
         }
 
@@ -616,9 +628,8 @@ class TaskManager(
               s"JobManager ${jobManager.path}, even through TaskManager is currently " +
               s"registered at ${currentJobManager.orNull}")
           }
-        }
-        else {
-          // not connected, yet, to let's associate
+        } else {
+          // not connected, yet, so let's associate
           log.info("Received 'AlreadyRegistered' message before 'AcknowledgeRegistration'")
 
           try {
@@ -635,31 +646,29 @@ class TaskManager(
           log.error(s"The registration at JobManager $jobManagerAkkaURL was refused, " +
             s"because: $reason. Retrying later...")
 
-        if(jobManagerAkkaURL.isDefined) {
-          // try the registration again after some time
-          val delay: FiniteDuration = TaskManager.DELAY_AFTER_REFUSED_REGISTRATION
-          val deadline: Option[Deadline] = config.maxRegistrationDuration.map {
-            timeout => timeout + delay fromNow
-          }
+          if(jobManagerAkkaURL.isDefined) {
+            // try the registration again after some time
+            val delay: FiniteDuration = TaskManager.DELAY_AFTER_REFUSED_REGISTRATION
+            val deadline: Option[Deadline] = config.maxRegistrationDuration.map {
+              timeout => timeout + delay fromNow
+            }
 
-          context.system.scheduler.scheduleOnce(delay) {
-            self ! decorateMessage(
-              TriggerTaskManagerRegistration(
-                jobManagerAkkaURL.get,
-                TaskManager.INITIAL_REGISTRATION_TIMEOUT,
-                deadline,
-                1)
-            )
-          }(context.dispatcher)
-        }
-      }
-        else {
+            context.system.scheduler.scheduleOnce(delay) {
+              self ! decorateMessage(
+                TriggerTaskManagerRegistration(
+                  jobManagerAkkaURL.get,
+                  TaskManager.INITIAL_REGISTRATION_TIMEOUT,
+                  deadline,
+                  1)
+              )
+            }(context.dispatcher)
+          }
+        } else {
           // ignore RefuseRegistration messages which arrived after AcknowledgeRegistration
           if (sender() == currentJobManager.orNull) {
             log.warn(s"Received 'RefuseRegistration' from the JobManager (${sender().path})" +
               s" even though this TaskManager is already registered there.")
-          }
-          else {
+          } else {
             log.warn(s"Ignoring 'RefuseRegistration' from unrelated " +
               s"JobManager (${sender().path})")
           }
@@ -782,14 +791,14 @@ class TaskManager(
   }
 
   // --------------------------------------------------------------------------
-  //  Task Manager / JobManager association and initialization
+  //  Task Manager / ResourceManager / JobManager association and initialization
   // --------------------------------------------------------------------------
 
   /**
-   * Checks whether the TaskManager is currently connected to its JobManager.
-   *
-   * @return True, if the TaskManager is currently connected to a JobManager, false otherwise.
-   */
+    * Checks whether the TaskManager is currently connected to the JobManager.
+    *
+    * @return True, if the TaskManager is currently connected to the JobManager, false otherwise.
+    */
   protected def isConnected : Boolean = currentJobManager.isDefined
 
   /**
@@ -850,8 +859,6 @@ class TaskManager(
         new AkkaActorGateway(jobManager, leaderSessionID.orNull),
         new AkkaActorGateway(self, leaderSessionID.orNull)
       )
-
-
     }
     catch {
       case e: Exception =>
@@ -882,7 +889,6 @@ class TaskManager(
     else {
       libraryCacheManager = Some(new FallbackLibraryCacheManager)
     }
-
     // watch job manager to detect when it dies
     context.watch(jobManager)
 
@@ -979,7 +985,7 @@ class TaskManager(
       }
     }
   }
-  
+
   // --------------------------------------------------------------------------
   //  Task Operations
   // --------------------------------------------------------------------------
@@ -1038,14 +1044,14 @@ class TaskManager(
         runningTasks.put(execId, prevTask)
         throw new IllegalStateException("TaskManager already contains a task for id " + execId)
       }
-      
+
       // all good, we kick off the task, which performs its own initialization
       task.startTaskThread()
-      
+
       sender ! decorateMessage(Acknowledge)
     }
     catch {
-      case t: Throwable => 
+      case t: Throwable =>
         log.error("SubmitTask failed", t)
         sender ! decorateMessage(Failure(t))
     }
@@ -1119,7 +1125,7 @@ class TaskManager(
   private def cancelAndClearEverything(cause: Throwable) {
     if (runningTasks.size > 0) {
       log.info("Cancelling all computations and discarding all cached data.")
-      
+
       for (t <- runningTasks.values().asScala) {
         t.failExternally(cause)
       }
@@ -1243,8 +1249,8 @@ class TaskManager(
   }
 
   /** Handles the notification about a new leader and its address. If the TaskManager is still
-    * connected to another JobManager, it first disconnects from it. If the new JobManager
-    * address is not null, then it starts the registration process.
+    * connected to a JobManager, it first disconnects from it. It then retrieves the new
+    * JobManager address from the new leading JobManager and starts the registration process.
     *
     * @param newJobManagerAkkaURL Akka URL of the new job manager
     * @param leaderSessionID New leader session ID associated with the leader
@@ -1272,7 +1278,6 @@ class TaskManager(
   }
 
   /** Starts the TaskManager's registration process to connect to the JobManager.
-    *
     */
   def triggerTaskManagerRegistration(): Unit = {
     if(jobManagerAkkaURL.isDefined) {
@@ -1322,7 +1327,9 @@ object TaskManager {
     * connection attempts */
   val STARTUP_CONNECT_LOG_SUPPRESS = 10000L
 
+  /** The initial timeout for registration attempts of the TaskManager at the JobManager */
   val INITIAL_REGISTRATION_TIMEOUT: FiniteDuration = 500 milliseconds
+  /** The maximum timeout for registration attempts of the TaskManager at the JobManager */
   val MAX_REGISTRATION_TIMEOUT: FiniteDuration = 30 seconds
 
   val DELAY_AFTER_REFUSED_REGISTRATION: FiniteDuration = 10 seconds
@@ -1362,19 +1369,22 @@ object TaskManager {
         null
     }
 
+    // In Standalone mode, we generate a resource identifier.
+    val resourceId = ResourceID.generate()
+
     // run the TaskManager (if requested in an authentication enabled context)
     try {
       if (SecurityUtils.isSecurityEnabled) {
         LOG.info("Security is enabled. Starting secure TaskManager.")
         SecurityUtils.runSecured(new FlinkSecuredRunner[Unit] {
           override def run(): Unit = {
-            selectNetworkInterfaceAndRunTaskManager(configuration, classOf[TaskManager])
+            selectNetworkInterfaceAndRunTaskManager(configuration, resourceId, classOf[TaskManager])
           }
         })
       }
       else {
         LOG.info("Security is not enabled. Starting non-authenticated TaskManager.")
-        selectNetworkInterfaceAndRunTaskManager(configuration, classOf[TaskManager])
+        selectNetworkInterfaceAndRunTaskManager(configuration, resourceId, classOf[TaskManager])
       }
     }
     catch {
@@ -1451,7 +1461,7 @@ object TaskManager {
    * After selecting the network interface, this method brings up an actor system
    * for the TaskManager and its actors, starts the TaskManager's services
    * (library cache, shuffle network stack, ...), and starts the TaskManager itself.
-
+   *
    * @param configuration The configuration for the TaskManager.
    * @param taskManagerClass The actor class to instantiate.
    *                         Allows to use TaskManager subclasses for example for YARN.
@@ -1459,6 +1469,7 @@ object TaskManager {
   @throws(classOf[Exception])
   def selectNetworkInterfaceAndRunTaskManager(
       configuration: Configuration,
+      resourceID: ResourceID,
       taskManagerClass: Class[_ <: TaskManager])
     : Unit = {
 
@@ -1466,6 +1477,7 @@ object TaskManager {
 
     runTaskManager(
       taskManagerHostname,
+      resourceID,
       actorSystemPort,
       configuration,
       taskManagerClass)
@@ -1518,18 +1530,21 @@ object TaskManager {
    *
    * @param taskManagerHostname The hostname/address of the interface where the actor system
    *                         will communicate.
+   * @param resourceID The id of the resource which the task manager will run on.
    * @param actorSystemPort The port at which the actor system will communicate.
    * @param configuration The configuration for the TaskManager.
    */
   @throws(classOf[Exception])
   def runTaskManager(
       taskManagerHostname: String,
+      resourceID: ResourceID,
       actorSystemPort: Int,
       configuration: Configuration)
     : Unit = {
 
     runTaskManager(
       taskManagerHostname,
+      resourceID,
       actorSystemPort,
       configuration,
       classOf[TaskManager])
@@ -1545,6 +1560,7 @@ object TaskManager {
    *
    * @param taskManagerHostname The hostname/address of the interface where the actor system
    *                         will communicate.
+   * @param resourceID The id of the resource which the task manager will run on.
    * @param actorSystemPort The port at which the actor system will communicate.
    * @param configuration The configuration for the TaskManager.
    * @param taskManagerClass The actor class to instantiate. Allows the use of TaskManager
@@ -1553,6 +1569,7 @@ object TaskManager {
   @throws(classOf[Exception])
   def runTaskManager(
       taskManagerHostname: String,
+      resourceID: ResourceID,
       actorSystemPort: Int,
       configuration: Configuration,
       taskManagerClass: Class[_ <: TaskManager])
@@ -1594,6 +1611,7 @@ object TaskManager {
       LOG.info("Starting TaskManager actor")
       val taskManager = startTaskManagerComponentsAndActor(
         configuration,
+        resourceID,
         taskManagerSystem,
         taskManagerHostname,
         Some(TASK_MANAGER_NAME),
@@ -1642,6 +1660,7 @@ object TaskManager {
   /**
    *
    * @param configuration The configuration for the TaskManager.
+   * @param resourceID The id of the resource which the task manager will run on.
    * @param actorSystem The actor system that should run the TaskManager actor.
    * @param taskManagerHostname The hostname/address that describes the TaskManager's data location.
    * @param taskManagerActorName Optionally the name of the TaskManager actor. If none is given,
@@ -1653,15 +1672,12 @@ object TaskManager {
    *                                      TCP network stack.
    * @param taskManagerClass The class of the TaskManager actor. May be used to give
    *                         subclasses that understand additional actor messages.
-   *
-   * @throws org.apache.flink.configuration.IllegalConfigurationException
+    * @throws org.apache.flink.configuration.IllegalConfigurationException
    *                              Thrown, if the given config contains illegal values.
-   *
    * @throws java.io.IOException Thrown, if any of the I/O components (such as buffer pools,
    *                             I/O manager, ...) cannot be properly started.
    * @throws java.lang.Exception Thrown is some other error occurs while parsing the configuration
    *                             or starting the TaskManager components.
-   *
    * @return An ActorRef to the TaskManager actor.
    */
   @throws(classOf[IllegalConfigurationException])
@@ -1669,6 +1685,7 @@ object TaskManager {
   @throws(classOf[Exception])
   def startTaskManagerComponentsAndActor(
       configuration: Configuration,
+      resourceID: ResourceID,
       actorSystem: ActorSystem,
       taskManagerHostname: String,
       taskManagerActorName: Option[String],
@@ -1799,6 +1816,7 @@ object TaskManager {
     val tmProps = Props(
       taskManagerClass,
       taskManagerConfig,
+      resourceID,
       connectionInfo,
       memoryManager,
       ioManager,
@@ -2066,7 +2084,6 @@ object TaskManager {
    * @param parameter The parameter value. Will be shown in the exception message.
    * @param name The name of the config parameter. Will be shown in the exception message.
    * @param errorMessage The optional custom error message to append to the exception message.
-   *
    * @throws IllegalConfigurationException Thrown if the condition is violated.
    */
   @throws(classOf[IllegalConfigurationException])
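
As a quick usage note for the changed TaskManager entry points above: a standalone launcher now has to supply a ResourceID. The sketch below mirrors what the updated TaskManager.main path does; the configuration is a placeholder and the launcher object itself is hypothetical.

import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.clusterframework.types.ResourceID
import org.apache.flink.runtime.taskmanager.TaskManager

object StandaloneTaskManagerLauncherSketch {
  def main(args: Array[String]): Unit = {
    // placeholder configuration; a real launcher would load flink-conf.yaml here
    val configuration = new Configuration()

    // in standalone mode the resource identifier is simply generated,
    // exactly as the new TaskManager.main code path does
    val resourceId = ResourceID.generate()

    // selects the network interface, starts the actor system and runs the
    // TaskManager actor bound to the given resource id
    TaskManager.selectNetworkInterfaceAndRunTaskManager(
      configuration,
      resourceId,
      classOf[TaskManager])
  }
}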

http://git-wip-us.apache.org/repos/asf/flink/blob/92ff2b15/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
new file mode 100644
index 0000000..9ab40fd
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework;
+
+import org.apache.flink.configuration.Configuration;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+
+public class BootstrapToolsTest {
+
+	@Test
+	public void testSubstituteConfigKey() {
+		String deprecatedKey1 = "deprecated-key";
+		String deprecatedKey2 = "another-out_of-date_key";
+		String deprecatedKey3 = "yet-one-more";
+
+		String designatedKey1 = "newkey1";
+		String designatedKey2 = "newKey2";
+		String designatedKey3 = "newKey3";
+
+		String value1 = "value1";
+		String value2_designated = "designated-value2";
+		String value2_deprecated = "deprecated-value2";
+
+		// config contains only deprecated key 1, and for key 2 both deprecated and designated
+		Configuration cfg = new Configuration();
+		cfg.setString(deprecatedKey1, value1);
+		cfg.setString(deprecatedKey2, value2_deprecated);
+		cfg.setString(designatedKey2, value2_designated);
+
+		BootstrapTools.substituteDeprecatedConfigKey(cfg, deprecatedKey1, designatedKey1);
+		BootstrapTools.substituteDeprecatedConfigKey(cfg, deprecatedKey2, designatedKey2);
+		BootstrapTools.substituteDeprecatedConfigKey(cfg, deprecatedKey3, designatedKey3);
+
+		// value 1 should be set to designated
+		assertEquals(value1, cfg.getString(designatedKey1, null));
+
+		// value 2 should not have been set, since it had a value already
+		assertEquals(value2_designated, cfg.getString(designatedKey2, null));
+
+		// nothing should be in there for key 3
+		assertNull(cfg.getString(designatedKey3, null));
+		assertNull(cfg.getString(deprecatedKey3, null));
+	}
+
+	@Test
+	public void testSubstituteConfigKeyPrefix() {
+		String deprecatedPrefix1 = "deprecated-prefix";
+		String deprecatedPrefix2 = "-prefix-2";
+		String deprecatedPrefix3 = "prefix-3";
+
+		String designatedPrefix1 = "p1";
+		String designatedPrefix2 = "ppp";
+		String designatedPrefix3 = "zzz";
+
+		String depr1 = deprecatedPrefix1 + "var";
+		String depr2 = deprecatedPrefix2 + "env";
+		String depr3 = deprecatedPrefix2 + "x";
+
+		String desig1 = designatedPrefix1 + "var";
+		String desig2 = designatedPrefix2 + "env";
+		String desig3 = designatedPrefix2 + "x";
+
+		String val1 = "1";
+		String val2 = "2";
+		String val3_depr = "3-";
+		String val3_desig = "3+";
+
+		// config contains values under the deprecated prefixes 1 and 2; for key 3, both the deprecated and the designated key are set
+		Configuration cfg = new Configuration();
+		cfg.setString(depr1, val1);
+		cfg.setString(depr2, val2);
+		cfg.setString(depr3, val3_depr);
+		cfg.setString(desig3, val3_desig);
+
+		BootstrapTools.substituteDeprecatedConfigPrefix(cfg, deprecatedPrefix1, designatedPrefix1);
+		BootstrapTools.substituteDeprecatedConfigPrefix(cfg, deprecatedPrefix2, designatedPrefix2);
+		BootstrapTools.substituteDeprecatedConfigPrefix(cfg, deprecatedPrefix3, designatedPrefix3);
+
+		assertEquals(val1, cfg.getString(desig1, null));
+		assertEquals(val2, cfg.getString(desig2, null));
+		assertEquals(val3_desig, cfg.getString(desig3, null));
+
+		// check that nothing with prefix 3 is contained
+		for (String key : cfg.keySet()) {
+			assertFalse(key.startsWith(designatedPrefix3));
+			assertFalse(key.startsWith(deprecatedPrefix3));
+		}
+	}
+}

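The test above encodes the intended contract of BootstrapTools.substituteDeprecatedConfigKey and substituteDeprecatedConfigPrefix: a value stored under a deprecated key is copied over to the designated key, but never overwrites a value that is already present under the designated key. Below is a minimal sketch of that contract using a plain java.util.Map instead of Flink's Configuration; the class and method names are illustrative only and are not the actual BootstrapTools code. Whether the deprecated entries are removed afterwards is not covered by the test, so the sketch leaves them in place.

import java.util.HashMap;
import java.util.Map;

public class DeprecatedKeySubstitutionSketch {

	// Copies the value of the deprecated key to the designated key, but only if the
	// designated key has no value yet (mirrors testSubstituteConfigKey above).
	static void substituteDeprecatedKey(Map<String, String> cfg, String deprecated, String designated) {
		if (!cfg.containsKey(designated) && cfg.containsKey(deprecated)) {
			cfg.put(designated, cfg.get(deprecated));
		}
	}

	// Applies the same rule to every key starting with the deprecated prefix
	// (mirrors testSubstituteConfigKeyPrefix above).
	static void substituteDeprecatedPrefix(Map<String, String> cfg, String deprecatedPrefix, String designatedPrefix) {
		for (String key : new HashMap<>(cfg).keySet()) {
			if (key.startsWith(deprecatedPrefix)) {
				String designated = designatedPrefix + key.substring(deprecatedPrefix.length());
				substituteDeprecatedKey(cfg, key, designated);
			}
		}
	}

	public static void main(String[] args) {
		Map<String, String> cfg = new HashMap<>();
		cfg.put("deprecated-key", "value1");
		cfg.put("deprecated-prefixvar", "1");

		substituteDeprecatedKey(cfg, "deprecated-key", "newkey1");
		substituteDeprecatedPrefix(cfg, "deprecated-prefix", "p1");

		System.out.println(cfg.get("newkey1")); // prints value1
		System.out.println(cfg.get("p1var"));   // prints 1
	}
}
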
http://git-wip-us.apache.org/repos/asf/flink/blob/92ff2b15/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index c85ca13..6659b5a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
@@ -111,7 +112,7 @@ public class ExecutionGraphTestUtils {
 		InetAddress address = InetAddress.getByName("127.0.0.1");
 		InstanceConnectionInfo connection = new InstanceConnectionInfo(address, 10001);
 
-		return new Instance(gateway, connection, new InstanceID(), hardwareDescription, numberOfSlots);
+		return new Instance(gateway, connection, ResourceID.generate(), new InstanceID(), hardwareDescription, numberOfSlots);
 	}
 
 	@SuppressWarnings("serial")

http://git-wip-us.apache.org/repos/asf/flink/blob/92ff2b15/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
index 2d7d6f5..a4c86e3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.core.io.InputSplitSource;
 import org.apache.flink.core.io.LocatableInputSplit;
 import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.instance.HardwareDescription;
 import org.apache.flink.runtime.instance.Instance;
@@ -387,6 +388,7 @@ public class LocalInputSplitsTest {
 				new ExecutionGraphTestUtils.SimpleActorGateway(
 						TestingUtils.defaultExecutionContext()),
 				connection,
+				ResourceID.generate(),
 				new InstanceID(),
 				hardwareDescription,
 				slots);

http://git-wip-us.apache.org/repos/asf/flink/blob/92ff2b15/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
index f747ff3..a28fb49 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.executiongraph;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
 import org.apache.flink.runtime.instance.DummyActorGateway;
@@ -80,7 +81,8 @@ public class TerminalStateDeadlockTest {
 			InstanceConnectionInfo ci = new InstanceConnectionInfo(address, 12345);
 				
 			HardwareDescription resources = new HardwareDescription(4, 4000000, 3000000, 2000000);
-			Instance instance = new Instance(DummyActorGateway.INSTANCE, ci, new InstanceID(), resources, 4);
+			Instance instance = new Instance(DummyActorGateway.INSTANCE, ci,
+				ResourceID.generate(), new InstanceID(), resources, 4);
 
 			this.resource = instance.allocateSimpleSlot(new JobID());
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/92ff2b15/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
index 25c9d70..d866b2f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
@@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.instance.DummyActorGateway;
 import org.apache.flink.runtime.instance.HardwareDescription;
 import org.apache.flink.runtime.instance.Instance;
@@ -452,6 +453,7 @@ public class VertexLocationConstraintTest {
 				new ExecutionGraphTestUtils.SimpleActorGateway(
 						TestingUtils.defaultExecutionContext()),
 				connection,
+				ResourceID.generate(),
 				new InstanceID(),
 				hardwareDescription,
 				1);

http://git-wip-us.apache.org/repos/asf/flink/blob/92ff2b15/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
index a0166eb..ff5e2ab 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
@@ -35,6 +35,7 @@ import java.util.UUID;
 
 import akka.actor.ActorSystem;
 import akka.testkit.JavaTestKit;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 
@@ -67,26 +68,33 @@ public class InstanceManagerTest{
 	public void testInstanceRegistering() {
 		try {
 			InstanceManager cm = new InstanceManager();
-			
+
 			final int dataPort = 20000;
 
 			HardwareDescription hardwareDescription = HardwareDescription.extractFromSystem(4096);
 
 			InetAddress address = InetAddress.getByName("127.0.0.1");
-			
+
 			// register three instances
 			InstanceConnectionInfo ici1 = new InstanceConnectionInfo(address, dataPort);
 			InstanceConnectionInfo ici2 = new InstanceConnectionInfo(address, dataPort + 15);
 			InstanceConnectionInfo ici3 = new InstanceConnectionInfo(address, dataPort + 30);
 
+			ResourceID resID1 = ResourceID.generate();
+			ResourceID resID2 = ResourceID.generate();
+			ResourceID resID3 = ResourceID.generate();
+
 			final JavaTestKit probe1 = new JavaTestKit(system);
 			final JavaTestKit probe2 = new JavaTestKit(system);
 			final JavaTestKit probe3 = new JavaTestKit(system);
 
-			cm.registerTaskManager(probe1.getRef(), ici1, hardwareDescription, 1, leaderSessionID);
-			cm.registerTaskManager(probe2.getRef(), ici2, hardwareDescription, 2, leaderSessionID);
-			cm.registerTaskManager(probe3.getRef(), ici3, hardwareDescription, 5, leaderSessionID);
-			
+			cm.registerTaskManager(probe1.getRef(), resID1,
+				ici1, hardwareDescription, 1, leaderSessionID);
+			cm.registerTaskManager(probe2.getRef(), resID2,
+				ici2, hardwareDescription, 2, leaderSessionID);
+			cm.registerTaskManager(probe3.getRef(), resID3,
+				ici3, hardwareDescription, 5, leaderSessionID);
+
 			assertEquals(3, cm.getNumberOfRegisteredTaskManagers());
 			assertEquals(8, cm.getTotalNumberOfSlots());
 
@@ -101,7 +109,7 @@ public class InstanceManagerTest{
 			assertTrue(instanceConnectionInfos.contains(ici1));
 			assertTrue(instanceConnectionInfos.contains(ici2));
 			assertTrue(instanceConnectionInfos.contains(ici3));
-			
+
 			cm.shutdown();
 		}
 		catch (Exception e) {
@@ -110,28 +118,36 @@ public class InstanceManagerTest{
 			Assert.fail("Test erroneous: " + e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testRegisteringAlreadyRegistered() {
 		try {
 			InstanceManager cm = new InstanceManager();
-			
+
 			final int dataPort = 20000;
 
+			ResourceID resID1 = ResourceID.generate();
+			ResourceID resID2 = ResourceID.generate();
+
 			HardwareDescription resources = HardwareDescription.extractFromSystem(4096);
 			InetAddress address = InetAddress.getByName("127.0.0.1");
 			InstanceConnectionInfo ici = new InstanceConnectionInfo(address, dataPort);
 
 			JavaTestKit probe = new JavaTestKit(system);
-			InstanceID i = cm.registerTaskManager(probe.getRef(), ici, resources, 1, leaderSessionID);
+			cm.registerTaskManager(probe.getRef(), resID1,
+				ici, resources, 1, leaderSessionID);
 
-			assertNotNull(i);
 			assertEquals(1, cm.getNumberOfRegisteredTaskManagers());
 			assertEquals(1, cm.getTotalNumberOfSlots());
-			
-			InstanceID next = cm.registerTaskManager(probe.getRef(), ici, resources, 1, leaderSessionID);
-			assertNull(next);
-			
+
+			try {
+				cm.registerTaskManager(probe.getRef(), resID2,
+					ici, resources, 1, leaderSessionID);
+			} catch (Exception e) {
+				// good
+			}
+
+			// check for correct number of registered instances
 			assertEquals(1, cm.getNumberOfRegisteredTaskManagers());
 			assertEquals(1, cm.getTotalNumberOfSlots());
 
@@ -143,18 +159,22 @@ public class InstanceManagerTest{
 			Assert.fail("Test erroneous: " + e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testReportHeartbeat() {
 		try {
 			InstanceManager cm = new InstanceManager();
-			
+
 			final int dataPort = 20000;
 
+			ResourceID resID1 = ResourceID.generate();
+			ResourceID resID2 = ResourceID.generate();
+			ResourceID resID3 = ResourceID.generate();
+
 			HardwareDescription hardwareDescription = HardwareDescription.extractFromSystem(4096);
 
 			InetAddress address = InetAddress.getByName("127.0.0.1");
-			
+
 			// register three instances
 			InstanceConnectionInfo ici1 = new InstanceConnectionInfo(address, dataPort);
 			InstanceConnectionInfo ici2 = new InstanceConnectionInfo(address, dataPort + 1);
@@ -163,19 +183,22 @@ public class InstanceManagerTest{
 			JavaTestKit probe1 = new JavaTestKit(system);
 			JavaTestKit probe2 = new JavaTestKit(system);
 			JavaTestKit probe3 = new JavaTestKit(system);
-			
-			InstanceID i1 = cm.registerTaskManager(probe1.getRef(), ici1, hardwareDescription, 1, leaderSessionID);
-			InstanceID i2 = cm.registerTaskManager(probe2.getRef(), ici2, hardwareDescription, 1, leaderSessionID);
-			InstanceID i3 = cm.registerTaskManager(probe3.getRef(), ici3, hardwareDescription, 1, leaderSessionID);
+
+			InstanceID instanceID1 = cm.registerTaskManager(probe1.getRef(), resID1,
+				ici1, hardwareDescription, 1, leaderSessionID);
+			InstanceID instanceID2 = cm.registerTaskManager(probe2.getRef(), resID2,
+				ici2, hardwareDescription, 1, leaderSessionID);
+			InstanceID instanceID3 = cm.registerTaskManager(probe3.getRef(), resID3,
+				ici3, hardwareDescription, 1, leaderSessionID);
 
 			// report some immediate heart beats
-			assertTrue(cm.reportHeartBeat(i1, new byte[] {}));
-			assertTrue(cm.reportHeartBeat(i2, new byte[] {}));
-			assertTrue(cm.reportHeartBeat(i3, new byte[] {}));
-			
+			assertTrue(cm.reportHeartBeat(instanceID1, new byte[] {}));
+			assertTrue(cm.reportHeartBeat(instanceID2, new byte[] {}));
+			assertTrue(cm.reportHeartBeat(instanceID3, new byte[] {}));
+
 			// report heart beat for non-existing instance
 			assertFalse(cm.reportHeartBeat(new InstanceID(), new byte[] {}));
-			
+
 			final long WAIT = 200;
 			CommonTestUtils.sleepUninterruptibly(WAIT);
 
@@ -190,14 +213,14 @@ public class InstanceManagerTest{
 			// send one heart beat again and verify that the heartbeat timestamp advances
 			assertTrue(cm.reportHeartBeat(instance1.getId(), new byte[] {}));
 			long newH1 = instance1.getLastHeartBeat();
-			
+
 			long now = System.currentTimeMillis();
-			
+
 			assertTrue(now - h1 >= WAIT);
 			assertTrue(now - h2 >= WAIT);
 			assertTrue(now - h3 >= WAIT);
 			assertTrue(now - newH1 <= WAIT);
-			
+
 			cm.shutdown();
 		}
 		catch (Exception e) {
@@ -206,20 +229,22 @@ public class InstanceManagerTest{
 			Assert.fail("Test erroneous: " + e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testShutdown() {
 		try {
 			InstanceManager cm = new InstanceManager();
 			cm.shutdown();
-			
+
 			try {
+				ResourceID resID = ResourceID.generate();
 				HardwareDescription resources = HardwareDescription.extractFromSystem(4096);
 				InetAddress address = InetAddress.getByName("127.0.0.1");
 				InstanceConnectionInfo ici = new InstanceConnectionInfo(address, 20000);
 
 				JavaTestKit probe = new JavaTestKit(system);
-				cm.registerTaskManager(probe.getRef(), ici, resources, 1, leaderSessionID);
+				cm.registerTaskManager(probe.getRef(), resID,
+					ici, resources, 1, leaderSessionID);
 				fail("Should raise exception in shutdown state");
 			}
 			catch (IllegalStateException e) {

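The updated InstanceManagerTest above encodes the new registration contract: registerTaskManager now carries a ResourceID, a second registration for an already registered task manager is rejected with an exception rather than signalled by a null return (the test only requires that the count of registered task managers stays at one), and registering on a shut-down manager raises an IllegalStateException. A minimal sketch of that contract with plain Java collections follows; it is illustrative only and not the actual InstanceManager implementation, and the connection string stands in for Flink's InstanceConnectionInfo.

import java.util.HashMap;
import java.util.Map;

public class RegistrationContractSketch {

	// Maps a task manager's connection address to its resource id.
	private final Map<String, String> resourceIdByConnection = new HashMap<>();
	private boolean isShutDown;

	void registerTaskManager(String resourceId, String connection) {
		if (isShutDown) {
			throw new IllegalStateException("InstanceManager is shut down.");
		}
		if (resourceIdByConnection.containsKey(connection)) {
			throw new IllegalStateException("TaskManager at " + connection + " is already registered.");
		}
		resourceIdByConnection.put(connection, resourceId);
	}

	int getNumberOfRegisteredTaskManagers() {
		return resourceIdByConnection.size();
	}

	void shutdown() {
		isShutDown = true;
		resourceIdByConnection.clear();
	}

	public static void main(String[] args) {
		RegistrationContractSketch cm = new RegistrationContractSketch();
		cm.registerTaskManager("resource-1", "127.0.0.1:20000");
		try {
			cm.registerTaskManager("resource-2", "127.0.0.1:20000");
		} catch (IllegalStateException e) {
			// duplicate registration for the same connection is rejected
		}
		System.out.println(cm.getNumberOfRegisteredTaskManagers()); // prints 1
	}
}
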
http://git-wip-us.apache.org/repos/asf/flink/blob/92ff2b15/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
index 2c0f4d9..faa679b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
@@ -24,6 +24,7 @@ import java.lang.reflect.Method;
 import java.net.InetAddress;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.junit.Test;
 
 /**
@@ -38,7 +39,8 @@ public class InstanceTest {
 			InetAddress address = InetAddress.getByName("127.0.0.1");
 			InstanceConnectionInfo connection = new InstanceConnectionInfo(address, 10001);
 
-			Instance instance = new Instance(DummyActorGateway.INSTANCE, connection, new InstanceID(), hardwareDescription, 4);
+			Instance instance = new Instance(DummyActorGateway.INSTANCE, connection,
+				ResourceID.generate(), new InstanceID(), hardwareDescription, 4);
 
 			assertEquals(4, instance.getTotalNumberOfSlots());
 			assertEquals(4, instance.getNumberOfAvailableSlots());
@@ -99,7 +101,8 @@ public class InstanceTest {
 			InetAddress address = InetAddress.getByName("127.0.0.1");
 			InstanceConnectionInfo connection = new InstanceConnectionInfo(address, 10001);
 
-			Instance instance = new Instance(DummyActorGateway.INSTANCE, connection, new InstanceID(), hardwareDescription, 3);
+			Instance instance = new Instance(DummyActorGateway.INSTANCE, connection,
+				ResourceID.generate(), new InstanceID(), hardwareDescription, 3);
 
 			assertEquals(3, instance.getNumberOfAvailableSlots());
 
@@ -129,7 +132,8 @@ public class InstanceTest {
 			InetAddress address = InetAddress.getByName("127.0.0.1");
 			InstanceConnectionInfo connection = new InstanceConnectionInfo(address, 10001);
 
-			Instance instance = new Instance(DummyActorGateway.INSTANCE, connection, new InstanceID(), hardwareDescription, 3);
+			Instance instance = new Instance(DummyActorGateway.INSTANCE, connection,
+				ResourceID.generate(), new InstanceID(), hardwareDescription, 3);
 
 			assertEquals(3, instance.getNumberOfAvailableSlots());
 
@@ -166,4 +170,4 @@ public class InstanceTest {
 			fail(e.getMessage());
 		}
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/92ff2b15/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
index 997bb46..459a3ed 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.*;
 
 import java.net.InetAddress;
 
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.api.common.JobID;
 
@@ -146,7 +147,8 @@ public class SimpleSlotTest {
 		InetAddress address = InetAddress.getByName("127.0.0.1");
 		InstanceConnectionInfo connection = new InstanceConnectionInfo(address, 10001);
 
-		Instance instance = new Instance(DummyActorGateway.INSTANCE, connection, new InstanceID(), hardwareDescription, 1);
+		Instance instance = new Instance(DummyActorGateway.INSTANCE, connection,
+			ResourceID.generate(), new InstanceID(), hardwareDescription, 1);
 		return instance.allocateSimpleSlot(new JobID());
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/92ff2b15/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
index 1cd01ff..e820ed6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.blob.BlobClient;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -74,7 +75,9 @@ public class JobSubmitTest {
 
 		scala.Option<Tuple2<String, Object>> listeningAddress = scala.Option.apply(new Tuple2<String, Object>("localhost", port));
 		jobManagerSystem = AkkaUtils.createActorSystem(config, listeningAddress);
-		ActorRef jobManagerActorRef = JobManager.startJobManagerActors(
+
+		// only start JobManager (no ResourceManager)
+		JobManager.startJobManagerActors(
 				config,
 				jobManagerSystem,
 				JobManager.class,

http://git-wip-us.apache.org/repos/asf/flink/blob/92ff2b15/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
index b7bc860..ed6d1ee 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
@@ -29,6 +29,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.instance.DummyActorGateway;
@@ -66,7 +67,8 @@ public class SchedulerTestUtils {
 		final long GB = 1024L*1024*1024;
 		HardwareDescription resources = new HardwareDescription(4, 4*GB, 3*GB, 2*GB);
 		
-		return new Instance(DummyActorGateway.INSTANCE, ci, new InstanceID(), resources, numSlots);
+		return new Instance(DummyActorGateway.INSTANCE, ci, ResourceID.generate(),
+			new InstanceID(), resources, numSlots);
 	}
 	
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/92ff2b15/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java
new file mode 100644
index 0000000..5ea7d76
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.messages.StopCluster;
+import org.apache.flink.runtime.clusterframework.messages.StopClusterSuccessful;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.messages.Messages;
+import org.apache.flink.runtime.messages.RegistrationMessages;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testutils.TestingResourceManager;
+import org.apache.flink.util.TestLogger;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+import scala.Option;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * Starts a dedicated actor system, runs the cluster shutdown sequence against it, and verifies that the system terminates.
+ */
+public class ClusterShutdownITCase extends TestLogger {
+
+	private static ActorSystem system;
+
+	private static Configuration config = new Configuration();
+
+	@BeforeClass
+	public static void setup() {
+		system = AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig());
+	}
+
+	@AfterClass
+	public static void teardown() {
+		JavaTestKit.shutdownActorSystem(system);
+	}
+
+	/**
+	 * Tests the cluster shutdown procedure of the ResourceManager.
+	 */
+	@Test
+	public void testClusterShutdown() {
+
+		new JavaTestKit(system){{
+		new Within(duration("30 seconds")) {
+		@Override
+		protected void run() {
+
+			ActorGateway me =
+				TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty());
+
+			ActorGateway jobManager = TestingUtils.createJobManager(system, config);
+
+			ActorGateway resourceManager =
+				TestingUtils.createResourceManager(system, jobManager.actor(), config);
+
+			// ask to be notified once the resource manager has registered at the job manager
+			resourceManager.tell(new TestingResourceManager.NotifyWhenResourceManagerConnected(), me);
+
+			// Wait for resource manager
+			expectMsgEquals(Messages.getAcknowledge());
+
+			jobManager.tell(new StopCluster(ApplicationStatus.SUCCEEDED, "Shutting down."), me);
+
+			expectMsgClass(StopClusterSuccessful.class);
+
+			boolean isTerminated = false;
+			for (int i=0; i < 10 && !isTerminated; i++) {
+				isTerminated = system.isTerminated();
+				try {
+					Thread.sleep(1000);
+				} catch (InterruptedException e) {
+					// try again
+				}
+			}
+
+			assertTrue(isTerminated);
+
+		}};
+		}};
+	}
+
+}

