gobblin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a...@apache.org
Subject incubator-gobblin git commit: [GOBBLIN-33] Allow state store locations to be independently configured for intermediate and final state storage
Date Tue, 15 Aug 2017 19:46:45 GMT
Repository: incubator-gobblin
Updated Branches:
  refs/heads/master eca08356b -> 7e63c7b91


[GOBBLIN-33] Allow state store locations to be independently configured for intermediate and
final state storage

Allow state store locations to be independently
configured for intermediate and final state
storage.

Allow port to be non-standard when setting up
`StateStores`

Better way to strip path from appWorkDir

Closes #2035 from kadaan/Fix_for_GOBBLIN-33


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/7e63c7b9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/7e63c7b9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/7e63c7b9

Branch: refs/heads/master
Commit: 7e63c7b914bdbd511a502f40c79b051acf8e514e
Parents: eca0835
Author: Joel Baranick <joel.baranick@ensighten.com>
Authored: Tue Aug 15 12:46:40 2017 -0700
Committer: Abhishek Tiwari <abhishektiwari.btech@gmail.com>
Committed: Tue Aug 15 12:46:40 2017 -0700

----------------------------------------------------------------------
 .../configuration/ConfigurationKeys.java        |  4 +++
 .../cluster/GobblinHelixJobLauncher.java        |  8 ++++-
 .../gobblin/cluster/GobblinTaskRunner.java      |  7 +++-
 .../org/apache/gobblin/runtime/JobContext.java  |  4 ++-
 .../StateStoreBasedWatermarkStorage.java        |  1 -
 .../runtime/commit/DatasetStateCommitStep.java  |  4 +--
 .../gobblin/runtime/util/StateStores.java       | 34 +++++++++++++-------
 .../java/org/apache/gobblin/util/PathUtils.java | 12 +++++++
 8 files changed, 56 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/7e63c7b9/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index dd386ef..acf3b52 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -48,6 +48,10 @@ public class ConfigurationKeys {
    */
   // State store type.  References an alias or factory class name
   public static final String STATE_STORE_TYPE_KEY = "state.store.type";
+  public static final String DATASET_STATE_STORE_PREFIX = "dataset";
+  public static final String DATASET_STATE_STORE_TYPE_KEY = DATASET_STATE_STORE_PREFIX + ".state.store.type";
+  public static final String INTERMEDIATE_STATE_STORE_PREFIX = "intermediate";
+  public static final String INTERMEDIATE_STATE_STORE_TYPE_KEY = INTERMEDIATE_STATE_STORE_PREFIX + ".state.store.type";
   public static final String DEFAULT_STATE_STORE_TYPE = "fs";
   public static final String STATE_STORE_TYPE_NOOP = "noop";
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/7e63c7b9/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
index 7d32827..80c0667 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
@@ -45,6 +45,7 @@ import org.slf4j.LoggerFactory;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValueFactory;
 
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.configuration.ConfigurationKeys;
@@ -144,7 +145,12 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
 
     jobConfig = ConfigUtils.propertiesToConfig(jobProps);
 
-    this.stateStores = new StateStores(jobConfig, appWorkDir,
+    Config stateStoreJobConfig = ConfigUtils.propertiesToConfig(jobProps)
+        .withValue(ConfigurationKeys.STATE_STORE_FS_URI_KEY, ConfigValueFactory.fromAnyRef(
+            new URI(appWorkDir.toUri().getScheme(), null, appWorkDir.toUri().getHost(),
+                appWorkDir.toUri().getPort(), null, null, null).toString()));
+
+    this.stateStores = new StateStores(stateStoreJobConfig, appWorkDir,
         GobblinClusterConfigurationKeys.OUTPUT_TASK_STATE_DIR_NAME, appWorkDir,
         GobblinClusterConfigurationKeys.INPUT_WORK_UNIT_DIR_NAME);
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/7e63c7b9/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
index 3ddbf55..7877da8 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
@@ -81,6 +81,7 @@ import org.apache.gobblin.runtime.services.JMXReportingService;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.HadoopUtils;
 import org.apache.gobblin.util.JvmUtils;
+import org.apache.gobblin.util.PathUtils;
 import org.apache.gobblin.util.logs.Log4jConfigurationHelper;
 
 
@@ -169,11 +170,15 @@ public class GobblinTaskRunner {
 
     this.containerMetrics = buildContainerMetrics(this.config, properties, applicationName, this.taskRunnerId);
 
+    URI rootPathUri = PathUtils.getRootPath(appWorkDir).toUri();
+    Config stateStoreJobConfig = ConfigUtils.propertiesToConfig(properties)
+        .withValue(ConfigurationKeys.STATE_STORE_FS_URI_KEY, ConfigValueFactory.fromAnyRef(rootPathUri.toString()));
+
     // Register task factory for the Helix task state model
     Map<String, TaskFactory> taskFactoryMap = Maps.newHashMap();
     taskFactoryMap.put(GOBBLIN_TASK_FACTORY_NAME,
         new GobblinHelixTaskFactory(this.containerMetrics, taskExecutor, taskStateTracker, this.fs, appWorkDir,
-            config));
+            stateStoreJobConfig));
     this.taskStateModelFactory = new TaskStateModelFactory(this.helixManager, taskFactoryMap);
     this.helixManager.getStateMachineEngine().registerStateModelFactory("Task", this.taskStateModelFactory);
   }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/7e63c7b9/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
index d5d43af..33c5701 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
@@ -180,7 +180,9 @@ public class JobContext implements Closeable {
       stateStoreType = ConfigurationKeys.STATE_STORE_TYPE_NOOP;
     } else {
       stateStoreType = ConfigUtils
-          .getString(jobConfig, ConfigurationKeys.STATE_STORE_TYPE_KEY, ConfigurationKeys.DEFAULT_STATE_STORE_TYPE);
+          .getString(jobConfig, ConfigurationKeys.DATASET_STATE_STORE_TYPE_KEY,
+              ConfigUtils.getString(jobConfig, ConfigurationKeys.STATE_STORE_TYPE_KEY,
+                  ConfigurationKeys.DEFAULT_STATE_STORE_TYPE));
     }
 
     ClassAliasResolver<DatasetStateStore.Factory> resolver = new ClassAliasResolver<>(DatasetStateStore.Factory.class);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/7e63c7b9/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StateStoreBasedWatermarkStorage.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StateStoreBasedWatermarkStorage.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StateStoreBasedWatermarkStorage.java
index 01cb557..04d8e9d 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StateStoreBasedWatermarkStorage.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StateStoreBasedWatermarkStorage.java
@@ -21,7 +21,6 @@ package org.apache.gobblin.runtime;
 
 import java.io.IOException;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/7e63c7b9/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/commit/DatasetStateCommitStep.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/commit/DatasetStateCommitStep.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/commit/DatasetStateCommitStep.java
index a59c497..a106602 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/commit/DatasetStateCommitStep.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/commit/DatasetStateCommitStep.java
@@ -105,8 +105,8 @@ public class DatasetStateCommitStep extends CommitStepBase {
       ClassAliasResolver<DatasetStateStore.Factory> resolver =
           new ClassAliasResolver<>(DatasetStateStore.Factory.class);
 
-      String stateStoreType = this.props.getProp(ConfigurationKeys.STATE_STORE_TYPE_KEY,
-            ConfigurationKeys.DEFAULT_STATE_STORE_TYPE);
+      String stateStoreType = this.props.getProp(ConfigurationKeys.DATASET_STATE_STORE_TYPE_KEY,
+          this.props.getProp(ConfigurationKeys.STATE_STORE_TYPE_KEY, ConfigurationKeys.DEFAULT_STATE_STORE_TYPE));
 
       try {
         DatasetStateStore.Factory stateStoreFactory =

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/7e63c7b9/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/StateStores.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/StateStores.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/StateStores.java
index 311a76a..8d1c51f 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/StateStores.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/StateStores.java
@@ -17,6 +17,8 @@
 package org.apache.gobblin.runtime.util;
 
 import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValue;
 import com.typesafe.config.ConfigValueFactory;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metastore.StateStore;
@@ -27,6 +29,8 @@ import org.apache.gobblin.util.ClassAliasResolver;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.hadoop.fs.Path;
 
+import java.util.Map;
+
 /**
  * state stores used for storing work units and task states
  */
@@ -46,8 +50,9 @@ public class StateStores {
    */
   public StateStores(Config config, Path taskStoreBase, String taskStoreTable, Path workUnitStoreBase,
       String workUnitStoreTable) {
-    String stateStoreType = ConfigUtils.getString(config, ConfigurationKeys.STATE_STORE_TYPE_KEY,
-        ConfigurationKeys.DEFAULT_STATE_STORE_TYPE);
+    String stateStoreType = ConfigUtils.getString(config, ConfigurationKeys.INTERMEDIATE_STATE_STORE_TYPE_KEY,
+        ConfigUtils.getString(config, ConfigurationKeys.STATE_STORE_TYPE_KEY,
+            ConfigurationKeys.DEFAULT_STATE_STORE_TYPE));
 
     ClassAliasResolver<StateStore.Factory> resolver =
         new ClassAliasResolver<>(StateStore.Factory.class);
@@ -65,22 +70,27 @@ public class StateStores {
 
     // Override properties to configure the WorkUnit and MultiWorkUnit StateStores with the appropriate root/db location
     Path inputWorkUnitDir = new Path(workUnitStoreBase, workUnitStoreTable);
-    Config wuStateStoreConfig = config
-        .withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY,
-            ConfigValueFactory.fromAnyRef(inputWorkUnitDir.toString()))
-        .withValue(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY,
-            ConfigValueFactory.fromAnyRef(workUnitStoreTable));
+    Config wuStateStoreConfig = getStateStoreConfig(config, inputWorkUnitDir.toString(), workUnitStoreTable);
 
     // Override properties to place the TaskState StateStore at the appropriate location
     Path taskStateOutputDir = new Path(taskStoreBase, taskStoreTable);
-    Config taskStateStoreConfig = config
-        .withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY,
-            ConfigValueFactory.fromAnyRef(taskStateOutputDir.toString()))
-        .withValue(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY,
-            ConfigValueFactory.fromAnyRef(taskStoreTable));
+    Config taskStateStoreConfig = getStateStoreConfig(config, taskStateOutputDir.toString(), taskStoreTable);
 
     taskStateStore = stateStoreFactory.createStateStore(taskStateStoreConfig, TaskState.class);
     wuStateStore = stateStoreFactory.createStateStore(wuStateStoreConfig, WorkUnit.class);
     mwuStateStore = stateStoreFactory.createStateStore(wuStateStoreConfig, MultiWorkUnit.class);
   }
+
+  private static Config getStateStoreConfig(Config config, String rootDir, String dbTableKey) {
+    Config fallbackConfig = ConfigFactory.empty()
+            .withFallback(config)
+            .withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, ConfigValueFactory.fromAnyRef(rootDir))
+            .withValue(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, ConfigValueFactory.fromAnyRef(dbTableKey));
+    Config scopedConfig = ConfigFactory.empty();
+    for (Map.Entry<String, ConfigValue> entry : config.withOnlyPath(ConfigurationKeys.INTERMEDIATE_STATE_STORE_PREFIX).entrySet()) {
+      scopedConfig.withValue(entry.getKey().substring(ConfigurationKeys.INTERMEDIATE_STATE_STORE_PREFIX.length()),
+              entry.getValue());
+    }
+    return scopedConfig.withFallback(fallbackConfig);
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/7e63c7b9/gobblin-utility/src/main/java/org/apache/gobblin/util/PathUtils.java
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/PathUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/PathUtils.java
index 88ab113..64aa0a3 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/PathUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/PathUtils.java
@@ -69,6 +69,18 @@ public class PathUtils {
   }
 
   /**
+   * Returns the root path for the specified path.
+   *
+   * @see Path
+   */
+  public static Path getRootPath(Path path) {
+    if (path.isRoot()) {
+      return path;
+    }
+    return getRootPath(path.getParent());
+  }
+
+  /**
    * Removes the leading slash if present.
    *
    */


Mime
View raw message