openwhisk-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From markusthoem...@apache.org
Subject [incubator-openwhisk] branch master updated: SPI approach for pluggable implementations. (#2414)
Date Fri, 04 Aug 2017 06:54:30 GMT
This is an automated email from the ASF dual-hosted git repository.

markusthoemmes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 50fb60e  SPI approach for pluggable implementations. (#2414)
50fb60e is described below

commit 50fb60e221b47dd82026aa10a889a00047b75b0c
Author: tysonnorris <tysonnorris@gmail.com>
AuthorDate: Thu Aug 3 23:54:28 2017 -0700

    SPI approach for pluggable implementations. (#2414)
    
    Adds the ability to add pluggable implementations for defined Service Provider Interfaces
(SPI). The implementation to load is chosen via configuration.
    
    First set of plug-points are:
    - ArtifactStoreProvider
    - MessagingProvider
---
 common/scala/src/main/resources/reference.conf     |   4 +
 .../connector/kafka/KafkaMessagingProvider.scala   |  43 ++++++
 .../whisk/core/connector/MessagingProvider.scala   |  33 +++++
 .../core/database/ArtifactStoreProvider.scala      |  35 +++++
 .../whisk/core/database/CouchDbStoreProvider.scala |  45 ++++++
 .../main/scala/whisk/core/entity/WhiskStore.scala  |  33 ++---
 .../scala/src/main/scala/whisk/spi/SpiLoader.scala | 100 +++++++++++++
 .../scala/whisk/core/controller/Controller.scala   |   4 +-
 .../core/loadBalancer/LoadBalancerService.scala    |  20 ++-
 .../main/scala/whisk/core/invoker/Invoker.scala    |  29 ++--
 docs/spi.md                                        |  77 ++++++++++
 tests/src/test/resources/application.conf          |   8 ++
 .../core/controller/test/ActivationsApiTests.scala |   7 +-
 tests/src/test/scala/whisk/spi/SpiTests.scala      | 159 +++++++++++++++++++++
 14 files changed, 542 insertions(+), 55 deletions(-)

diff --git a/common/scala/src/main/resources/reference.conf b/common/scala/src/main/resources/reference.conf
new file mode 100644
index 0000000..52f30c3
--- /dev/null
+++ b/common/scala/src/main/resources/reference.conf
@@ -0,0 +1,4 @@
+whisk.spi{
+  ArtifactStoreProvider = whisk.core.database.CouchDbStoreProvider
+  MessagingProvider = whisk.connector.kafka.KafkaMessagingProvider
+}
\ No newline at end of file
diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
b/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
new file mode 100644
index 0000000..2d6bfe6
--- /dev/null
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.connector.kafka
+
+import scala.concurrent.ExecutionContext
+import scala.concurrent.duration.FiniteDuration
+import whisk.common.Logging
+import whisk.core.WhiskConfig
+import whisk.core.connector.MessageConsumer
+import whisk.core.connector.MessageProducer
+import whisk.core.connector.MessagingProvider
+import whisk.spi.Dependencies
+import whisk.spi.SingletonSpiFactory
+
+/**
+ * A Kafka based implementation of MessagingProvider
+ */
+class KafkaMessagingProvider() extends MessagingProvider {
+    def getConsumer(config: WhiskConfig, groupId: String, topic: String, maxPeek: Int, maxPollInterval:
FiniteDuration)(implicit logging: Logging): MessageConsumer =
+        new KafkaConsumerConnector(config.kafkaHost, groupId, topic, maxPeek, maxPollInterval
= maxPollInterval)
+
+    def getProducer(config: WhiskConfig, ec: ExecutionContext)(implicit logging: Logging):
MessageProducer =
+        new KafkaProducerConnector(config.kafkaHost, ec)
+}
+
+object KafkaMessagingProvider extends SingletonSpiFactory[MessagingProvider] {
+    override def apply(dependencies: Dependencies): MessagingProvider = new KafkaMessagingProvider
+}
diff --git a/common/scala/src/main/scala/whisk/core/connector/MessagingProvider.scala b/common/scala/src/main/scala/whisk/core/connector/MessagingProvider.scala
new file mode 100644
index 0000000..b88e8d9
--- /dev/null
+++ b/common/scala/src/main/scala/whisk/core/connector/MessagingProvider.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.connector
+
+import scala.concurrent.ExecutionContext
+import scala.concurrent.duration.DurationInt
+import scala.concurrent.duration.FiniteDuration
+import whisk.common.Logging
+import whisk.core.WhiskConfig
+import whisk.spi.Spi
+
+/**
+ * An Spi for providing Messaging implementations.
+ */
+trait MessagingProvider extends Spi {
+    def getConsumer(config: WhiskConfig, groupId: String, topic: String, maxPeek: Int = Int.MaxValue,
maxPollInterval: FiniteDuration = 5.minutes)(implicit logging: Logging): MessageConsumer
+    def getProducer(config: WhiskConfig, ec: ExecutionContext)(implicit logging: Logging):
MessageProducer
+}
diff --git a/common/scala/src/main/scala/whisk/core/database/ArtifactStoreProvider.scala b/common/scala/src/main/scala/whisk/core/database/ArtifactStoreProvider.scala
new file mode 100644
index 0000000..5234fc8
--- /dev/null
+++ b/common/scala/src/main/scala/whisk/core/database/ArtifactStoreProvider.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.database
+
+import akka.actor.ActorSystem
+import spray.json.RootJsonFormat
+import whisk.common.Logging
+import whisk.core.WhiskConfig
+import whisk.spi.Spi
+
+/**
+ * An Spi for providing ArtifactStore implementations
+ */
+
+trait ArtifactStoreProvider extends Spi {
+    def makeStore[D <: DocumentSerializer](config: WhiskConfig, name: WhiskConfig =>
String)(
+        implicit jsonFormat: RootJsonFormat[D],
+        actorSystem: ActorSystem,
+        logging: Logging): ArtifactStore[D]
+}
diff --git a/common/scala/src/main/scala/whisk/core/database/CouchDbStoreProvider.scala b/common/scala/src/main/scala/whisk/core/database/CouchDbStoreProvider.scala
new file mode 100644
index 0000000..8b26f93
--- /dev/null
+++ b/common/scala/src/main/scala/whisk/core/database/CouchDbStoreProvider.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.database
+
+import akka.actor.ActorSystem
+import spray.json.RootJsonFormat
+import whisk.common.Logging
+import whisk.core.WhiskConfig
+import whisk.spi.Dependencies
+import whisk.spi.SpiFactory
+
+/**
+ * A CouchDB implementation of ArtifactStoreProvider
+ */
+class CouchDbStoreProvider extends ArtifactStoreProvider {
+    def makeStore[D <: DocumentSerializer](config: WhiskConfig, name: WhiskConfig =>
String)(
+        implicit jsonFormat: RootJsonFormat[D],
+        actorSystem: ActorSystem,
+        logging: Logging): ArtifactStore[D] = {
+        require(config != null && config.isValid, "config is undefined or not valid")
+        require(config.dbProvider == "Cloudant" || config.dbProvider == "CouchDB", "Unsupported
db.provider: " + config.dbProvider)
+        assume(Set(config.dbProtocol, config.dbHost, config.dbPort, config.dbUsername, config.dbPassword,
name(config)).forall(_.nonEmpty), "At least one expected property is missing")
+
+        new CouchDbRestStore[D](config.dbProtocol, config.dbHost, config.dbPort.toInt, config.dbUsername,
config.dbPassword, name(config))
+    }
+}
+
+object CouchDbStoreProvider extends SpiFactory[ArtifactStoreProvider] {
+    override def apply(deps: Dependencies): ArtifactStoreProvider = new CouchDbStoreProvider
+}
diff --git a/common/scala/src/main/scala/whisk/core/entity/WhiskStore.scala b/common/scala/src/main/scala/whisk/core/entity/WhiskStore.scala
index aa48419..c25ff2e 100644
--- a/common/scala/src/main/scala/whisk/core/entity/WhiskStore.scala
+++ b/common/scala/src/main/scala/whisk/core/entity/WhiskStore.scala
@@ -18,11 +18,6 @@
 package whisk.core.entity
 
 import java.time.Instant
-
-import scala.concurrent.Future
-import scala.language.postfixOps
-import scala.util.Try
-
 import akka.actor.ActorSystem
 import spray.json.JsObject
 import spray.json.JsString
@@ -40,16 +35,19 @@ import whisk.core.WhiskConfig.dbProvider
 import whisk.core.WhiskConfig.dbUsername
 import whisk.core.WhiskConfig.dbWhisk
 import whisk.core.database.ArtifactStore
-import whisk.core.database.CouchDbRestStore
+import whisk.core.database.ArtifactStoreProvider
 import whisk.core.database.DocumentRevisionProvider
 import whisk.core.database.DocumentSerializer
+import scala.concurrent.Future
+import scala.language.postfixOps
+import scala.util.Try
+import whisk.spi.SpiLoader
 
 package object types {
     type AuthStore = ArtifactStore[WhiskAuth]
     type EntityStore = ArtifactStore[WhiskEntity]
     type ActivationStore = ArtifactStore[WhiskActivation]
 }
-
 protected[core] trait WhiskDocument
     extends DocumentSerializer
     with DocumentRevisionProvider {
@@ -86,19 +84,6 @@ protected[core] trait WhiskDocument
     }
 }
 
-protected[core] object Util {
-    def makeStore[D <: DocumentSerializer](config: WhiskConfig, name: WhiskConfig =>
String)(
-        implicit jsonFormat: RootJsonFormat[D],
-        actorSystem: ActorSystem,
-        logging: Logging): ArtifactStore[D] = {
-        require(config != null && config.isValid, "config is undefined or not valid")
-        require(config.dbProvider == "Cloudant" || config.dbProvider == "CouchDB", "Unsupported
db.provider: " + config.dbProvider)
-        assume(Set(config.dbProtocol, config.dbHost, config.dbPort, config.dbUsername, config.dbPassword,
name(config)).forall(_.nonEmpty), "At least one expected property is missing")
-
-        new CouchDbRestStore[D](config.dbProtocol, config.dbHost, config.dbPort.toInt, config.dbUsername,
config.dbPassword, name(config))
-    }
-}
-
 object WhiskAuthStore {
     def requiredProperties =
         Map(dbProvider -> null,
@@ -110,7 +95,7 @@ object WhiskAuthStore {
             dbAuths -> null)
 
     def datastore(config: WhiskConfig)(implicit system: ActorSystem, logging: Logging) =
-        Util.makeStore[WhiskAuth](config, _.dbAuths)
+        SpiLoader.get[ArtifactStoreProvider]().makeStore[WhiskAuth](config, _.dbAuths)
 }
 
 object WhiskEntityStore {
@@ -124,7 +109,8 @@ object WhiskEntityStore {
             dbWhisk -> null)
 
     def datastore(config: WhiskConfig)(implicit system: ActorSystem, logging: Logging) =
-        Util.makeStore[WhiskEntity](config, _.dbWhisk)(WhiskEntityJsonFormat, system, logging)
+        SpiLoader.get[ArtifactStoreProvider]().makeStore[WhiskEntity](config, _.dbWhisk)(WhiskEntityJsonFormat,
system, logging)
+
 }
 
 object WhiskActivationStore {
@@ -138,9 +124,10 @@ object WhiskActivationStore {
             dbActivations -> null)
 
     def datastore(config: WhiskConfig)(implicit system: ActorSystem, logging: Logging) =
-        Util.makeStore[WhiskActivation](config, _.dbActivations)
+        SpiLoader.get[ArtifactStoreProvider]().makeStore[WhiskActivation](config, _.dbActivations)
 }
 
+
 /**
  * This object provides some utilities that query the whisk datastore.
  * The datastore is assumed to have views (pre-computed joins or indexes)
diff --git a/common/scala/src/main/scala/whisk/spi/SpiLoader.scala b/common/scala/src/main/scala/whisk/spi/SpiLoader.scala
new file mode 100644
index 0000000..849a1b6
--- /dev/null
+++ b/common/scala/src/main/scala/whisk/spi/SpiLoader.scala
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.spi
+
+import com.typesafe.config.ConfigFactory
+import java.util.concurrent.atomic.AtomicReference
+
+/** Marker trait to mark an Spi */
+trait Spi
+
+/** Trait to be extended by factory objects creating Spi implementations */
+trait SpiFactory[T <: Spi] {
+    def apply(dependencies: Dependencies): T
+
+    /**
+     * Proxy method only called by the SpiLoader so the apply method
+     * is overridable even with custom logic implemented.
+     *
+     * @param dependencies Dependencies to pass to the Spi
+     */
+    def buildInstance(dependencies: Dependencies): T = apply(dependencies)
+}
+
+/**
+ * SpiFactory which is guaranteed to always return the same reference for
+ * the same type of Spi.
+ */
+trait SingletonSpiFactory[T <: Spi] extends SpiFactory[T] {
+    private val ref = new AtomicReference[T]()
+
+    override def buildInstance(dependencies: Dependencies): T = {
+        val oldValue = ref.get()
+        if (oldValue != null.asInstanceOf[T]) {
+            oldValue
+        } else {
+            val newValue = apply(dependencies)
+            if (ref.compareAndSet(null.asInstanceOf[T], newValue)) {
+                newValue
+            } else {
+                ref.get()
+            }
+        }
+    }
+}
+
+trait SpiClassResolver {
+    /** Resolves the implementation for a given type */
+    def getClassNameForType[T](implicit man: Manifest[T]): String
+}
+
+object SpiLoader {
+    /**
+     * Instantiates an object of the given type.
+     *
+     * The ClassName to load is resolved via the SpiClassResolver in scode, which defaults
to
+     * a TypesafeConfig based resolver.
+     */
+    def get[A <: Spi](deps: Dependencies = Dependencies())(implicit resolver: SpiClassResolver
= TypesafeConfigClassResolver, man: Manifest[A]): A = {
+        val clazz = Class.forName(resolver.getClassNameForType[A] + "$")
+        clazz.getField("MODULE$").get(clazz).asInstanceOf[SpiFactory[A]].buildInstance(deps)
+    }
+}
+
+/** Lookup the classname for the SPI impl based on a key in the provided Config */
+object TypesafeConfigClassResolver extends SpiClassResolver {
+    private val config = ConfigFactory.load()
+
+    override def getClassNameForType[T](implicit man: Manifest[T]): String = config.getString("whisk.spi."
+ man.runtimeClass.getSimpleName)
+}
+
+/**
+ * Object containing arbitrary objects acting as dependencies.
+ *
+ * This is solely a helper type to cross the border between possibly heterogeneous Spi
+ * interfaces and the production code.
+ */
+case class Dependencies(private val deps: Any*) {
+    require(deps.map(_.getClass).distinct.size == deps.size, "A type can only occur once
as a dependency")
+
+    def get[T](implicit man: Manifest[T]): T =
+        deps.find(d => man.runtimeClass.isAssignableFrom(d.getClass)) match {
+            case Some(d: T) => d
+            case _          => throw new IllegalArgumentException(s"missing dependency
of type ${man.runtimeClass.getName}")
+        }
+}
diff --git a/core/controller/src/main/scala/whisk/core/controller/Controller.scala b/core/controller/src/main/scala/whisk/core/controller/Controller.scala
index 359e78c..0f5236b 100644
--- a/core/controller/src/main/scala/whisk/core/controller/Controller.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/Controller.scala
@@ -26,8 +26,8 @@ import akka.japi.Creator
 import spray.http.StatusCodes._
 import spray.http.Uri
 import spray.httpx.SprayJsonSupport._
-import spray.json._
 import spray.json.DefaultJsonProtocol._
+import spray.json._
 import spray.routing.Directive.pimpApply
 import spray.routing.Route
 import whisk.common.AkkaLogging
@@ -43,8 +43,8 @@ import whisk.core.entity.ExecManifest.Runtimes
 import whisk.core.loadBalancer.LoadBalancerService
 import whisk.http.BasicHttpService
 import whisk.http.BasicRasService
+import scala.util.{Failure, Success}
 
-import scala.util.{ Failure, Success }
 
 /**
  * The Controller is the service that provides the REST API for OpenWhisk.
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
index 3418951..b50eebc 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
@@ -18,7 +18,6 @@
 package whisk.core.loadBalancer
 
 import java.nio.charset.StandardCharsets
-
 import scala.concurrent.Await
 import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
@@ -26,22 +25,18 @@ import scala.concurrent.Promise
 import scala.concurrent.duration.DurationInt
 import scala.util.Failure
 import scala.util.Success
-
 import org.apache.kafka.clients.producer.RecordMetadata
-
 import akka.actor.ActorRefFactory
 import akka.actor.ActorSystem
 import akka.actor.Props
 import akka.pattern.ask
 import akka.util.Timeout
-
 import whisk.common.Logging
 import whisk.common.LoggingMarkers
 import whisk.common.TransactionId
-import whisk.connector.kafka.KafkaConsumerConnector
-import whisk.connector.kafka.KafkaProducerConnector
 import whisk.core.WhiskConfig
 import whisk.core.WhiskConfig._
+import whisk.core.connector.MessagingProvider
 import whisk.core.connector.{ ActivationMessage, CompletionMessage }
 import whisk.core.connector.MessageFeed
 import whisk.core.connector.MessageProducer
@@ -55,6 +50,7 @@ import whisk.core.entity.types.EntityStore
 import scala.annotation.tailrec
 import whisk.core.entity.EntityName
 import whisk.core.entity.Identity
+import whisk.spi.SpiLoader
 
 trait LoadBalancer {
 
@@ -175,7 +171,8 @@ class LoadBalancerService(
     }
 
     /** Gets a producer which can publish messages to the kafka bus. */
-    private val messageProducer = new KafkaProducerConnector(config.kafkaHost, executionContext)
+    private val messasgingProvider = SpiLoader.get[MessagingProvider]()
+    private val messageProducer = messasgingProvider.getProducer(config, executionContext)
 
     private def sendActivationToInvoker(producer: MessageProducer, msg: ActivationMessage,
invoker: InstanceId): Future[RecordMetadata] = {
         implicit val transid = msg.transid
@@ -200,8 +197,7 @@ class LoadBalancerService(
         }
 
         val maxPingsPerPoll = 128
-        // Each controller gets its own Group Id, to receive all messages
-        val pingConsumer = new KafkaConsumerConnector(config.kafkaHost, s"health${instance.toInt}",
"health", maxPeek = maxPingsPerPoll)
+        val pingConsumer = messasgingProvider.getConsumer(config, s"health${instance.toInt}",
"health", maxPeek = maxPingsPerPoll)
         val invokerFactory = (f: ActorRefFactory, invokerInstance: InstanceId) => f.actorOf(InvokerActor.props(invokerInstance,
instance))
 
         actorSystem.actorOf(InvokerPool.props(
@@ -215,10 +211,10 @@ class LoadBalancerService(
      */
     val maxActiveAcksPerPoll = 128
     val activeAckPollDuration = 1.second
-
-    private val activeAckConsumer = new KafkaConsumerConnector(config.kafkaHost, "completions",
s"completed${instance.toInt}", maxPeek = maxActiveAcksPerPoll)
+    private val activeAckConsumer = messasgingProvider.getConsumer(config, "completions",
s"completed${instance.toInt}", maxPeek = maxActiveAcksPerPoll)
     val activationFeed = actorSystem.actorOf(Props {
-        new MessageFeed("activeack", logging, activeAckConsumer, maxActiveAcksPerPoll, activeAckPollDuration,
processActiveAck)
+        new MessageFeed("activeack", logging,
+            activeAckConsumer, maxActiveAcksPerPoll, activeAckPollDuration, processActiveAck)
     })
 
     def processActiveAck(bytes: Array[Byte]): Future[Unit] = Future {
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
index dc2f09d..e5dc5dc 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
@@ -18,36 +18,34 @@
 package whisk.core.invoker
 
 import java.nio.charset.StandardCharsets
-import java.time.{ Clock, Instant }
-
-import scala.concurrent.{ Await, ExecutionContext, Future }
+import java.time.{Clock, Instant}
+import scala.concurrent.{Await, ExecutionContext, Future}
 import scala.concurrent.Promise
-import scala.concurrent.duration.{ Duration, DurationInt }
+import scala.concurrent.duration.{Duration, DurationInt}
 import scala.language.postfixOps
-import scala.util.{ Failure, Success }
+import scala.util.{Failure, Success}
 import scala.util.Try
-
 import org.apache.kafka.common.errors.RecordTooLargeException
-
-import akka.actor.{ ActorRef, ActorSystem, actorRef2Scala }
+import akka.actor.{ActorRef, ActorSystem, actorRef2Scala}
 import akka.japi.Creator
 import spray.json._
 import spray.json.DefaultJsonProtocol._
-import whisk.common.{ Counter, Logging, LoggingMarkers, TransactionId }
+import whisk.common.{Counter, Logging, LoggingMarkers, TransactionId}
 import whisk.common.AkkaLogging
 import whisk.common.Scheduler
-import whisk.connector.kafka.{ KafkaConsumerConnector, KafkaProducerConnector }
 import whisk.core.WhiskConfig
-import whisk.core.WhiskConfig.{ dockerImagePrefix, dockerRegistry, kafkaHost, logsDir, servicePort,
invokerUseReactivePool }
-import whisk.core.connector.{ ActivationMessage, CompletionMessage }
+import whisk.core.WhiskConfig.{dockerImagePrefix, dockerRegistry, invokerUseReactivePool,
kafkaHost, logsDir, servicePort}
+import whisk.core.connector.{ActivationMessage, CompletionMessage}
 import whisk.core.connector.MessageFeed
 import whisk.core.connector.MessageProducer
+import whisk.core.connector.MessagingProvider
 import whisk.core.connector.PingMessage
 import whisk.core.container._
-import whisk.core.dispatcher.{ Dispatcher, MessageHandler }
+import whisk.core.dispatcher.{Dispatcher, MessageHandler}
 import whisk.core.entity._
 import whisk.http.BasicHttpService
 import whisk.http.Messages
+import whisk.spi.SpiLoader
 import whisk.utils.ExecutionContextFactory
 
 /**
@@ -477,8 +475,9 @@ object Invoker {
 
         val topic = s"invoker${invokerInstance.toInt}"
         val maxdepth = ContainerPool.getDefaultMaxActive(config)
-        val consumer = new KafkaConsumerConnector(config.kafkaHost, "invokers", topic, maxdepth,
maxPollInterval = TimeLimit.MAX_DURATION + 1.minute)
-        val producer = new KafkaProducerConnector(config.kafkaHost, ec)
+        val msgProvider = SpiLoader.get[MessagingProvider]()
+        val consumer = msgProvider.getConsumer(config, "invokers", topic, maxdepth, maxPollInterval
= TimeLimit.MAX_DURATION + 1.minute)
+        val producer = msgProvider.getProducer(config, ec)
         val dispatcher = new Dispatcher(consumer, 500 milliseconds, maxdepth, actorSystem)
 
         val invoker = if (Try(config.invokerUseReactivePool.toBoolean).getOrElse(false))
{
diff --git a/docs/spi.md b/docs/spi.md
new file mode 100644
index 0000000..b6da0de
--- /dev/null
+++ b/docs/spi.md
@@ -0,0 +1,77 @@
+# SPI extensions in OpenWhisk
+
+Alternate implementations of various components follow an SPI (Service Provider Interface)
pattern:
+* The pluggable component is defined as an Spi trait:
+```scala
+import whisk.spi.Spi
+trait ThisIsPluggable extends Spi { ... }
+```
+* Implementations implement the Spi trait
+```scala
+class TheImpl extends ThisIsPluggable { ... }
+class TheOtherImpl extends ThisIsPluggable { ... }
+```
+
+Runtime resolution of an Spi trait to a specific impl is provided by:
+* SpiLoader - a utility for loading the impl of a specific Spi, using a resolver to determine
the impls factory classname, and reflection to load the factory object
+* SpiFactory - a way to define a factory for each impl, all of which are loaded via reflection
+* application.conf - each SpiFactory is resolved to a classname based on the config key provided
to SpiLoader
+
+A single SpiFactory per unique Spi is usable at runtime, since the key will have a single
string value. 
+
+# Example
+
+The process to create and use an SPI is as follows:
+
+## Define the Spi and impl(s)
+
+* create your Spi trait `YourSpi` as an class that is an extension of `whisk.spi.Spi`
+* create you SpiFactory impl `YourSpiFactory` as an object that is an extension of `whisk.spi.SpiFactory`
(or `whisk.spi.SingletonSpiFactory`)
+* create your impls as classes that extend `YourSpi`
+
+## Define the SpiFactory to load the impl
+
+```scala
+class YourImplFactory extends SpiFactory[YourSpi]{
+  def apply(dependencies: Dependencies): { ...construct the impl...}
+}
+```
+for singleton behavior you can use
+```scala
+class YourImplFactory extends SingletonSpiFactory[YourSpi]{
+  def apply(dependencies: Dependencies): { ...construct the impl...}
+}
+```
+
+## Invoke SpiLoader.get to acquire an instance of the SPI
+
+SpiLoader uses a TypesafeConfig key to use for resolving which impl should be loaded. 
+
+The config key used to find the impl classname is `whisk.spi.<SpiInterface>` 
+
+For example, the SPI interface `whisk.core.database.ArtifactStoreProvider` would load a specific
impl indicated by the  `whisk.spi.ArtifactStoreProvider` config key.
+
+(so you cannot use multiple SPI interfaces with the same class name in different packages)
+ 
+
+Invoke the loader using `SpiLoader.get[<the SPI interface>]()(<implicit resolver>)`
+
+```scala
+val messagingProvider = SpiLoader.get[MessagingProvider]()
+```
+
+## Defaults
+
+Default impls resolution is dependent on the config values in order of priority from:
+1. application.conf
+2. reference.conf
+
+So use `reference.conf` to specify defaults.
+
+# Runtime
+
+Since SPI impls are loaded from the classpath, and a specific impl is used only if explicitly
configured it is possible to optimize the classpath based on your preference of:
+* include only default impls, and only use default impls
+* include all impls, and only use the specified impls
+* include some combination of defaults and alternate impls, and use the specified impls for
the alternates, and default impls for the rest
+
diff --git a/tests/src/test/resources/application.conf b/tests/src/test/resources/application.conf
new file mode 100644
index 0000000..3dbdb36
--- /dev/null
+++ b/tests/src/test/resources/application.conf
@@ -0,0 +1,8 @@
+
+whisk.spi {
+  DependentSpi = whisk.spi.DepSpiImpl
+  TestSpi = whisk.spi.TestSpiImpl
+  SimpleSpi = whisk.spi.SimpleSpiImpl
+  MissingSpi = whisk.spi.MissingImpl
+  MissingModule = missing.module
+}
diff --git a/tests/src/test/scala/whisk/core/controller/test/ActivationsApiTests.scala b/tests/src/test/scala/whisk/core/controller/test/ActivationsApiTests.scala
index a2a9f4e..66209d2 100644
--- a/tests/src/test/scala/whisk/core/controller/test/ActivationsApiTests.scala
+++ b/tests/src/test/scala/whisk/core/controller/test/ActivationsApiTests.scala
@@ -19,19 +19,19 @@ package whisk.core.controller.test
 
 import java.time.Clock
 import java.time.Instant
-
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
-
 import spray.http.StatusCodes._
 import spray.httpx.SprayJsonSupport._
 import spray.json._
 import spray.json.DefaultJsonProtocol._
 import whisk.core.controller.WhiskActivationsApi
+import whisk.core.database.ArtifactStoreProvider
 import whisk.core.entity._
 import whisk.core.entity.size._
 import whisk.http.ErrorResponse
 import whisk.http.Messages
+import whisk.spi.SpiLoader
 
 /**
  * Tests Activations API.
@@ -349,7 +349,8 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi
     }
 
     it should "report proper error when record is corrupted on get" in {
-        val activationStore = Util.makeStore[WhiskEntity](whiskConfig, _.dbActivations)(WhiskEntityJsonFormat,
system, logging)
+
+        val activationStore = SpiLoader.get[ArtifactStoreProvider]().makeStore[WhiskEntity](whiskConfig,
_.dbActivations)(WhiskEntityJsonFormat, system, logging)
         implicit val tid = transid()
         val entity = BadEntity(namespace, EntityName(ActivationId().toString))
         put(activationStore, entity)
diff --git a/tests/src/test/scala/whisk/spi/SpiTests.scala b/tests/src/test/scala/whisk/spi/SpiTests.scala
new file mode 100644
index 0000000..dc309b0
--- /dev/null
+++ b/tests/src/test/scala/whisk/spi/SpiTests.scala
@@ -0,0 +1,159 @@
+package whisk.spi
+
+import com.typesafe.config.ConfigException
+import common.StreamLogging
+import common.WskActorSystem
+import org.junit.runner.RunWith
+import org.scalatest.FlatSpec
+import org.scalatest.Matchers
+import org.scalatest.junit.JUnitRunner
+import whisk.core.WhiskConfig
+
+@RunWith(classOf[JUnitRunner])
+class SpiTests extends FlatSpec with Matchers with WskActorSystem with StreamLogging {
+
+    behavior of "SpiProvider"
+
+    it should "load an Spi from SpiLoader via typesafe config" in {
+        val simpleSpi = SpiLoader.get[SimpleSpi]()
+        simpleSpi shouldBe a[SimpleSpi]
+    }
+
+    it should "throw an exception if the impl defined in application.conf is missing" in
{
+        a[ClassNotFoundException] should be thrownBy SpiLoader.get[MissingSpi]() // MissingSpi(actorSystem)
+    }
+
+    it should "throw an exception if the module is missing" in {
+        a[ClassNotFoundException] should be thrownBy SpiLoader.get[MissingModule]() // MissingModule(actorSystem)
+    }
+
+    it should "throw an exception if the config key is missing" in {
+        a[ConfigException] should be thrownBy SpiLoader.get[MissingKey]() // MissingModule(actorSystem)
+    }
+
+    it should "load an Spi with injected WhiskConfig" in {
+        val whiskConfig = new WhiskConfig(Map())
+        val deps = Dependencies("some name", whiskConfig)
+        val dependentSpi = SpiLoader.get[DependentSpi](deps)
+        dependentSpi.config shouldBe whiskConfig
+    }
+
+    it should "load an Spi with injected Spi" in {
+        val whiskConfig = new WhiskConfig(Map())
+        val deps = Dependencies("some name", whiskConfig)
+        val dependentSpi = SpiLoader.get[DependentSpi](deps)
+
+        val deps2 = Dependencies("dep2", dependentSpi)
+        val testSpi = SpiLoader.get[TestSpi](deps2)
+
+        testSpi.dep shouldBe dependentSpi
+    }
+
+    it should "not allow duplicate-type dependencies" in {
+        a[IllegalArgumentException] should be thrownBy Dependencies("some string", "some
other string")
+    }
+
+    it should "load SPI impls as singletons via SingletonSpiFactory" in {
+        val instance1 = SpiLoader.get[DependentSpi]()
+        val instance2 = SpiLoader.get[DependentSpi]()
+        val instance3 = SpiLoader.get[DependentSpi]()
+
+        instance1 shouldBe instance2
+        instance2 shouldBe instance3
+    }
+
+    it should "load SPI impls as singletons via lazy val init" in {
+        val instance1 = SpiLoader.get[SimpleSpi]()
+        val instance2 = SpiLoader.get[SimpleSpi]()
+        val instance3 = SpiLoader.get[SimpleSpi]()
+
+        instance1 shouldBe instance2
+        instance2 shouldBe instance3
+    }
+}
+
+trait TestSpi extends Spi {
+    val name: String
+    val dep: DependentSpi
+}
+
+trait DependentSpi extends Spi {
+    val name: String
+    val config: WhiskConfig
+}
+
+trait TestSpiFactory extends Spi {
+    def getTestSpi(name: String, dep: DependentSpi): TestSpi
+}
+
+trait DependentSpiFactory extends Spi {
+    def getDependentSpi(name: String, config: WhiskConfig): DependentSpi
+}
+
+abstract class Key(key: String) {
+
+}
+
+trait SimpleSpi extends Spi {
+    val name: String
+}
+
+trait MissingSpi extends Spi {
+    val name: String
+}
+
+trait MissingModule extends Spi {
+    val name: String
+}
+trait MissingKey extends Spi
+
+//SPI impls
+//a singleton enforced by SingletonSpiFactory
+class DepSpiImpl(val name: String, val config: WhiskConfig) extends DependentSpi
+object DepSpiImpl extends SingletonSpiFactory[DependentSpi] {
+    override def apply(deps: Dependencies): DependentSpi = {
+        new DepSpiImpl(deps.get[String], deps.get[WhiskConfig])
+    }
+}
+
+class TestSpiImpl(val name: String, val dep: DependentSpi) extends TestSpi
+//an alternative to extending SingletonSpiFactory is using lazy val:
+object TestSpiImpl extends SpiFactory[TestSpi] {
+    var name: String = null
+    var conf: DependentSpi = null
+    lazy val instance = new TestSpiImpl(name, conf)
+    override def apply(dependencies: Dependencies): TestSpi = {
+        name = dependencies.get[String]
+        conf = dependencies.get[DependentSpi]
+        instance
+    }
+
+}
+
+class TestSpiFactoryImpl extends TestSpiFactory {
+    def getTestSpi(name: String, dep: DependentSpi) = new TestSpiImpl(name, dep)
+}
+
+object TestSpiFactoryImpl extends SpiFactory[TestSpiFactory] {
+    override def apply(deps: Dependencies): TestSpiFactory = new TestSpiFactoryImpl()
+}
+
+class DependentSpiFactoryImpl extends DependentSpiFactory {
+    override def getDependentSpi(name: String, config: WhiskConfig): DependentSpi = new DepSpiImpl(name,
config)
+}
+
+object DependentSpiFactoryImpl extends SpiFactory[DependentSpiFactory] {
+    override def apply(deps: Dependencies): DependentSpiFactory = new DependentSpiFactoryImpl()
+}
+
+class SimpleSpiImpl(val name: String) extends SimpleSpi
+
+object SimpleSpiImpl extends SingletonSpiFactory[SimpleSpi] {
+    override def apply(dependencies: Dependencies): SimpleSpi = new SimpleSpiImpl("some val
")
+}
+
+class MissingSpiImpl(val name: String) extends MissingSpi
+
+object MissingSpiImpl extends SpiFactory[MissingSpi] {
+    override def apply(deps: Dependencies): MissingSpi = new MissingSpiImpl("some val ")
+}

-- 
To stop receiving notification emails like this one, please contact
['"commits@openwhisk.apache.org" <commits@openwhisk.apache.org>'].

Mime
View raw message