flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [03/10] flink git commit: [FLINK-2291] [runtime] Add ZooKeeper support to elect a leader from a set of JobManager. The leader will then be retrieved from ZooKeeper by the TaskManagers.
Date Mon, 31 Aug 2015 10:31:39 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index 057ffeb..d762ab4 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -18,16 +18,23 @@
 
 package org.apache.flink.runtime.testingUtils
 
+import java.util.concurrent.TimeoutException
+
+import akka.pattern.ask
 import akka.actor.{ActorRef, Props, ActorSystem}
 import akka.testkit.CallingThreadDispatcher
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import org.apache.flink.runtime.StreamingMode
 import org.apache.flink.runtime.jobmanager.JobManager
+import org.apache.flink.runtime.leaderelection.LeaderElectionService
 import org.apache.flink.runtime.minicluster.FlinkMiniCluster
 import org.apache.flink.runtime.net.NetUtils
 import org.apache.flink.runtime.taskmanager.TaskManager
+import org.apache.flink.runtime.testingUtils.TestingMessages.Alive
 import org.apache.flink.runtime.webmonitor.WebMonitor
 
+import scala.concurrent.{Await, Future}
+
 /**
  * Testing cluster which starts the [[JobManager]] and [[TaskManager]] actors with testing
support
  * in the same [[ActorSystem]].
@@ -36,13 +43,15 @@ import org.apache.flink.runtime.webmonitor.WebMonitor
  * @param singleActorSystem true if all actors shall be running in the same [[ActorSystem]],
  *                          otherwise false
  */
-class TestingCluster(userConfiguration: Configuration,
-                     singleActorSystem: Boolean,
-                     synchronousDispatcher: Boolean,
-                     streamingMode: StreamingMode)
-  extends FlinkMiniCluster(userConfiguration,
-                           singleActorSystem,
-                           streamingMode) {
+class TestingCluster(
+    userConfiguration: Configuration,
+    singleActorSystem: Boolean,
+    synchronousDispatcher: Boolean,
+    streamingMode: StreamingMode)
+  extends FlinkMiniCluster(
+    userConfiguration,
+    singleActorSystem,
+    streamingMode) {
   
 
   def this(userConfiguration: Configuration,
@@ -68,7 +77,28 @@ class TestingCluster(userConfiguration: Configuration,
     cfg
   }
 
-  override def startJobManager(actorSystem: ActorSystem): (ActorRef, Option[WebMonitor])
= {
+  override def startJobManager(index: Int, actorSystem: ActorSystem): ActorRef = {
+    val config = configuration.clone()
+
+    val jobManagerName = if(singleActorSystem) {
+      JobManager.JOB_MANAGER_NAME + "_" + (index + 1)
+    } else {
+      JobManager.JOB_MANAGER_NAME
+    }
+
+    val archiveName = if(singleActorSystem) {
+      JobManager.ARCHIVE_NAME + "_" + (index + 1)
+    } else {
+      JobManager.ARCHIVE_NAME
+    }
+
+    val jobManagerPort = config.getInteger(
+      ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
+      ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
+
+    if(jobManagerPort > 0) {
+      config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort + index)
+    }
 
     val (executionContext,
       instanceManager,
@@ -78,11 +108,14 @@ class TestingCluster(userConfiguration: Configuration,
       executionRetries,
       delayBetweenRetries,
       timeout,
-      archiveCount) = JobManager.createJobManagerComponents(configuration)
-    
+      archiveCount,
+      leaderElectionService) = JobManager.createJobManagerComponents(config)
+
     val testArchiveProps = Props(new TestingMemoryArchivist(archiveCount))
-    val archive = actorSystem.actorOf(testArchiveProps, JobManager.ARCHIVE_NAME)
-    
+    val archive = actorSystem.actorOf(testArchiveProps, archiveName)
+
+    val resolvedLeaderElectionService = createLeaderElectionService(leaderElectionService)
+
     val jobManagerProps = Props(
       new TestingJobManager(
         configuration,
@@ -94,7 +127,8 @@ class TestingCluster(userConfiguration: Configuration,
         executionRetries,
         delayBetweenRetries,
         timeout,
-        streamingMode))
+        streamingMode,
+        resolvedLeaderElectionService))
 
     val dispatcherJobManagerProps = if (synchronousDispatcher) {
       // disable asynchronous futures (e.g. accumulator update in Heartbeat)
@@ -103,27 +137,60 @@ class TestingCluster(userConfiguration: Configuration,
       jobManagerProps
     }
 
-    (actorSystem.actorOf(dispatcherJobManagerProps, JobManager.JOB_MANAGER_NAME), None)
+    actorSystem.actorOf(dispatcherJobManagerProps, jobManagerName)
   }
 
   override def startTaskManager(index: Int, system: ActorSystem) = {
 
     val tmActorName = TaskManager.TASK_MANAGER_NAME + "_" + (index + 1)
 
-    val jobManagerPath: Option[String] = if (singleActorSystem) {
-      Some(jobManagerActor.path.toString)
-    } else {
-      None
-    }
-    
     TaskManager.startTaskManagerComponentsAndActor(
       configuration,
       system,
       hostname,
       Some(tmActorName),
-      jobManagerPath,
+      Some(createLeaderRetrievalService),
       numTaskManagers == 1,
       streamingMode,
       classOf[TestingTaskManager])
   }
+
+
+  def createLeaderElectionService(electionService: LeaderElectionService): LeaderElectionService
= {
+    electionService
+  }
+
+  @throws(classOf[TimeoutException])
+  @throws(classOf[InterruptedException])
+  def waitForTaskManagersToBeAlive(): Unit = {
+    val aliveFutures = taskManagerActors map {
+      _ map {
+        tm => (tm ? Alive)(timeout)
+      }
+    } getOrElse(Seq())
+
+    val combinedFuture = Future.sequence(aliveFutures)
+
+    Await.ready(combinedFuture, timeout)
+  }
+
+  @throws(classOf[TimeoutException])
+  @throws(classOf[InterruptedException])
+  def waitForActorsToBeAlive(): Unit = {
+    val tmsAliveFutures = taskManagerActors map {
+      _ map {
+        tm => (tm ? Alive)(timeout)
+      }
+    } getOrElse(Seq())
+
+    val jmsAliveFutures = jobManagerActors map {
+      _ map {
+        tm => (tm ? Alive)(timeout)
+      }
+    } getOrElse(Seq())
+
+    val combinedFuture = Future.sequence(tmsAliveFutures ++ jmsAliveFutures)
+
+    Await.ready(combinedFuture, timeout)
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
index 987af40..c91a421 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
@@ -19,10 +19,13 @@
 package org.apache.flink.runtime.testingUtils
 
 import akka.actor.{Cancellable, Terminated, ActorRef}
-import akka.pattern.{ask, pipe}
+import akka.pattern.pipe
+import akka.pattern.ask
 import org.apache.flink.api.common.JobID
 import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.leaderelection.LeaderElectionService
 import org.apache.flink.runtime.{StreamingMode, FlinkActor}
+import org.apache.flink.runtime.StreamingMode
 import org.apache.flink.runtime.execution.ExecutionState
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
 import org.apache.flink.runtime.instance.InstanceManager
@@ -30,10 +33,12 @@ import org.apache.flink.runtime.jobgraph.JobStatus
 import org.apache.flink.runtime.jobmanager.JobManager
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
 import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
-import org.apache.flink.runtime.messages.Messages.Disconnect
+import org.apache.flink.runtime.messages.JobManagerMessages.GrantLeadership
+import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
 import org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
-import org.apache.flink.runtime.testingUtils.TestingMessages.DisableDisconnect
+import org.apache.flink.runtime.testingUtils.TestingMessages.{CheckIfJobRemoved, Alive,
+DisableDisconnect}
 import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.AccumulatorsChanged
 
 import scala.concurrent.{ExecutionContext, Future}
@@ -64,7 +69,8 @@ class TestingJobManager(
     defaultExecutionRetries: Int,
     delayBetweenRetries: Long,
     timeout: FiniteDuration,
-    mode: StreamingMode)
+    mode: StreamingMode,
+    leaderElectionService: LeaderElectionService)
   extends JobManager(
     flinkConfiguration,
     executionContext,
@@ -75,7 +81,8 @@ class TestingJobManager(
     defaultExecutionRetries,
     delayBetweenRetries,
     timeout,
-    mode) {
+    mode,
+    leaderElectionService) {
 
   import scala.collection.JavaConverters._
   import context._
@@ -93,6 +100,8 @@ class TestingJobManager(
 
   val waitForAccumulatorUpdate = scala.collection.mutable.HashMap[JobID, (Boolean, Set[ActorRef])]()
 
+  val waitForLeader = scala.collection.mutable.HashSet[ActorRef]()
+
   var disconnectDisabled = false
 
   override def handleMessage: Receive = {
@@ -100,6 +109,8 @@ class TestingJobManager(
   }
 
   def handleTestingMessage: Receive = {
+    case Alive => sender() ! Acknowledge
+
     case RequestExecutionGraph(jobID) =>
       currentJobs.get(jobID) match {
         case Some((executionGraph, jobInfo)) => sender ! decorateMessage(
@@ -168,9 +179,23 @@ class TestingJobManager(
         gateway => gateway.ask(NotifyWhenJobRemoved(jobID), timeout).mapTo[Boolean]
       }
 
-      import context.dispatcher
+      val jobRemovedOnJobManager = (self ? CheckIfJobRemoved(jobID))(timeout).mapTo[Boolean]
 
-      Future.fold(responses)(true)(_ & _).map(decorateMessage(_)) pipeTo sender
+      val allFutures = responses ++ Seq(jobRemovedOnJobManager)
+
+      import context.dispatcher
+      Future.fold(allFutures)(true)(_ & _) map(decorateMessage(_)) pipeTo sender
+
+    case CheckIfJobRemoved(jobID) =>
+      if(currentJobs.contains(jobID)) {
+        context.system.scheduler.scheduleOnce(
+          200 milliseconds,
+          self,
+          decorateMessage(CheckIfJobRemoved(jobID))
+        )(context.dispatcher, sender())
+      } else {
+        sender() ! decorateMessage(true)
+      }
 
     case NotifyWhenTaskManagerTerminated(taskManager) =>
       val waiting = waitForTaskManagerToBeTerminated.getOrElse(taskManager.path.name, Set())
@@ -293,6 +318,20 @@ class TestingJobManager(
           }
         }
       }
+
+    case NotifyWhenLeader =>
+      if (leaderElectionService.hasLeadership) {
+        sender() ! true
+      } else {
+        waitForLeader += sender()
+      }
+
+    case msg: GrantLeadership =>
+      super.handleMessage(msg)
+
+      waitForLeader.foreach(_ ! true)
+
+      waitForLeader.clear()
   }
 
   def checkIfAllVerticesRunning(jobID: JobID): Boolean = {

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
index 17beff0..acade53 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
@@ -57,12 +57,13 @@ object TestingJobManagerMessages {
   case class NotifyWhenTaskManagerTerminated(taskManager: ActorRef)
   case class TaskManagerTerminated(taskManager: ActorRef)
 
-  /* Registers a listener to receive a message when accumulators changed.
+  /**
+   * Registers a listener to receive a message when accumulators changed.
    * The change must be explicitly triggered by the TestingTaskManager which can receive
an
    * [[AccumulatorChanged]] message by a task that changed the accumulators. This message
is then
    * forwarded to the JobManager which will send the accumulators in the [[UpdatedAccumulators]]
    * message when the next Heartbeat occurs.
-   * */
+   */
   case class NotifyWhenAccumulatorChange(jobID: JobID)
 
   /**
@@ -71,4 +72,11 @@ object TestingJobManagerMessages {
   case class UpdatedAccumulators(jobID: JobID,
     flinkAccumulators: Map[ExecutionAttemptID, Map[AccumulatorRegistry.Metric, Accumulator[_,_]]],
     userAccumulators: Map[String, Accumulator[_,_]])
+
+  /** Notifies the sender when the [[TestingJobManager]] has been elected as the leader
+    *
+    */
+  case object NotifyWhenLeader
+
+  def getNotifyWhenLeader: AnyRef = NotifyWhenLeader
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMessages.scala
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMessages.scala
index 52cc1f3..c840ff0 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMessages.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMessages.scala
@@ -18,7 +18,17 @@
 
 package org.apache.flink.runtime.testingUtils
 
+import org.apache.flink.api.common.JobID
+
 object TestingMessages {
 
+  case class CheckIfJobRemoved(jobID: JobID)
+
   case object DisableDisconnect
+
+  case object Alive
+
+  def getAlive: AnyRef = Alive
+
+  def getDisableDisconnect: AnyRef = DisableDisconnect
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
index 8cc1e92..475115e 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
@@ -19,20 +19,23 @@
 package org.apache.flink.runtime.testingUtils
 
 import akka.actor.{Terminated, ActorRef}
-import org.apache.flink.api.common.JobID
 import org.apache.flink.runtime.execution.ExecutionState
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
 import org.apache.flink.runtime.instance.InstanceConnectionInfo
 import org.apache.flink.runtime.io.disk.iomanager.IOManager
 import org.apache.flink.runtime.io.network.NetworkEnvironment
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
 import org.apache.flink.runtime.memorymanager.DefaultMemoryManager
 import org.apache.flink.runtime.messages.JobManagerMessages.{ResponseLeaderSessionID,
 RequestLeaderSessionID}
-import org.apache.flink.runtime.messages.Messages.Disconnect
+import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
+import org.apache.flink.runtime.messages.RegistrationMessages.{AlreadyRegistered,
+AcknowledgeRegistration}
 import org.apache.flink.runtime.messages.TaskMessages.{UpdateTaskExecutionState, TaskInFinalState}
 import org.apache.flink.runtime.taskmanager.{TaskManagerConfiguration, TaskManager}
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobRemoved
-import org.apache.flink.runtime.testingUtils.TestingMessages.DisableDisconnect
+import org.apache.flink.runtime.testingUtils.TestingMessages.{CheckIfJobRemoved, Alive,
+DisableDisconnect}
 import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages._
 
 import scala.concurrent.duration._
@@ -42,7 +45,6 @@ import scala.language.postfixOps
   *
   * @param config
   * @param connectionInfo
-  * @param jobManagerAkkaURL
   * @param memoryManager
   * @param ioManager
   * @param network
@@ -51,25 +53,25 @@ import scala.language.postfixOps
 class TestingTaskManager(
     config: TaskManagerConfiguration,
     connectionInfo: InstanceConnectionInfo,
-    jobManagerAkkaURL: String,
     memoryManager: DefaultMemoryManager,
     ioManager: IOManager,
     network: NetworkEnvironment,
-    numberOfSlots: Int)
+    numberOfSlots: Int,
+    leaderRetrievalService: LeaderRetrievalService)
   extends TaskManager(
     config,
     connectionInfo,
-    jobManagerAkkaURL,
     memoryManager,
     ioManager,
     network,
-    numberOfSlots) {
+    numberOfSlots,
+    leaderRetrievalService) {
 
   import scala.collection.JavaConverters._
 
   val waitForRemoval = scala.collection.mutable.HashMap[ExecutionAttemptID, Set[ActorRef]]()
-  val waitForJobRemoval = scala.collection.mutable.HashMap[JobID, Set[ActorRef]]()
   val waitForJobManagerToBeTerminated = scala.collection.mutable.HashMap[String, Set[ActorRef]]()
+  val waitForRegisteredAtJobManager = scala.collection.mutable.HashMap[ActorRef, Set[ActorRef]]()
   val waitForRunning = scala.collection.mutable.HashMap[ExecutionAttemptID, Set[ActorRef]]()
   val unregisteredTasks = scala.collection.mutable.HashSet[ExecutionAttemptID]()
 
@@ -83,6 +85,8 @@ class TestingTaskManager(
   }
 
   def handleTestingMessage: Receive = {
+    case Alive => sender() ! Acknowledge
+
     case NotifyWhenTaskIsRunning(executionID) => {
       Option(runningTasks.get(executionID)) match {
         case Some(task) if task.getExecutionState == ExecutionState.RUNNING =>
@@ -136,32 +140,28 @@ class TestingTaskManager(
 
     case NotifyWhenJobRemoved(jobID) =>
       if(runningTasks.values.asScala.exists(_.getJobID == jobID)){
-        val set = waitForJobRemoval.getOrElse(jobID, Set())
-        waitForJobRemoval += (jobID -> (set + sender))
-        import context.dispatcher
         context.system.scheduler.scheduleOnce(
           200 milliseconds,
-          this.self,
-          decorateMessage(CheckIfJobRemoved(jobID)))
+          self,
+          decorateMessage(CheckIfJobRemoved(jobID)))(
+          context.dispatcher,
+          sender
+          )
       }else{
-        waitForJobRemoval.get(jobID) match {
-          case Some(listeners) => (listeners + sender) foreach (_ ! decorateMessage(true))
-          case None => sender ! decorateMessage(true)
-        }
+        sender ! decorateMessage(true)
       }
 
     case CheckIfJobRemoved(jobID) =>
       if(runningTasks.values.asScala.forall(_.getJobID != jobID)){
-        waitForJobRemoval.remove(jobID) match {
-          case Some(listeners) => listeners foreach (_ ! decorateMessage(true))
-          case None =>
-        }
+        sender ! decorateMessage(true)
       } else {
-        import context.dispatcher
         context.system.scheduler.scheduleOnce(
           200 milliseconds,
-          this.self,
-          decorateMessage(CheckIfJobRemoved(jobID)))
+          self,
+          decorateMessage(CheckIfJobRemoved(jobID)))(
+          context.dispatcher,
+          sender
+          )
       }
 
     case NotifyWhenJobManagerTerminated(jobManager) =>
@@ -218,6 +218,29 @@ class TestingTaskManager(
       }
 
     case RequestLeaderSessionID =>
-      sender() ! ResponseLeaderSessionID(leaderSessionID)
+      sender() ! ResponseLeaderSessionID(leaderSessionID.orNull)
+
+    case NotifyWhenRegisteredAtJobManager(jobManager: ActorRef) =>
+      if(isConnected && jobManager == currentJobManager.get) {
+        sender() ! true
+      } else {
+        val list = waitForRegisteredAtJobManager.getOrElse(
+          jobManager,
+          Set[ActorRef]())
+
+        waitForRegisteredAtJobManager += jobManager -> (list + sender())
+      }
+
+    case msg @ (_: AcknowledgeRegistration | _: AlreadyRegistered) =>
+      super.handleMessage(msg)
+
+      val jm = sender()
+
+      waitForRegisteredAtJobManager.remove(jm).foreach {
+        listeners => listeners.foreach{
+          listener =>
+            listener ! true
+        }
+      }
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
index 1c428cc..ca57245 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
@@ -41,8 +41,6 @@ object TestingTaskManagerMessages {
 
   case object RequestNumActiveConnections
   case class ResponseNumActiveConnections(number: Int)
-
-  case class CheckIfJobRemoved(jobID: JobID)
   
   case object RequestRunningTasks
   
@@ -52,6 +50,8 @@ object TestingTaskManagerMessages {
 
   case class JobManagerTerminated(jobManager: ActorRef)
 
+  case class NotifyWhenRegisteredAtJobManager(jobManager: ActorRef)
+
   /**
    * Message to give a hint to the task manager that accumulator values were updated in the
task.
    * This message is forwarded to the job manager which knows that it needs to notify listeners

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index 914f37c..21939d6 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -18,15 +18,27 @@
 
 package org.apache.flink.runtime.testingUtils
 
+import java.util.UUID
+
+import akka.actor.{Props, Kill, ActorSystem, ActorRef}
+import akka.pattern.ask
 import com.google.common.util.concurrent.MoreExecutors
 
 import com.typesafe.config.ConfigFactory
+import grizzled.slf4j.Logger
 
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
+import org.apache.flink.runtime.jobmanager.JobManager
+import org.apache.flink.runtime.{LogMessages, LeaderSessionMessageFilter, FlinkActor, StreamingMode}
 import org.apache.flink.runtime.akka.AkkaUtils
+import org.apache.flink.runtime.instance.{AkkaActorGateway, ActorGateway}
+import org.apache.flink.runtime.leaderelection.StandaloneLeaderElectionService
+import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService
+import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtAnyJobManager
+import org.apache.flink.runtime.taskmanager.TaskManager
 
 import scala.concurrent.duration._
-import scala.concurrent.ExecutionContext
+import scala.concurrent.{Await, ExecutionContext}
 import scala.language.postfixOps
 
 /**
@@ -60,9 +72,14 @@ object TestingUtils {
                           timeout: String = DEFAULT_AKKA_ASK_TIMEOUT): TestingCluster = {
     val config = new Configuration()
     config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
-    config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTMs)
+    config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs)
     config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, timeout)
-    new TestingCluster(config)
+
+    val cluster = new TestingCluster(config)
+
+    cluster.start()
+
+    cluster
   }
 
   /** Returns the global [[ExecutionContext]] which is a [[scala.concurrent.forkjoin.ForkJoinPool]]
@@ -132,4 +149,182 @@ object TestingUtils {
       runnables.isEmpty
     }
   }
+
+  def createTaskManager(
+    actorSystem: ActorSystem,
+    jobManager: ActorRef,
+    configuration: Configuration,
+    useLocalCommunication: Boolean,
+    waitForRegistration: Boolean)
+  : ActorGateway = {
+    val jobManagerURL = AkkaUtils.getAkkaURL(actorSystem, jobManager)
+
+    createTaskManager(
+      actorSystem,
+      jobManagerURL,
+      configuration,
+      useLocalCommunication,
+      waitForRegistration
+    )
+  }
+
+  def createTaskManager(
+      actorSystem: ActorSystem,
+      jobManager: ActorGateway,
+      configuration: Configuration,
+      useLocalCommunication: Boolean,
+      waitForRegistration: Boolean)
+    : ActorGateway = {
+    val jobManagerURL = AkkaUtils.getAkkaURL(actorSystem, jobManager.actor)
+
+    createTaskManager(
+      actorSystem,
+      jobManagerURL,
+      configuration,
+      useLocalCommunication,
+      waitForRegistration
+    )
+  }
+
+  /** Creates a local TaskManager in the given ActorSystem. It is given a
+    * [[StandaloneLeaderRetrievalService]] which returns the given jobManagerURL. After creating
+    * the TaskManager, waitForRegistration specifies whether one waits until the TaskManager
has
+    * registered at the JobManager. An ActorGateway to the TaskManager is returned.
+    *
+    * @param actorSystem ActorSystem in which the TaskManager shall be started
+    * @param jobManagerURL URL of the JobManager to connect to
+    * @param configuration Configuration
+    * @param useLocalCommunication true if the network stack shall use exclusively local
+    *                              communication
+    * @param waitForRegistration true if the method will wait until the TaskManager has connected
to
+    *                            the JobManager
+    * @return ActorGateway of the created TaskManager
+    */
+  def createTaskManager(
+      actorSystem: ActorSystem,
+      jobManagerURL: String,
+      configuration: Configuration,
+      useLocalCommunication: Boolean,
+      waitForRegistration: Boolean)
+    : ActorGateway = {
+
+    val resultingConfiguration = new Configuration()
+
+    resultingConfiguration.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 10)
+
+    resultingConfiguration.addAll(configuration)
+
+    val leaderRetrievalService = Option(new StandaloneLeaderRetrievalService(jobManagerURL))
+
+    val taskManager = TaskManager.startTaskManagerComponentsAndActor(
+      resultingConfiguration,
+      actorSystem,
+      "localhost",
+      None,
+      leaderRetrievalService,
+      useLocalCommunication,
+      StreamingMode.BATCH_ONLY,
+      classOf[TestingTaskManager]
+    )
+
+    if (waitForRegistration) {
+      val notificationResult = (taskManager ? NotifyWhenRegisteredAtAnyJobManager)(TESTING_DURATION)
+
+      Await.ready(notificationResult, TESTING_DURATION)
+    }
+
+    new AkkaActorGateway(taskManager, null)
+  }
+
+  /** Stops the given actor by sending it a Kill message
+    *
+    * @param actor
+    */
+  def stopActor(actor: ActorRef): Unit = {
+    if (actor != null) {
+      actor ! Kill
+    }
+  }
+
+  /** Stops the given actor by sending it a Kill message
+    *
+    * @param actorGateway
+    */
+  def stopActor(actorGateway: ActorGateway): Unit = {
+    if (actorGateway != null) {
+      stopActor(actorGateway.actor())
+    }
+  }
+
+  /** Creates a testing JobManager using the default recovery mode (standalone)
+    *
+    * @param actorSystem
+    * @param configuration
+    * @return
+    */
+  def createJobManager(
+      actorSystem: ActorSystem,
+      configuration: Configuration)
+    : ActorGateway = {
+
+    configuration.setString(ConfigConstants.RECOVERY_MODE, ConfigConstants.DEFAULT_RECOVERY_MODE)
+
+      val (actor, _) = JobManager.startJobManagerActors(
+        configuration,
+        actorSystem,
+        Some(JobManager.JOB_MANAGER_NAME),
+        Some(JobManager.ARCHIVE_NAME),
+        StreamingMode.BATCH_ONLY)
+
+    new AkkaActorGateway(actor, null)
+  }
+
+  /** Creates a forwarding JobManager which sends all received message to the forwarding
target.
+    *
+    * @param actorSystem
+    * @param forwardingTarget
+    * @param jobManagerName
+    * @return
+    */
+  def createForwardingJobManager(
+      actorSystem: ActorSystem,
+      forwardingTarget: ActorRef,
+      jobManagerName: Option[String] = None)
+    : ActorGateway = {
+
+    val actor = jobManagerName match {
+      case Some(name) =>
+        actorSystem.actorOf(
+          Props(
+            classOf[ForwardingActor],
+            forwardingTarget,
+            None),
+          name
+        )
+      case None =>
+        actorSystem.actorOf(
+          Props(
+            classOf[ForwardingActor],
+            forwardingTarget,
+            None)
+        )
+    }
+
+    new AkkaActorGateway(actor, null)
+  }
+
+  class ForwardingActor(val target: ActorRef, val leaderSessionID: Option[UUID])
+    extends FlinkActor with LeaderSessionMessageFilter with LogMessages {
+
+    /** Handle incoming messages
+      *
+      * @return
+      */
+    override def handleMessage: Receive = {
+      case msg => target.forward(msg)
+    }
+
+    override val log: Logger = Logger(getClass)
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-shaded-hadoop/pom.xml
----------------------------------------------------------------------
diff --git a/flink-shaded-hadoop/pom.xml b/flink-shaded-hadoop/pom.xml
index d7e3158..71cc895 100644
--- a/flink-shaded-hadoop/pom.xml
+++ b/flink-shaded-hadoop/pom.xml
@@ -101,6 +101,10 @@ under the License.
 									<pattern>org.jboss.netty</pattern>
 									<shadedPattern>org.apache.flink.hadoop.shaded.org.jboss.netty</shadedPattern>
 								</relocation>
+								<relocation>
+									<pattern>org.apache.curator</pattern>
+									<shadedPattern>org.apache.flink.hadoop.shaded.org.apache.curator</shadedPattern>
+								</relocation>
 							</relocations>
 						</configuration>
 					</execution>

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
b/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
index 528e871..f691806 100644
--- a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
+++ b/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
@@ -19,7 +19,6 @@
 package org.apache.flink.api.avro;
 
 import java.io.File;
-import java.net.InetSocketAddress;
 
 import org.apache.flink.client.program.Client;
 import org.apache.flink.client.program.PackagedProgram;
@@ -45,14 +44,21 @@ public class AvroExternalJarProgramITCase {
 			Configuration config = new Configuration();
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
 			testMiniCluster = new ForkableFlinkMiniCluster(config, false);
+			testMiniCluster.start();
 			
 			String jarFile = JAR_FILE;
 			String testData = getClass().getResource(TEST_DATA_FILE).toString();
 			
 			PackagedProgram program = new PackagedProgram(new File(jarFile), new String[] { testData
});
+
+			config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
+			config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, testMiniCluster.getLeaderRPCPort());
 						
-			Client c = new Client(new InetSocketAddress("localhost", testMiniCluster.getJobManagerRPCPort()),
-					new Configuration(), program.getUserCodeClassLoader(), -1);
+			Client c = new Client(
+					config,
+					program.getUserCodeClassLoader(),
+					-1);
+
 			c.setPrintStatusDuringExecution(false);
 			c.run(program, 4, true);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java
b/flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java
index 7cebccb..7f46c7e 100644
--- a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java
+++ b/flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.io.AvroInputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.runtime.PojoSerializer;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.apache.flink.util.Collector;

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java
index b04b24e..955122f 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java
@@ -47,6 +47,7 @@ public class DegreesWithExceptionITCase {
 			Configuration config = new Configuration();
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM);
 			cluster = new ForkableFlinkMiniCluster(config, false);
+			cluster.start();
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -72,7 +73,7 @@ public class DegreesWithExceptionITCase {
 	public void testOutDegreesInvalidEdgeSrcId() throws Exception {
 
 		final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
-				"localhost", cluster.getJobManagerRPCPort());
+				"localhost", cluster.getLeaderRPCPort());
 		env.setParallelism(PARALLELISM);
 		env.getConfig().disableSysoutLogging();
 		
@@ -96,7 +97,7 @@ public class DegreesWithExceptionITCase {
 	public void testInDegreesInvalidEdgeTrgId() throws Exception {
 
 		final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
-				"localhost", cluster.getJobManagerRPCPort());
+				"localhost", cluster.getLeaderRPCPort());
 		env.setParallelism(PARALLELISM);
 		env.getConfig().disableSysoutLogging();
 
@@ -120,7 +121,7 @@ public class DegreesWithExceptionITCase {
 	public void testGetDegreesInvalidEdgeTrgId() throws Exception {
 
 		final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
-				"localhost", cluster.getJobManagerRPCPort());
+				"localhost", cluster.getLeaderRPCPort());
 		env.setParallelism(PARALLELISM);
 		env.getConfig().disableSysoutLogging();
 
@@ -144,7 +145,7 @@ public class DegreesWithExceptionITCase {
 	public void testGetDegreesInvalidEdgeSrcId() throws Exception {
 
 		final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
-				"localhost", cluster.getJobManagerRPCPort());
+				"localhost", cluster.getLeaderRPCPort());
 		env.setParallelism(PARALLELISM);
 		env.getConfig().disableSysoutLogging();
 
@@ -168,7 +169,7 @@ public class DegreesWithExceptionITCase {
 	public void testGetDegreesInvalidEdgeSrcTrgId() throws Exception {
 
 		final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
-				"localhost", cluster.getJobManagerRPCPort());
+				"localhost", cluster.getLeaderRPCPort());
 		env.setParallelism(PARALLELISM);
 		env.getConfig().disableSysoutLogging();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesWithExceptionITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesWithExceptionITCase.java
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesWithExceptionITCase.java
index c53227b..ab10947 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesWithExceptionITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesWithExceptionITCase.java
@@ -51,6 +51,7 @@ public class ReduceOnEdgesWithExceptionITCase {
 			Configuration config = new Configuration();
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM);
 			cluster = new ForkableFlinkMiniCluster(config, false);
+			cluster.start();
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -76,7 +77,7 @@ public class ReduceOnEdgesWithExceptionITCase {
 	public void testGroupReduceOnEdgesInvalidEdgeSrcId() throws Exception {
 
 		final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
-				"localhost", cluster.getJobManagerRPCPort());
+				"localhost", cluster.getLeaderRPCPort());
 		env.setParallelism(PARALLELISM);
 		env.getConfig().disableSysoutLogging();
 
@@ -101,7 +102,7 @@ public class ReduceOnEdgesWithExceptionITCase {
 	public void testGroupReduceOnEdgesInvalidEdgeTrgId() throws Exception {
 
 		final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
-				"localhost", cluster.getJobManagerRPCPort());
+				"localhost", cluster.getLeaderRPCPort());
 		env.setParallelism(PARALLELISM);
 		env.getConfig().disableSysoutLogging();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java
index 21799c7..b32abeb 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java
@@ -52,6 +52,7 @@ public class ReduceOnNeighborsWithExceptionITCase {
 			Configuration config = new Configuration();
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM);
 			cluster = new ForkableFlinkMiniCluster(config, false);
+			cluster.start();
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -78,7 +79,7 @@ public class ReduceOnNeighborsWithExceptionITCase {
 	public void testGroupReduceOnNeighborsWithVVInvalidEdgeSrcId() throws Exception {
 
 		final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
-				"localhost", cluster.getJobManagerRPCPort());
+				"localhost", cluster.getLeaderRPCPort());
 		env.setParallelism(PARALLELISM);
 		env.getConfig().disableSysoutLogging();
 
@@ -104,7 +105,7 @@ public class ReduceOnNeighborsWithExceptionITCase {
 	public void testGroupReduceOnNeighborsWithVVInvalidEdgeTrgId() throws Exception {
 
 		final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
-				"localhost", cluster.getJobManagerRPCPort());
+				"localhost", cluster.getLeaderRPCPort());
 		env.setParallelism(PARALLELISM);
 		env.getConfig().disableSysoutLogging();
 
@@ -130,7 +131,7 @@ public class ReduceOnNeighborsWithExceptionITCase {
 	public void testGroupReduceOnNeighborsInvalidEdgeSrcId() throws Exception {
 
 		final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
-				"localhost", cluster.getJobManagerRPCPort());
+				"localhost", cluster.getLeaderRPCPort());
 		env.setParallelism(PARALLELISM);
 		env.getConfig().disableSysoutLogging();
 
@@ -156,7 +157,7 @@ public class ReduceOnNeighborsWithExceptionITCase {
 	public void testGroupReduceOnNeighborsInvalidEdgeTrgId() throws Exception {
 
 		final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
-				"localhost", cluster.getJobManagerRPCPort());
+				"localhost", cluster.getLeaderRPCPort());
 		env.setParallelism(PARALLELISM);
 		env.getConfig().disableSysoutLogging();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java
b/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java
index d1a61d3..a6be1a6 100644
--- a/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java
+++ b/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java
@@ -24,7 +24,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.util.Collector;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkShell.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkShell.scala
b/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkShell.scala
index 7ad1d2f..ad0f655 100644
--- a/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkShell.scala
+++ b/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkShell.scala
@@ -67,7 +67,7 @@ object FlinkShell {
     val (host,port) = if (userHost == "none" || userPort == -1 ) {
       println("Creating new local server")
       cluster = new LocalFlinkMiniCluster(new Configuration, false)
-      ("localhost",cluster.getJobManagerRPCPort)
+      ("localhost",cluster.getLeaderRPCPort)
     } else {
       println(s"Connecting to remote server (host: $userHost, port: $userPort).")
       (userHost, userPort)

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala
b/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala
index 01fdaf0..1b04f7f 100644
--- a/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala
+++ b/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala
@@ -141,7 +141,7 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll
{
     // new local cluster
     val host = "localhost"
     val port = cluster match {
-      case Some(c) => c.getJobManagerRPCPort
+      case Some(c) => c.getLeaderRPCPort
 
       case _ => throw new RuntimeException("Test cluster not initialized.")
     }
@@ -187,7 +187,14 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll
{
   val parallelism = 4
 
   override def beforeAll(): Unit = {
-    val cl = TestBaseUtils.startCluster(1, parallelism, StreamingMode.BATCH_ONLY, false,
false)
+    val cl = TestBaseUtils.startCluster(
+      1,
+      parallelism,
+      StreamingMode.BATCH_ONLY,
+      false,
+      false,
+      false)
+
     val clusterEnvironment = new TestEnvironment(cl, parallelism)
     clusterEnvironment.setAsContext()
 


Mime
View raw message