openwhisk-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rab...@apache.org
Subject [incubator-openwhisk] branch master updated: Stabilize loadbalancer under on/off conditions. (#2476)
Date Thu, 20 Jul 2017 18:30:40 GMT
This is an automated email from the ASF dual-hosted git repository.

rabbah pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new aa13227  Stabilize loadbalancer under on/off conditions. (#2476)
aa13227 is described below

commit aa13227e61350032a27065378d32999ff9d9090c
Author: Markus Thömmes <markusthoemmes@me.com>
AuthorDate: Thu Jul 20 20:30:37 2017 +0200

    Stabilize loadbalancer under on/off conditions. (#2476)
    
    Today, if invokers come online/offline frequently, the list of available invokers changes frequently, which causes the hash-based picking algorithm to constantly change the home invokers of actions, thus destroying the warm-ratios of those actions.
    
    This commit attacks the issue by accepting that invokers are in fact index-based; they are now managed in an array whose length does not change when an invoker goes offline. For partitioning cases (i.e. blackbox containers) the length of the underlying sequence is kept constant by padding it accordingly.
    
    Removed invokerDownCallback-handling since the bookkeeping is self-healing.
---
 .../main/scala/whisk/core/connector/Message.scala  |   4 +-
 .../scala/whisk/core/controller/Controller.scala   |   4 +-
 .../core/loadBalancer/InvokerSupervision.scala     |  63 ++++----
 .../whisk/core/loadBalancer/LoadBalancerData.scala |  19 +--
 .../core/loadBalancer/LoadBalancerService.scala    | 172 ++++++++-------------
 .../main/scala/whisk/core/invoker/Invoker.scala    |   4 +-
 .../scala/whisk/core/invoker/InvokerReactive.scala |   2 +-
 .../connector/tests/CompletionMessageTests.scala   |   9 +-
 .../test/InvokerSupervisionTests.scala             | 111 ++++++-------
 .../loadBalancer/test/LoadBalancerDataTests.scala  |  21 +--
 .../test/LoadBalancerServiceObjectTests.scala      |  87 ++++++++---
 11 files changed, 247 insertions(+), 249 deletions(-)

diff --git a/common/scala/src/main/scala/whisk/core/connector/Message.scala b/common/scala/src/main/scala/whisk/core/connector/Message.scala
index 82c797d..d977a61 100644
--- a/common/scala/src/main/scala/whisk/core/connector/Message.scala
+++ b/common/scala/src/main/scala/whisk/core/connector/Message.scala
@@ -92,7 +92,7 @@ object ActivationMessage extends DefaultJsonProtocol {
 case class CompletionMessage(
     override val transid: TransactionId,
     response: Either[ActivationId, WhiskActivation],
-    invoker: String)
+    invoker: InstanceId)
     extends Message {
 
     override def serialize: String = {
@@ -109,7 +109,7 @@ object CompletionMessage extends DefaultJsonProtocol {
     private val serdes = jsonFormat3(CompletionMessage.apply)
 }
 
-case class PingMessage(name: String) extends Message {
+case class PingMessage(instance: InstanceId) extends Message {
     override def serialize = PingMessage.serdes.write(this).compactPrint
 }
 
diff --git a/core/controller/src/main/scala/whisk/core/controller/Controller.scala b/core/controller/src/main/scala/whisk/core/controller/Controller.scala
index 2740190..bb01a0b 100644
--- a/core/controller/src/main/scala/whisk/core/controller/Controller.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/Controller.scala
@@ -138,7 +138,9 @@ class Controller(
     private val internalInvokerHealth = {
         (path("invokers") & get) {
             complete {
-                loadBalancer.invokerHealth.map(_.mapValues(_.asString).toJson.asJsObject)
+                loadBalancer.allInvokers.map(_.map {
+                    case (instance, state) => s"invoker${instance.toInt}" -> state.asString
+                }.toMap.toJson.asJsObject)
             }
         }
     }
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
index 7fefb87..9868a5e 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
@@ -63,8 +63,8 @@ case object Offline extends InvokerState { val asString = "down" }
 case object Healthy extends InvokerState { val asString = "up" }
 case object UnHealthy extends InvokerState { val asString = "unhealthy" }
 
-case class ActivationRequest(msg: ActivationMessage, invoker: String)
-case class InvocationFinishedMessage(name: String, successful: Boolean)
+case class ActivationRequest(msg: ActivationMessage, invoker: InstanceId)
+case class InvocationFinishedMessage(invokerInstance: InstanceId, successful: Boolean)
 
 // Data stored in the Invoker
 final case class InvokerInfo(buffer: RingBuffer[Boolean])
@@ -81,10 +81,9 @@ final case class InvokerInfo(buffer: RingBuffer[Boolean])
  * by the InvokerPool and thus might not be caught by monitoring.
  */
 class InvokerPool(
-    childFactory: (ActorRefFactory, String) => ActorRef,
+    childFactory: (ActorRefFactory, InstanceId) => ActorRef,
     kv: KeyValueStore,
-    invokerDownCallback: String => Unit,
-    sendActivationToInvoker: (ActivationMessage, String) => Future[RecordMetadata],
+    sendActivationToInvoker: (ActivationMessage, InstanceId) => Future[RecordMetadata],
     pingConsumer: MessageConsumer) extends Actor {
 
     implicit val transid = TransactionId.invokerHealth
@@ -94,35 +93,41 @@ class InvokerPool(
 
     // State of the actor. It's important not to close over these
     // references directly, so they don't escape the Actor.
-    val invokers = mutable.HashMap.empty[String, ActorRef]
-    val invokerStatus = mutable.HashMap.empty[String, InvokerState]
+    val instanceToRef = mutable.Map[InstanceId, ActorRef]()
+    val refToInstance = mutable.Map[ActorRef, InstanceId]()
+    var status = IndexedSeq[(InstanceId, InvokerState)]()
 
     def receive = {
         case p: PingMessage =>
-            val invoker = invokers.getOrElseUpdate(p.name, {
-                logging.info(this, s"registered a new invoker: ${p.name}")(TransactionId.invokerHealth)
-                val ref = childFactory(context, p.name)
+            val invoker = instanceToRef.getOrElseUpdate(p.instance, {
+                logging.info(this, s"registered a new invoker: invoker${p.instance.toInt}")(TransactionId.invokerHealth)
+
+                status = padToIndexed(status, p.instance.toInt + 1, i => (InstanceId(i), Offline))
+
+                val ref = childFactory(context, p.instance)
                 ref ! SubscribeTransitionCallBack(self) // register for state change events
+
+                refToInstance.update(ref, p.instance)
                 ref
             })
             invoker.forward(p)
 
-        case GetStatus => sender() ! invokerStatus.toMap
+        case GetStatus => sender() ! status
 
         case msg: InvocationFinishedMessage => {
             // Forward message to invoker, if InvokerActor exists
-            invokers.get(msg.name).map(_.forward(msg))
+            instanceToRef.get(msg.invokerInstance).map(_.forward(msg))
         }
 
         case CurrentState(invoker, currentState: InvokerState) =>
-            invokerStatus.update(invoker.path.name, currentState)
+            refToInstance.get(invoker).foreach { instance =>
+                status = status.updated(instance.toInt, (instance, currentState))
+            }
             publishStatus()
 
         case Transition(invoker, oldState: InvokerState, newState: InvokerState) =>
-            invokerStatus.update(invoker.path.name, newState)
-            newState match {
-                case Offline => Future(invokerDownCallback(invoker.path.name))
-                case _       =>
+            refToInstance.get(invoker).foreach {
+                instance => status = status.updated(instance.toInt, (instance, newState))
             }
             publishStatus()
 
@@ -131,12 +136,10 @@ class InvokerPool(
     }
 
     def publishStatus() = {
-        val json = invokerStatus.toMap.mapValues(_.asString).toJson
+        val json = status.map { case (instance, state) => s"invoker${instance.toInt}" -> state.asString }.toMap.toJson
         kv.put(LoadBalancerKeys.invokerHealth, json.compactPrint)
 
-        val pretty = invokerStatus.toSeq.sortBy {
-            case (name, _) => name.filter(_.isDigit).toInt
-        }.map { case (name, state) => s"$name: $state" }
+        val pretty = status.map { case (instance, state) => s"${instance.toInt} -> $state" }
         logging.info(this, s"invoker status changed to ${pretty.mkString(", ")}")
     }
 
@@ -158,16 +161,18 @@ class InvokerPool(
                 logging.error(this, s"failed processing message: $raw with $t")
         }
     }
+
+    /** Pads a list to a given length using the given function to compute entries */
+    def padToIndexed[A](list: IndexedSeq[A], n: Int, f: (Int) => A) = list ++ (list.size until n).map(f)
 }
 
 object InvokerPool {
     def props(
-        f: (ActorRefFactory, String) => ActorRef,
+        f: (ActorRefFactory, InstanceId) => ActorRef,
         kv: KeyValueStore,
-        cb: String => Unit,
-        p: (ActivationMessage, String) => Future[RecordMetadata],
+        p: (ActivationMessage, InstanceId) => Future[RecordMetadata],
         pc: MessageConsumer) = {
-        Props(new InvokerPool(f, kv, cb, p, pc))
+        Props(new InvokerPool(f, kv, p, pc))
     }
 
     /** A stub identity for invoking the test action. This does not need to be a valid identity. */
@@ -191,10 +196,10 @@ object InvokerPool {
  * This finite state-machine represents an Invoker in its possible
  * states "Healthy" and "Offline".
  */
-class InvokerActor(controllerInstance: InstanceId) extends FSM[InvokerState, InvokerInfo] {
+class InvokerActor(invokerInstance: InstanceId, controllerInstance: InstanceId) extends FSM[InvokerState, InvokerInfo] {
     implicit val transid = TransactionId.invokerHealth
     implicit val logging = new AkkaLogging(context.system.log)
-    def name = self.path.name
+    val name = s"invoker${invokerInstance.toInt}"
 
     val healthyTimeout = 10.seconds
 
@@ -315,7 +320,7 @@ class InvokerActor(controllerInstance: InstanceId) extends FSM[InvokerState, Inv
                 rootControllerIndex = controllerInstance,
                 content = None)
 
-            context.parent ! ActivationRequest(activationMessage, name)
+            context.parent ! ActivationRequest(activationMessage, invokerInstance)
         }
     }
 
@@ -330,7 +335,7 @@ class InvokerActor(controllerInstance: InstanceId) extends FSM[InvokerState, Inv
 }
 
 object InvokerActor {
-    def props(controllerInstance: InstanceId) = Props(new InvokerActor(controllerInstance))
+    def props(invokerInstance: InstanceId, controllerInstance: InstanceId) = Props(new InvokerActor(invokerInstance, controllerInstance))
 
     val bufferSize = 10
     val bufferErrorTolerance = 3
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerData.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerData.scala
index 3fd3d96..a33b813 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerData.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerData.scala
@@ -17,12 +17,13 @@
 
 package whisk.core.loadBalancer
 
-import whisk.core.entity.{ActivationId, UUID, WhiskActivation}
+import whisk.core.entity.{ ActivationId, UUID, WhiskActivation }
 import scala.collection.concurrent.TrieMap
 import scala.concurrent.Promise
+import whisk.core.entity.InstanceId
 
 /** Encapsulates data relevant for a single activation */
-case class ActivationEntry(id: ActivationId, namespaceId: UUID, invokerName: String, promise: Promise[Either[ActivationId, WhiskActivation]])
+case class ActivationEntry(id: ActivationId, namespaceId: UUID, invokerName: InstanceId, promise: Promise[Either[ActivationId, WhiskActivation]])
 
 /**
  * Encapsulates data used for loadbalancer and active-ack bookkeeping.
@@ -34,7 +35,7 @@ class LoadBalancerData() {
 
     type TrieSet[T] = TrieMap[T, Unit]
 
-    private val activationByInvoker = new TrieMap[String, TrieSet[ActivationEntry]]
+    private val activationByInvoker = new TrieMap[InstanceId, TrieSet[ActivationEntry]]
     private val activationByNamespaceId = new TrieMap[UUID, TrieSet[ActivationEntry]]
     private val activationsById = new TrieMap[ActivationId, ActivationEntry]
 
@@ -52,21 +53,11 @@ class LoadBalancerData() {
      *
      * @return a map (invoker -> number of activations queued for the invoker)
      */
-    def activationCountByInvoker: Map[String, Int] = {
+    def activationCountByInvoker: Map[InstanceId, Int] = {
         activationByInvoker.toMap.mapValues(_.size)
     }
 
     /**
-     * Get all active activations for a specific invoker.
-     *
-     * @param invokerName the invoker to get activations for
-     * @return all active activations for the given invoker
-     */
-    def activationsByInvoker(invokerName: String): TrieSet[ActivationEntry] = {
-        activationByInvoker.getOrElseUpdate(invokerName, new TrieSet[ActivationEntry])
-    }
-
-    /**
      * Get an activation entry for a given activation id.
      *
      * @param activationId activation id to get data for
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
index 9f78e45..8313832 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
@@ -54,6 +54,8 @@ import whisk.core.entity.UUID
 import whisk.core.entity.WhiskAction
 import whisk.core.entity.types.EntityStore
 import scala.annotation.tailrec
+import whisk.core.entity.EntityName
+import whisk.core.entity.Identity
 
 trait LoadBalancer {
 
@@ -103,7 +105,7 @@ class LoadBalancerService(
 
     override def publish(action: ExecutableWhiskAction, msg: ActivationMessage)(
         implicit transid: TransactionId): Future[Future[Either[ActivationId, WhiskActivation]]] = {
-        chooseInvoker(action, msg).flatMap { invokerName =>
+        chooseInvoker(msg.user, action).flatMap { invokerName =>
             val entry = setupActivation(action, msg.activationId, msg.user.uuid, invokerName, transid)
             sendActivationToInvoker(messageProducer, msg, invokerName).map { _ =>
                 entry.promise.future
@@ -111,7 +113,10 @@ class LoadBalancerService(
         }
     }
 
-    def invokerHealth: Future[Map[String, InvokerState]] = invokerPool.ask(GetStatus)(Timeout(5.seconds)).mapTo[Map[String, InvokerState]]
+    /** An indexed sequence of all invokers in the current system */
+    def allInvokers: Future[IndexedSeq[(InstanceId, InvokerState)]] = invokerPool
+        .ask(GetStatus)(Timeout(5.seconds))
+        .mapTo[IndexedSeq[(InstanceId, InvokerState)]]
 
     /**
      * Tries to fill in the result slot (i.e., complete the promise) when a completion message arrives.
@@ -138,8 +143,7 @@ class LoadBalancerService(
     /**
      * Creates an activation entry and insert into various maps.
      */
-    private def setupActivation(action: ExecutableWhiskAction, activationId: ActivationId, namespaceId: UUID, invokerName: String, transid: TransactionId): ActivationEntry = {
-
+    private def setupActivation(action: ExecutableWhiskAction, activationId: ActivationId, namespaceId: UUID, invokerName: InstanceId, transid: TransactionId): ActivationEntry = {
         val timeout = action.limits.timeout.duration + activeAckTimeoutGrace
         // Install a timeout handler for the catastrophic case where an active ack is not received at all
         // (because say an invoker is down completely, or the connection to the message bus is disrupted) or when
@@ -155,19 +159,6 @@ class LoadBalancerService(
     }
 
     /**
-     * When invoker health detects a new invoker has come up, this callback is called.
-     */
-    private def clearInvokerState(invokerName: String) = {
-        val actSet = loadBalancerData.activationsByInvoker(invokerName)
-        actSet.keySet map {
-            case actEntry @ ActivationEntry(activationId, namespaceId, invokerIndex, promise) =>
-                promise.tryFailure(new LoadBalancerException(s"Invoker $invokerIndex restarted"))
-                loadBalancerData.removeActivation(actEntry)
-        }
-        actSet.clear()
-    }
-
-    /**
      * Creates or updates a health test action by updating the entity store.
      * This method is intended for use on startup.
      * @return Future that completes successfully iff the action is added to the database
@@ -187,13 +178,15 @@ class LoadBalancerService(
     /** Gets a producer which can publish messages to the kafka bus. */
     private val messageProducer = new KafkaProducerConnector(config.kafkaHost, executionContext)
 
-    private def sendActivationToInvoker(producer: MessageProducer, msg: ActivationMessage, invokerName: String): Future[RecordMetadata] = {
+    private def sendActivationToInvoker(producer: MessageProducer, msg: ActivationMessage, invoker: InstanceId): Future[RecordMetadata] = {
         implicit val transid = msg.transid
-        val start = transid.started(this, LoggingMarkers.CONTROLLER_KAFKA, s"posting topic '$invokerName' with activation id '${msg.activationId}'")
 
-        producer.send(invokerName, msg).andThen {
+        val topic = s"invoker${invoker.toInt}"
+        val start = transid.started(this, LoggingMarkers.CONTROLLER_KAFKA, s"posting topic '$topic' with activation id '${msg.activationId}'")
+
+        producer.send(topic, msg).andThen {
             case Success(status) => transid.finished(this, start, s"posted to ${status.topic()}[${status.partition()}][${status.offset()}]")
-            case Failure(e)      => transid.failed(this, start, s"error on posting to topic $invokerName")
+            case Failure(e)      => transid.failed(this, start, s"error on posting to topic $topic")
         }
     }
 
@@ -211,12 +204,10 @@ class LoadBalancerService(
         val consul = new ConsulClient(config.consulServer)
         // Each controller gets its own Group Id, to receive all messages
         val pingConsumer = new KafkaConsumerConnector(config.kafkaHost, s"health${instance.toInt}", "health", maxPeek = maxPingsPerPoll)
-        val invokerFactory = (f: ActorRefFactory, name: String) => f.actorOf(InvokerActor.props(instance), name)
+        val invokerFactory = (f: ActorRefFactory, invokerInstance: InstanceId) => f.actorOf(InvokerActor.props(invokerInstance, instance))
 
-        actorSystem.actorOf(InvokerPool.props(invokerFactory, consul.kv, invoker => {
-            clearInvokerState(invoker)
-            logging.info(this, s"cleared load balancer state for $invoker")(TransactionId.invokerHealth)
-        }, (m, i) => sendActivationToInvoker(messageProducer, m, i), pingConsumer))
+        actorSystem.actorOf(InvokerPool.props(invokerFactory, consul.kv,
+            (m, i) => sendActivationToInvoker(messageProducer, m, i), pingConsumer))
     }
 
     /**
@@ -247,59 +238,47 @@ class LoadBalancerService(
         }
     }
 
-    /** Return a sorted list of available invokers. */
-    private def availableInvokers: Future[Seq[String]] = invokerHealth.map {
-        _.collect {
-            case (name, Healthy) => name
-        }.toSeq.sortBy(_.drop(7).toInt) // Sort by the number in "invokerN"
-    }.recover {
-        case _ => Seq.empty[String]
-    }
-
     /** Compute the number of blackbox-dedicated invokers by applying a rounded down fraction of all invokers (but at least 1). */
     private def numBlackbox(totalInvokers: Int) = Math.max(1, (totalInvokers.toDouble * blackboxFraction).toInt)
 
     /** Return invokers (almost) dedicated to running blackbox actions. */
-    private def blackboxInvokers: Future[Seq[String]] = availableInvokers.map { allInvokers =>
-        allInvokers.takeRight(numBlackbox(allInvokers.length))
+    private def blackboxInvokers(invokers: IndexedSeq[(InstanceId, InvokerState)]): IndexedSeq[(InstanceId, InvokerState)] = {
+        val blackboxes = numBlackbox(invokers.size)
+        invokers.takeRight(blackboxes)
     }
 
-    /** Return (at least one) invokers for running non black-box actions.  This set can overlap with the blackbox set if there is only one invoker. */
-    private def managedInvokers: Future[Seq[String]] = availableInvokers.map { allInvokers =>
-        val numManaged = Math.max(1, allInvokers.length - numBlackbox(allInvokers.length))
-        allInvokers.take(numManaged)
+    /**
+     * Return (at least one) invokers for running non black-box actions.
+     * This set can overlap with the blackbox set if there is only one invoker.
+     */
+    private def managedInvokers(invokers: IndexedSeq[(InstanceId, InvokerState)]): IndexedSeq[(InstanceId, InvokerState)] = {
+        val managed = Math.max(1, invokers.length - numBlackbox(invokers.length))
+        invokers.take(managed)
     }
 
     /** Determine which invoker this activation should go to. Due to dynamic conditions, it may return no invoker. */
-    private def chooseInvoker(action: ExecutableWhiskAction, msg: ActivationMessage): Future[String] = {
-        val invokers = if (action.exec.pull) blackboxInvokers else managedInvokers
-        val hash = hashSubjectAction(msg)
-
-        invokers.flatMap { invokers =>
-            LoadBalancerService.schedule(
-                invokers,
-                loadBalancerData.activationCountByInvoker,
-                config.loadbalancerInvokerBusyThreshold,
-                hash) match {
-                    case Some(invoker) => Future.successful(invoker)
-                    case None =>
-                        logging.error(this, s"all invokers down")(TransactionId.invokerHealth)
-                        Future.failed(new LoadBalancerException("no invokers available"))
-                }
+    private def chooseInvoker(user: Identity, action: ExecutableWhiskAction): Future[InstanceId] = {
+        val hash = generateHash(user.namespace, action)
+
+        allInvokers.flatMap { invokers =>
+            val invokersToUse = if (action.exec.pull) blackboxInvokers(invokers) else managedInvokers(invokers)
+            val invokersWithUsage = invokersToUse.view.map {
+                // Using a view defers the comparably expensive lookup to actual access of the element
+                case (instance, state) => (instance, state, loadBalancerData.activationCountByInvoker.get(instance).getOrElse(0))
+            }
+
+            LoadBalancerService.schedule(invokersWithUsage, config.loadbalancerInvokerBusyThreshold, hash) match {
+                case Some(invoker) => Future.successful(invoker)
+                case None =>
+                    logging.error(this, s"all invokers down")(TransactionId.invokerHealth)
+                    Future.failed(new LoadBalancerException("no invokers available"))
+            }
         }
     }
 
-    /*
-     * The path contains more than the action per se but seems sufficiently
-     * isomorphic as the other parts are constant.  Extracting just the
-     * action out specifically will involve some hairy regex's that the
-     * Invoker is currently using and which is better avoid if/until
-     * these are moved to some common place (like a subclass of Message?)
-     */
-    private def hashSubjectAction(msg: ActivationMessage): Int = {
-        val subject = msg.user.subject.asString
-        val path = msg.action.toString
-        (subject.hashCode() ^ path.hashCode()).abs
+    /** Generates a hash based on the string representation of namespace and action */
+    private def generateHash(namespace: EntityName, action: ExecutableWhiskAction): Int = {
+        (namespace.asString.hashCode() ^ action.fullyQualifiedName(false).asString.hashCode()).abs
     }
 }
 
@@ -329,58 +308,37 @@ object LoadBalancerService {
     /**
      * Scans through all invokers and searches for an invoker, that has a queue length
      * below the defined threshold. The threshold is subject to a 3 times back off. Iff
-     * no "underloaded" invoker was found it will default to the home invoker.
+     * no "underloaded" invoker was found it will default to the first invoker in the
+     * step-defined progression that is healthy.
      *
-     * @param availableInvokers a list of available (healthy) invokers to search in
-     * @param activationsPerInvoker a map of the number of outstanding activations per invoker
+     * @param invokers a list of available invokers to search in, including their state and usage
      * @param invokerBusyThreshold defines when an invoker is considered overloaded
      * @param hash stable identifier of the entity to be scheduled
      * @return an invoker to schedule to or None of no invoker is available
      */
-    def schedule[A](
-        availableInvokers: Seq[A],
-        activationsPerInvoker: collection.Map[A, Int],
+    def schedule(
+        invokers: Seq[(InstanceId, InvokerState, Int)],
         invokerBusyThreshold: Int,
-        hash: Int): Option[A] = {
+        hash: Int): Option[InstanceId] = {
 
-        val numInvokers = availableInvokers.length
+        val numInvokers = invokers.size
         if (numInvokers > 0) {
             val homeInvoker = hash % numInvokers
-
             val stepSizes = LoadBalancerService.pairwiseCoprimeNumbersUntil(numInvokers)
             val step = stepSizes(hash % stepSizes.size)
 
-            @tailrec
-            def search(targetInvoker: Int, iteration: Int = 1): A = {
-                // map the computed index to the actual invoker index
-                val invokerName = availableInvokers(targetInvoker)
-
-                // send the request to the target invoker if it has capacity...
-                if (activationsPerInvoker.get(invokerName).getOrElse(0) < invokerBusyThreshold * iteration) {
-                    invokerName
-                } else {
-                    // ... otherwise look for a less loaded invoker by stepping through a pre-computed
-                    // list of invokers; there are two possible outcomes:
-                    // 1. the search lands on a new invoker that has capacity, choose it
-                    // 2. walked through the entire list and found no better invoker than the
-                    //    "home invoker", force the home invoker
-                    val newTarget = (targetInvoker + step) % numInvokers
-                    if (newTarget == homeInvoker) {
-                        if (iteration < 3) {
-                            search(newTarget, iteration + 1)
-                        } else {
-                            availableInvokers(homeInvoker)
-                        }
-                    } else {
-                        search(newTarget, iteration)
-                    }
-                }
-            }
-
-            Some(search(homeInvoker))
-        } else {
-            None
-        }
+            val invokerProgression = Stream.from(0)
+                .take(numInvokers)
+                .map(i => (homeInvoker + i * step) % numInvokers)
+                .map(invokers)
+                .filter(_._2 == Healthy)
+
+            invokerProgression.find(_._3 < invokerBusyThreshold)
+                .orElse(invokerProgression.find(_._3 < invokerBusyThreshold * 2))
+                .orElse(invokerProgression.find(_._3 < invokerBusyThreshold * 3))
+                .orElse(invokerProgression.headOption)
+                .map(_._1)
+        } else None
     }
 
 }
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
index a789e4e..57ba590 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
@@ -281,7 +281,7 @@ class Invoker(
         implicit transid: TransactionId): Unit = {
 
         def send(res: Either[ActivationId, WhiskActivation], recovery: Boolean = false) = {
-            val msg = CompletionMessage(transid, res, this.name)
+            val msg = CompletionMessage(transid, res, instance)
             producer.send(s"completed${tran.msg.rootControllerIndex.toInt}", msg).andThen {
                 case Success(_) =>
                     logging.info(this, s"posted ${if (recovery) "recovery" else ""} completion of activation ${activationResult.activationId}")
@@ -497,7 +497,7 @@ object Invoker {
         dispatcher.start()
 
         Scheduler.scheduleWaitAtMost(1.seconds)(() => {
-            producer.send("health", PingMessage(s"invoker${invokerInstance.toInt}")).andThen {
+            producer.send("health", PingMessage(invokerInstance)).andThen {
                 case Failure(t) => logger.error(this, s"failed to ping the controller: $t")
             }
         })
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
index 456182b..e406af9 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
@@ -114,7 +114,7 @@ class InvokerReactive(
         implicit val transid = tid
 
         def send(res: Either[ActivationId, WhiskActivation], recovery: Boolean = false) = {
-            val msg = CompletionMessage(transid, res, this.name)
+            val msg = CompletionMessage(transid, res, instance)
             producer.send(s"completed${controllerInstance.toInt}", msg).andThen {
                 case Success(_) =>
                     logging.info(this, s"posted ${if (recovery) "recovery" else ""} completion of activation ${activationResult.activationId}")
diff --git a/tests/src/test/scala/whisk/core/connector/tests/CompletionMessageTests.scala b/tests/src/test/scala/whisk/core/connector/tests/CompletionMessageTests.scala
index 01f0282..ee8a072 100644
--- a/tests/src/test/scala/whisk/core/connector/tests/CompletionMessageTests.scala
+++ b/tests/src/test/scala/whisk/core/connector/tests/CompletionMessageTests.scala
@@ -27,7 +27,6 @@ import org.scalatest.FlatSpec
 import org.scalatest.Matchers
 import org.scalatest.junit.JUnitRunner
 
-import spray.json.DefaultJsonProtocol.StringJsonFormat
 import spray.json._
 import whisk.common.TransactionId
 import whisk.core.connector.CompletionMessage
@@ -57,7 +56,7 @@ class CompletionMessageTests extends FlatSpec with Matchers {
         duration = Some(123))
 
     it should "serialize a left completion message" in {
-        val m = CompletionMessage(TransactionId.testing, Left(ActivationId()), "xyz")
+        val m = CompletionMessage(TransactionId.testing, Left(ActivationId()), InstanceId(0))
         m.serialize shouldBe JsObject(
             "transid" -> m.transid.toJson,
             "response" -> m.response.left.get.toJson,
@@ -65,7 +64,7 @@ class CompletionMessageTests extends FlatSpec with Matchers {
     }
 
     it should "serialize a right completion message" in {
-        val m = CompletionMessage(TransactionId.testing, Right(activation), "xyz")
+        val m = CompletionMessage(TransactionId.testing, Right(activation), InstanceId(0))
         m.serialize shouldBe JsObject(
             "transid" -> m.transid.toJson,
             "response" -> m.response.right.get.toJson,
@@ -73,12 +72,12 @@ class CompletionMessageTests extends FlatSpec with Matchers {
     }
 
     it should "deserialize a left completion message" in {
-        val m = CompletionMessage(TransactionId.testing, Left(ActivationId()), "xyz")
+        val m = CompletionMessage(TransactionId.testing, Left(ActivationId()), InstanceId(0))
         CompletionMessage.parse(m.serialize) shouldBe Success(m)
     }
 
     it should "deserialize a right completion message" in {
-        val m = CompletionMessage(TransactionId.testing, Right(activation), "xyz")
+        val m = CompletionMessage(TransactionId.testing, Right(activation), InstanceId(0))
         CompletionMessage.parse(m.serialize) shouldBe Success(m)
     }
 }
diff --git a/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala b/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala
index eaa0278..371fad2 100644
--- a/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala
+++ b/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala
@@ -99,70 +99,74 @@ class InvokerSupervisionTests extends TestKit(ActorSystem("InvokerSupervision"))
     def timeout(actor: ActorRef) = actor ! FSM.StateTimeout
 
     /** Queries all invokers for their state */
-    def allStates(pool: ActorRef) = Await.result(pool.ask(GetStatus).mapTo[Map[String, InvokerState]], timeout.duration)
+    def allStates(pool: ActorRef) = Await.result(pool.ask(GetStatus).mapTo[IndexedSeq[(InstanceId, InvokerState)]], timeout.duration)
+
+    /** Helper to generate a list of (InstanceId, InvokerState) */
+    def zipWithInstance(list: IndexedSeq[InvokerState]) = list.zipWithIndex.map { case (state, index) => (InstanceId(index), state) }
 
     val pC = new TestConnector("pingFeedTtest", 4, false) {}
 
     behavior of "InvokerPool"
 
     it should "successfully create invokers in its pool on ping and keep track of statechanges" in {
-        val invoker0 = TestProbe()
-        val invoker1 = TestProbe()
-        val invoker0Name = invoker0.ref.path.name
-        val invoker1Name = invoker1.ref.path.name
+        val invoker5 = TestProbe()
+        val invoker2 = TestProbe()
+
+        val invoker5Instance = InstanceId(5)
+        val invoker2Instance = InstanceId(2)
 
-        val children = mutable.Queue(invoker0.ref, invoker1.ref)
-        val childFactory = (f: ActorRefFactory, name: String) => children.dequeue()
+        val children = mutable.Queue(invoker5.ref, invoker2.ref)
+        val childFactory = (f: ActorRefFactory, instance: InstanceId) => children.dequeue()
 
         val kv = stub[KeyValueStore]
-        val sendActivationToInvoker = stubFunction[ActivationMessage, String, Future[RecordMetadata]]
+        val sendActivationToInvoker = stubFunction[ActivationMessage, InstanceId, Future[RecordMetadata]]
 
-        val supervisor = system.actorOf(InvokerPool.props(childFactory, kv, () => _, sendActivationToInvoker, pC))
+        val supervisor = system.actorOf(InvokerPool.props(childFactory, kv, sendActivationToInvoker, pC))
 
         within(timeout.duration) {
             // create first invoker
-            val ping0 = PingMessage(invoker0Name)
+            val ping0 = PingMessage(invoker5Instance)
             supervisor ! ping0
-            invoker0.expectMsgType[SubscribeTransitionCallBack] // subscribe to the actor
-            invoker0.expectMsg(ping0)
+            invoker5.expectMsgType[SubscribeTransitionCallBack] // subscribe to the actor
+            invoker5.expectMsg(ping0)
 
-            invoker0.send(supervisor, CurrentState(invoker0.ref, Healthy))
-            allStates(supervisor) shouldBe Map(invoker0Name -> Healthy)
+            invoker5.send(supervisor, CurrentState(invoker5.ref, Healthy))
+            allStates(supervisor) shouldBe zipWithInstance(IndexedSeq(Offline, Offline, Offline, Offline, Offline, Healthy))
 
             // create second invoker
-            val ping1 = PingMessage(invoker1Name)
+            val ping1 = PingMessage(invoker2Instance)
             supervisor ! ping1
-            invoker1.expectMsgType[SubscribeTransitionCallBack]
-            invoker1.expectMsg(ping1)
+            invoker2.expectMsgType[SubscribeTransitionCallBack]
+            invoker2.expectMsg(ping1)
 
-            invoker1.send(supervisor, CurrentState(invoker1.ref, Healthy))
-            allStates(supervisor) shouldBe Map(invoker0Name -> Healthy, invoker1Name -> Healthy)
+            invoker2.send(supervisor, CurrentState(invoker2.ref, Healthy))
+            allStates(supervisor) shouldBe zipWithInstance(IndexedSeq(Offline, Offline, Healthy, Offline, Offline, Healthy))
 
             // ping the first invoker again
             supervisor ! ping0
-            invoker0.expectMsg(ping0)
+            invoker5.expectMsg(ping0)
 
-            allStates(supervisor) shouldBe Map(invoker0Name -> Healthy, invoker1Name -> Healthy)
+            allStates(supervisor) shouldBe zipWithInstance(IndexedSeq(Offline, Offline, Healthy, Offline, Offline, Healthy))
 
             // one invoker goes offline
-            invoker1.send(supervisor, Transition(invoker1.ref, Healthy, Offline))
-            allStates(supervisor) shouldBe Map(invoker0Name -> Healthy, invoker1Name -> Offline)
+            invoker2.send(supervisor, Transition(invoker2.ref, Healthy, Offline))
+            allStates(supervisor) shouldBe zipWithInstance(IndexedSeq(Offline, Offline, Offline, Offline, Offline, Healthy))
         }
     }
 
     it should "publish state changes via kv and call the provided callback if an invoker goes offline" in {
         val invoker = TestProbe()
-        val invokerName = invoker.ref.path.name
-        val childFactory = (f: ActorRefFactory, name: String) => invoker.ref
+        val invokerInstance = InstanceId(0)
+        val invokerName = s"invoker${invokerInstance.toInt}"
+        val childFactory = (f: ActorRefFactory, instance: InstanceId) => invoker.ref
 
         val kv = stub[KeyValueStore]
-        val callback = stubFunction[String, Unit]
-        val sendActivationToInvoker = stubFunction[ActivationMessage, String, Future[RecordMetadata]]
-        val supervisor = system.actorOf(InvokerPool.props(childFactory, kv, callback, sendActivationToInvoker, pC))
+        val sendActivationToInvoker = stubFunction[ActivationMessage, InstanceId, Future[RecordMetadata]]
+        val supervisor = system.actorOf(InvokerPool.props(childFactory, kv, sendActivationToInvoker, pC))
 
         within(timeout.duration) {
             // create first invoker
-            val ping0 = PingMessage(invokerName)
+            val ping0 = PingMessage(invokerInstance)
             supervisor ! ping0
             invoker.expectMsgType[SubscribeTransitionCallBack] // subscribe to the actor
             invoker.expectMsg(ping0)
@@ -177,30 +181,30 @@ class InvokerSupervisionTests extends TestKit(ActorSystem("InvokerSupervision"))
 
         retry({
             (kv.put _).verify(LoadBalancerKeys.invokerHealth, *).repeated(3)
-            callback.verify(invokerName)
         }, N = 3, waitBeforeRetry = Some(500.milliseconds))
     }
 
     it should "forward the ActivationResult to the appropriate invoker" in {
         val invoker = TestProbe()
-        val invokerName = invoker.ref.path.name
-        val childFactory = (f: ActorRefFactory, name: String) => invoker.ref
+        val invokerInstance = InstanceId(0)
+        val invokerName = s"invoker${invokerInstance.toInt}"
+        val childFactory = (f: ActorRefFactory, instance: InstanceId) => invoker.ref
         val kv = stub[KeyValueStore]
-        val sendActivationToInvoker = stubFunction[ActivationMessage, String, Future[RecordMetadata]]
+        val sendActivationToInvoker = stubFunction[ActivationMessage, InstanceId, Future[RecordMetadata]]
 
-        val supervisor = system.actorOf(InvokerPool.props(childFactory, kv, () => _, sendActivationToInvoker, pC))
+        val supervisor = system.actorOf(InvokerPool.props(childFactory, kv, sendActivationToInvoker, pC))
 
         within(timeout.duration) {
             // Create one invoker
-            val ping0 = PingMessage(invokerName)
+            val ping0 = PingMessage(invokerInstance)
             supervisor ! ping0
             invoker.expectMsgType[SubscribeTransitionCallBack] // subscribe to the actor
             invoker.expectMsg(ping0)
             invoker.send(supervisor, CurrentState(invoker.ref, Healthy))
-            allStates(supervisor) shouldBe Map(invokerName -> Healthy)
+            allStates(supervisor) shouldBe zipWithInstance(IndexedSeq(Healthy))
 
             // Send message and expect receive in invoker
-            val msg = InvocationFinishedMessage(invokerName, true)
+            val msg = InvocationFinishedMessage(invokerInstance, true)
             supervisor ! msg
             invoker.expectMsg(msg)
         }
@@ -208,13 +212,14 @@ class InvokerSupervisionTests extends TestKit(ActorSystem("InvokerSupervision"))
 
     it should "forward an ActivationMessage to the sendActivation-Method" in {
         val invoker = TestProbe()
-        val invokerName = invoker.ref.path.name
-        val childFactory = (f: ActorRefFactory, name: String) => invoker.ref
+        val invokerInstance = InstanceId(0)
+        val invokerName = s"invoker${invokerInstance.toInt}"
+        val childFactory = (f: ActorRefFactory, instance: InstanceId) => invoker.ref
 
         val kv = stub[KeyValueStore]
-        val sendActivationToInvoker = stubFunction[ActivationMessage, String, Future[RecordMetadata]]
+        val sendActivationToInvoker = stubFunction[ActivationMessage, InstanceId, Future[RecordMetadata]]
 
-        val supervisor = system.actorOf(InvokerPool.props(childFactory, kv, () => _, sendActivationToInvoker, pC))
+        val supervisor = system.actorOf(InvokerPool.props(childFactory, kv, sendActivationToInvoker, pC))
 
         // Send ActivationMessage to InvokerPool
         val activationMessage = ActivationMessage(
@@ -226,14 +231,14 @@ class InvokerSupervisionTests extends TestKit(ActorSystem("InvokerSupervision"))
             activationNamespace = EntityPath("guest"),
             rootControllerIndex = InstanceId(0),
             content = None)
-        val msg = ActivationRequest(activationMessage, invokerName)
+        val msg = ActivationRequest(activationMessage, invokerInstance)
 
-        sendActivationToInvoker.when(activationMessage, invokerName).returns(Future.successful(new RecordMetadata(new TopicPartition(invokerName, 0), 0L, 0L, 0L, 0L, 0, 0)))
+        sendActivationToInvoker.when(activationMessage, invokerInstance).returns(Future.successful(new RecordMetadata(new TopicPartition(invokerName, 0), 0L, 0L, 0L, 0L, 0, 0)))
 
         supervisor ! msg
 
         // Verify, that MessageProducer will receive a call to send the message
-        retry(sendActivationToInvoker.verify(activationMessage, invokerName).once, N = 3, waitBeforeRetry = Some(500.milliseconds))
+        retry(sendActivationToInvoker.verify(activationMessage, invokerInstance).once, N = 3, waitBeforeRetry = Some(500.milliseconds))
     }
 
     behavior of "InvokerActor"
@@ -242,7 +247,7 @@ class InvokerSupervisionTests extends TestKit(ActorSystem("InvokerSupervision"))
     // offline -> unhealthy
     it should "start unhealthy, go offline if the state times out and goes unhealthy on a successful ping again" in {
         val pool = TestProbe()
-        val invoker = pool.system.actorOf(InvokerActor.props(InstanceId(0)))
+        val invoker = pool.system.actorOf(InvokerActor.props(InstanceId(0), InstanceId(0)))
 
         within(timeout.duration) {
             pool.send(invoker, SubscribeTransitionCallBack(pool.ref))
@@ -250,7 +255,7 @@ class InvokerSupervisionTests extends TestKit(ActorSystem("InvokerSupervision"))
             timeout(invoker)
             pool.expectMsg(Transition(invoker, UnHealthy, Offline))
 
-            invoker ! PingMessage("testinvoker")
+            invoker ! PingMessage(InstanceId(0))
             pool.expectMsg(Transition(invoker, Offline, UnHealthy))
         }
     }
@@ -258,7 +263,7 @@ class InvokerSupervisionTests extends TestKit(ActorSystem("InvokerSupervision"))
     // unhealthy -> healthy
     it should "goto healthy again, if unhealthy and error buffer has enough successful invocations" in {
         val pool = TestProbe()
-        val invoker = pool.system.actorOf(InvokerActor.props(InstanceId(0)))
+        val invoker = pool.system.actorOf(InvokerActor.props(InstanceId(0), InstanceId(0)))
 
         within(timeout.duration) {
             pool.send(invoker, SubscribeTransitionCallBack(pool.ref))
@@ -266,12 +271,12 @@ class InvokerSupervisionTests extends TestKit(ActorSystem("InvokerSupervision"))
 
             // Fill buffer with errors
             (1 to InvokerActor.bufferSize).foreach { _ =>
-                invoker ! InvocationFinishedMessage("testinvoker", false)
+                invoker ! InvocationFinishedMessage(InstanceId(0), false)
             }
 
             // Fill buffer with successful invocations to become healthy again (one below errorTolerance)
             (1 to InvokerActor.bufferSize - InvokerActor.bufferErrorTolerance).foreach { _ =>
-                invoker ! InvocationFinishedMessage("testinvoker", true)
+                invoker ! InvocationFinishedMessage(InstanceId(0), true)
             }
             pool.expectMsg(Transition(invoker, UnHealthy, Healthy))
         }
@@ -281,7 +286,7 @@ class InvokerSupervisionTests extends TestKit(ActorSystem("InvokerSupervision"))
     // offline -> unhealthy
     it should "go offline when unhealthy, if the state times out and go unhealthy on a successful ping again" in {
         val pool = TestProbe()
-        val invoker = pool.system.actorOf(InvokerActor.props(InstanceId(0)))
+        val invoker = pool.system.actorOf(InvokerActor.props(InstanceId(0), InstanceId(0)))
 
         within(timeout.duration) {
             pool.send(invoker, SubscribeTransitionCallBack(pool.ref))
@@ -290,20 +295,20 @@ class InvokerSupervisionTests extends TestKit(ActorSystem("InvokerSupervision"))
             timeout(invoker)
             pool.expectMsg(Transition(invoker, UnHealthy, Offline))
 
-            invoker ! PingMessage("testinvoker")
+            invoker ! PingMessage(InstanceId(0))
             pool.expectMsg(Transition(invoker, Offline, UnHealthy))
         }
     }
 
     it should "start timer to send testactions when unhealthy" in {
-        val invoker = TestFSMRef(new InvokerActor(InstanceId(0)))
+        val invoker = TestFSMRef(new InvokerActor(InstanceId(0), InstanceId(0)))
         invoker.stateName shouldBe UnHealthy
 
         invoker.isTimerActive(InvokerActor.timerName) shouldBe true
 
         // Fill buffer with successful invocations to become healthy again (one below errorTolerance)
         (1 to InvokerActor.bufferSize - InvokerActor.bufferErrorTolerance).foreach { _ =>
-            invoker ! InvocationFinishedMessage("testinvoker", true)
+            invoker ! InvocationFinishedMessage(InstanceId(0), true)
         }
         invoker.stateName shouldBe Healthy
 
diff --git a/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerDataTests.scala b/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerDataTests.scala
index e314abd..f2619bb 100644
--- a/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerDataTests.scala
+++ b/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerDataTests.scala
@@ -17,17 +17,18 @@
 
 package whisk.core.loadBalancer.test
 
-import org.scalatest.{FlatSpec, Matchers}
-import whisk.core.entity.{ActivationId, UUID, WhiskActivation}
-import whisk.core.loadBalancer.{ActivationEntry, LoadBalancerData}
+import org.scalatest.{ FlatSpec, Matchers }
+import whisk.core.entity.{ ActivationId, UUID, WhiskActivation }
+import whisk.core.loadBalancer.{ ActivationEntry, LoadBalancerData }
 
-import scala.concurrent.{Promise}
+import scala.concurrent.{ Promise }
+import whisk.core.entity.InstanceId
 
 class LoadBalancerDataTests extends FlatSpec with Matchers {
 
     val activationIdPromise = Promise[Either[ActivationId, WhiskActivation]]()
-    val firstEntry: ActivationEntry = ActivationEntry(ActivationId(), UUID(), "invoker0", activationIdPromise)
-    val secondEntry: ActivationEntry = ActivationEntry(ActivationId(), UUID(), "invoker1", activationIdPromise)
+    val firstEntry: ActivationEntry = ActivationEntry(ActivationId(), UUID(), InstanceId(0), activationIdPromise)
+    val secondEntry: ActivationEntry = ActivationEntry(ActivationId(), UUID(), InstanceId(1), activationIdPromise)
 
     behavior of "LoadBalancerData"
 
@@ -38,7 +39,7 @@ class LoadBalancerDataTests extends FlatSpec with Matchers {
 
         val result = loadBalancerData.activationCountByNamespace
         result shouldBe Map(firstEntry.namespaceId -> 1)
-        loadBalancerData.activationCountByInvoker("invoker0") shouldBe 1
+        loadBalancerData.activationCountByInvoker(firstEntry.invokerName) shouldBe 1
         loadBalancerData.activationById(firstEntry.id) shouldBe Some(firstEntry)
     }
 
@@ -52,8 +53,8 @@ class LoadBalancerDataTests extends FlatSpec with Matchers {
 
         result shouldBe Map(firstEntry.invokerName -> 1, secondEntry.invokerName -> 1)
 
-        loadBalancerData.activationCountByInvoker("invoker0") shouldBe 1
-        loadBalancerData.activationCountByInvoker("invoker1") shouldBe 1
+        loadBalancerData.activationCountByInvoker(firstEntry.invokerName) shouldBe 1
+        loadBalancerData.activationCountByInvoker(secondEntry.invokerName) shouldBe 1
         loadBalancerData.activationById(firstEntry.id) shouldBe Some(firstEntry)
         loadBalancerData.activationById(secondEntry.id) shouldBe Some(secondEntry)
     }
@@ -99,7 +100,7 @@ class LoadBalancerDataTests extends FlatSpec with Matchers {
 
     it should "respond with different values accordingly" in {
 
-        val entry = ActivationEntry(ActivationId(), UUID(), "invoker1", activationIdPromise)
+        val entry = ActivationEntry(ActivationId(), UUID(), InstanceId(1), activationIdPromise)
         val entrySameInvokerAndNamespace = entry.copy(id = ActivationId())
         val entrySameInvoker = entry.copy(id = ActivationId(), namespaceId = UUID())
 
diff --git a/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerServiceObjectTests.scala b/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerServiceObjectTests.scala
index 9b8bafc..ec45ed8 100644
--- a/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerServiceObjectTests.scala
+++ b/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerServiceObjectTests.scala
@@ -22,6 +22,10 @@ import org.scalatest.junit.JUnitRunner
 import org.scalatest.FlatSpec
 import org.scalatest.Matchers
 import whisk.core.loadBalancer.LoadBalancerService
+import whisk.core.loadBalancer.Healthy
+import whisk.core.loadBalancer.Offline
+import whisk.core.loadBalancer.UnHealthy
+import whisk.core.entity.InstanceId
 
 /**
  * Unit tests for the ContainerPool object.
@@ -70,34 +74,62 @@ class LoadBalancerServiceObjectTests extends FlatSpec with Matchers {
 
     behavior of "chooseInvoker"
 
-    def invokers(n: Int) = (0 until n).map(i => s"invoker$i")
+    def invokers(n: Int) = (0 until n).map(i => (InstanceId(i), Healthy, 0))
     def hashInto[A](list: Seq[A], hash: Int) = list(hash % list.size)
 
     it should "return None on an empty invokers list" in {
-        LoadBalancerService.schedule(Seq(), Map(), 0, 1) shouldBe None
+        LoadBalancerService.schedule(IndexedSeq(), 0, 1) shouldBe None
+    }
+
+    it should "return None on a list of offline/unhealthy invokers" in {
+        val invs = IndexedSeq(
+            (InstanceId(0), Offline, 0),
+            (InstanceId(1), UnHealthy, 0))
+
+        LoadBalancerService.schedule(invs, 0, 1) shouldBe None
     }
 
     it should "schedule to the home invoker" in {
         val invs = invokers(10)
         val hash = 2
 
-        LoadBalancerService.schedule(invs, Map(), 1, hash) shouldBe Some(hashInto(invs, hash))
+        LoadBalancerService.schedule(invs, 1, hash) shouldBe Some(InstanceId(hash % invs.size))
+    }
+
+    it should "take the only online invoker" in {
+        LoadBalancerService.schedule(IndexedSeq(
+            (InstanceId(0), Offline, 0),
+            (InstanceId(1), UnHealthy, 0),
+            (InstanceId(2), Healthy, 0)), 0, 1) shouldBe Some(InstanceId(2))
+    }
+
+    it should "skip an offline/unhealthy invoker, even if its underloaded" in {
+        val hash = 0
+        val invs = IndexedSeq(
+            (InstanceId(0), Healthy, 10),
+            (InstanceId(1), UnHealthy, 0),
+            (InstanceId(2), Healthy, 0))
+
+        LoadBalancerService.schedule(invs, 10, hash) shouldBe Some(InstanceId(2))
     }
 
     it should "jump to the next invoker determined by a hashed stepsize if the home invoker is overloaded" in {
         val invokerCount = 10
-        val invs = invokers(invokerCount)
         val hash = 2
+        val targetInvoker = hash % invokerCount
 
-        val targetInvoker = hashInto(invs, hash)
+        val invs = invokers(invokerCount).updated(targetInvoker, (InstanceId(targetInvoker), Healthy, 1))
         val step = hashInto(LoadBalancerService.pairwiseCoprimeNumbersUntil(invokerCount), hash)
 
-        LoadBalancerService.schedule(invs, Map(targetInvoker -> 1), 1, hash) shouldBe Some(hashInto(invs, hash + step))
+        LoadBalancerService.schedule(invs, 1, hash) shouldBe Some(InstanceId((hash + step) % invs.size))
     }
 
     it should "wrap the search at the end of the invoker list" in {
         val invokerCount = 3
-        val invs = invokers(invokerCount)
+        val invs = IndexedSeq(
+            (InstanceId(0), Healthy, 1),
+            (InstanceId(1), Healthy, 1),
+            (InstanceId(2), Healthy, 0))
         val hash = 1
 
         val targetInvoker = hashInto(invs, hash) // will be invoker1
@@ -106,36 +138,41 @@ class LoadBalancerServiceObjectTests extends FlatSpec with Matchers {
 
         // invoker1 is overloaded so it will step (2 steps) to the next one --> 1 2 0 --> invoker0 is next target
         // invoker0 is overloaded so it will step to the next one --> 0 1 2 --> invoker2 is next target and underloaded
-        LoadBalancerService.schedule(
-            invs,
-            Map("invoker0" -> 1, "invoker1" -> 1),
-            1, hash) shouldBe Some(hashInto(invs, hash + step + step))
+        LoadBalancerService.schedule(invs, 1, hash) shouldBe Some(InstanceId((hash + step + step) % invs.size))
     }
 
     it should "multiply its threshold in 3 iterations to find an invoker with a good warm-chance" in {
-        val invokerCount = 3
-        val invs = invokers(invokerCount)
+        val invs = IndexedSeq(
+            (InstanceId(0), Healthy, 33),
+            (InstanceId(1), Healthy, 36),
+            (InstanceId(2), Healthy, 33))
         val hash = 0 // home is 0, stepsize is 1
 
         // even though invoker1 is not the home invoker in this case, it gets chosen over
         // the others because it's the first one encountered by the iteration mechanism to be below
         // the threshold of 3 * 16 invocations
-        LoadBalancerService.schedule(
-            invs,
-            Map("invoker0" -> 33, "invoker1" -> 36, "invoker2" -> 33),
-            16,
-            hash) shouldBe Some("invoker0")
+        LoadBalancerService.schedule(invs, 16, hash) shouldBe Some(InstanceId(0))
     }
 
     it should "choose the home invoker if all invokers are overloaded even above the muliplied threshold" in {
-        val invokerCount = 3
-        val invs = invokers(invokerCount)
+        val invs = IndexedSeq(
+            (InstanceId(0), Healthy, 51),
+            (InstanceId(1), Healthy, 50),
+            (InstanceId(2), Healthy, 49))
         val hash = 0 // home is 0, stepsize is 1
 
-        LoadBalancerService.schedule(
-            invs,
-            Map("invoker0" -> 51, "invoker1" -> 50, "invoker2" -> 49),
-            16,
-            hash) shouldBe Some("invoker0")
+        LoadBalancerService.schedule(invs, 16, hash) shouldBe Some(InstanceId(0))
+    }
+
+    it should "transparently work with partitioned sets of invokers" in {
+        val invs = IndexedSeq(
+            (InstanceId(3), Healthy, 0),
+            (InstanceId(4), Healthy, 0),
+            (InstanceId(5), Healthy, 0))
+
+        LoadBalancerService.schedule(invs, 1, 0) shouldBe Some(InstanceId(3))
+        LoadBalancerService.schedule(invs, 1, 1) shouldBe Some(InstanceId(4))
+        LoadBalancerService.schedule(invs, 1, 2) shouldBe Some(InstanceId(5))
+        LoadBalancerService.schedule(invs, 1, 3) shouldBe Some(InstanceId(3))
     }
 }

-- 
To stop receiving notification emails like this one, please contact
"commits@openwhisk.apache.org" <commits@openwhisk.apache.org>.

Mime
View raw message