tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From blrun...@apache.org
Subject [1/2] TAJO-1010: Improve multiple DISTINCT aggregation. (Hyoungjun Kim and jaehwa)
Date Wed, 08 Oct 2014 02:37:12 GMT
Repository: tajo
Updated Branches:
  refs/heads/master de28c8294 -> 0dfa3972c


http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index 919ac9b..1f348ff 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -43,13 +43,18 @@ import org.apache.tajo.catalog.statistics.StatisticsUtil;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.json.CoreGsonHelper;
+import org.apache.tajo.engine.plan.proto.PlanProto;
+import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
 import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
 import org.apache.tajo.engine.planner.global.DataChannel;
 import org.apache.tajo.engine.planner.global.ExecutionBlock;
 import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.engine.planner.logical.*;
 import org.apache.tajo.ipc.TajoMasterProtocol;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.MultipleAggregationStage;
+import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty;
 import org.apache.tajo.ipc.TajoWorkerProtocol.IntermediateEntryProto;
 import org.apache.tajo.master.*;
 import org.apache.tajo.master.TaskRunnerGroupEvent.EventType;
@@ -810,9 +815,30 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
         if (grpNode.getType() == NodeType.GROUP_BY) {
           hasGroupColumns = ((GroupbyNode)grpNode).getGroupingColumns().length > 0;
         } else if (grpNode.getType() == NodeType.DISTINCT_GROUP_BY) {
-          hasGroupColumns = ((DistinctGroupbyNode)grpNode).getGroupingColumns().length >
0;
+          // Find current distinct stage node.
+          DistinctGroupbyNode distinctNode = PlannerUtil.findMostBottomNode(subQuery.getBlock().getPlan(),
NodeType.DISTINCT_GROUP_BY);
+          if (distinctNode == null) {
+            LOG.warn(subQuery.getId() + ", Can't find current DistinctGroupbyNode");
+            distinctNode = (DistinctGroupbyNode)grpNode;
+          }
+          hasGroupColumns = distinctNode.getGroupingColumns().length > 0;
+
+          Enforcer enforcer = subQuery.getBlock().getEnforcer();
+          if (enforcer == null) {
+            LOG.warn(subQuery.getId() + ", DistinctGroupbyNode's enforcer is null.");
+          }
+          EnforceProperty property = PhysicalPlannerImpl.getAlgorithmEnforceProperty(enforcer,
distinctNode);
+          if (property != null) {
+            if (property.getDistinct().getIsMultipleAggregation()) {
+              MultipleAggregationStage stage = property.getDistinct().getMultipleAggregationStage();
+              if (stage != MultipleAggregationStage.THRID_STAGE) {
+                hasGroupColumns = true;
+              }
+            }
+          }
         }
         if (!hasGroupColumns) {
+          LOG.info(subQuery.getId() + ", No Grouping Column - determinedTaskNum is set to
1");
           return 1;
         } else {
           long volume = getInputVolume(subQuery.masterPlan, subQuery.context, subQuery.block);

http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/main/proto/TajoWorkerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/src/main/proto/TajoWorkerProtocol.proto
index bde2459..2760301 100644
--- a/tajo-core/src/main/proto/TajoWorkerProtocol.proto
+++ b/tajo-core/src/main/proto/TajoWorkerProtocol.proto
@@ -315,6 +315,12 @@ message DistinctGroupbyEnforcer {
     SORT_AGGREGATION = 1;
   }
 
+  enum MultipleAggregationStage {
+    FIRST_STAGE = 0;
+    SECOND_STAGE = 1;
+    THRID_STAGE = 3;
+  }
+
   message SortSpecArray {
     required int32 pid = 1;
     repeated SortSpecProto sortSpecs = 2;
@@ -322,6 +328,8 @@ message DistinctGroupbyEnforcer {
   required int32 pid = 1;
   required DistinctAggregationAlgorithm algorithm = 2;
   repeated SortSpecArray sortSpecArrays = 3;
+  required bool isMultipleAggregation = 4 [default = false];
+  optional MultipleAggregationStage multipleAggregationStage = 5;
 }
 
 message EnforcerProto {

http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java
index fccec26..8b9f9f7 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java
@@ -20,10 +20,7 @@ package org.apache.tajo.engine.query;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.IntegrationTest;
-import org.apache.tajo.QueryTestCaseBase;
-import org.apache.tajo.TajoConstants;
-import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.*;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.common.TajoDataTypes.Type;
 import org.apache.tajo.conf.TajoConf.ConfVars;
@@ -34,9 +31,14 @@ import org.apache.tajo.master.querymaster.QueryUnit;
 import org.apache.tajo.master.querymaster.SubQuery;
 import org.apache.tajo.storage.StorageConstants;
 import org.apache.tajo.util.KeyValueSet;
+import org.apache.tajo.util.TUtil;
 import org.apache.tajo.worker.TajoWorker;
+import org.junit.AfterClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 
 import java.sql.ResultSet;
 import java.util.*;
@@ -44,11 +46,33 @@ import java.util.*;
 import static org.junit.Assert.*;
 
 @Category(IntegrationTest.class)
+@RunWith(Parameterized.class)
 public class TestGroupByQuery extends QueryTestCaseBase {
   private static final Log LOG = LogFactory.getLog(TestGroupByQuery.class);
 
-  public TestGroupByQuery() throws Exception {
+  public TestGroupByQuery(String groupByOption) throws Exception {
     super(TajoConstants.DEFAULT_DATABASE_NAME);
+
+    Map<String, String> variables = new HashMap<String, String>();
+    if (groupByOption.equals("MultiLevel")) {
+      variables.put(SessionVars.GROUPBY_MULTI_LEVEL_ENABLED.keyname(), "true");
+    } else {
+      variables.put(SessionVars.GROUPBY_MULTI_LEVEL_ENABLED.keyname(), "false");
+    }
+    client.updateSessionVariables(variables);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    client.unsetSessionVariables(TUtil.newList(SessionVars.GROUPBY_MULTI_LEVEL_ENABLED.keyname()));
+  }
+
+  @Parameters
+  public static Collection<Object[]> generateParameters() {
+    return Arrays.asList(new Object[][]{
+        {"MultiLevel"},
+        {"No-MultiLevel"},
+    });
   }
 
   @Test
@@ -285,6 +309,24 @@ public class TestGroupByQuery extends QueryTestCaseBase {
   }
 
   @Test
+  public final void testDistinctAggregation8() throws Exception {
+    /*
+    select
+    sum(distinct l_orderkey),
+        l_linenumber, l_returnflag, l_linestatus, l_shipdate,
+        count(distinct l_partkey),
+        sum(l_orderkey)
+    from
+        lineitem
+    group by
+    l_linenumber, l_returnflag, l_linestatus, l_shipdate;
+    */
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
   public final void testDistinctAggregationWithHaving1() throws Exception {
     // select l_linenumber, count(*), count(distinct l_orderkey), sum(distinct l_orderkey) from lineitem
     // group by l_linenumber having sum(distinct l_orderkey) >= 6;
@@ -343,6 +385,14 @@ public class TestGroupByQuery extends QueryTestCaseBase {
     assertResultSet(res, "testDistinctAggregation_case8.result");
     res.close();
 
+    res = executeFile("testDistinctAggregation_case9.sql");
+    assertResultSet(res, "testDistinctAggregation_case9.result");
+    res.close();
+
+    res = executeFile("testDistinctAggregation_case10.sql");
+    assertResultSet(res, "testDistinctAggregation_case10.result");
+    res.close();
+
     // case9
     KeyValueSet tableOptions = new KeyValueSet();
     tableOptions.set(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);

http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/test/resources/queries/TestGroupByQuery/testDistinctAggregation8.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestGroupByQuery/testDistinctAggregation8.sql b/tajo-core/src/test/resources/queries/TestGroupByQuery/testDistinctAggregation8.sql
new file mode 100644
index 0000000..0553d06
--- /dev/null
+++ b/tajo-core/src/test/resources/queries/TestGroupByQuery/testDistinctAggregation8.sql
@@ -0,0 +1,9 @@
+select
+  sum(distinct l_orderkey),
+  l_linenumber, l_returnflag, l_linestatus, l_shipdate,
+  count(distinct l_partkey),
+  sum(l_orderkey)
+from
+  lineitem
+group by
+  l_linenumber, l_returnflag, l_linestatus, l_shipdate;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/test/resources/queries/TestGroupByQuery/testDistinctAggregation_case10.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestGroupByQuery/testDistinctAggregation_case10.sql b/tajo-core/src/test/resources/queries/TestGroupByQuery/testDistinctAggregation_case10.sql
new file mode 100644
index 0000000..6ab7c25
--- /dev/null
+++ b/tajo-core/src/test/resources/queries/TestGroupByQuery/testDistinctAggregation_case10.sql
@@ -0,0 +1,5 @@
+select sum(cnt1), sum(sum2)
+from (
+  select o_orderdate, count(distinct o_orderpriority), count(distinct o_orderkey) cnt1, sum(o_totalprice)
sum2
+  from orders group by o_orderdate
+) a
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/test/resources/queries/TestGroupByQuery/testDistinctAggregation_case9.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestGroupByQuery/testDistinctAggregation_case9.sql b/tajo-core/src/test/resources/queries/TestGroupByQuery/testDistinctAggregation_case9.sql
new file mode 100644
index 0000000..6265599
--- /dev/null
+++ b/tajo-core/src/test/resources/queries/TestGroupByQuery/testDistinctAggregation_case9.sql
@@ -0,0 +1,11 @@
+select
+    lineitem.l_orderkey as l_orderkey,
+    count(distinct lineitem.l_partkey) as cnt1,
+    sum(lineitem.l_quantity + lineitem.l_linenumber)/count(distinct lineitem.l_suppkey) as
value2,
+    lineitem.l_partkey as l_partkey,
+    avg(lineitem.l_quantity) as avg1,
+    count(distinct lineitem.l_suppkey) as cnt2
+from
+    lineitem
+group by
+    lineitem.l_orderkey, lineitem.l_partkey
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/test/resources/results/TestGroupByQuery/testDistinctAggregation8.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestGroupByQuery/testDistinctAggregation8.result b/tajo-core/src/test/resources/results/TestGroupByQuery/testDistinctAggregation8.result
new file mode 100644
index 0000000..519390d
--- /dev/null
+++ b/tajo-core/src/test/resources/results/TestGroupByQuery/testDistinctAggregation8.result
@@ -0,0 +1,7 @@
+?sum,l_linenumber,l_returnflag,l_linestatus,l_shipdate,?count_1,?sum_2
+-------------------------------
+1,1,N,O,1996-03-13,1,1
+2,1,N,O,1997-01-28,1,2
+3,1,R,F,1994-02-02,1,3
+1,2,N,O,1996-04-12,1,1
+3,2,R,F,1993-11-09,1,3
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/test/resources/results/TestGroupByQuery/testDistinctAggregation_case10.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestGroupByQuery/testDistinctAggregation_case10.result b/tajo-core/src/test/resources/results/TestGroupByQuery/testDistinctAggregation_case10.result
new file mode 100644
index 0000000..2035d9f
--- /dev/null
+++ b/tajo-core/src/test/resources/results/TestGroupByQuery/testDistinctAggregation_case10.result
@@ -0,0 +1,3 @@
+?sum,?sum_1
+-------------------------------
+3,414440.9
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/test/resources/results/TestGroupByQuery/testDistinctAggregation_case9.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestGroupByQuery/testDistinctAggregation_case9.result b/tajo-core/src/test/resources/results/TestGroupByQuery/testDistinctAggregation_case9.result
new file mode 100644
index 0000000..506eea0
--- /dev/null
+++ b/tajo-core/src/test/resources/results/TestGroupByQuery/testDistinctAggregation_case9.result
@@ -0,0 +1,6 @@
+l_orderkey,cnt1,value2,l_partkey,avg1,cnt2
+-------------------------------
+1,1,28.0,1,26.5,2
+2,1,39.0,2,38.0,1
+3,1,46.0,2,45.0,1
+3,1,51.0,3,49.0,1
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result b/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
index e6b12b1..25f1ae7 100644
--- a/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
+++ b/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
@@ -25,6 +25,7 @@ Available Session Variables:
 \set JOIN_PER_SHUFFLE_SIZE [int value] - shuffle output size for join (mb)
 \set GROUPBY_PER_SHUFFLE_SIZE [int value] - shuffle output size for sort (mb)
 \set TABLE_PARTITION_PER_SHUFFLE_SIZE [int value] - shuffle output size for partition table write (mb)
+\set GROUPBY_MULTI_LEVEL_ENABLED [true or false] - Multiple level groupby enabled
 \set EXTSORT_BUFFER_SIZE [long value] - sort buffer size for external sort (mb)
 \set HASH_JOIN_SIZE_LIMIT [long value] - limited size for hash join (mb)
 \set INNER_HASH_JOIN_SIZE_LIMIT [long value] - limited size for hash inner join (mb)

http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java b/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java
index 51388a4..084c105 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java
@@ -42,10 +42,6 @@ public class TupleComparator implements Comparator<Tuple>, ProtoObject<TupleComp
   @SuppressWarnings("unused")
   private final boolean[] nullFirsts;  
 
-  private Datum left;
-  private Datum right;
-  private int compVal;
-
   /**
    * @param schema The schema of input tuples
    * @param sortKeys The description of sort keys
@@ -88,6 +84,10 @@ public class TupleComparator implements Comparator<Tuple>, ProtoObject<TupleComp
 
   @Override
   public int compare(Tuple tuple1, Tuple tuple2) {
+    Datum left = null;
+    Datum right = null;
+    int compVal = 0;
+
     for (int i = 0; i < sortKeyIds.length; i++) {
       left = tuple1.get(sortKeyIds[i]);
       right = tuple2.get(sortKeyIds[i]);


Mime
View raw message