gobblin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hut...@apache.org
Subject incubator-gobblin git commit: [GOBBLIN-331] Add sharedConfig support for the KafkaDataWriters
Date Mon, 04 Dec 2017 22:28:53 GMT
Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 4a8f7ba8d -> b7b2bd9d1


[GOBBLIN-331] Add sharedConfig support for the KafkaDataWriters

Closes #2183 from htran1/producer_shared_config


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/b7b2bd9d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/b7b2bd9d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/b7b2bd9d

Branch: refs/heads/master
Commit: b7b2bd9d1be3e7452d375c30b09125dc020c9457
Parents: 4a8f7ba
Author: Hung Tran <hutran@linkedin.com>
Authored: Mon Dec 4 14:28:46 2017 -0800
Committer: Hung Tran <hutran@linkedin.com>
Committed: Mon Dec 4 14:28:46 2017 -0800

----------------------------------------------------------------------
 .../writer/KafkaWriterConfigurationKeys.java    |  3 +-
 .../gobblin/kafka/writer/KafkaWriterHelper.java | 10 +++-
 .../kafka/writer/KafkaWriterHelperTest.java     | 49 ++++++++++++++++++++
 3 files changed, 60 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b7b2bd9d/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterConfigurationKeys.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterConfigurationKeys.java
index 279812e..84255d4 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterConfigurationKeys.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterConfigurationKeys.java
@@ -38,7 +38,8 @@ public class KafkaWriterConfigurationKeys {
    * Kafka producer configurations will be passed through as is as long as they are prefixed
    * by the PREFIX specified below.
    */
-  public static final String KAFKA_PRODUCER_CONFIG_PREFIX = "writer.kafka.producerConfig.";
+  public static final String KAFKA_PRODUCER_CONFIG_PREFIX_NO_DOT = "writer.kafka.producerConfig";
+  public static final String KAFKA_PRODUCER_CONFIG_PREFIX = KAFKA_PRODUCER_CONFIG_PREFIX_NO_DOT + ".";
 
   /** Kafka producer scoped configuration keys go here **/
   static final String KEY_SERIALIZER_CONFIG = "key.serializer";

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b7b2bd9d/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterHelper.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterHelper.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterHelper.java
index 28da311..1999480 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterHelper.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterHelper.java
@@ -28,6 +28,7 @@ import com.typesafe.config.ConfigFactory;
 
 import lombok.extern.slf4j.Slf4j;
 
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.util.ConfigUtils;
 
 import static org.apache.gobblin.kafka.writer.KafkaWriterConfigurationKeys.*;
@@ -41,7 +42,14 @@ import static org.apache.gobblin.kafka.writer.KafkaWriterConfigurationKeys.CLIEN
 public class KafkaWriterHelper {
 
   static Properties getProducerProperties(Properties props) {
-    Properties producerProperties = stripPrefix(props, KAFKA_PRODUCER_CONFIG_PREFIX);
+    Config config = ConfigUtils.propertiesToConfig(props);
+
+    // get the "writer.kafka.producerConfig" config for producer config to pass along to Kafka with a fallback to the
+    // shared config that start with "gobblin.kafka.sharedConfig"
+    Config producerConfig = ConfigUtils.getConfigOrEmpty(config, KAFKA_PRODUCER_CONFIG_PREFIX_NO_DOT).withFallback(
+        ConfigUtils.getConfigOrEmpty(config, ConfigurationKeys.SHARED_KAFKA_CONFIG_PREFIX));
+
+    Properties producerProperties = ConfigUtils.configToProperties(producerConfig);
 
     // Provide default properties if not set from above
     setDefaultIfUnset(producerProperties, KEY_SERIALIZER_CONFIG, DEFAULT_KEY_SERIALIZER);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b7b2bd9d/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/kafka/writer/KafkaWriterHelperTest.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/kafka/writer/KafkaWriterHelperTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/kafka/writer/KafkaWriterHelperTest.java
new file mode 100644
index 0000000..1a134c1
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/kafka/writer/KafkaWriterHelperTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.kafka.writer;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.util.Properties;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class KafkaWriterHelperTest {
+
+  @Test
+  public void testSharedConfig() {
+    Properties props = new Properties();
+
+    props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + "key1", "value1");
+    props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + "key2", "value2");
+
+    props.setProperty(ConfigurationKeys.SHARED_KAFKA_CONFIG_PREFIX + ".key1", "sharedValue1");
+    props.setProperty(ConfigurationKeys.SHARED_KAFKA_CONFIG_PREFIX + ".key3", "sharedValue3");
+
+    Properties producerProps = KafkaWriterHelper.getProducerProperties(props);
+
+    // specific config overrides shared config
+    Assert.assertEquals(producerProps.getProperty("key1"), "value1");
+    Assert.assertEquals(producerProps.getProperty("key2"), "value2");
+    Assert.assertEquals(producerProps.getProperty("key3"), "sharedValue3");
+  }
+}


Mime
View raw message