openwhisk-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cbic...@apache.org
Subject [incubator-openwhisk] branch master updated: Refactor `ensureTopic` to expose failure details. (#3686)
Date Tue, 12 Jun 2018 11:37:27 GMT
This is an automated email from the ASF dual-hosted git repository.

cbickel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 74216e1  Refactor `ensureTopic` to expose failure details. (#3686)
74216e1 is described below

commit 74216e131c58e9dac4bbb2f1fcd2a44495b9988d
Author: Markus Thömmes <markusthoemmes@me.com>
AuthorDate: Tue Jun 12 13:37:08 2018 +0200

    Refactor `ensureTopic` to expose failure details. (#3686)
    
    ensureTopic returns a `Boolean` value of whether it successfully created a topic or not.
    
    This changes that behavior to actually return the Exception in case of an error. That
enables the client-side code to handle (or log) that failure appropriately while maintaining
the ease of checking a successful result by using `isSuccess`.
---
 .../CausedBy.scala}                                | 33 +++++++--------
 .../connector/kafka/KafkaMessagingProvider.scala   | 47 ++++++++++------------
 .../whisk/core/connector/MessagingProvider.scala   |  4 +-
 .../scala/whisk/core/controller/Controller.scala   | 20 +++++----
 .../main/scala/whisk/core/invoker/Invoker.scala    |  2 +-
 .../test/scala/services/KafkaConnectorTests.scala  |  8 ++--
 6 files changed, 53 insertions(+), 61 deletions(-)

diff --git a/common/scala/src/main/scala/whisk/core/connector/MessagingProvider.scala b/common/scala/src/main/scala/whisk/common/CausedBy.scala
similarity index 51%
copy from common/scala/src/main/scala/whisk/core/connector/MessagingProvider.scala
copy to common/scala/src/main/scala/whisk/common/CausedBy.scala
index 8ec1f5a..caa2ba4 100644
--- a/common/scala/src/main/scala/whisk/core/connector/MessagingProvider.scala
+++ b/common/scala/src/main/scala/whisk/common/CausedBy.scala
@@ -15,26 +15,21 @@
  * limitations under the License.
  */
 
-package whisk.core.connector
-
-import akka.actor.ActorSystem
-
-import scala.concurrent.duration.DurationInt
-import scala.concurrent.duration.FiniteDuration
-import whisk.common.Logging
-import whisk.core.WhiskConfig
-import whisk.spi.Spi
+package whisk.common
 
 /**
- * An Spi for providing Messaging implementations.
+ * Helper to match on exceptions caused by other exceptions.
+ *
+ * Use this like:
+ *
+ * ```
+ * try {
+ *   block()
+ * } catch {
+ *   case CausedBy(internalException: MyFancyException) => ...
+ * }
+ * ```
  */
-trait MessagingProvider extends Spi {
-  def getConsumer(
-    config: WhiskConfig,
-    groupId: String,
-    topic: String,
-    maxPeek: Int = Int.MaxValue,
-    maxPollInterval: FiniteDuration = 5.minutes)(implicit logging: Logging, actorSystem:
ActorSystem): MessageConsumer
-  def getProducer(config: WhiskConfig)(implicit logging: Logging, actorSystem: ActorSystem):
MessageProducer
-  def ensureTopic(config: WhiskConfig, topic: String, topicConfig: String)(implicit logging:
Logging): Boolean
+object CausedBy {
+  def unapply(e: Throwable): Option[Throwable] = Option(e.getCause)
 }
diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
b/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
index e939a46..7351b64 100644
--- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
@@ -18,18 +18,18 @@
 package whisk.connector.kafka
 
 import java.util.Properties
-import java.util.concurrent.ExecutionException
 
 import akka.actor.ActorSystem
 import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}
 import org.apache.kafka.common.errors.TopicExistsException
 import pureconfig._
-import whisk.common.Logging
+import whisk.common.{CausedBy, Logging}
 import whisk.core.{ConfigKeys, WhiskConfig}
 import whisk.core.connector.{MessageConsumer, MessageProducer, MessagingProvider}
 
 import scala.collection.JavaConverters._
 import scala.concurrent.duration.FiniteDuration
+import scala.util.{Failure, Success, Try}
 
 case class KafkaConfig(replicationFactor: Short)
 
@@ -47,31 +47,28 @@ object KafkaMessagingProvider extends MessagingProvider {
   def getProducer(config: WhiskConfig)(implicit logging: Logging, actorSystem: ActorSystem):
MessageProducer =
     new KafkaProducerConnector(config.kafkaHosts)
 
-  def ensureTopic(config: WhiskConfig, topic: String, topicConfig: String)(implicit logging:
Logging): Boolean = {
-    val kc = loadConfigOrThrow[KafkaConfig](ConfigKeys.kafka)
-    val tc = KafkaConfiguration.configMapToKafkaConfig(
-      loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaTopics + s".$topicConfig"))
+  def ensureTopic(config: WhiskConfig, topic: String, topicConfigKey: String)(implicit logging:
Logging): Try[Unit] = {
+    val kafkaConfig = loadConfigOrThrow[KafkaConfig](ConfigKeys.kafka)
+    val topicConfig = KafkaConfiguration.configMapToKafkaConfig(
+      loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaTopics + "." + topicConfigKey))
 
-    val baseConfig = Map(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> config.kafkaHosts)
     val commonConfig = configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaCommon))
-    val client = AdminClient.create(baseConfig ++ commonConfig)
-    val numPartitions = 1
-    val nt = new NewTopic(topic, numPartitions, kc.replicationFactor).configs(tc.asJava)
-    val results = client.createTopics(List(nt).asJava)
-    try {
-      results.values().get(topic).get()
-      logging.info(this, s"created topic $topic")
-      true
-    } catch {
-      case e: ExecutionException if e.getCause.isInstanceOf[TopicExistsException] =>
-        logging.info(this, s"topic $topic already existed")
-        true
-      case e: Exception =>
-        logging.error(this, s"ensureTopic for $topic failed due to $e")
-        false
-    } finally {
-      client.close()
-    }
+    val client = AdminClient.create(commonConfig + (AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG
-> config.kafkaHosts))
+    val partitions = 1
+    val nt = new NewTopic(topic, partitions, kafkaConfig.replicationFactor).configs(topicConfig.asJava)
+
+    val result = Try(client.createTopics(List(nt).asJava).values().get(topic).get())
+      .map(_ => logging.info(this, s"created topic $topic"))
+      .recoverWith {
+        case CausedBy(_: TopicExistsException) =>
+          Success(logging.info(this, s"topic $topic already existed"))
+        case t =>
+          logging.error(this, s"ensureTopic for $topic failed due to $t")
+          Failure(t)
+      }
+
+    client.close()
+    result
   }
 }
 
diff --git a/common/scala/src/main/scala/whisk/core/connector/MessagingProvider.scala b/common/scala/src/main/scala/whisk/core/connector/MessagingProvider.scala
index 8ec1f5a..6e6b24a 100644
--- a/common/scala/src/main/scala/whisk/core/connector/MessagingProvider.scala
+++ b/common/scala/src/main/scala/whisk/core/connector/MessagingProvider.scala
@@ -25,6 +25,8 @@ import whisk.common.Logging
 import whisk.core.WhiskConfig
 import whisk.spi.Spi
 
+import scala.util.Try
+
 /**
  * An Spi for providing Messaging implementations.
  */
@@ -36,5 +38,5 @@ trait MessagingProvider extends Spi {
     maxPeek: Int = Int.MaxValue,
     maxPollInterval: FiniteDuration = 5.minutes)(implicit logging: Logging, actorSystem:
ActorSystem): MessageConsumer
   def getProducer(config: WhiskConfig)(implicit logging: Logging, actorSystem: ActorSystem):
MessageProducer
-  def ensureTopic(config: WhiskConfig, topic: String, topicConfig: String)(implicit logging:
Logging): Boolean
+  def ensureTopic(config: WhiskConfig, topic: String, topicConfig: String)(implicit logging:
Logging): Try[Unit]
 }
diff --git a/core/controller/src/main/scala/whisk/core/controller/Controller.scala b/core/controller/src/main/scala/whisk/core/controller/Controller.scala
index c1c7390..8096711 100644
--- a/core/controller/src/main/scala/whisk/core/controller/Controller.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/Controller.scala
@@ -216,18 +216,16 @@ object Controller {
     }
 
     val msgProvider = SpiLoader.get[MessagingProvider]
-    if (!msgProvider.ensureTopic(config, topic = "completed" + instance, topicConfig = "completed"))
{
-      abort(s"failure during msgProvider.ensureTopic for topic completed$instance")
-    }
-    if (!msgProvider.ensureTopic(config, topic = "health", topicConfig = "health")) {
-      abort(s"failure during msgProvider.ensureTopic for topic health")
-    }
-    if (!msgProvider.ensureTopic(config, topic = "cacheInvalidation", topicConfig = "cache-invalidation"))
{
-      abort(s"failure during msgProvider.ensureTopic for topic cacheInvalidation")
-    }
 
-    if (!msgProvider.ensureTopic(config, topic = "events", topicConfig = "events")) {
-      abort(s"failure during msgProvider.ensureTopic for topic events")
+    Map(
+      "completed" + instance -> "completed",
+      "health" -> "health",
+      "cacheInvalidation" -> "cache-invalidation",
+      "events" -> "events").foreach {
+      case (topic, topicConfigurationKey) =>
+        if (msgProvider.ensureTopic(config, topic, topicConfigurationKey).isFailure) {
+          abort(s"failure during msgProvider.ensureTopic for topic $topic")
+        }
     }
 
     ExecManifest.initialize(config) match {
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
index 4a69d3c..134aa05 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
@@ -168,7 +168,7 @@ object Invoker {
 
     val invokerInstance = InstanceId(assignedInvokerId, invokerName)
     val msgProvider = SpiLoader.get[MessagingProvider]
-    if (!msgProvider.ensureTopic(config, topic = "invoker" + assignedInvokerId, topicConfig
= "invoker")) {
+    if (msgProvider.ensureTopic(config, topic = "invoker" + assignedInvokerId, topicConfig
= "invoker").isFailure) {
       abort(s"failure during msgProvider.ensureTopic for topic invoker$assignedInvokerId")
     }
     val producer = msgProvider.getProducer(config)
diff --git a/tests/src/test/scala/services/KafkaConnectorTests.scala b/tests/src/test/scala/services/KafkaConnectorTests.scala
index 3f91b59..57e2e56 100644
--- a/tests/src/test/scala/services/KafkaConnectorTests.scala
+++ b/tests/src/test/scala/services/KafkaConnectorTests.scala
@@ -18,6 +18,7 @@
 package services
 
 import java.io.File
+import java.nio.charset.StandardCharsets
 import java.util.Calendar
 
 import common.{StreamLogging, TestUtils, WhiskProperties, WskActorSystem}
@@ -61,11 +62,9 @@ class KafkaConnectorTests
   val kafkaHosts: Array[String] = config.kafkaHosts.split(",")
   val replicationFactor: Int = kafkaHosts.length / 2 + 1
   System.setProperty("whisk.kafka.replication-factor", replicationFactor.toString)
-  println(s"Create test topic '$topic' with replicationFactor=$replicationFactor")
-  assert(KafkaMessagingProvider.ensureTopic(config, topic, topic), s"Creation of topic $topic
failed")
 
   println(s"Create test topic '$topic' with replicationFactor=$replicationFactor")
-  assert(KafkaMessagingProvider.ensureTopic(config, topic, topic), s"Creation of topic $topic
failed")
+  KafkaMessagingProvider.ensureTopic(config, topic, topic) shouldBe 'success
 
   val producer = new KafkaProducerConnector(config.kafkaHosts)
   val consumer = new KafkaConsumerConnector(config.kafkaHosts, groupid, topic)
@@ -94,7 +93,8 @@ class KafkaConnectorTests
       val sent = Await.result(producer.send(topic, message), waitForSend)
       println(s"Successfully sent message to topic: $sent")
       println(s"Receiving message from topic.")
-      val received = consumer.peek(waitForReceive).map { case (_, _, _, msg) => new String(msg,
"utf-8") }
+      val received =
+        consumer.peek(waitForReceive).map { case (_, _, _, msg) => new String(msg, StandardCharsets.UTF_8)
}
       val end = java.lang.System.currentTimeMillis
       val elapsed = end - start
       println(s"Received ${received.size}. Took $elapsed msec: $received")

-- 
To stop receiving notification emails like this one, please contact
cbickel@apache.org.

Mime
View raw message