hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kevinwilf...@apache.org
Subject svn commit: r1455650 - in /hive/trunk/ql/src: java/org/apache/hadoop/hive/ql/optimizer/ java/org/apache/hadoop/hive/ql/parse/ java/org/apache/hadoop/hive/ql/plan/ test/queries/clientpositive/ test/results/clientpositive/
Date Tue, 12 Mar 2013 17:50:56 GMT
Author: kevinwilfong
Date: Tue Mar 12 17:50:55 2013
New Revision: 1455650

URL: http://svn.apache.org/r1455650
Log:
HIVE-4096. problem in hive.map.groupby.sorted with distincts. (njain via kevinwilfong)

Added:
    hive/trunk/ql/src/test/queries/clientpositive/groupby_sort_8.q
    hive/trunk/ql/src/test/results/clientpositive/groupby_sort_8.q.out
Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java?rev=1455650&r1=1455649&r2=1455650&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java Tue Mar 12 17:50:55 2013
@@ -175,7 +175,9 @@ public class GroupByOptimizer implements
       boolean useMapperSort =
           HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_MAP_GROUPBY_SORT);
 
-      if (useMapperSort && (match == GroupByOptimizerSortMatch.COMPLETE_MATCH)) {
+      // Dont remove the operator for distincts
+      if (useMapperSort && !groupByOp.getConf().isDistinct() &&
+          (match == GroupByOptimizerSortMatch.COMPLETE_MATCH)) {
         convertGroupByMapSideSortedGroupBy(groupByOp, depth);
       }
       else if ((match == GroupByOptimizerSortMatch.PARTIAL_MATCH) ||

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=1455650&r1=1455649&r2=1455650&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Tue Mar 12 17:50:55 2013
@@ -2849,7 +2849,7 @@ public class SemanticAnalyzer extends Ba
 
     Operator op = putOpInsertMap(OperatorFactory.getAndMakeChild(
         new GroupByDesc(mode, outputColumnNames, groupByKeys, aggregations,
-            false, groupByMemoryUsage, memoryThreshold, null, false, 0),
+            false, groupByMemoryUsage, memoryThreshold, null, false, 0, numDistinctUDFs >
0),
         new RowSchema(groupByOutputRowResolver.getColumnInfos()),
         reduceSinkOperatorInfo), groupByOutputRowResolver);
     op.setColumnExprMap(colExprMap);
@@ -3023,11 +3023,13 @@ public class SemanticAnalyzer extends Ba
       reduceValues = ((ReduceSinkDesc) reduceSinkOperatorInfo.getConf()).getValueCols();
     }
     int numDistinctUDFs = 0;
+    boolean containsDistinctAggr = false;
     for (Map.Entry<String, ASTNode> entry : aggregationTrees.entrySet()) {
       ASTNode value = entry.getValue();
       String aggName = unescapeIdentifier(value.getChild(0).getText());
       ArrayList<ExprNodeDesc> aggParameters = new ArrayList<ExprNodeDesc>();
       boolean isDistinct = (value.getType() == HiveParser.TOK_FUNCTIONDI);
+      containsDistinctAggr = containsDistinctAggr || isDistinct;
 
       // If the function is distinct, partial aggregation has not been done on
       // the client side.
@@ -3129,7 +3131,7 @@ public class SemanticAnalyzer extends Ba
             distPartAgg, groupByMemoryUsage, memoryThreshold,
             groupingSets,
             groupingSetsPresent && groupingSetsNeedAdditionalMRJob,
-            groupingSetsPosition),
+            groupingSetsPosition, containsDistinctAggr),
         new RowSchema(groupByOutputRowResolver.getColumnInfos()), reduceSinkOperatorInfo),
         groupByOutputRowResolver);
     op.setColumnExprMap(colExprMap);
@@ -3250,6 +3252,7 @@ public class SemanticAnalyzer extends Ba
         .getAggregationExprsForClause(dest);
     assert (aggregationTrees != null);
 
+    boolean containsDistinctAggr = false;
     for (Map.Entry<String, ASTNode> entry : aggregationTrees.entrySet()) {
       ASTNode value = entry.getValue();
       String aggName = unescapeIdentifier(value.getChild(0).getText());
@@ -3265,6 +3268,7 @@ public class SemanticAnalyzer extends Ba
       }
 
       boolean isDistinct = value.getType() == HiveParser.TOK_FUNCTIONDI;
+      containsDistinctAggr = containsDistinctAggr || isDistinct;
       boolean isAllColumns = value.getType() == HiveParser.TOK_FUNCTIONSTAR;
       Mode amode = groupByDescModeToUDAFMode(mode, isDistinct);
 
@@ -3293,7 +3297,7 @@ public class SemanticAnalyzer extends Ba
     Operator op = putOpInsertMap(OperatorFactory.getAndMakeChild(
         new GroupByDesc(mode, outputColumnNames, groupByKeys, aggregations,
             false, groupByMemoryUsage, memoryThreshold,
-            groupingSetKeys, groupingSetsPresent, groupingSetsPosition),
+            groupingSetKeys, groupingSetsPresent, groupingSetsPosition, containsDistinctAggr),
         new RowSchema(groupByOutputRowResolver.getColumnInfos()),
         inputOperatorInfo), groupByOutputRowResolver);
     op.setColumnExprMap(colExprMap);
@@ -3752,6 +3756,7 @@ public class SemanticAnalyzer extends Ba
 
     HashMap<String, ASTNode> aggregationTrees = parseInfo
         .getAggregationExprsForClause(dest);
+    boolean containsDistinctAggr = false;
     for (Map.Entry<String, ASTNode> entry : aggregationTrees.entrySet()) {
       ArrayList<ExprNodeDesc> aggParameters = new ArrayList<ExprNodeDesc>();
       ASTNode value = entry.getValue();
@@ -3768,6 +3773,7 @@ public class SemanticAnalyzer extends Ba
       String aggName = unescapeIdentifier(value.getChild(0).getText());
 
       boolean isDistinct = value.getType() == HiveParser.TOK_FUNCTIONDI;
+      containsDistinctAggr = containsDistinctAggr || isDistinct;
       boolean isStar = value.getType() == HiveParser.TOK_FUNCTIONSTAR;
       Mode amode = groupByDescModeToUDAFMode(mode, isDistinct);
       GenericUDAFEvaluator genericUDAFEvaluator = genericUDAFEvaluators
@@ -3795,7 +3801,7 @@ public class SemanticAnalyzer extends Ba
 
     Operator op = putOpInsertMap(OperatorFactory.getAndMakeChild(
         new GroupByDesc(mode, outputColumnNames, groupByKeys, aggregations,
-            false, groupByMemoryUsage, memoryThreshold, null, false, 0),
+            false, groupByMemoryUsage, memoryThreshold, null, false, 0, containsDistinctAggr),
         new RowSchema(groupByOutputRowResolver2.getColumnInfos()),
         reduceSinkOperatorInfo2), groupByOutputRowResolver2);
     op.setColumnExprMap(colExprMap);
@@ -5998,7 +6004,7 @@ public class SemanticAnalyzer extends Ba
         .getFloatVar(conf, HiveConf.ConfVars.HIVEMAPAGGRMEMORYTHRESHOLD);
     Operator op = putOpInsertMap(OperatorFactory.getAndMakeChild(
         new GroupByDesc(mode, outputColumnNames, groupByKeys, aggregations,
-            false, groupByMemoryUsage, memoryThreshold, null, false, 0),
+            false, groupByMemoryUsage, memoryThreshold, null, false, 0, false),
         new RowSchema(groupByOutputRowResolver.getColumnInfos()),
         inputOperatorInfo), groupByOutputRowResolver);
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java?rev=1455650&r1=1455649&r2=1455650&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java Tue Mar 12 17:50:55 2013
@@ -65,6 +65,7 @@ public class GroupByDesc extends Abstrac
   private ArrayList<java.lang.String> outputColumnNames;
   private float groupByMemoryUsage;
   private float memoryThreshold;
+  transient private boolean isDistinct;
 
   public GroupByDesc() {
   }
@@ -79,10 +80,11 @@ public class GroupByDesc extends Abstrac
       final float memoryThreshold,
       final List<Integer> listGroupingSets,
       final boolean groupingSetsPresent,
-      final int groupingSetsPosition) {
+      final int groupingSetsPosition,
+      final boolean isDistinct) {
     this(mode, outputColumnNames, keys, aggregators, groupKeyNotReductionKey,
       false, groupByMemoryUsage, memoryThreshold, listGroupingSets,
-      groupingSetsPresent, groupingSetsPosition);
+      groupingSetsPresent, groupingSetsPosition, isDistinct);
   }
 
   public GroupByDesc(
@@ -96,7 +98,8 @@ public class GroupByDesc extends Abstrac
       final float memoryThreshold,
       final List<Integer> listGroupingSets,
       final boolean groupingSetsPresent,
-      final int groupingSetsPosition) {
+      final int groupingSetsPosition,
+      final boolean isDistinct) {
     this.mode = mode;
     this.outputColumnNames = outputColumnNames;
     this.keys = keys;
@@ -108,6 +111,7 @@ public class GroupByDesc extends Abstrac
     this.listGroupingSets = listGroupingSets;
     this.groupingSetsPresent = groupingSetsPresent;
     this.groupingSetPosition = groupingSetsPosition;
+    this.isDistinct = isDistinct;
   }
 
   public Mode getMode() {
@@ -249,4 +253,8 @@ public class GroupByDesc extends Abstrac
   public void setGroupingSetPosition(int groupingSetPosition) {
     this.groupingSetPosition = groupingSetPosition;
   }
+
+  public boolean isDistinct() {
+    return isDistinct;
+  }
 }

Added: hive/trunk/ql/src/test/queries/clientpositive/groupby_sort_8.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/groupby_sort_8.q?rev=1455650&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/groupby_sort_8.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/groupby_sort_8.q Tue Mar 12 17:50:55 2013
@@ -0,0 +1,20 @@
+set hive.enforce.bucketing = true;
+set hive.enforce.sorting = true;
+set hive.exec.reducers.max = 10;
+set hive.map.groupby.sorted=true;
+
+CREATE TABLE T1(key STRING, val STRING) PARTITIONED BY (ds string)
+CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T1 PARTITION (ds='1');
+
+-- perform an insert to make sure there are 2 files
+INSERT OVERWRITE TABLE T1 PARTITION (ds='1') select key, val from T1 where ds = '1';
+
+-- The plan is not converted to a map-side, since although the sorting columns and grouping
+-- columns match, the user is issueing a distinct
+EXPLAIN
+select count(distinct key) from T1;
+select count(distinct key) from T1;
+
+DROP TABLE T1;
\ No newline at end of file

Added: hive/trunk/ql/src/test/results/clientpositive/groupby_sort_8.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/groupby_sort_8.q.out?rev=1455650&view=auto
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/groupby_sort_8.q.out (added)
+++ hive/trunk/ql/src/test/results/clientpositive/groupby_sort_8.q.out Tue Mar 12 17:50:55 2013
@@ -0,0 +1,124 @@
+PREHOOK: query: CREATE TABLE T1(key STRING, val STRING) PARTITIONED BY (ds string)
+CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: CREATE TABLE T1(key STRING, val STRING) PARTITIONED BY (ds string)
+CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@T1
+PREHOOK: query: LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T1 PARTITION (ds='1')
+PREHOOK: type: LOAD
+PREHOOK: Output: default@t1
+POSTHOOK: query: LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T1 PARTITION (ds='1')
+POSTHOOK: type: LOAD
+POSTHOOK: Output: default@t1
+POSTHOOK: Output: default@t1@ds=1
+PREHOOK: query: -- perform an insert to make sure there are 2 files
+INSERT OVERWRITE TABLE T1 PARTITION (ds='1') select key, val from T1 where ds = '1'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@t1
+PREHOOK: Input: default@t1@ds=1
+PREHOOK: Output: default@t1@ds=1
+POSTHOOK: query: -- perform an insert to make sure there are 2 files
+INSERT OVERWRITE TABLE T1 PARTITION (ds='1') select key, val from T1 where ds = '1'
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@t1
+POSTHOOK: Input: default@t1@ds=1
+POSTHOOK: Output: default@t1@ds=1
+POSTHOOK: Lineage: t1 PARTITION(ds=1).key SIMPLE [(t1)t1.FieldSchema(name:key, type:string, comment:null), ]
+POSTHOOK: Lineage: t1 PARTITION(ds=1).val SIMPLE [(t1)t1.FieldSchema(name:val, type:string, comment:null), ]
+PREHOOK: query: -- The plan is not converted to a map-side, since although the sorting columns and grouping
+-- columns match, the user is issueing a distinct
+EXPLAIN
+select count(distinct key) from T1
+PREHOOK: type: QUERY
+POSTHOOK: query: -- The plan is not converted to a map-side, since although the sorting columns and grouping
+-- columns match, the user is issueing a distinct
+EXPLAIN
+select count(distinct key) from T1
+POSTHOOK: type: QUERY
+POSTHOOK: Lineage: t1 PARTITION(ds=1).key SIMPLE [(t1)t1.FieldSchema(name:key, type:string, comment:null), ]
+POSTHOOK: Lineage: t1 PARTITION(ds=1).val SIMPLE [(t1)t1.FieldSchema(name:val, type:string, comment:null), ]
+ABSTRACT SYNTAX TREE:
+  (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME T1))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_FUNCTIONDI count (TOK_TABLE_OR_COL key))))))
+
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Alias -> Map Operator Tree:
+        t1 
+          TableScan
+            alias: t1
+            Select Operator
+              expressions:
+                    expr: key
+                    type: string
+              outputColumnNames: key
+              Group By Operator
+                aggregations:
+                      expr: count(DISTINCT key)
+                bucketGroup: true
+                keys:
+                      expr: key
+                      type: string
+                mode: hash
+                outputColumnNames: _col0, _col1
+                Reduce Output Operator
+                  key expressions:
+                        expr: _col0
+                        type: string
+                  sort order: +
+                  tag: -1
+                  value expressions:
+                        expr: _col1
+                        type: bigint
+      Reduce Operator Tree:
+        Group By Operator
+          aggregations:
+                expr: count(DISTINCT KEY._col0:0._col0)
+          bucketGroup: false
+          mode: mergepartial
+          outputColumnNames: _col0
+          Select Operator
+            expressions:
+                  expr: _col0
+                  type: bigint
+            outputColumnNames: _col0
+            File Output Operator
+              compressed: false
+              GlobalTableId: 0
+              table:
+                  input format: org.apache.hadoop.mapred.TextInputFormat
+                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+
+
+PREHOOK: query: select count(distinct key) from T1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@t1
+PREHOOK: Input: default@t1@ds=1
+#### A masked pattern was here ####
+POSTHOOK: query: select count(distinct key) from T1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@t1
+POSTHOOK: Input: default@t1@ds=1
+#### A masked pattern was here ####
+POSTHOOK: Lineage: t1 PARTITION(ds=1).key SIMPLE [(t1)t1.FieldSchema(name:key, type:string, comment:null), ]
+POSTHOOK: Lineage: t1 PARTITION(ds=1).val SIMPLE [(t1)t1.FieldSchema(name:val, type:string, comment:null), ]
+5
+PREHOOK: query: DROP TABLE T1
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@t1
+PREHOOK: Output: default@t1
+POSTHOOK: query: DROP TABLE T1
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@t1
+POSTHOOK: Output: default@t1
+POSTHOOK: Lineage: t1 PARTITION(ds=1).key SIMPLE [(t1)t1.FieldSchema(name:key, type:string, comment:null), ]
+POSTHOOK: Lineage: t1 PARTITION(ds=1).val SIMPLE [(t1)t1.FieldSchema(name:val, type:string, comment:null), ]



Mime
View raw message