Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 3F2C0200CDA for ; Fri, 4 Aug 2017 08:54:36 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 3D1FB16D2C9; Fri, 4 Aug 2017 06:54:36 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 883DF16D2C6 for ; Fri, 4 Aug 2017 08:54:34 +0200 (CEST) Received: (qmail 70388 invoked by uid 500); 4 Aug 2017 06:54:33 -0000 Mailing-List: contact commits-help@openwhisk.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@openwhisk.apache.org Delivered-To: mailing list commits@openwhisk.apache.org Received: (qmail 70378 invoked by uid 99); 4 Aug 2017 06:54:33 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 04 Aug 2017 06:54:33 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id E1351875EB; Fri, 4 Aug 2017 06:54:31 +0000 (UTC) Date: Fri, 04 Aug 2017 06:54:30 +0000 To: "commits@openwhisk.apache.org" Subject: [incubator-openwhisk] branch master updated: SPI approach for pluggable implementations. 
(#2414) MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <150182967092.27422.4418057416028156336@gitbox.apache.org> From: markusthoemmes@apache.org Reply-To: "commits@openwhisk.apache.org" X-Git-Host: gitbox.apache.org X-Git-Repo: incubator-openwhisk X-Git-Refname: refs/heads/master X-Git-Reftype: branch X-Git-Oldrev: ba455729661cb2439c2f9215f9548addfba57eee X-Git-Newrev: 50fb60e221b47dd82026aa10a889a00047b75b0c X-Git-Rev: 50fb60e221b47dd82026aa10a889a00047b75b0c X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated archived-at: Fri, 04 Aug 2017 06:54:36 -0000 This is an automated email from the ASF dual-hosted git repository. markusthoemmes pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git The following commit(s) were added to refs/heads/master by this push: new 50fb60e SPI approach for pluggable implementations. (#2414) 50fb60e is described below commit 50fb60e221b47dd82026aa10a889a00047b75b0c Author: tysonnorris AuthorDate: Thu Aug 3 23:54:28 2017 -0700 SPI approach for pluggable implementations. (#2414) Adds the ability to add pluggable implementations for defined Service Provider Interfaces (SPI). The implementation to load is chosen via configuration. 
First set of plug-points are: - ArtifactStoreProvider - MessagingProvider --- common/scala/src/main/resources/reference.conf | 4 + .../connector/kafka/KafkaMessagingProvider.scala | 43 ++++++ .../whisk/core/connector/MessagingProvider.scala | 33 +++++ .../core/database/ArtifactStoreProvider.scala | 35 +++++ .../whisk/core/database/CouchDbStoreProvider.scala | 45 ++++++ .../main/scala/whisk/core/entity/WhiskStore.scala | 33 ++--- .../scala/src/main/scala/whisk/spi/SpiLoader.scala | 100 +++++++++++++ .../scala/whisk/core/controller/Controller.scala | 4 +- .../core/loadBalancer/LoadBalancerService.scala | 20 ++- .../main/scala/whisk/core/invoker/Invoker.scala | 29 ++-- docs/spi.md | 77 ++++++++++ tests/src/test/resources/application.conf | 8 ++ .../core/controller/test/ActivationsApiTests.scala | 7 +- tests/src/test/scala/whisk/spi/SpiTests.scala | 159 +++++++++++++++++++++ 14 files changed, 542 insertions(+), 55 deletions(-) diff --git a/common/scala/src/main/resources/reference.conf b/common/scala/src/main/resources/reference.conf new file mode 100644 index 0000000..52f30c3 --- /dev/null +++ b/common/scala/src/main/resources/reference.conf @@ -0,0 +1,4 @@ +whisk.spi{ + ArtifactStoreProvider = whisk.core.database.CouchDbStoreProvider + MessagingProvider = whisk.connector.kafka.KafkaMessagingProvider +} \ No newline at end of file diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala new file mode 100644 index 0000000..2d6bfe6 --- /dev/null +++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package whisk.connector.kafka + +import scala.concurrent.ExecutionContext +import scala.concurrent.duration.FiniteDuration +import whisk.common.Logging +import whisk.core.WhiskConfig +import whisk.core.connector.MessageConsumer +import whisk.core.connector.MessageProducer +import whisk.core.connector.MessagingProvider +import whisk.spi.Dependencies +import whisk.spi.SingletonSpiFactory + +/** + * A Kafka based implementation of MessagingProvider + */ +class KafkaMessagingProvider() extends MessagingProvider { + def getConsumer(config: WhiskConfig, groupId: String, topic: String, maxPeek: Int, maxPollInterval: FiniteDuration)(implicit logging: Logging): MessageConsumer = + new KafkaConsumerConnector(config.kafkaHost, groupId, topic, maxPeek, maxPollInterval = maxPollInterval) + + def getProducer(config: WhiskConfig, ec: ExecutionContext)(implicit logging: Logging): MessageProducer = + new KafkaProducerConnector(config.kafkaHost, ec) +} + +object KafkaMessagingProvider extends SingletonSpiFactory[MessagingProvider] { + override def apply(dependencies: Dependencies): MessagingProvider = new KafkaMessagingProvider +} diff --git a/common/scala/src/main/scala/whisk/core/connector/MessagingProvider.scala b/common/scala/src/main/scala/whisk/core/connector/MessagingProvider.scala new file mode 100644 index 0000000..b88e8d9 --- /dev/null +++ 
b/common/scala/src/main/scala/whisk/core/connector/MessagingProvider.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package whisk.core.connector + +import scala.concurrent.ExecutionContext +import scala.concurrent.duration.DurationInt +import scala.concurrent.duration.FiniteDuration +import whisk.common.Logging +import whisk.core.WhiskConfig +import whisk.spi.Spi + +/** + * An Spi for providing Messaging implementations. + */ +trait MessagingProvider extends Spi { + def getConsumer(config: WhiskConfig, groupId: String, topic: String, maxPeek: Int = Int.MaxValue, maxPollInterval: FiniteDuration = 5.minutes)(implicit logging: Logging): MessageConsumer + def getProducer(config: WhiskConfig, ec: ExecutionContext)(implicit logging: Logging): MessageProducer +} diff --git a/common/scala/src/main/scala/whisk/core/database/ArtifactStoreProvider.scala b/common/scala/src/main/scala/whisk/core/database/ArtifactStoreProvider.scala new file mode 100644 index 0000000..5234fc8 --- /dev/null +++ b/common/scala/src/main/scala/whisk/core/database/ArtifactStoreProvider.scala @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package whisk.core.database + +import akka.actor.ActorSystem +import spray.json.RootJsonFormat +import whisk.common.Logging +import whisk.core.WhiskConfig +import whisk.spi.Spi + +/** + * An Spi for providing ArtifactStore implementations + */ + +trait ArtifactStoreProvider extends Spi { + def makeStore[D <: DocumentSerializer](config: WhiskConfig, name: WhiskConfig => String)( + implicit jsonFormat: RootJsonFormat[D], + actorSystem: ActorSystem, + logging: Logging): ArtifactStore[D] +} diff --git a/common/scala/src/main/scala/whisk/core/database/CouchDbStoreProvider.scala b/common/scala/src/main/scala/whisk/core/database/CouchDbStoreProvider.scala new file mode 100644 index 0000000..8b26f93 --- /dev/null +++ b/common/scala/src/main/scala/whisk/core/database/CouchDbStoreProvider.scala @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package whisk.core.database + +import akka.actor.ActorSystem +import spray.json.RootJsonFormat +import whisk.common.Logging +import whisk.core.WhiskConfig +import whisk.spi.Dependencies +import whisk.spi.SpiFactory + +/** + * A CouchDB implementation of ArtifactStoreProvider + */ +class CouchDbStoreProvider extends ArtifactStoreProvider { + def makeStore[D <: DocumentSerializer](config: WhiskConfig, name: WhiskConfig => String)( + implicit jsonFormat: RootJsonFormat[D], + actorSystem: ActorSystem, + logging: Logging): ArtifactStore[D] = { + require(config != null && config.isValid, "config is undefined or not valid") + require(config.dbProvider == "Cloudant" || config.dbProvider == "CouchDB", "Unsupported db.provider: " + config.dbProvider) + assume(Set(config.dbProtocol, config.dbHost, config.dbPort, config.dbUsername, config.dbPassword, name(config)).forall(_.nonEmpty), "At least one expected property is missing") + + new CouchDbRestStore[D](config.dbProtocol, config.dbHost, config.dbPort.toInt, config.dbUsername, config.dbPassword, name(config)) + } +} + +object CouchDbStoreProvider extends SpiFactory[ArtifactStoreProvider] { + override def apply(deps: Dependencies): ArtifactStoreProvider = new CouchDbStoreProvider +} diff --git a/common/scala/src/main/scala/whisk/core/entity/WhiskStore.scala b/common/scala/src/main/scala/whisk/core/entity/WhiskStore.scala index aa48419..c25ff2e 100644 --- a/common/scala/src/main/scala/whisk/core/entity/WhiskStore.scala +++ b/common/scala/src/main/scala/whisk/core/entity/WhiskStore.scala @@ -18,11 +18,6 @@ 
package whisk.core.entity import java.time.Instant - -import scala.concurrent.Future -import scala.language.postfixOps -import scala.util.Try - import akka.actor.ActorSystem import spray.json.JsObject import spray.json.JsString @@ -40,16 +35,19 @@ import whisk.core.WhiskConfig.dbProvider import whisk.core.WhiskConfig.dbUsername import whisk.core.WhiskConfig.dbWhisk import whisk.core.database.ArtifactStore -import whisk.core.database.CouchDbRestStore +import whisk.core.database.ArtifactStoreProvider import whisk.core.database.DocumentRevisionProvider import whisk.core.database.DocumentSerializer +import scala.concurrent.Future +import scala.language.postfixOps +import scala.util.Try +import whisk.spi.SpiLoader package object types { type AuthStore = ArtifactStore[WhiskAuth] type EntityStore = ArtifactStore[WhiskEntity] type ActivationStore = ArtifactStore[WhiskActivation] } - protected[core] trait WhiskDocument extends DocumentSerializer with DocumentRevisionProvider { @@ -86,19 +84,6 @@ protected[core] trait WhiskDocument } } -protected[core] object Util { - def makeStore[D <: DocumentSerializer](config: WhiskConfig, name: WhiskConfig => String)( - implicit jsonFormat: RootJsonFormat[D], - actorSystem: ActorSystem, - logging: Logging): ArtifactStore[D] = { - require(config != null && config.isValid, "config is undefined or not valid") - require(config.dbProvider == "Cloudant" || config.dbProvider == "CouchDB", "Unsupported db.provider: " + config.dbProvider) - assume(Set(config.dbProtocol, config.dbHost, config.dbPort, config.dbUsername, config.dbPassword, name(config)).forall(_.nonEmpty), "At least one expected property is missing") - - new CouchDbRestStore[D](config.dbProtocol, config.dbHost, config.dbPort.toInt, config.dbUsername, config.dbPassword, name(config)) - } -} - object WhiskAuthStore { def requiredProperties = Map(dbProvider -> null, @@ -110,7 +95,7 @@ object WhiskAuthStore { dbAuths -> null) def datastore(config: WhiskConfig)(implicit system: 
ActorSystem, logging: Logging) = - Util.makeStore[WhiskAuth](config, _.dbAuths) + SpiLoader.get[ArtifactStoreProvider]().makeStore[WhiskAuth](config, _.dbAuths) } object WhiskEntityStore { @@ -124,7 +109,8 @@ object WhiskEntityStore { dbWhisk -> null) def datastore(config: WhiskConfig)(implicit system: ActorSystem, logging: Logging) = - Util.makeStore[WhiskEntity](config, _.dbWhisk)(WhiskEntityJsonFormat, system, logging) + SpiLoader.get[ArtifactStoreProvider]().makeStore[WhiskEntity](config, _.dbWhisk)(WhiskEntityJsonFormat, system, logging) + } object WhiskActivationStore { @@ -138,9 +124,10 @@ object WhiskActivationStore { dbActivations -> null) def datastore(config: WhiskConfig)(implicit system: ActorSystem, logging: Logging) = - Util.makeStore[WhiskActivation](config, _.dbActivations) + SpiLoader.get[ArtifactStoreProvider]().makeStore[WhiskActivation](config, _.dbActivations) } + /** * This object provides some utilities that query the whisk datastore. * The datastore is assumed to have views (pre-computed joins or indexes) diff --git a/common/scala/src/main/scala/whisk/spi/SpiLoader.scala b/common/scala/src/main/scala/whisk/spi/SpiLoader.scala new file mode 100644 index 0000000..849a1b6 --- /dev/null +++ b/common/scala/src/main/scala/whisk/spi/SpiLoader.scala @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package whisk.spi + +import com.typesafe.config.ConfigFactory +import java.util.concurrent.atomic.AtomicReference + +/** Marker trait to mark an Spi */ +trait Spi + +/** Trait to be extended by factory objects creating Spi implementations */ +trait SpiFactory[T <: Spi] { + def apply(dependencies: Dependencies): T + + /** + * Proxy method only called by the SpiLoader so the apply method + * is overridable even with custom logic implemented. + * + * @param dependencies Dependencies to pass to the Spi + */ + def buildInstance(dependencies: Dependencies): T = apply(dependencies) +} + +/** + * SpiFactory which is guaranteed to always return the same reference for + * the same type of Spi. + */ +trait SingletonSpiFactory[T <: Spi] extends SpiFactory[T] { + private val ref = new AtomicReference[T]() + + override def buildInstance(dependencies: Dependencies): T = { + val oldValue = ref.get() + if (oldValue != null.asInstanceOf[T]) { + oldValue + } else { + val newValue = apply(dependencies) + if (ref.compareAndSet(null.asInstanceOf[T], newValue)) { + newValue + } else { + ref.get() + } + } + } +} + +trait SpiClassResolver { + /** Resolves the implementation for a given type */ + def getClassNameForType[T](implicit man: Manifest[T]): String +} + +object SpiLoader { + /** + * Instantiates an object of the given type. + * + * The ClassName to load is resolved via the SpiClassResolver in scode, which defaults to + * a TypesafeConfig based resolver. 
+ */ + def get[A <: Spi](deps: Dependencies = Dependencies())(implicit resolver: SpiClassResolver = TypesafeConfigClassResolver, man: Manifest[A]): A = { + val clazz = Class.forName(resolver.getClassNameForType[A] + "$") + clazz.getField("MODULE$").get(clazz).asInstanceOf[SpiFactory[A]].buildInstance(deps) + } +} + +/** Lookup the classname for the SPI impl based on a key in the provided Config */ +object TypesafeConfigClassResolver extends SpiClassResolver { + private val config = ConfigFactory.load() + + override def getClassNameForType[T](implicit man: Manifest[T]): String = config.getString("whisk.spi." + man.runtimeClass.getSimpleName) +} + +/** + * Object containing arbitrary objects acting as dependencies. + * + * This is solely a helper type to cross the border between possibly heterogeneous Spi + * interfaces and the production code. + */ +case class Dependencies(private val deps: Any*) { + require(deps.map(_.getClass).distinct.size == deps.size, "A type can only occur once as a dependency") + + def get[T](implicit man: Manifest[T]): T = + deps.find(d => man.runtimeClass.isAssignableFrom(d.getClass)) match { + case Some(d: T) => d + case _ => throw new IllegalArgumentException(s"missing dependency of type ${man.runtimeClass.getName}") + } +} diff --git a/core/controller/src/main/scala/whisk/core/controller/Controller.scala b/core/controller/src/main/scala/whisk/core/controller/Controller.scala index 359e78c..0f5236b 100644 --- a/core/controller/src/main/scala/whisk/core/controller/Controller.scala +++ b/core/controller/src/main/scala/whisk/core/controller/Controller.scala @@ -26,8 +26,8 @@ import akka.japi.Creator import spray.http.StatusCodes._ import spray.http.Uri import spray.httpx.SprayJsonSupport._ -import spray.json._ import spray.json.DefaultJsonProtocol._ +import spray.json._ import spray.routing.Directive.pimpApply import spray.routing.Route import whisk.common.AkkaLogging @@ -43,8 +43,8 @@ import whisk.core.entity.ExecManifest.Runtimes import 
whisk.core.loadBalancer.LoadBalancerService import whisk.http.BasicHttpService import whisk.http.BasicRasService +import scala.util.{Failure, Success} -import scala.util.{ Failure, Success } /** * The Controller is the service that provides the REST API for OpenWhisk. diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala index 3418951..b50eebc 100644 --- a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala +++ b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala @@ -18,7 +18,6 @@ package whisk.core.loadBalancer import java.nio.charset.StandardCharsets - import scala.concurrent.Await import scala.concurrent.ExecutionContext import scala.concurrent.Future @@ -26,22 +25,18 @@ import scala.concurrent.Promise import scala.concurrent.duration.DurationInt import scala.util.Failure import scala.util.Success - import org.apache.kafka.clients.producer.RecordMetadata - import akka.actor.ActorRefFactory import akka.actor.ActorSystem import akka.actor.Props import akka.pattern.ask import akka.util.Timeout - import whisk.common.Logging import whisk.common.LoggingMarkers import whisk.common.TransactionId -import whisk.connector.kafka.KafkaConsumerConnector -import whisk.connector.kafka.KafkaProducerConnector import whisk.core.WhiskConfig import whisk.core.WhiskConfig._ +import whisk.core.connector.MessagingProvider import whisk.core.connector.{ ActivationMessage, CompletionMessage } import whisk.core.connector.MessageFeed import whisk.core.connector.MessageProducer @@ -55,6 +50,7 @@ import whisk.core.entity.types.EntityStore import scala.annotation.tailrec import whisk.core.entity.EntityName import whisk.core.entity.Identity +import whisk.spi.SpiLoader trait LoadBalancer { @@ -175,7 +171,8 @@ class LoadBalancerService( } /** Gets a producer which can publish messages to the kafka bus. 
*/ - private val messageProducer = new KafkaProducerConnector(config.kafkaHost, executionContext) + private val messasgingProvider = SpiLoader.get[MessagingProvider]() + private val messageProducer = messasgingProvider.getProducer(config, executionContext) private def sendActivationToInvoker(producer: MessageProducer, msg: ActivationMessage, invoker: InstanceId): Future[RecordMetadata] = { implicit val transid = msg.transid @@ -200,8 +197,7 @@ class LoadBalancerService( } val maxPingsPerPoll = 128 - // Each controller gets its own Group Id, to receive all messages - val pingConsumer = new KafkaConsumerConnector(config.kafkaHost, s"health${instance.toInt}", "health", maxPeek = maxPingsPerPoll) + val pingConsumer = messasgingProvider.getConsumer(config, s"health${instance.toInt}", "health", maxPeek = maxPingsPerPoll) val invokerFactory = (f: ActorRefFactory, invokerInstance: InstanceId) => f.actorOf(InvokerActor.props(invokerInstance, instance)) actorSystem.actorOf(InvokerPool.props( @@ -215,10 +211,10 @@ class LoadBalancerService( */ val maxActiveAcksPerPoll = 128 val activeAckPollDuration = 1.second - - private val activeAckConsumer = new KafkaConsumerConnector(config.kafkaHost, "completions", s"completed${instance.toInt}", maxPeek = maxActiveAcksPerPoll) + private val activeAckConsumer = messasgingProvider.getConsumer(config, "completions", s"completed${instance.toInt}", maxPeek = maxActiveAcksPerPoll) val activationFeed = actorSystem.actorOf(Props { - new MessageFeed("activeack", logging, activeAckConsumer, maxActiveAcksPerPoll, activeAckPollDuration, processActiveAck) + new MessageFeed("activeack", logging, + activeAckConsumer, maxActiveAcksPerPoll, activeAckPollDuration, processActiveAck) }) def processActiveAck(bytes: Array[Byte]): Future[Unit] = Future { diff --git a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala index dc2f09d..e5dc5dc 100644 --- 
a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala +++ b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala @@ -18,36 +18,34 @@ package whisk.core.invoker import java.nio.charset.StandardCharsets -import java.time.{ Clock, Instant } - -import scala.concurrent.{ Await, ExecutionContext, Future } +import java.time.{Clock, Instant} +import scala.concurrent.{Await, ExecutionContext, Future} import scala.concurrent.Promise -import scala.concurrent.duration.{ Duration, DurationInt } +import scala.concurrent.duration.{Duration, DurationInt} import scala.language.postfixOps -import scala.util.{ Failure, Success } +import scala.util.{Failure, Success} import scala.util.Try - import org.apache.kafka.common.errors.RecordTooLargeException - -import akka.actor.{ ActorRef, ActorSystem, actorRef2Scala } +import akka.actor.{ActorRef, ActorSystem, actorRef2Scala} import akka.japi.Creator import spray.json._ import spray.json.DefaultJsonProtocol._ -import whisk.common.{ Counter, Logging, LoggingMarkers, TransactionId } +import whisk.common.{Counter, Logging, LoggingMarkers, TransactionId} import whisk.common.AkkaLogging import whisk.common.Scheduler -import whisk.connector.kafka.{ KafkaConsumerConnector, KafkaProducerConnector } import whisk.core.WhiskConfig -import whisk.core.WhiskConfig.{ dockerImagePrefix, dockerRegistry, kafkaHost, logsDir, servicePort, invokerUseReactivePool } -import whisk.core.connector.{ ActivationMessage, CompletionMessage } +import whisk.core.WhiskConfig.{dockerImagePrefix, dockerRegistry, invokerUseReactivePool, kafkaHost, logsDir, servicePort} +import whisk.core.connector.{ActivationMessage, CompletionMessage} import whisk.core.connector.MessageFeed import whisk.core.connector.MessageProducer +import whisk.core.connector.MessagingProvider import whisk.core.connector.PingMessage import whisk.core.container._ -import whisk.core.dispatcher.{ Dispatcher, MessageHandler } +import whisk.core.dispatcher.{Dispatcher, MessageHandler} import 
whisk.core.entity._ import whisk.http.BasicHttpService import whisk.http.Messages +import whisk.spi.SpiLoader import whisk.utils.ExecutionContextFactory /** @@ -477,8 +475,9 @@ object Invoker { val topic = s"invoker${invokerInstance.toInt}" val maxdepth = ContainerPool.getDefaultMaxActive(config) - val consumer = new KafkaConsumerConnector(config.kafkaHost, "invokers", topic, maxdepth, maxPollInterval = TimeLimit.MAX_DURATION + 1.minute) - val producer = new KafkaProducerConnector(config.kafkaHost, ec) + val msgProvider = SpiLoader.get[MessagingProvider]() + val consumer = msgProvider.getConsumer(config, "invokers", topic, maxdepth, maxPollInterval = TimeLimit.MAX_DURATION + 1.minute) + val producer = msgProvider.getProducer(config, ec) val dispatcher = new Dispatcher(consumer, 500 milliseconds, maxdepth, actorSystem) val invoker = if (Try(config.invokerUseReactivePool.toBoolean).getOrElse(false)) { diff --git a/docs/spi.md b/docs/spi.md new file mode 100644 index 0000000..b6da0de --- /dev/null +++ b/docs/spi.md @@ -0,0 +1,77 @@ +# SPI extensions in OpenWhisk + +Alternate implementations of various components follow an SPI (Service Provider Interface) pattern: +* The pluggable component is defined as an Spi trait: +```scala +import whisk.spi.Spi +trait ThisIsPluggable extends Spi { ... } +``` +* Implementations implement the Spi trait +```scala +class TheImpl extends ThisIsPluggable { ... } +class TheOtherImpl extends ThisIsPluggable { ... 
} +``` + +Runtime resolution of an Spi trait to a specific impl is provided by: +* SpiLoader - a utility for loading the impl of a specific Spi, using a resolver to determine the impl's factory classname, and reflection to load the factory object +* SpiFactory - a way to define a factory for each impl, all of which are loaded via reflection +* application.conf - each SpiFactory is resolved to a classname based on the config key provided to SpiLoader + +A single SpiFactory per unique Spi is usable at runtime, since the key will have a single string value. + +# Example + +The process to create and use an SPI is as follows: + +## Define the Spi and impl(s) + +* create your Spi `YourSpi` as a trait that is an extension of `whisk.spi.Spi` +* create your SpiFactory impl `YourSpiFactory` as an object that is an extension of `whisk.spi.SpiFactory` (or `whisk.spi.SingletonSpiFactory`) +* create your impls as classes that extend `YourSpi` + +## Define the SpiFactory to load the impl + +```scala +object YourImplFactory extends SpiFactory[YourSpi]{ + def apply(dependencies: Dependencies): YourSpi = { ...construct the impl...} +} +``` +for singleton behavior you can use +```scala +object YourImplFactory extends SingletonSpiFactory[YourSpi]{ + def apply(dependencies: Dependencies): YourSpi = { ...construct the impl...} +} +``` + +## Invoke SpiLoader.get to acquire an instance of the SPI + +SpiLoader uses a TypesafeConfig key to resolve which impl should be loaded. + +The config key used to find the impl classname is `whisk.spi.<SpiInterface>`, where `<SpiInterface>` is the simple (unqualified) name of the Spi trait. + +For example, the SPI interface `whisk.core.database.ArtifactStoreProvider` would load a specific impl indicated by the `whisk.spi.ArtifactStoreProvider` config key. 
+ +(so you cannot use multiple SPI interfaces with the same class name in different packages) + + +Invoke the loader using `SpiLoader.get[<the SPI interface>]()(<implicit resolver>)` + +```scala +val messagingProvider = SpiLoader.get[MessagingProvider]() +``` + +## Defaults + +Default impl resolution depends on the config values, in order of priority from: +1. application.conf +2. reference.conf + +So use `reference.conf` to specify defaults. + +# Runtime + +Since SPI impls are loaded from the classpath, and a specific impl is used only if explicitly configured, it is possible to optimize the classpath based on your preference of: +* include only default impls, and only use default impls +* include all impls, and only use the specified impls +* include some combination of defaults and alternate impls, and use the specified impls for the alternates, and default impls for the rest + diff --git a/tests/src/test/resources/application.conf b/tests/src/test/resources/application.conf new file mode 100644 index 0000000..3dbdb36 --- /dev/null +++ b/tests/src/test/resources/application.conf @@ -0,0 +1,8 @@ + +whisk.spi { + DependentSpi = whisk.spi.DepSpiImpl + TestSpi = whisk.spi.TestSpiImpl + SimpleSpi = whisk.spi.SimpleSpiImpl + MissingSpi = whisk.spi.MissingImpl + MissingModule = missing.module +} diff --git a/tests/src/test/scala/whisk/core/controller/test/ActivationsApiTests.scala b/tests/src/test/scala/whisk/core/controller/test/ActivationsApiTests.scala index a2a9f4e..66209d2 100644 --- a/tests/src/test/scala/whisk/core/controller/test/ActivationsApiTests.scala +++ b/tests/src/test/scala/whisk/core/controller/test/ActivationsApiTests.scala @@ -19,19 +19,19 @@ package whisk.core.controller.test import java.time.Clock import java.time.Instant - import org.junit.runner.RunWith import org.scalatest.junit.JUnitRunner - import spray.http.StatusCodes._ import spray.httpx.SprayJsonSupport._ import spray.json._ import spray.json.DefaultJsonProtocol._ import whisk.core.controller.WhiskActivationsApi +import 
whisk.core.database.ArtifactStoreProvider import whisk.core.entity._ import whisk.core.entity.size._ import whisk.http.ErrorResponse import whisk.http.Messages +import whisk.spi.SpiLoader /** * Tests Activations API. @@ -349,7 +349,8 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi } it should "report proper error when record is corrupted on get" in { - val activationStore = Util.makeStore[WhiskEntity](whiskConfig, _.dbActivations)(WhiskEntityJsonFormat, system, logging) + + val activationStore = SpiLoader.get[ArtifactStoreProvider]().makeStore[WhiskEntity](whiskConfig, _.dbActivations)(WhiskEntityJsonFormat, system, logging) implicit val tid = transid() val entity = BadEntity(namespace, EntityName(ActivationId().toString)) put(activationStore, entity) diff --git a/tests/src/test/scala/whisk/spi/SpiTests.scala b/tests/src/test/scala/whisk/spi/SpiTests.scala new file mode 100644 index 0000000..dc309b0 --- /dev/null +++ b/tests/src/test/scala/whisk/spi/SpiTests.scala @@ -0,0 +1,159 @@ +package whisk.spi + +import com.typesafe.config.ConfigException +import common.StreamLogging +import common.WskActorSystem +import org.junit.runner.RunWith +import org.scalatest.FlatSpec +import org.scalatest.Matchers +import org.scalatest.junit.JUnitRunner +import whisk.core.WhiskConfig + +@RunWith(classOf[JUnitRunner]) +class SpiTests extends FlatSpec with Matchers with WskActorSystem with StreamLogging { + + behavior of "SpiProvider" + + it should "load an Spi from SpiLoader via typesafe config" in { + val simpleSpi = SpiLoader.get[SimpleSpi]() + simpleSpi shouldBe a[SimpleSpi] + } + + it should "throw an exception if the impl defined in application.conf is missing" in { + a[ClassNotFoundException] should be thrownBy SpiLoader.get[MissingSpi]() // MissingSpi(actorSystem) + } + + it should "throw an exception if the module is missing" in { + a[ClassNotFoundException] should be thrownBy SpiLoader.get[MissingModule]() // MissingModule(actorSystem) + } 
+ + it should "throw an exception if the config key is missing" in { + a[ConfigException] should be thrownBy SpiLoader.get[MissingKey]() // MissingModule(actorSystem) + } + + it should "load an Spi with injected WhiskConfig" in { + val whiskConfig = new WhiskConfig(Map()) + val deps = Dependencies("some name", whiskConfig) + val dependentSpi = SpiLoader.get[DependentSpi](deps) + dependentSpi.config shouldBe whiskConfig + } + + it should "load an Spi with injected Spi" in { + val whiskConfig = new WhiskConfig(Map()) + val deps = Dependencies("some name", whiskConfig) + val dependentSpi = SpiLoader.get[DependentSpi](deps) + + val deps2 = Dependencies("dep2", dependentSpi) + val testSpi = SpiLoader.get[TestSpi](deps2) + + testSpi.dep shouldBe dependentSpi + } + + it should "not allow duplicate-type dependencies" in { + a[IllegalArgumentException] should be thrownBy Dependencies("some string", "some other string") + } + + it should "load SPI impls as singletons via SingletonSpiFactory" in { + val instance1 = SpiLoader.get[DependentSpi]() + val instance2 = SpiLoader.get[DependentSpi]() + val instance3 = SpiLoader.get[DependentSpi]() + + instance1 shouldBe instance2 + instance2 shouldBe instance3 + } + + it should "load SPI impls as singletons via lazy val init" in { + val instance1 = SpiLoader.get[SimpleSpi]() + val instance2 = SpiLoader.get[SimpleSpi]() + val instance3 = SpiLoader.get[SimpleSpi]() + + instance1 shouldBe instance2 + instance2 shouldBe instance3 + } +} + +trait TestSpi extends Spi { + val name: String + val dep: DependentSpi +} + +trait DependentSpi extends Spi { + val name: String + val config: WhiskConfig +} + +trait TestSpiFactory extends Spi { + def getTestSpi(name: String, dep: DependentSpi): TestSpi +} + +trait DependentSpiFactory extends Spi { + def getDependentSpi(name: String, config: WhiskConfig): DependentSpi +} + +abstract class Key(key: String) { + +} + +trait SimpleSpi extends Spi { + val name: String +} + +trait MissingSpi extends Spi { + 
val name: String +} + +trait MissingModule extends Spi { + val name: String +} +trait MissingKey extends Spi + +//SPI impls +//a singleton enforced by SingletonSpiFactory +class DepSpiImpl(val name: String, val config: WhiskConfig) extends DependentSpi +object DepSpiImpl extends SingletonSpiFactory[DependentSpi] { + override def apply(deps: Dependencies): DependentSpi = { + new DepSpiImpl(deps.get[String], deps.get[WhiskConfig]) + } +} + +class TestSpiImpl(val name: String, val dep: DependentSpi) extends TestSpi +//an alternative to extending SingletonSpiFactory is using lazy val: +object TestSpiImpl extends SpiFactory[TestSpi] { + var name: String = null + var conf: DependentSpi = null + lazy val instance = new TestSpiImpl(name, conf) + override def apply(dependencies: Dependencies): TestSpi = { + name = dependencies.get[String] + conf = dependencies.get[DependentSpi] + instance + } + +} + +class TestSpiFactoryImpl extends TestSpiFactory { + def getTestSpi(name: String, dep: DependentSpi) = new TestSpiImpl(name, dep) +} + +object TestSpiFactoryImpl extends SpiFactory[TestSpiFactory] { + override def apply(deps: Dependencies): TestSpiFactory = new TestSpiFactoryImpl() +} + +class DependentSpiFactoryImpl extends DependentSpiFactory { + override def getDependentSpi(name: String, config: WhiskConfig): DependentSpi = new DepSpiImpl(name, config) +} + +object DependentSpiFactoryImpl extends SpiFactory[DependentSpiFactory] { + override def apply(deps: Dependencies): DependentSpiFactory = new DependentSpiFactoryImpl() +} + +class SimpleSpiImpl(val name: String) extends SimpleSpi + +object SimpleSpiImpl extends SingletonSpiFactory[SimpleSpi] { + override def apply(dependencies: Dependencies): SimpleSpi = new SimpleSpiImpl("some val ") +} + +class MissingSpiImpl(val name: String) extends MissingSpi + +object MissingSpiImpl extends SpiFactory[MissingSpi] { + override def apply(deps: Dependencies): MissingSpi = new MissingSpiImpl("some val ") +} -- To stop receiving 
notification emails like this one, please contact commits@openwhisk.apache.org.