From: rabbah@apache.org
To: commits@openwhisk.apache.org
Date: Wed, 21 Mar 2018 12:32:14 +0000
Subject: [incubator-openwhisk] branch master updated: Fix several loadbalancer bugs. (#3451)

This is an automated email from the ASF dual-hosted git repository.

rabbah pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git

The following commit(s) were added to refs/heads/master by this push:
     new a72ee3e  Fix several loadbalancer bugs. (#3451)
a72ee3e is described below

commit a72ee3e3b0f300dd71fc87fc4ad2be83b2acf8bd
Author: Christian Bickel
AuthorDate: Wed Mar 21 13:32:10 2018 +0100

    Fix several loadbalancer bugs. (#3451)

    1. `padTo` (used when generating the list of semaphores for all invokers)
       does not recompute its element each time it appends to the underlying
       list. As a result, **all** invoker states were backed by the same
       semaphore, so per-invoker accounting did not work at all (illustrated
       in the sketches following this message).
    2. Step sizes need to be calculated against the invoker list they are
       used to step through. We therefore need separate step-size lists for
       the managed and the blackbox invoker lists.
    3. Resources should be freed (and the state updated) **before** the
       response is returned to the user. A minor change in code order is
       warranted here.
    4. In the overload case, the index to schedule to must be taken from the
       `healthyInvokers` list rather than using the `random` value directly:
       `healthyInvokers` may be shorter than, or ordered differently from,
       the list of all underlying invokers.

    Furthermore, this adds a useful metric that reports, from the load
    balancer's point of view, how many activations are currently running in
    the system; it can be used to gauge overall system utilization.

    Co-authored-by: Markus Thömmes
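To see point 1 in isolation, here is a minimal, self-contained Scala sketch. It is not part of the commit, and java.util.concurrent.Semaphore stands in for OpenWhisk's ForcableSemaphore: `padTo` takes its padding element by value and evaluates it once, whereas `IndexedSeq.fill` takes it by name and evaluates it once per element, which is the behavior the fixed code relies on.

    import java.util.concurrent.Semaphore

    object PadToPitfall extends App {
      // padTo evaluates its padding element once: every padded slot refers
      // to the SAME Semaphore instance.
      val shared = IndexedSeq.empty[Semaphore].padTo(3, new Semaphore(2))
      println(shared(0) eq shared(1)) // true: one semaphore backs all "invokers"
      shared(0).acquire(2)            // exhausting slot 0 ...
      println(shared(1).tryAcquire()) // ... prints false: slot 1 is exhausted too

      // fill re-evaluates its element per slot: a fresh semaphore for each
      // invoker, mirroring the IndexedSeq.fill-based fix in the diff below.
      val fresh = IndexedSeq.fill(3)(new Semaphore(2))
      println(fresh(0) eq fresh(1))   // false: independent state per invoker
      fresh(0).acquire(2)
      println(fresh(1).tryAcquire())  // true
    }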
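Point 2 follows from how the balancer probes invokers: starting from a home invoker, it repeatedly advances by a step size that must be coprime to the size of the list actually being stepped through, or some invokers are never visited. A hypothetical sketch (the pool sizes are made up):

    object StepSizeSketch extends App {
      // Walk a ring of `size` slots from `home`, advancing by `step` each time.
      def visited(size: Int, home: Int, step: Int): Set[Int] =
        Iterator.iterate(home)(i => (i + step) % size).take(size).toSet

      // A step size of 3, valid for some larger pool, degenerates on a
      // 3-invoker blackbox sublist because gcd(3, 3) != 1.
      println(visited(size = 3, home = 0, step = 3)) // Set(0): two invokers unreachable
      // A step size coprime to the sublist's own size covers every invoker.
      println(visited(size = 3, home = 0, step = 1)) // Set(0, 1, 2)
    }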
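Point 3 is about ordering: if the activation promise completes before the slot is released, anything triggered by the completion (for example, the user's next request) can observe capacity that is still marked as used. A minimal sketch of the fixed ordering, with illustrative names:

    import java.util.concurrent.Semaphore
    import scala.concurrent.Promise

    object ReleaseBeforeRespond extends App {
      val slot     = new Semaphore(1)  // stands in for one invoker slot
      val response = Promise[String]() // stands in for the activation promise

      slot.acquire() // slot held while the activation runs

      // Fixed ordering: free the resources and update the state first ...
      slot.release()
      // ... and only then hand the result back to the user.
      response.trySuccess("activation result")

      // By the time the completion is visible, so is the freed capacity.
      println(response.isCompleted && slot.availablePermits == 1) // true
    }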
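And point 4: the random number is a position in the filtered `healthyInvokers` list, so it must be resolved to an invoker id before indexing any structure keyed by id, such as the semaphore list. A hypothetical sketch, with a plain case class standing in for InvokerHealth:

    import java.util.concurrent.Semaphore

    object OverloadScheduling extends App {
      final case class Invoker(id: Int, healthy: Boolean)

      val invokers = IndexedSeq(Invoker(0, false), Invoker(1, true), Invoker(2, true))
      val slots    = IndexedSeq.fill(invokers.size)(new Semaphore(1)) // indexed by invoker id

      val healthyInvokers = invokers.filter(_.healthy) // positions 0 and 1 hold ids 1 and 2
      val random = 1 // pretend ThreadLocalRandom.current().nextInt(healthyInvokers.size) returned 1

      // Buggy: slots(random) acquires invoker 1's slot while the activation
      // goes to healthyInvokers(random), which is invoker 2.
      // Fixed: resolve the chosen invoker's id first and use it everywhere.
      val chosenId = healthyInvokers(random).id // 2
      slots(chosenId).acquire()                 // bookkeeping matches the scheduling decision
      println(s"scheduled to invoker $chosenId")
    }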
---
 .../src/main/scala/whisk/common/Logging.scala      | 11 ++--
 .../ShardingContainerPoolBalancer.scala            | 59 ++++++++++++++--------
 .../test/ShardingContainerPoolBalancerTests.scala  | 11 ++--
 3 files changed, 51 insertions(+), 30 deletions(-)

diff --git a/common/scala/src/main/scala/whisk/common/Logging.scala b/common/scala/src/main/scala/whisk/common/Logging.scala
index 1780211..0e21fe3 100644
--- a/common/scala/src/main/scala/whisk/common/Logging.scala
+++ b/common/scala/src/main/scala/whisk/common/Logging.scala
@@ -18,15 +18,13 @@ package whisk.common
 
 import java.io.PrintStream
-import java.time.Clock
-import java.time.Instant
-import java.time.ZoneId
+import java.time.{Clock, Instant, ZoneId}
 import java.time.format.DateTimeFormatter
 
-import akka.event.Logging.{DebugLevel, ErrorLevel, InfoLevel, WarningLevel}
-import akka.event.Logging.LogLevel
+import akka.event.Logging._
 import akka.event.LoggingAdapter
 import kamon.Kamon
+import whisk.core.entity.InstanceId
 
 trait Logging {
 
@@ -279,6 +277,9 @@ object LoggingMarkers {
   val LOADBALANCER_INVOKER_UNHEALTHY = LogMarkerToken(loadbalancer, "invokerUnhealthy", count)
   val LOADBALANCER_ACTIVATION_START = LogMarkerToken(loadbalancer, "activations", count)
 
+  def LOADBALANCER_ACTIVATIONS_INFLIGHT(controllerInstance: InstanceId) =
+    LogMarkerToken(loadbalancer + controllerInstance.toInt, "activationsInflight", count)
+
   // Time that is needed to execute the action
   val INVOKER_ACTIVATION_RUN = LogMarkerToken(invoker, "activationRun", start)
 
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
index 19b959a..1f5f398 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
@@ -18,8 +18,8 @@ package whisk.core.loadBalancer
 
 import java.nio.charset.StandardCharsets
-import java.util.concurrent.atomic.LongAdder
 import java.util.concurrent.ThreadLocalRandom
+import java.util.concurrent.atomic.LongAdder
 
 import akka.actor.{Actor, ActorSystem, Props}
 import akka.cluster.ClusterEvent._
@@ -28,6 +28,7 @@ import akka.event.Logging.InfoLevel
 import akka.stream.ActorMaterializer
 import org.apache.kafka.clients.producer.RecordMetadata
 import pureconfig._
+import whisk.common.LoggingMarkers._
 import whisk.common._
 import whisk.core.WhiskConfig._
 import whisk.core.connector._
@@ -71,6 +72,10 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: Ins
   /** State needed for scheduling. */
   private val schedulingState = ShardingContainerPoolBalancerState()()
 
+  actorSystem.scheduler.schedule(0.seconds, 10.seconds) {
+    MetricEmitter.emitHistogramMetric(LOADBALANCER_ACTIVATIONS_INFLIGHT(controllerInstance), totalActivations.longValue)
+  }
+
   /**
    * Monitors invoker supervision and the cluster to update the state sequentially
    *
@@ -123,11 +128,13 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: Ins
 
   override def publish(action: ExecutableWhiskActionMetaData, msg: ActivationMessage)(
     implicit transid: TransactionId): Future[Future[Either[ActivationId, WhiskActivation]]] = {
-    val invokersToUse = if (!action.exec.pull) schedulingState.managedInvokers else schedulingState.blackboxInvokers
+    val (invokersToUse, stepSizes) =
+      if (!action.exec.pull) (schedulingState.managedInvokers, schedulingState.managedStepSizes)
+      else (schedulingState.blackboxInvokers, schedulingState.blackboxStepSizes)
     val chosen = if (invokersToUse.nonEmpty) {
       val hash = ShardingContainerPoolBalancer.generateHash(msg.user.namespace, action.fullyQualifiedName(false))
       val homeInvoker = hash % invokersToUse.size
-      val stepSize = schedulingState.stepSizes(hash % schedulingState.stepSizes.size)
+      val stepSize = stepSizes(hash % stepSizes.size)
       ShardingContainerPoolBalancer.schedule(invokersToUse, schedulingState.invokerSlots, homeInvoker, stepSize)
     } else {
       None
@@ -247,6 +254,10 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: Ins
 
     activations.remove(aid) match {
       case Some(entry) =>
+        totalActivations.decrement()
+        activationsPerNamespace.get(entry.namespaceId).foreach(_.decrement())
+        schedulingState.invokerSlots.lift(invoker.toInt).foreach(_.release())
+
         if (!forced) {
           entry.timeoutHandler.cancel()
           entry.promise.trySuccess(response)
@@ -254,10 +265,6 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: Ins
           entry.promise.tryFailure(new Throwable("no active ack received"))
         }
 
-        totalActivations.decrement()
-        activationsPerNamespace.get(entry.namespaceId).foreach(_.decrement())
-        schedulingState.invokerSlots.lift(invoker.toInt).foreach(_.release())
-
         logging.info(this, s"${if (!forced) "received" else "forced"} active ack for '$aid'")(tid)
         // Active acks that are received here are strictly from user actions - health actions are not part of
         // the load balancer's activation map. Inform the invoker pool supervisor of the user action completion.
@@ -317,7 +324,7 @@ object ShardingContainerPoolBalancer extends LoadBalancerProvider {
     dispatched: IndexedSeq[ForcableSemaphore],
     index: Int,
     step: Int,
-    stepsDone: Int = 0): Option[InstanceId] = {
+    stepsDone: Int = 0)(implicit logging: Logging): Option[InstanceId] = {
     val numInvokers = invokers.size
 
     if (numInvokers > 0) {
@@ -331,9 +338,10 @@ object ShardingContainerPoolBalancer extends LoadBalancerProvider {
         val healthyInvokers = invokers.filter(_.status == Healthy)
         if (healthyInvokers.nonEmpty) {
           // Choose a healthy invoker randomly
-          val random = ThreadLocalRandom.current().nextInt(healthyInvokers.size)
-          dispatched(random).forceAcquire()
-          Some(healthyInvokers(random).id)
+          val random = healthyInvokers(ThreadLocalRandom.current().nextInt(healthyInvokers.size)).id
+          dispatched(random.toInt).forceAcquire()
+          logging.warn(this, s"system is overloaded. Chose invoker${random.toInt} by random assignment.")
+          Some(random)
         } else {
           None
         }
@@ -354,14 +362,16 @@ object ShardingContainerPoolBalancer extends LoadBalancerProvider {
  * @param _invokers all of the known invokers in the system
  * @param _managedInvokers all invokers for managed runtimes
  * @param _blackboxInvokers all invokers for blackbox runtimes
- * @param _stepSizes the step-sizes possible for the current invoker count
+ * @param _managedStepSizes the step-sizes possible for the current managed invoker count
+ * @param _blackboxStepSizes the step-sizes possible for the current blackbox invoker count
  * @param _invokerSlots state of accessible slots of each invoker
  */
 case class ShardingContainerPoolBalancerState(
   private var _invokers: IndexedSeq[InvokerHealth] = IndexedSeq.empty[InvokerHealth],
   private var _managedInvokers: IndexedSeq[InvokerHealth] = IndexedSeq.empty[InvokerHealth],
   private var _blackboxInvokers: IndexedSeq[InvokerHealth] = IndexedSeq.empty[InvokerHealth],
-  private var _stepSizes: Seq[Int] = ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(0),
+  private var _managedStepSizes: Seq[Int] = ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(0),
+  private var _blackboxStepSizes: Seq[Int] = ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(0),
   private var _invokerSlots: IndexedSeq[ForcableSemaphore] = IndexedSeq.empty[ForcableSemaphore],
   private var _clusterSize: Int = 1)(
   lbConfig: ShardingContainerPoolBalancerConfig =
@@ -377,7 +387,8 @@ case class ShardingContainerPoolBalancerState(
   def invokers: IndexedSeq[InvokerHealth] = _invokers
   def managedInvokers: IndexedSeq[InvokerHealth] = _managedInvokers
   def blackboxInvokers: IndexedSeq[InvokerHealth] = _blackboxInvokers
-  def stepSizes: Seq[Int] = _stepSizes
+  def managedStepSizes: Seq[Int] = _managedStepSizes
+  def blackboxStepSizes: Seq[Int] = _blackboxStepSizes
   def invokerSlots: IndexedSeq[ForcableSemaphore] = _invokerSlots
   def clusterSize: Int = _clusterSize
 
@@ -396,14 +407,6 @@ case class ShardingContainerPoolBalancerState(
     val oldSize = _invokers.size
     val newSize = newInvokers.size
 
-    if (oldSize != newSize) {
-      _stepSizes = ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(newSize)
-      if (oldSize < newSize) {
-        // Keeps the existing state..
-        _invokerSlots = _invokerSlots.padTo(newSize, new ForcableSemaphore(currentInvokerThreshold))
-      }
-    }
-
     val blackboxes = Math.max(1, (newSize.toDouble * blackboxFraction).toInt)
     val managed = Math.max(1, newSize - blackboxes)
 
@@ -411,6 +414,18 @@ case class ShardingContainerPoolBalancerState(
     _blackboxInvokers = _invokers.takeRight(blackboxes)
     _managedInvokers = _invokers.take(managed)
 
+    if (oldSize != newSize) {
+      _managedStepSizes = ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(managed)
+      _blackboxStepSizes = ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(blackboxes)
+
+      if (oldSize < newSize) {
+        // Keeps the existing state..
+        _invokerSlots = _invokerSlots ++ IndexedSeq.fill(newSize - oldSize) {
+          new ForcableSemaphore(currentInvokerThreshold)
+        }
+      }
+    }
+
     logging.info(
       this,
       s"loadbalancer invoker status updated. managedInvokers = $managed blackboxInvokers = $blackboxes")(
diff --git a/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala b/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
index ca8442f..50201a6 100644
--- a/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
+++ b/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
@@ -50,7 +50,8 @@ class ShardingContainerPoolBalancerTests extends FlatSpec with Matchers with Str
     state.blackboxInvokers shouldBe 'empty
     state.managedInvokers shouldBe 'empty
     state.invokerSlots shouldBe 'empty
-    state.stepSizes shouldBe Seq()
+    state.managedStepSizes shouldBe Seq()
+    state.blackboxStepSizes shouldBe Seq()
 
     // apply one update, verify everything is updated accordingly
     val update1 = IndexedSeq(healthy(0))
@@ -59,8 +60,10 @@ class ShardingContainerPoolBalancerTests extends FlatSpec with Matchers with Str
     state.invokers shouldBe update1
     state.blackboxInvokers shouldBe update1 // fallback to at least one
     state.managedInvokers shouldBe update1 // fallback to at least one
+    state.invokerSlots should have size update1.size
     state.invokerSlots.head.availablePermits shouldBe slots
-    state.stepSizes shouldBe Seq(1)
+    state.managedStepSizes shouldBe Seq(1)
+    state.blackboxStepSizes shouldBe Seq(1)
 
     // aquire a slot to alter invoker state
     state.invokerSlots.head.tryAcquire()
@@ -73,9 +76,11 @@ class ShardingContainerPoolBalancerTests extends FlatSpec with Matchers with Str
     state.invokers shouldBe update2
     state.managedInvokers shouldBe IndexedSeq(update2.head)
     state.blackboxInvokers shouldBe IndexedSeq(update2.last)
+    state.invokerSlots should have size update2.size
     state.invokerSlots.head.availablePermits shouldBe slots - 1
     state.invokerSlots(1).availablePermits shouldBe slots
-    state.stepSizes shouldBe Seq(1)
+    state.managedStepSizes shouldBe Seq(1)
+    state.blackboxStepSizes shouldBe Seq(1)
   }
 
   it should "update the cluster size, adjusting the invoker slots accordingly" in {

-- 
To stop receiving notification emails like this one, please contact
rabbah@apache.org.