hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From br...@apache.org
Subject svn commit: r1629544 [10/33] - in /hive/branches/spark-new: ./ accumulo-handler/ beeline/ beeline/src/java/org/apache/hive/beeline/ bin/ bin/ext/ common/ common/src/java/org/apache/hadoop/hive/conf/ common/src/test/org/apache/hadoop/hive/common/type/ c...
Date Sun, 05 Oct 2014 22:26:58 GMT
Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java Sun Oct  5 22:26:43 2014
@@ -71,7 +71,6 @@ import org.apache.hadoop.hive.ql.plan.Pl
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
-import org.apache.hadoop.io.IntWritable;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -85,6 +84,7 @@ import com.google.common.collect.Maps;
  */
 public class SortedDynPartitionOptimizer implements Transform {
 
+  private static final String BUCKET_NUMBER_COL_NAME = "_bucket_number";
   @Override
   public ParseContext transform(ParseContext pCtx) throws SemanticException {
 
@@ -216,6 +216,13 @@ public class SortedDynPartitionOptimizer
       ReduceSinkDesc rsConf = getReduceSinkDesc(partitionPositions, sortPositions, sortOrder,
           newValueCols, bucketColumns, numBuckets, fsParent, fsOp.getConf().getWriteType());
 
+      if (!bucketColumns.isEmpty()) {
+        String tableAlias = outRR.getColumnInfos().get(0).getTabAlias();
+        ColumnInfo ci = new ColumnInfo(BUCKET_NUMBER_COL_NAME, TypeInfoFactory.stringTypeInfo,
+            tableAlias, true, true);
+        outRR.put(tableAlias, BUCKET_NUMBER_COL_NAME, ci);
+      }
+
       // Create ReduceSink operator
       ReduceSinkOperator rsOp = (ReduceSinkOperator) putOpInsertMap(
           OperatorFactory.getAndMakeChild(rsConf, new RowSchema(outRR.getColumnInfos()), fsParent),
@@ -380,8 +387,11 @@ public class SortedDynPartitionOptimizer
       // corresponding with bucket number and hence their OIs
       for (Integer idx : keyColsPosInVal) {
         if (idx < 0) {
-          newKeyCols.add(new ExprNodeConstantDesc(TypeInfoFactory
-              .getPrimitiveTypeInfoFromPrimitiveWritable(IntWritable.class), -1));
+          // add bucket number column to both key and value
+          ExprNodeConstantDesc encd = new ExprNodeConstantDesc(TypeInfoFactory.stringTypeInfo,
+              BUCKET_NUMBER_COL_NAME);
+          newKeyCols.add(encd);
+          newValueCols.add(encd);
         } else {
           newKeyCols.add(newValueCols.get(idx).clone());
         }
@@ -395,7 +405,8 @@ public class SortedDynPartitionOptimizer
       // should honor the ordering of records provided by ORDER BY in SELECT statement
       ReduceSinkOperator parentRSOp = OperatorUtils.findSingleOperatorUpstream(parent,
           ReduceSinkOperator.class);
-      if (parentRSOp != null) {
+      boolean isOrderBy = parseCtx.getQB().getParseInfo().getDestToOrderBy().size() > 0;
+      if (parentRSOp != null && isOrderBy) {
         String parentRSOpOrder = parentRSOp.getConf().getOrder();
         if (parentRSOpOrder != null && !parentRSOpOrder.isEmpty() && sortPositions.isEmpty()) {
           newKeyCols.addAll(parentRSOp.getConf().getKeyCols());
@@ -417,6 +428,9 @@ public class SortedDynPartitionOptimizer
       List<String> outCols = Utilities.getInternalColumnNamesFromSignature(parent.getSchema()
           .getSignature());
       ArrayList<String> outValColNames = Lists.newArrayList(outCols);
+      if (!bucketColumns.isEmpty()) {
+        outValColNames.add(BUCKET_NUMBER_COL_NAME);
+      }
       List<FieldSchema> valFields = PlanUtils.getFieldSchemasFromColumnList(newValueCols,
           outValColNames, 0, "");
       TableDesc valueTable = PlanUtils.getReduceValueTableDesc(valFields);

Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java Sun Oct  5 22:26:43 2014
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Map.Entry;
 import java.util.Stack;
 
+import org.apache.hadoop.hive.metastore.api.Order;
 import org.apache.hadoop.hive.ql.exec.GroupByOperator;
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
@@ -104,7 +105,12 @@ public class OpTraitsRulesProcFactory {
 
       List<List<String>> listBucketCols = new ArrayList<List<String>>();
       listBucketCols.add(bucketCols);
-      OpTraits opTraits = new OpTraits(listBucketCols, -1);
+      int numBuckets = -1;
+      OpTraits parentOpTraits = rs.getParentOperators().get(0).getConf().getOpTraits();
+      if (parentOpTraits != null) {
+        numBuckets = parentOpTraits.getNumBuckets();
+      }
+      OpTraits opTraits = new OpTraits(listBucketCols, numBuckets, listBucketCols);
       rs.setOpTraits(opTraits);
       return null;
     }
@@ -163,15 +169,21 @@ public class OpTraitsRulesProcFactory {
       } catch (HiveException e) {
         prunedPartList = null;
       }
-      boolean bucketMapJoinConvertible = checkBucketedTable(table, 
+      boolean isBucketed = checkBucketedTable(table,
           opTraitsCtx.getParseContext(), prunedPartList);
-      List<List<String>>bucketCols = new ArrayList<List<String>>();
+      List<List<String>> bucketColsList = new ArrayList<List<String>>();
+      List<List<String>> sortedColsList = new ArrayList<List<String>>();
       int numBuckets = -1;
-      if (bucketMapJoinConvertible) {
-        bucketCols.add(table.getBucketCols());
+      if (isBucketed) {
+        bucketColsList.add(table.getBucketCols());
         numBuckets = table.getNumBuckets();
+        List<String> sortCols = new ArrayList<String>();
+        for (Order colSortOrder : table.getSortCols()) {
+          sortCols.add(colSortOrder.getCol());
+        }
+        sortedColsList.add(sortCols);
       }
-      OpTraits opTraits = new OpTraits(bucketCols, numBuckets);
+      OpTraits opTraits = new OpTraits(bucketColsList, numBuckets, sortedColsList);
       ts.setOpTraits(opTraits);
       return null;
     }
@@ -197,7 +209,7 @@ public class OpTraitsRulesProcFactory {
 
       List<List<String>> listBucketCols = new ArrayList<List<String>>();
       listBucketCols.add(gbyKeys);
-      OpTraits opTraits = new OpTraits(listBucketCols, -1);
+      OpTraits opTraits = new OpTraits(listBucketCols, -1, listBucketCols);
       gbyOp.setOpTraits(opTraits);
       return null;
     }
@@ -205,22 +217,17 @@ public class OpTraitsRulesProcFactory {
 
   public static class SelectRule implements NodeProcessor {
 
-    @Override
-    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
-        Object... nodeOutputs) throws SemanticException {
-      SelectOperator selOp = (SelectOperator)nd;
-      List<List<String>> parentBucketColNames = 
-          selOp.getParentOperators().get(0).getOpTraits().getBucketColNames();
-
+    public List<List<String>> getConvertedColNames(List<List<String>> parentColNames,
+        SelectOperator selOp) {
       List<List<String>> listBucketCols = new ArrayList<List<String>>();
       if (selOp.getColumnExprMap() != null) {
-        if (parentBucketColNames != null) {
-          for (List<String> colNames : parentBucketColNames) {
+        if (parentColNames != null) {
+          for (List<String> colNames : parentColNames) {
             List<String> bucketColNames = new ArrayList<String>();
             for (String colName : colNames) {
               for (Entry<String, ExprNodeDesc> entry : selOp.getColumnExprMap().entrySet()) {
                 if (entry.getValue() instanceof ExprNodeColumnDesc) {
-                  if(((ExprNodeColumnDesc)(entry.getValue())).getColumn().equals(colName)) {
+                  if (((ExprNodeColumnDesc) (entry.getValue())).getColumn().equals(colName)) {
                     bucketColNames.add(entry.getKey());
                   }
                 }
@@ -231,11 +238,34 @@ public class OpTraitsRulesProcFactory {
         }
       }
 
+      return listBucketCols;
+    }
+
+    @Override
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+        Object... nodeOutputs) throws SemanticException {
+      SelectOperator selOp = (SelectOperator)nd;
+      List<List<String>> parentBucketColNames =
+          selOp.getParentOperators().get(0).getOpTraits().getBucketColNames();
+
+      List<List<String>> listBucketCols = null;
+      List<List<String>> listSortCols = null;
+      if (selOp.getColumnExprMap() != null) {
+        if (parentBucketColNames != null) {
+          listBucketCols = getConvertedColNames(parentBucketColNames, selOp);
+        }
+        List<List<String>> parentSortColNames = selOp.getParentOperators().get(0).getOpTraits()
+            .getSortCols();
+        if (parentSortColNames != null) {
+          listSortCols = getConvertedColNames(parentSortColNames, selOp);
+        }
+      }
+
       int numBuckets = -1;
       if (selOp.getParentOperators().get(0).getOpTraits() != null) {
         numBuckets = selOp.getParentOperators().get(0).getOpTraits().getNumBuckets();
       }
-      OpTraits opTraits = new OpTraits(listBucketCols, numBuckets);
+      OpTraits opTraits = new OpTraits(listBucketCols, numBuckets, listSortCols);
       selOp.setOpTraits(opTraits);
       return null;
     }
@@ -248,6 +278,7 @@ public class OpTraitsRulesProcFactory {
         Object... nodeOutputs) throws SemanticException {
       JoinOperator joinOp = (JoinOperator)nd;
       List<List<String>> bucketColsList = new ArrayList<List<String>>();
+      List<List<String>> sortColsList = new ArrayList<List<String>>();
       byte pos = 0;
       for (Operator<? extends OperatorDesc> parentOp : joinOp.getParentOperators()) {
         if (!(parentOp instanceof ReduceSinkOperator)) {
@@ -259,26 +290,24 @@ public class OpTraitsRulesProcFactory {
           ReduceSinkRule rsRule = new ReduceSinkRule();
           rsRule.process(rsOp, stack, procCtx, nodeOutputs);
         }
-        bucketColsList.add(getOutputColNames(joinOp, rsOp, pos));
+        bucketColsList.add(getOutputColNames(joinOp, rsOp.getOpTraits().getBucketColNames(), pos));
+        sortColsList.add(getOutputColNames(joinOp, rsOp.getOpTraits().getSortCols(), pos));
         pos++;
       }
 
-      joinOp.setOpTraits(new OpTraits(bucketColsList, -1));
+      joinOp.setOpTraits(new OpTraits(bucketColsList, -1, bucketColsList));
       return null;
     }
 
-    private List<String> getOutputColNames(JoinOperator joinOp,
-        ReduceSinkOperator rs, byte pos) {
-      List<List<String>> parentBucketColNames =
-          rs.getOpTraits().getBucketColNames();
-
-      if (parentBucketColNames != null) {
+    private List<String> getOutputColNames(JoinOperator joinOp, List<List<String>> parentColNames,
+        byte pos) {
+      if (parentColNames != null) {
         List<String> bucketColNames = new ArrayList<String>();
 
         // guaranteed that there is only 1 list within this list because
         // a reduce sink always brings down the bucketing cols to a single list.
         // may not be true with correlation operators (mux-demux)
-        List<String> colNames = parentBucketColNames.get(0);
+        List<String> colNames = parentColNames.get(0);
         for (String colName : colNames) {
           for (ExprNodeDesc exprNode : joinOp.getConf().getExprs().get(pos)) {
             if (exprNode instanceof ExprNodeColumnDesc) {
@@ -317,7 +346,7 @@ public class OpTraitsRulesProcFactory {
     @Override
     public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
         Object... nodeOutputs) throws SemanticException {
-      OpTraits opTraits = new OpTraits(null, -1);
+      OpTraits opTraits = new OpTraits(null, -1, null);
       @SuppressWarnings("unchecked")
       Operator<? extends OperatorDesc> operator = (Operator<? extends OperatorDesc>)nd;
       operator.setOpTraits(opTraits);

Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CrossProductCheck.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CrossProductCheck.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CrossProductCheck.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CrossProductCheck.java Sun Oct  5 22:26:43 2014
@@ -32,6 +32,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.ConditionalTask;
+import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator;
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
@@ -56,6 +57,7 @@ import org.apache.hadoop.hive.ql.plan.Ex
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.MergeJoinWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
@@ -152,6 +154,11 @@ public class CrossProductCheck implement
 
   private void checkMapJoins(TezWork tzWrk) throws SemanticException {
     for(BaseWork wrk : tzWrk.getAllWork() ) {
+
+      if ( wrk instanceof MergeJoinWork ) {
+        wrk = ((MergeJoinWork)wrk).getMainWork();
+      }
+
       List<String> warnings = new MapJoinCheck(wrk.getName()).analyze(wrk);
       if ( !warnings.isEmpty() ) {
         for(String w : warnings) {
@@ -163,12 +170,17 @@ public class CrossProductCheck implement
 
   private void checkTezReducer(TezWork tzWrk) throws SemanticException {
     for(BaseWork wrk : tzWrk.getAllWork() ) {
-      if ( !(wrk instanceof ReduceWork) ) {
+
+      if ( wrk instanceof MergeJoinWork ) {
+        wrk = ((MergeJoinWork)wrk).getMainWork();
+      }
+
+      if ( !(wrk instanceof ReduceWork ) ) {
         continue;
       }
       ReduceWork rWork = (ReduceWork) wrk;
       Operator<? extends OperatorDesc> reducer = ((ReduceWork)wrk).getReducer();
-      if ( reducer instanceof JoinOperator ) {
+      if ( reducer instanceof JoinOperator || reducer instanceof CommonMergeJoinOperator ) {
         Map<Integer, ExtractReduceSinkInfo.Info> rsInfo =
             new HashMap<Integer, ExtractReduceSinkInfo.Info>();
         for(Map.Entry<Integer, String> e : rWork.getTagToInput().entrySet()) {
@@ -185,7 +197,7 @@ public class CrossProductCheck implement
       return;
     }
     Operator<? extends OperatorDesc> reducer = rWrk.getReducer();
-    if ( reducer instanceof JoinOperator ) {
+    if ( reducer instanceof JoinOperator|| reducer instanceof CommonMergeJoinOperator ) {
       BaseWork prntWork = mrWrk.getMapWork();
       checkForCrossProduct(taskName, reducer,
           new ExtractReduceSinkInfo(null).analyze(prntWork));

Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/NullScanTaskDispatcher.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/NullScanTaskDispatcher.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/NullScanTaskDispatcher.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/NullScanTaskDispatcher.java Sun Oct  5 22:26:43 2014
@@ -102,13 +102,19 @@ public class NullScanTaskDispatcher impl
   }
 
   private void processAlias(MapWork work, String alias) {
+    List<String> paths = getPathsForAlias(work, alias);
+    if (paths.isEmpty()) {
+      // partitioned table which don't select any partitions
+      // there are no paths to replace with fakePath
+      return;
+    }
     work.setUseOneNullRowInputFormat(true);
 
     // Change the alias partition desc
     PartitionDesc aliasPartn = work.getAliasToPartnInfo().get(alias);
     changePartitionToMetadataOnly(aliasPartn);
 
-    List<String> paths = getPathsForAlias(work, alias);
+
     for (String path : paths) {
       PartitionDesc partDesc = work.getPathToPartitionInfo().get(path);
       PartitionDesc newPartition = changePartitionToMetadataOnly(partDesc);

Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java Sun Oct  5 22:26:43 2014
@@ -422,10 +422,12 @@ public class Vectorizer implements Physi
 
         // Check value ObjectInspector.
         ObjectInspector valueObjectInspector = reduceWork.getValueObjectInspector();
-        if (valueObjectInspector == null || !(valueObjectInspector instanceof StructObjectInspector)) {
+        if (valueObjectInspector == null ||
+                !(valueObjectInspector instanceof StructObjectInspector)) {
           return false;
         }
-        StructObjectInspector valueStructObjectInspector = (StructObjectInspector)valueObjectInspector;
+        StructObjectInspector valueStructObjectInspector =
+                (StructObjectInspector)valueObjectInspector;
         valueColCount = valueStructObjectInspector.getAllStructFieldRefs().size();
       } catch (Exception e) {
         throw new SemanticException(e);
@@ -471,18 +473,20 @@ public class Vectorizer implements Physi
       LOG.info("Vectorizing ReduceWork...");
       reduceWork.setVectorMode(true);
  
-      // For some reason, the DefaultGraphWalker does not descend down from the reducer Operator as expected.
-      // We need to descend down, otherwise it breaks our algorithm that determines VectorizationContext...
-      // Do we use PreOrderWalker instead of DefaultGraphWalker.
+      // For some reason, the DefaultGraphWalker does not descend down from the reducer Operator as
+      // expected.  We need to descend down, otherwise it breaks our algorithm that determines
+      // VectorizationContext...  Do we use PreOrderWalker instead of DefaultGraphWalker.
       Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
-      ReduceWorkVectorizationNodeProcessor vnp = new ReduceWorkVectorizationNodeProcessor(reduceWork, keyColCount, valueColCount);
+      ReduceWorkVectorizationNodeProcessor vnp =
+              new ReduceWorkVectorizationNodeProcessor(reduceWork, keyColCount, valueColCount);
       addReduceWorkRules(opRules, vnp);
       Dispatcher disp = new DefaultRuleDispatcher(vnp, opRules, null);
       GraphWalker ogw = new PreOrderWalker(disp);
       // iterator the reduce operator tree
       ArrayList<Node> topNodes = new ArrayList<Node>();
       topNodes.add(reduceWork.getReducer());
-      LOG.info("vectorizeReduceWork reducer Operator: " + reduceWork.getReducer().getName() + "...");
+      LOG.info("vectorizeReduceWork reducer Operator: " +
+              reduceWork.getReducer().getName() + "...");
       HashMap<Node, Object> nodeOutput = new HashMap<Node, Object>();
       ogw.startWalking(topNodes, nodeOutput);
 
@@ -561,7 +565,7 @@ public class Vectorizer implements Physi
     protected final Map<String, VectorizationContext> scratchColumnContext =
         new HashMap<String, VectorizationContext>();
 
-    protected final Map<Operator<? extends OperatorDesc>, VectorizationContext> vContextsByTSOp =
+    protected final Map<Operator<? extends OperatorDesc>, VectorizationContext> vContextsByOp =
         new HashMap<Operator<? extends OperatorDesc>, VectorizationContext>();
 
     protected final Set<Operator<? extends OperatorDesc>> opsDone =
@@ -589,28 +593,30 @@ public class Vectorizer implements Physi
       return scratchColumnMap;
     }
 
-    public VectorizationContext walkStackToFindVectorizationContext(Stack<Node> stack, Operator<? extends OperatorDesc> op)
-            throws SemanticException {
+    public VectorizationContext walkStackToFindVectorizationContext(Stack<Node> stack,
+            Operator<? extends OperatorDesc> op) throws SemanticException {
       VectorizationContext vContext = null;
       if (stack.size() <= 1) {
-        throw new SemanticException(String.format("Expected operator stack for operator %s to have at least 2 operators", op.getName()));
+        throw new SemanticException(
+            String.format("Expected operator stack for operator %s to have at least 2 operators",
+                  op.getName()));
       }
       // Walk down the stack of operators until we found one willing to give us a context.
       // At the bottom will be the root operator, guaranteed to have a context
       int i= stack.size()-2;
       while (vContext == null) {
         if (i < 0) {
-          throw new SemanticException(String.format("Did not find vectorization context for operator %s in operator stack", op.getName()));
+          return null;
         }
         Operator<? extends OperatorDesc> opParent = (Operator<? extends OperatorDesc>) stack.get(i);
-        vContext = vContextsByTSOp.get(opParent);
+        vContext = vContextsByOp.get(opParent);
         --i;
       }
       return vContext;
     }
 
-    public Operator<? extends OperatorDesc> doVectorize(Operator<? extends OperatorDesc> op, VectorizationContext vContext)
-            throws SemanticException {
+    public Operator<? extends OperatorDesc> doVectorize(Operator<? extends OperatorDesc> op,
+            VectorizationContext vContext) throws SemanticException {
       Operator<? extends OperatorDesc> vectorOp = op;
       try {
         if (!opsDone.contains(op)) {
@@ -622,7 +628,7 @@ public class Vectorizer implements Physi
           if (vectorOp instanceof VectorizationContextRegion) {
             VectorizationContextRegion vcRegion = (VectorizationContextRegion) vectorOp;
             VectorizationContext vOutContext = vcRegion.getOuputVectorizationContext();
-            vContextsByTSOp.put(op, vOutContext);
+            vContextsByOp.put(op, vOutContext);
             scratchColumnContext.put(vOutContext.getFileKey(), vOutContext);
           }
         }
@@ -669,13 +675,24 @@ public class Vectorizer implements Physi
               //
               vContext.setFileKey(onefile);
               scratchColumnContext.put(onefile, vContext);
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Vectorized MapWork operator " + op.getName() +
+                        " with vectorization context key=" + vContext.getFileKey() +
+                        ", vectorTypes: " + vContext.getOutputColumnTypeMap().toString() +
+                        ", columnMap: " + vContext.getColumnMap().toString());
+              }
               break;
             }
           }
         }
-        vContextsByTSOp.put(op, vContext);
+        vContextsByOp.put(op, vContext);
       } else {
         vContext = walkStackToFindVectorizationContext(stack, op);
+        if (vContext == null) {
+          throw new SemanticException(
+              String.format("Did not find vectorization context for operator %s in operator stack",
+                      op.getName()));
+        }
       }
 
       assert vContext != null;
@@ -690,7 +707,22 @@ public class Vectorizer implements Physi
         return null;
       }
 
-      doVectorize(op, vContext);
+      Operator<? extends OperatorDesc> vectorOp = doVectorize(op, vContext);
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Vectorized MapWork operator " + vectorOp.getName() +
+                " with vectorization context key=" + vContext.getFileKey() +
+                ", vectorTypes: " + vContext.getOutputColumnTypeMap().toString() +
+                ", columnMap: " + vContext.getColumnMap().toString());
+        if (vectorOp instanceof VectorizationContextRegion) {
+          VectorizationContextRegion vcRegion = (VectorizationContextRegion) vectorOp;
+          VectorizationContext vOutContext = vcRegion.getOuputVectorizationContext();
+          LOG.debug("Vectorized MapWork operator " + vectorOp.getName() +
+                  " added new vectorization context key=" + vOutContext.getFileKey() +
+                  ", vectorTypes: " + vOutContext.getOutputColumnTypeMap().toString() +
+                  ", columnMap: " + vOutContext.getColumnMap().toString());
+        }
+      }
 
       return null;
     }
@@ -702,6 +734,8 @@ public class Vectorizer implements Physi
     private int keyColCount;
     private int valueColCount;
     private Map<String, Integer> reduceColumnNameMap;
+    
+    private VectorizationContext reduceShuffleVectorizationContext;
 
     private Operator<? extends OperatorDesc> rootVectorOp;
 
@@ -709,12 +743,14 @@ public class Vectorizer implements Physi
       return rootVectorOp;
     }
 
-    public ReduceWorkVectorizationNodeProcessor(ReduceWork rWork, int keyColCount, int valueColCount) {
+    public ReduceWorkVectorizationNodeProcessor(ReduceWork rWork, int keyColCount,
+            int valueColCount) {
       this.rWork = rWork;
       reduceColumnNameMap = rWork.getReduceColumnNameMap();
       this.keyColCount = keyColCount;
       this.valueColCount = valueColCount;
       rootVectorOp = null;
+      reduceShuffleVectorizationContext = null;
     }
 
     @Override
@@ -722,7 +758,8 @@ public class Vectorizer implements Physi
         Object... nodeOutputs) throws SemanticException {
 
       Operator<? extends OperatorDesc> op = (Operator<? extends OperatorDesc>) nd;
-      LOG.info("ReduceWorkVectorizationNodeProcessor processing Operator: " + op.getName() + "...");
+      LOG.info("ReduceWorkVectorizationNodeProcessor processing Operator: " +
+              op.getName() + "...");
 
       VectorizationContext vContext = null;
 
@@ -730,10 +767,24 @@ public class Vectorizer implements Physi
 
       if (op.getParentOperators().size() == 0) {
         vContext = getReduceVectorizationContext(reduceColumnNameMap);
-        vContextsByTSOp.put(op, vContext);
+        vContext.setFileKey("_REDUCE_SHUFFLE_");
+        scratchColumnContext.put("_REDUCE_SHUFFLE_", vContext);
+        reduceShuffleVectorizationContext = vContext;
         saveRootVectorOp = true;
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Vectorized ReduceWork reduce shuffle vectorization context key=" +
+                  vContext.getFileKey() +
+                  ", vectorTypes: " + vContext.getOutputColumnTypeMap().toString() +
+                  ", columnMap: " + vContext.getColumnMap().toString());
+        }
       } else {
         vContext = walkStackToFindVectorizationContext(stack, op);
+        if (vContext == null) {
+          // If we didn't find a context among the operators, assume the top -- reduce shuffle's
+          // vectorization context.
+          vContext = reduceShuffleVectorizationContext;
+        }
       }
 
       assert vContext != null;
@@ -749,6 +800,21 @@ public class Vectorizer implements Physi
       }
 
       Operator<? extends OperatorDesc> vectorOp = doVectorize(op, vContext);
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Vectorized ReduceWork operator " + vectorOp.getName() +
+                " with vectorization context key=" + vContext.getFileKey() +
+                ", vectorTypes: " + vContext.getOutputColumnTypeMap().toString() +
+                ", columnMap: " + vContext.getColumnMap().toString());
+        if (vectorOp instanceof VectorizationContextRegion) {
+          VectorizationContextRegion vcRegion = (VectorizationContextRegion) vectorOp;
+          VectorizationContext vOutContext = vcRegion.getOuputVectorizationContext();
+          LOG.debug("Vectorized ReduceWork operator " + vectorOp.getName() +
+                  " added new vectorization context key=" + vOutContext.getFileKey() +
+                  ", vectorTypes: " + vOutContext.getOutputColumnTypeMap().toString() +
+                  ", columnMap: " + vOutContext.getColumnMap().toString());
+        }
+      }
       if (vectorOp instanceof VectorGroupByOperator) {
         VectorGroupByOperator groupBy = (VectorGroupByOperator) vectorOp;
         VectorGroupByDesc vectorDesc = groupBy.getConf().getVectorDesc();
@@ -827,6 +893,7 @@ public class Vectorizer implements Physi
         break;
       case FILESINK:
       case LIMIT:
+      case EVENT:
         ret = true;
         break;
       default:
@@ -866,6 +933,7 @@ public class Vectorizer implements Physi
         ret = validateFileSinkOperator((FileSinkOperator) op);
         break;
       case LIMIT:
+      case EVENT:
         ret = true;
         break;
       default:
@@ -1005,11 +1073,6 @@ public class Vectorizer implements Physi
   }
 
   private boolean validateFileSinkOperator(FileSinkOperator op) {
-    // HIVE-7557: For now, turn off dynamic partitioning to give more time to 
-    // figure out how to make VectorFileSink work correctly with it...
-   if (op.getConf().getDynPartCtx() != null) {
-     return false;
-   }
    return true;
   }
 
@@ -1017,7 +1080,8 @@ public class Vectorizer implements Physi
     return validateExprNodeDesc(descs, VectorExpressionDescriptor.Mode.PROJECTION);
   }
 
-  private boolean validateExprNodeDesc(List<ExprNodeDesc> descs, VectorExpressionDescriptor.Mode mode) {
+  private boolean validateExprNodeDesc(List<ExprNodeDesc> descs,
+          VectorExpressionDescriptor.Mode mode) {
     for (ExprNodeDesc d : descs) {
       boolean ret = validateExprNodeDesc(d, mode);
       if (!ret) {
@@ -1109,8 +1173,8 @@ public class Vectorizer implements Physi
     if (!supportedAggregationUdfs.contains(aggDesc.getGenericUDAFName().toLowerCase())) {
       return false;
     }
-    if (aggDesc.getParameters() != null) {
-      return validateExprNodeDesc(aggDesc.getParameters());
+    if (aggDesc.getParameters() != null && !validateExprNodeDesc(aggDesc.getParameters())) {
+      return false;
     }
     // See if we can vectorize the aggregation.
     try {
@@ -1175,11 +1239,13 @@ public class Vectorizer implements Physi
     return new VectorizationContext(cmap, columnCount);
   }
 
-  private VectorizationContext getReduceVectorizationContext(Map<String, Integer> reduceColumnNameMap) {
+  private VectorizationContext getReduceVectorizationContext(
+          Map<String, Integer> reduceColumnNameMap) {
     return new VectorizationContext(reduceColumnNameMap, reduceColumnNameMap.size());
   }
 
-  private void fixupParentChildOperators(Operator<? extends OperatorDesc> op, Operator<? extends OperatorDesc> vectorOp) {
+  private void fixupParentChildOperators(Operator<? extends OperatorDesc> op, 
+          Operator<? extends OperatorDesc> vectorOp) {
     if (op.getParentOperators() != null) {
       vectorOp.setParentOperators(op.getParentOperators());
       for (Operator<? extends OperatorDesc> p : op.getParentOperators()) {
@@ -1207,6 +1273,7 @@ public class Vectorizer implements Physi
       case REDUCESINK:
       case LIMIT:
       case EXTRACT:
+      case EVENT:
         vectorOp = OperatorFactory.getVectorOperator(op.getConf(), vContext);
         break;
       default:

Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java Sun Oct  5 22:26:43 2014
@@ -57,6 +57,7 @@ import org.apache.hadoop.hive.ql.udf.gen
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPAnd;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 
 /**
  * The transformation step that does partition pruning.
@@ -155,27 +156,85 @@ public class PartitionPruner implements 
    *         pruner condition.
    * @throws HiveException
    */
-  private static PrunedPartitionList prune(Table tab, ExprNodeDesc prunerExpr,
+  public static PrunedPartitionList prune(Table tab, ExprNodeDesc prunerExpr,
       HiveConf conf, String alias, Map<String, PrunedPartitionList> prunedPartitionsMap)
           throws SemanticException {
+
     LOG.trace("Started pruning partiton");
     LOG.trace("dbname = " + tab.getDbName());
     LOG.trace("tabname = " + tab.getTableName());
-    LOG.trace("prune Expression = " + prunerExpr);
+    LOG.trace("prune Expression = " + (prunerExpr == null ? "" : prunerExpr));
 
     String key = tab.getDbName() + "." + tab.getTableName() + ";";
 
-    if (prunerExpr != null) {
-      key = key + prunerExpr.getExprString();
+    if (!tab.isPartitioned()) {
+      // If the table is not partitioned, return everything (cached under the table-only key).
+      return getAllPartsFromCacheOrServer(tab, key, false, prunedPartitionsMap);
+    }
+
+    if ("strict".equalsIgnoreCase(HiveConf.getVar(conf, HiveConf.ConfVars.HIVEMAPREDMODE))
+        && !hasColumnExpr(prunerExpr)) {
+      // If the "strict" mode is on, we have to provide partition pruner for each table.
+      throw new SemanticException(ErrorMsg.NO_PARTITION_PREDICATE
+          .getMsg("for Alias \"" + alias + "\" Table \"" + tab.getTableName() + "\""));
+    }
+
+    if (prunerExpr == null) {
+      // Non-strict mode and there are no predicates at all - get everything.
+      return getAllPartsFromCacheOrServer(tab, key, false, prunedPartitionsMap);
+    }
+
+    Set<String> partColsUsedInFilter = new LinkedHashSet<String>();
+    // Replace virtual columns with nulls. See javadoc for details.
+    prunerExpr = removeNonPartCols(prunerExpr, extractPartColNames(tab), partColsUsedInFilter);
+    // Remove all parts that are not partition columns. See javadoc for details.
+    ExprNodeGenericFuncDesc compactExpr = (ExprNodeGenericFuncDesc)compactExpr(prunerExpr.clone());
+    String oldFilter = prunerExpr.getExprString();
+    if (compactExpr == null) {
+      // Non-strict mode, and all the predicates are on non-partition columns - get everything.
+      LOG.debug("Filter " + oldFilter + " was null after compacting");
+      return getAllPartsFromCacheOrServer(tab, key, true, prunedPartitionsMap);
+    }
+    LOG.debug("Filter w/ compacting: " + compactExpr.getExprString()
+        + "; filter w/o compacting: " + oldFilter);
+
+    key = key + compactExpr.getExprString();
+    PrunedPartitionList ppList = prunedPartitionsMap.get(key);
+    if (ppList != null) {
+      return ppList;
+    }
+
+    ppList = getPartitionsFromServer(tab, compactExpr, conf, alias, partColsUsedInFilter, oldFilter.equals(compactExpr.getExprString()));
+    prunedPartitionsMap.put(key, ppList);
+    return ppList;
+  }
+
+  private static PrunedPartitionList getAllPartsFromCacheOrServer(Table tab, String key, boolean unknownPartitions,
+    Map<String, PrunedPartitionList> partsCache)  throws SemanticException {
+    PrunedPartitionList ppList = partsCache.get(key);
+    if (ppList != null) {
+      return ppList;
     }
-    PrunedPartitionList ret = prunedPartitionsMap.get(key);
-    if (ret != null) {
-      return ret;
+    Set<Partition> parts;
+    try {
+      parts = getAllPartitions(tab);
+    } catch (HiveException e) {
+      throw new SemanticException(e);
     }
+    ppList = new PrunedPartitionList(tab, parts, null, unknownPartitions);
+    partsCache.put(key, ppList);
+    return ppList;
+  }
 
-    ret = getPartitionsFromServer(tab, prunerExpr, conf, alias);
-    prunedPartitionsMap.put(key, ret);
-    return ret;
+  private static ExprNodeDesc removeTruePredciates(ExprNodeDesc e) {
+    if (e instanceof ExprNodeConstantDesc) {
+      ExprNodeConstantDesc eC = (ExprNodeConstantDesc) e;
+      if (e.getTypeInfo() == TypeInfoFactory.booleanTypeInfo
+          && Boolean.TRUE.equals(eC.getValue())) { // value equality: also matches non-canonical Boolean instances
+        return null; // a boolean constant TRUE is reported to the caller as null
+      }
+    }
+    return e; // anything else is handed back unchanged
   }
 
   /**
@@ -187,7 +246,8 @@ public class PartitionPruner implements 
    */
   static private ExprNodeDesc compactExpr(ExprNodeDesc expr) {
     if (expr instanceof ExprNodeConstantDesc) {
-      if (((ExprNodeConstantDesc)expr).getValue() == null) {
+      expr = removeTruePredciates(expr);
+      if (expr == null || ((ExprNodeConstantDesc)expr).getValue() == null) {
         return null;
       } else {
         throw new IllegalStateException("Unexpected non-null ExprNodeConstantDesc: "
@@ -198,10 +258,11 @@ public class PartitionPruner implements 
       boolean isAnd = udf instanceof GenericUDFOPAnd;
       if (isAnd || udf instanceof GenericUDFOPOr) {
         List<ExprNodeDesc> children = expr.getChildren();
-        ExprNodeDesc left = children.get(0);
-        children.set(0, compactExpr(left));
-        ExprNodeDesc right = children.get(1);
-        children.set(1, compactExpr(right));
+        ExprNodeDesc left = removeTruePredciates(children.get(0));
+        children.set(0, left == null ? null : compactExpr(left));
+        ExprNodeDesc right = removeTruePredciates(children.get(1));
+        children.set(1, right == null ? null : compactExpr(right));
+
         // Note that one does not simply compact (not-null or null) to not-null.
         // Only if we have an "and" is it valid to send one side to metastore.
         if (children.get(0) == null && children.get(1) == null) {
@@ -267,40 +328,8 @@ public class PartitionPruner implements 
   }
 
   private static PrunedPartitionList getPartitionsFromServer(Table tab,
-      ExprNodeDesc prunerExpr, HiveConf conf, String alias) throws SemanticException {
+      final ExprNodeGenericFuncDesc compactExpr, HiveConf conf, String alias, Set<String> partColsUsedInFilter, boolean isPruningByExactFilter) throws SemanticException {
     try {
-      if (!tab.isPartitioned()) {
-        // If the table is not partitioned, return everything.
-        return new PrunedPartitionList(tab, getAllPartitions(tab), null, false);
-      }
-      LOG.debug("tabname = " + tab.getTableName() + " is partitioned");
-
-      if ("strict".equalsIgnoreCase(HiveConf.getVar(conf, HiveConf.ConfVars.HIVEMAPREDMODE))
-          && !hasColumnExpr(prunerExpr)) {
-        // If the "strict" mode is on, we have to provide partition pruner for each table.
-        throw new SemanticException(ErrorMsg.NO_PARTITION_PREDICATE
-            .getMsg("for Alias \"" + alias + "\" Table \"" + tab.getTableName() + "\""));
-      }
-
-      if (prunerExpr == null) {
-        // Non-strict mode, and there is no predicates at all - get everything.
-        return new PrunedPartitionList(tab, getAllPartitions(tab), null, false);
-      }
-
-      Set<String> referred = new LinkedHashSet<String>();
-      // Replace virtual columns with nulls. See javadoc for details.
-      prunerExpr = removeNonPartCols(prunerExpr, extractPartColNames(tab), referred);
-      // Remove all parts that are not partition columns. See javadoc for details.
-      ExprNodeGenericFuncDesc compactExpr = (ExprNodeGenericFuncDesc)compactExpr(prunerExpr.clone());
-      String oldFilter = prunerExpr.getExprString();
-      if (compactExpr == null) {
-        // Non-strict mode, and all the predicates are on non-partition columns - get everything.
-        LOG.debug("Filter " + oldFilter + " was null after compacting");
-        return new PrunedPartitionList(tab, getAllPartitions(tab), null, true);
-      }
-
-      LOG.debug("Filter w/ compacting: " + compactExpr.getExprString()
-        + "; filter w/o compacting: " + oldFilter);
 
       // Finally, check the filter for non-built-in UDFs. If these are present, we cannot
       // do filtering on the server, and have to fall back to client path.
@@ -330,9 +359,8 @@ public class PartitionPruner implements 
       // The partitions are "unknown" if the call says so due to the expression
       // evaluator returning null for a partition, or if we sent a partial expression to
       // metastore and so some partitions may have no data based on other filters.
-      boolean isPruningByExactFilter = oldFilter.equals(compactExpr.getExprString());
       return new PrunedPartitionList(tab, new LinkedHashSet<Partition>(partitions),
-          new ArrayList<String>(referred),
+          new ArrayList<String>(partColsUsedInFilter),
           hasUnknownPartitions || !isPruningByExactFilter);
     } catch (SemanticException e) {
       throw e;

Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java Sun Oct  5 22:26:43 2014
@@ -18,8 +18,14 @@
 
 package org.apache.hadoop.hive.ql.optimizer.stats.annotation;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import java.lang.reflect.Field;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Stack;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -31,10 +37,12 @@ import org.apache.hadoop.hive.ql.exec.Fi
 import org.apache.hadoop.hive.ql.exec.GroupByOperator;
 import org.apache.hadoop.hive.ql.exec.LimitOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.RowSchema;
 import org.apache.hadoop.hive.ql.exec.SelectOperator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.tez.DagUtils;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
@@ -48,10 +56,12 @@ import org.apache.hadoop.hive.ql.plan.Ex
 import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc;
 import org.apache.hadoop.hive.ql.plan.JoinDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.Statistics;
 import org.apache.hadoop.hive.ql.stats.StatsUtils;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPAnd;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual;
@@ -66,17 +76,15 @@ import org.apache.hadoop.hive.ql.udf.gen
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNull;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr;
 import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.Stack;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 
 public class StatsRulesProcFactory {
 
   private static final Log LOG = LogFactory.getLog(StatsRulesProcFactory.class.getName());
+  private static final boolean isDebugEnabled = LOG.isDebugEnabled();
 
   /**
    * Collect basic statistics like number of rows, data size and column level statistics from the
@@ -103,9 +111,9 @@ public class StatsRulesProcFactory {
         Statistics stats = StatsUtils.collectStatistics(aspCtx.getConf(), partList, table, tsop);
         tsop.setStatistics(stats.clone());
 
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("[0] STATS-" + tsop.toString() + " (" + table.getTableName()
-              + "): " + stats.extendedToString());
+        if (isDebugEnabled) {
+          LOG.debug("[0] STATS-" + tsop.toString() + " (" + table.getTableName() + "): " +
+              stats.extendedToString());
         }
       } catch (CloneNotSupportedException e) {
         throw new SemanticException(ErrorMsg.STATISTICS_CLONING_FAILED.getMsg());
@@ -167,14 +175,14 @@ public class StatsRulesProcFactory {
           stats.setDataSize(setMaxIfInvalid(dataSize));
           sop.setStatistics(stats);
 
-          if (LOG.isDebugEnabled()) {
+          if (isDebugEnabled) {
             LOG.debug("[0] STATS-" + sop.toString() + ": " + stats.extendedToString());
           }
         } else {
           if (parentStats != null) {
             sop.setStatistics(parentStats.clone());
 
-            if (LOG.isDebugEnabled()) {
+            if (isDebugEnabled) {
               LOG.debug("[1] STATS-" + sop.toString() + ": " + parentStats.extendedToString());
             }
           }
@@ -264,7 +272,7 @@ public class StatsRulesProcFactory {
               updateStats(st, newNumRows, true, fop);
             }
 
-            if (LOG.isDebugEnabled()) {
+            if (isDebugEnabled) {
               LOG.debug("[0] STATS-" + fop.toString() + ": " + st.extendedToString());
             }
           } else {
@@ -274,7 +282,7 @@ public class StatsRulesProcFactory {
               updateStats(st, newNumRows, false, fop);
             }
 
-            if (LOG.isDebugEnabled()) {
+            if (isDebugEnabled) {
               LOG.debug("[1] STATS-" + fop.toString() + ": " + st.extendedToString());
             }
           }
@@ -576,52 +584,103 @@ public class StatsRulesProcFactory {
     @Override
     public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
         Object... nodeOutputs) throws SemanticException {
+
       GroupByOperator gop = (GroupByOperator) nd;
       Operator<? extends OperatorDesc> parent = gop.getParentOperators().get(0);
       Statistics parentStats = parent.getStatistics();
+
+      // parent stats are not populated yet
+      if (parentStats == null) {
+        return null;
+      }
+
       AnnotateStatsProcCtx aspCtx = (AnnotateStatsProcCtx) procCtx;
       HiveConf conf = aspCtx.getConf();
-      int mapSideParallelism =
-          HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_STATS_MAP_SIDE_PARALLELISM);
+      long maxSplitSize = HiveConf.getLongVar(conf, HiveConf.ConfVars.MAPREDMAXSPLITSIZE);
       List<AggregationDesc> aggDesc = gop.getConf().getAggregators();
       Map<String, ExprNodeDesc> colExprMap = gop.getColumnExprMap();
       RowSchema rs = gop.getSchema();
       Statistics stats = null;
+      List<ColStatistics> colStats = StatsUtils.getColStatisticsFromExprMap(conf, parentStats,
+          colExprMap, rs);
+      long cardinality;
+      long parallelism = 1L;
       boolean mapSide = false;
-      int multiplier = mapSideParallelism;
-      long newNumRows;
-      long newDataSize;
+      boolean mapSideHashAgg = false;
+      long inputSize = 1L;
+      boolean containsGroupingSet = gop.getConf().isGroupingSetsPresent();
+      long sizeOfGroupingSet =
+          containsGroupingSet ? gop.getConf().getListGroupingSets().size() : 1L;
+
+      // There are different cases for Group By depending on map/reduce side, hash aggregation,
+      // grouping sets and column stats. If we don't have column stats, we just assume hash
+      // aggregation is disabled. Following are the possible cases and rule for cardinality
+      // estimation
+
+      // MAP SIDE:
+      // Case 1: NO column stats, NO hash aggregation, NO grouping sets — numRows
+      // Case 2: NO column stats, NO hash aggregation, grouping sets — numRows * sizeOfGroupingSet
+      // Case 3: column stats, hash aggregation, NO grouping sets — Min(numRows / 2, ndvProduct * parallelism)
+      // Case 4: column stats, hash aggregation, grouping sets — Min((numRows * sizeOfGroupingSet) / 2, ndvProduct * parallelism * sizeOfGroupingSet)
+      // Case 5: column stats, NO hash aggregation, NO grouping sets — numRows
+      // Case 6: column stats, NO hash aggregation, grouping sets — numRows * sizeOfGroupingSet
+
+      // REDUCE SIDE:
+      // Case 7: NO column stats — numRows / 2
+      // Case 8: column stats, grouping sets — Min(numRows, ndvProduct * sizeOfGroupingSet)
+      // Case 9: column stats, NO grouping sets - Min(numRows, ndvProduct)
 
-      // map side
       if (gop.getChildOperators().get(0) instanceof ReduceSinkOperator ||
           gop.getChildOperators().get(0) instanceof AppMasterEventOperator) {
 
-         mapSide = true;
+        mapSide = true;
 
-        // map-side grouping set present. if grouping set is present then
-        // multiply the number of rows by number of elements in grouping set
-        if (gop.getConf().isGroupingSetsPresent()) {
-          multiplier *= gop.getConf().getListGroupingSets().size();
+        // consider approximate map side parallelism to be table data size
+        // divided by max split size
+        TableScanOperator top = OperatorUtils.findSingleOperatorUpstream(gop,
+            TableScanOperator.class);
+        // if top is null then there are multiple parents (RS as well), hence
+        // lets use parent statistics to get data size. Also maxSplitSize should
+        // be updated to bytes per reducer (1GB default)
+        if (top == null) {
+          inputSize = parentStats.getDataSize();
+          maxSplitSize = HiveConf.getLongVar(conf, HiveConf.ConfVars.BYTESPERREDUCER);
+        } else {
+          inputSize = top.getConf().getStatistics().getDataSize();
         }
+        parallelism = (int) Math.ceil((double) inputSize / maxSplitSize);
+      }
+
+      if (isDebugEnabled) {
+        LOG.debug("STATS-" + gop.toString() + ": inputSize: " + inputSize + " maxSplitSize: " +
+            maxSplitSize + " parallelism: " + parallelism + " containsGroupingSet: " +
+            containsGroupingSet + " sizeOfGroupingSet: " + sizeOfGroupingSet);
       }
 
       try {
+        // satisfying precondition means column statistics is available
         if (satisfyPrecondition(parentStats)) {
-          stats = parentStats.clone();
 
-          List<ColStatistics> colStats =
-              StatsUtils.getColStatisticsFromExprMap(conf, parentStats, colExprMap, rs);
+          // check if map side aggregation is possible or not based on column stats
+          mapSideHashAgg = checkMapSideAggregation(gop, colStats, conf);
+
+          if (isDebugEnabled) {
+            LOG.debug("STATS-" + gop.toString() + " mapSideHashAgg: " + mapSideHashAgg);
+          }
+
+          stats = parentStats.clone();
           stats.setColumnStats(colStats);
-          long dvProd = 1;
+          long ndvProduct = 1;
+          final long parentNumRows = stats.getNumRows();
 
           // compute product of distinct values of grouping columns
           for (ColStatistics cs : colStats) {
             if (cs != null) {
-              long dv = cs.getCountDistint();
+              long ndv = cs.getCountDistint();
               if (cs.getNumNulls() > 0) {
-                dv += 1;
+                ndv += 1;
               }
-              dvProd *= dv;
+              ndvProduct *= ndv;
             } else {
               if (parentStats.getColumnStatsState().equals(Statistics.State.COMPLETE)) {
                 // the column must be an aggregate column inserted by GBY. We
@@ -632,65 +691,130 @@ public class StatsRulesProcFactory {
                 // partial column statistics on grouping attributes case.
                 // if column statistics on grouping attribute is missing, then
                 // assume worst case.
-                // GBY rule will emit half the number of rows if dvProd is 0
-                dvProd = 0;
+                // GBY rule will emit half the number of rows if ndvProduct is 0
+                ndvProduct = 0;
               }
               break;
             }
           }
 
-          // map side
+          // if ndvProduct is 0 then column stats state must be partial and we are missing
+          // column stats for a group by column
+          if (ndvProduct == 0) {
+            ndvProduct = parentNumRows / 2;
+
+            if (isDebugEnabled) {
+              LOG.debug("STATS-" + gop.toString() + ": ndvProduct became 0 as some column does not" +
+                  " have stats. ndvProduct changed to: " + ndvProduct);
+            }
+          }
+
           if (mapSide) {
+            // MAP SIDE
 
-            // since we do not know if hash-aggregation will be enabled or disabled
-            // at runtime we will assume that map-side group by does not do any
-            // reduction.hence no group by rule will be applied
-
-            // map-side grouping set present. if grouping set is present then
-            // multiply the number of rows by number of elements in grouping set
-            if (gop.getConf().isGroupingSetsPresent()) {
-              newNumRows = setMaxIfInvalid(multiplier * stats.getNumRows());
-              newDataSize = setMaxIfInvalid(multiplier * stats.getDataSize());
-              stats.setNumRows(newNumRows);
-              stats.setDataSize(newDataSize);
-              for (ColStatistics cs : colStats) {
-                if (cs != null) {
-                  long oldNumNulls = cs.getNumNulls();
-                  long newNumNulls = multiplier * oldNumNulls;
-                  cs.setNumNulls(newNumNulls);
+            if (mapSideHashAgg) {
+              if (containsGroupingSet) {
+                // Case 4: column stats, hash aggregation, grouping sets
+                cardinality = Math.min((parentNumRows * sizeOfGroupingSet) / 2,
+                    ndvProduct * parallelism * sizeOfGroupingSet);
+
+                if (isDebugEnabled) {
+                  LOG.debug("[Case 4] STATS-" + gop.toString() + ": cardinality: " + cardinality);
+                }
+              } else {
+                // Case 3: column stats, hash aggregation, NO grouping sets
+                cardinality = Math.min(parentNumRows / 2, ndvProduct * parallelism);
+
+                if (isDebugEnabled) {
+                  LOG.debug("[Case 3] STATS-" + gop.toString() + ": cardinality: " + cardinality);
                 }
               }
             } else {
+              if (containsGroupingSet) {
+                // Case 6: column stats, NO hash aggregation, grouping sets
+                cardinality = parentNumRows * sizeOfGroupingSet;
+
+                if (isDebugEnabled) {
+                  LOG.debug("[Case 6] STATS-" + gop.toString() + ": cardinality: " + cardinality);
+                }
+              } else {
+                // Case 5: column stats, NO hash aggregation, NO grouping sets
+                cardinality = parentNumRows;
 
-              // map side no grouping set
-              newNumRows = stats.getNumRows() * multiplier;
-              updateStats(stats, newNumRows, true, gop);
+                if (isDebugEnabled) {
+                  LOG.debug("[Case 5] STATS-" + gop.toString() + ": cardinality: " + cardinality);
+                }
+              }
             }
           } else {
+            // REDUCE SIDE
+
+            // in reduce side GBY, we don't know if the grouping set was present or not. so get it
+            // from map side GBY
+            GroupByOperator mGop = OperatorUtils.findSingleOperatorUpstream(parent, GroupByOperator.class);
+            if (mGop != null) {
+              containsGroupingSet = mGop.getConf().isGroupingSetsPresent();
+              sizeOfGroupingSet = mGop.getConf().getListGroupingSets().size();
+            }
+
+            if (containsGroupingSet) {
+              // Case 8: column stats, grouping sets
+              cardinality = Math.min(parentNumRows, ndvProduct * sizeOfGroupingSet);
+
+              if (isDebugEnabled) {
+                LOG.debug("[Case 8] STATS-" + gop.toString() + ": cardinality: " + cardinality);
+              }
+            } else {
+              // Case 9: column stats, NO grouping sets
+              cardinality = Math.min(parentNumRows, ndvProduct);
 
-            // reduce side
-            newNumRows = applyGBYRule(stats.getNumRows(), dvProd);
-            updateStats(stats, newNumRows, true, gop);
+              if (isDebugEnabled) {
+                LOG.debug("[Case 9] STATS-" + gop.toString() + ": cardinality: " + cardinality);
+              }
+            }
           }
+
+          // update stats, but don't update NDV as it will not change
+          updateStats(stats, cardinality, true, gop, false);
         } else {
+
+          // NO COLUMN STATS
           if (parentStats != null) {
 
             stats = parentStats.clone();
+            final long parentNumRows = stats.getNumRows();
 
-            // worst case, in the absence of column statistics assume half the rows are emitted
+            // if we don't have column stats, we just assume hash aggregation is disabled
             if (mapSide) {
+              // MAP SIDE
+
+              if (containsGroupingSet) {
+                // Case 2: NO column stats, NO hash aggregation, grouping sets
+                cardinality = parentNumRows * sizeOfGroupingSet;
+
+                if (isDebugEnabled) {
+                  LOG.debug("[Case 2] STATS-" + gop.toString() + ": cardinality: " + cardinality);
+                }
+              } else {
+                // Case 1: NO column stats, NO hash aggregation, NO grouping sets
+                cardinality = parentNumRows;
 
-              // map side
-              newNumRows = multiplier * stats.getNumRows();
-              newDataSize = multiplier * stats.getDataSize();
-              stats.setNumRows(newNumRows);
-              stats.setDataSize(newDataSize);
+                if (isDebugEnabled) {
+                  LOG.debug("[Case 1] STATS-" + gop.toString() + ": cardinality: " + cardinality);
+                }
+              }
             } else {
+              // REDUCE SIDE
+
+              // Case 7: NO column stats
+              cardinality = parentNumRows / 2;
 
-              // reduce side
-              newNumRows = parentStats.getNumRows() / 2;
-              updateStats(stats, newNumRows, false, gop);
+              if (isDebugEnabled) {
+                LOG.debug("[Case 7] STATS-" + gop.toString() + ": cardinality: " + cardinality);
+              }
             }
+
+            updateStats(stats, cardinality, false, gop);
           }
         }
 
@@ -738,7 +862,7 @@ public class StatsRulesProcFactory {
 
         gop.setStatistics(stats);
 
-        if (LOG.isDebugEnabled() && stats != null) {
+        if (isDebugEnabled && stats != null) {
           LOG.debug("[0] STATS-" + gop.toString() + ": " + stats.extendedToString());
         }
       } catch (CloneNotSupportedException e) {
@@ -747,6 +871,107 @@ public class StatsRulesProcFactory {
       return null;
     }
 
+    /**
+     * Roughly estimates whether map-side hash aggregation is feasible by comparing
+     * the estimated hash table size (product of NDVs times per-entry size) with the
+     * memory budget for hash aggregation. Runtime configs that can disable hash
+     * aggregation, like HIVEMAPAGGRHASHMINREDUCTION, are not taken into account.
+     * @param gop - group by operator
+     * @param colStats - column stats for key columns; null entries are skipped
+     * @param conf - hive conf
+     * @return true only if mode is HASH and the estimated hash table size is below the memory limit
+     */
+    private boolean checkMapSideAggregation(GroupByOperator gop,
+        List<ColStatistics> colStats, HiveConf conf) {
+
+      List<AggregationDesc> aggDesc = gop.getConf().getAggregators();
+      GroupByDesc desc = gop.getConf();
+      GroupByDesc.Mode mode = desc.getMode();
+
+      if (mode.equals(GroupByDesc.Mode.HASH)) {
+        float hashAggMem = conf.getFloatVar(
+            HiveConf.ConfVars.HIVEMAPAGGRHASHMEMORY);
+        float hashAggMaxThreshold = conf.getFloatVar(
+            HiveConf.ConfVars.HIVEMAPAGGRMEMORYTHRESHOLD);
+
+        // container memory scaled by 10^6 (presumably MB to bytes - TODO confirm). Maybe use mapreduce.map.java.opts instead?
+        long totalMemory =
+            DagUtils.getContainerResource(conf).getMemory() * 1000L * 1000L;
+        long maxMemHashAgg = Math
+            .round(totalMemory * hashAggMem * hashAggMaxThreshold);
+
+        // estimated number of rows will be product of NDVs (NOTE(review): unchecked long multiply, may overflow)
+        long numEstimatedRows = 1;
+
+        // estimate size of key from column statistics
+        long avgKeySize = 0;
+        for (ColStatistics cs : colStats) {
+          if (cs != null) {
+            numEstimatedRows *= cs.getCountDistint();
+            avgKeySize += Math.ceil(cs.getAvgColLen());
+          }
+        }
+
+        // average value size will be sum of all sizes of aggregation buffers
+        long avgValSize = 0;
+        // for each aggregation buffer: use its own estimate when it is estimable,
+        // otherwise derive a size from the types of its declared fields
+        GenericUDAFEvaluator[] aggregationEvaluators;
+        aggregationEvaluators = new GenericUDAFEvaluator[aggDesc.size()];
+
+        // get aggregation evaluators
+        for (int i = 0; i < aggregationEvaluators.length; i++) {
+          AggregationDesc agg = aggDesc.get(i);
+          aggregationEvaluators[i] = agg.getGenericUDAFEvaluator();
+        }
+
+        // estimate size of aggregation buffer
+        for (int i = 0; i < aggregationEvaluators.length; i++) {
+
+          // each evaluator has constant java object overhead
+          avgValSize += gop.javaObjectOverHead;
+          GenericUDAFEvaluator.AggregationBuffer agg = null;
+          try {
+            agg = aggregationEvaluators[i].getNewAggregationBuffer();
+          } catch (HiveException e) {
+            // deliberately not rethrown: fall back to the unknown-type size (256 bytes)
+            avgValSize += gop.javaSizeUnknownType;
+          }
+
+          // aggregate size from aggregation buffers
+          if (agg != null) {
+            if (GenericUDAFEvaluator.isEstimable(agg)) {
+              avgValSize += ((GenericUDAFEvaluator.AbstractAggregationBuffer) agg)
+                  .estimate();
+            } else {
+              // if the aggregation buffer is not estimable then get all the declared
+              // non-static fields and sum their sizes; a size of 0 counts as unknown type
+              Field[] fArr = ObjectInspectorUtils
+                  .getDeclaredNonStaticFields(agg.getClass());
+              for (Field f : fArr) {
+                long avgSize = StatsUtils
+                    .getAvgColLenOfFixedLengthTypes(f.getType().getName());
+                avgValSize += avgSize == 0 ? gop.javaSizeUnknownType : avgSize;
+              }
+            }
+          }
+        }
+
+        // total size of each hash entry
+        long hashEntrySize = gop.javaHashEntryOverHead + avgKeySize + avgValSize;
+
+        // estimated hash table size (NOTE(review): unchecked long multiply, may overflow)
+        long estHashTableSize = numEstimatedRows * hashEntrySize;
+
+        if (estHashTableSize < maxMemHashAgg) {
+          return true;
+        }
+      }
+
+      // worst-case, hash aggregation disabled
+      return false;
+    }
+
     private long applyGBYRule(long numRows, long dvProd) {
       long newNumRows = numRows;
 
@@ -967,7 +1192,7 @@ public class StatsRulesProcFactory {
               outInTabAlias);
           jop.setStatistics(stats);
 
-          if (LOG.isDebugEnabled()) {
+          if (isDebugEnabled) {
             LOG.debug("[0] STATS-" + jop.toString() + ": " + stats.extendedToString());
           }
         } else {
@@ -1001,7 +1226,7 @@ public class StatsRulesProcFactory {
           wcStats.setDataSize(setMaxIfInvalid(newDataSize));
           jop.setStatistics(wcStats);
 
-          if (LOG.isDebugEnabled()) {
+          if (isDebugEnabled) {
             LOG.debug("[1] STATS-" + jop.toString() + ": " + wcStats.extendedToString());
           }
         }
@@ -1195,7 +1420,7 @@ public class StatsRulesProcFactory {
           }
           lop.setStatistics(stats);
 
-          if (LOG.isDebugEnabled()) {
+          if (isDebugEnabled) {
             LOG.debug("[0] STATS-" + lop.toString() + ": " + stats.extendedToString());
           }
         } else {
@@ -1213,7 +1438,7 @@ public class StatsRulesProcFactory {
             }
             lop.setStatistics(wcStats);
 
-            if (LOG.isDebugEnabled()) {
+            if (isDebugEnabled) {
               LOG.debug("[1] STATS-" + lop.toString() + ": " + wcStats.extendedToString());
             }
           }
@@ -1281,7 +1506,7 @@ public class StatsRulesProcFactory {
             outStats.setColumnStats(colStats);
           }
           rop.setStatistics(outStats);
-          if (LOG.isDebugEnabled()) {
+          if (isDebugEnabled) {
             LOG.debug("[0] STATS-" + rop.toString() + ": " + outStats.extendedToString());
           }
         } catch (CloneNotSupportedException e) {
@@ -1322,7 +1547,7 @@ public class StatsRulesProcFactory {
                   stats.addToColumnStats(parentStats.getColumnStats());
                   op.getConf().setStatistics(stats);
 
-                  if (LOG.isDebugEnabled()) {
+                  if (isDebugEnabled) {
                     LOG.debug("[0] STATS-" + op.toString() + ": " + stats.extendedToString());
                   }
                 }
@@ -1378,6 +1603,7 @@ public class StatsRulesProcFactory {
     return new DefaultStatsRule();
   }
 
+
   /**
    * Update the basic statistics of the statistics object based on the row number
    * @param stats
@@ -1389,6 +1615,12 @@ public class StatsRulesProcFactory {
    */
   static void updateStats(Statistics stats, long newNumRows,
       boolean useColStats, Operator<? extends OperatorDesc> op) {
+    updateStats(stats, newNumRows, useColStats, op, true);
+  }
+
+  static void updateStats(Statistics stats, long newNumRows,
+      boolean useColStats, Operator<? extends OperatorDesc> op,
+      boolean updateNDV) {
 
     if (newNumRows <= 0) {
       LOG.info("STATS-" + op.toString() + ": Overflow in number of rows."
@@ -1406,17 +1638,19 @@ public class StatsRulesProcFactory {
         long oldNumNulls = cs.getNumNulls();
         long oldDV = cs.getCountDistint();
         long newNumNulls = Math.round(ratio * oldNumNulls);
-        long newDV = oldDV;
+        cs.setNumNulls(newNumNulls);
+        if (updateNDV) {
+          long newDV = oldDV;
 
-        // if ratio is greater than 1, then number of rows increases. This can happen
-        // when some operators like GROUPBY duplicates the input rows in which case
-        // number of distincts should not change. Update the distinct count only when
-        // the output number of rows is less than input number of rows.
-        if (ratio <= 1.0) {
-          newDV = (long) Math.ceil(ratio * oldDV);
+          // if ratio is greater than 1, then number of rows increases. This can happen
+          // when some operators like GROUPBY duplicates the input rows in which case
+          // number of distincts should not change. Update the distinct count only when
+          // the output number of rows is less than input number of rows.
+          if (ratio <= 1.0) {
+            newDV = (long) Math.ceil(ratio * oldDV);
+          }
+          cs.setCountDistint(newDV);
         }
-        cs.setNumNulls(newNumNulls);
-        cs.setCountDistint(newDV);
       }
       stats.setColumnStats(colStats);
       long newDataSize = StatsUtils.getDataSizeFromColumnStats(newNumRows, colStats);

Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java Sun Oct  5 22:26:43 2014
@@ -207,7 +207,7 @@ public abstract class BaseSemanticAnalyz
   }
 
   public abstract void analyzeInternal(ASTNode ast) throws SemanticException;
-  public void init() {
+  public void init(boolean clearPartsCache) {
     //no-op
   }
 
@@ -217,7 +217,7 @@ public abstract class BaseSemanticAnalyz
 
   public void analyze(ASTNode ast, Context ctx) throws SemanticException {
     initCtx(ctx);
-    init();
+    init(true);
     analyzeInternal(ast);
   }
 
@@ -244,7 +244,7 @@ public abstract class BaseSemanticAnalyz
     this.fetchTask = fetchTask;
   }
 
-  protected void reset() {
+  protected void reset(boolean clearPartsCache) {
     rootTasks = new ArrayList<Task<? extends Serializable>>();
   }
 
@@ -406,7 +406,6 @@ public abstract class BaseSemanticAnalyz
 
   @SuppressWarnings("nls")
   public static String unescapeSQLString(String b) {
-
     Character enclosure = null;
 
     // Some of the strings can be passed in as unicode. For example, the
@@ -487,7 +486,7 @@ public abstract class BaseSemanticAnalyz
         case '\\':
           sb.append("\\");
           break;
-        // The following 2 lines are exactly what MySQL does
+        // The following 2 lines are exactly what MySQL does TODO: why do we do this?
         case '%':
           sb.append("\\%");
           break;
@@ -505,6 +504,58 @@ public abstract class BaseSemanticAnalyz
     return sb.toString();
   }
 
+  /**
+   * Escapes the string for AST; doesn't enclose it in quotes, however.
+   */
+  public static String escapeSQLString(String b) {
+    // There's usually nothing to escape so we will be optimistic.
+    String result = b;
+    for (int i = 0; i < result.length(); ++i) {
+      char currentChar = result.charAt(i);
+      if (currentChar == '\\' && ((i + 1) < result.length())) {
+        // TODO: do we need to handle the "this is what MySQL does" here?
+        char nextChar = result.charAt(i + 1);
+        if (nextChar == '%' || nextChar == '_') {
+          ++i;
+          continue;
+        }
+      }
+      switch (currentChar) {
+      case '\0': result = spliceString(result, i, "\\0"); ++i; break;
+      case '\'': result = spliceString(result, i, "\\'"); ++i; break;
+      case '\"': result = spliceString(result, i, "\\\""); ++i; break;
+      case '\b': result = spliceString(result, i, "\\b"); ++i; break;
+      case '\n': result = spliceString(result, i, "\\n"); ++i; break;
+      case '\r': result = spliceString(result, i, "\\r"); ++i; break;
+      case '\t': result = spliceString(result, i, "\\t"); ++i; break;
+      case '\\': result = spliceString(result, i, "\\\\"); ++i; break;
+      case '\u001A': result = spliceString(result, i, "\\Z"); ++i; break;
+      default: {
+        if (currentChar < ' ') {
+          String hex = Integer.toHexString(currentChar);
+          String unicode = "\\u";
+          for (int j = 4; j > hex.length(); --j) {
+            unicode += '0';
+          }
+          unicode += hex;
+          result = spliceString(result, i, unicode);
+          i += (unicode.length() - 1);
+        }
+        break; // if not a control character, do nothing
+      }
+      }
+    }
+    return result;
+  }
+
+  private static String spliceString(String str, int i, String replacement) {
+    return spliceString(str, i, 1, replacement);
+  }
+
+  private static String spliceString(String str, int i, int length, String replacement) {
+    return str.substring(0, i) + replacement + str.substring(i + length);
+  }
+
   public HashSet<ReadEntity> getInputs() {
     return inputs;
   }
@@ -1234,7 +1285,7 @@ public abstract class BaseSemanticAnalyz
     try {
       database = db.getDatabase(dbName);
     } catch (Exception e) {
-      throw new SemanticException(ErrorMsg.DATABASE_NOT_EXISTS.getMsg(dbName), e);
+      throw new SemanticException(e.getMessage(), e);
     }
     if (database == null && throwException) {
       throw new SemanticException(ErrorMsg.DATABASE_NOT_EXISTS.getMsg(dbName));
@@ -1264,9 +1315,13 @@ public abstract class BaseSemanticAnalyz
     try {
       tab = database == null ? db.getTable(tblName, false)
           : db.getTable(database, tblName, false);
-    } catch (Exception e) {
+    }
+    catch (InvalidTableException e) {
       throw new SemanticException(ErrorMsg.INVALID_TABLE.getMsg(tblName), e);
     }
+    catch (Exception e) {
+      throw new SemanticException(e.getMessage(), e);
+    }
     if (tab == null && throwException) {
       throw new SemanticException(ErrorMsg.INVALID_TABLE.getMsg(tblName));
     }

Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsSemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsSemanticAnalyzer.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsSemanticAnalyzer.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsSemanticAnalyzer.java Sun Oct  5 22:26:43 2014
@@ -33,6 +33,7 @@ import org.apache.hadoop.hive.ql.Context
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
 import org.apache.hadoop.hive.ql.metadata.Table;
 
 /**
@@ -58,7 +59,7 @@ public class ColumnStatsSemanticAnalyzer
   private Table tbl;
 
   public ColumnStatsSemanticAnalyzer(HiveConf conf) throws SemanticException {
-    super(conf);
+    super(conf, false);
   }
 
   private boolean shouldRewrite(ASTNode tree) {
@@ -95,8 +96,10 @@ public class ColumnStatsSemanticAnalyzer
     String tableName = getUnescapedName((ASTNode) tree.getChild(0).getChild(0));
     try {
       return db.getTable(tableName);
+    } catch (InvalidTableException e) {
+      throw new SemanticException(ErrorMsg.INVALID_TABLE.getMsg(tableName), e);
     } catch (HiveException e) {
-      throw new SemanticException(ErrorMsg.INVALID_TABLE.getMsg(tableName));
+      throw new SemanticException(e.getMessage(), e);
     }
   }
 
@@ -377,7 +380,7 @@ public class ColumnStatsSemanticAnalyzer
     QBParseInfo qbp;
 
     // initialize QB
-    init();
+    init(true);
 
     // check if it is no scan. grammar prevents coexit noscan/columns
     super.processNoScanCommand(ast);

Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java Sun Oct  5 22:26:43 2014
@@ -79,6 +79,7 @@ import org.apache.hadoop.hive.ql.lockmgr
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.HiveUtils;
+import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.authorization.AuthorizationParseUtils;
@@ -267,11 +268,11 @@ public class DDLSemanticAnalyzer extends
       } else if (ast.getType() == HiveParser.TOK_ALTERTABLE_UNARCHIVE) {
         analyzeAlterTableArchive(qualified, ast, true);
       } else if (ast.getType() == HiveParser.TOK_ALTERTABLE_ADDCOLS) {
-        analyzeAlterTableModifyCols(qualified, ast, AlterTableTypes.ADDCOLS);
+        analyzeAlterTableModifyCols(qualified, ast, partSpec, AlterTableTypes.ADDCOLS);
       } else if (ast.getType() == HiveParser.TOK_ALTERTABLE_REPLACECOLS) {
-        analyzeAlterTableModifyCols(qualified, ast, AlterTableTypes.REPLACECOLS);
+        analyzeAlterTableModifyCols(qualified, ast, partSpec, AlterTableTypes.REPLACECOLS);
       } else if (ast.getType() == HiveParser.TOK_ALTERTABLE_RENAMECOL) {
-        analyzeAlterTableRenameCol(qualified, ast);
+        analyzeAlterTableRenameCol(qualified, ast, partSpec);
       } else if (ast.getType() == HiveParser.TOK_ALTERTABLE_ADDPARTS) {
         analyzeAlterTableAddParts(qualified, ast, false);
       } else if (ast.getType() == HiveParser.TOK_ALTERTABLE_DROPPARTS) {
@@ -847,7 +848,8 @@ public class DDLSemanticAnalyzer extends
       outputs.add(new WriteEntity(tab, WriteEntity.WriteType.DDL_EXCLUSIVE));
     }
 
-    DropTableDesc dropTblDesc = new DropTableDesc(tableName, expectView, ifExists);
+    boolean ifPurge = (ast.getFirstChildWithType(HiveParser.KW_PURGE) != null);
+    DropTableDesc dropTblDesc = new DropTableDesc(tableName, expectView, ifExists, ifPurge);
     rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(),
         dropTblDesc), conf));
   }
@@ -1717,7 +1719,8 @@ public class DDLSemanticAnalyzer extends
 
     // assume the first component of DOT delimited name is tableName
     // get the attemptTableName
-    static public String getAttemptTableName(Hive db, String qualifiedName, boolean isColumn) {
+    static public String getAttemptTableName(Hive db, String qualifiedName, boolean isColumn)
+        throws SemanticException {
       // check whether the name starts with table
       // DESCRIBE table
       // DESCRIBE table.column
@@ -1738,11 +1741,13 @@ public class DDLSemanticAnalyzer extends
             return tableName;
           }
         }
-      } catch (HiveException e) {
+      } catch (InvalidTableException e) {
         // assume the first DOT delimited component is tableName
         // OK if it is not
         // do nothing when having exception
         return null;
+      } catch (HiveException e) {
+        throw new SemanticException(e.getMessage(), e);
       }
       return null;
     }
@@ -1823,7 +1828,7 @@ public class DDLSemanticAnalyzer extends
       ASTNode parentAst,
       ASTNode ast,
       String tableName,
-      Map<String, String> partSpec) {
+      Map<String, String> partSpec) throws SemanticException {
 
       // if parent has two children
       // it could be DESCRIBE table key
@@ -1879,11 +1884,13 @@ public class DDLSemanticAnalyzer extends
         Table tab = null;
         try {
           tab = db.getTable(tableName);
-        } catch (HiveException e) {
-          // if table not valid
-          // throw semantic exception
+        }
+        catch (InvalidTableException e) {
           throw new SemanticException(ErrorMsg.INVALID_TABLE.getMsg(tableName), e);
         }
+        catch (HiveException e) {
+          throw new SemanticException(e.getMessage(), e);
+        }
 
         if (partSpec != null) {
           Partition part = null;
@@ -2480,7 +2487,8 @@ public class DDLSemanticAnalyzer extends
         alterTblDesc), conf));
   }
 
-  private void analyzeAlterTableRenameCol(String[] qualified, ASTNode ast) throws SemanticException {
+  private void analyzeAlterTableRenameCol(String[] qualified, ASTNode ast,
+      HashMap<String, String> partSpec) throws SemanticException {
     String newComment = null;
     String newType = null;
     newType = getTypeStringFromAST((ASTNode) ast.getChild(2));
@@ -2521,10 +2529,10 @@ public class DDLSemanticAnalyzer extends
     }
 
     String tblName = getDotName(qualified);
-    AlterTableDesc alterTblDesc = new AlterTableDesc(tblName,
+    AlterTableDesc alterTblDesc = new AlterTableDesc(tblName, partSpec,
         unescapeIdentifier(oldColName), unescapeIdentifier(newColName),
         newType, newComment, first, flagCol);
-    addInputsOutputsAlterTable(tblName, null, alterTblDesc);
+    addInputsOutputsAlterTable(tblName, partSpec, alterTblDesc);
 
     rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(),
         alterTblDesc), conf));
@@ -2568,14 +2576,14 @@ public class DDLSemanticAnalyzer extends
   }
 
   private void analyzeAlterTableModifyCols(String[] qualified, ASTNode ast,
-      AlterTableTypes alterType) throws SemanticException {
+      HashMap<String, String> partSpec, AlterTableTypes alterType) throws SemanticException {
 
     String tblName = getDotName(qualified);
     List<FieldSchema> newCols = getColumns((ASTNode) ast.getChild(0));
-    AlterTableDesc alterTblDesc = new AlterTableDesc(tblName, newCols,
+    AlterTableDesc alterTblDesc = new AlterTableDesc(tblName, partSpec, newCols,
         alterType);
 
-    addInputsOutputsAlterTable(tblName, null, alterTblDesc);
+    addInputsOutputsAlterTable(tblName, partSpec, alterTblDesc);
     rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(),
         alterTblDesc), conf));
   }

Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/FromClauseParser.g
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/FromClauseParser.g?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/FromClauseParser.g (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/FromClauseParser.g Sun Oct  5 22:26:43 2014
@@ -263,7 +263,7 @@ searchCondition
 // INSERT INTO <table> (col1,col2,...) SELECT * FROM (VALUES(1,2,3),(4,5,6),...) as Foo(a,b,c)
 valueRowConstructor
     :
-    LPAREN atomExpression (COMMA atomExpression)* RPAREN -> ^(TOK_VALUE_ROW atomExpression+)
+    LPAREN precedenceUnaryPrefixExpression (COMMA precedenceUnaryPrefixExpression)* RPAREN -> ^(TOK_VALUE_ROW precedenceUnaryPrefixExpression+)
     ;
 
 valuesTableConstructor

Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/FunctionSemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/FunctionSemanticAnalyzer.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/FunctionSemanticAnalyzer.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/FunctionSemanticAnalyzer.java Sun Oct  5 22:26:43 2014
@@ -22,6 +22,8 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.api.Database;
@@ -81,7 +83,7 @@ public class FunctionSemanticAnalyzer ex
         new CreateFunctionDesc(functionName, isTemporaryFunction, className, resources);
     rootTasks.add(TaskFactory.get(new FunctionWork(desc), conf));
 
-    addEntities(functionName, isTemporaryFunction);
+    addEntities(functionName, isTemporaryFunction, resources);
   }
 
   private void analyzeDropFunction(ASTNode ast) throws SemanticException {
@@ -106,7 +108,7 @@ public class FunctionSemanticAnalyzer ex
     DropFunctionDesc desc = new DropFunctionDesc(functionName, isTemporaryFunction);
     rootTasks.add(TaskFactory.get(new FunctionWork(desc), conf));
 
-    addEntities(functionName, isTemporaryFunction);
+    addEntities(functionName, isTemporaryFunction, null);
   }
 
   private ResourceType getResourceType(ASTNode token) throws SemanticException {
@@ -152,8 +154,8 @@ public class FunctionSemanticAnalyzer ex
   /**
    * Add write entities to the semantic analyzer to restrict function creation to privileged users.
    */
-  private void addEntities(String functionName, boolean isTemporaryFunction)
-      throws SemanticException {
+  private void addEntities(String functionName, boolean isTemporaryFunction,
+      List<ResourceUri> resources) throws SemanticException {
     // If the function is being added under a database 'namespace', then add an entity representing
     // the database (only applicable to permanent/metastore functions).
     // We also add a second entity representing the function name.
@@ -183,5 +185,13 @@ public class FunctionSemanticAnalyzer ex
     // Add the function name as a WriteEntity
     outputs.add(new WriteEntity(database, functionName, Type.FUNCTION,
         WriteEntity.WriteType.DDL_NO_LOCK));
+
+    if (resources != null) {
+      for (ResourceUri resource : resources) {
+        String uriPath = resource.getUri();
+        outputs.add(new WriteEntity(new Path(uriPath),
+            FileUtils.isLocalFile(conf, uriPath)));
+      }
+    }
   }
 }

Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java Sun Oct  5 22:26:43 2014
@@ -29,6 +29,7 @@ import java.util.Set;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator;
+import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator;
 import org.apache.hadoop.hive.ql.exec.DependencyCollectionTask;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
@@ -45,6 +46,7 @@ import org.apache.hadoop.hive.ql.lib.Nod
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
+import org.apache.hadoop.hive.ql.plan.MergeJoinWork;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
@@ -132,6 +134,8 @@ public class GenTezProcContext implement
 
   // remember which reducesinks we've already connected
   public final Set<ReduceSinkOperator> connectedReduceSinks;
+  public final Map<Operator<?>, MergeJoinWork> opMergeJoinWorkMap;
+  public CommonMergeJoinOperator currentMergeJoinOperator;
 
   // remember the event operators we've seen
   public final Set<AppMasterEventOperator> eventOperatorSet;
@@ -176,6 +180,8 @@ public class GenTezProcContext implement
     this.eventOperatorSet = new LinkedHashSet<AppMasterEventOperator>();
     this.abandonedEventOperatorSet = new LinkedHashSet<AppMasterEventOperator>();
     this.tsToEventMap = new LinkedHashMap<TableScanOperator, List<AppMasterEventOperator>>();
+    this.opMergeJoinWorkMap = new LinkedHashMap<Operator<?>, MergeJoinWork>();
+    this.currentMergeJoinOperator = null;
 
     rootTasks.add(currentTask);
   }



Mime
View raw message