hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rbalamo...@apache.org
Subject hive git commit: HIVE-17036: Lineage: Minor CPU/Mem optimization for lineage transform (Rajesh Balamohan, reviewed by Ashutosh Chauhan)
Date Thu, 06 Jul 2017 06:17:37 GMT
Repository: hive
Updated Branches:
  refs/heads/master 2a718a1b3 -> 555f00114


HIVE-17036: Lineage: Minor CPU/Mem optimization for lineage transform (Rajesh Balamohan, reviewed by Ashutosh Chauhan)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/555f0011
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/555f0011
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/555f0011

Branch: refs/heads/master
Commit: 555f001146c4fc471e29e18899a0e02a4043cca5
Parents: 2a718a1
Author: Rajesh Balamohan <rbalamohan@apache.org>
Authored: Thu Jul 6 11:47:25 2017 +0530
Committer: Rajesh Balamohan <rbalamohan@apache.org>
Committed: Thu Jul 6 11:47:25 2017 +0530

----------------------------------------------------------------------
 .../apache/hadoop/hive/ql/lib/RuleRegExp.java   |  3 ++
 .../ql/optimizer/lineage/ExprProcFactory.java   | 48 ++++++++++++++------
 .../hive/ql/optimizer/lineage/Generator.java    |  6 +++
 .../ql/optimizer/lineage/OpProcFactory.java     | 36 ++++++++++-----
 4 files changed, 69 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/555f0011/ql/src/java/org/apache/hadoop/hive/ql/lib/RuleRegExp.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lib/RuleRegExp.java b/ql/src/java/org/apache/hadoop/hive/ql/lib/RuleRegExp.java
index 1e850d6..107ee64 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lib/RuleRegExp.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lib/RuleRegExp.java
@@ -80,6 +80,9 @@ public class RuleRegExp implements Rule {
       if (wildCards.contains(pc)) {
         hasWildCard = true;
         ret = ret && (pc == wcc);
+        if (!ret) {
+          return false;
+        }
       }
     }
     return ret && hasWildCard;

http://git-wip-us.apache.org/repos/asf/hive/blob/555f0011/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcFactory.java
index 09ef490..5034988 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcFactory.java
@@ -18,7 +18,7 @@
 
 package org.apache.hadoop.hive.ql.optimizer.lineage;
 
-import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
@@ -65,6 +65,19 @@ import org.apache.hadoop.hive.ql.plan.OperatorDesc;
  */
 public class ExprProcFactory {
 
+
+  private static final String exprNodeColDescRegExp = ExprNodeColumnDesc.class.getName()
+ "%";
+  private static final String exprNodeFieldDescRegExp = ExprNodeFieldDesc.class.getName()
+ "%";
+  private static final String exprNodeGenFuncDescRegExp = ExprNodeGenericFuncDesc.class.getName()
+ "%";
+
+  private static final Map<Rule, NodeProcessor> exprRules = new LinkedHashMap<Rule,
NodeProcessor>();
+
+  static {
+    exprRules.put(new RuleRegExp("R1", exprNodeColDescRegExp), getColumnProcessor());
+    exprRules.put(new RuleRegExp("R2", exprNodeFieldDescRegExp), getFieldProcessor());
+    exprRules.put(new RuleRegExp("R3", exprNodeGenFuncDescRegExp), getGenericFuncProcessor());
+  }
+
   /**
    * Processor for column expressions.
    */
@@ -272,6 +285,26 @@ public class ExprProcFactory {
   public static Dependency getExprDependency(LineageCtx lctx,
       Operator<? extends OperatorDesc> inpOp, ExprNodeDesc expr)
       throws SemanticException {
+    return getExprDependency(lctx, inpOp, expr, new HashMap<Node, Object>());
+  }
+
+  /**
+   * Gets the expression dependencies for the expression.
+   *
+   * @param lctx
+   *          The lineage context containing the input operators dependencies.
+   * @param inpOp
+   *          The input operator to the current operator.
+   * @param expr
+   *          The expression that is being processed.
+   * @param outputMap
+   * @throws SemanticException
+   */
+  public static Dependency getExprDependency(LineageCtx lctx,
+      Operator<? extends OperatorDesc> inpOp, ExprNodeDesc expr, HashMap<Node, Object>
outputMap)
+      throws SemanticException {
+
+    outputMap.clear();
 
     // Create the walker, the rules dispatcher and the context.
     ExprProcCtx exprCtx = new ExprProcCtx(lctx, inpOp);
@@ -279,15 +312,6 @@ public class ExprProcFactory {
     // create a walker which walks the tree in a DFS manner while maintaining
     // the operator stack. The dispatcher
     // generates the plan from the operator tree
-    Map<Rule, NodeProcessor> exprRules = new LinkedHashMap<Rule, NodeProcessor>();
-    exprRules.put(
-        new RuleRegExp("R1", ExprNodeColumnDesc.class.getName() + "%"),
-        getColumnProcessor());
-    exprRules.put(
-        new RuleRegExp("R2", ExprNodeFieldDesc.class.getName() + "%"),
-        getFieldProcessor());
-    exprRules.put(new RuleRegExp("R3", ExprNodeGenericFuncDesc.class.getName()
-        + "%"), getGenericFuncProcessor());
 
     // The dispatcher fires the processor corresponding to the closest matching
     // rule and passes the context along
@@ -295,10 +319,8 @@ public class ExprProcFactory {
         exprRules, exprCtx);
     GraphWalker egw = new DefaultGraphWalker(disp);
 
-    List<Node> startNodes = new ArrayList<Node>();
-    startNodes.add(expr);
+    List<Node> startNodes = Collections.singletonList((Node)expr);
 
-    HashMap<Node, Object> outputMap = new HashMap<Node, Object>();
     egw.startWalking(startNodes, outputMap);
     return (Dependency)outputMap.get(expr);
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/555f0011/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/Generator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/Generator.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/Generator.java
index 0c2ff32..747aae0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/Generator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/Generator.java
@@ -47,6 +47,8 @@ import org.apache.hadoop.hive.ql.optimizer.lineage.LineageCtx.Index;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.session.SessionState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This class generates the lineage information for the columns
@@ -55,6 +57,8 @@ import org.apache.hadoop.hive.ql.session.SessionState;
  */
 public class Generator extends Transform {
 
+  private static final Logger LOG = LoggerFactory.getLogger(Generator.class);
+
   /* (non-Javadoc)
    * @see org.apache.hadoop.hive.ql.optimizer.Transform#transform(org.apache.hadoop.hive.ql.parse.ParseContext)
    */
@@ -64,6 +68,7 @@ public class Generator extends Transform {
     Index index = SessionState.get() != null ?
       SessionState.get().getLineageState().getIndex() : new Index();
 
+    long sTime = System.currentTimeMillis();
     // Create the lineage context
     LineageCtx lCtx = new LineageCtx(pctx, index);
 
@@ -101,6 +106,7 @@ public class Generator extends Transform {
     topNodes.addAll(pctx.getTopOps().values());
     ogw.startWalking(topNodes, null);
 
+    LOG.debug("Time taken for lineage transform={}", (System.currentTimeMillis() - sTime));
     return pctx;
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/555f0011/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/OpProcFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/OpProcFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/OpProcFactory.java
index d95b45b..73f88e0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/OpProcFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/OpProcFactory.java
@@ -202,6 +202,9 @@ public class OpProcFactory {
    * Processor for Join Operator.
    */
   public static class JoinLineage extends DefaultLineage implements NodeProcessor {
+
+    private final HashMap<Node, Object> outputMap = new HashMap<Node, Object>();
+
     @Override
     public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
         Object... nodeOutputs) throws SemanticException {
@@ -237,7 +240,7 @@ public class OpProcFactory {
 
         // Otherwise look up the expression corresponding to this ci
         ExprNodeDesc expr = exprs.get(cnt++);
-        Dependency dependency = ExprProcFactory.getExprDependency(lCtx, inpOp, expr);
+        Dependency dependency = ExprProcFactory.getExprDependency(lCtx, inpOp, expr, outputMap);
         lCtx.getIndex().mergeDependency(op, ci, dependency);
       }
 
@@ -348,6 +351,9 @@ public class OpProcFactory {
    * Processor for Select operator.
    */
   public static class SelectLineage extends DefaultLineage implements NodeProcessor {
+
+    private final HashMap<Node, Object> outputMap = new HashMap<Node, Object>();
+
     @Override
     public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
         Object... nodeOutputs) throws SemanticException {
@@ -372,7 +378,7 @@ public class OpProcFactory {
       ArrayList<ColumnInfo> col_infos = rs.getSignature();
       int cnt = 0;
       for(ExprNodeDesc expr : sop.getConf().getColList()) {
-        Dependency dep = ExprProcFactory.getExprDependency(lctx, inpOp, expr);
+        Dependency dep = ExprProcFactory.getExprDependency(lctx, inpOp, expr, outputMap);
         if (dep != null && dep.getExpr() == null && (dep.getBaseCols().isEmpty()
             || dep.getType() != LineageInfo.DependencyType.SIMPLE)) {
           dep.setExpr(ExprProcFactory.getExprString(rs, expr, lctx, inpOp, null));
@@ -401,6 +407,9 @@ public class OpProcFactory {
    * Processor for GroupBy operator.
    */
   public static class GroupByLineage extends DefaultLineage implements NodeProcessor {
+
+    private final HashMap<Node, Object> outputMap = new HashMap<Node, Object>();
+
     @Override
     public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
         Object... nodeOutputs) throws SemanticException {
@@ -414,7 +423,7 @@ public class OpProcFactory {
 
       for(ExprNodeDesc expr : gop.getConf().getKeys()) {
         lctx.getIndex().putDependency(gop, col_infos.get(cnt++),
-            ExprProcFactory.getExprDependency(lctx, inpOp, expr));
+            ExprProcFactory.getExprDependency(lctx, inpOp, expr, outputMap));
       }
 
       // If this is a reduce side GroupBy operator, check if there is
@@ -438,7 +447,7 @@ public class OpProcFactory {
           } else {
             sb.append(", ");
           }
-          Dependency expr_dep = ExprProcFactory.getExprDependency(lctx, inpOp, expr);
+          Dependency expr_dep = ExprProcFactory.getExprDependency(lctx, inpOp, expr, outputMap);
           if (expr_dep != null && !expr_dep.getBaseCols().isEmpty()) {
             new_type = LineageCtx.getNewDependencyType(expr_dep.getType(), new_type);
             bci_set.addAll(expr_dep.getBaseCols());
@@ -542,10 +551,13 @@ public class OpProcFactory {
       lCtx.getIndex().copyPredicates(inpOp, op);
       RowSchema rs = op.getSchema();
       ArrayList<ColumnInfo> inp_cols = inpOp.getSchema().getSignature();
-      int cnt = 0;
-      for(ColumnInfo ci : rs.getSignature()) {
-        Dependency inp_dep = lCtx.getIndex().getDependency(inpOp, inp_cols.get(cnt++));
+
+      // check only for input cols
+      for(ColumnInfo input : inp_cols) {
+        Dependency inp_dep = lCtx.getIndex().getDependency(inpOp, input);
         if (inp_dep != null) {
+          //merge it with rs colInfo
+          ColumnInfo ci = rs.getColumnInfo(input.getInternalName());
           lCtx.getIndex().mergeDependency(op, ci, inp_dep);
         }
       }
@@ -558,6 +570,8 @@ public class OpProcFactory {
    */
   public static class ReduceSinkLineage implements NodeProcessor {
 
+    private final HashMap<Node, Object> outputMap = new HashMap<Node, Object>();
+
     @Override
     public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
         Object... nodeOutputs) throws SemanticException {
@@ -584,11 +598,11 @@ public class OpProcFactory {
         ArrayList<ColumnInfo> col_infos = rop.getSchema().getSignature();
         for(ExprNodeDesc expr : rop.getConf().getKeyCols()) {
           lCtx.getIndex().putDependency(rop, col_infos.get(cnt++),
-              ExprProcFactory.getExprDependency(lCtx, inpOp, expr));
+              ExprProcFactory.getExprDependency(lCtx, inpOp, expr, outputMap));
         }
         for(ExprNodeDesc expr : rop.getConf().getValueCols()) {
           lCtx.getIndex().putDependency(rop, col_infos.get(cnt++),
-              ExprProcFactory.getExprDependency(lCtx, inpOp, expr));
+              ExprProcFactory.getExprDependency(lCtx, inpOp, expr, outputMap));
         }
       } else {
         RowSchema schema = rop.getSchema();
@@ -602,7 +616,7 @@ public class OpProcFactory {
             continue;   // key in values
           }
           lCtx.getIndex().putDependency(rop, column,
-              ExprProcFactory.getExprDependency(lCtx, inpOp, keyCols.get(i)));
+              ExprProcFactory.getExprDependency(lCtx, inpOp, keyCols.get(i), outputMap));
         }
         List<ExprNodeDesc> valCols = desc.getValueCols();
         ArrayList<String> valColNames = desc.getOutputValueColumnNames();
@@ -614,7 +628,7 @@ public class OpProcFactory {
             column = schema.getColumnInfo(Utilities.ReduceField.VALUE + "." + valColNames.get(i));
           }
           lCtx.getIndex().putDependency(rop, column,
-              ExprProcFactory.getExprDependency(lCtx, inpOp, valCols.get(i)));
+              ExprProcFactory.getExprDependency(lCtx, inpOp, valCols.get(i), outputMap));
         }
       }
 


Mime
View raw message