openwhisk-commits mailing list archives

From cbic...@apache.org
Subject [incubator-openwhisk] branch master updated: Share bookkeeping data across controllers (#2531)
Date Fri, 29 Sep 2017 12:47:49 GMT
This is an automated email from the ASF dual-hosted git repository.

cbickel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 363d571  Share bookkeeping data across controllers (#2531)
363d571 is described below

commit 363d5714daf4cc87b5be220013d9edab65ffcfbc
Author: Vadim Raskin <raskinvadim@gmail.com>
AuthorDate: Fri Sep 29 14:47:47 2017 +0200

    Share bookkeeping data across controllers (#2531)
    
    * Use akka distributed map to store the shared state
    
    * Join seed nodes in the load balancer service
    
    * Add optional auto-down-unreachable-after
    
    * Local bookkeeping is used by default
    
    * Update documentation
---
 ansible/group_vars/all                             |   9 +
 ansible/roles/controller/tasks/deploy.yml          |  15 +-
 common/scala/build.gradle                          |   5 +-
 .../scala/src/main/scala/whisk/common/Config.scala |  10 +
 .../src/main/scala/whisk/core/WhiskConfig.scala    |  10 +-
 core/controller/build.gradle                       |   1 +
 .../controller/src/main/resources/application.conf |  27 ++
 .../scala/whisk/core/controller/Controller.scala   |   4 +-
 .../core/entitlement/ActivationThrottler.scala     |  31 ++-
 .../scala/whisk/core/entitlement/Entitlement.scala |  31 ++-
 .../loadBalancer/DistributedLoadBalancerData.scala |  90 +++++++
 .../whisk/core/loadBalancer/LoadBalancerData.scala |  65 +----
 .../core/loadBalancer/LoadBalancerService.scala    |  52 ++--
 .../core/loadBalancer/LocalLoadBalancerData.scala  |  76 ++++++
 .../core/loadBalancer/SeedNodesProvider.scala      |  41 +++
 .../core/loadBalancer/SharedDataService.scala      |  98 +++++++
 docs/README.md                                     |   1 +
 docs/deploy.md                                     |  16 ++
 tests/build.gradle                                 |   1 +
 .../controller/test/ControllerTestCommon.scala     |   9 +-
 .../loadBalancer/test/LoadBalancerDataTests.scala  | 292 ++++++++++++++-------
 .../loadBalancer/test/SeedNodesProviderTest.scala  |  58 ++++
 .../loadBalancer/test/SharedDataServiceTests.scala |  91 +++++++
 23 files changed, 825 insertions(+), 208 deletions(-)

diff --git a/ansible/group_vars/all b/ansible/group_vars/all
index c288dbf..88ce571 100644
--- a/ansible/group_vars/all
+++ b/ansible/group_vars/all
@@ -108,6 +108,15 @@ controller:
   arguments: "{{ controller_arguments | default('') }}"
   blackboxFraction: "{{ controller_blackbox_fraction | default(0.10) }}"
   instances: "{{ groups['controllers'] | length }}"
+  localBookkeeping: "{{ controller_local_bookkeeping | default('true') }}"
+  akka:
+    provider: cluster
+    cluster:
+      basePort: 8000
+      host: "{{ groups['controllers'] | map('extract', hostvars, 'ansible_host') | list }}"
+      bindPort: 2551
+      # at this moment all controllers are seed nodes
+      seedNodes: "{{ groups['controllers'] | map('extract', hostvars, 'ansible_host') | list }}"
 
 registry:
   confdir: "{{ config_root_dir }}/registry"
diff --git a/ansible/roles/controller/tasks/deploy.yml b/ansible/roles/controller/tasks/deploy.yml
index 1b791ef..a805d75 100644
--- a/ansible/roles/controller/tasks/deploy.yml
+++ b/ansible/roles/controller/tasks/deploy.yml
@@ -16,6 +16,12 @@
     mode: 0777
   become: "{{ logs.dir.become }}"
 
+- name: create seed nodes list
+  set_fact:
+    seed_nodes_list: "{{ seed_nodes_list | default([]) }} + [ \"{{item.1}}:{{controller.akka.cluster.basePort+item.0}}\" ]"
+  with_indexed_items:
+  - "{{ controller.akka.cluster.seedNodes }}"
+
 - name: (re)start controller
   docker_container:
     name: controller{{ groups['controllers'].index(inventory_hostname) }}
@@ -58,11 +64,18 @@
       "LOADBALANCER_INVOKERBUSYTHRESHOLD": "{{ invoker.busyThreshold }}"
 
       "RUNTIMES_MANIFEST": "{{ runtimesManifest | to_json }}"
+      "CONTROLLER_LOCALBOOKKEEPING": "{{ controller.localBookkeeping }}"
+      "AKKA_CLUSTER_PORT": "{{ controller.akka.cluster.basePort + groups['controllers'].index(inventory_hostname) }}"
+      "AKKA_CLUSTER_HOST": "{{ controller.akka.cluster.host[groups['controllers'].index(inventory_hostname)] }}"
+      "AKKA_CLUSTER_SEED_NODES": "{{seed_nodes_list | join(' ') }}"
+      "AKKA_CLUSTER_BIND_PORT": "{{ controller.akka.cluster.bindPort }}"
+      "AKKA_ACTOR_PROVIDER": "{{ controller.akka.provider }}"
     volumes:
       - "{{ whisk_logs_dir }}/controller{{ groups['controllers'].index(inventory_hostname) }}:/logs"
     ports:
       - "{{ controller.basePort + groups['controllers'].index(inventory_hostname) }}:8080"
-    command: /bin/sh -c "controller/bin/controller {{ groups['controllers'].index(inventory_hostname) }} >> /logs/controller{{ groups['controllers'].index(inventory_hostname) }}_logs.log 2>&1"
+      - "{{ controller.akka.cluster.basePort + groups['controllers'].index(inventory_hostname) }}:{{ controller.akka.cluster.bindPort }}"
+    command: /bin/sh -c "export CONTAINER_IP=$(hostname -I); controller/bin/controller {{ groups['controllers'].index(inventory_hostname) }} >> /logs/controller{{ groups['controllers'].index(inventory_hostname) }}_logs.log 2>&1"
 
 - name: wait until the Controller in this host is up and running
   uri:
diff --git a/common/scala/build.gradle b/common/scala/build.gradle
index 03ef713..066f835 100644
--- a/common/scala/build.gradle
+++ b/common/scala/build.gradle
@@ -13,8 +13,9 @@ dependencies {
 
     compile 'io.spray:spray-json_2.11:1.3.3'
 
-    compile 'com.typesafe.akka:akka-actor_2.11:2.4.16'
-    compile 'com.typesafe.akka:akka-slf4j_2.11:2.4.16'
+    compile 'com.typesafe.akka:akka-actor_2.11:2.5.4'
+    compile 'com.typesafe.akka:akka-stream_2.11:2.5.4'
+    compile 'com.typesafe.akka:akka-slf4j_2.11:2.5.4'
     compile 'com.typesafe.akka:akka-http-core_2.11:10.0.10'
     compile 'com.typesafe.akka:akka-http-spray-json_2.11:10.0.10'
 
diff --git a/common/scala/src/main/scala/whisk/common/Config.scala b/common/scala/src/main/scala/whisk/common/Config.scala
index 9078822..128c224 100644
--- a/common/scala/src/main/scala/whisk/common/Config.scala
+++ b/common/scala/src/main/scala/whisk/common/Config.scala
@@ -95,6 +95,16 @@ class Config(requiredProperties: Map[String, String], optionalProperties: Set[St
   }
 
   /**
+   * Returns the value of a given key parsed as a boolean.
+   * If parsing fails, return the default value.
+   *
+   * @param key the property that has to be returned.
+   */
+  def getAsBoolean(key: String, defaultValue: Boolean): Boolean = {
+    Try(getProperty(key).toBoolean).getOrElse(defaultValue)
+  }
+
+  /**
    * Converts the set of property to a string for debugging.
    */
   def mkString: String = settings.mkString("\n")
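A minimal usage sketch (not part of the patch; the key name mirrors the controller.localBookkeeping property introduced in WhiskConfig below, the surrounding wiring is assumed):

    // Read an optional boolean flag with a safe default; parsing failures fall back to `false`.
    def localBookkeepingEnabled(config: whisk.common.Config): Boolean =
      config.getAsBoolean("controller.localBookkeeping", defaultValue = false)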
diff --git a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
index 051c883..254181b 100644
--- a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
@@ -20,9 +20,7 @@ package whisk.core
 import java.io.File
 
 import scala.io.Source
-
-import whisk.common.Config
-import whisk.common.Logging
+import whisk.common.{Config, Logging}
 
 /**
  * A set of properties which might be needed to run a whisk microservice implemented
@@ -95,12 +93,14 @@ class WhiskConfig(requiredProperties: Map[String, String],
   val mainDockerEndpoint = this(WhiskConfig.mainDockerEndpoint)
 
   val runtimesManifest = this(WhiskConfig.runtimesManifest)
-
   val actionInvokePerMinuteLimit = this(WhiskConfig.actionInvokePerMinuteLimit)
   val actionInvokeConcurrentLimit = this(WhiskConfig.actionInvokeConcurrentLimit)
   val triggerFirePerMinuteLimit = this(WhiskConfig.triggerFirePerMinuteLimit)
   val actionInvokeSystemOverloadLimit = this(WhiskConfig.actionInvokeSystemOverloadLimit)
   val actionSequenceLimit = this(WhiskConfig.actionSequenceMaxLimit)
+  val controllerSeedNodes = this(WhiskConfig.controllerSeedNodes)
+  val controllerLocalBookkeeping = getAsBoolean(WhiskConfig.controllerLocalBookkeeping, false)
+
 }
 
 object WhiskConfig {
@@ -221,4 +221,6 @@ object WhiskConfig {
   val actionInvokeConcurrentLimit = "limits.actions.invokes.concurrent"
   val actionInvokeSystemOverloadLimit = "limits.actions.invokes.concurrentInSystem"
   val triggerFirePerMinuteLimit = "limits.triggers.fires.perMinute"
+  val controllerSeedNodes = "akka.cluster.seed.nodes"
+  val controllerLocalBookkeeping = "controller.localBookkeeping"
 }
diff --git a/core/controller/build.gradle b/core/controller/build.gradle
index f9d7726..a1b2322 100644
--- a/core/controller/build.gradle
+++ b/core/controller/build.gradle
@@ -11,6 +11,7 @@ repositories {
 }
 
 dependencies {
+    compile 'com.typesafe.akka:akka-distributed-data_2.11:2.5.4'
     compile "org.scala-lang:scala-library:${gradle.scala.version}"
     compile project(':common:scala')
 }
diff --git a/core/controller/src/main/resources/application.conf b/core/controller/src/main/resources/application.conf
index d783df4..d32150c 100644
--- a/core/controller/src/main/resources/application.conf
+++ b/core/controller/src/main/resources/application.conf
@@ -55,3 +55,30 @@ akka.http {
     }
   }
 }
+
+# Check out all akka-remote-2.5.4 options here:
+# http://doc.akka.io/docs/akka/2.5.4/scala/general/configuration.html#config-akka-remote
+akka {
+  actor {
+    provider = ${?AKKA_ACTOR_PROVIDER}
+  }
+  remote {
+    log-remote-lifecycle-events = DEBUG
+    log-received-messages = on
+    log-sent-messages = on
+
+    netty.tcp {
+      hostname = ${?AKKA_CLUSTER_HOST}
+      port = ${?AKKA_CLUSTER_PORT}
+      bind-port = ${?AKKA_CLUSTER_BIND_PORT}
+      bind-hostname = ${?CONTAINER_IP}
+    }
+  }
+  cluster {
+    # Disable legacy metrics in akka-cluster.
+    metrics.enabled=off
+
+    auto-down-unreachable-after = ${?AUTO_DOWN_UNREACHABLE_AFTER}
+    #distributed-data.notify-subscribers-interval = 0.01
+  }
+}
diff --git a/core/controller/src/main/scala/whisk/core/controller/Controller.scala b/core/controller/src/main/scala/whisk/core/controller/Controller.scala
index 377ae22..fe4c75d 100644
--- a/core/controller/src/main/scala/whisk/core/controller/Controller.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/Controller.scala
@@ -20,8 +20,6 @@ package whisk.core.controller
 import scala.concurrent.Await
 import scala.concurrent.duration.DurationInt
 import scala.util.{Failure, Success}
-
-import akka.actor._
 import akka.actor.ActorSystem
 import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
 import akka.http.scaladsl.model.Uri
@@ -41,7 +39,7 @@ import whisk.core.entitlement._
 import whisk.core.entity._
 import whisk.core.entity.ActivationId.ActivationIdGenerator
 import whisk.core.entity.ExecManifest.Runtimes
-import whisk.core.loadBalancer.LoadBalancerService
+import whisk.core.loadBalancer.{LoadBalancerService}
 import whisk.http.BasicHttpService
 import whisk.http.BasicRasService
 
diff --git a/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala b/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala
index 6afeb91..6256af2 100644
--- a/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala
+++ b/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala
@@ -23,6 +23,8 @@ import whisk.core.entity.Identity
 import whisk.core.loadBalancer.LoadBalancer
 import whisk.http.Messages
 
+import scala.concurrent.{ExecutionContext, Future}
+
 /**
  * Determines user limits and activation counts as seen by the invoker and the loadbalancer
  * in a scheduled, repeating task for other services to get the cached information to be able
@@ -34,7 +36,8 @@ import whisk.http.Messages
  * @param systemOverloadLimit the limit when the system is considered overloaded
  */
 class ActivationThrottler(loadBalancer: LoadBalancer, defaultConcurrencyLimit: Int, systemOverloadLimit: Int)(
-  implicit logging: Logging) {
+  implicit logging: Logging,
+  executionContext: ExecutionContext) {
 
   logging.info(this, s"concurrencyLimit = $defaultConcurrencyLimit, systemOverloadLimit = $systemOverloadLimit")(
     TransactionId.controller)
@@ -42,22 +45,26 @@ class ActivationThrottler(loadBalancer: LoadBalancer, defaultConcurrencyLimit: I
   /**
    * Checks whether the operation should be allowed to proceed.
    */
-  def check(user: Identity)(implicit tid: TransactionId): RateLimit = {
-    val concurrentActivations = loadBalancer.activeActivationsFor(user.uuid)
-    val concurrencyLimit = user.limits.concurrentInvocations.getOrElse(defaultConcurrencyLimit)
-    logging.info(
-      this,
-      s"namespace = ${user.uuid.asString}, concurrent activations = $concurrentActivations, limit = $concurrencyLimit")
-    ConcurrentRateLimit(concurrentActivations, concurrencyLimit)
+  def check(user: Identity)(implicit tid: TransactionId): Future[RateLimit] = {
+    loadBalancer.activeActivationsFor(user.uuid).map { concurrentActivations =>
+      val concurrencyLimit = user.limits.concurrentInvocations.getOrElse(defaultConcurrencyLimit)
+      logging.info(
+        this,
+        s"namespace = ${user.uuid.asString}, concurrent activations = $concurrentActivations, below limit = $concurrencyLimit")
+      ConcurrentRateLimit(concurrentActivations, concurrencyLimit)
+    }
   }
 
   /**
    * Checks whether the system is in a generally overloaded state.
    */
-  def isOverloaded()(implicit tid: TransactionId): Boolean = {
-    val concurrentActivations = loadBalancer.totalActiveActivations
-    logging.info(this, s"concurrent activations in system = $concurrentActivations, below limit = $systemOverloadLimit")
-    concurrentActivations > systemOverloadLimit
+  def isOverloaded()(implicit tid: TransactionId): Future[Boolean] = {
+    loadBalancer.totalActiveActivations.map { concurrentActivations =>
+      logging.info(
+        this,
+        s"concurrent activations in system = $concurrentActivations, below limit = $systemOverloadLimit")
+      concurrentActivations > systemOverloadLimit
+    }
   }
 }
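Since check() and isOverloaded() now return Futures, callers compose on the result instead of reading a value synchronously. A minimal sketch of a hypothetical caller (proceed/reject are placeholders; an implicit TransactionId and ExecutionContext are assumed in scope):

    // Hypothetical consumption of the now-asynchronous rate limit check.
    throttler.check(user).map { limit =>
      if (limit.ok) proceed() else reject(limit.errorMsg)
    }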
 
diff --git a/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala b/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala
index 9a0e9ba..a2fc213 100644
--- a/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala
+++ b/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala
@@ -130,7 +130,7 @@ protected[core] abstract class EntitlementProvider(config: WhiskConfig, loadBala
 
     logging.info(this, s"checking user '${user.subject}' has not exceeded activation quota")
     checkSystemOverload(ACTIVATE)
-      .flatMap(_ => checkThrottleOverload(invokeRateThrottler.check(user)))
+      .flatMap(_ => checkThrottleOverload(Future.successful(invokeRateThrottler.check(user))))
       .flatMap(_ => checkThrottleOverload(concurrentInvokeThrottler.check(user)))
   }
 
@@ -184,7 +184,6 @@ protected[core] abstract class EntitlementProvider(config: WhiskConfig, loadBala
     } else {
       Future.successful(false)
     }
-
     entitlementCheck andThen {
       case Success(r) if resources.nonEmpty =>
         logging.info(this, if (r) "authorized" else "not authorized")
@@ -231,11 +230,13 @@ protected[core] abstract class EntitlementProvider(config: WhiskConfig, loadBala
    * @return future completing successfully if system is not overloaded else failing with a rejection
    */
   protected def checkSystemOverload(right: Privilege)(implicit transid: TransactionId): Future[Unit] = {
-    val systemOverload = right == ACTIVATE && concurrentInvokeThrottler.isOverloaded
-    if (systemOverload) {
-      logging.error(this, "system is overloaded")
-      Future.failed(RejectRequest(TooManyRequests, systemOverloaded))
-    } else Future.successful(())
+    concurrentInvokeThrottler.isOverloaded.flatMap { isOverloaded =>
+      val systemOverload = right == ACTIVATE && isOverloaded
+      if (systemOverload) {
+        logging.error(this, "system is overloaded")
+        Future.failed(RejectRequest(TooManyRequests, systemOverloaded))
+      } else Future.successful(())
+    }
   }
 
   /**
@@ -253,9 +254,9 @@ protected[core] abstract class EntitlementProvider(config: WhiskConfig, loadBala
     implicit transid: TransactionId): Future[Unit] = {
     if (right == ACTIVATE) {
       if (resources.exists(_.collection.path == Collection.ACTIONS)) {
-        checkThrottleOverload(invokeRateThrottler.check(user))
+        checkThrottleOverload(Future.successful(invokeRateThrottler.check(user)))
       } else if (resources.exists(_.collection.path == Collection.TRIGGERS)) {
-        checkThrottleOverload(triggerRateThrottler.check(user))
+        checkThrottleOverload(Future.successful(triggerRateThrottler.check(user)))
       } else Future.successful(())
     } else Future.successful(())
   }
@@ -278,11 +279,13 @@ protected[core] abstract class EntitlementProvider(config: WhiskConfig, loadBala
     } else Future.successful(())
   }
 
-  private def checkThrottleOverload(throttle: RateLimit)(implicit transid: TransactionId): Future[Unit] = {
-    if (throttle.ok) {
-      Future.successful(())
-    } else {
-      Future.failed(RejectRequest(TooManyRequests, throttle.errorMsg))
+  private def checkThrottleOverload(throttle: Future[RateLimit])(implicit transid: TransactionId): Future[Unit] = {
+    throttle.flatMap { limit =>
+      if (limit.ok) {
+        Future.successful(())
+      } else {
+        Future.failed(RejectRequest(TooManyRequests, limit.errorMsg))
+      }
     }
   }
 }
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/DistributedLoadBalancerData.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/DistributedLoadBalancerData.scala
new file mode 100644
index 0000000..34b5d67
--- /dev/null
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/DistributedLoadBalancerData.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.loadBalancer
+
+import akka.actor.ActorSystem
+import akka.util.Timeout
+import akka.pattern.ask
+import whisk.common.Logging
+import whisk.core.entity.{ActivationId, UUID}
+
+import scala.collection.concurrent.TrieMap
+import scala.concurrent.Future
+import scala.concurrent.duration._
+
+/**
+ * Encapsulates data used for loadbalancer and active-ack bookkeeping.
+ *
+ * Note: The state keeping is backed by distributed akka actors. All CRUD operations are done on local values, thus
+ * a stale value might be read.
+ */
+class DistributedLoadBalancerData(implicit actorSystem: ActorSystem, logging: Logging) extends LoadBalancerData {
+
+  implicit val timeout = Timeout(5.seconds)
+  implicit val executionContext = actorSystem.dispatcher
+  private val activationsById = TrieMap[ActivationId, ActivationEntry]()
+
+  private val sharedStateInvokers = actorSystem.actorOf(
+    SharedDataService.props("Invokers"),
+    name =
+      "SharedDataServiceInvokers" + UUID())
+  private val sharedStateNamespaces = actorSystem.actorOf(
+    SharedDataService.props("Namespaces"),
+    name =
+      "SharedDataServiceNamespaces" + UUID())
+
+  def totalActivationCount =
+    (sharedStateInvokers ? GetMap).mapTo[Map[String, BigInt]].map(_.values.sum.toInt)
+
+  def activationCountOn(namespace: UUID): Future[Int] = {
+    (sharedStateNamespaces ? GetMap)
+      .mapTo[Map[String, BigInt]]
+      .map(_.mapValues(_.toInt).getOrElse(namespace.toString, 0))
+  }
+
+  def activationCountPerInvoker: Future[Map[String, Int]] = {
+    (sharedStateInvokers ? GetMap).mapTo[Map[String, BigInt]].map(_.mapValues(_.toInt))
+  }
+
+  def activationById(activationId: ActivationId): Option[ActivationEntry] = {
+    activationsById.get(activationId)
+  }
+
+  def putActivation(id: ActivationId, update: => ActivationEntry): ActivationEntry = {
+    activationsById.getOrElseUpdate(id, {
+      val entry = update
+      sharedStateNamespaces ! IncreaseCounter(entry.namespaceId.asString, 1)
+      sharedStateInvokers ! IncreaseCounter(entry.invokerName.toString, 1)
+      logging.debug(this, "increased shared counters")
+      entry
+    })
+  }
+
+  def removeActivation(entry: ActivationEntry): Option[ActivationEntry] = {
+    activationsById.remove(entry.id).map { activationEntry =>
+      sharedStateInvokers ! DecreaseCounter(entry.invokerName.toString, 1)
+      sharedStateNamespaces ! DecreaseCounter(entry.namespaceId.asString, 1)
+      logging.debug(this, "decreased shared counters")
+      activationEntry
+    }
+  }
+
+  def removeActivation(aid: ActivationId): Option[ActivationEntry] = {
+    activationsById.get(aid).flatMap(removeActivation)
+  }
+}
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerData.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerData.scala
index c693a8f..1866d2d 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerData.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerData.scala
@@ -17,35 +17,18 @@
 
 package whisk.core.loadBalancer
 
-import java.util.concurrent.atomic.AtomicInteger
+import whisk.core.entity.{ActivationId, InstanceId, UUID, WhiskActivation}
 
-import scala.collection.concurrent.TrieMap
-import scala.concurrent.Promise
+import scala.concurrent.{Future, Promise}
 
-import whisk.core.entity.{ActivationId, UUID, WhiskActivation}
-import whisk.core.entity.InstanceId
-
-/** Encapsulates data relevant for a single activation */
 case class ActivationEntry(id: ActivationId,
                            namespaceId: UUID,
                            invokerName: InstanceId,
                            promise: Promise[Either[ActivationId, WhiskActivation]])
-
-/**
- * Encapsulates data used for loadbalancer and active-ack bookkeeping.
- *
- * Note: The state keeping is backed by concurrent data-structures. As such,
- * concurrent reads can return stale values (especially the counters returned).
- */
-class LoadBalancerData() {
-
-  private val activationByInvoker = TrieMap[InstanceId, AtomicInteger]()
-  private val activationByNamespaceId = TrieMap[UUID, AtomicInteger]()
-  private val activationsById = TrieMap[ActivationId, ActivationEntry]()
-  private val totalActivations = new AtomicInteger(0)
+trait LoadBalancerData {
 
   /** Get the number of activations across all namespaces. */
-  def totalActivationCount = totalActivations.get
+  def totalActivationCount: Future[Int]
 
   /**
    * Get the number of activations for a specific namespace.
@@ -53,19 +36,14 @@ class LoadBalancerData() {
    * @param namespace The namespace to get the activation count for
    * @return a map (namespace -> number of activations in the system)
    */
-  def activationCountOn(namespace: UUID) = {
-    activationByNamespaceId.get(namespace).map(_.get).getOrElse(0)
-  }
+  def activationCountOn(namespace: UUID): Future[Int]
 
   /**
-   * Get the number of activations for a specific invoker.
+   * Get the number of activations for each invoker.
    *
-   * @param invoker The invoker to get the activation count for
    * @return a map (invoker -> number of activations queued for the invoker)
    */
-  def activationCountOn(invoker: InstanceId): Int = {
-    activationByInvoker.get(invoker).map(_.get).getOrElse(0)
-  }
+  def activationCountPerInvoker: Future[Map[String, Int]]
 
   /**
    * Get an activation entry for a given activation id.
@@ -73,29 +51,19 @@ class LoadBalancerData() {
    * @param activationId activation id to get data for
    * @return the respective activation or None if it doesn't exist
    */
-  def activationById(activationId: ActivationId): Option[ActivationEntry] = {
-    activationsById.get(activationId)
-  }
+  def activationById(activationId: ActivationId): Option[ActivationEntry]
 
   /**
    * Adds an activation entry.
    *
-   * @param id identifier to deduplicate the entry
+   * @param id     identifier to deduplicate the entry
    * @param update block calculating the entry to add.
    *               Note: This is evaluated iff the entry
    *               didn't exist before.
    * @return the entry calculated by the block or iff it did
    *         exist before the entry from the state
    */
-  def putActivation(id: ActivationId, update: => ActivationEntry): ActivationEntry = {
-    activationsById.getOrElseUpdate(id, {
-      val entry = update
-      totalActivations.incrementAndGet()
-      activationByNamespaceId.getOrElseUpdate(entry.namespaceId, new AtomicInteger(0)).incrementAndGet()
-      activationByInvoker.getOrElseUpdate(entry.invokerName, new AtomicInteger(0)).incrementAndGet()
-      entry
-    })
-  }
+  def putActivation(id: ActivationId, update: => ActivationEntry): ActivationEntry
 
   /**
    * Removes the given entry.
@@ -103,14 +71,7 @@ class LoadBalancerData() {
    * @param entry the entry to remove
    * @return The deleted entry or None if nothing got deleted
    */
-  def removeActivation(entry: ActivationEntry): Option[ActivationEntry] = {
-    activationsById.remove(entry.id).map { x =>
-      totalActivations.decrementAndGet()
-      activationByNamespaceId.getOrElseUpdate(entry.namespaceId, new AtomicInteger(0)).decrementAndGet()
-      activationByInvoker.getOrElseUpdate(entry.invokerName, new AtomicInteger(0)).decrementAndGet()
-      x
-    }
-  }
+  def removeActivation(entry: ActivationEntry): Option[ActivationEntry]
 
   /**
    * Removes the activation identified by the given activation id.
@@ -118,7 +79,5 @@ class LoadBalancerData() {
    * @param aid activation id to remove
    * @return The deleted entry or None if nothing got deleted
    */
-  def removeActivation(aid: ActivationId): Option[ActivationEntry] = {
-    activationsById.get(aid).flatMap(removeActivation)
-  }
+  def removeActivation(aid: ActivationId): Option[ActivationEntry]
 }
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
index c957fa2..0b5a06d 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
@@ -27,15 +27,13 @@ import scala.concurrent.Promise
 import scala.concurrent.duration.DurationInt
 import scala.util.Failure
 import scala.util.Success
-
 import org.apache.kafka.clients.producer.RecordMetadata
-
 import akka.actor.ActorRefFactory
 import akka.actor.ActorSystem
 import akka.actor.Props
+import akka.cluster.Cluster
 import akka.util.Timeout
 import akka.pattern.ask
-
 import whisk.common.Logging
 import whisk.common.LoggingMarkers
 import whisk.common.TransactionId
@@ -61,10 +59,10 @@ trait LoadBalancer {
   val activeAckTimeoutGrace = 1.minute
 
   /** Gets the number of in-flight activations for a specific user. */
-  def activeActivationsFor(namspace: UUID): Int
+  def activeActivationsFor(namespace: UUID): Future[Int]
 
   /** Gets the number of in-flight activations in the system. */
-  def totalActiveActivations: Int
+  def totalActiveActivations: Future[Int]
 
   /**
    * Publishes activation message on internal bus for an invoker to pick up.
@@ -95,7 +93,18 @@ class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore
   private val blackboxFraction: Double = Math.max(0.0, Math.min(1.0, config.controllerBlackboxFraction))
   logging.info(this, s"blackboxFraction = $blackboxFraction")(TransactionId.loadbalancer)
 
-  private val loadBalancerData = new LoadBalancerData()
+  /** Feature switch for shared load balancer data **/
+  private val loadBalancerData = {
+    if (config.controllerLocalBookkeeping) {
+      new LocalLoadBalancerData()
+    } else {
+
+      /** Specify how seed nodes are generated */
+      val seedNodesProvider = new StaticSeedNodesProvider(config.controllerSeedNodes, actorSystem.name)
+      Cluster(actorSystem).joinSeedNodes(seedNodesProvider.getSeedNodes())
+      new DistributedLoadBalancerData()
+    }
+  }
 
   override def activeActivationsFor(namespace: UUID) = loadBalancerData.activationCountOn(namespace)
 
@@ -226,7 +235,6 @@ class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore
       case Failure(e) => transid.failed(this, start, s"error on posting to topic $topic")
     }
   }
-
   private val invokerPool = {
     // Do not create the invokerPool if it is not possible to create the health test action to recover the invokers.
     InvokerPool
@@ -307,18 +315,20 @@ class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore
   private def chooseInvoker(user: Identity, action: ExecutableWhiskAction): Future[InstanceId] = {
     val hash = generateHash(user.namespace, action)
 
-    allInvokers.flatMap { invokers =>
-      val invokersToUse = if (action.exec.pull) blackboxInvokers(invokers) else managedInvokers(invokers)
-      val invokersWithUsage = invokersToUse.view.map {
-        // Using a view defers the comparably expensive lookup to actual access of the element
-        case (instance, state) => (instance, state, loadBalancerData.activationCountOn(instance))
-      }
+    loadBalancerData.activationCountPerInvoker.flatMap { currentActivations =>
+      allInvokers.flatMap { invokers =>
+        val invokersToUse = if (action.exec.pull) blackboxInvokers(invokers) else managedInvokers(invokers)
+        val invokersWithUsage = invokersToUse.view.map {
+          // Using a view defers the comparably expensive lookup to actual access of the element
+          case (instance, state) => (instance, state, currentActivations.getOrElse(instance.toString, 0))
+        }
 
-      LoadBalancerService.schedule(invokersWithUsage, config.loadbalancerInvokerBusyThreshold, hash) match {
-        case Some(invoker) => Future.successful(invoker)
-        case None =>
-          logging.error(this, s"all invokers down")(TransactionId.invokerHealth)
-          Future.failed(new LoadBalancerException("no invokers available"))
+        LoadBalancerService.schedule(invokersWithUsage, config.loadbalancerInvokerBusyThreshold, hash) match {
+          case Some(invoker) => Future.successful(invoker)
+          case None =>
+            logging.error(this, s"all invokers down")(TransactionId.invokerHealth)
+            Future.failed(new LoadBalancerException("no invokers available"))
+        }
       }
     }
   }
@@ -331,7 +341,11 @@ class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore
 
 object LoadBalancerService {
   def requiredProperties =
-    kafkaHost ++ Map(loadbalancerInvokerBusyThreshold -> null, controllerBlackboxFraction -> null)
+    kafkaHost ++ Map(
+      loadbalancerInvokerBusyThreshold -> null,
+      controllerBlackboxFraction -> null,
+      controllerLocalBookkeeping -> null,
+      controllerSeedNodes -> null)
 
   /** Memoizes the result of `f` for later use. */
   def memoize[I, O](f: I => O): I => O = new scala.collection.mutable.HashMap[I, O]() {
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/LocalLoadBalancerData.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/LocalLoadBalancerData.scala
new file mode 100644
index 0000000..92e3789
--- /dev/null
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/LocalLoadBalancerData.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.loadBalancer
+
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.concurrent.TrieMap
+import scala.concurrent.Future
+import whisk.core.entity.{ActivationId, UUID}
+
+/**
+ * Loadbalancer bookkeeping data that is stored locally,
+ * i.e. not shared with other controller instances.
+ *
+ * Note: The state keeping is backed by concurrent data-structures. As such,
+ * concurrent reads can return stale values (especially the counters returned).
+ */
+class LocalLoadBalancerData() extends LoadBalancerData {
+
+  private val activationByInvoker = TrieMap[String, AtomicInteger]()
+  private val activationByNamespaceId = TrieMap[UUID, AtomicInteger]()
+  private val activationsById = TrieMap[ActivationId, ActivationEntry]()
+  private val totalActivations = new AtomicInteger(0)
+
+  override def totalActivationCount: Future[Int] = Future.successful(totalActivations.get)
+
+  override def activationCountOn(namespace: UUID): Future[Int] = {
+    Future.successful(activationByNamespaceId.get(namespace).map(_.get).getOrElse(0))
+  }
+
+  override def activationCountPerInvoker: Future[Map[String, Int]] = {
+    Future.successful(activationByInvoker.toMap.mapValues(_.get))
+  }
+
+  override def activationById(activationId: ActivationId): Option[ActivationEntry] = {
+    activationsById.get(activationId)
+  }
+
+  override def putActivation(id: ActivationId, update: => ActivationEntry): ActivationEntry = {
+    activationsById.getOrElseUpdate(id, {
+      val entry = update
+      totalActivations.incrementAndGet()
+      activationByNamespaceId.getOrElseUpdate(entry.namespaceId, new AtomicInteger(0)).incrementAndGet()
+      activationByInvoker.getOrElseUpdate(entry.invokerName.toString, new AtomicInteger(0)).incrementAndGet()
+      entry
+    })
+  }
+
+  override def removeActivation(entry: ActivationEntry): Option[ActivationEntry] = {
+    activationsById.remove(entry.id).map { x =>
+      totalActivations.decrementAndGet()
+      activationByNamespaceId.getOrElseUpdate(entry.namespaceId, new AtomicInteger(0)).decrementAndGet()
+      activationByInvoker.getOrElseUpdate(entry.invokerName.toString, new AtomicInteger(0)).decrementAndGet()
+      x
+    }
+  }
+
+  override def removeActivation(aid: ActivationId): Option[ActivationEntry] = {
+    activationsById.get(aid).flatMap(removeActivation)
+  }
+}
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/SeedNodesProvider.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/SeedNodesProvider.scala
new file mode 100644
index 0000000..be630a4
--- /dev/null
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/SeedNodesProvider.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.loadBalancer
+
+import akka.actor.Address
+
+import scala.collection.immutable.Seq
+
+trait SeedNodesProvider {
+  def getSeedNodes(): Seq[Address]
+}
+
+class StaticSeedNodesProvider(seedNodes: String, actorSystemName: String) extends SeedNodesProvider {
+  def getSeedNodes(): Seq[Address] = {
+    seedNodes
+      .split(' ')
+      .flatMap { rawNodes =>
+        val ipWithPort = rawNodes.split(":")
+        ipWithPort match {
+          case Array(host, port) => Seq(Address("akka.tcp", actorSystemName, host, port.toInt))
+          case _                 => Seq.empty[Address]
+        }
+      }
+      .toIndexedSeq
+  }
+}
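A minimal sketch of the parsing behavior (hypothetical addresses; the space-separated host:port format matches the seed_nodes_list fact built in the Ansible task above; assumes akka.actor.Address is imported):

    // "10.0.0.1:8000 10.0.0.2:8001" yields one akka.tcp Address per well-formed pair;
    // malformed entries are silently dropped.
    val provider = new StaticSeedNodesProvider("10.0.0.1:8000 10.0.0.2:8001", "controller-actor-system")
    val seeds: Seq[Address] = provider.getSeedNodes()
    // Seq(Address("akka.tcp", "controller-actor-system", "10.0.0.1", 8000),
    //     Address("akka.tcp", "controller-actor-system", "10.0.0.2", 8001))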
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/SharedDataService.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/SharedDataService.scala
new file mode 100644
index 0000000..d0595d3
--- /dev/null
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/SharedDataService.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.loadBalancer
+
+import akka.actor.{Actor, ActorLogging, ActorRef, Props}
+import akka.cluster.Cluster
+import akka.cluster.ClusterEvent._
+import akka.cluster.ddata.{DistributedData, PNCounterMap, PNCounterMapKey}
+import akka.cluster.ddata.Replicator._
+import whisk.common.AkkaLogging
+
+case class IncreaseCounter(key: String, value: Long)
+case class DecreaseCounter(key: String, value: Long)
+case class ReadCounter(key: String)
+case class RemoveCounter(key: String)
+case object GetMap
+
+/**
+ * Companion object to specify actor properties from the outside, e.g. name of the shared map and cluster seed nodes
+ */
+object SharedDataService {
+  def props(storageName: String): Props =
+    Props(new SharedDataService(storageName))
+}
+
+class SharedDataService(storageName: String) extends Actor with ActorLogging {
+
+  val replicator = DistributedData(context.system).replicator
+
+  val logging = new AkkaLogging(context.system.log)
+
+  val storage = PNCounterMapKey[String](storageName)
+
+  implicit val node = Cluster(context.system)
+
+  /**
+   * Subscribe this node for the changes in the Map, initialize the Map
+   */
+  override def preStart(): Unit = {
+    replicator ! Subscribe(storage, self)
+    node.subscribe(self, initialStateMode = InitialStateAsEvents, classOf[MemberEvent], classOf[UnreachableMember])
+    replicator ! Update(storage, PNCounterMap.empty[String], writeLocal)(_.remove(node, "0"))
+  }
+  override def postStop(): Unit = node.unsubscribe(self)
+
+  /**
+   * CRUD operations on the counter, process cluster member events for logging
+   * @return
+   */
+  def receive = {
+
+    case (IncreaseCounter(key, increment)) =>
+      replicator ! Update(storage, PNCounterMap.empty[String], writeLocal)(_.increment(key, increment))
+
+    case (DecreaseCounter(key, decrement)) =>
+      replicator ! Update(storage, PNCounterMap[String], writeLocal)(_.decrement(key, decrement))
+
+    case GetMap =>
+      replicator ! Get(storage, readLocal, request = Some((sender())))
+
+    case MemberUp(member) =>
+      logging.info(this, "Member is Up: " + member.address)
+
+    case MemberRemoved(member, previousStatus) =>
+      logging.warn(this, s"Member is Removed: ${member.address} after $previousStatus")
+
+    case c @ Changed(_) =>
+      logging.debug(this, "Current elements: " + c.get(storage))
+
+    case g @ GetSuccess(_, Some((replyTo: ActorRef))) =>
+      val map = g.get(storage).entries
+      replyTo ! map
+
+    case g @ GetSuccess(_, Some((replyTo: ActorRef, key: String))) =>
+      if (g.get(storage).contains(key)) {
+        val response = g.get(storage).getValue(key).intValue()
+        replyTo ! response
+      } else
+        replyTo ! None
+
+    case _ => // ignore
+  }
+}
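A minimal sketch of talking to the actor directly (this mirrors how DistributedLoadBalancerData uses it above; `system` is an assumed ActorSystem configured with akka.actor.provider = "cluster", and akka.pattern.ask, akka.util.Timeout, scala.concurrent.Future and scala.concurrent.duration._ are assumed to be in scope):

    // Fire-and-forget counter updates, then ask for a snapshot of all counters.
    implicit val timeout = Timeout(5.seconds)
    val invokerCounters = system.actorOf(SharedDataService.props("Invokers"))
    invokerCounters ! IncreaseCounter("invoker0", 1)
    val snapshot: Future[Map[String, BigInt]] = (invokerCounters ? GetMap).mapTo[Map[String, BigInt]]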
diff --git a/docs/README.md b/docs/README.md
index 957381f..89f4465 100644
--- a/docs/README.md
+++ b/docs/README.md
@@ -53,6 +53,7 @@ This programming model is a perfect match for microservices, mobile, IoT and man
 
 ## Programming model
 - [System details](./reference.md)
+- [Component clustering](./deploy.md)
 - [Catalog of OpenWhisk provided services](./catalog.md)
 - [Actions](./actions.md)
 - [Triggers and Rules](./triggers_rules.md)
diff --git a/docs/deploy.md b/docs/deploy.md
new file mode 100644
index 0000000..196dce9
--- /dev/null
+++ b/docs/deploy.md
@@ -0,0 +1,16 @@
+# Requirements for controller clustering
+
+This page describes requirements related to the introduction of akka clustering in the controller. There are cases where its usage requires special treatment with respect to the underlying infrastructure/deployment model. Please read this page carefully before you decide to enable clustering.
+
+
+## Controller nodes must have a static IP/port combination
+
+This guarantees that failed nodes are able to rejoin the cluster.
+The limitation stems from the fact that Akka clustering does not allow new nodes to be added while one of the existing members is unreachable (e.g. after a JVM failure). If each container receives its IP and port dynamically on restart, a failed controller could come back online under a new IP/port combination; the cluster would then consider it a new member and refuse to add it (in certain cases it could join as a weaklyUp member). However, the cluster will still r [...]
+
+How to down the members:
+1. Manually, by sending an HTTP or JMX request to the controller. This requires an external supervisor for the cluster, which downs the nodes and provides an up-to-date list of seed nodes.
+2. Automatically, by setting the auto-down property in the controller, which allows the leader to down a node after a certain timeout. To mitigate split brain, one can define a list of seed nodes that are reachable under static IPs or have static DNS entries.
+
+Link to akka clustering documentation:
+https://doc.akka.io/docs/akka/2.5.4/scala/cluster-usage.html
diff --git a/tests/build.gradle b/tests/build.gradle
index bb7b741..6fa5a66 100644
--- a/tests/build.gradle
+++ b/tests/build.gradle
@@ -48,6 +48,7 @@ dependencies {
     compile 'junit:junit:4.11'
     compile 'com.jayway.restassured:rest-assured:2.6.0'
     compile 'org.scalatest:scalatest_2.11:3.0.1'
+    compile 'com.typesafe.akka:akka-testkit_2.11:2.5.4'
     compile 'com.google.code.gson:gson:2.3.1'
     compile 'org.scalamock:scalamock-scalatest-support_2.11:3.4.2'
     compile 'com.typesafe.akka:akka-testkit_2.11:2.4.16'
diff --git a/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala b/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala
index 988033d..cb79804 100644
--- a/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala
+++ b/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala
@@ -21,20 +21,15 @@ import scala.concurrent.{Await, Future}
 import scala.concurrent.ExecutionContext
 import scala.concurrent.duration.{DurationInt, FiniteDuration}
 import scala.language.postfixOps
-
 import org.scalatest.BeforeAndAfter
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.FlatSpec
 import org.scalatest.Matchers
-
 import common.StreamLogging
-
 import akka.http.scaladsl.testkit.ScalatestRouteTest
 import akka.http.scaladsl.testkit.RouteTestTimeout
-
 import spray.json.DefaultJsonProtocol
 import spray.json.JsString
-
 import whisk.common.TransactionCounter
 import whisk.common.TransactionId
 import whisk.core.WhiskConfig
@@ -183,8 +178,8 @@ class DegenerateLoadBalancerService(config: WhiskConfig)(implicit ec: ExecutionC
   // unit tests that need an activation via active ack/fast path should set this to value expected
   var whiskActivationStub: Option[(FiniteDuration, WhiskActivation)] = None
 
-  override def totalActiveActivations = 0
-  override def activeActivationsFor(namespace: UUID) = 0
+  override def totalActiveActivations = Future.successful(0)
+  override def activeActivationsFor(namespace: UUID) = Future.successful(0)
 
   override def publish(action: ExecutableWhiskAction, msg: ActivationMessage)(
     implicit transid: TransactionId): Future[Future[Either[ActivationId, WhiskActivation]]] =
diff --git a/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerDataTests.scala b/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerDataTests.scala
index 42acf72..af51102 100644
--- a/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerDataTests.scala
+++ b/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerDataTests.scala
@@ -17,71 +17,134 @@
 
 package whisk.core.loadBalancer.test
 
+import akka.actor.ActorSystem
+import com.typesafe.config.{ConfigFactory, ConfigValueFactory}
+import common.StreamLogging
 import org.scalatest.{FlatSpec, Matchers}
 import whisk.core.entity.{ActivationId, UUID, WhiskActivation}
-import whisk.core.loadBalancer.{ActivationEntry, LoadBalancerData}
+import whisk.core.loadBalancer.{ActivationEntry, DistributedLoadBalancerData, LocalLoadBalancerData}
 
-import scala.concurrent.{Promise}
+import scala.concurrent.{Await, Future, Promise}
 import whisk.core.entity.InstanceId
 
-class LoadBalancerDataTests extends FlatSpec with Matchers {
+import scala.concurrent.duration._
+
+class LoadBalancerDataTests extends FlatSpec with Matchers with StreamLogging {
 
   val activationIdPromise = Promise[Either[ActivationId, WhiskActivation]]()
   val firstEntry: ActivationEntry = ActivationEntry(ActivationId(), UUID(), InstanceId(0), activationIdPromise)
   val secondEntry: ActivationEntry = ActivationEntry(ActivationId(), UUID(), InstanceId(1), activationIdPromise)
 
-  behavior of "LoadBalancerData"
+  val port = 2552
+  val host = "127.0.0.1"
+  val config = ConfigFactory
+    .parseString(s"akka.remote.netty.tcp.hostname = $host")
+    .withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(port))
+    .withValue("akka.actor.provider", ConfigValueFactory.fromAnyRef("cluster"))
+    .withFallback(ConfigFactory.load())
 
-  it should "return the number of activations for a namespace" in {
+  val actorSystemName = "controller-actor-system"
 
-    val loadBalancerData = new LoadBalancerData()
-    loadBalancerData.putActivation(firstEntry.id, firstEntry)
+  implicit val actorSystem = ActorSystem(actorSystemName, config)
 
-    loadBalancerData.activationCountOn(firstEntry.namespaceId) shouldBe 1
-    loadBalancerData.activationCountOn(firstEntry.invokerName) shouldBe 1
-    loadBalancerData.activationById(firstEntry.id) shouldBe Some(firstEntry)
+  def await[A](f: Future[A], timeout: FiniteDuration = 1.second) = Await.result(f, timeout)
+
+  behavior of "LoadBalancerData"
+
+  it should "return the number of activations for a namespace" in {
+    val distributedLoadBalancerData = new DistributedLoadBalancerData()
+    val localLoadBalancerData = new LocalLoadBalancerData()
+    // test all implementations
+    val loadBalancerDataArray = Array(localLoadBalancerData, distributedLoadBalancerData)
+    loadBalancerDataArray.map { lbd =>
+      lbd.putActivation(firstEntry.id, firstEntry)
+      await(lbd.activationCountOn(firstEntry.namespaceId)) shouldBe 1
+      await(lbd.activationCountPerInvoker) shouldBe Map(firstEntry.invokerName.toString -> 1)
+      lbd.activationById(firstEntry.id) shouldBe Some(firstEntry)
+
+      // clean up after yourself
+      lbd.removeActivation(firstEntry.id)
+    }
   }
 
   it should "return the number of activations for each invoker" in {
 
-    val loadBalancerData = new LoadBalancerData()
-    loadBalancerData.putActivation(firstEntry.id, firstEntry)
-    loadBalancerData.putActivation(secondEntry.id, secondEntry)
+    val distributedLoadBalancerData = new DistributedLoadBalancerData()
+    val localLoadBalancerData = new LocalLoadBalancerData()
+
+    val loadBalancerDataArray = Array(localLoadBalancerData, distributedLoadBalancerData)
+    loadBalancerDataArray.map { lbd =>
+      lbd.putActivation(firstEntry.id, firstEntry)
+      lbd.putActivation(secondEntry.id, secondEntry)
+
+      val res = await(lbd.activationCountPerInvoker)
+
+      res.get(firstEntry.invokerName.toString()) shouldBe Some(1)
+      res.get(secondEntry.invokerName.toString()) shouldBe Some(1)
+
+      lbd.activationById(firstEntry.id) shouldBe Some(firstEntry)
+      lbd.activationById(secondEntry.id) shouldBe Some(secondEntry)
+
+      // clean up after yourself
+      lbd.removeActivation(firstEntry.id)
+      lbd.removeActivation(secondEntry.id)
+    }
 
-    loadBalancerData.activationCountOn(firstEntry.invokerName) shouldBe 1
-    loadBalancerData.activationCountOn(secondEntry.invokerName) shouldBe 1
-    loadBalancerData.activationById(firstEntry.id) shouldBe Some(firstEntry)
-    loadBalancerData.activationById(secondEntry.id) shouldBe Some(secondEntry)
   }
 
   it should "remove activations and reflect that accordingly" in {
 
-    val loadBalancerData = new LoadBalancerData()
-    loadBalancerData.putActivation(firstEntry.id, firstEntry)
-    loadBalancerData.activationCountOn(firstEntry.invokerName) shouldBe 1
-    loadBalancerData.activationCountOn(firstEntry.namespaceId) shouldBe 1
+    val distributedLoadBalancerData = new DistributedLoadBalancerData()
+    val localLoadBalancerData = new LocalLoadBalancerData()
+
+    val loadBalancerDataArray = Array(localLoadBalancerData, distributedLoadBalancerData)
+    loadBalancerDataArray.map { lbd =>
+      lbd.putActivation(firstEntry.id, firstEntry)
+      val res = await(lbd.activationCountPerInvoker)
+      res.get(firstEntry.invokerName.toString()) shouldBe Some(1)
+
+      await(lbd.activationCountOn(firstEntry.namespaceId)) shouldBe 1
 
-    loadBalancerData.removeActivation(firstEntry)
+      lbd.removeActivation(firstEntry)
+
+      val resAfterRemoval = await(lbd.activationCountPerInvoker)
+      resAfterRemoval.get(firstEntry.invokerName.toString()) shouldBe Some(0)
+
+      await(lbd.activationCountOn(firstEntry.namespaceId)) shouldBe 0
+      lbd.activationById(firstEntry.id) shouldBe None
+    }
 
-    loadBalancerData.activationCountOn(firstEntry.invokerName) shouldBe 0
-    loadBalancerData.activationCountOn(firstEntry.namespaceId) shouldBe 0
-    loadBalancerData.activationById(firstEntry.id) shouldBe None
   }
 
   it should "remove activations from all 3 maps by activation id" in {
 
-    val loadBalancerData = new LoadBalancerData()
-    loadBalancerData.putActivation(firstEntry.id, firstEntry)
-    loadBalancerData.activationCountOn(firstEntry.invokerName) shouldBe 1
+    val distributedLoadBalancerData = new DistributedLoadBalancerData()
+    val localLoadBalancerData = new LocalLoadBalancerData()
+
+    val loadBalancerDataArray = Array(localLoadBalancerData, distributedLoadBalancerData)
+    loadBalancerDataArray.map { lbd =>
+      lbd.putActivation(firstEntry.id, firstEntry)
+
+      val res = await(lbd.activationCountPerInvoker)
+      res.get(firstEntry.invokerName.toString()) shouldBe Some(1)
 
-    loadBalancerData.removeActivation(firstEntry.id)
+      lbd.removeActivation(firstEntry.id)
+
+      val resAfterRemoval = await(lbd.activationCountPerInvoker)
+      resAfterRemoval.get(firstEntry.invokerName.toString()) shouldBe Some(0)
+    }
 
-    loadBalancerData.activationCountOn(firstEntry.invokerName) shouldBe 0
   }
 
   it should "return None if the entry doesn't exist when we remove it" in {
-    val loadBalancerData = new LoadBalancerData()
-    loadBalancerData.removeActivation(firstEntry) shouldBe None
+    val distributedLoadBalancerData = new DistributedLoadBalancerData()
+    val localLoadBalancerData = new LocalLoadBalancerData()
+
+    val loadBalancerDataArray = Array(localLoadBalancerData, distributedLoadBalancerData)
+    loadBalancerDataArray.map { lbd =>
+      lbd.removeActivation(firstEntry) shouldBe None
+    }
+
   }
 
   it should "respond with different values accordingly" in {
@@ -90,90 +153,133 @@ class LoadBalancerDataTests extends FlatSpec with Matchers {
     val entrySameInvokerAndNamespace = entry.copy(id = ActivationId())
     val entrySameInvoker = entry.copy(id = ActivationId(), namespaceId = UUID())
 
-    val loadBalancerData = new LoadBalancerData()
-    loadBalancerData.putActivation(entry.id, entry)
-    loadBalancerData.activationCountOn(entry.namespaceId) shouldBe 1
-    loadBalancerData.activationCountOn(entry.invokerName) shouldBe 1
+    val distributedLoadBalancerData = new DistributedLoadBalancerData()
+    val localLoadBalancerData = new LocalLoadBalancerData()
+
+    val loadBalancerDataArray = Array(localLoadBalancerData, distributedLoadBalancerData)
+    loadBalancerDataArray.map { lbd =>
+      lbd.putActivation(entry.id, entry)
+
+      await(lbd.activationCountOn(entry.namespaceId)) shouldBe 1
+      var res = await(lbd.activationCountPerInvoker)
+      res.get(entry.invokerName.toString()) shouldBe Some(1)
 
-    loadBalancerData.putActivation(entrySameInvokerAndNamespace.id, entrySameInvokerAndNamespace)
-    loadBalancerData.activationCountOn(entry.namespaceId) shouldBe 2
-    loadBalancerData.activationCountOn(entry.invokerName) shouldBe 2
+      lbd.putActivation(entrySameInvokerAndNamespace.id, entrySameInvokerAndNamespace)
+      await(lbd.activationCountOn(entry.namespaceId)) shouldBe 2
+      res = await(lbd.activationCountPerInvoker)
+      res.get(entry.invokerName.toString()) shouldBe Some(2)
 
-    loadBalancerData.putActivation(entrySameInvoker.id, entrySameInvoker)
-    loadBalancerData.activationCountOn(entry.namespaceId) shouldBe 2
-    loadBalancerData.activationCountOn(entry.invokerName) shouldBe 3
+      lbd.putActivation(entrySameInvoker.id, entrySameInvoker)
+      await(lbd.activationCountOn(entry.namespaceId)) shouldBe 2
+      res = await(lbd.activationCountPerInvoker)
+      res.get(entry.invokerName.toString()) shouldBe Some(3)
 
-    loadBalancerData.removeActivation(entrySameInvokerAndNamespace)
-    loadBalancerData.activationCountOn(entry.namespaceId) shouldBe 1
-    loadBalancerData.activationCountOn(entry.invokerName) shouldBe 2
+      lbd.removeActivation(entrySameInvokerAndNamespace)
+      await(lbd.activationCountOn(entry.namespaceId)) shouldBe 1
+      res = await(lbd.activationCountPerInvoker)
+      res.get(entry.invokerName.toString()) shouldBe Some(2)
 
-    // removing non existing entry doesn't mess up
-    loadBalancerData.removeActivation(entrySameInvokerAndNamespace)
-    loadBalancerData.activationCountOn(entry.namespaceId) shouldBe 1
-    loadBalancerData.activationCountOn(entry.invokerName) shouldBe 2
+      // removing non existing entry doesn't mess up
+      lbd.removeActivation(entrySameInvokerAndNamespace)
+      await(lbd.activationCountOn(entry.namespaceId)) shouldBe 1
+      res = await(lbd.activationCountPerInvoker)
+      res.get(entry.invokerName.toString()) shouldBe Some(2)
+
+      // clean up
+      lbd.removeActivation(entry)
+      lbd.removeActivation(entrySameInvoker.id)
+    }
 
   }
 
   it should "not add the same entry more then once" in {
 
-    val loadBalancerData = new LoadBalancerData()
+    val distributedLoadBalancerData = new DistributedLoadBalancerData()
+    val localLoadBalancerData = new LocalLoadBalancerData()
+
+    val loadBalancerDataArray = Array(localLoadBalancerData, distributedLoadBalancerData)
+    loadBalancerDataArray.map { lbd =>
+      lbd.putActivation(firstEntry.id, firstEntry)
+      val res = await(lbd.activationCountPerInvoker)
+      res.get(firstEntry.invokerName.toString()) shouldBe Some(1)
+      await(lbd.activationCountOn(firstEntry.namespaceId)) shouldBe 1
 
-    loadBalancerData.putActivation(firstEntry.id, firstEntry)
-    loadBalancerData.activationCountOn(firstEntry.invokerName) shouldBe 1
-    loadBalancerData.activationCountOn(firstEntry.namespaceId) shouldBe 1
+      lbd.putActivation(firstEntry.id, firstEntry)
+      val resAfterAddingTheSameEntry = await(lbd.activationCountPerInvoker)
+      resAfterAddingTheSameEntry.get(firstEntry.invokerName.toString()) shouldBe Some(1)
+      await(lbd.activationCountOn(firstEntry.namespaceId)) shouldBe 1
+
+      lbd.removeActivation(firstEntry)
+      lbd.removeActivation(firstEntry)
+    }
 
-    loadBalancerData.putActivation(firstEntry.id, firstEntry)
-    loadBalancerData.activationCountOn(firstEntry.invokerName) shouldBe 1
-    loadBalancerData.activationCountOn(firstEntry.namespaceId) shouldBe 1
   }
 
   it should "not evaluate the given block if an entry already exists" in {
 
-    val loadBalancerData = new LoadBalancerData()
-    var called = 0
+    val distributedLoadBalancerData = new DistributedLoadBalancerData()
+    val localLoadBalancerData = new LocalLoadBalancerData()
+
+    val loadBalancerDataArray = Array(localLoadBalancerData, distributedLoadBalancerData)
+    loadBalancerDataArray.map { lbd =>
+      var called = 0
 
-    val entry = loadBalancerData.putActivation(firstEntry.id, {
-      called += 1
-      firstEntry
-    })
+      val entry = lbd.putActivation(firstEntry.id, {
+        called += 1
+        firstEntry
+      })
 
-    called shouldBe 1
+      called shouldBe 1
 
-    // entry already exists, should not evaluate the block
-    val entryAfterSecond = loadBalancerData.putActivation(firstEntry.id, {
-      called += 1
-      firstEntry
-    })
+      // entry already exists, should not evaluate the block
+      val entryAfterSecond = lbd.putActivation(firstEntry.id, {
+        called += 1
+        firstEntry
+      })
+
+      called shouldBe 1
+      entry shouldBe entryAfterSecond
+
+      // clean up after yourself
+      lbd.removeActivation(firstEntry)
+    }
 
-    called shouldBe 1
-    entry shouldBe entryAfterSecond
   }
 
   it should "not evaluate the given block even if an entry is different (but has the same id)" in {
 
-    val loadBalancerData = new LoadBalancerData()
-    var called = 0
-    val entrySameId = secondEntry.copy(id = firstEntry.id)
-
-    val entry = loadBalancerData.putActivation(firstEntry.id, {
-      called += 1
-      firstEntry
-    })
-
-    called shouldBe 1
-    loadBalancerData.activationCountOn(firstEntry.invokerName) shouldBe 1
-    loadBalancerData.activationCountOn(firstEntry.namespaceId) shouldBe 1
-
-    // entry already exists, should not evaluate the block and change the state
-    val entryAfterSecond = loadBalancerData.putActivation(entrySameId.id, {
-      called += 1
-      entrySameId
-    })
-
-    called shouldBe 1
-    entry shouldBe entryAfterSecond
-    loadBalancerData.activationCountOn(firstEntry.invokerName) shouldBe 1
-    loadBalancerData.activationCountOn(firstEntry.namespaceId) shouldBe 1
+    val distributedLoadBalancerData = new DistributedLoadBalancerData()
+    val localLoadBalancerData = new LocalLoadBalancerData()
+
+    val loadBalancerDataArray = Array(localLoadBalancerData, distributedLoadBalancerData)
+    loadBalancerDataArray.map { lbd =>
+      var called = 0
+      val entrySameId = secondEntry.copy(id = firstEntry.id)
+
+      val entry = lbd.putActivation(firstEntry.id, {
+        called += 1
+        firstEntry
+      })
+
+      called shouldBe 1
+
+      val res = await(lbd.activationCountPerInvoker)
+      res.get(firstEntry.invokerName.toString()) shouldBe Some(1)
+      await(lbd.activationCountOn(firstEntry.namespaceId)) shouldBe 1
+
+      // entry already exists, should not evaluate the block and change the state
+      val entryAfterSecond = lbd.putActivation(entrySameId.id, {
+        called += 1
+        entrySameId
+      })
+
+      called shouldBe 1
+      entry shouldBe entryAfterSecond
+      val resAfterAddingTheSameEntry = await(lbd.activationCountPerInvoker)
+      resAfterAddingTheSameEntry.get(firstEntry.invokerName.toString()) shouldBe Some(1)
+      await(lbd.activationCountOn(firstEntry.namespaceId)) shouldBe 1
+    }
+
   }
 
 }
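
Both flavours of bookkeeping exercised above, LocalLoadBalancerData and DistributedLoadBalancerData, now answer activationCountOn and activationCountPerInvoker as Futures, which is why every assertion is wrapped in await(...). The helper itself is not part of this hunk; a minimal sketch of the kind of blocking wrapper such tests rely on could look like the following (the object name FutureAwait and the 10-second default timeout are illustrative assumptions, not taken from the commit):

    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration._

    object FutureAwait {
      // Block the calling (test) thread until the Future completes or the timeout elapses.
      def await[A](f: Future[A], timeout: FiniteDuration = 10.seconds): A =
        Await.result(f, timeout)
    }

A bounded blocking wait like this keeps the test assertions synchronous and deterministic while the production code stays fully asynchronous.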
diff --git a/tests/src/test/scala/whisk/core/loadBalancer/test/SeedNodesProviderTest.scala b/tests/src/test/scala/whisk/core/loadBalancer/test/SeedNodesProviderTest.scala
new file mode 100644
index 0000000..345c885
--- /dev/null
+++ b/tests/src/test/scala/whisk/core/loadBalancer/test/SeedNodesProviderTest.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.loadBalancer.test
+
+import akka.actor.Address
+import org.scalatest.{FlatSpec, Matchers}
+import whisk.core.loadBalancer.StaticSeedNodesProvider
+
+class SeedNodesProviderTest extends FlatSpec with Matchers {
+
+  val actorSystemName = "controller-actor-system"
+  val host = "192.168.99.100"
+  val port = 8000
+  val oneFakeSeedNode = s"$host:$port"
+  val twoFakeSeedNodes = s"$host:$port $host:${port + 1}"
+  val twoFakeSeedNodesWithoutWhitespace = s"$host:$port$host:${port + 1}"
+
+  behavior of "StaticSeedNodesProvider"
+
+  it should "return a sequence with a single seed node" in {
+    val seedNodesProvider = new StaticSeedNodesProvider(oneFakeSeedNode, actorSystemName)
+    val seqWithOneSeedNode = seedNodesProvider.getSeedNodes()
+    seqWithOneSeedNode shouldBe IndexedSeq(Address("akka.tcp", actorSystemName, host, port))
+  }
+  it should "return a sequence with more then one seed node" in {
+    val seedNodesProvider = new StaticSeedNodesProvider(twoFakeSeedNodes, actorSystemName)
+    val seqWithTwoSeedNodes = seedNodesProvider.getSeedNodes()
+    seqWithTwoSeedNodes shouldBe IndexedSeq(
+      Address("akka.tcp", actorSystemName, host, port),
+      Address("akka.tcp", actorSystemName, host, port + 1))
+  }
+  it should "return an empty sequence if seed nodes are provided in the wrong format" in {
+    val seedNodesProvider = new StaticSeedNodesProvider(twoFakeSeedNodesWithoutWhitespace, actorSystemName)
+    val noneResponse = seedNodesProvider.getSeedNodes()
+    noneResponse shouldBe IndexedSeq.empty[Address]
+  }
+  it should "return an empty sequence if no seed nodes specified" in {
+    val seedNodesProvider = new StaticSeedNodesProvider("", actorSystemName)
+    val noneResponse = seedNodesProvider.getSeedNodes()
+    noneResponse shouldBe IndexedSeq.empty[Address]
+  }
+
+}
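
These cases pin down the expected parsing behaviour of StaticSeedNodesProvider: whitespace-separated host:port tokens become akka.tcp addresses for the given actor system name, while malformed input (including the run-together variant without whitespace) and the empty string both yield an empty sequence. A sketch that satisfies exactly these four cases might look as follows; it is illustrative only, not the implementation shipped in whisk.core.loadBalancer, and the class name carries a Sketch suffix to make that explicit:

    import akka.actor.Address

    class StaticSeedNodesProviderSketch(seedNodes: String, actorSystemName: String) {
      // Accept only tokens of the exact shape host:port; anything else is dropped.
      def getSeedNodes(): IndexedSeq[Address] =
        seedNodes
          .split("\\s+")
          .toIndexedSeq
          .map(_.split(":"))
          .collect {
            case Array(host, port) if port.nonEmpty && port.forall(_.isDigit) =>
              Address("akka.tcp", actorSystemName, host, port.toInt)
          }
    }

Requiring exactly two colon-separated parts per token is what makes the run-together input fall through to an empty result instead of producing a half-parsed address.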
diff --git a/tests/src/test/scala/whisk/core/loadBalancer/test/SharedDataServiceTests.scala b/tests/src/test/scala/whisk/core/loadBalancer/test/SharedDataServiceTests.scala
new file mode 100644
index 0000000..3961e53
--- /dev/null
+++ b/tests/src/test/scala/whisk/core/loadBalancer/test/SharedDataServiceTests.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.loadBalancer.test
+
+import akka.actor.ActorSystem
+import akka.testkit.{ImplicitSender, TestKit}
+import akka.util.Timeout
+import com.typesafe.config.ConfigValueFactory
+import com.typesafe.config.ConfigFactory
+import org.scalatest._
+import whisk.core.loadBalancer._
+import org.scalatest.FlatSpecLike
+
+import scala.concurrent.duration._
+
+// Test-specific configuration for the TestKit actor system
+
+object TestKitConfig {
+  val config = """
+    akka.remote.netty.tcp {
+      hostname = "127.0.0.1"
+      port = 2555
+    }
+    """
+}
+
+class SharedDataServiceTests()
+    extends TestKit(ActorSystem("ControllerCluster", ConfigFactory.parseString(TestKitConfig.config)))
+    with ImplicitSender
+    with FlatSpecLike
+    with Matchers
+    with BeforeAndAfterAll {
+
+  override def afterAll {
+    TestKit.shutdownActorSystem(system)
+  }
+
+  behavior of "SharedDataService"
+
+  val port = 2552
+  val host = "127.0.0.1"
+  val config = ConfigFactory
+    .parseString(s"akka.remote.netty.tcp.hostname=$host")
+    .withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(port))
+    .withValue("akka.actor.provider", ConfigValueFactory.fromAnyRef("cluster"))
+    .withFallback(ConfigFactory.load())
+
+  val s = ActorSystem("controller-actor-system", config)
+  val sharedDataService = s.actorOf(SharedDataService.props("Candidates"), name = "busyMan")
+  implicit val timeout = Timeout(5.seconds)
+
+  it should "retrieve an empty map after initialization" in {
+    sharedDataService ! GetMap
+    val msg = Map()
+    expectMsg(msg)
+  }
+  it should "increase the counter" in {
+    sharedDataService ! (IncreaseCounter("Donald", 1))
+    sharedDataService ! GetMap
+    val msg = Map("Donald" -> 1)
+    expectMsg(msg)
+  }
+  it should "decrease the counter" in {
+    sharedDataService ! (IncreaseCounter("Donald", 2))
+    sharedDataService ! (DecreaseCounter("Donald", 2))
+    sharedDataService ! GetMap
+    val msg = Map("Donald" -> 1)
+    expectMsg(msg)
+  }
+  it should "receive the map with all counters" in {
+    sharedDataService ! (IncreaseCounter("Hilary", 1))
+    sharedDataService ! GetMap
+    val msg = Map("Hilary" -> 1, "Donald" -> 1)
+    expectMsg(msg)
+  }
+}
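
The message protocol exercised here is small: IncreaseCounter(key, amount) and DecreaseCounter(key, amount) adjust a named counter in the shared map, and GetMap returns the current view of all counters. Outside a TestKit, a caller would typically query the service with the ask pattern rather than expectMsg; the sketch below is a hypothetical helper (its name, and the Map value type, are assumptions, since the service's internal representation is not shown in this hunk):

    import akka.actor.ActorRef
    import akka.pattern.ask
    import akka.util.Timeout
    import scala.concurrent.Future
    import whisk.core.loadBalancer.{GetMap, IncreaseCounter}

    object SharedDataClientSketch {
      // Bump a counter and then request the full counter map from the service.
      def bumpAndRead(service: ActorRef, key: String)(implicit timeout: Timeout): Future[Map[String, Any]] = {
        service ! IncreaseCounter(key, 1)
        (service ? GetMap).mapTo[Map[String, Any]]
      }
    }

Because the map is replicated with Akka Distributed Data, counters read on one controller converge towards, rather than instantly reflect, writes made on another; callers should treat the returned counts as a close approximation of the cluster-wide state.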

