flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [09/16] flink git commit: [FLINK-6078] Remove CuratorFramework#close calls from ZooKeeper based HA services
Date Fri, 05 May 2017 11:48:14 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 68f43da..5092643 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -49,7 +49,7 @@ import org.apache.flink.runtime.execution.SuppressRestartsException
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
 import org.apache.flink.runtime.executiongraph._
-import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils
+import org.apache.flink.runtime.highavailability.{HighAvailabilityServices, HighAvailabilityServicesUtils}
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.AddressResolution
 import org.apache.flink.runtime.instance.{AkkaActorGateway, InstanceID, InstanceManager}
 import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus}
@@ -57,8 +57,8 @@ import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore.SubmittedJobGr
 import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler}
 import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway
 import org.apache.flink.runtime.jobmaster.JobMaster
+import org.apache.flink.runtime.leaderelection.{LeaderContender, LeaderElectionService}
 import org.apache.flink.runtime.jobmaster.JobMaster.{ARCHIVE_NAME, JOB_MANAGER_NAME}
-import org.apache.flink.runtime.leaderelection.{LeaderContender, LeaderElectionService, StandaloneLeaderElectionService}
 import org.apache.flink.runtime.messages.ArchiveMessages.ArchiveExecutionGraph
 import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
 import org.apache.flink.runtime.messages.JobManagerMessages._
@@ -2037,14 +2037,27 @@ object JobManager {
 
     val timeout = AkkaUtils.getTimeout(configuration)
 
-    val (jobManagerSystem, _, _, webMonitorOption, _) = try {
-      startActorSystemAndJobManagerActors(
+    // we have to first start the JobManager ActorSystem because this determines the port if 0
+    // was chosen before. The method startActorSystem will update the configuration correspondingly.
+    val jobManagerSystem = startActorSystem(
+      configuration,
+      listeningAddress,
+      listeningPort)
+
+    val highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
+      configuration,
+      ioExecutor,
+      AddressResolution.NO_ADDRESS_RESOLUTION);
+
+    val (_, _, webMonitorOption, _) = try {
+      startJobManagerActors(
+        jobManagerSystem,
         configuration,
         executionMode,
         listeningAddress,
-        listeningPort,
         futureExecutor,
         ioExecutor,
+        highAvailabilityServices,
         classOf[JobManager],
         classOf[MemoryArchivist],
         Option(classOf[StandaloneResourceManager])
@@ -2070,6 +2083,13 @@ object JobManager {
         }
     }
 
+    try {
+      highAvailabilityServices.close()
+    } catch {
+      case t: Throwable =>
+        LOG.warn("Could not properly stop the high availability services.", t)
+    }
+
     FlinkExecutors.gracefulShutdown(
       timeout.toMillis,
       TimeUnit.MILLISECONDS,
@@ -2175,32 +2195,18 @@ object JobManager {
     }
   }
 
-  /** Starts an ActorSystem, the JobManager and all its components including the WebMonitor.
+  /**
+    * Starts the JobManager actor system.
     *
-    * @param configuration The configuration object for the JobManager
-    * @param executionMode The execution mode in which to run. Execution mode LOCAL with spawn an
-    *                      additional TaskManager in the same process.
-    * @param externalHostname The hostname where the JobManager is reachable for rpc communication
-    * @param port The port where the JobManager is reachable for rpc communication
-    * @param futureExecutor to run the JobManager's futures
-    * @param ioExecutor to run blocking io operations
-    * @param jobManagerClass The class of the JobManager to be started
-    * @param archiveClass The class of the Archivist to be started
-    * @param resourceManagerClass Optional class of resource manager if one should be started
-    * @return A tuple containing the started ActorSystem, ActorRefs to the JobManager and the
-    *         Archivist and an Option containing a possibly started WebMonitor
+    * @param configuration Configuration to use for the job manager actor system
+    * @param externalHostname External hostname to bind to
+    * @param port Port to bind to
+    * @return Actor system for the JobManager and its components
     */
-  def startActorSystemAndJobManagerActors(
+  def startActorSystem(
       configuration: Configuration,
-      executionMode: JobManagerMode,
       externalHostname: String,
-      port: Int,
-      futureExecutor: ScheduledExecutorService,
-      ioExecutor: Executor,
-      jobManagerClass: Class[_ <: JobManager],
-      archiveClass: Class[_ <: MemoryArchivist],
-      resourceManagerClass: Option[Class[_ <: FlinkResourceManager[_]]])
-    : (ActorSystem, ActorRef, ActorRef, Option[WebMonitor], Option[ActorRef]) = {
+      port: Int): ActorSystem = {
 
     val hostPort = NetUtils.unresolvedHostAndPortToNormalizedString(externalHostname, port)
 
@@ -2223,7 +2229,7 @@ object JobManager {
           val cause = t.getCause()
           if (cause != null && t.getCause().isInstanceOf[java.net.BindException]) {
             throw new Exception("Unable to create JobManager at address " + hostPort +
-              " - " + cause.getMessage(), t)
+                                  " - " + cause.getMessage(), t)
           }
         }
         throw new Exception("Could not create JobManager actor system", t)
@@ -2234,11 +2240,42 @@ object JobManager {
     configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, address.host.get)
     configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, address.port.get)
 
+    jobManagerSystem
+  }
+
+  /** Starts the JobManager and all its components including the WebMonitor.
+    *
+    * @param configuration The configuration object for the JobManager
+    * @param executionMode The execution mode in which to run. Execution mode LOCAL with spawn an
+    *                      additional TaskManager in the same process.
+    * @param externalHostname The hostname where the JobManager is reachable for rpc communication
+    * @param futureExecutor to run the JobManager's futures
+    * @param ioExecutor to run blocking io operations
+    * @param highAvailabilityServices to instantiate high availability services
+    * @param jobManagerClass The class of the JobManager to be started
+    * @param archiveClass The class of the Archivist to be started
+    * @param resourceManagerClass Optional class of resource manager if one should be started
+    * @return A tuple containing the started ActorSystem, ActorRefs to the JobManager and the
+    *         Archivist and an Option containing a possibly started WebMonitor
+    */
+  def startJobManagerActors(
+      jobManagerSystem: ActorSystem,
+      configuration: Configuration,
+      executionMode: JobManagerMode,
+      externalHostname: String,
+      futureExecutor: ScheduledExecutorService,
+      ioExecutor: Executor,
+      highAvailabilityServices: HighAvailabilityServices,
+      jobManagerClass: Class[_ <: JobManager],
+      archiveClass: Class[_ <: MemoryArchivist],
+      resourceManagerClass: Option[Class[_ <: FlinkResourceManager[_]]])
+    : (ActorRef, ActorRef, Option[WebMonitor], Option[ActorRef]) = {
+
     val webMonitor: Option[WebMonitor] =
       if (configuration.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) >= 0) {
         LOG.info("Starting JobManager web frontend")
-        val leaderRetrievalService = LeaderRetrievalUtils
-          .createLeaderRetrievalService(configuration, false)
+        val leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever(
+          HighAvailabilityServices.DEFAULT_JOB_ID)
 
         // start the web frontend. we need to load this dynamically
         // because it is not in the same project/dependencies
@@ -2265,6 +2302,7 @@ object JobManager {
         jobManagerSystem,
         futureExecutor,
         ioExecutor,
+        highAvailabilityServices,
         jobManagerClass,
         archiveClass)
 
@@ -2287,9 +2325,9 @@ object JobManager {
           configuration,
           ResourceID.generate(),
           jobManagerSystem,
+          highAvailabilityServices,
           externalHostname,
           Some(TaskExecutor.TASK_MANAGER_NAME),
-          None,
           localTaskManagerCommunication = true,
           classOf[TaskManager])
 
@@ -2324,14 +2362,15 @@ object JobManager {
               FlinkResourceManager.startResourceManagerActors(
                 configuration,
                 jobManagerSystem,
-                LeaderRetrievalUtils.createLeaderRetrievalService(configuration, false),
+                highAvailabilityServices.getJobManagerLeaderRetriever(
+                  HighAvailabilityServices.DEFAULT_JOB_ID),
                 rmClass))
           case None =>
             LOG.info("Resource Manager class not provided. No resource manager will be started.")
             None
         }
 
-      (jobManagerSystem, jobManager, archive, webMonitor, resourceManager)
+      (jobManager, archive, webMonitor, resourceManager)
     }
     catch {
       case t: Throwable =>
@@ -2468,15 +2507,12 @@ object JobManager {
    * @param configuration The configuration from which to parse the config values.
    * @param futureExecutor to run JobManager's futures
    * @param ioExecutor to run blocking io operations
-   * @param leaderElectionServiceOption LeaderElectionService which shall be returned if the option
-   *                                    is defined
    * @return The members for a default JobManager.
    */
   def createJobManagerComponents(
       configuration: Configuration,
       futureExecutor: ScheduledExecutorService,
-      ioExecutor: Executor,
-      leaderElectionServiceOption: Option[LeaderElectionService]) :
+      ioExecutor: Executor) :
     (InstanceManager,
     FlinkScheduler,
     BlobLibraryCacheManager,
@@ -2484,9 +2520,6 @@ object JobManager {
     FiniteDuration, // timeout
     Int, // number of archived jobs
     Option[Path], // archive path
-    LeaderElectionService,
-    SubmittedJobGraphStore,
-    CheckpointRecoveryFactory,
     FiniteDuration, // timeout for job recovery
     Option[FlinkMetricRegistry]
    ) = {
@@ -2550,32 +2583,6 @@ object JobManager {
         throw t
     }
 
-    // Create recovery related components
-    val (leaderElectionService, submittedJobGraphs, checkpointRecoveryFactory) =
-      HighAvailabilityMode.fromConfig(configuration) match {
-        case HighAvailabilityMode.NONE =>
-          val leaderElectionService = leaderElectionServiceOption match {
-            case Some(les) => les
-            case None => new StandaloneLeaderElectionService()
-          }
-
-          (leaderElectionService,
-            new StandaloneSubmittedJobGraphStore(),
-            new StandaloneCheckpointRecoveryFactory())
-
-        case HighAvailabilityMode.ZOOKEEPER =>
-          val client = ZooKeeperUtils.startCuratorFramework(configuration)
-
-          val leaderElectionService = leaderElectionServiceOption match {
-            case Some(les) => les
-            case None => ZooKeeperUtils.createLeaderElectionService(client, configuration)
-          }
-
-          (leaderElectionService,
-            ZooKeeperUtils.createSubmittedJobGraphs(client, configuration, ioExecutor),
-            new ZooKeeperCheckpointRecoveryFactory(client, configuration, ioExecutor))
-      }
-
     val jobRecoveryTimeoutStr = configuration.getValue(HighAvailabilityOptions.HA_JOB_DELAY)
 
     val jobRecoveryTimeout = if (jobRecoveryTimeoutStr == null || jobRecoveryTimeoutStr.isEmpty) {
@@ -2605,9 +2612,6 @@ object JobManager {
       timeout,
       archiveCount,
       archivePath,
-      leaderElectionService,
-      submittedJobGraphs,
-      checkpointRecoveryFactory,
       jobRecoveryTimeout,
       metricRegistry)
   }
@@ -2629,6 +2633,7 @@ object JobManager {
       actorSystem: ActorSystem,
       futureExecutor: ScheduledExecutorService,
       ioExecutor: Executor,
+      highAvailabilityServices: HighAvailabilityServices,
       jobManagerClass: Class[_ <: JobManager],
       archiveClass: Class[_ <: MemoryArchivist])
     : (ActorRef, ActorRef) = {
@@ -2638,6 +2643,7 @@ object JobManager {
       actorSystem,
       futureExecutor,
       ioExecutor,
+      highAvailabilityServices,
       Some(JobMaster.JOB_MANAGER_NAME),
       Some(JobMaster.ARCHIVE_NAME),
       jobManagerClass,
@@ -2665,6 +2671,7 @@ object JobManager {
       actorSystem: ActorSystem,
       futureExecutor: ScheduledExecutorService,
       ioExecutor: Executor,
+      highAvailabilityServices: HighAvailabilityServices,
       jobManagerActorName: Option[String],
       archiveActorName: Option[String],
       jobManagerClass: Class[_ <: JobManager],
@@ -2678,15 +2685,11 @@ object JobManager {
     timeout,
     archiveCount,
     archivePath,
-    leaderElectionService,
-    submittedJobGraphs,
-    checkpointRecoveryFactory,
     jobRecoveryTimeout,
     metricsRegistry) = createJobManagerComponents(
       configuration,
       futureExecutor,
-      ioExecutor,
-      None)
+      ioExecutor)
 
     val archiveProps = getArchiveProps(archiveClass, archiveCount, archivePath)
 
@@ -2707,9 +2710,10 @@ object JobManager {
       archive,
       restartStrategy,
       timeout,
-      leaderElectionService,
-      submittedJobGraphs,
-      checkpointRecoveryFactory,
+      highAvailabilityServices.getJobManagerLeaderElectionService(
+        HighAvailabilityServices.DEFAULT_JOB_ID),
+      highAvailabilityServices.getSubmittedJobGraphStore(),
+      highAvailabilityServices.getCheckpointRecoveryFactory(),
       jobRecoveryTimeout,
       metricsRegistry)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
index a493b3d..79141f1 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
@@ -101,9 +101,10 @@ object TaskManagerMessages {
 
   /**
    * Acknowledges that the task manager has been successfully registered at any job manager. This
-   * message is a response to [[NotifyWhenRegisteredAtJobManager]].
+   * message is a response to [[NotifyWhenRegisteredAtJobManager]] and contains the current leader
+   * session id.
    */
-  case object RegisteredAtJobManager
+  case class RegisteredAtJobManager(leaderId: UUID)
 
   /** Tells the address of the new leading [[org.apache.flink.runtime.jobmanager.JobManager]]
     * and the new leader session ID.
@@ -151,14 +152,6 @@ object TaskManagerMessages {
   NotifyWhenRegisteredAtJobManager.type = NotifyWhenRegisteredAtJobManager
 
   /**
-   * Accessor for the case object instance, to simplify Java interoperability.
-   *
-   * @return The RegisteredAtJobManager case object instance.
-   */
-  def getRegisteredAtJobManagerMessage:
-            RegisteredAtJobManager.type = RegisteredAtJobManager
-
-  /**
     * Accessor for the case object instance, to simplify Java interoperability.
     * @return The RequestTaskManagerLog case object instance.
     */

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 07fb996..46c4404 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.minicluster
 
+import java.net.URL
 import java.util.UUID
 import java.util.concurrent.{Executors, TimeUnit}
 
@@ -25,20 +26,21 @@ import akka.pattern.Patterns.gracefulStop
 import akka.pattern.ask
 import akka.actor.{ActorRef, ActorSystem}
 import com.typesafe.config.Config
-
 import org.apache.flink.api.common.{JobExecutionResult, JobID, JobSubmissionResult}
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
+import org.apache.flink.core.fs.Path
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.client.{JobClient, JobExecutionException}
 import org.apache.flink.runtime.concurrent.{Executors => FlinkExecutors}
+import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoader
+import org.apache.flink.runtime.highavailability.{HighAvailabilityServices, HighAvailabilityServicesUtils}
 import org.apache.flink.runtime.instance.{ActorGateway, AkkaActorGateway}
 import org.apache.flink.runtime.jobgraph.JobGraph
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode
 import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, LeaderRetrievalService, StandaloneLeaderRetrievalService}
 import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtJobManager
-import org.apache.flink.runtime.util.{ExecutorThreadFactory, Hardware, ZooKeeperUtils}
+import org.apache.flink.runtime.util.{ExecutorThreadFactory, Hardware}
 import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils}
-
 import org.slf4j.LoggerFactory
 
 import scala.concurrent.duration.{Duration, FiniteDuration}
@@ -56,6 +58,7 @@ import scala.concurrent._
  */
 abstract class FlinkMiniCluster(
     val userConfiguration: Configuration,
+    val highAvailabilityServices: HighAvailabilityServices,
     val useSingleActorSystem: Boolean)
   extends LeaderRetrievalListener {
 
@@ -115,6 +118,15 @@ abstract class FlinkMiniCluster(
     Hardware.getNumberCPUCores(),
     new ExecutorThreadFactory("mini-cluster-io"))
 
+  def this(configuration: Configuration, useSingleActorSystem: Boolean) {
+    this(
+      configuration,
+      HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
+        configuration,
+        ExecutionContext.global),
+      useSingleActorSystem)
+  }
+
   def configuration: Configuration = {
     if (originalConfiguration.getInteger(
       ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
@@ -148,25 +160,17 @@ abstract class FlinkMiniCluster(
   // --------------------------------------------------------------------------
 
   def getNumberOfJobManagers: Int = {
-    if(haMode == HighAvailabilityMode.NONE) {
-      1
-    } else {
-      originalConfiguration.getInteger(
-        ConfigConstants.LOCAL_NUMBER_JOB_MANAGER,
-        ConfigConstants.DEFAULT_LOCAL_NUMBER_JOB_MANAGER
-      )
-    }
+    originalConfiguration.getInteger(
+      ConfigConstants.LOCAL_NUMBER_JOB_MANAGER,
+      ConfigConstants.DEFAULT_LOCAL_NUMBER_JOB_MANAGER
+    )
   }
 
   def getNumberOfResourceManagers: Int = {
-    if(haMode == HighAvailabilityMode.NONE) {
-      1
-    } else {
-      originalConfiguration.getInteger(
-        ConfigConstants.LOCAL_NUMBER_RESOURCE_MANAGER,
-        ConfigConstants.DEFAULT_LOCAL_NUMBER_RESOURCE_MANAGER
-      )
-    }
+    originalConfiguration.getInteger(
+      ConfigConstants.LOCAL_NUMBER_RESOURCE_MANAGER,
+      ConfigConstants.DEFAULT_LOCAL_NUMBER_RESOURCE_MANAGER
+    )
   }
 
   def getJobManagersAsJava = {
@@ -327,10 +331,11 @@ abstract class FlinkMiniCluster(
     jobManagerActorSystems = Some(jmActorSystems)
     jobManagerActors = Some(jmActors)
 
-    // start leader retrieval service
-    val lrs = createLeaderRetrievalService()
-    jobManagerLeaderRetrievalService = Some(lrs)
-    lrs.start(this)
+    // find out which job manager the leader is
+    jobManagerLeaderRetrievalService = Option(highAvailabilityServices.getJobManagerLeaderRetriever(
+      HighAvailabilityServices.DEFAULT_JOB_ID))
+
+    jobManagerLeaderRetrievalService.foreach(_.start(this))
 
     // start as many resource managers as job managers
     val (rmActorSystems, rmActors) =
@@ -383,7 +388,8 @@ abstract class FlinkMiniCluster(
         config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) >= 0) {
 
       // TODO: Add support for HA: Make web server work independently from the JM
-      val leaderRetrievalService = new StandaloneLeaderRetrievalService(jobManagerAkkaURL)
+      val leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever(
+        HighAvailabilityServices.DEFAULT_JOB_ID)
 
       LOG.info("Starting JobManger web frontend")
       // start the new web frontend. we need to load this dynamically
@@ -409,6 +415,9 @@ abstract class FlinkMiniCluster(
     awaitTermination()
 
     jobManagerLeaderRetrievalService.foreach(_.stop())
+
+    highAvailabilityServices.closeAndCleanupAllData()
+
     isRunning = false
 
     FlinkExecutors.gracefulShutdown(
@@ -509,33 +518,37 @@ abstract class FlinkMiniCluster(
     submitJobAndWait(jobGraph, printUpdates, timeout)
   }
 
-  def submitJobAndWait(
-    jobGraph: JobGraph,
-    printUpdates: Boolean,
-    timeout: FiniteDuration)
-  : JobExecutionResult = {
-    submitJobAndWait(jobGraph, printUpdates, timeout, createLeaderRetrievalService())
-  }
-
   @throws(classOf[JobExecutionException])
   def submitJobAndWait(
       jobGraph: JobGraph,
       printUpdates: Boolean,
-      timeout: FiniteDuration,
-      leaderRetrievalService: LeaderRetrievalService)
+      timeout: FiniteDuration)
     : JobExecutionResult = {
 
     val clientActorSystem = startJobClientActorSystem(jobGraph.getJobID)
 
-     try {
-     JobClient.submitJobAndWait(
+    val userCodeClassLoader =
+      try {
+        createUserCodeClassLoader(
+          jobGraph.getUserJars,
+          jobGraph.getClasspaths,
+          Thread.currentThread().getContextClassLoader)
+      } catch {
+        case e: Exception => throw new JobExecutionException(
+          jobGraph.getJobID,
+          "Could not create the user code class loader.",
+          e)
+      }
+
+    try {
+      JobClient.submitJobAndWait(
        clientActorSystem,
        configuration,
-       leaderRetrievalService,
+       highAvailabilityServices,
        jobGraph,
        timeout,
        printUpdates,
-       this.getClass.getClassLoader())
+       userCodeClassLoader)
     } finally {
        if(!useSingleActorSystem) {
          // we have to shutdown the just created actor system
@@ -558,11 +571,24 @@ abstract class FlinkMiniCluster(
         )
     }
 
+    val userCodeClassLoader =
+      try {
+        createUserCodeClassLoader(
+          jobGraph.getUserJars,
+          jobGraph.getClasspaths,
+          Thread.currentThread().getContextClassLoader)
+      } catch {
+        case e: Exception => throw new JobExecutionException(
+          jobGraph.getJobID,
+          "Could not create the user code class loader.",
+          e)
+      }
+
     JobClient.submitJobDetached(jobManagerGateway,
       configuration,
       jobGraph,
       timeout,
-      this.getClass.getClassLoader())
+      userCodeClassLoader)
 
     new JobSubmissionResult(jobGraph.getJobID)
   }
@@ -573,20 +599,6 @@ abstract class FlinkMiniCluster(
     }
   }
 
-  protected def createLeaderRetrievalService(): LeaderRetrievalService = {
-    (jobManagerActorSystems, jobManagerActors) match {
-      case (Some(jmActorSystems), Some(jmActors)) =>
-        if (haMode == HighAvailabilityMode.NONE) {
-          new StandaloneLeaderRetrievalService(
-            AkkaUtils.getAkkaURL(jmActorSystems(0), jmActors(0)))
-        } else {
-          ZooKeeperUtils.createLeaderRetrievalService(originalConfiguration)
-        }
-
-      case _ => throw new Exception("The FlinkMiniCluster has not been started properly.")
-    }
-  }
-
   protected def clearLeader(): Unit = {
     futureLock.synchronized{
       leaderGateway = Promise()
@@ -643,4 +655,28 @@ abstract class FlinkMiniCluster(
       }
     }
   }
+
+  private def createUserCodeClassLoader(
+      jars: java.util.List[Path],
+      classPaths: java.util.List[URL],
+      parentClassLoader: ClassLoader): FlinkUserCodeClassLoader = {
+
+    val urls = new Array[URL](jars.size() + classPaths.size())
+
+    import scala.collection.JavaConverters._
+
+    var counter = 0
+
+    for (path <- jars.asScala) {
+      urls(counter) = path.makeQualified(path.getFileSystem).toUri.toURL
+      counter += 1
+    }
+
+    for (classPath <- classPaths.asScala) {
+      urls(counter) = classPath
+      counter += 1
+    }
+
+    new FlinkUserCodeClassLoader(urls, parentClassLoader)
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index 2f83548..3db91b7 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceMa
 import org.apache.flink.runtime.clusterframework.types.{ResourceID, ResourceIDRetrievable}
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
+import org.apache.flink.runtime.highavailability.{HighAvailabilityServices, HighAvailabilityServicesUtils}
 import org.apache.flink.runtime.instance.InstanceManager
 import org.apache.flink.runtime.io.disk.iomanager.IOManager
 import org.apache.flink.runtime.io.network.NetworkEnvironment
@@ -49,7 +50,7 @@ import org.apache.flink.runtime.taskexecutor.{TaskExecutor, TaskManagerConfigura
 import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerLocation}
 import org.apache.flink.runtime.util.EnvironmentInformation
 
-import scala.concurrent.Await
+import scala.concurrent.{Await, ExecutionContext}
 import scala.concurrent.duration.FiniteDuration
 
 /**
@@ -63,7 +64,20 @@ import scala.concurrent.duration.FiniteDuration
  */
 class LocalFlinkMiniCluster(
     userConfiguration: Configuration,
-    singleActorSystem: Boolean) extends FlinkMiniCluster(userConfiguration, singleActorSystem) {
+    highAvailabilityServices: HighAvailabilityServices,
+    singleActorSystem: Boolean) extends FlinkMiniCluster(
+  userConfiguration,
+  highAvailabilityServices,
+  singleActorSystem) {
+
+  def this(userConfiguration: Configuration, useSingleActorSystem: Boolean) = {
+    this(
+      userConfiguration,
+       HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
+         userConfiguration,
+         ExecutionContext.global),
+      useSingleActorSystem)
+  }
 
   def this(userConfiguration: Configuration) = this(userConfiguration, true)
 
@@ -125,15 +139,11 @@ class LocalFlinkMiniCluster(
     timeout,
     archiveCount,
     archivePath,
-    leaderElectionService,
-    submittedJobGraphStore,
-    checkpointRecoveryFactory,
     jobRecoveryTimeout,
     metricsRegistry) = JobManager.createJobManagerComponents(
       config,
       futureExecutor,
-      ioExecutor,
-      createLeaderElectionService())
+      ioExecutor)
 
     if (config.getBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false)) {
       metricsRegistry.get.startQueryService(system, null)
@@ -158,9 +168,10 @@ class LocalFlinkMiniCluster(
         archive,
         restartStrategyFactory,
         timeout,
-        leaderElectionService,
-        submittedJobGraphStore,
-        checkpointRecoveryFactory,
+        highAvailabilityServices.getJobManagerLeaderElectionService(
+          HighAvailabilityServices.DEFAULT_JOB_ID),
+        highAvailabilityServices.getSubmittedJobGraphStore(),
+        highAvailabilityServices.getCheckpointRecoveryFactory(),
         jobRecoveryTimeout,
         metricsRegistry),
       jobManagerName)
@@ -182,7 +193,8 @@ class LocalFlinkMiniCluster(
     val resourceManagerProps = getResourceManagerProps(
       resourceManagerClass,
       config,
-      createLeaderRetrievalService())
+      highAvailabilityServices.getJobManagerLeaderRetriever(
+        HighAvailabilityServices.DEFAULT_JOB_ID))
 
     system.actorOf(resourceManagerProps, resourceManagerName)
   }
@@ -237,7 +249,8 @@ class LocalFlinkMiniCluster(
       taskManagerServices.getMemoryManager(),
       taskManagerServices.getIOManager(),
       taskManagerServices.getNetworkEnvironment,
-      createLeaderRetrievalService(),
+      highAvailabilityServices.getJobManagerLeaderRetriever(
+        HighAvailabilityServices.DEFAULT_JOB_ID),
       metricRegistry)
 
     if (config.getBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false)) {
@@ -332,10 +345,6 @@ class LocalFlinkMiniCluster(
   // Helper methods
   //------------------------------------------------------------------------------------------------
 
-  def createLeaderElectionService(): Option[LeaderElectionService] = {
-    None
-  }
-
   def initializeIOFormatClasses(configuration: Configuration): Unit = {
     try {
       val om = classOf[FileOutputFormat[_]].getDeclaredMethod("initDefaultsFromConfiguration",

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 4065660..a3110a4 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -39,11 +39,14 @@ import org.apache.flink.runtime.blob.{BlobCache, BlobClient, BlobService}
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager
 import org.apache.flink.runtime.clusterframework.messages.StopCluster
 import org.apache.flink.runtime.clusterframework.types.ResourceID
+import org.apache.flink.runtime.concurrent.Executors
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor
 import org.apache.flink.runtime.execution.ExecutionState
 import org.apache.flink.runtime.execution.librarycache.{BlobLibraryCacheManager, FallbackLibraryCacheManager, LibraryCacheManager}
 import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, PartitionInfo}
 import org.apache.flink.runtime.filecache.FileCache
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.AddressResolution
+import org.apache.flink.runtime.highavailability.{HighAvailabilityServices, HighAvailabilityServicesUtils}
 import org.apache.flink.runtime.instance.{AkkaActorGateway, HardwareDescription, InstanceID}
 import org.apache.flink.runtime.io.disk.iomanager.IOManager
 import org.apache.flink.runtime.io.network.NetworkEnvironment
@@ -293,7 +296,7 @@ class TaskManager(
     // its registration at the JobManager
     case NotifyWhenRegisteredAtJobManager =>
       if (isConnected) {
-        sender ! decorateMessage(RegisteredAtJobManager)
+        sender ! decorateMessage(RegisteredAtJobManager(leaderSessionID.orNull))
       } else {
         waitForRegistration += sender
       }
@@ -993,7 +996,7 @@ class TaskManager(
 
     // notify all the actors that listen for a successful registration
     for (listener <- waitForRegistration) {
-      listener ! RegisteredAtJobManager
+      listener ! RegisteredAtJobManager(leaderSessionID.orNull)
     }
     waitForRegistration.clear()
   }
@@ -1638,20 +1641,37 @@ object TaskManager {
       taskManagerClass: Class[_ <: TaskManager])
     : Unit = {
 
-    val (taskManagerHostname, actorSystemPort) = selectNetworkInterfaceAndPort(configuration)
+    val highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
+      configuration,
+      Executors.directExecutor(),
+      AddressResolution.TRY_ADDRESS_RESOLUTION)
 
-    runTaskManager(
-      taskManagerHostname,
-      resourceID,
-      actorSystemPort,
+    val (taskManagerHostname, actorSystemPort) = selectNetworkInterfaceAndPort(
       configuration,
-      taskManagerClass)
+      highAvailabilityServices)
+
+    try {
+      runTaskManager(
+        taskManagerHostname,
+        resourceID,
+        actorSystemPort,
+        configuration,
+        highAvailabilityServices,
+        taskManagerClass)
+    } finally {
+      try {
+        highAvailabilityServices.close()
+      } catch {
+        case t: Throwable => LOG.warn("Could not properly stop the high availability services.", t)
+      }
+    }
   }
 
   @throws(classOf[IOException])
   @throws(classOf[IllegalConfigurationException])
   def selectNetworkInterfaceAndPort(
-      configuration: Configuration)
+      configuration: Configuration,
+      highAvailabilityServices: HighAvailabilityServices)
     : (String, Int) = {
 
     var taskManagerHostname = configuration.getString(
@@ -1661,13 +1681,11 @@ object TaskManager {
       LOG.info("Using configured hostname/address for TaskManager: " + taskManagerHostname)
     }
     else {
-      val leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(
-        configuration,
-        true)
       val lookupTimeout = AkkaUtils.getLookupTimeout(configuration)
 
       val taskManagerAddress = LeaderRetrievalUtils.findConnectingAddress(
-        leaderRetrievalService,
+        highAvailabilityServices.getJobManagerLeaderRetriever(
+          HighAvailabilityServices.DEFAULT_JOB_ID),
         lookupTimeout)
 
       taskManagerHostname = taskManagerAddress.getHostName()
@@ -1700,13 +1718,15 @@ object TaskManager {
    * @param resourceID The id of the resource which the task manager will run on.
    * @param actorSystemPort The port at which the actor system will communicate.
    * @param configuration The configuration for the TaskManager.
+   * @param highAvailabilityServices Service factory for high availability services
    */
   @throws(classOf[Exception])
   def runTaskManager(
       taskManagerHostname: String,
       resourceID: ResourceID,
       actorSystemPort: Int,
-      configuration: Configuration)
+      configuration: Configuration,
+      highAvailabilityServices: HighAvailabilityServices)
     : Unit = {
 
     runTaskManager(
@@ -1714,6 +1734,7 @@ object TaskManager {
       resourceID,
       actorSystemPort,
       configuration,
+      highAvailabilityServices,
       classOf[TaskManager])
   }
 
@@ -1730,6 +1751,7 @@ object TaskManager {
    * @param resourceID The id of the resource which the task manager will run on.
    * @param actorSystemPort The port at which the actor system will communicate.
    * @param configuration The configuration for the TaskManager.
+   * @param highAvailabilityServices Service factory for high availability services
    * @param taskManagerClass The actor class to instantiate. Allows the use of TaskManager
    *                         subclasses for example for YARN.
    */
@@ -1739,6 +1761,7 @@ object TaskManager {
       resourceID: ResourceID,
       actorSystemPort: Int,
       configuration: Configuration,
+      highAvailabilityServices: HighAvailabilityServices,
       taskManagerClass: Class[_ <: TaskManager])
     : Unit = {
 
@@ -1778,9 +1801,9 @@ object TaskManager {
         configuration,
         resourceID,
         taskManagerSystem,
+        highAvailabilityServices,
         taskManagerHostname,
         Some(TaskExecutor.TASK_MANAGER_NAME),
-        None,
         localTaskManagerCommunication = false,
         taskManagerClass)
 
@@ -1821,8 +1844,7 @@ object TaskManager {
 
       // block until everything is done
       taskManagerSystem.awaitTermination()
-    }
-    catch {
+    } catch {
       case t: Throwable =>
         LOG.error("Error while starting up taskManager", t)
         try {
@@ -1840,17 +1862,15 @@ object TaskManager {
    * @param configuration The configuration for the TaskManager.
    * @param resourceID The id of the resource which the task manager will run on.
    * @param actorSystem The actor system that should run the TaskManager actor.
+   * @param highAvailabilityServices Factory to create high availability services
    * @param taskManagerHostname The hostname/address that describes the TaskManager's data location.
    * @param taskManagerActorName Optionally the name of the TaskManager actor. If none is given,
    *                             the actor will use a random name.
-   * @param leaderRetrievalServiceOption Optionally, a leader retrieval service can be provided. If
-   *                                     none is given, then a LeaderRetrievalService is
-   *                                     constructed from the configuration.
    * @param localTaskManagerCommunication If true, the TaskManager will not initiate the
    *                                      TCP network stack.
    * @param taskManagerClass The class of the TaskManager actor. May be used to give
    *                         subclasses that understand additional actor messages.
-    * @throws org.apache.flink.configuration.IllegalConfigurationException
+   * @throws org.apache.flink.configuration.IllegalConfigurationException
    *                              Thrown, if the given config contains illegal values.
    * @throws java.io.IOException Thrown, if any of the I/O components (such as buffer pools,
    *                             I/O manager, ...) cannot be properly started.
@@ -1865,9 +1885,9 @@ object TaskManager {
       configuration: Configuration,
       resourceID: ResourceID,
       actorSystem: ActorSystem,
+      highAvailabilityServices: HighAvailabilityServices,
       taskManagerHostname: String,
       taskManagerActorName: Option[String],
-      leaderRetrievalServiceOption: Option[LeaderRetrievalService],
       localTaskManagerCommunication: Boolean,
       taskManagerClass: Class[_ <: TaskManager])
     : ActorRef = {
@@ -1885,11 +1905,8 @@ object TaskManager {
 
     val metricRegistry = taskManagerServices.getMetricRegistry()
 
-    val leaderRetrievalService = leaderRetrievalServiceOption match {
-      case Some(lrs) => lrs
-      // validate the address if possible (e.g. we're in Standalone mode)
-      case None => LeaderRetrievalUtils.createLeaderRetrievalService(configuration, true)
-    }
+    val leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever(
+      HighAvailabilityServices.DEFAULT_JOB_ID)
 
     // create the actor properties (which define the actor constructor parameters)
     val tmProps = getTaskManagerProps(

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
index 99a3815..ec1bbd8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 
+import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
 import scala.concurrent.Await;
@@ -47,7 +48,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-public class CoordinatorShutdownTest {
+public class CoordinatorShutdownTest extends TestLogger {
 	
 	@Test
 	public void testCoordinatorShutsDownOnFailure() {

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java
index 0ec00df..8530b0f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.FlinkUntypedActor;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
 import org.apache.flink.runtime.messages.Acknowledge;
@@ -306,11 +307,14 @@ public class JobClientActorTest extends TestLogger {
 			leaderSessionID
 		);
 
+		TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
+		highAvailabilityServices.setJobMasterLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID, testingLeaderRetrievalService);
+
 		JobListeningContext jobListeningContext =
 			JobClient.submitJob(
 				system,
 				clientConfig,
-				testingLeaderRetrievalService,
+				highAvailabilityServices,
 				testJobGraph,
 				timeout,
 				false,
@@ -328,6 +332,8 @@ public class JobClientActorTest extends TestLogger {
 			Assert.fail();
 		} catch (JobExecutionException e) {
 			// this is what we want
+		} finally {
+			highAvailabilityServices.closeAndCleanupAllData();
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ClusterShutdownITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ClusterShutdownITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ClusterShutdownITCase.java
index 41018dd..388572c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ClusterShutdownITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ClusterShutdownITCase.java
@@ -25,6 +25,8 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.messages.StopCluster;
 import org.apache.flink.runtime.clusterframework.messages.StopClusterSuccessful;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.testingUtils.TestingMessages;
@@ -32,12 +34,16 @@ import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.TestingResourceManager;
 import org.apache.flink.util.TestLogger;
 
+import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
 import scala.Option;
 
+import java.util.Arrays;
+
 /**
  * Runs tests to ensure that a cluster is shutdown properly.
  */
@@ -47,6 +53,19 @@ public class ClusterShutdownITCase extends TestLogger {
 
 	private static Configuration config = new Configuration();
 
+	private HighAvailabilityServices highAvailabilityServices;
+
+	@Before
+	public void setupTest() {
+		highAvailabilityServices = new EmbeddedHaServices(TestingUtils.defaultExecutor());
+	}
+
+	@After
+	public void tearDownTest() throws Exception {
+		highAvailabilityServices.closeAndCleanupAllData();
+		highAvailabilityServices = null;
+	}
+
 	@BeforeClass
 	public static void setup() {
 		system = AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig());
@@ -68,37 +87,51 @@ public class ClusterShutdownITCase extends TestLogger {
 		@Override
 		protected void run() {
 
-			ActorGateway me =
-				TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty());
-
-			// start job manager which doesn't shutdown the actor system
-			ActorGateway jobManager =
-				TestingUtils.createJobManager(
-					system,
-					TestingUtils.defaultExecutor(),
-					TestingUtils.defaultExecutor(),
-					config,
-					"jobmanager1");
-
-			// Tell the JobManager to inform us of shutdown actions
-			jobManager.tell(TestingMessages.getNotifyOfComponentShutdown(), me);
-
-			// Register a TaskManager
-			ActorGateway taskManager =
-				TestingUtils.createTaskManager(system, jobManager, config, true, true);
-
-			// Tell the TaskManager to inform us of TaskManager shutdowns
-			taskManager.tell(TestingMessages.getNotifyOfComponentShutdown(), me);
-
-
-			// No resource manager connected
-			jobManager.tell(new StopCluster(ApplicationStatus.SUCCEEDED, "Shutting down."), me);
-
-			expectMsgAllOf(
-				new TestingMessages.ComponentShutdown(taskManager.actor()),
-				new TestingMessages.ComponentShutdown(jobManager.actor()),
-				StopClusterSuccessful.getInstance()
-			);
+			ActorGateway jobManager = null;
+			ActorGateway taskManager = null;
+			ActorGateway forwardingActor = null;
+
+			try {
+				// start job manager which doesn't shutdown the actor system
+				jobManager =
+					TestingUtils.createJobManager(
+						system,
+						TestingUtils.defaultExecutor(),
+						TestingUtils.defaultExecutor(),
+						config,
+						highAvailabilityServices,
+						"jobmanager1");
+
+				forwardingActor =
+					TestingUtils.createForwardingActor(
+						system,
+						getTestActor(),
+						jobManager.leaderSessionID(),
+						Option.<String>empty());
+
+				// Tell the JobManager to inform us of shutdown actions
+				jobManager.tell(TestingMessages.getNotifyOfComponentShutdown(), forwardingActor);
+
+				// Register a TaskManager
+				taskManager =
+					TestingUtils.createTaskManager(system, highAvailabilityServices, config, true, true);
+
+				// Tell the TaskManager to inform us of TaskManager shutdowns
+				taskManager.tell(TestingMessages.getNotifyOfComponentShutdown(), forwardingActor);
+
+
+				// No resource manager connected
+				jobManager.tell(new StopCluster(ApplicationStatus.SUCCEEDED, "Shutting down."), forwardingActor);
+
+				expectMsgAllOf(
+					new TestingMessages.ComponentShutdown(taskManager.actor()),
+					new TestingMessages.ComponentShutdown(jobManager.actor()),
+					StopClusterSuccessful.getInstance()
+				);
+			} finally {
+				TestingUtils.stopActorGatewaysGracefully(Arrays.asList(
+					jobManager, taskManager, forwardingActor));
+			}
 
 		}};
 		}};
@@ -115,50 +148,68 @@ public class ClusterShutdownITCase extends TestLogger {
 		@Override
 		protected void run() {
 
-			ActorGateway me =
-				TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty());
-
-			// start job manager which doesn't shutdown the actor system
-			ActorGateway jobManager =
-				TestingUtils.createJobManager(
-					system,
-					TestingUtils.defaultExecutor(),
-					TestingUtils.defaultExecutor(),
-					config,
-					"jobmanager2");
-
-			// Tell the JobManager to inform us of shutdown actions
-			jobManager.tell(TestingMessages.getNotifyOfComponentShutdown(), me);
-
-			// Register a TaskManager
-			ActorGateway taskManager =
-				TestingUtils.createTaskManager(system, jobManager, config, true, true);
-
-			// Tell the TaskManager to inform us of TaskManager shutdowns
-			taskManager.tell(TestingMessages.getNotifyOfComponentShutdown(), me);
-
-			// Start resource manager and let it register
-			ActorGateway resourceManager =
-				TestingUtils.createResourceManager(system, jobManager.actor(), config);
-
-			// Tell the ResourceManager to inform us of ResourceManager shutdowns
-			resourceManager.tell(TestingMessages.getNotifyOfComponentShutdown(), me);
-
-			// notify about a resource manager registration at the job manager
-			resourceManager.tell(new TestingResourceManager.NotifyWhenResourceManagerConnected(), me);
-
-			// Wait for resource manager
-			expectMsgEquals(Acknowledge.get());
-
-			// Shutdown cluster with resource manager connected
-			jobManager.tell(new StopCluster(ApplicationStatus.SUCCEEDED, "Shutting down."), me);
-
-			expectMsgAllOf(
-				new TestingMessages.ComponentShutdown(taskManager.actor()),
-				new TestingMessages.ComponentShutdown(jobManager.actor()),
-				new TestingMessages.ComponentShutdown(resourceManager.actor()),
-				StopClusterSuccessful.getInstance()
-			);
+			ActorGateway jobManager = null;
+			ActorGateway taskManager = null;
+			ActorGateway resourceManager = null;
+			ActorGateway forwardingActor = null;
+
+			try {
+				// start job manager which doesn't shutdown the actor system
+				jobManager =
+					TestingUtils.createJobManager(
+						system,
+						TestingUtils.defaultExecutor(),
+						TestingUtils.defaultExecutor(),
+						config,
+						highAvailabilityServices,
+						"jobmanager2");
+
+				forwardingActor =
+					TestingUtils.createForwardingActor(
+						system,
+						getTestActor(),
+						jobManager.leaderSessionID(),
+						Option.<String>empty());
+
+				// Tell the JobManager to inform us of shutdown actions
+				jobManager.tell(TestingMessages.getNotifyOfComponentShutdown(), forwardingActor);
+
+				// Register a TaskManager
+				taskManager =
+					TestingUtils.createTaskManager(system, highAvailabilityServices, config, true, true);
+
+				// Tell the TaskManager to inform us of TaskManager shutdowns
+				taskManager.tell(TestingMessages.getNotifyOfComponentShutdown(), forwardingActor);
+
+				// Start resource manager and let it register
+				resourceManager =
+					TestingUtils.createResourceManager(
+						system,
+						config,
+						highAvailabilityServices);
+
+				// Tell the ResourceManager to inform us of ResourceManager shutdowns
+				resourceManager.tell(TestingMessages.getNotifyOfComponentShutdown(), forwardingActor);
+
+				// notify about a resource manager registration at the job manager
+				resourceManager.tell(new TestingResourceManager.NotifyWhenResourceManagerConnected(), forwardingActor);
+
+				// Wait for resource manager
+				expectMsgEquals(Acknowledge.get());
+
+				// Shutdown cluster with resource manager connected
+				jobManager.tell(new StopCluster(ApplicationStatus.SUCCEEDED, "Shutting down."), forwardingActor);
+
+				expectMsgAllOf(
+					new TestingMessages.ComponentShutdown(taskManager.actor()),
+					new TestingMessages.ComponentShutdown(jobManager.actor()),
+					new TestingMessages.ComponentShutdown(resourceManager.actor()),
+					StopClusterSuccessful.getInstance()
+				);
+			} finally {
+				TestingUtils.stopActorGatewaysGracefully(Arrays.asList(
+					jobManager, taskManager, resourceManager, forwardingActor));
+			}
 
 		}};
 		}};

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerITCase.java
index 6191195..d52afe4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerITCase.java
@@ -23,6 +23,8 @@ import akka.testkit.JavaTestKit;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.HardwareDescription;
 import org.apache.flink.runtime.messages.Acknowledge;
@@ -32,12 +34,17 @@ import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.TestingResourceManager;
 import org.apache.flink.util.TestLogger;
 
+import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
 import scala.Option;
 
+
+import java.util.Arrays;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
@@ -53,6 +60,8 @@ public class ResourceManagerITCase extends TestLogger {
 
 	private static Configuration config = new Configuration();
 
+	private HighAvailabilityServices highAvailabilityServices;
+
 	@BeforeClass
 	public static void setup() {
 		system = AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig());
@@ -63,6 +72,17 @@ public class ResourceManagerITCase extends TestLogger {
 		JavaTestKit.shutdownActorSystem(system);
 	}
 
+	@Before
+	public void setupTest() {
+		highAvailabilityServices = new EmbeddedHaServices(TestingUtils.defaultExecutor());
+	}
+
+	@After
+	public void tearDownTest() throws Exception {
+		highAvailabilityServices.closeAndCleanupAllData();
+		highAvailabilityServices = null;
+	}
+
 	/**
 	 * Tests whether the resource manager connects and reconciles existing task managers.
 	 */
@@ -74,49 +94,67 @@ public class ResourceManagerITCase extends TestLogger {
 		@Override
 		protected void run() {
 
-			ActorGateway jobManager =
-				TestingUtils.createJobManager(
-					system,
-					TestingUtils.defaultExecutor(),
-					TestingUtils.defaultExecutor(),
-					config,
-					"ReconciliationTest");
-			ActorGateway me =
-				TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty());
+			ActorGateway jobManager = null;
+			ActorGateway resourceManager = null;
+			ActorGateway forwardingActor = null;
+
+			try {
+				jobManager =
+					TestingUtils.createJobManager(
+						system,
+						TestingUtils.defaultExecutor(),
+						TestingUtils.defaultExecutor(),
+						config,
+						highAvailabilityServices,
+						"ReconciliationTest");
 
-			// !! no resource manager started !!
+				forwardingActor =
+					TestingUtils.createForwardingActor(
+						system,
+						getTestActor(),
+						jobManager.leaderSessionID(),
+						Option.<String>empty());
 
-			ResourceID resourceID = ResourceID.generate();
+				// !! no resource manager started !!
 
-			TaskManagerLocation location = mock(TaskManagerLocation.class);
-			when(location.getResourceID()).thenReturn(resourceID);
+				ResourceID resourceID = ResourceID.generate();
 
-			HardwareDescription resourceProfile = HardwareDescription.extractFromSystem(1_000_000);
+				TaskManagerLocation location = mock(TaskManagerLocation.class);
+				when(location.getResourceID()).thenReturn(resourceID);
 
-			jobManager.tell(
-				new RegistrationMessages.RegisterTaskManager(resourceID, location, resourceProfile, 1),
-				me);
+				HardwareDescription resourceProfile = HardwareDescription.extractFromSystem(1_000_000);
 
-			expectMsgClass(RegistrationMessages.AcknowledgeRegistration.class);
+				jobManager.tell(
+					new RegistrationMessages.RegisterTaskManager(resourceID, location, resourceProfile, 1),
+					forwardingActor);
 
-			// now start the resource manager
-			ActorGateway resourceManager =
-				TestingUtils.createResourceManager(system, jobManager.actor(), config);
+				expectMsgClass(RegistrationMessages.AcknowledgeRegistration.class);
 
-			// register at testing job manager to receive a message once a resource manager registers
-			resourceManager.tell(new TestingResourceManager.NotifyWhenResourceManagerConnected(), me);
+				// now start the resource manager
+				resourceManager =
+					TestingUtils.createResourceManager(
+						system,
+						config,
+						highAvailabilityServices);
 
-			// Wait for resource manager
-			expectMsgEquals(Acknowledge.get());
+				// register at testing job manager to receive a message once a resource manager registers
+				resourceManager.tell(new TestingResourceManager.NotifyWhenResourceManagerConnected(), forwardingActor);
 
-			// check if we registered the task manager resource
-			resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), me);
+				// Wait for resource manager
+				expectMsgEquals(Acknowledge.get());
 
-			TestingResourceManager.GetRegisteredResourcesReply reply =
-				expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class);
+				// check if we registered the task manager resource
+				resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), forwardingActor);
 
-			assertEquals(1, reply.resources.size());
-			assertTrue(reply.resources.contains(resourceID));
+				TestingResourceManager.GetRegisteredResourcesReply reply =
+					expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class);
+
+				assertEquals(1, reply.resources.size());
+				assertTrue(reply.resources.contains(resourceID));
+			} finally {
+				TestingUtils.stopActorGatewaysGracefully(Arrays.asList(
+					jobManager, resourceManager, forwardingActor));
+			}
 
 		}};
 		}};
@@ -133,37 +171,53 @@ public class ResourceManagerITCase extends TestLogger {
 		@Override
 		protected void run() {
 
-			ActorGateway jobManager =
-				TestingUtils.createJobManager(
-					system,
-					TestingUtils.defaultExecutor(),
-					TestingUtils.defaultExecutor(),
-					config,
-					"RegTest");
-			ActorGateway me =
-				TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty());
-
-			// start the resource manager
-			ActorGateway resourceManager =
-				TestingUtils.createResourceManager(system, jobManager.actor(), config);
-
-			// notify about a resource manager registration at the job manager
-			resourceManager.tell(new TestingResourceManager.NotifyWhenResourceManagerConnected(), me);
-
-			// Wait for resource manager
-			expectMsgEquals(Acknowledge.get());
-
-			// start task manager and wait for registration
-			ActorGateway taskManager =
-				TestingUtils.createTaskManager(system, jobManager.actor(), config, true, true);
-
-			// check if we registered the task manager resource
-			resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), me);
-
-			TestingResourceManager.GetRegisteredResourcesReply reply =
-				expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class);
-
-			assertEquals(1, reply.resources.size());
+			ActorGateway jobManager = null;
+			ActorGateway taskManager = null;
+			ActorGateway resourceManager = null;
+			ActorGateway forwardingActor = null;
+
+			try {
+				jobManager =
+					TestingUtils.createJobManager(
+						system,
+						TestingUtils.defaultExecutor(),
+						TestingUtils.defaultExecutor(),
+						config,
+						highAvailabilityServices,
+						"RegTest");
+
+				forwardingActor =
+					TestingUtils.createForwardingActor(
+						system,
+						getTestActor(),
+						jobManager.leaderSessionID(),
+						Option.<String>empty());
+
+				// start the resource manager
+				resourceManager =
+					TestingUtils.createResourceManager(system, config, highAvailabilityServices);
+
+				// notify about a resource manager registration at the job manager
+				resourceManager.tell(new TestingResourceManager.NotifyWhenResourceManagerConnected(), forwardingActor);
+
+				// Wait for resource manager
+				expectMsgEquals(Acknowledge.get());
+
+				// start task manager and wait for registration
+				taskManager =
+					TestingUtils.createTaskManager(system, highAvailabilityServices, config, true, true);
+
+				// check if we registered the task manager resource
+				resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), forwardingActor);
+
+				TestingResourceManager.GetRegisteredResourcesReply reply =
+					expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class);
+
+				assertEquals(1, reply.resources.size());
+			} finally {
+				TestingUtils.stopActorGatewaysGracefully(Arrays.asList(
+					jobManager, resourceManager, taskManager, forwardingActor));
+			}
 
 		}};
 		}};

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
index c740518..5aa31ff 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
@@ -58,7 +59,9 @@ import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.TestingResourceManager;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.util.TestLogger;
+import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
@@ -90,6 +93,9 @@ public class ResourceManagerTest extends TestLogger {
 
 	private static Configuration config = new Configuration();
 
+	private TestingHighAvailabilityServices highAvailabilityServices;
+	private TestingLeaderRetrievalService jobManagerLeaderRetrievalService;
+
 	@BeforeClass
 	public static void setup() {
 		system = AkkaUtils.createLocalActorSystem(config);
@@ -100,6 +106,32 @@ public class ResourceManagerTest extends TestLogger {
 		JavaTestKit.shutdownActorSystem(system);
 	}
 
+	@Before
+	public void setupTest() {
+		jobManagerLeaderRetrievalService = new TestingLeaderRetrievalService();
+
+		highAvailabilityServices = new TestingHighAvailabilityServices();
+
+		highAvailabilityServices.setJobMasterLeaderRetriever(
+			HighAvailabilityServices.DEFAULT_JOB_ID,
+			jobManagerLeaderRetrievalService);
+	}
+
+	@After
+	public void teardownTest() throws Exception {
+		if (jobManagerLeaderRetrievalService != null) {
+			jobManagerLeaderRetrievalService.stop();
+
+			jobManagerLeaderRetrievalService = null;
+		}
+
+		if (highAvailabilityServices != null) {
+			highAvailabilityServices.closeAndCleanupAllData();
+
+			highAvailabilityServices = null;
+		}
+	}
+
 	/**
 	 * Tests the registration and reconciliation of the ResourceManager with the JobManager
 	 */
@@ -109,8 +141,20 @@ public class ResourceManagerTest extends TestLogger {
 		new Within(duration("10 seconds")) {
 		@Override
 		protected void run() {
-			fakeJobManager = TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty());
-			resourceManager = TestingUtils.createResourceManager(system, fakeJobManager.actor(), config);
+			fakeJobManager = TestingUtils.createForwardingActor(
+				system,
+				getTestActor(),
+				HighAvailabilityServices.DEFAULT_LEADER_ID,
+				Option.<String>empty());
+
+			jobManagerLeaderRetrievalService.notifyListener(
+				fakeJobManager.path(),
+				HighAvailabilityServices.DEFAULT_LEADER_ID);
+
+			resourceManager = TestingUtils.createResourceManager(
+				system,
+				config,
+				highAvailabilityServices);
 
 			expectMsgClass(RegisterResourceManager.class);
 
@@ -150,8 +194,20 @@ public class ResourceManagerTest extends TestLogger {
 			Configuration shortTimeoutConfig = config.clone();
 			shortTimeoutConfig.setString(ConfigConstants.AKKA_LOOKUP_TIMEOUT, "1 s");
 
-			fakeJobManager = TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty());
-			resourceManager = TestingUtils.createResourceManager(system, fakeJobManager.actor(), shortTimeoutConfig);
+			fakeJobManager = TestingUtils.createForwardingActor(
+				system,
+				getTestActor(),
+				HighAvailabilityServices.DEFAULT_LEADER_ID,
+				Option.<String>empty());
+
+			jobManagerLeaderRetrievalService.notifyListener(
+				fakeJobManager.path(),
+				HighAvailabilityServices.DEFAULT_LEADER_ID);
+
+			resourceManager = TestingUtils.createResourceManager(
+				system,
+				shortTimeoutConfig,
+				highAvailabilityServices);
 
 			// wait for registration message
 			RegisterResourceManager msg = expectMsgClass(RegisterResourceManager.class);
@@ -180,8 +236,20 @@ public class ResourceManagerTest extends TestLogger {
 			Configuration shortTimeoutConfig = config.clone();
 			shortTimeoutConfig.setString(ConfigConstants.AKKA_LOOKUP_TIMEOUT, "99999 s");
 
-			fakeJobManager = TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty());
-			resourceManager = TestingUtils.createResourceManager(system, fakeJobManager.actor(), shortTimeoutConfig);
+			fakeJobManager = TestingUtils.createForwardingActor(
+				system,
+				getTestActor(),
+				HighAvailabilityServices.DEFAULT_LEADER_ID,
+				Option.<String>empty());
+
+			jobManagerLeaderRetrievalService.notifyListener(
+				fakeJobManager.path(),
+				HighAvailabilityServices.DEFAULT_LEADER_ID);
+
+			resourceManager = TestingUtils.createResourceManager(
+				system,
+				shortTimeoutConfig,
+				highAvailabilityServices);
 
 			// wait for registration message
 			RegisterResourceManager msg = expectMsgClass(RegisterResourceManager.class);
@@ -212,8 +280,20 @@ public class ResourceManagerTest extends TestLogger {
 		@Override
 		protected void run() {
 
-			fakeJobManager = TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty());
-			resourceManager = TestingUtils.createResourceManager(system, fakeJobManager.actor(), config);
+			fakeJobManager = TestingUtils.createForwardingActor(
+				system,
+				getTestActor(),
+				HighAvailabilityServices.DEFAULT_LEADER_ID,
+				Option.<String>empty());
+
+			jobManagerLeaderRetrievalService.notifyListener(
+				fakeJobManager.path(),
+				HighAvailabilityServices.DEFAULT_LEADER_ID);
+
+			resourceManager = TestingUtils.createResourceManager(
+				system,
+				config,
+				highAvailabilityServices);
 
 			// register with JM
 			expectMsgClass(RegisterResourceManager.class);
@@ -270,8 +350,20 @@ public class ResourceManagerTest extends TestLogger {
 		@Override
 		protected void run() {
 
-			fakeJobManager = TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty());
-			resourceManager = TestingUtils.createResourceManager(system, fakeJobManager.actor(), config);
+			fakeJobManager = TestingUtils.createForwardingActor(
+				system,
+				getTestActor(),
+				HighAvailabilityServices.DEFAULT_LEADER_ID,
+				Option.<String>empty());
+
+			jobManagerLeaderRetrievalService.notifyListener(
+				fakeJobManager.path(),
+				HighAvailabilityServices.DEFAULT_LEADER_ID);
+
+			resourceManager = TestingUtils.createResourceManager(
+				system,
+				config,
+				highAvailabilityServices);
 
 			// register with JM
 			expectMsgClass(RegisterResourceManager.class);
@@ -321,8 +413,20 @@ public class ResourceManagerTest extends TestLogger {
 		@Override
 		protected void run() {
 
-			fakeJobManager = TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty());
-			resourceManager = TestingUtils.createResourceManager(system, fakeJobManager.actor(), config);
+			fakeJobManager = TestingUtils.createForwardingActor(
+				system,
+				getTestActor(),
+				HighAvailabilityServices.DEFAULT_LEADER_ID,
+				Option.<String>empty());
+
+			jobManagerLeaderRetrievalService.notifyListener(
+				fakeJobManager.path(),
+				HighAvailabilityServices.DEFAULT_LEADER_ID);
+
+			resourceManager = TestingUtils.createResourceManager(
+				system,
+				config,
+				highAvailabilityServices);
 
 			// register with JM
 			expectMsgClass(RegisterResourceManager.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ManualLeaderService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ManualLeaderService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ManualLeaderService.java
new file mode 100644
index 0000000..423c0ce
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ManualLeaderService.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * Leader service for {@link TestingManualHighAvailabilityServices} implementation. The leader
+ * service allows to create multiple {@link TestingLeaderElectionService} and
+ * {@link TestingLeaderRetrievalService} and allows to manually trigger the services identified
+ * by a continuous index.
+ */
+public class ManualLeaderService {
+
+	private final List<TestingLeaderElectionService> leaderElectionServices;
+	private final List<TestingLeaderRetrievalService> leaderRetrievalServices;
+
+	private int currentLeaderIndex;
+
+	@Nullable
+	private UUID currentLeaderId;
+
+	public ManualLeaderService() {
+		leaderElectionServices = new ArrayList<>(4);
+		leaderRetrievalServices = new ArrayList<>(4);
+
+		currentLeaderIndex = -1;
+		currentLeaderId = null;
+	}
+
+	public LeaderRetrievalService createLeaderRetrievalService() {
+		final TestingLeaderRetrievalService testingLeaderRetrievalService = new TestingLeaderRetrievalService(
+			getLeaderAddress(currentLeaderIndex),
+			currentLeaderId);
+
+		leaderRetrievalServices.add(testingLeaderRetrievalService);
+
+		return testingLeaderRetrievalService;
+	}
+
+	public LeaderElectionService createLeaderElectionService() {
+		TestingLeaderElectionService testingLeaderElectionService = new TestingLeaderElectionService();
+
+		leaderElectionServices.add(testingLeaderElectionService);
+
+		return testingLeaderElectionService;
+	}
+
+	public void grantLeadership(int index, UUID leaderId) {
+		if (currentLeaderId != null) {
+			revokeLeadership();
+		}
+
+		Preconditions.checkNotNull(leaderId);
+		Preconditions.checkArgument(0 <= index && index < leaderElectionServices.size());
+
+		TestingLeaderElectionService testingLeaderElectionService = leaderElectionServices.get(index);
+
+		testingLeaderElectionService.isLeader(leaderId);
+
+		currentLeaderIndex = index;
+		currentLeaderId = leaderId;
+	}
+
+	public void revokeLeadership() {
+		assert(currentLeaderId != null);
+		assert(0 <= currentLeaderIndex &&  currentLeaderIndex < leaderElectionServices.size());
+
+		TestingLeaderElectionService testingLeaderElectionService = leaderElectionServices.get(currentLeaderIndex);
+
+		testingLeaderElectionService.notLeader();
+
+		currentLeaderIndex = -1;
+		currentLeaderId = null;
+	}
+
+	public void notifyRetrievers(int index, UUID leaderId) {
+		for (TestingLeaderRetrievalService retrievalService: leaderRetrievalServices) {
+			retrievalService.notifyListener(getLeaderAddress(index), leaderId);
+		}
+	}
+
+	private String getLeaderAddress(int index) {
+		if (0 <= index && index < leaderElectionServices.size()) {
+			TestingLeaderElectionService testingLeaderElectionService = leaderElectionServices.get(index);
+			return testingLeaderElectionService.getAddress();
+		} else {
+			return null;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingManualHighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingManualHighAvailabilityServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingManualHighAvailabilityServices.java
new file mode 100644
index 0000000..676b0d8
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingManualHighAvailabilityServices.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.blob.BlobStore;
+import org.apache.flink.runtime.blob.VoidBlobStore;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneRunningJobsRegistry;
+import org.apache.flink.runtime.jobmanager.StandaloneSubmittedJobGraphStore;
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * Testing high availability service which can be manually controlled. The leader election and
+ * notification of new leaders is triggered manually via {@link #grantLeadership(JobID, int, UUID)}
+ * and {@link #notifyRetrievers(JobID, int, UUID)}.
+ */
+public class TestingManualHighAvailabilityServices implements HighAvailabilityServices {
+
+	private final Map<JobID, ManualLeaderService> jobManagerLeaderServices;
+
+	private final ManualLeaderService resourceManagerLeaderService;
+
+	public TestingManualHighAvailabilityServices() {
+		jobManagerLeaderServices = new HashMap<>(4);
+		resourceManagerLeaderService = new ManualLeaderService();
+	}
+
+	@Override
+	public LeaderRetrievalService getResourceManagerLeaderRetriever() {
+		return resourceManagerLeaderService.createLeaderRetrievalService();
+	}
+
+	@Override
+	public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) {
+		ManualLeaderService leaderService = getOrCreateJobManagerLeaderService(jobID);
+
+		return leaderService.createLeaderRetrievalService();
+	}
+
+	@Override
+	public LeaderElectionService getResourceManagerLeaderElectionService() {
+		return resourceManagerLeaderService.createLeaderElectionService();
+	}
+
+	@Override
+	public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) {
+		ManualLeaderService leaderService = getOrCreateJobManagerLeaderService(jobID);
+
+		return leaderService.createLeaderElectionService();
+	}
+
+	@Override
+	public CheckpointRecoveryFactory getCheckpointRecoveryFactory() {
+		return new StandaloneCheckpointRecoveryFactory();
+	}
+
+	@Override
+	public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception {
+		return new StandaloneSubmittedJobGraphStore();
+	}
+
+	@Override
+	public RunningJobsRegistry getRunningJobsRegistry() throws Exception {
+		return new StandaloneRunningJobsRegistry();
+	}
+
+	@Override
+	public BlobStore createBlobStore() throws IOException {
+		return new VoidBlobStore();
+	}
+
+	@Override
+	public void close() throws Exception {
+		// nothing to do
+	}
+
+	@Override
+	public void closeAndCleanupAllData() throws Exception {
+		// nothing to do
+	}
+
+	public void grantLeadership(JobID jobId, int index, UUID leaderId) {
+		ManualLeaderService manualLeaderService = jobManagerLeaderServices.get(jobId);
+
+		if (manualLeaderService != null) {
+			manualLeaderService.grantLeadership(index, leaderId);
+		} else {
+			throw new IllegalStateException("No manual leader service for job id " + jobId +
+				" has been initialized.");
+		}
+	}
+
+	public void revokeLeadership(JobID jobId) {
+		ManualLeaderService manualLeaderService = jobManagerLeaderServices.get(jobId);
+
+		if (manualLeaderService != null) {
+			manualLeaderService.revokeLeadership();
+		} else {
+			throw new IllegalStateException("No manual leader service for job id " + jobId +
+				" has been initialized.");
+		}
+	}
+
+	public void notifyRetrievers(JobID jobId, int index, UUID leaderId) {
+		ManualLeaderService manualLeaderService = jobManagerLeaderServices.get(jobId);
+
+		if (manualLeaderService != null) {
+			manualLeaderService.notifyRetrievers(index, leaderId);
+		} else {
+			throw new IllegalStateException("No manual leader service for job id " + jobId +
+				" has been initialized.");
+		}
+	}
+
+	private ManualLeaderService getOrCreateJobManagerLeaderService(JobID jobId) {
+		ManualLeaderService manualLeaderService = jobManagerLeaderServices.get(jobId);
+
+		if (manualLeaderService == null) {
+			manualLeaderService = new ManualLeaderService();
+			jobManagerLeaderServices.put(jobId, manualLeaderService);
+		}
+
+		return manualLeaderService;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index 77eb566..b8b5984 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -52,6 +52,7 @@ import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.instance.InstanceManager;
@@ -67,7 +68,6 @@ import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.state.ChainedStateHandle;
@@ -86,6 +86,7 @@ import org.apache.flink.runtime.testutils.RecoverableCompletedCheckpointStore;
 import org.apache.flink.runtime.util.TestByteStreamStateHandleDeepCompare;
 import org.apache.flink.util.InstantiationUtil;
 
+import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Rule;
@@ -123,7 +124,7 @@ import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-public class JobManagerHARecoveryTest {
+public class JobManagerHARecoveryTest extends TestLogger {
 
 	private static ActorSystem system;
 
@@ -171,8 +172,11 @@ public class JobManagerHARecoveryTest {
 			CheckpointRecoveryFactory checkpointStateFactory = new MyCheckpointRecoveryFactory(checkpointStore, checkpointCounter);
 			TestingLeaderElectionService myLeaderElectionService = new TestingLeaderElectionService();
 			TestingLeaderRetrievalService myLeaderRetrievalService = new TestingLeaderRetrievalService(
-				"localhost",
-				HighAvailabilityServices.DEFAULT_LEADER_ID);
+				null,
+				null);
+			TestingHighAvailabilityServices testingHighAvailabilityServices = new TestingHighAvailabilityServices();
+
+			testingHighAvailabilityServices.setJobMasterLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID, myLeaderRetrievalService);
 
 			InstanceManager instanceManager = new InstanceManager();
 			instanceManager.addInstanceListener(scheduler);
@@ -200,14 +204,14 @@ public class JobManagerHARecoveryTest {
 			ActorGateway gateway = new AkkaActorGateway(jobManager, leaderSessionID);
 
 			taskManager = TaskManager.startTaskManagerComponentsAndActor(
-					flinkConfiguration,
-					ResourceID.generate(),
-					system,
-					"localhost",
-					Option.apply("taskmanager"),
-					Option.apply((LeaderRetrievalService) myLeaderRetrievalService),
-					true,
-					TestingTaskManager.class);
+				flinkConfiguration,
+				ResourceID.generate(),
+				system,
+				testingHighAvailabilityServices,
+				"localhost",
+				Option.apply("taskmanager"),
+				true,
+				TestingTaskManager.class);
 
 			ActorGateway tmGateway = new AkkaActorGateway(taskManager, leaderSessionID);
 
@@ -226,9 +230,9 @@ public class JobManagerHARecoveryTest {
 					vertexId,
 					vertexId,
 					vertexId,
-					100,
-					10 * 60 * 1000,
-					0,
+					100L,
+					10L * 60L * 1000L,
+					0L,
 					1,
 					ExternalizedCheckpointSettings.none(),
 					null,
@@ -604,5 +608,4 @@ public class JobManagerHARecoveryTest {
 			return recoveredStates;
 		}
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
index 2ac3ea7..35ac1e5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.jobmaster.JobMaster;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
 import org.apache.flink.configuration.Configuration;
@@ -50,7 +51,7 @@ import java.util.regex.Pattern;
 /**
  * Tests that the JobManager process properly exits when the JobManager actor dies.
  */
-public class JobManagerProcessReapingTest {
+public class JobManagerProcessReapingTest extends TestLogger {
 
 	@Test
 	public void testReapProcessOnFailure() {

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
index 0ab1b67..9ac6873 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.util.StartupUtils;
 import org.apache.flink.util.NetUtils;
 
 import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.TestLogger;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -44,7 +45,7 @@ import org.junit.Test;
  * Tests that verify the startup behavior of the JobManager in failure
  * situations, when the JobManager cannot be started.
  */
-public class JobManagerStartupTest {
+public class JobManagerStartupTest extends TestLogger {
 
 	private final static String DOES_NOT_EXISTS_NO_SIR = "does-not-exist-no-sir";
 
@@ -101,7 +102,7 @@ public class JobManagerStartupTest {
 					throw (BindException) cause;
 				}	
 			}
-			fail("this should throw a BindException");
+			throw e;
 		}
 		finally {
 			try {


Mime
View raw message