hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ser...@apache.org
Subject [04/23] hive git commit: HIVE-17757: REPL LOAD need to use customised configurations to execute distcp/remote copy (Sankar Hariappan, reviewed by Thejas M Nair)
Date Fri, 13 Oct 2017 00:06:55 GMT
HIVE-17757: REPL LOAD need to use customised configurations to execute distcp/remote copy (Sankar Hariappan, reviewed by Thejas M Nair)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/fe8e6e09
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/fe8e6e09
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/fe8e6e09

Branch: refs/heads/hive-14535
Commit: fe8e6e092a997f5bfee0cf9f891ed6f9f9d55a6f
Parents: a0c7e87
Author: Sankar Hariappan <sankarh@apache.org>
Authored: Wed Oct 11 11:23:04 2017 +0530
Committer: Sankar Hariappan <sankarh@apache.org>
Committed: Wed Oct 11 11:23:04 2017 +0530

----------------------------------------------------------------------
 .../hadoop/hive/ql/exec/ReplCopyTask.java       |  4 +-
 .../org/apache/hadoop/hive/ql/exec/Task.java    |  8 +++-
 .../apache/hadoop/hive/ql/exec/TaskFactory.java | 13 +++++-
 .../ql/exec/repl/bootstrap/ReplLoadTask.java    |  2 +-
 .../apache/hadoop/hive/ql/parse/HiveParser.g    | 20 +++++++++-
 .../ql/parse/ReplicationSemanticAnalyzer.java   | 42 +++++++++++++++-----
 .../parse/TestReplicationSemanticAnalyzer.java  | 39 +++++++++++++++++-
 7 files changed, 110 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/fe8e6e09/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
index 39e5bf1..e6aa247 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
@@ -223,10 +223,10 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
         String distCpDoAsUser = conf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER);
         rcwork.setDistCpDoAsUser(distCpDoAsUser);
       }
-      copyTask = TaskFactory.get(rcwork, conf);
+      copyTask = TaskFactory.get(rcwork, conf, true);
     } else {
       LOG.debug("ReplCopyTask:\tcwork");
-      copyTask = TaskFactory.get(new CopyWork(srcPath, dstPath, false), conf);
+      copyTask = TaskFactory.get(new CopyWork(srcPath, dstPath, false), conf, true);
     }
     return copyTask;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/fe8e6e09/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
index 6193b90..ab495cf 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
@@ -160,7 +160,9 @@ public abstract class Task<T extends Serializable> implements Serializable, Node
     this.queryPlan = queryPlan;
     setInitialized();
     this.queryState = queryState;
-    this.conf = queryState.getConf();
+    if (null == this.conf) {
+      this.conf = queryState.getConf();
+    }
     this.driverContext = driverContext;
     console = new LogHelper(LOG);
   }
@@ -422,7 +424,9 @@ public abstract class Task<T extends Serializable> implements Serializable, Node
     return isrunnable;
   }
 
-
+  public void setConf(HiveConf conf) {
+    this.conf = conf;
+  }
 
   public void setWork(T work) {
     this.work = work;

http://git-wip-us.apache.org/repos/asf/hive/blob/fe8e6e09/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
index e1969bb..f341cec 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
@@ -154,10 +154,13 @@ public final class TaskFactory {
   }
 
   @SafeVarargs
-  public static <T extends Serializable> Task<T> get(T work, HiveConf conf,
-      Task<? extends Serializable>... tasklist) {
+  public static <T extends Serializable> Task<T> get(T work, HiveConf conf, boolean setConf,
+                                                     Task<? extends Serializable>... tasklist) {
     Task<T> ret = get((Class<T>) work.getClass(), conf);
     ret.setWork(work);
+    if (setConf && (null != conf)) {
+      ret.setConf(conf);
+    }
     if (tasklist.length == 0) {
       return (ret);
     }
@@ -170,6 +173,12 @@ public final class TaskFactory {
     return (ret);
   }
 
+  @SafeVarargs
+  public static <T extends Serializable> Task<T> get(T work, HiveConf conf,
+      Task<? extends Serializable>... tasklist) {
+    return get(work, conf, false, tasklist);
+  }
+
   public static <T extends Serializable> Task<T> getAndMakeChild(T work,
       HiveConf conf, Task<? extends Serializable>... tasklist) {
     Task<T> ret = get((Class<T>) work.getClass(), conf);

http://git-wip-us.apache.org/repos/asf/hive/blob/fe8e6e09/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java
index 4d8d06a..bf5c819 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java
@@ -287,7 +287,7 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
     use loadTask as dependencyCollection
    */
     if (shouldCreateAnotherLoadTask) {
-      Task<ReplLoadWork> loadTask = TaskFactory.get(work, conf);
+      Task<ReplLoadWork> loadTask = TaskFactory.get(work, conf, true);
       dependency(rootTasks, loadTask);
     }
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/fe8e6e09/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
index d238833..020a300 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
@@ -330,6 +330,7 @@ TOK_DBPROPLIST;
 TOK_ALTERDATABASE_PROPERTIES;
 TOK_ALTERDATABASE_OWNER;
 TOK_ALTERDATABASE_LOCATION;
+TOK_DBNAME;
 TOK_TABNAME;
 TOK_TABSRC;
 TOK_RESTRICT;
@@ -393,6 +394,8 @@ TOK_DELETE;
 TOK_REPL_DUMP;
 TOK_REPL_LOAD;
 TOK_REPL_STATUS;
+TOK_REPL_CONFIG;
+TOK_REPL_CONFIG_LIST;
 TOK_TO;
 TOK_ONLY;
 TOK_SUMMARY;
@@ -842,9 +845,24 @@ replLoadStatement
       : KW_REPL KW_LOAD
         ((dbName=identifier) (DOT tblName=identifier)?)?
         KW_FROM (path=StringLiteral)
-      -> ^(TOK_REPL_LOAD $path $dbName? $tblName?)
+        (KW_WITH replConf=replConfigs)?
+      -> ^(TOK_REPL_LOAD $path ^(TOK_DBNAME $dbName)? ^(TOK_TABNAME $tblName)? $replConf?)
       ;
 
+replConfigs
+@init { pushMsg("repl configurations", state); }
+@after { popMsg(state); }
+    :
+      LPAREN replConfigsList RPAREN -> ^(TOK_REPL_CONFIG replConfigsList)
+    ;
+
+replConfigsList
+@init { pushMsg("repl configurations list", state); }
+@after { popMsg(state); }
+    :
+      keyValueProperty (COMMA keyValueProperty)* -> ^(TOK_REPL_CONFIG_LIST keyValueProperty+)
+    ;
+
 replStatusStatement
 @init { pushMsg("replication load statement", state); }
 @after { popMsg(state); }

http://git-wip-us.apache.org/repos/asf/hive/blob/fe8e6e09/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index e7e7f9b..ade47ba 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -22,6 +22,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryState;
@@ -56,11 +57,14 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_DBNAME;
 import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_FROM;
 import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_LIMIT;
+import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPL_CONFIG;
 import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPL_DUMP;
 import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPL_LOAD;
 import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPL_STATUS;
+import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_TABNAME;
 import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_TO;
 
 public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
@@ -73,6 +77,9 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
   private Integer maxEventLimit;
   // Base path for REPL LOAD
   private String path;
+  // Added conf member to set the REPL command specific config entries without affecting the configs
+  // of any other queries running in the session
+  private HiveConf conf;
 
   private static String testInjectDumpDir = null; // unit tests can overwrite this to affect default dump behaviour
   private static final String dumpSchema = "dump_dir,last_repl_id#string,string";
@@ -82,6 +89,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
 
   ReplicationSemanticAnalyzer(QueryState queryState) throws SemanticException {
     super(queryState);
+    this.conf = new HiveConf(super.conf);
   }
 
   @Override
@@ -189,14 +197,30 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
   }
 
   // REPL LOAD
-  private void initReplLoad(ASTNode ast) {
-    int numChildren = ast.getChildCount();
+  private void initReplLoad(ASTNode ast) throws SemanticException {
     path = PlanUtils.stripQuotes(ast.getChild(0).getText());
-    if (numChildren > 1) {
-      dbNameOrPattern = PlanUtils.stripQuotes(ast.getChild(1).getText());
-    }
-    if (numChildren > 2) {
-      tblNameOrPattern = PlanUtils.stripQuotes(ast.getChild(2).getText());
+    int numChildren = ast.getChildCount();
+    for (int i = 1; i < numChildren; i++) {
+      ASTNode childNode = (ASTNode) ast.getChild(i);
+      switch (childNode.getToken().getType()) {
+        case TOK_DBNAME:
+          dbNameOrPattern = PlanUtils.stripQuotes(childNode.getChild(0).getText());
+          break;
+        case TOK_TABNAME:
+          tblNameOrPattern = PlanUtils.stripQuotes(childNode.getChild(0).getText());
+          break;
+        case TOK_REPL_CONFIG:
+          Map<String, String> replConfigs
+                  = DDLSemanticAnalyzer.getProps((ASTNode) childNode.getChild(0));
+          if (null != replConfigs) {
+            for (Map.Entry<String, String> config : replConfigs.entrySet()) {
+              conf.set(config.getKey(), config.getValue());
+            }
+          }
+          break;
+        default:
+          throw new SemanticException("Unrecognized token in REPL LOAD statement");
+      }
     }
   }
 
@@ -289,7 +313,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
         ReplLoadWork replLoadWork =
             new ReplLoadWork(conf, loadPath.toString(), dbNameOrPattern, tblNameOrPattern,
                 SessionState.get().getLineageState(), SessionState.get().getTxnMgr().getCurrentTxnId());
-        rootTasks.add(TaskFactory.get(replLoadWork, conf));
+        rootTasks.add(TaskFactory.get(replLoadWork, conf, true));
         return;
       }
 
@@ -320,7 +344,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
 
         ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), dbNameOrPattern,
             SessionState.get().getLineageState(), SessionState.get().getTxnMgr().getCurrentTxnId());
-        rootTasks.add(TaskFactory.get(replLoadWork, conf));
+        rootTasks.add(TaskFactory.get(replLoadWork, conf, true));
         //
         //        for (FileStatus dir : dirsInLoadPath) {
         //          analyzeDatabaseLoad(dbNameOrPattern, fs, dir);

http://git-wip-us.apache.org/repos/asf/hive/blob/fe8e6e09/ql/src/test/org/apache/hadoop/hive/ql/parse/TestReplicationSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/TestReplicationSemanticAnalyzer.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestReplicationSemanticAnalyzer.java
index 17cf4d0..3305998 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/parse/TestReplicationSemanticAnalyzer.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestReplicationSemanticAnalyzer.java
@@ -191,11 +191,14 @@ public class TestReplicationSemanticAnalyzer {
     ParseDriver pd = new ParseDriver();
     ASTNode root;
     ASTNode child;
+    ASTNode subChild;
+    ASTNode configNode;
     String replRoot = conf.getVar(HiveConf.ConfVars.REPLDIR);
     Path dumpRoot = new Path(replRoot, "next");
     System.out.println(replRoot);
     System.out.println(dumpRoot);
     String newDB = "default_bak";
+    String newDB2= "default_bak_2";
 
     String query = "repl load  from '" + dumpRoot.toString() + "'";
     root = (ASTNode) pd.parse(query).getChild(0);
@@ -213,8 +216,42 @@ public class TestReplicationSemanticAnalyzer {
     assertEquals(child.getText(), "'" + dumpRoot.toString() + "'");
     assertEquals(child.getChildCount(), 0);
     child =  (ASTNode) root.getChild(1);
-    assertEquals(child.getText(), newDB);
+    assertEquals(child.getText(), "TOK_DBNAME");
+    assertEquals(child.getChildCount(), 1);
+    subChild = (ASTNode) child.getChild(0);
+    assertEquals(subChild.getText(), newDB);
+    assertEquals(subChild.getChildCount(), 0);
+
+    query = "repl load " + newDB2 + " from '" + dumpRoot.toString()
+            + "' with ('mapred.job.queue.name'='repl','hive.repl.approx.max.load.tasks'='100')";
+    root = (ASTNode) pd.parse(query).getChild(0);
+    assertEquals(root.getText(), "TOK_REPL_LOAD");
+    assertEquals(root.getChildCount(), 3);
+    child =  (ASTNode) root.getChild(0);
+    assertEquals(child.getText(), "'" + dumpRoot.toString() + "'");
     assertEquals(child.getChildCount(), 0);
+    child =  (ASTNode) root.getChild(1);
+    assertEquals(child.getText(), "TOK_DBNAME");
+    assertEquals(child.getChildCount(), 1);
+    subChild = (ASTNode) child.getChild(0);
+    assertEquals(subChild.getText(), newDB2);
+    assertEquals(subChild.getChildCount(), 0);
+    child =  (ASTNode) root.getChild(2);
+    assertEquals(child.getText(), "TOK_REPL_CONFIG");
+    assertEquals(child.getChildCount(), 1);
+    subChild = (ASTNode) child.getChild(0);
+    assertEquals(subChild.getText(), "TOK_REPL_CONFIG_LIST");
+    assertEquals(subChild.getChildCount(), 2);
+    configNode = (ASTNode) subChild.getChild(0);
+    assertEquals(configNode.getText(), "TOK_TABLEPROPERTY");
+    assertEquals(configNode.getChildCount(), 2);
+    assertEquals(configNode.getChild(0).getText(), "'mapred.job.queue.name'");
+    assertEquals(configNode.getChild(1).getText(), "'repl'");
+    configNode = (ASTNode) subChild.getChild(1);
+    assertEquals(configNode.getText(), "TOK_TABLEPROPERTY");
+    assertEquals(configNode.getChildCount(), 2);
+    assertEquals(configNode.getChild(0).getText(), "'hive.repl.approx.max.load.tasks'");
+    assertEquals(configNode.getChild(1).getText(), "'100'");
   }
 
   //@Test


Mime
View raw message