tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hyun...@apache.org
Subject [2/2] git commit: TAJO-801: Multiple distinct should be supported. (Hyoungjun Kim via hyunsik)
Date Thu, 15 May 2014 23:54:19 GMT
TAJO-801: Multiple distinct should be supported. (Hyoungjun Kim via hyunsik)


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/9350a802
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/9350a802
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/9350a802

Branch: refs/heads/master
Commit: 9350a8026b107da11ed2dc8457ad95d3f2153f0c
Parents: 1c21e8b
Author: Hyunsik Choi <hyunsik@apache.org>
Authored: Fri May 16 08:53:01 2014 +0900
Committer: Hyunsik Choi <hyunsik@apache.org>
Committed: Fri May 16 08:53:01 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |   6 +-
 .../main/java/org/apache/tajo/util/TUtil.java   |  11 +
 .../apache/tajo/engine/eval/EvalTreeUtil.java   |  11 +
 .../tajo/engine/parser/HiveQLAnalyzer.java      |   3 +-
 .../engine/planner/BasicLogicalPlanVisitor.java |  12 +
 .../planner/ExplainLogicalPlanVisitor.java      |   6 +
 .../tajo/engine/planner/ExprAnnotator.java      |  30 +-
 .../tajo/engine/planner/LogicalPlanVisitor.java |   3 +
 .../engine/planner/PhysicalPlannerImpl.java     |  66 +++
 .../apache/tajo/engine/planner/PlannerUtil.java |   6 +
 .../engine/planner/PreLogicalPlanVerifier.java  |  23 -
 .../tajo/engine/planner/enforce/Enforcer.java   |  38 ++
 .../engine/planner/global/GlobalPlanner.java    |  67 ++-
 .../global/builder/DistinctGroupbyBuilder.java  | 476 +++++++++++++++++++
 .../planner/logical/DistinctGroupbyNode.java    | 203 ++++++++
 .../engine/planner/logical/GroupbyNode.java     |  42 ++
 .../tajo/engine/planner/logical/NodeType.java   |   1 +
 .../DistinctGroupbyHashAggregationExec.java     | 388 +++++++++++++++
 .../DistinctGroupbySortAggregationExec.java     | 158 ++++++
 .../planner/physical/ExternalSortExec.java      |   3 +-
 .../planner/physical/HashAggregateExec.java     |   8 +-
 .../planner/physical/SortAggregateExec.java     |   8 +-
 .../src/main/proto/TajoWorkerProtocol.proto     |  17 +
 .../tajo/engine/query/TestGroupByQuery.java     |  49 +-
 .../testDistinctAggregation7.sql                |   6 +
 .../testDistinctAggregation_case1.sql           |   7 +
 .../testDistinctAggregation_case2.sql           |   8 +
 .../testDistinctAggregation_case3.sql           |   9 +
 .../testDistinctAggregation_case4.sql           |  10 +
 .../testDistinctAggregation_case5.sql           |  10 +
 .../testDistinctAggregation_case6.sql           |  12 +
 .../testDistinctAggregation_case7.sql           |   9 +
 .../testDistinctAggregation7.result             |   3 +
 .../testDistinctAggregation_case1.result        |   4 +
 .../testDistinctAggregation_case2.result        |   4 +
 .../testDistinctAggregation_case3.result        |   4 +
 .../testDistinctAggregation_case4.result        |   5 +
 .../testDistinctAggregation_case5.result        |   5 +
 .../testDistinctAggregation_case6.result        |   5 +
 .../testDistinctAggregation_case7.result        |   5 +
 40 files changed, 1686 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/9350a802/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 75deb0f..222f02e 100644
--- a/CHANGES
+++ b/CHANGES
@@ -4,8 +4,8 @@ Release 0.9.0 - unreleased
 
   NEW FEATURES
 
-    TAJO-494: Extend TajoClient to run a query with a plan context serialized as the 
-    JSON form. (jihoon)
+    TAJO-494: Extend TajoClient to run a query with a plan context serialized 
+    as the JSON form. (jihoon)
 
     TAJO-761: Implements INTERVAL type. (Hyoungjun Kim via hyunsik)
 
@@ -15,6 +15,8 @@ Release 0.9.0 - unreleased
 
   IMPROVEMENT
 
+    TAJO-801: Multiple distinct should be supported. (Hyoungjun Kim via hyunsik)
+
     TAJO-807: Implement Round(numeric, int) function.
     (Seungun Choe via hyunsik)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/9350a802/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java b/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java
index 431c930..832c1e5 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java
@@ -236,6 +236,17 @@ public class TUtil {
     return collection.toArray((T[]) array);
   }
 
+  public static int[] toArray(Collection<Integer> collection) {
+    int[] array = new int[collection.size()];
+
+    int index = 0;
+    for (Integer eachInt: collection) {
+      array[index++] = eachInt;
+    }
+
+    return array;
+  }
+
   /**
    * It returns the exact code point at which this running thread is executed.
    *

http://git-wip-us.apache.org/repos/asf/tajo/blob/9350a802/tajo-core/src/main/java/org/apache/tajo/engine/eval/EvalTreeUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/eval/EvalTreeUtil.java b/tajo-core/src/main/java/org/apache/tajo/engine/eval/EvalTreeUtil.java
index 2417193..8982bd5 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/eval/EvalTreeUtil.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/eval/EvalTreeUtil.java
@@ -140,6 +140,17 @@ public class EvalTreeUtil {
     
     return schema;
   }
+
+  public static String columnsToStr(Collection<Column> columns) {
+    StringBuilder sb = new StringBuilder();
+    String prefix = "";
+    for (Column column: columns) {
+      sb.append(prefix).append(column.getQualifiedName());
+      prefix = ",";
+    }
+
+    return sb.toString();
+  }
   
   public static DataType getDomainByExpr(Schema inputSchema, EvalNode expr)
       throws InternalException {

http://git-wip-us.apache.org/repos/asf/tajo/blob/9350a802/tajo-core/src/main/java/org/apache/tajo/engine/parser/HiveQLAnalyzer.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/parser/HiveQLAnalyzer.java b/tajo-core/src/main/java/org/apache/tajo/engine/parser/HiveQLAnalyzer.java
index 60e9685..de4b159 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/parser/HiveQLAnalyzer.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/parser/HiveQLAnalyzer.java
@@ -1141,7 +1141,8 @@ public class HiveQLAnalyzer extends HiveQLParserBaseVisitor<Expr> {
 
     boolean isDistinct = false;
     if (ctx.getChild(2) != null) {
-      if (ctx.getChild(2) instanceof TerminalNodeImpl && ctx.getChild(2).getText().equalsIgnoreCase("DISTINCT")) {
+      if (ctx.getChild(2) instanceof TerminalNodeImpl
+          && ctx.getChild(2).getText().equalsIgnoreCase("DISTINCT")) {
         isDistinct = true;
       }
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/9350a802/tajo-core/src/main/java/org/apache/tajo/engine/planner/BasicLogicalPlanVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/BasicLogicalPlanVisitor.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/BasicLogicalPlanVisitor.java
index 0f758bf..3bffefb 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/BasicLogicalPlanVisitor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/BasicLogicalPlanVisitor.java
@@ -74,6 +74,9 @@ public class BasicLogicalPlanVisitor<CONTEXT, RESULT> implements LogicalPlanVisi
       case GROUP_BY:
         current = visitGroupBy(context, plan, block, (GroupbyNode) node, stack);
         break;
+      case DISTINCT_GROUP_BY:
+        current = visitDistinct(context, plan, block, (DistinctGroupbyNode) node, stack);
+        break;
       case SELECTION:
         current = visitFilter(context, plan, block, (SelectionNode) node, stack);
         break;
@@ -185,6 +188,15 @@ public class BasicLogicalPlanVisitor<CONTEXT, RESULT> implements LogicalPlanVisi
   }
 
   @Override
+  public RESULT visitDistinct(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, DistinctGroupbyNode node,
+                             Stack<LogicalNode> stack) throws PlanningException {
+    stack.push(node);
+    RESULT result = visit(context, plan, block, node.getChild(), stack);
+    stack.pop();
+    return result;
+  }
+
+  @Override
   public RESULT visitFilter(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, SelectionNode node,
                             Stack<LogicalNode> stack) throws PlanningException {
     stack.push(node);

http://git-wip-us.apache.org/repos/asf/tajo/blob/9350a802/tajo-core/src/main/java/org/apache/tajo/engine/planner/ExplainLogicalPlanVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/ExplainLogicalPlanVisitor.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/ExplainLogicalPlanVisitor.java
index 9dd8700..ad9bdf1 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/ExplainLogicalPlanVisitor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/ExplainLogicalPlanVisitor.java
@@ -108,6 +108,12 @@ public class ExplainLogicalPlanVisitor extends BasicLogicalPlanVisitor<ExplainLo
     return visitUnaryNode(context, plan, block, node, stack);
   }
 
+  @Override
+  public LogicalNode visitDistinct(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block, DistinctGroupbyNode node,
+                                  Stack<LogicalNode> stack) throws PlanningException {
+    return visitUnaryNode(context, plan, block, node, stack);
+  }
+
   private LogicalNode visitUnaryNode(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block,
                                      UnaryNode node, Stack<LogicalNode> stack) throws PlanningException {
     context.depth++;

http://git-wip-us.apache.org/repos/asf/tajo/blob/9350a802/tajo-core/src/main/java/org/apache/tajo/engine/planner/ExprAnnotator.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/ExprAnnotator.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/ExprAnnotator.java
index dfbe600..e74fd70 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/ExprAnnotator.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/ExprAnnotator.java
@@ -580,22 +580,22 @@ public class ExprAnnotator extends BaseAlgebraVisitor<ExprAnnotator.Context, Eva
 
 
     try {
-    CatalogProtos.FunctionType functionType = funcDesc.getFuncType();
-    if (functionType == CatalogProtos.FunctionType.GENERAL
-        || functionType == CatalogProtos.FunctionType.UDF) {
-      return new GeneralFunctionEval(funcDesc, (GeneralFunction) funcDesc.newInstance(), givenArgs);
-    } else if (functionType == CatalogProtos.FunctionType.AGGREGATION
-        || functionType == CatalogProtos.FunctionType.UDA) {
-      if (!ctx.currentBlock.hasNode(NodeType.GROUP_BY)) {
-        ctx.currentBlock.setAggregationRequire();
+      CatalogProtos.FunctionType functionType = funcDesc.getFuncType();
+      if (functionType == CatalogProtos.FunctionType.GENERAL
+          || functionType == CatalogProtos.FunctionType.UDF) {
+        return new GeneralFunctionEval(funcDesc, (GeneralFunction) funcDesc.newInstance(), givenArgs);
+      } else if (functionType == CatalogProtos.FunctionType.AGGREGATION
+          || functionType == CatalogProtos.FunctionType.UDA) {
+        if (!ctx.currentBlock.hasNode(NodeType.GROUP_BY)) {
+          ctx.currentBlock.setAggregationRequire();
+        }
+        return new AggregationFunctionCallEval(funcDesc, (AggFunction) funcDesc.newInstance(), givenArgs);
+      } else if (functionType == CatalogProtos.FunctionType.DISTINCT_AGGREGATION
+          || functionType == CatalogProtos.FunctionType.DISTINCT_UDA) {
+        throw new PlanningException("Unsupported function: " + funcDesc.toString());
+      } else {
+        throw new PlanningException("Unsupported Function Type: " + functionType.name());
       }
-      return new AggregationFunctionCallEval(funcDesc, (AggFunction) funcDesc.newInstance(), givenArgs);
-    } else if (functionType == CatalogProtos.FunctionType.DISTINCT_AGGREGATION
-        || functionType == CatalogProtos.FunctionType.DISTINCT_UDA) {
-      throw new PlanningException("Unsupported function: " + funcDesc.toString());
-    } else {
-      throw new PlanningException("Unsupported Function Type: " + functionType.name());
-    }
     } catch (InternalException e) {
       throw new PlanningException(e);
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/9350a802/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVisitor.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVisitor.java
index 17b5d0a..6850046 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVisitor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVisitor.java
@@ -41,6 +41,9 @@ public interface LogicalPlanVisitor<CONTEXT, RESULT> {
   RESULT visitGroupBy(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, GroupbyNode node,
                       Stack<LogicalNode> stack) throws PlanningException;
 
+  RESULT visitDistinct(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, DistinctGroupbyNode node,
+                                Stack<LogicalNode> stack) throws PlanningException;
+
   RESULT visitFilter(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, SelectionNode node,
                      Stack<LogicalNode> stack) throws PlanningException;
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/9350a802/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index 2053e36..e508d2c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.SortSpec;
 import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.proto.CatalogProtos.SortSpecProto;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.planner.enforce.Enforcer;
 import org.apache.tajo.engine.planner.global.DataChannel;
@@ -38,6 +39,9 @@ import org.apache.tajo.engine.planner.logical.*;
 import org.apache.tajo.engine.planner.physical.*;
 import org.apache.tajo.exception.InternalException;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer;
+import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.DistinctAggregationAlgorithm;
+import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.SortSpecArray;
 import org.apache.tajo.storage.AbstractStorageManager;
 import org.apache.tajo.storage.TupleComparator;
 import org.apache.tajo.storage.fragment.FileFragment;
@@ -174,6 +178,13 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
         stack.pop();
         return createGroupByPlan(ctx, grpNode, leftExec);
 
+      case DISTINCT_GROUP_BY:
+        DistinctGroupbyNode distinctNode = (DistinctGroupbyNode) logicalNode;
+        stack.push(distinctNode);
+        leftExec = createPlanRecursive(ctx, distinctNode.getChild(), stack);
+        stack.pop();
+        return createDistinctGroupByPlan(ctx, distinctNode, leftExec);
+
       case HAVING:
         HavingNode havingNode = (HavingNode) logicalNode;
         stack.push(havingNode);
@@ -962,6 +973,57 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
     }
   }
 
+  public PhysicalExec createDistinctGroupByPlan(TaskAttemptContext context,
+                                                DistinctGroupbyNode distinctNode, PhysicalExec subOp)
+      throws IOException {
+    Enforcer enforcer = context.getEnforcer();
+    EnforceProperty property = getAlgorithmEnforceProperty(enforcer, distinctNode);
+    if (property != null) {
+      DistinctAggregationAlgorithm algorithm = property.getDistinct().getAlgorithm();
+      if (algorithm == DistinctAggregationAlgorithm.HASH_AGGREGATION) {
+        return createInMemoryDistinctGroupbyExec(context, distinctNode, subOp);
+      } else {
+        return createSortAggregationDistinctGroupbyExec(context, distinctNode, subOp, property.getDistinct());
+      }
+    } else {
+      return createInMemoryDistinctGroupbyExec(context, distinctNode, subOp);
+    }
+  }
+
+  private PhysicalExec createInMemoryDistinctGroupbyExec(TaskAttemptContext ctx,
+      DistinctGroupbyNode distinctGroupbyNode, PhysicalExec subOp) throws IOException {
+    return new DistinctGroupbyHashAggregationExec(ctx, distinctGroupbyNode, subOp);
+  }
+
+  private PhysicalExec createSortAggregationDistinctGroupbyExec(TaskAttemptContext ctx,
+      DistinctGroupbyNode distinctGroupbyNode, PhysicalExec subOp,
+      DistinctGroupbyEnforcer enforcer) throws IOException {
+    List<GroupbyNode> groupbyNodes = distinctGroupbyNode.getGroupByNodes();
+
+    SortAggregateExec[] sortAggregateExec = new SortAggregateExec[groupbyNodes.size()];
+
+    List<SortSpecArray> sortSpecArrays = enforcer.getSortSpecArraysList();
+
+    int index = 0;
+    for (GroupbyNode eachGroupbyNode: groupbyNodes) {
+      SortSpecArray sortSpecArray = sortSpecArrays.get(index);
+      SortSpec[] sortSpecs = new SortSpec[sortSpecArray.getSortSpecsList().size()];
+      int sortIndex = 0;
+      for (SortSpecProto eachProto: sortSpecArray.getSortSpecsList()) {
+        sortSpecs[sortIndex++] = new SortSpec(eachProto);
+      }
+      SortNode sortNode = LogicalPlan.createNodeWithoutPID(SortNode.class);
+      sortNode.setSortSpecs(sortSpecs);
+      sortNode.setInSchema(subOp.getSchema());
+      sortNode.setOutSchema(eachGroupbyNode.getInSchema());
+      ExternalSortExec sortExec = new ExternalSortExec(ctx, sm, sortNode, subOp);
+
+      sortAggregateExec[index++] = new SortAggregateExec(ctx, eachGroupbyNode, sortExec);
+    }
+
+    return new DistinctGroupbySortAggregationExec(ctx, distinctGroupbyNode, sortAggregateExec);
+  }
+
   public PhysicalExec createSortPlan(TaskAttemptContext context, SortNode sortNode,
                                      PhysicalExec child) throws IOException {
 
@@ -1025,6 +1087,8 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
       type = EnforceType.JOIN;
     } else if (node.getType() == NodeType.GROUP_BY) {
       type = EnforceType.GROUP_BY;
+    } else if (node.getType() == NodeType.DISTINCT_GROUP_BY) {
+      type = EnforceType.DISTINCT_GROUP_BY;
     } else if (node.getType() == NodeType.SORT) {
       type = EnforceType.SORT;
     } else if (node instanceof StoreTableNode
@@ -1043,6 +1107,8 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
           found = property;
         } else if (type == EnforceType.GROUP_BY && property.getGroupby().getPid() == node.getPID()) {
           found = property;
+        } else if (type == EnforceType.DISTINCT_GROUP_BY && property.getDistinct().getPid() == node.getPID()) {
+          found = property;
         } else if (type == EnforceType.SORT && property.getSort().getPid() == node.getPID()) {
           found = property;
         } else if (type == EnforceType.COLUMN_PARTITION && property.getColumnPartition().getPid() == node.getPID()) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/9350a802/tajo-core/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
index 1f97d14..a1ff0f0 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
@@ -670,6 +670,12 @@ public class PlannerUtil {
         copy.setPID(-1);
       } else {
         copy.setPID(plan.newPID());
+        if (node instanceof DistinctGroupbyNode) {
+          DistinctGroupbyNode dNode = (DistinctGroupbyNode)copy;
+          for (GroupbyNode eachNode: dNode.getGroupByNodes()) {
+            eachNode.setPID(plan.newPID());
+          }
+        }
       }
       return copy;
     } catch (CloneNotSupportedException e) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/9350a802/tajo-core/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java
index 1ee0878..5eca5fd 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java
@@ -18,7 +18,6 @@
 
 package org.apache.tajo.engine.planner;
 
-import com.google.common.collect.ObjectArrays;
 import org.apache.tajo.algebra.*;
 import org.apache.tajo.catalog.CatalogService;
 import org.apache.tajo.catalog.CatalogUtil;
@@ -26,7 +25,6 @@ import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.master.session.Session;
 import org.apache.tajo.util.TUtil;
 
-import java.util.Arrays;
 import java.util.Set;
 import java.util.Stack;
 
@@ -70,28 +68,7 @@ public class PreLogicalPlanVerifier extends BaseAlgebraVisitor <PreLogicalPlanVe
         }
       }
 
-      // no two aggregations can have different DISTINCT columns.
-      //
-      // For example, the following query will work
-      // SELECT count(DISTINCT col1) and sum(DISTINCT col1) ..
-      //
-      // But, the following query will not work in this time
-      //
-      // SELECT count(DISTINCT col1) and SUM(DISTINCT col2) ..
       Set<GeneralSetFunctionExpr> exprs = ExprFinder.finds(namedExpr.getExpr(), OpType.GeneralSetFunction);
-      if (exprs.size() > 0) {
-        for (GeneralSetFunctionExpr setFunction : exprs) {
-          if (distinctValues == null && setFunction.isDistinct()) {
-            distinctValues = setFunction.getParams();
-          } else if (distinctValues != null && setFunction.isDistinct()) {
-            if (!Arrays.equals(distinctValues, setFunction.getParams())) {
-              Expr [] differences = ObjectArrays.concat(distinctValues, setFunction.getParams(), Expr.class);
-              throw new PlanningException("different DISTINCT columns are not supported yet: "
-                  + TUtil.arrayToString(differences));
-            }
-          }
-        }
-      }
 
       // Currently, avg functions with distinct aggregation are not supported.
       // This code does not allow users to use avg functions with distinct aggregation.

http://git-wip-us.apache.org/repos/asf/tajo/blob/9350a802/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java
index 91190f6..742736c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java
@@ -23,6 +23,8 @@ import org.apache.tajo.annotation.Nullable;
 import org.apache.tajo.catalog.SortSpec;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.common.ProtoObject;
+import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.DistinctAggregationAlgorithm;
+import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.SortSpecArray;
 import org.apache.tajo.util.TUtil;
 
 import java.util.Collection;
@@ -130,6 +132,22 @@ public class Enforcer implements ProtoObject<EnforcerProto> {
     TUtil.putToNestedList(properties, builder.getType(), builder.build());
   }
 
+  public void enforceDistinctAggregation(int pid,
+                                         DistinctAggregationAlgorithm algorithm,
+                                         List<SortSpecArray> sortSpecArrays) {
+    EnforceProperty.Builder builder = newProperty();
+    DistinctGroupbyEnforcer.Builder enforce = DistinctGroupbyEnforcer.newBuilder();
+    enforce.setPid(pid);
+    enforce.setAlgorithm(algorithm);
+    if (sortSpecArrays != null) {
+      enforce.addAllSortSpecArrays(sortSpecArrays);
+    }
+
+    builder.setType(EnforceType.DISTINCT_GROUP_BY);
+    builder.setDistinct(enforce.build());
+    TUtil.putToNestedList(properties, builder.getType(), builder.build());
+  }
+
   public void enforceSortAlgorithm(int pid, SortEnforce.SortAlgorithm algorithm) {
     EnforceProperty.Builder builder = newProperty();
     SortEnforce.Builder enforce = SortEnforce.newBuilder();
@@ -218,6 +236,26 @@ public class Enforcer implements ProtoObject<EnforcerProto> {
         }
       }
       break;
+    case DISTINCT_GROUP_BY:
+      DistinctGroupbyEnforcer distinct = property.getDistinct();
+      sb.append("type=Distinct,alg=");
+      if (distinct.getAlgorithm() == DistinctAggregationAlgorithm.HASH_AGGREGATION) {
+        sb.append("hash");
+      } else {
+        sb.append("sort");
+        sb.append(",keys=");
+        String recordDelim = "";
+        for (SortSpecArray sortSpecArray : distinct.getSortSpecArraysList()) {
+          sb.append(recordDelim);
+          String delim = "";
+          for (CatalogProtos.SortSpecProto sortSpec: sortSpecArray.getSortSpecsList()) {
+            sb.append(delim).append(sortSpec.getColumn().getName());
+            delim = ",";
+          }
+          recordDelim = " | ";
+        }
+      }
+      break;
     case BROADCAST:
       BroadcastEnforce broadcast = property.getBroadcast();
       sb.append("type=Broadcast, tables=").append(broadcast.getTableName());

http://git-wip-us.apache.org/repos/asf/tajo/blob/9350a802/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
index 9002ac0..16def83 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
@@ -37,6 +37,7 @@ import org.apache.tajo.engine.eval.EvalTreeUtil;
 import org.apache.tajo.engine.eval.FieldEval;
 import org.apache.tajo.engine.function.AggFunction;
 import org.apache.tajo.engine.planner.*;
+import org.apache.tajo.engine.planner.global.builder.DistinctGroupbyBuilder;
 import org.apache.tajo.engine.planner.logical.*;
 import org.apache.tajo.engine.planner.rewrite.ProjectionPushDownRule;
 import org.apache.tajo.exception.InternalException;
@@ -88,9 +89,21 @@ public class GlobalPlanner {
     }
   }
 
+  public CatalogProtos.StoreType getStoreType() {
+    return storeType;
+  }
+
   public class GlobalPlanContext {
     MasterPlan plan;
     Map<Integer, ExecutionBlock> execBlockMap = Maps.newHashMap();
+
+    public MasterPlan getPlan() {
+      return plan;
+    }
+
+    public Map<Integer, ExecutionBlock> getExecBlockMap() {
+      return execBlockMap;
+    }
   }
 
   /**
@@ -140,7 +153,7 @@ public class GlobalPlanner {
     }
 
     masterPlan.setTerminal(terminalBlock);
-    LOG.info(masterPlan.toString());
+    LOG.info("\n" + masterPlan.toString());
   }
 
   private static void setFinalOutputChannel(DataChannel outputChannel, Schema outputSchema) {
@@ -456,6 +469,47 @@ public class GlobalPlanner {
     return rewritten;
   }
 
+  public ExecutionBlock buildDistinctGroupbyAndUnionPlan(MasterPlan masterPlan, ExecutionBlock lastBlock,
+                                                  DistinctGroupbyNode firstPhaseGroupBy,
+                                                  DistinctGroupbyNode secondPhaseGroupBy) {
+    DataChannel lastDataChannel = null;
+
+    // It pushes down the first phase group-by operator into all child blocks.
+    //
+    // (second phase)    G (currentBlock)
+    //                  /|\
+    //                / / | \
+    // (first phase) G G  G  G (child block)
+
+    // They are already connected to one another.
+    // So, we don't need to connect them again.
+    for (DataChannel dataChannel : masterPlan.getIncomingChannels(lastBlock.getId())) {
+      if (firstPhaseGroupBy.isEmptyGrouping()) {
+        dataChannel.setShuffle(HASH_SHUFFLE, firstPhaseGroupBy.getGroupingColumns(), 1);
+      } else {
+        dataChannel.setShuffle(HASH_SHUFFLE, firstPhaseGroupBy.getGroupingColumns(), 32);
+      }
+      dataChannel.setSchema(firstPhaseGroupBy.getOutSchema());
+      ExecutionBlock childBlock = masterPlan.getExecBlock(dataChannel.getSrcId());
+
+      // Why must firstPhaseGroupby be copied?
+      //
+      // A groupby in each execution block can have a different child,
+      // which affects the groupby's input schema.
+      DistinctGroupbyNode firstPhaseGroupbyCopy = PlannerUtil.clone(masterPlan.getLogicalPlan(), firstPhaseGroupBy);
+      firstPhaseGroupbyCopy.setChild(childBlock.getPlan());
+      childBlock.setPlan(firstPhaseGroupbyCopy);
+
+      // just keep the last data channel.
+      lastDataChannel = dataChannel;
+    }
+
+    ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), lastDataChannel);
+    secondPhaseGroupBy.setChild(scanNode);
+    lastBlock.setPlan(secondPhaseGroupBy);
+    return lastBlock;
+  }
+
   /**
    * If there are at least one distinct aggregation function, a query works as if the query is rewritten as follows:
    *
@@ -493,7 +547,7 @@ public class GlobalPlanner {
    * As a result, although a no-distinct aggregation requires two stages, a distinct aggregation requires three
    * execution blocks.
    */
-  private ExecutionBlock buildGroupByIncludingDistinctFunctions(GlobalPlanContext context,
+  private ExecutionBlock buildGroupByIncludingDistinctFunctionsMultiStage(GlobalPlanContext context,
                                                                 ExecutionBlock latestExecBlock,
                                                                 GroupbyNode groupbyNode) throws PlanningException {
 
@@ -505,7 +559,6 @@ public class GlobalPlanner {
     List<Target> firstPhaseEvalNodeTargets = Lists.newArrayList();
 
     for (AggregationFunctionCallEval aggFunction : groupbyNode.getAggFunctions()) {
-
       if (aggFunction.isDistinct()) {
         // add distinct columns to first stage's grouping columns
         firstStageGroupingColumns.addAll(EvalTreeUtil.findUniqueColumns(aggFunction));
@@ -535,6 +588,7 @@ public class GlobalPlanner {
     for (Target target : firstPhaseEvalNodeTargets) {
       firstStageTargets[i++] = target;
     }
+
     // Create the groupby node for the first stage and set all necessary descriptions
     GroupbyNode firstStageGroupby = new GroupbyNode(context.plan.getLogicalPlan().newPID());
     firstStageGroupby.setGroupingColumns(TUtil.toArray(firstStageGroupingColumns, Column.class));
@@ -577,12 +631,12 @@ public class GlobalPlanner {
 
   private ExecutionBlock buildGroupBy(GlobalPlanContext context, ExecutionBlock lastBlock,
                                       GroupbyNode groupbyNode) throws PlanningException {
-
     MasterPlan masterPlan = context.plan;
     ExecutionBlock currentBlock;
 
     if (groupbyNode.isDistinct()) { // if there is at one distinct aggregation function
-      return buildGroupByIncludingDistinctFunctions(context, lastBlock, groupbyNode);
+      DistinctGroupbyBuilder builder = new DistinctGroupbyBuilder(this);
+      return builder.buildPlan(context, lastBlock, groupbyNode);
     } else {
       GroupbyNode firstPhaseGroupby = createFirstPhaseGroupBy(masterPlan.getLogicalPlan(), groupbyNode);
 
@@ -597,7 +651,7 @@ public class GlobalPlanner {
     return currentBlock;
   }
 
-  public boolean hasUnionChild(UnaryNode node) {
+  public static boolean hasUnionChild(UnaryNode node) {
 
     // there are two cases:
     //
@@ -676,6 +730,7 @@ public class GlobalPlanner {
 
   private ExecutionBlock buildTwoPhaseGroupby(MasterPlan masterPlan, ExecutionBlock latestBlock,
                                                      GroupbyNode firstPhaseGroupby, GroupbyNode secondPhaseGroupby) {
+
     ExecutionBlock childBlock = latestBlock;
     childBlock.setPlan(firstPhaseGroupby);
     ExecutionBlock currentBlock = masterPlan.newExecutionBlock();

http://git-wip-us.apache.org/repos/asf/tajo/blob/9350a802/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java
new file mode 100644
index 0000000..1ccd9dc
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java
@@ -0,0 +1,476 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.global.builder;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.proto.CatalogProtos.SortSpecProto;
+import org.apache.tajo.engine.eval.AggregationFunctionCallEval;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.eval.EvalTreeUtil;
+import org.apache.tajo.engine.eval.FieldEval;
+import org.apache.tajo.engine.planner.LogicalPlan;
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.PlanningException;
+import org.apache.tajo.engine.planner.Target;
+import org.apache.tajo.engine.planner.global.DataChannel;
+import org.apache.tajo.engine.planner.global.ExecutionBlock;
+import org.apache.tajo.engine.planner.global.GlobalPlanner;
+import org.apache.tajo.engine.planner.global.GlobalPlanner.GlobalPlanContext;
+import org.apache.tajo.engine.planner.logical.DistinctGroupbyNode;
+import org.apache.tajo.engine.planner.logical.GroupbyNode;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.engine.planner.logical.ScanNode;
+import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.DistinctAggregationAlgorithm;
+import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.SortSpecArray;
+import org.apache.tajo.util.TUtil;
+
+import java.util.*;
+
+import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleType.HASH_SHUFFLE;
+
+public class DistinctGroupbyBuilder {
+  private static Log LOG = LogFactory.getLog(DistinctGroupbyBuilder.class);
+  private GlobalPlanner globalPlanner;
+
+  /**
+   * Creates a builder bound to the given global planner.
+   *
+   * @param globalPlanner planner consulted by {@code buildPlan} for the store type of the data channel
+   *                      and for building the plan when the first stage has a union child
+   */
+  public DistinctGroupbyBuilder(GlobalPlanner globalPlanner) {
+    this.globalPlanner = globalPlanner;
+  }
+
+
+  /**
+   * Builds the two-stage execution plan for a group-by that contains distinct aggregations.
+   *
+   * <p>The given block becomes the first stage (hash aggregation) and a newly created block becomes
+   * the second stage (sort aggregation); both are connected through a hash-shuffle data channel.
+   *
+   * @param context         global planning context that owns the master plan
+   * @param latestExecBlock execution block that receives the first-stage distinct node
+   * @param currentNode     node to plan; it is cast to {@link GroupbyNode}
+   * @return the newly created second-stage execution block
+   * @throws PlanningException wrapping any exception raised while the plan is built
+   */
+  public ExecutionBlock buildPlan(GlobalPlanContext context,
+                                  ExecutionBlock latestExecBlock,
+                                  LogicalNode currentNode) throws PlanningException {
+    try {
+      GroupbyNode originGroupby = (GroupbyNode) currentNode;
+      LogicalPlan logicalPlan = context.getPlan().getLogicalPlan();
+
+      // Derive the first and second stage nodes from a single base distinct node.
+      DistinctGroupbyNode[] stages = createMultiPhaseDistinctNode(logicalPlan, originGroupby,
+          buildBaseDistinctGroupByNode(context, latestExecBlock, originGroupby));
+      DistinctGroupbyNode firstStage = stages[0];
+      DistinctGroupbyNode secondStage = stages[1];
+
+      // The incoming block runs the first stage; the second stage gets a block of its own.
+      latestExecBlock.setPlan(firstStage);
+      ExecutionBlock secondStageBlock = context.getPlan().newExecutionBlock();
+
+      // First stage => hash aggregation, second stage => sort aggregation.
+      setDistinctAggregationEnforcer(latestExecBlock, firstStage, secondStageBlock, secondStage);
+
+      // Without grouping keys a single partition is used; otherwise a fixed count of 32.
+      // NOTE(review): 32 is hard-coded here - confirm whether it should be configurable.
+      int numPartitions = originGroupby.isEmptyGrouping() ? 1 : 32;
+      DataChannel channel = new DataChannel(latestExecBlock, secondStageBlock, HASH_SHUFFLE, numPartitions);
+      channel.setShuffleKeys(firstStage.getGroupingColumns());
+      channel.setSchema(firstStage.getOutSchema());
+      channel.setStoreType(globalPlanner.getStoreType());
+
+      // The second stage reads the first stage's output through a scan over the channel.
+      ScanNode scanNode = GlobalPlanner.buildInputExecutor(context.getPlan().getLogicalPlan(), channel);
+      secondStage.setChild(scanNode);
+      secondStageBlock.setPlan(secondStage);
+
+      context.getPlan().addConnect(channel);
+
+      // NOTE(review): the first-stage node is passed for both node parameters - confirm this is intended.
+      if (GlobalPlanner.hasUnionChild(firstStage)) {
+        globalPlanner.buildDistinctGroupbyAndUnionPlan(
+            context.getPlan(), latestExecBlock, firstStage, firstStage);
+      }
+
+      return secondStageBlock;
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+      throw new PlanningException(e);
+    }
+  }
+
+  /**
+   * Builds a {@link DistinctGroupbyNode} from a group-by that contains distinct aggregations.
+   *
+   * <pre>
+   * select col1, count(distinct col2), count(distinct col3), sum(col4) from ... group by col1
+   * => DistinctGroupbyNode
+   *    grouping key = col1
+   *    Sub GroupbyNodes
+   *     - GroupByNode1: grouping(col1, col2), expr(count distinct col2)
+   *     - GroupByNode2: grouping(col1, col3), expr(count distinct col3)
+   *     - GroupByNode3: grouping(col1), expr(sum col4)
+   * </pre>
+   *
+   * @param context         global planning context; its logical plan supplies new node ids
+   * @param latestExecBlock not used by this method
+   * @param groupbyNode     original group-by node to decompose
+   * @return the base distinct node holding one sub group-by per distinct column set, followed by
+   *         one sub group-by for all non-distinct aggregations when there are any
+   */
+  private DistinctGroupbyNode buildBaseDistinctGroupByNode(GlobalPlanContext context,
+                                                           ExecutionBlock latestExecBlock,
+                                                           GroupbyNode groupbyNode) {
+    List<Column> groupKeys = Arrays.asList(groupbyNode.getGroupingColumns());
+    int numGroupKeys = groupKeys.size();
+
+    List<GroupbyNode> subGroupbys = new ArrayList<GroupbyNode>();
+    List<AggregationFunctionCallEval> plainAggs = new ArrayList<AggregationFunctionCallEval>();
+    List<Target> plainAggTargets = new ArrayList<Target>();
+
+    // Distinct aggregations over the same column set share one sub group-by (keyed by the column string).
+    Map<String, DistinctGroupbyNodeBuildInfo> infoByDistinctColumns =
+        new HashMap<String, DistinctGroupbyNodeBuildInfo>();
+
+    int aggIdx = 0;
+    for (AggregationFunctionCallEval aggFunction : groupbyNode.getAggFunctions()) {
+      // The target of the i-th aggregation is read at offset (number of grouping columns + i).
+      Target aggTarget = groupbyNode.getTargets()[numGroupKeys + aggIdx];
+      aggIdx++;
+
+      if (!aggFunction.isDistinct()) {
+        plainAggs.add(aggFunction);
+        plainAggTargets.add(aggTarget);
+        continue;
+      }
+
+      LinkedHashSet<Column> distinctColumns = EvalTreeUtil.findUniqueColumns(aggFunction);
+      String mapKey = EvalTreeUtil.columnsToStr(distinctColumns);
+      DistinctGroupbyNodeBuildInfo info = infoByDistinctColumns.get(mapKey);
+      if (info == null) {
+        GroupbyNode distinctGroupby = new GroupbyNode(context.getPlan().getLogicalPlan().newPID());
+        info = new DistinctGroupbyNodeBuildInfo(distinctGroupby);
+        infoByDistinctColumns.put(mapKey, info);
+
+        // Grouping columns = GROUP BY columns followed by the distinct columns not already present.
+        List<Column> keys = new ArrayList<Column>(groupKeys);
+        for (Column distinctColumn : distinctColumns) {
+          if (!keys.contains(distinctColumn)) {
+            keys.add(distinctColumn);
+          }
+        }
+        distinctGroupby.setGroupingColumns(keys.toArray(new Column[keys.size()]));
+      }
+      info.addAggFunction(aggFunction);
+      info.addAggFunctionTarget(aggTarget);
+    }
+
+    // One sub group-by per distinct column set: targets are its grouping columns, then its aggregations.
+    for (DistinctGroupbyNodeBuildInfo info : infoByDistinctColumns.values()) {
+      GroupbyNode distinctGroupby = info.getGroupbyNode();
+      List<AggregationFunctionCallEval> distinctAggs = info.getAggFunctions();
+
+      List<Target> targetList = new ArrayList<Target>();
+      for (Column key : distinctGroupby.getGroupingColumns()) {
+        targetList.add(new Target(new FieldEval(key)));
+      }
+      targetList.addAll(info.getAggFunctionTargets());
+
+      distinctGroupby.setTargets(targetList.toArray(new Target[targetList.size()]));
+      distinctGroupby.setAggFunctions(
+          distinctAggs.toArray(new AggregationFunctionCallEval[distinctAggs.size()]));
+      distinctGroupby.setDistinct(true);
+      distinctGroupby.setInSchema(groupbyNode.getInSchema());
+
+      subGroupbys.add(distinctGroupby);
+    }
+
+    // All non-distinct aggregations are merged into a single trailing sub group-by whose
+    // output order is GROUP_BY_COL1, COL2, ... followed by the aggregation values.
+    if (!plainAggs.isEmpty()) {
+      GroupbyNode plainGroupby = new GroupbyNode(context.getPlan().getLogicalPlan().newPID());
+
+      List<Target> targetList = new ArrayList<Target>();
+      for (Column key : groupKeys) {
+        targetList.add(new Target(new FieldEval(key)));
+      }
+      targetList.addAll(plainAggTargets);
+
+      plainGroupby.setTargets(targetList.toArray(new Target[targetList.size()]));
+      plainGroupby.setGroupingColumns(groupKeys.toArray(new Column[groupKeys.size()]));
+      plainGroupby.setAggFunctions(plainAggs.toArray(new AggregationFunctionCallEval[plainAggs.size()]));
+      plainGroupby.setInSchema(groupbyNode.getInSchema());
+
+      subGroupbys.add(plainGroupby);
+    }
+
+    DistinctGroupbyNode baseNode = new DistinctGroupbyNode(context.getPlan().getLogicalPlan().newPID());
+    baseNode.setTargets(groupbyNode.getTargets());
+    baseNode.setGroupColumns(groupbyNode.getGroupingColumns());
+    baseNode.setInSchema(groupbyNode.getInSchema());
+    baseNode.setChild(groupbyNode.getChild());
+    baseNode.setGroupbyNodes(subGroupbys);
+
+    return baseNode;
+  }
+
+  /**
+   * Splits the base distinct node into a first-stage node and a second-stage node.
+   *
+   * <p>The first stage is a clone of {@code baseDistinctNode}; the second stage is
+   * {@code baseDistinctNode} itself, which is modified in place. Besides rewriting the targets and
+   * aggregation functions of every sub group-by, the method fills the result column id arrays of
+   * both stages and the input schema of the second stage.
+   *
+   * @param plan              logical plan used for cloning and for generating unique column names
+   * @param originGroupbyNode original group-by node; its output schema defines the result column ids
+   * @param baseDistinctNode  node built from the original group-by; must not be null
+   * @return a two-element array: {@code [0]} is the first-stage node, {@code [1]} the second-stage node
+   */
+  public DistinctGroupbyNode[] createMultiPhaseDistinctNode(LogicalPlan plan,
+                                                                   GroupbyNode originGroupbyNode,
+                                                                   DistinctGroupbyNode baseDistinctNode) {
+    /*
+    Creating 2 stage execution block
+      - first stage: HashAggregation -> groupby distinct column and eval not distinct aggregation
+        ==> HashShuffle
+      - second stage: SortAggregation -> sort and eval(aggregate) with distinct aggregation function, not distinct aggregation
+
+    select col1, count(distinct col2), count(distinct col3), sum(col4) from ... group by col1
+    -------------------------------------------------------------------------
+    - baseDistinctNode
+      grouping key = col1
+      - GroupByNode1: grouping(col1, col2), expr(count distinct col2)
+      - GroupByNode2: grouping(col1, col3), expr(count distinct col3)
+      - GroupByNode3: grouping(col1), expr(sum col4)
+    -------------------------------------------------------------------------
+    - FirstStage:
+      - GroupByNode1: grouping(col1, col2)
+      - GroupByNode2: grouping(col1, col3)
+      - GroupByNode3: grouping(col1), expr(sum col4)
+
+    - SecondStage:
+      - GroupByNode1: grouping(col1, col2), expr(count distinct col2)
+      - GroupByNode2: grouping(col1, col3), expr(count distinct col3)
+      - GroupByNode3: grouping(col1), expr(sum col4)
+    */
+
+    Preconditions.checkNotNull(baseDistinctNode);
+
+    Schema originOutputSchema = originGroupbyNode.getOutSchema();
+    // The first stage works on a clone; the second stage reuses (and mutates) the base node.
+    DistinctGroupbyNode firstStageDistinctNode = PlannerUtil.clone(plan, baseDistinctNode);
+    DistinctGroupbyNode secondStageDistinctNode = baseDistinctNode;
+
+    List<Column> originGroupColumns = Arrays.asList(firstStageDistinctNode.getGroupingColumns());
+
+    // secondStageColumnIds is indexed by the column id in the original output schema and stores
+    // the running position (columnIdIndex) assigned to that column below.
+    int[] secondStageColumnIds = new int[secondStageDistinctNode.getOutSchema().size()];
+    int columnIdIndex = 0;
+    for (Column column: secondStageDistinctNode.getGroupingColumns()) {
+      secondStageColumnIds[originOutputSchema.getColumnId(column.getQualifiedName())] = columnIdIndex;
+      columnIdIndex++;
+    }
+
+    // Split groupby node into two stage.
+    // - Remove distinct aggregations from FirstStage.
+    // - Change SecondStage's aggregation expr and target column name. For example:
+    //     exprs: (sum(default.lineitem.l_quantity (FLOAT8))) ==> exprs: (sum(?sum_3 (FLOAT8)))
+    // The sub group-bys of both stages are paired by their position (grpIdx).
+    int grpIdx = 0;
+    for (GroupbyNode firstStageGroupbyNode: firstStageDistinctNode.getGroupByNodes()) {
+      GroupbyNode secondStageGroupbyNode = secondStageDistinctNode.getGroupByNodes().get(grpIdx);
+
+      if (firstStageGroupbyNode.isDistinct()) {
+        // FirstStage: Remove aggregation, Set target with only grouping columns
+        firstStageGroupbyNode.setAggFunctions(null);
+
+        List<Target> firstGroupbyTargets = new ArrayList<Target>();
+        for (Column column : firstStageGroupbyNode.getGroupingColumns()) {
+          Target target = new Target(new FieldEval(column));
+          firstGroupbyTargets.add(target);
+        }
+        firstStageGroupbyNode.setTargets(firstGroupbyTargets.toArray(new Target[]{}));
+
+        // SecondStage:
+        //   Set grouping column with origin groupby's columns
+        //   Remove distinct group column from targets
+        secondStageGroupbyNode.setGroupingColumns(originGroupColumns.toArray(new Column[]{}));
+
+        Target[] oldTargets = secondStageGroupbyNode.getTargets();
+        List<Target> secondGroupbyTargets = new ArrayList<Target>();
+        // Only the first aggregation is inspected to find the distinct columns of this sub group-by.
+        LinkedHashSet<Column> distinctColumns = EvalTreeUtil.findUniqueColumns(secondStageGroupbyNode.getAggFunctions()[0]);
+        List<Column> uniqueDistinctColumn = new ArrayList<Column>();
+        // remove origin group by column from distinctColumns
+        for (Column eachColumn: distinctColumns) {
+          if (!originGroupColumns.contains(eachColumn)) {
+            uniqueDistinctColumn.add(eachColumn);
+          }
+        }
+        // Keep the leading grouping-column targets; for every sub group-by after the first one the
+        // running column index is advanced once per grouping column.
+        for (int i = 0; i < originGroupColumns.size(); i++) {
+          secondGroupbyTargets.add(oldTargets[i]);
+          if (grpIdx > 0) {
+            columnIdIndex++;
+          }
+        }
+
+        // The aggregation targets are located after the grouping columns and the distinct-only
+        // columns in the old target array.
+        for (int aggFuncIdx = 0; aggFuncIdx < secondStageGroupbyNode.getAggFunctions().length; aggFuncIdx++) {
+          int targetIdx = originGroupColumns.size() + uniqueDistinctColumn.size() + aggFuncIdx;
+          Target aggFuncTarget = oldTargets[targetIdx];
+          secondGroupbyTargets.add(aggFuncTarget);
+          int outputColumnId = originOutputSchema.getColumnId(aggFuncTarget.getNamedColumn().getQualifiedName());
+          secondStageColumnIds[outputColumnId] = columnIdIndex;
+          columnIdIndex++;
+        }
+        secondStageGroupbyNode.setTargets(secondGroupbyTargets.toArray(new Target[]{}));
+      } else {
+        // FirstStage: Change target of aggFunction to function name expr
+        List<Target> firstGroupbyTargets = new ArrayList<Target>();
+        for (Column column : firstStageDistinctNode.getGroupingColumns()) {
+          firstGroupbyTargets.add(new Target(new FieldEval(column)));
+          columnIdIndex++;
+        }
+
+        int aggFuncIdx = 0;
+        for (AggregationFunctionCallEval aggFunction: firstStageGroupbyNode.getAggFunctions()) {
+          aggFunction.setFirstPhase();
+          // The first stage exposes the aggregation under a generated column name ...
+          String firstEvalNames = plan.generateUniqueColumnName(aggFunction);
+          FieldEval firstEval = new FieldEval(firstEvalNames, aggFunction.getValueType());
+          firstGroupbyTargets.add(new Target(firstEval));
+
+          // ... and the matching second-stage aggregation takes that column as its only argument.
+          AggregationFunctionCallEval secondStageAggFunction = secondStageGroupbyNode.getAggFunctions()[aggFuncIdx];
+          secondStageAggFunction.setArgs(new EvalNode[] {firstEval});
+
+          Target secondTarget = secondStageGroupbyNode.getTargets()[secondStageGroupbyNode.getGroupingColumns().length + aggFuncIdx];
+          int outputColumnId = originOutputSchema.getColumnId(secondTarget.getNamedColumn().getQualifiedName());
+          secondStageColumnIds[outputColumnId] = columnIdIndex;
+          columnIdIndex++;
+          aggFuncIdx++;
+        }
+        firstStageGroupbyNode.setTargets(firstGroupbyTargets.toArray(new Target[]{}));
+        secondStageGroupbyNode.setInSchema(firstStageGroupbyNode.getOutSchema());
+      }
+      grpIdx++;
+    }
+
+    // In the case of distinct query without group by clause
+    // other aggregation function is added to last distinct group by node.
+    // The trailing non-distinct node is removed from the second-stage list, so the second stage
+    // then holds one sub group-by fewer than the first stage.
+    List<GroupbyNode> secondStageGroupbyNodes = secondStageDistinctNode.getGroupByNodes();
+    GroupbyNode lastSecondStageGroupbyNode = secondStageGroupbyNodes.get(secondStageGroupbyNodes.size() - 1);
+    if (!lastSecondStageGroupbyNode.isDistinct() && lastSecondStageGroupbyNode.isEmptyGrouping()) {
+      GroupbyNode otherGroupbyNode = lastSecondStageGroupbyNode;
+      lastSecondStageGroupbyNode = secondStageGroupbyNodes.get(secondStageGroupbyNodes.size() - 2);
+      secondStageGroupbyNodes.remove(secondStageGroupbyNodes.size() - 1);
+
+      // Concatenate targets: those of the preceding node first, then those of the removed node.
+      Target[] targets =
+          new Target[lastSecondStageGroupbyNode.getTargets().length + otherGroupbyNode.getTargets().length];
+      System.arraycopy(lastSecondStageGroupbyNode.getTargets(), 0,
+          targets, 0, lastSecondStageGroupbyNode.getTargets().length);
+      System.arraycopy(otherGroupbyNode.getTargets(), 0, targets,
+          lastSecondStageGroupbyNode.getTargets().length, otherGroupbyNode.getTargets().length);
+
+      lastSecondStageGroupbyNode.setTargets(targets);
+
+      // Concatenate aggregation functions in the same order.
+      AggregationFunctionCallEval[] aggFunctions =
+          new AggregationFunctionCallEval[lastSecondStageGroupbyNode.getAggFunctions().length + otherGroupbyNode.getAggFunctions().length];
+      System.arraycopy(lastSecondStageGroupbyNode.getAggFunctions(), 0,
+          aggFunctions, 0, lastSecondStageGroupbyNode.getAggFunctions().length);
+      System.arraycopy(otherGroupbyNode.getAggFunctions(), 0, aggFunctions,
+          lastSecondStageGroupbyNode.getAggFunctions().length, otherGroupbyNode.getAggFunctions().length);
+
+      lastSecondStageGroupbyNode.setAggFunctions(aggFunctions);
+    }
+
+    // Set FirstStage DistinctNode's target with grouping column and other aggregation function
+    List<Integer> firstStageColumnIds = new ArrayList<Integer>();
+    columnIdIndex = 0;
+    List<Target> firstTargets = new ArrayList<Target>();
+    for (GroupbyNode firstStageGroupbyNode: firstStageDistinctNode.getGroupByNodes()) {
+      if (firstStageGroupbyNode.isDistinct()) {
+        // A grouping column is added once, but the running index still advances for every
+        // occurrence, including the skipped duplicates.
+        for (Column column : firstStageGroupbyNode.getGroupingColumns()) {
+          Target firstTarget = new Target(new FieldEval(column));
+          if (!firstTargets.contains(firstTarget)) {
+            firstTargets.add(firstTarget);
+            firstStageColumnIds.add(columnIdIndex);
+          }
+          columnIdIndex++;
+        }
+      } else {
+        //add aggr function target
+        // Skip this node's grouping columns in the index, then add only its aggregation targets.
+        columnIdIndex += firstStageGroupbyNode.getGroupingColumns().length;
+        Target[] baseGroupbyTargets = firstStageGroupbyNode.getTargets();
+        for (int i = firstStageGroupbyNode.getGroupingColumns().length;
+             i < baseGroupbyTargets.length; i++) {
+          firstTargets.add(baseGroupbyTargets[i]);
+          firstStageColumnIds.add(columnIdIndex++);
+        }
+      }
+    }
+    firstStageDistinctNode.setTargets(firstTargets.toArray(new Target[]{}));
+    firstStageDistinctNode.setResultColumnIds(TUtil.toArray(firstStageColumnIds));
+
+    //Set SecondStage ColumnId and Input schema
+    secondStageDistinctNode.setResultColumnIds(secondStageColumnIds);
+
+    Schema secondStageInSchema = new Schema();
+    //TODO merged tuple schema
+    // NOTE(review): 'index' below is never read.
+    int index = 0;
+    // Every second-stage sub group-by reads the first stage's output; the node-level input schema
+    // collects the output columns of all sub group-bys, skipping columns already added.
+    for(GroupbyNode eachNode: secondStageDistinctNode.getGroupByNodes()) {
+      eachNode.setInSchema(firstStageDistinctNode.getOutSchema());
+      for (Column column: eachNode.getOutSchema().getColumns()) {
+        if (secondStageInSchema.getColumn(column) == null) {
+          secondStageInSchema.addColumn(column);
+        }
+      }
+    }
+    secondStageDistinctNode.setInSchema(secondStageInSchema);
+
+    return new DistinctGroupbyNode[]{firstStageDistinctNode, secondStageDistinctNode};
+  }
+
+  /**
+   * Registers the aggregation algorithm of both stages with the enforcers of their execution blocks.
+   *
+   * <p>The first stage is set to hash aggregation without sort specs. The second stage is set to
+   * sort aggregation; one {@link SortSpecArray} is produced per first-stage sub group-by, holding
+   * that node's grouping columns as sort keys and the PID of the second-stage sub group-by at the
+   * same position.
+   *
+   * @param firstStageBlock         execution block running the first stage
+   * @param firstStageDistinctNode  first-stage node; its sub group-bys provide the sort columns
+   * @param secondStageBlock        execution block running the second stage
+   * @param secondStageDistinctNode second-stage node; its sub group-bys provide the PIDs
+   */
+  private void setDistinctAggregationEnforcer(
+      ExecutionBlock firstStageBlock, DistinctGroupbyNode firstStageDistinctNode,
+      ExecutionBlock secondStageBlock, DistinctGroupbyNode secondStageDistinctNode) {
+    firstStageBlock.getEnforcer().enforceDistinctAggregation(firstStageDistinctNode.getPID(),
+        DistinctAggregationAlgorithm.HASH_AGGREGATION, null);
+
+    List<GroupbyNode> firstStageGroupbyNodes = firstStageDistinctNode.getGroupByNodes();
+    List<GroupbyNode> secondStageGroupbyNodes = secondStageDistinctNode.getGroupByNodes();
+    // createMultiPhaseDistinctNode may fold the trailing non-distinct node of the second stage into
+    // its predecessor, leaving the second stage one node shorter than the first stage.
+    int lastSecondStageIndex = secondStageGroupbyNodes.size() - 1;
+
+    List<SortSpecArray> sortSpecArrays = new ArrayList<SortSpecArray>(firstStageGroupbyNodes.size());
+    for (int index = 0; index < firstStageGroupbyNodes.size(); index++) {
+      List<SortSpecProto> sortSpecs = new ArrayList<SortSpecProto>();
+      for (Column column: firstStageGroupbyNodes.get(index).getGroupingColumns()) {
+        sortSpecs.add(SortSpecProto.newBuilder().setColumn(column.getProto()).build());
+      }
+
+      // Pair each first-stage node with the second-stage node at the same position. The index was
+      // previously never advanced, so every entry carried the PID of the first second-stage node.
+      // A first-stage node without a counterpart is attributed to the last second-stage node,
+      // i.e. the node it was folded into - NOTE(review): confirm against the enforcer's consumer.
+      int secondStageIndex = Math.min(index, lastSecondStageIndex);
+      sortSpecArrays.add(SortSpecArray.newBuilder()
+          .setPid(secondStageGroupbyNodes.get(secondStageIndex).getPID())
+          .addAllSortSpecs(sortSpecs).build());
+    }
+    secondStageBlock.getEnforcer().enforceDistinctAggregation(secondStageDistinctNode.getPID(),
+        DistinctAggregationAlgorithm.SORT_AGGREGATION, sortSpecArrays);
+  }
+
+  /**
+   * Collects, for one sub group-by node, the aggregation functions assigned to it together with
+   * the target of each function. Functions and targets are kept in the order they were added.
+   */
+  static class DistinctGroupbyNodeBuildInfo {
+    private final GroupbyNode groupbyNode;
+    private final List<AggregationFunctionCallEval> aggFunctions =
+        new ArrayList<AggregationFunctionCallEval>();
+    private final List<Target> aggFunctionTargets = new ArrayList<Target>();
+
+    /**
+     * @param groupbyNode the sub group-by node this info belongs to
+     */
+    public DistinctGroupbyNodeBuildInfo(GroupbyNode groupbyNode) {
+      this.groupbyNode = groupbyNode;
+    }
+
+    /** Appends an aggregation function. */
+    public void addAggFunction(AggregationFunctionCallEval aggFunction) {
+      aggFunctions.add(aggFunction);
+    }
+
+    /** Appends the target of an aggregation function. */
+    public void addAggFunctionTarget(Target target) {
+      aggFunctionTargets.add(target);
+    }
+
+    /** Returns the sub group-by node given at construction. */
+    public GroupbyNode getGroupbyNode() {
+      return groupbyNode;
+    }
+
+    /** Returns the live list of collected aggregation functions. */
+    public List<AggregationFunctionCallEval> getAggFunctions() {
+      return aggFunctions;
+    }
+
+    /** Returns the live list of collected aggregation targets. */
+    public List<Target> getAggFunctionTargets() {
+      return aggFunctionTargets;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/9350a802/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/DistinctGroupbyNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/DistinctGroupbyNode.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/DistinctGroupbyNode.java
new file mode 100644
index 0000000..b1e4bc3
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/DistinctGroupbyNode.java
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.logical;
+
+import com.google.gson.annotations.Expose;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.engine.eval.AggregationFunctionCallEval;
+import org.apache.tajo.engine.planner.PlanString;
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.Target;
+import org.apache.tajo.util.TUtil;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Logical node for a group-by that contains distinct aggregations. It holds the grouping columns
+ * of the query, a list of sub {@link GroupbyNode}s, the node's targets, and an array of result
+ * column ids.
+ */
+public class DistinctGroupbyNode extends UnaryNode implements Projectable, Cloneable {
+  @Expose
+  private List<GroupbyNode> groupByNodes;
+
+  @Expose
+  private Target[] targets;
+
+  @Expose
+  private Column[] groupingColumns;
+
+  @Expose
+  private int[] resultColumnIds;
+
+  /**
+   * @param pid plan node id
+   */
+  public DistinctGroupbyNode(int pid) {
+    super(pid, NodeType.DISTINCT_GROUP_BY);
+  }
+
+  @Override
+  public boolean hasTargets() {
+    return targets != null && targets.length > 0;
+  }
+
+  /**
+   * Stores the targets and derives the output schema from them.
+   */
+  @Override
+  public void setTargets(Target[] targets) {
+    this.targets = targets;
+    setOutSchema(PlannerUtil.targetToSchema(targets));
+  }
+
+  /**
+   * NOTE(review): this returns an empty array rather than the targets stored by
+   * {@link #setTargets(Target[])}, although {@link #hasTargets()} reports the stored ones.
+   * Confirm this is intentional before relying on it.
+   */
+  @Override
+  public Target[] getTargets() {
+    return new Target[0];
+  }
+
+  public void setGroupbyNodes(List<GroupbyNode> groupByNodes) {
+    this.groupByNodes =  groupByNodes;
+  }
+
+  public List<GroupbyNode> getGroupByNodes() {
+    return groupByNodes;
+  }
+
+  public final Column[] getGroupingColumns() {
+    return groupingColumns;
+  }
+
+  public final void setGroupColumns(Column[] groupingColumns) {
+    this.groupingColumns = groupingColumns;
+  }
+
+  public int[] getResultColumnIds() {
+    return resultColumnIds;
+  }
+
+  public void setResultColumnIds(int[] resultColumnIds) {
+    this.resultColumnIds = resultColumnIds;
+  }
+
+  /**
+   * Copies this node. The grouping column array and the result column id array are copied, every
+   * sub group-by is cloned with its PID reset to -1, and every target is cloned.
+   */
+  @Override
+  public Object clone() throws CloneNotSupportedException {
+    DistinctGroupbyNode cloneNode = (DistinctGroupbyNode)super.clone();
+
+    if (groupingColumns != null) {
+      cloneNode.groupingColumns = new Column[groupingColumns.length];
+      for (int i = 0; i < groupingColumns.length; i++) {
+        cloneNode.groupingColumns[i] = groupingColumns[i];
+      }
+    }
+
+    if (groupByNodes != null) {
+      cloneNode.groupByNodes = new ArrayList<GroupbyNode>();
+      for (GroupbyNode eachNode: groupByNodes) {
+        GroupbyNode groupbyNode = (GroupbyNode)eachNode.clone();
+        groupbyNode.setPID(-1);
+        cloneNode.groupByNodes.add(groupbyNode);
+      }
+    }
+
+    if (targets != null) {
+      cloneNode.targets = new Target[targets.length];
+      for (int i = 0; i < targets.length; i++) {
+        cloneNode.targets[i] = (Target) targets[i].clone();
+      }
+    }
+
+    // Without this copy the clone would share (and see writes to) the original's id array.
+    if (resultColumnIds != null) {
+      cloneNode.resultColumnIds = resultColumnIds.clone();
+    }
+
+    return cloneNode;
+  }
+
+  public final boolean isEmptyGrouping() {
+    return groupingColumns == null || groupingColumns.length == 0;
+  }
+
+  /**
+   * Returns a short description listing the grouping set (when there is one) and every sub
+   * group-by, separated by {@code ", "}.
+   */
+  public String toString() {
+    StringBuilder sb = new StringBuilder("Distinct GroupBy (");
+    String separator = "";
+    // Both conditions must hold; the former '||' dereferenced a null array.
+    if (groupingColumns != null && groupingColumns.length > 0) {
+      sb.append("grouping set=").append(TUtil.arrayToString(groupingColumns));
+      separator = ", ";
+    }
+    if (groupByNodes != null) {
+      for (GroupbyNode eachNode: groupByNodes) {
+        sb.append(separator).append("groupbyNode=").append(eachNode.toString());
+        separator = ", ";
+      }
+    }
+    sb.append(")");
+    return sb.toString();
+  }
+
+  /**
+   * Compares the parent state, the grouping columns, the sub group-bys and the targets.
+   * The result column ids are not part of the comparison.
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof DistinctGroupbyNode) {
+      DistinctGroupbyNode other = (DistinctGroupbyNode) obj;
+      boolean eq = super.equals(other);
+      eq = eq && TUtil.checkEquals(groupingColumns, other.groupingColumns);
+      eq = eq && TUtil.checkEquals(groupByNodes, other.groupByNodes);
+      eq = eq && TUtil.checkEquals(targets, other.targets);
+      return eq;
+    } else {
+      return false;
+    }
+  }
+
+  /**
+   * Builds the plan description: grouping columns as the title, all aggregation expressions of the
+   * sub group-bys, the target list, the schemas, and one detail line per sub group-by.
+   * Unset grouping columns, sub group-bys or targets are rendered as empty lists.
+   */
+  @Override
+  public PlanString getPlanString() {
+    PlanString planStr = new PlanString(this);
+
+    StringBuilder sb = new StringBuilder();
+    sb.append("(");
+    Column [] groupingColumns = this.groupingColumns;
+    if (groupingColumns != null) {
+      for (int j = 0; j < groupingColumns.length; j++) {
+        sb.append(groupingColumns[j].getSimpleName());
+        if(j < groupingColumns.length - 1) {
+          sb.append(",");
+        }
+      }
+    }
+
+    sb.append(")");
+
+    planStr.appendTitle(sb.toString());
+
+    sb = new StringBuilder();
+    sb.append("(");
+
+    String prefix = "";
+    if (groupByNodes != null) {
+      for (GroupbyNode eachNode: groupByNodes) {
+        if (eachNode.hasAggFunctions()) {
+          AggregationFunctionCallEval[] aggrFunctions = eachNode.getAggFunctions();
+          for (int j = 0; j < aggrFunctions.length; j++) {
+            sb.append(prefix).append(aggrFunctions[j]);
+            prefix = ",";
+          }
+        }
+      }
+    }
+    sb.append(")");
+    planStr.appendExplain("exprs: ").appendExplain(sb.toString());
+
+    sb = new StringBuilder("target list: ");
+    if (targets != null) {
+      for (int i = 0; i < targets.length; i++) {
+        sb.append(targets[i]);
+        if( i < targets.length - 1) {
+          sb.append(", ");
+        }
+      }
+    }
+    planStr.addExplan(sb.toString());
+
+    planStr.addDetail("out schema:").appendDetail(getOutSchema().toString());
+    planStr.addDetail("in schema:").appendDetail(getInSchema().toString());
+
+    if (groupByNodes != null) {
+      for (GroupbyNode eachNode: groupByNodes) {
+        planStr.addDetail("\t").appendDetail("distinct: " + eachNode.isDistinct())
+            .appendDetail(", " + eachNode.getShortPlanString());
+      }
+    }
+
+    return planStr;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/9350a802/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/GroupbyNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/GroupbyNode.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/GroupbyNode.java
index bafe0c6..828b06d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/GroupbyNode.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/GroupbyNode.java
@@ -148,6 +148,48 @@ public class GroupbyNode extends UnaryNode implements Projectable, Cloneable {
     return grp;
   }
 
+  /**
+   * Builds a compact one-line description of this node: its type and PID, the simple names of
+   * the grouping columns, the aggregation functions (when present), the target list (when set)
+   * and the output and input schemas.
+   *
+   * @return the one-line description
+   */
+  public String getShortPlanString() {
+    StringBuilder text = new StringBuilder();
+    text.append(getType().name()).append("(").append(getPID()).append(")");
+
+    // grouping keys, comma separated
+    text.append("(");
+    String separator = "";
+    for (Column keyColumn : this.groupingColumns) {
+      text.append(separator).append(keyColumn.getSimpleName());
+      separator = ",";
+    }
+    text.append(")");
+
+    // the node may carry no aggregation function at all
+    if (hasAggFunctions()) {
+      text.append(", exprs: (");
+      for (int k = 0; k < aggrFunctions.length; k++) {
+        if (k > 0) {
+          text.append(",");
+        }
+        text.append(aggrFunctions[k]);
+      }
+      text.append(")");
+    }
+
+    if (targets != null) {
+      text.append(", target list:{");
+      for (int k = 0; k < targets.length; k++) {
+        if (k > 0) {
+          text.append(", ");
+        }
+        text.append(targets[k]);
+      }
+      text.append("}");
+    }
+
+    text.append(", out schema:").append(getOutSchema().toString());
+    text.append(", in schema:").append(getInSchema().toString());
+
+    return text.toString();
+  }
+
   @Override
   public PlanString getPlanString() {
     PlanString planStr = new PlanString(this);

http://git-wip-us.apache.org/repos/asf/tajo/blob/9350a802/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/NodeType.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/NodeType.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/NodeType.java
index f498231..cc43912 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/NodeType.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/NodeType.java
@@ -46,6 +46,7 @@ public enum NodeType {
   BST_INDEX_SCAN(IndexScanNode.class),
   STORE(StoreTableNode.class),
   INSERT(InsertNode.class),
+  DISTINCT_GROUP_BY(DistinctGroupbyNode.class),
 
   CREATE_DATABASE(CreateDatabaseNode.class),
   DROP_DATABASE(DropDatabaseNode.class),

http://git-wip-us.apache.org/repos/asf/tajo/blob/9350a802/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyHashAggregationExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyHashAggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyHashAggregationExec.java
new file mode 100644
index 0000000..6458f47
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyHashAggregationExec.java
@@ -0,0 +1,388 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.engine.eval.AggregationFunctionCallEval;
+import org.apache.tajo.engine.function.FunctionContext;
+import org.apache.tajo.engine.planner.logical.DistinctGroupbyNode;
+import org.apache.tajo.engine.planner.logical.GroupbyNode;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.Map.Entry;
+
+/**
+ * Hash-based physical operator for a {@link DistinctGroupbyNode}.
+ *
+ * <p>On the first {@link #next()} call the child operator is drained (until it is exhausted or
+ * the task context is stopped) and every input tuple is handed to one {@link HashAggregator}
+ * per group-by node of the plan. Each aggregator keeps a two-level hash table: outer grouping
+ * key -&gt; that node's own grouping key -&gt; aggregation function contexts. Results are then
+ * emitted one outer group at a time: the rows the aggregators produce for that group are merged
+ * side by side, and an aggregator that produced fewer rows than the others is padded with
+ * NULL values.</p>
+ */
+public class DistinctGroupbyHashAggregationExec extends PhysicalExec {
+  private DistinctGroupbyNode plan;
+  private boolean finished = false;
+
+  private HashAggregator[] hashAggregators;
+  private PhysicalExec child;
+  private int distinctGroupingKeyNum;
+  private int distinctGroupingKeyIds[];
+  private boolean first = true;
+  private int groupbyNodeNum;
+  private int outputColumnNum;
+  private int totalNumRows;
+  private int fetchedRows;
+  private float progress;
+
+  // For every column position in the concatenation of all group-by node outputs: the index of
+  // the output column it is copied to, or -1 when that column is not part of the output.
+  private int[] resultColumnIdIndexes;
+
+  // Merged rows of the outer group that is currently being handed out by next().
+  List<Tuple> currentAggregatedTuples = null;
+  int currentAggregatedTupleIndex = 0;
+  int currentAggregatedTupleSize = 0;
+
+  /**
+   * Creates the operator, initializes the child and prepares one hash aggregator per group-by
+   * node of the plan.
+   *
+   * @param context the task attempt context, consulted for the stop flag while loading
+   * @param plan the logical node describing grouping columns, group-by nodes and result columns
+   * @param subOp the child operator supplying the input tuples
+   * @throws IOException if initializing the child fails
+   */
+  public DistinctGroupbyHashAggregationExec(TaskAttemptContext context, DistinctGroupbyNode plan, PhysicalExec subOp)
+      throws IOException {
+    super(context, plan.getInSchema(), plan.getOutSchema());
+
+    this.plan = plan;
+    this.child = subOp;
+    this.child.init();
+
+    Column[] keyColumns = plan.getGroupingColumns();
+    distinctGroupingKeyNum = keyColumns.length;
+    distinctGroupingKeyIds = new int[distinctGroupingKeyNum];
+    for (int idx = 0; idx < distinctGroupingKeyNum; idx++) {
+      distinctGroupingKeyIds[idx] = inSchema.getColumnId(keyColumns[idx].getQualifiedName());
+    }
+
+    List<GroupbyNode> groupbyNodes = plan.getGroupByNodes();
+    groupbyNodeNum = groupbyNodes.size();
+    this.hashAggregators = new HashAggregator[groupbyNodeNum];
+
+    int index = 0;
+    int allGroupbyOutColNum = 0;
+    for (GroupbyNode eachGroupby: groupbyNodes) {
+      hashAggregators[index++] = new HashAggregator(eachGroupby);
+      allGroupbyOutColNum += eachGroupby.getOutSchema().size();
+    }
+
+    outputColumnNum = plan.getOutSchema().size();
+
+    // every position starts as "not part of the output"
+    resultColumnIdIndexes = new int[allGroupbyOutColNum];
+    Arrays.fill(resultColumnIdIndexes, -1);
+
+    int[] resultColumnIds = plan.getResultColumnIds();
+    for (int i = 0; i < resultColumnIds.length; i++) {
+      resultColumnIdIndexes[resultColumnIds[i]] = i;
+    }
+  }
+
+  /**
+   * Returns the next merged output tuple, or {@code null} once every aggregator is exhausted.
+   *
+   * @return the next output tuple or {@code null} at the end of the result
+   * @throws IOException if reading from the child fails while the hash tables are loaded
+   */
+  @Override
+  public Tuple next() throws IOException {
+    if (finished) {
+      return null;
+    }
+    if (first) {
+      loadChildHashTable();
+      progress = 0.5f;
+      first = false;
+    }
+
+    // rows of the current outer group are still pending
+    if (currentAggregatedTuples != null && currentAggregatedTupleIndex < currentAggregatedTupleSize) {
+      return currentAggregatedTuples.get(currentAggregatedTupleIndex++);
+    }
+
+    //--------------------------------------------------------------------------------------
+    // Output tuple
+    //--------------------------------------------------------------------------------------
+    //                hashAggregators[0]    hashAggregators[1]    hashAggregators[2]
+    //--------------------------------------------------------------------------------------
+    // Groupby_Key1 | Distinct1_Column_V1 | Distinct2_Column_Va | Other_Aggregation_Result |
+    // Groupby_Key1 | Distinct1_Column_V2 | Distinct2_Column_Vb |                          |
+    // Groupby_Key1 |                     | Distinct2_Column_Vc |                          |
+    // Groupby_Key1 |                     | Distinct2_Column_Vd |                          |
+    //--------------------------------------------------------------------------------------
+    // Groupby_Key2 | Distinct1_Column_V1 | Distinct2_Column_Vk | Other_Aggregation_Result |
+    // Groupby_Key2 | Distinct1_Column_V2 | Distinct2_Column_Vn |                          |
+    // Groupby_Key2 | Distinct1_Column_V3 |                     |                          |
+    //--------------------------------------------------------------------------------------
+
+    // NOTE(review): the positional merge below assumes that all aggregators iterate their outer
+    // groups in the same order - confirm this holds for the hash tables in use.
+    int exhaustedCount = 0;
+    int maxRows = 0;
+    List<List<Tuple>> tupleSlots = new ArrayList<List<Tuple>>(hashAggregators.length);
+    for (int i = 0; i < hashAggregators.length; i++) {
+      if (!hashAggregators[i].iterator.hasNext()) {
+        exhaustedCount++;
+        // keep slot i aligned with hashAggregators[i]; an empty slot is padded with NULLs
+        tupleSlots.add(Collections.<Tuple>emptyList());
+        continue;
+      }
+      Entry<Tuple, Map<Tuple, FunctionContext[]>> entry = hashAggregators[i].iterator.next();
+      List<Tuple> aggregatedTuples = hashAggregators[i].aggregate(entry.getValue());
+      maxRows = Math.max(maxRows, aggregatedTuples.size());
+      tupleSlots.add(aggregatedTuples);
+    }
+
+    if (exhaustedCount == hashAggregators.length) {
+      finished = true;
+      progress = 1.0f;
+      return null;
+    }
+
+    currentAggregatedTuples = new ArrayList<Tuple>(maxRows);
+    for (int rowIndex = 0; rowIndex < maxRows; rowIndex++) {
+      currentAggregatedTuples.add(mergeRow(tupleSlots, rowIndex));
+    }
+
+    currentAggregatedTupleIndex = 0;
+    currentAggregatedTupleSize = currentAggregatedTuples.size();
+
+    if (currentAggregatedTupleSize == 0) {
+      finished = true;
+      progress = 1.0f;
+      return null;
+    }
+
+    // progress is counted per outer group, not per emitted row
+    fetchedRows++;
+    return currentAggregatedTuples.get(currentAggregatedTupleIndex++);
+  }
+
+  /**
+   * Builds one output tuple from the rowIndex-th tuple of every slot. Slots that have no tuple
+   * at that position contribute NULL values.
+   */
+  private Tuple mergeRow(List<List<Tuple>> tupleSlots, int rowIndex) {
+    Tuple mergedTuple = new VTuple(outputColumnNum);
+    int mergeTupleIndex = 0;
+
+    for (int i = 0; i < hashAggregators.length; i++) {
+      List<Tuple> aggregatedTuples = tupleSlots.get(i);
+      Tuple source = null;
+      if (rowIndex < aggregatedTuples.size()) {
+        source = aggregatedTuples.get(rowIndex);
+      }
+
+      int tupleSize = hashAggregators[i].getTupleSize();
+      for (int j = 0; j < tupleSize; j++, mergeTupleIndex++) {
+        int outputIndex = resultColumnIdIndexes[mergeTupleIndex];
+        if (outputIndex < 0) {
+          continue; // this column is not projected into the output
+        }
+        if (source != null) {
+          mergedTuple.put(outputIndex, source.get(j));
+        } else {
+          mergedTuple.put(outputIndex, NullDatum.get());
+        }
+      }
+    }
+    return mergedTuple;
+  }
+
+  /**
+   * Drains the child into all hash aggregators, then positions every aggregator at the start
+   * of its hash table. Loading stops early when the task context reports that it was stopped.
+   */
+  private void loadChildHashTable() throws IOException {
+    Tuple tuple = null;
+    while(!context.isStopped() && (tuple = child.next()) != null) {
+      for (int i = 0; i < hashAggregators.length; i++) {
+        hashAggregators[i].compute(tuple);
+      }
+    }
+    for (int i = 0; i < hashAggregators.length; i++) {
+      hashAggregators[i].initFetch();
+    }
+
+    // number of outer groups; used as the denominator of the fetch progress
+    totalNumRows = hashAggregators[0].hashTable.size();
+  }
+
+  /**
+   * Releases the hash tables and closes the child operator.
+   *
+   * @throws IOException if closing an aggregator or the child fails
+   */
+  @Override
+  public void close() throws IOException {
+    plan = null;
+    if (hashAggregators != null) {
+      for (int i = 0; i < hashAggregators.length; i++) {
+        hashAggregators[i].close();
+      }
+    }
+    if (child != null) {
+      child.close();
+    }
+  }
+
+  @Override
+  public void init() throws IOException {
+  }
+
+  /**
+   * Restarts the output from the first group. The already built hash tables are reused; the
+   * buffered rows of the previous pass and the progress counters are discarded so that no stale
+   * row is replayed and the reported progress stays within [0, 1].
+   *
+   * @throws IOException declared by the operator contract; not thrown by this implementation
+   */
+  @Override
+  public void rescan() throws IOException {
+    finished = false;
+    currentAggregatedTuples = null;
+    currentAggregatedTupleIndex = 0;
+    currentAggregatedTupleSize = 0;
+    fetchedRows = 0;
+    // the load phase accounts for the first half of the progress once it has run
+    progress = first ? 0.0f : 0.5f;
+    for (int i = 0; i < hashAggregators.length; i++) {
+      hashAggregators[i].initFetch();
+    }
+  }
+
+  /**
+   * Reports the progress: 0.5 once the input is loaded, plus up to another 0.5 proportional to
+   * the number of outer groups already fetched, and the final value once finished.
+   *
+   * @return the current progress value
+   */
+  @Override
+  public float getProgress() {
+    if (finished) {
+      return progress;
+    } else {
+      if (totalNumRows > 0) {
+        return progress + ((float)fetchedRows / (float)totalNumRows) * 0.5f;
+      } else {
+        return progress;
+      }
+    }
+  }
+
+  /**
+   * @return the input statistics of the child, or {@code null} when there is no child
+   */
+  @Override
+  public TableStats getInputStats() {
+    if (child != null) {
+      return child.getInputStats();
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * Aggregates the input for a single group-by node. Tuples are bucketed first by the outer
+   * grouping key of the enclosing operator and then by this node's full grouping key.
+   */
+  class HashAggregator {
+    // Outer's GroupBy Key -> Each GroupByNode's Key -> FunctionContext
+    private Map<Tuple, Map<Tuple, FunctionContext[]>> hashTable;
+    private Iterator<Entry<Tuple, Map<Tuple, FunctionContext[]>>> iterator = null;
+
+    // Input column ids of this node's key: the outer grouping key ids first, followed by the
+    // node's own grouping columns that are not already part of the outer key.
+    private int groupingKeyIds[];
+    private final int aggFunctionsNum;
+    private final AggregationFunctionCallEval aggFunctions[];
+
+    private Schema evalSchema;
+
+    private GroupbyNode groupbyNode;
+
+    int tupleSize;
+
+    /**
+     * @param groupbyNode the group-by node whose grouping columns and aggregation functions
+     *                    this aggregator evaluates
+     */
+    public HashAggregator(GroupbyNode groupbyNode) throws IOException {
+      this.groupbyNode = groupbyNode;
+
+      hashTable = new HashMap<Tuple, Map<Tuple, FunctionContext[]>>(10000);
+      evalSchema = groupbyNode.getOutSchema();
+
+      Set<Integer> outerKeyIds = new HashSet<Integer>();
+      List<Integer> groupingKeyIdList = new ArrayList<Integer>();
+      for (int i = 0; i < distinctGroupingKeyIds.length; i++) {
+        outerKeyIds.add(distinctGroupingKeyIds[i]);
+        groupingKeyIdList.add(distinctGroupingKeyIds[i]);
+      }
+
+      Column[] keyColumns = groupbyNode.getGroupingColumns();
+      for (int idx = 0; idx < keyColumns.length; idx++) {
+        int keyIndex = inSchema.getColumnId(keyColumns[idx].getQualifiedName());
+        if (!outerKeyIds.contains(keyIndex)) {
+          groupingKeyIdList.add(keyIndex);
+        }
+      }
+
+      int index = 0;
+      groupingKeyIds = new int[groupingKeyIdList.size()];
+      for (Integer eachId : groupingKeyIdList) {
+        groupingKeyIds[index++] = eachId;
+      }
+
+      if (groupbyNode.hasAggFunctions()) {
+        aggFunctions = groupbyNode.getAggFunctions();
+        aggFunctionsNum = aggFunctions.length;
+      } else {
+        aggFunctions = new AggregationFunctionCallEval[0];
+        aggFunctionsNum = 0;
+      }
+
+      tupleSize = groupingKeyIds.length + aggFunctionsNum;
+    }
+
+    /**
+     * @return the width of the tuples produced by {@link #aggregate(Map)}: key columns plus
+     *         one column per aggregation function
+     */
+    public int getTupleSize() {
+      return tupleSize;
+    }
+
+    /**
+     * Folds one input tuple into the function contexts of the group it belongs to, creating
+     * the outer bucket and the contexts on first sight of a key.
+     *
+     * @param tuple the input tuple, laid out according to the operator's input schema
+     */
+    public void compute(Tuple tuple) throws IOException {
+      Tuple outerKeyTuple = new VTuple(distinctGroupingKeyIds.length);
+      for (int i = 0; i < distinctGroupingKeyIds.length; i++) {
+        outerKeyTuple.put(i, tuple.get(distinctGroupingKeyIds[i]));
+      }
+
+      Tuple keyTuple = new VTuple(groupingKeyIds.length);
+      for (int i = 0; i < groupingKeyIds.length; i++) {
+        keyTuple.put(i, tuple.get(groupingKeyIds[i]));
+      }
+
+      Map<Tuple, FunctionContext[]> distinctEntry = hashTable.get(outerKeyTuple);
+      if (distinctEntry == null) {
+        distinctEntry = new HashMap<Tuple, FunctionContext[]>();
+        hashTable.put(outerKeyTuple, distinctEntry);
+      }
+      FunctionContext[] contexts = distinctEntry.get(keyTuple);
+      if (contexts != null) {
+        for (int i = 0; i < aggFunctions.length; i++) {
+          aggFunctions[i].merge(contexts[i], inSchema, tuple);
+        }
+      } else { // first occurrence of this key
+        contexts = new FunctionContext[aggFunctionsNum];
+        for (int i = 0; i < aggFunctionsNum; i++) {
+          contexts[i] = aggFunctions[i].newContext();
+          aggFunctions[i].merge(contexts[i], inSchema, tuple);
+        }
+        distinctEntry.put(keyTuple, contexts);
+      }
+    }
+
+    /** Positions the iterator at the first outer group of the hash table. */
+    public void initFetch() {
+      iterator = hashTable.entrySet().iterator();
+    }
+
+    /**
+     * Turns the buckets of one outer group into result tuples: the key columns followed by the
+     * terminated value of every aggregation function.
+     *
+     * @param groupTuples the inner buckets (node key -&gt; contexts) of one outer group
+     * @return one tuple per inner bucket
+     */
+    public List<Tuple> aggregate(Map<Tuple, FunctionContext[]> groupTuples) {
+      List<Tuple> aggregatedTuples = new ArrayList<Tuple>(groupTuples.size());
+
+      for (Entry<Tuple, FunctionContext[]> entry : groupTuples.entrySet()) {
+        Tuple tuple = new VTuple(groupingKeyIds.length + aggFunctionsNum);
+        Tuple groupbyKey = entry.getKey();
+        int index = 0;
+        for (; index < groupbyKey.size(); index++) {
+          tuple.put(index, groupbyKey.get(index));
+        }
+
+        FunctionContext[] contexts = entry.getValue();
+        for (int i = 0; i < aggFunctionsNum; i++, index++) {
+          tuple.put(index, aggFunctions[i].terminate(contexts[i]));
+        }
+        aggregatedTuples.add(tuple);
+      }
+      return aggregatedTuples;
+    }
+
+    /** Drops the hash table and the iterator; calling it again is a no-op. */
+    public void close() throws IOException {
+      if (hashTable != null) {
+        hashTable.clear();
+        hashTable = null;
+      }
+      iterator = null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/9350a802/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySortAggregationExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySortAggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySortAggregationExec.java
new file mode 100644
index 0000000..c8457ac
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySortAggregationExec.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.engine.planner.logical.DistinctGroupbyNode;
+import org.apache.tajo.engine.planner.logical.GroupbyNode;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * Physical operator that drives one {@link SortAggregateExec} per group-by node of a
+ * {@link DistinctGroupbyNode} and stitches the tuples they return, position by position, into
+ * a single output tuple.
+ */
+public class DistinctGroupbySortAggregationExec extends PhysicalExec {
+  private DistinctGroupbyNode plan;
+  private SortAggregateExec[] aggregateExecs;
+
+  private boolean finished = false;
+
+  private int distinctGroupingKeyNum;
+
+  private Tuple[] currentTuples;
+  private int outColumnNum;
+  private int groupbyNodeNum;
+
+  // position within the concatenated sub-operator outputs -> output column index (-1: dropped)
+  private int[] resultColumnIdIndexes;
+
+  // true until the first round of next() calls on the sub-operators has been made
+  boolean first = true;
+
+  /**
+   * Wires the operator to its sub-operators, builds the column mapping used for merging and
+   * initializes every sub-operator.
+   *
+   * @param context the task attempt context
+   * @param plan the logical node this operator executes
+   * @param aggregateExecs one sort aggregation operator per group-by node of the plan
+   * @throws IOException if initializing a sub-operator fails
+   */
+  public DistinctGroupbySortAggregationExec(final TaskAttemptContext context, DistinctGroupbyNode plan,
+                                            SortAggregateExec[] aggregateExecs) throws IOException {
+    super(context, plan.getInSchema(), plan.getOutSchema());
+    this.plan = plan;
+    this.aggregateExecs = aggregateExecs;
+    this.groupbyNodeNum = plan.getGroupByNodes().size();
+
+    final Column[] keyColumns = plan.getGroupingColumns();
+    distinctGroupingKeyNum = keyColumns.length;
+
+    currentTuples = new Tuple[groupbyNodeNum];
+    outColumnNum = outSchema.size();
+
+    int totalWidth = 0;
+    for (GroupbyNode node : plan.getGroupByNodes()) {
+      totalWidth += node.getOutSchema().size();
+    }
+
+    resultColumnIdIndexes = new int[totalWidth];
+    int pos = 0;
+    while (pos < totalWidth) {
+      resultColumnIdIndexes[pos++] = -1;
+    }
+
+    int[] resultColumnIds = plan.getResultColumnIds();
+    for (int outIdx = 0; outIdx < resultColumnIds.length; outIdx++) {
+      resultColumnIdIndexes[resultColumnIds[outIdx]] = outIdx;
+    }
+
+    for (SortAggregateExec exec : aggregateExecs) {
+      exec.init();
+    }
+  }
+
+  /**
+   * Pulls one tuple from every sub-operator and merges them into one output tuple.
+   *
+   * @return the merged tuple, or {@code null} once all sub-operators return {@code null}
+   * @throws IOException if a sub-operator fails
+   */
+  @Override
+  public Tuple next() throws IOException {
+    if (finished) {
+      return null;
+    }
+
+    int fetched = 0;
+    for (int i = 0; i < groupbyNodeNum; i++) {
+      // All SortAggregateExec uses same SeqScanExec object.
+      // After running sort, rescan() should be called.
+      if (i > 0 && first) {
+        aggregateExecs[i].rescan();
+      }
+      Tuple fetchedTuple = aggregateExecs[i].next();
+      currentTuples[i] = fetchedTuple;
+      if (fetchedTuple != null) {
+        fetched++;
+      }
+    }
+    first = false;
+
+    if (fetched == 0) {
+      finished = true;
+      return null;
+    }
+
+    Tuple merged = new VTuple(outColumnNum);
+    int slot = 0;
+    for (Tuple part : currentTuples) {
+      int width = part.size();
+      for (int col = 0; col < width; col++, slot++) {
+        if (resultColumnIdIndexes[slot] >= 0) {
+          merged.put(resultColumnIdIndexes[slot], part.get(col));
+        }
+      }
+    }
+
+    return merged;
+  }
+
+  /**
+   * Closes every sub-operator.
+   *
+   * @throws IOException if closing a sub-operator fails
+   */
+  @Override
+  public void close() throws IOException {
+    plan = null;
+    if (aggregateExecs == null) {
+      return;
+    }
+    for (SortAggregateExec exec : aggregateExecs) {
+      exec.close();
+    }
+  }
+
+  @Override
+  public void init() throws IOException {
+  }
+
+  /**
+   * Clears the finished flag and rescans every sub-operator.
+   *
+   * @throws IOException if rescanning a sub-operator fails
+   */
+  @Override
+  public void rescan() throws IOException {
+    finished = false;
+    int i = 0;
+    while (i < groupbyNodeNum) {
+      aggregateExecs[i++].rescan();
+    }
+  }
+
+  /**
+   * @return 1.0 once finished, otherwise the progress reported by the last sub-operator
+   */
+  @Override
+  public float getProgress() {
+    return finished ? 1.0f : aggregateExecs[aggregateExecs.length - 1].getProgress();
+  }
+
+  /**
+   * @return the input statistics reported by the last sub-operator
+   */
+  @Override
+  public TableStats getInputStats() {
+    SortAggregateExec lastExec = aggregateExecs[aggregateExecs.length - 1];
+    return lastExec.getInputStats();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/9350a802/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
index c422b49..f714758 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
@@ -758,8 +758,9 @@ public class ExternalSortExec extends SortExec {
   public void rescan() throws IOException {
     if (result != null) {
       result.reset();
-      progress = 0.5f;
     }
+    super.rescan();
+    progress = 0.5f;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tajo/blob/9350a802/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java
index a31ad90..c87e01a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java
@@ -54,14 +54,14 @@ public class HashAggregateExec extends AggregationExec {
       for(int i = 0; i < groupingKeyIds.length; i++) {
         keyTuple.put(i, tuple.get(groupingKeyIds[i]));
       }
-      
-      if(hashTable.containsKey(keyTuple)) {
-        FunctionContext [] contexts = hashTable.get(keyTuple);
+
+      FunctionContext [] contexts = hashTable.get(keyTuple);
+      if(contexts != null) {
         for(int i = 0; i < aggFunctions.length; i++) {
           aggFunctions[i].merge(contexts[i], inSchema, tuple);
         }
       } else { // if the key occurs firstly
-        FunctionContext contexts [] = new FunctionContext[aggFunctionsNum];
+        contexts = new FunctionContext[aggFunctionsNum];
         for(int i = 0; i < aggFunctionsNum; i++) {
           contexts[i] = aggFunctions[i].newContext();
           aggFunctions[i].merge(contexts[i], inSchema, tuple);


Mime
View raw message