gobblin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hut...@apache.org
Subject incubator-gobblin git commit: [GOBBLIN-245] Create topic specific extract of a WorkUnit in KafkaSource
Date Fri, 08 Sep 2017 01:09:34 GMT
Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 05bf034e3 -> 6b616d472


[GOBBLIN-245] Create topic specific extract of a WorkUnit in KafkaSource

Closes #2095 from zxcware/kafka


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/6b616d47
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/6b616d47
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/6b616d47

Branch: refs/heads/master
Commit: 6b616d472f61047a47364def3f681e4735207b64
Parents: 05bf034
Author: zhchen <zhchen@linkedin.com>
Authored: Thu Sep 7 18:09:24 2017 -0700
Committer: Hung Tran <hutran@linkedin.com>
Committed: Thu Sep 7 18:09:24 2017 -0700

----------------------------------------------------------------------
 .../extractor/extract/kafka/KafkaSource.java    | 29 ++++++++++++++------
 1 file changed, 21 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6b616d47/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
index af735d1..606be62 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
@@ -36,7 +36,6 @@ import org.slf4j.LoggerFactory;
 import com.codahale.metrics.Timer;
 import com.google.common.base.Joiner;
 import com.google.common.base.Function;
-import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Splitter;
@@ -66,8 +65,6 @@ import org.apache.gobblin.util.ExecutorsUtils;
 import org.apache.gobblin.util.dataset.DatasetUtils;
 import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.metrics.MetricContext;
-import org.apache.gobblin.source.extractor.limiter.LimiterConfigurationKeys;
-import org.apache.gobblin.source.workunit.MultiWorkUnit;
 
 import lombok.Getter;
 import lombok.Setter;
@@ -133,7 +130,7 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S,
D> {
 
   private volatile boolean doneGettingAllPreviousOffsets = false;
   private Extract.TableType tableType;
-  private String extractNameSpace;
+  private String extractNamespace;
   private boolean isFullExtract;
   private boolean shouldEnableDatasetStateStore;
   private AtomicBoolean isDatasetStateEnabled = new AtomicBoolean(false);
@@ -167,14 +164,15 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S,
D> {
       String tableTypeStr =
           state.getProp(ConfigurationKeys.EXTRACT_TABLE_TYPE_KEY, KafkaSource.DEFAULT_TABLE_TYPE.toString());
       tableType = Extract.TableType.valueOf(tableTypeStr);
-      extractNameSpace =
+      extractNamespace =
           state.getProp(ConfigurationKeys.EXTRACT_NAMESPACE_NAME_KEY, KafkaSource.DEFAULT_NAMESPACE_NAME);
     } else {
       // To be compatible, reject table type and namespace configuration keys as previous
implementation
       tableType = KafkaSource.DEFAULT_TABLE_TYPE;
-      extractNameSpace = KafkaSource.DEFAULT_NAMESPACE_NAME;
+      extractNamespace = KafkaSource.DEFAULT_NAMESPACE_NAME;
     }
     isFullExtract = state.getPropAsBoolean(ConfigurationKeys.EXTRACT_IS_FULL_KEY);
+
     this.shouldEnableDatasetStateStore = state.getPropAsBoolean(GOBBLIN_KAFKA_SHOULD_ENABLE_DATASET_STATESTORE,
         DEFAULT_GOBBLIN_KAFKA_SHOULD_ENABLE_DATASET_STATESTORE);
 
@@ -517,8 +515,22 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S,
D> {
 
   private WorkUnit getWorkUnitForTopicPartition(KafkaPartition partition, Offsets offsets,
       Optional<State> topicSpecificState) {
-    Extract extract = this.createExtract(tableType, extractNameSpace, partition.getTopicName());
-    if (isFullExtract) {
+    // Default to job level configurations
+    Extract.TableType currentTableType = tableType;
+    String currentExtractNamespace = extractNamespace;
+    boolean isCurrentFullExtract = isFullExtract;
+    // Update to topic specific configurations if any
+    if (topicSpecificState.isPresent()) {
+      State topicState = topicSpecificState.get();
+      if (topicState.contains(ConfigurationKeys.EXTRACT_TABLE_TYPE_KEY)) {
+        currentTableType = Extract.TableType.valueOf(topicState.getProp(ConfigurationKeys.EXTRACT_TABLE_TYPE_KEY));
+      }
+      currentExtractNamespace = topicState.getProp(ConfigurationKeys.EXTRACT_NAMESPACE_NAME_KEY,
extractNamespace);
+      isCurrentFullExtract = topicState.getPropAsBoolean(ConfigurationKeys.EXTRACT_IS_FULL_KEY,
isFullExtract);
+    }
+
+    Extract extract = this.createExtract(currentTableType, currentExtractNamespace, partition.getTopicName());
+    if (isCurrentFullExtract) {
       extract.setProp(ConfigurationKeys.EXTRACT_IS_FULL_KEY, true);
     }
 
@@ -536,6 +548,7 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S,
D> {
     workUnit.setProp(ConfigurationKeys.WORK_UNIT_HIGH_WATER_MARK_KEY, offsets.getLatestOffset());
     LOG.info(String.format("Created workunit for partition %s: lowWatermark=%d, highWatermark=%d,
range=%d", partition,
         offsets.getStartOffset(), offsets.getLatestOffset(), offsets.getLatestOffset() -
offsets.getStartOffset()));
+
     return workUnit;
   }
 


Mime
View raw message