openwhisk-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rab...@apache.org
Subject [incubator-openwhisk] branch master updated: Fix several loadbalancer bugs. (#3451)
Date Wed, 21 Mar 2018 12:32:14 GMT
This is an automated email from the ASF dual-hosted git repository.

rabbah pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new a72ee3e  Fix several loadbalancer bugs. (#3451)
a72ee3e is described below

commit a72ee3e3b0f300dd71fc87fc4ad2be83b2acf8bd
Author: Christian Bickel <github@cbickel.de>
AuthorDate: Wed Mar 21 13:32:10 2018 +0100

    Fix several loadbalancer bugs. (#3451)
    
    1. `padTo` (used when generating the list of semaphores for all invokers) does not recompute
the value each time it adds an element to the underlying list. That resulted in the state of **all**
invokers being backed by the same Semaphore, so the per-invoker accounting did not work properly.
    
    2. The step sizes need to be calculated for the list of invokers they are used to step
through. Therefore we need a separate list of step sizes for each of the managed and the blackbox
invoker lists.
    
    3. The assumption should be that we have freed our resources (and thus updated the state)
**before** we return the response to the user. A minor change in code order is warranted here.
    
    4. In the overload case we need to take the index we want to schedule to out of the `healthyInvokers`
list, rather than using the `random` value directly. The `healthyInvokers` list might be shorter than,
or in a different order from, the list of all underlying invokers.
    
    Furthermore, this adds a useful metric to determine (from the load balancer's point of view)
how many activations are currently running in the system. That can be used to determine overall
system utilization.
    
    Co-authored-by: Markus Thömmes <markusthoemmes@me.com>
---
 .../src/main/scala/whisk/common/Logging.scala      | 11 ++--
 .../ShardingContainerPoolBalancer.scala            | 59 ++++++++++++++--------
 .../test/ShardingContainerPoolBalancerTests.scala  | 11 ++--
 3 files changed, 51 insertions(+), 30 deletions(-)

diff --git a/common/scala/src/main/scala/whisk/common/Logging.scala b/common/scala/src/main/scala/whisk/common/Logging.scala
index 1780211..0e21fe3 100644
--- a/common/scala/src/main/scala/whisk/common/Logging.scala
+++ b/common/scala/src/main/scala/whisk/common/Logging.scala
@@ -18,15 +18,13 @@
 package whisk.common
 
 import java.io.PrintStream
-import java.time.Clock
-import java.time.Instant
-import java.time.ZoneId
+import java.time.{Clock, Instant, ZoneId}
 import java.time.format.DateTimeFormatter
 
-import akka.event.Logging.{DebugLevel, ErrorLevel, InfoLevel, WarningLevel}
-import akka.event.Logging.LogLevel
+import akka.event.Logging._
 import akka.event.LoggingAdapter
 import kamon.Kamon
+import whisk.core.entity.InstanceId
 
 trait Logging {
 
@@ -279,6 +277,9 @@ object LoggingMarkers {
   val LOADBALANCER_INVOKER_UNHEALTHY = LogMarkerToken(loadbalancer, "invokerUnhealthy", count)
   val LOADBALANCER_ACTIVATION_START = LogMarkerToken(loadbalancer, "activations", count)
 
+  def LOADBALANCER_ACTIVATIONS_INFLIGHT(controllerInstance: InstanceId) =
+    LogMarkerToken(loadbalancer + controllerInstance.toInt, "activationsInflight", count)
+
   // Time that is needed to execute the action
   val INVOKER_ACTIVATION_RUN = LogMarkerToken(invoker, "activationRun", start)
 
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
index 19b959a..1f5f398 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
@@ -18,8 +18,8 @@
 package whisk.core.loadBalancer
 
 import java.nio.charset.StandardCharsets
-import java.util.concurrent.atomic.LongAdder
 import java.util.concurrent.ThreadLocalRandom
+import java.util.concurrent.atomic.LongAdder
 
 import akka.actor.{Actor, ActorSystem, Props}
 import akka.cluster.ClusterEvent._
@@ -28,6 +28,7 @@ import akka.event.Logging.InfoLevel
 import akka.stream.ActorMaterializer
 import org.apache.kafka.clients.producer.RecordMetadata
 import pureconfig._
+import whisk.common.LoggingMarkers._
 import whisk.common._
 import whisk.core.WhiskConfig._
 import whisk.core.connector._
@@ -71,6 +72,10 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance:
Ins
   /** State needed for scheduling. */
   private val schedulingState = ShardingContainerPoolBalancerState()()
 
+  actorSystem.scheduler.schedule(0.seconds, 10.seconds) {
+    MetricEmitter.emitHistogramMetric(LOADBALANCER_ACTIVATIONS_INFLIGHT(controllerInstance),
totalActivations.longValue)
+  }
+
   /**
    * Monitors invoker supervision and the cluster to update the state sequentially
    *
@@ -123,11 +128,13 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance:
Ins
   override def publish(action: ExecutableWhiskActionMetaData, msg: ActivationMessage)(
     implicit transid: TransactionId): Future[Future[Either[ActivationId, WhiskActivation]]]
= {
 
-    val invokersToUse = if (!action.exec.pull) schedulingState.managedInvokers else schedulingState.blackboxInvokers
+    val (invokersToUse, stepSizes) =
+      if (!action.exec.pull) (schedulingState.managedInvokers, schedulingState.managedStepSizes)
+      else (schedulingState.blackboxInvokers, schedulingState.blackboxStepSizes)
     val chosen = if (invokersToUse.nonEmpty) {
       val hash = ShardingContainerPoolBalancer.generateHash(msg.user.namespace, action.fullyQualifiedName(false))
       val homeInvoker = hash % invokersToUse.size
-      val stepSize = schedulingState.stepSizes(hash % schedulingState.stepSizes.size)
+      val stepSize = stepSizes(hash % stepSizes.size)
       ShardingContainerPoolBalancer.schedule(invokersToUse, schedulingState.invokerSlots,
homeInvoker, stepSize)
     } else {
       None
@@ -247,6 +254,10 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance:
Ins
 
     activations.remove(aid) match {
       case Some(entry) =>
+        totalActivations.decrement()
+        activationsPerNamespace.get(entry.namespaceId).foreach(_.decrement())
+        schedulingState.invokerSlots.lift(invoker.toInt).foreach(_.release())
+
         if (!forced) {
           entry.timeoutHandler.cancel()
           entry.promise.trySuccess(response)
@@ -254,10 +265,6 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance:
Ins
           entry.promise.tryFailure(new Throwable("no active ack received"))
         }
 
-        totalActivations.decrement()
-        activationsPerNamespace.get(entry.namespaceId).foreach(_.decrement())
-        schedulingState.invokerSlots.lift(invoker.toInt).foreach(_.release())
-
         logging.info(this, s"${if (!forced) "received" else "forced"} active ack for '$aid'")(tid)
         // Active acks that are received here are strictly from user actions - health actions
are not part of
         // the load balancer's activation map. Inform the invoker pool supervisor of the
user action completion.
@@ -317,7 +324,7 @@ object ShardingContainerPoolBalancer extends LoadBalancerProvider {
                dispatched: IndexedSeq[ForcableSemaphore],
                index: Int,
                step: Int,
-               stepsDone: Int = 0): Option[InstanceId] = {
+               stepsDone: Int = 0)(implicit logging: Logging): Option[InstanceId] = {
     val numInvokers = invokers.size
 
     if (numInvokers > 0) {
@@ -331,9 +338,10 @@ object ShardingContainerPoolBalancer extends LoadBalancerProvider {
           val healthyInvokers = invokers.filter(_.status == Healthy)
           if (healthyInvokers.nonEmpty) {
             // Choose a healthy invoker randomly
-            val random = ThreadLocalRandom.current().nextInt(healthyInvokers.size)
-            dispatched(random).forceAcquire()
-            Some(healthyInvokers(random).id)
+            val random = healthyInvokers(ThreadLocalRandom.current().nextInt(healthyInvokers.size)).id
+            dispatched(random.toInt).forceAcquire()
+            logging.warn(this, s"system is overloaded. Chose invoker${random.toInt} by random
assignment.")
+            Some(random)
           } else {
             None
           }
@@ -354,14 +362,16 @@ object ShardingContainerPoolBalancer extends LoadBalancerProvider {
  * @param _invokers all of the known invokers in the system
  * @param _managedInvokers all invokers for managed runtimes
  * @param _blackboxInvokers all invokers for blackbox runtimes
- * @param _stepSizes the step-sizes possible for the current invoker count
+ * @param _managedStepSizes the step-sizes possible for the current managed invoker count
+ * @param _blackboxStepSizes the step-sizes possible for the current blackbox invoker count
  * @param _invokerSlots state of accessible slots of each invoker
  */
 case class ShardingContainerPoolBalancerState(
   private var _invokers: IndexedSeq[InvokerHealth] = IndexedSeq.empty[InvokerHealth],
   private var _managedInvokers: IndexedSeq[InvokerHealth] = IndexedSeq.empty[InvokerHealth],
   private var _blackboxInvokers: IndexedSeq[InvokerHealth] = IndexedSeq.empty[InvokerHealth],
-  private var _stepSizes: Seq[Int] = ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(0),
+  private var _managedStepSizes: Seq[Int] = ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(0),
+  private var _blackboxStepSizes: Seq[Int] = ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(0),
   private var _invokerSlots: IndexedSeq[ForcableSemaphore] = IndexedSeq.empty[ForcableSemaphore],
   private var _clusterSize: Int = 1)(
   lbConfig: ShardingContainerPoolBalancerConfig =
@@ -377,7 +387,8 @@ case class ShardingContainerPoolBalancerState(
   def invokers: IndexedSeq[InvokerHealth] = _invokers
   def managedInvokers: IndexedSeq[InvokerHealth] = _managedInvokers
   def blackboxInvokers: IndexedSeq[InvokerHealth] = _blackboxInvokers
-  def stepSizes: Seq[Int] = _stepSizes
+  def managedStepSizes: Seq[Int] = _managedStepSizes
+  def blackboxStepSizes: Seq[Int] = _blackboxStepSizes
   def invokerSlots: IndexedSeq[ForcableSemaphore] = _invokerSlots
   def clusterSize: Int = _clusterSize
 
@@ -396,14 +407,6 @@ case class ShardingContainerPoolBalancerState(
     val oldSize = _invokers.size
     val newSize = newInvokers.size
 
-    if (oldSize != newSize) {
-      _stepSizes = ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(newSize)
-      if (oldSize < newSize) {
-        // Keeps the existing state..
-        _invokerSlots = _invokerSlots.padTo(newSize, new ForcableSemaphore(currentInvokerThreshold))
-      }
-    }
-
     val blackboxes = Math.max(1, (newSize.toDouble * blackboxFraction).toInt)
     val managed = Math.max(1, newSize - blackboxes)
 
@@ -411,6 +414,18 @@ case class ShardingContainerPoolBalancerState(
     _blackboxInvokers = _invokers.takeRight(blackboxes)
     _managedInvokers = _invokers.take(managed)
 
+    if (oldSize != newSize) {
+      _managedStepSizes = ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(managed)
+      _blackboxStepSizes = ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(blackboxes)
+
+      if (oldSize < newSize) {
+        // Keeps the existing state..
+        _invokerSlots = _invokerSlots ++ IndexedSeq.fill(newSize - oldSize) {
+          new ForcableSemaphore(currentInvokerThreshold)
+        }
+      }
+    }
+
     logging.info(
       this,
       s"loadbalancer invoker status updated. managedInvokers = $managed blackboxInvokers
= $blackboxes")(
diff --git a/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
b/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
index ca8442f..50201a6 100644
--- a/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
+++ b/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
@@ -50,7 +50,8 @@ class ShardingContainerPoolBalancerTests extends FlatSpec with Matchers
with Str
     state.blackboxInvokers shouldBe 'empty
     state.managedInvokers shouldBe 'empty
     state.invokerSlots shouldBe 'empty
-    state.stepSizes shouldBe Seq()
+    state.managedStepSizes shouldBe Seq()
+    state.blackboxStepSizes shouldBe Seq()
 
     // apply one update, verify everything is updated accordingly
     val update1 = IndexedSeq(healthy(0))
@@ -59,8 +60,10 @@ class ShardingContainerPoolBalancerTests extends FlatSpec with Matchers
with Str
     state.invokers shouldBe update1
     state.blackboxInvokers shouldBe update1 // fallback to at least one
     state.managedInvokers shouldBe update1 // fallback to at least one
+    state.invokerSlots should have size update1.size
     state.invokerSlots.head.availablePermits shouldBe slots
-    state.stepSizes shouldBe Seq(1)
+    state.managedStepSizes shouldBe Seq(1)
+    state.blackboxStepSizes shouldBe Seq(1)
 
     // aquire a slot to alter invoker state
     state.invokerSlots.head.tryAcquire()
@@ -73,9 +76,11 @@ class ShardingContainerPoolBalancerTests extends FlatSpec with Matchers
with Str
     state.invokers shouldBe update2
     state.managedInvokers shouldBe IndexedSeq(update2.head)
     state.blackboxInvokers shouldBe IndexedSeq(update2.last)
+    state.invokerSlots should have size update2.size
     state.invokerSlots.head.availablePermits shouldBe slots - 1
     state.invokerSlots(1).availablePermits shouldBe slots
-    state.stepSizes shouldBe Seq(1)
+    state.managedStepSizes shouldBe Seq(1)
+    state.blackboxStepSizes shouldBe Seq(1)
   }
 
   it should "update the cluster size, adjusting the invoker slots accordingly" in {

-- 
To stop receiving notification emails like this one, please contact
rabbah@apache.org.

Mime
View raw message