gobblin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hut...@apache.org
Subject incubator-gobblin git commit: [GOBBLIN-312] Pass extra kafka configuration to the KafkaConsumer in KafkaSimpleStreamingSource
Date Fri, 10 Nov 2017 05:07:15 GMT
Repository: incubator-gobblin
Updated Branches:
  refs/heads/master f957934a1 -> ba44dd304


[GOBBLIN-312] Pass extra kafka configuration to the KafkaConsumer in KafkaSimpleStreamingSource

Closes #2166 from htran1/kafka_consumer_config1


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/ba44dd30
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/ba44dd30
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/ba44dd30

Branch: refs/heads/master
Commit: ba44dd30423d08f8ed9fbf27e2113055ac239ca9
Parents: f957934
Author: Hung Tran <hutran@linkedin.com>
Authored: Thu Nov 9 21:07:08 2017 -0800
Committer: Hung Tran <hutran@linkedin.com>
Committed: Thu Nov 9 21:07:08 2017 -0800

----------------------------------------------------------------------
 .../apache/gobblin/kafka/client/Kafka09ConsumerClient.java  | 7 ++++++-
 .../extractor/extract/kafka/KafkaSimpleStreamingSource.java | 9 ++++++++-
 .../kafka/client/AbstractBaseKafkaConsumerClient.java       | 5 +++--
 3 files changed, 17 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ba44dd30/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java
b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java
index 4943ac5..7f83192 100644
--- a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java
+++ b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java
@@ -94,7 +94,12 @@ public class Kafka09ConsumerClient<K, V> extends AbstractBaseKafkaConsumerClient
     props.put(KAFKA_09_CLIENT_BOOTSTRAP_SERVERS_KEY, Joiner.on(",").join(super.brokers));
     props.put(KAFKA_09_CLIENT_SESSION_TIMEOUT_KEY, super.socketTimeoutMillis);
 
-    Config scopedConfig = config.getConfig(CONFIG_PREFIX_NO_DOT).withFallback(FALLBACK);
+    // grab all the config under "source.kafka" and add the defaults as fallback.
+    Config baseConfig = ConfigUtils.getConfigOrEmpty(config, CONFIG_NAMESPACE).withFallback(FALLBACK);
+    // get the "source.kafka.consumerConfig" config for extra config to pass along to Kafka
+    Config specificConfig = ConfigUtils.getConfigOrEmpty(baseConfig, CONSUMER_CONFIG);
+    // The specific config overrides settings in the base config
+    Config scopedConfig = specificConfig.withFallback(baseConfig.withoutPath(CONSUMER_CONFIG));
     props.putAll(ConfigUtils.configToProperties(scopedConfig));
 
     this.consumer = new KafkaConsumer<>(props);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ba44dd30/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSimpleStreamingSource.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSimpleStreamingSource.java
b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSimpleStreamingSource.java
index 72e0778..60cdf91 100644
--- a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSimpleStreamingSource.java
+++ b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSimpleStreamingSource.java
@@ -46,7 +46,6 @@ import org.apache.gobblin.source.workunit.Extract;
 import org.apache.gobblin.source.workunit.WorkUnit;
 import org.apache.gobblin.util.ConfigUtils;
 
-
 /**
  * A {@link EventBasedSource} implementation for a simple streaming kafka extractor.
  *
@@ -67,6 +66,8 @@ public class KafkaSimpleStreamingSource<S, D> extends EventBasedSource<S,
Record
   public static final String TOPIC_KEY_DESERIALIZER = "gobblin.streaming.kafka.topic.key.deserializer";
   public static final String TOPIC_VALUE_DESERIALIZER = "gobblin.streaming.kafka.topic.value.deserializer";
 
+  public static final String KAFKA_CONSUMER_CONFIG_PREFIX = "gobblin.streaming.kafka.consumerConfig";
+
   /**
    * Private config keys used to pass data into work unit state
    */
@@ -103,6 +104,12 @@ public class KafkaSimpleStreamingSource<S, D> extends EventBasedSource<S,
Record
     props.put("key.deserializer", config.getString(TOPIC_KEY_DESERIALIZER));
     Preconditions.checkArgument(config.hasPath(TOPIC_VALUE_DESERIALIZER));
     props.put("value.deserializer", config.getString(TOPIC_VALUE_DESERIALIZER));
+
+    // pass along any config scoped under source.kafka.config
+    // one use case of this is to pass SSL configuration
+    Config scopedConfig = ConfigUtils.getConfigOrEmpty(config, KAFKA_CONSUMER_CONFIG_PREFIX);
+    props.putAll(ConfigUtils.configToProperties(scopedConfig));
+
     Consumer consumer = null;
     try {
       consumer = new KafkaConsumer<>(props);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ba44dd30/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java
index 638a879..24c737f 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java
@@ -41,8 +41,9 @@ import javax.annotation.Nullable;
  */
 public abstract class AbstractBaseKafkaConsumerClient implements GobblinKafkaConsumerClient
{
 
-  public static final String CONFIG_PREFIX_NO_DOT = "source.kafka";
-  public static final String CONFIG_PREFIX = CONFIG_PREFIX_NO_DOT + ".";
+  public static final String CONFIG_NAMESPACE = "source.kafka";
+  public static final String CONFIG_PREFIX = CONFIG_NAMESPACE + ".";
+  public static final String CONSUMER_CONFIG = "consumerConfig";
   public static final String CONFIG_KAFKA_FETCH_TIMEOUT_VALUE = CONFIG_PREFIX + "fetchTimeoutMillis";
   public static final int CONFIG_KAFKA_FETCH_TIMEOUT_VALUE_DEFAULT = 1000; // 1 second
   public static final String CONFIG_KAFKA_FETCH_REQUEST_MIN_BYTES = CONFIG_PREFIX + "fetchMinBytes";


Mime
View raw message