gobblin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ibuen...@apache.org
Subject incubator-gobblin git commit: [GOBBLIN-266] Improve Hive Task setup
Date Fri, 29 Sep 2017 17:37:34 GMT
Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 8284bb76b -> e67799948


[GOBBLIN-266] Improve Hive Task setup

Closes #2117 from ibuenros/hive-materializer-2


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/e6779994
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/e6779994
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/e6779994

Branch: refs/heads/master
Commit: e677999481f1311cf7bdc0ebe207a816e0e28382
Parents: 8284bb7
Author: ibuenros <issac.buenrostro@gmail.com>
Authored: Fri Sep 29 10:37:14 2017 -0700
Committer: Issac Buenrostro <ibuenros@apache.org>
Committed: Fri Sep 29 10:37:14 2017 -0700

----------------------------------------------------------------------
 .../materializer/CopyTableQueryGenerator.java   |  2 ++
 .../HiveMaterializerQueryGenerator.java         | 13 ++++++-
 .../MaterializeTableQueryGenerator.java         |  1 +
 .../QueryBasedMaterializerQueryGenerator.java   |  1 +
 .../conversion/hive/task/HiveTask.java          | 37 ++++++++++++++++++++
 5 files changed, 53 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e6779994/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/CopyTableQueryGenerator.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/CopyTableQueryGenerator.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/CopyTableQueryGenerator.java
index 8ff0913..9fdf9df 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/CopyTableQueryGenerator.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/CopyTableQueryGenerator.java
@@ -50,6 +50,8 @@ public class CopyTableQueryGenerator extends HiveMaterializerFromEntityQueryGene
   @Override
   public List<String> generateQueries() {
 
+    ensureParentOfStagingPathExists();
+
     List<String> hiveQueries = Lists.newArrayList();
     /*
      * Setting partition mode to 'nonstrict' is needed to improve readability of the code.

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e6779994/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/HiveMaterializerQueryGenerator.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/HiveMaterializerQueryGenerator.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/HiveMaterializerQueryGenerator.java
index 803e043..eb43bc7 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/HiveMaterializerQueryGenerator.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/HiveMaterializerQueryGenerator.java
@@ -34,6 +34,7 @@ import org.apache.gobblin.data.management.conversion.hive.entities.QueryBasedHiv
 import org.apache.gobblin.data.management.conversion.hive.source.HiveWorkUnit;
 import org.apache.gobblin.data.management.copy.hive.HiveDatasetFinder;
 import org.apache.gobblin.hive.HiveMetastoreClientPool;
+import org.apache.hadoop.fs.Path;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -80,11 +81,21 @@ public abstract class HiveMaterializerQueryGenerator implements QueryGenerator {
   /**
    * Returns hive queries to be run as a part of a hive task.
    * This does not include publish queries.
-   * @return
    */
   @Override
   public abstract List<String> generateQueries();
 
+  protected void ensureParentOfStagingPathExists() {
+    try {
+      Path parentStagingPath = new Path(this.stagingDataLocation).getParent();
+      if (!this.fs.exists(parentStagingPath)) {
+        this.fs.mkdirs(parentStagingPath);
+      }
+    } catch (IOException ioe) {
+      throw new RuntimeException(ioe);
+    }
+  }
+
   /**
   * Retuens a QueryBasedHivePublishEntity which includes publish level queries and cleanup commands.
    * @return QueryBasedHivePublishEntity

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e6779994/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/MaterializeTableQueryGenerator.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/MaterializeTableQueryGenerator.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/MaterializeTableQueryGenerator.java
index fa91d15..d15cc0d 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/MaterializeTableQueryGenerator.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/MaterializeTableQueryGenerator.java
@@ -43,6 +43,7 @@ public class MaterializeTableQueryGenerator extends HiveMaterializerFromEntityQu
 
   @Override
   public List<String> generateQueries() {
+    ensureParentOfStagingPathExists();
     return Lists.newArrayList(HiveConverterUtils.generateStagingCTASStatementFromSelectStar(
         new HiveDatasetFinder.DbAndTable(this.outputDatabaseName, this.stagingTableName),
         new HiveDatasetFinder.DbAndTable(this.inputDbName, this.inputTableName),

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e6779994/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/QueryBasedMaterializerQueryGenerator.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/QueryBasedMaterializerQueryGenerator.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/QueryBasedMaterializerQueryGenerator.java
index 37a50b3..3b3f904 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/QueryBasedMaterializerQueryGenerator.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/QueryBasedMaterializerQueryGenerator.java
@@ -54,6 +54,7 @@ public class QueryBasedMaterializerQueryGenerator extends HiveMaterializerQueryG
 
   @Override
   public List<String> generateQueries() {
+    ensureParentOfStagingPathExists();
     return Lists.newArrayList(HiveConverterUtils.generateStagingCTASStatement(
         new HiveDatasetFinder.DbAndTable(this.outputDatabaseName, this.stagingTableName),
         this.sourceQuery,

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e6779994/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveTask.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveTask.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveTask.java
index 16a2028..26e357f 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveTask.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveTask.java
@@ -25,6 +25,7 @@ import java.util.Set;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
+import com.google.common.base.Splitter;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
@@ -55,6 +56,9 @@ import lombok.extern.slf4j.Slf4j;
  */
 public abstract class HiveTask extends BaseAbstractTask {
   private static final String USE_WATERMARKER_KEY = "internal.hiveTask.useWatermarker";
+  private static final String ADD_FILES = "internal.hiveTask.addFiles";
+  private static final String ADD_JARS = "internal.hiveTask.addJars";
+  private static final String SETUP_QUERIES = "internal.hiveTask.setupQueries";
 
   /**
   * Disable Hive watermarker. This is necessary when there is no concrete source table where watermark can be inferred.
@@ -63,6 +67,27 @@ public abstract class HiveTask extends BaseAbstractTask {
     state.setProp(USE_WATERMARKER_KEY, Boolean.toString(false));
   }
 
+  /**
+   * Add the input file to the Hive session before running the task.
+   */
+  public static void addFile(State state, String file) {
+    state.setProp(ADD_FILES, state.getProp(ADD_FILES, "") + "," + file);
+  }
+
+  /**
+   * Add the input jar to the Hive session before running the task.
+   */
+  public static void addJar(State state, String jar) {
+    state.setProp(ADD_JARS, state.getProp(ADD_JARS, "") + "," + jar);
+  }
+
+  /**
+   * Run the specified setup query on the Hive session before running the task.
+   */
+  public static void addSetupQuery(State state, String query) {
+    state.setProp(SETUP_QUERIES, state.getProp(SETUP_QUERIES, "") + ";" + query);
+  }
+
   protected final TaskContext taskContext;
   protected final WorkUnitState workUnitState;
   protected final HiveWorkUnit workUnit;
@@ -71,6 +96,10 @@ public abstract class HiveTask extends BaseAbstractTask {
   protected final QueryBasedHivePublishEntity publishEntity;
   protected final HiveJdbcConnector hiveJdbcConnector;
 
+  private final List<String> addFiles;
+  private final List<String> addJars;
+  private final List<String> setupQueries;
+
   public HiveTask(TaskContext taskContext) {
     super(taskContext);
     this.taskContext = taskContext;
@@ -85,6 +114,10 @@ public abstract class HiveTask extends BaseAbstractTask {
     } catch (SQLException se) {
       throw new RuntimeException("Error in creating JDBC Connector", se);
     }
+
+    this.addFiles = this.workUnitState.getPropAsList(ADD_FILES, "");
+    this.addJars = this.workUnitState.getPropAsList(ADD_JARS, "");
+    this.setupQueries = Splitter.on(";").trimResults().omitEmptyStrings().splitToList(this.workUnitState.getProp(SETUP_QUERIES, ""));
   }
 
   /**
@@ -166,6 +199,10 @@ public abstract class HiveTask extends BaseAbstractTask {
   public void run() {
     try {
       List<String> queries = generateHiveQueries();
+
+      this.hiveJdbcConnector.executeStatements(Lists.transform(this.addFiles, file -> "ADD FILE " + file).toArray(new String[]{}));
+      this.hiveJdbcConnector.executeStatements(Lists.transform(this.addJars, file -> "ADD JAR " + file).toArray(new String[]{}));
+      this.hiveJdbcConnector.executeStatements(this.setupQueries.toArray(new String[]{}));
       this.hiveJdbcConnector.executeStatements(queries.toArray(new String[queries.size()]));
       super.run();
     } catch (Exception e) {


Mime
View raw message