gobblin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hut...@apache.org
Subject incubator-gobblin git commit: [GOBBLIN-259] Support writing Kafka messages to db/table file path
Date Wed, 20 Sep 2017 20:50:31 GMT
Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 707c1e4e5 -> 6160adca2


[GOBBLIN-259] Support writing Kafka messages to db/table file path

Closes #2111 from zxcware/dbtable


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/6160adca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/6160adca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/6160adca

Branch: refs/heads/master
Commit: 6160adca2b4c14eeabfc7b9d8484fd25dd47d6ee
Parents: 707c1e4
Author: zhchen <zhchen@linkedin.com>
Authored: Wed Sep 20 13:50:20 2017 -0700
Committer: Hung Tran <hutran@linkedin.com>
Committed: Wed Sep 20 13:50:20 2017 -0700

----------------------------------------------------------------------
 .../gobblin/source/workunit/WorkUnit.java       | 14 +++++++++
 .../extractor/extract/kafka/KafkaSource.java    |  7 +++--
 .../org/apache/gobblin/util/WriterUtils.java    | 31 +++++++++++++++++---
 .../apache/gobblin/util/WriterUtilsTest.java    | 12 +++++++-
 4 files changed, 57 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6160adca/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/WorkUnit.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/WorkUnit.java b/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/WorkUnit.java
index 5df29a6..38aabcb 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/WorkUnit.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/WorkUnit.java
@@ -304,6 +304,20 @@ public class WorkUnit extends State {
     return getPropAsLong(ConfigurationKeys.WORK_UNIT_LOW_WATER_MARK_KEY);
   }
 
+  @Override
+  public boolean contains(String key) {
+    return super.contains(key) || this.extract.contains(key);
+  }
+
+  @Override
+  public String getProp(String key) {
+    String value = super.getProp(key);
+    if (value == null) {
+      value = this.extract.getProp(key);
+    }
+    return value;
+  }
+
   /**
    * Set the low watermark of this {@link WorkUnit}.
    *

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6160adca/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
index 606be62..56f81e1 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
@@ -518,6 +518,7 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
     // Default to job level configurations
     Extract.TableType currentTableType = tableType;
     String currentExtractNamespace = extractNamespace;
+    String currentExtractTableName = partition.getTopicName();
     boolean isCurrentFullExtract = isFullExtract;
     // Update to topic specific configurations if any
     if (topicSpecificState.isPresent()) {
@@ -526,10 +527,12 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
         currentTableType = Extract.TableType.valueOf(topicState.getProp(ConfigurationKeys.EXTRACT_TABLE_TYPE_KEY));
       }
       currentExtractNamespace = topicState.getProp(ConfigurationKeys.EXTRACT_NAMESPACE_NAME_KEY, extractNamespace);
+      currentExtractTableName =
+          topicState.getProp(ConfigurationKeys.EXTRACT_TABLE_NAME_KEY, partition.getTopicName());
       isCurrentFullExtract = topicState.getPropAsBoolean(ConfigurationKeys.EXTRACT_IS_FULL_KEY, isFullExtract);
     }
 
-    Extract extract = this.createExtract(currentTableType, currentExtractNamespace, partition.getTopicName());
+    Extract extract = this.createExtract(currentTableType, currentExtractNamespace, currentExtractTableName);
     if (isCurrentFullExtract) {
       extract.setProp(ConfigurationKeys.EXTRACT_IS_FULL_KEY, true);
     }
@@ -538,9 +541,9 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
     if (topicSpecificState.isPresent()) {
       workUnit.addAll(topicSpecificState.get());
     }
+
     workUnit.setProp(TOPIC_NAME, partition.getTopicName());
     addDatasetUrnOptionally(workUnit);
-    workUnit.setProp(ConfigurationKeys.EXTRACT_TABLE_NAME_KEY, partition.getTopicName());
     workUnit.setProp(PARTITION_ID, partition.getId());
     workUnit.setProp(LEADER_ID, partition.getLeader().getId());
     workUnit.setProp(LEADER_HOSTANDPORT, partition.getLeader().getHostAndPort().toString());

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6160adca/gobblin-utility/src/main/java/org/apache/gobblin/util/WriterUtils.java
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/WriterUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/WriterUtils.java
index f174979..bb5f495 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/WriterUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/WriterUtils.java
@@ -57,12 +57,19 @@ public class WriterUtils {
 
   public static final Config NO_RETRY_CONFIG = ConfigFactory.empty();
 
-  /**
-   * TABLENAME should be used for jobs that pull from multiple tables/topics and intend to write the records
-   * in each table/topic to a separate folder. Otherwise, DEFAULT can be used.
-   */
   public enum WriterFilePathType {
+    /**
+     * Write records into namespace/table folder. If namespace has multiple components, each component will be
+     * a folder in the path. For example: the write file path for namespace 'org.apache.gobblin' and table 'tableName'
+     * will be 'org/apache/gobblin/tableName'
+     */
+    NAMESPACE_TABLE,
+    /**
+     * TABLENAME should be used for jobs that pull from multiple tables/topics and intend to write the records
+     * in each table/topic to a separate folder.
+     */
     TABLENAME,
+    /** Write records into the output file decided by {@link org.apache.gobblin.source.workunit.Extract}*/
     DEFAULT
   }
 
@@ -156,6 +163,8 @@ public class WriterUtils {
     }
 
     switch (getWriterFilePathType(state)) {
+      case NAMESPACE_TABLE:
+        return getNamespaceTableWriterFilePath(state);
       case TABLENAME:
         return WriterUtils.getTableNameWriterFilePath(state);
       default:
@@ -170,6 +179,20 @@ public class WriterUtils {
   }
 
   /**
+   * Creates {@link Path} for case {@link WriterFilePathType#NAMESPACE_TABLE} with configurations
+   * {@link ConfigurationKeys#EXTRACT_NAMESPACE_NAME_KEY} and {@link ConfigurationKeys#EXTRACT_TABLE_NAME_KEY}
+   * @param state
+   * @return a path
+   */
+  public static Path getNamespaceTableWriterFilePath(State state) {
+    Preconditions.checkArgument(state.contains(ConfigurationKeys.EXTRACT_NAMESPACE_NAME_KEY));
+    Preconditions.checkArgument(state.contains(ConfigurationKeys.EXTRACT_TABLE_NAME_KEY));
+
+    String namespace = state.getProp(ConfigurationKeys.EXTRACT_NAMESPACE_NAME_KEY).replaceAll("\\.", Path.SEPARATOR);
+    return new Path( namespace + Path.SEPARATOR + state.getProp(ConfigurationKeys.EXTRACT_TABLE_NAME_KEY));
+  }
+
+  /**
    * Creates {@link Path} for the {@link ConfigurationKeys#WRITER_FILE_PATH} key according to
    * {@link ConfigurationKeys#EXTRACT_TABLE_NAME_KEY}.
    * @param state

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6160adca/gobblin-utility/src/test/java/org/apache/gobblin/util/WriterUtilsTest.java
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/test/java/org/apache/gobblin/util/WriterUtilsTest.java b/gobblin-utility/src/test/java/org/apache/gobblin/util/WriterUtilsTest.java
index 820b5a3..e14972f 100644
--- a/gobblin-utility/src/test/java/org/apache/gobblin/util/WriterUtilsTest.java
+++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/WriterUtilsTest.java
@@ -93,13 +93,23 @@ public class WriterUtilsTest {
 
   @Test
   public void testGetWriterFilePath() {
-    WorkUnit state = WorkUnit.createEmpty();
+    Extract extract = new Extract(TableType.SNAPSHOT_ONLY, "org.apache.gobblin.dbNamespace", "tableName");
+    WorkUnit state = WorkUnit.create(extract);
 
     state.setProp(ConfigurationKeys.WRITER_FILE_PATH, TEST_WRITER_FILE_PATH);
     Assert.assertEquals(WriterUtils.getWriterFilePath(state, 0, 0), TEST_WRITER_FILE_PATH);
 
     state.setProp(ConfigurationKeys.WRITER_FILE_PATH + ".0", TEST_WRITER_FILE_PATH);
     Assert.assertEquals(WriterUtils.getWriterFilePath(state, 1, 1), TEST_WRITER_FILE_PATH);
+
+    state.removeProp(ConfigurationKeys.WRITER_FILE_PATH);
+
+    state.setProp(ConfigurationKeys.WRITER_FILE_PATH_TYPE, "tablename");
+    Assert.assertEquals(WriterUtils.getWriterFilePath(state, 0, 0), new Path("tableName"));
+
+    state.setProp(ConfigurationKeys.WRITER_FILE_PATH_TYPE, "namespace_table");
+    Assert.assertEquals(WriterUtils.getWriterFilePath(state, 0, 0),
+        new Path("org/apache/gobblin/dbNamespace/tableName"));
   }
 
   @Test


Mime
View raw message