hive-commits mailing list archives

From hashut...@apache.org
Subject svn commit: r1661239 - /hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java
Date Fri, 20 Feb 2015 23:16:06 GMT
Author: hashutosh
Date: Fri Feb 20 23:16:05 2015
New Revision: 1661239

URL: http://svn.apache.org/r1661239
Log:
HIVE-9722 : CBO (Calcite Return Path): Translate Sort/Limit to Hive Op [CBO branch] (Jesus Camacho Rodriguez via Ashutosh Chauhan)
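
In short, this change teaches the Calcite return path to turn a HiveSort node into the usual Hive operator chain: a ReduceSinkOperator carrying the sort keys and order string, a SelectOperator that projects the original schema back, and a LimitOperator when a fetch is present. The core of step 1.a in the diff below is mapping Calcite collation directions onto the "+"/"-" order string that ReduceSinkDesc expects; here is a minimal standalone sketch of that mapping (class and method names are illustrative, not part of the commit):

    import java.util.List;

    import org.apache.calcite.rel.RelCollation;
    import org.apache.calcite.rel.RelFieldCollation;

    // Illustrative helper, not part of the commit: walk a collation and emit the
    // sort column positions plus the "+"/"-" direction string Hive uses.
    public class OrderStringSketch {
      public static String toOrderString(RelCollation collation, List<Integer> sortColumnPos) {
        StringBuilder order = new StringBuilder();
        for (RelFieldCollation fc : collation.getFieldCollations()) {
          sortColumnPos.add(fc.getFieldIndex());
          // DESCENDING maps to "-", any other direction to "+", matching
          // the branching in the diff below.
          order.append(fc.getDirection() == RelFieldCollation.Direction.DESCENDING ? "-" : "+");
        }
        return order.toString();
      }
    }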

Modified:
    hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java

Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java?rev=1661239&r1=1661238&r2=1661239&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java Fri Feb 20 23:16:05 2015
@@ -27,9 +27,13 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelFieldCollation;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.SemiJoin;
 import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.util.Pair;
 import org.apache.commons.logging.Log;
@@ -38,6 +42,7 @@ import org.apache.hadoop.hive.conf.HiveC
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.ColumnInfo;
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
+import org.apache.hadoop.hive.ql.exec.LimitOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorFactory;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
@@ -49,6 +54,7 @@ import org.apache.hadoop.hive.ql.optimiz
 import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil.JoinLeafPredicateInfo;
 import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil.JoinPredicateInfo;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveSort;
 import org.apache.hadoop.hive.ql.parse.JoinCond;
 import org.apache.hadoop.hive.ql.parse.JoinType;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
@@ -57,9 +63,11 @@ import org.apache.hadoop.hive.ql.plan.Ex
 import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils;
 import org.apache.hadoop.hive.ql.plan.JoinCondDesc;
 import org.apache.hadoop.hive.ql.plan.JoinDesc;
+import org.apache.hadoop.hive.ql.plan.LimitDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.SelectDesc;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
@@ -133,6 +141,8 @@ public class HiveOpConverter {
       HiveJoin hj = HiveJoin.getJoin(sj.getCluster(), sj.getLeft(), sj.getRight(),
               sj.getCondition(), sj.getJoinType(), true);
       return visit(hj);
+    } else if (rn instanceof HiveSort) {
+      return visit((HiveSort) rn);
     }
     LOG.error(rn.getClass().getCanonicalName() + " operator translation not supported"
             + " yet in return path.");
@@ -185,6 +195,99 @@ public class HiveOpConverter {
     // 8. Return result
     return new OpAttr(null, vcolMap, joinOp);
   }
+  
+  OpAttr visit(HiveSort sortRel) throws SemanticException {
+    OpAttr inputOpAf = dispatch(sortRel.getInput());
+  
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Translating operator rel#" + sortRel.getId() + ":" + sortRel.getRelTypeName()
+
+              " with row type: [" + sortRel.getRowType() + "]");
+      if (sortRel.getCollation() == RelCollations.EMPTY) {
+        LOG.debug("Operator rel#" + sortRel.getId() + ":" + sortRel.getRelTypeName() +
+              " consists of limit");
+      }
+      else if (sortRel.fetch == null) {
+        LOG.debug("Operator rel#" + sortRel.getId() + ":" + sortRel.getRelTypeName() +
+              " consists of sort");
+      }
+      else {
+        LOG.debug("Operator rel#" + sortRel.getId() + ":" + sortRel.getRelTypeName() +
+              " consists of sort+limit");
+      }
+    }
+  
+    Operator<?> inputOp = inputOpAf.inputs.get(0);
+    Operator<?> resultOp = inputOpAf.inputs.get(0);
+    // 1. If we need to sort tuples based on the value of some
+    //    of their columns
+    if (sortRel.getCollation() != RelCollations.EMPTY) {
+  
+      // In strict mode, in the presence of order by, limit must be specified
+      if (strictMode && sortRel.fetch == null) {
+        throw new SemanticException(ErrorMsg.NO_LIMIT_WITH_ORDERBY.getMsg());
+      }
+  
+      // 1.a. Extract order for each column from collation
+      //      Generate sortCols and order
+      List<ExprNodeDesc> sortCols = new ArrayList<ExprNodeDesc>();
+      StringBuilder order = new StringBuilder();
+      for (RelCollation collation : sortRel.getCollationList()) {
+        for (RelFieldCollation sortInfo : collation.getFieldCollations()) {
+          int sortColumnPos = sortInfo.getFieldIndex();
+          ColumnInfo columnInfo = new ColumnInfo(inputOp.getSchema().getSignature().
+                  get(sortColumnPos));
+          ExprNodeColumnDesc sortColumn = new ExprNodeColumnDesc(columnInfo.getType(),
+              columnInfo.getInternalName(), columnInfo.getTabAlias(),
+              columnInfo.getIsVirtualCol());
+          sortCols.add(sortColumn);
+          if (sortInfo.getDirection() == RelFieldCollation.Direction.DESCENDING) {
+            order.append("-");
+          }
+          else {
+            order.append("+");
+          }
+        }
+      }
+      // Use only 1 reducer for order by
+      int numReducers = 1;
+  
+      // 1.b. Generate reduce sink 
+      resultOp = genReduceSink(resultOp, sortCols.toArray(new ExprNodeDesc[sortCols.size()]),
+              -1, new ArrayList<ExprNodeDesc>(), order.toString(), numReducers,
+              Operation.NOT_ACID, strictMode);
+  
+      // 1.c. Generate project operator
+      Map<String, ExprNodeDesc> descriptors = buildBacktrackFromReduceSink(
+              (ReduceSinkOperator) resultOp, inputOp);
+      SelectDesc selectDesc = new SelectDesc(
+              new ArrayList<ExprNodeDesc>(descriptors.values()),
+              new ArrayList<String>(descriptors.keySet()));
+      ArrayList<ColumnInfo> cinfoLst = createColInfos(inputOp);
+      resultOp = OperatorFactory.getAndMakeChild(selectDesc,
+              new RowSchema(cinfoLst), resultOp);
+      resultOp.setColumnExprMap(descriptors);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Generated " + resultOp + " with row schema: [" + resultOp.getSchema()
+ "]");
+      }
+    }
+  
+    // 2. If we need to generate limit
+    if (sortRel.fetch != null) {
+      int limit = RexLiteral.intValue(sortRel.fetch);
+      LimitDesc limitDesc = new LimitDesc(limit);
+      // TODO: Set 'last limit' global property
+      ArrayList<ColumnInfo> cinfoLst = createColInfos(inputOp);
+      resultOp = (LimitOperator) OperatorFactory.getAndMakeChild(
+          limitDesc, new RowSchema(cinfoLst), resultOp);
+  
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Generated " + resultOp + " with row schema: [" + resultOp.getSchema()
+ "]");
+      }
+    }
+  
+    // 3. Return result
+    return inputOpAf.clone(resultOp);
+  }
 
   private static ExprNodeDesc[][] extractJoinKeys(JoinPredicateInfo joinPredInfo,
           List<RelNode> inputs) {
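
For readers following along: a Calcite Sort node covers three SQL shapes, and the new visit(HiveSort) method dispatches on exactly the two fields the sketch below inspects. A minimal illustration (hypothetical helper, not from the commit) of how collation and fetch distinguish limit-only, sort-only, and sort+limit:

    import org.apache.calcite.rel.RelCollations;
    import org.apache.calcite.rel.core.Sort;
    import org.apache.calcite.rex.RexLiteral;

    // Hypothetical helper, not part of the commit: classify a Sort the same way
    // the debug logging in visit(HiveSort) does, and pull out the limit value
    // with RexLiteral.intValue, the same accessor the diff uses for sortRel.fetch.
    public class SortShapeSketch {
      public static String describe(Sort sort) {
        if (sort.getCollation() == RelCollations.EMPTY) {
          // Assumes a fetch is present when there are no sort keys (pure LIMIT n).
          return "limit " + RexLiteral.intValue(sort.fetch);
        } else if (sort.fetch == null) {
          return "sort";                                           // pure ORDER BY
        } else {
          return "sort+limit " + RexLiteral.intValue(sort.fetch);  // ORDER BY ... LIMIT n
        }
      }
    }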


