hive-commits mailing list archives

From hashut...@apache.org
Subject svn commit: r1668161 - in /hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql: optimizer/calcite/translator/ parse/
Date Fri, 20 Mar 2015 21:37:46 GMT
Author: hashutosh
Date: Fri Mar 20 21:37:45 2015
New Revision: 1668161

URL: http://svn.apache.org/r1668161
Log:
HIVE-9825 : CBO (Calcite Return Path): Translate PTFs and Windowing to Hive Op [CBO branch]
(Jesus Camacho Rodriguez via Ashutosh Chauhan)
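
At a high level, the patch works in two phases: ExprNodeConverter now recognizes windowing calls (RexOver) while converting a projected expression and stashes a WindowFunctionSpec for each one, and the project translation in HiveOpConverter then gathers those specs into a WindowingSpec and generates a PTF operator beneath the Select. A minimal sketch of that collect-then-emit pattern, using simplified stand-in types (none of these classes are the real Hive/Calcite ones; the query strings are made up):

import java.util.ArrayList;
import java.util.List;

public class TwoPhaseSketch {

  // Stand-in for Hive's WindowFunctionSpec.
  static class WindowFnSpec {
    final String name;
    WindowFnSpec(String name) { this.name = name; }
  }

  // Analogue of ExprNodeConverter: converts one projected expression and
  // stashes a WindowFnSpec if the expression turned out to be windowed.
  static class Converter {
    private WindowFnSpec wfs;
    String convert(String rex) {
      if (rex.contains(" OVER ")) {                 // analogue of visitOver(RexOver)
        wfs = new WindowFnSpec(rex.substring(0, rex.indexOf('(')));
        return "colref:" + wfs.name;                // column ref into the PTF output
      }
      return rex;
    }
    WindowFnSpec getWindowFunctionSpec() { return wfs; }
  }

  public static void main(String[] args) {
    List<String> projections =
        List.of("deptno", "avg(sal) OVER (PARTITION BY deptno)");
    List<String> exprCols = new ArrayList<>();
    List<WindowFnSpec> windowFns = new ArrayList<>();
    for (String p : projections) {                  // analogue of the project loop
      Converter c = new Converter();                // one converter per projection
      exprCols.add(c.convert(p));
      if (c.getWindowFunctionSpec() != null) {
        windowFns.add(c.getWindowFunctionSpec());
      }
    }
    // A non-empty spec list is the trigger for genPTF in the real patch:
    // a PTF operator is planted beneath the Select.
    System.out.println(exprCols + " / window fns: " + windowFns.size());
  }
}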

Modified:
    hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTConverter.java
    hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ExprNodeConverter.java
    hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java
    hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
    hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
    hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/parse/UnparseTranslator.java

Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTConverter.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTConverter.java?rev=1668161&r1=1668160&r2=1668161&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTConverter.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTConverter.java Fri Mar 20 21:37:45 2015
@@ -54,6 +54,7 @@ import org.apache.calcite.util.Immutable
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
 import org.apache.hadoop.hive.ql.optimizer.calcite.CalciteSemanticException;
 import org.apache.hadoop.hive.ql.optimizer.calcite.RelOptHiveTable;
@@ -641,6 +642,12 @@ public class ASTConverter {
         add(new ColumnInfo(null, projName));
       }
     }
+
+    public Schema(String tabAlias, List<RelDataTypeField> fieldList) {
+      for (RelDataTypeField field : fieldList) {
+        add(new ColumnInfo(tabAlias, field.getName()));
+      }
+    }
   }
 
   /*

Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ExprNodeConverter.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ExprNodeConverter.java?rev=1668161&r1=1668160&r2=1668161&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ExprNodeConverter.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ExprNodeConverter.java Fri Mar 20 21:37:45 2015
@@ -24,41 +24,77 @@ import java.util.Calendar;
 import java.util.LinkedList;
 import java.util.List;
 
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexFieldCollation;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexOver;
+import org.apache.calcite.rex.RexVisitorImpl;
+import org.apache.calcite.rex.RexWindow;
+import org.apache.calcite.rex.RexWindowBound;
 import org.apache.hadoop.hive.common.type.HiveChar;
 import org.apache.hadoop.hive.common.type.HiveVarchar;
+import org.apache.hadoop.hive.ql.optimizer.calcite.translator.ASTConverter.Schema;
+import org.apache.hadoop.hive.ql.parse.ASTNode;
+import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.Order;
+import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.OrderExpression;
+import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.OrderSpec;
+import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.PartitionExpression;
+import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.PartitionSpec;
+import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.PartitioningSpec;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec.BoundarySpec;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec.CurrentRowSpec;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec.Direction;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec.RangeBoundarySpec;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec.ValueBoundarySpec;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec.WindowFrameSpec;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec.WindowFunctionSpec;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec.WindowSpec;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeField;
-import org.apache.calcite.rex.RexCall;
-import org.apache.calcite.rex.RexInputRef;
-import org.apache.calcite.rex.RexLiteral;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.rex.RexVisitorImpl;
 
 /*
  * convert a RexNode to an ExprNodeDesc
  */
 public class ExprNodeConverter extends RexVisitorImpl<ExprNodeDesc> {
 
-  RelDataType rType;
-  String      tabAlias;
-  boolean     partitioningExpr;
+  String             tabAlias;
+  String             columnAlias;
+  RelDataType        inputRowType;
+  RelDataType        outputRowType;
+  boolean            partitioningExpr;
+  WindowFunctionSpec wfs;
+
+  public ExprNodeConverter(String tabAlias, RelDataType inputRowType,
+          boolean partitioningExpr) {
+    this(tabAlias, null, inputRowType, null, partitioningExpr);
+  }
 
-  public ExprNodeConverter(String tabAlias, RelDataType rType, boolean partitioningExpr) {
+  public ExprNodeConverter(String tabAlias, String columnAlias, RelDataType inputRowType,
+          RelDataType outputRowType, boolean partitioningExpr) {
     super(true);
     this.tabAlias = tabAlias;
-    this.rType = rType;
+    this.columnAlias = columnAlias;
+    this.inputRowType = inputRowType;
+    this.outputRowType = outputRowType;
     this.partitioningExpr = partitioningExpr;
   }
 
+  public WindowFunctionSpec getWindowFunctionSpec() {
+    return this.wfs;
+  }
+
   @Override
   public ExprNodeDesc visitInputRef(RexInputRef inputRef) {
-    RelDataTypeField f = rType.getFieldList().get(inputRef.getIndex());
+    RelDataTypeField f = inputRowType.getFieldList().get(inputRef.getIndex());
     return new ExprNodeColumnDesc(TypeConverter.convert(f.getType()), f.getName(), tabAlias,
         partitioningExpr);
   }
@@ -163,4 +199,138 @@ public class ExprNodeConverter extends R
     }
   }
 
+  @Override
+  public ExprNodeDesc visitOver(RexOver over) {
+    if (!deep) {
+      return null;
+    }
+
+    final RexWindow window = over.getWindow();
+
+    final WindowSpec windowSpec = new WindowSpec();
+    final PartitioningSpec partitioningSpec = getPSpec(window);
+    windowSpec.setPartitioning(partitioningSpec);
+    final WindowFrameSpec windowFrameSpec = getWindowRange(window);
+    windowSpec.setWindowFrame(windowFrameSpec);
+
+    wfs = new WindowFunctionSpec();
+    wfs.setWindowSpec(windowSpec);
+    final Schema schema = new Schema(tabAlias, inputRowType.getFieldList());
+    final ASTNode wUDAFAst = new ASTConverter.RexVisitor(schema).visitOver(over);
+    wfs.setExpression(wUDAFAst);
+    ASTNode nameNode = (ASTNode) wUDAFAst.getChild(0);
+    wfs.setName(nameNode.getText());
+    for(int i=1; i < wUDAFAst.getChildCount()-1; i++) {
+      ASTNode child = (ASTNode) wUDAFAst.getChild(i);
+      wfs.addArg(child);
+    }
+    wfs.setAlias(columnAlias);
+
+    RelDataTypeField f = outputRowType.getField(columnAlias, false, false);
+    return new ExprNodeColumnDesc(TypeConverter.convert(f.getType()), columnAlias, tabAlias,
+            partitioningExpr);
+  }
+
+  private PartitioningSpec getPSpec(RexWindow window) {
+    PartitioningSpec partitioning = new PartitioningSpec();
+
+    if (window.partitionKeys != null && !window.partitionKeys.isEmpty()) {
+      PartitionSpec pSpec = new PartitionSpec();
+      for (RexNode pk : window.partitionKeys) {
+        PartitionExpression exprSpec = new PartitionExpression();
+        RexInputRef inputRef = (RexInputRef) pk;
+        RelDataTypeField f = inputRowType.getFieldList().get(inputRef.getIndex());
+        ASTNode astCol;
+        if (tabAlias == null || tabAlias.isEmpty()) {
+          astCol = ASTBuilder.unqualifiedName(f.getName());
+        } else {
+          astCol = ASTBuilder.qualifiedName(tabAlias, f.getName());
+        }
+        exprSpec.setExpression(astCol);
+        pSpec.addExpression(exprSpec);
+      }
+      partitioning.setPartSpec(pSpec);
+    }
+
+    if (window.orderKeys != null && !window.orderKeys.isEmpty()) {
+      OrderSpec oSpec = new OrderSpec();
+      for (RexFieldCollation ok : window.orderKeys) {
+        OrderExpression exprSpec = new OrderExpression();
+        Order order = ok.getDirection() == RelFieldCollation.Direction.ASCENDING ?
+                Order.ASC : Order.DESC;
+        exprSpec.setOrder(order);
+        RexInputRef inputRef = (RexInputRef) ok.left;
+        RelDataTypeField f = inputRowType.getFieldList().get(inputRef.getIndex());
+        ASTNode astCol;
+        if (tabAlias == null || tabAlias.isEmpty()) {
+          astCol = ASTBuilder.unqualifiedName(f.getName());
+        } else {
+          astCol = ASTBuilder.qualifiedName(tabAlias, f.getName());
+        }
+        exprSpec.setExpression(astCol);
+        oSpec.addExpression(exprSpec);
+      }
+      partitioning.setOrderSpec(oSpec);
+    }
+
+    return partitioning;
+  }
+
+  private WindowFrameSpec getWindowRange(RexWindow window) {
+    // NOTE: in Hive AST Rows->Range(Physical) & Range -> Values (logical)
+
+    WindowFrameSpec windowFrame = new WindowFrameSpec();
+
+    BoundarySpec start = null;
+    RexWindowBound ub = window.getUpperBound();
+    if (ub != null) {
+      start = getWindowBound(ub, window.isRows());
+    }
+
+    BoundarySpec end = null;
+    RexWindowBound lb = window.getLowerBound();
+    if (lb != null) {
+      end = getWindowBound(lb, window.isRows());
+    }
+
+    if (start != null || end != null) {
+      if (start != null) {
+        windowFrame.setStart(start);
+      }
+      if (end != null) {
+        windowFrame.setEnd(end);
+      }
+    }
+
+    return windowFrame;
+  }
+
+  private BoundarySpec getWindowBound(RexWindowBound wb, boolean isRows) {
+    BoundarySpec boundarySpec;
+
+    if (wb.isCurrentRow()) {
+      boundarySpec = new CurrentRowSpec();
+    } else {
+      final Direction direction;
+      final int amt;
+      if (wb.isPreceding()) {
+        direction = Direction.PRECEDING;
+      } else {
+        direction = Direction.FOLLOWING;
+      }
+      if (wb.isUnbounded()) {
+        amt = BoundarySpec.UNBOUNDED_AMOUNT;
+      } else {
+        amt = RexLiteral.intValue(wb.getOffset());
+      }
+      if (isRows) {
+        boundarySpec = new RangeBoundarySpec(direction, amt);
+      } else {
+        boundarySpec = new ValueBoundarySpec(direction, amt);
+      }
+    }
+
+    return boundarySpec;
+  }
+
 }

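The NOTE in getWindowRange above is worth spelling out: Calcite's physical ROWS frames become Hive RangeBoundarySpec objects, and Calcite's logical RANGE frames become Hive ValueBoundarySpec objects, i.e. the names are swapped between the two systems. A self-contained sketch of the same decision table as getWindowBound (stand-in types, not Hive's; the UNBOUNDED sentinel value is an assumption):

public class BoundarySketch {

  enum Direction { PRECEDING, FOLLOWING }

  static final int UNBOUNDED_AMOUNT = Integer.MAX_VALUE;  // assumed sentinel

  // Returns a label naming the Hive spec class the real code would build.
  static String pick(boolean currentRow, boolean preceding, boolean unbounded,
                     int offset, boolean isRows) {
    if (currentRow) {
      return "CurrentRowSpec";                             // CURRENT ROW bound
    }
    Direction d = preceding ? Direction.PRECEDING : Direction.FOLLOWING;
    int amt = unbounded ? UNBOUNDED_AMOUNT : offset;
    // The inversion: Calcite ROWS (physical) -> Hive RangeBoundarySpec,
    //                Calcite RANGE (logical) -> Hive ValueBoundarySpec.
    return (isRows ? "RangeBoundarySpec(" : "ValueBoundarySpec(") + d + ", " + amt + ")";
  }

  public static void main(String[] args) {
    // ROWS BETWEEN 2 PRECEDING AND CURRENT ROW -> start and end bounds:
    System.out.println(pick(false, true, false, 2, true));  // RangeBoundarySpec(PRECEDING, 2)
    System.out.println(pick(true, false, false, 0, true));  // CurrentRowSpec
  }
}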
Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java?rev=1668161&r1=1668160&r2=1668161&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java Fri Mar 20 21:37:45 2015
@@ -69,7 +69,16 @@ import org.apache.hadoop.hive.ql.optimiz
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveUnion;
 import org.apache.hadoop.hive.ql.parse.JoinCond;
 import org.apache.hadoop.hive.ql.parse.JoinType;
+import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec;
+import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.OrderExpression;
+import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.PartitionExpression;
+import org.apache.hadoop.hive.ql.parse.PTFTranslator;
+import org.apache.hadoop.hive.ql.parse.RowResolver;
+import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.UnparseTranslator;
+import org.apache.hadoop.hive.ql.parse.WindowingComponentizer;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils;
@@ -78,6 +87,7 @@ import org.apache.hadoop.hive.ql.plan.Jo
 import org.apache.hadoop.hive.ql.plan.JoinDesc;
 import org.apache.hadoop.hive.ql.plan.LimitDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.PTFDesc;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
 import org.apache.hadoop.hive.ql.plan.SelectDesc;
@@ -100,6 +110,9 @@ public class HiveOpConverter {
   };
 
   // TODO: remove this after stashing only rqd pieces from opconverter
+  private final SemanticAnalyzer                              semanticAnalyzer;
+  private final HiveConf                                      hiveConf;
+  private final UnparseTranslator                             unparseTranslator;
   private final Map<String, Operator<? extends OperatorDesc>> topOps;
   private final HIVEAGGOPMODE                                 aggMode;
   private final boolean                                       strictMode;
@@ -121,8 +134,13 @@ public class HiveOpConverter {
     return aggOpMode;
   }
 
-  public HiveOpConverter(Map<String, Operator<? extends OperatorDesc>> topOps,
+  public HiveOpConverter(SemanticAnalyzer semanticAnalyzer,
+          HiveConf hiveConf, UnparseTranslator unparseTranslator,
+          Map<String, Operator<? extends OperatorDesc>> topOps,
           HIVEAGGOPMODE aggMode, boolean strictMode) {
+    this.semanticAnalyzer = semanticAnalyzer;
+    this.hiveConf = hiveConf;
+    this.unparseTranslator = unparseTranslator;
     this.topOps = topOps;
     this.aggMode = aggMode;
     this.strictMode = strictMode;
@@ -250,8 +268,22 @@ public class HiveOpConverter {
               " with row type: [" + projectRel.getRowType() + "]");
     }
 
-    List<ExprNodeDesc> exprCols = convertToExprNodes(projectRel.getChildExps(), projectRel.getInput(),
-        inputOpAf.tabAlias);
+    WindowingSpec windowingSpec = new WindowingSpec();
+    List<ExprNodeDesc> exprCols = new ArrayList<ExprNodeDesc>();
+    for (int pos=0; pos<projectRel.getChildExps().size(); pos++) {
+      ExprNodeConverter converter = new ExprNodeConverter(inputOpAf.tabAlias,
+              projectRel.getRowType().getFieldNames().get(pos),
+              projectRel.getInput().getRowType(), projectRel.getRowType(), false);
+      exprCols.add((ExprNodeDesc) projectRel.getChildExps().get(pos).
+              accept(converter));
+      if (converter.getWindowFunctionSpec() != null) {
+        windowingSpec.addWindowFunction(converter.getWindowFunctionSpec());
+      }
+    }
+    if (windowingSpec.getWindowExpressions() != null &&
+            !windowingSpec.getWindowExpressions().isEmpty()) {
+      inputOpAf = genPTF(inputOpAf, windowingSpec);
+    }
     // TODO: is this a safe assumption (name collision, external names...)
     List<String> exprNames = new ArrayList<String>(projectRel.getRowType().getFieldNames());
     SelectDesc sd = new SelectDesc(exprCols, exprNames);
@@ -363,26 +395,12 @@ public class HiveOpConverter {
       // Use only 1 reducer for order by
       int numReducers = 1;
   
-      // 1.b. Generate reduce sink 
-      resultOp = genReduceSink(resultOp, sortCols.toArray(new ExprNodeDesc[sortCols.size()]),
+      // 1.b. Generate reduce sink and project operator
+      resultOp = genReduceSinkAndBacktrackSelect(resultOp, sortCols.toArray(new ExprNodeDesc[sortCols.size()]),
               -1, new ArrayList<ExprNodeDesc>(), order.toString(), numReducers,
               Operation.NOT_ACID, strictMode);
-  
-      // 1.c. Generate project operator
-      Map<String, ExprNodeDesc> descriptors = buildBacktrackFromReduceSink(
-              (ReduceSinkOperator) resultOp, inputOp);
-      SelectDesc selectDesc = new SelectDesc(
-              new ArrayList<ExprNodeDesc>(descriptors.values()),
-              new ArrayList<String>(descriptors.keySet()));
-      ArrayList<ColumnInfo> cinfoLst = createColInfos(inputOp);
-      resultOp = OperatorFactory.getAndMakeChild(selectDesc,
-              new RowSchema(cinfoLst), resultOp);
-      resultOp.setColumnExprMap(descriptors);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Generated " + resultOp + " with row schema: [" + resultOp.getSchema() + "]");
-      }
     }
-  
+
     // 2. If we need to generate limit
     if (sortRel.fetch != null) {
       int limit = RexLiteral.intValue(sortRel.fetch);
@@ -484,7 +502,73 @@ public class HiveOpConverter {
     return inputOpAf.clone(rsOp);
   }
 
-  private static ExprNodeDesc[][] extractJoinKeys(JoinPredicateInfo joinPredInfo,
+  private OpAttr genPTF(OpAttr inputOpAf, WindowingSpec wSpec) throws SemanticException {
+    Operator<?> input = inputOpAf.inputs.get(0);
+    
+    wSpec.validateAndMakeEffective();
+    WindowingComponentizer groups = new WindowingComponentizer(wSpec);
+    RowResolver rr = new RowResolver();
+    for (ColumnInfo ci : input.getSchema().getSignature()) {
+      rr.put(ci.getTabAlias(), ci.getInternalName(), ci);
+    }
+
+    while(groups.hasNext() ) {
+      wSpec = groups.next(hiveConf, semanticAnalyzer, unparseTranslator, rr);
+
+      // 1. Create RS and backtrack Select operator on top
+      ArrayList<ExprNodeDesc> keyCols = new ArrayList<ExprNodeDesc>();
+      ArrayList<ExprNodeDesc> partCols = new ArrayList<ExprNodeDesc>();
+      StringBuilder order = new StringBuilder();
+
+      for (PartitionExpression partCol : wSpec.getQueryPartitionSpec().getExpressions()) {
+        ExprNodeDesc partExpr = semanticAnalyzer.genExprNodeDesc(partCol.getExpression(), rr);
+        if (ExprNodeDescUtils.indexOf(partExpr, partCols) < 0) {
+          keyCols.add(partExpr);
+          partCols.add(partExpr);
+          order.append('+');
+        }
+      }
+
+      if (wSpec.getQueryOrderSpec() != null) {
+        for (OrderExpression orderCol : wSpec.getQueryOrderSpec().getExpressions()) {
+          ExprNodeDesc orderExpr = semanticAnalyzer.genExprNodeDesc(orderCol.getExpression(), rr);
+          char orderChar = orderCol.getOrder() == PTFInvocationSpec.Order.ASC ? '+' : '-';
+          int index = ExprNodeDescUtils.indexOf(orderExpr, keyCols);
+          if (index >= 0) {
+            order.setCharAt(index, orderChar);
+            continue;
+          }
+          keyCols.add(orderExpr);
+          order.append(orderChar);
+        }
+      }
+
+      SelectOperator selectOp = genReduceSinkAndBacktrackSelect(input,
+              keyCols.toArray(new ExprNodeDesc[keyCols.size()]),
+              reduceSinkTagGenerator++, partCols, order.toString(),
+              -1, Operation.NOT_ACID, strictMode);
+
+      // 2. Finally create PTF
+      PTFTranslator translator = new PTFTranslator();
+      PTFDesc ptfDesc = translator.translate(wSpec, semanticAnalyzer, hiveConf, rr, unparseTranslator);
+      RowResolver ptfOpRR = ptfDesc.getFuncDef().getOutputShape().getRr();
+
+      Operator<?> ptfOp = OperatorFactory.getAndMakeChild(ptfDesc,
+            new RowSchema(ptfOpRR.getColumnInfos()), selectOp);
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Generated " + ptfOp + " with row schema: [" + ptfOp.getSchema() + "]");
+      }
+
+      // 3. Prepare for next iteration (if any)
+      rr = ptfOpRR;
+      input = ptfOp;
+    }
+    
+    return inputOpAf.clone(input);
+  }
+
+  private ExprNodeDesc[][] extractJoinKeys(JoinPredicateInfo joinPredInfo,
           List<RelNode> inputs) {
     ExprNodeDesc[][] joinKeys = new ExprNodeDesc[inputs.size()][];
     for (int i = 0; i < inputs.size(); i++) {
@@ -498,6 +582,32 @@ public class HiveOpConverter {
     return joinKeys;
   }
 
+  private static SelectOperator genReduceSinkAndBacktrackSelect(Operator<?> input,
+          ExprNodeDesc[] keys, int tag, ArrayList<ExprNodeDesc> partitionCols,
+          String order, int numReducers, Operation acidOperation,
+          boolean strictMode) throws SemanticException {
+    // 1. Generate RS operator
+    ReduceSinkOperator rsOp = genReduceSink(input, keys, tag, partitionCols,
+            order, numReducers, acidOperation, strictMode);
+
+    // 2. Generate backtrack Select operator
+    Map<String, ExprNodeDesc> descriptors = buildBacktrackFromReduceSink(
+            (ReduceSinkOperator) rsOp, input);
+    SelectDesc selectDesc = new SelectDesc(
+            new ArrayList<ExprNodeDesc>(descriptors.values()),
+            new ArrayList<String>(descriptors.keySet()));
+    ArrayList<ColumnInfo> cinfoLst = createColInfos(input);
+    SelectOperator selectOp = (SelectOperator) OperatorFactory.getAndMakeChild(selectDesc,
+            new RowSchema(cinfoLst), rsOp);
+    selectOp.setColumnExprMap(descriptors);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Generated " + selectOp + " with row schema: [" + selectOp.getSchema() + "]");
+    }
+
+    return selectOp;
+  }
+
  private static ReduceSinkOperator genReduceSink(Operator<?> input, ExprNodeDesc[] keys,
           int tag, int numReducers, Operation acidOperation,
           boolean strictMode) throws SemanticException {
@@ -750,15 +860,6 @@ public class HiveOpConverter {
     return columnDescriptors;
   }
 
-  private static List<ExprNodeDesc> convertToExprNodes(List<RexNode> rNodeLst,
-      RelNode inputRel, String tabAlias) {
-    List<ExprNodeDesc> exprNodeLst = new ArrayList<ExprNodeDesc>();
-    for (RexNode rn : rNodeLst) {
-      exprNodeLst.add(convertToExprNode(rn, inputRel, tabAlias));
-    }
-    return exprNodeLst;
-  }
-
   private static ExprNodeDesc convertToExprNode(RexNode rn, RelNode inputRel,
       String tabAlias) {
     return (ExprNodeDesc) rn.accept(new ExprNodeConverter(tabAlias,

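One detail of genPTF above that is easy to miss: the ReduceSink sort-order string packs one '+'/'-' character per key column, and an ORDER BY expression that is already a partition key overwrites its slot in the string instead of appending a duplicate key. A runnable sketch of just that string-building logic (column names and the ORDER BY clause are made up; Hive's real code works on ExprNodeDesc, not strings):

import java.util.ArrayList;
import java.util.List;

public class OrderStringSketch {
  public static void main(String[] args) {
    List<String> keyCols = new ArrayList<>();
    StringBuilder order = new StringBuilder();

    // PARTITION BY deptno -> partition keys sort ascending by default.
    for (String part : List.of("deptno")) {
      if (!keyCols.contains(part)) { keyCols.add(part); order.append('+'); }
    }

    // ORDER BY deptno DESC, sal ASC -> reuse an existing key's slot if present.
    String[][] orderBy = { { "deptno", "-" }, { "sal", "+" } };
    for (String[] o : orderBy) {
      int idx = keyCols.indexOf(o[0]);
      if (idx >= 0) { order.setCharAt(idx, o[1].charAt(0)); continue; }
      keyCols.add(o[0]);
      order.append(o[1].charAt(0));
    }

    System.out.println(keyCols + " order=" + order);  // [deptno, sal] order=-+
  }
}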
Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java?rev=1668161&r1=1668160&r2=1668161&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java Fri Mar 20 21:37:45 2015
@@ -610,7 +610,7 @@ public class CalcitePlanner extends Sema
 
     RelNode modifiedOptimizedOptiqPlan = introduceProjectIfNeeded(optimizedOptiqPlan);
 
-    Operator hiveRoot = new HiveOpConverter(topOps, HiveOpConverter.getAggOPMode(conf),
+    Operator<?> hiveRoot = new HiveOpConverter(this, conf, unparseTranslator, topOps, HiveOpConverter.getAggOPMode(conf),
         conf.getVar(HiveConf.ConfVars.HIVEMAPREDMODE).equalsIgnoreCase("strict")).convert(modifiedOptimizedOptiqPlan);
     RowResolver hiveRootRR = genRowResolver(hiveRoot, getQB());
     opParseCtx.put(hiveRoot, new OpParseContext(hiveRootRR));
@@ -2332,15 +2332,27 @@ public class CalcitePlanner extends Sema
         }
       }
 
-      return genSelectRelNode(projsForWindowSelOp, out_rwsch, srcRel);
+      return genSelectRelNode(projsForWindowSelOp, out_rwsch, srcRel, windowExpressions);
     }
 
     private RelNode genSelectRelNode(List<RexNode> calciteColLst, RowResolver out_rwsch,
-        RelNode srcRel) throws CalciteSemanticException {
+            RelNode srcRel) throws CalciteSemanticException {
+      return genSelectRelNode(calciteColLst, out_rwsch, srcRel, null);
+    }
+
+    private RelNode genSelectRelNode(List<RexNode> calciteColLst, RowResolver out_rwsch,
+        RelNode srcRel, List<WindowExpressionSpec> windowExpressions) throws CalciteSemanticException {
       // 1. Build Column Names
       Set<String> colNamesSet = new HashSet<String>();
       List<ColumnInfo> cInfoLst = out_rwsch.getRowSchema().getSignature();
       ArrayList<String> columnNames = new ArrayList<String>();
+      Map<String,String> windowToAlias = null;
+      if (windowExpressions != null ) {
+        windowToAlias = new HashMap<String,String>();
+        for (WindowExpressionSpec wes : windowExpressions) {
+          windowToAlias.put(wes.getExpression().toStringTree().toLowerCase(), wes.getAlias());
+        }
+      }
       String[] qualifiedColNames;
       String tmpColAlias;
       for (int i = 0; i < calciteColLst.size(); i++) {
@@ -2358,8 +2370,11 @@ public class CalcitePlanner extends Sema
          * the names so we don't run into this issue when converting back to
          * Hive AST.
          */
-        if (tmpColAlias.startsWith("_c"))
+        if (tmpColAlias.startsWith("_c")) {
           tmpColAlias = "_o_" + tmpColAlias;
+        } else if (windowToAlias != null && windowToAlias.containsKey(tmpColAlias)) {
+          tmpColAlias = windowToAlias.get(tmpColAlias);
+        }
         int suffix = 1;
         while (colNamesSet.contains(tmpColAlias)) {
           tmpColAlias = qualifiedColNames[1] + suffix;

Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=1668161&r1=1668160&r2=1668161&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Fri Mar 20 21:37:45 2015
@@ -257,7 +257,7 @@ public class SemanticAnalyzer extends Ba
   private CreateViewDesc createVwDesc;
   private ArrayList<String> viewsExpanded;
   private ASTNode viewSelect;
-  private final UnparseTranslator unparseTranslator;
+  protected final UnparseTranslator unparseTranslator;
   private final GlobalLimitCtx globalLimitCtx;
 
   // prefix for column names auto generated by hive
@@ -477,7 +477,7 @@ public class SemanticAnalyzer extends Ba
           wExprsInDest.containsKey(wFnSpec.getExpression().toStringTree())) {
         continue;
       }
-      wFnSpec.setAlias("_wcol" + wColIdx);
+      wFnSpec.setAlias(wFnSpec.getName() + "_window_" + wColIdx);
       spec.addWindowFunction(wFnSpec);
       qb.getParseInfo().addWindowingExprToClause(dest, wFnSpec.getExpression());
     }

Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/parse/UnparseTranslator.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/parse/UnparseTranslator.java?rev=1668161&r1=1668160&r2=1668161&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/parse/UnparseTranslator.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/parse/UnparseTranslator.java Fri Mar 20 21:37:45 2015
@@ -38,7 +38,7 @@ import org.apache.hadoop.hive.ql.metadat
  * SemanticAnalyzer.saveViewDefinition() calls TokenRewriteStream.toString().
  *
  */
-class UnparseTranslator {
+public class UnparseTranslator {
   // key is token start index
   private final NavigableMap<Integer, Translation> translations;
   private final List<CopyTranslation> copyTranslations;


