openwhisk-commits mailing list archives

From vvras...@apache.org
Subject [incubator-openwhisk] branch master updated: Refactor Kafka clients for more generalized configuration reading. (#3325)
Date Thu, 26 Apr 2018 12:38:28 GMT
This is an automated email from the ASF dual-hosted git repository.

vvraskin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 0d1e600  Refactor Kafka clients for more generalized configuration reading. (#3325)
0d1e600 is described below

commit 0d1e60028ee1adbd0c240c1fe87ea3b5e27435ba
Author: Markus Thömmes <markusthoemmes@me.com>
AuthorDate: Thu Apr 26 14:38:25 2018 +0200

    Refactor Kafka clients for more generalized configuration reading. (#3325)
    
    Kafka's clients rely on Properties for passing configuration in. That's cumbersome to use from Scala, so building those Properties is now pushed into a central method.
    
    Also, using the implicit JavaConversions is discouraged (it can be quite surprising), so it has been exchanged for the explicit JavaConverters.
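
For illustration, the "central method" mentioned above is the implicit mapToProperties conversion added to KafkaConfiguration in the diff below. A condensed, self-contained sketch of the pattern (the object names here exist only for the sketch):

    import java.util.Properties
    import scala.language.implicitConversions

    object PropertiesSyntax {
      // Same shape as KafkaConfiguration.mapToProperties in the diff below.
      implicit def mapToProperties(map: Map[String, String]): Properties = {
        val props = new Properties()
        map.foreach { case (key, value) => props.setProperty(key, value) }
        props
      }
    }

    object PropertiesExample {
      import PropertiesSyntax._
      // An idiomatic immutable Map now satisfies a parameter typed as Properties:
      val props: Properties = Map("bootstrap.servers" -> "localhost:9092")
    }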
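
Likewise, a minimal sketch of the JavaConversions/JavaConverters distinction (generic collections, nothing OpenWhisk-specific): JavaConversions converts between Java and Scala collections through invisible implicits, whereas JavaConverters makes each boundary crossing an explicit .asScala/.asJava call.

    import scala.collection.JavaConverters._

    object ConvertersExample {
      val javaList: java.util.List[String] = java.util.Arrays.asList("a", "b")

      // With JavaConversions, javaList.map(...) would compile via a hidden
      // implicit conversion. With JavaConverters the conversion is explicit:
      val scalaSeq: Seq[String] = javaList.asScala.toList

      // And back again, explicitly, for APIs that expect Java collections:
      val backToJava: java.util.Collection[String] = scalaSeq.asJavaCollection
    }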
---
 .../connector/kafka/KafkaConsumerConnector.scala   | 46 +++++++-----------
 .../connector/kafka/KafkaMessagingProvider.scala   | 54 +++++++++++-----------
 .../connector/kafka/KafkaProducerConnector.scala   | 43 +++++------------
 3 files changed, 55 insertions(+), 88 deletions(-)

diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
index 20d5635..16cbe35 100644
--- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
@@ -17,8 +17,6 @@
 
 package whisk.connector.kafka
 
-import java.util.Properties
-
 import akka.actor.ActorSystem
 import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
 import org.apache.kafka.common.TopicPartition
@@ -26,10 +24,10 @@ import org.apache.kafka.common.errors.{RetriableException, WakeupException}
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import pureconfig.loadConfigOrThrow
 import whisk.common.{Logging, LoggingMarkers, MetricEmitter, Scheduler}
+import whisk.connector.kafka.KafkaConfiguration._
 import whisk.core.ConfigKeys
 import whisk.core.connector.MessageConsumer
 
-import scala.collection.JavaConversions.{iterableAsScalaIterable, seqAsJavaList}
 import scala.collection.JavaConverters._
 import scala.concurrent.duration._
 import scala.concurrent.{blocking, ExecutionContext, Future}
@@ -67,7 +65,7 @@ class KafkaConsumerConnector(
     val wakeUpTask = actorSystem.scheduler.scheduleOnce(cfg.sessionTimeoutMs.milliseconds + 1.second)(consumer.wakeup())
 
     try {
-      val response = consumer.poll(duration.toMillis).map(r => (r.topic, r.partition, r.offset, r.value))
+      val response = consumer.poll(duration.toMillis).asScala.map(r => (r.topic, r.partition, r.offset, r.value))
       response.lastOption.foreach {
         case (_, _, newOffset, _) => offset = newOffset + 1
       }
@@ -112,40 +110,28 @@ class KafkaConsumerConnector(
     logging.info(this, s"closing '$topic' consumer")
   }
 
-  private def getProps: Properties = {
-    val props = new Properties
-    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupid)
-    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkahost)
-    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPeek.toString)
-
-    val config =
-      KafkaConfiguration.configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaCommon)) ++
-        KafkaConfiguration.configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaConsumer))
-    config.foreach {
-      case (key, value) => props.put(key, value)
-    }
-    props
-  }
-
   /** Creates a new kafka consumer and subscribes to topic list if given. */
-  private def getConsumer(props: Properties, topics: Option[List[String]] = None) = {
-    val keyDeserializer = new ByteArrayDeserializer
-    val valueDeserializer = new ByteArrayDeserializer
-    val consumer = new KafkaConsumer(props, keyDeserializer, valueDeserializer)
-    topics.foreach(consumer.subscribe(_))
+  private def createConsumer(topic: String) = {
+    val config = Map(
+      ConsumerConfig.GROUP_ID_CONFIG -> groupid,
+      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> kafkahost,
+      ConsumerConfig.MAX_POLL_RECORDS_CONFIG -> maxPeek.toString) ++
+      configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaCommon)) ++
+      configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaConsumer))
+
+    val consumer = new KafkaConsumer(config, new ByteArrayDeserializer, new ByteArrayDeserializer)
+    consumer.subscribe(Seq(topic).asJavaCollection)
     consumer
   }
 
   private def recreateConsumer(): Unit = {
     val oldConsumer = consumer
-    Future {
-      oldConsumer.close()
-      logging.info(this, s"old consumer closed")
-    }
-    consumer = getConsumer(getProps, Some(List(topic)))
+    oldConsumer.close()
+    logging.info(this, s"old consumer closed")
+    consumer = createConsumer(topic)
   }
 
-  @volatile private var consumer = getConsumer(getProps, Some(List(topic)))
+  @volatile private var consumer: KafkaConsumer[Array[Byte], Array[Byte]] = createConsumer(topic)
 
   // Read current lag of the consumed topic, e.g. invoker queue
   // Since we use only one partition in kafka, it is defined 0
diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
index 6b0fc14..e939a46 100644
--- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
@@ -20,22 +20,16 @@ package whisk.connector.kafka
 import java.util.Properties
 import java.util.concurrent.ExecutionException
 
-import scala.concurrent.duration.FiniteDuration
 import akka.actor.ActorSystem
-
-import scala.concurrent.duration._
-import scala.collection.JavaConverters._
-import org.apache.kafka.clients.admin.AdminClientConfig
-import org.apache.kafka.clients.admin.AdminClient
-import org.apache.kafka.clients.admin.NewTopic
+import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}
 import org.apache.kafka.common.errors.TopicExistsException
-import whisk.common.Logging
-import whisk.core.ConfigKeys
-import whisk.core.WhiskConfig
-import whisk.core.connector.MessageConsumer
-import whisk.core.connector.MessageProducer
-import whisk.core.connector.MessagingProvider
 import pureconfig._
+import whisk.common.Logging
+import whisk.core.{ConfigKeys, WhiskConfig}
+import whisk.core.connector.{MessageConsumer, MessageProducer, MessagingProvider}
+
+import scala.collection.JavaConverters._
+import scala.concurrent.duration.FiniteDuration
 
 case class KafkaConfig(replicationFactor: Short)
 
@@ -43,6 +37,7 @@ case class KafkaConfig(replicationFactor: Short)
  * A Kafka based implementation of MessagingProvider
  */
 object KafkaMessagingProvider extends MessagingProvider {
+  import KafkaConfiguration._
 
   def getConsumer(config: WhiskConfig, groupId: String, topic: String, maxPeek: Int, maxPollInterval: FiniteDuration)(
     implicit logging: Logging,
@@ -56,18 +51,10 @@ object KafkaMessagingProvider extends MessagingProvider {
     val kc = loadConfigOrThrow[KafkaConfig](ConfigKeys.kafka)
     val tc = KafkaConfiguration.configMapToKafkaConfig(
       loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaTopics + s".$topicConfig"))
-    val props = new Properties
-    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, config.kafkaHosts)
-
-    val commonConfig =
-      KafkaConfiguration.configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaCommon))
-    commonConfig.foreach {
-      case (key, value) => {
-        props.put(key, value)
-      }
-    }
 
-    val client = AdminClient.create(props)
+    val baseConfig = Map(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> config.kafkaHosts)
+    val commonConfig = configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaCommon))
+    val client = AdminClient.create(baseConfig ++ commonConfig)
     val numPartitions = 1
     val nt = new NewTopic(topic, numPartitions, kc.replicationFactor).configs(tc.asJava)
     val results = client.createTopics(List(nt).asJava)
@@ -89,9 +76,24 @@ object KafkaMessagingProvider extends MessagingProvider {
 }
 
 object KafkaConfiguration {
-  def configToKafkaKey(configKey: String) = configKey.replace("-", ".")
+  import scala.language.implicitConversions
+
+  implicit def mapToProperties(map: Map[String, String]): Properties = {
+    val props = new Properties()
+    map.foreach { case (key, value) => props.setProperty(key, value) }
+    props
+  }
+
+  /**
+   * Converts TypesafeConfig keys to a KafkaConfig key.
+   *
+   * TypesafeConfig's keys are usually kebab-cased (dash-delimited), whereas KafkaConfig keys are dot.delimited. This
+   * converts an example-key-to-illustrate to example.key.to.illustrate.
+   */
+  def configToKafkaKey(configKey: String): String = configKey.replace("-", ".")
 
-  def configMapToKafkaConfig(configMap: Map[String, String]) = configMap.map {
+  /** Converts a Map read from TypesafeConfig to a Map to be read by Kafka clients. */
+  def configMapToKafkaConfig(configMap: Map[String, String]): Map[String, String] = configMap.map {
     case (key, value) => configToKafkaKey(key) -> value
   }
 }
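
To make the key translation above concrete, a usage sketch (the config keys are hypothetical examples; the behaviour follows the definitions in this hunk):

    // "session-timeout-ms" (TypesafeConfig style) becomes "session.timeout.ms" (Kafka style):
    KafkaConfiguration.configToKafkaKey("session-timeout-ms")
    // Keys are translated, values pass through unchanged:
    KafkaConfiguration.configMapToKafkaConfig(Map("max-poll-interval-ms" -> "300000"))
    // => Map("max.poll.interval.ms" -> "300000")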
diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala
index 125dbef..f82acaf 100644
--- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala
@@ -17,20 +17,14 @@
 
 package whisk.connector.kafka
 
-import java.util.Properties
-
 import akka.actor.ActorSystem
 import akka.pattern.after
 import org.apache.kafka.clients.producer._
-import org.apache.kafka.common.errors.{
-  NotEnoughReplicasAfterAppendException,
-  RecordTooLargeException,
-  RetriableException,
-  TimeoutException
-}
+import org.apache.kafka.common.errors._
 import org.apache.kafka.common.serialization.StringSerializer
 import pureconfig._
 import whisk.common.{Counter, Logging, TransactionId}
+import whisk.connector.kafka.KafkaConfiguration._
 import whisk.core.ConfigKeys
 import whisk.core.connector.{Message, MessageProducer}
 import whisk.core.entity.UUIDs
@@ -97,35 +91,20 @@ class KafkaProducerConnector(kafkahosts: String, id: String = UUIDs.randomUUID()
 
   private val sentCounter = new Counter()
 
-  private def getProps: Properties = {
-    val props = new Properties
-    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkahosts)
-
-    // Load additional config from the config files and add them here.
-    val config =
-      KafkaConfiguration.configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaCommon)) ++
-        KafkaConfiguration.configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaProducer))
-
-    config.foreach {
-      case (key, value) => props.put(key, value)
-    }
-    props
-  }
+  private def createProducer(): KafkaProducer[String, String] = {
+    val config = Map(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> kafkahosts) ++
+      configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaCommon)) ++
+      configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaProducer))
 
-  private def getProducer(props: Properties): KafkaProducer[String, String] = {
-    val keySerializer = new StringSerializer
-    val valueSerializer = new StringSerializer
-    new KafkaProducer(props, keySerializer, valueSerializer)
+    new KafkaProducer(config, new StringSerializer, new StringSerializer)
   }
 
   private def recreateProducer(): Unit = {
     val oldProducer = producer
-    Future {
-      oldProducer.close()
-      logging.info(this, s"old consumer closed")
-    }
-    producer = getProducer(getProps)
+    oldProducer.close()
+    logging.info(this, s"old producer closed")
+    producer = createProducer()
   }
 
-  @volatile private var producer = getProducer(getProps)
+  @volatile private var producer = createProducer()
 }
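
Taken together: new KafkaConsumer(...), new KafkaProducer(...) and AdminClient.create(...) each accept a java.util.Properties, so importing KafkaConfiguration._ lets the implicit mapToProperties adapt a plain Scala Map at the constructor boundary. An end-to-end sketch of the resulting call-site style (host and config values are placeholders, not taken from this commit):

    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}
    import org.apache.kafka.common.serialization.StringSerializer
    import whisk.connector.kafka.KafkaConfiguration._

    object ProducerSketch {
      val config = Map(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> "localhost:9092") ++
        configMapToKafkaConfig(Map("request-timeout-ms" -> "30000"))

      // The implicit mapToProperties converts the Map to Properties right here:
      val producer = new KafkaProducer(config, new StringSerializer, new StringSerializer)
    }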

-- 
To stop receiving notification emails like this one, please contact
vvraskin@apache.org.
