gobblin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hut...@apache.org
Subject incubator-gobblin git commit: [GOBBLIN-254] Add config key to update watermark when a partition is empty
Date Fri, 15 Sep 2017 00:00:23 GMT
Repository: incubator-gobblin
Updated Branches:
  refs/heads/master fdb233d61 -> b89706257


[GOBBLIN-254] Add config key to update watermark when a partition is empty

Closes #2105 from jack-moseley/salesforce_retry


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/b8970625
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/b8970625
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/b8970625

Branch: refs/heads/master
Commit: b8970625772ac64e3e3394ac79dd869f3c720adb
Parents: fdb233d
Author: Jack Moseley <jmoseley@linkedin.com>
Authored: Thu Sep 14 17:00:17 2017 -0700
Committer: Hung Tran <hutran@linkedin.com>
Committed: Thu Sep 14 17:00:17 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/gobblin/configuration/ConfigurationKeys.java | 4 ++++
 .../gobblin/source/extractor/extract/QueryBasedSource.java       | 4 +++-
 2 files changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b8970625/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 224064d..e54a54b 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -483,6 +483,10 @@ public class ConfigurationKeys {
       "source.querybased.promoteUnsignedIntToBigInt";
   public static final boolean DEFAULT_SOURCE_QUERYBASED_PROMOTE_UNSIGNED_INT_TO_BIGINT =
false;
 
+  public static final String SOURCE_QUERYBASED_RESET_EMPTY_PARTITION_WATERMARK =
+      "source.querybased.resetEmptyPartitionWatermark";
+  public static final boolean DEFAULT_SOURCE_QUERYBASED_RESET_EMPTY_PARTITION_WATERMARK =
true;
+
   public static final String ENABLE_DELIMITED_IDENTIFIER = "enable.delimited.identifier";
   public static final boolean DEFAULT_ENABLE_DELIMITED_IDENTIFIER = false;
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b8970625/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedSource.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedSource.java
b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedSource.java
index d94dede..c77051d 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedSource.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedSource.java
@@ -437,7 +437,9 @@ public abstract class QueryBasedSource<S, D> extends AbstractSource<S,
D> {
       if (tablesWithFailedTasks.contains(entry.getKey())) {
         log.info("Resetting low watermark to {} because previous run failed.", entry.getValue());
         result.put(entry.getKey(), entry.getValue());
-      } else if (tablesWithNoUpdatesOnPreviousRun.contains(entry.getKey())) {
+      } else if (tablesWithNoUpdatesOnPreviousRun.contains(entry.getKey())
+          && state.getPropAsBoolean(ConfigurationKeys.SOURCE_QUERYBASED_RESET_EMPTY_PARTITION_WATERMARK,
+          ConfigurationKeys.DEFAULT_SOURCE_QUERYBASED_RESET_EMPTY_PARTITION_WATERMARK)) {
         log.info("Resetting low watermakr to {} because previous run processed no data.",
entry.getValue());
         result.put(entry.getKey(), entry.getValue());
       } else {


Mime
View raw message