From: vvraskin@apache.org
To: "commits@openwhisk.apache.org"
Reply-To: dev@openwhisk.apache.org
Date: Thu, 26 Apr 2018 12:38:28 +0000
Subject: [incubator-openwhisk] branch master updated: Refactor Kafka clients for more generalized configuration reading. (#3325)
Message-ID: <152474630854.13447.3620694983274302713@gitbox.apache.org>
X-Git-Oldrev: d1be01bcd1177aaf0c51b6dd71a8071e86b9d382
X-Git-Newrev: 0d1e60028ee1adbd0c240c1fe87ea3b5e27435ba

This is an automated email from the ASF dual-hosted git repository.

vvraskin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git

The following commit(s) were added to refs/heads/master by this push:
     new 0d1e600  Refactor Kafka clients for more generalized configuration reading. (#3325)
0d1e600 is described below

commit 0d1e60028ee1adbd0c240c1fe87ea3b5e27435ba
Author: Markus Thömmes
AuthorDate: Thu Apr 26 14:38:25 2018 +0200

    Refactor Kafka clients for more generalized configuration reading. (#3325)

    Kafka's clients rely on Properties for passing the configuration in. That's
    cumbersome to use in Scala, so it's now pushed to a central method.

    Also, using the implicit JavaConversions is discouraged (as it can be quite
    surprising). Exchanged that for the explicit JavaConverters.
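The heart of the change is a small set of helpers on KafkaConfiguration (visible in the diff below) that turn a plain Scala Map into the java.util.Properties the Kafka client constructors expect, converting TypesafeConfig's kebab-cased keys to Kafka's dot.delimited ones along the way. A minimal, self-contained sketch of that pattern follows; the helper names mirror the diff, but the object name, sample key, and sample value are made up for illustration:

    import java.util.Properties
    import scala.language.implicitConversions

    object KafkaConfigSketch extends App {
      // Kebab-cased TypesafeConfig key -> dot.delimited Kafka key,
      // e.g. "session-timeout-ms" -> "session.timeout.ms".
      def configToKafkaKey(configKey: String): String = configKey.replace("-", ".")

      def configMapToKafkaConfig(configMap: Map[String, String]): Map[String, String] =
        configMap.map { case (key, value) => configToKafkaKey(key) -> value }

      // Implicit view so a Scala Map can be handed to constructors
      // that expect java.util.Properties.
      implicit def mapToProperties(map: Map[String, String]): Properties = {
        val props = new Properties()
        map.foreach { case (key, value) => props.setProperty(key, value) }
        props
      }

      val config = Map("bootstrap-servers" -> "localhost:9092") // sample value only
      val props: Properties = configMapToKafkaConfig(config)    // implicit conversion applies
      println(props)                                            // {bootstrap.servers=localhost:9092}
    }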
---
 .../connector/kafka/KafkaConsumerConnector.scala  | 46 +++++++-----------
 .../connector/kafka/KafkaMessagingProvider.scala  | 54 +++++++++++-----------
 .../connector/kafka/KafkaProducerConnector.scala  | 43 +++++------------
 3 files changed, 55 insertions(+), 88 deletions(-)

diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
index 20d5635..16cbe35 100644
--- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
@@ -17,8 +17,6 @@
 
 package whisk.connector.kafka
 
-import java.util.Properties
-
 import akka.actor.ActorSystem
 import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
 import org.apache.kafka.common.TopicPartition
@@ -26,10 +24,10 @@ import org.apache.kafka.common.errors.{RetriableException, WakeupException}
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import pureconfig.loadConfigOrThrow
 import whisk.common.{Logging, LoggingMarkers, MetricEmitter, Scheduler}
+import whisk.connector.kafka.KafkaConfiguration._
 import whisk.core.ConfigKeys
 import whisk.core.connector.MessageConsumer
 
-import scala.collection.JavaConversions.{iterableAsScalaIterable, seqAsJavaList}
 import scala.collection.JavaConverters._
 import scala.concurrent.duration._
 import scala.concurrent.{blocking, ExecutionContext, Future}
@@ -67,7 +65,7 @@ class KafkaConsumerConnector(
     val wakeUpTask = actorSystem.scheduler.scheduleOnce(cfg.sessionTimeoutMs.milliseconds + 1.second)(consumer.wakeup())
 
     try {
-      val response = consumer.poll(duration.toMillis).map(r => (r.topic, r.partition, r.offset, r.value))
+      val response = consumer.poll(duration.toMillis).asScala.map(r => (r.topic, r.partition, r.offset, r.value))
       response.lastOption.foreach {
         case (_, _, newOffset, _) => offset = newOffset + 1
       }
@@ -112,40 +110,28 @@ class KafkaConsumerConnector(
     logging.info(this, s"closing '$topic' consumer")
   }
 
-  private def getProps: Properties = {
-    val props = new Properties
-    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupid)
-    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkahost)
-    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPeek.toString)
-
-    val config =
-      KafkaConfiguration.configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaCommon)) ++
-        KafkaConfiguration.configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaConsumer))
-    config.foreach {
-      case (key, value) => props.put(key, value)
-    }
-    props
-  }
-
   /** Creates a new kafka consumer and subscribes to topic list if given.
     */
-  private def getConsumer(props: Properties, topics: Option[List[String]] = None) = {
-    val keyDeserializer = new ByteArrayDeserializer
-    val valueDeserializer = new ByteArrayDeserializer
-    val consumer = new KafkaConsumer(props, keyDeserializer, valueDeserializer)
-    topics.foreach(consumer.subscribe(_))
+  private def createConsumer(topic: String) = {
+    val config = Map(
+      ConsumerConfig.GROUP_ID_CONFIG -> groupid,
+      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> kafkahost,
+      ConsumerConfig.MAX_POLL_RECORDS_CONFIG -> maxPeek.toString) ++
+      configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaCommon)) ++
+      configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaConsumer))
+
+    val consumer = new KafkaConsumer(config, new ByteArrayDeserializer, new ByteArrayDeserializer)
+    consumer.subscribe(Seq(topic).asJavaCollection)
     consumer
   }
 
   private def recreateConsumer(): Unit = {
     val oldConsumer = consumer
-    Future {
-      oldConsumer.close()
-      logging.info(this, s"old consumer closed")
-    }
-    consumer = getConsumer(getProps, Some(List(topic)))
+    oldConsumer.close()
+    logging.info(this, s"old consumer closed")
+    consumer = createConsumer(topic)
   }
 
-  @volatile private var consumer = getConsumer(getProps, Some(List(topic)))
+  @volatile private var consumer: KafkaConsumer[Array[Byte], Array[Byte]] = createConsumer(topic)
 
   // Read current lag of the consumed topic, e.g. invoker queue
   // Since we use only one partition in kafka, it is defined 0
diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
index 6b0fc14..e939a46 100644
--- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
@@ -20,22 +20,16 @@ package whisk.connector.kafka
 import java.util.Properties
 import java.util.concurrent.ExecutionException
 
-import scala.concurrent.duration.FiniteDuration
 import akka.actor.ActorSystem
-
-import scala.concurrent.duration._
-import scala.collection.JavaConverters._
-import org.apache.kafka.clients.admin.AdminClientConfig
-import org.apache.kafka.clients.admin.AdminClient
-import org.apache.kafka.clients.admin.NewTopic
+import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}
 import org.apache.kafka.common.errors.TopicExistsException
-import whisk.common.Logging
-import whisk.core.ConfigKeys
-import whisk.core.WhiskConfig
-import whisk.core.connector.MessageConsumer
-import whisk.core.connector.MessageProducer
-import whisk.core.connector.MessagingProvider
 import pureconfig._
+import whisk.common.Logging
+import whisk.core.{ConfigKeys, WhiskConfig}
+import whisk.core.connector.{MessageConsumer, MessageProducer, MessagingProvider}
+
+import scala.collection.JavaConverters._
+import scala.concurrent.duration.FiniteDuration
 
 case class KafkaConfig(replicationFactor: Short)
 
@@ -43,6 +37,7 @@ case class KafkaConfig(replicationFactor: Short)
  * A Kafka based implementation of MessagingProvider
  */
 object KafkaMessagingProvider extends MessagingProvider {
+  import KafkaConfiguration._
 
   def getConsumer(config: WhiskConfig, groupId: String, topic: String, maxPeek: Int, maxPollInterval: FiniteDuration)(
     implicit logging: Logging,
@@ -56,18 +51,10 @@ object KafkaMessagingProvider extends MessagingProvider {
     val kc = loadConfigOrThrow[KafkaConfig](ConfigKeys.kafka)
     val tc = KafkaConfiguration.configMapToKafkaConfig(
       loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaTopics + s".$topicConfig"))
-    val props = new Properties
-    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, config.kafkaHosts)
-
-    val commonConfig =
-      KafkaConfiguration.configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaCommon))
-    commonConfig.foreach {
-      case (key, value) => {
-        props.put(key, value)
-      }
-    }
-    val client = AdminClient.create(props)
+    val baseConfig = Map(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> config.kafkaHosts)
+    val commonConfig = configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaCommon))
+    val client = AdminClient.create(baseConfig ++ commonConfig)
 
     val numPartitions = 1
     val nt = new NewTopic(topic, numPartitions, kc.replicationFactor).configs(tc.asJava)
     val results = client.createTopics(List(nt).asJava)
@@ -89,9 +76,24 @@ object KafkaMessagingProvider extends MessagingProvider {
 }
 
 object KafkaConfiguration {
-  def configToKafkaKey(configKey: String) = configKey.replace("-", ".")
+  import scala.language.implicitConversions
+
+  implicit def mapToProperties(map: Map[String, String]): Properties = {
+    val props = new Properties()
+    map.foreach { case (key, value) => props.setProperty(key, value) }
+    props
+  }
+
+  /**
+   * Converts TypesafeConfig keys to a KafkaConfig key.
+   *
+   * TypesafeConfig's keys are usually kebab-cased (dash-delimited), whereas KafkaConfig keys are dot.delimited. This
+   * converts an example-key-to-illustrate to example.key.to.illustrate.
+   */
+  def configToKafkaKey(configKey: String): String = configKey.replace("-", ".")
 
-  def configMapToKafkaConfig(configMap: Map[String, String]) = configMap.map {
+  /** Converts a Map read from TypesafeConfig to a Map to be read by Kafka clients. */
+  def configMapToKafkaConfig(configMap: Map[String, String]): Map[String, String] = configMap.map {
     case (key, value) => configToKafkaKey(key) -> value
   }
 }
diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala
index 125dbef..f82acaf 100644
--- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala
@@ -17,20 +17,14 @@
 
 package whisk.connector.kafka
 
-import java.util.Properties
-
 import akka.actor.ActorSystem
 import akka.pattern.after
 import org.apache.kafka.clients.producer._
-import org.apache.kafka.common.errors.{
-  NotEnoughReplicasAfterAppendException,
-  RecordTooLargeException,
-  RetriableException,
-  TimeoutException
-}
+import org.apache.kafka.common.errors._
 import org.apache.kafka.common.serialization.StringSerializer
 import pureconfig._
 import whisk.common.{Counter, Logging, TransactionId}
+import whisk.connector.kafka.KafkaConfiguration._
 import whisk.core.ConfigKeys
 import whisk.core.connector.{Message, MessageProducer}
 import whisk.core.entity.UUIDs
@@ -97,35 +91,20 @@ class KafkaProducerConnector(kafkahosts: String, id: String = UUIDs.randomUUID()
 
   private val sentCounter = new Counter()
 
-  private def getProps: Properties = {
-    val props = new Properties
-    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkahosts)
-
-    // Load additional config from the config files and add them here.
-    val config =
-      KafkaConfiguration.configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaCommon)) ++
-        KafkaConfiguration.configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaProducer))
-
-    config.foreach {
-      case (key, value) => props.put(key, value)
-    }
-    props
-  }
+  private def createProducer(): KafkaProducer[String, String] = {
+    val config = Map(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> kafkahosts) ++
+      configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaCommon)) ++
+      configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaProducer))
 
-  private def getProducer(props: Properties): KafkaProducer[String, String] = {
-    val keySerializer = new StringSerializer
-    val valueSerializer = new StringSerializer
-    new KafkaProducer(props, keySerializer, valueSerializer)
+    new KafkaProducer(config, new StringSerializer, new StringSerializer)
   }
 
   private def recreateProducer(): Unit = {
     val oldProducer = producer
-    Future {
-      oldProducer.close()
-      logging.info(this, s"old consumer closed")
-    }
-    producer = getProducer(getProps)
+    oldProducer.close()
+    logging.info(this, s"old producer closed")
+    producer = createProducer()
   }
 
-  @volatile private var producer = getProducer(getProps)
+  @volatile private var producer = createProducer()
 }

-- 
To stop receiving notification emails like this one, please contact
vvraskin@apache.org.