From commits-return-4309-archive-asf-public=cust-asf.ponee.io@openwhisk.apache.org Wed Apr 18 00:23:16 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 097DC180649 for ; Wed, 18 Apr 2018 00:23:15 +0200 (CEST) Received: (qmail 49995 invoked by uid 500); 17 Apr 2018 22:23:15 -0000 Mailing-List: contact commits-help@openwhisk.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@openwhisk.apache.org Delivered-To: mailing list commits@openwhisk.apache.org Received: (qmail 49986 invoked by uid 99); 17 Apr 2018 22:23:15 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 17 Apr 2018 22:23:15 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 6104F80B3B; Tue, 17 Apr 2018 22:23:14 +0000 (UTC) Date: Tue, 17 Apr 2018 22:23:14 +0000 To: "commits@openwhisk.apache.org" Subject: [incubator-openwhisk] branch master updated: Implement possibility to discover cluster nodes via Akka Management. (#3548) MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <152400379405.9655.4142355913124779739@gitbox.apache.org> From: tysonnorris@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: incubator-openwhisk X-Git-Refname: refs/heads/master X-Git-Reftype: branch X-Git-Oldrev: 1104b3f92cf9329ba31181a6a6ef0388f7022349 X-Git-Newrev: 166189a8f15c99d9237e7020865a34c5bc92a0c2 X-Git-Rev: 166189a8f15c99d9237e7020865a34c5bc92a0c2 X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. tysonnorris pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git The following commit(s) were added to refs/heads/master by this push: new 166189a Implement possibility to discover cluster nodes via Akka Management. (#3548) 166189a is described below commit 166189a8f15c99d9237e7020865a34c5bc92a0c2 Author: Markus Thömmes AuthorDate: Wed Apr 18 00:23:10 2018 +0200 Implement possibility to discover cluster nodes via Akka Management. (#3548) * Implement possibility to discover cluster nodes via Akka Management. Akka Management makes it possible to discover cluster nodes via a cluster API (like Kubernetes or Mesos). This also cleans up the way we provide seed-nodes if static configuration is required. Co-authored-by: Tyson Norris * Remove unneeded SeedNodesProvider. * Documentation. --- ansible/roles/controller/tasks/deploy.yml | 39 ++++++++------ .../src/main/scala/whisk/core/WhiskConfig.scala | 1 + core/controller/build.gradle | 3 ++ .../controller/src/main/resources/application.conf | 2 - core/controller/src/main/resources/reference.conf | 3 ++ .../core/loadBalancer/SeedNodesProvider.scala | 41 --------------- .../ShardingContainerPoolBalancer.scala | 24 ++++++--- docs/deploy.md | 7 +++ .../loadBalancer/test/SeedNodesProviderTests.scala | 61 ---------------------- 9 files changed, 54 insertions(+), 127 deletions(-) diff --git a/ansible/roles/controller/tasks/deploy.yml b/ansible/roles/controller/tasks/deploy.yml index 87167f5..d543a40 100644 --- a/ansible/roles/controller/tasks/deploy.yml +++ b/ansible/roles/controller/tasks/deploy.yml @@ -93,21 +93,9 @@ set_fact: controller_args: "{{ controller.arguments }} {{ jmx.jvmCommonArgs }} -Djava.rmi.server.hostname={{ inventory_hostname }} -Dcom.sun.management.jmxremote.rmi.port={{ jmx.rmiBasePortController + (controller_index | int) }} -Dcom.sun.management.jmxremote.port={{ jmx.basePortController + (controller_index | int) }}" -- name: create seed nodes list +- name: populate environment variables for controller set_fact: - seed_nodes_list: "{{ seed_nodes_list | default([]) }} + [ \"{{item.1}}:{{controller.akka.cluster.basePort+item.0}}\" ]" - with_indexed_items: - - "{{ controller.akka.cluster.seedNodes }}" - -- name: (re)start controller - docker_container: - name: "{{ controller_name }}" - image: "{{ docker_registry }}{{ docker.image.prefix }}/controller:{{ docker.image.tag }}" - state: started - recreate: true - restart_policy: "{{ docker.restart.policy }}" - hostname: "{{ controller_name }}" - env: + controller_env: "JAVA_OPTS": "-Xmx{{ controller.heap }} -XX:+CrashOnOutOfMemoryError -XX:+UseGCOverheadLimit -XX:ErrorFile=/logs/java_error.log -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/logs" "CONTROLLER_OPTS": "{{ controller_args | default(controller.arguments) }}" "CONTROLLER_INSTANCES": "{{ controller.instances }}" @@ -171,7 +159,6 @@ "CONFIG_whisk_runtimes_bypassPullForLocalImages": "{{ runtimes_bypass_pull_for_local_images | default() }}" "CONFIG_whisk_runtimes_localImagePrefix": "{{ runtimes_local_image_prefix | default() }}" - "AKKA_CLUSTER_SEED_NODES": "{{seed_nodes_list | join(' ') }}" "METRICS_KAMON": "{{ metrics.kamon.enabled }}" "METRICS_KAMON_TAGS": "{{ metrics.kamon.tags }}" "METRICS_LOG": "{{ metrics.log.enabled }}" @@ -199,6 +186,28 @@ "CONFIG_logback_log_level": "{{ controller.loglevel }}" "CONFIG_whisk_transactions_stride": "{{ transactions.stride | default() }}" + +- name: create seed nodes list + set_fact: + seed_nodes_list: "{{ seed_nodes_list | default([]) }} + [ '{{ item.1 }}:{{ controller.akka.cluster.basePort+item.0 }}' ]" + with_indexed_items: + - "{{ controller.akka.cluster.seedNodes }}" + +- name: add seed nodes to controller environment + set_fact: + controller_env: "{{ controller_env | default({}) | combine({'CONFIG_akka_cluster_seedNodes_' ~ item.0: 'akka.tcp://controller-actor-system@' ~ item.1}) }}" + with_indexed_items: "{{ seed_nodes_list }}" + + +- name: (re)start controller + docker_container: + name: "{{ controller_name }}" + image: "{{ docker_registry }}{{ docker.image.prefix }}/controller:{{ docker.image.tag }}" + state: started + recreate: true + restart_policy: "{{ docker.restart.policy }}" + hostname: "{{ controller_name }}" + env: "{{ controller_env }}" volumes: - "{{ whisk_logs_dir }}/{{ controller_name }}:/logs" - "{{ controller.confdir }}/{{ controller_name }}:/conf" diff --git a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala index c9683d6..36f74e0 100644 --- a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala +++ b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala @@ -205,6 +205,7 @@ object WhiskConfig { } object ConfigKeys { + val cluster = "whisk.cluster" val loadbalancer = "whisk.loadbalancer" val couchdb = "whisk.couchdb" diff --git a/core/controller/build.gradle b/core/controller/build.gradle index 2a184d1..8a50af3 100644 --- a/core/controller/build.gradle +++ b/core/controller/build.gradle @@ -15,6 +15,9 @@ repositories { dependencies { compile "org.scala-lang:scala-library:${gradle.scala.version}" + compile 'com.lightbend.akka.management:akka-management-cluster-bootstrap_2.11:0.11.0' + compile 'com.lightbend.akka.discovery:akka-discovery-kubernetes-api_2.11:0.11.0' + compile 'com.lightbend.akka.discovery:akka-discovery-marathon-api_2.11:0.11.0' compile project(':common:scala') } diff --git a/core/controller/src/main/resources/application.conf b/core/controller/src/main/resources/application.conf index 446d5fe..e72bbc3 100644 --- a/core/controller/src/main/resources/application.conf +++ b/core/controller/src/main/resources/application.conf @@ -67,7 +67,5 @@ akka { cluster { # Disable legacy metrics in akka-cluster. metrics.enabled=off - - #distributed-data.notify-subscribers-interval = 0.01 } } diff --git a/core/controller/src/main/resources/reference.conf b/core/controller/src/main/resources/reference.conf index ce13c1e..ec7cd8a 100644 --- a/core/controller/src/main/resources/reference.conf +++ b/core/controller/src/main/resources/reference.conf @@ -1,5 +1,8 @@ whisk { + cluster { + use-cluster-bootstrap: false + } loadbalancer { invoker-busy-threshold: 4 blackbox-fraction: 10% diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/SeedNodesProvider.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/SeedNodesProvider.scala deleted file mode 100644 index be630a4..0000000 --- a/core/controller/src/main/scala/whisk/core/loadBalancer/SeedNodesProvider.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package whisk.core.loadBalancer - -import akka.actor.Address - -import scala.collection.immutable.Seq - -trait SeedNodesProvider { - def getSeedNodes(): Seq[Address] -} - -class StaticSeedNodesProvider(seedNodes: String, actorSystemName: String) extends SeedNodesProvider { - def getSeedNodes(): Seq[Address] = { - seedNodes - .split(' ') - .flatMap { rawNodes => - val ipWithPort = rawNodes.split(":") - ipWithPort match { - case Array(host, port) => Seq(Address("akka.tcp", actorSystemName, host, port.toInt)) - case _ => Seq.empty[Address] - } - } - .toIndexedSeq - } -} diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala index d10f03e..e99b972 100644 --- a/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala +++ b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala @@ -25,6 +25,8 @@ import akka.actor.{Actor, ActorSystem, Cancellable, Props} import akka.cluster.ClusterEvent._ import akka.cluster.{Cluster, Member, MemberStatus} import akka.event.Logging.InfoLevel +import akka.management.AkkaManagement +import akka.management.cluster.bootstrap.ClusterBootstrap import akka.stream.ActorMaterializer import org.apache.kafka.clients.producer.RecordMetadata import pureconfig._ @@ -57,13 +59,12 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: Ins private implicit val executionContext: ExecutionContext = actorSystem.dispatcher /** Build a cluster of all loadbalancers */ - private val seedNodesProvider = new StaticSeedNodesProvider(config.controllerSeedNodes, actorSystem.name) - private val seedNodes = seedNodesProvider.getSeedNodes() - - private val cluster: Option[Cluster] = if (seedNodes.nonEmpty) { - val cluster = Cluster(actorSystem) - cluster.joinSeedNodes(seedNodes) - Some(cluster) + private val cluster: Option[Cluster] = if (loadConfigOrThrow[ClusterConfig](ConfigKeys.cluster).useClusterBootstrap) { + AkkaManagement(actorSystem).start() + ClusterBootstrap(actorSystem).start() + Some(Cluster(actorSystem)) + } else if (loadConfigOrThrow[Seq[String]]("akka.cluster.seed-nodes").nonEmpty) { + Some(Cluster(actorSystem)) } else { None } @@ -305,7 +306,7 @@ object ShardingContainerPoolBalancer extends LoadBalancerProvider { logging: Logging, materializer: ActorMaterializer): LoadBalancer = new ShardingContainerPoolBalancer(whiskConfig, instance) - def requiredProperties: Map[String, String] = kafkaHosts ++ Map(controllerSeedNodes -> null) + def requiredProperties: Map[String, String] = kafkaHosts /** Generates a hash based on the string representation of namespace and action */ def generateHash(namespace: EntityName, action: FullyQualifiedEntityName): Int = { @@ -472,6 +473,13 @@ case class ShardingContainerPoolBalancerState( } /** + * Configuration for the cluster created between loadbalancers. + * + * @param useClusterBootstrap Whether or not to use a bootstrap mechanism + */ +case class ClusterConfig(useClusterBootstrap: Boolean) + +/** * Configuration for the sharding container pool balancer. * * @param blackboxFraction the fraction of all invokers to use exclusively for blackboxes diff --git a/docs/deploy.md b/docs/deploy.md index 1ad8456..a1abad0 100644 --- a/docs/deploy.md +++ b/docs/deploy.md @@ -4,6 +4,13 @@ This page documents configuration options that should be considered when deployi The system can be configured to use Akka clustering to manage the distributed state of the Contoller's load balancing algorithm. This imposes the following constraints on a deployment +## Cluster setup + +To setup a cluster, the controllers need to be able to discover each other. There are 2 basic ways to achieve this: + +1. Provide the so called **seed-nodes** explicitly on deployment. Essentially you have a static list of possible seed nodes which are used to build a cluster. In an ansible based deployment, they are determined for you from the `hosts` file. On any other deployment model, the `CONFIG_akka_cluster_seedNodes.$i` variables will need to be provided according to the [akka cluster documentation](https://doc.akka.io/docs/akka/2.5/cluster-usage.html#joining-to-seed-nodes). +2. Discover the nodes from an external service. This is built upon [akka-management](https://developer.lightbend.com/docs/akka-management/current/) and by default [Kubernetes](https://developer.lightbend.com/docs/akka-management/current/discovery.html#discovery-method-kubernetes-api) and [Mesos (Marathon)](https://developer.lightbend.com/docs/akka-management/current/discovery.html#discovery-method-marathon-api) are supported. You can refer to the respective documentation above to configu [...] + ## Controller nodes must have static IPs/Port combination. diff --git a/tests/src/test/scala/whisk/core/loadBalancer/test/SeedNodesProviderTests.scala b/tests/src/test/scala/whisk/core/loadBalancer/test/SeedNodesProviderTests.scala deleted file mode 100644 index ef4ae85..0000000 --- a/tests/src/test/scala/whisk/core/loadBalancer/test/SeedNodesProviderTests.scala +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package whisk.core.loadBalancer.test - -import akka.actor.Address -import org.junit.runner.RunWith -import org.scalatest.junit.JUnitRunner -import org.scalatest.{FlatSpec, Matchers} -import whisk.core.loadBalancer.StaticSeedNodesProvider - -@RunWith(classOf[JUnitRunner]) -class SeedNodesProviderTests extends FlatSpec with Matchers { - - val actorSystemName = "controller-actor-system" - val host = "192.168.99.100" - val port = 8000 - val oneFakeSeedNode = s"$host:$port" - val twoFakeSeedNodes = s"$host:$port $host:${port + 1}" - val twoFakeSeedNodesWithoutWhitespace = s"$host:$port$host:${port + 1}" - - behavior of "StaticSeedNodesProvider" - - it should "return a sequence with a single seed node" in { - val seedNodesProvider = new StaticSeedNodesProvider(oneFakeSeedNode, actorSystemName) - val seqWithOneSeedNode = seedNodesProvider.getSeedNodes() - seqWithOneSeedNode shouldBe IndexedSeq(Address("akka.tcp", actorSystemName, host, port)) - } - it should "return a sequence with more then one seed node" in { - val seedNodesProvider = new StaticSeedNodesProvider(twoFakeSeedNodes, actorSystemName) - val seqWithTwoSeedNodes = seedNodesProvider.getSeedNodes() - seqWithTwoSeedNodes shouldBe IndexedSeq( - Address("akka.tcp", actorSystemName, host, port), - Address("akka.tcp", actorSystemName, host, port + 1)) - } - it should "return an empty sequence if seed nodes are provided in the wrong format" in { - val seedNodesProvider = new StaticSeedNodesProvider(twoFakeSeedNodesWithoutWhitespace, actorSystemName) - val noneResponse = seedNodesProvider.getSeedNodes() - noneResponse shouldBe IndexedSeq.empty[Address] - } - it should "return an empty sequence if no seed nodes specified" in { - val seedNodesProvider = new StaticSeedNodesProvider("", actorSystemName) - val noneResponse = seedNodesProvider.getSeedNodes() - noneResponse shouldBe IndexedSeq.empty[Address] - } - -} -- To stop receiving notification emails like this one, please contact tysonnorris@apache.org.