Subject: svn commit: r1465721 [1/4] - in /hive/trunk: common/src/java/org/apache/hadoop/hive/conf/ conf/ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/ ql/src/java/org/apache/hadoop/hive/ql/plan/ ql/sr...
Date: Mon, 08 Apr 2013 18:53:40 -0000
To: commits@hive.apache.org
From: hashutosh@apache.org

Author: hashutosh
Date: Mon Apr 8 18:53:39 2013
New Revision: 1465721

URL: http://svn.apache.org/r1465721
Log:
HIVE-2340 : optimize orderby followed by a groupby (Navis via Ashutosh Chauhan)

Added:
    hive/trunk/ql/src/test/queries/clientpositive/reduce_deduplicate_extended.q
    hive/trunk/ql/src/test/results/clientpositive/reduce_deduplicate_extended.q.out
Modified:
    hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hive/trunk/conf/hive-default.xml.template
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/NonBlockingOpDeDupProc.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkDeDuplication.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinResolver.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinProcFactory.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java
    hive/trunk/ql/src/test/queries/clientpositive/auto_join26.q
    hive/trunk/ql/src/test/queries/clientpositive/groupby_distinct_samekey.q
    hive/trunk/ql/src/test/queries/clientpositive/reduce_deduplicate.q
    hive/trunk/ql/src/test/results/clientpositive/groupby2.q.out
    hive/trunk/ql/src/test/results/clientpositive/groupby2_map_skew.q.out
    hive/trunk/ql/src/test/results/clientpositive/groupby_cube1.q.out
    hive/trunk/ql/src/test/results/clientpositive/groupby_distinct_samekey.q.out
    hive/trunk/ql/src/test/results/clientpositive/groupby_rollup1.q.out
    hive/trunk/ql/src/test/results/clientpositive/infer_bucket_sort.q.out
    hive/trunk/ql/src/test/results/clientpositive/ppd2.q.out
    hive/trunk/ql/src/test/results/compiler/plan/join1.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/join2.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/join3.q.xml

Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1465721&r1=1465720&r2=1465721&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Mon Apr 8 18:53:39 2013
@@ -544,6 +544,7 @@ public class HiveConf extends Configurat
     HIVEOPTBUCKETMAPJOIN("hive.optimize.bucketmapjoin", false), // optimize bucket map join
     HIVEOPTSORTMERGEBUCKETMAPJOIN("hive.optimize.bucketmapjoin.sortedmerge", false), // try to use sorted merge bucket map join
     HIVEOPTREDUCEDEDUPLICATION("hive.optimize.reducededuplication", true),
+    HIVEOPTREDUCEDEDUPLICATIONMINREDUCER("hive.optimize.reducededuplication.min.reducer", 4),
     // whether to optimize union followed by select followed by filesink
     // It creates sub-directories in the final output, so should not be turned on in systems
     // where MAPREDUCE-1501 is not present

Modified: hive/trunk/conf/hive-default.xml.template
URL: http://svn.apache.org/viewvc/hive/trunk/conf/hive-default.xml.template?rev=1465721&r1=1465720&r2=1465721&view=diff
==============================================================================
--- hive/trunk/conf/hive-default.xml.template (original)
+++ hive/trunk/conf/hive-default.xml.template Mon Apr 8 18:53:39 2013
@@ -1077,6 +1077,14 @@
+<property>
+  <name>hive.optimize.reducededuplication.min.reducer</name>
+  <value>4</value>
+  <description>Reduce deduplication merges two RSs by moving key/parts/reducer-num of the child RS to parent RS.
+  That means if reducer-num of the child RS is fixed (order by or forced bucketing) and small, it can make very slow, single MR.
+  The optimization will be disabled if number of reducers is less than specified value.
+  </description>
+</property>
+
 <property>
   <name>hive.exec.dynamic.partition</name>
   <value>true</value>
   <description>Whether or not to allow dynamic partitions in DML/DDL.</description>
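The new hive.optimize.reducededuplication.min.reducer setting guards the merge of two ReduceSink operators: if the child ReduceSink's reducer count is fixed (for example by a final ORDER BY) and falls below the threshold, the two ReduceSinks are left separate so the whole query does not collapse into one slow single-reducer stage. A minimal sketch of that guard follows; it assumes only HiveConf.ConfVars.HIVEOPTREDUCEDEDUPLICATIONMINREDUCER, HiveConf.getIntVar() and ReduceSinkDesc.getNumReducers() from this patch and the existing Hive API, and the class and method names below are illustrative, not the committed code:

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;

    // Illustrative helper, not part of the patch.
    class DedupThresholdSketch {
      // Returns true when merging the child ReduceSink into its parent is acceptable.
      // A reducer count of -1 means "not fixed", so the optimizer is free to merge;
      // a fixed count below the configured minimum would force the merged stage to
      // run with too few reducers, so the merge is skipped.
      static boolean mayMerge(HiveConf conf, ReduceSinkDesc childRS) {
        int minReducer = conf.getIntVar(HiveConf.ConfVars.HIVEOPTREDUCEDEDUPLICATIONMINREDUCER);
        int childReducers = childRS.getNumReducers();
        return childReducers < 0 || childReducers >= minReducer;
      }
    }

With the default of 4, a query that ends in ORDER BY (one reducer) keeps its separate group-by stage; the reduce_deduplicate_extended.q test added later in this commit sets the threshold to 1 precisely so that such plans do get merged.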
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java?rev=1465721&r1=1465720&r2=1465721&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java Mon Apr 8 18:53:39 2013 @@ -749,71 +749,39 @@ public final class ColumnPrunerProcFacto ReduceSinkOperator reduce, ColumnPrunerProcCtx cppCtx) throws SemanticException { ReduceSinkDesc reduceConf = reduce.getConf(); Map oldMap = reduce.getColumnExprMap(); - Map newMap = new HashMap(); - ArrayList sig = new ArrayList(); RowResolver oldRR = cppCtx.getOpToParseCtxMap().get(reduce).getRowResolver(); - RowResolver newRR = new RowResolver(); - ArrayList originalValueOutputColNames = reduceConf - .getOutputValueColumnNames(); - java.util.ArrayList originalValueEval = reduceConf - .getValueCols(); - ArrayList newOutputColNames = new ArrayList(); - java.util.ArrayList newValueEval = new ArrayList(); - // ReduceSinkOperators that precede GroupByOperators have the keys in the schema in addition - // to the values. These are not pruned. - List oldSchema = oldRR.getRowSchema().getSignature(); - for (ColumnInfo colInfo : oldSchema) { - if (colInfo.getInternalName().startsWith(Utilities.ReduceField.KEY.toString() + ".")) { - String[] nm = oldRR.reverseLookup(colInfo.getInternalName()); - newRR.put(nm[0], nm[1], colInfo); - sig.add(colInfo); - } else { - break; - } - } + ArrayList signature = oldRR.getRowSchema().getSignature(); + + List valueColNames = reduceConf.getOutputValueColumnNames(); + ArrayList newValueColNames = new ArrayList(); + + List valueExprs = reduceConf.getValueCols(); + ArrayList newValueExprs = new ArrayList(); + for (int i = 0; i < retainFlags.length; i++) { - if (retainFlags[i]) { - newValueEval.add(originalValueEval.get(i)); - String outputCol = originalValueOutputColNames.get(i); - newOutputColNames.add(outputCol); + String outputCol = valueColNames.get(i); + ExprNodeDesc outputColExpr = valueExprs.get(i); + if (!retainFlags[i]) { String[] nm = oldRR.reverseLookup(outputCol); if (nm == null) { outputCol = Utilities.ReduceField.VALUE.toString() + "." 
+ outputCol; nm = oldRR.reverseLookup(outputCol); } - newMap.put(outputCol, oldMap.get(outputCol)); - ColumnInfo colInfo = oldRR.get(nm[0], nm[1]); - newRR.put(nm[0], nm[1], colInfo); - sig.add(colInfo); - } - } - - ArrayList keyCols = reduceConf.getKeyCols(); - List keys = new ArrayList(); - RowResolver parResover = cppCtx.getOpToParseCtxMap().get( - reduce.getParentOperators().get(0)).getRowResolver(); - for (int i = 0; i < keyCols.size(); i++) { - keys = Utilities.mergeUniqElems(keys, keyCols.get(i).getCols()); - } - for (int i = 0; i < keys.size(); i++) { - String outputCol = keys.get(i); - String[] nm = parResover.reverseLookup(outputCol); - ColumnInfo colInfo = oldRR.get(nm[0], nm[1]); - if (colInfo != null) { - String internalName=colInfo.getInternalName(); - newMap.put(internalName, oldMap.get(internalName)); - newRR.put(nm[0], nm[1], colInfo); + ColumnInfo colInfo = oldRR.getFieldMap(nm[0]).remove(nm[1]); + oldRR.getInvRslvMap().remove(colInfo.getInternalName()); + oldMap.remove(outputCol); + signature.remove(colInfo); + } else { + newValueColNames.add(outputCol); + newValueExprs.add(outputColExpr); } } - cppCtx.getOpToParseCtxMap().get(reduce).setRowResolver(newRR); - reduce.setColumnExprMap(newMap); - reduce.getSchema().setSignature(sig); - reduceConf.setOutputValueColumnNames(newOutputColNames); - reduceConf.setValueCols(newValueEval); + reduceConf.setOutputValueColumnNames(newValueColNames); + reduceConf.setValueCols(newValueExprs); TableDesc newValueTable = PlanUtils.getReduceValueTableDesc(PlanUtils .getFieldSchemasFromColumnList(reduceConf.getValueCols(), - newOutputColNames, 0, "")); + newValueColNames, 0, "")); reduceConf.setValueSerializeInfo(newValueTable); } @@ -1042,4 +1010,4 @@ public final class ColumnPrunerProcFacto return new ColumnPrunerMapJoinProc(); } -} \ No newline at end of file +} Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java?rev=1465721&r1=1465720&r2=1465721&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java Mon Apr 8 18:53:39 2013 @@ -397,9 +397,8 @@ public class MapJoinProcessor implements byte srcTag = entry.getKey(); List filter = entry.getValue(); - Operator start = oldReduceSinkParentOps.get(srcTag); - Operator terminal = newParentOps.get(srcTag); - newFilters.put(srcTag, ExprNodeDescUtils.backtrack(filter, start, terminal)); + Operator terminal = oldReduceSinkParentOps.get(srcTag); + newFilters.put(srcTag, ExprNodeDescUtils.backtrack(filter, op, terminal)); } desc.setFilters(filters = newFilters); Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/NonBlockingOpDeDupProc.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/NonBlockingOpDeDupProc.java?rev=1465721&r1=1465720&r2=1465721&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/NonBlockingOpDeDupProc.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/NonBlockingOpDeDupProc.java Mon Apr 8 18:53:39 2013 @@ -27,7 +27,6 @@ import java.util.Set; import java.util.Stack; import org.apache.hadoop.hive.ql.exec.FilterOperator; -import 
org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.SelectOperator; import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker; import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher; @@ -84,8 +83,6 @@ public class NonBlockingOpDeDupProc impl // For SEL-SEL(compute) case, move column exprs/names of child to parent. if (!cSEL.getConf().isSelStarNoCompute()) { - Operator terminal = ExprNodeDescUtils.getSingleParent(pSEL, null); - Set funcOutputs = getFunctionOutputs( pSEL.getConf().getOutputColumnNames(), pSEL.getConf().getColList()); @@ -93,7 +90,7 @@ public class NonBlockingOpDeDupProc impl if (!funcOutputs.isEmpty() && !checkReferences(sources, funcOutputs)) { return null; } - pSEL.getConf().setColList(ExprNodeDescUtils.backtrack(sources, pSEL, terminal)); + pSEL.getConf().setColList(ExprNodeDescUtils.backtrack(sources, cSEL, pSEL)); pSEL.getConf().setOutputColumnNames(cSEL.getConf().getOutputColumnNames()); // updates schema only (this should be the last optimizer modifying operator tree) Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkDeDuplication.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkDeDuplication.java?rev=1465721&r1=1465720&r2=1465721&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkDeDuplication.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkDeDuplication.java Mon Apr 8 18:53:39 2013 @@ -18,15 +18,17 @@ package org.apache.hadoop.hive.ql.optimizer; +import java.io.Serializable; +import java.lang.reflect.Array; import java.util.ArrayList; import java.util.HashMap; -import java.util.Iterator; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.Stack; -import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.ColumnInfo; import org.apache.hadoop.hive.ql.exec.ExtractOperator; import org.apache.hadoop.hive.ql.exec.FilterOperator; @@ -52,32 +54,54 @@ import org.apache.hadoop.hive.ql.parse.O import org.apache.hadoop.hive.ql.parse.ParseContext; import org.apache.hadoop.hive.ql.parse.RowResolver; import org.apache.hadoop.hive.ql.parse.SemanticException; -import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; +import org.apache.hadoop.hive.ql.plan.AggregationDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; -import org.apache.hadoop.hive.ql.plan.OperatorDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils; +import org.apache.hadoop.hive.ql.plan.GroupByDesc; +import org.apache.hadoop.hive.ql.plan.JoinCondDesc; +import org.apache.hadoop.hive.ql.plan.JoinDesc; import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc; import org.apache.hadoop.hive.ql.plan.SelectDesc; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator; + +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVECONVERTJOIN; +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASK; +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVEOPTREDUCEDEDUPLICATIONMINREDUCER; +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVESCRIPTOPERATORTRUST; /** - * If two reducer sink operators share the same partition/sort columns, we - * should merge them. 
This should happen after map join optimization because map + * If two reducer sink operators share the same partition/sort columns and order, + * they can be merged. This should happen after map join optimization because map * join optimization will remove reduce sink operators. */ public class ReduceSinkDeDuplication implements Transform{ + private static final String RS = ReduceSinkOperator.getOperatorName(); + private static final String GBY = GroupByOperator.getOperatorName(); + private static final String JOIN = JoinOperator.getOperatorName(); + protected ParseContext pGraphContext; @Override public ParseContext transform(ParseContext pctx) throws SemanticException { pGraphContext = pctx; - // generate pruned column list for all relevant operators + // generate pruned column list for all relevant operators ReduceSinkDeduplicateProcCtx cppCtx = new ReduceSinkDeduplicateProcCtx(pGraphContext); + boolean mergeJoins = !pctx.getConf().getBoolVar(HIVECONVERTJOIN) && + !pctx.getConf().getBoolVar(HIVECONVERTJOINNOCONDITIONALTASK); + Map opRules = new LinkedHashMap(); - opRules.put(new RuleRegExp("R1", - ReduceSinkOperator.getOperatorName() + "%.*" + ReduceSinkOperator.getOperatorName() + "%"), - ReduceSinkDeduplicateProcFactory.getReducerReducerProc()); + opRules.put(new RuleRegExp("R1", RS + "%.*%" + RS + "%"), + ReduceSinkDeduplicateProcFactory.getReducerReducerProc()); + opRules.put(new RuleRegExp("R2", RS + "%" + GBY + "%.*%" + RS + "%"), + ReduceSinkDeduplicateProcFactory.getGroupbyReducerProc()); + if (mergeJoins) { + opRules.put(new RuleRegExp("R3", JOIN + "%.*%" + RS + "%"), + ReduceSinkDeduplicateProcFactory.getJoinReducerProc()); + } + // TODO RS+JOIN // The dispatcher fires the processor corresponding to the closest matching // rule and passes the context along @@ -92,23 +116,27 @@ public class ReduceSinkDeDuplication imp return pGraphContext; } - class ReduceSinkDeduplicateProcCtx implements NodeProcessorCtx{ + class ReduceSinkDeduplicateProcCtx implements NodeProcessorCtx { + ParseContext pctx; - List rejectedRSList; + boolean trustScript; + // min reducer num for merged RS (to avoid query contains "order by" executed by one reducer) + int minReducer; + Set> removedOps; public ReduceSinkDeduplicateProcCtx(ParseContext pctx) { - rejectedRSList = new ArrayList(); + removedOps = new HashSet>(); + trustScript = pctx.getConf().getBoolVar(HIVESCRIPTOPERATORTRUST); + minReducer = pctx.getConf().getIntVar(HIVEOPTREDUCEDEDUPLICATIONMINREDUCER); this.pctx = pctx; } - public boolean contains (ReduceSinkOperator rsOp) { - return rejectedRSList.contains(rsOp); + public boolean contains(Operator rsOp) { + return removedOps.contains(rsOp); } - public void addRejectedReduceSinkOperator(ReduceSinkOperator rsOp) { - if (!rejectedRSList.contains(rsOp)) { - rejectedRSList.add(rsOp); - } + public boolean addRemovedOperator(Operator rsOp) { + return removedOps.add(rsOp); } public ParseContext getPctx() { @@ -120,355 +148,598 @@ public class ReduceSinkDeDuplication imp } } - static class ReduceSinkDeduplicateProcFactory { - public static NodeProcessor getReducerReducerProc() { return new ReducerReducerProc(); } + public static NodeProcessor getGroupbyReducerProc() { + return new GroupbyReducerProc(); + } + + public static NodeProcessor getJoinReducerProc() { + return new JoinReducerProc(); + } + public static NodeProcessor getDefaultProc() { return new DefaultProc(); } + } - /* - * do nothing. 
- */ - static class DefaultProc implements NodeProcessor { - @Override - public Object process(Node nd, Stack stack, - NodeProcessorCtx procCtx, Object... nodeOutputs) - throws SemanticException { - return null; - } + /* + * do nothing. + */ + static class DefaultProc implements NodeProcessor { + @Override + public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, + Object... nodeOutputs) throws SemanticException { + return null; } + } - static class ReducerReducerProc implements NodeProcessor { - @Override - public Object process(Node nd, Stack stack, - NodeProcessorCtx procCtx, Object... nodeOutputs) - throws SemanticException { - ReduceSinkDeduplicateProcCtx ctx = (ReduceSinkDeduplicateProcCtx) procCtx; - ReduceSinkOperator childReduceSink = (ReduceSinkOperator)nd; + public abstract static class AbsctractReducerReducerProc implements NodeProcessor { - if(ctx.contains(childReduceSink)) { - return null; - } + ReduceSinkDeduplicateProcCtx dedupCtx; - List> childOp = - childReduceSink.getChildOperators(); - if (childOp != null && childOp.size() == 1) { - Operator child = childOp.get(0); - if (child instanceof GroupByOperator || child instanceof JoinOperator) { - ctx.addRejectedReduceSinkOperator(childReduceSink); - return null; - } - } + protected boolean trustScript() { + return dedupCtx.trustScript; + } - ParseContext pGraphContext = ctx.getPctx(); - HashMap childColumnMapping = - getPartitionAndKeyColumnMapping(childReduceSink); - ReduceSinkOperator parentRS = null; - parentRS = findSingleParentReduceSink(childReduceSink, pGraphContext); - if (parentRS == null) { - ctx.addRejectedReduceSinkOperator(childReduceSink); - return null; - } - HashMap parentColumnMapping = getPartitionAndKeyColumnMapping(parentRS); - Operator stopBacktrackFlagOp = null; - if (parentRS.getParentOperators() == null - || parentRS.getParentOperators().size() == 0) { - stopBacktrackFlagOp = parentRS; - } else if (parentRS.getParentOperators().size() != 1) { - return null; - } else { - stopBacktrackFlagOp = parentRS.getParentOperators().get(0); - } + protected int minReducer() { + return dedupCtx.minReducer; + } - boolean succeed = backTrackColumnNames(childColumnMapping, childReduceSink, stopBacktrackFlagOp, pGraphContext); - if (!succeed) { - return null; - } - succeed = backTrackColumnNames(parentColumnMapping, parentRS, stopBacktrackFlagOp, pGraphContext); - if (!succeed) { - return null; + public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, + Object... 
nodeOutputs) throws SemanticException { + dedupCtx = (ReduceSinkDeduplicateProcCtx) procCtx; + if (dedupCtx.contains((Operator) nd)) { + return false; + } + ReduceSinkOperator cRS = (ReduceSinkOperator) nd; + Operator child = getSingleChild(cRS); + if (child instanceof JoinOperator) { + return false; // not supported + } + ParseContext pctx = dedupCtx.getPctx(); + if (child instanceof GroupByOperator) { + GroupByOperator cGBY = (GroupByOperator) child; + if (!hasGroupingSet(cRS) && !cGBY.getConf().isGroupingSetsPresent()) { + return process(cRS, cGBY, pctx); } + return false; + } + if (child instanceof ExtractOperator) { + return process(cRS, pctx); + } + return false; + } - boolean same = compareReduceSink(childReduceSink, parentRS, childColumnMapping, parentColumnMapping); - if (!same) { - return null; - } - replaceReduceSinkWithSelectOperator(childReduceSink, pGraphContext); - return null; + private boolean hasGroupingSet(ReduceSinkOperator cRS) { + GroupByOperator cGBYm = getSingleParent(cRS, GroupByOperator.class); + if (cGBYm != null && cGBYm.getConf().isGroupingSetsPresent()) { + return true; + } + return false; + } + + protected Operator getSingleParent(Operator operator) { + List> parents = operator.getParentOperators(); + if (parents != null && parents.size() == 1) { + return parents.get(0); + } + return null; + } + + protected Operator getSingleChild(Operator operator) { + List> children = operator.getChildOperators(); + if (children != null && children.size() == 1) { + return children.get(0); } + return null; + } - private void replaceReduceSinkWithSelectOperator( - ReduceSinkOperator childReduceSink, ParseContext pGraphContext) throws SemanticException { - List> parentOp = - childReduceSink.getParentOperators(); - List> childOp = - childReduceSink.getChildOperators(); + protected T getSingleParent(Operator operator, Class type) { + Operator parent = getSingleParent(operator); + return type.isInstance(parent) ? (T)parent : null; + } - Operator oldParent = childReduceSink; + protected abstract Object process(ReduceSinkOperator cRS, ParseContext context) + throws SemanticException; - if (childOp != null && childOp.size() == 1 - && ((childOp.get(0)) instanceof ExtractOperator)) { - oldParent = childOp.get(0); - childOp = childOp.get(0).getChildOperators(); - } + protected abstract Object process(ReduceSinkOperator cRS, GroupByOperator cGBY, + ParseContext context) throws SemanticException; - Operator input = parentOp.get(0); - input.getChildOperators().clear(); + protected Operator getStartForGroupBy(ReduceSinkOperator cRS) { + Operator parent = getSingleParent(cRS); + return parent instanceof GroupByOperator ? 
parent : cRS; // skip map-aggr GBY + } - RowResolver inputRR = pGraphContext.getOpParseCtx().get(input).getRowResolver(); + // for JOIN-RS case, it's not possible generally to merge if child has + // more key/partition columns than parents + protected boolean merge(ReduceSinkOperator cRS, JoinOperator pJoin, int minReducer) + throws SemanticException { + List> parents = pJoin.getParentOperators(); + ReduceSinkOperator[] pRSs = parents.toArray(new ReduceSinkOperator[parents.size()]); + ReduceSinkDesc cRSc = cRS.getConf(); + ReduceSinkDesc pRS0c = pRSs[0].getConf(); + if (cRSc.getKeyCols().size() > pRS0c.getKeyCols().size()) { + return false; + } + if (cRSc.getPartitionCols().size() > pRS0c.getPartitionCols().size()) { + return false; + } + Integer moveReducerNumTo = checkNumReducer(cRSc.getNumReducers(), pRS0c.getNumReducers()); + if (moveReducerNumTo == null || + moveReducerNumTo > 0 && cRSc.getNumReducers() < minReducer) { + return false; + } - ArrayList exprs = new ArrayList(); - ArrayList outputs = new ArrayList(); - List outputCols = childReduceSink.getConf().getOutputValueColumnNames(); - RowResolver outputRS = new RowResolver(); + Integer moveRSOrderTo = checkOrder(cRSc.getOrder(), pRS0c.getOrder()); + if (moveRSOrderTo == null) { + return false; + } - Map colExprMap = new HashMap(); + boolean[] sorted = getSortedTags(pJoin); - for (int i = 0; i < outputCols.size(); i++) { - String internalName = outputCols.get(i); - String[] nm = inputRR.reverseLookup(internalName); - ColumnInfo valueInfo = inputRR.get(nm[0], nm[1]); - ExprNodeDesc colDesc = childReduceSink.getConf().getValueCols().get(i); - exprs.add(colDesc); - outputs.add(internalName); - outputRS.put(nm[0], nm[1], new ColumnInfo(internalName, valueInfo - .getType(), nm[0], valueInfo.getIsVirtualCol(), valueInfo.isHiddenVirtualCol())); - colExprMap.put(internalName, colDesc); + int cKeySize = cRSc.getKeyCols().size(); + for (int i = 0; i < cKeySize; i++) { + ExprNodeDesc cexpr = cRSc.getKeyCols().get(i); + ExprNodeDesc[] pexprs = new ExprNodeDesc[pRSs.length]; + for (int tag = 0; tag < pRSs.length; tag++) { + pexprs[tag] = pRSs[tag].getConf().getKeyCols().get(i); } + int found = indexOf(cexpr, pexprs, cRS, pRSs, sorted); + if (found < 0) { + return false; + } + } + int cPartSize = cRSc.getPartitionCols().size(); + for (int i = 0; i < cPartSize; i++) { + ExprNodeDesc cexpr = cRSc.getPartitionCols().get(i); + ExprNodeDesc[] pexprs = new ExprNodeDesc[pRSs.length]; + for (int tag = 0; tag < pRSs.length; tag++) { + pexprs[tag] = pRSs[tag].getConf().getPartitionCols().get(i); + } + int found = indexOf(cexpr, pexprs, cRS, pRSs, sorted); + if (found < 0) { + return false; + } + } - SelectDesc select = new SelectDesc(exprs, outputs, false); + if (moveReducerNumTo > 0) { + for (ReduceSinkOperator pRS : pRSs) { + pRS.getConf().setNumReducers(cRS.getConf().getNumReducers()); + } + } + return true; + } - SelectOperator sel = (SelectOperator) putOpInsertMap( - OperatorFactory.getAndMakeChild(select, new RowSchema(inputRR - .getColumnInfos()), input), inputRR, pGraphContext); + private boolean[] getSortedTags(JoinOperator joinOp) { + boolean[] result = new boolean[joinOp.getParentOperators().size()]; + for (int tag = 0; tag < result.length; tag++) { + result[tag] = isSortedTag(joinOp, tag); + } + return result; + } - sel.setColumnExprMap(colExprMap); + private boolean isSortedTag(JoinOperator joinOp, int tag) { + for (JoinCondDesc cond : joinOp.getConf().getConds()) { + switch (cond.getType()) { + case JoinDesc.LEFT_OUTER_JOIN: + if 
(cond.getRight() == tag) { + return false; + } + continue; + case JoinDesc.RIGHT_OUTER_JOIN: + if (cond.getLeft() == tag) { + return false; + } + continue; + case JoinDesc.FULL_OUTER_JOIN: + if (cond.getLeft() == tag || cond.getRight() == tag) { + return false; + } + } + } + return true; + } - // Insert the select operator in between. - sel.setChildOperators(childOp); - for (Operator ch : childOp) { - ch.replaceParent(oldParent, sel); + private int indexOf(ExprNodeDesc cexpr, ExprNodeDesc[] pexprs, Operator child, + Operator[] parents, boolean[] sorted) throws SemanticException { + for (int tag = 0; tag < parents.length; tag++) { + if (sorted[tag] && + pexprs[tag].isSame(ExprNodeDescUtils.backtrack(cexpr, child, parents[tag]))) { + return tag; } + } + return -1; + } + protected boolean merge(ReduceSinkOperator cRS, ReduceSinkOperator pRS, int minReducer) + throws SemanticException { + int[] result = checkStatus(cRS, pRS, minReducer); + if (result == null) { + return false; + } + if (result[0] > 0) { + ArrayList childKCs = cRS.getConf().getKeyCols(); + pRS.getConf().setKeyCols(ExprNodeDescUtils.backtrack(childKCs, cRS, pRS)); + } + if (result[1] > 0) { + ArrayList childPCs = cRS.getConf().getPartitionCols(); + pRS.getConf().setPartitionCols(ExprNodeDescUtils.backtrack(childPCs, cRS, pRS)); + } + if (result[2] > 0) { + pRS.getConf().setOrder(cRS.getConf().getOrder()); + } + if (result[3] > 0) { + pRS.getConf().setNumReducers(cRS.getConf().getNumReducers()); } + return true; + } - private Operator putOpInsertMap( - Operator op, RowResolver rr, ParseContext pGraphContext) { - OpParseContext ctx = new OpParseContext(rr); - pGraphContext.getOpParseCtx().put(op, ctx); - return op; + // -1 for p to c, 1 for c to p + private int[] checkStatus(ReduceSinkOperator cRS, ReduceSinkOperator pRS, int minReducer) + throws SemanticException { + ReduceSinkDesc cConf = cRS.getConf(); + ReduceSinkDesc pConf = pRS.getConf(); + Integer moveRSOrderTo = checkOrder(cConf.getOrder(), pConf.getOrder()); + if (moveRSOrderTo == null) { + return null; + } + Integer moveReducerNumTo = checkNumReducer(cConf.getNumReducers(), pConf.getNumReducers()); + if (moveReducerNumTo == null || + moveReducerNumTo > 0 && cConf.getNumReducers() < minReducer) { + return null; + } + List ckeys = cConf.getKeyCols(); + List pkeys = pConf.getKeyCols(); + Integer moveKeyColTo = checkExprs(ckeys, pkeys, cRS, pRS); + if (moveKeyColTo == null) { + return null; } + List cpars = cConf.getPartitionCols(); + List ppars = pConf.getPartitionCols(); + Integer movePartitionColTo = checkExprs(cpars, ppars, cRS, pRS); + if (movePartitionColTo == null) { + return null; + } + return new int[] {moveKeyColTo, movePartitionColTo, moveRSOrderTo, moveReducerNumTo}; + } - private boolean compareReduceSink(ReduceSinkOperator childReduceSink, - ReduceSinkOperator parentRS, - HashMap childColumnMapping, - HashMap parentColumnMapping) { + private Integer checkExprs(List ckeys, List pkeys, + ReduceSinkOperator cRS, ReduceSinkOperator pRS) throws SemanticException { + Integer moveKeyColTo = 0; + if (ckeys == null || ckeys.isEmpty()) { + if (pkeys != null && !pkeys.isEmpty()) { + moveKeyColTo = -1; + } + } else { + if (pkeys == null || pkeys.isEmpty()) { + for (ExprNodeDesc ckey : ckeys) { + if (ExprNodeDescUtils.backtrack(ckey, cRS, pRS) == null) { + return null; + } + } + moveKeyColTo = 1; + } else { + moveKeyColTo = sameKeys(ckeys, pkeys, cRS, pRS); + } + } + return moveKeyColTo; + } - ArrayList childPartitionCols = childReduceSink.getConf().getPartitionCols(); - 
ArrayList parentPartitionCols = parentRS.getConf().getPartitionCols(); + protected Integer sameKeys(List cexprs, List pexprs, + Operator child, Operator parent) throws SemanticException { + int common = Math.min(cexprs.size(), pexprs.size()); + int limit = Math.max(cexprs.size(), pexprs.size()); + int i = 0; + for (; i < common; i++) { + ExprNodeDesc pexpr = pexprs.get(i); + ExprNodeDesc cexpr = ExprNodeDescUtils.backtrack(cexprs.get(i), child, parent); + if (!pexpr.isSame(cexpr)) { + return null; + } + } + for (;i < limit; i++) { + if (cexprs.size() > pexprs.size()) { + if (ExprNodeDescUtils.backtrack(cexprs.get(i), child, parent) == null) { + return null; + } + } + } + return Integer.valueOf(cexprs.size()).compareTo(pexprs.size()); + } - boolean ret = compareExprNodes(childColumnMapping, parentColumnMapping, - childPartitionCols, parentPartitionCols); - if (!ret) { - return false; + protected Integer checkOrder(String corder, String porder) { + if (corder == null || corder.trim().equals("")) { + if (porder == null || porder.trim().equals("")) { + return 0; } + return -1; + } + if (porder == null || porder.trim().equals("")) { + return 1; + } + corder = corder.trim(); + porder = porder.trim(); + int target = Math.min(corder.length(), porder.length()); + if (!corder.substring(0, target).equals(porder.substring(0, target))) { + return null; + } + return Integer.valueOf(corder.length()).compareTo(porder.length()); + } - ArrayList childReduceKeyCols = childReduceSink.getConf().getKeyCols(); - ArrayList parentReduceKeyCols = parentRS.getConf().getKeyCols(); - ret = compareExprNodes(childColumnMapping, parentColumnMapping, - childReduceKeyCols, parentReduceKeyCols); - if (!ret) { - return false; + protected Integer checkNumReducer(int creduce, int preduce) { + if (creduce < 0) { + if (preduce < 0) { + return 0; } + return -1; + } + if (preduce < 0) { + return 1; + } + if (creduce != preduce) { + return null; + } + return 0; + } - String childRSOrder = childReduceSink.getConf().getOrder(); - String parentRSOrder = parentRS.getConf().getOrder(); - boolean moveChildRSOrderToParent = false; - //move child reduce sink's order to the parent reduce sink operator. - if (childRSOrder != null && !(childRSOrder.trim().equals(""))) { - if (parentRSOrder == null - || !childRSOrder.trim().equals(parentRSOrder.trim())) { - return false; - } - } else { - if(parentRSOrder == null || parentRSOrder.trim().equals("")) { - moveChildRSOrderToParent = true; - } + protected > T findPossibleParent(Operator start, Class target, + boolean trustScript) throws SemanticException { + T[] parents = findPossibleParents(start, target, trustScript); + return parents != null && parents.length == 1 ? 
parents[0] : null; + } + + @SuppressWarnings("unchecked") + protected > T[] findPossibleParents(Operator start, Class target, + boolean trustScript) { + Operator cursor = getSingleParent(start); + for (; cursor != null; cursor = getSingleParent(cursor)) { + if (target.isAssignableFrom(cursor.getClass())) { + T[] array = (T[]) Array.newInstance(target, 1); + array[0] = (T) cursor; + return array; + } + if (cursor instanceof JoinOperator) { + return findParents((JoinOperator) cursor, target); } + if (cursor instanceof ScriptOperator && !trustScript) { + return null; + } + if (!(cursor instanceof SelectOperator + || cursor instanceof FilterOperator + || cursor instanceof ExtractOperator + || cursor instanceof ForwardOperator + || cursor instanceof ScriptOperator + || cursor instanceof ReduceSinkOperator)) { + return null; + } + } + return null; + } - int childNumReducers = childReduceSink.getConf().getNumReducers(); - int parentNumReducers = parentRS.getConf().getNumReducers(); - boolean moveChildReducerNumToParent = false; - //move child reduce sink's number reducers to the parent reduce sink operator. - if (childNumReducers != parentNumReducers) { - if (childNumReducers == -1) { - //do nothing. - } else if (parentNumReducers == -1) { - //set childNumReducers in the parent reduce sink operator. - moveChildReducerNumToParent = true; - } else { - return false; + @SuppressWarnings("unchecked") + private > T[] findParents(JoinOperator join, Class target) { + List> parents = join.getParentOperators(); + T[] result = (T[]) Array.newInstance(target, parents.size()); + for (int tag = 0; tag < result.length; tag++) { + Operator cursor = parents.get(tag); + for (; cursor != null; cursor = getSingleParent(cursor)) { + if (target.isAssignableFrom(cursor.getClass())) { + result[tag] = (T) cursor; + break; } } - - if(moveChildRSOrderToParent) { - parentRS.getConf().setOrder(childRSOrder); + if (result[tag] == null) { + throw new IllegalStateException("failed to find " + target.getSimpleName() + + " from " + join + " on tag " + tag); } + } + return result; + } - if(moveChildReducerNumToParent) { - parentRS.getConf().setNumReducers(childNumReducers); - } + protected SelectOperator replaceReduceSinkWithSelectOperator(ReduceSinkOperator childRS, + ParseContext context) throws SemanticException { + SelectOperator select = replaceOperatorWithSelect(childRS, context); + select.getConf().setOutputColumnNames(childRS.getConf().getOutputValueColumnNames()); + select.getConf().setColList(childRS.getConf().getValueCols()); + return select; + } - return true; + private SelectOperator replaceOperatorWithSelect(Operator operator, ParseContext context) + throws SemanticException { + RowResolver inputRR = context.getOpParseCtx().get(operator).getRowResolver(); + SelectDesc select = new SelectDesc(null, null); + + Operator parent = getSingleParent(operator); + Operator child = getSingleChild(operator); + + parent.getChildOperators().clear(); + + SelectOperator sel = (SelectOperator) putOpInsertMap( + OperatorFactory.getAndMakeChild(select, new RowSchema(inputRR + .getColumnInfos()), parent), inputRR, context); + + sel.setColumnExprMap(operator.getColumnExprMap()); + + sel.setChildOperators(operator.getChildOperators()); + for (Operator ch : operator.getChildOperators()) { + ch.replaceParent(operator, sel); + } + if (child instanceof ExtractOperator) { + removeOperator(child, getSingleChild(child), sel, context); + dedupCtx.addRemovedOperator(child); } + operator.setChildOperators(null); + operator.setParentOperators(null); 
+ dedupCtx.addRemovedOperator(operator); + return sel; + } - private boolean compareExprNodes(HashMap childColumnMapping, - HashMap parentColumnMapping, - ArrayList childColExprs, - ArrayList parentColExprs) { + protected void removeReduceSinkForGroupBy(ReduceSinkOperator cRS, GroupByOperator cGBYr, + ParseContext context) throws SemanticException { - boolean childEmpty = childColExprs == null || childColExprs.size() == 0; - boolean parentEmpty = parentColExprs == null || parentColExprs.size() == 0; + Operator parent = getSingleParent(cRS); - if (childEmpty) { //both empty - return true; - } + if (parent instanceof GroupByOperator) { + GroupByOperator cGBYm = (GroupByOperator) parent; - //child not empty here - if (parentEmpty) { // child not empty, but parent empty - return false; + cGBYr.getConf().setKeys(cGBYm.getConf().getKeys()); + cGBYr.getConf().setAggregators(cGBYm.getConf().getAggregators()); + for (AggregationDesc aggr : cGBYm.getConf().getAggregators()) { + aggr.setMode(GenericUDAFEvaluator.Mode.COMPLETE); } - - if (childColExprs.size() != parentColExprs.size()) { - return false; + cGBYr.setColumnExprMap(cGBYm.getColumnExprMap()); + cGBYr.setSchema(cGBYm.getSchema()); + RowResolver resolver = context.getOpParseCtx().get(cGBYm).getRowResolver(); + context.getOpParseCtx().get(cGBYr).setRowResolver(resolver); + } else { + cGBYr.getConf().setKeys(ExprNodeDescUtils.backtrack(cGBYr.getConf().getKeys(), cGBYr, cRS)); + for (AggregationDesc aggr : cGBYr.getConf().getAggregators()) { + aggr.setParameters(ExprNodeDescUtils.backtrack(aggr.getParameters(), cGBYr, cRS)); } - int i = 0; - while (i < childColExprs.size()) { - ExprNodeDesc childExpr = childColExprs.get(i); - ExprNodeDesc parentExpr = parentColExprs.get(i); - - if ((childExpr instanceof ExprNodeColumnDesc) - && (parentExpr instanceof ExprNodeColumnDesc)) { - String childCol = childColumnMapping - .get(((ExprNodeColumnDesc) childExpr).getColumn()); - String parentCol = parentColumnMapping - .get(((ExprNodeColumnDesc) childExpr).getColumn()); - if (!childCol.equals(parentCol)) { - return false; - } - } else { - return false; + Map oldMap = cGBYr.getColumnExprMap(); + RowResolver oldRR = context.getOpParseCtx().get(cGBYr).getRowResolver(); + + Map newMap = new HashMap(); + RowResolver newRR = new RowResolver(); + + List outputCols = cGBYr.getConf().getOutputColumnNames(); + for (int i = 0; i < outputCols.size(); i++) { + String colName = outputCols.get(i); + String[] nm = oldRR.reverseLookup(colName); + ColumnInfo colInfo = oldRR.get(nm[0], nm[1]); + newRR.put(nm[0], nm[1], colInfo); + ExprNodeDesc colExpr = ExprNodeDescUtils.backtrack(oldMap.get(colName), cGBYr, cRS); + if (colExpr != null) { + newMap.put(colInfo.getInternalName(), colExpr); } - i++; } - return true; + cGBYr.setColumnExprMap(newMap); + cGBYr.setSchema(new RowSchema(newRR.getColumnInfos())); + context.getOpParseCtx().get(cGBYr).setRowResolver(newRR); } + cGBYr.getConf().setMode(GroupByDesc.Mode.COMPLETE); - /* - * back track column names to find their corresponding original column - * names. Only allow simple operators like 'select column' or filter. 
- */ - private boolean backTrackColumnNames( - HashMap columnMapping, - ReduceSinkOperator reduceSink, - Operator stopBacktrackFlagOp, - ParseContext pGraphContext) { - Operator startOperator = reduceSink; - while (startOperator != null && startOperator != stopBacktrackFlagOp) { - startOperator = startOperator.getParentOperators().get(0); - Map colExprMap = startOperator.getColumnExprMap(); - if(colExprMap == null || colExprMap.size()==0) { - continue; - } - Iterator keyIter = columnMapping.keySet().iterator(); - while (keyIter.hasNext()) { - String key = keyIter.next(); - String oldCol = columnMapping.get(key); - ExprNodeDesc exprNode = colExprMap.get(oldCol); - if(exprNode instanceof ExprNodeColumnDesc) { - String col = ((ExprNodeColumnDesc)exprNode).getColumn(); - columnMapping.put(key, col); - } else { - return false; - } - } - } + removeOperator(cRS, cGBYr, parent, context); + dedupCtx.addRemovedOperator(cRS); + + if (parent instanceof GroupByOperator) { + removeOperator(parent, cGBYr, getSingleParent(parent), context); + dedupCtx.addRemovedOperator(cGBYr); + } + } + + private void removeOperator(Operator target, Operator child, Operator parent, + ParseContext context) { + for (Operator aparent : target.getParentOperators()) { + aparent.replaceChild(target, child); + } + for (Operator achild : target.getChildOperators()) { + achild.replaceParent(target, parent); + } + target.setChildOperators(null); + target.setParentOperators(null); + context.getOpParseCtx().remove(target); + } + + private Operator putOpInsertMap(Operator op, RowResolver rr, + ParseContext context) { + OpParseContext ctx = new OpParseContext(rr); + context.getOpParseCtx().put(op, ctx); + return op; + } + } + static class GroupbyReducerProc extends AbsctractReducerReducerProc { + + // pRS-pGBY-cRS + public Object process(ReduceSinkOperator cRS, ParseContext context) + throws SemanticException { + GroupByOperator pGBY = findPossibleParent(cRS, GroupByOperator.class, trustScript()); + if (pGBY == null) { + return false; + } + ReduceSinkOperator pRS = findPossibleParent(pGBY, ReduceSinkOperator.class, trustScript()); + if (pRS != null && merge(cRS, pRS, minReducer())) { + replaceReduceSinkWithSelectOperator(cRS, context); return true; } + return false; + } - private HashMap getPartitionAndKeyColumnMapping(ReduceSinkOperator reduceSink) { - HashMap columnMapping = new HashMap (); - ReduceSinkDesc reduceSinkDesc = reduceSink.getConf(); - ArrayList partitionCols = reduceSinkDesc.getPartitionCols(); - ArrayList reduceKeyCols = reduceSinkDesc.getKeyCols(); - if(partitionCols != null) { - for (ExprNodeDesc desc : partitionCols) { - List cols = desc.getCols(); - if ( cols != null ) { - for(String col : cols) { - columnMapping.put(col, col); - } - } - } - } - if(reduceKeyCols != null) { - for (ExprNodeDesc desc : reduceKeyCols) { - List cols = desc.getCols(); - if ( cols != null ) { - for(String col : cols) { - columnMapping.put(col, col); - } - } - } - } - return columnMapping; + // pRS-pGBY-cRS-cGBY + public Object process(ReduceSinkOperator cRS, GroupByOperator cGBY, ParseContext context) + throws SemanticException { + Operator start = getStartForGroupBy(cRS); + GroupByOperator pGBY = findPossibleParent(start, GroupByOperator.class, trustScript()); + if (pGBY == null) { + return false; } + ReduceSinkOperator pRS = getSingleParent(pGBY, ReduceSinkOperator.class); + if (pRS != null && merge(cRS, pRS, minReducer())) { + removeReduceSinkForGroupBy(cRS, cGBY, context); + return true; + } + return false; + } + } - private 
ReduceSinkOperator findSingleParentReduceSink(ReduceSinkOperator childReduceSink, ParseContext pGraphContext) { - Operator start = childReduceSink; - while(start != null) { - if (start.getParentOperators() == null - || start.getParentOperators().size() != 1) { - // this potentially is a join operator - return null; - } + static class JoinReducerProc extends AbsctractReducerReducerProc { - boolean allowed = false; - if ((start instanceof SelectOperator) - || (start instanceof FilterOperator) - || (start instanceof ExtractOperator) - || (start instanceof ForwardOperator) - || (start instanceof ScriptOperator) - || (start instanceof ReduceSinkOperator)) { - allowed = true; - } + // pRS-pJOIN-cRS + public Object process(ReduceSinkOperator cRS, ParseContext context) + throws SemanticException { + JoinOperator pJoin = findPossibleParent(cRS, JoinOperator.class, trustScript()); + if (pJoin != null && merge(cRS, pJoin, minReducer())) { + pJoin.getConf().setFixedAsSorted(true); + replaceReduceSinkWithSelectOperator(cRS, context); + return true; + } + return false; + } - if (!allowed) { - return null; - } + // pRS-pJOIN-cRS-cGBY + public Object process(ReduceSinkOperator cRS, GroupByOperator cGBY, ParseContext context) + throws SemanticException { + Operator start = getStartForGroupBy(cRS); + JoinOperator pJoin = findPossibleParent(start, JoinOperator.class, trustScript()); + if (pJoin != null && merge(cRS, pJoin, minReducer())) { + pJoin.getConf().setFixedAsSorted(true); + removeReduceSinkForGroupBy(cRS, cGBY, context); + return true; + } + return false; + } + } - if ((start instanceof ScriptOperator) - && !HiveConf.getBoolVar(pGraphContext.getConf(), - HiveConf.ConfVars.HIVESCRIPTOPERATORTRUST)) { - return null; - } + static class ReducerReducerProc extends AbsctractReducerReducerProc { - start = start.getParentOperators().get(0); - if(start instanceof ReduceSinkOperator) { - return (ReduceSinkOperator)start; - } - } - return null; + // pRS-cRS + public Object process(ReduceSinkOperator cRS, ParseContext context) + throws SemanticException { + ReduceSinkOperator pRS = findPossibleParent(cRS, ReduceSinkOperator.class, trustScript()); + if (pRS != null && merge(cRS, pRS, minReducer())) { + replaceReduceSinkWithSelectOperator(cRS, context); + return true; } + return false; } + // pRS-cRS-cGBY + public Object process(ReduceSinkOperator cRS, GroupByOperator cGBY, ParseContext context) + throws SemanticException { + Operator start = getStartForGroupBy(cRS); + ReduceSinkOperator pRS = findPossibleParent(start, ReduceSinkOperator.class, trustScript()); + if (pRS != null && merge(cRS, pRS, minReducer())) { + removeReduceSinkForGroupBy(cRS, cGBY, context); + return true; + } + return false; + } } } Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinResolver.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinResolver.java?rev=1465721&r1=1465720&r2=1465721&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinResolver.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinResolver.java Mon Apr 8 18:53:39 2013 @@ -256,7 +256,7 @@ public class CommonJoinResolver implemen // whether it contains common join op; if contains, return this common join op JoinOperator joinOp = getJoinOp(currTask); - if (joinOp == null) { + if (joinOp == null || 
joinOp.getConf().isFixedAsSorted()) { return null; } currTask.setTaskTag(Task.COMMON_JOIN); Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinProcFactory.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinProcFactory.java?rev=1465721&r1=1465720&r2=1465721&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinProcFactory.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinProcFactory.java Mon Apr 8 18:53:39 2013 @@ -52,6 +52,9 @@ public final class SkewJoinProcFactory { Object... nodeOutputs) throws SemanticException { SkewJoinProcCtx context = (SkewJoinProcCtx) ctx; JoinOperator op = (JoinOperator) nd; + if (op.getConf().isFixedAsSorted()) { + return null; + } ParseContext parseContext = context.getParseCtx(); Task currentTsk = context.getCurrentTask(); GenMRSkewJoinProcessor.processSkewJoin(op, currentTsk, parseContext); Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java?rev=1465721&r1=1465720&r2=1465721&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java Mon Apr 8 18:53:39 2013 @@ -187,9 +187,10 @@ public class ExprNodeDescUtils { return result; } - private static ExprNodeDesc backtrack(ExprNodeDesc source, Operator current, + public static ExprNodeDesc backtrack(ExprNodeDesc source, Operator current, Operator terminal) throws SemanticException { - if (current == null || current == terminal) { + Operator parent = getSingleParent(current, terminal); + if (parent == null) { return source; } if (source instanceof ExprNodeGenericFuncDesc) { @@ -200,7 +201,7 @@ public class ExprNodeDescUtils { } if (source instanceof ExprNodeColumnDesc) { ExprNodeColumnDesc column = (ExprNodeColumnDesc) source; - return backtrack(column, current, terminal); + return backtrack(column, parent, terminal); } if (source instanceof ExprNodeFieldDesc) { // field epression should be resolved @@ -215,20 +216,19 @@ public class ExprNodeDescUtils { // Resolve column expression to input expression by using expression mapping in current operator private static ExprNodeDesc backtrack(ExprNodeColumnDesc column, Operator current, Operator terminal) throws SemanticException { - if (current == null || current == terminal) { - return column; - } - Operator parent = getSingleParent(current, terminal); Map mapping = current.getColumnExprMap(); if (mapping == null || !mapping.containsKey(column.getColumn())) { - return backtrack(column, parent, terminal); // forward + return backtrack((ExprNodeDesc)column, current, terminal); } ExprNodeDesc mapped = mapping.get(column.getColumn()); - return backtrack(mapped, parent, terminal); // forward with resolved expr + return backtrack(mapped, current, terminal); } public static Operator getSingleParent(Operator current, Operator terminal) throws SemanticException { + if (current == terminal) { + return null; + } List> parents = current.getParentOperators(); if (parents == null || parents.isEmpty()) { if (terminal != null) { @@ -236,9 +236,12 @@ public class ExprNodeDescUtils { } return 
null; } - if (current.getParentOperators().size() > 1) { - throw new SemanticException("Met multiple parent operators"); + if (parents.size() == 1) { + return parents.get(0); + } + if (terminal != null && parents.contains(terminal)) { + return terminal; } - return parents.get(0); + throw new SemanticException("Met multiple parent operators"); } } Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java?rev=1465721&r1=1465720&r2=1465721&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java Mon Apr 8 18:53:39 2013 @@ -81,6 +81,10 @@ public class JoinDesc extends AbstractOp protected Byte[] tagOrder; private TableDesc keyTableDesc; + // this operator cannot be converted to mapjoin cause output is expected to be sorted on join key + // it's resulted from RS-dedup optimization, which removes following RS under some condition + private boolean fixedAsSorted; + public JoinDesc() { } @@ -525,4 +529,12 @@ public class JoinDesc extends AbstractOp } return result; } + + public boolean isFixedAsSorted() { + return fixedAsSorted; + } + + public void setFixedAsSorted(boolean fixedAsSorted) { + this.fixedAsSorted = fixedAsSorted; + } } Modified: hive/trunk/ql/src/test/queries/clientpositive/auto_join26.q URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/auto_join26.q?rev=1465721&r1=1465720&r2=1465721&view=diff ============================================================================== --- hive/trunk/ql/src/test/queries/clientpositive/auto_join26.q (original) +++ hive/trunk/ql/src/test/queries/clientpositive/auto_join26.q Mon Apr 8 18:53:39 2013 @@ -1,10 +1,10 @@ -CREATE TABLE dest_j1(key INT, cnt INT); -set hive.auto.convert.join = true; -EXPLAIN -INSERT OVERWRITE TABLE dest_j1 -SELECT x.key, count(1) FROM src1 x JOIN src y ON (x.key = y.key) group by x.key; - -INSERT OVERWRITE TABLE dest_j1 -SELECT x.key, count(1) FROM src1 x JOIN src y ON (x.key = y.key) group by x.key; - -select * from dest_j1 x order by x.key; +CREATE TABLE dest_j1(key INT, cnt INT); +set hive.auto.convert.join = true; +EXPLAIN +INSERT OVERWRITE TABLE dest_j1 +SELECT x.key, count(1) FROM src1 x JOIN src y ON (x.key = y.key) group by x.key; + +INSERT OVERWRITE TABLE dest_j1 +SELECT x.key, count(1) FROM src1 x JOIN src y ON (x.key = y.key) group by x.key; + +select * from dest_j1 x order by x.key; Modified: hive/trunk/ql/src/test/queries/clientpositive/groupby_distinct_samekey.q URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/groupby_distinct_samekey.q?rev=1465721&r1=1465720&r2=1465721&view=diff ============================================================================== --- hive/trunk/ql/src/test/queries/clientpositive/groupby_distinct_samekey.q (original) +++ hive/trunk/ql/src/test/queries/clientpositive/groupby_distinct_samekey.q Mon Apr 8 18:53:39 2013 @@ -1,7 +1,10 @@ --- This test covers HIVE-2322 +-- This test covers HIVE-2332 create table t1 (int1 int, int2 int, str1 string, str2 string); +set hive.optimize.reducededuplication=false; +--disabled RS-dedup for keeping intention of test + insert into table t1 select cast(key as int), cast(key as int), value, value from src where key < 6; explain select Q1.int1, sum(distinct Q1.int1) from (select * from t1 
order by int1) Q1 group by Q1.int1; explain select int1, sum(distinct int1) from t1 group by int1; Modified: hive/trunk/ql/src/test/queries/clientpositive/reduce_deduplicate.q URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/reduce_deduplicate.q?rev=1465721&r1=1465720&r2=1465721&view=diff ============================================================================== --- hive/trunk/ql/src/test/queries/clientpositive/reduce_deduplicate.q (original) +++ hive/trunk/ql/src/test/queries/clientpositive/reduce_deduplicate.q Mon Apr 8 18:53:39 2013 @@ -2,7 +2,8 @@ set hive.input.format=org.apache.hadoop. set hive.enforce.bucketing = true; set hive.exec.reducers.max = 1; set hive.exec.script.trust = true; - +set hive.optimize.reducededuplication = true; +set hive.optimize.reducededuplication.min.reducer = 1; CREATE TABLE bucket5_1(key string, value string) CLUSTERED BY (key) INTO 2 BUCKETS; Added: hive/trunk/ql/src/test/queries/clientpositive/reduce_deduplicate_extended.q URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/reduce_deduplicate_extended.q?rev=1465721&view=auto ============================================================================== --- hive/trunk/ql/src/test/queries/clientpositive/reduce_deduplicate_extended.q (added) +++ hive/trunk/ql/src/test/queries/clientpositive/reduce_deduplicate_extended.q Mon Apr 8 18:53:39 2013 @@ -0,0 +1,37 @@ +set hive.optimize.reducededuplication=true; +set hive.optimize.reducededuplication.min.reducer=1; +set hive.map.aggr=true; + +explain select key, sum(key) from (select * from src distribute by key sort by key, value) Q1 group by key; +explain select key, sum(key), lower(value) from (select * from src order by key) Q1 group by key, lower(value); +explain select key, sum(key), (X + 1) from (select key, (value + 1) as X from src order by key) Q1 group by key, (X + 1); +explain select key, sum(key) as value from src group by key order by key, value; +explain select src.key, sum(src.key) FROM src JOIN src1 ON src.key = src1.key group by src.key, src.value; +explain select src.key, src.value FROM src JOIN src1 ON src.key = src1.key order by src.key, src.value; +explain from (select key, value from src group by key, value) s select s.key group by s.key; + +select key, sum(key) from (select * from src distribute by key sort by key, value) Q1 group by key; +select key, sum(key), lower(value) from (select * from src order by key) Q1 group by key, lower(value); +select key, sum(key), (X + 1) from (select key, (value + 1) as X from src order by key) Q1 group by key, (X + 1); +select key, sum(key) as value from src group by key order by key, value; +select src.key, sum(src.key) FROM src JOIN src1 ON src.key = src1.key group by src.key, src.value; +select src.key, src.value FROM src JOIN src1 ON src.key = src1.key order by src.key, src.value; +from (select key, value from src group by key, value) s select s.key group by s.key; + +set hive.map.aggr=false; + +explain select key, sum(key) from (select * from src distribute by key sort by key, value) Q1 group by key; +explain select key, sum(key), lower(value) from (select * from src order by key) Q1 group by key, lower(value); +explain select key, sum(key), (X + 1) from (select key, (value + 1) as X from src order by key) Q1 group by key, (X + 1); +explain select key, sum(key) as value from src group by key order by key, value; +explain select src.key, sum(src.key) FROM src JOIN src1 ON src.key = src1.key group by src.key, src.value; +explain select 
src.key, src.value FROM src JOIN src1 ON src.key = src1.key order by src.key, src.value; +explain from (select key, value from src group by key, value) s select s.key group by s.key; + +select key, sum(key) from (select * from src distribute by key sort by key, value) Q1 group by key; +select key, sum(key), lower(value) from (select * from src order by key) Q1 group by key, lower(value); +select key, sum(key), (X + 1) from (select key, (value + 1) as X from src order by key) Q1 group by key, (X + 1); +select key, sum(key) as value from src group by key order by key, value; +select src.key, sum(src.key) FROM src JOIN src1 ON src.key = src1.key group by src.key, src.value; +select src.key, src.value FROM src JOIN src1 ON src.key = src1.key order by src.key, src.value; +from (select key, value from src group by key, value) s select s.key group by s.key; \ No newline at end of file Modified: hive/trunk/ql/src/test/results/clientpositive/groupby2.q.out URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/groupby2.q.out?rev=1465721&r1=1465720&r2=1465721&view=diff ============================================================================== --- hive/trunk/ql/src/test/results/clientpositive/groupby2.q.out (original) +++ hive/trunk/ql/src/test/results/clientpositive/groupby2.q.out Mon Apr 8 18:53:39 2013 @@ -16,9 +16,8 @@ ABSTRACT SYNTAX TREE: STAGE DEPENDENCIES: Stage-1 is a root stage - Stage-2 depends on stages: Stage-1 - Stage-0 depends on stages: Stage-2 - Stage-3 depends on stages: Stage-0 + Stage-0 depends on stages: Stage-1 + Stage-2 depends on stages: Stage-0 STAGE PLANS: Stage: Stage-1 @@ -56,43 +55,7 @@ STAGE PLANS: keys: expr: KEY._col0 type: string - mode: partial1 - outputColumnNames: _col0, _col1, _col2 - File Output Operator - compressed: false - GlobalTableId: 0 - table: - input format: org.apache.hadoop.mapred.SequenceFileInputFormat - output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat - - Stage: Stage-2 - Map Reduce - Alias -> Map Operator Tree: -#### A masked pattern was here #### - Reduce Output Operator - key expressions: - expr: _col0 - type: string - sort order: + - Map-reduce partition columns: - expr: _col0 - type: string - tag: -1 - value expressions: - expr: _col1 - type: bigint - expr: _col2 - type: double - Reduce Operator Tree: - Group By Operator - aggregations: - expr: count(VALUE._col0) - expr: sum(VALUE._col1) - bucketGroup: false - keys: - expr: KEY._col0 - type: string - mode: final + mode: complete outputColumnNames: _col0, _col1, _col2 Select Operator expressions: @@ -122,7 +85,7 @@ STAGE PLANS: serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe name: default.dest_g2 - Stage: Stage-3 + Stage: Stage-2 Stats-Aggr Operator Modified: hive/trunk/ql/src/test/results/clientpositive/groupby2_map_skew.q.out URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/groupby2_map_skew.q.out?rev=1465721&r1=1465720&r2=1465721&view=diff ============================================================================== --- hive/trunk/ql/src/test/results/clientpositive/groupby2_map_skew.q.out (original) +++ hive/trunk/ql/src/test/results/clientpositive/groupby2_map_skew.q.out Mon Apr 8 18:53:39 2013 @@ -16,9 +16,8 @@ ABSTRACT SYNTAX TREE: STAGE DEPENDENCIES: Stage-1 is a root stage - Stage-2 depends on stages: Stage-1 - Stage-0 depends on stages: Stage-2 - Stage-3 depends on stages: Stage-0 + Stage-0 depends on stages: Stage-1 + Stage-2 depends on stages: Stage-0 STAGE PLANS: Stage: Stage-1 @@ -73,43 
+72,7 @@ STAGE PLANS: keys: expr: KEY._col0 type: string - mode: partials - outputColumnNames: _col0, _col1, _col2 - File Output Operator - compressed: false - GlobalTableId: 0 - table: - input format: org.apache.hadoop.mapred.SequenceFileInputFormat - output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat - - Stage: Stage-2 - Map Reduce - Alias -> Map Operator Tree: -#### A masked pattern was here #### - Reduce Output Operator - key expressions: - expr: _col0 - type: string - sort order: + - Map-reduce partition columns: - expr: _col0 - type: string - tag: -1 - value expressions: - expr: _col1 - type: bigint - expr: _col2 - type: double - Reduce Operator Tree: - Group By Operator - aggregations: - expr: count(VALUE._col0) - expr: sum(VALUE._col1) - bucketGroup: false - keys: - expr: KEY._col0 - type: string - mode: final + mode: complete outputColumnNames: _col0, _col1, _col2 Select Operator expressions: @@ -139,7 +102,7 @@ STAGE PLANS: serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe name: default.dest1 - Stage: Stage-3 + Stage: Stage-2 Stats-Aggr Operator Modified: hive/trunk/ql/src/test/results/clientpositive/groupby_cube1.q.out URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/groupby_cube1.q.out?rev=1465721&r1=1465720&r2=1465721&view=diff ============================================================================== --- hive/trunk/ql/src/test/results/clientpositive/groupby_cube1.q.out (original) +++ hive/trunk/ql/src/test/results/clientpositive/groupby_cube1.q.out Mon Apr 8 18:53:39 2013 @@ -411,7 +411,6 @@ ABSTRACT SYNTAX TREE: STAGE DEPENDENCIES: Stage-1 is a root stage - Stage-2 depends on stages: Stage-1 Stage-0 is a root stage STAGE PLANS: @@ -471,44 +470,7 @@ STAGE PLANS: type: string expr: KEY._col1 type: string - mode: partials - outputColumnNames: _col0, _col1, _col2 - File Output Operator - compressed: false - GlobalTableId: 0 - table: - input format: org.apache.hadoop.mapred.SequenceFileInputFormat - output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat - - Stage: Stage-2 - Map Reduce - Alias -> Map Operator Tree: -#### A masked pattern was here #### - Reduce Output Operator - key expressions: - expr: _col0 - type: string - expr: _col1 - type: string - sort order: ++ - Map-reduce partition columns: - expr: _col0 - type: string - tag: -1 - value expressions: - expr: _col2 - type: bigint - Reduce Operator Tree: - Group By Operator - aggregations: - expr: count(VALUE._col0) - bucketGroup: false - keys: - expr: KEY._col0 - type: string - expr: KEY._col1 - type: string - mode: final + mode: complete outputColumnNames: _col0, _col1, _col2 Select Operator expressions: Modified: hive/trunk/ql/src/test/results/clientpositive/groupby_distinct_samekey.q.out URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/groupby_distinct_samekey.q.out?rev=1465721&r1=1465720&r2=1465721&view=diff ============================================================================== --- hive/trunk/ql/src/test/results/clientpositive/groupby_distinct_samekey.q.out (original) +++ hive/trunk/ql/src/test/results/clientpositive/groupby_distinct_samekey.q.out Mon Apr 8 18:53:39 2013 @@ -1,17 +1,21 @@ -PREHOOK: query: -- This test covers HIVE-2322 +PREHOOK: query: -- This test covers HIVE-2332 create table t1 (int1 int, int2 int, str1 string, str2 string) PREHOOK: type: CREATETABLE -POSTHOOK: query: -- This test covers HIVE-2322 +POSTHOOK: query: -- This test covers HIVE-2332 create table t1 (int1 int, int2 int, 
str1 string, str2 string) POSTHOOK: type: CREATETABLE POSTHOOK: Output: default@t1 -PREHOOK: query: insert into table t1 select cast(key as int), cast(key as int), value, value from src where key < 6 +PREHOOK: query: --disabled RS-dedup for keeping intention of test + +insert into table t1 select cast(key as int), cast(key as int), value, value from src where key < 6 PREHOOK: type: QUERY PREHOOK: Input: default@src PREHOOK: Output: default@t1 -POSTHOOK: query: insert into table t1 select cast(key as int), cast(key as int), value, value from src where key < 6 +POSTHOOK: query: --disabled RS-dedup for keeping intention of test + +insert into table t1 select cast(key as int), cast(key as int), value, value from src where key < 6 POSTHOOK: type: QUERY POSTHOOK: Input: default@src POSTHOOK: Output: default@t1 Modified: hive/trunk/ql/src/test/results/clientpositive/groupby_rollup1.q.out URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/groupby_rollup1.q.out?rev=1465721&r1=1465720&r2=1465721&view=diff ============================================================================== --- hive/trunk/ql/src/test/results/clientpositive/groupby_rollup1.q.out (original) +++ hive/trunk/ql/src/test/results/clientpositive/groupby_rollup1.q.out Mon Apr 8 18:53:39 2013 @@ -399,7 +399,6 @@ ABSTRACT SYNTAX TREE: STAGE DEPENDENCIES: Stage-1 is a root stage - Stage-2 depends on stages: Stage-1 Stage-0 is a root stage STAGE PLANS: @@ -459,44 +458,7 @@ STAGE PLANS: type: string expr: KEY._col1 type: string - mode: partials - outputColumnNames: _col0, _col1, _col2 - File Output Operator - compressed: false - GlobalTableId: 0 - table: - input format: org.apache.hadoop.mapred.SequenceFileInputFormat - output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat - - Stage: Stage-2 - Map Reduce - Alias -> Map Operator Tree: -#### A masked pattern was here #### - Reduce Output Operator - key expressions: - expr: _col0 - type: string - expr: _col1 - type: string - sort order: ++ - Map-reduce partition columns: - expr: _col0 - type: string - tag: -1 - value expressions: - expr: _col2 - type: bigint - Reduce Operator Tree: - Group By Operator - aggregations: - expr: count(VALUE._col0) - bucketGroup: false - keys: - expr: KEY._col0 - type: string - expr: KEY._col1 - type: string - mode: final + mode: complete outputColumnNames: _col0, _col1, _col2 Select Operator expressions:
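
Note on the plan changes above: the group-by result hunks (groupby2.q.out, groupby2_map_skew.q.out, groupby_cube1.q.out, groupby_rollup1.q.out) all show the same effect of the new rule -- the second map-reduce stage that previously re-shuffled the partial aggregates (Group By Operator in mode partial1/partials followed by mode final) is removed, the surviving Group By runs in mode complete, and the later stage numbers shift down by one. A minimal HiveQL sketch of how the new setting is exercised follows; the query shape and the src table are taken from reduce_deduplicate_extended.q above, but the exact EXPLAIN output on any particular build is an assumption, not a guarantee:

    -- enable the rewrite and allow it even when the child ReduceSink is pinned to one reducer
    set hive.optimize.reducededuplication=true;
    set hive.optimize.reducededuplication.min.reducer=1;

    -- ORDER BY followed by GROUP BY on the same key: the group-by's ReduceSink is folded
    -- into the order-by's ReduceSink, so the plan should collapse to a single MapReduce
    -- stage that both sorts and aggregates, instead of two back-to-back stages.
    explain
    select key, sum(key)
    from (select * from src order by key) q1
    group by key;

    -- to keep the previous two-stage behaviour, raise the threshold back to its default
    -- or disable the rule entirely:
    -- set hive.optimize.reducededuplication.min.reducer=4;
    -- set hive.optimize.reducededuplication=false;

Related to this, the new JoinDesc.fixedAsSorted flag (see the JoinDesc.java hunk above) marks a join whose output must remain sorted on the join key because the ReduceSink that followed it was removed by this rule; per the comment in that hunk, CommonJoinResolver is expected to skip map-join conversion for such joins.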