hive-commits mailing list archives

From hashut...@apache.org
Subject svn commit: r1465721 [1/4] - in /hive/trunk: common/src/java/org/apache/hadoop/hive/conf/ conf/ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/ ql/src/java/org/apache/hadoop/hive/ql/plan/ ql/sr...
Date Mon, 08 Apr 2013 18:53:40 GMT
Author: hashutosh
Date: Mon Apr  8 18:53:39 2013
New Revision: 1465721

URL: http://svn.apache.org/r1465721
Log:
HIVE-2340 : optimize orderby followed by a groupby (Navis via Ashutosh Chauhan)

Added:
    hive/trunk/ql/src/test/queries/clientpositive/reduce_deduplicate_extended.q
    hive/trunk/ql/src/test/results/clientpositive/reduce_deduplicate_extended.q.out
Modified:
    hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hive/trunk/conf/hive-default.xml.template
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/NonBlockingOpDeDupProc.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkDeDuplication.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinResolver.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinProcFactory.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java
    hive/trunk/ql/src/test/queries/clientpositive/auto_join26.q
    hive/trunk/ql/src/test/queries/clientpositive/groupby_distinct_samekey.q
    hive/trunk/ql/src/test/queries/clientpositive/reduce_deduplicate.q
    hive/trunk/ql/src/test/results/clientpositive/groupby2.q.out
    hive/trunk/ql/src/test/results/clientpositive/groupby2_map_skew.q.out
    hive/trunk/ql/src/test/results/clientpositive/groupby_cube1.q.out
    hive/trunk/ql/src/test/results/clientpositive/groupby_distinct_samekey.q.out
    hive/trunk/ql/src/test/results/clientpositive/groupby_rollup1.q.out
    hive/trunk/ql/src/test/results/clientpositive/infer_bucket_sort.q.out
    hive/trunk/ql/src/test/results/clientpositive/ppd2.q.out
    hive/trunk/ql/src/test/results/compiler/plan/join1.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/join2.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/join3.q.xml

Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1465721&r1=1465720&r2=1465721&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Mon Apr  8 18:53:39 2013
@@ -544,6 +544,7 @@ public class HiveConf extends Configurat
     HIVEOPTBUCKETMAPJOIN("hive.optimize.bucketmapjoin", false), // optimize bucket map join
     HIVEOPTSORTMERGEBUCKETMAPJOIN("hive.optimize.bucketmapjoin.sortedmerge", false), // try to use sorted merge bucket map join
     HIVEOPTREDUCEDEDUPLICATION("hive.optimize.reducededuplication", true),
+    HIVEOPTREDUCEDEDUPLICATIONMINREDUCER("hive.optimize.reducededuplication.min.reducer", 4),
     // whether to optimize union followed by select followed by filesink
     // It creates sub-directories in the final output, so should not be turned on in systems
     // where MAPREDUCE-1501 is not present

Modified: hive/trunk/conf/hive-default.xml.template
URL: http://svn.apache.org/viewvc/hive/trunk/conf/hive-default.xml.template?rev=1465721&r1=1465720&r2=1465721&view=diff
==============================================================================
--- hive/trunk/conf/hive-default.xml.template (original)
+++ hive/trunk/conf/hive-default.xml.template Mon Apr  8 18:53:39 2013
@@ -1077,6 +1077,14 @@
 </property>
 
 <property>
+  <name>hive.optimize.reducededuplication.min.reducer</name>
+  <value>4</value>
+  <description>Reduce deduplication merges two RSs (reduce sink operators) by moving the key/parts/reducer-num of the child RS to the parent RS.
+  That means that if the reducer-num of the child RS is fixed (order by or forced bucketing) and small, it can result in a very slow, single-reducer MR job.
+  The optimization will be disabled if the number of reducers is less than the specified value.</description>
+</property>
+
+<property>
   <name>hive.exec.dynamic.partition</name>
   <value>true</value>
   <description>Whether or not to allow dynamic partitions in DML/DDL.</description>
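For orientation (not part of the patch), a minimal sketch of reading and overriding the new setting through HiveConf. The class name and the value 8 are illustrative only; getIntVar is the same accessor the optimizer uses further below, and setIntVar is assumed to be the matching setter.

import org.apache.hadoop.hive.conf.HiveConf;

public class DedupMinReducerExample {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();

    // Default is 4, per the template entry above.
    int minReducer = conf.getIntVar(
        HiveConf.ConfVars.HIVEOPTREDUCEDEDUPLICATIONMINREDUCER);
    System.out.println("min reducers for RS dedup: " + minReducer);

    // Require at least 8 reducers after the merge before dedup is applied
    // (8 is an arbitrary illustration, not a recommendation).
    conf.setIntVar(HiveConf.ConfVars.HIVEOPTREDUCEDEDUPLICATIONMINREDUCER, 8);
  }
}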

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java?rev=1465721&r1=1465720&r2=1465721&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java Mon Apr  8 18:53:39 2013
@@ -749,71 +749,39 @@ public final class ColumnPrunerProcFacto
       ReduceSinkOperator reduce, ColumnPrunerProcCtx cppCtx) throws SemanticException {
     ReduceSinkDesc reduceConf = reduce.getConf();
     Map<String, ExprNodeDesc> oldMap = reduce.getColumnExprMap();
-    Map<String, ExprNodeDesc> newMap = new HashMap<String, ExprNodeDesc>();
-    ArrayList<ColumnInfo> sig = new ArrayList<ColumnInfo>();
     RowResolver oldRR = cppCtx.getOpToParseCtxMap().get(reduce).getRowResolver();
-    RowResolver newRR = new RowResolver();
-    ArrayList<String> originalValueOutputColNames = reduceConf
-        .getOutputValueColumnNames();
-    java.util.ArrayList<ExprNodeDesc> originalValueEval = reduceConf
-        .getValueCols();
-    ArrayList<String> newOutputColNames = new ArrayList<String>();
-    java.util.ArrayList<ExprNodeDesc> newValueEval = new ArrayList<ExprNodeDesc>();
-    // ReduceSinkOperators that precede GroupByOperators have the keys in the schema in addition
-    // to the values.  These are not pruned.
-    List<ColumnInfo> oldSchema = oldRR.getRowSchema().getSignature();
-    for (ColumnInfo colInfo : oldSchema) {
-      if (colInfo.getInternalName().startsWith(Utilities.ReduceField.KEY.toString() + ".")) {
-        String[] nm = oldRR.reverseLookup(colInfo.getInternalName());
-        newRR.put(nm[0], nm[1], colInfo);
-        sig.add(colInfo);
-      } else {
-        break;
-      }
-    }
+    ArrayList<ColumnInfo> signature = oldRR.getRowSchema().getSignature();
+
+    List<String> valueColNames = reduceConf.getOutputValueColumnNames();
+    ArrayList<String> newValueColNames = new ArrayList<String>();
+
+    List<ExprNodeDesc> valueExprs = reduceConf.getValueCols();
+    ArrayList<ExprNodeDesc> newValueExprs = new ArrayList<ExprNodeDesc>();
+
     for (int i = 0; i < retainFlags.length; i++) {
-      if (retainFlags[i]) {
-        newValueEval.add(originalValueEval.get(i));
-        String outputCol = originalValueOutputColNames.get(i);
-        newOutputColNames.add(outputCol);
+      String outputCol = valueColNames.get(i);
+      ExprNodeDesc outputColExpr = valueExprs.get(i);
+      if (!retainFlags[i]) {
         String[] nm = oldRR.reverseLookup(outputCol);
         if (nm == null) {
           outputCol = Utilities.ReduceField.VALUE.toString() + "." + outputCol;
           nm = oldRR.reverseLookup(outputCol);
         }
-        newMap.put(outputCol, oldMap.get(outputCol));
-        ColumnInfo colInfo = oldRR.get(nm[0], nm[1]);
-        newRR.put(nm[0], nm[1], colInfo);
-        sig.add(colInfo);
-      }
-    }
-
-    ArrayList<ExprNodeDesc> keyCols = reduceConf.getKeyCols();
-    List<String> keys = new ArrayList<String>();
-    RowResolver parResover = cppCtx.getOpToParseCtxMap().get(
-        reduce.getParentOperators().get(0)).getRowResolver();
-    for (int i = 0; i < keyCols.size(); i++) {
-      keys = Utilities.mergeUniqElems(keys, keyCols.get(i).getCols());
-    }
-    for (int i = 0; i < keys.size(); i++) {
-      String outputCol = keys.get(i);
-      String[] nm = parResover.reverseLookup(outputCol);
-      ColumnInfo colInfo = oldRR.get(nm[0], nm[1]);
-      if (colInfo != null) {
-        String internalName=colInfo.getInternalName();
-        newMap.put(internalName, oldMap.get(internalName));
-        newRR.put(nm[0], nm[1], colInfo);
+        ColumnInfo colInfo = oldRR.getFieldMap(nm[0]).remove(nm[1]);
+        oldRR.getInvRslvMap().remove(colInfo.getInternalName());
+        oldMap.remove(outputCol);
+        signature.remove(colInfo);
+      } else {
+        newValueColNames.add(outputCol);
+        newValueExprs.add(outputColExpr);
       }
     }
 
-    cppCtx.getOpToParseCtxMap().get(reduce).setRowResolver(newRR);
-    reduce.setColumnExprMap(newMap);
-    reduce.getSchema().setSignature(sig);
-    reduceConf.setOutputValueColumnNames(newOutputColNames);
-    reduceConf.setValueCols(newValueEval);
+    reduceConf.setOutputValueColumnNames(newValueColNames);
+    reduceConf.setValueCols(newValueExprs);
     TableDesc newValueTable = PlanUtils.getReduceValueTableDesc(PlanUtils
         .getFieldSchemasFromColumnList(reduceConf.getValueCols(),
-        newOutputColNames, 0, ""));
+        newValueColNames, 0, ""));
     reduceConf.setValueSerializeInfo(newValueTable);
   }
 
@@ -1042,4 +1010,4 @@ public final class ColumnPrunerProcFacto
     return new ColumnPrunerMapJoinProc();
   }
 
-}
\ No newline at end of file
+}

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java?rev=1465721&r1=1465720&r2=1465721&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java Mon Apr  8 18:53:39 2013
@@ -397,9 +397,8 @@ public class MapJoinProcessor implements
       byte srcTag = entry.getKey();
       List<ExprNodeDesc> filter = entry.getValue();
 
-      Operator<?> start = oldReduceSinkParentOps.get(srcTag);
-      Operator<?> terminal = newParentOps.get(srcTag);
-      newFilters.put(srcTag, ExprNodeDescUtils.backtrack(filter, start, terminal));
+      Operator<?> terminal = oldReduceSinkParentOps.get(srcTag);
+      newFilters.put(srcTag, ExprNodeDescUtils.backtrack(filter, op, terminal));
     }
     desc.setFilters(filters = newFilters);
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/NonBlockingOpDeDupProc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/NonBlockingOpDeDupProc.java?rev=1465721&r1=1465720&r2=1465721&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/NonBlockingOpDeDupProc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/NonBlockingOpDeDupProc.java Mon Apr  8 18:53:39 2013
@@ -27,7 +27,6 @@ import java.util.Set;
 import java.util.Stack;
 
 import org.apache.hadoop.hive.ql.exec.FilterOperator;
-import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.SelectOperator;
 import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
 import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
@@ -84,8 +83,6 @@ public class NonBlockingOpDeDupProc impl
 
       // For SEL-SEL(compute) case, move column exprs/names of child to parent.
       if (!cSEL.getConf().isSelStarNoCompute()) {
-        Operator<?> terminal = ExprNodeDescUtils.getSingleParent(pSEL, null);
-
         Set<String> funcOutputs = getFunctionOutputs(
             pSEL.getConf().getOutputColumnNames(), pSEL.getConf().getColList());
 
@@ -93,7 +90,7 @@ public class NonBlockingOpDeDupProc impl
         if (!funcOutputs.isEmpty() && !checkReferences(sources, funcOutputs)) {
           return null;
         }
-        pSEL.getConf().setColList(ExprNodeDescUtils.backtrack(sources, pSEL, terminal));
+        pSEL.getConf().setColList(ExprNodeDescUtils.backtrack(sources, cSEL, pSEL));
         pSEL.getConf().setOutputColumnNames(cSEL.getConf().getOutputColumnNames());
 
         // updates schema only (this should be the last optimizer modifying operator tree)

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkDeDuplication.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkDeDuplication.java?rev=1465721&r1=1465720&r2=1465721&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkDeDuplication.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkDeDuplication.java Mon Apr  8 18:53:39 2013
@@ -18,15 +18,17 @@
 
 package org.apache.hadoop.hive.ql.optimizer;
 
+import java.io.Serializable;
+import java.lang.reflect.Array;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.Iterator;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.Stack;
 
-import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.ColumnInfo;
 import org.apache.hadoop.hive.ql.exec.ExtractOperator;
 import org.apache.hadoop.hive.ql.exec.FilterOperator;
@@ -52,32 +54,54 @@ import org.apache.hadoop.hive.ql.parse.O
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.RowResolver;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.AggregationDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
-import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc;
+import org.apache.hadoop.hive.ql.plan.JoinCondDesc;
+import org.apache.hadoop.hive.ql.plan.JoinDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
 import org.apache.hadoop.hive.ql.plan.SelectDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVECONVERTJOIN;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASK;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVEOPTREDUCEDEDUPLICATIONMINREDUCER;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVESCRIPTOPERATORTRUST;
 
 /**
- * If two reducer sink operators share the same partition/sort columns, we
- * should merge them. This should happen after map join optimization because map
+ * If two reducer sink operators share the same partition/sort columns and order,
+ * they can be merged. This should happen after map join optimization because map
  * join optimization will remove reduce sink operators.
  */
 public class ReduceSinkDeDuplication implements Transform{
 
+  private static final String RS = ReduceSinkOperator.getOperatorName();
+  private static final String GBY = GroupByOperator.getOperatorName();
+  private static final String JOIN = JoinOperator.getOperatorName();
+
   protected ParseContext pGraphContext;
 
   @Override
   public ParseContext transform(ParseContext pctx) throws SemanticException {
     pGraphContext = pctx;
 
- // generate pruned column list for all relevant operators
+    // generate pruned column list for all relevant operators
     ReduceSinkDeduplicateProcCtx cppCtx = new ReduceSinkDeduplicateProcCtx(pGraphContext);
 
+    boolean mergeJoins = !pctx.getConf().getBoolVar(HIVECONVERTJOIN) &&
+        !pctx.getConf().getBoolVar(HIVECONVERTJOINNOCONDITIONALTASK);
+
     Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
-    opRules.put(new RuleRegExp("R1",
-      ReduceSinkOperator.getOperatorName() + "%.*" + ReduceSinkOperator.getOperatorName() + "%"),
-      ReduceSinkDeduplicateProcFactory.getReducerReducerProc());
+    opRules.put(new RuleRegExp("R1", RS + "%.*%" + RS + "%"),
+        ReduceSinkDeduplicateProcFactory.getReducerReducerProc());
+    opRules.put(new RuleRegExp("R2", RS + "%" + GBY + "%.*%" + RS + "%"),
+        ReduceSinkDeduplicateProcFactory.getGroupbyReducerProc());
+    if (mergeJoins) {
+      opRules.put(new RuleRegExp("R3", JOIN + "%.*%" + RS + "%"),
+          ReduceSinkDeduplicateProcFactory.getJoinReducerProc());
+    }
+    // TODO RS+JOIN
 
     // The dispatcher fires the processor corresponding to the closest matching
     // rule and passes the context along
@@ -92,23 +116,27 @@ public class ReduceSinkDeDuplication imp
     return pGraphContext;
   }
 
-  class ReduceSinkDeduplicateProcCtx implements NodeProcessorCtx{
+  class ReduceSinkDeduplicateProcCtx implements NodeProcessorCtx {
+
     ParseContext pctx;
-    List<ReduceSinkOperator> rejectedRSList;
+    boolean trustScript;
+    // min reducer num for merged RS (to avoid a query containing "order by" being executed by a single reducer)
+    int minReducer;
+    Set<Operator<?>> removedOps;
 
     public ReduceSinkDeduplicateProcCtx(ParseContext pctx) {
-      rejectedRSList = new ArrayList<ReduceSinkOperator>();
+      removedOps = new HashSet<Operator<?>>();
+      trustScript = pctx.getConf().getBoolVar(HIVESCRIPTOPERATORTRUST);
+      minReducer = pctx.getConf().getIntVar(HIVEOPTREDUCEDEDUPLICATIONMINREDUCER);
       this.pctx = pctx;
     }
 
-    public boolean contains (ReduceSinkOperator rsOp) {
-      return rejectedRSList.contains(rsOp);
+    public boolean contains(Operator<?> rsOp) {
+      return removedOps.contains(rsOp);
     }
 
-    public void addRejectedReduceSinkOperator(ReduceSinkOperator rsOp) {
-      if (!rejectedRSList.contains(rsOp)) {
-        rejectedRSList.add(rsOp);
-      }
+    public boolean addRemovedOperator(Operator<?> rsOp) {
+      return removedOps.add(rsOp);
     }
 
     public ParseContext getPctx() {
@@ -120,355 +148,598 @@ public class ReduceSinkDeDuplication imp
     }
   }
 
-
   static class ReduceSinkDeduplicateProcFactory {
 
-
     public static NodeProcessor getReducerReducerProc() {
       return new ReducerReducerProc();
     }
 
+    public static NodeProcessor getGroupbyReducerProc() {
+      return new GroupbyReducerProc();
+    }
+
+    public static NodeProcessor getJoinReducerProc() {
+      return new JoinReducerProc();
+    }
+
     public static NodeProcessor getDefaultProc() {
       return new DefaultProc();
     }
+  }
 
-    /*
-     * do nothing.
-     */
-    static class DefaultProc implements NodeProcessor {
-      @Override
-      public Object process(Node nd, Stack<Node> stack,
-          NodeProcessorCtx procCtx, Object... nodeOutputs)
-          throws SemanticException {
-        return null;
-      }
+  /*
+   * do nothing.
+   */
+  static class DefaultProc implements NodeProcessor {
+    @Override
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+        Object... nodeOutputs) throws SemanticException {
+      return null;
     }
+  }
 
-    static class ReducerReducerProc implements NodeProcessor {
-      @Override
-      public Object process(Node nd, Stack<Node> stack,
-          NodeProcessorCtx procCtx, Object... nodeOutputs)
-          throws SemanticException {
-        ReduceSinkDeduplicateProcCtx ctx = (ReduceSinkDeduplicateProcCtx) procCtx;
-        ReduceSinkOperator childReduceSink = (ReduceSinkOperator)nd;
+  public abstract static class AbsctractReducerReducerProc implements NodeProcessor {
 
-        if(ctx.contains(childReduceSink)) {
-          return null;
-        }
+    ReduceSinkDeduplicateProcCtx dedupCtx;
 
-        List<Operator<? extends OperatorDesc>> childOp =
-          childReduceSink.getChildOperators();
-        if (childOp != null && childOp.size() == 1) {
-          Operator<? extends OperatorDesc> child = childOp.get(0);
-          if (child instanceof GroupByOperator || child instanceof JoinOperator) {
-            ctx.addRejectedReduceSinkOperator(childReduceSink);
-            return null;
-          }
-        }
+    protected boolean trustScript() {
+      return dedupCtx.trustScript;
+    }
 
-        ParseContext pGraphContext = ctx.getPctx();
-        HashMap<String, String> childColumnMapping =
-          getPartitionAndKeyColumnMapping(childReduceSink);
-        ReduceSinkOperator parentRS = null;
-        parentRS = findSingleParentReduceSink(childReduceSink, pGraphContext);
-        if (parentRS == null) {
-          ctx.addRejectedReduceSinkOperator(childReduceSink);
-          return null;
-        }
-        HashMap<String, String> parentColumnMapping = getPartitionAndKeyColumnMapping(parentRS);
-        Operator<? extends OperatorDesc> stopBacktrackFlagOp = null;
-        if (parentRS.getParentOperators() == null
-            || parentRS.getParentOperators().size() == 0) {
-          stopBacktrackFlagOp =  parentRS;
-        } else if (parentRS.getParentOperators().size() != 1) {
-          return null;
-        } else {
-          stopBacktrackFlagOp = parentRS.getParentOperators().get(0);
-        }
+    protected int minReducer() {
+      return dedupCtx.minReducer;
+    }
 
-        boolean succeed = backTrackColumnNames(childColumnMapping, childReduceSink, stopBacktrackFlagOp, pGraphContext);
-        if (!succeed) {
-          return null;
-        }
-        succeed = backTrackColumnNames(parentColumnMapping, parentRS, stopBacktrackFlagOp, pGraphContext);
-        if (!succeed) {
-          return null;
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+        Object... nodeOutputs) throws SemanticException {
+      dedupCtx = (ReduceSinkDeduplicateProcCtx) procCtx;
+      if (dedupCtx.contains((Operator<?>) nd)) {
+        return false;
+      }
+      ReduceSinkOperator cRS = (ReduceSinkOperator) nd;
+      Operator<?> child = getSingleChild(cRS);
+      if (child instanceof JoinOperator) {
+        return false; // not supported
+      }
+      ParseContext pctx = dedupCtx.getPctx();
+      if (child instanceof GroupByOperator) {
+        GroupByOperator cGBY = (GroupByOperator) child;
+        if (!hasGroupingSet(cRS) && !cGBY.getConf().isGroupingSetsPresent()) {
+          return process(cRS, cGBY, pctx);
         }
+        return false;
+      }
+      if (child instanceof ExtractOperator) {
+        return process(cRS, pctx);
+      }
+      return false;
+    }
 
-        boolean same = compareReduceSink(childReduceSink, parentRS, childColumnMapping, parentColumnMapping);
-        if (!same) {
-          return null;
-        }
-        replaceReduceSinkWithSelectOperator(childReduceSink, pGraphContext);
-        return null;
+    private boolean hasGroupingSet(ReduceSinkOperator cRS) {
+      GroupByOperator cGBYm = getSingleParent(cRS, GroupByOperator.class);
+      if (cGBYm != null && cGBYm.getConf().isGroupingSetsPresent()) {
+        return true;
+      }
+      return false;
+    }
+
+    protected Operator<?> getSingleParent(Operator<?> operator) {
+      List<Operator<?>> parents = operator.getParentOperators();
+      if (parents != null && parents.size() == 1) {
+        return parents.get(0);
+      }
+      return null;
+    }
+
+    protected Operator<?> getSingleChild(Operator<?> operator) {
+      List<Operator<?>> children = operator.getChildOperators();
+      if (children != null && children.size() == 1) {
+        return children.get(0);
       }
+      return null;
+    }
 
-      private void replaceReduceSinkWithSelectOperator(
-          ReduceSinkOperator childReduceSink, ParseContext pGraphContext) throws SemanticException {
-        List<Operator<? extends OperatorDesc>> parentOp =
-          childReduceSink.getParentOperators();
-        List<Operator<? extends OperatorDesc>> childOp =
-          childReduceSink.getChildOperators();
+    protected <T> T getSingleParent(Operator<?> operator, Class<T> type) {
+      Operator<?> parent = getSingleParent(operator);
+      return type.isInstance(parent) ? (T)parent : null;
+    }
 
-        Operator<? extends OperatorDesc> oldParent = childReduceSink;
+    protected abstract Object process(ReduceSinkOperator cRS, ParseContext context)
+        throws SemanticException;
 
-        if (childOp != null && childOp.size() == 1
-            && ((childOp.get(0)) instanceof ExtractOperator)) {
-          oldParent = childOp.get(0);
-          childOp = childOp.get(0).getChildOperators();
-        }
+    protected abstract Object process(ReduceSinkOperator cRS, GroupByOperator cGBY,
+        ParseContext context) throws SemanticException;
 
-        Operator<? extends OperatorDesc> input = parentOp.get(0);
-        input.getChildOperators().clear();
+    protected Operator<?> getStartForGroupBy(ReduceSinkOperator cRS) {
+      Operator<? extends Serializable> parent = getSingleParent(cRS);
+      return parent instanceof GroupByOperator ? parent : cRS;  // skip map-aggr GBY
+    }
 
-        RowResolver inputRR = pGraphContext.getOpParseCtx().get(input).getRowResolver();
+    // for the JOIN-RS case, it's generally not possible to merge if the child has
+    // more key/partition columns than the parents
+    protected boolean merge(ReduceSinkOperator cRS, JoinOperator pJoin, int minReducer)
+        throws SemanticException {
+      List<Operator<?>> parents = pJoin.getParentOperators();
+      ReduceSinkOperator[] pRSs = parents.toArray(new ReduceSinkOperator[parents.size()]);
+      ReduceSinkDesc cRSc = cRS.getConf();
+      ReduceSinkDesc pRS0c = pRSs[0].getConf();
+      if (cRSc.getKeyCols().size() > pRS0c.getKeyCols().size()) {
+        return false;
+      }
+      if (cRSc.getPartitionCols().size() > pRS0c.getPartitionCols().size()) {
+        return false;
+      }
+      Integer moveReducerNumTo = checkNumReducer(cRSc.getNumReducers(), pRS0c.getNumReducers());
+      if (moveReducerNumTo == null ||
+          moveReducerNumTo > 0 && cRSc.getNumReducers() < minReducer) {
+        return false;
+      }
 
-        ArrayList<ExprNodeDesc> exprs = new ArrayList<ExprNodeDesc>();
-        ArrayList<String> outputs = new ArrayList<String>();
-        List<String> outputCols = childReduceSink.getConf().getOutputValueColumnNames();
-        RowResolver outputRS = new RowResolver();
+      Integer moveRSOrderTo = checkOrder(cRSc.getOrder(), pRS0c.getOrder());
+      if (moveRSOrderTo == null) {
+        return false;
+      }
 
-        Map<String, ExprNodeDesc> colExprMap = new HashMap<String, ExprNodeDesc>();
+      boolean[] sorted = getSortedTags(pJoin);
 
-        for (int i = 0; i < outputCols.size(); i++) {
-          String internalName = outputCols.get(i);
-          String[] nm = inputRR.reverseLookup(internalName);
-          ColumnInfo valueInfo = inputRR.get(nm[0], nm[1]);
-          ExprNodeDesc colDesc = childReduceSink.getConf().getValueCols().get(i);
-          exprs.add(colDesc);
-          outputs.add(internalName);
-          outputRS.put(nm[0], nm[1], new ColumnInfo(internalName, valueInfo
-              .getType(), nm[0], valueInfo.getIsVirtualCol(), valueInfo.isHiddenVirtualCol()));
-          colExprMap.put(internalName, colDesc);
+      int cKeySize = cRSc.getKeyCols().size();
+      for (int i = 0; i < cKeySize; i++) {
+        ExprNodeDesc cexpr = cRSc.getKeyCols().get(i);
+        ExprNodeDesc[] pexprs = new ExprNodeDesc[pRSs.length];
+        for (int tag = 0; tag < pRSs.length; tag++) {
+          pexprs[tag] = pRSs[tag].getConf().getKeyCols().get(i);
         }
+        int found = indexOf(cexpr, pexprs, cRS, pRSs, sorted);
+        if (found < 0) {
+          return false;
+        }
+      }
+      int cPartSize = cRSc.getPartitionCols().size();
+      for (int i = 0; i < cPartSize; i++) {
+        ExprNodeDesc cexpr = cRSc.getPartitionCols().get(i);
+        ExprNodeDesc[] pexprs = new ExprNodeDesc[pRSs.length];
+        for (int tag = 0; tag < pRSs.length; tag++) {
+          pexprs[tag] = pRSs[tag].getConf().getPartitionCols().get(i);
+        }
+        int found = indexOf(cexpr, pexprs, cRS, pRSs, sorted);
+        if (found < 0) {
+          return false;
+        }
+      }
 
-        SelectDesc select = new SelectDesc(exprs, outputs, false);
+      if (moveReducerNumTo > 0) {
+        for (ReduceSinkOperator pRS : pRSs) {
+          pRS.getConf().setNumReducers(cRS.getConf().getNumReducers());
+        }
+      }
+      return true;
+    }
 
-        SelectOperator sel = (SelectOperator) putOpInsertMap(
-            OperatorFactory.getAndMakeChild(select, new RowSchema(inputRR
-            .getColumnInfos()), input), inputRR, pGraphContext);
+    private boolean[] getSortedTags(JoinOperator joinOp) {
+      boolean[] result = new boolean[joinOp.getParentOperators().size()];
+      for (int tag = 0; tag < result.length; tag++) {
+        result[tag] = isSortedTag(joinOp, tag);
+      }
+      return result;
+    }
 
-        sel.setColumnExprMap(colExprMap);
+    private boolean isSortedTag(JoinOperator joinOp, int tag) {
+      for (JoinCondDesc cond : joinOp.getConf().getConds()) {
+        switch (cond.getType()) {
+          case JoinDesc.LEFT_OUTER_JOIN:
+            if (cond.getRight() == tag) {
+              return false;
+            }
+            continue;
+          case JoinDesc.RIGHT_OUTER_JOIN:
+            if (cond.getLeft() == tag) {
+              return false;
+            }
+            continue;
+          case JoinDesc.FULL_OUTER_JOIN:
+            if (cond.getLeft() == tag || cond.getRight() == tag) {
+              return false;
+            }
+        }
+      }
+      return true;
+    }
 
-        // Insert the select operator in between.
-        sel.setChildOperators(childOp);
-        for (Operator<? extends OperatorDesc> ch : childOp) {
-          ch.replaceParent(oldParent, sel);
+    private int indexOf(ExprNodeDesc cexpr, ExprNodeDesc[] pexprs, Operator child,
+        Operator[] parents, boolean[] sorted) throws SemanticException {
+      for (int tag = 0; tag < parents.length; tag++) {
+        if (sorted[tag] &&
+            pexprs[tag].isSame(ExprNodeDescUtils.backtrack(cexpr, child, parents[tag]))) {
+          return tag;
         }
+      }
+      return -1;
+    }
 
+    protected boolean merge(ReduceSinkOperator cRS, ReduceSinkOperator pRS, int minReducer)
+        throws SemanticException {
+      int[] result = checkStatus(cRS, pRS, minReducer);
+      if (result == null) {
+        return false;
+      }
+      if (result[0] > 0) {
+        ArrayList<ExprNodeDesc> childKCs = cRS.getConf().getKeyCols();
+        pRS.getConf().setKeyCols(ExprNodeDescUtils.backtrack(childKCs, cRS, pRS));
+      }
+      if (result[1] > 0) {
+        ArrayList<ExprNodeDesc> childPCs = cRS.getConf().getPartitionCols();
+        pRS.getConf().setPartitionCols(ExprNodeDescUtils.backtrack(childPCs, cRS, pRS));
+      }
+      if (result[2] > 0) {
+        pRS.getConf().setOrder(cRS.getConf().getOrder());
+      }
+      if (result[3] > 0) {
+        pRS.getConf().setNumReducers(cRS.getConf().getNumReducers());
       }
+      return true;
+    }
 
-      private Operator<? extends OperatorDesc> putOpInsertMap(
-        Operator<? extends OperatorDesc> op, RowResolver rr, ParseContext pGraphContext) {
-        OpParseContext ctx = new OpParseContext(rr);
-        pGraphContext.getOpParseCtx().put(op, ctx);
-        return op;
+    // -1 for p to c, 1 for c to p
+    private int[] checkStatus(ReduceSinkOperator cRS, ReduceSinkOperator pRS, int minReducer)
+        throws SemanticException {
+      ReduceSinkDesc cConf = cRS.getConf();
+      ReduceSinkDesc pConf = pRS.getConf();
+      Integer moveRSOrderTo = checkOrder(cConf.getOrder(), pConf.getOrder());
+      if (moveRSOrderTo == null) {
+        return null;
+      }
+      Integer moveReducerNumTo = checkNumReducer(cConf.getNumReducers(), pConf.getNumReducers());
+      if (moveReducerNumTo == null ||
+          moveReducerNumTo > 0 && cConf.getNumReducers() < minReducer) {
+        return null;
+      }
+      List<ExprNodeDesc> ckeys = cConf.getKeyCols();
+      List<ExprNodeDesc> pkeys = pConf.getKeyCols();
+      Integer moveKeyColTo = checkExprs(ckeys, pkeys, cRS, pRS);
+      if (moveKeyColTo == null) {
+        return null;
       }
+      List<ExprNodeDesc> cpars = cConf.getPartitionCols();
+      List<ExprNodeDesc> ppars = pConf.getPartitionCols();
+      Integer movePartitionColTo = checkExprs(cpars, ppars, cRS, pRS);
+      if (movePartitionColTo == null) {
+        return null;
+      }
+      return new int[] {moveKeyColTo, movePartitionColTo, moveRSOrderTo, moveReducerNumTo};
+    }
 
-      private boolean compareReduceSink(ReduceSinkOperator childReduceSink,
-          ReduceSinkOperator parentRS,
-          HashMap<String, String> childColumnMapping,
-          HashMap<String, String> parentColumnMapping) {
+    private Integer checkExprs(List<ExprNodeDesc> ckeys, List<ExprNodeDesc> pkeys,
+        ReduceSinkOperator cRS, ReduceSinkOperator pRS) throws SemanticException {
+      Integer moveKeyColTo = 0;
+      if (ckeys == null || ckeys.isEmpty()) {
+        if (pkeys != null && !pkeys.isEmpty()) {
+          moveKeyColTo = -1;
+        }
+      } else {
+        if (pkeys == null || pkeys.isEmpty()) {
+          for (ExprNodeDesc ckey : ckeys) {
+            if (ExprNodeDescUtils.backtrack(ckey, cRS, pRS) == null) {
+              return null;
+            }
+          }
+          moveKeyColTo = 1;
+        } else {
+          moveKeyColTo = sameKeys(ckeys, pkeys, cRS, pRS);
+        }
+      }
+      return moveKeyColTo;
+    }
 
-        ArrayList<ExprNodeDesc> childPartitionCols = childReduceSink.getConf().getPartitionCols();
-        ArrayList<ExprNodeDesc> parentPartitionCols = parentRS.getConf().getPartitionCols();
+    protected Integer sameKeys(List<ExprNodeDesc> cexprs, List<ExprNodeDesc> pexprs,
+        Operator<?> child, Operator<?> parent) throws SemanticException {
+      int common = Math.min(cexprs.size(), pexprs.size());
+      int limit = Math.max(cexprs.size(), pexprs.size());
+      int i = 0;
+      for (; i < common; i++) {
+        ExprNodeDesc pexpr = pexprs.get(i);
+        ExprNodeDesc cexpr = ExprNodeDescUtils.backtrack(cexprs.get(i), child, parent);
+        if (!pexpr.isSame(cexpr)) {
+          return null;
+        }
+      }
+      for (;i < limit; i++) {
+        if (cexprs.size() > pexprs.size()) {
+          if (ExprNodeDescUtils.backtrack(cexprs.get(i), child, parent) == null) {
+            return null;
+          }
+        }
+      }
+      return Integer.valueOf(cexprs.size()).compareTo(pexprs.size());
+    }
 
-        boolean ret = compareExprNodes(childColumnMapping, parentColumnMapping,
-            childPartitionCols, parentPartitionCols);
-        if (!ret) {
-          return false;
+    protected Integer checkOrder(String corder, String porder) {
+      if (corder == null || corder.trim().equals("")) {
+        if (porder == null || porder.trim().equals("")) {
+          return 0;
         }
+        return -1;
+      }
+      if (porder == null || porder.trim().equals("")) {
+        return 1;
+      }
+      corder = corder.trim();
+      porder = porder.trim();
+      int target = Math.min(corder.length(), porder.length());
+      if (!corder.substring(0, target).equals(porder.substring(0, target))) {
+        return null;
+      }
+      return Integer.valueOf(corder.length()).compareTo(porder.length());
+    }
 
-        ArrayList<ExprNodeDesc> childReduceKeyCols = childReduceSink.getConf().getKeyCols();
-        ArrayList<ExprNodeDesc> parentReduceKeyCols = parentRS.getConf().getKeyCols();
-        ret = compareExprNodes(childColumnMapping, parentColumnMapping,
-            childReduceKeyCols, parentReduceKeyCols);
-        if (!ret) {
-          return false;
+    protected Integer checkNumReducer(int creduce, int preduce) {
+      if (creduce < 0) {
+        if (preduce < 0) {
+          return 0;
         }
+        return -1;
+      }
+      if (preduce < 0) {
+        return 1;
+      }
+      if (creduce != preduce) {
+        return null;
+      }
+      return 0;
+    }
 
-        String childRSOrder = childReduceSink.getConf().getOrder();
-        String parentRSOrder = parentRS.getConf().getOrder();
-        boolean moveChildRSOrderToParent = false;
-        //move child reduce sink's order to the parent reduce sink operator.
-        if (childRSOrder != null && !(childRSOrder.trim().equals(""))) {
-          if (parentRSOrder == null
-              || !childRSOrder.trim().equals(parentRSOrder.trim())) {
-            return false;
-          }
-        } else {
-          if(parentRSOrder == null || parentRSOrder.trim().equals("")) {
-            moveChildRSOrderToParent = true;
-          }
+    protected <T extends Operator<?>> T findPossibleParent(Operator<?> start, Class<T> target,
+        boolean trustScript) throws SemanticException {
+      T[] parents = findPossibleParents(start, target, trustScript);
+      return parents != null && parents.length == 1 ? parents[0] : null;
+    }
+
+    @SuppressWarnings("unchecked")
+    protected <T extends Operator<?>> T[] findPossibleParents(Operator<?> start, Class<T> target,
+        boolean trustScript) {
+      Operator<?> cursor = getSingleParent(start);
+      for (; cursor != null; cursor = getSingleParent(cursor)) {
+        if (target.isAssignableFrom(cursor.getClass())) {
+          T[] array = (T[]) Array.newInstance(target, 1);
+          array[0] = (T) cursor;
+          return array;
+        }
+        if (cursor instanceof JoinOperator) {
+          return findParents((JoinOperator) cursor, target);
         }
+        if (cursor instanceof ScriptOperator && !trustScript) {
+          return null;
+        }
+        if (!(cursor instanceof SelectOperator
+            || cursor instanceof FilterOperator
+            || cursor instanceof ExtractOperator
+            || cursor instanceof ForwardOperator
+            || cursor instanceof ScriptOperator
+            || cursor instanceof ReduceSinkOperator)) {
+          return null;
+        }
+      }
+      return null;
+    }
 
-        int childNumReducers = childReduceSink.getConf().getNumReducers();
-        int parentNumReducers = parentRS.getConf().getNumReducers();
-        boolean moveChildReducerNumToParent = false;
-        //move child reduce sink's number reducers to the parent reduce sink operator.
-        if (childNumReducers != parentNumReducers) {
-          if (childNumReducers == -1) {
-            //do nothing.
-          } else if (parentNumReducers == -1) {
-            //set childNumReducers in the parent reduce sink operator.
-            moveChildReducerNumToParent = true;
-          } else {
-            return false;
+    @SuppressWarnings("unchecked")
+    private <T extends Operator<?>> T[] findParents(JoinOperator join, Class<T> target) {
+      List<Operator<?>> parents = join.getParentOperators();
+      T[] result = (T[]) Array.newInstance(target, parents.size());
+      for (int tag = 0; tag < result.length; tag++) {
+        Operator<?> cursor = parents.get(tag);
+        for (; cursor != null; cursor = getSingleParent(cursor)) {
+          if (target.isAssignableFrom(cursor.getClass())) {
+            result[tag] = (T) cursor;
+            break;
           }
         }
-
-        if(moveChildRSOrderToParent) {
-          parentRS.getConf().setOrder(childRSOrder);
+        if (result[tag] == null) {
+          throw new IllegalStateException("failed to find " + target.getSimpleName()
+              + " from " + join + " on tag " + tag);
         }
+      }
+      return result;
+    }
 
-        if(moveChildReducerNumToParent) {
-          parentRS.getConf().setNumReducers(childNumReducers);
-        }
+    protected SelectOperator replaceReduceSinkWithSelectOperator(ReduceSinkOperator childRS,
+        ParseContext context) throws SemanticException {
+      SelectOperator select = replaceOperatorWithSelect(childRS, context);
+      select.getConf().setOutputColumnNames(childRS.getConf().getOutputValueColumnNames());
+      select.getConf().setColList(childRS.getConf().getValueCols());
+      return select;
+    }
 
-        return true;
+    private SelectOperator replaceOperatorWithSelect(Operator<?> operator, ParseContext context)
+        throws SemanticException {
+      RowResolver inputRR = context.getOpParseCtx().get(operator).getRowResolver();
+      SelectDesc select = new SelectDesc(null, null);
+
+      Operator<?> parent = getSingleParent(operator);
+      Operator<?> child = getSingleChild(operator);
+
+      parent.getChildOperators().clear();
+
+      SelectOperator sel = (SelectOperator) putOpInsertMap(
+          OperatorFactory.getAndMakeChild(select, new RowSchema(inputRR
+              .getColumnInfos()), parent), inputRR, context);
+
+      sel.setColumnExprMap(operator.getColumnExprMap());
+
+      sel.setChildOperators(operator.getChildOperators());
+      for (Operator<? extends Serializable> ch : operator.getChildOperators()) {
+        ch.replaceParent(operator, sel);
+      }
+      if (child instanceof ExtractOperator) {
+        removeOperator(child, getSingleChild(child), sel, context);
+        dedupCtx.addRemovedOperator(child);
       }
+      operator.setChildOperators(null);
+      operator.setParentOperators(null);
+      dedupCtx.addRemovedOperator(operator);
+      return sel;
+    }
 
-      private boolean compareExprNodes(HashMap<String, String> childColumnMapping,
-          HashMap<String, String> parentColumnMapping,
-          ArrayList<ExprNodeDesc> childColExprs,
-          ArrayList<ExprNodeDesc> parentColExprs) {
+    protected void removeReduceSinkForGroupBy(ReduceSinkOperator cRS, GroupByOperator cGBYr,
+        ParseContext context) throws SemanticException {
 
-        boolean childEmpty = childColExprs == null || childColExprs.size() == 0;
-        boolean parentEmpty = parentColExprs == null || parentColExprs.size() == 0;
+      Operator<?> parent = getSingleParent(cRS);
 
-        if (childEmpty) { //both empty
-          return true;
-        }
+      if (parent instanceof GroupByOperator) {
+        GroupByOperator cGBYm = (GroupByOperator) parent;
 
-        //child not empty here
-        if (parentEmpty) { // child not empty, but parent empty
-          return false;
+        cGBYr.getConf().setKeys(cGBYm.getConf().getKeys());
+        cGBYr.getConf().setAggregators(cGBYm.getConf().getAggregators());
+        for (AggregationDesc aggr : cGBYm.getConf().getAggregators()) {
+          aggr.setMode(GenericUDAFEvaluator.Mode.COMPLETE);
         }
-
-        if (childColExprs.size() != parentColExprs.size()) {
-          return false;
+        cGBYr.setColumnExprMap(cGBYm.getColumnExprMap());
+        cGBYr.setSchema(cGBYm.getSchema());
+        RowResolver resolver = context.getOpParseCtx().get(cGBYm).getRowResolver();
+        context.getOpParseCtx().get(cGBYr).setRowResolver(resolver);
+      } else {
+        cGBYr.getConf().setKeys(ExprNodeDescUtils.backtrack(cGBYr.getConf().getKeys(), cGBYr, cRS));
+        for (AggregationDesc aggr : cGBYr.getConf().getAggregators()) {
+          aggr.setParameters(ExprNodeDescUtils.backtrack(aggr.getParameters(), cGBYr, cRS));
         }
-        int i = 0;
-        while (i < childColExprs.size()) {
-          ExprNodeDesc childExpr = childColExprs.get(i);
-          ExprNodeDesc parentExpr = parentColExprs.get(i);
-
-          if ((childExpr instanceof ExprNodeColumnDesc)
-              && (parentExpr instanceof ExprNodeColumnDesc)) {
-            String childCol = childColumnMapping
-                .get(((ExprNodeColumnDesc) childExpr).getColumn());
-            String parentCol = parentColumnMapping
-                .get(((ExprNodeColumnDesc) childExpr).getColumn());
 
-            if (!childCol.equals(parentCol)) {
-              return false;
-            }
-          } else {
-            return false;
+        Map<String, ExprNodeDesc> oldMap = cGBYr.getColumnExprMap();
+        RowResolver oldRR = context.getOpParseCtx().get(cGBYr).getRowResolver();
+
+        Map<String, ExprNodeDesc> newMap = new HashMap<String, ExprNodeDesc>();
+        RowResolver newRR = new RowResolver();
+
+        List<String> outputCols = cGBYr.getConf().getOutputColumnNames();
+        for (int i = 0; i < outputCols.size(); i++) {
+          String colName = outputCols.get(i);
+          String[] nm = oldRR.reverseLookup(colName);
+          ColumnInfo colInfo = oldRR.get(nm[0], nm[1]);
+          newRR.put(nm[0], nm[1], colInfo);
+          ExprNodeDesc colExpr = ExprNodeDescUtils.backtrack(oldMap.get(colName), cGBYr, cRS);
+          if (colExpr != null) {
+            newMap.put(colInfo.getInternalName(), colExpr);
           }
-          i++;
         }
-        return true;
+        cGBYr.setColumnExprMap(newMap);
+        cGBYr.setSchema(new RowSchema(newRR.getColumnInfos()));
+        context.getOpParseCtx().get(cGBYr).setRowResolver(newRR);
       }
+      cGBYr.getConf().setMode(GroupByDesc.Mode.COMPLETE);
 
-      /*
-       * back track column names to find their corresponding original column
-       * names. Only allow simple operators like 'select column' or filter.
-       */
-      private boolean backTrackColumnNames(
-          HashMap<String, String> columnMapping,
-          ReduceSinkOperator reduceSink,
-          Operator<? extends OperatorDesc> stopBacktrackFlagOp,
-          ParseContext pGraphContext) {
-        Operator<? extends OperatorDesc> startOperator = reduceSink;
-        while (startOperator != null && startOperator != stopBacktrackFlagOp) {
-          startOperator = startOperator.getParentOperators().get(0);
-          Map<String, ExprNodeDesc> colExprMap = startOperator.getColumnExprMap();
-          if(colExprMap == null || colExprMap.size()==0) {
-            continue;
-          }
-          Iterator<String> keyIter = columnMapping.keySet().iterator();
-          while (keyIter.hasNext()) {
-            String key = keyIter.next();
-            String oldCol = columnMapping.get(key);
-            ExprNodeDesc exprNode = colExprMap.get(oldCol);
-            if(exprNode instanceof ExprNodeColumnDesc) {
-              String col = ((ExprNodeColumnDesc)exprNode).getColumn();
-              columnMapping.put(key, col);
-            } else {
-              return false;
-            }
-          }
-        }
+      removeOperator(cRS, cGBYr, parent, context);
+      dedupCtx.addRemovedOperator(cRS);
+
+      if (parent instanceof GroupByOperator) {
+        removeOperator(parent, cGBYr, getSingleParent(parent), context);
+        dedupCtx.addRemovedOperator(cGBYr);
+      }
+    }
+
+    private void removeOperator(Operator<?> target, Operator<?> child, Operator<?> parent,
+        ParseContext context) {
+      for (Operator<?> aparent : target.getParentOperators()) {
+        aparent.replaceChild(target, child);
+      }
+      for (Operator<?> achild : target.getChildOperators()) {
+        achild.replaceParent(target, parent);
+      }
+      target.setChildOperators(null);
+      target.setParentOperators(null);
+      context.getOpParseCtx().remove(target);
+    }
+
+    private Operator<? extends Serializable> putOpInsertMap(Operator<?> op, RowResolver rr,
+        ParseContext context) {
+      OpParseContext ctx = new OpParseContext(rr);
+      context.getOpParseCtx().put(op, ctx);
+      return op;
+    }
+  }
 
+  static class GroupbyReducerProc extends AbsctractReducerReducerProc {
+
+    // pRS-pGBY-cRS
+    public Object process(ReduceSinkOperator cRS, ParseContext context)
+        throws SemanticException {
+      GroupByOperator pGBY = findPossibleParent(cRS, GroupByOperator.class, trustScript());
+      if (pGBY == null) {
+        return false;
+      }
+      ReduceSinkOperator pRS = findPossibleParent(pGBY, ReduceSinkOperator.class, trustScript());
+      if (pRS != null && merge(cRS, pRS, minReducer())) {
+        replaceReduceSinkWithSelectOperator(cRS, context);
         return true;
       }
+      return false;
+    }
 
-      private HashMap<String, String> getPartitionAndKeyColumnMapping(ReduceSinkOperator reduceSink) {
-        HashMap<String, String> columnMapping = new HashMap<String, String> ();
-        ReduceSinkDesc reduceSinkDesc = reduceSink.getConf();
-        ArrayList<ExprNodeDesc> partitionCols = reduceSinkDesc.getPartitionCols();
-        ArrayList<ExprNodeDesc> reduceKeyCols = reduceSinkDesc.getKeyCols();
-        if(partitionCols != null) {
-          for (ExprNodeDesc desc : partitionCols) {
-            List<String> cols = desc.getCols();
-            if ( cols != null ) {
-              for(String col : cols) {
-                columnMapping.put(col, col);
-              }
-            }
-          }
-        }
-        if(reduceKeyCols != null) {
-          for (ExprNodeDesc desc : reduceKeyCols) {
-            List<String> cols = desc.getCols();
-            if ( cols != null ) {
-              for(String col : cols) {
-                columnMapping.put(col, col);
-              }
-            }
-          }
-        }
-        return columnMapping;
+    // pRS-pGBY-cRS-cGBY
+    public Object process(ReduceSinkOperator cRS, GroupByOperator cGBY, ParseContext context)
+        throws SemanticException {
+      Operator<?> start = getStartForGroupBy(cRS);
+      GroupByOperator pGBY = findPossibleParent(start, GroupByOperator.class, trustScript());
+      if (pGBY == null) {
+        return false;
       }
+      ReduceSinkOperator pRS = getSingleParent(pGBY, ReduceSinkOperator.class);
+      if (pRS != null && merge(cRS, pRS, minReducer())) {
+        removeReduceSinkForGroupBy(cRS, cGBY, context);
+        return true;
+      }
+      return false;
+    }
+  }
 
-      private ReduceSinkOperator findSingleParentReduceSink(ReduceSinkOperator childReduceSink, ParseContext pGraphContext) {
-        Operator<? extends OperatorDesc> start = childReduceSink;
-        while(start != null) {
-          if (start.getParentOperators() == null
-              || start.getParentOperators().size() != 1) {
-            // this potentially is a join operator
-            return null;
-          }
+  static class JoinReducerProc extends AbsctractReducerReducerProc {
 
-          boolean allowed = false;
-          if ((start instanceof SelectOperator)
-              || (start instanceof FilterOperator)
-              || (start instanceof ExtractOperator)
-              || (start instanceof ForwardOperator)
-              || (start instanceof ScriptOperator)
-              || (start instanceof ReduceSinkOperator)) {
-            allowed = true;
-          }
+    // pRS-pJOIN-cRS
+    public Object process(ReduceSinkOperator cRS, ParseContext context)
+        throws SemanticException {
+      JoinOperator pJoin = findPossibleParent(cRS, JoinOperator.class, trustScript());
+      if (pJoin != null && merge(cRS, pJoin, minReducer())) {
+        pJoin.getConf().setFixedAsSorted(true);
+        replaceReduceSinkWithSelectOperator(cRS, context);
+        return true;
+      }
+      return false;
+    }
 
-          if (!allowed) {
-            return null;
-          }
+    // pRS-pJOIN-cRS-cGBY
+    public Object process(ReduceSinkOperator cRS, GroupByOperator cGBY, ParseContext context)
+        throws SemanticException {
+      Operator<?> start = getStartForGroupBy(cRS);
+      JoinOperator pJoin = findPossibleParent(start, JoinOperator.class, trustScript());
+      if (pJoin != null && merge(cRS, pJoin, minReducer())) {
+        pJoin.getConf().setFixedAsSorted(true);
+        removeReduceSinkForGroupBy(cRS, cGBY, context);
+        return true;
+      }
+      return false;
+    }
+  }
 
-          if ((start instanceof ScriptOperator)
-              && !HiveConf.getBoolVar(pGraphContext.getConf(),
-                  HiveConf.ConfVars.HIVESCRIPTOPERATORTRUST)) {
-            return null;
-          }
+  static class ReducerReducerProc extends AbsctractReducerReducerProc {
 
-          start = start.getParentOperators().get(0);
-          if(start instanceof ReduceSinkOperator) {
-            return (ReduceSinkOperator)start;
-          }
-        }
-        return null;
+    // pRS-cRS
+    public Object process(ReduceSinkOperator cRS, ParseContext context)
+        throws SemanticException {
+      ReduceSinkOperator pRS = findPossibleParent(cRS, ReduceSinkOperator.class, trustScript());
+      if (pRS != null && merge(cRS, pRS, minReducer())) {
+        replaceReduceSinkWithSelectOperator(cRS, context);
+        return true;
       }
+      return false;
     }
 
+    // pRS-cRS-cGBY
+    public Object process(ReduceSinkOperator cRS, GroupByOperator cGBY, ParseContext context)
+        throws SemanticException {
+      Operator<?> start = getStartForGroupBy(cRS);
+      ReduceSinkOperator pRS = findPossibleParent(start, ReduceSinkOperator.class, trustScript());
+      if (pRS != null && merge(cRS, pRS, minReducer())) {
+        removeReduceSinkForGroupBy(cRS, cGBY, context);
+        return true;
+      }
+      return false;
+    }
   }
 }
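A note on the conventions used by merge() above: checkOrder() and checkNumReducer() return -1, 0 or 1 to indicate which operator's setting should survive (a positive result means the child RS's value is copied onto the parent RS, which is exactly what merge() then does), and null when the two cannot be reconciled. A self-contained sketch of the sort-order check, mirroring checkOrder() in this patch (the '+'/'-' characters stand for ascending/descending per key column):

public class CheckOrderSketch {

  // Mirrors checkOrder() above: one order string must be a prefix of the other.
  // Returns -1 to keep the parent's order, 1 to copy the child's order to the
  // parent, 0 when they already agree, and null when they conflict.
  static Integer checkOrder(String corder, String porder) {
    corder = corder == null ? "" : corder.trim();
    porder = porder == null ? "" : porder.trim();
    if (corder.isEmpty()) {
      return porder.isEmpty() ? 0 : -1;
    }
    if (porder.isEmpty()) {
      return 1;
    }
    int common = Math.min(corder.length(), porder.length());
    if (!corder.substring(0, common).equals(porder.substring(0, common))) {
      return null; // e.g. one RS sorts ascending where the other sorts descending
    }
    return Integer.compare(corder.length(), porder.length());
  }

  public static void main(String[] args) {
    System.out.println(checkOrder("++", "+"));  // 1  : child sorts on one more column
    System.out.println(checkOrder("+", "++"));  // -1 : parent already sorts on more
    System.out.println(checkOrder("+", "-"));   // null: incompatible directions
  }
}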

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinResolver.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinResolver.java?rev=1465721&r1=1465720&r2=1465721&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinResolver.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinResolver.java Mon Apr  8 18:53:39 2013
@@ -256,7 +256,7 @@ public class CommonJoinResolver implemen
 
       // whether it contains common join op; if contains, return this common join op
       JoinOperator joinOp = getJoinOp(currTask);
-      if (joinOp == null) {
+      if (joinOp == null || joinOp.getConf().isFixedAsSorted()) {
         return null;
       }
       currTask.setTaskTag(Task.COMMON_JOIN);

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinProcFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinProcFactory.java?rev=1465721&r1=1465720&r2=1465721&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinProcFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinProcFactory.java Mon Apr  8 18:53:39 2013
@@ -52,6 +52,9 @@ public final class SkewJoinProcFactory {
         Object... nodeOutputs) throws SemanticException {
       SkewJoinProcCtx context = (SkewJoinProcCtx) ctx;
       JoinOperator op = (JoinOperator) nd;
+      if (op.getConf().isFixedAsSorted()) {
+        return null;
+      }
       ParseContext parseContext = context.getParseCtx();
       Task<? extends Serializable> currentTsk = context.getCurrentTask();
       GenMRSkewJoinProcessor.processSkewJoin(op, currentTsk, parseContext);

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java?rev=1465721&r1=1465720&r2=1465721&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java Mon Apr  8 18:53:39 2013
@@ -187,9 +187,10 @@ public class ExprNodeDescUtils {
     return result;
   }
 
-  private static ExprNodeDesc backtrack(ExprNodeDesc source, Operator<?> current,
+  public static ExprNodeDesc backtrack(ExprNodeDesc source, Operator<?> current,
       Operator<?> terminal) throws SemanticException {
-    if (current == null || current == terminal) {
+    Operator<?> parent = getSingleParent(current, terminal);
+    if (parent == null) {
       return source;
     }
     if (source instanceof ExprNodeGenericFuncDesc) {
@@ -200,7 +201,7 @@ public class ExprNodeDescUtils {
     }
     if (source instanceof ExprNodeColumnDesc) {
       ExprNodeColumnDesc column = (ExprNodeColumnDesc) source;
-      return backtrack(column, current, terminal);
+      return backtrack(column, parent, terminal);
     }
     if (source instanceof ExprNodeFieldDesc) {
      // field expression should be resolved
@@ -215,20 +216,19 @@ public class ExprNodeDescUtils {
   // Resolve column expression to input expression by using expression mapping in current operator
   private static ExprNodeDesc backtrack(ExprNodeColumnDesc column, Operator<?> current,
       Operator<?> terminal) throws SemanticException {
-    if (current == null || current == terminal) {
-      return column;
-    }
-    Operator<?> parent = getSingleParent(current, terminal);
     Map<String, ExprNodeDesc> mapping = current.getColumnExprMap();
     if (mapping == null || !mapping.containsKey(column.getColumn())) {
-      return backtrack(column, parent, terminal);  // forward
+      return backtrack((ExprNodeDesc)column, current, terminal);
     }
     ExprNodeDesc mapped = mapping.get(column.getColumn());
-    return backtrack(mapped, parent, terminal);    // forward with resolved expr
+    return backtrack(mapped, current, terminal);
   }
 
   public static Operator<?> getSingleParent(Operator<?> current, Operator<?> terminal)
       throws SemanticException {
+    if (current == terminal) {
+      return null;
+    }
     List<Operator<?>> parents = current.getParentOperators();
     if (parents == null || parents.isEmpty()) {
       if (terminal != null) {
@@ -236,9 +236,12 @@ public class ExprNodeDescUtils {
       }
       return null;
     }
-    if (current.getParentOperators().size() > 1) {
-      throw new SemanticException("Met multiple parent operators");
+    if (parents.size() == 1) {
+      return parents.get(0);
+    }
+    if (terminal != null && parents.contains(terminal)) {
+      return terminal;
     }
-    return parents.get(0);
+    throw new SemanticException("Met multiple parent operators");
   }
 }
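
The ExprNodeDescUtils hunk above makes backtrack() public and reworks the recursion: the terminal check now lives in getSingleParent() (which returns null once the terminal is reached and also accepts a multi-parent operator when the terminal is among its parents), and a column is resolved through the current operator's column-expression map before the walk moves on to that parent. A rough, self-contained illustration of the backtracking idea follows; it uses plain strings in place of ExprNodeDesc and invents all names, so it is a sketch of the technique, not Hive's implementation.

    import java.util.HashMap;
    import java.util.Map;

    // Stand-in for an operator: a name, a single parent and a column -> input-expression map.
    class Node {
      final String name;
      final Node parent;
      final Map<String, String> columnExprMap;
      Node(String name, Node parent, Map<String, String> columnExprMap) {
        this.name = name;
        this.parent = parent;
        this.columnExprMap = columnExprMap;
      }
    }

    public class BacktrackSketch {
      // Resolve the expression seen at 'current' to what it was called at 'terminal':
      // stop at the terminal, otherwise apply the current node's mapping and recurse
      // on its parent, mirroring the patched control flow above.
      static String backtrack(String expr, Node current, Node terminal) {
        if (current == null || current == terminal) {
          return expr;
        }
        String resolved = (current.columnExprMap == null)
            ? expr
            : current.columnExprMap.getOrDefault(expr, expr);
        return backtrack(resolved, current.parent, terminal);
      }

      public static void main(String[] args) {
        Node ts = new Node("TS", null, null);                 // table scan, used as the terminal
        Map<String, String> selMap = new HashMap<>();
        selMap.put("_col0", "key");                           // SEL renamed key to _col0
        Node sel = new Node("SEL", ts, selMap);
        Map<String, String> rsMap = new HashMap<>();
        rsMap.put("KEY._col0", "_col0");                      // RS re-tagged it as KEY._col0
        Node rs = new Node("RS", sel, rsMap);
        System.out.println(backtrack("KEY._col0", rs, ts));   // prints: key
      }
    }

Walking the mapping chain this way is what lets the dedup pass decide whether the child ReduceSink's keys are really the same expressions as the parent's.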

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java?rev=1465721&r1=1465720&r2=1465721&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java Mon Apr  8 18:53:39 2013
@@ -81,6 +81,10 @@ public class JoinDesc extends AbstractOp
   protected Byte[] tagOrder;
   private TableDesc keyTableDesc;
 
+  // this operator cannot be converted to a map join because its output is expected to be sorted on the join key
+  // it results from the RS-dedup optimization, which removes the following RS under certain conditions
+  private boolean fixedAsSorted;
+
   public JoinDesc() {
   }
 
@@ -525,4 +529,12 @@ public class JoinDesc extends AbstractOp
     }
     return result;
   }
+
+  public boolean isFixedAsSorted() {
+    return fixedAsSorted;
+  }
+
+  public void setFixedAsSorted(boolean fixedAsSorted) {
+    this.fixedAsSorted = fixedAsSorted;
+  }
 }
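
The JoinDesc hunk introduces the flag that the CommonJoinResolver and SkewJoinProcFactory hunks above consult: once RS dedup has removed the ReduceSink that used to follow a join, the join's output must stay sorted on the join key, so the common-join-to-map-join conversion and the skew-join rewrite both have to leave that join alone. A minimal sketch of that guard pattern, with names invented here rather than taken from Hive's resolvers:

    // Illustrative only: the one-way latch that the physical optimizers check.
    public class FixedAsSortedSketch {
      static class JoinDescLite {
        private boolean fixedAsSorted;   // set by RS dedup when it drops the RS that followed the join
        boolean isFixedAsSorted() { return fixedAsSorted; }
        void setFixedAsSorted(boolean fixedAsSorted) { this.fixedAsSorted = fixedAsSorted; }
      }

      // Any rewrite that would break the output ordering (map-join conversion,
      // skew-join handling, ...) bails out when the flag is set.
      static String resolve(JoinDescLite join) {
        if (join.isFixedAsSorted()) {
          return "skip: output must stay sorted on the join key";
        }
        return "eligible for map-join / skew-join rewrites";
      }

      public static void main(String[] args) {
        JoinDescLite join = new JoinDescLite();
        System.out.println(resolve(join));   // eligible for map-join / skew-join rewrites
        join.setFixedAsSorted(true);         // what RS dedup does after removing the following RS
        System.out.println(resolve(join));   // skip: output must stay sorted on the join key
      }
    }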

Modified: hive/trunk/ql/src/test/queries/clientpositive/auto_join26.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/auto_join26.q?rev=1465721&r1=1465720&r2=1465721&view=diff
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/auto_join26.q (original)
+++ hive/trunk/ql/src/test/queries/clientpositive/auto_join26.q Mon Apr  8 18:53:39 2013
@@ -1,10 +1,10 @@
-CREATE TABLE dest_j1(key INT, cnt INT);
-set hive.auto.convert.join = true;
-EXPLAIN
-INSERT OVERWRITE TABLE dest_j1 
-SELECT x.key, count(1) FROM src1 x JOIN src y ON (x.key = y.key) group by x.key;
-
-INSERT OVERWRITE TABLE dest_j1 
-SELECT  x.key, count(1) FROM src1 x JOIN src y ON (x.key = y.key) group by x.key;
-
-select * from dest_j1 x order by x.key;
+CREATE TABLE dest_j1(key INT, cnt INT);
+set hive.auto.convert.join = true;
+EXPLAIN
+INSERT OVERWRITE TABLE dest_j1 
+SELECT x.key, count(1) FROM src1 x JOIN src y ON (x.key = y.key) group by x.key;
+
+INSERT OVERWRITE TABLE dest_j1 
+SELECT  x.key, count(1) FROM src1 x JOIN src y ON (x.key = y.key) group by x.key;
+
+select * from dest_j1 x order by x.key;

Modified: hive/trunk/ql/src/test/queries/clientpositive/groupby_distinct_samekey.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/groupby_distinct_samekey.q?rev=1465721&r1=1465720&r2=1465721&view=diff
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/groupby_distinct_samekey.q (original)
+++ hive/trunk/ql/src/test/queries/clientpositive/groupby_distinct_samekey.q Mon Apr  8 18:53:39 2013
@@ -1,7 +1,10 @@
--- This test covers HIVE-2322
+-- This test covers HIVE-2332
 
 create table t1 (int1 int, int2 int, str1 string, str2 string);
 
+set hive.optimize.reducededuplication=false;
+-- disable RS-dedup to keep the intent of this test
+
 insert into table t1 select cast(key as int), cast(key as int), value, value from src where key < 6;
 explain select Q1.int1, sum(distinct Q1.int1) from (select * from t1 order by int1) Q1 group by Q1.int1;
 explain select int1, sum(distinct int1) from t1 group by int1;

Modified: hive/trunk/ql/src/test/queries/clientpositive/reduce_deduplicate.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/reduce_deduplicate.q?rev=1465721&r1=1465720&r2=1465721&view=diff
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/reduce_deduplicate.q (original)
+++ hive/trunk/ql/src/test/queries/clientpositive/reduce_deduplicate.q Mon Apr  8 18:53:39 2013
@@ -2,7 +2,8 @@ set hive.input.format=org.apache.hadoop.
 set hive.enforce.bucketing = true;
 set hive.exec.reducers.max = 1;
 set hive.exec.script.trust = true;
-
+set hive.optimize.reducededuplication = true;
+set hive.optimize.reducededuplication.min.reducer = 1;
 
 
 CREATE TABLE bucket5_1(key string, value string) CLUSTERED BY (key) INTO 2 BUCKETS;

Added: hive/trunk/ql/src/test/queries/clientpositive/reduce_deduplicate_extended.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/reduce_deduplicate_extended.q?rev=1465721&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/reduce_deduplicate_extended.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/reduce_deduplicate_extended.q Mon Apr  8 18:53:39 2013
@@ -0,0 +1,37 @@
+set hive.optimize.reducededuplication=true;
+set hive.optimize.reducededuplication.min.reducer=1;
+set hive.map.aggr=true;
+
+explain select key, sum(key) from (select * from src distribute by key sort by key, value) Q1 group by key;
+explain select key, sum(key), lower(value) from (select * from src order by key) Q1 group by key, lower(value);
+explain select key, sum(key), (X + 1) from (select key, (value + 1) as X from src order by key) Q1 group by key, (X + 1);
+explain select key, sum(key) as value from src group by key order by key, value;
+explain select src.key, sum(src.key) FROM src JOIN src1 ON src.key = src1.key group by src.key, src.value;
+explain select src.key, src.value FROM src JOIN src1 ON src.key = src1.key order by src.key, src.value;
+explain from (select key, value from src group by key, value) s select s.key group by s.key;
+
+select key, sum(key) from (select * from src distribute by key sort by key, value) Q1 group by key;
+select key, sum(key), lower(value) from (select * from src order by key) Q1 group by key, lower(value);
+select key, sum(key), (X + 1) from (select key, (value + 1) as X from src order by key) Q1 group by key, (X + 1);
+select key, sum(key) as value from src group by key order by key, value;
+select src.key, sum(src.key) FROM src JOIN src1 ON src.key = src1.key group by src.key, src.value;
+select src.key, src.value FROM src JOIN src1 ON src.key = src1.key order by src.key, src.value;
+from (select key, value from src group by key, value) s select s.key group by s.key;
+
+set hive.map.aggr=false;
+
+explain select key, sum(key) from (select * from src distribute by key sort by key, value) Q1 group by key;
+explain select key, sum(key), lower(value) from (select * from src order by key) Q1 group by key, lower(value);
+explain select key, sum(key), (X + 1) from (select key, (value + 1) as X from src order by key) Q1 group by key, (X + 1);
+explain select key, sum(key) as value from src group by key order by key, value;
+explain select src.key, sum(src.key) FROM src JOIN src1 ON src.key = src1.key group by src.key, src.value;
+explain select src.key, src.value FROM src JOIN src1 ON src.key = src1.key order by src.key, src.value;
+explain from (select key, value from src group by key, value) s select s.key group by s.key;
+
+select key, sum(key) from (select * from src distribute by key sort by key, value) Q1 group by key;
+select key, sum(key), lower(value) from (select * from src order by key) Q1 group by key, lower(value);
+select key, sum(key), (X + 1) from (select key, (value + 1) as X from src order by key) Q1 group by key, (X + 1);
+select key, sum(key) as value from src group by key order by key, value;
+select src.key, sum(src.key) FROM src JOIN src1 ON src.key = src1.key group by src.key, src.value;
+select src.key, src.value FROM src JOIN src1 ON src.key = src1.key order by src.key, src.value;
+from (select key, value from src group by key, value) s select s.key group by s.key;
\ No newline at end of file

Modified: hive/trunk/ql/src/test/results/clientpositive/groupby2.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/groupby2.q.out?rev=1465721&r1=1465720&r2=1465721&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/groupby2.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/groupby2.q.out Mon Apr  8 18:53:39 2013
@@ -16,9 +16,8 @@ ABSTRACT SYNTAX TREE:
 
 STAGE DEPENDENCIES:
   Stage-1 is a root stage
-  Stage-2 depends on stages: Stage-1
-  Stage-0 depends on stages: Stage-2
-  Stage-3 depends on stages: Stage-0
+  Stage-0 depends on stages: Stage-1
+  Stage-2 depends on stages: Stage-0
 
 STAGE PLANS:
   Stage: Stage-1
@@ -56,43 +55,7 @@ STAGE PLANS:
           keys:
                 expr: KEY._col0
                 type: string
-          mode: partial1
-          outputColumnNames: _col0, _col1, _col2
-          File Output Operator
-            compressed: false
-            GlobalTableId: 0
-            table:
-                input format: org.apache.hadoop.mapred.SequenceFileInputFormat
-                output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
-
-  Stage: Stage-2
-    Map Reduce
-      Alias -> Map Operator Tree:
-#### A masked pattern was here ####
-            Reduce Output Operator
-              key expressions:
-                    expr: _col0
-                    type: string
-              sort order: +
-              Map-reduce partition columns:
-                    expr: _col0
-                    type: string
-              tag: -1
-              value expressions:
-                    expr: _col1
-                    type: bigint
-                    expr: _col2
-                    type: double
-      Reduce Operator Tree:
-        Group By Operator
-          aggregations:
-                expr: count(VALUE._col0)
-                expr: sum(VALUE._col1)
-          bucketGroup: false
-          keys:
-                expr: KEY._col0
-                type: string
-          mode: final
+          mode: complete
           outputColumnNames: _col0, _col1, _col2
           Select Operator
             expressions:
@@ -122,7 +85,7 @@ STAGE PLANS:
               serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
               name: default.dest_g2
 
-  Stage: Stage-3
+  Stage: Stage-2
     Stats-Aggr Operator
 
 

Modified: hive/trunk/ql/src/test/results/clientpositive/groupby2_map_skew.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/groupby2_map_skew.q.out?rev=1465721&r1=1465720&r2=1465721&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/groupby2_map_skew.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/groupby2_map_skew.q.out Mon Apr  8 18:53:39 2013
@@ -16,9 +16,8 @@ ABSTRACT SYNTAX TREE:
 
 STAGE DEPENDENCIES:
   Stage-1 is a root stage
-  Stage-2 depends on stages: Stage-1
-  Stage-0 depends on stages: Stage-2
-  Stage-3 depends on stages: Stage-0
+  Stage-0 depends on stages: Stage-1
+  Stage-2 depends on stages: Stage-0
 
 STAGE PLANS:
   Stage: Stage-1
@@ -73,43 +72,7 @@ STAGE PLANS:
           keys:
                 expr: KEY._col0
                 type: string
-          mode: partials
-          outputColumnNames: _col0, _col1, _col2
-          File Output Operator
-            compressed: false
-            GlobalTableId: 0
-            table:
-                input format: org.apache.hadoop.mapred.SequenceFileInputFormat
-                output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
-
-  Stage: Stage-2
-    Map Reduce
-      Alias -> Map Operator Tree:
-#### A masked pattern was here ####
-            Reduce Output Operator
-              key expressions:
-                    expr: _col0
-                    type: string
-              sort order: +
-              Map-reduce partition columns:
-                    expr: _col0
-                    type: string
-              tag: -1
-              value expressions:
-                    expr: _col1
-                    type: bigint
-                    expr: _col2
-                    type: double
-      Reduce Operator Tree:
-        Group By Operator
-          aggregations:
-                expr: count(VALUE._col0)
-                expr: sum(VALUE._col1)
-          bucketGroup: false
-          keys:
-                expr: KEY._col0
-                type: string
-          mode: final
+          mode: complete
           outputColumnNames: _col0, _col1, _col2
           Select Operator
             expressions:
@@ -139,7 +102,7 @@ STAGE PLANS:
               serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
               name: default.dest1
 
-  Stage: Stage-3
+  Stage: Stage-2
     Stats-Aggr Operator
 
 

Modified: hive/trunk/ql/src/test/results/clientpositive/groupby_cube1.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/groupby_cube1.q.out?rev=1465721&r1=1465720&r2=1465721&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/groupby_cube1.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/groupby_cube1.q.out Mon Apr  8 18:53:39 2013
@@ -411,7 +411,6 @@ ABSTRACT SYNTAX TREE:
 
 STAGE DEPENDENCIES:
   Stage-1 is a root stage
-  Stage-2 depends on stages: Stage-1
   Stage-0 is a root stage
 
 STAGE PLANS:
@@ -471,44 +470,7 @@ STAGE PLANS:
                 type: string
                 expr: KEY._col1
                 type: string
-          mode: partials
-          outputColumnNames: _col0, _col1, _col2
-          File Output Operator
-            compressed: false
-            GlobalTableId: 0
-            table:
-                input format: org.apache.hadoop.mapred.SequenceFileInputFormat
-                output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
-
-  Stage: Stage-2
-    Map Reduce
-      Alias -> Map Operator Tree:
-#### A masked pattern was here ####
-            Reduce Output Operator
-              key expressions:
-                    expr: _col0
-                    type: string
-                    expr: _col1
-                    type: string
-              sort order: ++
-              Map-reduce partition columns:
-                    expr: _col0
-                    type: string
-              tag: -1
-              value expressions:
-                    expr: _col2
-                    type: bigint
-      Reduce Operator Tree:
-        Group By Operator
-          aggregations:
-                expr: count(VALUE._col0)
-          bucketGroup: false
-          keys:
-                expr: KEY._col0
-                type: string
-                expr: KEY._col1
-                type: string
-          mode: final
+          mode: complete
           outputColumnNames: _col0, _col1, _col2
           Select Operator
             expressions:

Modified: hive/trunk/ql/src/test/results/clientpositive/groupby_distinct_samekey.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/groupby_distinct_samekey.q.out?rev=1465721&r1=1465720&r2=1465721&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/groupby_distinct_samekey.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/groupby_distinct_samekey.q.out Mon Apr  8 18:53:39 2013
@@ -1,17 +1,21 @@
-PREHOOK: query: -- This test covers HIVE-2322
+PREHOOK: query: -- This test covers HIVE-2332
 
 create table t1 (int1 int, int2 int, str1 string, str2 string)
 PREHOOK: type: CREATETABLE
-POSTHOOK: query: -- This test covers HIVE-2322
+POSTHOOK: query: -- This test covers HIVE-2332
 
 create table t1 (int1 int, int2 int, str1 string, str2 string)
 POSTHOOK: type: CREATETABLE
 POSTHOOK: Output: default@t1
-PREHOOK: query: insert into table t1 select cast(key as int), cast(key as int), value, value from src where key < 6
+PREHOOK: query: -- disable RS-dedup to keep the intent of this test
+
+insert into table t1 select cast(key as int), cast(key as int), value, value from src where key < 6
 PREHOOK: type: QUERY
 PREHOOK: Input: default@src
 PREHOOK: Output: default@t1
-POSTHOOK: query: insert into table t1 select cast(key as int), cast(key as int), value, value from src where key < 6
+POSTHOOK: query: -- disable RS-dedup to keep the intent of this test
+
+insert into table t1 select cast(key as int), cast(key as int), value, value from src where key < 6
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@src
 POSTHOOK: Output: default@t1

Modified: hive/trunk/ql/src/test/results/clientpositive/groupby_rollup1.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/groupby_rollup1.q.out?rev=1465721&r1=1465720&r2=1465721&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/groupby_rollup1.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/groupby_rollup1.q.out Mon Apr  8 18:53:39 2013
@@ -399,7 +399,6 @@ ABSTRACT SYNTAX TREE:
 
 STAGE DEPENDENCIES:
   Stage-1 is a root stage
-  Stage-2 depends on stages: Stage-1
   Stage-0 is a root stage
 
 STAGE PLANS:
@@ -459,44 +458,7 @@ STAGE PLANS:
                 type: string
                 expr: KEY._col1
                 type: string
-          mode: partials
-          outputColumnNames: _col0, _col1, _col2
-          File Output Operator
-            compressed: false
-            GlobalTableId: 0
-            table:
-                input format: org.apache.hadoop.mapred.SequenceFileInputFormat
-                output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
-
-  Stage: Stage-2
-    Map Reduce
-      Alias -> Map Operator Tree:
-#### A masked pattern was here ####
-            Reduce Output Operator
-              key expressions:
-                    expr: _col0
-                    type: string
-                    expr: _col1
-                    type: string
-              sort order: ++
-              Map-reduce partition columns:
-                    expr: _col0
-                    type: string
-              tag: -1
-              value expressions:
-                    expr: _col2
-                    type: bigint
-      Reduce Operator Tree:
-        Group By Operator
-          aggregations:
-                expr: count(VALUE._col0)
-          bucketGroup: false
-          keys:
-                expr: KEY._col0
-                type: string
-                expr: KEY._col1
-                type: string
-          mode: final
+          mode: complete
           outputColumnNames: _col0, _col1, _col2
           Select Operator
             expressions:


